6bf9abacaa488188747bf85f2983538a0696ad79
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
33
34 /**
35  * If a client stopped asking for more results, how many more do
36  * we receive from the DB before killing the connection?  Trade-off
37  * between re-doing TCP handshakes and (needlessly) receiving 
38  * useless results.
39  */
40 #define MAX_EXCESS_RESULTS 8
41
42 /**
43  * Context for processing status messages.
44  */
45 struct StatusContext
46 {
47   /**
48    * Continuation to call with the status.
49    */
50   GNUNET_DATASTORE_ContinuationWithStatus cont;
51
52   /**
53    * Closure for cont.
54    */
55   void *cont_cls;
56
57 };
58
59
60 /**
61  * Context for processing result messages.
62  */
63 struct ResultContext
64 {
65   /**
66    * Function to call with the result.
67    */
68   GNUNET_DATASTORE_DatumProcessor proc;
69
70   /**
71    * Closure for proc.
72    */
73   void *proc_cls;
74
75 };
76
77
78 /**
79  *  Context for a queue operation.
80  */
81 union QueueContext
82 {
83
84   struct StatusContext sc;
85   
86   struct ResultContext rc;
87
88 };
89
90
91
92 /**
93  * Entry in our priority queue.
94  */
95 struct GNUNET_DATASTORE_QueueEntry
96 {
97
98   /**
99    * This is a linked list.
100    */
101   struct GNUNET_DATASTORE_QueueEntry *next;
102
103   /**
104    * This is a linked list.
105    */
106   struct GNUNET_DATASTORE_QueueEntry *prev;
107
108   /**
109    * Handle to the master context.
110    */
111   struct GNUNET_DATASTORE_Handle *h;
112
113   /**
114    * Response processor (NULL if we are not waiting for a response).
115    * This struct should be used for the closure, function-specific
116    * arguments can be passed via 'qc'.
117    */
118   GNUNET_CLIENT_MessageHandler response_proc;
119
120   /**
121    * Function to call after transmission of the request.
122    */
123   GNUNET_DATASTORE_ContinuationWithStatus cont;
124    
125   /**
126    * Closure for 'cont'.
127    */
128   void *cont_cls;
129
130   /**
131    * Context for the operation.
132    */
133   union QueueContext qc;
134
135   /**
136    * Task for timeout signalling.
137    */
138   GNUNET_SCHEDULER_TaskIdentifier task;
139
140   /**
141    * Timeout for the current operation.
142    */
143   struct GNUNET_TIME_Absolute timeout;
144
145   /**
146    * Priority in the queue.
147    */
148   unsigned int priority;
149
150   /**
151    * Maximum allowed length of queue (otherwise
152    * this request should be discarded).
153    */
154   unsigned int max_queue;
155
156   /**
157    * Number of bytes in the request message following
158    * this struct.  32-bit value for nicer memory
159    * access (and overall struct alignment).
160    */
161   uint32_t message_size;
162
163   /**
164    * Has this message been transmitted to the service?
165    * Only ever GNUNET_YES for the head of the queue.
166    * Note that the overall struct should end at a 
167    * multiple of 64 bits.
168    */
169   int was_transmitted;
170   
171 };
172
173 /**
174  * Handle to the datastore service. 
175  */
176 struct GNUNET_DATASTORE_Handle
177 {
178
179   /**
180    * Our configuration.
181    */
182   const struct GNUNET_CONFIGURATION_Handle *cfg;
183
184   /**
185    * Current connection to the datastore service.
186    */
187   struct GNUNET_CLIENT_Connection *client;
188
189   /**
190    * Handle for statistics.
191    */
192   struct GNUNET_STATISTICS_Handle *stats;
193
194   /**
195    * Current transmit handle.
196    */
197   struct GNUNET_CLIENT_TransmitHandle *th;
198
199   /**
200    * Current head of priority queue.
201    */
202   struct GNUNET_DATASTORE_QueueEntry *queue_head;
203
204   /**
205    * Current tail of priority queue.
206    */
207   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
208
209   /**
210    * Task for trying to reconnect.
211    */
212   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
213
214   /**
215    * How quickly should we retry?  Used for exponential back-off on
216    * connect-errors.
217    */
218   struct GNUNET_TIME_Relative retry_time;
219
220   /**
221    * Number of entries in the queue.
222    */
223   unsigned int queue_size;
224
225   /**
226    * Number of results we're receiving for the current query
227    * after application stopped to care.  Used to determine when
228    * to reset the connection.
229    */
230   unsigned int result_count;
231
232   /**
233    * Are we currently trying to receive from the service?
234    */
235   int in_receive;
236
237   /**
238    * We should ignore the next message(s) from the service.
239    */
240   unsigned int skip_next_messages;
241
242 };
243
244
245
246 /**
247  * Connect to the datastore service.
248  *
249  * @param cfg configuration to use
250  * @return handle to use to access the service
251  */
252 struct GNUNET_DATASTORE_Handle *
253 GNUNET_DATASTORE_connect (const struct
254                           GNUNET_CONFIGURATION_Handle
255                           *cfg)
256 {
257   struct GNUNET_CLIENT_Connection *c;
258   struct GNUNET_DATASTORE_Handle *h;
259   
260   c = GNUNET_CLIENT_connect ("datastore", cfg);
261   if (c == NULL)
262     return NULL; /* oops */
263   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
264                      GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
265   h->client = c;
266   h->cfg = cfg;
267   h->stats = GNUNET_STATISTICS_create ("datastore-api",
268                                        cfg);
269   return h;
270 }
271
272
273 /**
274  * Transmit DROP message to datastore service.
275  *
276  * @param cls the 'struct GNUNET_DATASTORE_Handle'
277  * @param size number of bytes that can be copied to buf
278  * @param buf where to copy the drop message
279  * @return number of bytes written to buf
280  */
281 static size_t
282 transmit_drop (void *cls,
283                size_t size, 
284                void *buf)
285 {
286   struct GNUNET_DATASTORE_Handle *h = cls;
287   struct GNUNET_MessageHeader *hdr;
288   
289   if (buf == NULL)
290     {
291       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
292                   _("Failed to transmit request to drop database.\n"));
293       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
294       return 0;
295     }
296   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
297   hdr = buf;
298   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
299   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
300   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
301   return sizeof(struct GNUNET_MessageHeader);
302 }
303
304
305 /**
306  * Disconnect from the datastore service (and free
307  * associated resources).
308  *
309  * @param h handle to the datastore
310  * @param drop set to GNUNET_YES to delete all data in datastore (!)
311  */
312 void
313 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
314                              int drop)
315 {
316   struct GNUNET_DATASTORE_QueueEntry *qe;
317
318   if (h->client != NULL)
319     {
320       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
321       h->client = NULL;
322     }
323   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
324     {
325       GNUNET_SCHEDULER_cancel (h->reconnect_task);
326       h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
327     }
328   if (NULL != h->th)
329     {
330       GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
331       h->th = NULL;
332     }
333   while (NULL != (qe = h->queue_head))
334     {
335       GNUNET_assert (NULL != qe->response_proc);
336       qe->response_proc (h, NULL);
337     }
338   if (GNUNET_YES == drop) 
339     {
340       h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
341       if (h->client != NULL)
342         {
343           if (NULL != 
344               GNUNET_CLIENT_notify_transmit_ready (h->client,
345                                                    sizeof(struct GNUNET_MessageHeader),
346                                                    GNUNET_TIME_UNIT_MINUTES,
347                                                    GNUNET_YES,
348                                                    &transmit_drop,
349                                                    h))
350             return;
351           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
352           h->client = NULL;
353         }
354       GNUNET_break (0);
355     }
356   GNUNET_STATISTICS_destroy (h->stats,
357                              GNUNET_NO);
358   h->stats = NULL;
359   GNUNET_free (h);
360 }
361
362
363 /**
364  * A request has timed out (before being transmitted to the service).
365  *
366  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
367  * @param tc scheduler context
368  */
369 static void
370 timeout_queue_entry (void *cls,
371                      const struct GNUNET_SCHEDULER_TaskContext *tc)
372 {
373   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
374   int in_receive;
375
376   GNUNET_STATISTICS_update (qe->h->stats,
377                             gettext_noop ("# queue entry timeouts"),
378                             1,
379                             GNUNET_NO);
380   qe->task = GNUNET_SCHEDULER_NO_TASK;
381   GNUNET_assert (qe->was_transmitted == GNUNET_NO); 
382   in_receive = qe->h->in_receive;
383   qe->response_proc (qe->h, NULL);
384   qe->h->in_receive = in_receive;
385 }
386
387
388 /**
389  * Create a new entry for our priority queue (and possibly discard other entires if
390  * the queue is getting too long).
391  *
392  * @param h handle to the datastore
393  * @param msize size of the message to queue
394  * @param queue_priority priority of the entry
395  * @param max_queue_size at what queue size should this request be dropped
396  *        (if other requests of higher priority are in the queue)
397  * @param timeout timeout for the operation
398  * @param response_proc function to call with replies (can be NULL)
399  * @param qc client context (NOT a closure for response_proc)
400  * @return NULL if the queue is full 
401  */
402 static struct GNUNET_DATASTORE_QueueEntry *
403 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
404                   size_t msize,
405                   unsigned int queue_priority,
406                   unsigned int max_queue_size,
407                   struct GNUNET_TIME_Relative timeout,
408                   GNUNET_CLIENT_MessageHandler response_proc,            
409                   const union QueueContext *qc)
410 {
411   struct GNUNET_DATASTORE_QueueEntry *ret;
412   struct GNUNET_DATASTORE_QueueEntry *pos;
413   unsigned int c;
414   int in_receive;
415
416   c = 0;
417   pos = h->queue_head;
418   while ( (pos != NULL) &&
419           (c < max_queue_size) &&
420           (pos->priority >= queue_priority) )
421     {
422       c++;
423       pos = pos->next;
424     }
425   if (c >= max_queue_size)
426     {
427       GNUNET_STATISTICS_update (h->stats,
428                                 gettext_noop ("# queue overflows"),
429                                 1,
430                                 GNUNET_NO);
431       return NULL;
432     }
433   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
434   ret->h = h;
435   ret->response_proc = response_proc;
436   ret->qc = *qc;
437   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
438   ret->priority = queue_priority;
439   ret->max_queue = max_queue_size;
440   ret->message_size = msize;
441   ret->was_transmitted = GNUNET_NO;
442   if (pos == NULL)
443     {
444       /* append at the tail */
445       pos = h->queue_tail;
446     }
447   else
448     {
449       pos = pos->prev; 
450       /* do not insert at HEAD if HEAD query was already
451          transmitted and we are still receiving replies! */
452       if ( (pos == NULL) &&
453            (h->queue_head->was_transmitted) )
454         pos = h->queue_head;
455     }
456   c++;
457   GNUNET_STATISTICS_update (h->stats,
458                             gettext_noop ("# queue entries created"),
459                             1,
460                             GNUNET_NO);
461   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
462                                      h->queue_tail,
463                                      pos,
464                                      ret);
465   h->queue_size++;
466   ret->task = GNUNET_SCHEDULER_add_delayed (timeout,
467                                             &timeout_queue_entry,
468                                             ret);
469   in_receive = h->in_receive;
470   pos = ret->next;
471   while (pos != NULL) 
472     {
473       if ( (pos->max_queue < h->queue_size) &&
474            (pos->was_transmitted == GNUNET_NO) )
475         {
476           GNUNET_assert (pos->response_proc != NULL);
477           /* move 'pos' element to head so that it will be 
478              killed on 'NULL' call below */
479           GNUNET_CONTAINER_DLL_remove (h->queue_head,
480                                        h->queue_tail,
481                                        pos);
482           GNUNET_CONTAINER_DLL_insert (h->queue_head,
483                                        h->queue_tail,
484                                        pos);
485           GNUNET_STATISTICS_update (h->stats,
486                                     gettext_noop ("# Requests dropped from datastore queue"),
487                                     1,
488                                     GNUNET_NO);
489           pos->response_proc (h, NULL);
490           break;
491         }
492       pos = pos->next;
493     }
494   h->in_receive = in_receive;
495   return ret;
496 }
497
498
499 /**
500  * Process entries in the queue (or do nothing if we are already
501  * doing so).
502  * 
503  * @param h handle to the datastore
504  */
505 static void
506 process_queue (struct GNUNET_DATASTORE_Handle *h);
507
508
509 /**
510  * Try reconnecting to the datastore service.
511  *
512  * @param cls the 'struct GNUNET_DATASTORE_Handle'
513  * @param tc scheduler context
514  */
515 static void
516 try_reconnect (void *cls,
517                const struct GNUNET_SCHEDULER_TaskContext *tc)
518 {
519   struct GNUNET_DATASTORE_Handle *h = cls;
520
521   if (h->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
522     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
523   else
524     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
525   if (h->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
526     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
527   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
528   h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
529   if (h->client == NULL)
530     {
531       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
532                   "DATASTORE reconnect failed (fatally)\n");
533       return;
534     }
535   GNUNET_STATISTICS_update (h->stats,
536                             gettext_noop ("# datastore connections (re)created"),
537                             1,
538                             GNUNET_NO);
539 #if DEBUG_DATASTORE
540   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
541               "Reconnected to DATASTORE\n");
542 #endif
543   process_queue (h);
544 }
545
546
547 /**
548  * Disconnect from the service and then try reconnecting to the datastore service
549  * after some delay.
550  *
551  * @param h handle to datastore to disconnect and reconnect
552  */
553 static void
554 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
555 {
556   if (h->client == NULL)
557     {
558 #if DEBUG_DATASTORE
559       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
560                   "client NULL in disconnect, will not try to reconnect\n");
561 #endif
562       return;
563     }
564 #if 0
565   GNUNET_STATISTICS_update (stats,
566                             gettext_noop ("# reconnected to DATASTORE"),
567                             1,
568                             GNUNET_NO);
569 #endif
570   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
571   h->skip_next_messages = 0;
572   h->client = NULL;
573   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_time,
574                                                     &try_reconnect,
575                                                     h);      
576 }
577
578
579 /**
580  * Transmit request from queue to datastore service.
581  *
582  * @param cls the 'struct GNUNET_DATASTORE_Handle'
583  * @param size number of bytes that can be copied to buf
584  * @param buf where to copy the drop message
585  * @return number of bytes written to buf
586  */
587 static size_t
588 transmit_request (void *cls,
589                   size_t size, 
590                   void *buf)
591 {
592   struct GNUNET_DATASTORE_Handle *h = cls;
593   struct GNUNET_DATASTORE_QueueEntry *qe;
594   size_t msize;
595
596   h->th = NULL;
597   if (NULL == (qe = h->queue_head))
598     return 0; /* no entry in queue */
599   if (buf == NULL)
600     {
601       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
602                   _("Failed to transmit request to DATASTORE.\n"));
603       GNUNET_STATISTICS_update (h->stats,
604                                 gettext_noop ("# transmission request failures"),
605                                 1,
606                                 GNUNET_NO);
607       do_disconnect (h);
608       return 0;
609     }
610   if (size < (msize = qe->message_size))
611     {
612       process_queue (h);
613       return 0;
614     }
615  #if DEBUG_DATASTORE
616   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
617               "Transmitting %u byte request to DATASTORE\n",
618               msize);
619 #endif
620   memcpy (buf, &qe[1], msize);
621   qe->was_transmitted = GNUNET_YES;
622   GNUNET_SCHEDULER_cancel (qe->task);
623   qe->task = GNUNET_SCHEDULER_NO_TASK;
624   GNUNET_assert (GNUNET_NO == h->in_receive);
625   h->in_receive = GNUNET_YES;
626   GNUNET_CLIENT_receive (h->client,
627                          qe->response_proc,
628                          h,
629                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
630   GNUNET_STATISTICS_update (h->stats,
631                             gettext_noop ("# bytes sent to datastore"),
632                             1,
633                             GNUNET_NO);
634   return msize;
635 }
636
637
638 /**
639  * Process entries in the queue (or do nothing if we are already
640  * doing so).
641  * 
642  * @param h handle to the datastore
643  */
644 static void
645 process_queue (struct GNUNET_DATASTORE_Handle *h)
646 {
647   struct GNUNET_DATASTORE_QueueEntry *qe;
648
649   if (NULL == (qe = h->queue_head))
650     {
651 #if DEBUG_DATASTORE > 1
652       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
653                   "Queue empty\n");
654 #endif
655       return; /* no entry in queue */
656     }
657   if (qe->was_transmitted == GNUNET_YES)
658     {
659 #if DEBUG_DATASTORE > 1
660       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
661                   "Head request already transmitted\n");
662 #endif
663       return; /* waiting for replies */
664     }
665   if (h->th != NULL)
666     {
667 #if DEBUG_DATASTORE > 1
668       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
669                   "Pending transmission request\n");
670 #endif
671       return; /* request pending */
672     }
673   if (h->client == NULL)
674     {
675 #if DEBUG_DATASTORE > 1
676       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
677                   "Not connected\n");
678 #endif
679       return; /* waiting for reconnect */
680     }
681   if (GNUNET_YES == h->in_receive)
682     {
683       /* wait for response to previous query */
684       return; 
685     }
686 #if DEBUG_DATASTORE
687   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
688               "Queueing %u byte request to DATASTORE\n",
689               qe->message_size);
690 #endif
691   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
692                                                qe->message_size,
693                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
694                                                GNUNET_YES,
695                                                &transmit_request,
696                                                h);
697   GNUNET_assert (GNUNET_NO == h->in_receive);
698   GNUNET_break (NULL != h->th);
699 }
700
701
702 /**
703  * Dummy continuation used to do nothing (but be non-zero).
704  *
705  * @param cls closure
706  * @param result result 
707  * @param emsg error message
708  */
709 static void
710 drop_status_cont (void *cls, int32_t result, const char *emsg)
711 {
712   /* do nothing */
713 }
714
715
716 /**
717  * Free a queue entry.  Removes the given entry from the
718  * queue and releases associated resources.  Does NOT
719  * call the callback.
720  * 
721  * @param qe entry to free.
722  */
723 static void
724 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
725 {
726   struct GNUNET_DATASTORE_Handle *h = qe->h;
727
728   GNUNET_CONTAINER_DLL_remove (h->queue_head,
729                                h->queue_tail,
730                                qe);
731   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
732     {
733       GNUNET_SCHEDULER_cancel (qe->task);
734       qe->task = GNUNET_SCHEDULER_NO_TASK;
735     }
736   h->queue_size--;
737   qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */
738   GNUNET_free (qe);
739 }
740
741
742 /**
743  * Type of a function to call when we receive a message
744  * from the service.
745  *
746  * @param cls closure
747  * @param msg message received, NULL on timeout or fatal error
748  */
749 static void 
750 process_status_message (void *cls,
751                         const struct
752                         GNUNET_MessageHeader * msg)
753 {
754   struct GNUNET_DATASTORE_Handle *h = cls;
755   struct GNUNET_DATASTORE_QueueEntry *qe;
756   struct StatusContext rc;
757   const struct StatusMessage *sm;
758   const char *emsg;
759   int32_t status;
760   int was_transmitted;
761
762   h->in_receive = GNUNET_NO;
763   if (h->skip_next_messages > 0)
764     {
765       h->skip_next_messages--;
766       process_queue (h);
767       return;
768    } 
769   if (NULL == (qe = h->queue_head))
770     {
771       GNUNET_break (0);
772       do_disconnect (h);
773       return;
774     }
775   rc = qe->qc.sc;
776   if (msg == NULL)
777     {      
778       was_transmitted = qe->was_transmitted;
779       free_queue_entry (qe);
780       if (NULL == h->client)
781         return; /* forced disconnect */
782       if (rc.cont != NULL)
783         rc.cont (rc.cont_cls, 
784                  GNUNET_SYSERR,
785                  _("Failed to receive status response from database."));
786       if (was_transmitted == GNUNET_YES)
787         do_disconnect (h);
788       else
789         process_queue (h);
790       return;
791     }
792   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
793   free_queue_entry (qe);
794   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
795        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
796     {
797       GNUNET_break (0);
798       h->retry_time = GNUNET_TIME_UNIT_ZERO;
799       do_disconnect (h);
800       if (rc.cont != NULL)
801         rc.cont (rc.cont_cls, 
802                  GNUNET_SYSERR,
803                  _("Error reading response from datastore service"));
804       return;
805     }
806   sm = (const struct StatusMessage*) msg;
807   status = ntohl(sm->status);
808   emsg = NULL;
809   if (ntohs(msg->size) > sizeof(struct StatusMessage))
810     {
811       emsg = (const char*) &sm[1];
812       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
813         {
814           GNUNET_break (0);
815           emsg = _("Invalid error message received from datastore service");
816         }
817     }  
818   if ( (status == GNUNET_SYSERR) &&
819        (emsg == NULL) )
820     {
821       GNUNET_break (0);
822       emsg = _("Invalid error message received from datastore service");
823     }
824 #if DEBUG_DATASTORE
825   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
826               "Received status %d/%s\n",
827               (int) status,
828               emsg);
829 #endif
830   GNUNET_STATISTICS_update (h->stats,
831                             gettext_noop ("# status messages received"),
832                             1,
833                             GNUNET_NO);
834   h->retry_time.rel_value = 0;
835   process_queue (h);
836   if (rc.cont != NULL)
837     rc.cont (rc.cont_cls, 
838              status,
839              emsg);
840 }
841
842
843 /**
844  * Store an item in the datastore.  If the item is already present,
845  * the priorities are summed up and the higher expiration time and
846  * lower anonymity level is used.
847  *
848  * @param h handle to the datastore
849  * @param rid reservation ID to use (from "reserve"); use 0 if no
850  *            prior reservation was made
851  * @param key key for the value
852  * @param size number of bytes in data
853  * @param data content stored
854  * @param type type of the content
855  * @param priority priority of the content
856  * @param anonymity anonymity-level for the content
857  * @param replication how often should the content be replicated to other peers?
858  * @param expiration expiration time for the content
859  * @param queue_priority ranking of this request in the priority queue
860  * @param max_queue_size at what queue size should this request be dropped
861  *        (if other requests of higher priority are in the queue)
862  * @param timeout timeout for the operation
863  * @param cont continuation to call when done
864  * @param cont_cls closure for cont
865  * @return NULL if the entry was not queued, otherwise a handle that can be used to
866  *         cancel; note that even if NULL is returned, the callback will be invoked
867  *         (or rather, will already have been invoked)
868  */
869 struct GNUNET_DATASTORE_QueueEntry *
870 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
871                       uint32_t rid,
872                       const GNUNET_HashCode * key,
873                       size_t size,
874                       const void *data,
875                       enum GNUNET_BLOCK_Type type,
876                       uint32_t priority,
877                       uint32_t anonymity,
878                       uint32_t replication,
879                       struct GNUNET_TIME_Absolute expiration,
880                       unsigned int queue_priority,
881                       unsigned int max_queue_size,
882                       struct GNUNET_TIME_Relative timeout,
883                       GNUNET_DATASTORE_ContinuationWithStatus cont,
884                       void *cont_cls)
885 {
886   struct GNUNET_DATASTORE_QueueEntry *qe;
887   struct DataMessage *dm;
888   size_t msize;
889   union QueueContext qc;
890
891 #if DEBUG_DATASTORE
892   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
893               "Asked to put %u bytes of data under key `%s' for %llu ms\n",
894               size,
895               GNUNET_h2s (key),
896               GNUNET_TIME_absolute_get_remaining (expiration).rel_value);
897 #endif
898   msize = sizeof(struct DataMessage) + size;
899   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
900   qc.sc.cont = cont;
901   qc.sc.cont_cls = cont_cls;
902   qe = make_queue_entry (h, msize,
903                          queue_priority, max_queue_size, timeout,
904                          &process_status_message, &qc);
905   if (qe == NULL)
906     {
907 #if DEBUG_DATASTORE
908       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
909                   "Could not create queue entry for PUT\n");
910 #endif
911       return NULL;
912     }
913   GNUNET_STATISTICS_update (h->stats,
914                             gettext_noop ("# PUT requests executed"),
915                             1,
916                             GNUNET_NO);
917   dm = (struct DataMessage* ) &qe[1];
918   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
919   dm->header.size = htons(msize);
920   dm->rid = htonl(rid);
921   dm->size = htonl( (uint32_t) size);
922   dm->type = htonl(type);
923   dm->priority = htonl(priority);
924   dm->anonymity = htonl(anonymity);
925   dm->replication = htonl (replication);
926   dm->reserved = htonl (0);
927   dm->uid = GNUNET_htonll(0);
928   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
929   dm->key = *key;
930   memcpy (&dm[1], data, size);
931   process_queue (h);
932   return qe;
933 }
934
935
936 /**
937  * Reserve space in the datastore.  This function should be used
938  * to avoid "out of space" failures during a longer sequence of "put"
939  * operations (for example, when a file is being inserted).
940  *
941  * @param h handle to the datastore
942  * @param amount how much space (in bytes) should be reserved (for content only)
943  * @param entries how many entries will be created (to calculate per-entry overhead)
944  * @param queue_priority ranking of this request in the priority queue
945  * @param max_queue_size at what queue size should this request be dropped
946  *        (if other requests of higher priority are in the queue)
947  * @param timeout how long to wait at most for a response (or before dying in queue)
948  * @param cont continuation to call when done; "success" will be set to
949  *             a positive reservation value if space could be reserved.
950  * @param cont_cls closure for cont
951  * @return NULL if the entry was not queued, otherwise a handle that can be used to
952  *         cancel; note that even if NULL is returned, the callback will be invoked
953  *         (or rather, will already have been invoked)
954  */
955 struct GNUNET_DATASTORE_QueueEntry *
956 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
957                           uint64_t amount,
958                           uint32_t entries,
959                           unsigned int queue_priority,
960                           unsigned int max_queue_size,
961                           struct GNUNET_TIME_Relative timeout,
962                           GNUNET_DATASTORE_ContinuationWithStatus cont,
963                           void *cont_cls)
964 {
965   struct GNUNET_DATASTORE_QueueEntry *qe;
966   struct ReserveMessage *rm;
967   union QueueContext qc;
968
969   if (cont == NULL)
970     cont = &drop_status_cont;
971 #if DEBUG_DATASTORE
972   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
973               "Asked to reserve %llu bytes of data and %u entries'\n",
974               (unsigned long long) amount,
975               (unsigned int) entries);
976 #endif
977   qc.sc.cont = cont;
978   qc.sc.cont_cls = cont_cls;
979   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
980                          queue_priority, max_queue_size, timeout,
981                          &process_status_message, &qc);
982   if (qe == NULL)
983     {
984 #if DEBUG_DATASTORE
985       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
986                   "Could not create queue entry to reserve\n");
987 #endif
988       return NULL;
989     }
990   GNUNET_STATISTICS_update (h->stats,
991                             gettext_noop ("# RESERVE requests executed"),
992                             1,
993                             GNUNET_NO);
994   rm = (struct ReserveMessage*) &qe[1];
995   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
996   rm->header.size = htons(sizeof (struct ReserveMessage));
997   rm->entries = htonl(entries);
998   rm->amount = GNUNET_htonll(amount);
999   process_queue (h);
1000   return qe;
1001 }
1002
1003
1004 /**
1005  * Signal that all of the data for which a reservation was made has
1006  * been stored and that whatever excess space might have been reserved
1007  * can now be released.
1008  *
1009  * @param h handle to the datastore
1010  * @param rid reservation ID (value of "success" in original continuation
1011  *        from the "reserve" function).
1012  * @param queue_priority ranking of this request in the priority queue
1013  * @param max_queue_size at what queue size should this request be dropped
1014  *        (if other requests of higher priority are in the queue)
1015  * @param queue_priority ranking of this request in the priority queue
1016  * @param max_queue_size at what queue size should this request be dropped
1017  *        (if other requests of higher priority are in the queue)
1018  * @param timeout how long to wait at most for a response
1019  * @param cont continuation to call when done
1020  * @param cont_cls closure for cont
1021  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1022  *         cancel; note that even if NULL is returned, the callback will be invoked
1023  *         (or rather, will already have been invoked)
1024  */
1025 struct GNUNET_DATASTORE_QueueEntry *
1026 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1027                                   uint32_t rid,
1028                                   unsigned int queue_priority,
1029                                   unsigned int max_queue_size,
1030                                   struct GNUNET_TIME_Relative timeout,
1031                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
1032                                   void *cont_cls)
1033 {
1034   struct GNUNET_DATASTORE_QueueEntry *qe;
1035   struct ReleaseReserveMessage *rrm;
1036   union QueueContext qc;
1037
1038   if (cont == NULL)
1039     cont = &drop_status_cont;
1040 #if DEBUG_DATASTORE
1041   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1042               "Asked to release reserve %d\n",
1043               rid);
1044 #endif
1045   qc.sc.cont = cont;
1046   qc.sc.cont_cls = cont_cls;
1047   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
1048                          queue_priority, max_queue_size, timeout,
1049                          &process_status_message, &qc);
1050   if (qe == NULL)
1051     {
1052 #if DEBUG_DATASTORE
1053       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1054                   "Could not create queue entry to release reserve\n");
1055 #endif
1056       return NULL;
1057     }
1058   GNUNET_STATISTICS_update (h->stats,
1059                             gettext_noop ("# RELEASE RESERVE requests executed"),
1060                             1,
1061                             GNUNET_NO);
1062   rrm = (struct ReleaseReserveMessage*) &qe[1];
1063   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1064   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
1065   rrm->rid = htonl(rid);
1066   process_queue (h);
1067   return qe;
1068 }
1069
1070
1071 /**
1072  * Update a value in the datastore.
1073  *
1074  * @param h handle to the datastore
1075  * @param uid identifier for the value
1076  * @param priority how much to increase the priority of the value
1077  * @param expiration new expiration value should be MAX of existing and this argument
1078  * @param queue_priority ranking of this request in the priority queue
1079  * @param max_queue_size at what queue size should this request be dropped
1080  *        (if other requests of higher priority are in the queue)
1081  * @param timeout how long to wait at most for a response
1082  * @param cont continuation to call when done
1083  * @param cont_cls closure for cont
1084  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1085  *         cancel; note that even if NULL is returned, the callback will be invoked
1086  *         (or rather, will already have been invoked)
1087  */
1088 struct GNUNET_DATASTORE_QueueEntry *
1089 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1090                          uint64_t uid,
1091                          uint32_t priority,
1092                          struct GNUNET_TIME_Absolute expiration,
1093                          unsigned int queue_priority,
1094                          unsigned int max_queue_size,
1095                          struct GNUNET_TIME_Relative timeout,
1096                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1097                          void *cont_cls)
1098 {
1099   struct GNUNET_DATASTORE_QueueEntry *qe;
1100   struct UpdateMessage *um;
1101   union QueueContext qc;
1102
1103   if (cont == NULL)
1104     cont = &drop_status_cont;
1105 #if DEBUG_DATASTORE
1106   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1107               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
1108               uid,
1109               (unsigned int) priority,
1110               (unsigned long long) expiration.abs_value);
1111 #endif
1112   qc.sc.cont = cont;
1113   qc.sc.cont_cls = cont_cls;
1114   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
1115                          queue_priority, max_queue_size, timeout,
1116                          &process_status_message, &qc);
1117   if (qe == NULL)
1118     {
1119 #if DEBUG_DATASTORE
1120       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1121                   "Could not create queue entry for UPDATE\n");
1122 #endif
1123       return NULL;
1124     }
1125   GNUNET_STATISTICS_update (h->stats,
1126                             gettext_noop ("# UPDATE requests executed"),
1127                             1,
1128                             GNUNET_NO);
1129   um = (struct UpdateMessage*) &qe[1];
1130   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1131   um->header.size = htons(sizeof (struct UpdateMessage));
1132   um->priority = htonl(priority);
1133   um->expiration = GNUNET_TIME_absolute_hton(expiration);
1134   um->uid = GNUNET_htonll(uid);
1135   process_queue (h);
1136   return qe;
1137 }
1138
1139
1140 /**
1141  * Explicitly remove some content from the database.
1142  * The "cont"inuation will be called with status
1143  * "GNUNET_OK" if content was removed, "GNUNET_NO"
1144  * if no matching entry was found and "GNUNET_SYSERR"
1145  * on all other types of errors.
1146  *
1147  * @param h handle to the datastore
1148  * @param key key for the value
1149  * @param size number of bytes in data
1150  * @param data content stored
1151  * @param queue_priority ranking of this request in the priority queue
1152  * @param max_queue_size at what queue size should this request be dropped
1153  *        (if other requests of higher priority are in the queue)
1154  * @param timeout how long to wait at most for a response
1155  * @param cont continuation to call when done
1156  * @param cont_cls closure for cont
1157  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1158  *         cancel; note that even if NULL is returned, the callback will be invoked
1159  *         (or rather, will already have been invoked)
1160  */
1161 struct GNUNET_DATASTORE_QueueEntry *
1162 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1163                          const GNUNET_HashCode *key,
1164                          size_t size,
1165                          const void *data,
1166                          unsigned int queue_priority,
1167                          unsigned int max_queue_size,
1168                          struct GNUNET_TIME_Relative timeout,
1169                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1170                          void *cont_cls)
1171 {
1172   struct GNUNET_DATASTORE_QueueEntry *qe;
1173   struct DataMessage *dm;
1174   size_t msize;
1175   union QueueContext qc;
1176
1177   if (cont == NULL)
1178     cont = &drop_status_cont;
1179 #if DEBUG_DATASTORE
1180   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1181               "Asked to remove %u bytes under key `%s'\n",
1182               size,
1183               GNUNET_h2s (key));
1184 #endif
1185   qc.sc.cont = cont;
1186   qc.sc.cont_cls = cont_cls;
1187   msize = sizeof(struct DataMessage) + size;
1188   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1189   qe = make_queue_entry (h, msize,
1190                          queue_priority, max_queue_size, timeout,
1191                          &process_status_message, &qc);
1192   if (qe == NULL)
1193     {
1194 #if DEBUG_DATASTORE
1195       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1196                   "Could not create queue entry for REMOVE\n");
1197 #endif
1198       return NULL;
1199     }
1200   GNUNET_STATISTICS_update (h->stats,
1201                             gettext_noop ("# REMOVE requests executed"),
1202                             1,
1203                             GNUNET_NO);
1204   dm = (struct DataMessage*) &qe[1];
1205   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1206   dm->header.size = htons(msize);
1207   dm->rid = htonl(0);
1208   dm->size = htonl(size);
1209   dm->type = htonl(0);
1210   dm->priority = htonl(0);
1211   dm->anonymity = htonl(0);
1212   dm->uid = GNUNET_htonll(0);
1213   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
1214   dm->key = *key;
1215   memcpy (&dm[1], data, size);
1216   process_queue (h);
1217   return qe;
1218 }
1219
1220
1221 /**
1222  * Type of a function to call when we receive a message
1223  * from the service.
1224  *
1225  * @param cls closure
1226  * @param msg message received, NULL on timeout or fatal error
1227  */
1228 static void 
1229 process_result_message (void *cls,
1230                         const struct GNUNET_MessageHeader *msg)
1231 {
1232   struct GNUNET_DATASTORE_Handle *h = cls;
1233   struct GNUNET_DATASTORE_QueueEntry *qe;
1234   struct ResultContext rc;
1235   const struct DataMessage *dm;
1236
1237   h->in_receive = GNUNET_NO;
1238   if (h->skip_next_messages > 0)
1239     {
1240       h->skip_next_messages--;
1241       process_queue (h);
1242       return;
1243     }
1244   if (msg == NULL)
1245     {
1246       qe = h->queue_head;
1247       GNUNET_assert (NULL != qe);
1248       if (qe->was_transmitted == GNUNET_YES)
1249         {
1250           rc = qe->qc.rc;
1251           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1252                       _("Failed to receive response from database.\n"));
1253           do_disconnect (h);
1254           free_queue_entry (qe);
1255           if (rc.proc != NULL)
1256             rc.proc (rc.proc_cls,
1257                      NULL, 0, NULL, 0, 0, 0, 
1258                      GNUNET_TIME_UNIT_ZERO_ABS, 0);    
1259         }
1260       else
1261         process_queue (h);
1262       return;
1263     }
1264   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
1265     {
1266       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1267       qe = h->queue_head;
1268       rc = qe->qc.rc;
1269       GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1270       free_queue_entry (qe);
1271 #if DEBUG_DATASTORE
1272       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1273                   "Received end of result set, new queue size is %u\n",
1274                   h->queue_size);
1275 #endif
1276       if (rc.proc != NULL)
1277         rc.proc (rc.proc_cls,
1278                  NULL, 0, NULL, 0, 0, 0, 
1279                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1280       h->retry_time.rel_value = 0;
1281       h->result_count = 0;
1282       process_queue (h);
1283       return;
1284     }
1285   qe = h->queue_head;
1286   rc = qe->qc.rc;
1287   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1288   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1289        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1290        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1291     {
1292       GNUNET_break (0);
1293       free_queue_entry (qe);
1294       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1295       do_disconnect (h);
1296       if (rc.proc != NULL)
1297         rc.proc (rc.proc_cls,
1298                  NULL, 0, NULL, 0, 0, 0, 
1299                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
1300       return;
1301     }
1302   GNUNET_STATISTICS_update (h->stats,
1303                             gettext_noop ("# Results received"),
1304                             1,
1305                             GNUNET_NO);
1306   dm = (const struct DataMessage*) msg;
1307 #if DEBUG_DATASTORE
1308   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1309               "Received result %llu with type %u and size %u with key %s\n",
1310               (unsigned long long) GNUNET_ntohll(dm->uid),
1311               ntohl(dm->type),
1312               ntohl(dm->size),
1313               GNUNET_h2s(&dm->key));
1314 #endif
1315   free_queue_entry (qe);
1316   h->retry_time.rel_value = 0;
1317   process_queue (h);
1318   if (rc.proc != NULL)
1319     rc.proc (rc.proc_cls,
1320              &dm->key,
1321              ntohl(dm->size),
1322              &dm[1],
1323              ntohl(dm->type),
1324              ntohl(dm->priority),
1325              ntohl(dm->anonymity),
1326              GNUNET_TIME_absolute_ntoh(dm->expiration), 
1327              GNUNET_ntohll(dm->uid));
1328 }
1329
1330
1331 /**
1332  * Get a random value from the datastore for content replication.
1333  * Returns a single, random value among those with the highest
1334  * replication score, lowering positive replication scores by one for
1335  * the chosen value (if only content with a replication score exists,
1336  * a random value is returned and replication scores are not changed).
1337  *
1338  * @param h handle to the datastore
1339  * @param queue_priority ranking of this request in the priority queue
1340  * @param max_queue_size at what queue size should this request be dropped
1341  *        (if other requests of higher priority are in the queue)
1342  * @param timeout how long to wait at most for a response
1343  * @param proc function to call on a random value; it
1344  *        will be called once with a value (if available)
1345  *        and always once with a value of NULL.
1346  * @param proc_cls closure for proc
1347  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1348  *         cancel
1349  */
1350 struct GNUNET_DATASTORE_QueueEntry *
1351 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1352                                       unsigned int queue_priority,
1353                                       unsigned int max_queue_size,
1354                                       struct GNUNET_TIME_Relative timeout,
1355                                       GNUNET_DATASTORE_DatumProcessor proc, 
1356                                       void *proc_cls)
1357 {
1358   struct GNUNET_DATASTORE_QueueEntry *qe;
1359   struct GNUNET_MessageHeader *m;
1360   union QueueContext qc;
1361
1362   GNUNET_assert (NULL != proc);
1363 #if DEBUG_DATASTORE
1364   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1365               "Asked to get replication entry in %llu ms\n",
1366               (unsigned long long) timeout.rel_value);
1367 #endif
1368   qc.rc.proc = proc;
1369   qc.rc.proc_cls = proc_cls;
1370   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1371                          queue_priority, max_queue_size, timeout,
1372                          &process_result_message, &qc);
1373   if (qe == NULL)
1374     {
1375 #if DEBUG_DATASTORE
1376       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1377                   "Could not create queue entry for GET REPLICATION\n");
1378 #endif
1379       return NULL;    
1380     }
1381   GNUNET_STATISTICS_update (h->stats,
1382                             gettext_noop ("# GET REPLICATION requests executed"),
1383                             1,
1384                             GNUNET_NO);
1385   m = (struct GNUNET_MessageHeader*) &qe[1];
1386   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1387   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1388   process_queue (h);
1389   return qe;
1390 }
1391
1392
1393 /**
1394  * Get a single zero-anonymity value from the datastore.
1395  *
1396  * @param h handle to the datastore
1397  * @param offset offset of the result (mod #num-results); set to
1398  *               a random 64-bit value initially; then increment by
1399  *               one each time; detect that all results have been found by uid
1400  *               being again the first uid ever returned.
1401  * @param queue_priority ranking of this request in the priority queue
1402  * @param max_queue_size at what queue size should this request be dropped
1403  *        (if other requests of higher priority are in the queue)
1404  * @param timeout how long to wait at most for a response
1405  * @param type allowed type for the operation (never zero)
1406  * @param proc function to call on a random value; it
1407  *        will be called once with a value (if available)
1408  *        or with NULL if none value exists.
1409  * @param proc_cls closure for proc
1410  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1411  *         cancel
1412  */
1413 struct GNUNET_DATASTORE_QueueEntry *
1414 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1415                                      uint64_t offset,
1416                                      unsigned int queue_priority,
1417                                      unsigned int max_queue_size,
1418                                      struct GNUNET_TIME_Relative timeout,
1419                                      enum GNUNET_BLOCK_Type type,
1420                                      GNUNET_DATASTORE_DatumProcessor proc, 
1421                                      void *proc_cls)
1422 {
1423   struct GNUNET_DATASTORE_QueueEntry *qe;
1424   struct GetZeroAnonymityMessage *m;
1425   union QueueContext qc;
1426
1427   GNUNET_assert (NULL != proc);
1428   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1429 #if DEBUG_DATASTORE
1430   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1431               "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n",
1432               (unsigned long long) offset,
1433               type,
1434               (unsigned long long) timeout.rel_value);
1435 #endif
1436   qc.rc.proc = proc;
1437   qc.rc.proc_cls = proc_cls;
1438   qe = make_queue_entry (h, sizeof(struct GetZeroAnonymityMessage),
1439                          queue_priority, max_queue_size, timeout,
1440                          &process_result_message, &qc);
1441   if (qe == NULL)
1442     {
1443 #if DEBUG_DATASTORE
1444       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1445                   "Could not create queue entry for zero-anonymity procation\n");
1446 #endif
1447       return NULL;    
1448     }
1449   GNUNET_STATISTICS_update (h->stats,
1450                             gettext_noop ("# GET ZERO ANONYMITY requests executed"),
1451                             1,
1452                             GNUNET_NO);
1453   m = (struct GetZeroAnonymityMessage*) &qe[1];
1454   m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1455   m->header.size = htons(sizeof (struct GetZeroAnonymityMessage));
1456   m->type = htonl ((uint32_t) type);
1457   m->offset = GNUNET_htonll (offset);
1458   process_queue (h);
1459   return qe;
1460 }
1461
1462
1463 /**
1464  * Get a result for a particular key from the datastore.  The processor
1465  * will only be called once.
1466  *
1467  * @param h handle to the datastore
1468  * @param offset offset of the result (mod #num-results); set to
1469  *               a random 64-bit value initially; then increment by
1470  *               one each time; detect that all results have been found by uid
1471  *               being again the first uid ever returned.
1472  * @param key maybe NULL (to match all entries)
1473  * @param type desired type, 0 for any
1474  * @param queue_priority ranking of this request in the priority queue
1475  * @param max_queue_size at what queue size should this request be dropped
1476  *        (if other requests of higher priority are in the queue)
1477  * @param timeout how long to wait at most for a response
1478  * @param proc function to call on each matching value;
1479  *        will be called once with a NULL value at the end
1480  * @param proc_cls closure for proc
1481  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1482  *         cancel
1483  */
1484 struct GNUNET_DATASTORE_QueueEntry *
1485 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1486                           uint64_t offset,
1487                           const GNUNET_HashCode * key,
1488                           enum GNUNET_BLOCK_Type type,
1489                           unsigned int queue_priority,
1490                           unsigned int max_queue_size,
1491                           struct GNUNET_TIME_Relative timeout,
1492                           GNUNET_DATASTORE_DatumProcessor proc, 
1493                           void *proc_cls)
1494 {
1495   struct GNUNET_DATASTORE_QueueEntry *qe;
1496   struct GetMessage *gm;
1497   union QueueContext qc;
1498
1499   GNUNET_assert (NULL != proc);
1500 #if DEBUG_DATASTORE
1501   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1502               "Asked to look for data of type %u under key `%s'\n",
1503               (unsigned int) type,
1504               GNUNET_h2s (key));
1505 #endif
1506   qc.rc.proc = proc;
1507   qc.rc.proc_cls = proc_cls;
1508   qe = make_queue_entry (h, sizeof(struct GetMessage),
1509                          queue_priority, max_queue_size, timeout,
1510                          &process_result_message, &qc);
1511   if (qe == NULL)
1512     {
1513 #if DEBUG_DATASTORE
1514       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1515                   "Could not queue request for `%s'\n",
1516                   GNUNET_h2s (key));
1517 #endif
1518       return NULL;
1519     }
1520   GNUNET_STATISTICS_update (h->stats,
1521                             gettext_noop ("# GET requests executed"),
1522                             1,
1523                             GNUNET_NO);
1524   gm = (struct GetMessage*) &qe[1];
1525   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1526   gm->type = htonl(type);
1527   gm->offset = GNUNET_htonll (offset);
1528   if (key != NULL)
1529     {
1530       gm->header.size = htons(sizeof (struct GetMessage));
1531       gm->key = *key;
1532     }
1533   else
1534     {
1535       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1536     }
1537   process_queue (h);
1538   return qe;
1539 }
1540
1541
1542 /**
1543  * Cancel a datastore operation.  The final callback from the
1544  * operation must not have been done yet.
1545  * 
1546  * @param qe operation to cancel
1547  */
1548 void
1549 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1550 {
1551   struct GNUNET_DATASTORE_Handle *h;
1552
1553   GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted);
1554   h = qe->h;
1555 #if DEBUG_DATASTORE
1556   GNUNET_log  (GNUNET_ERROR_TYPE_DEBUG,
1557                "Pending DATASTORE request %p cancelled (%d, %d)\n",
1558                qe,
1559                qe->was_transmitted,
1560                h->queue_head == qe);
1561 #endif
1562   if (GNUNET_YES == qe->was_transmitted) 
1563     {
1564       free_queue_entry (qe);
1565       h->skip_next_messages++;
1566       return;
1567     }
1568   free_queue_entry (qe);
1569   process_queue (h);
1570 }
1571
1572
1573 /* end of datastore_api.c */