9738588a7d4073b9c054f4addc8da3ec35091e3d
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
33
34 /**
35  * If a client stopped asking for more results, how many more do
36  * we receive from the DB before killing the connection?  Trade-off
37  * between re-doing TCP handshakes and (needlessly) receiving 
38  * useless results.
39  */
40 #define MAX_EXCESS_RESULTS 8
41
42 /**
43  * Context for processing status messages.
44  */
45 struct StatusContext
46 {
47   /**
48    * Continuation to call with the status.
49    */
50   GNUNET_DATASTORE_ContinuationWithStatus cont;
51
52   /**
53    * Closure for cont.
54    */
55   void *cont_cls;
56
57 };
58
59
60 /**
61  * Context for processing result messages.
62  */
63 struct ResultContext
64 {
65   /**
66    * Function to call with the result.
67    */
68   GNUNET_DATASTORE_DatumProcessor proc;
69
70   /**
71    * Closure for proc.
72    */
73   void *proc_cls;
74
75 };
76
77
78 /**
79  *  Context for a queue operation.
80  */
81 union QueueContext
82 {
83
84   struct StatusContext sc;
85   
86   struct ResultContext rc;
87
88 };
89
90
91
92 /**
93  * Entry in our priority queue.
94  */
95 struct GNUNET_DATASTORE_QueueEntry
96 {
97
98   /**
99    * This is a linked list.
100    */
101   struct GNUNET_DATASTORE_QueueEntry *next;
102
103   /**
104    * This is a linked list.
105    */
106   struct GNUNET_DATASTORE_QueueEntry *prev;
107
108   /**
109    * Handle to the master context.
110    */
111   struct GNUNET_DATASTORE_Handle *h;
112
113   /**
114    * Response processor (NULL if we are not waiting for a response).
115    * This struct should be used for the closure, function-specific
116    * arguments can be passed via 'qc'.
117    */
118   GNUNET_CLIENT_MessageHandler response_proc;
119
120   /**
121    * Function to call after transmission of the request.
122    */
123   GNUNET_DATASTORE_ContinuationWithStatus cont;
124    
125   /**
126    * Closure for 'cont'.
127    */
128   void *cont_cls;
129
130   /**
131    * Context for the operation.
132    */
133   union QueueContext qc;
134
135   /**
136    * Task for timeout signalling.
137    */
138   GNUNET_SCHEDULER_TaskIdentifier task;
139
140   /**
141    * Timeout for the current operation.
142    */
143   struct GNUNET_TIME_Absolute timeout;
144
145   /**
146    * Priority in the queue.
147    */
148   unsigned int priority;
149
150   /**
151    * Maximum allowed length of queue (otherwise
152    * this request should be discarded).
153    */
154   unsigned int max_queue;
155
156   /**
157    * Number of bytes in the request message following
158    * this struct.  32-bit value for nicer memory
159    * access (and overall struct alignment).
160    */
161   uint32_t message_size;
162
163   /**
164    * Has this message been transmitted to the service?
165    * Only ever GNUNET_YES for the head of the queue.
166    * Note that the overall struct should end at a 
167    * multiple of 64 bits.
168    */
169   int was_transmitted;
170   
171 };
172
173 /**
174  * Handle to the datastore service. 
175  */
176 struct GNUNET_DATASTORE_Handle
177 {
178
179   /**
180    * Our configuration.
181    */
182   const struct GNUNET_CONFIGURATION_Handle *cfg;
183
184   /**
185    * Current connection to the datastore service.
186    */
187   struct GNUNET_CLIENT_Connection *client;
188
189   /**
190    * Handle for statistics.
191    */
192   struct GNUNET_STATISTICS_Handle *stats;
193
194   /**
195    * Current transmit handle.
196    */
197   struct GNUNET_CLIENT_TransmitHandle *th;
198
199   /**
200    * Current head of priority queue.
201    */
202   struct GNUNET_DATASTORE_QueueEntry *queue_head;
203
204   /**
205    * Current tail of priority queue.
206    */
207   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
208
209   /**
210    * Task for trying to reconnect.
211    */
212   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
213
214   /**
215    * How quickly should we retry?  Used for exponential back-off on
216    * connect-errors.
217    */
218   struct GNUNET_TIME_Relative retry_time;
219
220   /**
221    * Number of entries in the queue.
222    */
223   unsigned int queue_size;
224
225   /**
226    * Number of results we're receiving for the current query
227    * after application stopped to care.  Used to determine when
228    * to reset the connection.
229    */
230   unsigned int result_count;
231
232   /**
233    * Are we currently trying to receive from the service?
234    */
235   int in_receive;
236
237   /**
238    * We should ignore the next message(s) from the service.
239    */
240   unsigned int skip_next_messages;
241
242 };
243
244
245
246 /**
247  * Connect to the datastore service.
248  *
249  * @param cfg configuration to use
250  * @return handle to use to access the service
251  */
252 struct GNUNET_DATASTORE_Handle *
253 GNUNET_DATASTORE_connect (const struct
254                           GNUNET_CONFIGURATION_Handle
255                           *cfg)
256 {
257   struct GNUNET_CLIENT_Connection *c;
258   struct GNUNET_DATASTORE_Handle *h;
259   
260   c = GNUNET_CLIENT_connect ("datastore", cfg);
261   if (c == NULL)
262     return NULL; /* oops */
263   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
264                      GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
265   h->client = c;
266   h->cfg = cfg;
267   h->stats = GNUNET_STATISTICS_create ("datastore-api",
268                                        cfg);
269   return h;
270 }
271
272
273 /**
274  * Transmit DROP message to datastore service.
275  *
276  * @param cls the 'struct GNUNET_DATASTORE_Handle'
277  * @param size number of bytes that can be copied to buf
278  * @param buf where to copy the drop message
279  * @return number of bytes written to buf
280  */
281 static size_t
282 transmit_drop (void *cls,
283                size_t size, 
284                void *buf)
285 {
286   struct GNUNET_DATASTORE_Handle *h = cls;
287   struct GNUNET_MessageHeader *hdr;
288   
289   if (buf == NULL)
290     {
291       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
292                   _("Failed to transmit request to drop database.\n"));
293       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
294       return 0;
295     }
296   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
297   hdr = buf;
298   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
299   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
300   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
301   return sizeof(struct GNUNET_MessageHeader);
302 }
303
304
305 /**
306  * Disconnect from the datastore service (and free
307  * associated resources).
308  *
309  * @param h handle to the datastore
310  * @param drop set to GNUNET_YES to delete all data in datastore (!)
311  */
312 void
313 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
314                              int drop)
315 {
316   struct GNUNET_DATASTORE_QueueEntry *qe;
317
318   if (h->client != NULL)
319     {
320       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
321       h->client = NULL;
322     }
323   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
324     {
325       GNUNET_SCHEDULER_cancel (h->reconnect_task);
326       h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
327     }
328   if (NULL != h->th)
329     {
330       GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
331       h->th = NULL;
332     }
333   while (NULL != (qe = h->queue_head))
334     {
335       GNUNET_assert (NULL != qe->response_proc);
336       qe->response_proc (h, NULL);
337     }
338   if (GNUNET_YES == drop) 
339     {
340       h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
341       if (h->client != NULL)
342         {
343           if (NULL != 
344               GNUNET_CLIENT_notify_transmit_ready (h->client,
345                                                    sizeof(struct GNUNET_MessageHeader),
346                                                    GNUNET_TIME_UNIT_MINUTES,
347                                                    GNUNET_YES,
348                                                    &transmit_drop,
349                                                    h))
350             return;
351           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
352           h->client = NULL;
353         }
354       GNUNET_break (0);
355     }
356   GNUNET_STATISTICS_destroy (h->stats,
357                              GNUNET_NO);
358   h->stats = NULL;
359   GNUNET_free (h);
360 }
361
362
363 /**
364  * A request has timed out (before being transmitted to the service).
365  *
366  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
367  * @param tc scheduler context
368  */
369 static void
370 timeout_queue_entry (void *cls,
371                      const struct GNUNET_SCHEDULER_TaskContext *tc)
372 {
373   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
374
375   GNUNET_STATISTICS_update (qe->h->stats,
376                             gettext_noop ("# queue entry timeouts"),
377                             1,
378                             GNUNET_NO);
379   qe->task = GNUNET_SCHEDULER_NO_TASK;
380   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
381   qe->response_proc (qe->h, NULL);
382 }
383
384
385 /**
386  * Create a new entry for our priority queue (and possibly discard other entires if
387  * the queue is getting too long).
388  *
389  * @param h handle to the datastore
390  * @param msize size of the message to queue
391  * @param queue_priority priority of the entry
392  * @param max_queue_size at what queue size should this request be dropped
393  *        (if other requests of higher priority are in the queue)
394  * @param timeout timeout for the operation
395  * @param response_proc function to call with replies (can be NULL)
396  * @param qc client context (NOT a closure for response_proc)
397  * @return NULL if the queue is full 
398  */
399 static struct GNUNET_DATASTORE_QueueEntry *
400 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
401                   size_t msize,
402                   unsigned int queue_priority,
403                   unsigned int max_queue_size,
404                   struct GNUNET_TIME_Relative timeout,
405                   GNUNET_CLIENT_MessageHandler response_proc,            
406                   const union QueueContext *qc)
407 {
408   struct GNUNET_DATASTORE_QueueEntry *ret;
409   struct GNUNET_DATASTORE_QueueEntry *pos;
410   unsigned int c;
411
412   c = 0;
413   pos = h->queue_head;
414   while ( (pos != NULL) &&
415           (c < max_queue_size) &&
416           (pos->priority >= queue_priority) )
417     {
418       c++;
419       pos = pos->next;
420     }
421   if (c >= max_queue_size)
422     {
423       GNUNET_STATISTICS_update (h->stats,
424                                 gettext_noop ("# queue overflows"),
425                                 1,
426                                 GNUNET_NO);
427       return NULL;
428     }
429   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
430   ret->h = h;
431   ret->response_proc = response_proc;
432   ret->qc = *qc;
433   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
434   ret->priority = queue_priority;
435   ret->max_queue = max_queue_size;
436   ret->message_size = msize;
437   ret->was_transmitted = GNUNET_NO;
438   if (pos == NULL)
439     {
440       /* append at the tail */
441       pos = h->queue_tail;
442     }
443   else
444     {
445       pos = pos->prev; 
446       /* do not insert at HEAD if HEAD query was already
447          transmitted and we are still receiving replies! */
448       if ( (pos == NULL) &&
449            (h->queue_head->was_transmitted) )
450         pos = h->queue_head;
451     }
452   c++;
453   GNUNET_STATISTICS_update (h->stats,
454                             gettext_noop ("# queue entries created"),
455                             1,
456                             GNUNET_NO);
457   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
458                                      h->queue_tail,
459                                      pos,
460                                      ret);
461   h->queue_size++;
462   ret->task = GNUNET_SCHEDULER_add_delayed (timeout,
463                                             &timeout_queue_entry,
464                                             ret);
465   pos = ret->next;
466   while (pos != NULL) 
467     {
468       if ( (pos->max_queue < h->queue_size) &&
469            (pos->was_transmitted == GNUNET_NO) )
470         {
471           GNUNET_assert (pos->response_proc != NULL);
472           /* move 'pos' element to head so that it will be 
473              killed on 'NULL' call below */
474           GNUNET_CONTAINER_DLL_remove (h->queue_head,
475                                        h->queue_tail,
476                                        pos);
477           GNUNET_CONTAINER_DLL_insert (h->queue_head,
478                                        h->queue_tail,
479                                        pos);
480           pos->response_proc (h, NULL);
481           break;
482         }
483       pos = pos->next;
484     }
485   return ret;
486 }
487
488
489 /**
490  * Process entries in the queue (or do nothing if we are already
491  * doing so).
492  * 
493  * @param h handle to the datastore
494  */
495 static void
496 process_queue (struct GNUNET_DATASTORE_Handle *h);
497
498
499 /**
500  * Try reconnecting to the datastore service.
501  *
502  * @param cls the 'struct GNUNET_DATASTORE_Handle'
503  * @param tc scheduler context
504  */
505 static void
506 try_reconnect (void *cls,
507                const struct GNUNET_SCHEDULER_TaskContext *tc)
508 {
509   struct GNUNET_DATASTORE_Handle *h = cls;
510
511   if (h->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
512     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
513   else
514     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
515   if (h->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
516     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
517   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
518   h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
519   if (h->client == NULL)
520     {
521       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
522                   "DATASTORE reconnect failed (fatally)\n");
523       return;
524     }
525   GNUNET_STATISTICS_update (h->stats,
526                             gettext_noop ("# datastore connections (re)created"),
527                             1,
528                             GNUNET_NO);
529 #if DEBUG_DATASTORE
530   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
531               "Reconnected to DATASTORE\n");
532 #endif
533   process_queue (h);
534 }
535
536
537 /**
538  * Disconnect from the service and then try reconnecting to the datastore service
539  * after some delay.
540  *
541  * @param h handle to datastore to disconnect and reconnect
542  */
543 static void
544 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
545 {
546   if (h->client == NULL)
547     {
548 #if DEBUG_DATASTORE
549       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
550                   "client NULL in disconnect, will not try to reconnect\n");
551 #endif
552       return;
553     }
554 #if 0
555   GNUNET_STATISTICS_update (stats,
556                             gettext_noop ("# reconnected to DATASTORE"),
557                             1,
558                             GNUNET_NO);
559 #endif
560   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
561   h->skip_next_messages = 0;
562   h->client = NULL;
563   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_time,
564                                                     &try_reconnect,
565                                                     h);      
566 }
567
568
569 /**
570  * Transmit request from queue to datastore service.
571  *
572  * @param cls the 'struct GNUNET_DATASTORE_Handle'
573  * @param size number of bytes that can be copied to buf
574  * @param buf where to copy the drop message
575  * @return number of bytes written to buf
576  */
577 static size_t
578 transmit_request (void *cls,
579                   size_t size, 
580                   void *buf)
581 {
582   struct GNUNET_DATASTORE_Handle *h = cls;
583   struct GNUNET_DATASTORE_QueueEntry *qe;
584   size_t msize;
585
586   h->th = NULL;
587   if (NULL == (qe = h->queue_head))
588     return 0; /* no entry in queue */
589   if (buf == NULL)
590     {
591       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
592                   _("Failed to transmit request to DATASTORE.\n"));
593       GNUNET_STATISTICS_update (h->stats,
594                                 gettext_noop ("# transmission request failures"),
595                                 1,
596                                 GNUNET_NO);
597       do_disconnect (h);
598       return 0;
599     }
600   if (size < (msize = qe->message_size))
601     {
602       process_queue (h);
603       return 0;
604     }
605  #if DEBUG_DATASTORE
606   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
607               "Transmitting %u byte request to DATASTORE\n",
608               msize);
609 #endif
610   memcpy (buf, &qe[1], msize);
611   qe->was_transmitted = GNUNET_YES;
612   GNUNET_SCHEDULER_cancel (qe->task);
613   qe->task = GNUNET_SCHEDULER_NO_TASK;
614   GNUNET_assert (GNUNET_NO == h->in_receive);
615   h->in_receive = GNUNET_YES;
616   GNUNET_CLIENT_receive (h->client,
617                          qe->response_proc,
618                          h,
619                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
620   GNUNET_STATISTICS_update (h->stats,
621                             gettext_noop ("# bytes sent to datastore"),
622                             1,
623                             GNUNET_NO);
624   return msize;
625 }
626
627
628 /**
629  * Process entries in the queue (or do nothing if we are already
630  * doing so).
631  * 
632  * @param h handle to the datastore
633  */
634 static void
635 process_queue (struct GNUNET_DATASTORE_Handle *h)
636 {
637   struct GNUNET_DATASTORE_QueueEntry *qe;
638
639   if (NULL == (qe = h->queue_head))
640     {
641 #if DEBUG_DATASTORE > 1
642       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
643                   "Queue empty\n");
644 #endif
645       return; /* no entry in queue */
646     }
647   if (qe->was_transmitted == GNUNET_YES)
648     {
649 #if DEBUG_DATASTORE > 1
650       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
651                   "Head request already transmitted\n");
652 #endif
653       return; /* waiting for replies */
654     }
655   if (h->th != NULL)
656     {
657 #if DEBUG_DATASTORE > 1
658       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
659                   "Pending transmission request\n");
660 #endif
661       return; /* request pending */
662     }
663   if (h->client == NULL)
664     {
665 #if DEBUG_DATASTORE > 1
666       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
667                   "Not connected\n");
668 #endif
669       return; /* waiting for reconnect */
670     }
671   if (GNUNET_YES == h->in_receive)
672     {
673       /* wait for response to previous query */
674       return; 
675     }
676 #if DEBUG_DATASTORE
677   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
678               "Queueing %u byte request to DATASTORE\n",
679               qe->message_size);
680 #endif
681   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
682                                                qe->message_size,
683                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
684                                                GNUNET_YES,
685                                                &transmit_request,
686                                                h);
687   GNUNET_assert (GNUNET_NO == h->in_receive);
688   GNUNET_break (NULL != h->th);
689 }
690
691
692 /**
693  * Dummy continuation used to do nothing (but be non-zero).
694  *
695  * @param cls closure
696  * @param result result 
697  * @param emsg error message
698  */
699 static void
700 drop_status_cont (void *cls, int32_t result, const char *emsg)
701 {
702   /* do nothing */
703 }
704
705
706 /**
707  * Free a queue entry.  Removes the given entry from the
708  * queue and releases associated resources.  Does NOT
709  * call the callback.
710  * 
711  * @param qe entry to free.
712  */
713 static void
714 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
715 {
716   struct GNUNET_DATASTORE_Handle *h = qe->h;
717
718   GNUNET_CONTAINER_DLL_remove (h->queue_head,
719                                h->queue_tail,
720                                qe);
721   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
722     {
723       GNUNET_SCHEDULER_cancel (qe->task);
724       qe->task = GNUNET_SCHEDULER_NO_TASK;
725     }
726   h->queue_size--;
727   qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */
728   GNUNET_free (qe);
729 }
730
731
732 /**
733  * Type of a function to call when we receive a message
734  * from the service.
735  *
736  * @param cls closure
737  * @param msg message received, NULL on timeout or fatal error
738  */
739 static void 
740 process_status_message (void *cls,
741                         const struct
742                         GNUNET_MessageHeader * msg)
743 {
744   struct GNUNET_DATASTORE_Handle *h = cls;
745   struct GNUNET_DATASTORE_QueueEntry *qe;
746   struct StatusContext rc;
747   const struct StatusMessage *sm;
748   const char *emsg;
749   int32_t status;
750   int was_transmitted;
751
752   h->in_receive = GNUNET_NO;
753   if (h->skip_next_messages > 0)
754     {
755       h->skip_next_messages--;
756       process_queue (h);
757       return;
758    } 
759   if (NULL == (qe = h->queue_head))
760     {
761       GNUNET_break (0);
762       do_disconnect (h);
763       return;
764     }
765   rc = qe->qc.sc;
766   if (msg == NULL)
767     {      
768       was_transmitted = qe->was_transmitted;
769       free_queue_entry (qe);
770       if (NULL == h->client)
771         return; /* forced disconnect */
772       if (rc.cont != NULL)
773         rc.cont (rc.cont_cls, 
774                  GNUNET_SYSERR,
775                  _("Failed to receive status response from database."));
776       if (was_transmitted == GNUNET_YES)
777         do_disconnect (h);
778       else
779         process_queue (h);
780       return;
781     }
782   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
783   free_queue_entry (qe);
784   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
785        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
786     {
787       GNUNET_break (0);
788       h->retry_time = GNUNET_TIME_UNIT_ZERO;
789       do_disconnect (h);
790       if (rc.cont != NULL)
791         rc.cont (rc.cont_cls, 
792                  GNUNET_SYSERR,
793                  _("Error reading response from datastore service"));
794       return;
795     }
796   sm = (const struct StatusMessage*) msg;
797   status = ntohl(sm->status);
798   emsg = NULL;
799   if (ntohs(msg->size) > sizeof(struct StatusMessage))
800     {
801       emsg = (const char*) &sm[1];
802       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
803         {
804           GNUNET_break (0);
805           emsg = _("Invalid error message received from datastore service");
806         }
807     }  
808   if ( (status == GNUNET_SYSERR) &&
809        (emsg == NULL) )
810     {
811       GNUNET_break (0);
812       emsg = _("Invalid error message received from datastore service");
813     }
814 #if DEBUG_DATASTORE
815   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
816               "Received status %d/%s\n",
817               (int) status,
818               emsg);
819 #endif
820   GNUNET_STATISTICS_update (h->stats,
821                             gettext_noop ("# status messages received"),
822                             1,
823                             GNUNET_NO);
824   h->retry_time.rel_value = 0;
825   process_queue (h);
826   if (rc.cont != NULL)
827     rc.cont (rc.cont_cls, 
828              status,
829              emsg);
830 }
831
832
833 /**
834  * Store an item in the datastore.  If the item is already present,
835  * the priorities are summed up and the higher expiration time and
836  * lower anonymity level is used.
837  *
838  * @param h handle to the datastore
839  * @param rid reservation ID to use (from "reserve"); use 0 if no
840  *            prior reservation was made
841  * @param key key for the value
842  * @param size number of bytes in data
843  * @param data content stored
844  * @param type type of the content
845  * @param priority priority of the content
846  * @param anonymity anonymity-level for the content
847  * @param replication how often should the content be replicated to other peers?
848  * @param expiration expiration time for the content
849  * @param queue_priority ranking of this request in the priority queue
850  * @param max_queue_size at what queue size should this request be dropped
851  *        (if other requests of higher priority are in the queue)
852  * @param timeout timeout for the operation
853  * @param cont continuation to call when done
854  * @param cont_cls closure for cont
855  * @return NULL if the entry was not queued, otherwise a handle that can be used to
856  *         cancel; note that even if NULL is returned, the callback will be invoked
857  *         (or rather, will already have been invoked)
858  */
859 struct GNUNET_DATASTORE_QueueEntry *
860 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
861                       uint32_t rid,
862                       const GNUNET_HashCode * key,
863                       size_t size,
864                       const void *data,
865                       enum GNUNET_BLOCK_Type type,
866                       uint32_t priority,
867                       uint32_t anonymity,
868                       uint32_t replication,
869                       struct GNUNET_TIME_Absolute expiration,
870                       unsigned int queue_priority,
871                       unsigned int max_queue_size,
872                       struct GNUNET_TIME_Relative timeout,
873                       GNUNET_DATASTORE_ContinuationWithStatus cont,
874                       void *cont_cls)
875 {
876   struct GNUNET_DATASTORE_QueueEntry *qe;
877   struct DataMessage *dm;
878   size_t msize;
879   union QueueContext qc;
880
881 #if DEBUG_DATASTORE
882   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
883               "Asked to put %u bytes of data under key `%s' for %llu ms\n",
884               size,
885               GNUNET_h2s (key),
886               GNUNET_TIME_absolute_get_remaining (expiration).rel_value);
887 #endif
888   msize = sizeof(struct DataMessage) + size;
889   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
890   qc.sc.cont = cont;
891   qc.sc.cont_cls = cont_cls;
892   qe = make_queue_entry (h, msize,
893                          queue_priority, max_queue_size, timeout,
894                          &process_status_message, &qc);
895   if (qe == NULL)
896     {
897 #if DEBUG_DATASTORE
898       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
899                   "Could not create queue entry for PUT\n");
900 #endif
901       return NULL;
902     }
903   GNUNET_STATISTICS_update (h->stats,
904                             gettext_noop ("# PUT requests executed"),
905                             1,
906                             GNUNET_NO);
907   dm = (struct DataMessage* ) &qe[1];
908   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
909   dm->header.size = htons(msize);
910   dm->rid = htonl(rid);
911   dm->size = htonl( (uint32_t) size);
912   dm->type = htonl(type);
913   dm->priority = htonl(priority);
914   dm->anonymity = htonl(anonymity);
915   dm->replication = htonl (replication);
916   dm->reserved = htonl (0);
917   dm->uid = GNUNET_htonll(0);
918   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
919   dm->key = *key;
920   memcpy (&dm[1], data, size);
921   process_queue (h);
922   return qe;
923 }
924
925
926 /**
927  * Reserve space in the datastore.  This function should be used
928  * to avoid "out of space" failures during a longer sequence of "put"
929  * operations (for example, when a file is being inserted).
930  *
931  * @param h handle to the datastore
932  * @param amount how much space (in bytes) should be reserved (for content only)
933  * @param entries how many entries will be created (to calculate per-entry overhead)
934  * @param queue_priority ranking of this request in the priority queue
935  * @param max_queue_size at what queue size should this request be dropped
936  *        (if other requests of higher priority are in the queue)
937  * @param timeout how long to wait at most for a response (or before dying in queue)
938  * @param cont continuation to call when done; "success" will be set to
939  *             a positive reservation value if space could be reserved.
940  * @param cont_cls closure for cont
941  * @return NULL if the entry was not queued, otherwise a handle that can be used to
942  *         cancel; note that even if NULL is returned, the callback will be invoked
943  *         (or rather, will already have been invoked)
944  */
945 struct GNUNET_DATASTORE_QueueEntry *
946 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
947                           uint64_t amount,
948                           uint32_t entries,
949                           unsigned int queue_priority,
950                           unsigned int max_queue_size,
951                           struct GNUNET_TIME_Relative timeout,
952                           GNUNET_DATASTORE_ContinuationWithStatus cont,
953                           void *cont_cls)
954 {
955   struct GNUNET_DATASTORE_QueueEntry *qe;
956   struct ReserveMessage *rm;
957   union QueueContext qc;
958
959   if (cont == NULL)
960     cont = &drop_status_cont;
961 #if DEBUG_DATASTORE
962   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
963               "Asked to reserve %llu bytes of data and %u entries'\n",
964               (unsigned long long) amount,
965               (unsigned int) entries);
966 #endif
967   qc.sc.cont = cont;
968   qc.sc.cont_cls = cont_cls;
969   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
970                          queue_priority, max_queue_size, timeout,
971                          &process_status_message, &qc);
972   if (qe == NULL)
973     {
974 #if DEBUG_DATASTORE
975       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
976                   "Could not create queue entry to reserve\n");
977 #endif
978       return NULL;
979     }
980   GNUNET_STATISTICS_update (h->stats,
981                             gettext_noop ("# RESERVE requests executed"),
982                             1,
983                             GNUNET_NO);
984   rm = (struct ReserveMessage*) &qe[1];
985   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
986   rm->header.size = htons(sizeof (struct ReserveMessage));
987   rm->entries = htonl(entries);
988   rm->amount = GNUNET_htonll(amount);
989   process_queue (h);
990   return qe;
991 }
992
993
994 /**
995  * Signal that all of the data for which a reservation was made has
996  * been stored and that whatever excess space might have been reserved
997  * can now be released.
998  *
999  * @param h handle to the datastore
1000  * @param rid reservation ID (value of "success" in original continuation
1001  *        from the "reserve" function).
1002  * @param queue_priority ranking of this request in the priority queue
1003  * @param max_queue_size at what queue size should this request be dropped
1004  *        (if other requests of higher priority are in the queue)
1005  * @param queue_priority ranking of this request in the priority queue
1006  * @param max_queue_size at what queue size should this request be dropped
1007  *        (if other requests of higher priority are in the queue)
1008  * @param timeout how long to wait at most for a response
1009  * @param cont continuation to call when done
1010  * @param cont_cls closure for cont
1011  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1012  *         cancel; note that even if NULL is returned, the callback will be invoked
1013  *         (or rather, will already have been invoked)
1014  */
1015 struct GNUNET_DATASTORE_QueueEntry *
1016 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1017                                   uint32_t rid,
1018                                   unsigned int queue_priority,
1019                                   unsigned int max_queue_size,
1020                                   struct GNUNET_TIME_Relative timeout,
1021                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
1022                                   void *cont_cls)
1023 {
1024   struct GNUNET_DATASTORE_QueueEntry *qe;
1025   struct ReleaseReserveMessage *rrm;
1026   union QueueContext qc;
1027
1028   if (cont == NULL)
1029     cont = &drop_status_cont;
1030 #if DEBUG_DATASTORE
1031   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1032               "Asked to release reserve %d\n",
1033               rid);
1034 #endif
1035   qc.sc.cont = cont;
1036   qc.sc.cont_cls = cont_cls;
1037   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
1038                          queue_priority, max_queue_size, timeout,
1039                          &process_status_message, &qc);
1040   if (qe == NULL)
1041     {
1042 #if DEBUG_DATASTORE
1043       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1044                   "Could not create queue entry to release reserve\n");
1045 #endif
1046       return NULL;
1047     }
1048   GNUNET_STATISTICS_update (h->stats,
1049                             gettext_noop ("# RELEASE RESERVE requests executed"),
1050                             1,
1051                             GNUNET_NO);
1052   rrm = (struct ReleaseReserveMessage*) &qe[1];
1053   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1054   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
1055   rrm->rid = htonl(rid);
1056   process_queue (h);
1057   return qe;
1058 }
1059
1060
1061 /**
1062  * Update a value in the datastore.
1063  *
1064  * @param h handle to the datastore
1065  * @param uid identifier for the value
1066  * @param priority how much to increase the priority of the value
1067  * @param expiration new expiration value should be MAX of existing and this argument
1068  * @param queue_priority ranking of this request in the priority queue
1069  * @param max_queue_size at what queue size should this request be dropped
1070  *        (if other requests of higher priority are in the queue)
1071  * @param timeout how long to wait at most for a response
1072  * @param cont continuation to call when done
1073  * @param cont_cls closure for cont
1074  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1075  *         cancel; note that even if NULL is returned, the callback will be invoked
1076  *         (or rather, will already have been invoked)
1077  */
1078 struct GNUNET_DATASTORE_QueueEntry *
1079 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1080                          uint64_t uid,
1081                          uint32_t priority,
1082                          struct GNUNET_TIME_Absolute expiration,
1083                          unsigned int queue_priority,
1084                          unsigned int max_queue_size,
1085                          struct GNUNET_TIME_Relative timeout,
1086                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1087                          void *cont_cls)
1088 {
1089   struct GNUNET_DATASTORE_QueueEntry *qe;
1090   struct UpdateMessage *um;
1091   union QueueContext qc;
1092
1093   if (cont == NULL)
1094     cont = &drop_status_cont;
1095 #if DEBUG_DATASTORE
1096   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1097               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
1098               uid,
1099               (unsigned int) priority,
1100               (unsigned long long) expiration.abs_value);
1101 #endif
1102   qc.sc.cont = cont;
1103   qc.sc.cont_cls = cont_cls;
1104   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
1105                          queue_priority, max_queue_size, timeout,
1106                          &process_status_message, &qc);
1107   if (qe == NULL)
1108     {
1109 #if DEBUG_DATASTORE
1110       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1111                   "Could not create queue entry for UPDATE\n");
1112 #endif
1113       return NULL;
1114     }
1115   GNUNET_STATISTICS_update (h->stats,
1116                             gettext_noop ("# UPDATE requests executed"),
1117                             1,
1118                             GNUNET_NO);
1119   um = (struct UpdateMessage*) &qe[1];
1120   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1121   um->header.size = htons(sizeof (struct UpdateMessage));
1122   um->priority = htonl(priority);
1123   um->expiration = GNUNET_TIME_absolute_hton(expiration);
1124   um->uid = GNUNET_htonll(uid);
1125   process_queue (h);
1126   return qe;
1127 }
1128
1129
1130 /**
1131  * Explicitly remove some content from the database.
1132  * The "cont"inuation will be called with status
1133  * "GNUNET_OK" if content was removed, "GNUNET_NO"
1134  * if no matching entry was found and "GNUNET_SYSERR"
1135  * on all other types of errors.
1136  *
1137  * @param h handle to the datastore
1138  * @param key key for the value
1139  * @param size number of bytes in data
1140  * @param data content stored
1141  * @param queue_priority ranking of this request in the priority queue
1142  * @param max_queue_size at what queue size should this request be dropped
1143  *        (if other requests of higher priority are in the queue)
1144  * @param timeout how long to wait at most for a response
1145  * @param cont continuation to call when done
1146  * @param cont_cls closure for cont
1147  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1148  *         cancel; note that even if NULL is returned, the callback will be invoked
1149  *         (or rather, will already have been invoked)
1150  */
1151 struct GNUNET_DATASTORE_QueueEntry *
1152 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1153                          const GNUNET_HashCode *key,
1154                          size_t size,
1155                          const void *data,
1156                          unsigned int queue_priority,
1157                          unsigned int max_queue_size,
1158                          struct GNUNET_TIME_Relative timeout,
1159                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1160                          void *cont_cls)
1161 {
1162   struct GNUNET_DATASTORE_QueueEntry *qe;
1163   struct DataMessage *dm;
1164   size_t msize;
1165   union QueueContext qc;
1166
1167   if (cont == NULL)
1168     cont = &drop_status_cont;
1169 #if DEBUG_DATASTORE
1170   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1171               "Asked to remove %u bytes under key `%s'\n",
1172               size,
1173               GNUNET_h2s (key));
1174 #endif
1175   qc.sc.cont = cont;
1176   qc.sc.cont_cls = cont_cls;
1177   msize = sizeof(struct DataMessage) + size;
1178   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1179   qe = make_queue_entry (h, msize,
1180                          queue_priority, max_queue_size, timeout,
1181                          &process_status_message, &qc);
1182   if (qe == NULL)
1183     {
1184 #if DEBUG_DATASTORE
1185       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1186                   "Could not create queue entry for REMOVE\n");
1187 #endif
1188       return NULL;
1189     }
1190   GNUNET_STATISTICS_update (h->stats,
1191                             gettext_noop ("# REMOVE requests executed"),
1192                             1,
1193                             GNUNET_NO);
1194   dm = (struct DataMessage*) &qe[1];
1195   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1196   dm->header.size = htons(msize);
1197   dm->rid = htonl(0);
1198   dm->size = htonl(size);
1199   dm->type = htonl(0);
1200   dm->priority = htonl(0);
1201   dm->anonymity = htonl(0);
1202   dm->uid = GNUNET_htonll(0);
1203   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
1204   dm->key = *key;
1205   memcpy (&dm[1], data, size);
1206   process_queue (h);
1207   return qe;
1208 }
1209
1210
1211 /**
1212  * Type of a function to call when we receive a message
1213  * from the service.
1214  *
1215  * @param cls closure
1216  * @param msg message received, NULL on timeout or fatal error
1217  */
1218 static void 
1219 process_result_message (void *cls,
1220                         const struct GNUNET_MessageHeader *msg)
1221 {
1222   struct GNUNET_DATASTORE_Handle *h = cls;
1223   struct GNUNET_DATASTORE_QueueEntry *qe;
1224   struct ResultContext rc;
1225   const struct DataMessage *dm;
1226
1227   h->in_receive = GNUNET_NO;
1228   if (h->skip_next_messages > 0)
1229     {
1230       h->skip_next_messages--;
1231       process_queue (h);
1232       return;
1233     }
1234   if (msg == NULL)
1235     {
1236       qe = h->queue_head;
1237       GNUNET_assert (NULL != qe);
1238       if (qe->was_transmitted == GNUNET_YES)
1239         {
1240           rc = qe->qc.rc;
1241           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1242                       _("Failed to receive response from database.\n"));
1243           do_disconnect (h);
1244           free_queue_entry (qe);
1245           if (rc.proc != NULL)
1246             rc.proc (rc.proc_cls,
1247                      NULL, 0, NULL, 0, 0, 0, 
1248                      GNUNET_TIME_UNIT_ZERO_ABS, 0);    
1249         }
1250       else
1251         process_queue (h);
1252       return;
1253     }
1254   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
1255     {
1256       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1257       qe = h->queue_head;
1258       rc = qe->qc.rc;
1259       GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1260       free_queue_entry (qe);
1261 #if DEBUG_DATASTORE
1262       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1263                   "Received end of result set, new queue size is %u\n",
1264                   h->queue_size);
1265 #endif
1266       if (rc.proc != NULL)
1267         rc.proc (rc.proc_cls,
1268                  NULL, 0, NULL, 0, 0, 0, 
1269                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1270       h->retry_time.rel_value = 0;
1271       h->result_count = 0;
1272       process_queue (h);
1273       return;
1274     }
1275   qe = h->queue_head;
1276   rc = qe->qc.rc;
1277   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1278   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1279        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1280        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1281     {
1282       GNUNET_break (0);
1283       free_queue_entry (qe);
1284       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1285       do_disconnect (h);
1286       if (rc.proc != NULL)
1287         rc.proc (rc.proc_cls,
1288                  NULL, 0, NULL, 0, 0, 0, 
1289                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
1290       return;
1291     }
1292   GNUNET_STATISTICS_update (h->stats,
1293                             gettext_noop ("# Results received"),
1294                             1,
1295                             GNUNET_NO);
1296   dm = (const struct DataMessage*) msg;
1297 #if DEBUG_DATASTORE
1298   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1299               "Received result %llu with type %u and size %u with key %s\n",
1300               (unsigned long long) GNUNET_ntohll(dm->uid),
1301               ntohl(dm->type),
1302               ntohl(dm->size),
1303               GNUNET_h2s(&dm->key));
1304 #endif
1305   free_queue_entry (qe);
1306   h->retry_time.rel_value = 0;
1307   process_queue (h);
1308   if (rc.proc != NULL)
1309     rc.proc (rc.proc_cls,
1310              &dm->key,
1311              ntohl(dm->size),
1312              &dm[1],
1313              ntohl(dm->type),
1314              ntohl(dm->priority),
1315              ntohl(dm->anonymity),
1316              GNUNET_TIME_absolute_ntoh(dm->expiration), 
1317              GNUNET_ntohll(dm->uid));
1318 }
1319
1320
1321 /**
1322  * Get a random value from the datastore for content replication.
1323  * Returns a single, random value among those with the highest
1324  * replication score, lowering positive replication scores by one for
1325  * the chosen value (if only content with a replication score exists,
1326  * a random value is returned and replication scores are not changed).
1327  *
1328  * @param h handle to the datastore
1329  * @param queue_priority ranking of this request in the priority queue
1330  * @param max_queue_size at what queue size should this request be dropped
1331  *        (if other requests of higher priority are in the queue)
1332  * @param timeout how long to wait at most for a response
1333  * @param proc function to call on a random value; it
1334  *        will be called once with a value (if available)
1335  *        and always once with a value of NULL.
1336  * @param proc_cls closure for proc
1337  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1338  *         cancel
1339  */
1340 struct GNUNET_DATASTORE_QueueEntry *
1341 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1342                                       unsigned int queue_priority,
1343                                       unsigned int max_queue_size,
1344                                       struct GNUNET_TIME_Relative timeout,
1345                                       GNUNET_DATASTORE_DatumProcessor proc, 
1346                                       void *proc_cls)
1347 {
1348   struct GNUNET_DATASTORE_QueueEntry *qe;
1349   struct GNUNET_MessageHeader *m;
1350   union QueueContext qc;
1351
1352   GNUNET_assert (NULL != proc);
1353 #if DEBUG_DATASTORE
1354   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1355               "Asked to get replication entry in %llu ms\n",
1356               (unsigned long long) timeout.rel_value);
1357 #endif
1358   qc.rc.proc = proc;
1359   qc.rc.proc_cls = proc_cls;
1360   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1361                          queue_priority, max_queue_size, timeout,
1362                          &process_result_message, &qc);
1363   if (qe == NULL)
1364     {
1365 #if DEBUG_DATASTORE
1366       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1367                   "Could not create queue entry for GET REPLICATION\n");
1368 #endif
1369       return NULL;    
1370     }
1371   GNUNET_STATISTICS_update (h->stats,
1372                             gettext_noop ("# GET REPLICATION requests executed"),
1373                             1,
1374                             GNUNET_NO);
1375   m = (struct GNUNET_MessageHeader*) &qe[1];
1376   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1377   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1378   process_queue (h);
1379   return qe;
1380 }
1381
1382
1383 /**
1384  * Get a single zero-anonymity value from the datastore.
1385  *
1386  * @param h handle to the datastore
1387  * @param offset offset of the result (mod #num-results); set to
1388  *               a random 64-bit value initially; then increment by
1389  *               one each time; detect that all results have been found by uid
1390  *               being again the first uid ever returned.
1391  * @param queue_priority ranking of this request in the priority queue
1392  * @param max_queue_size at what queue size should this request be dropped
1393  *        (if other requests of higher priority are in the queue)
1394  * @param timeout how long to wait at most for a response
1395  * @param type allowed type for the operation (never zero)
1396  * @param proc function to call on a random value; it
1397  *        will be called once with a value (if available)
1398  *        or with NULL if none value exists.
1399  * @param proc_cls closure for proc
1400  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1401  *         cancel
1402  */
1403 struct GNUNET_DATASTORE_QueueEntry *
1404 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1405                                      uint64_t offset,
1406                                      unsigned int queue_priority,
1407                                      unsigned int max_queue_size,
1408                                      struct GNUNET_TIME_Relative timeout,
1409                                      enum GNUNET_BLOCK_Type type,
1410                                      GNUNET_DATASTORE_DatumProcessor proc, 
1411                                      void *proc_cls)
1412 {
1413   struct GNUNET_DATASTORE_QueueEntry *qe;
1414   struct GetZeroAnonymityMessage *m;
1415   union QueueContext qc;
1416
1417   GNUNET_assert (NULL != proc);
1418   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1419 #if DEBUG_DATASTORE
1420   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1421               "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n",
1422               (unsigned long long) offset,
1423               type,
1424               (unsigned long long) timeout.rel_value);
1425 #endif
1426   qc.rc.proc = proc;
1427   qc.rc.proc_cls = proc_cls;
1428   qe = make_queue_entry (h, sizeof(struct GetZeroAnonymityMessage),
1429                          queue_priority, max_queue_size, timeout,
1430                          &process_result_message, &qc);
1431   if (qe == NULL)
1432     {
1433 #if DEBUG_DATASTORE
1434       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1435                   "Could not create queue entry for zero-anonymity procation\n");
1436 #endif
1437       return NULL;    
1438     }
1439   GNUNET_STATISTICS_update (h->stats,
1440                             gettext_noop ("# GET ZERO ANONYMITY requests executed"),
1441                             1,
1442                             GNUNET_NO);
1443   m = (struct GetZeroAnonymityMessage*) &qe[1];
1444   m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1445   m->header.size = htons(sizeof (struct GetZeroAnonymityMessage));
1446   m->type = htonl ((uint32_t) type);
1447   m->offset = GNUNET_htonll (offset);
1448   process_queue (h);
1449   return qe;
1450 }
1451
1452
1453 /**
1454  * Get a result for a particular key from the datastore.  The processor
1455  * will only be called once.
1456  *
1457  * @param h handle to the datastore
1458  * @param offset offset of the result (mod #num-results); set to
1459  *               a random 64-bit value initially; then increment by
1460  *               one each time; detect that all results have been found by uid
1461  *               being again the first uid ever returned.
1462  * @param key maybe NULL (to match all entries)
1463  * @param type desired type, 0 for any
1464  * @param queue_priority ranking of this request in the priority queue
1465  * @param max_queue_size at what queue size should this request be dropped
1466  *        (if other requests of higher priority are in the queue)
1467  * @param timeout how long to wait at most for a response
1468  * @param proc function to call on each matching value;
1469  *        will be called once with a NULL value at the end
1470  * @param proc_cls closure for proc
1471  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1472  *         cancel
1473  */
1474 struct GNUNET_DATASTORE_QueueEntry *
1475 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1476                           uint64_t offset,
1477                           const GNUNET_HashCode * key,
1478                           enum GNUNET_BLOCK_Type type,
1479                           unsigned int queue_priority,
1480                           unsigned int max_queue_size,
1481                           struct GNUNET_TIME_Relative timeout,
1482                           GNUNET_DATASTORE_DatumProcessor proc, 
1483                           void *proc_cls)
1484 {
1485   struct GNUNET_DATASTORE_QueueEntry *qe;
1486   struct GetMessage *gm;
1487   union QueueContext qc;
1488
1489   GNUNET_assert (NULL != proc);
1490 #if DEBUG_DATASTORE
1491   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1492               "Asked to look for data of type %u under key `%s'\n",
1493               (unsigned int) type,
1494               GNUNET_h2s (key));
1495 #endif
1496   qc.rc.proc = proc;
1497   qc.rc.proc_cls = proc_cls;
1498   qe = make_queue_entry (h, sizeof(struct GetMessage),
1499                          queue_priority, max_queue_size, timeout,
1500                          &process_result_message, &qc);
1501   if (qe == NULL)
1502     {
1503 #if DEBUG_DATASTORE
1504       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1505                   "Could not queue request for `%s'\n",
1506                   GNUNET_h2s (key));
1507 #endif
1508       return NULL;
1509     }
1510   GNUNET_STATISTICS_update (h->stats,
1511                             gettext_noop ("# GET requests executed"),
1512                             1,
1513                             GNUNET_NO);
1514   gm = (struct GetMessage*) &qe[1];
1515   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1516   gm->type = htonl(type);
1517   gm->offset = GNUNET_htonll (offset);
1518   if (key != NULL)
1519     {
1520       gm->header.size = htons(sizeof (struct GetMessage));
1521       gm->key = *key;
1522     }
1523   else
1524     {
1525       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1526     }
1527   process_queue (h);
1528   return qe;
1529 }
1530
1531
1532 /**
1533  * Cancel a datastore operation.  The final callback from the
1534  * operation must not have been done yet.
1535  * 
1536  * @param qe operation to cancel
1537  */
1538 void
1539 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1540 {
1541   struct GNUNET_DATASTORE_Handle *h;
1542
1543   GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted);
1544   h = qe->h;
1545 #if DEBUG_DATASTORE
1546   GNUNET_log  (GNUNET_ERROR_TYPE_DEBUG,
1547                "Pending DATASTORE request %p cancelled (%d, %d)\n",
1548                qe,
1549                qe->was_transmitted,
1550                h->queue_head == qe);
1551 #endif
1552   if (GNUNET_YES == qe->was_transmitted) 
1553     {
1554       free_queue_entry (qe);
1555       h->skip_next_messages++;
1556       return;
1557     }
1558   free_queue_entry (qe);
1559   process_queue (h);
1560 }
1561
1562
1563 /* end of datastore_api.c */