proper shutdown
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
33
34 /**
35  * If a client stopped asking for more results, how many more do
36  * we receive from the DB before killing the connection?  Trade-off
37  * between re-doing TCP handshakes and (needlessly) receiving 
38  * useless results.
39  */
40 #define MAX_EXCESS_RESULTS 8
41
42 /**
43  * Context for processing status messages.
44  */
45 struct StatusContext
46 {
47   /**
48    * Continuation to call with the status.
49    */
50   GNUNET_DATASTORE_ContinuationWithStatus cont;
51
52   /**
53    * Closure for cont.
54    */
55   void *cont_cls;
56
57 };
58
59
60 /**
61  * Context for processing result messages.
62  */
63 struct ResultContext
64 {
65   /**
66    * Function to call with the result.
67    */
68   GNUNET_DATASTORE_DatumProcessor proc;
69
70   /**
71    * Closure for proc.
72    */
73   void *proc_cls;
74
75 };
76
77
78 /**
79  *  Context for a queue operation.
80  */
81 union QueueContext
82 {
83
84   struct StatusContext sc;
85   
86   struct ResultContext rc;
87
88 };
89
90
91
92 /**
93  * Entry in our priority queue.
94  */
95 struct GNUNET_DATASTORE_QueueEntry
96 {
97
98   /**
99    * This is a linked list.
100    */
101   struct GNUNET_DATASTORE_QueueEntry *next;
102
103   /**
104    * This is a linked list.
105    */
106   struct GNUNET_DATASTORE_QueueEntry *prev;
107
108   /**
109    * Handle to the master context.
110    */
111   struct GNUNET_DATASTORE_Handle *h;
112
113   /**
114    * Response processor (NULL if we are not waiting for a response).
115    * This struct should be used for the closure, function-specific
116    * arguments can be passed via 'qc'.
117    */
118   GNUNET_CLIENT_MessageHandler response_proc;
119
120   /**
121    * Function to call after transmission of the request.
122    */
123   GNUNET_DATASTORE_ContinuationWithStatus cont;
124    
125   /**
126    * Closure for 'cont'.
127    */
128   void *cont_cls;
129
130   /**
131    * Context for the operation.
132    */
133   union QueueContext qc;
134
135   /**
136    * Task for timeout signalling.
137    */
138   GNUNET_SCHEDULER_TaskIdentifier task;
139
140   /**
141    * Timeout for the current operation.
142    */
143   struct GNUNET_TIME_Absolute timeout;
144
145   /**
146    * Priority in the queue.
147    */
148   unsigned int priority;
149
150   /**
151    * Maximum allowed length of queue (otherwise
152    * this request should be discarded).
153    */
154   unsigned int max_queue;
155
156   /**
157    * Number of bytes in the request message following
158    * this struct.  32-bit value for nicer memory
159    * access (and overall struct alignment).
160    */
161   uint32_t message_size;
162
163   /**
164    * Has this message been transmitted to the service?
165    * Only ever GNUNET_YES for the head of the queue.
166    * Note that the overall struct should end at a 
167    * multiple of 64 bits.
168    */
169   int was_transmitted;
170   
171 };
172
173 /**
174  * Handle to the datastore service. 
175  */
176 struct GNUNET_DATASTORE_Handle
177 {
178
179   /**
180    * Our configuration.
181    */
182   const struct GNUNET_CONFIGURATION_Handle *cfg;
183
184   /**
185    * Current connection to the datastore service.
186    */
187   struct GNUNET_CLIENT_Connection *client;
188
189   /**
190    * Handle for statistics.
191    */
192   struct GNUNET_STATISTICS_Handle *stats;
193
194   /**
195    * Current transmit handle.
196    */
197   struct GNUNET_CLIENT_TransmitHandle *th;
198
199   /**
200    * Current head of priority queue.
201    */
202   struct GNUNET_DATASTORE_QueueEntry *queue_head;
203
204   /**
205    * Current tail of priority queue.
206    */
207   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
208
209   /**
210    * Task for trying to reconnect.
211    */
212   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
213
214   /**
215    * How quickly should we retry?  Used for exponential back-off on
216    * connect-errors.
217    */
218   struct GNUNET_TIME_Relative retry_time;
219
220   /**
221    * Number of entries in the queue.
222    */
223   unsigned int queue_size;
224
225   /**
226    * Number of results we're receiving for the current query
227    * after application stopped to care.  Used to determine when
228    * to reset the connection.
229    */
230   unsigned int result_count;
231
232   /**
233    * Are we currently trying to receive from the service?
234    */
235   int in_receive;
236
237   /**
238    * We should ignore the next message(s) from the service.
239    */
240   unsigned int skip_next_messages;
241
242 };
243
244
245
246 /**
247  * Connect to the datastore service.
248  *
249  * @param cfg configuration to use
250  * @return handle to use to access the service
251  */
252 struct GNUNET_DATASTORE_Handle *
253 GNUNET_DATASTORE_connect (const struct
254                           GNUNET_CONFIGURATION_Handle
255                           *cfg)
256 {
257   struct GNUNET_CLIENT_Connection *c;
258   struct GNUNET_DATASTORE_Handle *h;
259   
260   c = GNUNET_CLIENT_connect ("datastore", cfg);
261   if (c == NULL)
262     return NULL; /* oops */
263   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
264                      GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
265   h->client = c;
266   h->cfg = cfg;
267   h->stats = GNUNET_STATISTICS_create ("datastore-api",
268                                        cfg);
269   return h;
270 }
271
272
273 /**
274  * Transmit DROP message to datastore service.
275  *
276  * @param cls the 'struct GNUNET_DATASTORE_Handle'
277  * @param size number of bytes that can be copied to buf
278  * @param buf where to copy the drop message
279  * @return number of bytes written to buf
280  */
281 static size_t
282 transmit_drop (void *cls,
283                size_t size, 
284                void *buf)
285 {
286   struct GNUNET_DATASTORE_Handle *h = cls;
287   struct GNUNET_MessageHeader *hdr;
288   
289   if (buf == NULL)
290     {
291       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
292                   _("Failed to transmit request to drop database.\n"));
293       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
294       return 0;
295     }
296   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
297   hdr = buf;
298   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
299   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
300   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
301   return sizeof(struct GNUNET_MessageHeader);
302 }
303
304
305 /**
306  * Disconnect from the datastore service (and free
307  * associated resources).
308  *
309  * @param h handle to the datastore
310  * @param drop set to GNUNET_YES to delete all data in datastore (!)
311  */
312 void
313 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
314                              int drop)
315 {
316   struct GNUNET_DATASTORE_QueueEntry *qe;
317
318   if (h->client != NULL)
319     {
320       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
321       h->client = NULL;
322     }
323   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
324     {
325       GNUNET_SCHEDULER_cancel (h->reconnect_task);
326       h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
327     }
328   if (NULL != h->th)
329     {
330       GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
331       h->th = NULL;
332     }
333   while (NULL != (qe = h->queue_head))
334     {
335       GNUNET_assert (NULL != qe->response_proc);
336       qe->response_proc (h, NULL);
337     }
338   if (GNUNET_YES == drop) 
339     {
340       h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
341       if (h->client != NULL)
342         {
343           if (NULL != 
344               GNUNET_CLIENT_notify_transmit_ready (h->client,
345                                                    sizeof(struct GNUNET_MessageHeader),
346                                                    GNUNET_TIME_UNIT_MINUTES,
347                                                    GNUNET_YES,
348                                                    &transmit_drop,
349                                                    h))
350             return;
351           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
352           h->client = NULL;
353         }
354       GNUNET_break (0);
355     }
356   GNUNET_STATISTICS_destroy (h->stats,
357                              GNUNET_NO);
358   h->stats = NULL;
359   GNUNET_free (h);
360 }
361
362
363 /**
364  * A request has timed out (before being transmitted to the service).
365  *
366  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
367  * @param tc scheduler context
368  */
369 static void
370 timeout_queue_entry (void *cls,
371                      const struct GNUNET_SCHEDULER_TaskContext *tc)
372 {
373   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
374
375   GNUNET_STATISTICS_update (qe->h->stats,
376                             gettext_noop ("# queue entry timeouts"),
377                             1,
378                             GNUNET_NO);
379   qe->task = GNUNET_SCHEDULER_NO_TASK;
380   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
381   qe->response_proc (qe->h, NULL);
382 }
383
384
385 /**
386  * Create a new entry for our priority queue (and possibly discard other entires if
387  * the queue is getting too long).
388  *
389  * @param h handle to the datastore
390  * @param msize size of the message to queue
391  * @param queue_priority priority of the entry
392  * @param max_queue_size at what queue size should this request be dropped
393  *        (if other requests of higher priority are in the queue)
394  * @param timeout timeout for the operation
395  * @param response_proc function to call with replies (can be NULL)
396  * @param qc client context (NOT a closure for response_proc)
397  * @return NULL if the queue is full 
398  */
399 static struct GNUNET_DATASTORE_QueueEntry *
400 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
401                   size_t msize,
402                   unsigned int queue_priority,
403                   unsigned int max_queue_size,
404                   struct GNUNET_TIME_Relative timeout,
405                   GNUNET_CLIENT_MessageHandler response_proc,            
406                   const union QueueContext *qc)
407 {
408   struct GNUNET_DATASTORE_QueueEntry *ret;
409   struct GNUNET_DATASTORE_QueueEntry *pos;
410   unsigned int c;
411
412   c = 0;
413   pos = h->queue_head;
414   while ( (pos != NULL) &&
415           (c < max_queue_size) &&
416           (pos->priority >= queue_priority) )
417     {
418       c++;
419       pos = pos->next;
420     }
421   if (c >= max_queue_size)
422     {
423       GNUNET_STATISTICS_update (h->stats,
424                                 gettext_noop ("# queue overflows"),
425                                 1,
426                                 GNUNET_NO);
427       return NULL;
428     }
429   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
430   ret->h = h;
431   ret->response_proc = response_proc;
432   ret->qc = *qc;
433   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
434   ret->priority = queue_priority;
435   ret->max_queue = max_queue_size;
436   ret->message_size = msize;
437   ret->was_transmitted = GNUNET_NO;
438   if (pos == NULL)
439     {
440       /* append at the tail */
441       pos = h->queue_tail;
442     }
443   else
444     {
445       pos = pos->prev; 
446       /* do not insert at HEAD if HEAD query was already
447          transmitted and we are still receiving replies! */
448       if ( (pos == NULL) &&
449            (h->queue_head->was_transmitted) )
450         pos = h->queue_head;
451     }
452   c++;
453   GNUNET_STATISTICS_update (h->stats,
454                             gettext_noop ("# queue entries created"),
455                             1,
456                             GNUNET_NO);
457   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
458                                      h->queue_tail,
459                                      pos,
460                                      ret);
461   h->queue_size++;
462   ret->task = GNUNET_SCHEDULER_add_delayed (timeout,
463                                             &timeout_queue_entry,
464                                             ret);
465   pos = ret->next;
466   while (pos != NULL) 
467     {
468       if (pos->max_queue < h->queue_size)
469         {
470           GNUNET_assert (pos->response_proc != NULL);
471           /* move 'pos' element to head so that it will be 
472              killed on 'NULL' call below */
473           GNUNET_CONTAINER_DLL_remove (h->queue_head,
474                                        h->queue_tail,
475                                        pos);
476           GNUNET_CONTAINER_DLL_insert (h->queue_head,
477                                        h->queue_tail,
478                                        pos);
479           pos->response_proc (h, NULL);
480           break;
481         }
482       pos = pos->next;
483     }
484   return ret;
485 }
486
487
488 /**
489  * Process entries in the queue (or do nothing if we are already
490  * doing so).
491  * 
492  * @param h handle to the datastore
493  */
494 static void
495 process_queue (struct GNUNET_DATASTORE_Handle *h);
496
497
498 /**
499  * Try reconnecting to the datastore service.
500  *
501  * @param cls the 'struct GNUNET_DATASTORE_Handle'
502  * @param tc scheduler context
503  */
504 static void
505 try_reconnect (void *cls,
506                const struct GNUNET_SCHEDULER_TaskContext *tc)
507 {
508   struct GNUNET_DATASTORE_Handle *h = cls;
509
510   if (h->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
511     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
512   else
513     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
514   if (h->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
515     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
516   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
517   h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
518   if (h->client == NULL)
519     {
520       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
521                   "DATASTORE reconnect failed (fatally)\n");
522       return;
523     }
524   GNUNET_STATISTICS_update (h->stats,
525                             gettext_noop ("# datastore connections (re)created"),
526                             1,
527                             GNUNET_NO);
528 #if DEBUG_DATASTORE
529   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
530               "Reconnected to DATASTORE\n");
531 #endif
532   process_queue (h);
533 }
534
535
536 /**
537  * Disconnect from the service and then try reconnecting to the datastore service
538  * after some delay.
539  *
540  * @param h handle to datastore to disconnect and reconnect
541  */
542 static void
543 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
544 {
545   if (h->client == NULL)
546     {
547 #if DEBUG_DATASTORE
548       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
549                   "client NULL in disconnect, will not try to reconnect\n");
550 #endif
551       return;
552     }
553 #if 0
554   GNUNET_STATISTICS_update (stats,
555                             gettext_noop ("# reconnected to DATASTORE"),
556                             1,
557                             GNUNET_NO);
558 #endif
559   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
560   h->skip_next_messages = 0;
561   h->client = NULL;
562   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_time,
563                                                     &try_reconnect,
564                                                     h);      
565 }
566
567
568 /**
569  * Transmit request from queue to datastore service.
570  *
571  * @param cls the 'struct GNUNET_DATASTORE_Handle'
572  * @param size number of bytes that can be copied to buf
573  * @param buf where to copy the drop message
574  * @return number of bytes written to buf
575  */
576 static size_t
577 transmit_request (void *cls,
578                   size_t size, 
579                   void *buf)
580 {
581   struct GNUNET_DATASTORE_Handle *h = cls;
582   struct GNUNET_DATASTORE_QueueEntry *qe;
583   size_t msize;
584
585   h->th = NULL;
586   if (NULL == (qe = h->queue_head))
587     return 0; /* no entry in queue */
588   if (buf == NULL)
589     {
590       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
591                   _("Failed to transmit request to DATASTORE.\n"));
592       GNUNET_STATISTICS_update (h->stats,
593                                 gettext_noop ("# transmission request failures"),
594                                 1,
595                                 GNUNET_NO);
596       do_disconnect (h);
597       return 0;
598     }
599   if (size < (msize = qe->message_size))
600     {
601       process_queue (h);
602       return 0;
603     }
604  #if DEBUG_DATASTORE
605   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
606               "Transmitting %u byte request to DATASTORE\n",
607               msize);
608 #endif
609   memcpy (buf, &qe[1], msize);
610   qe->was_transmitted = GNUNET_YES;
611   GNUNET_SCHEDULER_cancel (qe->task);
612   qe->task = GNUNET_SCHEDULER_NO_TASK;
613   GNUNET_assert (GNUNET_NO == h->in_receive);
614   h->in_receive = GNUNET_YES;
615   GNUNET_CLIENT_receive (h->client,
616                          qe->response_proc,
617                          h,
618                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
619   GNUNET_STATISTICS_update (h->stats,
620                             gettext_noop ("# bytes sent to datastore"),
621                             1,
622                             GNUNET_NO);
623   return msize;
624 }
625
626
627 /**
628  * Process entries in the queue (or do nothing if we are already
629  * doing so).
630  * 
631  * @param h handle to the datastore
632  */
633 static void
634 process_queue (struct GNUNET_DATASTORE_Handle *h)
635 {
636   struct GNUNET_DATASTORE_QueueEntry *qe;
637
638   if (NULL == (qe = h->queue_head))
639     {
640 #if DEBUG_DATASTORE > 1
641       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
642                   "Queue empty\n");
643 #endif
644       return; /* no entry in queue */
645     }
646   if (qe->was_transmitted == GNUNET_YES)
647     {
648 #if DEBUG_DATASTORE > 1
649       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
650                   "Head request already transmitted\n");
651 #endif
652       return; /* waiting for replies */
653     }
654   if (h->th != NULL)
655     {
656 #if DEBUG_DATASTORE > 1
657       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
658                   "Pending transmission request\n");
659 #endif
660       return; /* request pending */
661     }
662   if (h->client == NULL)
663     {
664 #if DEBUG_DATASTORE > 1
665       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
666                   "Not connected\n");
667 #endif
668       return; /* waiting for reconnect */
669     }
670   if (GNUNET_YES == h->in_receive)
671     {
672       /* wait for response to previous query */
673       return; 
674     }
675 #if DEBUG_DATASTORE
676   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
677               "Queueing %u byte request to DATASTORE\n",
678               qe->message_size);
679 #endif
680   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
681                                                qe->message_size,
682                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
683                                                GNUNET_YES,
684                                                &transmit_request,
685                                                h);
686   GNUNET_assert (GNUNET_NO == h->in_receive);
687   GNUNET_break (NULL != h->th);
688 }
689
690
691 /**
692  * Dummy continuation used to do nothing (but be non-zero).
693  *
694  * @param cls closure
695  * @param result result 
696  * @param emsg error message
697  */
698 static void
699 drop_status_cont (void *cls, int32_t result, const char *emsg)
700 {
701   /* do nothing */
702 }
703
704
705 /**
706  * Free a queue entry.  Removes the given entry from the
707  * queue and releases associated resources.  Does NOT
708  * call the callback.
709  * 
710  * @param qe entry to free.
711  */
712 static void
713 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
714 {
715   struct GNUNET_DATASTORE_Handle *h = qe->h;
716
717   GNUNET_CONTAINER_DLL_remove (h->queue_head,
718                                h->queue_tail,
719                                qe);
720   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
721     {
722       GNUNET_SCHEDULER_cancel (qe->task);
723       qe->task = GNUNET_SCHEDULER_NO_TASK;
724     }
725   h->queue_size--;
726   qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */
727   GNUNET_free (qe);
728 }
729
730
731 /**
732  * Type of a function to call when we receive a message
733  * from the service.
734  *
735  * @param cls closure
736  * @param msg message received, NULL on timeout or fatal error
737  */
738 static void 
739 process_status_message (void *cls,
740                         const struct
741                         GNUNET_MessageHeader * msg)
742 {
743   struct GNUNET_DATASTORE_Handle *h = cls;
744   struct GNUNET_DATASTORE_QueueEntry *qe;
745   struct StatusContext rc;
746   const struct StatusMessage *sm;
747   const char *emsg;
748   int32_t status;
749   int was_transmitted;
750
751   h->in_receive = GNUNET_NO;
752   if (h->skip_next_messages > 0)
753     {
754       h->skip_next_messages--;
755       process_queue (h);
756       return;
757    } 
758   if (NULL == (qe = h->queue_head))
759     {
760       GNUNET_break (0);
761       do_disconnect (h);
762       return;
763     }
764   rc = qe->qc.sc;
765   if (msg == NULL)
766     {      
767       was_transmitted = qe->was_transmitted;
768       free_queue_entry (qe);
769       if (NULL == h->client)
770         return; /* forced disconnect */
771       if (rc.cont != NULL)
772         rc.cont (rc.cont_cls, 
773                  GNUNET_SYSERR,
774                  _("Failed to receive status response from database."));
775       if (was_transmitted == GNUNET_YES)
776         do_disconnect (h);
777       else
778         process_queue (h);
779       return;
780     }
781   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
782   free_queue_entry (qe);
783   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
784        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
785     {
786       GNUNET_break (0);
787       h->retry_time = GNUNET_TIME_UNIT_ZERO;
788       do_disconnect (h);
789       if (rc.cont != NULL)
790         rc.cont (rc.cont_cls, 
791                  GNUNET_SYSERR,
792                  _("Error reading response from datastore service"));
793       return;
794     }
795   sm = (const struct StatusMessage*) msg;
796   status = ntohl(sm->status);
797   emsg = NULL;
798   if (ntohs(msg->size) > sizeof(struct StatusMessage))
799     {
800       emsg = (const char*) &sm[1];
801       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
802         {
803           GNUNET_break (0);
804           emsg = _("Invalid error message received from datastore service");
805         }
806     }  
807   if ( (status == GNUNET_SYSERR) &&
808        (emsg == NULL) )
809     {
810       GNUNET_break (0);
811       emsg = _("Invalid error message received from datastore service");
812     }
813 #if DEBUG_DATASTORE
814   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
815               "Received status %d/%s\n",
816               (int) status,
817               emsg);
818 #endif
819   GNUNET_STATISTICS_update (h->stats,
820                             gettext_noop ("# status messages received"),
821                             1,
822                             GNUNET_NO);
823   h->retry_time.rel_value = 0;
824   process_queue (h);
825   if (rc.cont != NULL)
826     rc.cont (rc.cont_cls, 
827              status,
828              emsg);
829 }
830
831
832 /**
833  * Store an item in the datastore.  If the item is already present,
834  * the priorities are summed up and the higher expiration time and
835  * lower anonymity level is used.
836  *
837  * @param h handle to the datastore
838  * @param rid reservation ID to use (from "reserve"); use 0 if no
839  *            prior reservation was made
840  * @param key key for the value
841  * @param size number of bytes in data
842  * @param data content stored
843  * @param type type of the content
844  * @param priority priority of the content
845  * @param anonymity anonymity-level for the content
846  * @param replication how often should the content be replicated to other peers?
847  * @param expiration expiration time for the content
848  * @param queue_priority ranking of this request in the priority queue
849  * @param max_queue_size at what queue size should this request be dropped
850  *        (if other requests of higher priority are in the queue)
851  * @param timeout timeout for the operation
852  * @param cont continuation to call when done
853  * @param cont_cls closure for cont
854  * @return NULL if the entry was not queued, otherwise a handle that can be used to
855  *         cancel; note that even if NULL is returned, the callback will be invoked
856  *         (or rather, will already have been invoked)
857  */
858 struct GNUNET_DATASTORE_QueueEntry *
859 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
860                       uint32_t rid,
861                       const GNUNET_HashCode * key,
862                       size_t size,
863                       const void *data,
864                       enum GNUNET_BLOCK_Type type,
865                       uint32_t priority,
866                       uint32_t anonymity,
867                       uint32_t replication,
868                       struct GNUNET_TIME_Absolute expiration,
869                       unsigned int queue_priority,
870                       unsigned int max_queue_size,
871                       struct GNUNET_TIME_Relative timeout,
872                       GNUNET_DATASTORE_ContinuationWithStatus cont,
873                       void *cont_cls)
874 {
875   struct GNUNET_DATASTORE_QueueEntry *qe;
876   struct DataMessage *dm;
877   size_t msize;
878   union QueueContext qc;
879
880 #if DEBUG_DATASTORE
881   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
882               "Asked to put %u bytes of data under key `%s' for %llu ms\n",
883               size,
884               GNUNET_h2s (key),
885               GNUNET_TIME_absolute_get_remaining (expiration).rel_value);
886 #endif
887   msize = sizeof(struct DataMessage) + size;
888   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
889   qc.sc.cont = cont;
890   qc.sc.cont_cls = cont_cls;
891   qe = make_queue_entry (h, msize,
892                          queue_priority, max_queue_size, timeout,
893                          &process_status_message, &qc);
894   if (qe == NULL)
895     {
896 #if DEBUG_DATASTORE
897       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
898                   "Could not create queue entry for PUT\n");
899 #endif
900       return NULL;
901     }
902   GNUNET_STATISTICS_update (h->stats,
903                             gettext_noop ("# PUT requests executed"),
904                             1,
905                             GNUNET_NO);
906   dm = (struct DataMessage* ) &qe[1];
907   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
908   dm->header.size = htons(msize);
909   dm->rid = htonl(rid);
910   dm->size = htonl( (uint32_t) size);
911   dm->type = htonl(type);
912   dm->priority = htonl(priority);
913   dm->anonymity = htonl(anonymity);
914   dm->replication = htonl (replication);
915   dm->reserved = htonl (0);
916   dm->uid = GNUNET_htonll(0);
917   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
918   dm->key = *key;
919   memcpy (&dm[1], data, size);
920   process_queue (h);
921   return qe;
922 }
923
924
925 /**
926  * Reserve space in the datastore.  This function should be used
927  * to avoid "out of space" failures during a longer sequence of "put"
928  * operations (for example, when a file is being inserted).
929  *
930  * @param h handle to the datastore
931  * @param amount how much space (in bytes) should be reserved (for content only)
932  * @param entries how many entries will be created (to calculate per-entry overhead)
933  * @param queue_priority ranking of this request in the priority queue
934  * @param max_queue_size at what queue size should this request be dropped
935  *        (if other requests of higher priority are in the queue)
936  * @param timeout how long to wait at most for a response (or before dying in queue)
937  * @param cont continuation to call when done; "success" will be set to
938  *             a positive reservation value if space could be reserved.
939  * @param cont_cls closure for cont
940  * @return NULL if the entry was not queued, otherwise a handle that can be used to
941  *         cancel; note that even if NULL is returned, the callback will be invoked
942  *         (or rather, will already have been invoked)
943  */
944 struct GNUNET_DATASTORE_QueueEntry *
945 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
946                           uint64_t amount,
947                           uint32_t entries,
948                           unsigned int queue_priority,
949                           unsigned int max_queue_size,
950                           struct GNUNET_TIME_Relative timeout,
951                           GNUNET_DATASTORE_ContinuationWithStatus cont,
952                           void *cont_cls)
953 {
954   struct GNUNET_DATASTORE_QueueEntry *qe;
955   struct ReserveMessage *rm;
956   union QueueContext qc;
957
958   if (cont == NULL)
959     cont = &drop_status_cont;
960 #if DEBUG_DATASTORE
961   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
962               "Asked to reserve %llu bytes of data and %u entries'\n",
963               (unsigned long long) amount,
964               (unsigned int) entries);
965 #endif
966   qc.sc.cont = cont;
967   qc.sc.cont_cls = cont_cls;
968   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
969                          queue_priority, max_queue_size, timeout,
970                          &process_status_message, &qc);
971   if (qe == NULL)
972     {
973 #if DEBUG_DATASTORE
974       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
975                   "Could not create queue entry to reserve\n");
976 #endif
977       return NULL;
978     }
979   GNUNET_STATISTICS_update (h->stats,
980                             gettext_noop ("# RESERVE requests executed"),
981                             1,
982                             GNUNET_NO);
983   rm = (struct ReserveMessage*) &qe[1];
984   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
985   rm->header.size = htons(sizeof (struct ReserveMessage));
986   rm->entries = htonl(entries);
987   rm->amount = GNUNET_htonll(amount);
988   process_queue (h);
989   return qe;
990 }
991
992
993 /**
994  * Signal that all of the data for which a reservation was made has
995  * been stored and that whatever excess space might have been reserved
996  * can now be released.
997  *
998  * @param h handle to the datastore
999  * @param rid reservation ID (value of "success" in original continuation
1000  *        from the "reserve" function).
1001  * @param queue_priority ranking of this request in the priority queue
1002  * @param max_queue_size at what queue size should this request be dropped
1003  *        (if other requests of higher priority are in the queue)
1004  * @param queue_priority ranking of this request in the priority queue
1005  * @param max_queue_size at what queue size should this request be dropped
1006  *        (if other requests of higher priority are in the queue)
1007  * @param timeout how long to wait at most for a response
1008  * @param cont continuation to call when done
1009  * @param cont_cls closure for cont
1010  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1011  *         cancel; note that even if NULL is returned, the callback will be invoked
1012  *         (or rather, will already have been invoked)
1013  */
1014 struct GNUNET_DATASTORE_QueueEntry *
1015 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1016                                   uint32_t rid,
1017                                   unsigned int queue_priority,
1018                                   unsigned int max_queue_size,
1019                                   struct GNUNET_TIME_Relative timeout,
1020                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
1021                                   void *cont_cls)
1022 {
1023   struct GNUNET_DATASTORE_QueueEntry *qe;
1024   struct ReleaseReserveMessage *rrm;
1025   union QueueContext qc;
1026
1027   if (cont == NULL)
1028     cont = &drop_status_cont;
1029 #if DEBUG_DATASTORE
1030   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1031               "Asked to release reserve %d\n",
1032               rid);
1033 #endif
1034   qc.sc.cont = cont;
1035   qc.sc.cont_cls = cont_cls;
1036   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
1037                          queue_priority, max_queue_size, timeout,
1038                          &process_status_message, &qc);
1039   if (qe == NULL)
1040     {
1041 #if DEBUG_DATASTORE
1042       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1043                   "Could not create queue entry to release reserve\n");
1044 #endif
1045       return NULL;
1046     }
1047   GNUNET_STATISTICS_update (h->stats,
1048                             gettext_noop ("# RELEASE RESERVE requests executed"),
1049                             1,
1050                             GNUNET_NO);
1051   rrm = (struct ReleaseReserveMessage*) &qe[1];
1052   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1053   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
1054   rrm->rid = htonl(rid);
1055   process_queue (h);
1056   return qe;
1057 }
1058
1059
1060 /**
1061  * Update a value in the datastore.
1062  *
1063  * @param h handle to the datastore
1064  * @param uid identifier for the value
1065  * @param priority how much to increase the priority of the value
1066  * @param expiration new expiration value should be MAX of existing and this argument
1067  * @param queue_priority ranking of this request in the priority queue
1068  * @param max_queue_size at what queue size should this request be dropped
1069  *        (if other requests of higher priority are in the queue)
1070  * @param timeout how long to wait at most for a response
1071  * @param cont continuation to call when done
1072  * @param cont_cls closure for cont
1073  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1074  *         cancel; note that even if NULL is returned, the callback will be invoked
1075  *         (or rather, will already have been invoked)
1076  */
1077 struct GNUNET_DATASTORE_QueueEntry *
1078 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1079                          uint64_t uid,
1080                          uint32_t priority,
1081                          struct GNUNET_TIME_Absolute expiration,
1082                          unsigned int queue_priority,
1083                          unsigned int max_queue_size,
1084                          struct GNUNET_TIME_Relative timeout,
1085                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1086                          void *cont_cls)
1087 {
1088   struct GNUNET_DATASTORE_QueueEntry *qe;
1089   struct UpdateMessage *um;
1090   union QueueContext qc;
1091
1092   if (cont == NULL)
1093     cont = &drop_status_cont;
1094 #if DEBUG_DATASTORE
1095   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1096               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
1097               uid,
1098               (unsigned int) priority,
1099               (unsigned long long) expiration.abs_value);
1100 #endif
1101   qc.sc.cont = cont;
1102   qc.sc.cont_cls = cont_cls;
1103   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
1104                          queue_priority, max_queue_size, timeout,
1105                          &process_status_message, &qc);
1106   if (qe == NULL)
1107     {
1108 #if DEBUG_DATASTORE
1109       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1110                   "Could not create queue entry for UPDATE\n");
1111 #endif
1112       return NULL;
1113     }
1114   GNUNET_STATISTICS_update (h->stats,
1115                             gettext_noop ("# UPDATE requests executed"),
1116                             1,
1117                             GNUNET_NO);
1118   um = (struct UpdateMessage*) &qe[1];
1119   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1120   um->header.size = htons(sizeof (struct UpdateMessage));
1121   um->priority = htonl(priority);
1122   um->expiration = GNUNET_TIME_absolute_hton(expiration);
1123   um->uid = GNUNET_htonll(uid);
1124   process_queue (h);
1125   return qe;
1126 }
1127
1128
1129 /**
1130  * Explicitly remove some content from the database.
1131  * The "cont"inuation will be called with status
1132  * "GNUNET_OK" if content was removed, "GNUNET_NO"
1133  * if no matching entry was found and "GNUNET_SYSERR"
1134  * on all other types of errors.
1135  *
1136  * @param h handle to the datastore
1137  * @param key key for the value
1138  * @param size number of bytes in data
1139  * @param data content stored
1140  * @param queue_priority ranking of this request in the priority queue
1141  * @param max_queue_size at what queue size should this request be dropped
1142  *        (if other requests of higher priority are in the queue)
1143  * @param timeout how long to wait at most for a response
1144  * @param cont continuation to call when done
1145  * @param cont_cls closure for cont
1146  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1147  *         cancel; note that even if NULL is returned, the callback will be invoked
1148  *         (or rather, will already have been invoked)
1149  */
1150 struct GNUNET_DATASTORE_QueueEntry *
1151 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1152                          const GNUNET_HashCode *key,
1153                          size_t size,
1154                          const void *data,
1155                          unsigned int queue_priority,
1156                          unsigned int max_queue_size,
1157                          struct GNUNET_TIME_Relative timeout,
1158                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1159                          void *cont_cls)
1160 {
1161   struct GNUNET_DATASTORE_QueueEntry *qe;
1162   struct DataMessage *dm;
1163   size_t msize;
1164   union QueueContext qc;
1165
1166   if (cont == NULL)
1167     cont = &drop_status_cont;
1168 #if DEBUG_DATASTORE
1169   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1170               "Asked to remove %u bytes under key `%s'\n",
1171               size,
1172               GNUNET_h2s (key));
1173 #endif
1174   qc.sc.cont = cont;
1175   qc.sc.cont_cls = cont_cls;
1176   msize = sizeof(struct DataMessage) + size;
1177   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1178   qe = make_queue_entry (h, msize,
1179                          queue_priority, max_queue_size, timeout,
1180                          &process_status_message, &qc);
1181   if (qe == NULL)
1182     {
1183 #if DEBUG_DATASTORE
1184       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1185                   "Could not create queue entry for REMOVE\n");
1186 #endif
1187       return NULL;
1188     }
1189   GNUNET_STATISTICS_update (h->stats,
1190                             gettext_noop ("# REMOVE requests executed"),
1191                             1,
1192                             GNUNET_NO);
1193   dm = (struct DataMessage*) &qe[1];
1194   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1195   dm->header.size = htons(msize);
1196   dm->rid = htonl(0);
1197   dm->size = htonl(size);
1198   dm->type = htonl(0);
1199   dm->priority = htonl(0);
1200   dm->anonymity = htonl(0);
1201   dm->uid = GNUNET_htonll(0);
1202   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
1203   dm->key = *key;
1204   memcpy (&dm[1], data, size);
1205   process_queue (h);
1206   return qe;
1207 }
1208
1209
1210 /**
1211  * Type of a function to call when we receive a message
1212  * from the service.
1213  *
1214  * @param cls closure
1215  * @param msg message received, NULL on timeout or fatal error
1216  */
1217 static void 
1218 process_result_message (void *cls,
1219                         const struct GNUNET_MessageHeader *msg)
1220 {
1221   struct GNUNET_DATASTORE_Handle *h = cls;
1222   struct GNUNET_DATASTORE_QueueEntry *qe;
1223   struct ResultContext rc;
1224   const struct DataMessage *dm;
1225
1226   h->in_receive = GNUNET_NO;
1227   if (h->skip_next_messages > 0)
1228     {
1229       h->skip_next_messages--;
1230       process_queue (h);
1231       return;
1232     }
1233   if (msg == NULL)
1234     {
1235       qe = h->queue_head;
1236       GNUNET_assert (NULL != qe);
1237       if (qe->was_transmitted == GNUNET_YES)
1238         {
1239           rc = qe->qc.rc;
1240           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1241                       _("Failed to receive response from database.\n"));
1242           do_disconnect (h);
1243           free_queue_entry (qe);
1244           if (rc.proc != NULL)
1245             rc.proc (rc.proc_cls,
1246                      NULL, 0, NULL, 0, 0, 0, 
1247                      GNUNET_TIME_UNIT_ZERO_ABS, 0);    
1248         }
1249       else
1250         process_queue (h);
1251       return;
1252     }
1253   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
1254     {
1255       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1256       qe = h->queue_head;
1257       rc = qe->qc.rc;
1258       GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1259       free_queue_entry (qe);
1260 #if DEBUG_DATASTORE
1261       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1262                   "Received end of result set, new queue size is %u\n",
1263                   h->queue_size);
1264 #endif
1265       if (rc.proc != NULL)
1266         rc.proc (rc.proc_cls,
1267                  NULL, 0, NULL, 0, 0, 0, 
1268                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1269       h->retry_time.rel_value = 0;
1270       h->result_count = 0;
1271       process_queue (h);
1272       return;
1273     }
1274   qe = h->queue_head;
1275   rc = qe->qc.rc;
1276   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1277   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1278        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1279        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1280     {
1281       GNUNET_break (0);
1282       free_queue_entry (qe);
1283       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1284       do_disconnect (h);
1285       if (rc.proc != NULL)
1286         rc.proc (rc.proc_cls,
1287                  NULL, 0, NULL, 0, 0, 0, 
1288                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
1289       return;
1290     }
1291   GNUNET_STATISTICS_update (h->stats,
1292                             gettext_noop ("# Results received"),
1293                             1,
1294                             GNUNET_NO);
1295   dm = (const struct DataMessage*) msg;
1296 #if DEBUG_DATASTORE
1297   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1298               "Received result %llu with type %u and size %u with key %s\n",
1299               (unsigned long long) GNUNET_ntohll(dm->uid),
1300               ntohl(dm->type),
1301               ntohl(dm->size),
1302               GNUNET_h2s(&dm->key));
1303 #endif
1304   free_queue_entry (qe);
1305   h->retry_time.rel_value = 0;
1306   process_queue (h);
1307   if (rc.proc != NULL)
1308     rc.proc (rc.proc_cls,
1309              &dm->key,
1310              ntohl(dm->size),
1311              &dm[1],
1312              ntohl(dm->type),
1313              ntohl(dm->priority),
1314              ntohl(dm->anonymity),
1315              GNUNET_TIME_absolute_ntoh(dm->expiration), 
1316              GNUNET_ntohll(dm->uid));
1317 }
1318
1319
1320 /**
1321  * Get a random value from the datastore for content replication.
1322  * Returns a single, random value among those with the highest
1323  * replication score, lowering positive replication scores by one for
1324  * the chosen value (if only content with a replication score exists,
1325  * a random value is returned and replication scores are not changed).
1326  *
1327  * @param h handle to the datastore
1328  * @param queue_priority ranking of this request in the priority queue
1329  * @param max_queue_size at what queue size should this request be dropped
1330  *        (if other requests of higher priority are in the queue)
1331  * @param timeout how long to wait at most for a response
1332  * @param proc function to call on a random value; it
1333  *        will be called once with a value (if available)
1334  *        and always once with a value of NULL.
1335  * @param proc_cls closure for proc
1336  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1337  *         cancel
1338  */
1339 struct GNUNET_DATASTORE_QueueEntry *
1340 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1341                                       unsigned int queue_priority,
1342                                       unsigned int max_queue_size,
1343                                       struct GNUNET_TIME_Relative timeout,
1344                                       GNUNET_DATASTORE_DatumProcessor proc, 
1345                                       void *proc_cls)
1346 {
1347   struct GNUNET_DATASTORE_QueueEntry *qe;
1348   struct GNUNET_MessageHeader *m;
1349   union QueueContext qc;
1350
1351   GNUNET_assert (NULL != proc);
1352 #if DEBUG_DATASTORE
1353   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1354               "Asked to get replication entry in %llu ms\n",
1355               (unsigned long long) timeout.rel_value);
1356 #endif
1357   qc.rc.proc = proc;
1358   qc.rc.proc_cls = proc_cls;
1359   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1360                          queue_priority, max_queue_size, timeout,
1361                          &process_result_message, &qc);
1362   if (qe == NULL)
1363     {
1364 #if DEBUG_DATASTORE
1365       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1366                   "Could not create queue entry for GET REPLICATION\n");
1367 #endif
1368       return NULL;    
1369     }
1370   GNUNET_STATISTICS_update (h->stats,
1371                             gettext_noop ("# GET REPLICATION requests executed"),
1372                             1,
1373                             GNUNET_NO);
1374   m = (struct GNUNET_MessageHeader*) &qe[1];
1375   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1376   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1377   process_queue (h);
1378   return qe;
1379 }
1380
1381
1382 /**
1383  * Get a single zero-anonymity value from the datastore.
1384  *
1385  * @param h handle to the datastore
1386  * @param offset offset of the result (mod #num-results); set to
1387  *               a random 64-bit value initially; then increment by
1388  *               one each time; detect that all results have been found by uid
1389  *               being again the first uid ever returned.
1390  * @param queue_priority ranking of this request in the priority queue
1391  * @param max_queue_size at what queue size should this request be dropped
1392  *        (if other requests of higher priority are in the queue)
1393  * @param timeout how long to wait at most for a response
1394  * @param type allowed type for the operation (never zero)
1395  * @param proc function to call on a random value; it
1396  *        will be called once with a value (if available)
1397  *        or with NULL if none value exists.
1398  * @param proc_cls closure for proc
1399  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1400  *         cancel
1401  */
1402 struct GNUNET_DATASTORE_QueueEntry *
1403 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1404                                      uint64_t offset,
1405                                      unsigned int queue_priority,
1406                                      unsigned int max_queue_size,
1407                                      struct GNUNET_TIME_Relative timeout,
1408                                      enum GNUNET_BLOCK_Type type,
1409                                      GNUNET_DATASTORE_DatumProcessor proc, 
1410                                      void *proc_cls)
1411 {
1412   struct GNUNET_DATASTORE_QueueEntry *qe;
1413   struct GetZeroAnonymityMessage *m;
1414   union QueueContext qc;
1415
1416   GNUNET_assert (NULL != proc);
1417   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1418 #if DEBUG_DATASTORE
1419   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1420               "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n",
1421               (unsigned long long) offset,
1422               type,
1423               (unsigned long long) timeout.rel_value);
1424 #endif
1425   qc.rc.proc = proc;
1426   qc.rc.proc_cls = proc_cls;
1427   qe = make_queue_entry (h, sizeof(struct GetZeroAnonymityMessage),
1428                          queue_priority, max_queue_size, timeout,
1429                          &process_result_message, &qc);
1430   if (qe == NULL)
1431     {
1432 #if DEBUG_DATASTORE
1433       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1434                   "Could not create queue entry for zero-anonymity procation\n");
1435 #endif
1436       return NULL;    
1437     }
1438   GNUNET_STATISTICS_update (h->stats,
1439                             gettext_noop ("# GET ZERO ANONYMITY requests executed"),
1440                             1,
1441                             GNUNET_NO);
1442   m = (struct GetZeroAnonymityMessage*) &qe[1];
1443   m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1444   m->header.size = htons(sizeof (struct GetZeroAnonymityMessage));
1445   m->type = htonl ((uint32_t) type);
1446   m->offset = GNUNET_htonll (offset);
1447   process_queue (h);
1448   return qe;
1449 }
1450
1451
1452 /**
1453  * Get a result for a particular key from the datastore.  The processor
1454  * will only be called once.
1455  *
1456  * @param h handle to the datastore
1457  * @param offset offset of the result (mod #num-results); set to
1458  *               a random 64-bit value initially; then increment by
1459  *               one each time; detect that all results have been found by uid
1460  *               being again the first uid ever returned.
1461  * @param key maybe NULL (to match all entries)
1462  * @param type desired type, 0 for any
1463  * @param queue_priority ranking of this request in the priority queue
1464  * @param max_queue_size at what queue size should this request be dropped
1465  *        (if other requests of higher priority are in the queue)
1466  * @param timeout how long to wait at most for a response
1467  * @param proc function to call on each matching value;
1468  *        will be called once with a NULL value at the end
1469  * @param proc_cls closure for proc
1470  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1471  *         cancel
1472  */
1473 struct GNUNET_DATASTORE_QueueEntry *
1474 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1475                           uint64_t offset,
1476                           const GNUNET_HashCode * key,
1477                           enum GNUNET_BLOCK_Type type,
1478                           unsigned int queue_priority,
1479                           unsigned int max_queue_size,
1480                           struct GNUNET_TIME_Relative timeout,
1481                           GNUNET_DATASTORE_DatumProcessor proc, 
1482                           void *proc_cls)
1483 {
1484   struct GNUNET_DATASTORE_QueueEntry *qe;
1485   struct GetMessage *gm;
1486   union QueueContext qc;
1487
1488   GNUNET_assert (NULL != proc);
1489 #if DEBUG_DATASTORE
1490   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1491               "Asked to look for data of type %u under key `%s'\n",
1492               (unsigned int) type,
1493               GNUNET_h2s (key));
1494 #endif
1495   qc.rc.proc = proc;
1496   qc.rc.proc_cls = proc_cls;
1497   qe = make_queue_entry (h, sizeof(struct GetMessage),
1498                          queue_priority, max_queue_size, timeout,
1499                          &process_result_message, &qc);
1500   if (qe == NULL)
1501     {
1502 #if DEBUG_DATASTORE
1503       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1504                   "Could not queue request for `%s'\n",
1505                   GNUNET_h2s (key));
1506 #endif
1507       return NULL;
1508     }
1509   GNUNET_STATISTICS_update (h->stats,
1510                             gettext_noop ("# GET requests executed"),
1511                             1,
1512                             GNUNET_NO);
1513   gm = (struct GetMessage*) &qe[1];
1514   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1515   gm->type = htonl(type);
1516   gm->offset = GNUNET_htonll (offset);
1517   if (key != NULL)
1518     {
1519       gm->header.size = htons(sizeof (struct GetMessage));
1520       gm->key = *key;
1521     }
1522   else
1523     {
1524       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1525     }
1526   process_queue (h);
1527   return qe;
1528 }
1529
1530
1531 /**
1532  * Cancel a datastore operation.  The final callback from the
1533  * operation must not have been done yet.
1534  * 
1535  * @param qe operation to cancel
1536  */
1537 void
1538 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1539 {
1540   struct GNUNET_DATASTORE_Handle *h;
1541
1542   GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted);
1543   h = qe->h;
1544 #if DEBUG_DATASTORE
1545   GNUNET_log  (GNUNET_ERROR_TYPE_DEBUG,
1546                "Pending DATASTORE request %p cancelled (%d, %d)\n",
1547                qe,
1548                qe->was_transmitted,
1549                h->queue_head == qe);
1550 #endif
1551   if (GNUNET_YES == qe->was_transmitted) 
1552     {
1553       free_queue_entry (qe);
1554       h->skip_next_messages++;
1555       return;
1556     }
1557   free_queue_entry (qe);
1558   process_queue (h);
1559 }
1560
1561
1562 /* end of datastore_api.c */