ef373695063ed11e94ed89c7b2239b9486571fae
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
33
34 /**
35  * If a client stopped asking for more results, how many more do
36  * we receive from the DB before killing the connection?  Trade-off
37  * between re-doing TCP handshakes and (needlessly) receiving 
38  * useless results.
39  */
40 #define MAX_EXCESS_RESULTS 8
41
42 /**
43  * Context for processing status messages.
44  */
45 struct StatusContext
46 {
47   /**
48    * Continuation to call with the status.
49    */
50   GNUNET_DATASTORE_ContinuationWithStatus cont;
51
52   /**
53    * Closure for cont.
54    */
55   void *cont_cls;
56
57 };
58
59
60 /**
61  * Context for processing result messages.
62  */
63 struct ResultContext
64 {
65   /**
66    * Iterator to call with the result.
67    */
68   GNUNET_DATASTORE_Iterator iter;
69
70   /**
71    * Closure for iter.
72    */
73   void *iter_cls;
74
75 };
76
77
78 /**
79  *  Context for a queue operation.
80  */
81 union QueueContext
82 {
83
84   struct StatusContext sc;
85   
86   struct ResultContext rc;
87
88 };
89
90
91
92 /**
93  * Entry in our priority queue.
94  */
95 struct GNUNET_DATASTORE_QueueEntry
96 {
97
98   /**
99    * This is a linked list.
100    */
101   struct GNUNET_DATASTORE_QueueEntry *next;
102
103   /**
104    * This is a linked list.
105    */
106   struct GNUNET_DATASTORE_QueueEntry *prev;
107
108   /**
109    * Handle to the master context.
110    */
111   struct GNUNET_DATASTORE_Handle *h;
112
113   /**
114    * Response processor (NULL if we are not waiting for a response).
115    * This struct should be used for the closure, function-specific
116    * arguments can be passed via 'qc'.
117    */
118   GNUNET_CLIENT_MessageHandler response_proc;
119
120   /**
121    * Function to call after transmission of the request.
122    */
123   GNUNET_DATASTORE_ContinuationWithStatus cont;
124    
125   /**
126    * Closure for 'cont'.
127    */
128   void *cont_cls;
129
130   /**
131    * Context for the operation.
132    */
133   union QueueContext qc;
134
135   /**
136    * Task for timeout signalling.
137    */
138   GNUNET_SCHEDULER_TaskIdentifier task;
139
140   /**
141    * Timeout for the current operation.
142    */
143   struct GNUNET_TIME_Absolute timeout;
144
145   /**
146    * Priority in the queue.
147    */
148   unsigned int priority;
149
150   /**
151    * Maximum allowed length of queue (otherwise
152    * this request should be discarded).
153    */
154   unsigned int max_queue;
155
156   /**
157    * Number of bytes in the request message following
158    * this struct.  32-bit value for nicer memory
159    * access (and overall struct alignment).
160    */
161   uint32_t message_size;
162
163   /**
164    * Has this message been transmitted to the service?
165    * Only ever GNUNET_YES for the head of the queue.
166    * Note that the overall struct should end at a 
167    * multiple of 64 bits.
168    */
169   int was_transmitted;
170   
171   /**
172    * Are we expecting a single message in response to this
173    * request (and, if it is data, no 'END' message)?
174    */
175   int one_shot; 
176   
177 };
178
179 /**
180  * Handle to the datastore service. 
181  */
182 struct GNUNET_DATASTORE_Handle
183 {
184
185   /**
186    * Our configuration.
187    */
188   const struct GNUNET_CONFIGURATION_Handle *cfg;
189
190
191   /**
192    * Current connection to the datastore service.
193    */
194   struct GNUNET_CLIENT_Connection *client;
195
196   /**
197    * Handle for statistics.
198    */
199   struct GNUNET_STATISTICS_Handle *stats;
200
201   /**
202    * Current transmit handle.
203    */
204   struct GNUNET_CLIENT_TransmitHandle *th;
205
206   /**
207    * Current head of priority queue.
208    */
209   struct GNUNET_DATASTORE_QueueEntry *queue_head;
210
211   /**
212    * Current tail of priority queue.
213    */
214   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
215
216   /**
217    * Task for trying to reconnect.
218    */
219   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
220
221   /**
222    * How quickly should we retry?  Used for exponential back-off on
223    * connect-errors.
224    */
225   struct GNUNET_TIME_Relative retry_time;
226
227   /**
228    * Number of entries in the queue.
229    */
230   unsigned int queue_size;
231
232   /**
233    * Number of results we're receiving for the current query
234    * after application stopped to care.  Used to determine when
235    * to reset the connection.
236    */
237   unsigned int result_count;
238
239   /**
240    * Are we currently trying to receive from the service?
241    */
242   int in_receive;
243
244 };
245
246
247
248 /**
249  * Connect to the datastore service.
250  *
251  * @param cfg configuration to use
252  * @return handle to use to access the service
253  */
254 struct GNUNET_DATASTORE_Handle *
255 GNUNET_DATASTORE_connect (const struct
256                           GNUNET_CONFIGURATION_Handle
257                           *cfg)
258 {
259   struct GNUNET_CLIENT_Connection *c;
260   struct GNUNET_DATASTORE_Handle *h;
261   
262   c = GNUNET_CLIENT_connect ("datastore", cfg);
263   if (c == NULL)
264     return NULL; /* oops */
265   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
266                      GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
267   h->client = c;
268   h->cfg = cfg;
269   h->stats = GNUNET_STATISTICS_create ("datastore-api",
270                                        cfg);
271   return h;
272 }
273
274
275 /**
276  * Transmit DROP message to datastore service.
277  *
278  * @param cls the 'struct GNUNET_DATASTORE_Handle'
279  * @param size number of bytes that can be copied to buf
280  * @param buf where to copy the drop message
281  * @return number of bytes written to buf
282  */
283 static size_t
284 transmit_drop (void *cls,
285                size_t size, 
286                void *buf)
287 {
288   struct GNUNET_DATASTORE_Handle *h = cls;
289   struct GNUNET_MessageHeader *hdr;
290   
291   if (buf == NULL)
292     {
293       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
294                   _("Failed to transmit request to drop database.\n"));
295       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
296       return 0;
297     }
298   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
299   hdr = buf;
300   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
301   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
302   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
303   return sizeof(struct GNUNET_MessageHeader);
304 }
305
306
307 /**
308  * Disconnect from the datastore service (and free
309  * associated resources).
310  *
311  * @param h handle to the datastore
312  * @param drop set to GNUNET_YES to delete all data in datastore (!)
313  */
314 void
315 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
316                              int drop)
317 {
318   struct GNUNET_DATASTORE_QueueEntry *qe;
319
320   if (h->client != NULL)
321     {
322       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
323       h->client = NULL;
324     }
325   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
326     {
327       GNUNET_SCHEDULER_cancel (h->reconnect_task);
328       h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
329     }
330   while (NULL != (qe = h->queue_head))
331     {
332       GNUNET_assert (NULL != qe->response_proc);
333       qe->response_proc (qe, NULL);
334     }
335   if (GNUNET_YES == drop) 
336     {
337       h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
338       if (h->client != NULL)
339         {
340           if (NULL != 
341               GNUNET_CLIENT_notify_transmit_ready (h->client,
342                                                    sizeof(struct GNUNET_MessageHeader),
343                                                    GNUNET_TIME_UNIT_MINUTES,
344                                                    GNUNET_YES,
345                                                    &transmit_drop,
346                                                    h))
347             return;
348           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
349         }
350       GNUNET_break (0);
351     }
352   GNUNET_STATISTICS_destroy (h->stats,
353                              GNUNET_NO);
354   GNUNET_free (h);
355 }
356
357
358 /**
359  * A request has timed out (before being transmitted to the service).
360  *
361  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
362  * @param tc scheduler context
363  */
364 static void
365 timeout_queue_entry (void *cls,
366                      const struct GNUNET_SCHEDULER_TaskContext *tc)
367 {
368   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
369
370   GNUNET_STATISTICS_update (qe->h->stats,
371                             gettext_noop ("# queue entry timeouts"),
372                             1,
373                             GNUNET_NO);
374   qe->task = GNUNET_SCHEDULER_NO_TASK;
375   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
376   qe->response_proc (qe, NULL);
377 }
378
379
380 /**
381  * Create a new entry for our priority queue (and possibly discard other entires if
382  * the queue is getting too long).
383  *
384  * @param h handle to the datastore
385  * @param msize size of the message to queue
386  * @param queue_priority priority of the entry
387  * @param max_queue_size at what queue size should this request be dropped
388  *        (if other requests of higher priority are in the queue)
389  * @param timeout timeout for the operation
390  * @param response_proc function to call with replies (can be NULL)
391  * @param qc client context (NOT a closure for response_proc)
392  * @return NULL if the queue is full (and this entry was dropped)
393  */
394 static struct GNUNET_DATASTORE_QueueEntry *
395 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
396                   size_t msize,
397                   unsigned int queue_priority,
398                   unsigned int max_queue_size,
399                   struct GNUNET_TIME_Relative timeout,
400                   GNUNET_CLIENT_MessageHandler response_proc,            
401                   const union QueueContext *qc)
402 {
403   struct GNUNET_DATASTORE_QueueEntry *ret;
404   struct GNUNET_DATASTORE_QueueEntry *pos;
405   unsigned int c;
406
407   c = 0;
408   pos = h->queue_head;
409   while ( (pos != NULL) &&
410           (c < max_queue_size) &&
411           (pos->priority >= queue_priority) )
412     {
413       c++;
414       pos = pos->next;
415     }
416   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
417   ret->h = h;
418   ret->response_proc = response_proc;
419   ret->qc = *qc;
420   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
421   ret->priority = queue_priority;
422   ret->max_queue = max_queue_size;
423   ret->message_size = msize;
424   ret->was_transmitted = GNUNET_NO;
425   if (pos == NULL)
426     {
427       /* append at the tail */
428       pos = h->queue_tail;
429     }
430   else
431     {
432       pos = pos->prev; 
433       /* do not insert at HEAD if HEAD query was already
434          transmitted and we are still receiving replies! */
435       if ( (pos == NULL) &&
436            (h->queue_head->was_transmitted) )
437         pos = h->queue_head;
438     }
439   c++;
440   GNUNET_STATISTICS_update (h->stats,
441                             gettext_noop ("# queue entries created"),
442                             1,
443                             GNUNET_NO);
444   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
445                                      h->queue_tail,
446                                      pos,
447                                      ret);
448   h->queue_size++;
449   if (c > max_queue_size)
450     {
451       GNUNET_STATISTICS_update (h->stats,
452                                 gettext_noop ("# queue overflows"),
453                                 1,
454                                 GNUNET_NO);
455       response_proc (ret, NULL);
456       return NULL;
457     }
458   ret->task = GNUNET_SCHEDULER_add_delayed (timeout,
459                                             &timeout_queue_entry,
460                                             ret);
461   pos = ret->next;
462   while (pos != NULL) 
463     {
464       if (pos->max_queue < h->queue_size)
465         {
466           GNUNET_assert (pos->response_proc != NULL);
467           pos->response_proc (pos, NULL);
468           break;
469         }
470       pos = pos->next;
471     }
472   return ret;
473 }
474
475
476 /**
477  * Process entries in the queue (or do nothing if we are already
478  * doing so).
479  * 
480  * @param h handle to the datastore
481  */
482 static void
483 process_queue (struct GNUNET_DATASTORE_Handle *h);
484
485
486 /**
487  * Try reconnecting to the datastore service.
488  *
489  * @param cls the 'struct GNUNET_DATASTORE_Handle'
490  * @param tc scheduler context
491  */
492 static void
493 try_reconnect (void *cls,
494                const struct GNUNET_SCHEDULER_TaskContext *tc)
495 {
496   struct GNUNET_DATASTORE_Handle *h = cls;
497
498   if (h->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
499     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
500   else
501     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
502   if (h->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
503     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
504   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
505   h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
506   if (h->client == NULL)
507     {
508       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
509                   "DATASTORE reconnect failed (fatally)\n");
510       return;
511     }
512   GNUNET_STATISTICS_update (h->stats,
513                             gettext_noop ("# datastore connections (re)created"),
514                             1,
515                             GNUNET_NO);
516 #if DEBUG_DATASTORE
517   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
518               "Reconnected to DATASTORE\n");
519 #endif
520   process_queue (h);
521 }
522
523
524 /**
525  * Disconnect from the service and then try reconnecting to the datastore service
526  * after some delay.
527  *
528  * @param h handle to datastore to disconnect and reconnect
529  */
530 static void
531 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
532 {
533   if (h->client == NULL)
534     {
535 #if DEBUG_DATASTORE
536       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
537                   "client NULL in disconnect, will not try to reconnect\n");
538 #endif
539       return;
540     }
541 #if 0
542   GNUNET_STATISTICS_update (stats,
543                             gettext_noop ("# reconnected to DATASTORE"),
544                             1,
545                             GNUNET_NO);
546 #endif
547   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
548   h->client = NULL;
549   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_time,
550                                                     &try_reconnect,
551                                                     h);      
552 }
553
554
555 /**
556  * Transmit request from queue to datastore service.
557  *
558  * @param cls the 'struct GNUNET_DATASTORE_Handle'
559  * @param size number of bytes that can be copied to buf
560  * @param buf where to copy the drop message
561  * @return number of bytes written to buf
562  */
563 static size_t
564 transmit_request (void *cls,
565                   size_t size, 
566                   void *buf)
567 {
568   struct GNUNET_DATASTORE_Handle *h = cls;
569   struct GNUNET_DATASTORE_QueueEntry *qe;
570   size_t msize;
571
572   h->th = NULL;
573   if (NULL == (qe = h->queue_head))
574     return 0; /* no entry in queue */
575   if (buf == NULL)
576     {
577       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
578                   _("Failed to transmit request to DATASTORE.\n"));
579       GNUNET_STATISTICS_update (h->stats,
580                                 gettext_noop ("# transmission request failures"),
581                                 1,
582                                 GNUNET_NO);
583       do_disconnect (h);
584       return 0;
585     }
586   if (size < (msize = qe->message_size))
587     {
588       process_queue (h);
589       return 0;
590     }
591  #if DEBUG_DATASTORE
592   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
593               "Transmitting %u byte request to DATASTORE\n",
594               msize);
595 #endif
596   memcpy (buf, &qe[1], msize);
597   qe->was_transmitted = GNUNET_YES;
598   GNUNET_SCHEDULER_cancel (qe->task);
599   qe->task = GNUNET_SCHEDULER_NO_TASK;
600   h->in_receive = GNUNET_YES;
601   GNUNET_CLIENT_receive (h->client,
602                          qe->response_proc,
603                          qe,
604                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
605   GNUNET_STATISTICS_update (h->stats,
606                             gettext_noop ("# bytes sent to datastore"),
607                             1,
608                             GNUNET_NO);
609   return msize;
610 }
611
612
613 /**
614  * Process entries in the queue (or do nothing if we are already
615  * doing so).
616  * 
617  * @param h handle to the datastore
618  */
619 static void
620 process_queue (struct GNUNET_DATASTORE_Handle *h)
621 {
622   struct GNUNET_DATASTORE_QueueEntry *qe;
623
624   if (NULL == (qe = h->queue_head))
625     {
626 #if DEBUG_DATASTORE > 1
627       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
628                   "Queue empty\n");
629 #endif
630       return; /* no entry in queue */
631     }
632   if (qe->was_transmitted == GNUNET_YES)
633     {
634 #if DEBUG_DATASTORE > 1
635       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
636                   "Head request already transmitted\n");
637 #endif
638       return; /* waiting for replies */
639     }
640   if (h->th != NULL)
641     {
642 #if DEBUG_DATASTORE > 1
643       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
644                   "Pending transmission request\n");
645 #endif
646       return; /* request pending */
647     }
648   if (h->client == NULL)
649     {
650 #if DEBUG_DATASTORE > 1
651       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
652                   "Not connected\n");
653 #endif
654       return; /* waiting for reconnect */
655     }
656 #if DEBUG_DATASTORE
657   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
658               "Queueing %u byte request to DATASTORE\n",
659               qe->message_size);
660 #endif
661   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
662                                                qe->message_size,
663                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
664                                                GNUNET_YES,
665                                                &transmit_request,
666                                                h);
667 }
668
669
670 /**
671  * Dummy continuation used to do nothing (but be non-zero).
672  *
673  * @param cls closure
674  * @param result result 
675  * @param emsg error message
676  */
677 static void
678 drop_status_cont (void *cls, int32_t result, const char *emsg)
679 {
680   /* do nothing */
681 }
682
683
684 static void
685 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
686 {
687   struct GNUNET_DATASTORE_Handle *h = qe->h;
688
689   GNUNET_CONTAINER_DLL_remove (h->queue_head,
690                                h->queue_tail,
691                                qe);
692   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
693     {
694       GNUNET_SCHEDULER_cancel (qe->task);
695       qe->task = GNUNET_SCHEDULER_NO_TASK;
696     }
697   h->queue_size--;
698   GNUNET_free (qe);
699 }
700
701 /**
702  * Type of a function to call when we receive a message
703  * from the service.
704  *
705  * @param cls closure
706  * @param msg message received, NULL on timeout or fatal error
707  */
708 static void 
709 process_status_message (void *cls,
710                         const struct
711                         GNUNET_MessageHeader * msg)
712 {
713   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
714   struct GNUNET_DATASTORE_Handle *h = qe->h;
715   struct StatusContext rc = qe->qc.sc;
716   const struct StatusMessage *sm;
717   const char *emsg;
718   int32_t status;
719   int was_transmitted;
720
721   h->in_receive = GNUNET_NO;
722   was_transmitted = qe->was_transmitted;
723   if (msg == NULL)
724     {      
725       free_queue_entry (qe);
726       if (NULL == h->client)
727         return; /* forced disconnect */
728       if (rc.cont != NULL)
729         rc.cont (rc.cont_cls, 
730                  GNUNET_SYSERR,
731                  _("Failed to receive status response from database."));
732       if (was_transmitted == GNUNET_YES)
733         do_disconnect (h);
734       return;
735     }
736   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
737   GNUNET_assert (h->queue_head == qe);
738   free_queue_entry (qe);
739   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
740        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
741     {
742       GNUNET_break (0);
743       h->retry_time = GNUNET_TIME_UNIT_ZERO;
744       do_disconnect (h);
745       if (rc.cont != NULL)
746         rc.cont (rc.cont_cls, 
747                  GNUNET_SYSERR,
748                  _("Error reading response from datastore service"));
749       return;
750     }
751   sm = (const struct StatusMessage*) msg;
752   status = ntohl(sm->status);
753   emsg = NULL;
754   if (ntohs(msg->size) > sizeof(struct StatusMessage))
755     {
756       emsg = (const char*) &sm[1];
757       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
758         {
759           GNUNET_break (0);
760           emsg = _("Invalid error message received from datastore service");
761         }
762     }  
763   if ( (status == GNUNET_SYSERR) &&
764        (emsg == NULL) )
765     {
766       GNUNET_break (0);
767       emsg = _("Invalid error message received from datastore service");
768     }
769 #if DEBUG_DATASTORE
770   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
771               "Received status %d/%s\n",
772               (int) status,
773               emsg);
774 #endif
775   GNUNET_STATISTICS_update (h->stats,
776                             gettext_noop ("# status messages received"),
777                             1,
778                             GNUNET_NO);
779   h->retry_time.rel_value = 0;
780   process_queue (h);
781   if (rc.cont != NULL)
782     rc.cont (rc.cont_cls, 
783              status,
784              emsg);
785 }
786
787
788 /**
789  * Store an item in the datastore.  If the item is already present,
790  * the priorities are summed up and the higher expiration time and
791  * lower anonymity level is used.
792  *
793  * @param h handle to the datastore
794  * @param rid reservation ID to use (from "reserve"); use 0 if no
795  *            prior reservation was made
796  * @param key key for the value
797  * @param size number of bytes in data
798  * @param data content stored
799  * @param type type of the content
800  * @param priority priority of the content
801  * @param anonymity anonymity-level for the content
802  * @param replication how often should the content be replicated to other peers?
803  * @param expiration expiration time for the content
804  * @param queue_priority ranking of this request in the priority queue
805  * @param max_queue_size at what queue size should this request be dropped
806  *        (if other requests of higher priority are in the queue)
807  * @param timeout timeout for the operation
808  * @param cont continuation to call when done
809  * @param cont_cls closure for cont
810  * @return NULL if the entry was not queued, otherwise a handle that can be used to
811  *         cancel; note that even if NULL is returned, the callback will be invoked
812  *         (or rather, will already have been invoked)
813  */
814 struct GNUNET_DATASTORE_QueueEntry *
815 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
816                       uint32_t rid,
817                       const GNUNET_HashCode * key,
818                       size_t size,
819                       const void *data,
820                       enum GNUNET_BLOCK_Type type,
821                       uint32_t priority,
822                       uint32_t anonymity,
823                       uint32_t replication,
824                       struct GNUNET_TIME_Absolute expiration,
825                       unsigned int queue_priority,
826                       unsigned int max_queue_size,
827                       struct GNUNET_TIME_Relative timeout,
828                       GNUNET_DATASTORE_ContinuationWithStatus cont,
829                       void *cont_cls)
830 {
831   struct GNUNET_DATASTORE_QueueEntry *qe;
832   struct DataMessage *dm;
833   size_t msize;
834   union QueueContext qc;
835
836 #if DEBUG_DATASTORE
837   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
838               "Asked to put %u bytes of data under key `%s'\n",
839               size,
840               GNUNET_h2s (key));
841 #endif
842   msize = sizeof(struct DataMessage) + size;
843   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
844   qc.sc.cont = cont;
845   qc.sc.cont_cls = cont_cls;
846   qe = make_queue_entry (h, msize,
847                          queue_priority, max_queue_size, timeout,
848                          &process_status_message, &qc);
849   if (qe == NULL)
850     {
851 #if DEBUG_DATASTORE
852       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
853                   "Could not create queue entry for PUT\n");
854 #endif
855       return NULL;
856     }
857   GNUNET_STATISTICS_update (h->stats,
858                             gettext_noop ("# PUT requests executed"),
859                             1,
860                             GNUNET_NO);
861   dm = (struct DataMessage* ) &qe[1];
862   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
863   dm->header.size = htons(msize);
864   dm->rid = htonl(rid);
865   dm->size = htonl( (uint32_t) size);
866   dm->type = htonl(type);
867   dm->priority = htonl(priority);
868   dm->anonymity = htonl(anonymity);
869   dm->uid = GNUNET_htonll(0);
870   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
871   dm->key = *key;
872   memcpy (&dm[1], data, size);
873   process_queue (h);
874   return qe;
875 }
876
877
878 /**
879  * Reserve space in the datastore.  This function should be used
880  * to avoid "out of space" failures during a longer sequence of "put"
881  * operations (for example, when a file is being inserted).
882  *
883  * @param h handle to the datastore
884  * @param amount how much space (in bytes) should be reserved (for content only)
885  * @param entries how many entries will be created (to calculate per-entry overhead)
886  * @param queue_priority ranking of this request in the priority queue
887  * @param max_queue_size at what queue size should this request be dropped
888  *        (if other requests of higher priority are in the queue)
889  * @param timeout how long to wait at most for a response (or before dying in queue)
890  * @param cont continuation to call when done; "success" will be set to
891  *             a positive reservation value if space could be reserved.
892  * @param cont_cls closure for cont
893  * @return NULL if the entry was not queued, otherwise a handle that can be used to
894  *         cancel; note that even if NULL is returned, the callback will be invoked
895  *         (or rather, will already have been invoked)
896  */
897 struct GNUNET_DATASTORE_QueueEntry *
898 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
899                           uint64_t amount,
900                           uint32_t entries,
901                           unsigned int queue_priority,
902                           unsigned int max_queue_size,
903                           struct GNUNET_TIME_Relative timeout,
904                           GNUNET_DATASTORE_ContinuationWithStatus cont,
905                           void *cont_cls)
906 {
907   struct GNUNET_DATASTORE_QueueEntry *qe;
908   struct ReserveMessage *rm;
909   union QueueContext qc;
910
911   if (cont == NULL)
912     cont = &drop_status_cont;
913 #if DEBUG_DATASTORE
914   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
915               "Asked to reserve %llu bytes of data and %u entries'\n",
916               (unsigned long long) amount,
917               (unsigned int) entries);
918 #endif
919   qc.sc.cont = cont;
920   qc.sc.cont_cls = cont_cls;
921   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
922                          queue_priority, max_queue_size, timeout,
923                          &process_status_message, &qc);
924   if (qe == NULL)
925     {
926 #if DEBUG_DATASTORE
927       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
928                   "Could not create queue entry to reserve\n");
929 #endif
930       return NULL;
931     }
932   GNUNET_STATISTICS_update (h->stats,
933                             gettext_noop ("# RESERVE requests executed"),
934                             1,
935                             GNUNET_NO);
936   rm = (struct ReserveMessage*) &qe[1];
937   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
938   rm->header.size = htons(sizeof (struct ReserveMessage));
939   rm->entries = htonl(entries);
940   rm->amount = GNUNET_htonll(amount);
941   process_queue (h);
942   return qe;
943 }
944
945
946 /**
947  * Signal that all of the data for which a reservation was made has
948  * been stored and that whatever excess space might have been reserved
949  * can now be released.
950  *
951  * @param h handle to the datastore
952  * @param rid reservation ID (value of "success" in original continuation
953  *        from the "reserve" function).
954  * @param queue_priority ranking of this request in the priority queue
955  * @param max_queue_size at what queue size should this request be dropped
956  *        (if other requests of higher priority are in the queue)
957  * @param queue_priority ranking of this request in the priority queue
958  * @param max_queue_size at what queue size should this request be dropped
959  *        (if other requests of higher priority are in the queue)
960  * @param timeout how long to wait at most for a response
961  * @param cont continuation to call when done
962  * @param cont_cls closure for cont
963  * @return NULL if the entry was not queued, otherwise a handle that can be used to
964  *         cancel; note that even if NULL is returned, the callback will be invoked
965  *         (or rather, will already have been invoked)
966  */
967 struct GNUNET_DATASTORE_QueueEntry *
968 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
969                                   uint32_t rid,
970                                   unsigned int queue_priority,
971                                   unsigned int max_queue_size,
972                                   struct GNUNET_TIME_Relative timeout,
973                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
974                                   void *cont_cls)
975 {
976   struct GNUNET_DATASTORE_QueueEntry *qe;
977   struct ReleaseReserveMessage *rrm;
978   union QueueContext qc;
979
980   if (cont == NULL)
981     cont = &drop_status_cont;
982 #if DEBUG_DATASTORE
983   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
984               "Asked to release reserve %d\n",
985               rid);
986 #endif
987   qc.sc.cont = cont;
988   qc.sc.cont_cls = cont_cls;
989   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
990                          queue_priority, max_queue_size, timeout,
991                          &process_status_message, &qc);
992   if (qe == NULL)
993     {
994 #if DEBUG_DATASTORE
995       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
996                   "Could not create queue entry to release reserve\n");
997 #endif
998       return NULL;
999     }
1000   GNUNET_STATISTICS_update (h->stats,
1001                             gettext_noop ("# RELEASE RESERVE requests executed"),
1002                             1,
1003                             GNUNET_NO);
1004   rrm = (struct ReleaseReserveMessage*) &qe[1];
1005   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1006   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
1007   rrm->rid = htonl(rid);
1008   process_queue (h);
1009   return qe;
1010 }
1011
1012
1013 /**
1014  * Update a value in the datastore.
1015  *
1016  * @param h handle to the datastore
1017  * @param uid identifier for the value
1018  * @param priority how much to increase the priority of the value
1019  * @param expiration new expiration value should be MAX of existing and this argument
1020  * @param queue_priority ranking of this request in the priority queue
1021  * @param max_queue_size at what queue size should this request be dropped
1022  *        (if other requests of higher priority are in the queue)
1023  * @param timeout how long to wait at most for a response
1024  * @param cont continuation to call when done
1025  * @param cont_cls closure for cont
1026  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1027  *         cancel; note that even if NULL is returned, the callback will be invoked
1028  *         (or rather, will already have been invoked)
1029  */
1030 struct GNUNET_DATASTORE_QueueEntry *
1031 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1032                          uint64_t uid,
1033                          uint32_t priority,
1034                          struct GNUNET_TIME_Absolute expiration,
1035                          unsigned int queue_priority,
1036                          unsigned int max_queue_size,
1037                          struct GNUNET_TIME_Relative timeout,
1038                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1039                          void *cont_cls)
1040 {
1041   struct GNUNET_DATASTORE_QueueEntry *qe;
1042   struct UpdateMessage *um;
1043   union QueueContext qc;
1044
1045   if (cont == NULL)
1046     cont = &drop_status_cont;
1047 #if DEBUG_DATASTORE
1048   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1049               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
1050               uid,
1051               (unsigned int) priority,
1052               (unsigned long long) expiration.abs_value);
1053 #endif
1054   qc.sc.cont = cont;
1055   qc.sc.cont_cls = cont_cls;
1056   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
1057                          queue_priority, max_queue_size, timeout,
1058                          &process_status_message, &qc);
1059   if (qe == NULL)
1060     {
1061 #if DEBUG_DATASTORE
1062       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1063                   "Could not create queue entry for UPDATE\n");
1064 #endif
1065       return NULL;
1066     }
1067   GNUNET_STATISTICS_update (h->stats,
1068                             gettext_noop ("# UPDATE requests executed"),
1069                             1,
1070                             GNUNET_NO);
1071   um = (struct UpdateMessage*) &qe[1];
1072   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1073   um->header.size = htons(sizeof (struct UpdateMessage));
1074   um->priority = htonl(priority);
1075   um->expiration = GNUNET_TIME_absolute_hton(expiration);
1076   um->uid = GNUNET_htonll(uid);
1077   process_queue (h);
1078   return qe;
1079 }
1080
1081
1082 /**
1083  * Explicitly remove some content from the database.
1084  * The "cont"inuation will be called with status
1085  * "GNUNET_OK" if content was removed, "GNUNET_NO"
1086  * if no matching entry was found and "GNUNET_SYSERR"
1087  * on all other types of errors.
1088  *
1089  * @param h handle to the datastore
1090  * @param key key for the value
1091  * @param size number of bytes in data
1092  * @param data content stored
1093  * @param queue_priority ranking of this request in the priority queue
1094  * @param max_queue_size at what queue size should this request be dropped
1095  *        (if other requests of higher priority are in the queue)
1096  * @param timeout how long to wait at most for a response
1097  * @param cont continuation to call when done
1098  * @param cont_cls closure for cont
1099  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1100  *         cancel; note that even if NULL is returned, the callback will be invoked
1101  *         (or rather, will already have been invoked)
1102  */
1103 struct GNUNET_DATASTORE_QueueEntry *
1104 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1105                          const GNUNET_HashCode *key,
1106                          size_t size, 
1107                          const void *data,
1108                          unsigned int queue_priority,
1109                          unsigned int max_queue_size,
1110                          struct GNUNET_TIME_Relative timeout,
1111                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1112                          void *cont_cls)
1113 {
1114   struct GNUNET_DATASTORE_QueueEntry *qe;
1115   struct DataMessage *dm;
1116   size_t msize;
1117   union QueueContext qc;
1118
1119   if (cont == NULL)
1120     cont = &drop_status_cont;
1121 #if DEBUG_DATASTORE
1122   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1123               "Asked to remove %u bytes under key `%s'\n",
1124               size,
1125               GNUNET_h2s (key));
1126 #endif
1127   qc.sc.cont = cont;
1128   qc.sc.cont_cls = cont_cls;
1129   msize = sizeof(struct DataMessage) + size;
1130   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1131   qe = make_queue_entry (h, msize,
1132                          queue_priority, max_queue_size, timeout,
1133                          &process_status_message, &qc);
1134   if (qe == NULL)
1135     {
1136 #if DEBUG_DATASTORE
1137       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1138                   "Could not create queue entry for REMOVE\n");
1139 #endif
1140       return NULL;
1141     }
1142   GNUNET_STATISTICS_update (h->stats,
1143                             gettext_noop ("# REMOVE requests executed"),
1144                             1,
1145                             GNUNET_NO);
1146   dm = (struct DataMessage*) &qe[1];
1147   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1148   dm->header.size = htons(msize);
1149   dm->rid = htonl(0);
1150   dm->size = htonl(size);
1151   dm->type = htonl(0);
1152   dm->priority = htonl(0);
1153   dm->anonymity = htonl(0);
1154   dm->uid = GNUNET_htonll(0);
1155   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
1156   dm->key = *key;
1157   memcpy (&dm[1], data, size);
1158   process_queue (h);
1159   return qe;
1160 }
1161
1162
1163 /**
1164  * Type of a function to call when we receive a message
1165  * from the service.
1166  *
1167  * @param cls closure
1168  * @param msg message received, NULL on timeout or fatal error
1169  */
1170 static void 
1171 process_result_message (void *cls,
1172                         const struct GNUNET_MessageHeader * msg)
1173 {
1174   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
1175   struct GNUNET_DATASTORE_Handle *h = qe->h;
1176   struct ResultContext rc = qe->qc.rc;
1177   const struct DataMessage *dm;
1178   int was_transmitted;
1179
1180   h->in_receive = GNUNET_NO;
1181   if (msg == NULL)
1182    {
1183       was_transmitted = qe->was_transmitted;
1184       free_queue_entry (qe);
1185       if (was_transmitted == GNUNET_YES)
1186         {
1187           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1188                       _("Failed to receive response from database.\n"));
1189           do_disconnect (h);
1190         }
1191       else
1192         {
1193 #if DEBUG_DATASTORE
1194           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1195                       "Request dropped due to finite datastore queue length.\n");
1196 #endif
1197         }
1198       if (rc.iter != NULL)
1199         rc.iter (rc.iter_cls,
1200                  NULL, 0, NULL, 0, 0, 0, 
1201                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1202       return;
1203     }
1204   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1205   GNUNET_assert (h->queue_head == qe);
1206   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
1207     {
1208       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1209       free_queue_entry (qe);
1210 #if DEBUG_DATASTORE
1211       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1212                   "Received end of result set, new queue size is %u\n",
1213                   h->queue_size);
1214 #endif
1215       if (rc.iter != NULL)
1216         rc.iter (rc.iter_cls,
1217                  NULL, 0, NULL, 0, 0, 0, 
1218                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1219       h->retry_time.rel_value = 0;
1220       h->result_count = 0;
1221       process_queue (h);
1222       return;
1223     }
1224   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1225        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1226        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1227     {
1228       GNUNET_break (0);
1229       free_queue_entry (qe);
1230       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1231       do_disconnect (h);
1232       if (rc.iter != NULL)
1233         rc.iter (rc.iter_cls,
1234                  NULL, 0, NULL, 0, 0, 0, 
1235                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1236       return;
1237     }
1238   GNUNET_STATISTICS_update (h->stats,
1239                             gettext_noop ("# Results received"),
1240                             1,
1241                             GNUNET_NO);
1242   if (rc.iter == NULL)
1243     {
1244       h->result_count++;
1245       GNUNET_STATISTICS_update (h->stats,
1246                                 gettext_noop ("# Excess results received"),
1247                                 1,
1248                                 GNUNET_NO);
1249       if (h->result_count > MAX_EXCESS_RESULTS)
1250         {
1251           free_queue_entry (qe);
1252           GNUNET_STATISTICS_update (h->stats,
1253                                     gettext_noop ("# Forced database connection resets"),
1254                                     1,
1255                                     GNUNET_NO);
1256           h->retry_time = GNUNET_TIME_UNIT_ZERO;
1257           do_disconnect (h);      
1258           return;
1259         }
1260       if (GNUNET_YES == qe->one_shot)
1261         free_queue_entry (qe);
1262       else
1263         GNUNET_DATASTORE_iterate_get_next (h);
1264       return;
1265     }
1266   dm = (const struct DataMessage*) msg;
1267 #if DEBUG_DATASTORE
1268   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1269               "Received result %llu with type %u and size %u with key %s\n",
1270               (unsigned long long) GNUNET_ntohll(dm->uid),
1271               ntohl(dm->type),
1272               ntohl(dm->size),
1273               GNUNET_h2s(&dm->key));
1274 #endif
1275   if (GNUNET_YES == qe->one_shot)
1276     free_queue_entry (qe);
1277   h->retry_time.rel_value = 0;
1278   rc.iter (rc.iter_cls,
1279            &dm->key,
1280            ntohl(dm->size),
1281            &dm[1],
1282            ntohl(dm->type),
1283            ntohl(dm->priority),
1284            ntohl(dm->anonymity),
1285            GNUNET_TIME_absolute_ntoh(dm->expiration),   
1286            GNUNET_ntohll(dm->uid));
1287 }
1288
1289
1290 /**
1291  * Get a random value from the datastore for content replication.
1292  * Returns a single, random value among those with the highest
1293  * replication score, lowering positive replication scores by one for
1294  * the chosen value (if only content with a replication score exists,
1295  * a random value is returned and replication scores are not changed).
1296  *
1297  * @param h handle to the datastore
1298  * @param queue_priority ranking of this request in the priority queue
1299  * @param max_queue_size at what queue size should this request be dropped
1300  *        (if other requests of higher priority are in the queue)
1301  * @param timeout how long to wait at most for a response
1302  * @param iter function to call on a random value; it
1303  *        will be called once with a value (if available)
1304  *        and always once with a value of NULL.
1305  * @param iter_cls closure for iter
1306  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1307  *         cancel; note that even if NULL is returned, the callback will be invoked
1308  *         (or rather, will already have been invoked)
1309  */
1310 struct GNUNET_DATASTORE_QueueEntry *
1311 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1312                                       unsigned int queue_priority,
1313                                       unsigned int max_queue_size,
1314                                       struct GNUNET_TIME_Relative timeout,
1315                                       GNUNET_DATASTORE_Iterator iter, 
1316                                       void *iter_cls)
1317 {
1318   struct GNUNET_DATASTORE_QueueEntry *qe;
1319   struct GNUNET_MessageHeader *m;
1320   union QueueContext qc;
1321
1322 #if DEBUG_DATASTORE
1323   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1324               "Asked to get replication entry in %llu ms\n",
1325               (unsigned long long) timeout.rel_value);
1326 #endif
1327   qc.rc.iter = iter;
1328   qc.rc.iter_cls = iter_cls;
1329   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1330                          queue_priority, max_queue_size, timeout,
1331                          &process_result_message, &qc);
1332   if (qe == NULL)
1333     {
1334 #if DEBUG_DATASTORE
1335       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1336                   "Could not create queue entry for GET REPLICATION\n");
1337 #endif
1338       return NULL;    
1339     }
1340   qe->one_shot = GNUNET_YES;
1341   GNUNET_STATISTICS_update (h->stats,
1342                             gettext_noop ("# GET REPLICATION requests executed"),
1343                             1,
1344                             GNUNET_NO);
1345   m = (struct GNUNET_MessageHeader*) &qe[1];
1346   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1347   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1348   process_queue (h);
1349   return qe;
1350 }
1351
1352
1353 /**
1354  * Get a zero-anonymity value from the datastore.
1355  *
1356  * @param h handle to the datastore
1357  * @param queue_priority ranking of this request in the priority queue
1358  * @param max_queue_size at what queue size should this request be dropped
1359  *        (if other requests of higher priority are in the queue)
1360  * @param timeout how long to wait at most for a response
1361  * @param type allowed type for the operation
1362  * @param iter function to call on a random value; it
1363  *        will be called once with a value (if available)
1364  *        and always once with a value of NULL.
1365  * @param iter_cls closure for iter
1366  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1367  *         cancel; note that even if NULL is returned, the callback will be invoked
1368  *         (or rather, will already have been invoked)
1369  */
1370 struct GNUNET_DATASTORE_QueueEntry *
1371 GNUNET_DATASTORE_iterate_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1372                                          unsigned int queue_priority,
1373                                          unsigned int max_queue_size,
1374                                          struct GNUNET_TIME_Relative timeout,
1375                                          enum GNUNET_BLOCK_Type type,
1376                                          GNUNET_DATASTORE_Iterator iter, 
1377                                          void *iter_cls)
1378 {
1379   struct GNUNET_DATASTORE_QueueEntry *qe;
1380   struct GetZeroAnonymityMessage *m;
1381   union QueueContext qc;
1382
1383   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1384 #if DEBUG_DATASTORE
1385   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1386               "Asked to get zero-anonymity entry in %llu ms\n",
1387               (unsigned long long) timeout.rel_value);
1388 #endif
1389   qc.rc.iter = iter;
1390   qc.rc.iter_cls = iter_cls;
1391   qe = make_queue_entry (h, sizeof(struct GetZeroAnonymityMessage),
1392                          queue_priority, max_queue_size, timeout,
1393                          &process_result_message, &qc);
1394   if (qe == NULL)
1395     {
1396 #if DEBUG_DATASTORE
1397       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1398                   "Could not create queue entry for zero-anonymity iteration\n");
1399 #endif
1400       return NULL;    
1401     }
1402   GNUNET_STATISTICS_update (h->stats,
1403                             gettext_noop ("# GET ZERO ANONYMITY requests executed"),
1404                             1,
1405                             GNUNET_NO);
1406   m = (struct GetZeroAnonymityMessage*) &qe[1];
1407   m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1408   m->header.size = htons(sizeof (struct GetZeroAnonymityMessage));
1409   m->type = htonl ((uint32_t) type);
1410   process_queue (h);
1411   return qe;
1412 }
1413
1414
1415
1416 /**
1417  * Iterate over the results for a particular key
1418  * in the datastore.  The iterator will only be called
1419  * once initially; if the first call did contain a
1420  * result, further results can be obtained by calling
1421  * "GNUNET_DATASTORE_iterate_get_next" with the given argument.
1422  *
1423  * @param h handle to the datastore
1424  * @param key maybe NULL (to match all entries)
1425  * @param type desired type, 0 for any
1426  * @param queue_priority ranking of this request in the priority queue
1427  * @param max_queue_size at what queue size should this request be dropped
1428  *        (if other requests of higher priority are in the queue)
1429  * @param timeout how long to wait at most for a response
1430  * @param iter function to call on each matching value;
1431  *        will be called once with a NULL value at the end
1432  * @param iter_cls closure for iter
1433  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1434  *         cancel; note that even if NULL is returned, the callback will be invoked
1435  *         (or rather, will already have been invoked)
1436  */
1437 struct GNUNET_DATASTORE_QueueEntry *
1438 GNUNET_DATASTORE_iterate_key (struct GNUNET_DATASTORE_Handle *h,
1439                               const GNUNET_HashCode * key,
1440                               enum GNUNET_BLOCK_Type type,
1441                               unsigned int queue_priority,
1442                               unsigned int max_queue_size,
1443                               struct GNUNET_TIME_Relative timeout,
1444                               GNUNET_DATASTORE_Iterator iter, 
1445                               void *iter_cls)
1446 {
1447   struct GNUNET_DATASTORE_QueueEntry *qe;
1448   struct GetMessage *gm;
1449   union QueueContext qc;
1450
1451 #if DEBUG_DATASTORE
1452   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1453               "Asked to look for data of type %u under key `%s'\n",
1454               (unsigned int) type,
1455               GNUNET_h2s (key));
1456 #endif
1457   qc.rc.iter = iter;
1458   qc.rc.iter_cls = iter_cls;
1459   qe = make_queue_entry (h, sizeof(struct GetMessage),
1460                          queue_priority, max_queue_size, timeout,
1461                          &process_result_message, &qc);
1462   if (qe == NULL)
1463     {
1464 #if DEBUG_DATASTORE
1465       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1466                   "Could not queue request for `%s'\n",
1467                   GNUNET_h2s (key));
1468 #endif
1469       return NULL;
1470     }
1471   GNUNET_STATISTICS_update (h->stats,
1472                             gettext_noop ("# GET requests executed"),
1473                             1,
1474                             GNUNET_NO);
1475   gm = (struct GetMessage*) &qe[1];
1476   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1477   gm->type = htonl(type);
1478   if (key != NULL)
1479     {
1480       gm->header.size = htons(sizeof (struct GetMessage));
1481       gm->key = *key;
1482     }
1483   else
1484     {
1485       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1486     }
1487   process_queue (h);
1488   return qe;
1489 }
1490
1491
1492 /**
1493  * Function called to trigger obtaining the next result
1494  * from the datastore.
1495  * 
1496  * @param h handle to the datastore
1497  */
1498 void 
1499 GNUNET_DATASTORE_iterate_get_next (struct GNUNET_DATASTORE_Handle *h)
1500 {
1501   struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
1502
1503   GNUNET_assert (&process_result_message == qe->response_proc);
1504   h->in_receive = GNUNET_YES;
1505   GNUNET_CLIENT_receive (h->client,
1506                          qe->response_proc,
1507                          qe,
1508                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
1509 }
1510
1511
1512 /**
1513  * Cancel a datastore operation.  The final callback from the
1514  * operation must not have been done yet.
1515  * 
1516  * @param qe operation to cancel
1517  */
1518 void
1519 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1520 {
1521   struct GNUNET_DATASTORE_Handle *h;
1522
1523   h = qe->h;
1524 #if DEBUG_DATASTORE
1525   GNUNET_log  (GNUNET_ERROR_TYPE_DEBUG,
1526                "Pending DATASTORE request %p cancelled (%d, %d)\n",
1527                qe,
1528                qe->was_transmitted,
1529                h->queue_head == qe);
1530 #endif
1531   if (GNUNET_YES == qe->was_transmitted) 
1532     {
1533       free_queue_entry (qe);
1534       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1535       do_disconnect (h);
1536       return;
1537     }
1538   free_queue_entry (qe);
1539   process_queue (h);
1540 }
1541
1542
1543 /* end of datastore_api.c */