65c14f67336c7e354028264e6464defaeaa33042
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
33
34 /**
35  * If a client stopped asking for more results, how many more do
36  * we receive from the DB before killing the connection?  Trade-off
37  * between re-doing TCP handshakes and (needlessly) receiving 
38  * useless results.
39  */
40 #define MAX_EXCESS_RESULTS 8
41
42 /**
43  * Context for processing status messages.
44  */
45 struct StatusContext
46 {
47   /**
48    * Continuation to call with the status.
49    */
50   GNUNET_DATASTORE_ContinuationWithStatus cont;
51
52   /**
53    * Closure for cont.
54    */
55   void *cont_cls;
56
57 };
58
59
60 /**
61  * Context for processing result messages.
62  */
63 struct ResultContext
64 {
65   /**
66    * Function to call with the result.
67    */
68   GNUNET_DATASTORE_DatumProcessor proc;
69
70   /**
71    * Closure for proc.
72    */
73   void *proc_cls;
74
75 };
76
77
78 /**
79  *  Context for a queue operation.
80  */
81 union QueueContext
82 {
83
84   struct StatusContext sc;
85   
86   struct ResultContext rc;
87
88 };
89
90
91
92 /**
93  * Entry in our priority queue.
94  */
95 struct GNUNET_DATASTORE_QueueEntry
96 {
97
98   /**
99    * This is a linked list.
100    */
101   struct GNUNET_DATASTORE_QueueEntry *next;
102
103   /**
104    * This is a linked list.
105    */
106   struct GNUNET_DATASTORE_QueueEntry *prev;
107
108   /**
109    * Handle to the master context.
110    */
111   struct GNUNET_DATASTORE_Handle *h;
112
113   /**
114    * Response processor (NULL if we are not waiting for a response).
115    * This struct should be used for the closure, function-specific
116    * arguments can be passed via 'qc'.
117    */
118   GNUNET_CLIENT_MessageHandler response_proc;
119
120   /**
121    * Function to call after transmission of the request.
122    */
123   GNUNET_DATASTORE_ContinuationWithStatus cont;
124    
125   /**
126    * Closure for 'cont'.
127    */
128   void *cont_cls;
129
130   /**
131    * Context for the operation.
132    */
133   union QueueContext qc;
134
135   /**
136    * Task for timeout signalling.
137    */
138   GNUNET_SCHEDULER_TaskIdentifier task;
139
140   /**
141    * Timeout for the current operation.
142    */
143   struct GNUNET_TIME_Absolute timeout;
144
145   /**
146    * Priority in the queue.
147    */
148   unsigned int priority;
149
150   /**
151    * Maximum allowed length of queue (otherwise
152    * this request should be discarded).
153    */
154   unsigned int max_queue;
155
156   /**
157    * Number of bytes in the request message following
158    * this struct.  32-bit value for nicer memory
159    * access (and overall struct alignment).
160    */
161   uint32_t message_size;
162
163   /**
164    * Has this message been transmitted to the service?
165    * Only ever GNUNET_YES for the head of the queue.
166    * Note that the overall struct should end at a 
167    * multiple of 64 bits.
168    */
169   int was_transmitted;
170   
171 };
172
173 /**
174  * Handle to the datastore service. 
175  */
176 struct GNUNET_DATASTORE_Handle
177 {
178
179   /**
180    * Our configuration.
181    */
182   const struct GNUNET_CONFIGURATION_Handle *cfg;
183
184   /**
185    * Current connection to the datastore service.
186    */
187   struct GNUNET_CLIENT_Connection *client;
188
189   /**
190    * Handle for statistics.
191    */
192   struct GNUNET_STATISTICS_Handle *stats;
193
194   /**
195    * Current transmit handle.
196    */
197   struct GNUNET_CLIENT_TransmitHandle *th;
198
199   /**
200    * Current head of priority queue.
201    */
202   struct GNUNET_DATASTORE_QueueEntry *queue_head;
203
204   /**
205    * Current tail of priority queue.
206    */
207   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
208
209   /**
210    * Task for trying to reconnect.
211    */
212   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
213
214   /**
215    * How quickly should we retry?  Used for exponential back-off on
216    * connect-errors.
217    */
218   struct GNUNET_TIME_Relative retry_time;
219
220   /**
221    * Number of entries in the queue.
222    */
223   unsigned int queue_size;
224
225   /**
226    * Number of results we're receiving for the current query
227    * after application stopped to care.  Used to determine when
228    * to reset the connection.
229    */
230   unsigned int result_count;
231
232   /**
233    * Are we currently trying to receive from the service?
234    */
235   int in_receive;
236
237   /**
238    * We should ignore the next message(s) from the service.
239    */
240   unsigned int skip_next_messages;
241
242 };
243
244
245
246 /**
247  * Connect to the datastore service.
248  *
249  * @param cfg configuration to use
250  * @return handle to use to access the service
251  */
252 struct GNUNET_DATASTORE_Handle *
253 GNUNET_DATASTORE_connect (const struct
254                           GNUNET_CONFIGURATION_Handle
255                           *cfg)
256 {
257   struct GNUNET_CLIENT_Connection *c;
258   struct GNUNET_DATASTORE_Handle *h;
259   
260   c = GNUNET_CLIENT_connect ("datastore", cfg);
261   if (c == NULL)
262     return NULL; /* oops */
263   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
264                      GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
265   h->client = c;
266   h->cfg = cfg;
267   h->stats = GNUNET_STATISTICS_create ("datastore-api",
268                                        cfg);
269   return h;
270 }
271
272
273 /**
274  * Transmit DROP message to datastore service.
275  *
276  * @param cls the 'struct GNUNET_DATASTORE_Handle'
277  * @param size number of bytes that can be copied to buf
278  * @param buf where to copy the drop message
279  * @return number of bytes written to buf
280  */
281 static size_t
282 transmit_drop (void *cls,
283                size_t size, 
284                void *buf)
285 {
286   struct GNUNET_DATASTORE_Handle *h = cls;
287   struct GNUNET_MessageHeader *hdr;
288   
289   if (buf == NULL)
290     {
291       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
292                   _("Failed to transmit request to drop database.\n"));
293       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
294       return 0;
295     }
296   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
297   hdr = buf;
298   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
299   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
300   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
301   return sizeof(struct GNUNET_MessageHeader);
302 }
303
304
305 /**
306  * Disconnect from the datastore service (and free
307  * associated resources).
308  *
309  * @param h handle to the datastore
310  * @param drop set to GNUNET_YES to delete all data in datastore (!)
311  */
312 void
313 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
314                              int drop)
315 {
316   struct GNUNET_DATASTORE_QueueEntry *qe;
317
318   if (h->client != NULL)
319     {
320       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
321       h->client = NULL;
322     }
323   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
324     {
325       GNUNET_SCHEDULER_cancel (h->reconnect_task);
326       h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
327     }
328   if (NULL != h->th)
329     {
330       GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
331       h->th = NULL;
332     }
333   while (NULL != (qe = h->queue_head))
334     {
335       GNUNET_assert (NULL != qe->response_proc);
336       qe->response_proc (h, NULL);
337     }
338   if (GNUNET_YES == drop) 
339     {
340       h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
341       if (h->client != NULL)
342         {
343           if (NULL != 
344               GNUNET_CLIENT_notify_transmit_ready (h->client,
345                                                    sizeof(struct GNUNET_MessageHeader),
346                                                    GNUNET_TIME_UNIT_MINUTES,
347                                                    GNUNET_YES,
348                                                    &transmit_drop,
349                                                    h))
350             return;
351           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
352           h->client = NULL;
353         }
354       GNUNET_break (0);
355     }
356   GNUNET_STATISTICS_destroy (h->stats,
357                              GNUNET_NO);
358   h->stats = NULL;
359   GNUNET_free (h);
360 }
361
362
363 /**
364  * A request has timed out (before being transmitted to the service).
365  *
366  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
367  * @param tc scheduler context
368  */
369 static void
370 timeout_queue_entry (void *cls,
371                      const struct GNUNET_SCHEDULER_TaskContext *tc)
372 {
373   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
374
375   GNUNET_STATISTICS_update (qe->h->stats,
376                             gettext_noop ("# queue entry timeouts"),
377                             1,
378                             GNUNET_NO);
379   qe->task = GNUNET_SCHEDULER_NO_TASK;
380   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
381   qe->response_proc (qe->h, NULL);
382 }
383
384
385 /**
386  * Create a new entry for our priority queue (and possibly discard other entires if
387  * the queue is getting too long).
388  *
389  * @param h handle to the datastore
390  * @param msize size of the message to queue
391  * @param queue_priority priority of the entry
392  * @param max_queue_size at what queue size should this request be dropped
393  *        (if other requests of higher priority are in the queue)
394  * @param timeout timeout for the operation
395  * @param response_proc function to call with replies (can be NULL)
396  * @param qc client context (NOT a closure for response_proc)
397  * @return NULL if the queue is full 
398  */
399 static struct GNUNET_DATASTORE_QueueEntry *
400 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
401                   size_t msize,
402                   unsigned int queue_priority,
403                   unsigned int max_queue_size,
404                   struct GNUNET_TIME_Relative timeout,
405                   GNUNET_CLIENT_MessageHandler response_proc,            
406                   const union QueueContext *qc)
407 {
408   struct GNUNET_DATASTORE_QueueEntry *ret;
409   struct GNUNET_DATASTORE_QueueEntry *pos;
410   unsigned int c;
411
412   c = 0;
413   pos = h->queue_head;
414   while ( (pos != NULL) &&
415           (c < max_queue_size) &&
416           (pos->priority >= queue_priority) )
417     {
418       c++;
419       pos = pos->next;
420     }
421   if (c >= max_queue_size)
422     {
423       GNUNET_STATISTICS_update (h->stats,
424                                 gettext_noop ("# queue overflows"),
425                                 1,
426                                 GNUNET_NO);
427       return NULL;
428     }
429   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
430   ret->h = h;
431   ret->response_proc = response_proc;
432   ret->qc = *qc;
433   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
434   ret->priority = queue_priority;
435   ret->max_queue = max_queue_size;
436   ret->message_size = msize;
437   ret->was_transmitted = GNUNET_NO;
438   if (pos == NULL)
439     {
440       /* append at the tail */
441       pos = h->queue_tail;
442     }
443   else
444     {
445       pos = pos->prev; 
446       /* do not insert at HEAD if HEAD query was already
447          transmitted and we are still receiving replies! */
448       if ( (pos == NULL) &&
449            (h->queue_head->was_transmitted) )
450         pos = h->queue_head;
451     }
452   c++;
453   GNUNET_STATISTICS_update (h->stats,
454                             gettext_noop ("# queue entries created"),
455                             1,
456                             GNUNET_NO);
457   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
458                                      h->queue_tail,
459                                      pos,
460                                      ret);
461   h->queue_size++;
462   ret->task = GNUNET_SCHEDULER_add_delayed (timeout,
463                                             &timeout_queue_entry,
464                                             ret);
465   pos = ret->next;
466   while (pos != NULL) 
467     {
468       if ( (pos->max_queue < h->queue_size) &&
469            (pos->was_transmitted == GNUNET_NO) )
470         {
471           GNUNET_assert (pos->response_proc != NULL);
472           /* move 'pos' element to head so that it will be 
473              killed on 'NULL' call below */
474           GNUNET_CONTAINER_DLL_remove (h->queue_head,
475                                        h->queue_tail,
476                                        pos);
477           GNUNET_CONTAINER_DLL_insert (h->queue_head,
478                                        h->queue_tail,
479                                        pos);
480           GNUNET_STATISTICS_update (h->stats,
481                                     gettext_noop ("# Requests dropped from datastore queue"),
482                                     1,
483                                     GNUNET_NO);
484           pos->response_proc (h, NULL);
485           break;
486         }
487       pos = pos->next;
488     }
489   return ret;
490 }
491
492
493 /**
494  * Process entries in the queue (or do nothing if we are already
495  * doing so).
496  * 
497  * @param h handle to the datastore
498  */
499 static void
500 process_queue (struct GNUNET_DATASTORE_Handle *h);
501
502
503 /**
504  * Try reconnecting to the datastore service.
505  *
506  * @param cls the 'struct GNUNET_DATASTORE_Handle'
507  * @param tc scheduler context
508  */
509 static void
510 try_reconnect (void *cls,
511                const struct GNUNET_SCHEDULER_TaskContext *tc)
512 {
513   struct GNUNET_DATASTORE_Handle *h = cls;
514
515   if (h->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
516     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
517   else
518     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
519   if (h->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
520     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
521   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
522   h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
523   if (h->client == NULL)
524     {
525       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
526                   "DATASTORE reconnect failed (fatally)\n");
527       return;
528     }
529   GNUNET_STATISTICS_update (h->stats,
530                             gettext_noop ("# datastore connections (re)created"),
531                             1,
532                             GNUNET_NO);
533 #if DEBUG_DATASTORE
534   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
535               "Reconnected to DATASTORE\n");
536 #endif
537   process_queue (h);
538 }
539
540
541 /**
542  * Disconnect from the service and then try reconnecting to the datastore service
543  * after some delay.
544  *
545  * @param h handle to datastore to disconnect and reconnect
546  */
547 static void
548 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
549 {
550   if (h->client == NULL)
551     {
552 #if DEBUG_DATASTORE
553       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
554                   "client NULL in disconnect, will not try to reconnect\n");
555 #endif
556       return;
557     }
558 #if 0
559   GNUNET_STATISTICS_update (stats,
560                             gettext_noop ("# reconnected to DATASTORE"),
561                             1,
562                             GNUNET_NO);
563 #endif
564   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
565   h->skip_next_messages = 0;
566   h->client = NULL;
567   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_time,
568                                                     &try_reconnect,
569                                                     h);      
570 }
571
572
573 /**
574  * Transmit request from queue to datastore service.
575  *
576  * @param cls the 'struct GNUNET_DATASTORE_Handle'
577  * @param size number of bytes that can be copied to buf
578  * @param buf where to copy the drop message
579  * @return number of bytes written to buf
580  */
581 static size_t
582 transmit_request (void *cls,
583                   size_t size, 
584                   void *buf)
585 {
586   struct GNUNET_DATASTORE_Handle *h = cls;
587   struct GNUNET_DATASTORE_QueueEntry *qe;
588   size_t msize;
589
590   h->th = NULL;
591   if (NULL == (qe = h->queue_head))
592     return 0; /* no entry in queue */
593   if (buf == NULL)
594     {
595       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
596                   _("Failed to transmit request to DATASTORE.\n"));
597       GNUNET_STATISTICS_update (h->stats,
598                                 gettext_noop ("# transmission request failures"),
599                                 1,
600                                 GNUNET_NO);
601       do_disconnect (h);
602       return 0;
603     }
604   if (size < (msize = qe->message_size))
605     {
606       process_queue (h);
607       return 0;
608     }
609  #if DEBUG_DATASTORE
610   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
611               "Transmitting %u byte request to DATASTORE\n",
612               msize);
613 #endif
614   memcpy (buf, &qe[1], msize);
615   qe->was_transmitted = GNUNET_YES;
616   GNUNET_SCHEDULER_cancel (qe->task);
617   qe->task = GNUNET_SCHEDULER_NO_TASK;
618   GNUNET_assert (GNUNET_NO == h->in_receive);
619   h->in_receive = GNUNET_YES;
620   GNUNET_CLIENT_receive (h->client,
621                          qe->response_proc,
622                          h,
623                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
624   GNUNET_STATISTICS_update (h->stats,
625                             gettext_noop ("# bytes sent to datastore"),
626                             1,
627                             GNUNET_NO);
628   return msize;
629 }
630
631
632 /**
633  * Process entries in the queue (or do nothing if we are already
634  * doing so).
635  * 
636  * @param h handle to the datastore
637  */
638 static void
639 process_queue (struct GNUNET_DATASTORE_Handle *h)
640 {
641   struct GNUNET_DATASTORE_QueueEntry *qe;
642
643   if (NULL == (qe = h->queue_head))
644     {
645 #if DEBUG_DATASTORE > 1
646       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
647                   "Queue empty\n");
648 #endif
649       return; /* no entry in queue */
650     }
651   if (qe->was_transmitted == GNUNET_YES)
652     {
653 #if DEBUG_DATASTORE > 1
654       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
655                   "Head request already transmitted\n");
656 #endif
657       return; /* waiting for replies */
658     }
659   if (h->th != NULL)
660     {
661 #if DEBUG_DATASTORE > 1
662       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
663                   "Pending transmission request\n");
664 #endif
665       return; /* request pending */
666     }
667   if (h->client == NULL)
668     {
669 #if DEBUG_DATASTORE > 1
670       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
671                   "Not connected\n");
672 #endif
673       return; /* waiting for reconnect */
674     }
675   if (GNUNET_YES == h->in_receive)
676     {
677       /* wait for response to previous query */
678       return; 
679     }
680 #if DEBUG_DATASTORE
681   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
682               "Queueing %u byte request to DATASTORE\n",
683               qe->message_size);
684 #endif
685   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
686                                                qe->message_size,
687                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
688                                                GNUNET_YES,
689                                                &transmit_request,
690                                                h);
691   GNUNET_assert (GNUNET_NO == h->in_receive);
692   GNUNET_break (NULL != h->th);
693 }
694
695
696 /**
697  * Dummy continuation used to do nothing (but be non-zero).
698  *
699  * @param cls closure
700  * @param result result 
701  * @param emsg error message
702  */
703 static void
704 drop_status_cont (void *cls, int32_t result, const char *emsg)
705 {
706   /* do nothing */
707 }
708
709
710 /**
711  * Free a queue entry.  Removes the given entry from the
712  * queue and releases associated resources.  Does NOT
713  * call the callback.
714  * 
715  * @param qe entry to free.
716  */
717 static void
718 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
719 {
720   struct GNUNET_DATASTORE_Handle *h = qe->h;
721
722   GNUNET_CONTAINER_DLL_remove (h->queue_head,
723                                h->queue_tail,
724                                qe);
725   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
726     {
727       GNUNET_SCHEDULER_cancel (qe->task);
728       qe->task = GNUNET_SCHEDULER_NO_TASK;
729     }
730   h->queue_size--;
731   qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */
732   GNUNET_free (qe);
733 }
734
735
736 /**
737  * Type of a function to call when we receive a message
738  * from the service.
739  *
740  * @param cls closure
741  * @param msg message received, NULL on timeout or fatal error
742  */
743 static void 
744 process_status_message (void *cls,
745                         const struct
746                         GNUNET_MessageHeader * msg)
747 {
748   struct GNUNET_DATASTORE_Handle *h = cls;
749   struct GNUNET_DATASTORE_QueueEntry *qe;
750   struct StatusContext rc;
751   const struct StatusMessage *sm;
752   const char *emsg;
753   int32_t status;
754   int was_transmitted;
755
756   h->in_receive = GNUNET_NO;
757   if (h->skip_next_messages > 0)
758     {
759       h->skip_next_messages--;
760       process_queue (h);
761       return;
762    } 
763   if (NULL == (qe = h->queue_head))
764     {
765       GNUNET_break (0);
766       do_disconnect (h);
767       return;
768     }
769   rc = qe->qc.sc;
770   if (msg == NULL)
771     {      
772       was_transmitted = qe->was_transmitted;
773       free_queue_entry (qe);
774       if (NULL == h->client)
775         return; /* forced disconnect */
776       if (rc.cont != NULL)
777         rc.cont (rc.cont_cls, 
778                  GNUNET_SYSERR,
779                  _("Failed to receive status response from database."));
780       if (was_transmitted == GNUNET_YES)
781         do_disconnect (h);
782       else
783         process_queue (h);
784       return;
785     }
786   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
787   free_queue_entry (qe);
788   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
789        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
790     {
791       GNUNET_break (0);
792       h->retry_time = GNUNET_TIME_UNIT_ZERO;
793       do_disconnect (h);
794       if (rc.cont != NULL)
795         rc.cont (rc.cont_cls, 
796                  GNUNET_SYSERR,
797                  _("Error reading response from datastore service"));
798       return;
799     }
800   sm = (const struct StatusMessage*) msg;
801   status = ntohl(sm->status);
802   emsg = NULL;
803   if (ntohs(msg->size) > sizeof(struct StatusMessage))
804     {
805       emsg = (const char*) &sm[1];
806       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
807         {
808           GNUNET_break (0);
809           emsg = _("Invalid error message received from datastore service");
810         }
811     }  
812   if ( (status == GNUNET_SYSERR) &&
813        (emsg == NULL) )
814     {
815       GNUNET_break (0);
816       emsg = _("Invalid error message received from datastore service");
817     }
818 #if DEBUG_DATASTORE
819   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
820               "Received status %d/%s\n",
821               (int) status,
822               emsg);
823 #endif
824   GNUNET_STATISTICS_update (h->stats,
825                             gettext_noop ("# status messages received"),
826                             1,
827                             GNUNET_NO);
828   h->retry_time.rel_value = 0;
829   process_queue (h);
830   if (rc.cont != NULL)
831     rc.cont (rc.cont_cls, 
832              status,
833              emsg);
834 }
835
836
837 /**
838  * Store an item in the datastore.  If the item is already present,
839  * the priorities are summed up and the higher expiration time and
840  * lower anonymity level is used.
841  *
842  * @param h handle to the datastore
843  * @param rid reservation ID to use (from "reserve"); use 0 if no
844  *            prior reservation was made
845  * @param key key for the value
846  * @param size number of bytes in data
847  * @param data content stored
848  * @param type type of the content
849  * @param priority priority of the content
850  * @param anonymity anonymity-level for the content
851  * @param replication how often should the content be replicated to other peers?
852  * @param expiration expiration time for the content
853  * @param queue_priority ranking of this request in the priority queue
854  * @param max_queue_size at what queue size should this request be dropped
855  *        (if other requests of higher priority are in the queue)
856  * @param timeout timeout for the operation
857  * @param cont continuation to call when done
858  * @param cont_cls closure for cont
859  * @return NULL if the entry was not queued, otherwise a handle that can be used to
860  *         cancel; note that even if NULL is returned, the callback will be invoked
861  *         (or rather, will already have been invoked)
862  */
863 struct GNUNET_DATASTORE_QueueEntry *
864 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
865                       uint32_t rid,
866                       const GNUNET_HashCode * key,
867                       size_t size,
868                       const void *data,
869                       enum GNUNET_BLOCK_Type type,
870                       uint32_t priority,
871                       uint32_t anonymity,
872                       uint32_t replication,
873                       struct GNUNET_TIME_Absolute expiration,
874                       unsigned int queue_priority,
875                       unsigned int max_queue_size,
876                       struct GNUNET_TIME_Relative timeout,
877                       GNUNET_DATASTORE_ContinuationWithStatus cont,
878                       void *cont_cls)
879 {
880   struct GNUNET_DATASTORE_QueueEntry *qe;
881   struct DataMessage *dm;
882   size_t msize;
883   union QueueContext qc;
884
885 #if DEBUG_DATASTORE
886   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
887               "Asked to put %u bytes of data under key `%s' for %llu ms\n",
888               size,
889               GNUNET_h2s (key),
890               GNUNET_TIME_absolute_get_remaining (expiration).rel_value);
891 #endif
892   msize = sizeof(struct DataMessage) + size;
893   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
894   qc.sc.cont = cont;
895   qc.sc.cont_cls = cont_cls;
896   qe = make_queue_entry (h, msize,
897                          queue_priority, max_queue_size, timeout,
898                          &process_status_message, &qc);
899   if (qe == NULL)
900     {
901 #if DEBUG_DATASTORE
902       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
903                   "Could not create queue entry for PUT\n");
904 #endif
905       return NULL;
906     }
907   GNUNET_STATISTICS_update (h->stats,
908                             gettext_noop ("# PUT requests executed"),
909                             1,
910                             GNUNET_NO);
911   dm = (struct DataMessage* ) &qe[1];
912   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
913   dm->header.size = htons(msize);
914   dm->rid = htonl(rid);
915   dm->size = htonl( (uint32_t) size);
916   dm->type = htonl(type);
917   dm->priority = htonl(priority);
918   dm->anonymity = htonl(anonymity);
919   dm->replication = htonl (replication);
920   dm->reserved = htonl (0);
921   dm->uid = GNUNET_htonll(0);
922   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
923   dm->key = *key;
924   memcpy (&dm[1], data, size);
925   process_queue (h);
926   return qe;
927 }
928
929
930 /**
931  * Reserve space in the datastore.  This function should be used
932  * to avoid "out of space" failures during a longer sequence of "put"
933  * operations (for example, when a file is being inserted).
934  *
935  * @param h handle to the datastore
936  * @param amount how much space (in bytes) should be reserved (for content only)
937  * @param entries how many entries will be created (to calculate per-entry overhead)
938  * @param queue_priority ranking of this request in the priority queue
939  * @param max_queue_size at what queue size should this request be dropped
940  *        (if other requests of higher priority are in the queue)
941  * @param timeout how long to wait at most for a response (or before dying in queue)
942  * @param cont continuation to call when done; "success" will be set to
943  *             a positive reservation value if space could be reserved.
944  * @param cont_cls closure for cont
945  * @return NULL if the entry was not queued, otherwise a handle that can be used to
946  *         cancel; note that even if NULL is returned, the callback will be invoked
947  *         (or rather, will already have been invoked)
948  */
949 struct GNUNET_DATASTORE_QueueEntry *
950 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
951                           uint64_t amount,
952                           uint32_t entries,
953                           unsigned int queue_priority,
954                           unsigned int max_queue_size,
955                           struct GNUNET_TIME_Relative timeout,
956                           GNUNET_DATASTORE_ContinuationWithStatus cont,
957                           void *cont_cls)
958 {
959   struct GNUNET_DATASTORE_QueueEntry *qe;
960   struct ReserveMessage *rm;
961   union QueueContext qc;
962
963   if (cont == NULL)
964     cont = &drop_status_cont;
965 #if DEBUG_DATASTORE
966   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
967               "Asked to reserve %llu bytes of data and %u entries'\n",
968               (unsigned long long) amount,
969               (unsigned int) entries);
970 #endif
971   qc.sc.cont = cont;
972   qc.sc.cont_cls = cont_cls;
973   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
974                          queue_priority, max_queue_size, timeout,
975                          &process_status_message, &qc);
976   if (qe == NULL)
977     {
978 #if DEBUG_DATASTORE
979       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
980                   "Could not create queue entry to reserve\n");
981 #endif
982       return NULL;
983     }
984   GNUNET_STATISTICS_update (h->stats,
985                             gettext_noop ("# RESERVE requests executed"),
986                             1,
987                             GNUNET_NO);
988   rm = (struct ReserveMessage*) &qe[1];
989   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
990   rm->header.size = htons(sizeof (struct ReserveMessage));
991   rm->entries = htonl(entries);
992   rm->amount = GNUNET_htonll(amount);
993   process_queue (h);
994   return qe;
995 }
996
997
998 /**
999  * Signal that all of the data for which a reservation was made has
1000  * been stored and that whatever excess space might have been reserved
1001  * can now be released.
1002  *
1003  * @param h handle to the datastore
1004  * @param rid reservation ID (value of "success" in original continuation
1005  *        from the "reserve" function).
1006  * @param queue_priority ranking of this request in the priority queue
1007  * @param max_queue_size at what queue size should this request be dropped
1008  *        (if other requests of higher priority are in the queue)
1009  * @param queue_priority ranking of this request in the priority queue
1010  * @param max_queue_size at what queue size should this request be dropped
1011  *        (if other requests of higher priority are in the queue)
1012  * @param timeout how long to wait at most for a response
1013  * @param cont continuation to call when done
1014  * @param cont_cls closure for cont
1015  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1016  *         cancel; note that even if NULL is returned, the callback will be invoked
1017  *         (or rather, will already have been invoked)
1018  */
1019 struct GNUNET_DATASTORE_QueueEntry *
1020 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1021                                   uint32_t rid,
1022                                   unsigned int queue_priority,
1023                                   unsigned int max_queue_size,
1024                                   struct GNUNET_TIME_Relative timeout,
1025                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
1026                                   void *cont_cls)
1027 {
1028   struct GNUNET_DATASTORE_QueueEntry *qe;
1029   struct ReleaseReserveMessage *rrm;
1030   union QueueContext qc;
1031
1032   if (cont == NULL)
1033     cont = &drop_status_cont;
1034 #if DEBUG_DATASTORE
1035   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1036               "Asked to release reserve %d\n",
1037               rid);
1038 #endif
1039   qc.sc.cont = cont;
1040   qc.sc.cont_cls = cont_cls;
1041   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
1042                          queue_priority, max_queue_size, timeout,
1043                          &process_status_message, &qc);
1044   if (qe == NULL)
1045     {
1046 #if DEBUG_DATASTORE
1047       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1048                   "Could not create queue entry to release reserve\n");
1049 #endif
1050       return NULL;
1051     }
1052   GNUNET_STATISTICS_update (h->stats,
1053                             gettext_noop ("# RELEASE RESERVE requests executed"),
1054                             1,
1055                             GNUNET_NO);
1056   rrm = (struct ReleaseReserveMessage*) &qe[1];
1057   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1058   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
1059   rrm->rid = htonl(rid);
1060   process_queue (h);
1061   return qe;
1062 }
1063
1064
1065 /**
1066  * Update a value in the datastore.
1067  *
1068  * @param h handle to the datastore
1069  * @param uid identifier for the value
1070  * @param priority how much to increase the priority of the value
1071  * @param expiration new expiration value should be MAX of existing and this argument
1072  * @param queue_priority ranking of this request in the priority queue
1073  * @param max_queue_size at what queue size should this request be dropped
1074  *        (if other requests of higher priority are in the queue)
1075  * @param timeout how long to wait at most for a response
1076  * @param cont continuation to call when done
1077  * @param cont_cls closure for cont
1078  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1079  *         cancel; note that even if NULL is returned, the callback will be invoked
1080  *         (or rather, will already have been invoked)
1081  */
1082 struct GNUNET_DATASTORE_QueueEntry *
1083 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1084                          uint64_t uid,
1085                          uint32_t priority,
1086                          struct GNUNET_TIME_Absolute expiration,
1087                          unsigned int queue_priority,
1088                          unsigned int max_queue_size,
1089                          struct GNUNET_TIME_Relative timeout,
1090                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1091                          void *cont_cls)
1092 {
1093   struct GNUNET_DATASTORE_QueueEntry *qe;
1094   struct UpdateMessage *um;
1095   union QueueContext qc;
1096
1097   if (cont == NULL)
1098     cont = &drop_status_cont;
1099 #if DEBUG_DATASTORE
1100   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1101               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
1102               uid,
1103               (unsigned int) priority,
1104               (unsigned long long) expiration.abs_value);
1105 #endif
1106   qc.sc.cont = cont;
1107   qc.sc.cont_cls = cont_cls;
1108   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
1109                          queue_priority, max_queue_size, timeout,
1110                          &process_status_message, &qc);
1111   if (qe == NULL)
1112     {
1113 #if DEBUG_DATASTORE
1114       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1115                   "Could not create queue entry for UPDATE\n");
1116 #endif
1117       return NULL;
1118     }
1119   GNUNET_STATISTICS_update (h->stats,
1120                             gettext_noop ("# UPDATE requests executed"),
1121                             1,
1122                             GNUNET_NO);
1123   um = (struct UpdateMessage*) &qe[1];
1124   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1125   um->header.size = htons(sizeof (struct UpdateMessage));
1126   um->priority = htonl(priority);
1127   um->expiration = GNUNET_TIME_absolute_hton(expiration);
1128   um->uid = GNUNET_htonll(uid);
1129   process_queue (h);
1130   return qe;
1131 }
1132
1133
1134 /**
1135  * Explicitly remove some content from the database.
1136  * The "cont"inuation will be called with status
1137  * "GNUNET_OK" if content was removed, "GNUNET_NO"
1138  * if no matching entry was found and "GNUNET_SYSERR"
1139  * on all other types of errors.
1140  *
1141  * @param h handle to the datastore
1142  * @param key key for the value
1143  * @param size number of bytes in data
1144  * @param data content stored
1145  * @param queue_priority ranking of this request in the priority queue
1146  * @param max_queue_size at what queue size should this request be dropped
1147  *        (if other requests of higher priority are in the queue)
1148  * @param timeout how long to wait at most for a response
1149  * @param cont continuation to call when done
1150  * @param cont_cls closure for cont
1151  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1152  *         cancel; note that even if NULL is returned, the callback will be invoked
1153  *         (or rather, will already have been invoked)
1154  */
1155 struct GNUNET_DATASTORE_QueueEntry *
1156 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1157                          const GNUNET_HashCode *key,
1158                          size_t size,
1159                          const void *data,
1160                          unsigned int queue_priority,
1161                          unsigned int max_queue_size,
1162                          struct GNUNET_TIME_Relative timeout,
1163                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1164                          void *cont_cls)
1165 {
1166   struct GNUNET_DATASTORE_QueueEntry *qe;
1167   struct DataMessage *dm;
1168   size_t msize;
1169   union QueueContext qc;
1170
1171   if (cont == NULL)
1172     cont = &drop_status_cont;
1173 #if DEBUG_DATASTORE
1174   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1175               "Asked to remove %u bytes under key `%s'\n",
1176               size,
1177               GNUNET_h2s (key));
1178 #endif
1179   qc.sc.cont = cont;
1180   qc.sc.cont_cls = cont_cls;
1181   msize = sizeof(struct DataMessage) + size;
1182   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1183   qe = make_queue_entry (h, msize,
1184                          queue_priority, max_queue_size, timeout,
1185                          &process_status_message, &qc);
1186   if (qe == NULL)
1187     {
1188 #if DEBUG_DATASTORE
1189       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1190                   "Could not create queue entry for REMOVE\n");
1191 #endif
1192       return NULL;
1193     }
1194   GNUNET_STATISTICS_update (h->stats,
1195                             gettext_noop ("# REMOVE requests executed"),
1196                             1,
1197                             GNUNET_NO);
1198   dm = (struct DataMessage*) &qe[1];
1199   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1200   dm->header.size = htons(msize);
1201   dm->rid = htonl(0);
1202   dm->size = htonl(size);
1203   dm->type = htonl(0);
1204   dm->priority = htonl(0);
1205   dm->anonymity = htonl(0);
1206   dm->uid = GNUNET_htonll(0);
1207   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
1208   dm->key = *key;
1209   memcpy (&dm[1], data, size);
1210   process_queue (h);
1211   return qe;
1212 }
1213
1214
1215 /**
1216  * Type of a function to call when we receive a message
1217  * from the service.
1218  *
1219  * @param cls closure
1220  * @param msg message received, NULL on timeout or fatal error
1221  */
1222 static void 
1223 process_result_message (void *cls,
1224                         const struct GNUNET_MessageHeader *msg)
1225 {
1226   struct GNUNET_DATASTORE_Handle *h = cls;
1227   struct GNUNET_DATASTORE_QueueEntry *qe;
1228   struct ResultContext rc;
1229   const struct DataMessage *dm;
1230
1231   h->in_receive = GNUNET_NO;
1232   if (h->skip_next_messages > 0)
1233     {
1234       h->skip_next_messages--;
1235       process_queue (h);
1236       return;
1237     }
1238   if (msg == NULL)
1239     {
1240       qe = h->queue_head;
1241       GNUNET_assert (NULL != qe);
1242       if (qe->was_transmitted == GNUNET_YES)
1243         {
1244           rc = qe->qc.rc;
1245           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1246                       _("Failed to receive response from database.\n"));
1247           do_disconnect (h);
1248           free_queue_entry (qe);
1249           if (rc.proc != NULL)
1250             rc.proc (rc.proc_cls,
1251                      NULL, 0, NULL, 0, 0, 0, 
1252                      GNUNET_TIME_UNIT_ZERO_ABS, 0);    
1253         }
1254       else
1255         process_queue (h);
1256       return;
1257     }
1258   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
1259     {
1260       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1261       qe = h->queue_head;
1262       rc = qe->qc.rc;
1263       GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1264       free_queue_entry (qe);
1265 #if DEBUG_DATASTORE
1266       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1267                   "Received end of result set, new queue size is %u\n",
1268                   h->queue_size);
1269 #endif
1270       if (rc.proc != NULL)
1271         rc.proc (rc.proc_cls,
1272                  NULL, 0, NULL, 0, 0, 0, 
1273                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1274       h->retry_time.rel_value = 0;
1275       h->result_count = 0;
1276       process_queue (h);
1277       return;
1278     }
1279   qe = h->queue_head;
1280   rc = qe->qc.rc;
1281   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1282   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1283        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1284        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1285     {
1286       GNUNET_break (0);
1287       free_queue_entry (qe);
1288       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1289       do_disconnect (h);
1290       if (rc.proc != NULL)
1291         rc.proc (rc.proc_cls,
1292                  NULL, 0, NULL, 0, 0, 0, 
1293                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
1294       return;
1295     }
1296   GNUNET_STATISTICS_update (h->stats,
1297                             gettext_noop ("# Results received"),
1298                             1,
1299                             GNUNET_NO);
1300   dm = (const struct DataMessage*) msg;
1301 #if DEBUG_DATASTORE
1302   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1303               "Received result %llu with type %u and size %u with key %s\n",
1304               (unsigned long long) GNUNET_ntohll(dm->uid),
1305               ntohl(dm->type),
1306               ntohl(dm->size),
1307               GNUNET_h2s(&dm->key));
1308 #endif
1309   free_queue_entry (qe);
1310   h->retry_time.rel_value = 0;
1311   process_queue (h);
1312   if (rc.proc != NULL)
1313     rc.proc (rc.proc_cls,
1314              &dm->key,
1315              ntohl(dm->size),
1316              &dm[1],
1317              ntohl(dm->type),
1318              ntohl(dm->priority),
1319              ntohl(dm->anonymity),
1320              GNUNET_TIME_absolute_ntoh(dm->expiration), 
1321              GNUNET_ntohll(dm->uid));
1322 }
1323
1324
1325 /**
1326  * Get a random value from the datastore for content replication.
1327  * Returns a single, random value among those with the highest
1328  * replication score, lowering positive replication scores by one for
1329  * the chosen value (if only content with a replication score exists,
1330  * a random value is returned and replication scores are not changed).
1331  *
1332  * @param h handle to the datastore
1333  * @param queue_priority ranking of this request in the priority queue
1334  * @param max_queue_size at what queue size should this request be dropped
1335  *        (if other requests of higher priority are in the queue)
1336  * @param timeout how long to wait at most for a response
1337  * @param proc function to call on a random value; it
1338  *        will be called once with a value (if available)
1339  *        and always once with a value of NULL.
1340  * @param proc_cls closure for proc
1341  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1342  *         cancel
1343  */
1344 struct GNUNET_DATASTORE_QueueEntry *
1345 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1346                                       unsigned int queue_priority,
1347                                       unsigned int max_queue_size,
1348                                       struct GNUNET_TIME_Relative timeout,
1349                                       GNUNET_DATASTORE_DatumProcessor proc, 
1350                                       void *proc_cls)
1351 {
1352   struct GNUNET_DATASTORE_QueueEntry *qe;
1353   struct GNUNET_MessageHeader *m;
1354   union QueueContext qc;
1355
1356   GNUNET_assert (NULL != proc);
1357 #if DEBUG_DATASTORE
1358   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1359               "Asked to get replication entry in %llu ms\n",
1360               (unsigned long long) timeout.rel_value);
1361 #endif
1362   qc.rc.proc = proc;
1363   qc.rc.proc_cls = proc_cls;
1364   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1365                          queue_priority, max_queue_size, timeout,
1366                          &process_result_message, &qc);
1367   if (qe == NULL)
1368     {
1369 #if DEBUG_DATASTORE
1370       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1371                   "Could not create queue entry for GET REPLICATION\n");
1372 #endif
1373       return NULL;    
1374     }
1375   GNUNET_STATISTICS_update (h->stats,
1376                             gettext_noop ("# GET REPLICATION requests executed"),
1377                             1,
1378                             GNUNET_NO);
1379   m = (struct GNUNET_MessageHeader*) &qe[1];
1380   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1381   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1382   process_queue (h);
1383   return qe;
1384 }
1385
1386
1387 /**
1388  * Get a single zero-anonymity value from the datastore.
1389  *
1390  * @param h handle to the datastore
1391  * @param offset offset of the result (mod #num-results); set to
1392  *               a random 64-bit value initially; then increment by
1393  *               one each time; detect that all results have been found by uid
1394  *               being again the first uid ever returned.
1395  * @param queue_priority ranking of this request in the priority queue
1396  * @param max_queue_size at what queue size should this request be dropped
1397  *        (if other requests of higher priority are in the queue)
1398  * @param timeout how long to wait at most for a response
1399  * @param type allowed type for the operation (never zero)
1400  * @param proc function to call on a random value; it
1401  *        will be called once with a value (if available)
1402  *        or with NULL if none value exists.
1403  * @param proc_cls closure for proc
1404  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1405  *         cancel
1406  */
1407 struct GNUNET_DATASTORE_QueueEntry *
1408 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1409                                      uint64_t offset,
1410                                      unsigned int queue_priority,
1411                                      unsigned int max_queue_size,
1412                                      struct GNUNET_TIME_Relative timeout,
1413                                      enum GNUNET_BLOCK_Type type,
1414                                      GNUNET_DATASTORE_DatumProcessor proc, 
1415                                      void *proc_cls)
1416 {
1417   struct GNUNET_DATASTORE_QueueEntry *qe;
1418   struct GetZeroAnonymityMessage *m;
1419   union QueueContext qc;
1420
1421   GNUNET_assert (NULL != proc);
1422   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1423 #if DEBUG_DATASTORE
1424   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1425               "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n",
1426               (unsigned long long) offset,
1427               type,
1428               (unsigned long long) timeout.rel_value);
1429 #endif
1430   qc.rc.proc = proc;
1431   qc.rc.proc_cls = proc_cls;
1432   qe = make_queue_entry (h, sizeof(struct GetZeroAnonymityMessage),
1433                          queue_priority, max_queue_size, timeout,
1434                          &process_result_message, &qc);
1435   if (qe == NULL)
1436     {
1437 #if DEBUG_DATASTORE
1438       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1439                   "Could not create queue entry for zero-anonymity procation\n");
1440 #endif
1441       return NULL;    
1442     }
1443   GNUNET_STATISTICS_update (h->stats,
1444                             gettext_noop ("# GET ZERO ANONYMITY requests executed"),
1445                             1,
1446                             GNUNET_NO);
1447   m = (struct GetZeroAnonymityMessage*) &qe[1];
1448   m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1449   m->header.size = htons(sizeof (struct GetZeroAnonymityMessage));
1450   m->type = htonl ((uint32_t) type);
1451   m->offset = GNUNET_htonll (offset);
1452   process_queue (h);
1453   return qe;
1454 }
1455
1456
1457 /**
1458  * Get a result for a particular key from the datastore.  The processor
1459  * will only be called once.
1460  *
1461  * @param h handle to the datastore
1462  * @param offset offset of the result (mod #num-results); set to
1463  *               a random 64-bit value initially; then increment by
1464  *               one each time; detect that all results have been found by uid
1465  *               being again the first uid ever returned.
1466  * @param key maybe NULL (to match all entries)
1467  * @param type desired type, 0 for any
1468  * @param queue_priority ranking of this request in the priority queue
1469  * @param max_queue_size at what queue size should this request be dropped
1470  *        (if other requests of higher priority are in the queue)
1471  * @param timeout how long to wait at most for a response
1472  * @param proc function to call on each matching value;
1473  *        will be called once with a NULL value at the end
1474  * @param proc_cls closure for proc
1475  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1476  *         cancel
1477  */
1478 struct GNUNET_DATASTORE_QueueEntry *
1479 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1480                           uint64_t offset,
1481                           const GNUNET_HashCode * key,
1482                           enum GNUNET_BLOCK_Type type,
1483                           unsigned int queue_priority,
1484                           unsigned int max_queue_size,
1485                           struct GNUNET_TIME_Relative timeout,
1486                           GNUNET_DATASTORE_DatumProcessor proc, 
1487                           void *proc_cls)
1488 {
1489   struct GNUNET_DATASTORE_QueueEntry *qe;
1490   struct GetMessage *gm;
1491   union QueueContext qc;
1492
1493   GNUNET_assert (NULL != proc);
1494 #if DEBUG_DATASTORE
1495   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1496               "Asked to look for data of type %u under key `%s'\n",
1497               (unsigned int) type,
1498               GNUNET_h2s (key));
1499 #endif
1500   qc.rc.proc = proc;
1501   qc.rc.proc_cls = proc_cls;
1502   qe = make_queue_entry (h, sizeof(struct GetMessage),
1503                          queue_priority, max_queue_size, timeout,
1504                          &process_result_message, &qc);
1505   if (qe == NULL)
1506     {
1507 #if DEBUG_DATASTORE
1508       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1509                   "Could not queue request for `%s'\n",
1510                   GNUNET_h2s (key));
1511 #endif
1512       return NULL;
1513     }
1514   GNUNET_STATISTICS_update (h->stats,
1515                             gettext_noop ("# GET requests executed"),
1516                             1,
1517                             GNUNET_NO);
1518   gm = (struct GetMessage*) &qe[1];
1519   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1520   gm->type = htonl(type);
1521   gm->offset = GNUNET_htonll (offset);
1522   if (key != NULL)
1523     {
1524       gm->header.size = htons(sizeof (struct GetMessage));
1525       gm->key = *key;
1526     }
1527   else
1528     {
1529       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1530     }
1531   process_queue (h);
1532   return qe;
1533 }
1534
1535
1536 /**
1537  * Cancel a datastore operation.  The final callback from the
1538  * operation must not have been done yet.
1539  * 
1540  * @param qe operation to cancel
1541  */
1542 void
1543 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1544 {
1545   struct GNUNET_DATASTORE_Handle *h;
1546
1547   GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted);
1548   h = qe->h;
1549 #if DEBUG_DATASTORE
1550   GNUNET_log  (GNUNET_ERROR_TYPE_DEBUG,
1551                "Pending DATASTORE request %p cancelled (%d, %d)\n",
1552                qe,
1553                qe->was_transmitted,
1554                h->queue_head == qe);
1555 #endif
1556   if (GNUNET_YES == qe->was_transmitted) 
1557     {
1558       free_queue_entry (qe);
1559       h->skip_next_messages++;
1560       return;
1561     }
1562   free_queue_entry (qe);
1563   process_queue (h);
1564 }
1565
1566
1567 /* end of datastore_api.c */