6eafbee8bae6a89d30ffb6d94841679a2d6a25d6
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
33
34 /**
35  * If a client stopped asking for more results, how many more do
36  * we receive from the DB before killing the connection?  Trade-off
37  * between re-doing TCP handshakes and (needlessly) receiving 
38  * useless results.
39  */
40 #define MAX_EXCESS_RESULTS 8
41
42 /**
43  * Context for processing status messages.
44  */
45 struct StatusContext
46 {
47   /**
48    * Continuation to call with the status.
49    */
50   GNUNET_DATASTORE_ContinuationWithStatus cont;
51
52   /**
53    * Closure for cont.
54    */
55   void *cont_cls;
56
57 };
58
59
60 /**
61  * Context for processing result messages.
62  */
63 struct ResultContext
64 {
65   /**
66    * Function to call with the result.
67    */
68   GNUNET_DATASTORE_DatumProcessor proc;
69
70   /**
71    * Closure for proc.
72    */
73   void *proc_cls;
74
75 };
76
77
78 /**
79  *  Context for a queue operation.
80  */
81 union QueueContext
82 {
83
84   struct StatusContext sc;
85   
86   struct ResultContext rc;
87
88 };
89
90
91
92 /**
93  * Entry in our priority queue.
94  */
95 struct GNUNET_DATASTORE_QueueEntry
96 {
97
98   /**
99    * This is a linked list.
100    */
101   struct GNUNET_DATASTORE_QueueEntry *next;
102
103   /**
104    * This is a linked list.
105    */
106   struct GNUNET_DATASTORE_QueueEntry *prev;
107
108   /**
109    * Handle to the master context.
110    */
111   struct GNUNET_DATASTORE_Handle *h;
112
113   /**
114    * Response processor (NULL if we are not waiting for a response).
115    * This struct should be used for the closure, function-specific
116    * arguments can be passed via 'qc'.
117    */
118   GNUNET_CLIENT_MessageHandler response_proc;
119
120   /**
121    * Function to call after transmission of the request.
122    */
123   GNUNET_DATASTORE_ContinuationWithStatus cont;
124    
125   /**
126    * Closure for 'cont'.
127    */
128   void *cont_cls;
129
130   /**
131    * Context for the operation.
132    */
133   union QueueContext qc;
134
135   /**
136    * Task for timeout signalling.
137    */
138   GNUNET_SCHEDULER_TaskIdentifier task;
139
140   /**
141    * Timeout for the current operation.
142    */
143   struct GNUNET_TIME_Absolute timeout;
144
145   /**
146    * Priority in the queue.
147    */
148   unsigned int priority;
149
150   /**
151    * Maximum allowed length of queue (otherwise
152    * this request should be discarded).
153    */
154   unsigned int max_queue;
155
156   /**
157    * Number of bytes in the request message following
158    * this struct.  32-bit value for nicer memory
159    * access (and overall struct alignment).
160    */
161   uint32_t message_size;
162
163   /**
164    * Has this message been transmitted to the service?
165    * Only ever GNUNET_YES for the head of the queue.
166    * Note that the overall struct should end at a 
167    * multiple of 64 bits.
168    */
169   int was_transmitted;
170   
171 };
172
173 /**
174  * Handle to the datastore service. 
175  */
176 struct GNUNET_DATASTORE_Handle
177 {
178
179   /**
180    * Our configuration.
181    */
182   const struct GNUNET_CONFIGURATION_Handle *cfg;
183
184   /**
185    * Current connection to the datastore service.
186    */
187   struct GNUNET_CLIENT_Connection *client;
188
189   /**
190    * Handle for statistics.
191    */
192   struct GNUNET_STATISTICS_Handle *stats;
193
194   /**
195    * Current transmit handle.
196    */
197   struct GNUNET_CLIENT_TransmitHandle *th;
198
199   /**
200    * Current head of priority queue.
201    */
202   struct GNUNET_DATASTORE_QueueEntry *queue_head;
203
204   /**
205    * Current tail of priority queue.
206    */
207   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
208
209   /**
210    * Task for trying to reconnect.
211    */
212   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
213
214   /**
215    * How quickly should we retry?  Used for exponential back-off on
216    * connect-errors.
217    */
218   struct GNUNET_TIME_Relative retry_time;
219
220   /**
221    * Number of entries in the queue.
222    */
223   unsigned int queue_size;
224
225   /**
226    * Number of results we're receiving for the current query
227    * after application stopped to care.  Used to determine when
228    * to reset the connection.
229    */
230   unsigned int result_count;
231
232   /**
233    * Are we currently trying to receive from the service?
234    */
235   int in_receive;
236
237   /**
238    * We should ignore the next message(s) from the service.
239    */
240   unsigned int skip_next_messages;
241
242 };
243
244
245
246 /**
247  * Connect to the datastore service.
248  *
249  * @param cfg configuration to use
250  * @return handle to use to access the service
251  */
252 struct GNUNET_DATASTORE_Handle *
253 GNUNET_DATASTORE_connect (const struct
254                           GNUNET_CONFIGURATION_Handle
255                           *cfg)
256 {
257   struct GNUNET_CLIENT_Connection *c;
258   struct GNUNET_DATASTORE_Handle *h;
259   
260   c = GNUNET_CLIENT_connect ("datastore", cfg);
261   if (c == NULL)
262     return NULL; /* oops */
263   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
264                      GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
265   h->client = c;
266   h->cfg = cfg;
267   h->stats = GNUNET_STATISTICS_create ("datastore-api",
268                                        cfg);
269   return h;
270 }
271
272
273 /**
274  * Transmit DROP message to datastore service.
275  *
276  * @param cls the 'struct GNUNET_DATASTORE_Handle'
277  * @param size number of bytes that can be copied to buf
278  * @param buf where to copy the drop message
279  * @return number of bytes written to buf
280  */
281 static size_t
282 transmit_drop (void *cls,
283                size_t size, 
284                void *buf)
285 {
286   struct GNUNET_DATASTORE_Handle *h = cls;
287   struct GNUNET_MessageHeader *hdr;
288   
289   if (buf == NULL)
290     {
291       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
292                   _("Failed to transmit request to drop database.\n"));
293       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
294       return 0;
295     }
296   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
297   hdr = buf;
298   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
299   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
300   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
301   return sizeof(struct GNUNET_MessageHeader);
302 }
303
304
305 /**
306  * Disconnect from the datastore service (and free
307  * associated resources).
308  *
309  * @param h handle to the datastore
310  * @param drop set to GNUNET_YES to delete all data in datastore (!)
311  */
312 void
313 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
314                              int drop)
315 {
316   struct GNUNET_DATASTORE_QueueEntry *qe;
317
318   if (NULL != h->th)
319     {
320       GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
321       h->th = NULL;
322     }
323   if (h->client != NULL)
324     {
325       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
326       h->client = NULL;
327     }
328   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
329     {
330       GNUNET_SCHEDULER_cancel (h->reconnect_task);
331       h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
332     }
333   while (NULL != (qe = h->queue_head))
334     {
335       GNUNET_assert (NULL != qe->response_proc);
336       qe->response_proc (h, NULL);
337     }
338   if (GNUNET_YES == drop) 
339     {
340       h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
341       if (h->client != NULL)
342         {
343           if (NULL != 
344               GNUNET_CLIENT_notify_transmit_ready (h->client,
345                                                    sizeof(struct GNUNET_MessageHeader),
346                                                    GNUNET_TIME_UNIT_MINUTES,
347                                                    GNUNET_YES,
348                                                    &transmit_drop,
349                                                    h))
350             return;
351           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
352           h->client = NULL;
353         }
354       GNUNET_break (0);
355     }
356   GNUNET_STATISTICS_destroy (h->stats,
357                              GNUNET_NO);
358   h->stats = NULL;
359   GNUNET_free (h);
360 }
361
362
363 /**
364  * A request has timed out (before being transmitted to the service).
365  *
366  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
367  * @param tc scheduler context
368  */
369 static void
370 timeout_queue_entry (void *cls,
371                      const struct GNUNET_SCHEDULER_TaskContext *tc)
372 {
373   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
374
375   GNUNET_STATISTICS_update (qe->h->stats,
376                             gettext_noop ("# queue entry timeouts"),
377                             1,
378                             GNUNET_NO);
379   qe->task = GNUNET_SCHEDULER_NO_TASK;
380   GNUNET_assert (qe->was_transmitted == GNUNET_NO); 
381   qe->response_proc (qe->h, NULL);
382 }
383
384
385 /**
386  * Create a new entry for our priority queue (and possibly discard other entires if
387  * the queue is getting too long).
388  *
389  * @param h handle to the datastore
390  * @param msize size of the message to queue
391  * @param queue_priority priority of the entry
392  * @param max_queue_size at what queue size should this request be dropped
393  *        (if other requests of higher priority are in the queue)
394  * @param timeout timeout for the operation
395  * @param response_proc function to call with replies (can be NULL)
396  * @param qc client context (NOT a closure for response_proc)
397  * @return NULL if the queue is full 
398  */
399 static struct GNUNET_DATASTORE_QueueEntry *
400 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
401                   size_t msize,
402                   unsigned int queue_priority,
403                   unsigned int max_queue_size,
404                   struct GNUNET_TIME_Relative timeout,
405                   GNUNET_CLIENT_MessageHandler response_proc,            
406                   const union QueueContext *qc)
407 {
408   struct GNUNET_DATASTORE_QueueEntry *ret;
409   struct GNUNET_DATASTORE_QueueEntry *pos;
410   unsigned int c;
411
412   c = 0;
413   pos = h->queue_head;
414   while ( (pos != NULL) &&
415           (c < max_queue_size) &&
416           (pos->priority >= queue_priority) )
417     {
418       c++;
419       pos = pos->next;
420     }
421   if (c >= max_queue_size)
422     {
423       GNUNET_STATISTICS_update (h->stats,
424                                 gettext_noop ("# queue overflows"),
425                                 1,
426                                 GNUNET_NO);
427       return NULL;
428     }
429   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
430   ret->h = h;
431   ret->response_proc = response_proc;
432   ret->qc = *qc;
433   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
434   ret->priority = queue_priority;
435   ret->max_queue = max_queue_size;
436   ret->message_size = msize;
437   ret->was_transmitted = GNUNET_NO;
438   if (pos == NULL)
439     {
440       /* append at the tail */
441       pos = h->queue_tail;
442     }
443   else
444     {
445       pos = pos->prev; 
446       /* do not insert at HEAD if HEAD query was already
447          transmitted and we are still receiving replies! */
448       if ( (pos == NULL) &&
449            (h->queue_head->was_transmitted) )
450         pos = h->queue_head;
451     }
452   c++;
453   GNUNET_STATISTICS_update (h->stats,
454                             gettext_noop ("# queue entries created"),
455                             1,
456                             GNUNET_NO);
457   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
458                                      h->queue_tail,
459                                      pos,
460                                      ret);
461   h->queue_size++;
462   ret->task = GNUNET_SCHEDULER_add_delayed (timeout,
463                                             &timeout_queue_entry,
464                                             ret);
465   pos = ret->next;
466   while (pos != NULL) 
467     {
468       if ( (pos->max_queue < h->queue_size) &&
469            (pos->was_transmitted == GNUNET_NO) )
470         {
471           GNUNET_assert (pos->response_proc != NULL);
472           /* move 'pos' element to head so that it will be 
473              killed on 'NULL' call below */
474           GNUNET_CONTAINER_DLL_remove (h->queue_head,
475                                        h->queue_tail,
476                                        pos);
477           GNUNET_CONTAINER_DLL_insert (h->queue_head,
478                                        h->queue_tail,
479                                        pos);
480           GNUNET_STATISTICS_update (h->stats,
481                                     gettext_noop ("# Requests dropped from datastore queue"),
482                                     1,
483                                     GNUNET_NO);
484           pos->response_proc (h, NULL);
485           break;
486         }
487       pos = pos->next;
488     }
489   return ret;
490 }
491
492
493 /**
494  * Process entries in the queue (or do nothing if we are already
495  * doing so).
496  * 
497  * @param h handle to the datastore
498  */
499 static void
500 process_queue (struct GNUNET_DATASTORE_Handle *h);
501
502
503 /**
504  * Try reconnecting to the datastore service.
505  *
506  * @param cls the 'struct GNUNET_DATASTORE_Handle'
507  * @param tc scheduler context
508  */
509 static void
510 try_reconnect (void *cls,
511                const struct GNUNET_SCHEDULER_TaskContext *tc)
512 {
513   struct GNUNET_DATASTORE_Handle *h = cls;
514
515   if (h->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
516     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
517   else
518     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
519   if (h->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
520     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
521   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
522   h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
523   if (h->client == NULL)
524     {
525       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
526                   "DATASTORE reconnect failed (fatally)\n");
527       return;
528     }
529   GNUNET_STATISTICS_update (h->stats,
530                             gettext_noop ("# datastore connections (re)created"),
531                             1,
532                             GNUNET_NO);
533 #if DEBUG_DATASTORE
534   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
535               "Reconnected to DATASTORE\n");
536 #endif
537   process_queue (h);
538 }
539
540
541 /**
542  * Disconnect from the service and then try reconnecting to the datastore service
543  * after some delay.
544  *
545  * @param h handle to datastore to disconnect and reconnect
546  */
547 static void
548 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
549 {
550   if (h->client == NULL)
551     {
552 #if DEBUG_DATASTORE
553       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
554                   "client NULL in disconnect, will not try to reconnect\n");
555 #endif
556       return;
557     }
558 #if 0
559   GNUNET_STATISTICS_update (stats,
560                             gettext_noop ("# reconnected to DATASTORE"),
561                             1,
562                             GNUNET_NO);
563 #endif
564   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
565   h->skip_next_messages = 0;
566   h->client = NULL;
567   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_time,
568                                                     &try_reconnect,
569                                                     h);      
570 }
571
572
573 /**
574  * Function called whenever we receive a message from
575  * the service.  Calls the appropriate handler.
576  *
577  * @param cls the 'struct GNUNET_DATASTORE_Handle'
578  * @param msg the received message
579  */
580 static void 
581 receive_cb (void *cls,
582             const struct GNUNET_MessageHeader *msg)
583 {
584   struct GNUNET_DATASTORE_Handle *h = cls;
585   struct GNUNET_DATASTORE_QueueEntry *qe;
586
587   h->in_receive = GNUNET_NO;
588   if (h->skip_next_messages > 0)
589     {
590       h->skip_next_messages--;
591       process_queue (h);
592       return;
593    } 
594   if (NULL == (qe = h->queue_head))
595     {
596       GNUNET_break (0);
597       process_queue (h);
598       return; 
599     }
600   qe->response_proc (h, msg);
601 }
602
603
604 /**
605  * Transmit request from queue to datastore service.
606  *
607  * @param cls the 'struct GNUNET_DATASTORE_Handle'
608  * @param size number of bytes that can be copied to buf
609  * @param buf where to copy the drop message
610  * @return number of bytes written to buf
611  */
612 static size_t
613 transmit_request (void *cls,
614                   size_t size, 
615                   void *buf)
616 {
617   struct GNUNET_DATASTORE_Handle *h = cls;
618   struct GNUNET_DATASTORE_QueueEntry *qe;
619   size_t msize;
620
621   h->th = NULL;
622   if (NULL == (qe = h->queue_head))
623     return 0; /* no entry in queue */
624   if (buf == NULL)
625     {
626       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
627                   _("Failed to transmit request to DATASTORE.\n"));
628       GNUNET_STATISTICS_update (h->stats,
629                                 gettext_noop ("# transmission request failures"),
630                                 1,
631                                 GNUNET_NO);
632       do_disconnect (h);
633       return 0;
634     }
635   if (size < (msize = qe->message_size))
636     {
637       process_queue (h);
638       return 0;
639     }
640  #if DEBUG_DATASTORE
641   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
642               "Transmitting %u byte request to DATASTORE\n",
643               msize);
644 #endif
645   memcpy (buf, &qe[1], msize);
646   qe->was_transmitted = GNUNET_YES;
647   GNUNET_SCHEDULER_cancel (qe->task);
648   qe->task = GNUNET_SCHEDULER_NO_TASK;
649   GNUNET_assert (GNUNET_NO == h->in_receive);
650   h->in_receive = GNUNET_YES;
651   GNUNET_CLIENT_receive (h->client,
652                          &receive_cb,
653                          h,
654                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
655   GNUNET_STATISTICS_update (h->stats,
656                             gettext_noop ("# bytes sent to datastore"),
657                             1,
658                             GNUNET_NO);
659   return msize;
660 }
661
662
663 /**
664  * Process entries in the queue (or do nothing if we are already
665  * doing so).
666  * 
667  * @param h handle to the datastore
668  */
669 static void
670 process_queue (struct GNUNET_DATASTORE_Handle *h)
671 {
672   struct GNUNET_DATASTORE_QueueEntry *qe;
673
674   if (NULL == (qe = h->queue_head))
675     {
676 #if DEBUG_DATASTORE > 1
677       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
678                   "Queue empty\n");
679 #endif
680       return; /* no entry in queue */
681     }
682   if (qe->was_transmitted == GNUNET_YES)
683     {
684 #if DEBUG_DATASTORE > 1
685       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
686                   "Head request already transmitted\n");
687 #endif
688       return; /* waiting for replies */
689     }
690   if (h->th != NULL)
691     {
692 #if DEBUG_DATASTORE > 1
693       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
694                   "Pending transmission request\n");
695 #endif
696       return; /* request pending */
697     }
698   if (h->client == NULL)
699     {
700 #if DEBUG_DATASTORE > 1
701       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
702                   "Not connected\n");
703 #endif
704       return; /* waiting for reconnect */
705     }
706   if (GNUNET_YES == h->in_receive)
707     {
708       /* wait for response to previous query */
709       return; 
710     }
711 #if DEBUG_DATASTORE
712   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
713               "Queueing %u byte request to DATASTORE\n",
714               qe->message_size);
715 #endif
716   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
717                                                qe->message_size,
718                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
719                                                GNUNET_YES,
720                                                &transmit_request,
721                                                h);
722   GNUNET_assert (GNUNET_NO == h->in_receive);
723   GNUNET_break (NULL != h->th);
724 }
725
726
727 /**
728  * Dummy continuation used to do nothing (but be non-zero).
729  *
730  * @param cls closure
731  * @param result result 
732  * @param emsg error message
733  */
734 static void
735 drop_status_cont (void *cls, int32_t result, const char *emsg)
736 {
737   /* do nothing */
738 }
739
740
741 /**
742  * Free a queue entry.  Removes the given entry from the
743  * queue and releases associated resources.  Does NOT
744  * call the callback.
745  * 
746  * @param qe entry to free.
747  */
748 static void
749 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
750 {
751   struct GNUNET_DATASTORE_Handle *h = qe->h;
752
753   GNUNET_CONTAINER_DLL_remove (h->queue_head,
754                                h->queue_tail,
755                                qe);
756   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
757     {
758       GNUNET_SCHEDULER_cancel (qe->task);
759       qe->task = GNUNET_SCHEDULER_NO_TASK;
760     }
761   h->queue_size--;
762   qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */
763   GNUNET_free (qe);
764 }
765
766
767 /**
768  * Type of a function to call when we receive a message
769  * from the service.
770  *
771  * @param cls closure
772  * @param msg message received, NULL on timeout or fatal error
773  */
774 static void 
775 process_status_message (void *cls,
776                         const struct
777                         GNUNET_MessageHeader * msg)
778 {
779   struct GNUNET_DATASTORE_Handle *h = cls;
780   struct GNUNET_DATASTORE_QueueEntry *qe;
781   struct StatusContext rc;
782   const struct StatusMessage *sm;
783   const char *emsg;
784   int32_t status;
785   int was_transmitted;
786
787   if (NULL == (qe = h->queue_head))
788     {
789       GNUNET_break (0);
790       do_disconnect (h);
791       return;
792     }
793   rc = qe->qc.sc;
794   if (msg == NULL)
795     {      
796       was_transmitted = qe->was_transmitted;
797       free_queue_entry (qe);
798       if (NULL == h->client)
799         return; /* forced disconnect */
800       if (rc.cont != NULL)
801         rc.cont (rc.cont_cls, 
802                  GNUNET_SYSERR,
803                  _("Failed to receive status response from database."));
804       if (was_transmitted == GNUNET_YES)
805         do_disconnect (h);
806       else
807         process_queue (h);
808       return;
809     }
810   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
811   free_queue_entry (qe);
812   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
813        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
814     {
815       GNUNET_break (0);
816       h->retry_time = GNUNET_TIME_UNIT_ZERO;
817       do_disconnect (h);
818       if (rc.cont != NULL)
819         rc.cont (rc.cont_cls, 
820                  GNUNET_SYSERR,
821                  _("Error reading response from datastore service"));
822       return;
823     }
824   sm = (const struct StatusMessage*) msg;
825   status = ntohl(sm->status);
826   emsg = NULL;
827   if (ntohs(msg->size) > sizeof(struct StatusMessage))
828     {
829       emsg = (const char*) &sm[1];
830       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
831         {
832           GNUNET_break (0);
833           emsg = _("Invalid error message received from datastore service");
834         }
835     }  
836   if ( (status == GNUNET_SYSERR) &&
837        (emsg == NULL) )
838     {
839       GNUNET_break (0);
840       emsg = _("Invalid error message received from datastore service");
841     }
842 #if DEBUG_DATASTORE
843   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
844               "Received status %d/%s\n",
845               (int) status,
846               emsg);
847 #endif
848   GNUNET_STATISTICS_update (h->stats,
849                             gettext_noop ("# status messages received"),
850                             1,
851                             GNUNET_NO);
852   h->retry_time.rel_value = 0;
853   process_queue (h);
854   if (rc.cont != NULL)
855     rc.cont (rc.cont_cls, 
856              status,
857              emsg);
858 }
859
860
861 /**
862  * Store an item in the datastore.  If the item is already present,
863  * the priorities are summed up and the higher expiration time and
864  * lower anonymity level is used.
865  *
866  * @param h handle to the datastore
867  * @param rid reservation ID to use (from "reserve"); use 0 if no
868  *            prior reservation was made
869  * @param key key for the value
870  * @param size number of bytes in data
871  * @param data content stored
872  * @param type type of the content
873  * @param priority priority of the content
874  * @param anonymity anonymity-level for the content
875  * @param replication how often should the content be replicated to other peers?
876  * @param expiration expiration time for the content
877  * @param queue_priority ranking of this request in the priority queue
878  * @param max_queue_size at what queue size should this request be dropped
879  *        (if other requests of higher priority are in the queue)
880  * @param timeout timeout for the operation
881  * @param cont continuation to call when done
882  * @param cont_cls closure for cont
883  * @return NULL if the entry was not queued, otherwise a handle that can be used to
884  *         cancel; note that even if NULL is returned, the callback will be invoked
885  *         (or rather, will already have been invoked)
886  */
887 struct GNUNET_DATASTORE_QueueEntry *
888 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
889                       uint32_t rid,
890                       const GNUNET_HashCode * key,
891                       size_t size,
892                       const void *data,
893                       enum GNUNET_BLOCK_Type type,
894                       uint32_t priority,
895                       uint32_t anonymity,
896                       uint32_t replication,
897                       struct GNUNET_TIME_Absolute expiration,
898                       unsigned int queue_priority,
899                       unsigned int max_queue_size,
900                       struct GNUNET_TIME_Relative timeout,
901                       GNUNET_DATASTORE_ContinuationWithStatus cont,
902                       void *cont_cls)
903 {
904   struct GNUNET_DATASTORE_QueueEntry *qe;
905   struct DataMessage *dm;
906   size_t msize;
907   union QueueContext qc;
908
909 #if DEBUG_DATASTORE
910   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
911               "Asked to put %u bytes of data under key `%s' for %llu ms\n",
912               size,
913               GNUNET_h2s (key),
914               GNUNET_TIME_absolute_get_remaining (expiration).rel_value);
915 #endif
916   msize = sizeof(struct DataMessage) + size;
917   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
918   qc.sc.cont = cont;
919   qc.sc.cont_cls = cont_cls;
920   qe = make_queue_entry (h, msize,
921                          queue_priority, max_queue_size, timeout,
922                          &process_status_message, &qc);
923   if (qe == NULL)
924     {
925 #if DEBUG_DATASTORE
926       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
927                   "Could not create queue entry for PUT\n");
928 #endif
929       return NULL;
930     }
931   GNUNET_STATISTICS_update (h->stats,
932                             gettext_noop ("# PUT requests executed"),
933                             1,
934                             GNUNET_NO);
935   dm = (struct DataMessage* ) &qe[1];
936   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
937   dm->header.size = htons(msize);
938   dm->rid = htonl(rid);
939   dm->size = htonl( (uint32_t) size);
940   dm->type = htonl(type);
941   dm->priority = htonl(priority);
942   dm->anonymity = htonl(anonymity);
943   dm->replication = htonl (replication);
944   dm->reserved = htonl (0);
945   dm->uid = GNUNET_htonll(0);
946   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
947   dm->key = *key;
948   memcpy (&dm[1], data, size);
949   process_queue (h);
950   return qe;
951 }
952
953
954 /**
955  * Reserve space in the datastore.  This function should be used
956  * to avoid "out of space" failures during a longer sequence of "put"
957  * operations (for example, when a file is being inserted).
958  *
959  * @param h handle to the datastore
960  * @param amount how much space (in bytes) should be reserved (for content only)
961  * @param entries how many entries will be created (to calculate per-entry overhead)
962  * @param queue_priority ranking of this request in the priority queue
963  * @param max_queue_size at what queue size should this request be dropped
964  *        (if other requests of higher priority are in the queue)
965  * @param timeout how long to wait at most for a response (or before dying in queue)
966  * @param cont continuation to call when done; "success" will be set to
967  *             a positive reservation value if space could be reserved.
968  * @param cont_cls closure for cont
969  * @return NULL if the entry was not queued, otherwise a handle that can be used to
970  *         cancel; note that even if NULL is returned, the callback will be invoked
971  *         (or rather, will already have been invoked)
972  */
973 struct GNUNET_DATASTORE_QueueEntry *
974 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
975                           uint64_t amount,
976                           uint32_t entries,
977                           unsigned int queue_priority,
978                           unsigned int max_queue_size,
979                           struct GNUNET_TIME_Relative timeout,
980                           GNUNET_DATASTORE_ContinuationWithStatus cont,
981                           void *cont_cls)
982 {
983   struct GNUNET_DATASTORE_QueueEntry *qe;
984   struct ReserveMessage *rm;
985   union QueueContext qc;
986
987   if (cont == NULL)
988     cont = &drop_status_cont;
989 #if DEBUG_DATASTORE
990   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
991               "Asked to reserve %llu bytes of data and %u entries'\n",
992               (unsigned long long) amount,
993               (unsigned int) entries);
994 #endif
995   qc.sc.cont = cont;
996   qc.sc.cont_cls = cont_cls;
997   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
998                          queue_priority, max_queue_size, timeout,
999                          &process_status_message, &qc);
1000   if (qe == NULL)
1001     {
1002 #if DEBUG_DATASTORE
1003       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1004                   "Could not create queue entry to reserve\n");
1005 #endif
1006       return NULL;
1007     }
1008   GNUNET_STATISTICS_update (h->stats,
1009                             gettext_noop ("# RESERVE requests executed"),
1010                             1,
1011                             GNUNET_NO);
1012   rm = (struct ReserveMessage*) &qe[1];
1013   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
1014   rm->header.size = htons(sizeof (struct ReserveMessage));
1015   rm->entries = htonl(entries);
1016   rm->amount = GNUNET_htonll(amount);
1017   process_queue (h);
1018   return qe;
1019 }
1020
1021
1022 /**
1023  * Signal that all of the data for which a reservation was made has
1024  * been stored and that whatever excess space might have been reserved
1025  * can now be released.
1026  *
1027  * @param h handle to the datastore
1028  * @param rid reservation ID (value of "success" in original continuation
1029  *        from the "reserve" function).
1030  * @param queue_priority ranking of this request in the priority queue
1031  * @param max_queue_size at what queue size should this request be dropped
1032  *        (if other requests of higher priority are in the queue)
1033  * @param queue_priority ranking of this request in the priority queue
1034  * @param max_queue_size at what queue size should this request be dropped
1035  *        (if other requests of higher priority are in the queue)
1036  * @param timeout how long to wait at most for a response
1037  * @param cont continuation to call when done
1038  * @param cont_cls closure for cont
1039  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1040  *         cancel; note that even if NULL is returned, the callback will be invoked
1041  *         (or rather, will already have been invoked)
1042  */
1043 struct GNUNET_DATASTORE_QueueEntry *
1044 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1045                                   uint32_t rid,
1046                                   unsigned int queue_priority,
1047                                   unsigned int max_queue_size,
1048                                   struct GNUNET_TIME_Relative timeout,
1049                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
1050                                   void *cont_cls)
1051 {
1052   struct GNUNET_DATASTORE_QueueEntry *qe;
1053   struct ReleaseReserveMessage *rrm;
1054   union QueueContext qc;
1055
1056   if (cont == NULL)
1057     cont = &drop_status_cont;
1058 #if DEBUG_DATASTORE
1059   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1060               "Asked to release reserve %d\n",
1061               rid);
1062 #endif
1063   qc.sc.cont = cont;
1064   qc.sc.cont_cls = cont_cls;
1065   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
1066                          queue_priority, max_queue_size, timeout,
1067                          &process_status_message, &qc);
1068   if (qe == NULL)
1069     {
1070 #if DEBUG_DATASTORE
1071       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1072                   "Could not create queue entry to release reserve\n");
1073 #endif
1074       return NULL;
1075     }
1076   GNUNET_STATISTICS_update (h->stats,
1077                             gettext_noop ("# RELEASE RESERVE requests executed"),
1078                             1,
1079                             GNUNET_NO);
1080   rrm = (struct ReleaseReserveMessage*) &qe[1];
1081   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1082   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
1083   rrm->rid = htonl(rid);
1084   process_queue (h);
1085   return qe;
1086 }
1087
1088
1089 /**
1090  * Update a value in the datastore.
1091  *
1092  * @param h handle to the datastore
1093  * @param uid identifier for the value
1094  * @param priority how much to increase the priority of the value
1095  * @param expiration new expiration value should be MAX of existing and this argument
1096  * @param queue_priority ranking of this request in the priority queue
1097  * @param max_queue_size at what queue size should this request be dropped
1098  *        (if other requests of higher priority are in the queue)
1099  * @param timeout how long to wait at most for a response
1100  * @param cont continuation to call when done
1101  * @param cont_cls closure for cont
1102  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1103  *         cancel; note that even if NULL is returned, the callback will be invoked
1104  *         (or rather, will already have been invoked)
1105  */
1106 struct GNUNET_DATASTORE_QueueEntry *
1107 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1108                          uint64_t uid,
1109                          uint32_t priority,
1110                          struct GNUNET_TIME_Absolute expiration,
1111                          unsigned int queue_priority,
1112                          unsigned int max_queue_size,
1113                          struct GNUNET_TIME_Relative timeout,
1114                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1115                          void *cont_cls)
1116 {
1117   struct GNUNET_DATASTORE_QueueEntry *qe;
1118   struct UpdateMessage *um;
1119   union QueueContext qc;
1120
1121   if (cont == NULL)
1122     cont = &drop_status_cont;
1123 #if DEBUG_DATASTORE
1124   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1125               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
1126               uid,
1127               (unsigned int) priority,
1128               (unsigned long long) expiration.abs_value);
1129 #endif
1130   qc.sc.cont = cont;
1131   qc.sc.cont_cls = cont_cls;
1132   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
1133                          queue_priority, max_queue_size, timeout,
1134                          &process_status_message, &qc);
1135   if (qe == NULL)
1136     {
1137 #if DEBUG_DATASTORE
1138       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1139                   "Could not create queue entry for UPDATE\n");
1140 #endif
1141       return NULL;
1142     }
1143   GNUNET_STATISTICS_update (h->stats,
1144                             gettext_noop ("# UPDATE requests executed"),
1145                             1,
1146                             GNUNET_NO);
1147   um = (struct UpdateMessage*) &qe[1];
1148   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1149   um->header.size = htons(sizeof (struct UpdateMessage));
1150   um->priority = htonl(priority);
1151   um->expiration = GNUNET_TIME_absolute_hton(expiration);
1152   um->uid = GNUNET_htonll(uid);
1153   process_queue (h);
1154   return qe;
1155 }
1156
1157
1158 /**
1159  * Explicitly remove some content from the database.
1160  * The "cont"inuation will be called with status
1161  * "GNUNET_OK" if content was removed, "GNUNET_NO"
1162  * if no matching entry was found and "GNUNET_SYSERR"
1163  * on all other types of errors.
1164  *
1165  * @param h handle to the datastore
1166  * @param key key for the value
1167  * @param size number of bytes in data
1168  * @param data content stored
1169  * @param queue_priority ranking of this request in the priority queue
1170  * @param max_queue_size at what queue size should this request be dropped
1171  *        (if other requests of higher priority are in the queue)
1172  * @param timeout how long to wait at most for a response
1173  * @param cont continuation to call when done
1174  * @param cont_cls closure for cont
1175  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1176  *         cancel; note that even if NULL is returned, the callback will be invoked
1177  *         (or rather, will already have been invoked)
1178  */
1179 struct GNUNET_DATASTORE_QueueEntry *
1180 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1181                          const GNUNET_HashCode *key,
1182                          size_t size,
1183                          const void *data,
1184                          unsigned int queue_priority,
1185                          unsigned int max_queue_size,
1186                          struct GNUNET_TIME_Relative timeout,
1187                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1188                          void *cont_cls)
1189 {
1190   struct GNUNET_DATASTORE_QueueEntry *qe;
1191   struct DataMessage *dm;
1192   size_t msize;
1193   union QueueContext qc;
1194
1195   if (cont == NULL)
1196     cont = &drop_status_cont;
1197 #if DEBUG_DATASTORE
1198   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1199               "Asked to remove %u bytes under key `%s'\n",
1200               size,
1201               GNUNET_h2s (key));
1202 #endif
1203   qc.sc.cont = cont;
1204   qc.sc.cont_cls = cont_cls;
1205   msize = sizeof(struct DataMessage) + size;
1206   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1207   qe = make_queue_entry (h, msize,
1208                          queue_priority, max_queue_size, timeout,
1209                          &process_status_message, &qc);
1210   if (qe == NULL)
1211     {
1212 #if DEBUG_DATASTORE
1213       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1214                   "Could not create queue entry for REMOVE\n");
1215 #endif
1216       return NULL;
1217     }
1218   GNUNET_STATISTICS_update (h->stats,
1219                             gettext_noop ("# REMOVE requests executed"),
1220                             1,
1221                             GNUNET_NO);
1222   dm = (struct DataMessage*) &qe[1];
1223   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1224   dm->header.size = htons(msize);
1225   dm->rid = htonl(0);
1226   dm->size = htonl(size);
1227   dm->type = htonl(0);
1228   dm->priority = htonl(0);
1229   dm->anonymity = htonl(0);
1230   dm->uid = GNUNET_htonll(0);
1231   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
1232   dm->key = *key;
1233   memcpy (&dm[1], data, size);
1234   process_queue (h);
1235   return qe;
1236 }
1237
1238
1239 /**
1240  * Type of a function to call when we receive a message
1241  * from the service.
1242  *
1243  * @param cls closure
1244  * @param msg message received, NULL on timeout or fatal error
1245  */
1246 static void 
1247 process_result_message (void *cls,
1248                         const struct GNUNET_MessageHeader *msg)
1249 {
1250   struct GNUNET_DATASTORE_Handle *h = cls;
1251   struct GNUNET_DATASTORE_QueueEntry *qe;
1252   struct ResultContext rc;
1253   const struct DataMessage *dm;
1254
1255   if (msg == NULL)
1256     {
1257       qe = h->queue_head;
1258       GNUNET_assert (NULL != qe);
1259       if (qe->was_transmitted == GNUNET_YES)
1260         {
1261           rc = qe->qc.rc;
1262           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1263                       _("Failed to receive response from database.\n"));
1264           do_disconnect (h);
1265           free_queue_entry (qe);
1266           if (rc.proc != NULL)
1267             rc.proc (rc.proc_cls,
1268                      NULL, 0, NULL, 0, 0, 0, 
1269                      GNUNET_TIME_UNIT_ZERO_ABS, 0);    
1270         }
1271       else
1272         process_queue (h);
1273       return;
1274     }
1275   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
1276     {
1277       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1278       qe = h->queue_head;
1279       rc = qe->qc.rc;
1280       GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1281       free_queue_entry (qe);
1282 #if DEBUG_DATASTORE
1283       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1284                   "Received end of result set, new queue size is %u\n",
1285                   h->queue_size);
1286 #endif
1287       if (rc.proc != NULL)
1288         rc.proc (rc.proc_cls,
1289                  NULL, 0, NULL, 0, 0, 0, 
1290                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1291       h->retry_time.rel_value = 0;
1292       h->result_count = 0;
1293       process_queue (h);
1294       return;
1295     }
1296   qe = h->queue_head;
1297   rc = qe->qc.rc;
1298   if (GNUNET_YES != qe->was_transmitted)
1299     {
1300       GNUNET_break (0);
1301       free_queue_entry (qe);
1302       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1303       do_disconnect (h);
1304       if (rc.proc != NULL)
1305         rc.proc (rc.proc_cls,
1306                  NULL, 0, NULL, 0, 0, 0, 
1307                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
1308       return;
1309     }
1310   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1311        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1312        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1313     {
1314       GNUNET_break (0);
1315       free_queue_entry (qe);
1316       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1317       do_disconnect (h);
1318       if (rc.proc != NULL)
1319         rc.proc (rc.proc_cls,
1320                  NULL, 0, NULL, 0, 0, 0, 
1321                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
1322       return;
1323     }
1324   GNUNET_STATISTICS_update (h->stats,
1325                             gettext_noop ("# Results received"),
1326                             1,
1327                             GNUNET_NO);
1328   dm = (const struct DataMessage*) msg;
1329 #if DEBUG_DATASTORE
1330   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1331               "Received result %llu with type %u and size %u with key %s\n",
1332               (unsigned long long) GNUNET_ntohll(dm->uid),
1333               ntohl(dm->type),
1334               ntohl(dm->size),
1335               GNUNET_h2s(&dm->key));
1336 #endif
1337   free_queue_entry (qe);
1338   h->retry_time.rel_value = 0;
1339   process_queue (h);
1340   if (rc.proc != NULL)
1341     rc.proc (rc.proc_cls,
1342              &dm->key,
1343              ntohl(dm->size),
1344              &dm[1],
1345              ntohl(dm->type),
1346              ntohl(dm->priority),
1347              ntohl(dm->anonymity),
1348              GNUNET_TIME_absolute_ntoh(dm->expiration), 
1349              GNUNET_ntohll(dm->uid));
1350 }
1351
1352
1353 /**
1354  * Get a random value from the datastore for content replication.
1355  * Returns a single, random value among those with the highest
1356  * replication score, lowering positive replication scores by one for
1357  * the chosen value (if only content with a replication score exists,
1358  * a random value is returned and replication scores are not changed).
1359  *
1360  * @param h handle to the datastore
1361  * @param queue_priority ranking of this request in the priority queue
1362  * @param max_queue_size at what queue size should this request be dropped
1363  *        (if other requests of higher priority are in the queue)
1364  * @param timeout how long to wait at most for a response
1365  * @param proc function to call on a random value; it
1366  *        will be called once with a value (if available)
1367  *        and always once with a value of NULL.
1368  * @param proc_cls closure for proc
1369  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1370  *         cancel
1371  */
1372 struct GNUNET_DATASTORE_QueueEntry *
1373 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1374                                       unsigned int queue_priority,
1375                                       unsigned int max_queue_size,
1376                                       struct GNUNET_TIME_Relative timeout,
1377                                       GNUNET_DATASTORE_DatumProcessor proc, 
1378                                       void *proc_cls)
1379 {
1380   struct GNUNET_DATASTORE_QueueEntry *qe;
1381   struct GNUNET_MessageHeader *m;
1382   union QueueContext qc;
1383
1384   GNUNET_assert (NULL != proc);
1385 #if DEBUG_DATASTORE
1386   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1387               "Asked to get replication entry in %llu ms\n",
1388               (unsigned long long) timeout.rel_value);
1389 #endif
1390   qc.rc.proc = proc;
1391   qc.rc.proc_cls = proc_cls;
1392   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1393                          queue_priority, max_queue_size, timeout,
1394                          &process_result_message, &qc);
1395   if (qe == NULL)
1396     {
1397 #if DEBUG_DATASTORE
1398       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1399                   "Could not create queue entry for GET REPLICATION\n");
1400 #endif
1401       return NULL;    
1402     }
1403   GNUNET_STATISTICS_update (h->stats,
1404                             gettext_noop ("# GET REPLICATION requests executed"),
1405                             1,
1406                             GNUNET_NO);
1407   m = (struct GNUNET_MessageHeader*) &qe[1];
1408   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1409   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1410   process_queue (h);
1411   return qe;
1412 }
1413
1414
1415 /**
1416  * Get a single zero-anonymity value from the datastore.
1417  *
1418  * @param h handle to the datastore
1419  * @param offset offset of the result (modulo num-results); set to
1420  *               a random 64-bit value initially; then increment by
1421  *               one each time; detect that all results have been found by uid
1422  *               being again the first uid ever returned.
1423  * @param queue_priority ranking of this request in the priority queue
1424  * @param max_queue_size at what queue size should this request be dropped
1425  *        (if other requests of higher priority are in the queue)
1426  * @param timeout how long to wait at most for a response
1427  * @param type allowed type for the operation (never zero)
1428  * @param proc function to call on a random value; it
1429  *        will be called once with a value (if available)
1430  *        or with NULL if none value exists.
1431  * @param proc_cls closure for proc
1432  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1433  *         cancel
1434  */
1435 struct GNUNET_DATASTORE_QueueEntry *
1436 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1437                                      uint64_t offset,
1438                                      unsigned int queue_priority,
1439                                      unsigned int max_queue_size,
1440                                      struct GNUNET_TIME_Relative timeout,
1441                                      enum GNUNET_BLOCK_Type type,
1442                                      GNUNET_DATASTORE_DatumProcessor proc, 
1443                                      void *proc_cls)
1444 {
1445   struct GNUNET_DATASTORE_QueueEntry *qe;
1446   struct GetZeroAnonymityMessage *m;
1447   union QueueContext qc;
1448
1449   GNUNET_assert (NULL != proc);
1450   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1451 #if DEBUG_DATASTORE
1452   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1453               "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n",
1454               (unsigned long long) offset,
1455               type,
1456               (unsigned long long) timeout.rel_value);
1457 #endif
1458   qc.rc.proc = proc;
1459   qc.rc.proc_cls = proc_cls;
1460   qe = make_queue_entry (h, sizeof(struct GetZeroAnonymityMessage),
1461                          queue_priority, max_queue_size, timeout,
1462                          &process_result_message, &qc);
1463   if (qe == NULL)
1464     {
1465 #if DEBUG_DATASTORE
1466       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1467                   "Could not create queue entry for zero-anonymity procation\n");
1468 #endif
1469       return NULL;    
1470     }
1471   GNUNET_STATISTICS_update (h->stats,
1472                             gettext_noop ("# GET ZERO ANONYMITY requests executed"),
1473                             1,
1474                             GNUNET_NO);
1475   m = (struct GetZeroAnonymityMessage*) &qe[1];
1476   m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1477   m->header.size = htons(sizeof (struct GetZeroAnonymityMessage));
1478   m->type = htonl ((uint32_t) type);
1479   m->offset = GNUNET_htonll (offset);
1480   process_queue (h);
1481   return qe;
1482 }
1483
1484
1485 /**
1486  * Get a result for a particular key from the datastore.  The processor
1487  * will only be called once.
1488  *
1489  * @param h handle to the datastore
1490  * @param offset offset of the result (modulo num-results); set to
1491  *               a random 64-bit value initially; then increment by
1492  *               one each time; detect that all results have been found by uid
1493  *               being again the first uid ever returned.
1494  * @param key maybe NULL (to match all entries)
1495  * @param type desired type, 0 for any
1496  * @param queue_priority ranking of this request in the priority queue
1497  * @param max_queue_size at what queue size should this request be dropped
1498  *        (if other requests of higher priority are in the queue)
1499  * @param timeout how long to wait at most for a response
1500  * @param proc function to call on each matching value;
1501  *        will be called once with a NULL value at the end
1502  * @param proc_cls closure for proc
1503  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1504  *         cancel
1505  */
1506 struct GNUNET_DATASTORE_QueueEntry *
1507 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1508                           uint64_t offset,
1509                           const GNUNET_HashCode * key,
1510                           enum GNUNET_BLOCK_Type type,
1511                           unsigned int queue_priority,
1512                           unsigned int max_queue_size,
1513                           struct GNUNET_TIME_Relative timeout,
1514                           GNUNET_DATASTORE_DatumProcessor proc, 
1515                           void *proc_cls)
1516 {
1517   struct GNUNET_DATASTORE_QueueEntry *qe;
1518   struct GetMessage *gm;
1519   union QueueContext qc;
1520
1521   GNUNET_assert (NULL != proc);
1522 #if DEBUG_DATASTORE
1523   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1524               "Asked to look for data of type %u under key `%s'\n",
1525               (unsigned int) type,
1526               GNUNET_h2s (key));
1527 #endif
1528   qc.rc.proc = proc;
1529   qc.rc.proc_cls = proc_cls;
1530   qe = make_queue_entry (h, sizeof(struct GetMessage),
1531                          queue_priority, max_queue_size, timeout,
1532                          &process_result_message, &qc);
1533   if (qe == NULL)
1534     {
1535 #if DEBUG_DATASTORE
1536       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1537                   "Could not queue request for `%s'\n",
1538                   GNUNET_h2s (key));
1539 #endif
1540       return NULL;
1541     }
1542   GNUNET_STATISTICS_update (h->stats,
1543                             gettext_noop ("# GET requests executed"),
1544                             1,
1545                             GNUNET_NO);
1546   gm = (struct GetMessage*) &qe[1];
1547   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1548   gm->type = htonl(type);
1549   gm->offset = GNUNET_htonll (offset);
1550   if (key != NULL)
1551     {
1552       gm->header.size = htons(sizeof (struct GetMessage));
1553       gm->key = *key;
1554     }
1555   else
1556     {
1557       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1558     }
1559   process_queue (h);
1560   return qe;
1561 }
1562
1563
1564 /**
1565  * Cancel a datastore operation.  The final callback from the
1566  * operation must not have been done yet.
1567  * 
1568  * @param qe operation to cancel
1569  */
1570 void
1571 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1572 {
1573   struct GNUNET_DATASTORE_Handle *h;
1574
1575   GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted);
1576   h = qe->h;
1577 #if DEBUG_DATASTORE
1578   GNUNET_log  (GNUNET_ERROR_TYPE_DEBUG,
1579                "Pending DATASTORE request %p cancelled (%d, %d)\n",
1580                qe,
1581                qe->was_transmitted,
1582                h->queue_head == qe);
1583 #endif
1584   if (GNUNET_YES == qe->was_transmitted) 
1585     {
1586       free_queue_entry (qe);
1587       h->skip_next_messages++;
1588       return;
1589     }
1590   free_queue_entry (qe);
1591   process_queue (h);
1592 }
1593
1594
1595 /* end of datastore_api.c */