67125c4511dd3b6b9b23336629b79832ec5217db
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
33
34 /**
35  * If a client stopped asking for more results, how many more do
36  * we receive from the DB before killing the connection?  Trade-off
37  * between re-doing TCP handshakes and (needlessly) receiving 
38  * useless results.
39  */
40 #define MAX_EXCESS_RESULTS 8
41
42 /**
43  * Context for processing status messages.
44  */
45 struct StatusContext
46 {
47   /**
48    * Continuation to call with the status.
49    */
50   GNUNET_DATASTORE_ContinuationWithStatus cont;
51
52   /**
53    * Closure for cont.
54    */
55   void *cont_cls;
56
57 };
58
59
60 /**
61  * Context for processing result messages.
62  */
63 struct ResultContext
64 {
65   /**
66    * Function to call with the result.
67    */
68   GNUNET_DATASTORE_DatumProcessor proc;
69
70   /**
71    * Closure for proc.
72    */
73   void *proc_cls;
74
75 };
76
77
78 /**
79  *  Context for a queue operation.
80  */
81 union QueueContext
82 {
83
84   struct StatusContext sc;
85   
86   struct ResultContext rc;
87
88 };
89
90
91
92 /**
93  * Entry in our priority queue.
94  */
95 struct GNUNET_DATASTORE_QueueEntry
96 {
97
98   /**
99    * This is a linked list.
100    */
101   struct GNUNET_DATASTORE_QueueEntry *next;
102
103   /**
104    * This is a linked list.
105    */
106   struct GNUNET_DATASTORE_QueueEntry *prev;
107
108   /**
109    * Handle to the master context.
110    */
111   struct GNUNET_DATASTORE_Handle *h;
112
113   /**
114    * Response processor (NULL if we are not waiting for a response).
115    * This struct should be used for the closure, function-specific
116    * arguments can be passed via 'qc'.
117    */
118   GNUNET_CLIENT_MessageHandler response_proc;
119
120   /**
121    * Function to call after transmission of the request.
122    */
123   GNUNET_DATASTORE_ContinuationWithStatus cont;
124    
125   /**
126    * Closure for 'cont'.
127    */
128   void *cont_cls;
129
130   /**
131    * Context for the operation.
132    */
133   union QueueContext qc;
134
135   /**
136    * Task for timeout signalling.
137    */
138   GNUNET_SCHEDULER_TaskIdentifier task;
139
140   /**
141    * Timeout for the current operation.
142    */
143   struct GNUNET_TIME_Absolute timeout;
144
145   /**
146    * Priority in the queue.
147    */
148   unsigned int priority;
149
150   /**
151    * Maximum allowed length of queue (otherwise
152    * this request should be discarded).
153    */
154   unsigned int max_queue;
155
156   /**
157    * Number of bytes in the request message following
158    * this struct.  32-bit value for nicer memory
159    * access (and overall struct alignment).
160    */
161   uint32_t message_size;
162
163   /**
164    * Has this message been transmitted to the service?
165    * Only ever GNUNET_YES for the head of the queue.
166    * Note that the overall struct should end at a 
167    * multiple of 64 bits.
168    */
169   int was_transmitted;
170   
171 };
172
173 /**
174  * Handle to the datastore service. 
175  */
176 struct GNUNET_DATASTORE_Handle
177 {
178
179   /**
180    * Our configuration.
181    */
182   const struct GNUNET_CONFIGURATION_Handle *cfg;
183
184   /**
185    * Current connection to the datastore service.
186    */
187   struct GNUNET_CLIENT_Connection *client;
188
189   /**
190    * Handle for statistics.
191    */
192   struct GNUNET_STATISTICS_Handle *stats;
193
194   /**
195    * Current transmit handle.
196    */
197   struct GNUNET_CLIENT_TransmitHandle *th;
198
199   /**
200    * Current head of priority queue.
201    */
202   struct GNUNET_DATASTORE_QueueEntry *queue_head;
203
204   /**
205    * Current tail of priority queue.
206    */
207   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
208
209   /**
210    * Task for trying to reconnect.
211    */
212   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
213
214   /**
215    * How quickly should we retry?  Used for exponential back-off on
216    * connect-errors.
217    */
218   struct GNUNET_TIME_Relative retry_time;
219
220   /**
221    * Number of entries in the queue.
222    */
223   unsigned int queue_size;
224
225   /**
226    * Number of results we're receiving for the current query
227    * after application stopped to care.  Used to determine when
228    * to reset the connection.
229    */
230   unsigned int result_count;
231
232   /**
233    * Are we currently trying to receive from the service?
234    */
235   int in_receive;
236
237   /**
238    * We should ignore the next message(s) from the service.
239    */
240   unsigned int skip_next_messages;
241
242 };
243
244
245
246 /**
247  * Connect to the datastore service.
248  *
249  * @param cfg configuration to use
250  * @return handle to use to access the service
251  */
252 struct GNUNET_DATASTORE_Handle *
253 GNUNET_DATASTORE_connect (const struct
254                           GNUNET_CONFIGURATION_Handle
255                           *cfg)
256 {
257   struct GNUNET_CLIENT_Connection *c;
258   struct GNUNET_DATASTORE_Handle *h;
259   
260   c = GNUNET_CLIENT_connect ("datastore", cfg);
261   if (c == NULL)
262     return NULL; /* oops */
263   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
264                      GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
265   h->client = c;
266   h->cfg = cfg;
267   h->stats = GNUNET_STATISTICS_create ("datastore-api",
268                                        cfg);
269   return h;
270 }
271
272
273 /**
274  * Transmit DROP message to datastore service.
275  *
276  * @param cls the 'struct GNUNET_DATASTORE_Handle'
277  * @param size number of bytes that can be copied to buf
278  * @param buf where to copy the drop message
279  * @return number of bytes written to buf
280  */
281 static size_t
282 transmit_drop (void *cls,
283                size_t size, 
284                void *buf)
285 {
286   struct GNUNET_DATASTORE_Handle *h = cls;
287   struct GNUNET_MessageHeader *hdr;
288   
289   if (buf == NULL)
290     {
291       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
292                   _("Failed to transmit request to drop database.\n"));
293       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
294       return 0;
295     }
296   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
297   hdr = buf;
298   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
299   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
300   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
301   return sizeof(struct GNUNET_MessageHeader);
302 }
303
304
305 /**
306  * Disconnect from the datastore service (and free
307  * associated resources).
308  *
309  * @param h handle to the datastore
310  * @param drop set to GNUNET_YES to delete all data in datastore (!)
311  */
312 void
313 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
314                              int drop)
315 {
316   struct GNUNET_DATASTORE_QueueEntry *qe;
317
318   if (NULL != h->th)
319     {
320       GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
321       h->th = NULL;
322     }
323   if (h->client != NULL)
324     {
325       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
326       h->client = NULL;
327     }
328   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
329     {
330       GNUNET_SCHEDULER_cancel (h->reconnect_task);
331       h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
332     }
333   while (NULL != (qe = h->queue_head))
334     {
335       GNUNET_assert (NULL != qe->response_proc);
336       qe->response_proc (h, NULL);
337     }
338   if (GNUNET_YES == drop) 
339     {
340       h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
341       if (h->client != NULL)
342         {
343           if (NULL != 
344               GNUNET_CLIENT_notify_transmit_ready (h->client,
345                                                    sizeof(struct GNUNET_MessageHeader),
346                                                    GNUNET_TIME_UNIT_MINUTES,
347                                                    GNUNET_YES,
348                                                    &transmit_drop,
349                                                    h))
350             return;
351           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
352           h->client = NULL;
353         }
354       GNUNET_break (0);
355     }
356   GNUNET_STATISTICS_destroy (h->stats,
357                              GNUNET_NO);
358   h->stats = NULL;
359   GNUNET_free (h);
360 }
361
362
363 /**
364  * A request has timed out (before being transmitted to the service).
365  *
366  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
367  * @param tc scheduler context
368  */
369 static void
370 timeout_queue_entry (void *cls,
371                      const struct GNUNET_SCHEDULER_TaskContext *tc)
372 {
373   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
374
375   GNUNET_STATISTICS_update (qe->h->stats,
376                             gettext_noop ("# queue entry timeouts"),
377                             1,
378                             GNUNET_NO);
379   qe->task = GNUNET_SCHEDULER_NO_TASK;
380   GNUNET_assert (qe->was_transmitted == GNUNET_NO); 
381   qe->response_proc (qe->h, NULL);
382 }
383
384
385 /**
386  * Create a new entry for our priority queue (and possibly discard other entires if
387  * the queue is getting too long).
388  *
389  * @param h handle to the datastore
390  * @param msize size of the message to queue
391  * @param queue_priority priority of the entry
392  * @param max_queue_size at what queue size should this request be dropped
393  *        (if other requests of higher priority are in the queue)
394  * @param timeout timeout for the operation
395  * @param response_proc function to call with replies (can be NULL)
396  * @param qc client context (NOT a closure for response_proc)
397  * @return NULL if the queue is full 
398  */
399 static struct GNUNET_DATASTORE_QueueEntry *
400 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
401                   size_t msize,
402                   unsigned int queue_priority,
403                   unsigned int max_queue_size,
404                   struct GNUNET_TIME_Relative timeout,
405                   GNUNET_CLIENT_MessageHandler response_proc,            
406                   const union QueueContext *qc)
407 {
408   struct GNUNET_DATASTORE_QueueEntry *ret;
409   struct GNUNET_DATASTORE_QueueEntry *pos;
410   unsigned int c;
411
412   c = 0;
413   pos = h->queue_head;
414   while ( (pos != NULL) &&
415           (c < max_queue_size) &&
416           (pos->priority >= queue_priority) )
417     {
418       c++;
419       pos = pos->next;
420     }
421   if (c >= max_queue_size)
422     {
423       GNUNET_STATISTICS_update (h->stats,
424                                 gettext_noop ("# queue overflows"),
425                                 1,
426                                 GNUNET_NO);
427       return NULL;
428     }
429   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
430   ret->h = h;
431   ret->response_proc = response_proc;
432   ret->qc = *qc;
433   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
434   ret->priority = queue_priority;
435   ret->max_queue = max_queue_size;
436   ret->message_size = msize;
437   ret->was_transmitted = GNUNET_NO;
438   if (pos == NULL)
439     {
440       /* append at the tail */
441       pos = h->queue_tail;
442     }
443   else
444     {
445       pos = pos->prev; 
446       /* do not insert at HEAD if HEAD query was already
447          transmitted and we are still receiving replies! */
448       if ( (pos == NULL) &&
449            (h->queue_head->was_transmitted) )
450         pos = h->queue_head;
451     }
452   c++;
453   GNUNET_STATISTICS_update (h->stats,
454                             gettext_noop ("# queue entries created"),
455                             1,
456                             GNUNET_NO);
457   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
458                                      h->queue_tail,
459                                      pos,
460                                      ret);
461   h->queue_size++;
462   ret->task = GNUNET_SCHEDULER_add_delayed (timeout,
463                                             &timeout_queue_entry,
464                                             ret);
465   pos = ret->next;
466   while (pos != NULL) 
467     {
468       if ( (pos->max_queue < h->queue_size) &&
469            (pos->was_transmitted == GNUNET_NO) )
470         {
471           GNUNET_assert (pos->response_proc != NULL);
472           /* move 'pos' element to head so that it will be 
473              killed on 'NULL' call below */
474           GNUNET_CONTAINER_DLL_remove (h->queue_head,
475                                        h->queue_tail,
476                                        pos);
477           GNUNET_CONTAINER_DLL_insert (h->queue_head,
478                                        h->queue_tail,
479                                        pos);
480           GNUNET_STATISTICS_update (h->stats,
481                                     gettext_noop ("# Requests dropped from datastore queue"),
482                                     1,
483                                     GNUNET_NO);
484           GNUNET_assert (h->queue_head == pos);
485           pos->response_proc (h, NULL);
486           break;
487         }
488       pos = pos->next;
489     }
490   return ret;
491 }
492
493
494 /**
495  * Process entries in the queue (or do nothing if we are already
496  * doing so).
497  * 
498  * @param h handle to the datastore
499  */
500 static void
501 process_queue (struct GNUNET_DATASTORE_Handle *h);
502
503
504 /**
505  * Try reconnecting to the datastore service.
506  *
507  * @param cls the 'struct GNUNET_DATASTORE_Handle'
508  * @param tc scheduler context
509  */
510 static void
511 try_reconnect (void *cls,
512                const struct GNUNET_SCHEDULER_TaskContext *tc)
513 {
514   struct GNUNET_DATASTORE_Handle *h = cls;
515
516   if (h->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
517     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
518   else
519     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
520   if (h->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
521     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
522   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
523   h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
524   if (h->client == NULL)
525     {
526       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
527                   "DATASTORE reconnect failed (fatally)\n");
528       return;
529     }
530   GNUNET_STATISTICS_update (h->stats,
531                             gettext_noop ("# datastore connections (re)created"),
532                             1,
533                             GNUNET_NO);
534 #if DEBUG_DATASTORE
535   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
536               "Reconnected to DATASTORE\n");
537 #endif
538   process_queue (h);
539 }
540
541
542 /**
543  * Disconnect from the service and then try reconnecting to the datastore service
544  * after some delay.
545  *
546  * @param h handle to datastore to disconnect and reconnect
547  */
548 static void
549 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
550 {
551   if (h->client == NULL)
552     {
553 #if DEBUG_DATASTORE
554       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
555                   "client NULL in disconnect, will not try to reconnect\n");
556 #endif
557       return;
558     }
559 #if 0
560   GNUNET_STATISTICS_update (stats,
561                             gettext_noop ("# reconnected to DATASTORE"),
562                             1,
563                             GNUNET_NO);
564 #endif
565   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
566   h->skip_next_messages = 0;
567   h->client = NULL;
568   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_time,
569                                                     &try_reconnect,
570                                                     h);      
571 }
572
573
574 /**
575  * Function called whenever we receive a message from
576  * the service.  Calls the appropriate handler.
577  *
578  * @param cls the 'struct GNUNET_DATASTORE_Handle'
579  * @param msg the received message
580  */
581 static void 
582 receive_cb (void *cls,
583             const struct GNUNET_MessageHeader *msg)
584 {
585   struct GNUNET_DATASTORE_Handle *h = cls;
586   struct GNUNET_DATASTORE_QueueEntry *qe;
587
588   h->in_receive = GNUNET_NO;
589   if (h->skip_next_messages > 0)
590     {
591       h->skip_next_messages--;
592       process_queue (h);
593       return;
594    } 
595   if (NULL == (qe = h->queue_head))
596     {
597       GNUNET_break (0);
598       process_queue (h);
599       return; 
600     }
601   qe->response_proc (h, msg);
602 }
603
604
605 /**
606  * Transmit request from queue to datastore service.
607  *
608  * @param cls the 'struct GNUNET_DATASTORE_Handle'
609  * @param size number of bytes that can be copied to buf
610  * @param buf where to copy the drop message
611  * @return number of bytes written to buf
612  */
613 static size_t
614 transmit_request (void *cls,
615                   size_t size, 
616                   void *buf)
617 {
618   struct GNUNET_DATASTORE_Handle *h = cls;
619   struct GNUNET_DATASTORE_QueueEntry *qe;
620   size_t msize;
621
622   h->th = NULL;
623   if (NULL == (qe = h->queue_head))
624     return 0; /* no entry in queue */
625   if (buf == NULL)
626     {
627       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
628                   _("Failed to transmit request to DATASTORE.\n"));
629       GNUNET_STATISTICS_update (h->stats,
630                                 gettext_noop ("# transmission request failures"),
631                                 1,
632                                 GNUNET_NO);
633       do_disconnect (h);
634       return 0;
635     }
636   if (size < (msize = qe->message_size))
637     {
638       process_queue (h);
639       return 0;
640     }
641  #if DEBUG_DATASTORE
642   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
643               "Transmitting %u byte request to DATASTORE\n",
644               msize);
645 #endif
646   memcpy (buf, &qe[1], msize);
647   qe->was_transmitted = GNUNET_YES;
648   GNUNET_SCHEDULER_cancel (qe->task);
649   qe->task = GNUNET_SCHEDULER_NO_TASK;
650   GNUNET_assert (GNUNET_NO == h->in_receive);
651   h->in_receive = GNUNET_YES;
652   GNUNET_CLIENT_receive (h->client,
653                          &receive_cb,
654                          h,
655                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
656   GNUNET_STATISTICS_update (h->stats,
657                             gettext_noop ("# bytes sent to datastore"),
658                             1,
659                             GNUNET_NO);
660   return msize;
661 }
662
663
664 /**
665  * Process entries in the queue (or do nothing if we are already
666  * doing so).
667  * 
668  * @param h handle to the datastore
669  */
670 static void
671 process_queue (struct GNUNET_DATASTORE_Handle *h)
672 {
673   struct GNUNET_DATASTORE_QueueEntry *qe;
674
675   if (NULL == (qe = h->queue_head))
676     {
677 #if DEBUG_DATASTORE > 1
678       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
679                   "Queue empty\n");
680 #endif
681       return; /* no entry in queue */
682     }
683   if (qe->was_transmitted == GNUNET_YES)
684     {
685 #if DEBUG_DATASTORE > 1
686       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
687                   "Head request already transmitted\n");
688 #endif
689       return; /* waiting for replies */
690     }
691   if (h->th != NULL)
692     {
693 #if DEBUG_DATASTORE > 1
694       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
695                   "Pending transmission request\n");
696 #endif
697       return; /* request pending */
698     }
699   if (h->client == NULL)
700     {
701 #if DEBUG_DATASTORE > 1
702       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
703                   "Not connected\n");
704 #endif
705       return; /* waiting for reconnect */
706     }
707   if (GNUNET_YES == h->in_receive)
708     {
709       /* wait for response to previous query */
710       return; 
711     }
712 #if DEBUG_DATASTORE
713   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
714               "Queueing %u byte request to DATASTORE\n",
715               qe->message_size);
716 #endif
717   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
718                                                qe->message_size,
719                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
720                                                GNUNET_YES,
721                                                &transmit_request,
722                                                h);
723   GNUNET_assert (GNUNET_NO == h->in_receive);
724   GNUNET_break (NULL != h->th);
725 }
726
727
728 /**
729  * Dummy continuation used to do nothing (but be non-zero).
730  *
731  * @param cls closure
732  * @param result result 
733  * @param emsg error message
734  */
735 static void
736 drop_status_cont (void *cls, int32_t result, const char *emsg)
737 {
738   /* do nothing */
739 }
740
741
742 /**
743  * Free a queue entry.  Removes the given entry from the
744  * queue and releases associated resources.  Does NOT
745  * call the callback.
746  * 
747  * @param qe entry to free.
748  */
749 static void
750 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
751 {
752   struct GNUNET_DATASTORE_Handle *h = qe->h;
753
754   GNUNET_CONTAINER_DLL_remove (h->queue_head,
755                                h->queue_tail,
756                                qe);
757   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
758     {
759       GNUNET_SCHEDULER_cancel (qe->task);
760       qe->task = GNUNET_SCHEDULER_NO_TASK;
761     }
762   h->queue_size--;
763   qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */
764   GNUNET_free (qe);
765 }
766
767
768 /**
769  * Type of a function to call when we receive a message
770  * from the service.
771  *
772  * @param cls closure
773  * @param msg message received, NULL on timeout or fatal error
774  */
775 static void 
776 process_status_message (void *cls,
777                         const struct
778                         GNUNET_MessageHeader * msg)
779 {
780   struct GNUNET_DATASTORE_Handle *h = cls;
781   struct GNUNET_DATASTORE_QueueEntry *qe;
782   struct StatusContext rc;
783   const struct StatusMessage *sm;
784   const char *emsg;
785   int32_t status;
786   int was_transmitted;
787
788   if (NULL == (qe = h->queue_head))
789     {
790       GNUNET_break (0);
791       do_disconnect (h);
792       return;
793     }
794   rc = qe->qc.sc;
795   if (msg == NULL)
796     {      
797       was_transmitted = qe->was_transmitted;
798       free_queue_entry (qe);
799       if (was_transmitted == GNUNET_YES)
800         do_disconnect (h);
801       else
802         process_queue (h);
803       if (rc.cont != NULL)
804         rc.cont (rc.cont_cls, 
805                  GNUNET_SYSERR,
806                  _("Failed to receive status response from database."));
807       return;
808     }
809   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
810   free_queue_entry (qe);
811   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
812        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
813     {
814       GNUNET_break (0);
815       h->retry_time = GNUNET_TIME_UNIT_ZERO;
816       do_disconnect (h);
817       if (rc.cont != NULL)
818         rc.cont (rc.cont_cls, 
819                  GNUNET_SYSERR,
820                  _("Error reading response from datastore service"));
821       return;
822     }
823   sm = (const struct StatusMessage*) msg;
824   status = ntohl(sm->status);
825   emsg = NULL;
826   if (ntohs(msg->size) > sizeof(struct StatusMessage))
827     {
828       emsg = (const char*) &sm[1];
829       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
830         {
831           GNUNET_break (0);
832           emsg = _("Invalid error message received from datastore service");
833         }
834     }  
835   if ( (status == GNUNET_SYSERR) &&
836        (emsg == NULL) )
837     {
838       GNUNET_break (0);
839       emsg = _("Invalid error message received from datastore service");
840     }
841 #if DEBUG_DATASTORE
842   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
843               "Received status %d/%s\n",
844               (int) status,
845               emsg);
846 #endif
847   GNUNET_STATISTICS_update (h->stats,
848                             gettext_noop ("# status messages received"),
849                             1,
850                             GNUNET_NO);
851   h->retry_time.rel_value = 0;
852   process_queue (h);
853   if (rc.cont != NULL)
854     rc.cont (rc.cont_cls, 
855              status,
856              emsg);
857 }
858
859
860 /**
861  * Store an item in the datastore.  If the item is already present,
862  * the priorities are summed up and the higher expiration time and
863  * lower anonymity level is used.
864  *
865  * @param h handle to the datastore
866  * @param rid reservation ID to use (from "reserve"); use 0 if no
867  *            prior reservation was made
868  * @param key key for the value
869  * @param size number of bytes in data
870  * @param data content stored
871  * @param type type of the content
872  * @param priority priority of the content
873  * @param anonymity anonymity-level for the content
874  * @param replication how often should the content be replicated to other peers?
875  * @param expiration expiration time for the content
876  * @param queue_priority ranking of this request in the priority queue
877  * @param max_queue_size at what queue size should this request be dropped
878  *        (if other requests of higher priority are in the queue)
879  * @param timeout timeout for the operation
880  * @param cont continuation to call when done
881  * @param cont_cls closure for cont
882  * @return NULL if the entry was not queued, otherwise a handle that can be used to
883  *         cancel; note that even if NULL is returned, the callback will be invoked
884  *         (or rather, will already have been invoked)
885  */
886 struct GNUNET_DATASTORE_QueueEntry *
887 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
888                       uint32_t rid,
889                       const GNUNET_HashCode * key,
890                       size_t size,
891                       const void *data,
892                       enum GNUNET_BLOCK_Type type,
893                       uint32_t priority,
894                       uint32_t anonymity,
895                       uint32_t replication,
896                       struct GNUNET_TIME_Absolute expiration,
897                       unsigned int queue_priority,
898                       unsigned int max_queue_size,
899                       struct GNUNET_TIME_Relative timeout,
900                       GNUNET_DATASTORE_ContinuationWithStatus cont,
901                       void *cont_cls)
902 {
903   struct GNUNET_DATASTORE_QueueEntry *qe;
904   struct DataMessage *dm;
905   size_t msize;
906   union QueueContext qc;
907
908 #if DEBUG_DATASTORE
909   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
910               "Asked to put %u bytes of data under key `%s' for %llu ms\n",
911               size,
912               GNUNET_h2s (key),
913               GNUNET_TIME_absolute_get_remaining (expiration).rel_value);
914 #endif
915   msize = sizeof(struct DataMessage) + size;
916   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
917   qc.sc.cont = cont;
918   qc.sc.cont_cls = cont_cls;
919   qe = make_queue_entry (h, msize,
920                          queue_priority, max_queue_size, timeout,
921                          &process_status_message, &qc);
922   if (qe == NULL)
923     {
924 #if DEBUG_DATASTORE
925       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
926                   "Could not create queue entry for PUT\n");
927 #endif
928       return NULL;
929     }
930   GNUNET_STATISTICS_update (h->stats,
931                             gettext_noop ("# PUT requests executed"),
932                             1,
933                             GNUNET_NO);
934   dm = (struct DataMessage* ) &qe[1];
935   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
936   dm->header.size = htons(msize);
937   dm->rid = htonl(rid);
938   dm->size = htonl( (uint32_t) size);
939   dm->type = htonl(type);
940   dm->priority = htonl(priority);
941   dm->anonymity = htonl(anonymity);
942   dm->replication = htonl (replication);
943   dm->reserved = htonl (0);
944   dm->uid = GNUNET_htonll(0);
945   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
946   dm->key = *key;
947   memcpy (&dm[1], data, size);
948   process_queue (h);
949   return qe;
950 }
951
952
953 /**
954  * Reserve space in the datastore.  This function should be used
955  * to avoid "out of space" failures during a longer sequence of "put"
956  * operations (for example, when a file is being inserted).
957  *
958  * @param h handle to the datastore
959  * @param amount how much space (in bytes) should be reserved (for content only)
960  * @param entries how many entries will be created (to calculate per-entry overhead)
961  * @param queue_priority ranking of this request in the priority queue
962  * @param max_queue_size at what queue size should this request be dropped
963  *        (if other requests of higher priority are in the queue)
964  * @param timeout how long to wait at most for a response (or before dying in queue)
965  * @param cont continuation to call when done; "success" will be set to
966  *             a positive reservation value if space could be reserved.
967  * @param cont_cls closure for cont
968  * @return NULL if the entry was not queued, otherwise a handle that can be used to
969  *         cancel; note that even if NULL is returned, the callback will be invoked
970  *         (or rather, will already have been invoked)
971  */
972 struct GNUNET_DATASTORE_QueueEntry *
973 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
974                           uint64_t amount,
975                           uint32_t entries,
976                           unsigned int queue_priority,
977                           unsigned int max_queue_size,
978                           struct GNUNET_TIME_Relative timeout,
979                           GNUNET_DATASTORE_ContinuationWithStatus cont,
980                           void *cont_cls)
981 {
982   struct GNUNET_DATASTORE_QueueEntry *qe;
983   struct ReserveMessage *rm;
984   union QueueContext qc;
985
986   if (cont == NULL)
987     cont = &drop_status_cont;
988 #if DEBUG_DATASTORE
989   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
990               "Asked to reserve %llu bytes of data and %u entries'\n",
991               (unsigned long long) amount,
992               (unsigned int) entries);
993 #endif
994   qc.sc.cont = cont;
995   qc.sc.cont_cls = cont_cls;
996   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
997                          queue_priority, max_queue_size, timeout,
998                          &process_status_message, &qc);
999   if (qe == NULL)
1000     {
1001 #if DEBUG_DATASTORE
1002       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1003                   "Could not create queue entry to reserve\n");
1004 #endif
1005       return NULL;
1006     }
1007   GNUNET_STATISTICS_update (h->stats,
1008                             gettext_noop ("# RESERVE requests executed"),
1009                             1,
1010                             GNUNET_NO);
1011   rm = (struct ReserveMessage*) &qe[1];
1012   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
1013   rm->header.size = htons(sizeof (struct ReserveMessage));
1014   rm->entries = htonl(entries);
1015   rm->amount = GNUNET_htonll(amount);
1016   process_queue (h);
1017   return qe;
1018 }
1019
1020
1021 /**
1022  * Signal that all of the data for which a reservation was made has
1023  * been stored and that whatever excess space might have been reserved
1024  * can now be released.
1025  *
1026  * @param h handle to the datastore
1027  * @param rid reservation ID (value of "success" in original continuation
1028  *        from the "reserve" function).
1029  * @param queue_priority ranking of this request in the priority queue
1030  * @param max_queue_size at what queue size should this request be dropped
1031  *        (if other requests of higher priority are in the queue)
1032  * @param queue_priority ranking of this request in the priority queue
1033  * @param max_queue_size at what queue size should this request be dropped
1034  *        (if other requests of higher priority are in the queue)
1035  * @param timeout how long to wait at most for a response
1036  * @param cont continuation to call when done
1037  * @param cont_cls closure for cont
1038  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1039  *         cancel; note that even if NULL is returned, the callback will be invoked
1040  *         (or rather, will already have been invoked)
1041  */
1042 struct GNUNET_DATASTORE_QueueEntry *
1043 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1044                                   uint32_t rid,
1045                                   unsigned int queue_priority,
1046                                   unsigned int max_queue_size,
1047                                   struct GNUNET_TIME_Relative timeout,
1048                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
1049                                   void *cont_cls)
1050 {
1051   struct GNUNET_DATASTORE_QueueEntry *qe;
1052   struct ReleaseReserveMessage *rrm;
1053   union QueueContext qc;
1054
1055   if (cont == NULL)
1056     cont = &drop_status_cont;
1057 #if DEBUG_DATASTORE
1058   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1059               "Asked to release reserve %d\n",
1060               rid);
1061 #endif
1062   qc.sc.cont = cont;
1063   qc.sc.cont_cls = cont_cls;
1064   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
1065                          queue_priority, max_queue_size, timeout,
1066                          &process_status_message, &qc);
1067   if (qe == NULL)
1068     {
1069 #if DEBUG_DATASTORE
1070       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1071                   "Could not create queue entry to release reserve\n");
1072 #endif
1073       return NULL;
1074     }
1075   GNUNET_STATISTICS_update (h->stats,
1076                             gettext_noop ("# RELEASE RESERVE requests executed"),
1077                             1,
1078                             GNUNET_NO);
1079   rrm = (struct ReleaseReserveMessage*) &qe[1];
1080   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1081   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
1082   rrm->rid = htonl(rid);
1083   process_queue (h);
1084   return qe;
1085 }
1086
1087
1088 /**
1089  * Update a value in the datastore.
1090  *
1091  * @param h handle to the datastore
1092  * @param uid identifier for the value
1093  * @param priority how much to increase the priority of the value
1094  * @param expiration new expiration value should be MAX of existing and this argument
1095  * @param queue_priority ranking of this request in the priority queue
1096  * @param max_queue_size at what queue size should this request be dropped
1097  *        (if other requests of higher priority are in the queue)
1098  * @param timeout how long to wait at most for a response
1099  * @param cont continuation to call when done
1100  * @param cont_cls closure for cont
1101  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1102  *         cancel; note that even if NULL is returned, the callback will be invoked
1103  *         (or rather, will already have been invoked)
1104  */
1105 struct GNUNET_DATASTORE_QueueEntry *
1106 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1107                          uint64_t uid,
1108                          uint32_t priority,
1109                          struct GNUNET_TIME_Absolute expiration,
1110                          unsigned int queue_priority,
1111                          unsigned int max_queue_size,
1112                          struct GNUNET_TIME_Relative timeout,
1113                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1114                          void *cont_cls)
1115 {
1116   struct GNUNET_DATASTORE_QueueEntry *qe;
1117   struct UpdateMessage *um;
1118   union QueueContext qc;
1119
1120   if (cont == NULL)
1121     cont = &drop_status_cont;
1122 #if DEBUG_DATASTORE
1123   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1124               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
1125               uid,
1126               (unsigned int) priority,
1127               (unsigned long long) expiration.abs_value);
1128 #endif
1129   qc.sc.cont = cont;
1130   qc.sc.cont_cls = cont_cls;
1131   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
1132                          queue_priority, max_queue_size, timeout,
1133                          &process_status_message, &qc);
1134   if (qe == NULL)
1135     {
1136 #if DEBUG_DATASTORE
1137       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1138                   "Could not create queue entry for UPDATE\n");
1139 #endif
1140       return NULL;
1141     }
1142   GNUNET_STATISTICS_update (h->stats,
1143                             gettext_noop ("# UPDATE requests executed"),
1144                             1,
1145                             GNUNET_NO);
1146   um = (struct UpdateMessage*) &qe[1];
1147   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1148   um->header.size = htons(sizeof (struct UpdateMessage));
1149   um->priority = htonl(priority);
1150   um->expiration = GNUNET_TIME_absolute_hton(expiration);
1151   um->uid = GNUNET_htonll(uid);
1152   process_queue (h);
1153   return qe;
1154 }
1155
1156
1157 /**
1158  * Explicitly remove some content from the database.
1159  * The "cont"inuation will be called with status
1160  * "GNUNET_OK" if content was removed, "GNUNET_NO"
1161  * if no matching entry was found and "GNUNET_SYSERR"
1162  * on all other types of errors.
1163  *
1164  * @param h handle to the datastore
1165  * @param key key for the value
1166  * @param size number of bytes in data
1167  * @param data content stored
1168  * @param queue_priority ranking of this request in the priority queue
1169  * @param max_queue_size at what queue size should this request be dropped
1170  *        (if other requests of higher priority are in the queue)
1171  * @param timeout how long to wait at most for a response
1172  * @param cont continuation to call when done
1173  * @param cont_cls closure for cont
1174  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1175  *         cancel; note that even if NULL is returned, the callback will be invoked
1176  *         (or rather, will already have been invoked)
1177  */
1178 struct GNUNET_DATASTORE_QueueEntry *
1179 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1180                          const GNUNET_HashCode *key,
1181                          size_t size,
1182                          const void *data,
1183                          unsigned int queue_priority,
1184                          unsigned int max_queue_size,
1185                          struct GNUNET_TIME_Relative timeout,
1186                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1187                          void *cont_cls)
1188 {
1189   struct GNUNET_DATASTORE_QueueEntry *qe;
1190   struct DataMessage *dm;
1191   size_t msize;
1192   union QueueContext qc;
1193
1194   if (cont == NULL)
1195     cont = &drop_status_cont;
1196 #if DEBUG_DATASTORE
1197   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1198               "Asked to remove %u bytes under key `%s'\n",
1199               size,
1200               GNUNET_h2s (key));
1201 #endif
1202   qc.sc.cont = cont;
1203   qc.sc.cont_cls = cont_cls;
1204   msize = sizeof(struct DataMessage) + size;
1205   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1206   qe = make_queue_entry (h, msize,
1207                          queue_priority, max_queue_size, timeout,
1208                          &process_status_message, &qc);
1209   if (qe == NULL)
1210     {
1211 #if DEBUG_DATASTORE
1212       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1213                   "Could not create queue entry for REMOVE\n");
1214 #endif
1215       return NULL;
1216     }
1217   GNUNET_STATISTICS_update (h->stats,
1218                             gettext_noop ("# REMOVE requests executed"),
1219                             1,
1220                             GNUNET_NO);
1221   dm = (struct DataMessage*) &qe[1];
1222   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1223   dm->header.size = htons(msize);
1224   dm->rid = htonl(0);
1225   dm->size = htonl(size);
1226   dm->type = htonl(0);
1227   dm->priority = htonl(0);
1228   dm->anonymity = htonl(0);
1229   dm->uid = GNUNET_htonll(0);
1230   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
1231   dm->key = *key;
1232   memcpy (&dm[1], data, size);
1233   process_queue (h);
1234   return qe;
1235 }
1236
1237
1238 /**
1239  * Type of a function to call when we receive a message
1240  * from the service.
1241  *
1242  * @param cls closure
1243  * @param msg message received, NULL on timeout or fatal error
1244  */
1245 static void 
1246 process_result_message (void *cls,
1247                         const struct GNUNET_MessageHeader *msg)
1248 {
1249   struct GNUNET_DATASTORE_Handle *h = cls;
1250   struct GNUNET_DATASTORE_QueueEntry *qe;
1251   struct ResultContext rc;
1252   const struct DataMessage *dm;
1253   int was_transmitted;
1254
1255   if (msg == NULL)
1256     {
1257       qe = h->queue_head;
1258       GNUNET_assert (NULL != qe);
1259       rc = qe->qc.rc;
1260       was_transmitted = qe->was_transmitted;
1261       free_queue_entry (qe);
1262       if (was_transmitted == GNUNET_YES)
1263         {
1264           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1265                       _("Failed to receive response from database.\n"));
1266           do_disconnect (h);
1267         }
1268       else
1269         {
1270           process_queue (h);
1271         }
1272       if (rc.proc != NULL)
1273         rc.proc (rc.proc_cls,
1274                  NULL, 0, NULL, 0, 0, 0, 
1275                  GNUNET_TIME_UNIT_ZERO_ABS, 0);         
1276       return;
1277     }
1278   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
1279     {
1280       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1281       qe = h->queue_head;
1282       rc = qe->qc.rc;
1283       GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1284       free_queue_entry (qe);
1285 #if DEBUG_DATASTORE
1286       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1287                   "Received end of result set, new queue size is %u\n",
1288                   h->queue_size);
1289 #endif
1290       if (rc.proc != NULL)
1291         rc.proc (rc.proc_cls,
1292                  NULL, 0, NULL, 0, 0, 0, 
1293                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1294       h->retry_time.rel_value = 0;
1295       h->result_count = 0;
1296       process_queue (h);
1297       return;
1298     }
1299   qe = h->queue_head;
1300   GNUNET_assert (NULL != qe);
1301   rc = qe->qc.rc;
1302   if (GNUNET_YES != qe->was_transmitted)
1303     {
1304       GNUNET_break (0);
1305       free_queue_entry (qe);
1306       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1307       do_disconnect (h);
1308       if (rc.proc != NULL)
1309         rc.proc (rc.proc_cls,
1310                  NULL, 0, NULL, 0, 0, 0, 
1311                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
1312       return;
1313     }
1314   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1315        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1316        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1317     {
1318       GNUNET_break (0);
1319       free_queue_entry (qe);
1320       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1321       do_disconnect (h);
1322       if (rc.proc != NULL)
1323         rc.proc (rc.proc_cls,
1324                  NULL, 0, NULL, 0, 0, 0, 
1325                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
1326       return;
1327     }
1328   GNUNET_STATISTICS_update (h->stats,
1329                             gettext_noop ("# Results received"),
1330                             1,
1331                             GNUNET_NO);
1332   dm = (const struct DataMessage*) msg;
1333 #if DEBUG_DATASTORE
1334   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1335               "Received result %llu with type %u and size %u with key %s\n",
1336               (unsigned long long) GNUNET_ntohll(dm->uid),
1337               ntohl(dm->type),
1338               ntohl(dm->size),
1339               GNUNET_h2s(&dm->key));
1340 #endif
1341   free_queue_entry (qe);
1342   h->retry_time.rel_value = 0;
1343   process_queue (h);
1344   if (rc.proc != NULL)
1345     rc.proc (rc.proc_cls,
1346              &dm->key,
1347              ntohl(dm->size),
1348              &dm[1],
1349              ntohl(dm->type),
1350              ntohl(dm->priority),
1351              ntohl(dm->anonymity),
1352              GNUNET_TIME_absolute_ntoh(dm->expiration), 
1353              GNUNET_ntohll(dm->uid));
1354 }
1355
1356
1357 /**
1358  * Get a random value from the datastore for content replication.
1359  * Returns a single, random value among those with the highest
1360  * replication score, lowering positive replication scores by one for
1361  * the chosen value (if only content with a replication score exists,
1362  * a random value is returned and replication scores are not changed).
1363  *
1364  * @param h handle to the datastore
1365  * @param queue_priority ranking of this request in the priority queue
1366  * @param max_queue_size at what queue size should this request be dropped
1367  *        (if other requests of higher priority are in the queue)
1368  * @param timeout how long to wait at most for a response
1369  * @param proc function to call on a random value; it
1370  *        will be called once with a value (if available)
1371  *        and always once with a value of NULL.
1372  * @param proc_cls closure for proc
1373  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1374  *         cancel
1375  */
1376 struct GNUNET_DATASTORE_QueueEntry *
1377 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1378                                       unsigned int queue_priority,
1379                                       unsigned int max_queue_size,
1380                                       struct GNUNET_TIME_Relative timeout,
1381                                       GNUNET_DATASTORE_DatumProcessor proc, 
1382                                       void *proc_cls)
1383 {
1384   struct GNUNET_DATASTORE_QueueEntry *qe;
1385   struct GNUNET_MessageHeader *m;
1386   union QueueContext qc;
1387
1388   GNUNET_assert (NULL != proc);
1389 #if DEBUG_DATASTORE
1390   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1391               "Asked to get replication entry in %llu ms\n",
1392               (unsigned long long) timeout.rel_value);
1393 #endif
1394   qc.rc.proc = proc;
1395   qc.rc.proc_cls = proc_cls;
1396   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1397                          queue_priority, max_queue_size, timeout,
1398                          &process_result_message, &qc);
1399   if (qe == NULL)
1400     {
1401 #if DEBUG_DATASTORE
1402       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1403                   "Could not create queue entry for GET REPLICATION\n");
1404 #endif
1405       return NULL;    
1406     }
1407   GNUNET_STATISTICS_update (h->stats,
1408                             gettext_noop ("# GET REPLICATION requests executed"),
1409                             1,
1410                             GNUNET_NO);
1411   m = (struct GNUNET_MessageHeader*) &qe[1];
1412   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1413   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1414   process_queue (h);
1415   return qe;
1416 }
1417
1418
1419 /**
1420  * Get a single zero-anonymity value from the datastore.
1421  *
1422  * @param h handle to the datastore
1423  * @param offset offset of the result (modulo num-results); set to
1424  *               a random 64-bit value initially; then increment by
1425  *               one each time; detect that all results have been found by uid
1426  *               being again the first uid ever returned.
1427  * @param queue_priority ranking of this request in the priority queue
1428  * @param max_queue_size at what queue size should this request be dropped
1429  *        (if other requests of higher priority are in the queue)
1430  * @param timeout how long to wait at most for a response
1431  * @param type allowed type for the operation (never zero)
1432  * @param proc function to call on a random value; it
1433  *        will be called once with a value (if available)
1434  *        or with NULL if none value exists.
1435  * @param proc_cls closure for proc
1436  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1437  *         cancel
1438  */
1439 struct GNUNET_DATASTORE_QueueEntry *
1440 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1441                                      uint64_t offset,
1442                                      unsigned int queue_priority,
1443                                      unsigned int max_queue_size,
1444                                      struct GNUNET_TIME_Relative timeout,
1445                                      enum GNUNET_BLOCK_Type type,
1446                                      GNUNET_DATASTORE_DatumProcessor proc, 
1447                                      void *proc_cls)
1448 {
1449   struct GNUNET_DATASTORE_QueueEntry *qe;
1450   struct GetZeroAnonymityMessage *m;
1451   union QueueContext qc;
1452
1453   GNUNET_assert (NULL != proc);
1454   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1455 #if DEBUG_DATASTORE
1456   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1457               "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n",
1458               (unsigned long long) offset,
1459               type,
1460               (unsigned long long) timeout.rel_value);
1461 #endif
1462   qc.rc.proc = proc;
1463   qc.rc.proc_cls = proc_cls;
1464   qe = make_queue_entry (h, sizeof(struct GetZeroAnonymityMessage),
1465                          queue_priority, max_queue_size, timeout,
1466                          &process_result_message, &qc);
1467   if (qe == NULL)
1468     {
1469 #if DEBUG_DATASTORE
1470       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1471                   "Could not create queue entry for zero-anonymity procation\n");
1472 #endif
1473       return NULL;    
1474     }
1475   GNUNET_STATISTICS_update (h->stats,
1476                             gettext_noop ("# GET ZERO ANONYMITY requests executed"),
1477                             1,
1478                             GNUNET_NO);
1479   m = (struct GetZeroAnonymityMessage*) &qe[1];
1480   m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1481   m->header.size = htons(sizeof (struct GetZeroAnonymityMessage));
1482   m->type = htonl ((uint32_t) type);
1483   m->offset = GNUNET_htonll (offset);
1484   process_queue (h);
1485   return qe;
1486 }
1487
1488
1489 /**
1490  * Get a result for a particular key from the datastore.  The processor
1491  * will only be called once.
1492  *
1493  * @param h handle to the datastore
1494  * @param offset offset of the result (modulo num-results); set to
1495  *               a random 64-bit value initially; then increment by
1496  *               one each time; detect that all results have been found by uid
1497  *               being again the first uid ever returned.
1498  * @param key maybe NULL (to match all entries)
1499  * @param type desired type, 0 for any
1500  * @param queue_priority ranking of this request in the priority queue
1501  * @param max_queue_size at what queue size should this request be dropped
1502  *        (if other requests of higher priority are in the queue)
1503  * @param timeout how long to wait at most for a response
1504  * @param proc function to call on each matching value;
1505  *        will be called once with a NULL value at the end
1506  * @param proc_cls closure for proc
1507  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1508  *         cancel
1509  */
1510 struct GNUNET_DATASTORE_QueueEntry *
1511 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1512                           uint64_t offset,
1513                           const GNUNET_HashCode * key,
1514                           enum GNUNET_BLOCK_Type type,
1515                           unsigned int queue_priority,
1516                           unsigned int max_queue_size,
1517                           struct GNUNET_TIME_Relative timeout,
1518                           GNUNET_DATASTORE_DatumProcessor proc, 
1519                           void *proc_cls)
1520 {
1521   struct GNUNET_DATASTORE_QueueEntry *qe;
1522   struct GetMessage *gm;
1523   union QueueContext qc;
1524
1525   GNUNET_assert (NULL != proc);
1526 #if DEBUG_DATASTORE
1527   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1528               "Asked to look for data of type %u under key `%s'\n",
1529               (unsigned int) type,
1530               GNUNET_h2s (key));
1531 #endif
1532   qc.rc.proc = proc;
1533   qc.rc.proc_cls = proc_cls;
1534   qe = make_queue_entry (h, sizeof(struct GetMessage),
1535                          queue_priority, max_queue_size, timeout,
1536                          &process_result_message, &qc);
1537   if (qe == NULL)
1538     {
1539 #if DEBUG_DATASTORE
1540       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1541                   "Could not queue request for `%s'\n",
1542                   GNUNET_h2s (key));
1543 #endif
1544       return NULL;
1545     }
1546   GNUNET_STATISTICS_update (h->stats,
1547                             gettext_noop ("# GET requests executed"),
1548                             1,
1549                             GNUNET_NO);
1550   gm = (struct GetMessage*) &qe[1];
1551   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1552   gm->type = htonl(type);
1553   gm->offset = GNUNET_htonll (offset);
1554   if (key != NULL)
1555     {
1556       gm->header.size = htons(sizeof (struct GetMessage));
1557       gm->key = *key;
1558     }
1559   else
1560     {
1561       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1562     }
1563   process_queue (h);
1564   return qe;
1565 }
1566
1567
1568 /**
1569  * Cancel a datastore operation.  The final callback from the
1570  * operation must not have been done yet.
1571  * 
1572  * @param qe operation to cancel
1573  */
1574 void
1575 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1576 {
1577   struct GNUNET_DATASTORE_Handle *h;
1578
1579   GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted);
1580   h = qe->h;
1581 #if DEBUG_DATASTORE
1582   GNUNET_log  (GNUNET_ERROR_TYPE_DEBUG,
1583                "Pending DATASTORE request %p cancelled (%d, %d)\n",
1584                qe,
1585                qe->was_transmitted,
1586                h->queue_head == qe);
1587 #endif
1588   if (GNUNET_YES == qe->was_transmitted) 
1589     {
1590       free_queue_entry (qe);
1591       h->skip_next_messages++;
1592       return;
1593     }
1594   free_queue_entry (qe);
1595   process_queue (h);
1596 }
1597
1598
1599 /* end of datastore_api.c */