(no commit message)
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
33
34 /**
35  * If a client stopped asking for more results, how many more do
36  * we receive from the DB before killing the connection?  Trade-off
37  * between re-doing TCP handshakes and (needlessly) receiving 
38  * useless results.
39  */
40 #define MAX_EXCESS_RESULTS 8
41
42 /**
43  * Context for processing status messages.
44  */
45 struct StatusContext
46 {
47   /**
48    * Continuation to call with the status.
49    */
50   GNUNET_DATASTORE_ContinuationWithStatus cont;
51
52   /**
53    * Closure for cont.
54    */
55   void *cont_cls;
56
57 };
58
59
60 /**
61  * Context for processing result messages.
62  */
63 struct ResultContext
64 {
65   /**
66    * Iterator to call with the result.
67    */
68   GNUNET_DATASTORE_Iterator iter;
69
70   /**
71    * Closure for iter.
72    */
73   void *iter_cls;
74
75 };
76
77
78 /**
79  *  Context for a queue operation.
80  */
81 union QueueContext
82 {
83
84   struct StatusContext sc;
85   
86   struct ResultContext rc;
87
88 };
89
90
91
92 /**
93  * Entry in our priority queue.
94  */
95 struct GNUNET_DATASTORE_QueueEntry
96 {
97
98   /**
99    * This is a linked list.
100    */
101   struct GNUNET_DATASTORE_QueueEntry *next;
102
103   /**
104    * This is a linked list.
105    */
106   struct GNUNET_DATASTORE_QueueEntry *prev;
107
108   /**
109    * Handle to the master context.
110    */
111   struct GNUNET_DATASTORE_Handle *h;
112
113   /**
114    * Response processor (NULL if we are not waiting for a response).
115    * This struct should be used for the closure, function-specific
116    * arguments can be passed via 'qc'.
117    */
118   GNUNET_CLIENT_MessageHandler response_proc;
119
120   /**
121    * Function to call after transmission of the request.
122    */
123   GNUNET_DATASTORE_ContinuationWithStatus cont;
124    
125   /**
126    * Closure for 'cont'.
127    */
128   void *cont_cls;
129
130   /**
131    * Context for the operation.
132    */
133   union QueueContext qc;
134
135   /**
136    * Task for timeout signalling.
137    */
138   GNUNET_SCHEDULER_TaskIdentifier task;
139
140   /**
141    * Timeout for the current operation.
142    */
143   struct GNUNET_TIME_Absolute timeout;
144
145   /**
146    * Priority in the queue.
147    */
148   unsigned int priority;
149
150   /**
151    * Maximum allowed length of queue (otherwise
152    * this request should be discarded).
153    */
154   unsigned int max_queue;
155
156   /**
157    * Number of bytes in the request message following
158    * this struct.  32-bit value for nicer memory
159    * access (and overall struct alignment).
160    */
161   uint32_t message_size;
162
163   /**
164    * Has this message been transmitted to the service?
165    * Only ever GNUNET_YES for the head of the queue.
166    * Note that the overall struct should end at a 
167    * multiple of 64 bits.
168    */
169   int32_t was_transmitted;
170   
171 };
172
173 /**
174  * Handle to the datastore service. 
175  */
176 struct GNUNET_DATASTORE_Handle
177 {
178
179   /**
180    * Our configuration.
181    */
182   const struct GNUNET_CONFIGURATION_Handle *cfg;
183
184
185   /**
186    * Current connection to the datastore service.
187    */
188   struct GNUNET_CLIENT_Connection *client;
189
190   /**
191    * Handle for statistics.
192    */
193   struct GNUNET_STATISTICS_Handle *stats;
194
195   /**
196    * Current transmit handle.
197    */
198   struct GNUNET_CLIENT_TransmitHandle *th;
199
200   /**
201    * Current head of priority queue.
202    */
203   struct GNUNET_DATASTORE_QueueEntry *queue_head;
204
205   /**
206    * Current tail of priority queue.
207    */
208   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
209
210   /**
211    * Task for trying to reconnect.
212    */
213   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
214
215   /**
216    * How quickly should we retry?  Used for exponential back-off on
217    * connect-errors.
218    */
219   struct GNUNET_TIME_Relative retry_time;
220
221   /**
222    * Number of entries in the queue.
223    */
224   unsigned int queue_size;
225
226   /**
227    * Number of results we're receiving for the current query
228    * after application stopped to care.  Used to determine when
229    * to reset the connection.
230    */
231   unsigned int result_count;
232
233   /**
234    * Are we currently trying to receive from the service?
235    */
236   int in_receive;
237
238 };
239
240
241
242 /**
243  * Connect to the datastore service.
244  *
245  * @param cfg configuration to use
246  * @return handle to use to access the service
247  */
248 struct GNUNET_DATASTORE_Handle *
249 GNUNET_DATASTORE_connect (const struct
250                           GNUNET_CONFIGURATION_Handle
251                           *cfg)
252 {
253   struct GNUNET_CLIENT_Connection *c;
254   struct GNUNET_DATASTORE_Handle *h;
255   
256   c = GNUNET_CLIENT_connect ("datastore", cfg);
257   if (c == NULL)
258     return NULL; /* oops */
259   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
260                      GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
261   h->client = c;
262   h->cfg = cfg;
263   h->stats = GNUNET_STATISTICS_create ("datastore-api",
264                                        cfg);
265   return h;
266 }
267
268
269 /**
270  * Transmit DROP message to datastore service.
271  *
272  * @param cls the 'struct GNUNET_DATASTORE_Handle'
273  * @param size number of bytes that can be copied to buf
274  * @param buf where to copy the drop message
275  * @return number of bytes written to buf
276  */
277 static size_t
278 transmit_drop (void *cls,
279                size_t size, 
280                void *buf)
281 {
282   struct GNUNET_DATASTORE_Handle *h = cls;
283   struct GNUNET_MessageHeader *hdr;
284   
285   if (buf == NULL)
286     {
287       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
288                   _("Failed to transmit request to drop database.\n"));
289       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
290       return 0;
291     }
292   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
293   hdr = buf;
294   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
295   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
296   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
297   return sizeof(struct GNUNET_MessageHeader);
298 }
299
300
301 /**
302  * Disconnect from the datastore service (and free
303  * associated resources).
304  *
305  * @param h handle to the datastore
306  * @param drop set to GNUNET_YES to delete all data in datastore (!)
307  */
308 void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
309                                   int drop)
310 {
311   struct GNUNET_DATASTORE_QueueEntry *qe;
312
313   if (h->client != NULL)
314     {
315       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
316       h->client = NULL;
317     }
318   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
319     {
320       GNUNET_SCHEDULER_cancel (h->reconnect_task);
321       h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
322     }
323   while (NULL != (qe = h->queue_head))
324     {
325       GNUNET_assert (NULL != qe->response_proc);
326       qe->response_proc (qe, NULL);
327     }
328   if (GNUNET_YES == drop) 
329     {
330       h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
331       if (h->client != NULL)
332         {
333           if (NULL != 
334               GNUNET_CLIENT_notify_transmit_ready (h->client,
335                                                    sizeof(struct GNUNET_MessageHeader),
336                                                    GNUNET_TIME_UNIT_MINUTES,
337                                                    GNUNET_YES,
338                                                    &transmit_drop,
339                                                    h))
340             return;
341           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
342         }
343       GNUNET_break (0);
344     }
345   GNUNET_STATISTICS_destroy (h->stats,
346                              GNUNET_NO);
347   GNUNET_free (h);
348 }
349
350
351 /**
352  * A request has timed out (before being transmitted to the service).
353  *
354  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
355  * @param tc scheduler context
356  */
357 static void
358 timeout_queue_entry (void *cls,
359                      const struct GNUNET_SCHEDULER_TaskContext *tc)
360 {
361   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
362
363   GNUNET_STATISTICS_update (qe->h->stats,
364                             gettext_noop ("# queue entry timeouts"),
365                             1,
366                             GNUNET_NO);
367   qe->task = GNUNET_SCHEDULER_NO_TASK;
368   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
369   qe->response_proc (qe, NULL);
370 }
371
372
373 /**
374  * Create a new entry for our priority queue (and possibly discard other entires if
375  * the queue is getting too long).
376  *
377  * @param h handle to the datastore
378  * @param msize size of the message to queue
379  * @param queue_priority priority of the entry
380  * @param max_queue_size at what queue size should this request be dropped
381  *        (if other requests of higher priority are in the queue)
382  * @param timeout timeout for the operation
383  * @param response_proc function to call with replies (can be NULL)
384  * @param qc client context (NOT a closure for response_proc)
385  * @return NULL if the queue is full (and this entry was dropped)
386  */
387 static struct GNUNET_DATASTORE_QueueEntry *
388 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
389                   size_t msize,
390                   unsigned int queue_priority,
391                   unsigned int max_queue_size,
392                   struct GNUNET_TIME_Relative timeout,
393                   GNUNET_CLIENT_MessageHandler response_proc,            
394                   const union QueueContext *qc)
395 {
396   struct GNUNET_DATASTORE_QueueEntry *ret;
397   struct GNUNET_DATASTORE_QueueEntry *pos;
398   unsigned int c;
399
400   c = 0;
401   pos = h->queue_head;
402   while ( (pos != NULL) &&
403           (c < max_queue_size) &&
404           (pos->priority >= queue_priority) )
405     {
406       c++;
407       pos = pos->next;
408     }
409   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
410   ret->h = h;
411   ret->response_proc = response_proc;
412   ret->qc = *qc;
413   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
414   ret->priority = queue_priority;
415   ret->max_queue = max_queue_size;
416   ret->message_size = msize;
417   ret->was_transmitted = GNUNET_NO;
418   if (pos == NULL)
419     {
420       /* append at the tail */
421       pos = h->queue_tail;
422     }
423   else
424     {
425       pos = pos->prev; 
426       /* do not insert at HEAD if HEAD query was already
427          transmitted and we are still receiving replies! */
428       if ( (pos == NULL) &&
429            (h->queue_head->was_transmitted) )
430         pos = h->queue_head;
431     }
432   c++;
433   GNUNET_STATISTICS_update (h->stats,
434                             gettext_noop ("# queue entries created"),
435                             1,
436                             GNUNET_NO);
437   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
438                                      h->queue_tail,
439                                      pos,
440                                      ret);
441   h->queue_size++;
442   if (c > max_queue_size)
443     {
444       GNUNET_STATISTICS_update (h->stats,
445                                 gettext_noop ("# queue overflows"),
446                                 1,
447                                 GNUNET_NO);
448       response_proc (ret, NULL);
449       return NULL;
450     }
451   ret->task = GNUNET_SCHEDULER_add_delayed (timeout,
452                                             &timeout_queue_entry,
453                                             ret);
454   pos = ret->next;
455   while (pos != NULL) 
456     {
457       if (pos->max_queue < h->queue_size)
458         {
459           GNUNET_assert (pos->response_proc != NULL);
460           pos->response_proc (pos, NULL);
461           break;
462         }
463       pos = pos->next;
464     }
465   return ret;
466 }
467
468
469 /**
470  * Process entries in the queue (or do nothing if we are already
471  * doing so).
472  * 
473  * @param h handle to the datastore
474  */
475 static void
476 process_queue (struct GNUNET_DATASTORE_Handle *h);
477
478
479 /**
480  * Try reconnecting to the datastore service.
481  *
482  * @param cls the 'struct GNUNET_DATASTORE_Handle'
483  * @param tc scheduler context
484  */
485 static void
486 try_reconnect (void *cls,
487                const struct GNUNET_SCHEDULER_TaskContext *tc)
488 {
489   struct GNUNET_DATASTORE_Handle *h = cls;
490
491   if (h->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
492     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
493   else
494     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
495   if (h->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
496     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
497   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
498   h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
499   if (h->client == NULL)
500     {
501       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
502                   "DATASTORE reconnect failed (fatally)\n");
503       return;
504     }
505   GNUNET_STATISTICS_update (h->stats,
506                             gettext_noop ("# datastore connections (re)created"),
507                             1,
508                             GNUNET_NO);
509 #if DEBUG_DATASTORE
510   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
511               "Reconnected to DATASTORE\n");
512 #endif
513   process_queue (h);
514 }
515
516
517 /**
518  * Disconnect from the service and then try reconnecting to the datastore service
519  * after some delay.
520  *
521  * @param h handle to datastore to disconnect and reconnect
522  */
523 static void
524 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
525 {
526   if (h->client == NULL)
527     {
528 #if DEBUG_DATASTORE
529       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
530                   "client NULL in disconnect, will not try to reconnect\n");
531 #endif
532       return;
533     }
534 #if 0
535   GNUNET_STATISTICS_update (stats,
536                             gettext_noop ("# reconnected to DATASTORE"),
537                             1,
538                             GNUNET_NO);
539 #endif
540   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
541   h->client = NULL;
542   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_time,
543                                                     &try_reconnect,
544                                                     h);      
545 }
546
547
548 /**
549  * Transmit request from queue to datastore service.
550  *
551  * @param cls the 'struct GNUNET_DATASTORE_Handle'
552  * @param size number of bytes that can be copied to buf
553  * @param buf where to copy the drop message
554  * @return number of bytes written to buf
555  */
556 static size_t
557 transmit_request (void *cls,
558                   size_t size, 
559                   void *buf)
560 {
561   struct GNUNET_DATASTORE_Handle *h = cls;
562   struct GNUNET_DATASTORE_QueueEntry *qe;
563   size_t msize;
564
565   h->th = NULL;
566   if (NULL == (qe = h->queue_head))
567     return 0; /* no entry in queue */
568   if (buf == NULL)
569     {
570       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
571                   _("Failed to transmit request to DATASTORE.\n"));
572       GNUNET_STATISTICS_update (h->stats,
573                                 gettext_noop ("# transmission request failures"),
574                                 1,
575                                 GNUNET_NO);
576       do_disconnect (h);
577       return 0;
578     }
579   if (size < (msize = qe->message_size))
580     {
581       process_queue (h);
582       return 0;
583     }
584  #if DEBUG_DATASTORE
585   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
586               "Transmitting %u byte request to DATASTORE\n",
587               msize);
588 #endif
589   memcpy (buf, &qe[1], msize);
590   qe->was_transmitted = GNUNET_YES;
591   GNUNET_SCHEDULER_cancel (qe->task);
592   qe->task = GNUNET_SCHEDULER_NO_TASK;
593   h->in_receive = GNUNET_YES;
594   GNUNET_CLIENT_receive (h->client,
595                          qe->response_proc,
596                          qe,
597                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
598   GNUNET_STATISTICS_update (h->stats,
599                             gettext_noop ("# bytes sent to datastore"),
600                             1,
601                             GNUNET_NO);
602   return msize;
603 }
604
605
606 /**
607  * Process entries in the queue (or do nothing if we are already
608  * doing so).
609  * 
610  * @param h handle to the datastore
611  */
612 static void
613 process_queue (struct GNUNET_DATASTORE_Handle *h)
614 {
615   struct GNUNET_DATASTORE_QueueEntry *qe;
616
617   if (NULL == (qe = h->queue_head))
618     {
619 #if DEBUG_DATASTORE > 1
620       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
621                   "Queue empty\n");
622 #endif
623       return; /* no entry in queue */
624     }
625   if (qe->was_transmitted == GNUNET_YES)
626     {
627 #if DEBUG_DATASTORE > 1
628       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
629                   "Head request already transmitted\n");
630 #endif
631       return; /* waiting for replies */
632     }
633   if (h->th != NULL)
634     {
635 #if DEBUG_DATASTORE > 1
636       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
637                   "Pending transmission request\n");
638 #endif
639       return; /* request pending */
640     }
641   if (h->client == NULL)
642     {
643 #if DEBUG_DATASTORE > 1
644       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
645                   "Not connected\n");
646 #endif
647       return; /* waiting for reconnect */
648     }
649 #if DEBUG_DATASTORE
650   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
651               "Queueing %u byte request to DATASTORE\n",
652               qe->message_size);
653 #endif
654   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
655                                                qe->message_size,
656                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
657                                                GNUNET_YES,
658                                                &transmit_request,
659                                                h);
660 }
661
662
663 /**
664  * Dummy continuation used to do nothing (but be non-zero).
665  *
666  * @param cls closure
667  * @param result result 
668  * @param emsg error message
669  */
670 static void
671 drop_status_cont (void *cls, int result, const char *emsg)
672 {
673   /* do nothing */
674 }
675
676
677 static void
678 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
679 {
680   struct GNUNET_DATASTORE_Handle *h = qe->h;
681
682   GNUNET_CONTAINER_DLL_remove (h->queue_head,
683                                h->queue_tail,
684                                qe);
685   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
686     {
687       GNUNET_SCHEDULER_cancel (qe->task);
688       qe->task = GNUNET_SCHEDULER_NO_TASK;
689     }
690   h->queue_size--;
691   GNUNET_free (qe);
692 }
693
694 /**
695  * Type of a function to call when we receive a message
696  * from the service.
697  *
698  * @param cls closure
699  * @param msg message received, NULL on timeout or fatal error
700  */
701 static void 
702 process_status_message (void *cls,
703                         const struct
704                         GNUNET_MessageHeader * msg)
705 {
706   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
707   struct GNUNET_DATASTORE_Handle *h = qe->h;
708   struct StatusContext rc = qe->qc.sc;
709   const struct StatusMessage *sm;
710   const char *emsg;
711   int32_t status;
712   int was_transmitted;
713
714   h->in_receive = GNUNET_NO;
715   was_transmitted = qe->was_transmitted;
716   if (msg == NULL)
717     {      
718       free_queue_entry (qe);
719       if (NULL == h->client)
720         return; /* forced disconnect */
721       if (rc.cont != NULL)
722         rc.cont (rc.cont_cls, 
723                  GNUNET_SYSERR,
724                  _("Failed to receive status response from database."));
725       if (was_transmitted == GNUNET_YES)
726         do_disconnect (h);
727       return;
728     }
729   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
730   GNUNET_assert (h->queue_head == qe);
731   free_queue_entry (qe);
732   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
733        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
734     {
735       GNUNET_break (0);
736       h->retry_time = GNUNET_TIME_UNIT_ZERO;
737       do_disconnect (h);
738       if (rc.cont != NULL)
739         rc.cont (rc.cont_cls, 
740                  GNUNET_SYSERR,
741                  _("Error reading response from datastore service"));
742       return;
743     }
744   sm = (const struct StatusMessage*) msg;
745   status = ntohl(sm->status);
746   emsg = NULL;
747   if (ntohs(msg->size) > sizeof(struct StatusMessage))
748     {
749       emsg = (const char*) &sm[1];
750       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
751         {
752           GNUNET_break (0);
753           emsg = _("Invalid error message received from datastore service");
754         }
755     }  
756   if ( (status == GNUNET_SYSERR) &&
757        (emsg == NULL) )
758     {
759       GNUNET_break (0);
760       emsg = _("Invalid error message received from datastore service");
761     }
762 #if DEBUG_DATASTORE
763   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
764               "Received status %d/%s\n",
765               (int) status,
766               emsg);
767 #endif
768   GNUNET_STATISTICS_update (h->stats,
769                             gettext_noop ("# status messages received"),
770                             1,
771                             GNUNET_NO);
772   h->retry_time.rel_value = 0;
773   process_queue (h);
774   if (rc.cont != NULL)
775     rc.cont (rc.cont_cls, 
776              status,
777              emsg);
778 }
779
780
781 /**
782  * Store an item in the datastore.  If the item is already present,
783  * the priorities are summed up and the higher expiration time and
784  * lower anonymity level is used.
785  *
786  * @param h handle to the datastore
787  * @param rid reservation ID to use (from "reserve"); use 0 if no
788  *            prior reservation was made
789  * @param key key for the value
790  * @param size number of bytes in data
791  * @param data content stored
792  * @param type type of the content
793  * @param priority priority of the content
794  * @param anonymity anonymity-level for the content
795  * @param expiration expiration time for the content
796  * @param queue_priority ranking of this request in the priority queue
797  * @param max_queue_size at what queue size should this request be dropped
798  *        (if other requests of higher priority are in the queue)
799  * @param timeout timeout for the operation
800  * @param cont continuation to call when done
801  * @param cont_cls closure for cont
802  * @return NULL if the entry was not queued, otherwise a handle that can be used to
803  *         cancel; note that even if NULL is returned, the callback will be invoked
804  *         (or rather, will already have been invoked)
805  */
806 struct GNUNET_DATASTORE_QueueEntry *
807 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
808                       int rid,
809                       const GNUNET_HashCode * key,
810                       size_t size,
811                       const void *data,
812                       enum GNUNET_BLOCK_Type type,
813                       uint32_t priority,
814                       uint32_t anonymity,
815                       struct GNUNET_TIME_Absolute expiration,
816                       unsigned int queue_priority,
817                       unsigned int max_queue_size,
818                       struct GNUNET_TIME_Relative timeout,
819                       GNUNET_DATASTORE_ContinuationWithStatus cont,
820                       void *cont_cls)
821 {
822   struct GNUNET_DATASTORE_QueueEntry *qe;
823   struct DataMessage *dm;
824   size_t msize;
825   union QueueContext qc;
826
827 #if DEBUG_DATASTORE
828   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
829               "Asked to put %u bytes of data under key `%s'\n",
830               size,
831               GNUNET_h2s (key));
832 #endif
833   msize = sizeof(struct DataMessage) + size;
834   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
835   qc.sc.cont = cont;
836   qc.sc.cont_cls = cont_cls;
837   qe = make_queue_entry (h, msize,
838                          queue_priority, max_queue_size, timeout,
839                          &process_status_message, &qc);
840   if (qe == NULL)
841     {
842 #if DEBUG_DATASTORE
843       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
844                   "Could not create queue entry for PUT\n");
845 #endif
846       return NULL;
847     }
848   GNUNET_STATISTICS_update (h->stats,
849                             gettext_noop ("# PUT requests executed"),
850                             1,
851                             GNUNET_NO);
852   dm = (struct DataMessage* ) &qe[1];
853   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
854   dm->header.size = htons(msize);
855   dm->rid = htonl(rid);
856   dm->size = htonl( (uint32_t) size);
857   dm->type = htonl(type);
858   dm->priority = htonl(priority);
859   dm->anonymity = htonl(anonymity);
860   dm->uid = GNUNET_htonll(0);
861   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
862   dm->key = *key;
863   memcpy (&dm[1], data, size);
864   process_queue (h);
865   return qe;
866 }
867
868
869 /**
870  * Reserve space in the datastore.  This function should be used
871  * to avoid "out of space" failures during a longer sequence of "put"
872  * operations (for example, when a file is being inserted).
873  *
874  * @param h handle to the datastore
875  * @param amount how much space (in bytes) should be reserved (for content only)
876  * @param entries how many entries will be created (to calculate per-entry overhead)
877  * @param queue_priority ranking of this request in the priority queue
878  * @param max_queue_size at what queue size should this request be dropped
879  *        (if other requests of higher priority are in the queue)
880  * @param timeout how long to wait at most for a response (or before dying in queue)
881  * @param cont continuation to call when done; "success" will be set to
882  *             a positive reservation value if space could be reserved.
883  * @param cont_cls closure for cont
884  * @return NULL if the entry was not queued, otherwise a handle that can be used to
885  *         cancel; note that even if NULL is returned, the callback will be invoked
886  *         (or rather, will already have been invoked)
887  */
888 struct GNUNET_DATASTORE_QueueEntry *
889 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
890                           uint64_t amount,
891                           uint32_t entries,
892                           unsigned int queue_priority,
893                           unsigned int max_queue_size,
894                           struct GNUNET_TIME_Relative timeout,
895                           GNUNET_DATASTORE_ContinuationWithStatus cont,
896                           void *cont_cls)
897 {
898   struct GNUNET_DATASTORE_QueueEntry *qe;
899   struct ReserveMessage *rm;
900   union QueueContext qc;
901
902   if (cont == NULL)
903     cont = &drop_status_cont;
904 #if DEBUG_DATASTORE
905   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
906               "Asked to reserve %llu bytes of data and %u entries'\n",
907               (unsigned long long) amount,
908               (unsigned int) entries);
909 #endif
910   qc.sc.cont = cont;
911   qc.sc.cont_cls = cont_cls;
912   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
913                          queue_priority, max_queue_size, timeout,
914                          &process_status_message, &qc);
915   if (qe == NULL)
916     {
917 #if DEBUG_DATASTORE
918       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
919                   "Could not create queue entry to reserve\n");
920 #endif
921       return NULL;
922     }
923   GNUNET_STATISTICS_update (h->stats,
924                             gettext_noop ("# RESERVE requests executed"),
925                             1,
926                             GNUNET_NO);
927   rm = (struct ReserveMessage*) &qe[1];
928   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
929   rm->header.size = htons(sizeof (struct ReserveMessage));
930   rm->entries = htonl(entries);
931   rm->amount = GNUNET_htonll(amount);
932   process_queue (h);
933   return qe;
934 }
935
936
937 /**
938  * Signal that all of the data for which a reservation was made has
939  * been stored and that whatever excess space might have been reserved
940  * can now be released.
941  *
942  * @param h handle to the datastore
943  * @param rid reservation ID (value of "success" in original continuation
944  *        from the "reserve" function).
945  * @param queue_priority ranking of this request in the priority queue
946  * @param max_queue_size at what queue size should this request be dropped
947  *        (if other requests of higher priority are in the queue)
948  * @param queue_priority ranking of this request in the priority queue
949  * @param max_queue_size at what queue size should this request be dropped
950  *        (if other requests of higher priority are in the queue)
951  * @param timeout how long to wait at most for a response
952  * @param cont continuation to call when done
953  * @param cont_cls closure for cont
954  * @return NULL if the entry was not queued, otherwise a handle that can be used to
955  *         cancel; note that even if NULL is returned, the callback will be invoked
956  *         (or rather, will already have been invoked)
957  */
958 struct GNUNET_DATASTORE_QueueEntry *
959 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
960                                   int rid,
961                                   unsigned int queue_priority,
962                                   unsigned int max_queue_size,
963                                   struct GNUNET_TIME_Relative timeout,
964                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
965                                   void *cont_cls)
966 {
967   struct GNUNET_DATASTORE_QueueEntry *qe;
968   struct ReleaseReserveMessage *rrm;
969   union QueueContext qc;
970
971   if (cont == NULL)
972     cont = &drop_status_cont;
973 #if DEBUG_DATASTORE
974   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
975               "Asked to release reserve %d\n",
976               rid);
977 #endif
978   qc.sc.cont = cont;
979   qc.sc.cont_cls = cont_cls;
980   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
981                          queue_priority, max_queue_size, timeout,
982                          &process_status_message, &qc);
983   if (qe == NULL)
984     {
985 #if DEBUG_DATASTORE
986       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
987                   "Could not create queue entry to release reserve\n");
988 #endif
989       return NULL;
990     }
991   GNUNET_STATISTICS_update (h->stats,
992                             gettext_noop ("# RELEASE RESERVE requests executed"),
993                             1,
994                             GNUNET_NO);
995   rrm = (struct ReleaseReserveMessage*) &qe[1];
996   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
997   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
998   rrm->rid = htonl(rid);
999   process_queue (h);
1000   return qe;
1001 }
1002
1003
1004 /**
1005  * Update a value in the datastore.
1006  *
1007  * @param h handle to the datastore
1008  * @param uid identifier for the value
1009  * @param priority how much to increase the priority of the value
1010  * @param expiration new expiration value should be MAX of existing and this argument
1011  * @param queue_priority ranking of this request in the priority queue
1012  * @param max_queue_size at what queue size should this request be dropped
1013  *        (if other requests of higher priority are in the queue)
1014  * @param timeout how long to wait at most for a response
1015  * @param cont continuation to call when done
1016  * @param cont_cls closure for cont
1017  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1018  *         cancel; note that even if NULL is returned, the callback will be invoked
1019  *         (or rather, will already have been invoked)
1020  */
1021 struct GNUNET_DATASTORE_QueueEntry *
1022 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1023                          unsigned long long uid,
1024                          uint32_t priority,
1025                          struct GNUNET_TIME_Absolute expiration,
1026                          unsigned int queue_priority,
1027                          unsigned int max_queue_size,
1028                          struct GNUNET_TIME_Relative timeout,
1029                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1030                          void *cont_cls)
1031 {
1032   struct GNUNET_DATASTORE_QueueEntry *qe;
1033   struct UpdateMessage *um;
1034   union QueueContext qc;
1035
1036   if (cont == NULL)
1037     cont = &drop_status_cont;
1038 #if DEBUG_DATASTORE
1039   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1040               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
1041               uid,
1042               (unsigned int) priority,
1043               (unsigned long long) expiration.abs_value);
1044 #endif
1045   qc.sc.cont = cont;
1046   qc.sc.cont_cls = cont_cls;
1047   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
1048                          queue_priority, max_queue_size, timeout,
1049                          &process_status_message, &qc);
1050   if (qe == NULL)
1051     {
1052 #if DEBUG_DATASTORE
1053       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1054                   "Could not create queue entry for UPDATE\n");
1055 #endif
1056       return NULL;
1057     }
1058   GNUNET_STATISTICS_update (h->stats,
1059                             gettext_noop ("# UPDATE requests executed"),
1060                             1,
1061                             GNUNET_NO);
1062   um = (struct UpdateMessage*) &qe[1];
1063   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1064   um->header.size = htons(sizeof (struct UpdateMessage));
1065   um->priority = htonl(priority);
1066   um->expiration = GNUNET_TIME_absolute_hton(expiration);
1067   um->uid = GNUNET_htonll(uid);
1068   process_queue (h);
1069   return qe;
1070 }
1071
1072
1073 /**
1074  * Explicitly remove some content from the database.
1075  * The "cont"inuation will be called with status
1076  * "GNUNET_OK" if content was removed, "GNUNET_NO"
1077  * if no matching entry was found and "GNUNET_SYSERR"
1078  * on all other types of errors.
1079  *
1080  * @param h handle to the datastore
1081  * @param key key for the value
1082  * @param size number of bytes in data
1083  * @param data content stored
1084  * @param queue_priority ranking of this request in the priority queue
1085  * @param max_queue_size at what queue size should this request be dropped
1086  *        (if other requests of higher priority are in the queue)
1087  * @param timeout how long to wait at most for a response
1088  * @param cont continuation to call when done
1089  * @param cont_cls closure for cont
1090  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1091  *         cancel; note that even if NULL is returned, the callback will be invoked
1092  *         (or rather, will already have been invoked)
1093  */
1094 struct GNUNET_DATASTORE_QueueEntry *
1095 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1096                          const GNUNET_HashCode *key,
1097                          size_t size, 
1098                          const void *data,
1099                          unsigned int queue_priority,
1100                          unsigned int max_queue_size,
1101                          struct GNUNET_TIME_Relative timeout,
1102                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1103                          void *cont_cls)
1104 {
1105   struct GNUNET_DATASTORE_QueueEntry *qe;
1106   struct DataMessage *dm;
1107   size_t msize;
1108   union QueueContext qc;
1109
1110   if (cont == NULL)
1111     cont = &drop_status_cont;
1112 #if DEBUG_DATASTORE
1113   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1114               "Asked to remove %u bytes under key `%s'\n",
1115               size,
1116               GNUNET_h2s (key));
1117 #endif
1118   qc.sc.cont = cont;
1119   qc.sc.cont_cls = cont_cls;
1120   msize = sizeof(struct DataMessage) + size;
1121   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1122   qe = make_queue_entry (h, msize,
1123                          queue_priority, max_queue_size, timeout,
1124                          &process_status_message, &qc);
1125   if (qe == NULL)
1126     {
1127 #if DEBUG_DATASTORE
1128       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1129                   "Could not create queue entry for REMOVE\n");
1130 #endif
1131       return NULL;
1132     }
1133   GNUNET_STATISTICS_update (h->stats,
1134                             gettext_noop ("# REMOVE requests executed"),
1135                             1,
1136                             GNUNET_NO);
1137   dm = (struct DataMessage*) &qe[1];
1138   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1139   dm->header.size = htons(msize);
1140   dm->rid = htonl(0);
1141   dm->size = htonl(size);
1142   dm->type = htonl(0);
1143   dm->priority = htonl(0);
1144   dm->anonymity = htonl(0);
1145   dm->uid = GNUNET_htonll(0);
1146   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
1147   dm->key = *key;
1148   memcpy (&dm[1], data, size);
1149   process_queue (h);
1150   return qe;
1151 }
1152
1153
1154 /**
1155  * Type of a function to call when we receive a message
1156  * from the service.
1157  *
1158  * @param cls closure
1159  * @param msg message received, NULL on timeout or fatal error
1160  */
1161 static void 
1162 process_result_message (void *cls,
1163                         const struct GNUNET_MessageHeader * msg)
1164 {
1165   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
1166   struct GNUNET_DATASTORE_Handle *h = qe->h;
1167   struct ResultContext rc = qe->qc.rc;
1168   const struct DataMessage *dm;
1169   int was_transmitted;
1170
1171   h->in_receive = GNUNET_NO;
1172   if (msg == NULL)
1173    {
1174       was_transmitted = qe->was_transmitted;
1175       free_queue_entry (qe);
1176       if (was_transmitted == GNUNET_YES)
1177         {
1178           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1179                       _("Failed to receive response from database.\n"));
1180           do_disconnect (h);
1181         }
1182       else
1183         {
1184 #if DEBUG_DATASTORE
1185           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1186                       "Request dropped due to finite datastore queue length.\n");
1187 #endif
1188         }
1189       if (rc.iter != NULL)
1190         rc.iter (rc.iter_cls,
1191                  NULL, 0, NULL, 0, 0, 0, 
1192                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1193       return;
1194     }
1195   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1196   GNUNET_assert (h->queue_head == qe);
1197   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
1198     {
1199       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1200 #if DEBUG_DATASTORE
1201       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1202                   "Received end of result set\n");
1203 #endif
1204       free_queue_entry (qe);
1205       if (rc.iter != NULL)
1206         rc.iter (rc.iter_cls,
1207                  NULL, 0, NULL, 0, 0, 0, 
1208                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1209       h->retry_time.rel_value = 0;
1210       h->result_count = 0;
1211       process_queue (h);
1212       return;
1213     }
1214   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1215        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1216        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1217     {
1218       GNUNET_break (0);
1219       free_queue_entry (qe);
1220       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1221       do_disconnect (h);
1222       if (rc.iter != NULL)
1223         rc.iter (rc.iter_cls,
1224                  NULL, 0, NULL, 0, 0, 0, 
1225                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1226       return;
1227     }
1228   GNUNET_STATISTICS_update (h->stats,
1229                             gettext_noop ("# Results received"),
1230                             1,
1231                             GNUNET_NO);
1232   if (rc.iter == NULL)
1233     {
1234       h->result_count++;
1235       GNUNET_STATISTICS_update (h->stats,
1236                                 gettext_noop ("# Excess results received"),
1237                                 1,
1238                                 GNUNET_NO);
1239       if (h->result_count > MAX_EXCESS_RESULTS)
1240         {
1241           free_queue_entry (qe);
1242           GNUNET_STATISTICS_update (h->stats,
1243                                     gettext_noop ("# Forced database connection resets"),
1244                                     1,
1245                                     GNUNET_NO);
1246           h->retry_time = GNUNET_TIME_UNIT_ZERO;
1247           do_disconnect (h);      
1248           return;
1249         }
1250       GNUNET_DATASTORE_get_next (h, GNUNET_NO);
1251       return;
1252     }
1253   dm = (const struct DataMessage*) msg;
1254 #if DEBUG_DATASTORE
1255   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1256               "Received result %llu with type %u and size %u with key %s\n",
1257               (unsigned long long) GNUNET_ntohll(dm->uid),
1258               ntohl(dm->type),
1259               ntohl(dm->size),
1260               GNUNET_h2s(&dm->key));
1261 #endif
1262   h->retry_time.rel_value = 0;
1263   rc.iter (rc.iter_cls,
1264            &dm->key,
1265            ntohl(dm->size),
1266            &dm[1],
1267            ntohl(dm->type),
1268            ntohl(dm->priority),
1269            ntohl(dm->anonymity),
1270            GNUNET_TIME_absolute_ntoh(dm->expiration),   
1271            GNUNET_ntohll(dm->uid));
1272 }
1273
1274
1275 /**
1276  * Get a random value from the datastore.
1277  *
1278  * @param h handle to the datastore
1279  * @param queue_priority ranking of this request in the priority queue
1280  * @param max_queue_size at what queue size should this request be dropped
1281  *        (if other requests of higher priority are in the queue)
1282  * @param timeout how long to wait at most for a response
1283  * @param iter function to call on a random value; it
1284  *        will be called once with a value (if available)
1285  *        and always once with a value of NULL.
1286  * @param iter_cls closure for iter
1287  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1288  *         cancel; note that even if NULL is returned, the callback will be invoked
1289  *         (or rather, will already have been invoked)
1290  */
1291 struct GNUNET_DATASTORE_QueueEntry *
1292 GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
1293                              unsigned int queue_priority,
1294                              unsigned int max_queue_size,
1295                              struct GNUNET_TIME_Relative timeout,
1296                              GNUNET_DATASTORE_Iterator iter, 
1297                              void *iter_cls)
1298 {
1299   struct GNUNET_DATASTORE_QueueEntry *qe;
1300   struct GNUNET_MessageHeader *m;
1301   union QueueContext qc;
1302
1303 #if DEBUG_DATASTORE
1304   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1305               "Asked to get random entry in %llu ms\n",
1306               (unsigned long long) timeout.rel_value);
1307 #endif
1308   qc.rc.iter = iter;
1309   qc.rc.iter_cls = iter_cls;
1310   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1311                          queue_priority, max_queue_size, timeout,
1312                          &process_result_message, &qc);
1313   if (qe == NULL)
1314     {
1315 #if DEBUG_DATASTORE
1316       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1317                   "Could not create queue entry for GET RANDOM\n");
1318 #endif
1319       return NULL;    
1320     }
1321   GNUNET_STATISTICS_update (h->stats,
1322                             gettext_noop ("# GET RANDOM requests executed"),
1323                             1,
1324                             GNUNET_NO);
1325   m = (struct GNUNET_MessageHeader*) &qe[1];
1326   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
1327   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1328   process_queue (h);
1329   return qe;
1330 }
1331
1332
1333 /**
1334  * Get a zero-anonymity value from the datastore.
1335  *
1336  * @param h handle to the datastore
1337  * @param queue_priority ranking of this request in the priority queue
1338  * @param max_queue_size at what queue size should this request be dropped
1339  *        (if other requests of higher priority are in the queue)
1340  * @param timeout how long to wait at most for a response
1341  * @param type allowed type for the operation
1342  * @param iter function to call on a random value; it
1343  *        will be called once with a value (if available)
1344  *        and always once with a value of NULL.
1345  * @param iter_cls closure for iter
1346  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1347  *         cancel; note that even if NULL is returned, the callback will be invoked
1348  *         (or rather, will already have been invoked)
1349  */
1350 struct GNUNET_DATASTORE_QueueEntry *
1351 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1352                                      unsigned int queue_priority,
1353                                      unsigned int max_queue_size,
1354                                      struct GNUNET_TIME_Relative timeout,
1355                                      enum GNUNET_BLOCK_Type type,
1356                                      GNUNET_DATASTORE_Iterator iter, 
1357                                      void *iter_cls)
1358 {
1359   struct GNUNET_DATASTORE_QueueEntry *qe;
1360   struct GetZeroAnonymityMessage *m;
1361   union QueueContext qc;
1362
1363 #if DEBUG_DATASTORE
1364   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1365               "Asked to get zero-anonymity entry in %llu ms\n",
1366               (unsigned long long) timeout.rel_value);
1367 #endif
1368   qc.rc.iter = iter;
1369   qc.rc.iter_cls = iter_cls;
1370   qe = make_queue_entry (h, sizeof(struct GetZeroAnonymityMessage),
1371                          queue_priority, max_queue_size, timeout,
1372                          &process_result_message, &qc);
1373   if (qe == NULL)
1374     {
1375 #if DEBUG_DATASTORE
1376       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1377                   "Could not create queue entry for zero-anonymity iteration\n");
1378 #endif
1379       return NULL;    
1380     }
1381   GNUNET_STATISTICS_update (h->stats,
1382                             gettext_noop ("# GET ZERO ANONYMITY requests executed"),
1383                             1,
1384                             GNUNET_NO);
1385   m = (struct GetZeroAnonymityMessage*) &qe[1];
1386   m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1387   m->header.size = htons(sizeof (struct GetZeroAnonymityMessage));
1388   m->type = htonl ((uint32_t) type);
1389   process_queue (h);
1390   return qe;
1391 }
1392
1393
1394
1395 /**
1396  * Iterate over the results for a particular key
1397  * in the datastore.  The iterator will only be called
1398  * once initially; if the first call did contain a
1399  * result, further results can be obtained by calling
1400  * "GNUNET_DATASTORE_get_next" with the given argument.
1401  *
1402  * @param h handle to the datastore
1403  * @param key maybe NULL (to match all entries)
1404  * @param type desired type, 0 for any
1405  * @param queue_priority ranking of this request in the priority queue
1406  * @param max_queue_size at what queue size should this request be dropped
1407  *        (if other requests of higher priority are in the queue)
1408  * @param timeout how long to wait at most for a response
1409  * @param iter function to call on each matching value;
1410  *        will be called once with a NULL value at the end
1411  * @param iter_cls closure for iter
1412  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1413  *         cancel; note that even if NULL is returned, the callback will be invoked
1414  *         (or rather, will already have been invoked)
1415  */
1416 struct GNUNET_DATASTORE_QueueEntry *
1417 GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
1418                       const GNUNET_HashCode * key,
1419                       enum GNUNET_BLOCK_Type type,
1420                       unsigned int queue_priority,
1421                       unsigned int max_queue_size,
1422                       struct GNUNET_TIME_Relative timeout,
1423                       GNUNET_DATASTORE_Iterator iter, 
1424                       void *iter_cls)
1425 {
1426   struct GNUNET_DATASTORE_QueueEntry *qe;
1427   struct GetMessage *gm;
1428   union QueueContext qc;
1429
1430 #if DEBUG_DATASTORE
1431   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1432               "Asked to look for data of type %u under key `%s'\n",
1433               (unsigned int) type,
1434               GNUNET_h2s (key));
1435 #endif
1436   qc.rc.iter = iter;
1437   qc.rc.iter_cls = iter_cls;
1438   qe = make_queue_entry (h, sizeof(struct GetMessage),
1439                          queue_priority, max_queue_size, timeout,
1440                          &process_result_message, &qc);
1441   if (qe == NULL)
1442     {
1443 #if DEBUG_DATASTORE
1444       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1445                   "Could not queue request for `%s'\n",
1446                   GNUNET_h2s (key));
1447 #endif
1448       return NULL;
1449     }
1450   GNUNET_STATISTICS_update (h->stats,
1451                             gettext_noop ("# GET requests executed"),
1452                             1,
1453                             GNUNET_NO);
1454   gm = (struct GetMessage*) &qe[1];
1455   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1456   gm->type = htonl(type);
1457   if (key != NULL)
1458     {
1459       gm->header.size = htons(sizeof (struct GetMessage));
1460       gm->key = *key;
1461     }
1462   else
1463     {
1464       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1465     }
1466   process_queue (h);
1467   return qe;
1468 }
1469
1470
1471 /**
1472  * Function called to trigger obtaining the next result
1473  * from the datastore.
1474  * 
1475  * @param h handle to the datastore
1476  * @param more GNUNET_YES to get moxre results, GNUNET_NO to abort
1477  *        iteration (with a final call to "iter" with key/data == NULL).
1478  */
1479 void 
1480 GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h,
1481                            int more)
1482 {
1483   struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
1484   struct ResultContext rc = qe->qc.rc;
1485
1486   GNUNET_assert (&process_result_message == qe->response_proc);
1487   if (GNUNET_YES != more)
1488     {
1489       qe->qc.rc.iter = NULL;
1490       qe->qc.rc.iter_cls = NULL;
1491       if (rc.iter != NULL)
1492         rc.iter (rc.iter_cls,
1493                  NULL, 0, NULL, 0, 0, 0, 
1494                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1495     }
1496   h->in_receive = GNUNET_YES;
1497   GNUNET_CLIENT_receive (h->client,
1498                          qe->response_proc,
1499                          qe,
1500                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
1501 }
1502
1503
1504 /**
1505  * Cancel a datastore operation.  The final callback from the
1506  * operation must not have been done yet.
1507  * 
1508  * @param qe operation to cancel
1509  */
1510 void
1511 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1512 {
1513   struct GNUNET_DATASTORE_Handle *h;
1514   int reconnect;
1515
1516   h = qe->h;
1517 #if DEBUG_DATASTORE
1518   GNUNET_log  (GNUNET_ERROR_TYPE_DEBUG,
1519                "Pending DATASTORE request %p cancelled (%d, %d)\n",
1520                qe,
1521                qe->was_transmitted,
1522                h->queue_head == qe);
1523 #endif
1524   reconnect = GNUNET_NO;
1525   if (GNUNET_YES == qe->was_transmitted) 
1526     {
1527       if (qe->response_proc == &process_result_message) 
1528         {
1529           qe->qc.rc.iter = NULL;    
1530           if (GNUNET_YES != h->in_receive)
1531             GNUNET_DATASTORE_get_next (h, GNUNET_YES);
1532         }
1533       else
1534         {
1535           qe->qc.sc.cont = NULL;
1536         }
1537       return;
1538     }
1539   free_queue_entry (qe);
1540   process_queue (h);
1541 }
1542
1543
1544 /* end of datastore_api.c */