15139b17dce227ec88592eabebf1c56b825b3dbc
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
33
34 /**
35  * If a client stopped asking for more results, how many more do
36  * we receive from the DB before killing the connection?  Trade-off
37  * between re-doing TCP handshakes and (needlessly) receiving 
38  * useless results.
39  */
40 #define MAX_EXCESS_RESULTS 8
41
42 /**
43  * Context for processing status messages.
44  */
45 struct StatusContext
46 {
47   /**
48    * Continuation to call with the status.
49    */
50   GNUNET_DATASTORE_ContinuationWithStatus cont;
51
52   /**
53    * Closure for cont.
54    */
55   void *cont_cls;
56
57 };
58
59
60 /**
61  * Context for processing result messages.
62  */
63 struct ResultContext
64 {
65   /**
66    * Function to call with the result.
67    */
68   GNUNET_DATASTORE_DatumProcessor proc;
69
70   /**
71    * Closure for proc.
72    */
73   void *proc_cls;
74
75 };
76
77
78 /**
79  *  Context for a queue operation.
80  */
81 union QueueContext
82 {
83
84   struct StatusContext sc;
85   
86   struct ResultContext rc;
87
88 };
89
90
91
92 /**
93  * Entry in our priority queue.
94  */
95 struct GNUNET_DATASTORE_QueueEntry
96 {
97
98   /**
99    * This is a linked list.
100    */
101   struct GNUNET_DATASTORE_QueueEntry *next;
102
103   /**
104    * This is a linked list.
105    */
106   struct GNUNET_DATASTORE_QueueEntry *prev;
107
108   /**
109    * Handle to the master context.
110    */
111   struct GNUNET_DATASTORE_Handle *h;
112
113   /**
114    * Response processor (NULL if we are not waiting for a response).
115    * This struct should be used for the closure, function-specific
116    * arguments can be passed via 'qc'.
117    */
118   GNUNET_CLIENT_MessageHandler response_proc;
119
120   /**
121    * Function to call after transmission of the request.
122    */
123   GNUNET_DATASTORE_ContinuationWithStatus cont;
124    
125   /**
126    * Closure for 'cont'.
127    */
128   void *cont_cls;
129
130   /**
131    * Context for the operation.
132    */
133   union QueueContext qc;
134
135   /**
136    * Task for timeout signalling.
137    */
138   GNUNET_SCHEDULER_TaskIdentifier task;
139
140   /**
141    * Timeout for the current operation.
142    */
143   struct GNUNET_TIME_Absolute timeout;
144
145   /**
146    * Priority in the queue.
147    */
148   unsigned int priority;
149
150   /**
151    * Maximum allowed length of queue (otherwise
152    * this request should be discarded).
153    */
154   unsigned int max_queue;
155
156   /**
157    * Number of bytes in the request message following
158    * this struct.  32-bit value for nicer memory
159    * access (and overall struct alignment).
160    */
161   uint32_t message_size;
162
163   /**
164    * Has this message been transmitted to the service?
165    * Only ever GNUNET_YES for the head of the queue.
166    * Note that the overall struct should end at a 
167    * multiple of 64 bits.
168    */
169   int was_transmitted;
170   
171 };
172
173 /**
174  * Handle to the datastore service. 
175  */
176 struct GNUNET_DATASTORE_Handle
177 {
178
179   /**
180    * Our configuration.
181    */
182   const struct GNUNET_CONFIGURATION_Handle *cfg;
183
184   /**
185    * Current connection to the datastore service.
186    */
187   struct GNUNET_CLIENT_Connection *client;
188
189   /**
190    * Handle for statistics.
191    */
192   struct GNUNET_STATISTICS_Handle *stats;
193
194   /**
195    * Current transmit handle.
196    */
197   struct GNUNET_CLIENT_TransmitHandle *th;
198
199   /**
200    * Current head of priority queue.
201    */
202   struct GNUNET_DATASTORE_QueueEntry *queue_head;
203
204   /**
205    * Current tail of priority queue.
206    */
207   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
208
209   /**
210    * Task for trying to reconnect.
211    */
212   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
213
214   /**
215    * How quickly should we retry?  Used for exponential back-off on
216    * connect-errors.
217    */
218   struct GNUNET_TIME_Relative retry_time;
219
220   /**
221    * Number of entries in the queue.
222    */
223   unsigned int queue_size;
224
225   /**
226    * Number of results we're receiving for the current query
227    * after application stopped to care.  Used to determine when
228    * to reset the connection.
229    */
230   unsigned int result_count;
231
232   /**
233    * Are we currently trying to receive from the service?
234    */
235   int in_receive;
236
237   /**
238    * We should ignore the next message(s) from the service.
239    */
240   unsigned int skip_next_messages;
241
242 };
243
244
245
246 /**
247  * Connect to the datastore service.
248  *
249  * @param cfg configuration to use
250  * @return handle to use to access the service
251  */
252 struct GNUNET_DATASTORE_Handle *
253 GNUNET_DATASTORE_connect (const struct
254                           GNUNET_CONFIGURATION_Handle
255                           *cfg)
256 {
257   struct GNUNET_CLIENT_Connection *c;
258   struct GNUNET_DATASTORE_Handle *h;
259   
260   c = GNUNET_CLIENT_connect ("datastore", cfg);
261   if (c == NULL)
262     return NULL; /* oops */
263   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
264                      GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
265   h->client = c;
266   h->cfg = cfg;
267   h->stats = GNUNET_STATISTICS_create ("datastore-api",
268                                        cfg);
269   return h;
270 }
271
272
273 /**
274  * Transmit DROP message to datastore service.
275  *
276  * @param cls the 'struct GNUNET_DATASTORE_Handle'
277  * @param size number of bytes that can be copied to buf
278  * @param buf where to copy the drop message
279  * @return number of bytes written to buf
280  */
281 static size_t
282 transmit_drop (void *cls,
283                size_t size, 
284                void *buf)
285 {
286   struct GNUNET_DATASTORE_Handle *h = cls;
287   struct GNUNET_MessageHeader *hdr;
288   
289   if (buf == NULL)
290     {
291       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
292                   _("Failed to transmit request to drop database.\n"));
293       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
294       return 0;
295     }
296   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
297   hdr = buf;
298   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
299   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
300   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
301   return sizeof(struct GNUNET_MessageHeader);
302 }
303
304
305 /**
306  * Disconnect from the datastore service (and free
307  * associated resources).
308  *
309  * @param h handle to the datastore
310  * @param drop set to GNUNET_YES to delete all data in datastore (!)
311  */
312 void
313 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
314                              int drop)
315 {
316   struct GNUNET_DATASTORE_QueueEntry *qe;
317
318   if (NULL != h->th)
319     {
320       GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
321       h->th = NULL;
322     }
323   if (h->client != NULL)
324     {
325       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
326       h->client = NULL;
327     }
328   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
329     {
330       GNUNET_SCHEDULER_cancel (h->reconnect_task);
331       h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
332     }
333   while (NULL != (qe = h->queue_head))
334     {
335       GNUNET_assert (NULL != qe->response_proc);
336       qe->response_proc (h, NULL);
337     }
338   if (GNUNET_YES == drop) 
339     {
340       h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
341       if (h->client != NULL)
342         {
343           if (NULL != 
344               GNUNET_CLIENT_notify_transmit_ready (h->client,
345                                                    sizeof(struct GNUNET_MessageHeader),
346                                                    GNUNET_TIME_UNIT_MINUTES,
347                                                    GNUNET_YES,
348                                                    &transmit_drop,
349                                                    h))
350             return;
351           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
352           h->client = NULL;
353         }
354       GNUNET_break (0);
355     }
356   GNUNET_STATISTICS_destroy (h->stats,
357                              GNUNET_NO);
358   h->stats = NULL;
359   GNUNET_free (h);
360 }
361
362
363 /**
364  * A request has timed out (before being transmitted to the service).
365  *
366  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
367  * @param tc scheduler context
368  */
369 static void
370 timeout_queue_entry (void *cls,
371                      const struct GNUNET_SCHEDULER_TaskContext *tc)
372 {
373   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
374
375   GNUNET_STATISTICS_update (qe->h->stats,
376                             gettext_noop ("# queue entry timeouts"),
377                             1,
378                             GNUNET_NO);
379   qe->task = GNUNET_SCHEDULER_NO_TASK;
380   GNUNET_assert (qe->was_transmitted == GNUNET_NO); 
381   qe->response_proc (qe->h, NULL);
382 }
383
384
385 /**
386  * Create a new entry for our priority queue (and possibly discard other entires if
387  * the queue is getting too long).
388  *
389  * @param h handle to the datastore
390  * @param msize size of the message to queue
391  * @param queue_priority priority of the entry
392  * @param max_queue_size at what queue size should this request be dropped
393  *        (if other requests of higher priority are in the queue)
394  * @param timeout timeout for the operation
395  * @param response_proc function to call with replies (can be NULL)
396  * @param qc client context (NOT a closure for response_proc)
397  * @return NULL if the queue is full 
398  */
399 static struct GNUNET_DATASTORE_QueueEntry *
400 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
401                   size_t msize,
402                   unsigned int queue_priority,
403                   unsigned int max_queue_size,
404                   struct GNUNET_TIME_Relative timeout,
405                   GNUNET_CLIENT_MessageHandler response_proc,            
406                   const union QueueContext *qc)
407 {
408   struct GNUNET_DATASTORE_QueueEntry *ret;
409   struct GNUNET_DATASTORE_QueueEntry *pos;
410   unsigned int c;
411
412   c = 0;
413   pos = h->queue_head;
414   while ( (pos != NULL) &&
415           (c < max_queue_size) &&
416           (pos->priority >= queue_priority) )
417     {
418       c++;
419       pos = pos->next;
420     }
421   if (c >= max_queue_size)
422     {
423       GNUNET_STATISTICS_update (h->stats,
424                                 gettext_noop ("# queue overflows"),
425                                 1,
426                                 GNUNET_NO);
427       return NULL;
428     }
429   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
430   ret->h = h;
431   ret->response_proc = response_proc;
432   ret->qc = *qc;
433   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
434   ret->priority = queue_priority;
435   ret->max_queue = max_queue_size;
436   ret->message_size = msize;
437   ret->was_transmitted = GNUNET_NO;
438   if (pos == NULL)
439     {
440       /* append at the tail */
441       pos = h->queue_tail;
442     }
443   else
444     {
445       pos = pos->prev; 
446       /* do not insert at HEAD if HEAD query was already
447          transmitted and we are still receiving replies! */
448       if ( (pos == NULL) &&
449            (h->queue_head->was_transmitted) )
450         pos = h->queue_head;
451     }
452   c++;
453   GNUNET_STATISTICS_update (h->stats,
454                             gettext_noop ("# queue entries created"),
455                             1,
456                             GNUNET_NO);
457   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
458                                      h->queue_tail,
459                                      pos,
460                                      ret);
461   h->queue_size++;
462   ret->task = GNUNET_SCHEDULER_add_delayed (timeout,
463                                             &timeout_queue_entry,
464                                             ret);
465   pos = ret->next;
466   while (pos != NULL) 
467     {
468       if ( (pos->max_queue < h->queue_size) &&
469            (pos->was_transmitted == GNUNET_NO) )
470         {
471           GNUNET_assert (pos->response_proc != NULL);
472           /* move 'pos' element to head so that it will be 
473              killed on 'NULL' call below */
474           GNUNET_CONTAINER_DLL_remove (h->queue_head,
475                                        h->queue_tail,
476                                        pos);
477           GNUNET_CONTAINER_DLL_insert (h->queue_head,
478                                        h->queue_tail,
479                                        pos);
480           GNUNET_STATISTICS_update (h->stats,
481                                     gettext_noop ("# Requests dropped from datastore queue"),
482                                     1,
483                                     GNUNET_NO);
484           pos->response_proc (h, NULL);
485           break;
486         }
487       pos = pos->next;
488     }
489   return ret;
490 }
491
492
493 /**
494  * Process entries in the queue (or do nothing if we are already
495  * doing so).
496  * 
497  * @param h handle to the datastore
498  */
499 static void
500 process_queue (struct GNUNET_DATASTORE_Handle *h);
501
502
503 /**
504  * Try reconnecting to the datastore service.
505  *
506  * @param cls the 'struct GNUNET_DATASTORE_Handle'
507  * @param tc scheduler context
508  */
509 static void
510 try_reconnect (void *cls,
511                const struct GNUNET_SCHEDULER_TaskContext *tc)
512 {
513   struct GNUNET_DATASTORE_Handle *h = cls;
514
515   if (h->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
516     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
517   else
518     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
519   if (h->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
520     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
521   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
522   h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
523   if (h->client == NULL)
524     {
525       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
526                   "DATASTORE reconnect failed (fatally)\n");
527       return;
528     }
529   GNUNET_STATISTICS_update (h->stats,
530                             gettext_noop ("# datastore connections (re)created"),
531                             1,
532                             GNUNET_NO);
533 #if DEBUG_DATASTORE
534   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
535               "Reconnected to DATASTORE\n");
536 #endif
537   process_queue (h);
538 }
539
540
541 /**
542  * Disconnect from the service and then try reconnecting to the datastore service
543  * after some delay.
544  *
545  * @param h handle to datastore to disconnect and reconnect
546  */
547 static void
548 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
549 {
550   if (h->client == NULL)
551     {
552 #if DEBUG_DATASTORE
553       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
554                   "client NULL in disconnect, will not try to reconnect\n");
555 #endif
556       return;
557     }
558 #if 0
559   GNUNET_STATISTICS_update (stats,
560                             gettext_noop ("# reconnected to DATASTORE"),
561                             1,
562                             GNUNET_NO);
563 #endif
564   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
565   h->skip_next_messages = 0;
566   h->client = NULL;
567   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_time,
568                                                     &try_reconnect,
569                                                     h);      
570 }
571
572
573 /**
574  * Function called whenever we receive a message from
575  * the service.  Calls the appropriate handler.
576  *
577  * @param cls the 'struct GNUNET_DATASTORE_Handle'
578  * @param msg the received message
579  */
580 static void 
581 receive_cb (void *cls,
582             const struct GNUNET_MessageHeader *msg)
583 {
584   struct GNUNET_DATASTORE_Handle *h = cls;
585   struct GNUNET_DATASTORE_QueueEntry *qe;
586
587   h->in_receive = GNUNET_NO;
588   if (h->skip_next_messages > 0)
589     {
590       h->skip_next_messages--;
591       process_queue (h);
592       return;
593    } 
594   if (NULL == (qe = h->queue_head))
595     {
596       GNUNET_break (0);
597       process_queue (h);
598       return; 
599     }
600   qe->response_proc (h, msg);
601 }
602
603
604 /**
605  * Transmit request from queue to datastore service.
606  *
607  * @param cls the 'struct GNUNET_DATASTORE_Handle'
608  * @param size number of bytes that can be copied to buf
609  * @param buf where to copy the drop message
610  * @return number of bytes written to buf
611  */
612 static size_t
613 transmit_request (void *cls,
614                   size_t size, 
615                   void *buf)
616 {
617   struct GNUNET_DATASTORE_Handle *h = cls;
618   struct GNUNET_DATASTORE_QueueEntry *qe;
619   size_t msize;
620
621   h->th = NULL;
622   if (NULL == (qe = h->queue_head))
623     return 0; /* no entry in queue */
624   if (buf == NULL)
625     {
626       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
627                   _("Failed to transmit request to DATASTORE.\n"));
628       GNUNET_STATISTICS_update (h->stats,
629                                 gettext_noop ("# transmission request failures"),
630                                 1,
631                                 GNUNET_NO);
632       do_disconnect (h);
633       return 0;
634     }
635   if (size < (msize = qe->message_size))
636     {
637       process_queue (h);
638       return 0;
639     }
640  #if DEBUG_DATASTORE
641   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
642               "Transmitting %u byte request to DATASTORE\n",
643               msize);
644 #endif
645   memcpy (buf, &qe[1], msize);
646   qe->was_transmitted = GNUNET_YES;
647   GNUNET_SCHEDULER_cancel (qe->task);
648   qe->task = GNUNET_SCHEDULER_NO_TASK;
649   GNUNET_assert (GNUNET_NO == h->in_receive);
650   h->in_receive = GNUNET_YES;
651   GNUNET_CLIENT_receive (h->client,
652                          &receive_cb,
653                          h,
654                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
655   GNUNET_STATISTICS_update (h->stats,
656                             gettext_noop ("# bytes sent to datastore"),
657                             1,
658                             GNUNET_NO);
659   return msize;
660 }
661
662
663 /**
664  * Process entries in the queue (or do nothing if we are already
665  * doing so).
666  * 
667  * @param h handle to the datastore
668  */
669 static void
670 process_queue (struct GNUNET_DATASTORE_Handle *h)
671 {
672   struct GNUNET_DATASTORE_QueueEntry *qe;
673
674   if (NULL == (qe = h->queue_head))
675     {
676 #if DEBUG_DATASTORE > 1
677       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
678                   "Queue empty\n");
679 #endif
680       return; /* no entry in queue */
681     }
682   if (qe->was_transmitted == GNUNET_YES)
683     {
684 #if DEBUG_DATASTORE > 1
685       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
686                   "Head request already transmitted\n");
687 #endif
688       return; /* waiting for replies */
689     }
690   if (h->th != NULL)
691     {
692 #if DEBUG_DATASTORE > 1
693       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
694                   "Pending transmission request\n");
695 #endif
696       return; /* request pending */
697     }
698   if (h->client == NULL)
699     {
700 #if DEBUG_DATASTORE > 1
701       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
702                   "Not connected\n");
703 #endif
704       return; /* waiting for reconnect */
705     }
706   if (GNUNET_YES == h->in_receive)
707     {
708       /* wait for response to previous query */
709       return; 
710     }
711 #if DEBUG_DATASTORE
712   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
713               "Queueing %u byte request to DATASTORE\n",
714               qe->message_size);
715 #endif
716   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
717                                                qe->message_size,
718                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
719                                                GNUNET_YES,
720                                                &transmit_request,
721                                                h);
722   GNUNET_assert (GNUNET_NO == h->in_receive);
723   GNUNET_break (NULL != h->th);
724 }
725
726
727 /**
728  * Dummy continuation used to do nothing (but be non-zero).
729  *
730  * @param cls closure
731  * @param result result 
732  * @param emsg error message
733  */
734 static void
735 drop_status_cont (void *cls, int32_t result, const char *emsg)
736 {
737   /* do nothing */
738 }
739
740
741 /**
742  * Free a queue entry.  Removes the given entry from the
743  * queue and releases associated resources.  Does NOT
744  * call the callback.
745  * 
746  * @param qe entry to free.
747  */
748 static void
749 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
750 {
751   struct GNUNET_DATASTORE_Handle *h = qe->h;
752
753   GNUNET_CONTAINER_DLL_remove (h->queue_head,
754                                h->queue_tail,
755                                qe);
756   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
757     {
758       GNUNET_SCHEDULER_cancel (qe->task);
759       qe->task = GNUNET_SCHEDULER_NO_TASK;
760     }
761   h->queue_size--;
762   qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */
763   GNUNET_free (qe);
764 }
765
766
767 /**
768  * Type of a function to call when we receive a message
769  * from the service.
770  *
771  * @param cls closure
772  * @param msg message received, NULL on timeout or fatal error
773  */
774 static void 
775 process_status_message (void *cls,
776                         const struct
777                         GNUNET_MessageHeader * msg)
778 {
779   struct GNUNET_DATASTORE_Handle *h = cls;
780   struct GNUNET_DATASTORE_QueueEntry *qe;
781   struct StatusContext rc;
782   const struct StatusMessage *sm;
783   const char *emsg;
784   int32_t status;
785   int was_transmitted;
786
787   if (NULL == (qe = h->queue_head))
788     {
789       GNUNET_break (0);
790       do_disconnect (h);
791       return;
792     }
793   rc = qe->qc.sc;
794   if (msg == NULL)
795     {      
796       was_transmitted = qe->was_transmitted;
797       free_queue_entry (qe);
798       if (was_transmitted == GNUNET_YES)
799         do_disconnect (h);
800       else
801         process_queue (h);
802       if (rc.cont != NULL)
803         rc.cont (rc.cont_cls, 
804                  GNUNET_SYSERR,
805                  _("Failed to receive status response from database."));
806       return;
807     }
808   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
809   free_queue_entry (qe);
810   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
811        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
812     {
813       GNUNET_break (0);
814       h->retry_time = GNUNET_TIME_UNIT_ZERO;
815       do_disconnect (h);
816       if (rc.cont != NULL)
817         rc.cont (rc.cont_cls, 
818                  GNUNET_SYSERR,
819                  _("Error reading response from datastore service"));
820       return;
821     }
822   sm = (const struct StatusMessage*) msg;
823   status = ntohl(sm->status);
824   emsg = NULL;
825   if (ntohs(msg->size) > sizeof(struct StatusMessage))
826     {
827       emsg = (const char*) &sm[1];
828       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
829         {
830           GNUNET_break (0);
831           emsg = _("Invalid error message received from datastore service");
832         }
833     }  
834   if ( (status == GNUNET_SYSERR) &&
835        (emsg == NULL) )
836     {
837       GNUNET_break (0);
838       emsg = _("Invalid error message received from datastore service");
839     }
840 #if DEBUG_DATASTORE
841   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
842               "Received status %d/%s\n",
843               (int) status,
844               emsg);
845 #endif
846   GNUNET_STATISTICS_update (h->stats,
847                             gettext_noop ("# status messages received"),
848                             1,
849                             GNUNET_NO);
850   h->retry_time.rel_value = 0;
851   process_queue (h);
852   if (rc.cont != NULL)
853     rc.cont (rc.cont_cls, 
854              status,
855              emsg);
856 }
857
858
859 /**
860  * Store an item in the datastore.  If the item is already present,
861  * the priorities are summed up and the higher expiration time and
862  * lower anonymity level is used.
863  *
864  * @param h handle to the datastore
865  * @param rid reservation ID to use (from "reserve"); use 0 if no
866  *            prior reservation was made
867  * @param key key for the value
868  * @param size number of bytes in data
869  * @param data content stored
870  * @param type type of the content
871  * @param priority priority of the content
872  * @param anonymity anonymity-level for the content
873  * @param replication how often should the content be replicated to other peers?
874  * @param expiration expiration time for the content
875  * @param queue_priority ranking of this request in the priority queue
876  * @param max_queue_size at what queue size should this request be dropped
877  *        (if other requests of higher priority are in the queue)
878  * @param timeout timeout for the operation
879  * @param cont continuation to call when done
880  * @param cont_cls closure for cont
881  * @return NULL if the entry was not queued, otherwise a handle that can be used to
882  *         cancel; note that even if NULL is returned, the callback will be invoked
883  *         (or rather, will already have been invoked)
884  */
885 struct GNUNET_DATASTORE_QueueEntry *
886 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
887                       uint32_t rid,
888                       const GNUNET_HashCode * key,
889                       size_t size,
890                       const void *data,
891                       enum GNUNET_BLOCK_Type type,
892                       uint32_t priority,
893                       uint32_t anonymity,
894                       uint32_t replication,
895                       struct GNUNET_TIME_Absolute expiration,
896                       unsigned int queue_priority,
897                       unsigned int max_queue_size,
898                       struct GNUNET_TIME_Relative timeout,
899                       GNUNET_DATASTORE_ContinuationWithStatus cont,
900                       void *cont_cls)
901 {
902   struct GNUNET_DATASTORE_QueueEntry *qe;
903   struct DataMessage *dm;
904   size_t msize;
905   union QueueContext qc;
906
907 #if DEBUG_DATASTORE
908   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
909               "Asked to put %u bytes of data under key `%s' for %llu ms\n",
910               size,
911               GNUNET_h2s (key),
912               GNUNET_TIME_absolute_get_remaining (expiration).rel_value);
913 #endif
914   msize = sizeof(struct DataMessage) + size;
915   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
916   qc.sc.cont = cont;
917   qc.sc.cont_cls = cont_cls;
918   qe = make_queue_entry (h, msize,
919                          queue_priority, max_queue_size, timeout,
920                          &process_status_message, &qc);
921   if (qe == NULL)
922     {
923 #if DEBUG_DATASTORE
924       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
925                   "Could not create queue entry for PUT\n");
926 #endif
927       return NULL;
928     }
929   GNUNET_STATISTICS_update (h->stats,
930                             gettext_noop ("# PUT requests executed"),
931                             1,
932                             GNUNET_NO);
933   dm = (struct DataMessage* ) &qe[1];
934   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
935   dm->header.size = htons(msize);
936   dm->rid = htonl(rid);
937   dm->size = htonl( (uint32_t) size);
938   dm->type = htonl(type);
939   dm->priority = htonl(priority);
940   dm->anonymity = htonl(anonymity);
941   dm->replication = htonl (replication);
942   dm->reserved = htonl (0);
943   dm->uid = GNUNET_htonll(0);
944   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
945   dm->key = *key;
946   memcpy (&dm[1], data, size);
947   process_queue (h);
948   return qe;
949 }
950
951
952 /**
953  * Reserve space in the datastore.  This function should be used
954  * to avoid "out of space" failures during a longer sequence of "put"
955  * operations (for example, when a file is being inserted).
956  *
957  * @param h handle to the datastore
958  * @param amount how much space (in bytes) should be reserved (for content only)
959  * @param entries how many entries will be created (to calculate per-entry overhead)
960  * @param queue_priority ranking of this request in the priority queue
961  * @param max_queue_size at what queue size should this request be dropped
962  *        (if other requests of higher priority are in the queue)
963  * @param timeout how long to wait at most for a response (or before dying in queue)
964  * @param cont continuation to call when done; "success" will be set to
965  *             a positive reservation value if space could be reserved.
966  * @param cont_cls closure for cont
967  * @return NULL if the entry was not queued, otherwise a handle that can be used to
968  *         cancel; note that even if NULL is returned, the callback will be invoked
969  *         (or rather, will already have been invoked)
970  */
971 struct GNUNET_DATASTORE_QueueEntry *
972 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
973                           uint64_t amount,
974                           uint32_t entries,
975                           unsigned int queue_priority,
976                           unsigned int max_queue_size,
977                           struct GNUNET_TIME_Relative timeout,
978                           GNUNET_DATASTORE_ContinuationWithStatus cont,
979                           void *cont_cls)
980 {
981   struct GNUNET_DATASTORE_QueueEntry *qe;
982   struct ReserveMessage *rm;
983   union QueueContext qc;
984
985   if (cont == NULL)
986     cont = &drop_status_cont;
987 #if DEBUG_DATASTORE
988   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
989               "Asked to reserve %llu bytes of data and %u entries'\n",
990               (unsigned long long) amount,
991               (unsigned int) entries);
992 #endif
993   qc.sc.cont = cont;
994   qc.sc.cont_cls = cont_cls;
995   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
996                          queue_priority, max_queue_size, timeout,
997                          &process_status_message, &qc);
998   if (qe == NULL)
999     {
1000 #if DEBUG_DATASTORE
1001       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1002                   "Could not create queue entry to reserve\n");
1003 #endif
1004       return NULL;
1005     }
1006   GNUNET_STATISTICS_update (h->stats,
1007                             gettext_noop ("# RESERVE requests executed"),
1008                             1,
1009                             GNUNET_NO);
1010   rm = (struct ReserveMessage*) &qe[1];
1011   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
1012   rm->header.size = htons(sizeof (struct ReserveMessage));
1013   rm->entries = htonl(entries);
1014   rm->amount = GNUNET_htonll(amount);
1015   process_queue (h);
1016   return qe;
1017 }
1018
1019
1020 /**
1021  * Signal that all of the data for which a reservation was made has
1022  * been stored and that whatever excess space might have been reserved
1023  * can now be released.
1024  *
1025  * @param h handle to the datastore
1026  * @param rid reservation ID (value of "success" in original continuation
1027  *        from the "reserve" function).
1028  * @param queue_priority ranking of this request in the priority queue
1029  * @param max_queue_size at what queue size should this request be dropped
1030  *        (if other requests of higher priority are in the queue)
1031  * @param queue_priority ranking of this request in the priority queue
1032  * @param max_queue_size at what queue size should this request be dropped
1033  *        (if other requests of higher priority are in the queue)
1034  * @param timeout how long to wait at most for a response
1035  * @param cont continuation to call when done
1036  * @param cont_cls closure for cont
1037  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1038  *         cancel; note that even if NULL is returned, the callback will be invoked
1039  *         (or rather, will already have been invoked)
1040  */
1041 struct GNUNET_DATASTORE_QueueEntry *
1042 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1043                                   uint32_t rid,
1044                                   unsigned int queue_priority,
1045                                   unsigned int max_queue_size,
1046                                   struct GNUNET_TIME_Relative timeout,
1047                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
1048                                   void *cont_cls)
1049 {
1050   struct GNUNET_DATASTORE_QueueEntry *qe;
1051   struct ReleaseReserveMessage *rrm;
1052   union QueueContext qc;
1053
1054   if (cont == NULL)
1055     cont = &drop_status_cont;
1056 #if DEBUG_DATASTORE
1057   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1058               "Asked to release reserve %d\n",
1059               rid);
1060 #endif
1061   qc.sc.cont = cont;
1062   qc.sc.cont_cls = cont_cls;
1063   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
1064                          queue_priority, max_queue_size, timeout,
1065                          &process_status_message, &qc);
1066   if (qe == NULL)
1067     {
1068 #if DEBUG_DATASTORE
1069       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1070                   "Could not create queue entry to release reserve\n");
1071 #endif
1072       return NULL;
1073     }
1074   GNUNET_STATISTICS_update (h->stats,
1075                             gettext_noop ("# RELEASE RESERVE requests executed"),
1076                             1,
1077                             GNUNET_NO);
1078   rrm = (struct ReleaseReserveMessage*) &qe[1];
1079   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1080   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
1081   rrm->rid = htonl(rid);
1082   process_queue (h);
1083   return qe;
1084 }
1085
1086
1087 /**
1088  * Update a value in the datastore.
1089  *
1090  * @param h handle to the datastore
1091  * @param uid identifier for the value
1092  * @param priority how much to increase the priority of the value
1093  * @param expiration new expiration value should be MAX of existing and this argument
1094  * @param queue_priority ranking of this request in the priority queue
1095  * @param max_queue_size at what queue size should this request be dropped
1096  *        (if other requests of higher priority are in the queue)
1097  * @param timeout how long to wait at most for a response
1098  * @param cont continuation to call when done
1099  * @param cont_cls closure for cont
1100  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1101  *         cancel; note that even if NULL is returned, the callback will be invoked
1102  *         (or rather, will already have been invoked)
1103  */
1104 struct GNUNET_DATASTORE_QueueEntry *
1105 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1106                          uint64_t uid,
1107                          uint32_t priority,
1108                          struct GNUNET_TIME_Absolute expiration,
1109                          unsigned int queue_priority,
1110                          unsigned int max_queue_size,
1111                          struct GNUNET_TIME_Relative timeout,
1112                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1113                          void *cont_cls)
1114 {
1115   struct GNUNET_DATASTORE_QueueEntry *qe;
1116   struct UpdateMessage *um;
1117   union QueueContext qc;
1118
1119   if (cont == NULL)
1120     cont = &drop_status_cont;
1121 #if DEBUG_DATASTORE
1122   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1123               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
1124               uid,
1125               (unsigned int) priority,
1126               (unsigned long long) expiration.abs_value);
1127 #endif
1128   qc.sc.cont = cont;
1129   qc.sc.cont_cls = cont_cls;
1130   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
1131                          queue_priority, max_queue_size, timeout,
1132                          &process_status_message, &qc);
1133   if (qe == NULL)
1134     {
1135 #if DEBUG_DATASTORE
1136       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1137                   "Could not create queue entry for UPDATE\n");
1138 #endif
1139       return NULL;
1140     }
1141   GNUNET_STATISTICS_update (h->stats,
1142                             gettext_noop ("# UPDATE requests executed"),
1143                             1,
1144                             GNUNET_NO);
1145   um = (struct UpdateMessage*) &qe[1];
1146   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1147   um->header.size = htons(sizeof (struct UpdateMessage));
1148   um->priority = htonl(priority);
1149   um->expiration = GNUNET_TIME_absolute_hton(expiration);
1150   um->uid = GNUNET_htonll(uid);
1151   process_queue (h);
1152   return qe;
1153 }
1154
1155
1156 /**
1157  * Explicitly remove some content from the database.
1158  * The "cont"inuation will be called with status
1159  * "GNUNET_OK" if content was removed, "GNUNET_NO"
1160  * if no matching entry was found and "GNUNET_SYSERR"
1161  * on all other types of errors.
1162  *
1163  * @param h handle to the datastore
1164  * @param key key for the value
1165  * @param size number of bytes in data
1166  * @param data content stored
1167  * @param queue_priority ranking of this request in the priority queue
1168  * @param max_queue_size at what queue size should this request be dropped
1169  *        (if other requests of higher priority are in the queue)
1170  * @param timeout how long to wait at most for a response
1171  * @param cont continuation to call when done
1172  * @param cont_cls closure for cont
1173  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1174  *         cancel; note that even if NULL is returned, the callback will be invoked
1175  *         (or rather, will already have been invoked)
1176  */
1177 struct GNUNET_DATASTORE_QueueEntry *
1178 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1179                          const GNUNET_HashCode *key,
1180                          size_t size,
1181                          const void *data,
1182                          unsigned int queue_priority,
1183                          unsigned int max_queue_size,
1184                          struct GNUNET_TIME_Relative timeout,
1185                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1186                          void *cont_cls)
1187 {
1188   struct GNUNET_DATASTORE_QueueEntry *qe;
1189   struct DataMessage *dm;
1190   size_t msize;
1191   union QueueContext qc;
1192
1193   if (cont == NULL)
1194     cont = &drop_status_cont;
1195 #if DEBUG_DATASTORE
1196   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1197               "Asked to remove %u bytes under key `%s'\n",
1198               size,
1199               GNUNET_h2s (key));
1200 #endif
1201   qc.sc.cont = cont;
1202   qc.sc.cont_cls = cont_cls;
1203   msize = sizeof(struct DataMessage) + size;
1204   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1205   qe = make_queue_entry (h, msize,
1206                          queue_priority, max_queue_size, timeout,
1207                          &process_status_message, &qc);
1208   if (qe == NULL)
1209     {
1210 #if DEBUG_DATASTORE
1211       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1212                   "Could not create queue entry for REMOVE\n");
1213 #endif
1214       return NULL;
1215     }
1216   GNUNET_STATISTICS_update (h->stats,
1217                             gettext_noop ("# REMOVE requests executed"),
1218                             1,
1219                             GNUNET_NO);
1220   dm = (struct DataMessage*) &qe[1];
1221   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1222   dm->header.size = htons(msize);
1223   dm->rid = htonl(0);
1224   dm->size = htonl(size);
1225   dm->type = htonl(0);
1226   dm->priority = htonl(0);
1227   dm->anonymity = htonl(0);
1228   dm->uid = GNUNET_htonll(0);
1229   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
1230   dm->key = *key;
1231   memcpy (&dm[1], data, size);
1232   process_queue (h);
1233   return qe;
1234 }
1235
1236
1237 /**
1238  * Type of a function to call when we receive a message
1239  * from the service.
1240  *
1241  * @param cls closure
1242  * @param msg message received, NULL on timeout or fatal error
1243  */
1244 static void 
1245 process_result_message (void *cls,
1246                         const struct GNUNET_MessageHeader *msg)
1247 {
1248   struct GNUNET_DATASTORE_Handle *h = cls;
1249   struct GNUNET_DATASTORE_QueueEntry *qe;
1250   struct ResultContext rc;
1251   const struct DataMessage *dm;
1252   int was_transmitted;
1253
1254   if (msg == NULL)
1255     {
1256       qe = h->queue_head;
1257       GNUNET_assert (NULL != qe);
1258       rc = qe->qc.rc;
1259       was_transmitted = qe->was_transmitted;
1260       free_queue_entry (qe);
1261       if (was_transmitted == GNUNET_YES)
1262         {
1263           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1264                       _("Failed to receive response from database.\n"));
1265           do_disconnect (h);
1266         }
1267       else
1268         {
1269           process_queue (h);
1270         }
1271       if (rc.proc != NULL)
1272         rc.proc (rc.proc_cls,
1273                  NULL, 0, NULL, 0, 0, 0, 
1274                  GNUNET_TIME_UNIT_ZERO_ABS, 0);         
1275       return;
1276     }
1277   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
1278     {
1279       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1280       qe = h->queue_head;
1281       rc = qe->qc.rc;
1282       GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1283       free_queue_entry (qe);
1284 #if DEBUG_DATASTORE
1285       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1286                   "Received end of result set, new queue size is %u\n",
1287                   h->queue_size);
1288 #endif
1289       if (rc.proc != NULL)
1290         rc.proc (rc.proc_cls,
1291                  NULL, 0, NULL, 0, 0, 0, 
1292                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1293       h->retry_time.rel_value = 0;
1294       h->result_count = 0;
1295       process_queue (h);
1296       return;
1297     }
1298   qe = h->queue_head;
1299   rc = qe->qc.rc;
1300   if (GNUNET_YES != qe->was_transmitted)
1301     {
1302       GNUNET_break (0);
1303       free_queue_entry (qe);
1304       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1305       do_disconnect (h);
1306       if (rc.proc != NULL)
1307         rc.proc (rc.proc_cls,
1308                  NULL, 0, NULL, 0, 0, 0, 
1309                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
1310       return;
1311     }
1312   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1313        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1314        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1315     {
1316       GNUNET_break (0);
1317       free_queue_entry (qe);
1318       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1319       do_disconnect (h);
1320       if (rc.proc != NULL)
1321         rc.proc (rc.proc_cls,
1322                  NULL, 0, NULL, 0, 0, 0, 
1323                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
1324       return;
1325     }
1326   GNUNET_STATISTICS_update (h->stats,
1327                             gettext_noop ("# Results received"),
1328                             1,
1329                             GNUNET_NO);
1330   dm = (const struct DataMessage*) msg;
1331 #if DEBUG_DATASTORE
1332   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1333               "Received result %llu with type %u and size %u with key %s\n",
1334               (unsigned long long) GNUNET_ntohll(dm->uid),
1335               ntohl(dm->type),
1336               ntohl(dm->size),
1337               GNUNET_h2s(&dm->key));
1338 #endif
1339   free_queue_entry (qe);
1340   h->retry_time.rel_value = 0;
1341   process_queue (h);
1342   if (rc.proc != NULL)
1343     rc.proc (rc.proc_cls,
1344              &dm->key,
1345              ntohl(dm->size),
1346              &dm[1],
1347              ntohl(dm->type),
1348              ntohl(dm->priority),
1349              ntohl(dm->anonymity),
1350              GNUNET_TIME_absolute_ntoh(dm->expiration), 
1351              GNUNET_ntohll(dm->uid));
1352 }
1353
1354
1355 /**
1356  * Get a random value from the datastore for content replication.
1357  * Returns a single, random value among those with the highest
1358  * replication score, lowering positive replication scores by one for
1359  * the chosen value (if only content with a replication score exists,
1360  * a random value is returned and replication scores are not changed).
1361  *
1362  * @param h handle to the datastore
1363  * @param queue_priority ranking of this request in the priority queue
1364  * @param max_queue_size at what queue size should this request be dropped
1365  *        (if other requests of higher priority are in the queue)
1366  * @param timeout how long to wait at most for a response
1367  * @param proc function to call on a random value; it
1368  *        will be called once with a value (if available)
1369  *        and always once with a value of NULL.
1370  * @param proc_cls closure for proc
1371  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1372  *         cancel
1373  */
1374 struct GNUNET_DATASTORE_QueueEntry *
1375 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1376                                       unsigned int queue_priority,
1377                                       unsigned int max_queue_size,
1378                                       struct GNUNET_TIME_Relative timeout,
1379                                       GNUNET_DATASTORE_DatumProcessor proc, 
1380                                       void *proc_cls)
1381 {
1382   struct GNUNET_DATASTORE_QueueEntry *qe;
1383   struct GNUNET_MessageHeader *m;
1384   union QueueContext qc;
1385
1386   GNUNET_assert (NULL != proc);
1387 #if DEBUG_DATASTORE
1388   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1389               "Asked to get replication entry in %llu ms\n",
1390               (unsigned long long) timeout.rel_value);
1391 #endif
1392   qc.rc.proc = proc;
1393   qc.rc.proc_cls = proc_cls;
1394   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1395                          queue_priority, max_queue_size, timeout,
1396                          &process_result_message, &qc);
1397   if (qe == NULL)
1398     {
1399 #if DEBUG_DATASTORE
1400       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1401                   "Could not create queue entry for GET REPLICATION\n");
1402 #endif
1403       return NULL;    
1404     }
1405   GNUNET_STATISTICS_update (h->stats,
1406                             gettext_noop ("# GET REPLICATION requests executed"),
1407                             1,
1408                             GNUNET_NO);
1409   m = (struct GNUNET_MessageHeader*) &qe[1];
1410   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1411   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1412   process_queue (h);
1413   return qe;
1414 }
1415
1416
1417 /**
1418  * Get a single zero-anonymity value from the datastore.
1419  *
1420  * @param h handle to the datastore
1421  * @param offset offset of the result (modulo num-results); set to
1422  *               a random 64-bit value initially; then increment by
1423  *               one each time; detect that all results have been found by uid
1424  *               being again the first uid ever returned.
1425  * @param queue_priority ranking of this request in the priority queue
1426  * @param max_queue_size at what queue size should this request be dropped
1427  *        (if other requests of higher priority are in the queue)
1428  * @param timeout how long to wait at most for a response
1429  * @param type allowed type for the operation (never zero)
1430  * @param proc function to call on a random value; it
1431  *        will be called once with a value (if available)
1432  *        or with NULL if none value exists.
1433  * @param proc_cls closure for proc
1434  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1435  *         cancel
1436  */
1437 struct GNUNET_DATASTORE_QueueEntry *
1438 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1439                                      uint64_t offset,
1440                                      unsigned int queue_priority,
1441                                      unsigned int max_queue_size,
1442                                      struct GNUNET_TIME_Relative timeout,
1443                                      enum GNUNET_BLOCK_Type type,
1444                                      GNUNET_DATASTORE_DatumProcessor proc, 
1445                                      void *proc_cls)
1446 {
1447   struct GNUNET_DATASTORE_QueueEntry *qe;
1448   struct GetZeroAnonymityMessage *m;
1449   union QueueContext qc;
1450
1451   GNUNET_assert (NULL != proc);
1452   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1453 #if DEBUG_DATASTORE
1454   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1455               "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n",
1456               (unsigned long long) offset,
1457               type,
1458               (unsigned long long) timeout.rel_value);
1459 #endif
1460   qc.rc.proc = proc;
1461   qc.rc.proc_cls = proc_cls;
1462   qe = make_queue_entry (h, sizeof(struct GetZeroAnonymityMessage),
1463                          queue_priority, max_queue_size, timeout,
1464                          &process_result_message, &qc);
1465   if (qe == NULL)
1466     {
1467 #if DEBUG_DATASTORE
1468       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1469                   "Could not create queue entry for zero-anonymity procation\n");
1470 #endif
1471       return NULL;    
1472     }
1473   GNUNET_STATISTICS_update (h->stats,
1474                             gettext_noop ("# GET ZERO ANONYMITY requests executed"),
1475                             1,
1476                             GNUNET_NO);
1477   m = (struct GetZeroAnonymityMessage*) &qe[1];
1478   m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1479   m->header.size = htons(sizeof (struct GetZeroAnonymityMessage));
1480   m->type = htonl ((uint32_t) type);
1481   m->offset = GNUNET_htonll (offset);
1482   process_queue (h);
1483   return qe;
1484 }
1485
1486
1487 /**
1488  * Get a result for a particular key from the datastore.  The processor
1489  * will only be called once.
1490  *
1491  * @param h handle to the datastore
1492  * @param offset offset of the result (modulo num-results); set to
1493  *               a random 64-bit value initially; then increment by
1494  *               one each time; detect that all results have been found by uid
1495  *               being again the first uid ever returned.
1496  * @param key maybe NULL (to match all entries)
1497  * @param type desired type, 0 for any
1498  * @param queue_priority ranking of this request in the priority queue
1499  * @param max_queue_size at what queue size should this request be dropped
1500  *        (if other requests of higher priority are in the queue)
1501  * @param timeout how long to wait at most for a response
1502  * @param proc function to call on each matching value;
1503  *        will be called once with a NULL value at the end
1504  * @param proc_cls closure for proc
1505  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1506  *         cancel
1507  */
1508 struct GNUNET_DATASTORE_QueueEntry *
1509 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1510                           uint64_t offset,
1511                           const GNUNET_HashCode * key,
1512                           enum GNUNET_BLOCK_Type type,
1513                           unsigned int queue_priority,
1514                           unsigned int max_queue_size,
1515                           struct GNUNET_TIME_Relative timeout,
1516                           GNUNET_DATASTORE_DatumProcessor proc, 
1517                           void *proc_cls)
1518 {
1519   struct GNUNET_DATASTORE_QueueEntry *qe;
1520   struct GetMessage *gm;
1521   union QueueContext qc;
1522
1523   GNUNET_assert (NULL != proc);
1524 #if DEBUG_DATASTORE
1525   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1526               "Asked to look for data of type %u under key `%s'\n",
1527               (unsigned int) type,
1528               GNUNET_h2s (key));
1529 #endif
1530   qc.rc.proc = proc;
1531   qc.rc.proc_cls = proc_cls;
1532   qe = make_queue_entry (h, sizeof(struct GetMessage),
1533                          queue_priority, max_queue_size, timeout,
1534                          &process_result_message, &qc);
1535   if (qe == NULL)
1536     {
1537 #if DEBUG_DATASTORE
1538       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1539                   "Could not queue request for `%s'\n",
1540                   GNUNET_h2s (key));
1541 #endif
1542       return NULL;
1543     }
1544   GNUNET_STATISTICS_update (h->stats,
1545                             gettext_noop ("# GET requests executed"),
1546                             1,
1547                             GNUNET_NO);
1548   gm = (struct GetMessage*) &qe[1];
1549   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1550   gm->type = htonl(type);
1551   gm->offset = GNUNET_htonll (offset);
1552   if (key != NULL)
1553     {
1554       gm->header.size = htons(sizeof (struct GetMessage));
1555       gm->key = *key;
1556     }
1557   else
1558     {
1559       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1560     }
1561   process_queue (h);
1562   return qe;
1563 }
1564
1565
1566 /**
1567  * Cancel a datastore operation.  The final callback from the
1568  * operation must not have been done yet.
1569  * 
1570  * @param qe operation to cancel
1571  */
1572 void
1573 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1574 {
1575   struct GNUNET_DATASTORE_Handle *h;
1576
1577   GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted);
1578   h = qe->h;
1579 #if DEBUG_DATASTORE
1580   GNUNET_log  (GNUNET_ERROR_TYPE_DEBUG,
1581                "Pending DATASTORE request %p cancelled (%d, %d)\n",
1582                qe,
1583                qe->was_transmitted,
1584                h->queue_head == qe);
1585 #endif
1586   if (GNUNET_YES == qe->was_transmitted) 
1587     {
1588       free_queue_entry (qe);
1589       h->skip_next_messages++;
1590       return;
1591     }
1592   free_queue_entry (qe);
1593   process_queue (h);
1594 }
1595
1596
1597 /* end of datastore_api.c */