d861157d9b048fb29e1c20b15aa4d67e09596ac7
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
33
34 /**
35  * If a client stopped asking for more results, how many more do
36  * we receive from the DB before killing the connection?  Trade-off
37  * between re-doing TCP handshakes and (needlessly) receiving 
38  * useless results.
39  */
40 #define MAX_EXCESS_RESULTS 8
41
42 /**
43  * Context for processing status messages.
44  */
45 struct StatusContext
46 {
47   /**
48    * Continuation to call with the status.
49    */
50   GNUNET_DATASTORE_ContinuationWithStatus cont;
51
52   /**
53    * Closure for cont.
54    */
55   void *cont_cls;
56
57 };
58
59
60 /**
61  * Context for processing result messages.
62  */
63 struct ResultContext
64 {
65   /**
66    * Function to call with the result.
67    */
68   GNUNET_DATASTORE_DatumProcessor proc;
69
70   /**
71    * Closure for proc.
72    */
73   void *proc_cls;
74
75 };
76
77
78 /**
79  *  Context for a queue operation.
80  */
81 union QueueContext
82 {
83
84   struct StatusContext sc;
85   
86   struct ResultContext rc;
87
88 };
89
90
91
92 /**
93  * Entry in our priority queue.
94  */
95 struct GNUNET_DATASTORE_QueueEntry
96 {
97
98   /**
99    * This is a linked list.
100    */
101   struct GNUNET_DATASTORE_QueueEntry *next;
102
103   /**
104    * This is a linked list.
105    */
106   struct GNUNET_DATASTORE_QueueEntry *prev;
107
108   /**
109    * Handle to the master context.
110    */
111   struct GNUNET_DATASTORE_Handle *h;
112
113   /**
114    * Response processor (NULL if we are not waiting for a response).
115    * This struct should be used for the closure, function-specific
116    * arguments can be passed via 'qc'.
117    */
118   GNUNET_CLIENT_MessageHandler response_proc;
119
120   /**
121    * Function to call after transmission of the request.
122    */
123   GNUNET_DATASTORE_ContinuationWithStatus cont;
124    
125   /**
126    * Closure for 'cont'.
127    */
128   void *cont_cls;
129
130   /**
131    * Context for the operation.
132    */
133   union QueueContext qc;
134
135   /**
136    * Task for timeout signalling.
137    */
138   GNUNET_SCHEDULER_TaskIdentifier task;
139
140   /**
141    * Timeout for the current operation.
142    */
143   struct GNUNET_TIME_Absolute timeout;
144
145   /**
146    * Priority in the queue.
147    */
148   unsigned int priority;
149
150   /**
151    * Maximum allowed length of queue (otherwise
152    * this request should be discarded).
153    */
154   unsigned int max_queue;
155
156   /**
157    * Number of bytes in the request message following
158    * this struct.  32-bit value for nicer memory
159    * access (and overall struct alignment).
160    */
161   uint32_t message_size;
162
163   /**
164    * Has this message been transmitted to the service?
165    * Only ever GNUNET_YES for the head of the queue.
166    * Note that the overall struct should end at a 
167    * multiple of 64 bits.
168    */
169   int was_transmitted;
170   
171 };
172
173 /**
174  * Handle to the datastore service. 
175  */
176 struct GNUNET_DATASTORE_Handle
177 {
178
179   /**
180    * Our configuration.
181    */
182   const struct GNUNET_CONFIGURATION_Handle *cfg;
183
184   /**
185    * Current connection to the datastore service.
186    */
187   struct GNUNET_CLIENT_Connection *client;
188
189   /**
190    * Handle for statistics.
191    */
192   struct GNUNET_STATISTICS_Handle *stats;
193
194   /**
195    * Current transmit handle.
196    */
197   struct GNUNET_CLIENT_TransmitHandle *th;
198
199   /**
200    * Current head of priority queue.
201    */
202   struct GNUNET_DATASTORE_QueueEntry *queue_head;
203
204   /**
205    * Current tail of priority queue.
206    */
207   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
208
209   /**
210    * Task for trying to reconnect.
211    */
212   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
213
214   /**
215    * How quickly should we retry?  Used for exponential back-off on
216    * connect-errors.
217    */
218   struct GNUNET_TIME_Relative retry_time;
219
220   /**
221    * Number of entries in the queue.
222    */
223   unsigned int queue_size;
224
225   /**
226    * Number of results we're receiving for the current query
227    * after application stopped to care.  Used to determine when
228    * to reset the connection.
229    */
230   unsigned int result_count;
231
232   /**
233    * Are we currently trying to receive from the service?
234    */
235   int in_receive;
236
237   /**
238    * We should ignore the next message(s) from the service.
239    */
240   unsigned int skip_next_messages;
241
242 };
243
244
245
246 /**
247  * Connect to the datastore service.
248  *
249  * @param cfg configuration to use
250  * @return handle to use to access the service
251  */
252 struct GNUNET_DATASTORE_Handle *
253 GNUNET_DATASTORE_connect (const struct
254                           GNUNET_CONFIGURATION_Handle
255                           *cfg)
256 {
257   struct GNUNET_CLIENT_Connection *c;
258   struct GNUNET_DATASTORE_Handle *h;
259   
260   c = GNUNET_CLIENT_connect ("datastore", cfg);
261   if (c == NULL)
262     return NULL; /* oops */
263   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
264                      GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
265   h->client = c;
266   h->cfg = cfg;
267   h->stats = GNUNET_STATISTICS_create ("datastore-api",
268                                        cfg);
269   return h;
270 }
271
272
273 /**
274  * Transmit DROP message to datastore service.
275  *
276  * @param cls the 'struct GNUNET_DATASTORE_Handle'
277  * @param size number of bytes that can be copied to buf
278  * @param buf where to copy the drop message
279  * @return number of bytes written to buf
280  */
281 static size_t
282 transmit_drop (void *cls,
283                size_t size, 
284                void *buf)
285 {
286   struct GNUNET_DATASTORE_Handle *h = cls;
287   struct GNUNET_MessageHeader *hdr;
288   
289   if (buf == NULL)
290     {
291       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
292                   _("Failed to transmit request to drop database.\n"));
293       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
294       return 0;
295     }
296   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
297   hdr = buf;
298   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
299   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
300   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
301   return sizeof(struct GNUNET_MessageHeader);
302 }
303
304
305 /**
306  * Disconnect from the datastore service (and free
307  * associated resources).
308  *
309  * @param h handle to the datastore
310  * @param drop set to GNUNET_YES to delete all data in datastore (!)
311  */
312 void
313 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
314                              int drop)
315 {
316   struct GNUNET_DATASTORE_QueueEntry *qe;
317
318   if (h->client != NULL)
319     {
320       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
321       h->client = NULL;
322     }
323   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
324     {
325       GNUNET_SCHEDULER_cancel (h->reconnect_task);
326       h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
327     }
328   while (NULL != (qe = h->queue_head))
329     {
330       GNUNET_assert (NULL != qe->response_proc);
331       qe->response_proc (h, NULL);
332     }
333   if (GNUNET_YES == drop) 
334     {
335       h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
336       if (h->client != NULL)
337         {
338           if (NULL != 
339               GNUNET_CLIENT_notify_transmit_ready (h->client,
340                                                    sizeof(struct GNUNET_MessageHeader),
341                                                    GNUNET_TIME_UNIT_MINUTES,
342                                                    GNUNET_YES,
343                                                    &transmit_drop,
344                                                    h))
345             return;
346           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
347         }
348       GNUNET_break (0);
349     }
350   GNUNET_STATISTICS_destroy (h->stats,
351                              GNUNET_NO);
352   GNUNET_free (h);
353 }
354
355
356 /**
357  * A request has timed out (before being transmitted to the service).
358  *
359  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
360  * @param tc scheduler context
361  */
362 static void
363 timeout_queue_entry (void *cls,
364                      const struct GNUNET_SCHEDULER_TaskContext *tc)
365 {
366   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
367
368   GNUNET_STATISTICS_update (qe->h->stats,
369                             gettext_noop ("# queue entry timeouts"),
370                             1,
371                             GNUNET_NO);
372   qe->task = GNUNET_SCHEDULER_NO_TASK;
373   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
374   qe->response_proc (qe->h, NULL);
375 }
376
377
378 /**
379  * Create a new entry for our priority queue (and possibly discard other entires if
380  * the queue is getting too long).
381  *
382  * @param h handle to the datastore
383  * @param msize size of the message to queue
384  * @param queue_priority priority of the entry
385  * @param max_queue_size at what queue size should this request be dropped
386  *        (if other requests of higher priority are in the queue)
387  * @param timeout timeout for the operation
388  * @param response_proc function to call with replies (can be NULL)
389  * @param qc client context (NOT a closure for response_proc)
390  * @return NULL if the queue is full 
391  */
392 static struct GNUNET_DATASTORE_QueueEntry *
393 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
394                   size_t msize,
395                   unsigned int queue_priority,
396                   unsigned int max_queue_size,
397                   struct GNUNET_TIME_Relative timeout,
398                   GNUNET_CLIENT_MessageHandler response_proc,            
399                   const union QueueContext *qc)
400 {
401   struct GNUNET_DATASTORE_QueueEntry *ret;
402   struct GNUNET_DATASTORE_QueueEntry *pos;
403   unsigned int c;
404
405   c = 0;
406   pos = h->queue_head;
407   while ( (pos != NULL) &&
408           (c < max_queue_size) &&
409           (pos->priority >= queue_priority) )
410     {
411       c++;
412       pos = pos->next;
413     }
414   if (c >= max_queue_size)
415     {
416       GNUNET_STATISTICS_update (h->stats,
417                                 gettext_noop ("# queue overflows"),
418                                 1,
419                                 GNUNET_NO);
420       return NULL;
421     }
422   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
423   ret->h = h;
424   ret->response_proc = response_proc;
425   ret->qc = *qc;
426   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
427   ret->priority = queue_priority;
428   ret->max_queue = max_queue_size;
429   ret->message_size = msize;
430   ret->was_transmitted = GNUNET_NO;
431   if (pos == NULL)
432     {
433       /* append at the tail */
434       pos = h->queue_tail;
435     }
436   else
437     {
438       pos = pos->prev; 
439       /* do not insert at HEAD if HEAD query was already
440          transmitted and we are still receiving replies! */
441       if ( (pos == NULL) &&
442            (h->queue_head->was_transmitted) )
443         pos = h->queue_head;
444     }
445   c++;
446   GNUNET_STATISTICS_update (h->stats,
447                             gettext_noop ("# queue entries created"),
448                             1,
449                             GNUNET_NO);
450   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
451                                      h->queue_tail,
452                                      pos,
453                                      ret);
454   h->queue_size++;
455   ret->task = GNUNET_SCHEDULER_add_delayed (timeout,
456                                             &timeout_queue_entry,
457                                             ret);
458   pos = ret->next;
459   while (pos != NULL) 
460     {
461       if (pos->max_queue < h->queue_size)
462         {
463           GNUNET_assert (pos->response_proc != NULL);
464           /* move 'pos' element to head so that it will be 
465              killed on 'NULL' call below */
466           GNUNET_CONTAINER_DLL_remove (h->queue_head,
467                                        h->queue_tail,
468                                        pos);
469           GNUNET_CONTAINER_DLL_insert (h->queue_head,
470                                        h->queue_tail,
471                                        pos);
472           pos->response_proc (h, NULL);
473           break;
474         }
475       pos = pos->next;
476     }
477   return ret;
478 }
479
480
481 /**
482  * Process entries in the queue (or do nothing if we are already
483  * doing so).
484  * 
485  * @param h handle to the datastore
486  */
487 static void
488 process_queue (struct GNUNET_DATASTORE_Handle *h);
489
490
491 /**
492  * Try reconnecting to the datastore service.
493  *
494  * @param cls the 'struct GNUNET_DATASTORE_Handle'
495  * @param tc scheduler context
496  */
497 static void
498 try_reconnect (void *cls,
499                const struct GNUNET_SCHEDULER_TaskContext *tc)
500 {
501   struct GNUNET_DATASTORE_Handle *h = cls;
502
503   if (h->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
504     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
505   else
506     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
507   if (h->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
508     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
509   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
510   h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
511   if (h->client == NULL)
512     {
513       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
514                   "DATASTORE reconnect failed (fatally)\n");
515       return;
516     }
517   GNUNET_STATISTICS_update (h->stats,
518                             gettext_noop ("# datastore connections (re)created"),
519                             1,
520                             GNUNET_NO);
521 #if DEBUG_DATASTORE
522   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
523               "Reconnected to DATASTORE\n");
524 #endif
525   process_queue (h);
526 }
527
528
529 /**
530  * Disconnect from the service and then try reconnecting to the datastore service
531  * after some delay.
532  *
533  * @param h handle to datastore to disconnect and reconnect
534  */
535 static void
536 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
537 {
538   if (h->client == NULL)
539     {
540 #if DEBUG_DATASTORE
541       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
542                   "client NULL in disconnect, will not try to reconnect\n");
543 #endif
544       return;
545     }
546 #if 0
547   GNUNET_STATISTICS_update (stats,
548                             gettext_noop ("# reconnected to DATASTORE"),
549                             1,
550                             GNUNET_NO);
551 #endif
552   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
553   h->skip_next_messages = 0;
554   h->client = NULL;
555   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_time,
556                                                     &try_reconnect,
557                                                     h);      
558 }
559
560
561 /**
562  * Transmit request from queue to datastore service.
563  *
564  * @param cls the 'struct GNUNET_DATASTORE_Handle'
565  * @param size number of bytes that can be copied to buf
566  * @param buf where to copy the drop message
567  * @return number of bytes written to buf
568  */
569 static size_t
570 transmit_request (void *cls,
571                   size_t size, 
572                   void *buf)
573 {
574   struct GNUNET_DATASTORE_Handle *h = cls;
575   struct GNUNET_DATASTORE_QueueEntry *qe;
576   size_t msize;
577
578   h->th = NULL;
579   if (NULL == (qe = h->queue_head))
580     return 0; /* no entry in queue */
581   if (buf == NULL)
582     {
583       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
584                   _("Failed to transmit request to DATASTORE.\n"));
585       GNUNET_STATISTICS_update (h->stats,
586                                 gettext_noop ("# transmission request failures"),
587                                 1,
588                                 GNUNET_NO);
589       do_disconnect (h);
590       return 0;
591     }
592   if (size < (msize = qe->message_size))
593     {
594       process_queue (h);
595       return 0;
596     }
597  #if DEBUG_DATASTORE
598   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
599               "Transmitting %u byte request to DATASTORE\n",
600               msize);
601 #endif
602   memcpy (buf, &qe[1], msize);
603   qe->was_transmitted = GNUNET_YES;
604   GNUNET_SCHEDULER_cancel (qe->task);
605   qe->task = GNUNET_SCHEDULER_NO_TASK;
606   h->in_receive = GNUNET_YES;
607   GNUNET_CLIENT_receive (h->client,
608                          qe->response_proc,
609                          h,
610                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
611   GNUNET_STATISTICS_update (h->stats,
612                             gettext_noop ("# bytes sent to datastore"),
613                             1,
614                             GNUNET_NO);
615   return msize;
616 }
617
618
619 /**
620  * Process entries in the queue (or do nothing if we are already
621  * doing so).
622  * 
623  * @param h handle to the datastore
624  */
625 static void
626 process_queue (struct GNUNET_DATASTORE_Handle *h)
627 {
628   struct GNUNET_DATASTORE_QueueEntry *qe;
629
630   if (NULL == (qe = h->queue_head))
631     {
632 #if DEBUG_DATASTORE > 1
633       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
634                   "Queue empty\n");
635 #endif
636       return; /* no entry in queue */
637     }
638   if (qe->was_transmitted == GNUNET_YES)
639     {
640 #if DEBUG_DATASTORE > 1
641       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
642                   "Head request already transmitted\n");
643 #endif
644       return; /* waiting for replies */
645     }
646   if (h->th != NULL)
647     {
648 #if DEBUG_DATASTORE > 1
649       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
650                   "Pending transmission request\n");
651 #endif
652       return; /* request pending */
653     }
654   if (h->client == NULL)
655     {
656 #if DEBUG_DATASTORE > 1
657       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
658                   "Not connected\n");
659 #endif
660       return; /* waiting for reconnect */
661     }
662   if (GNUNET_YES == h->in_receive)
663     {
664       /* wait for response to previous query */
665       return; 
666     }
667 #if DEBUG_DATASTORE
668   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
669               "Queueing %u byte request to DATASTORE\n",
670               qe->message_size);
671 #endif
672   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
673                                                qe->message_size,
674                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
675                                                GNUNET_YES,
676                                                &transmit_request,
677                                                h);
678 }
679
680
681 /**
682  * Dummy continuation used to do nothing (but be non-zero).
683  *
684  * @param cls closure
685  * @param result result 
686  * @param emsg error message
687  */
688 static void
689 drop_status_cont (void *cls, int32_t result, const char *emsg)
690 {
691   /* do nothing */
692 }
693
694
695 /**
696  * Free a queue entry.  Removes the given entry from the
697  * queue and releases associated resources.  Does NOT
698  * call the callback.
699  * 
700  * @param qe entry to free.
701  */
702 static void
703 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
704 {
705   struct GNUNET_DATASTORE_Handle *h = qe->h;
706
707   GNUNET_CONTAINER_DLL_remove (h->queue_head,
708                                h->queue_tail,
709                                qe);
710   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
711     {
712       GNUNET_SCHEDULER_cancel (qe->task);
713       qe->task = GNUNET_SCHEDULER_NO_TASK;
714     }
715   h->queue_size--;
716   qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */
717   GNUNET_free (qe);
718 }
719
720
721 /**
722  * Type of a function to call when we receive a message
723  * from the service.
724  *
725  * @param cls closure
726  * @param msg message received, NULL on timeout or fatal error
727  */
728 static void 
729 process_status_message (void *cls,
730                         const struct
731                         GNUNET_MessageHeader * msg)
732 {
733   struct GNUNET_DATASTORE_Handle *h = cls;
734   struct GNUNET_DATASTORE_QueueEntry *qe;
735   struct StatusContext rc;
736   const struct StatusMessage *sm;
737   const char *emsg;
738   int32_t status;
739   int was_transmitted;
740
741   h->in_receive = GNUNET_NO;
742   if (h->skip_next_messages > 0)
743     {
744       h->skip_next_messages--;
745       process_queue (h);
746       return;
747    } 
748   if (NULL == (qe = h->queue_head))
749     {
750       GNUNET_break (0);
751       do_disconnect (h);
752       return;
753     }
754   rc = qe->qc.sc;
755   if (msg == NULL)
756     {      
757       was_transmitted = qe->was_transmitted;
758       free_queue_entry (qe);
759       if (NULL == h->client)
760         return; /* forced disconnect */
761       if (rc.cont != NULL)
762         rc.cont (rc.cont_cls, 
763                  GNUNET_SYSERR,
764                  _("Failed to receive status response from database."));
765       if (was_transmitted == GNUNET_YES)
766         do_disconnect (h);
767       return;
768     }
769   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
770   free_queue_entry (qe);
771   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
772        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
773     {
774       GNUNET_break (0);
775       h->retry_time = GNUNET_TIME_UNIT_ZERO;
776       do_disconnect (h);
777       if (rc.cont != NULL)
778         rc.cont (rc.cont_cls, 
779                  GNUNET_SYSERR,
780                  _("Error reading response from datastore service"));
781       return;
782     }
783   sm = (const struct StatusMessage*) msg;
784   status = ntohl(sm->status);
785   emsg = NULL;
786   if (ntohs(msg->size) > sizeof(struct StatusMessage))
787     {
788       emsg = (const char*) &sm[1];
789       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
790         {
791           GNUNET_break (0);
792           emsg = _("Invalid error message received from datastore service");
793         }
794     }  
795   if ( (status == GNUNET_SYSERR) &&
796        (emsg == NULL) )
797     {
798       GNUNET_break (0);
799       emsg = _("Invalid error message received from datastore service");
800     }
801 #if DEBUG_DATASTORE
802   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
803               "Received status %d/%s\n",
804               (int) status,
805               emsg);
806 #endif
807   GNUNET_STATISTICS_update (h->stats,
808                             gettext_noop ("# status messages received"),
809                             1,
810                             GNUNET_NO);
811   h->retry_time.rel_value = 0;
812   process_queue (h);
813   if (rc.cont != NULL)
814     rc.cont (rc.cont_cls, 
815              status,
816              emsg);
817 }
818
819
820 /**
821  * Store an item in the datastore.  If the item is already present,
822  * the priorities are summed up and the higher expiration time and
823  * lower anonymity level is used.
824  *
825  * @param h handle to the datastore
826  * @param rid reservation ID to use (from "reserve"); use 0 if no
827  *            prior reservation was made
828  * @param key key for the value
829  * @param size number of bytes in data
830  * @param data content stored
831  * @param type type of the content
832  * @param priority priority of the content
833  * @param anonymity anonymity-level for the content
834  * @param replication how often should the content be replicated to other peers?
835  * @param expiration expiration time for the content
836  * @param queue_priority ranking of this request in the priority queue
837  * @param max_queue_size at what queue size should this request be dropped
838  *        (if other requests of higher priority are in the queue)
839  * @param timeout timeout for the operation
840  * @param cont continuation to call when done
841  * @param cont_cls closure for cont
842  * @return NULL if the entry was not queued, otherwise a handle that can be used to
843  *         cancel; note that even if NULL is returned, the callback will be invoked
844  *         (or rather, will already have been invoked)
845  */
846 struct GNUNET_DATASTORE_QueueEntry *
847 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
848                       uint32_t rid,
849                       const GNUNET_HashCode * key,
850                       size_t size,
851                       const void *data,
852                       enum GNUNET_BLOCK_Type type,
853                       uint32_t priority,
854                       uint32_t anonymity,
855                       uint32_t replication,
856                       struct GNUNET_TIME_Absolute expiration,
857                       unsigned int queue_priority,
858                       unsigned int max_queue_size,
859                       struct GNUNET_TIME_Relative timeout,
860                       GNUNET_DATASTORE_ContinuationWithStatus cont,
861                       void *cont_cls)
862 {
863   struct GNUNET_DATASTORE_QueueEntry *qe;
864   struct DataMessage *dm;
865   size_t msize;
866   union QueueContext qc;
867
868 #if DEBUG_DATASTORE
869   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
870               "Asked to put %u bytes of data under key `%s' for %llu ms\n",
871               size,
872               GNUNET_h2s (key),
873               GNUNET_TIME_absolute_get_remaining (expiration).rel_value);
874 #endif
875   msize = sizeof(struct DataMessage) + size;
876   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
877   qc.sc.cont = cont;
878   qc.sc.cont_cls = cont_cls;
879   qe = make_queue_entry (h, msize,
880                          queue_priority, max_queue_size, timeout,
881                          &process_status_message, &qc);
882   if (qe == NULL)
883     {
884 #if DEBUG_DATASTORE
885       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
886                   "Could not create queue entry for PUT\n");
887 #endif
888       return NULL;
889     }
890   GNUNET_STATISTICS_update (h->stats,
891                             gettext_noop ("# PUT requests executed"),
892                             1,
893                             GNUNET_NO);
894   dm = (struct DataMessage* ) &qe[1];
895   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
896   dm->header.size = htons(msize);
897   dm->rid = htonl(rid);
898   dm->size = htonl( (uint32_t) size);
899   dm->type = htonl(type);
900   dm->priority = htonl(priority);
901   dm->anonymity = htonl(anonymity);
902   dm->replication = htonl (replication);
903   dm->reserved = htonl (0);
904   dm->uid = GNUNET_htonll(0);
905   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
906   dm->key = *key;
907   memcpy (&dm[1], data, size);
908   process_queue (h);
909   return qe;
910 }
911
912
913 /**
914  * Reserve space in the datastore.  This function should be used
915  * to avoid "out of space" failures during a longer sequence of "put"
916  * operations (for example, when a file is being inserted).
917  *
918  * @param h handle to the datastore
919  * @param amount how much space (in bytes) should be reserved (for content only)
920  * @param entries how many entries will be created (to calculate per-entry overhead)
921  * @param queue_priority ranking of this request in the priority queue
922  * @param max_queue_size at what queue size should this request be dropped
923  *        (if other requests of higher priority are in the queue)
924  * @param timeout how long to wait at most for a response (or before dying in queue)
925  * @param cont continuation to call when done; "success" will be set to
926  *             a positive reservation value if space could be reserved.
927  * @param cont_cls closure for cont
928  * @return NULL if the entry was not queued, otherwise a handle that can be used to
929  *         cancel; note that even if NULL is returned, the callback will be invoked
930  *         (or rather, will already have been invoked)
931  */
932 struct GNUNET_DATASTORE_QueueEntry *
933 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
934                           uint64_t amount,
935                           uint32_t entries,
936                           unsigned int queue_priority,
937                           unsigned int max_queue_size,
938                           struct GNUNET_TIME_Relative timeout,
939                           GNUNET_DATASTORE_ContinuationWithStatus cont,
940                           void *cont_cls)
941 {
942   struct GNUNET_DATASTORE_QueueEntry *qe;
943   struct ReserveMessage *rm;
944   union QueueContext qc;
945
946   if (cont == NULL)
947     cont = &drop_status_cont;
948 #if DEBUG_DATASTORE
949   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
950               "Asked to reserve %llu bytes of data and %u entries'\n",
951               (unsigned long long) amount,
952               (unsigned int) entries);
953 #endif
954   qc.sc.cont = cont;
955   qc.sc.cont_cls = cont_cls;
956   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
957                          queue_priority, max_queue_size, timeout,
958                          &process_status_message, &qc);
959   if (qe == NULL)
960     {
961 #if DEBUG_DATASTORE
962       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
963                   "Could not create queue entry to reserve\n");
964 #endif
965       return NULL;
966     }
967   GNUNET_STATISTICS_update (h->stats,
968                             gettext_noop ("# RESERVE requests executed"),
969                             1,
970                             GNUNET_NO);
971   rm = (struct ReserveMessage*) &qe[1];
972   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
973   rm->header.size = htons(sizeof (struct ReserveMessage));
974   rm->entries = htonl(entries);
975   rm->amount = GNUNET_htonll(amount);
976   process_queue (h);
977   return qe;
978 }
979
980
981 /**
982  * Signal that all of the data for which a reservation was made has
983  * been stored and that whatever excess space might have been reserved
984  * can now be released.
985  *
986  * @param h handle to the datastore
987  * @param rid reservation ID (value of "success" in original continuation
988  *        from the "reserve" function).
989  * @param queue_priority ranking of this request in the priority queue
990  * @param max_queue_size at what queue size should this request be dropped
991  *        (if other requests of higher priority are in the queue)
992  * @param queue_priority ranking of this request in the priority queue
993  * @param max_queue_size at what queue size should this request be dropped
994  *        (if other requests of higher priority are in the queue)
995  * @param timeout how long to wait at most for a response
996  * @param cont continuation to call when done
997  * @param cont_cls closure for cont
998  * @return NULL if the entry was not queued, otherwise a handle that can be used to
999  *         cancel; note that even if NULL is returned, the callback will be invoked
1000  *         (or rather, will already have been invoked)
1001  */
1002 struct GNUNET_DATASTORE_QueueEntry *
1003 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1004                                   uint32_t rid,
1005                                   unsigned int queue_priority,
1006                                   unsigned int max_queue_size,
1007                                   struct GNUNET_TIME_Relative timeout,
1008                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
1009                                   void *cont_cls)
1010 {
1011   struct GNUNET_DATASTORE_QueueEntry *qe;
1012   struct ReleaseReserveMessage *rrm;
1013   union QueueContext qc;
1014
1015   if (cont == NULL)
1016     cont = &drop_status_cont;
1017 #if DEBUG_DATASTORE
1018   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1019               "Asked to release reserve %d\n",
1020               rid);
1021 #endif
1022   qc.sc.cont = cont;
1023   qc.sc.cont_cls = cont_cls;
1024   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
1025                          queue_priority, max_queue_size, timeout,
1026                          &process_status_message, &qc);
1027   if (qe == NULL)
1028     {
1029 #if DEBUG_DATASTORE
1030       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1031                   "Could not create queue entry to release reserve\n");
1032 #endif
1033       return NULL;
1034     }
1035   GNUNET_STATISTICS_update (h->stats,
1036                             gettext_noop ("# RELEASE RESERVE requests executed"),
1037                             1,
1038                             GNUNET_NO);
1039   rrm = (struct ReleaseReserveMessage*) &qe[1];
1040   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1041   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
1042   rrm->rid = htonl(rid);
1043   process_queue (h);
1044   return qe;
1045 }
1046
1047
1048 /**
1049  * Update a value in the datastore.
1050  *
1051  * @param h handle to the datastore
1052  * @param uid identifier for the value
1053  * @param priority how much to increase the priority of the value
1054  * @param expiration new expiration value should be MAX of existing and this argument
1055  * @param queue_priority ranking of this request in the priority queue
1056  * @param max_queue_size at what queue size should this request be dropped
1057  *        (if other requests of higher priority are in the queue)
1058  * @param timeout how long to wait at most for a response
1059  * @param cont continuation to call when done
1060  * @param cont_cls closure for cont
1061  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1062  *         cancel; note that even if NULL is returned, the callback will be invoked
1063  *         (or rather, will already have been invoked)
1064  */
1065 struct GNUNET_DATASTORE_QueueEntry *
1066 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1067                          uint64_t uid,
1068                          uint32_t priority,
1069                          struct GNUNET_TIME_Absolute expiration,
1070                          unsigned int queue_priority,
1071                          unsigned int max_queue_size,
1072                          struct GNUNET_TIME_Relative timeout,
1073                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1074                          void *cont_cls)
1075 {
1076   struct GNUNET_DATASTORE_QueueEntry *qe;
1077   struct UpdateMessage *um;
1078   union QueueContext qc;
1079
1080   if (cont == NULL)
1081     cont = &drop_status_cont;
1082 #if DEBUG_DATASTORE
1083   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1084               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
1085               uid,
1086               (unsigned int) priority,
1087               (unsigned long long) expiration.abs_value);
1088 #endif
1089   qc.sc.cont = cont;
1090   qc.sc.cont_cls = cont_cls;
1091   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
1092                          queue_priority, max_queue_size, timeout,
1093                          &process_status_message, &qc);
1094   if (qe == NULL)
1095     {
1096 #if DEBUG_DATASTORE
1097       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1098                   "Could not create queue entry for UPDATE\n");
1099 #endif
1100       return NULL;
1101     }
1102   GNUNET_STATISTICS_update (h->stats,
1103                             gettext_noop ("# UPDATE requests executed"),
1104                             1,
1105                             GNUNET_NO);
1106   um = (struct UpdateMessage*) &qe[1];
1107   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1108   um->header.size = htons(sizeof (struct UpdateMessage));
1109   um->priority = htonl(priority);
1110   um->expiration = GNUNET_TIME_absolute_hton(expiration);
1111   um->uid = GNUNET_htonll(uid);
1112   process_queue (h);
1113   return qe;
1114 }
1115
1116
1117 /**
1118  * Explicitly remove some content from the database.
1119  * The "cont"inuation will be called with status
1120  * "GNUNET_OK" if content was removed, "GNUNET_NO"
1121  * if no matching entry was found and "GNUNET_SYSERR"
1122  * on all other types of errors.
1123  *
1124  * @param h handle to the datastore
1125  * @param key key for the value
1126  * @param size number of bytes in data
1127  * @param data content stored
1128  * @param queue_priority ranking of this request in the priority queue
1129  * @param max_queue_size at what queue size should this request be dropped
1130  *        (if other requests of higher priority are in the queue)
1131  * @param timeout how long to wait at most for a response
1132  * @param cont continuation to call when done
1133  * @param cont_cls closure for cont
1134  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1135  *         cancel; note that even if NULL is returned, the callback will be invoked
1136  *         (or rather, will already have been invoked)
1137  */
1138 struct GNUNET_DATASTORE_QueueEntry *
1139 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1140                          const GNUNET_HashCode *key,
1141                          size_t size,
1142                          const void *data,
1143                          unsigned int queue_priority,
1144                          unsigned int max_queue_size,
1145                          struct GNUNET_TIME_Relative timeout,
1146                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1147                          void *cont_cls)
1148 {
1149   struct GNUNET_DATASTORE_QueueEntry *qe;
1150   struct DataMessage *dm;
1151   size_t msize;
1152   union QueueContext qc;
1153
1154   if (cont == NULL)
1155     cont = &drop_status_cont;
1156 #if DEBUG_DATASTORE
1157   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1158               "Asked to remove %u bytes under key `%s'\n",
1159               size,
1160               GNUNET_h2s (key));
1161 #endif
1162   qc.sc.cont = cont;
1163   qc.sc.cont_cls = cont_cls;
1164   msize = sizeof(struct DataMessage) + size;
1165   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1166   qe = make_queue_entry (h, msize,
1167                          queue_priority, max_queue_size, timeout,
1168                          &process_status_message, &qc);
1169   if (qe == NULL)
1170     {
1171 #if DEBUG_DATASTORE
1172       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1173                   "Could not create queue entry for REMOVE\n");
1174 #endif
1175       return NULL;
1176     }
1177   GNUNET_STATISTICS_update (h->stats,
1178                             gettext_noop ("# REMOVE requests executed"),
1179                             1,
1180                             GNUNET_NO);
1181   dm = (struct DataMessage*) &qe[1];
1182   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1183   dm->header.size = htons(msize);
1184   dm->rid = htonl(0);
1185   dm->size = htonl(size);
1186   dm->type = htonl(0);
1187   dm->priority = htonl(0);
1188   dm->anonymity = htonl(0);
1189   dm->uid = GNUNET_htonll(0);
1190   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
1191   dm->key = *key;
1192   memcpy (&dm[1], data, size);
1193   process_queue (h);
1194   return qe;
1195 }
1196
1197
1198 /**
1199  * Type of a function to call when we receive a message
1200  * from the service.
1201  *
1202  * @param cls closure
1203  * @param msg message received, NULL on timeout or fatal error
1204  */
1205 static void 
1206 process_result_message (void *cls,
1207                         const struct GNUNET_MessageHeader *msg)
1208 {
1209   struct GNUNET_DATASTORE_Handle *h = cls;
1210   struct GNUNET_DATASTORE_QueueEntry *qe;
1211   struct ResultContext rc;
1212   const struct DataMessage *dm;
1213
1214   h->in_receive = GNUNET_NO;
1215   if (h->skip_next_messages > 0)
1216     {
1217       h->skip_next_messages--;
1218       process_queue (h);
1219       return;
1220     }
1221   if (msg == NULL)
1222     {
1223       qe = h->queue_head;
1224       GNUNET_assert (NULL != qe);
1225       if (qe->was_transmitted == GNUNET_YES)
1226         {
1227           rc = qe->qc.rc;
1228           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1229                       _("Failed to receive response from database.\n"));
1230           do_disconnect (h);
1231           free_queue_entry (qe);
1232           if (rc.proc != NULL)
1233             rc.proc (rc.proc_cls,
1234                      NULL, 0, NULL, 0, 0, 0, 
1235                      GNUNET_TIME_UNIT_ZERO_ABS, 0);    
1236         }
1237       return;
1238     }
1239   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
1240     {
1241       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1242       qe = h->queue_head;
1243       rc = qe->qc.rc;
1244       GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1245       free_queue_entry (qe);
1246 #if DEBUG_DATASTORE
1247       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1248                   "Received end of result set, new queue size is %u\n",
1249                   h->queue_size);
1250 #endif
1251       if (rc.proc != NULL)
1252         rc.proc (rc.proc_cls,
1253                  NULL, 0, NULL, 0, 0, 0, 
1254                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1255       h->retry_time.rel_value = 0;
1256       h->result_count = 0;
1257       process_queue (h);
1258       return;
1259     }
1260   qe = h->queue_head;
1261   rc = qe->qc.rc;
1262   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1263   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1264        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1265        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1266     {
1267       GNUNET_break (0);
1268       free_queue_entry (qe);
1269       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1270       do_disconnect (h);
1271       if (rc.proc != NULL)
1272         rc.proc (rc.proc_cls,
1273                  NULL, 0, NULL, 0, 0, 0, 
1274                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
1275       return;
1276     }
1277   GNUNET_STATISTICS_update (h->stats,
1278                             gettext_noop ("# Results received"),
1279                             1,
1280                             GNUNET_NO);
1281   dm = (const struct DataMessage*) msg;
1282 #if DEBUG_DATASTORE
1283   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1284               "Received result %llu with type %u and size %u with key %s\n",
1285               (unsigned long long) GNUNET_ntohll(dm->uid),
1286               ntohl(dm->type),
1287               ntohl(dm->size),
1288               GNUNET_h2s(&dm->key));
1289 #endif
1290   free_queue_entry (qe);
1291   h->retry_time.rel_value = 0;
1292   if (rc.proc != NULL)
1293     rc.proc (rc.proc_cls,
1294              &dm->key,
1295              ntohl(dm->size),
1296              &dm[1],
1297              ntohl(dm->type),
1298              ntohl(dm->priority),
1299              ntohl(dm->anonymity),
1300              GNUNET_TIME_absolute_ntoh(dm->expiration), 
1301              GNUNET_ntohll(dm->uid));
1302 }
1303
1304
1305 /**
1306  * Get a random value from the datastore for content replication.
1307  * Returns a single, random value among those with the highest
1308  * replication score, lowering positive replication scores by one for
1309  * the chosen value (if only content with a replication score exists,
1310  * a random value is returned and replication scores are not changed).
1311  *
1312  * @param h handle to the datastore
1313  * @param queue_priority ranking of this request in the priority queue
1314  * @param max_queue_size at what queue size should this request be dropped
1315  *        (if other requests of higher priority are in the queue)
1316  * @param timeout how long to wait at most for a response
1317  * @param proc function to call on a random value; it
1318  *        will be called once with a value (if available)
1319  *        and always once with a value of NULL.
1320  * @param proc_cls closure for proc
1321  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1322  *         cancel
1323  */
1324 struct GNUNET_DATASTORE_QueueEntry *
1325 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1326                                       unsigned int queue_priority,
1327                                       unsigned int max_queue_size,
1328                                       struct GNUNET_TIME_Relative timeout,
1329                                       GNUNET_DATASTORE_DatumProcessor proc, 
1330                                       void *proc_cls)
1331 {
1332   struct GNUNET_DATASTORE_QueueEntry *qe;
1333   struct GNUNET_MessageHeader *m;
1334   union QueueContext qc;
1335
1336   GNUNET_assert (NULL != proc);
1337 #if DEBUG_DATASTORE
1338   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1339               "Asked to get replication entry in %llu ms\n",
1340               (unsigned long long) timeout.rel_value);
1341 #endif
1342   qc.rc.proc = proc;
1343   qc.rc.proc_cls = proc_cls;
1344   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1345                          queue_priority, max_queue_size, timeout,
1346                          &process_result_message, &qc);
1347   if (qe == NULL)
1348     {
1349 #if DEBUG_DATASTORE
1350       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1351                   "Could not create queue entry for GET REPLICATION\n");
1352 #endif
1353       return NULL;    
1354     }
1355   GNUNET_STATISTICS_update (h->stats,
1356                             gettext_noop ("# GET REPLICATION requests executed"),
1357                             1,
1358                             GNUNET_NO);
1359   m = (struct GNUNET_MessageHeader*) &qe[1];
1360   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1361   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1362   process_queue (h);
1363   return qe;
1364 }
1365
1366
1367 /**
1368  * Get a single zero-anonymity value from the datastore.
1369  *
1370  * @param h handle to the datastore
1371  * @param offset offset of the result (mod #num-results); set to
1372  *               a random 64-bit value initially; then increment by
1373  *               one each time; detect that all results have been found by uid
1374  *               being again the first uid ever returned.
1375  * @param queue_priority ranking of this request in the priority queue
1376  * @param max_queue_size at what queue size should this request be dropped
1377  *        (if other requests of higher priority are in the queue)
1378  * @param timeout how long to wait at most for a response
1379  * @param type allowed type for the operation (never zero)
1380  * @param proc function to call on a random value; it
1381  *        will be called once with a value (if available)
1382  *        or with NULL if none value exists.
1383  * @param proc_cls closure for proc
1384  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1385  *         cancel
1386  */
1387 struct GNUNET_DATASTORE_QueueEntry *
1388 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1389                                      uint64_t offset,
1390                                      unsigned int queue_priority,
1391                                      unsigned int max_queue_size,
1392                                      struct GNUNET_TIME_Relative timeout,
1393                                      enum GNUNET_BLOCK_Type type,
1394                                      GNUNET_DATASTORE_DatumProcessor proc, 
1395                                      void *proc_cls)
1396 {
1397   struct GNUNET_DATASTORE_QueueEntry *qe;
1398   struct GetZeroAnonymityMessage *m;
1399   union QueueContext qc;
1400
1401   GNUNET_assert (NULL != proc);
1402   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1403 #if DEBUG_DATASTORE
1404   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1405               "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n",
1406               (unsigned long long) offset,
1407               type,
1408               (unsigned long long) timeout.rel_value);
1409 #endif
1410   qc.rc.proc = proc;
1411   qc.rc.proc_cls = proc_cls;
1412   qe = make_queue_entry (h, sizeof(struct GetZeroAnonymityMessage),
1413                          queue_priority, max_queue_size, timeout,
1414                          &process_result_message, &qc);
1415   if (qe == NULL)
1416     {
1417 #if DEBUG_DATASTORE
1418       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1419                   "Could not create queue entry for zero-anonymity procation\n");
1420 #endif
1421       return NULL;    
1422     }
1423   GNUNET_STATISTICS_update (h->stats,
1424                             gettext_noop ("# GET ZERO ANONYMITY requests executed"),
1425                             1,
1426                             GNUNET_NO);
1427   m = (struct GetZeroAnonymityMessage*) &qe[1];
1428   m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1429   m->header.size = htons(sizeof (struct GetZeroAnonymityMessage));
1430   m->type = htonl ((uint32_t) type);
1431   m->offset = GNUNET_htonll (offset);
1432   process_queue (h);
1433   return qe;
1434 }
1435
1436
1437 /**
1438  * Get a result for a particular key from the datastore.  The processor
1439  * will only be called once.
1440  *
1441  * @param h handle to the datastore
1442  * @param offset offset of the result (mod #num-results); set to
1443  *               a random 64-bit value initially; then increment by
1444  *               one each time; detect that all results have been found by uid
1445  *               being again the first uid ever returned.
1446  * @param key maybe NULL (to match all entries)
1447  * @param type desired type, 0 for any
1448  * @param queue_priority ranking of this request in the priority queue
1449  * @param max_queue_size at what queue size should this request be dropped
1450  *        (if other requests of higher priority are in the queue)
1451  * @param timeout how long to wait at most for a response
1452  * @param proc function to call on each matching value;
1453  *        will be called once with a NULL value at the end
1454  * @param proc_cls closure for proc
1455  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1456  *         cancel
1457  */
1458 struct GNUNET_DATASTORE_QueueEntry *
1459 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1460                           uint64_t offset,
1461                           const GNUNET_HashCode * key,
1462                           enum GNUNET_BLOCK_Type type,
1463                           unsigned int queue_priority,
1464                           unsigned int max_queue_size,
1465                           struct GNUNET_TIME_Relative timeout,
1466                           GNUNET_DATASTORE_DatumProcessor proc, 
1467                           void *proc_cls)
1468 {
1469   struct GNUNET_DATASTORE_QueueEntry *qe;
1470   struct GetMessage *gm;
1471   union QueueContext qc;
1472
1473   GNUNET_assert (NULL != proc);
1474 #if DEBUG_DATASTORE
1475   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1476               "Asked to look for data of type %u under key `%s'\n",
1477               (unsigned int) type,
1478               GNUNET_h2s (key));
1479 #endif
1480   qc.rc.proc = proc;
1481   qc.rc.proc_cls = proc_cls;
1482   qe = make_queue_entry (h, sizeof(struct GetMessage),
1483                          queue_priority, max_queue_size, timeout,
1484                          &process_result_message, &qc);
1485   if (qe == NULL)
1486     {
1487 #if DEBUG_DATASTORE
1488       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1489                   "Could not queue request for `%s'\n",
1490                   GNUNET_h2s (key));
1491 #endif
1492       return NULL;
1493     }
1494   GNUNET_STATISTICS_update (h->stats,
1495                             gettext_noop ("# GET requests executed"),
1496                             1,
1497                             GNUNET_NO);
1498   gm = (struct GetMessage*) &qe[1];
1499   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1500   gm->type = htonl(type);
1501   gm->offset = GNUNET_htonll (offset);
1502   if (key != NULL)
1503     {
1504       gm->header.size = htons(sizeof (struct GetMessage));
1505       gm->key = *key;
1506     }
1507   else
1508     {
1509       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1510     }
1511   process_queue (h);
1512   return qe;
1513 }
1514
1515
1516 /**
1517  * Cancel a datastore operation.  The final callback from the
1518  * operation must not have been done yet.
1519  * 
1520  * @param qe operation to cancel
1521  */
1522 void
1523 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1524 {
1525   struct GNUNET_DATASTORE_Handle *h;
1526
1527   GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted);
1528   h = qe->h;
1529 #if DEBUG_DATASTORE
1530   GNUNET_log  (GNUNET_ERROR_TYPE_DEBUG,
1531                "Pending DATASTORE request %p cancelled (%d, %d)\n",
1532                qe,
1533                qe->was_transmitted,
1534                h->queue_head == qe);
1535 #endif
1536   if (GNUNET_YES == qe->was_transmitted) 
1537     {
1538       free_queue_entry (qe);
1539       h->skip_next_messages++;
1540       return;
1541     }
1542   free_queue_entry (qe);
1543   process_queue (h);
1544 }
1545
1546
1547 /* end of datastore_api.c */