fix
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
33
34 /**
35  * If a client stopped asking for more results, how many more do
36  * we receive from the DB before killing the connection?  Trade-off
37  * between re-doing TCP handshakes and (needlessly) receiving 
38  * useless results.
39  */
40 #define MAX_EXCESS_RESULTS 8
41
42 /**
43  * Context for processing status messages.
44  */
45 struct StatusContext
46 {
47   /**
48    * Continuation to call with the status.
49    */
50   GNUNET_DATASTORE_ContinuationWithStatus cont;
51
52   /**
53    * Closure for cont.
54    */
55   void *cont_cls;
56
57 };
58
59
60 /**
61  * Context for processing result messages.
62  */
63 struct ResultContext
64 {
65   /**
66    * Function to call with the result.
67    */
68   GNUNET_DATASTORE_DatumProcessor proc;
69
70   /**
71    * Closure for proc.
72    */
73   void *proc_cls;
74
75 };
76
77
78 /**
79  *  Context for a queue operation.
80  */
81 union QueueContext
82 {
83
84   struct StatusContext sc;
85   
86   struct ResultContext rc;
87
88 };
89
90
91
92 /**
93  * Entry in our priority queue.
94  */
95 struct GNUNET_DATASTORE_QueueEntry
96 {
97
98   /**
99    * This is a linked list.
100    */
101   struct GNUNET_DATASTORE_QueueEntry *next;
102
103   /**
104    * This is a linked list.
105    */
106   struct GNUNET_DATASTORE_QueueEntry *prev;
107
108   /**
109    * Handle to the master context.
110    */
111   struct GNUNET_DATASTORE_Handle *h;
112
113   /**
114    * Response processor (NULL if we are not waiting for a response).
115    * This struct should be used for the closure, function-specific
116    * arguments can be passed via 'qc'.
117    */
118   GNUNET_CLIENT_MessageHandler response_proc;
119
120   /**
121    * Function to call after transmission of the request.
122    */
123   GNUNET_DATASTORE_ContinuationWithStatus cont;
124    
125   /**
126    * Closure for 'cont'.
127    */
128   void *cont_cls;
129
130   /**
131    * Context for the operation.
132    */
133   union QueueContext qc;
134
135   /**
136    * Task for timeout signalling.
137    */
138   GNUNET_SCHEDULER_TaskIdentifier task;
139
140   /**
141    * Timeout for the current operation.
142    */
143   struct GNUNET_TIME_Absolute timeout;
144
145   /**
146    * Priority in the queue.
147    */
148   unsigned int priority;
149
150   /**
151    * Maximum allowed length of queue (otherwise
152    * this request should be discarded).
153    */
154   unsigned int max_queue;
155
156   /**
157    * Number of bytes in the request message following
158    * this struct.  32-bit value for nicer memory
159    * access (and overall struct alignment).
160    */
161   uint32_t message_size;
162
163   /**
164    * Has this message been transmitted to the service?
165    * Only ever GNUNET_YES for the head of the queue.
166    * Note that the overall struct should end at a 
167    * multiple of 64 bits.
168    */
169   int was_transmitted;
170   
171 };
172
173 /**
174  * Handle to the datastore service. 
175  */
176 struct GNUNET_DATASTORE_Handle
177 {
178
179   /**
180    * Our configuration.
181    */
182   const struct GNUNET_CONFIGURATION_Handle *cfg;
183
184   /**
185    * Current connection to the datastore service.
186    */
187   struct GNUNET_CLIENT_Connection *client;
188
189   /**
190    * Handle for statistics.
191    */
192   struct GNUNET_STATISTICS_Handle *stats;
193
194   /**
195    * Current transmit handle.
196    */
197   struct GNUNET_CLIENT_TransmitHandle *th;
198
199   /**
200    * Current head of priority queue.
201    */
202   struct GNUNET_DATASTORE_QueueEntry *queue_head;
203
204   /**
205    * Current tail of priority queue.
206    */
207   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
208
209   /**
210    * Task for trying to reconnect.
211    */
212   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
213
214   /**
215    * How quickly should we retry?  Used for exponential back-off on
216    * connect-errors.
217    */
218   struct GNUNET_TIME_Relative retry_time;
219
220   /**
221    * Number of entries in the queue.
222    */
223   unsigned int queue_size;
224
225   /**
226    * Number of results we're receiving for the current query
227    * after application stopped to care.  Used to determine when
228    * to reset the connection.
229    */
230   unsigned int result_count;
231
232   /**
233    * Are we currently trying to receive from the service?
234    */
235   int in_receive;
236
237   /**
238    * We should ignore the next message(s) from the service.
239    */
240   unsigned int skip_next_messages;
241
242 };
243
244
245
246 /**
247  * Connect to the datastore service.
248  *
249  * @param cfg configuration to use
250  * @return handle to use to access the service
251  */
252 struct GNUNET_DATASTORE_Handle *
253 GNUNET_DATASTORE_connect (const struct
254                           GNUNET_CONFIGURATION_Handle
255                           *cfg)
256 {
257   struct GNUNET_CLIENT_Connection *c;
258   struct GNUNET_DATASTORE_Handle *h;
259   
260   c = GNUNET_CLIENT_connect ("datastore", cfg);
261   if (c == NULL)
262     return NULL; /* oops */
263   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
264                      GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
265   h->client = c;
266   h->cfg = cfg;
267   h->stats = GNUNET_STATISTICS_create ("datastore-api",
268                                        cfg);
269   return h;
270 }
271
272
273 /**
274  * Transmit DROP message to datastore service.
275  *
276  * @param cls the 'struct GNUNET_DATASTORE_Handle'
277  * @param size number of bytes that can be copied to buf
278  * @param buf where to copy the drop message
279  * @return number of bytes written to buf
280  */
281 static size_t
282 transmit_drop (void *cls,
283                size_t size, 
284                void *buf)
285 {
286   struct GNUNET_DATASTORE_Handle *h = cls;
287   struct GNUNET_MessageHeader *hdr;
288   
289   if (buf == NULL)
290     {
291       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
292                   _("Failed to transmit request to drop database.\n"));
293       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
294       return 0;
295     }
296   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
297   hdr = buf;
298   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
299   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
300   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
301   return sizeof(struct GNUNET_MessageHeader);
302 }
303
304
305 /**
306  * Disconnect from the datastore service (and free
307  * associated resources).
308  *
309  * @param h handle to the datastore
310  * @param drop set to GNUNET_YES to delete all data in datastore (!)
311  */
312 void
313 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
314                              int drop)
315 {
316   struct GNUNET_DATASTORE_QueueEntry *qe;
317
318   if (h->client != NULL)
319     {
320       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
321       h->client = NULL;
322     }
323   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
324     {
325       GNUNET_SCHEDULER_cancel (h->reconnect_task);
326       h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
327     }
328   while (NULL != (qe = h->queue_head))
329     {
330       GNUNET_assert (NULL != qe->response_proc);
331       qe->response_proc (h, NULL);
332     }
333   if (GNUNET_YES == drop) 
334     {
335       h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
336       if (h->client != NULL)
337         {
338           if (NULL != 
339               GNUNET_CLIENT_notify_transmit_ready (h->client,
340                                                    sizeof(struct GNUNET_MessageHeader),
341                                                    GNUNET_TIME_UNIT_MINUTES,
342                                                    GNUNET_YES,
343                                                    &transmit_drop,
344                                                    h))
345             return;
346           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
347         }
348       GNUNET_break (0);
349     }
350   GNUNET_STATISTICS_destroy (h->stats,
351                              GNUNET_NO);
352   GNUNET_free (h);
353 }
354
355
356 /**
357  * A request has timed out (before being transmitted to the service).
358  *
359  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
360  * @param tc scheduler context
361  */
362 static void
363 timeout_queue_entry (void *cls,
364                      const struct GNUNET_SCHEDULER_TaskContext *tc)
365 {
366   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
367
368   GNUNET_STATISTICS_update (qe->h->stats,
369                             gettext_noop ("# queue entry timeouts"),
370                             1,
371                             GNUNET_NO);
372   qe->task = GNUNET_SCHEDULER_NO_TASK;
373   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
374   qe->response_proc (qe->h, NULL);
375 }
376
377
378 /**
379  * Create a new entry for our priority queue (and possibly discard other entires if
380  * the queue is getting too long).
381  *
382  * @param h handle to the datastore
383  * @param msize size of the message to queue
384  * @param queue_priority priority of the entry
385  * @param max_queue_size at what queue size should this request be dropped
386  *        (if other requests of higher priority are in the queue)
387  * @param timeout timeout for the operation
388  * @param response_proc function to call with replies (can be NULL)
389  * @param qc client context (NOT a closure for response_proc)
390  * @return NULL if the queue is full 
391  */
392 static struct GNUNET_DATASTORE_QueueEntry *
393 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
394                   size_t msize,
395                   unsigned int queue_priority,
396                   unsigned int max_queue_size,
397                   struct GNUNET_TIME_Relative timeout,
398                   GNUNET_CLIENT_MessageHandler response_proc,            
399                   const union QueueContext *qc)
400 {
401   struct GNUNET_DATASTORE_QueueEntry *ret;
402   struct GNUNET_DATASTORE_QueueEntry *pos;
403   unsigned int c;
404
405   c = 0;
406   pos = h->queue_head;
407   while ( (pos != NULL) &&
408           (c < max_queue_size) &&
409           (pos->priority >= queue_priority) )
410     {
411       c++;
412       pos = pos->next;
413     }
414   if (c >= max_queue_size)
415     {
416       GNUNET_STATISTICS_update (h->stats,
417                                 gettext_noop ("# queue overflows"),
418                                 1,
419                                 GNUNET_NO);
420       return NULL;
421     }
422   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
423   ret->h = h;
424   ret->response_proc = response_proc;
425   ret->qc = *qc;
426   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
427   ret->priority = queue_priority;
428   ret->max_queue = max_queue_size;
429   ret->message_size = msize;
430   ret->was_transmitted = GNUNET_NO;
431   if (pos == NULL)
432     {
433       /* append at the tail */
434       pos = h->queue_tail;
435     }
436   else
437     {
438       pos = pos->prev; 
439       /* do not insert at HEAD if HEAD query was already
440          transmitted and we are still receiving replies! */
441       if ( (pos == NULL) &&
442            (h->queue_head->was_transmitted) )
443         pos = h->queue_head;
444     }
445   c++;
446   GNUNET_STATISTICS_update (h->stats,
447                             gettext_noop ("# queue entries created"),
448                             1,
449                             GNUNET_NO);
450   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
451                                      h->queue_tail,
452                                      pos,
453                                      ret);
454   h->queue_size++;
455   ret->task = GNUNET_SCHEDULER_add_delayed (timeout,
456                                             &timeout_queue_entry,
457                                             ret);
458   pos = ret->next;
459   while (pos != NULL) 
460     {
461       if (pos->max_queue < h->queue_size)
462         {
463           GNUNET_assert (pos->response_proc != NULL);
464           /* move 'pos' element to head so that it will be 
465              killed on 'NULL' call below */
466           GNUNET_CONTAINER_DLL_remove (h->queue_head,
467                                        h->queue_tail,
468                                        pos);
469           GNUNET_CONTAINER_DLL_insert (h->queue_head,
470                                        h->queue_tail,
471                                        pos);
472           pos->response_proc (h, NULL);
473           break;
474         }
475       pos = pos->next;
476     }
477   return ret;
478 }
479
480
481 /**
482  * Process entries in the queue (or do nothing if we are already
483  * doing so).
484  * 
485  * @param h handle to the datastore
486  */
487 static void
488 process_queue (struct GNUNET_DATASTORE_Handle *h);
489
490
491 /**
492  * Try reconnecting to the datastore service.
493  *
494  * @param cls the 'struct GNUNET_DATASTORE_Handle'
495  * @param tc scheduler context
496  */
497 static void
498 try_reconnect (void *cls,
499                const struct GNUNET_SCHEDULER_TaskContext *tc)
500 {
501   struct GNUNET_DATASTORE_Handle *h = cls;
502
503   if (h->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
504     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
505   else
506     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
507   if (h->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
508     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
509   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
510   h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
511   if (h->client == NULL)
512     {
513       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
514                   "DATASTORE reconnect failed (fatally)\n");
515       return;
516     }
517   GNUNET_STATISTICS_update (h->stats,
518                             gettext_noop ("# datastore connections (re)created"),
519                             1,
520                             GNUNET_NO);
521 #if DEBUG_DATASTORE
522   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
523               "Reconnected to DATASTORE\n");
524 #endif
525   process_queue (h);
526 }
527
528
529 /**
530  * Disconnect from the service and then try reconnecting to the datastore service
531  * after some delay.
532  *
533  * @param h handle to datastore to disconnect and reconnect
534  */
535 static void
536 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
537 {
538   if (h->client == NULL)
539     {
540 #if DEBUG_DATASTORE
541       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
542                   "client NULL in disconnect, will not try to reconnect\n");
543 #endif
544       return;
545     }
546 #if 0
547   GNUNET_STATISTICS_update (stats,
548                             gettext_noop ("# reconnected to DATASTORE"),
549                             1,
550                             GNUNET_NO);
551 #endif
552   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
553   h->skip_next_messages = 0;
554   h->client = NULL;
555   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_time,
556                                                     &try_reconnect,
557                                                     h);      
558 }
559
560
561 /**
562  * Transmit request from queue to datastore service.
563  *
564  * @param cls the 'struct GNUNET_DATASTORE_Handle'
565  * @param size number of bytes that can be copied to buf
566  * @param buf where to copy the drop message
567  * @return number of bytes written to buf
568  */
569 static size_t
570 transmit_request (void *cls,
571                   size_t size, 
572                   void *buf)
573 {
574   struct GNUNET_DATASTORE_Handle *h = cls;
575   struct GNUNET_DATASTORE_QueueEntry *qe;
576   size_t msize;
577
578   h->th = NULL;
579   if (NULL == (qe = h->queue_head))
580     return 0; /* no entry in queue */
581   if (buf == NULL)
582     {
583       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
584                   _("Failed to transmit request to DATASTORE.\n"));
585       GNUNET_STATISTICS_update (h->stats,
586                                 gettext_noop ("# transmission request failures"),
587                                 1,
588                                 GNUNET_NO);
589       do_disconnect (h);
590       return 0;
591     }
592   if (size < (msize = qe->message_size))
593     {
594       process_queue (h);
595       return 0;
596     }
597  #if DEBUG_DATASTORE
598   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
599               "Transmitting %u byte request to DATASTORE\n",
600               msize);
601 #endif
602   memcpy (buf, &qe[1], msize);
603   qe->was_transmitted = GNUNET_YES;
604   GNUNET_SCHEDULER_cancel (qe->task);
605   qe->task = GNUNET_SCHEDULER_NO_TASK;
606   h->in_receive = GNUNET_YES;
607   GNUNET_CLIENT_receive (h->client,
608                          qe->response_proc,
609                          h,
610                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
611   GNUNET_STATISTICS_update (h->stats,
612                             gettext_noop ("# bytes sent to datastore"),
613                             1,
614                             GNUNET_NO);
615   return msize;
616 }
617
618
619 /**
620  * Process entries in the queue (or do nothing if we are already
621  * doing so).
622  * 
623  * @param h handle to the datastore
624  */
625 static void
626 process_queue (struct GNUNET_DATASTORE_Handle *h)
627 {
628   struct GNUNET_DATASTORE_QueueEntry *qe;
629
630   if (NULL == (qe = h->queue_head))
631     {
632 #if DEBUG_DATASTORE > 1
633       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
634                   "Queue empty\n");
635 #endif
636       return; /* no entry in queue */
637     }
638   if (qe->was_transmitted == GNUNET_YES)
639     {
640 #if DEBUG_DATASTORE > 1
641       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
642                   "Head request already transmitted\n");
643 #endif
644       return; /* waiting for replies */
645     }
646   if (h->th != NULL)
647     {
648 #if DEBUG_DATASTORE > 1
649       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
650                   "Pending transmission request\n");
651 #endif
652       return; /* request pending */
653     }
654   if (h->client == NULL)
655     {
656 #if DEBUG_DATASTORE > 1
657       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
658                   "Not connected\n");
659 #endif
660       return; /* waiting for reconnect */
661     }
662 #if DEBUG_DATASTORE
663   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
664               "Queueing %u byte request to DATASTORE\n",
665               qe->message_size);
666 #endif
667   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
668                                                qe->message_size,
669                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
670                                                GNUNET_YES,
671                                                &transmit_request,
672                                                h);
673 }
674
675
676 /**
677  * Dummy continuation used to do nothing (but be non-zero).
678  *
679  * @param cls closure
680  * @param result result 
681  * @param emsg error message
682  */
683 static void
684 drop_status_cont (void *cls, int32_t result, const char *emsg)
685 {
686   /* do nothing */
687 }
688
689
690 static void
691 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
692 {
693   struct GNUNET_DATASTORE_Handle *h = qe->h;
694
695   GNUNET_CONTAINER_DLL_remove (h->queue_head,
696                                h->queue_tail,
697                                qe);
698   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
699     {
700       GNUNET_SCHEDULER_cancel (qe->task);
701       qe->task = GNUNET_SCHEDULER_NO_TASK;
702     }
703   h->queue_size--;
704   qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */
705   GNUNET_free (qe);
706 }
707
708 /**
709  * Type of a function to call when we receive a message
710  * from the service.
711  *
712  * @param cls closure
713  * @param msg message received, NULL on timeout or fatal error
714  */
715 static void 
716 process_status_message (void *cls,
717                         const struct
718                         GNUNET_MessageHeader * msg)
719 {
720   struct GNUNET_DATASTORE_Handle *h = cls;
721   struct GNUNET_DATASTORE_QueueEntry *qe;
722   struct StatusContext rc;
723   const struct StatusMessage *sm;
724   const char *emsg;
725   int32_t status;
726   int was_transmitted;
727
728   h->in_receive = GNUNET_NO;
729   if (h->skip_next_messages > 0)
730     {
731       h->skip_next_messages--;
732       process_queue (h);
733       return;
734    } 
735   if (NULL == (qe = h->queue_head))
736     {
737       GNUNET_break (0);
738       do_disconnect (h);
739       return;
740     }
741   rc = qe->qc.sc;
742   if (msg == NULL)
743     {      
744       was_transmitted = qe->was_transmitted;
745       free_queue_entry (qe);
746       if (NULL == h->client)
747         return; /* forced disconnect */
748       if (rc.cont != NULL)
749         rc.cont (rc.cont_cls, 
750                  GNUNET_SYSERR,
751                  _("Failed to receive status response from database."));
752       if (was_transmitted == GNUNET_YES)
753         do_disconnect (h);
754       return;
755     }
756   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
757   free_queue_entry (qe);
758   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
759        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
760     {
761       GNUNET_break (0);
762       h->retry_time = GNUNET_TIME_UNIT_ZERO;
763       do_disconnect (h);
764       if (rc.cont != NULL)
765         rc.cont (rc.cont_cls, 
766                  GNUNET_SYSERR,
767                  _("Error reading response from datastore service"));
768       return;
769     }
770   sm = (const struct StatusMessage*) msg;
771   status = ntohl(sm->status);
772   emsg = NULL;
773   if (ntohs(msg->size) > sizeof(struct StatusMessage))
774     {
775       emsg = (const char*) &sm[1];
776       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
777         {
778           GNUNET_break (0);
779           emsg = _("Invalid error message received from datastore service");
780         }
781     }  
782   if ( (status == GNUNET_SYSERR) &&
783        (emsg == NULL) )
784     {
785       GNUNET_break (0);
786       emsg = _("Invalid error message received from datastore service");
787     }
788 #if DEBUG_DATASTORE
789   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
790               "Received status %d/%s\n",
791               (int) status,
792               emsg);
793 #endif
794   GNUNET_STATISTICS_update (h->stats,
795                             gettext_noop ("# status messages received"),
796                             1,
797                             GNUNET_NO);
798   h->retry_time.rel_value = 0;
799   process_queue (h);
800   if (rc.cont != NULL)
801     rc.cont (rc.cont_cls, 
802              status,
803              emsg);
804 }
805
806
807 /**
808  * Store an item in the datastore.  If the item is already present,
809  * the priorities are summed up and the higher expiration time and
810  * lower anonymity level is used.
811  *
812  * @param h handle to the datastore
813  * @param rid reservation ID to use (from "reserve"); use 0 if no
814  *            prior reservation was made
815  * @param key key for the value
816  * @param size number of bytes in data
817  * @param data content stored
818  * @param type type of the content
819  * @param priority priority of the content
820  * @param anonymity anonymity-level for the content
821  * @param replication how often should the content be replicated to other peers?
822  * @param expiration expiration time for the content
823  * @param queue_priority ranking of this request in the priority queue
824  * @param max_queue_size at what queue size should this request be dropped
825  *        (if other requests of higher priority are in the queue)
826  * @param timeout timeout for the operation
827  * @param cont continuation to call when done
828  * @param cont_cls closure for cont
829  * @return NULL if the entry was not queued, otherwise a handle that can be used to
830  *         cancel; note that even if NULL is returned, the callback will be invoked
831  *         (or rather, will already have been invoked)
832  */
833 struct GNUNET_DATASTORE_QueueEntry *
834 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
835                       uint32_t rid,
836                       const GNUNET_HashCode * key,
837                       size_t size,
838                       const void *data,
839                       enum GNUNET_BLOCK_Type type,
840                       uint32_t priority,
841                       uint32_t anonymity,
842                       uint32_t replication,
843                       struct GNUNET_TIME_Absolute expiration,
844                       unsigned int queue_priority,
845                       unsigned int max_queue_size,
846                       struct GNUNET_TIME_Relative timeout,
847                       GNUNET_DATASTORE_ContinuationWithStatus cont,
848                       void *cont_cls)
849 {
850   struct GNUNET_DATASTORE_QueueEntry *qe;
851   struct DataMessage *dm;
852   size_t msize;
853   union QueueContext qc;
854
855 #if DEBUG_DATASTORE
856   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
857               "Asked to put %u bytes of data under key `%s' for %llu ms\n",
858               size,
859               GNUNET_h2s (key),
860               GNUNET_TIME_absolute_get_remaining (expiration).rel_value);
861 #endif
862   msize = sizeof(struct DataMessage) + size;
863   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
864   qc.sc.cont = cont;
865   qc.sc.cont_cls = cont_cls;
866   qe = make_queue_entry (h, msize,
867                          queue_priority, max_queue_size, timeout,
868                          &process_status_message, &qc);
869   if (qe == NULL)
870     {
871 #if DEBUG_DATASTORE
872       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
873                   "Could not create queue entry for PUT\n");
874 #endif
875       return NULL;
876     }
877   GNUNET_STATISTICS_update (h->stats,
878                             gettext_noop ("# PUT requests executed"),
879                             1,
880                             GNUNET_NO);
881   dm = (struct DataMessage* ) &qe[1];
882   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
883   dm->header.size = htons(msize);
884   dm->rid = htonl(rid);
885   dm->size = htonl( (uint32_t) size);
886   dm->type = htonl(type);
887   dm->priority = htonl(priority);
888   dm->anonymity = htonl(anonymity);
889   dm->replication = htonl (replication);
890   dm->reserved = htonl (0);
891   dm->uid = GNUNET_htonll(0);
892   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
893   dm->key = *key;
894   memcpy (&dm[1], data, size);
895   process_queue (h);
896   return qe;
897 }
898
899
900 /**
901  * Reserve space in the datastore.  This function should be used
902  * to avoid "out of space" failures during a longer sequence of "put"
903  * operations (for example, when a file is being inserted).
904  *
905  * @param h handle to the datastore
906  * @param amount how much space (in bytes) should be reserved (for content only)
907  * @param entries how many entries will be created (to calculate per-entry overhead)
908  * @param queue_priority ranking of this request in the priority queue
909  * @param max_queue_size at what queue size should this request be dropped
910  *        (if other requests of higher priority are in the queue)
911  * @param timeout how long to wait at most for a response (or before dying in queue)
912  * @param cont continuation to call when done; "success" will be set to
913  *             a positive reservation value if space could be reserved.
914  * @param cont_cls closure for cont
915  * @return NULL if the entry was not queued, otherwise a handle that can be used to
916  *         cancel; note that even if NULL is returned, the callback will be invoked
917  *         (or rather, will already have been invoked)
918  */
919 struct GNUNET_DATASTORE_QueueEntry *
920 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
921                           uint64_t amount,
922                           uint32_t entries,
923                           unsigned int queue_priority,
924                           unsigned int max_queue_size,
925                           struct GNUNET_TIME_Relative timeout,
926                           GNUNET_DATASTORE_ContinuationWithStatus cont,
927                           void *cont_cls)
928 {
929   struct GNUNET_DATASTORE_QueueEntry *qe;
930   struct ReserveMessage *rm;
931   union QueueContext qc;
932
933   if (cont == NULL)
934     cont = &drop_status_cont;
935 #if DEBUG_DATASTORE
936   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
937               "Asked to reserve %llu bytes of data and %u entries'\n",
938               (unsigned long long) amount,
939               (unsigned int) entries);
940 #endif
941   qc.sc.cont = cont;
942   qc.sc.cont_cls = cont_cls;
943   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
944                          queue_priority, max_queue_size, timeout,
945                          &process_status_message, &qc);
946   if (qe == NULL)
947     {
948 #if DEBUG_DATASTORE
949       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
950                   "Could not create queue entry to reserve\n");
951 #endif
952       return NULL;
953     }
954   GNUNET_STATISTICS_update (h->stats,
955                             gettext_noop ("# RESERVE requests executed"),
956                             1,
957                             GNUNET_NO);
958   rm = (struct ReserveMessage*) &qe[1];
959   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
960   rm->header.size = htons(sizeof (struct ReserveMessage));
961   rm->entries = htonl(entries);
962   rm->amount = GNUNET_htonll(amount);
963   process_queue (h);
964   return qe;
965 }
966
967
968 /**
969  * Signal that all of the data for which a reservation was made has
970  * been stored and that whatever excess space might have been reserved
971  * can now be released.
972  *
973  * @param h handle to the datastore
974  * @param rid reservation ID (value of "success" in original continuation
975  *        from the "reserve" function).
976  * @param queue_priority ranking of this request in the priority queue
977  * @param max_queue_size at what queue size should this request be dropped
978  *        (if other requests of higher priority are in the queue)
979  * @param queue_priority ranking of this request in the priority queue
980  * @param max_queue_size at what queue size should this request be dropped
981  *        (if other requests of higher priority are in the queue)
982  * @param timeout how long to wait at most for a response
983  * @param cont continuation to call when done
984  * @param cont_cls closure for cont
985  * @return NULL if the entry was not queued, otherwise a handle that can be used to
986  *         cancel; note that even if NULL is returned, the callback will be invoked
987  *         (or rather, will already have been invoked)
988  */
989 struct GNUNET_DATASTORE_QueueEntry *
990 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
991                                   uint32_t rid,
992                                   unsigned int queue_priority,
993                                   unsigned int max_queue_size,
994                                   struct GNUNET_TIME_Relative timeout,
995                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
996                                   void *cont_cls)
997 {
998   struct GNUNET_DATASTORE_QueueEntry *qe;
999   struct ReleaseReserveMessage *rrm;
1000   union QueueContext qc;
1001
1002   if (cont == NULL)
1003     cont = &drop_status_cont;
1004 #if DEBUG_DATASTORE
1005   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1006               "Asked to release reserve %d\n",
1007               rid);
1008 #endif
1009   qc.sc.cont = cont;
1010   qc.sc.cont_cls = cont_cls;
1011   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
1012                          queue_priority, max_queue_size, timeout,
1013                          &process_status_message, &qc);
1014   if (qe == NULL)
1015     {
1016 #if DEBUG_DATASTORE
1017       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1018                   "Could not create queue entry to release reserve\n");
1019 #endif
1020       return NULL;
1021     }
1022   GNUNET_STATISTICS_update (h->stats,
1023                             gettext_noop ("# RELEASE RESERVE requests executed"),
1024                             1,
1025                             GNUNET_NO);
1026   rrm = (struct ReleaseReserveMessage*) &qe[1];
1027   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1028   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
1029   rrm->rid = htonl(rid);
1030   process_queue (h);
1031   return qe;
1032 }
1033
1034
1035 /**
1036  * Update a value in the datastore.
1037  *
1038  * @param h handle to the datastore
1039  * @param uid identifier for the value
1040  * @param priority how much to increase the priority of the value
1041  * @param expiration new expiration value should be MAX of existing and this argument
1042  * @param queue_priority ranking of this request in the priority queue
1043  * @param max_queue_size at what queue size should this request be dropped
1044  *        (if other requests of higher priority are in the queue)
1045  * @param timeout how long to wait at most for a response
1046  * @param cont continuation to call when done
1047  * @param cont_cls closure for cont
1048  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1049  *         cancel; note that even if NULL is returned, the callback will be invoked
1050  *         (or rather, will already have been invoked)
1051  */
1052 struct GNUNET_DATASTORE_QueueEntry *
1053 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1054                          uint64_t uid,
1055                          uint32_t priority,
1056                          struct GNUNET_TIME_Absolute expiration,
1057                          unsigned int queue_priority,
1058                          unsigned int max_queue_size,
1059                          struct GNUNET_TIME_Relative timeout,
1060                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1061                          void *cont_cls)
1062 {
1063   struct GNUNET_DATASTORE_QueueEntry *qe;
1064   struct UpdateMessage *um;
1065   union QueueContext qc;
1066
1067   if (cont == NULL)
1068     cont = &drop_status_cont;
1069 #if DEBUG_DATASTORE
1070   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1071               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
1072               uid,
1073               (unsigned int) priority,
1074               (unsigned long long) expiration.abs_value);
1075 #endif
1076   qc.sc.cont = cont;
1077   qc.sc.cont_cls = cont_cls;
1078   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
1079                          queue_priority, max_queue_size, timeout,
1080                          &process_status_message, &qc);
1081   if (qe == NULL)
1082     {
1083 #if DEBUG_DATASTORE
1084       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1085                   "Could not create queue entry for UPDATE\n");
1086 #endif
1087       return NULL;
1088     }
1089   GNUNET_STATISTICS_update (h->stats,
1090                             gettext_noop ("# UPDATE requests executed"),
1091                             1,
1092                             GNUNET_NO);
1093   um = (struct UpdateMessage*) &qe[1];
1094   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1095   um->header.size = htons(sizeof (struct UpdateMessage));
1096   um->priority = htonl(priority);
1097   um->expiration = GNUNET_TIME_absolute_hton(expiration);
1098   um->uid = GNUNET_htonll(uid);
1099   process_queue (h);
1100   return qe;
1101 }
1102
1103
1104 /**
1105  * Explicitly remove some content from the database.
1106  * The "cont"inuation will be called with status
1107  * "GNUNET_OK" if content was removed, "GNUNET_NO"
1108  * if no matching entry was found and "GNUNET_SYSERR"
1109  * on all other types of errors.
1110  *
1111  * @param h handle to the datastore
1112  * @param key key for the value
1113  * @param size number of bytes in data
1114  * @param data content stored
1115  * @param queue_priority ranking of this request in the priority queue
1116  * @param max_queue_size at what queue size should this request be dropped
1117  *        (if other requests of higher priority are in the queue)
1118  * @param timeout how long to wait at most for a response
1119  * @param cont continuation to call when done
1120  * @param cont_cls closure for cont
1121  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1122  *         cancel; note that even if NULL is returned, the callback will be invoked
1123  *         (or rather, will already have been invoked)
1124  */
1125 struct GNUNET_DATASTORE_QueueEntry *
1126 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1127                          const GNUNET_HashCode *key,
1128                          size_t size,
1129                          const void *data,
1130                          unsigned int queue_priority,
1131                          unsigned int max_queue_size,
1132                          struct GNUNET_TIME_Relative timeout,
1133                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1134                          void *cont_cls)
1135 {
1136   struct GNUNET_DATASTORE_QueueEntry *qe;
1137   struct DataMessage *dm;
1138   size_t msize;
1139   union QueueContext qc;
1140
1141   if (cont == NULL)
1142     cont = &drop_status_cont;
1143 #if DEBUG_DATASTORE
1144   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1145               "Asked to remove %u bytes under key `%s'\n",
1146               size,
1147               GNUNET_h2s (key));
1148 #endif
1149   qc.sc.cont = cont;
1150   qc.sc.cont_cls = cont_cls;
1151   msize = sizeof(struct DataMessage) + size;
1152   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1153   qe = make_queue_entry (h, msize,
1154                          queue_priority, max_queue_size, timeout,
1155                          &process_status_message, &qc);
1156   if (qe == NULL)
1157     {
1158 #if DEBUG_DATASTORE
1159       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1160                   "Could not create queue entry for REMOVE\n");
1161 #endif
1162       return NULL;
1163     }
1164   GNUNET_STATISTICS_update (h->stats,
1165                             gettext_noop ("# REMOVE requests executed"),
1166                             1,
1167                             GNUNET_NO);
1168   dm = (struct DataMessage*) &qe[1];
1169   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1170   dm->header.size = htons(msize);
1171   dm->rid = htonl(0);
1172   dm->size = htonl(size);
1173   dm->type = htonl(0);
1174   dm->priority = htonl(0);
1175   dm->anonymity = htonl(0);
1176   dm->uid = GNUNET_htonll(0);
1177   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
1178   dm->key = *key;
1179   memcpy (&dm[1], data, size);
1180   process_queue (h);
1181   return qe;
1182 }
1183
1184
1185 /**
1186  * Type of a function to call when we receive a message
1187  * from the service.
1188  *
1189  * @param cls closure
1190  * @param msg message received, NULL on timeout or fatal error
1191  */
1192 static void 
1193 process_result_message (void *cls,
1194                         const struct GNUNET_MessageHeader *msg)
1195 {
1196   struct GNUNET_DATASTORE_Handle *h = cls;
1197   struct GNUNET_DATASTORE_QueueEntry *qe;
1198   struct ResultContext rc;
1199   const struct DataMessage *dm;
1200
1201   h->in_receive = GNUNET_NO;
1202   if (h->skip_next_messages > 0)
1203     {
1204       h->skip_next_messages--;
1205       process_queue (h);
1206       return;
1207     }
1208   if (msg == NULL)
1209     {
1210       qe = h->queue_head;
1211       GNUNET_assert (NULL != qe);
1212       if (qe->was_transmitted == GNUNET_YES)
1213         {
1214           rc = qe->qc.rc;
1215           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1216                       _("Failed to receive response from database.\n"));
1217           do_disconnect (h);
1218           free_queue_entry (qe);
1219           if (rc.proc != NULL)
1220             rc.proc (rc.proc_cls,
1221                      NULL, 0, NULL, 0, 0, 0, 
1222                      GNUNET_TIME_UNIT_ZERO_ABS, 0);    
1223         }
1224       return;
1225     }
1226   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
1227     {
1228       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1229       qe = h->queue_head;
1230       rc = qe->qc.rc;
1231       GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1232       free_queue_entry (qe);
1233 #if DEBUG_DATASTORE
1234       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1235                   "Received end of result set, new queue size is %u\n",
1236                   h->queue_size);
1237 #endif
1238       if (rc.proc != NULL)
1239         rc.proc (rc.proc_cls,
1240                  NULL, 0, NULL, 0, 0, 0, 
1241                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1242       h->retry_time.rel_value = 0;
1243       h->result_count = 0;
1244       process_queue (h);
1245       return;
1246     }
1247   qe = h->queue_head;
1248   rc = qe->qc.rc;
1249   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1250   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1251        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1252        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1253     {
1254       GNUNET_break (0);
1255       free_queue_entry (qe);
1256       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1257       do_disconnect (h);
1258       if (rc.proc != NULL)
1259         rc.proc (rc.proc_cls,
1260                  NULL, 0, NULL, 0, 0, 0, 
1261                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
1262       return;
1263     }
1264   GNUNET_STATISTICS_update (h->stats,
1265                             gettext_noop ("# Results received"),
1266                             1,
1267                             GNUNET_NO);
1268   dm = (const struct DataMessage*) msg;
1269 #if DEBUG_DATASTORE
1270   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1271               "Received result %llu with type %u and size %u with key %s\n",
1272               (unsigned long long) GNUNET_ntohll(dm->uid),
1273               ntohl(dm->type),
1274               ntohl(dm->size),
1275               GNUNET_h2s(&dm->key));
1276 #endif
1277   free_queue_entry (qe);
1278   h->retry_time.rel_value = 0;
1279   rc.proc (rc.proc_cls,
1280            &dm->key,
1281            ntohl(dm->size),
1282            &dm[1],
1283            ntohl(dm->type),
1284            ntohl(dm->priority),
1285            ntohl(dm->anonymity),
1286            GNUNET_TIME_absolute_ntoh(dm->expiration),   
1287            GNUNET_ntohll(dm->uid));
1288 }
1289
1290
1291 /**
1292  * Get a random value from the datastore for content replication.
1293  * Returns a single, random value among those with the highest
1294  * replication score, lowering positive replication scores by one for
1295  * the chosen value (if only content with a replication score exists,
1296  * a random value is returned and replication scores are not changed).
1297  *
1298  * @param h handle to the datastore
1299  * @param queue_priority ranking of this request in the priority queue
1300  * @param max_queue_size at what queue size should this request be dropped
1301  *        (if other requests of higher priority are in the queue)
1302  * @param timeout how long to wait at most for a response
1303  * @param proc function to call on a random value; it
1304  *        will be called once with a value (if available)
1305  *        and always once with a value of NULL.
1306  * @param proc_cls closure for proc
1307  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1308  *         cancel
1309  */
1310 struct GNUNET_DATASTORE_QueueEntry *
1311 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1312                                       unsigned int queue_priority,
1313                                       unsigned int max_queue_size,
1314                                       struct GNUNET_TIME_Relative timeout,
1315                                       GNUNET_DATASTORE_DatumProcessor proc, 
1316                                       void *proc_cls)
1317 {
1318   struct GNUNET_DATASTORE_QueueEntry *qe;
1319   struct GNUNET_MessageHeader *m;
1320   union QueueContext qc;
1321
1322   GNUNET_assert (NULL != proc);
1323 #if DEBUG_DATASTORE
1324   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1325               "Asked to get replication entry in %llu ms\n",
1326               (unsigned long long) timeout.rel_value);
1327 #endif
1328   qc.rc.proc = proc;
1329   qc.rc.proc_cls = proc_cls;
1330   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1331                          queue_priority, max_queue_size, timeout,
1332                          &process_result_message, &qc);
1333   if (qe == NULL)
1334     {
1335 #if DEBUG_DATASTORE
1336       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1337                   "Could not create queue entry for GET REPLICATION\n");
1338 #endif
1339       return NULL;    
1340     }
1341   GNUNET_STATISTICS_update (h->stats,
1342                             gettext_noop ("# GET REPLICATION requests executed"),
1343                             1,
1344                             GNUNET_NO);
1345   m = (struct GNUNET_MessageHeader*) &qe[1];
1346   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1347   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1348   process_queue (h);
1349   return qe;
1350 }
1351
1352
1353 /**
1354  * Get a single zero-anonymity value from the datastore.
1355  *
1356  * @param h handle to the datastore
1357  * @param offset offset of the result (mod #num-results); set to
1358  *               a random 64-bit value initially; then increment by
1359  *               one each time; detect that all results have been found by uid
1360  *               being again the first uid ever returned.
1361  * @param queue_priority ranking of this request in the priority queue
1362  * @param max_queue_size at what queue size should this request be dropped
1363  *        (if other requests of higher priority are in the queue)
1364  * @param timeout how long to wait at most for a response
1365  * @param type allowed type for the operation (never zero)
1366  * @param proc function to call on a random value; it
1367  *        will be called once with a value (if available)
1368  *        or with NULL if none value exists.
1369  * @param proc_cls closure for proc
1370  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1371  *         cancel
1372  */
1373 struct GNUNET_DATASTORE_QueueEntry *
1374 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1375                                      uint64_t offset,
1376                                      unsigned int queue_priority,
1377                                      unsigned int max_queue_size,
1378                                      struct GNUNET_TIME_Relative timeout,
1379                                      enum GNUNET_BLOCK_Type type,
1380                                      GNUNET_DATASTORE_DatumProcessor proc, 
1381                                      void *proc_cls)
1382 {
1383   struct GNUNET_DATASTORE_QueueEntry *qe;
1384   struct GetZeroAnonymityMessage *m;
1385   union QueueContext qc;
1386
1387   GNUNET_assert (NULL != proc);
1388   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1389 #if DEBUG_DATASTORE
1390   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1391               "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n",
1392               (unsigned long long) offset,
1393               type,
1394               (unsigned long long) timeout.rel_value);
1395 #endif
1396   qc.rc.proc = proc;
1397   qc.rc.proc_cls = proc_cls;
1398   qe = make_queue_entry (h, sizeof(struct GetZeroAnonymityMessage),
1399                          queue_priority, max_queue_size, timeout,
1400                          &process_result_message, &qc);
1401   if (qe == NULL)
1402     {
1403 #if DEBUG_DATASTORE
1404       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1405                   "Could not create queue entry for zero-anonymity procation\n");
1406 #endif
1407       return NULL;    
1408     }
1409   GNUNET_STATISTICS_update (h->stats,
1410                             gettext_noop ("# GET ZERO ANONYMITY requests executed"),
1411                             1,
1412                             GNUNET_NO);
1413   m = (struct GetZeroAnonymityMessage*) &qe[1];
1414   m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1415   m->header.size = htons(sizeof (struct GetZeroAnonymityMessage));
1416   m->type = htonl ((uint32_t) type);
1417   m->offset = GNUNET_htonll (offset);
1418   process_queue (h);
1419   return qe;
1420 }
1421
1422
1423 /**
1424  * Get a result for a particular key from the datastore.  The processor
1425  * will only be called once.
1426  *
1427  * @param h handle to the datastore
1428  * @param offset offset of the result (mod #num-results); set to
1429  *               a random 64-bit value initially; then increment by
1430  *               one each time; detect that all results have been found by uid
1431  *               being again the first uid ever returned.
1432  * @param key maybe NULL (to match all entries)
1433  * @param type desired type, 0 for any
1434  * @param queue_priority ranking of this request in the priority queue
1435  * @param max_queue_size at what queue size should this request be dropped
1436  *        (if other requests of higher priority are in the queue)
1437  * @param timeout how long to wait at most for a response
1438  * @param proc function to call on each matching value;
1439  *        will be called once with a NULL value at the end
1440  * @param proc_cls closure for proc
1441  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1442  *         cancel
1443  */
1444 struct GNUNET_DATASTORE_QueueEntry *
1445 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1446                           uint64_t offset,
1447                           const GNUNET_HashCode * key,
1448                           enum GNUNET_BLOCK_Type type,
1449                           unsigned int queue_priority,
1450                           unsigned int max_queue_size,
1451                           struct GNUNET_TIME_Relative timeout,
1452                           GNUNET_DATASTORE_DatumProcessor proc, 
1453                           void *proc_cls)
1454 {
1455   struct GNUNET_DATASTORE_QueueEntry *qe;
1456   struct GetMessage *gm;
1457   union QueueContext qc;
1458
1459   GNUNET_assert (NULL != proc);
1460 #if DEBUG_DATASTORE
1461   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1462               "Asked to look for data of type %u under key `%s'\n",
1463               (unsigned int) type,
1464               GNUNET_h2s (key));
1465 #endif
1466   qc.rc.proc = proc;
1467   qc.rc.proc_cls = proc_cls;
1468   qe = make_queue_entry (h, sizeof(struct GetMessage),
1469                          queue_priority, max_queue_size, timeout,
1470                          &process_result_message, &qc);
1471   if (qe == NULL)
1472     {
1473 #if DEBUG_DATASTORE
1474       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1475                   "Could not queue request for `%s'\n",
1476                   GNUNET_h2s (key));
1477 #endif
1478       return NULL;
1479     }
1480   GNUNET_STATISTICS_update (h->stats,
1481                             gettext_noop ("# GET requests executed"),
1482                             1,
1483                             GNUNET_NO);
1484   gm = (struct GetMessage*) &qe[1];
1485   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1486   gm->type = htonl(type);
1487   gm->offset = GNUNET_htonll (offset);
1488   if (key != NULL)
1489     {
1490       gm->header.size = htons(sizeof (struct GetMessage));
1491       gm->key = *key;
1492     }
1493   else
1494     {
1495       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1496     }
1497   process_queue (h);
1498   return qe;
1499 }
1500
1501
1502 /**
1503  * Cancel a datastore operation.  The final callback from the
1504  * operation must not have been done yet.
1505  * 
1506  * @param qe operation to cancel
1507  */
1508 void
1509 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1510 {
1511   struct GNUNET_DATASTORE_Handle *h;
1512
1513   GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted);
1514   h = qe->h;
1515 #if DEBUG_DATASTORE
1516   GNUNET_log  (GNUNET_ERROR_TYPE_DEBUG,
1517                "Pending DATASTORE request %p cancelled (%d, %d)\n",
1518                qe,
1519                qe->was_transmitted,
1520                h->queue_head == qe);
1521 #endif
1522   if (GNUNET_YES == qe->was_transmitted) 
1523     {
1524       free_queue_entry (qe);
1525       h->skip_next_messages++;
1526       return;
1527     }
1528   free_queue_entry (qe);
1529   process_queue (h);
1530 }
1531
1532
1533 /* end of datastore_api.c */