76d9e5f3c8363d4fe804863c61105c8a02fdfd83
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
33
34 /**
35  * If a client stopped asking for more results, how many more do
36  * we receive from the DB before killing the connection?  Trade-off
37  * between re-doing TCP handshakes and (needlessly) receiving 
38  * useless results.
39  */
40 #define MAX_EXCESS_RESULTS 8
41
42 /**
43  * Context for processing status messages.
44  */
45 struct StatusContext
46 {
47   /**
48    * Continuation to call with the status.
49    */
50   GNUNET_DATASTORE_ContinuationWithStatus cont;
51
52   /**
53    * Closure for cont.
54    */
55   void *cont_cls;
56
57 };
58
59
60 /**
61  * Context for processing result messages.
62  */
63 struct ResultContext
64 {
65   /**
66    * Function to call with the result.
67    */
68   GNUNET_DATASTORE_DatumProcessor proc;
69
70   /**
71    * Closure for proc.
72    */
73   void *proc_cls;
74
75 };
76
77
78 /**
79  *  Context for a queue operation.
80  */
81 union QueueContext
82 {
83
84   struct StatusContext sc;
85   
86   struct ResultContext rc;
87
88 };
89
90
91
92 /**
93  * Entry in our priority queue.
94  */
95 struct GNUNET_DATASTORE_QueueEntry
96 {
97
98   /**
99    * This is a linked list.
100    */
101   struct GNUNET_DATASTORE_QueueEntry *next;
102
103   /**
104    * This is a linked list.
105    */
106   struct GNUNET_DATASTORE_QueueEntry *prev;
107
108   /**
109    * Handle to the master context.
110    */
111   struct GNUNET_DATASTORE_Handle *h;
112
113   /**
114    * Response processor (NULL if we are not waiting for a response).
115    * This struct should be used for the closure, function-specific
116    * arguments can be passed via 'qc'.
117    */
118   GNUNET_CLIENT_MessageHandler response_proc;
119
120   /**
121    * Function to call after transmission of the request.
122    */
123   GNUNET_DATASTORE_ContinuationWithStatus cont;
124    
125   /**
126    * Closure for 'cont'.
127    */
128   void *cont_cls;
129
130   /**
131    * Context for the operation.
132    */
133   union QueueContext qc;
134
135   /**
136    * Task for timeout signalling.
137    */
138   GNUNET_SCHEDULER_TaskIdentifier task;
139
140   /**
141    * Timeout for the current operation.
142    */
143   struct GNUNET_TIME_Absolute timeout;
144
145   /**
146    * Priority in the queue.
147    */
148   unsigned int priority;
149
150   /**
151    * Maximum allowed length of queue (otherwise
152    * this request should be discarded).
153    */
154   unsigned int max_queue;
155
156   /**
157    * Number of bytes in the request message following
158    * this struct.  32-bit value for nicer memory
159    * access (and overall struct alignment).
160    */
161   uint32_t message_size;
162
163   /**
164    * Has this message been transmitted to the service?
165    * Only ever GNUNET_YES for the head of the queue.
166    * Note that the overall struct should end at a 
167    * multiple of 64 bits.
168    */
169   int was_transmitted;
170   
171 };
172
173 /**
174  * Handle to the datastore service. 
175  */
176 struct GNUNET_DATASTORE_Handle
177 {
178
179   /**
180    * Our configuration.
181    */
182   const struct GNUNET_CONFIGURATION_Handle *cfg;
183
184   /**
185    * Current connection to the datastore service.
186    */
187   struct GNUNET_CLIENT_Connection *client;
188
189   /**
190    * Handle for statistics.
191    */
192   struct GNUNET_STATISTICS_Handle *stats;
193
194   /**
195    * Current transmit handle.
196    */
197   struct GNUNET_CLIENT_TransmitHandle *th;
198
199   /**
200    * Current head of priority queue.
201    */
202   struct GNUNET_DATASTORE_QueueEntry *queue_head;
203
204   /**
205    * Current tail of priority queue.
206    */
207   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
208
209   /**
210    * Task for trying to reconnect.
211    */
212   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
213
214   /**
215    * How quickly should we retry?  Used for exponential back-off on
216    * connect-errors.
217    */
218   struct GNUNET_TIME_Relative retry_time;
219
220   /**
221    * Number of entries in the queue.
222    */
223   unsigned int queue_size;
224
225   /**
226    * Number of results we're receiving for the current query
227    * after application stopped to care.  Used to determine when
228    * to reset the connection.
229    */
230   unsigned int result_count;
231
232   /**
233    * Are we currently trying to receive from the service?
234    */
235   int in_receive;
236
237   /**
238    * We should ignore the next message(s) from the service.
239    */
240   unsigned int skip_next_messages;
241
242 };
243
244
245
246 /**
247  * Connect to the datastore service.
248  *
249  * @param cfg configuration to use
250  * @return handle to use to access the service
251  */
252 struct GNUNET_DATASTORE_Handle *
253 GNUNET_DATASTORE_connect (const struct
254                           GNUNET_CONFIGURATION_Handle
255                           *cfg)
256 {
257   struct GNUNET_CLIENT_Connection *c;
258   struct GNUNET_DATASTORE_Handle *h;
259   
260   c = GNUNET_CLIENT_connect ("datastore", cfg);
261   if (c == NULL)
262     return NULL; /* oops */
263   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
264                      GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
265   h->client = c;
266   h->cfg = cfg;
267   h->stats = GNUNET_STATISTICS_create ("datastore-api",
268                                        cfg);
269   return h;
270 }
271
272
273 /**
274  * Transmit DROP message to datastore service.
275  *
276  * @param cls the 'struct GNUNET_DATASTORE_Handle'
277  * @param size number of bytes that can be copied to buf
278  * @param buf where to copy the drop message
279  * @return number of bytes written to buf
280  */
281 static size_t
282 transmit_drop (void *cls,
283                size_t size, 
284                void *buf)
285 {
286   struct GNUNET_DATASTORE_Handle *h = cls;
287   struct GNUNET_MessageHeader *hdr;
288   
289   if (buf == NULL)
290     {
291       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
292                   _("Failed to transmit request to drop database.\n"));
293       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
294       return 0;
295     }
296   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
297   hdr = buf;
298   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
299   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
300   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
301   return sizeof(struct GNUNET_MessageHeader);
302 }
303
304
305 /**
306  * Disconnect from the datastore service (and free
307  * associated resources).
308  *
309  * @param h handle to the datastore
310  * @param drop set to GNUNET_YES to delete all data in datastore (!)
311  */
312 void
313 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
314                              int drop)
315 {
316   struct GNUNET_DATASTORE_QueueEntry *qe;
317
318   if (h->client != NULL)
319     {
320       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
321       h->client = NULL;
322     }
323   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
324     {
325       GNUNET_SCHEDULER_cancel (h->reconnect_task);
326       h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
327     }
328   while (NULL != (qe = h->queue_head))
329     {
330       GNUNET_assert (NULL != qe->response_proc);
331       qe->response_proc (h, NULL);
332     }
333   if (GNUNET_YES == drop) 
334     {
335       h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
336       if (h->client != NULL)
337         {
338           if (NULL != 
339               GNUNET_CLIENT_notify_transmit_ready (h->client,
340                                                    sizeof(struct GNUNET_MessageHeader),
341                                                    GNUNET_TIME_UNIT_MINUTES,
342                                                    GNUNET_YES,
343                                                    &transmit_drop,
344                                                    h))
345             return;
346           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
347         }
348       GNUNET_break (0);
349     }
350   GNUNET_STATISTICS_destroy (h->stats,
351                              GNUNET_NO);
352   GNUNET_free (h);
353 }
354
355
356 /**
357  * A request has timed out (before being transmitted to the service).
358  *
359  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
360  * @param tc scheduler context
361  */
362 static void
363 timeout_queue_entry (void *cls,
364                      const struct GNUNET_SCHEDULER_TaskContext *tc)
365 {
366   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
367
368   GNUNET_STATISTICS_update (qe->h->stats,
369                             gettext_noop ("# queue entry timeouts"),
370                             1,
371                             GNUNET_NO);
372   qe->task = GNUNET_SCHEDULER_NO_TASK;
373   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
374   qe->response_proc (qe->h, NULL);
375 }
376
377
378 /**
379  * Create a new entry for our priority queue (and possibly discard other entires if
380  * the queue is getting too long).
381  *
382  * @param h handle to the datastore
383  * @param msize size of the message to queue
384  * @param queue_priority priority of the entry
385  * @param max_queue_size at what queue size should this request be dropped
386  *        (if other requests of higher priority are in the queue)
387  * @param timeout timeout for the operation
388  * @param response_proc function to call with replies (can be NULL)
389  * @param qc client context (NOT a closure for response_proc)
390  * @return NULL if the queue is full 
391  */
392 static struct GNUNET_DATASTORE_QueueEntry *
393 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
394                   size_t msize,
395                   unsigned int queue_priority,
396                   unsigned int max_queue_size,
397                   struct GNUNET_TIME_Relative timeout,
398                   GNUNET_CLIENT_MessageHandler response_proc,            
399                   const union QueueContext *qc)
400 {
401   struct GNUNET_DATASTORE_QueueEntry *ret;
402   struct GNUNET_DATASTORE_QueueEntry *pos;
403   unsigned int c;
404
405   c = 0;
406   pos = h->queue_head;
407   while ( (pos != NULL) &&
408           (c < max_queue_size) &&
409           (pos->priority >= queue_priority) )
410     {
411       c++;
412       pos = pos->next;
413     }
414   if (c >= max_queue_size)
415     {
416       GNUNET_STATISTICS_update (h->stats,
417                                 gettext_noop ("# queue overflows"),
418                                 1,
419                                 GNUNET_NO);
420       return NULL;
421     }
422   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
423   ret->h = h;
424   ret->response_proc = response_proc;
425   ret->qc = *qc;
426   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
427   ret->priority = queue_priority;
428   ret->max_queue = max_queue_size;
429   ret->message_size = msize;
430   ret->was_transmitted = GNUNET_NO;
431   if (pos == NULL)
432     {
433       /* append at the tail */
434       pos = h->queue_tail;
435     }
436   else
437     {
438       pos = pos->prev; 
439       /* do not insert at HEAD if HEAD query was already
440          transmitted and we are still receiving replies! */
441       if ( (pos == NULL) &&
442            (h->queue_head->was_transmitted) )
443         pos = h->queue_head;
444     }
445   c++;
446   GNUNET_STATISTICS_update (h->stats,
447                             gettext_noop ("# queue entries created"),
448                             1,
449                             GNUNET_NO);
450   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
451                                      h->queue_tail,
452                                      pos,
453                                      ret);
454   h->queue_size++;
455   ret->task = GNUNET_SCHEDULER_add_delayed (timeout,
456                                             &timeout_queue_entry,
457                                             ret);
458   pos = ret->next;
459   while (pos != NULL) 
460     {
461       if (pos->max_queue < h->queue_size)
462         {
463           GNUNET_assert (pos->response_proc != NULL);
464           /* move 'pos' element to head so that it will be 
465              killed on 'NULL' call below */
466           GNUNET_CONTAINER_DLL_remove (h->queue_head,
467                                        h->queue_tail,
468                                        pos);
469           GNUNET_CONTAINER_DLL_insert (h->queue_head,
470                                        h->queue_tail,
471                                        pos);
472           pos->response_proc (h, NULL);
473           break;
474         }
475       pos = pos->next;
476     }
477   return ret;
478 }
479
480
481 /**
482  * Process entries in the queue (or do nothing if we are already
483  * doing so).
484  * 
485  * @param h handle to the datastore
486  */
487 static void
488 process_queue (struct GNUNET_DATASTORE_Handle *h);
489
490
491 /**
492  * Try reconnecting to the datastore service.
493  *
494  * @param cls the 'struct GNUNET_DATASTORE_Handle'
495  * @param tc scheduler context
496  */
497 static void
498 try_reconnect (void *cls,
499                const struct GNUNET_SCHEDULER_TaskContext *tc)
500 {
501   struct GNUNET_DATASTORE_Handle *h = cls;
502
503   if (h->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
504     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
505   else
506     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
507   if (h->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
508     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
509   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
510   h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
511   if (h->client == NULL)
512     {
513       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
514                   "DATASTORE reconnect failed (fatally)\n");
515       return;
516     }
517   GNUNET_STATISTICS_update (h->stats,
518                             gettext_noop ("# datastore connections (re)created"),
519                             1,
520                             GNUNET_NO);
521 #if DEBUG_DATASTORE
522   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
523               "Reconnected to DATASTORE\n");
524 #endif
525   process_queue (h);
526 }
527
528
529 /**
530  * Disconnect from the service and then try reconnecting to the datastore service
531  * after some delay.
532  *
533  * @param h handle to datastore to disconnect and reconnect
534  */
535 static void
536 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
537 {
538   if (h->client == NULL)
539     {
540 #if DEBUG_DATASTORE
541       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
542                   "client NULL in disconnect, will not try to reconnect\n");
543 #endif
544       return;
545     }
546 #if 0
547   GNUNET_STATISTICS_update (stats,
548                             gettext_noop ("# reconnected to DATASTORE"),
549                             1,
550                             GNUNET_NO);
551 #endif
552   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
553   h->skip_next_messages = 0;
554   h->client = NULL;
555   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_time,
556                                                     &try_reconnect,
557                                                     h);      
558 }
559
560
561 /**
562  * Transmit request from queue to datastore service.
563  *
564  * @param cls the 'struct GNUNET_DATASTORE_Handle'
565  * @param size number of bytes that can be copied to buf
566  * @param buf where to copy the drop message
567  * @return number of bytes written to buf
568  */
569 static size_t
570 transmit_request (void *cls,
571                   size_t size, 
572                   void *buf)
573 {
574   struct GNUNET_DATASTORE_Handle *h = cls;
575   struct GNUNET_DATASTORE_QueueEntry *qe;
576   size_t msize;
577
578   h->th = NULL;
579   if (NULL == (qe = h->queue_head))
580     return 0; /* no entry in queue */
581   if (buf == NULL)
582     {
583       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
584                   _("Failed to transmit request to DATASTORE.\n"));
585       GNUNET_STATISTICS_update (h->stats,
586                                 gettext_noop ("# transmission request failures"),
587                                 1,
588                                 GNUNET_NO);
589       do_disconnect (h);
590       return 0;
591     }
592   if (size < (msize = qe->message_size))
593     {
594       process_queue (h);
595       return 0;
596     }
597  #if DEBUG_DATASTORE
598   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
599               "Transmitting %u byte request to DATASTORE\n",
600               msize);
601 #endif
602   memcpy (buf, &qe[1], msize);
603   qe->was_transmitted = GNUNET_YES;
604   GNUNET_SCHEDULER_cancel (qe->task);
605   qe->task = GNUNET_SCHEDULER_NO_TASK;
606   h->in_receive = GNUNET_YES;
607   GNUNET_CLIENT_receive (h->client,
608                          qe->response_proc,
609                          h,
610                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
611   GNUNET_STATISTICS_update (h->stats,
612                             gettext_noop ("# bytes sent to datastore"),
613                             1,
614                             GNUNET_NO);
615   return msize;
616 }
617
618
619 /**
620  * Process entries in the queue (or do nothing if we are already
621  * doing so).
622  * 
623  * @param h handle to the datastore
624  */
625 static void
626 process_queue (struct GNUNET_DATASTORE_Handle *h)
627 {
628   struct GNUNET_DATASTORE_QueueEntry *qe;
629
630   if (NULL == (qe = h->queue_head))
631     {
632 #if DEBUG_DATASTORE > 1
633       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
634                   "Queue empty\n");
635 #endif
636       return; /* no entry in queue */
637     }
638   if (qe->was_transmitted == GNUNET_YES)
639     {
640 #if DEBUG_DATASTORE > 1
641       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
642                   "Head request already transmitted\n");
643 #endif
644       return; /* waiting for replies */
645     }
646   if (h->th != NULL)
647     {
648 #if DEBUG_DATASTORE > 1
649       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
650                   "Pending transmission request\n");
651 #endif
652       return; /* request pending */
653     }
654   if (h->client == NULL)
655     {
656 #if DEBUG_DATASTORE > 1
657       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
658                   "Not connected\n");
659 #endif
660       return; /* waiting for reconnect */
661     }
662   if (GNUNET_YES == h->in_receive)
663     {
664       /* wait for response to previous query */
665       return; 
666     }
667 #if DEBUG_DATASTORE
668   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
669               "Queueing %u byte request to DATASTORE\n",
670               qe->message_size);
671 #endif
672   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
673                                                qe->message_size,
674                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
675                                                GNUNET_YES,
676                                                &transmit_request,
677                                                h);
678 }
679
680
681 /**
682  * Dummy continuation used to do nothing (but be non-zero).
683  *
684  * @param cls closure
685  * @param result result 
686  * @param emsg error message
687  */
688 static void
689 drop_status_cont (void *cls, int32_t result, const char *emsg)
690 {
691   /* do nothing */
692 }
693
694
695 static void
696 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
697 {
698   struct GNUNET_DATASTORE_Handle *h = qe->h;
699
700   GNUNET_CONTAINER_DLL_remove (h->queue_head,
701                                h->queue_tail,
702                                qe);
703   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
704     {
705       GNUNET_SCHEDULER_cancel (qe->task);
706       qe->task = GNUNET_SCHEDULER_NO_TASK;
707     }
708   h->queue_size--;
709   qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */
710   GNUNET_free (qe);
711 }
712
713 /**
714  * Type of a function to call when we receive a message
715  * from the service.
716  *
717  * @param cls closure
718  * @param msg message received, NULL on timeout or fatal error
719  */
720 static void 
721 process_status_message (void *cls,
722                         const struct
723                         GNUNET_MessageHeader * msg)
724 {
725   struct GNUNET_DATASTORE_Handle *h = cls;
726   struct GNUNET_DATASTORE_QueueEntry *qe;
727   struct StatusContext rc;
728   const struct StatusMessage *sm;
729   const char *emsg;
730   int32_t status;
731   int was_transmitted;
732
733   h->in_receive = GNUNET_NO;
734   if (h->skip_next_messages > 0)
735     {
736       h->skip_next_messages--;
737       process_queue (h);
738       return;
739    } 
740   if (NULL == (qe = h->queue_head))
741     {
742       GNUNET_break (0);
743       do_disconnect (h);
744       return;
745     }
746   rc = qe->qc.sc;
747   if (msg == NULL)
748     {      
749       was_transmitted = qe->was_transmitted;
750       free_queue_entry (qe);
751       if (NULL == h->client)
752         return; /* forced disconnect */
753       if (rc.cont != NULL)
754         rc.cont (rc.cont_cls, 
755                  GNUNET_SYSERR,
756                  _("Failed to receive status response from database."));
757       if (was_transmitted == GNUNET_YES)
758         do_disconnect (h);
759       return;
760     }
761   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
762   free_queue_entry (qe);
763   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
764        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
765     {
766       GNUNET_break (0);
767       h->retry_time = GNUNET_TIME_UNIT_ZERO;
768       do_disconnect (h);
769       if (rc.cont != NULL)
770         rc.cont (rc.cont_cls, 
771                  GNUNET_SYSERR,
772                  _("Error reading response from datastore service"));
773       return;
774     }
775   sm = (const struct StatusMessage*) msg;
776   status = ntohl(sm->status);
777   emsg = NULL;
778   if (ntohs(msg->size) > sizeof(struct StatusMessage))
779     {
780       emsg = (const char*) &sm[1];
781       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
782         {
783           GNUNET_break (0);
784           emsg = _("Invalid error message received from datastore service");
785         }
786     }  
787   if ( (status == GNUNET_SYSERR) &&
788        (emsg == NULL) )
789     {
790       GNUNET_break (0);
791       emsg = _("Invalid error message received from datastore service");
792     }
793 #if DEBUG_DATASTORE
794   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
795               "Received status %d/%s\n",
796               (int) status,
797               emsg);
798 #endif
799   GNUNET_STATISTICS_update (h->stats,
800                             gettext_noop ("# status messages received"),
801                             1,
802                             GNUNET_NO);
803   h->retry_time.rel_value = 0;
804   process_queue (h);
805   if (rc.cont != NULL)
806     rc.cont (rc.cont_cls, 
807              status,
808              emsg);
809 }
810
811
812 /**
813  * Store an item in the datastore.  If the item is already present,
814  * the priorities are summed up and the higher expiration time and
815  * lower anonymity level is used.
816  *
817  * @param h handle to the datastore
818  * @param rid reservation ID to use (from "reserve"); use 0 if no
819  *            prior reservation was made
820  * @param key key for the value
821  * @param size number of bytes in data
822  * @param data content stored
823  * @param type type of the content
824  * @param priority priority of the content
825  * @param anonymity anonymity-level for the content
826  * @param replication how often should the content be replicated to other peers?
827  * @param expiration expiration time for the content
828  * @param queue_priority ranking of this request in the priority queue
829  * @param max_queue_size at what queue size should this request be dropped
830  *        (if other requests of higher priority are in the queue)
831  * @param timeout timeout for the operation
832  * @param cont continuation to call when done
833  * @param cont_cls closure for cont
834  * @return NULL if the entry was not queued, otherwise a handle that can be used to
835  *         cancel; note that even if NULL is returned, the callback will be invoked
836  *         (or rather, will already have been invoked)
837  */
838 struct GNUNET_DATASTORE_QueueEntry *
839 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
840                       uint32_t rid,
841                       const GNUNET_HashCode * key,
842                       size_t size,
843                       const void *data,
844                       enum GNUNET_BLOCK_Type type,
845                       uint32_t priority,
846                       uint32_t anonymity,
847                       uint32_t replication,
848                       struct GNUNET_TIME_Absolute expiration,
849                       unsigned int queue_priority,
850                       unsigned int max_queue_size,
851                       struct GNUNET_TIME_Relative timeout,
852                       GNUNET_DATASTORE_ContinuationWithStatus cont,
853                       void *cont_cls)
854 {
855   struct GNUNET_DATASTORE_QueueEntry *qe;
856   struct DataMessage *dm;
857   size_t msize;
858   union QueueContext qc;
859
860 #if DEBUG_DATASTORE
861   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
862               "Asked to put %u bytes of data under key `%s' for %llu ms\n",
863               size,
864               GNUNET_h2s (key),
865               GNUNET_TIME_absolute_get_remaining (expiration).rel_value);
866 #endif
867   msize = sizeof(struct DataMessage) + size;
868   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
869   qc.sc.cont = cont;
870   qc.sc.cont_cls = cont_cls;
871   qe = make_queue_entry (h, msize,
872                          queue_priority, max_queue_size, timeout,
873                          &process_status_message, &qc);
874   if (qe == NULL)
875     {
876 #if DEBUG_DATASTORE
877       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
878                   "Could not create queue entry for PUT\n");
879 #endif
880       return NULL;
881     }
882   GNUNET_STATISTICS_update (h->stats,
883                             gettext_noop ("# PUT requests executed"),
884                             1,
885                             GNUNET_NO);
886   dm = (struct DataMessage* ) &qe[1];
887   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
888   dm->header.size = htons(msize);
889   dm->rid = htonl(rid);
890   dm->size = htonl( (uint32_t) size);
891   dm->type = htonl(type);
892   dm->priority = htonl(priority);
893   dm->anonymity = htonl(anonymity);
894   dm->replication = htonl (replication);
895   dm->reserved = htonl (0);
896   dm->uid = GNUNET_htonll(0);
897   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
898   dm->key = *key;
899   memcpy (&dm[1], data, size);
900   process_queue (h);
901   return qe;
902 }
903
904
905 /**
906  * Reserve space in the datastore.  This function should be used
907  * to avoid "out of space" failures during a longer sequence of "put"
908  * operations (for example, when a file is being inserted).
909  *
910  * @param h handle to the datastore
911  * @param amount how much space (in bytes) should be reserved (for content only)
912  * @param entries how many entries will be created (to calculate per-entry overhead)
913  * @param queue_priority ranking of this request in the priority queue
914  * @param max_queue_size at what queue size should this request be dropped
915  *        (if other requests of higher priority are in the queue)
916  * @param timeout how long to wait at most for a response (or before dying in queue)
917  * @param cont continuation to call when done; "success" will be set to
918  *             a positive reservation value if space could be reserved.
919  * @param cont_cls closure for cont
920  * @return NULL if the entry was not queued, otherwise a handle that can be used to
921  *         cancel; note that even if NULL is returned, the callback will be invoked
922  *         (or rather, will already have been invoked)
923  */
924 struct GNUNET_DATASTORE_QueueEntry *
925 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
926                           uint64_t amount,
927                           uint32_t entries,
928                           unsigned int queue_priority,
929                           unsigned int max_queue_size,
930                           struct GNUNET_TIME_Relative timeout,
931                           GNUNET_DATASTORE_ContinuationWithStatus cont,
932                           void *cont_cls)
933 {
934   struct GNUNET_DATASTORE_QueueEntry *qe;
935   struct ReserveMessage *rm;
936   union QueueContext qc;
937
938   if (cont == NULL)
939     cont = &drop_status_cont;
940 #if DEBUG_DATASTORE
941   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
942               "Asked to reserve %llu bytes of data and %u entries'\n",
943               (unsigned long long) amount,
944               (unsigned int) entries);
945 #endif
946   qc.sc.cont = cont;
947   qc.sc.cont_cls = cont_cls;
948   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
949                          queue_priority, max_queue_size, timeout,
950                          &process_status_message, &qc);
951   if (qe == NULL)
952     {
953 #if DEBUG_DATASTORE
954       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
955                   "Could not create queue entry to reserve\n");
956 #endif
957       return NULL;
958     }
959   GNUNET_STATISTICS_update (h->stats,
960                             gettext_noop ("# RESERVE requests executed"),
961                             1,
962                             GNUNET_NO);
963   rm = (struct ReserveMessage*) &qe[1];
964   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
965   rm->header.size = htons(sizeof (struct ReserveMessage));
966   rm->entries = htonl(entries);
967   rm->amount = GNUNET_htonll(amount);
968   process_queue (h);
969   return qe;
970 }
971
972
973 /**
974  * Signal that all of the data for which a reservation was made has
975  * been stored and that whatever excess space might have been reserved
976  * can now be released.
977  *
978  * @param h handle to the datastore
979  * @param rid reservation ID (value of "success" in original continuation
980  *        from the "reserve" function).
981  * @param queue_priority ranking of this request in the priority queue
982  * @param max_queue_size at what queue size should this request be dropped
983  *        (if other requests of higher priority are in the queue)
984  * @param queue_priority ranking of this request in the priority queue
985  * @param max_queue_size at what queue size should this request be dropped
986  *        (if other requests of higher priority are in the queue)
987  * @param timeout how long to wait at most for a response
988  * @param cont continuation to call when done
989  * @param cont_cls closure for cont
990  * @return NULL if the entry was not queued, otherwise a handle that can be used to
991  *         cancel; note that even if NULL is returned, the callback will be invoked
992  *         (or rather, will already have been invoked)
993  */
994 struct GNUNET_DATASTORE_QueueEntry *
995 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
996                                   uint32_t rid,
997                                   unsigned int queue_priority,
998                                   unsigned int max_queue_size,
999                                   struct GNUNET_TIME_Relative timeout,
1000                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
1001                                   void *cont_cls)
1002 {
1003   struct GNUNET_DATASTORE_QueueEntry *qe;
1004   struct ReleaseReserveMessage *rrm;
1005   union QueueContext qc;
1006
1007   if (cont == NULL)
1008     cont = &drop_status_cont;
1009 #if DEBUG_DATASTORE
1010   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1011               "Asked to release reserve %d\n",
1012               rid);
1013 #endif
1014   qc.sc.cont = cont;
1015   qc.sc.cont_cls = cont_cls;
1016   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
1017                          queue_priority, max_queue_size, timeout,
1018                          &process_status_message, &qc);
1019   if (qe == NULL)
1020     {
1021 #if DEBUG_DATASTORE
1022       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1023                   "Could not create queue entry to release reserve\n");
1024 #endif
1025       return NULL;
1026     }
1027   GNUNET_STATISTICS_update (h->stats,
1028                             gettext_noop ("# RELEASE RESERVE requests executed"),
1029                             1,
1030                             GNUNET_NO);
1031   rrm = (struct ReleaseReserveMessage*) &qe[1];
1032   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1033   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
1034   rrm->rid = htonl(rid);
1035   process_queue (h);
1036   return qe;
1037 }
1038
1039
1040 /**
1041  * Update a value in the datastore.
1042  *
1043  * @param h handle to the datastore
1044  * @param uid identifier for the value
1045  * @param priority how much to increase the priority of the value
1046  * @param expiration new expiration value should be MAX of existing and this argument
1047  * @param queue_priority ranking of this request in the priority queue
1048  * @param max_queue_size at what queue size should this request be dropped
1049  *        (if other requests of higher priority are in the queue)
1050  * @param timeout how long to wait at most for a response
1051  * @param cont continuation to call when done
1052  * @param cont_cls closure for cont
1053  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1054  *         cancel; note that even if NULL is returned, the callback will be invoked
1055  *         (or rather, will already have been invoked)
1056  */
1057 struct GNUNET_DATASTORE_QueueEntry *
1058 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1059                          uint64_t uid,
1060                          uint32_t priority,
1061                          struct GNUNET_TIME_Absolute expiration,
1062                          unsigned int queue_priority,
1063                          unsigned int max_queue_size,
1064                          struct GNUNET_TIME_Relative timeout,
1065                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1066                          void *cont_cls)
1067 {
1068   struct GNUNET_DATASTORE_QueueEntry *qe;
1069   struct UpdateMessage *um;
1070   union QueueContext qc;
1071
1072   if (cont == NULL)
1073     cont = &drop_status_cont;
1074 #if DEBUG_DATASTORE
1075   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1076               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
1077               uid,
1078               (unsigned int) priority,
1079               (unsigned long long) expiration.abs_value);
1080 #endif
1081   qc.sc.cont = cont;
1082   qc.sc.cont_cls = cont_cls;
1083   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
1084                          queue_priority, max_queue_size, timeout,
1085                          &process_status_message, &qc);
1086   if (qe == NULL)
1087     {
1088 #if DEBUG_DATASTORE
1089       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1090                   "Could not create queue entry for UPDATE\n");
1091 #endif
1092       return NULL;
1093     }
1094   GNUNET_STATISTICS_update (h->stats,
1095                             gettext_noop ("# UPDATE requests executed"),
1096                             1,
1097                             GNUNET_NO);
1098   um = (struct UpdateMessage*) &qe[1];
1099   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1100   um->header.size = htons(sizeof (struct UpdateMessage));
1101   um->priority = htonl(priority);
1102   um->expiration = GNUNET_TIME_absolute_hton(expiration);
1103   um->uid = GNUNET_htonll(uid);
1104   process_queue (h);
1105   return qe;
1106 }
1107
1108
1109 /**
1110  * Explicitly remove some content from the database.
1111  * The "cont"inuation will be called with status
1112  * "GNUNET_OK" if content was removed, "GNUNET_NO"
1113  * if no matching entry was found and "GNUNET_SYSERR"
1114  * on all other types of errors.
1115  *
1116  * @param h handle to the datastore
1117  * @param key key for the value
1118  * @param size number of bytes in data
1119  * @param data content stored
1120  * @param queue_priority ranking of this request in the priority queue
1121  * @param max_queue_size at what queue size should this request be dropped
1122  *        (if other requests of higher priority are in the queue)
1123  * @param timeout how long to wait at most for a response
1124  * @param cont continuation to call when done
1125  * @param cont_cls closure for cont
1126  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1127  *         cancel; note that even if NULL is returned, the callback will be invoked
1128  *         (or rather, will already have been invoked)
1129  */
1130 struct GNUNET_DATASTORE_QueueEntry *
1131 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1132                          const GNUNET_HashCode *key,
1133                          size_t size,
1134                          const void *data,
1135                          unsigned int queue_priority,
1136                          unsigned int max_queue_size,
1137                          struct GNUNET_TIME_Relative timeout,
1138                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1139                          void *cont_cls)
1140 {
1141   struct GNUNET_DATASTORE_QueueEntry *qe;
1142   struct DataMessage *dm;
1143   size_t msize;
1144   union QueueContext qc;
1145
1146   if (cont == NULL)
1147     cont = &drop_status_cont;
1148 #if DEBUG_DATASTORE
1149   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1150               "Asked to remove %u bytes under key `%s'\n",
1151               size,
1152               GNUNET_h2s (key));
1153 #endif
1154   qc.sc.cont = cont;
1155   qc.sc.cont_cls = cont_cls;
1156   msize = sizeof(struct DataMessage) + size;
1157   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1158   qe = make_queue_entry (h, msize,
1159                          queue_priority, max_queue_size, timeout,
1160                          &process_status_message, &qc);
1161   if (qe == NULL)
1162     {
1163 #if DEBUG_DATASTORE
1164       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1165                   "Could not create queue entry for REMOVE\n");
1166 #endif
1167       return NULL;
1168     }
1169   GNUNET_STATISTICS_update (h->stats,
1170                             gettext_noop ("# REMOVE requests executed"),
1171                             1,
1172                             GNUNET_NO);
1173   dm = (struct DataMessage*) &qe[1];
1174   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1175   dm->header.size = htons(msize);
1176   dm->rid = htonl(0);
1177   dm->size = htonl(size);
1178   dm->type = htonl(0);
1179   dm->priority = htonl(0);
1180   dm->anonymity = htonl(0);
1181   dm->uid = GNUNET_htonll(0);
1182   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
1183   dm->key = *key;
1184   memcpy (&dm[1], data, size);
1185   process_queue (h);
1186   return qe;
1187 }
1188
1189
1190 /**
1191  * Type of a function to call when we receive a message
1192  * from the service.
1193  *
1194  * @param cls closure
1195  * @param msg message received, NULL on timeout or fatal error
1196  */
1197 static void 
1198 process_result_message (void *cls,
1199                         const struct GNUNET_MessageHeader *msg)
1200 {
1201   struct GNUNET_DATASTORE_Handle *h = cls;
1202   struct GNUNET_DATASTORE_QueueEntry *qe;
1203   struct ResultContext rc;
1204   const struct DataMessage *dm;
1205
1206   h->in_receive = GNUNET_NO;
1207   if (h->skip_next_messages > 0)
1208     {
1209       h->skip_next_messages--;
1210       process_queue (h);
1211       return;
1212     }
1213   if (msg == NULL)
1214     {
1215       qe = h->queue_head;
1216       GNUNET_assert (NULL != qe);
1217       if (qe->was_transmitted == GNUNET_YES)
1218         {
1219           rc = qe->qc.rc;
1220           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1221                       _("Failed to receive response from database.\n"));
1222           do_disconnect (h);
1223           free_queue_entry (qe);
1224           if (rc.proc != NULL)
1225             rc.proc (rc.proc_cls,
1226                      NULL, 0, NULL, 0, 0, 0, 
1227                      GNUNET_TIME_UNIT_ZERO_ABS, 0);    
1228         }
1229       return;
1230     }
1231   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
1232     {
1233       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1234       qe = h->queue_head;
1235       rc = qe->qc.rc;
1236       GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1237       free_queue_entry (qe);
1238 #if DEBUG_DATASTORE
1239       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1240                   "Received end of result set, new queue size is %u\n",
1241                   h->queue_size);
1242 #endif
1243       if (rc.proc != NULL)
1244         rc.proc (rc.proc_cls,
1245                  NULL, 0, NULL, 0, 0, 0, 
1246                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1247       h->retry_time.rel_value = 0;
1248       h->result_count = 0;
1249       process_queue (h);
1250       return;
1251     }
1252   qe = h->queue_head;
1253   rc = qe->qc.rc;
1254   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1255   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1256        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1257        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1258     {
1259       GNUNET_break (0);
1260       free_queue_entry (qe);
1261       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1262       do_disconnect (h);
1263       if (rc.proc != NULL)
1264         rc.proc (rc.proc_cls,
1265                  NULL, 0, NULL, 0, 0, 0, 
1266                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
1267       return;
1268     }
1269   GNUNET_STATISTICS_update (h->stats,
1270                             gettext_noop ("# Results received"),
1271                             1,
1272                             GNUNET_NO);
1273   dm = (const struct DataMessage*) msg;
1274 #if DEBUG_DATASTORE
1275   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1276               "Received result %llu with type %u and size %u with key %s\n",
1277               (unsigned long long) GNUNET_ntohll(dm->uid),
1278               ntohl(dm->type),
1279               ntohl(dm->size),
1280               GNUNET_h2s(&dm->key));
1281 #endif
1282   free_queue_entry (qe);
1283   h->retry_time.rel_value = 0;
1284   rc.proc (rc.proc_cls,
1285            &dm->key,
1286            ntohl(dm->size),
1287            &dm[1],
1288            ntohl(dm->type),
1289            ntohl(dm->priority),
1290            ntohl(dm->anonymity),
1291            GNUNET_TIME_absolute_ntoh(dm->expiration),   
1292            GNUNET_ntohll(dm->uid));
1293 }
1294
1295
1296 /**
1297  * Get a random value from the datastore for content replication.
1298  * Returns a single, random value among those with the highest
1299  * replication score, lowering positive replication scores by one for
1300  * the chosen value (if only content with a replication score exists,
1301  * a random value is returned and replication scores are not changed).
1302  *
1303  * @param h handle to the datastore
1304  * @param queue_priority ranking of this request in the priority queue
1305  * @param max_queue_size at what queue size should this request be dropped
1306  *        (if other requests of higher priority are in the queue)
1307  * @param timeout how long to wait at most for a response
1308  * @param proc function to call on a random value; it
1309  *        will be called once with a value (if available)
1310  *        and always once with a value of NULL.
1311  * @param proc_cls closure for proc
1312  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1313  *         cancel
1314  */
1315 struct GNUNET_DATASTORE_QueueEntry *
1316 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1317                                       unsigned int queue_priority,
1318                                       unsigned int max_queue_size,
1319                                       struct GNUNET_TIME_Relative timeout,
1320                                       GNUNET_DATASTORE_DatumProcessor proc, 
1321                                       void *proc_cls)
1322 {
1323   struct GNUNET_DATASTORE_QueueEntry *qe;
1324   struct GNUNET_MessageHeader *m;
1325   union QueueContext qc;
1326
1327   GNUNET_assert (NULL != proc);
1328 #if DEBUG_DATASTORE
1329   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1330               "Asked to get replication entry in %llu ms\n",
1331               (unsigned long long) timeout.rel_value);
1332 #endif
1333   qc.rc.proc = proc;
1334   qc.rc.proc_cls = proc_cls;
1335   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1336                          queue_priority, max_queue_size, timeout,
1337                          &process_result_message, &qc);
1338   if (qe == NULL)
1339     {
1340 #if DEBUG_DATASTORE
1341       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1342                   "Could not create queue entry for GET REPLICATION\n");
1343 #endif
1344       return NULL;    
1345     }
1346   GNUNET_STATISTICS_update (h->stats,
1347                             gettext_noop ("# GET REPLICATION requests executed"),
1348                             1,
1349                             GNUNET_NO);
1350   m = (struct GNUNET_MessageHeader*) &qe[1];
1351   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1352   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1353   process_queue (h);
1354   return qe;
1355 }
1356
1357
1358 /**
1359  * Get a single zero-anonymity value from the datastore.
1360  *
1361  * @param h handle to the datastore
1362  * @param offset offset of the result (mod #num-results); set to
1363  *               a random 64-bit value initially; then increment by
1364  *               one each time; detect that all results have been found by uid
1365  *               being again the first uid ever returned.
1366  * @param queue_priority ranking of this request in the priority queue
1367  * @param max_queue_size at what queue size should this request be dropped
1368  *        (if other requests of higher priority are in the queue)
1369  * @param timeout how long to wait at most for a response
1370  * @param type allowed type for the operation (never zero)
1371  * @param proc function to call on a random value; it
1372  *        will be called once with a value (if available)
1373  *        or with NULL if none value exists.
1374  * @param proc_cls closure for proc
1375  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1376  *         cancel
1377  */
1378 struct GNUNET_DATASTORE_QueueEntry *
1379 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1380                                      uint64_t offset,
1381                                      unsigned int queue_priority,
1382                                      unsigned int max_queue_size,
1383                                      struct GNUNET_TIME_Relative timeout,
1384                                      enum GNUNET_BLOCK_Type type,
1385                                      GNUNET_DATASTORE_DatumProcessor proc, 
1386                                      void *proc_cls)
1387 {
1388   struct GNUNET_DATASTORE_QueueEntry *qe;
1389   struct GetZeroAnonymityMessage *m;
1390   union QueueContext qc;
1391
1392   GNUNET_assert (NULL != proc);
1393   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1394 #if DEBUG_DATASTORE
1395   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1396               "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n",
1397               (unsigned long long) offset,
1398               type,
1399               (unsigned long long) timeout.rel_value);
1400 #endif
1401   qc.rc.proc = proc;
1402   qc.rc.proc_cls = proc_cls;
1403   qe = make_queue_entry (h, sizeof(struct GetZeroAnonymityMessage),
1404                          queue_priority, max_queue_size, timeout,
1405                          &process_result_message, &qc);
1406   if (qe == NULL)
1407     {
1408 #if DEBUG_DATASTORE
1409       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1410                   "Could not create queue entry for zero-anonymity procation\n");
1411 #endif
1412       return NULL;    
1413     }
1414   GNUNET_STATISTICS_update (h->stats,
1415                             gettext_noop ("# GET ZERO ANONYMITY requests executed"),
1416                             1,
1417                             GNUNET_NO);
1418   m = (struct GetZeroAnonymityMessage*) &qe[1];
1419   m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1420   m->header.size = htons(sizeof (struct GetZeroAnonymityMessage));
1421   m->type = htonl ((uint32_t) type);
1422   m->offset = GNUNET_htonll (offset);
1423   process_queue (h);
1424   return qe;
1425 }
1426
1427
1428 /**
1429  * Get a result for a particular key from the datastore.  The processor
1430  * will only be called once.
1431  *
1432  * @param h handle to the datastore
1433  * @param offset offset of the result (mod #num-results); set to
1434  *               a random 64-bit value initially; then increment by
1435  *               one each time; detect that all results have been found by uid
1436  *               being again the first uid ever returned.
1437  * @param key maybe NULL (to match all entries)
1438  * @param type desired type, 0 for any
1439  * @param queue_priority ranking of this request in the priority queue
1440  * @param max_queue_size at what queue size should this request be dropped
1441  *        (if other requests of higher priority are in the queue)
1442  * @param timeout how long to wait at most for a response
1443  * @param proc function to call on each matching value;
1444  *        will be called once with a NULL value at the end
1445  * @param proc_cls closure for proc
1446  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1447  *         cancel
1448  */
1449 struct GNUNET_DATASTORE_QueueEntry *
1450 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1451                           uint64_t offset,
1452                           const GNUNET_HashCode * key,
1453                           enum GNUNET_BLOCK_Type type,
1454                           unsigned int queue_priority,
1455                           unsigned int max_queue_size,
1456                           struct GNUNET_TIME_Relative timeout,
1457                           GNUNET_DATASTORE_DatumProcessor proc, 
1458                           void *proc_cls)
1459 {
1460   struct GNUNET_DATASTORE_QueueEntry *qe;
1461   struct GetMessage *gm;
1462   union QueueContext qc;
1463
1464   GNUNET_assert (NULL != proc);
1465 #if DEBUG_DATASTORE
1466   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1467               "Asked to look for data of type %u under key `%s'\n",
1468               (unsigned int) type,
1469               GNUNET_h2s (key));
1470 #endif
1471   qc.rc.proc = proc;
1472   qc.rc.proc_cls = proc_cls;
1473   qe = make_queue_entry (h, sizeof(struct GetMessage),
1474                          queue_priority, max_queue_size, timeout,
1475                          &process_result_message, &qc);
1476   if (qe == NULL)
1477     {
1478 #if DEBUG_DATASTORE
1479       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1480                   "Could not queue request for `%s'\n",
1481                   GNUNET_h2s (key));
1482 #endif
1483       return NULL;
1484     }
1485   GNUNET_STATISTICS_update (h->stats,
1486                             gettext_noop ("# GET requests executed"),
1487                             1,
1488                             GNUNET_NO);
1489   gm = (struct GetMessage*) &qe[1];
1490   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1491   gm->type = htonl(type);
1492   gm->offset = GNUNET_htonll (offset);
1493   if (key != NULL)
1494     {
1495       gm->header.size = htons(sizeof (struct GetMessage));
1496       gm->key = *key;
1497     }
1498   else
1499     {
1500       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1501     }
1502   process_queue (h);
1503   return qe;
1504 }
1505
1506
1507 /**
1508  * Cancel a datastore operation.  The final callback from the
1509  * operation must not have been done yet.
1510  * 
1511  * @param qe operation to cancel
1512  */
1513 void
1514 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1515 {
1516   struct GNUNET_DATASTORE_Handle *h;
1517
1518   GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted);
1519   h = qe->h;
1520 #if DEBUG_DATASTORE
1521   GNUNET_log  (GNUNET_ERROR_TYPE_DEBUG,
1522                "Pending DATASTORE request %p cancelled (%d, %d)\n",
1523                qe,
1524                qe->was_transmitted,
1525                h->queue_head == qe);
1526 #endif
1527   if (GNUNET_YES == qe->was_transmitted) 
1528     {
1529       free_queue_entry (qe);
1530       h->skip_next_messages++;
1531       return;
1532     }
1533   free_queue_entry (qe);
1534   process_queue (h);
1535 }
1536
1537
1538 /* end of datastore_api.c */