improving datastore API --- not working yet
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
33
34 /**
35  * If a client stopped asking for more results, how many more do
36  * we receive from the DB before killing the connection?  Trade-off
37  * between re-doing TCP handshakes and (needlessly) receiving 
38  * useless results.
39  */
40 #define MAX_EXCESS_RESULTS 8
41
42 /**
43  * Context for processing status messages.
44  */
45 struct StatusContext
46 {
47   /**
48    * Continuation to call with the status.
49    */
50   GNUNET_DATASTORE_ContinuationWithStatus cont;
51
52   /**
53    * Closure for cont.
54    */
55   void *cont_cls;
56
57 };
58
59
60 /**
61  * Context for processing result messages.
62  */
63 struct ResultContext
64 {
65   /**
66    * Iterator to call with the result.
67    */
68   GNUNET_DATASTORE_Iterator iter;
69
70   /**
71    * Closure for iter.
72    */
73   void *iter_cls;
74
75 };
76
77
78 /**
79  *  Context for a queue operation.
80  */
81 union QueueContext
82 {
83
84   struct StatusContext sc;
85   
86   struct ResultContext rc;
87
88 };
89
90
91
92 /**
93  * Entry in our priority queue.
94  */
95 struct GNUNET_DATASTORE_QueueEntry
96 {
97
98   /**
99    * This is a linked list.
100    */
101   struct GNUNET_DATASTORE_QueueEntry *next;
102
103   /**
104    * This is a linked list.
105    */
106   struct GNUNET_DATASTORE_QueueEntry *prev;
107
108   /**
109    * Handle to the master context.
110    */
111   struct GNUNET_DATASTORE_Handle *h;
112
113   /**
114    * Response processor (NULL if we are not waiting for a response).
115    * This struct should be used for the closure, function-specific
116    * arguments can be passed via 'qc'.
117    */
118   GNUNET_CLIENT_MessageHandler response_proc;
119
120   /**
121    * Function to call after transmission of the request.
122    */
123   GNUNET_DATASTORE_ContinuationWithStatus cont;
124    
125   /**
126    * Closure for 'cont'.
127    */
128   void *cont_cls;
129
130   /**
131    * Context for the operation.
132    */
133   union QueueContext qc;
134
135   /**
136    * Task for timeout signalling.
137    */
138   GNUNET_SCHEDULER_TaskIdentifier task;
139
140   /**
141    * Timeout for the current operation.
142    */
143   struct GNUNET_TIME_Absolute timeout;
144
145   /**
146    * Priority in the queue.
147    */
148   unsigned int priority;
149
150   /**
151    * Maximum allowed length of queue (otherwise
152    * this request should be discarded).
153    */
154   unsigned int max_queue;
155
156   /**
157    * Number of bytes in the request message following
158    * this struct.  32-bit value for nicer memory
159    * access (and overall struct alignment).
160    */
161   uint32_t message_size;
162
163   /**
164    * Has this message been transmitted to the service?
165    * Only ever GNUNET_YES for the head of the queue.
166    * Note that the overall struct should end at a 
167    * multiple of 64 bits.
168    */
169   int32_t was_transmitted;
170   
171 };
172
173 /**
174  * Handle to the datastore service. 
175  */
176 struct GNUNET_DATASTORE_Handle
177 {
178
179   /**
180    * Our configuration.
181    */
182   const struct GNUNET_CONFIGURATION_Handle *cfg;
183
184
185   /**
186    * Current connection to the datastore service.
187    */
188   struct GNUNET_CLIENT_Connection *client;
189
190   /**
191    * Handle for statistics.
192    */
193   struct GNUNET_STATISTICS_Handle *stats;
194
195   /**
196    * Current transmit handle.
197    */
198   struct GNUNET_CLIENT_TransmitHandle *th;
199
200   /**
201    * Current head of priority queue.
202    */
203   struct GNUNET_DATASTORE_QueueEntry *queue_head;
204
205   /**
206    * Current tail of priority queue.
207    */
208   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
209
210   /**
211    * Task for trying to reconnect.
212    */
213   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
214
215   /**
216    * How quickly should we retry?  Used for exponential back-off on
217    * connect-errors.
218    */
219   struct GNUNET_TIME_Relative retry_time;
220
221   /**
222    * Number of entries in the queue.
223    */
224   unsigned int queue_size;
225
226   /**
227    * Number of results we're receiving for the current query
228    * after application stopped to care.  Used to determine when
229    * to reset the connection.
230    */
231   unsigned int result_count;
232
233   /**
234    * Are we currently trying to receive from the service?
235    */
236   int in_receive;
237
238 };
239
240
241
242 /**
243  * Connect to the datastore service.
244  *
245  * @param cfg configuration to use
246  * @return handle to use to access the service
247  */
248 struct GNUNET_DATASTORE_Handle *
249 GNUNET_DATASTORE_connect (const struct
250                           GNUNET_CONFIGURATION_Handle
251                           *cfg)
252 {
253   struct GNUNET_CLIENT_Connection *c;
254   struct GNUNET_DATASTORE_Handle *h;
255   
256   c = GNUNET_CLIENT_connect ("datastore", cfg);
257   if (c == NULL)
258     return NULL; /* oops */
259   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
260                      GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
261   h->client = c;
262   h->cfg = cfg;
263   h->stats = GNUNET_STATISTICS_create ("datastore-api",
264                                        cfg);
265   return h;
266 }
267
268
269 /**
270  * Transmit DROP message to datastore service.
271  *
272  * @param cls the 'struct GNUNET_DATASTORE_Handle'
273  * @param size number of bytes that can be copied to buf
274  * @param buf where to copy the drop message
275  * @return number of bytes written to buf
276  */
277 static size_t
278 transmit_drop (void *cls,
279                size_t size, 
280                void *buf)
281 {
282   struct GNUNET_DATASTORE_Handle *h = cls;
283   struct GNUNET_MessageHeader *hdr;
284   
285   if (buf == NULL)
286     {
287       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
288                   _("Failed to transmit request to drop database.\n"));
289       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
290       return 0;
291     }
292   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
293   hdr = buf;
294   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
295   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
296   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
297   return sizeof(struct GNUNET_MessageHeader);
298 }
299
300
301 /**
302  * Disconnect from the datastore service (and free
303  * associated resources).
304  *
305  * @param h handle to the datastore
306  * @param drop set to GNUNET_YES to delete all data in datastore (!)
307  */
308 void
309 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
310                              int drop)
311 {
312   struct GNUNET_DATASTORE_QueueEntry *qe;
313
314   if (h->client != NULL)
315     {
316       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
317       h->client = NULL;
318     }
319   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
320     {
321       GNUNET_SCHEDULER_cancel (h->reconnect_task);
322       h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
323     }
324   while (NULL != (qe = h->queue_head))
325     {
326       GNUNET_assert (NULL != qe->response_proc);
327       qe->response_proc (qe, NULL);
328     }
329   if (GNUNET_YES == drop) 
330     {
331       h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
332       if (h->client != NULL)
333         {
334           if (NULL != 
335               GNUNET_CLIENT_notify_transmit_ready (h->client,
336                                                    sizeof(struct GNUNET_MessageHeader),
337                                                    GNUNET_TIME_UNIT_MINUTES,
338                                                    GNUNET_YES,
339                                                    &transmit_drop,
340                                                    h))
341             return;
342           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
343         }
344       GNUNET_break (0);
345     }
346   GNUNET_STATISTICS_destroy (h->stats,
347                              GNUNET_NO);
348   GNUNET_free (h);
349 }
350
351
352 /**
353  * A request has timed out (before being transmitted to the service).
354  *
355  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
356  * @param tc scheduler context
357  */
358 static void
359 timeout_queue_entry (void *cls,
360                      const struct GNUNET_SCHEDULER_TaskContext *tc)
361 {
362   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
363
364   GNUNET_STATISTICS_update (qe->h->stats,
365                             gettext_noop ("# queue entry timeouts"),
366                             1,
367                             GNUNET_NO);
368   qe->task = GNUNET_SCHEDULER_NO_TASK;
369   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
370   qe->response_proc (qe, NULL);
371 }
372
373
374 /**
375  * Create a new entry for our priority queue (and possibly discard other entires if
376  * the queue is getting too long).
377  *
378  * @param h handle to the datastore
379  * @param msize size of the message to queue
380  * @param queue_priority priority of the entry
381  * @param max_queue_size at what queue size should this request be dropped
382  *        (if other requests of higher priority are in the queue)
383  * @param timeout timeout for the operation
384  * @param response_proc function to call with replies (can be NULL)
385  * @param qc client context (NOT a closure for response_proc)
386  * @return NULL if the queue is full (and this entry was dropped)
387  */
388 static struct GNUNET_DATASTORE_QueueEntry *
389 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
390                   size_t msize,
391                   unsigned int queue_priority,
392                   unsigned int max_queue_size,
393                   struct GNUNET_TIME_Relative timeout,
394                   GNUNET_CLIENT_MessageHandler response_proc,            
395                   const union QueueContext *qc)
396 {
397   struct GNUNET_DATASTORE_QueueEntry *ret;
398   struct GNUNET_DATASTORE_QueueEntry *pos;
399   unsigned int c;
400
401   c = 0;
402   pos = h->queue_head;
403   while ( (pos != NULL) &&
404           (c < max_queue_size) &&
405           (pos->priority >= queue_priority) )
406     {
407       c++;
408       pos = pos->next;
409     }
410   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
411   ret->h = h;
412   ret->response_proc = response_proc;
413   ret->qc = *qc;
414   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
415   ret->priority = queue_priority;
416   ret->max_queue = max_queue_size;
417   ret->message_size = msize;
418   ret->was_transmitted = GNUNET_NO;
419   if (pos == NULL)
420     {
421       /* append at the tail */
422       pos = h->queue_tail;
423     }
424   else
425     {
426       pos = pos->prev; 
427       /* do not insert at HEAD if HEAD query was already
428          transmitted and we are still receiving replies! */
429       if ( (pos == NULL) &&
430            (h->queue_head->was_transmitted) )
431         pos = h->queue_head;
432     }
433   c++;
434   GNUNET_STATISTICS_update (h->stats,
435                             gettext_noop ("# queue entries created"),
436                             1,
437                             GNUNET_NO);
438   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
439                                      h->queue_tail,
440                                      pos,
441                                      ret);
442   h->queue_size++;
443   if (c > max_queue_size)
444     {
445       GNUNET_STATISTICS_update (h->stats,
446                                 gettext_noop ("# queue overflows"),
447                                 1,
448                                 GNUNET_NO);
449       response_proc (ret, NULL);
450       return NULL;
451     }
452   ret->task = GNUNET_SCHEDULER_add_delayed (timeout,
453                                             &timeout_queue_entry,
454                                             ret);
455   pos = ret->next;
456   while (pos != NULL) 
457     {
458       if (pos->max_queue < h->queue_size)
459         {
460           GNUNET_assert (pos->response_proc != NULL);
461           pos->response_proc (pos, NULL);
462           break;
463         }
464       pos = pos->next;
465     }
466   return ret;
467 }
468
469
470 /**
471  * Process entries in the queue (or do nothing if we are already
472  * doing so).
473  * 
474  * @param h handle to the datastore
475  */
476 static void
477 process_queue (struct GNUNET_DATASTORE_Handle *h);
478
479
480 /**
481  * Try reconnecting to the datastore service.
482  *
483  * @param cls the 'struct GNUNET_DATASTORE_Handle'
484  * @param tc scheduler context
485  */
486 static void
487 try_reconnect (void *cls,
488                const struct GNUNET_SCHEDULER_TaskContext *tc)
489 {
490   struct GNUNET_DATASTORE_Handle *h = cls;
491
492   if (h->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
493     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
494   else
495     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
496   if (h->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
497     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
498   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
499   h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
500   if (h->client == NULL)
501     {
502       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
503                   "DATASTORE reconnect failed (fatally)\n");
504       return;
505     }
506   GNUNET_STATISTICS_update (h->stats,
507                             gettext_noop ("# datastore connections (re)created"),
508                             1,
509                             GNUNET_NO);
510 #if DEBUG_DATASTORE
511   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
512               "Reconnected to DATASTORE\n");
513 #endif
514   process_queue (h);
515 }
516
517
518 /**
519  * Disconnect from the service and then try reconnecting to the datastore service
520  * after some delay.
521  *
522  * @param h handle to datastore to disconnect and reconnect
523  */
524 static void
525 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
526 {
527   if (h->client == NULL)
528     {
529 #if DEBUG_DATASTORE
530       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
531                   "client NULL in disconnect, will not try to reconnect\n");
532 #endif
533       return;
534     }
535 #if 0
536   GNUNET_STATISTICS_update (stats,
537                             gettext_noop ("# reconnected to DATASTORE"),
538                             1,
539                             GNUNET_NO);
540 #endif
541   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
542   h->client = NULL;
543   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_time,
544                                                     &try_reconnect,
545                                                     h);      
546 }
547
548
549 /**
550  * Transmit request from queue to datastore service.
551  *
552  * @param cls the 'struct GNUNET_DATASTORE_Handle'
553  * @param size number of bytes that can be copied to buf
554  * @param buf where to copy the drop message
555  * @return number of bytes written to buf
556  */
557 static size_t
558 transmit_request (void *cls,
559                   size_t size, 
560                   void *buf)
561 {
562   struct GNUNET_DATASTORE_Handle *h = cls;
563   struct GNUNET_DATASTORE_QueueEntry *qe;
564   size_t msize;
565
566   h->th = NULL;
567   if (NULL == (qe = h->queue_head))
568     return 0; /* no entry in queue */
569   if (buf == NULL)
570     {
571       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
572                   _("Failed to transmit request to DATASTORE.\n"));
573       GNUNET_STATISTICS_update (h->stats,
574                                 gettext_noop ("# transmission request failures"),
575                                 1,
576                                 GNUNET_NO);
577       do_disconnect (h);
578       return 0;
579     }
580   if (size < (msize = qe->message_size))
581     {
582       process_queue (h);
583       return 0;
584     }
585  #if DEBUG_DATASTORE
586   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
587               "Transmitting %u byte request to DATASTORE\n",
588               msize);
589 #endif
590   memcpy (buf, &qe[1], msize);
591   qe->was_transmitted = GNUNET_YES;
592   GNUNET_SCHEDULER_cancel (qe->task);
593   qe->task = GNUNET_SCHEDULER_NO_TASK;
594   h->in_receive = GNUNET_YES;
595   GNUNET_CLIENT_receive (h->client,
596                          qe->response_proc,
597                          qe,
598                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
599   GNUNET_STATISTICS_update (h->stats,
600                             gettext_noop ("# bytes sent to datastore"),
601                             1,
602                             GNUNET_NO);
603   return msize;
604 }
605
606
607 /**
608  * Process entries in the queue (or do nothing if we are already
609  * doing so).
610  * 
611  * @param h handle to the datastore
612  */
613 static void
614 process_queue (struct GNUNET_DATASTORE_Handle *h)
615 {
616   struct GNUNET_DATASTORE_QueueEntry *qe;
617
618   if (NULL == (qe = h->queue_head))
619     {
620 #if DEBUG_DATASTORE > 1
621       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
622                   "Queue empty\n");
623 #endif
624       return; /* no entry in queue */
625     }
626   if (qe->was_transmitted == GNUNET_YES)
627     {
628 #if DEBUG_DATASTORE > 1
629       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
630                   "Head request already transmitted\n");
631 #endif
632       return; /* waiting for replies */
633     }
634   if (h->th != NULL)
635     {
636 #if DEBUG_DATASTORE > 1
637       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
638                   "Pending transmission request\n");
639 #endif
640       return; /* request pending */
641     }
642   if (h->client == NULL)
643     {
644 #if DEBUG_DATASTORE > 1
645       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
646                   "Not connected\n");
647 #endif
648       return; /* waiting for reconnect */
649     }
650 #if DEBUG_DATASTORE
651   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
652               "Queueing %u byte request to DATASTORE\n",
653               qe->message_size);
654 #endif
655   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
656                                                qe->message_size,
657                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
658                                                GNUNET_YES,
659                                                &transmit_request,
660                                                h);
661 }
662
663
664 /**
665  * Dummy continuation used to do nothing (but be non-zero).
666  *
667  * @param cls closure
668  * @param result result 
669  * @param emsg error message
670  */
671 static void
672 drop_status_cont (void *cls, int32_t result, const char *emsg)
673 {
674   /* do nothing */
675 }
676
677
678 static void
679 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
680 {
681   struct GNUNET_DATASTORE_Handle *h = qe->h;
682
683   GNUNET_CONTAINER_DLL_remove (h->queue_head,
684                                h->queue_tail,
685                                qe);
686   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
687     {
688       GNUNET_SCHEDULER_cancel (qe->task);
689       qe->task = GNUNET_SCHEDULER_NO_TASK;
690     }
691   h->queue_size--;
692   GNUNET_free (qe);
693 }
694
695 /**
696  * Type of a function to call when we receive a message
697  * from the service.
698  *
699  * @param cls closure
700  * @param msg message received, NULL on timeout or fatal error
701  */
702 static void 
703 process_status_message (void *cls,
704                         const struct
705                         GNUNET_MessageHeader * msg)
706 {
707   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
708   struct GNUNET_DATASTORE_Handle *h = qe->h;
709   struct StatusContext rc = qe->qc.sc;
710   const struct StatusMessage *sm;
711   const char *emsg;
712   int32_t status;
713   int was_transmitted;
714
715   h->in_receive = GNUNET_NO;
716   was_transmitted = qe->was_transmitted;
717   if (msg == NULL)
718     {      
719       free_queue_entry (qe);
720       if (NULL == h->client)
721         return; /* forced disconnect */
722       if (rc.cont != NULL)
723         rc.cont (rc.cont_cls, 
724                  GNUNET_SYSERR,
725                  _("Failed to receive status response from database."));
726       if (was_transmitted == GNUNET_YES)
727         do_disconnect (h);
728       return;
729     }
730   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
731   GNUNET_assert (h->queue_head == qe);
732   free_queue_entry (qe);
733   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
734        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
735     {
736       GNUNET_break (0);
737       h->retry_time = GNUNET_TIME_UNIT_ZERO;
738       do_disconnect (h);
739       if (rc.cont != NULL)
740         rc.cont (rc.cont_cls, 
741                  GNUNET_SYSERR,
742                  _("Error reading response from datastore service"));
743       return;
744     }
745   sm = (const struct StatusMessage*) msg;
746   status = ntohl(sm->status);
747   emsg = NULL;
748   if (ntohs(msg->size) > sizeof(struct StatusMessage))
749     {
750       emsg = (const char*) &sm[1];
751       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
752         {
753           GNUNET_break (0);
754           emsg = _("Invalid error message received from datastore service");
755         }
756     }  
757   if ( (status == GNUNET_SYSERR) &&
758        (emsg == NULL) )
759     {
760       GNUNET_break (0);
761       emsg = _("Invalid error message received from datastore service");
762     }
763 #if DEBUG_DATASTORE
764   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
765               "Received status %d/%s\n",
766               (int) status,
767               emsg);
768 #endif
769   GNUNET_STATISTICS_update (h->stats,
770                             gettext_noop ("# status messages received"),
771                             1,
772                             GNUNET_NO);
773   h->retry_time.rel_value = 0;
774   process_queue (h);
775   if (rc.cont != NULL)
776     rc.cont (rc.cont_cls, 
777              status,
778              emsg);
779 }
780
781
782 /**
783  * Store an item in the datastore.  If the item is already present,
784  * the priorities are summed up and the higher expiration time and
785  * lower anonymity level is used.
786  *
787  * @param h handle to the datastore
788  * @param rid reservation ID to use (from "reserve"); use 0 if no
789  *            prior reservation was made
790  * @param key key for the value
791  * @param size number of bytes in data
792  * @param data content stored
793  * @param type type of the content
794  * @param priority priority of the content
795  * @param anonymity anonymity-level for the content
796  * @param replication how often should the content be replicated to other peers?
797  * @param expiration expiration time for the content
798  * @param queue_priority ranking of this request in the priority queue
799  * @param max_queue_size at what queue size should this request be dropped
800  *        (if other requests of higher priority are in the queue)
801  * @param timeout timeout for the operation
802  * @param cont continuation to call when done
803  * @param cont_cls closure for cont
804  * @return NULL if the entry was not queued, otherwise a handle that can be used to
805  *         cancel; note that even if NULL is returned, the callback will be invoked
806  *         (or rather, will already have been invoked)
807  */
808 struct GNUNET_DATASTORE_QueueEntry *
809 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
810                       uint32_t rid,
811                       const GNUNET_HashCode * key,
812                       size_t size,
813                       const void *data,
814                       enum GNUNET_BLOCK_Type type,
815                       uint32_t priority,
816                       uint32_t anonymity,
817                       uint32_t replication,
818                       struct GNUNET_TIME_Absolute expiration,
819                       unsigned int queue_priority,
820                       unsigned int max_queue_size,
821                       struct GNUNET_TIME_Relative timeout,
822                       GNUNET_DATASTORE_ContinuationWithStatus cont,
823                       void *cont_cls)
824 {
825   struct GNUNET_DATASTORE_QueueEntry *qe;
826   struct DataMessage *dm;
827   size_t msize;
828   union QueueContext qc;
829
830 #if DEBUG_DATASTORE
831   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
832               "Asked to put %u bytes of data under key `%s'\n",
833               size,
834               GNUNET_h2s (key));
835 #endif
836   msize = sizeof(struct DataMessage) + size;
837   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
838   qc.sc.cont = cont;
839   qc.sc.cont_cls = cont_cls;
840   qe = make_queue_entry (h, msize,
841                          queue_priority, max_queue_size, timeout,
842                          &process_status_message, &qc);
843   if (qe == NULL)
844     {
845 #if DEBUG_DATASTORE
846       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
847                   "Could not create queue entry for PUT\n");
848 #endif
849       return NULL;
850     }
851   GNUNET_STATISTICS_update (h->stats,
852                             gettext_noop ("# PUT requests executed"),
853                             1,
854                             GNUNET_NO);
855   dm = (struct DataMessage* ) &qe[1];
856   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
857   dm->header.size = htons(msize);
858   dm->rid = htonl(rid);
859   dm->size = htonl( (uint32_t) size);
860   dm->type = htonl(type);
861   dm->priority = htonl(priority);
862   dm->anonymity = htonl(anonymity);
863   dm->uid = GNUNET_htonll(0);
864   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
865   dm->key = *key;
866   memcpy (&dm[1], data, size);
867   process_queue (h);
868   return qe;
869 }
870
871
872 /**
873  * Reserve space in the datastore.  This function should be used
874  * to avoid "out of space" failures during a longer sequence of "put"
875  * operations (for example, when a file is being inserted).
876  *
877  * @param h handle to the datastore
878  * @param amount how much space (in bytes) should be reserved (for content only)
879  * @param entries how many entries will be created (to calculate per-entry overhead)
880  * @param queue_priority ranking of this request in the priority queue
881  * @param max_queue_size at what queue size should this request be dropped
882  *        (if other requests of higher priority are in the queue)
883  * @param timeout how long to wait at most for a response (or before dying in queue)
884  * @param cont continuation to call when done; "success" will be set to
885  *             a positive reservation value if space could be reserved.
886  * @param cont_cls closure for cont
887  * @return NULL if the entry was not queued, otherwise a handle that can be used to
888  *         cancel; note that even if NULL is returned, the callback will be invoked
889  *         (or rather, will already have been invoked)
890  */
891 struct GNUNET_DATASTORE_QueueEntry *
892 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
893                           uint64_t amount,
894                           uint32_t entries,
895                           unsigned int queue_priority,
896                           unsigned int max_queue_size,
897                           struct GNUNET_TIME_Relative timeout,
898                           GNUNET_DATASTORE_ContinuationWithStatus cont,
899                           void *cont_cls)
900 {
901   struct GNUNET_DATASTORE_QueueEntry *qe;
902   struct ReserveMessage *rm;
903   union QueueContext qc;
904
905   if (cont == NULL)
906     cont = &drop_status_cont;
907 #if DEBUG_DATASTORE
908   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
909               "Asked to reserve %llu bytes of data and %u entries'\n",
910               (unsigned long long) amount,
911               (unsigned int) entries);
912 #endif
913   qc.sc.cont = cont;
914   qc.sc.cont_cls = cont_cls;
915   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
916                          queue_priority, max_queue_size, timeout,
917                          &process_status_message, &qc);
918   if (qe == NULL)
919     {
920 #if DEBUG_DATASTORE
921       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
922                   "Could not create queue entry to reserve\n");
923 #endif
924       return NULL;
925     }
926   GNUNET_STATISTICS_update (h->stats,
927                             gettext_noop ("# RESERVE requests executed"),
928                             1,
929                             GNUNET_NO);
930   rm = (struct ReserveMessage*) &qe[1];
931   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
932   rm->header.size = htons(sizeof (struct ReserveMessage));
933   rm->entries = htonl(entries);
934   rm->amount = GNUNET_htonll(amount);
935   process_queue (h);
936   return qe;
937 }
938
939
940 /**
941  * Signal that all of the data for which a reservation was made has
942  * been stored and that whatever excess space might have been reserved
943  * can now be released.
944  *
945  * @param h handle to the datastore
946  * @param rid reservation ID (value of "success" in original continuation
947  *        from the "reserve" function).
948  * @param queue_priority ranking of this request in the priority queue
949  * @param max_queue_size at what queue size should this request be dropped
950  *        (if other requests of higher priority are in the queue)
951  * @param queue_priority ranking of this request in the priority queue
952  * @param max_queue_size at what queue size should this request be dropped
953  *        (if other requests of higher priority are in the queue)
954  * @param timeout how long to wait at most for a response
955  * @param cont continuation to call when done
956  * @param cont_cls closure for cont
957  * @return NULL if the entry was not queued, otherwise a handle that can be used to
958  *         cancel; note that even if NULL is returned, the callback will be invoked
959  *         (or rather, will already have been invoked)
960  */
961 struct GNUNET_DATASTORE_QueueEntry *
962 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
963                                   uint32_t rid,
964                                   unsigned int queue_priority,
965                                   unsigned int max_queue_size,
966                                   struct GNUNET_TIME_Relative timeout,
967                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
968                                   void *cont_cls)
969 {
970   struct GNUNET_DATASTORE_QueueEntry *qe;
971   struct ReleaseReserveMessage *rrm;
972   union QueueContext qc;
973
974   if (cont == NULL)
975     cont = &drop_status_cont;
976 #if DEBUG_DATASTORE
977   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
978               "Asked to release reserve %d\n",
979               rid);
980 #endif
981   qc.sc.cont = cont;
982   qc.sc.cont_cls = cont_cls;
983   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
984                          queue_priority, max_queue_size, timeout,
985                          &process_status_message, &qc);
986   if (qe == NULL)
987     {
988 #if DEBUG_DATASTORE
989       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
990                   "Could not create queue entry to release reserve\n");
991 #endif
992       return NULL;
993     }
994   GNUNET_STATISTICS_update (h->stats,
995                             gettext_noop ("# RELEASE RESERVE requests executed"),
996                             1,
997                             GNUNET_NO);
998   rrm = (struct ReleaseReserveMessage*) &qe[1];
999   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1000   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
1001   rrm->rid = htonl(rid);
1002   process_queue (h);
1003   return qe;
1004 }
1005
1006
1007 /**
1008  * Update a value in the datastore.
1009  *
1010  * @param h handle to the datastore
1011  * @param uid identifier for the value
1012  * @param priority how much to increase the priority of the value
1013  * @param expiration new expiration value should be MAX of existing and this argument
1014  * @param queue_priority ranking of this request in the priority queue
1015  * @param max_queue_size at what queue size should this request be dropped
1016  *        (if other requests of higher priority are in the queue)
1017  * @param timeout how long to wait at most for a response
1018  * @param cont continuation to call when done
1019  * @param cont_cls closure for cont
1020  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1021  *         cancel; note that even if NULL is returned, the callback will be invoked
1022  *         (or rather, will already have been invoked)
1023  */
1024 struct GNUNET_DATASTORE_QueueEntry *
1025 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1026                          uint64_t uid,
1027                          uint32_t priority,
1028                          struct GNUNET_TIME_Absolute expiration,
1029                          unsigned int queue_priority,
1030                          unsigned int max_queue_size,
1031                          struct GNUNET_TIME_Relative timeout,
1032                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1033                          void *cont_cls)
1034 {
1035   struct GNUNET_DATASTORE_QueueEntry *qe;
1036   struct UpdateMessage *um;
1037   union QueueContext qc;
1038
1039   if (cont == NULL)
1040     cont = &drop_status_cont;
1041 #if DEBUG_DATASTORE
1042   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1043               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
1044               uid,
1045               (unsigned int) priority,
1046               (unsigned long long) expiration.abs_value);
1047 #endif
1048   qc.sc.cont = cont;
1049   qc.sc.cont_cls = cont_cls;
1050   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
1051                          queue_priority, max_queue_size, timeout,
1052                          &process_status_message, &qc);
1053   if (qe == NULL)
1054     {
1055 #if DEBUG_DATASTORE
1056       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1057                   "Could not create queue entry for UPDATE\n");
1058 #endif
1059       return NULL;
1060     }
1061   GNUNET_STATISTICS_update (h->stats,
1062                             gettext_noop ("# UPDATE requests executed"),
1063                             1,
1064                             GNUNET_NO);
1065   um = (struct UpdateMessage*) &qe[1];
1066   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1067   um->header.size = htons(sizeof (struct UpdateMessage));
1068   um->priority = htonl(priority);
1069   um->expiration = GNUNET_TIME_absolute_hton(expiration);
1070   um->uid = GNUNET_htonll(uid);
1071   process_queue (h);
1072   return qe;
1073 }
1074
1075
1076 /**
1077  * Explicitly remove some content from the database.
1078  * The "cont"inuation will be called with status
1079  * "GNUNET_OK" if content was removed, "GNUNET_NO"
1080  * if no matching entry was found and "GNUNET_SYSERR"
1081  * on all other types of errors.
1082  *
1083  * @param h handle to the datastore
1084  * @param key key for the value
1085  * @param size number of bytes in data
1086  * @param data content stored
1087  * @param queue_priority ranking of this request in the priority queue
1088  * @param max_queue_size at what queue size should this request be dropped
1089  *        (if other requests of higher priority are in the queue)
1090  * @param timeout how long to wait at most for a response
1091  * @param cont continuation to call when done
1092  * @param cont_cls closure for cont
1093  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1094  *         cancel; note that even if NULL is returned, the callback will be invoked
1095  *         (or rather, will already have been invoked)
1096  */
1097 struct GNUNET_DATASTORE_QueueEntry *
1098 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1099                          const GNUNET_HashCode *key,
1100                          size_t size, 
1101                          const void *data,
1102                          unsigned int queue_priority,
1103                          unsigned int max_queue_size,
1104                          struct GNUNET_TIME_Relative timeout,
1105                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1106                          void *cont_cls)
1107 {
1108   struct GNUNET_DATASTORE_QueueEntry *qe;
1109   struct DataMessage *dm;
1110   size_t msize;
1111   union QueueContext qc;
1112
1113   if (cont == NULL)
1114     cont = &drop_status_cont;
1115 #if DEBUG_DATASTORE
1116   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1117               "Asked to remove %u bytes under key `%s'\n",
1118               size,
1119               GNUNET_h2s (key));
1120 #endif
1121   qc.sc.cont = cont;
1122   qc.sc.cont_cls = cont_cls;
1123   msize = sizeof(struct DataMessage) + size;
1124   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1125   qe = make_queue_entry (h, msize,
1126                          queue_priority, max_queue_size, timeout,
1127                          &process_status_message, &qc);
1128   if (qe == NULL)
1129     {
1130 #if DEBUG_DATASTORE
1131       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1132                   "Could not create queue entry for REMOVE\n");
1133 #endif
1134       return NULL;
1135     }
1136   GNUNET_STATISTICS_update (h->stats,
1137                             gettext_noop ("# REMOVE requests executed"),
1138                             1,
1139                             GNUNET_NO);
1140   dm = (struct DataMessage*) &qe[1];
1141   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1142   dm->header.size = htons(msize);
1143   dm->rid = htonl(0);
1144   dm->size = htonl(size);
1145   dm->type = htonl(0);
1146   dm->priority = htonl(0);
1147   dm->anonymity = htonl(0);
1148   dm->uid = GNUNET_htonll(0);
1149   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
1150   dm->key = *key;
1151   memcpy (&dm[1], data, size);
1152   process_queue (h);
1153   return qe;
1154 }
1155
1156
1157 /**
1158  * Type of a function to call when we receive a message
1159  * from the service.
1160  *
1161  * @param cls closure
1162  * @param msg message received, NULL on timeout or fatal error
1163  */
1164 static void 
1165 process_result_message (void *cls,
1166                         const struct GNUNET_MessageHeader * msg)
1167 {
1168   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
1169   struct GNUNET_DATASTORE_Handle *h = qe->h;
1170   struct ResultContext rc = qe->qc.rc;
1171   const struct DataMessage *dm;
1172   int was_transmitted;
1173
1174   h->in_receive = GNUNET_NO;
1175   if (msg == NULL)
1176    {
1177       was_transmitted = qe->was_transmitted;
1178       free_queue_entry (qe);
1179       if (was_transmitted == GNUNET_YES)
1180         {
1181           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1182                       _("Failed to receive response from database.\n"));
1183           do_disconnect (h);
1184         }
1185       else
1186         {
1187 #if DEBUG_DATASTORE
1188           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1189                       "Request dropped due to finite datastore queue length.\n");
1190 #endif
1191         }
1192       if (rc.iter != NULL)
1193         rc.iter (rc.iter_cls,
1194                  NULL, 0, NULL, 0, 0, 0, 
1195                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1196       return;
1197     }
1198   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1199   GNUNET_assert (h->queue_head == qe);
1200   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
1201     {
1202       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1203       free_queue_entry (qe);
1204 #if DEBUG_DATASTORE
1205       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1206                   "Received end of result set, new queue size is %u\n",
1207                   h->queue_size);
1208 #endif
1209       if (rc.iter != NULL)
1210         rc.iter (rc.iter_cls,
1211                  NULL, 0, NULL, 0, 0, 0, 
1212                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1213       h->retry_time.rel_value = 0;
1214       h->result_count = 0;
1215       process_queue (h);
1216       return;
1217     }
1218   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1219        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1220        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1221     {
1222       GNUNET_break (0);
1223       free_queue_entry (qe);
1224       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1225       do_disconnect (h);
1226       if (rc.iter != NULL)
1227         rc.iter (rc.iter_cls,
1228                  NULL, 0, NULL, 0, 0, 0, 
1229                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1230       return;
1231     }
1232   GNUNET_STATISTICS_update (h->stats,
1233                             gettext_noop ("# Results received"),
1234                             1,
1235                             GNUNET_NO);
1236   if (rc.iter == NULL)
1237     {
1238       h->result_count++;
1239       GNUNET_STATISTICS_update (h->stats,
1240                                 gettext_noop ("# Excess results received"),
1241                                 1,
1242                                 GNUNET_NO);
1243       if (h->result_count > MAX_EXCESS_RESULTS)
1244         {
1245           free_queue_entry (qe);
1246           GNUNET_STATISTICS_update (h->stats,
1247                                     gettext_noop ("# Forced database connection resets"),
1248                                     1,
1249                                     GNUNET_NO);
1250           h->retry_time = GNUNET_TIME_UNIT_ZERO;
1251           do_disconnect (h);      
1252           return;
1253         }
1254       GNUNET_DATASTORE_iterate_get_next (h);
1255       return;
1256     }
1257   dm = (const struct DataMessage*) msg;
1258 #if DEBUG_DATASTORE
1259   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1260               "Received result %llu with type %u and size %u with key %s\n",
1261               (unsigned long long) GNUNET_ntohll(dm->uid),
1262               ntohl(dm->type),
1263               ntohl(dm->size),
1264               GNUNET_h2s(&dm->key));
1265 #endif
1266   h->retry_time.rel_value = 0;
1267   rc.iter (rc.iter_cls,
1268            &dm->key,
1269            ntohl(dm->size),
1270            &dm[1],
1271            ntohl(dm->type),
1272            ntohl(dm->priority),
1273            ntohl(dm->anonymity),
1274            GNUNET_TIME_absolute_ntoh(dm->expiration),   
1275            GNUNET_ntohll(dm->uid));
1276 }
1277
1278
1279 /**
1280  * Get a random value from the datastore for content replication.
1281  * Returns a single, random value among those with the highest
1282  * replication score, lowering positive replication scores by one for
1283  * the chosen value (if only content with a replication score exists,
1284  * a random value is returned and replication scores are not changed).
1285  *
1286  * @param h handle to the datastore
1287  * @param queue_priority ranking of this request in the priority queue
1288  * @param max_queue_size at what queue size should this request be dropped
1289  *        (if other requests of higher priority are in the queue)
1290  * @param timeout how long to wait at most for a response
1291  * @param iter function to call on a random value; it
1292  *        will be called once with a value (if available)
1293  *        and always once with a value of NULL.
1294  * @param iter_cls closure for iter
1295  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1296  *         cancel; note that even if NULL is returned, the callback will be invoked
1297  *         (or rather, will already have been invoked)
1298  */
1299 struct GNUNET_DATASTORE_QueueEntry *
1300 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1301                                       unsigned int queue_priority,
1302                                       unsigned int max_queue_size,
1303                                       struct GNUNET_TIME_Relative timeout,
1304                                       GNUNET_DATASTORE_Iterator iter, 
1305                                       void *iter_cls)
1306 {
1307   struct GNUNET_DATASTORE_QueueEntry *qe;
1308   struct GNUNET_MessageHeader *m;
1309   union QueueContext qc;
1310
1311 #if DEBUG_DATASTORE
1312   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1313               "Asked to get random entry in %llu ms\n",
1314               (unsigned long long) timeout.rel_value);
1315 #endif
1316   qc.rc.iter = iter;
1317   qc.rc.iter_cls = iter_cls;
1318   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1319                          queue_priority, max_queue_size, timeout,
1320                          &process_result_message, &qc);
1321   if (qe == NULL)
1322     {
1323 #if DEBUG_DATASTORE
1324       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1325                   "Could not create queue entry for GET RANDOM\n");
1326 #endif
1327       return NULL;    
1328     }
1329   GNUNET_STATISTICS_update (h->stats,
1330                             gettext_noop ("# GET RANDOM requests executed"),
1331                             1,
1332                             GNUNET_NO);
1333   m = (struct GNUNET_MessageHeader*) &qe[1];
1334   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
1335   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1336   process_queue (h);
1337   return qe;
1338 }
1339
1340
1341 /**
1342  * Get a zero-anonymity value from the datastore.
1343  *
1344  * @param h handle to the datastore
1345  * @param queue_priority ranking of this request in the priority queue
1346  * @param max_queue_size at what queue size should this request be dropped
1347  *        (if other requests of higher priority are in the queue)
1348  * @param timeout how long to wait at most for a response
1349  * @param type allowed type for the operation
1350  * @param iter function to call on a random value; it
1351  *        will be called once with a value (if available)
1352  *        and always once with a value of NULL.
1353  * @param iter_cls closure for iter
1354  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1355  *         cancel; note that even if NULL is returned, the callback will be invoked
1356  *         (or rather, will already have been invoked)
1357  */
1358 struct GNUNET_DATASTORE_QueueEntry *
1359 GNUNET_DATASTORE_iterate_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1360                                          unsigned int queue_priority,
1361                                          unsigned int max_queue_size,
1362                                          struct GNUNET_TIME_Relative timeout,
1363                                          enum GNUNET_BLOCK_Type type,
1364                                          GNUNET_DATASTORE_Iterator iter, 
1365                                          void *iter_cls)
1366 {
1367   struct GNUNET_DATASTORE_QueueEntry *qe;
1368   struct GetZeroAnonymityMessage *m;
1369   union QueueContext qc;
1370
1371 #if DEBUG_DATASTORE
1372   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1373               "Asked to get zero-anonymity entry in %llu ms\n",
1374               (unsigned long long) timeout.rel_value);
1375 #endif
1376   qc.rc.iter = iter;
1377   qc.rc.iter_cls = iter_cls;
1378   qe = make_queue_entry (h, sizeof(struct GetZeroAnonymityMessage),
1379                          queue_priority, max_queue_size, timeout,
1380                          &process_result_message, &qc);
1381   if (qe == NULL)
1382     {
1383 #if DEBUG_DATASTORE
1384       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1385                   "Could not create queue entry for zero-anonymity iteration\n");
1386 #endif
1387       return NULL;    
1388     }
1389   GNUNET_STATISTICS_update (h->stats,
1390                             gettext_noop ("# GET ZERO ANONYMITY requests executed"),
1391                             1,
1392                             GNUNET_NO);
1393   m = (struct GetZeroAnonymityMessage*) &qe[1];
1394   m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1395   m->header.size = htons(sizeof (struct GetZeroAnonymityMessage));
1396   m->type = htonl ((uint32_t) type);
1397   process_queue (h);
1398   return qe;
1399 }
1400
1401
1402
1403 /**
1404  * Iterate over the results for a particular key
1405  * in the datastore.  The iterator will only be called
1406  * once initially; if the first call did contain a
1407  * result, further results can be obtained by calling
1408  * "GNUNET_DATASTORE_iterate_get_next" with the given argument.
1409  *
1410  * @param h handle to the datastore
1411  * @param key maybe NULL (to match all entries)
1412  * @param type desired type, 0 for any
1413  * @param queue_priority ranking of this request in the priority queue
1414  * @param max_queue_size at what queue size should this request be dropped
1415  *        (if other requests of higher priority are in the queue)
1416  * @param timeout how long to wait at most for a response
1417  * @param iter function to call on each matching value;
1418  *        will be called once with a NULL value at the end
1419  * @param iter_cls closure for iter
1420  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1421  *         cancel; note that even if NULL is returned, the callback will be invoked
1422  *         (or rather, will already have been invoked)
1423  */
1424 struct GNUNET_DATASTORE_QueueEntry *
1425 GNUNET_DATASTORE_iterate_key (struct GNUNET_DATASTORE_Handle *h,
1426                               const GNUNET_HashCode * key,
1427                               enum GNUNET_BLOCK_Type type,
1428                               unsigned int queue_priority,
1429                               unsigned int max_queue_size,
1430                               struct GNUNET_TIME_Relative timeout,
1431                               GNUNET_DATASTORE_Iterator iter, 
1432                               void *iter_cls)
1433 {
1434   struct GNUNET_DATASTORE_QueueEntry *qe;
1435   struct GetMessage *gm;
1436   union QueueContext qc;
1437
1438 #if DEBUG_DATASTORE
1439   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1440               "Asked to look for data of type %u under key `%s'\n",
1441               (unsigned int) type,
1442               GNUNET_h2s (key));
1443 #endif
1444   qc.rc.iter = iter;
1445   qc.rc.iter_cls = iter_cls;
1446   qe = make_queue_entry (h, sizeof(struct GetMessage),
1447                          queue_priority, max_queue_size, timeout,
1448                          &process_result_message, &qc);
1449   if (qe == NULL)
1450     {
1451 #if DEBUG_DATASTORE
1452       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1453                   "Could not queue request for `%s'\n",
1454                   GNUNET_h2s (key));
1455 #endif
1456       return NULL;
1457     }
1458   GNUNET_STATISTICS_update (h->stats,
1459                             gettext_noop ("# GET requests executed"),
1460                             1,
1461                             GNUNET_NO);
1462   gm = (struct GetMessage*) &qe[1];
1463   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1464   gm->type = htonl(type);
1465   if (key != NULL)
1466     {
1467       gm->header.size = htons(sizeof (struct GetMessage));
1468       gm->key = *key;
1469     }
1470   else
1471     {
1472       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1473     }
1474   process_queue (h);
1475   return qe;
1476 }
1477
1478
1479 /**
1480  * Function called to trigger obtaining the next result
1481  * from the datastore.
1482  * 
1483  * @param h handle to the datastore
1484  */
1485 void 
1486 GNUNET_DATASTORE_iterate_get_next (struct GNUNET_DATASTORE_Handle *h)
1487 {
1488   struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
1489
1490   GNUNET_assert (&process_result_message == qe->response_proc);
1491   h->in_receive = GNUNET_YES;
1492   GNUNET_CLIENT_receive (h->client,
1493                          qe->response_proc,
1494                          qe,
1495                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
1496 }
1497
1498
1499 /**
1500  * Cancel a datastore operation.  The final callback from the
1501  * operation must not have been done yet.
1502  * 
1503  * @param qe operation to cancel
1504  */
1505 void
1506 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1507 {
1508   struct GNUNET_DATASTORE_Handle *h;
1509
1510   h = qe->h;
1511 #if DEBUG_DATASTORE
1512   GNUNET_log  (GNUNET_ERROR_TYPE_DEBUG,
1513                "Pending DATASTORE request %p cancelled (%d, %d)\n",
1514                qe,
1515                qe->was_transmitted,
1516                h->queue_head == qe);
1517 #endif
1518   if (GNUNET_YES == qe->was_transmitted) 
1519     {
1520       free_queue_entry (qe);
1521       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1522       do_disconnect (h);
1523       return;
1524     }
1525   free_queue_entry (qe);
1526   process_queue (h);
1527 }
1528
1529
1530 /* end of datastore_api.c */