f9d4dff7eab1d36f2583bcb7485da0ce3bd65991
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__)
35
36 /**
37  * If a client stopped asking for more results, how many more do
38  * we receive from the DB before killing the connection?  Trade-off
39  * between re-doing TCP handshakes and (needlessly) receiving
40  * useless results.
41  */
42 #define MAX_EXCESS_RESULTS 8
43
44 /**
45  * Context for processing status messages.
46  */
47 struct StatusContext
48 {
49   /**
50    * Continuation to call with the status.
51    */
52   GNUNET_DATASTORE_ContinuationWithStatus cont;
53
54   /**
55    * Closure for cont.
56    */
57   void *cont_cls;
58
59 };
60
61
62 /**
63  * Context for processing result messages.
64  */
65 struct ResultContext
66 {
67   /**
68    * Function to call with the result.
69    */
70   GNUNET_DATASTORE_DatumProcessor proc;
71
72   /**
73    * Closure for proc.
74    */
75   void *proc_cls;
76
77 };
78
79
80 /**
81  *  Context for a queue operation.
82  */
83 union QueueContext
84 {
85
86   struct StatusContext sc;
87
88   struct ResultContext rc;
89
90 };
91
92
93
94 /**
95  * Entry in our priority queue.
96  */
97 struct GNUNET_DATASTORE_QueueEntry
98 {
99
100   /**
101    * This is a linked list.
102    */
103   struct GNUNET_DATASTORE_QueueEntry *next;
104
105   /**
106    * This is a linked list.
107    */
108   struct GNUNET_DATASTORE_QueueEntry *prev;
109
110   /**
111    * Handle to the master context.
112    */
113   struct GNUNET_DATASTORE_Handle *h;
114
115   /**
116    * Response processor (NULL if we are not waiting for a response).
117    * This struct should be used for the closure, function-specific
118    * arguments can be passed via 'qc'.
119    */
120   GNUNET_CLIENT_MessageHandler response_proc;
121
122   /**
123    * Function to call after transmission of the request.
124    */
125   GNUNET_DATASTORE_ContinuationWithStatus cont;
126
127   /**
128    * Closure for 'cont'.
129    */
130   void *cont_cls;
131
132   /**
133    * Context for the operation.
134    */
135   union QueueContext qc;
136
137   /**
138    * Task for timeout signalling.
139    */
140   GNUNET_SCHEDULER_TaskIdentifier task;
141
142   /**
143    * Timeout for the current operation.
144    */
145   struct GNUNET_TIME_Absolute timeout;
146
147   /**
148    * Priority in the queue.
149    */
150   unsigned int priority;
151
152   /**
153    * Maximum allowed length of queue (otherwise
154    * this request should be discarded).
155    */
156   unsigned int max_queue;
157
158   /**
159    * Number of bytes in the request message following
160    * this struct.  32-bit value for nicer memory
161    * access (and overall struct alignment).
162    */
163   uint32_t message_size;
164
165   /**
166    * Has this message been transmitted to the service?
167    * Only ever GNUNET_YES for the head of the queue.
168    * Note that the overall struct should end at a
169    * multiple of 64 bits.
170    */
171   int was_transmitted;
172
173 };
174
175 /**
176  * Handle to the datastore service.
177  */
178 struct GNUNET_DATASTORE_Handle
179 {
180
181   /**
182    * Our configuration.
183    */
184   const struct GNUNET_CONFIGURATION_Handle *cfg;
185
186   /**
187    * Current connection to the datastore service.
188    */
189   struct GNUNET_CLIENT_Connection *client;
190
191   /**
192    * Handle for statistics.
193    */
194   struct GNUNET_STATISTICS_Handle *stats;
195
196   /**
197    * Current transmit handle.
198    */
199   struct GNUNET_CLIENT_TransmitHandle *th;
200
201   /**
202    * Current head of priority queue.
203    */
204   struct GNUNET_DATASTORE_QueueEntry *queue_head;
205
206   /**
207    * Current tail of priority queue.
208    */
209   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
210
211   /**
212    * Task for trying to reconnect.
213    */
214   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
215
216   /**
217    * How quickly should we retry?  Used for exponential back-off on
218    * connect-errors.
219    */
220   struct GNUNET_TIME_Relative retry_time;
221
222   /**
223    * Number of entries in the queue.
224    */
225   unsigned int queue_size;
226
227   /**
228    * Number of results we're receiving for the current query
229    * after application stopped to care.  Used to determine when
230    * to reset the connection.
231    */
232   unsigned int result_count;
233
234   /**
235    * Are we currently trying to receive from the service?
236    */
237   int in_receive;
238
239   /**
240    * We should ignore the next message(s) from the service.
241    */
242   unsigned int skip_next_messages;
243
244 };
245
246
247
248 /**
249  * Connect to the datastore service.
250  *
251  * @param cfg configuration to use
252  * @return handle to use to access the service
253  */
254 struct GNUNET_DATASTORE_Handle *
255 GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
256 {
257   struct GNUNET_CLIENT_Connection *c;
258   struct GNUNET_DATASTORE_Handle *h;
259
260   c = GNUNET_CLIENT_connect ("datastore", cfg);
261   if (c == NULL)
262     return NULL;                /* oops */
263   h = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_Handle) +
264                      GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
265   h->client = c;
266   h->cfg = cfg;
267   h->stats = GNUNET_STATISTICS_create ("datastore-api", cfg);
268   return h;
269 }
270
271
272 /**
273  * Task used by 'transmit_drop' to disconnect the datastore.
274  *
275  * @param cls the datastore handle
276  * @param tc scheduler context
277  */
278 static void
279 disconnect_after_drop (void *cls,
280                        const struct GNUNET_SCHEDULER_TaskContext *tc)
281 {
282   struct GNUNET_DATASTORE_Handle *h = cls;
283
284   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
285 }
286
287
288 /**
289  * Transmit DROP message to datastore service.
290  *
291  * @param cls the 'struct GNUNET_DATASTORE_Handle'
292  * @param size number of bytes that can be copied to buf
293  * @param buf where to copy the drop message
294  * @return number of bytes written to buf
295  */
296 static size_t
297 transmit_drop (void *cls, size_t size, void *buf)
298 {
299   struct GNUNET_DATASTORE_Handle *h = cls;
300   struct GNUNET_MessageHeader *hdr;
301
302   if (buf == NULL)
303   {
304     LOG (GNUNET_ERROR_TYPE_WARNING,
305          _("Failed to transmit request to drop database.\n"));
306     GNUNET_SCHEDULER_add_continuation (&disconnect_after_drop, h,
307                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
308     return 0;
309   }
310   GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
311   hdr = buf;
312   hdr->size = htons (sizeof (struct GNUNET_MessageHeader));
313   hdr->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
314   GNUNET_SCHEDULER_add_continuation (&disconnect_after_drop, h,
315                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
316   return sizeof (struct GNUNET_MessageHeader);
317 }
318
319
320 /**
321  * Disconnect from the datastore service (and free
322  * associated resources).
323  *
324  * @param h handle to the datastore
325  * @param drop set to GNUNET_YES to delete all data in datastore (!)
326  */
327 void
328 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, int drop)
329 {
330   struct GNUNET_DATASTORE_QueueEntry *qe;
331
332 #if DEBUG_DATASTORE
333   LOG (GNUNET_ERROR_TYPE_DEBUG, "Datastore disconnect\n");
334 #endif
335   if (NULL != h->th)
336   {
337     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
338     h->th = NULL;
339   }
340   if (h->client != NULL)
341   {
342     GNUNET_CLIENT_disconnect (h->client);
343     h->client = NULL;
344   }
345   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
346   {
347     GNUNET_SCHEDULER_cancel (h->reconnect_task);
348     h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
349   }
350   while (NULL != (qe = h->queue_head))
351   {
352     GNUNET_assert (NULL != qe->response_proc);
353     qe->response_proc (h, NULL);
354   }
355   if (GNUNET_YES == drop)
356   {
357     h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
358     if (h->client != NULL)
359     {
360       if (NULL !=
361           GNUNET_CLIENT_notify_transmit_ready (h->client,
362                                                sizeof (struct
363                                                        GNUNET_MessageHeader),
364                                                GNUNET_TIME_UNIT_MINUTES,
365                                                GNUNET_YES, &transmit_drop, h))
366         return;
367       GNUNET_CLIENT_disconnect (h->client);
368       h->client = NULL;
369     }
370     GNUNET_break (0);
371   }
372   GNUNET_STATISTICS_destroy (h->stats, GNUNET_NO);
373   h->stats = NULL;
374   GNUNET_free (h);
375 }
376
377
378 /**
379  * A request has timed out (before being transmitted to the service).
380  *
381  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
382  * @param tc scheduler context
383  */
384 static void
385 timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
386 {
387   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
388
389   GNUNET_STATISTICS_update (qe->h->stats,
390                             gettext_noop ("# queue entry timeouts"), 1,
391                             GNUNET_NO);
392   qe->task = GNUNET_SCHEDULER_NO_TASK;
393   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
394 #if DEBUG_DATASTORE
395   LOG (GNUNET_ERROR_TYPE_DEBUG, "Timeout of request in datastore queue\n");
396 #endif
397   qe->response_proc (qe->h, NULL);
398 }
399
400
401 /**
402  * Create a new entry for our priority queue (and possibly discard other entires if
403  * the queue is getting too long).
404  *
405  * @param h handle to the datastore
406  * @param msize size of the message to queue
407  * @param queue_priority priority of the entry
408  * @param max_queue_size at what queue size should this request be dropped
409  *        (if other requests of higher priority are in the queue)
410  * @param timeout timeout for the operation
411  * @param response_proc function to call with replies (can be NULL)
412  * @param qc client context (NOT a closure for response_proc)
413  * @return NULL if the queue is full
414  */
415 static struct GNUNET_DATASTORE_QueueEntry *
416 make_queue_entry (struct GNUNET_DATASTORE_Handle *h, size_t msize,
417                   unsigned int queue_priority, unsigned int max_queue_size,
418                   struct GNUNET_TIME_Relative timeout,
419                   GNUNET_CLIENT_MessageHandler response_proc,
420                   const union QueueContext *qc)
421 {
422   struct GNUNET_DATASTORE_QueueEntry *ret;
423   struct GNUNET_DATASTORE_QueueEntry *pos;
424   unsigned int c;
425
426   c = 0;
427   pos = h->queue_head;
428   while ((pos != NULL) && (c < max_queue_size) &&
429          (pos->priority >= queue_priority))
430   {
431     c++;
432     pos = pos->next;
433   }
434   if (c >= max_queue_size)
435   {
436     GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue overflows"), 1,
437                               GNUNET_NO);
438     return NULL;
439   }
440   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
441   ret->h = h;
442   ret->response_proc = response_proc;
443   ret->qc = *qc;
444   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
445   ret->priority = queue_priority;
446   ret->max_queue = max_queue_size;
447   ret->message_size = msize;
448   ret->was_transmitted = GNUNET_NO;
449   if (pos == NULL)
450   {
451     /* append at the tail */
452     pos = h->queue_tail;
453   }
454   else
455   {
456     pos = pos->prev;
457     /* do not insert at HEAD if HEAD query was already
458      * transmitted and we are still receiving replies! */
459     if ((pos == NULL) && (h->queue_head->was_transmitted))
460       pos = h->queue_head;
461   }
462   c++;
463   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue entries created"),
464                             1, GNUNET_NO);
465   GNUNET_CONTAINER_DLL_insert_after (h->queue_head, h->queue_tail, pos, ret);
466   h->queue_size++;
467   ret->task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_queue_entry, ret);
468   pos = ret->next;
469   while (pos != NULL)
470   {
471     if ((pos->max_queue < h->queue_size) && (pos->was_transmitted == GNUNET_NO))
472     {
473       GNUNET_assert (pos->response_proc != NULL);
474       /* move 'pos' element to head so that it will be
475        * killed on 'NULL' call below */
476 #if DEBUG_DATASTORE
477       LOG (GNUNET_ERROR_TYPE_DEBUG, "Dropping request from datastore queue\n");
478 #endif
479       GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, pos);
480       GNUNET_CONTAINER_DLL_insert (h->queue_head, h->queue_tail, pos);
481       GNUNET_STATISTICS_update (h->stats,
482                                 gettext_noop
483                                 ("# Requests dropped from datastore queue"), 1,
484                                 GNUNET_NO);
485       GNUNET_assert (h->queue_head == pos);
486       pos->response_proc (h, NULL);
487       break;
488     }
489     pos = pos->next;
490   }
491   return ret;
492 }
493
494
495 /**
496  * Process entries in the queue (or do nothing if we are already
497  * doing so).
498  *
499  * @param h handle to the datastore
500  */
501 static void
502 process_queue (struct GNUNET_DATASTORE_Handle *h);
503
504
505 /**
506  * Try reconnecting to the datastore service.
507  *
508  * @param cls the 'struct GNUNET_DATASTORE_Handle'
509  * @param tc scheduler context
510  */
511 static void
512 try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
513 {
514   struct GNUNET_DATASTORE_Handle *h = cls;
515
516   if (h->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
517     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
518   else
519     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
520   if (h->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
521     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
522   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
523   h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
524   if (h->client == NULL)
525   {
526     LOG (GNUNET_ERROR_TYPE_ERROR, "DATASTORE reconnect failed (fatally)\n");
527     return;
528   }
529   GNUNET_STATISTICS_update (h->stats,
530                             gettext_noop
531                             ("# datastore connections (re)created"), 1,
532                             GNUNET_NO);
533 #if DEBUG_DATASTORE
534   LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnected to DATASTORE\n");
535 #endif
536   process_queue (h);
537 }
538
539
540 /**
541  * Disconnect from the service and then try reconnecting to the datastore service
542  * after some delay.
543  *
544  * @param h handle to datastore to disconnect and reconnect
545  */
546 static void
547 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
548 {
549   if (h->client == NULL)
550   {
551 #if DEBUG_DATASTORE
552     LOG (GNUNET_ERROR_TYPE_DEBUG,
553          "client NULL in disconnect, will not try to reconnect\n");
554 #endif
555     return;
556   }
557 #if 0
558   GNUNET_STATISTICS_update (stats, gettext_noop ("# reconnected to DATASTORE"),
559                             1, GNUNET_NO);
560 #endif
561   GNUNET_CLIENT_disconnect (h->client);
562   h->skip_next_messages = 0;
563   h->client = NULL;
564   h->reconnect_task =
565       GNUNET_SCHEDULER_add_delayed (h->retry_time, &try_reconnect, h);
566 }
567
568
569 /**
570  * Function called whenever we receive a message from
571  * the service.  Calls the appropriate handler.
572  *
573  * @param cls the 'struct GNUNET_DATASTORE_Handle'
574  * @param msg the received message
575  */
576 static void
577 receive_cb (void *cls, const struct GNUNET_MessageHeader *msg)
578 {
579   struct GNUNET_DATASTORE_Handle *h = cls;
580   struct GNUNET_DATASTORE_QueueEntry *qe;
581
582   h->in_receive = GNUNET_NO;
583 #if DEBUG_DATASTORE
584   LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving reply from datastore\n");
585 #endif
586   if (h->skip_next_messages > 0)
587   {
588     h->skip_next_messages--;
589     process_queue (h);
590     return;
591   }
592   if (NULL == (qe = h->queue_head))
593   {
594     GNUNET_break (0);
595     process_queue (h);
596     return;
597   }
598   qe->response_proc (h, msg);
599 }
600
601
602 /**
603  * Transmit request from queue to datastore service.
604  *
605  * @param cls the 'struct GNUNET_DATASTORE_Handle'
606  * @param size number of bytes that can be copied to buf
607  * @param buf where to copy the drop message
608  * @return number of bytes written to buf
609  */
610 static size_t
611 transmit_request (void *cls, size_t size, void *buf)
612 {
613   struct GNUNET_DATASTORE_Handle *h = cls;
614   struct GNUNET_DATASTORE_QueueEntry *qe;
615   size_t msize;
616
617   h->th = NULL;
618   if (NULL == (qe = h->queue_head))
619     return 0;                   /* no entry in queue */
620   if (buf == NULL)
621   {
622 #if DEBUG_DATASTORE
623     LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to DATASTORE.\n");
624 #endif
625     GNUNET_STATISTICS_update (h->stats,
626                               gettext_noop ("# transmission request failures"),
627                               1, GNUNET_NO);
628     do_disconnect (h);
629     return 0;
630   }
631   if (size < (msize = qe->message_size))
632   {
633     process_queue (h);
634     return 0;
635   }
636 #if DEBUG_DATASTORE
637   LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u byte request to DATASTORE\n",
638        msize);
639 #endif
640   memcpy (buf, &qe[1], msize);
641   qe->was_transmitted = GNUNET_YES;
642   GNUNET_SCHEDULER_cancel (qe->task);
643   qe->task = GNUNET_SCHEDULER_NO_TASK;
644   GNUNET_assert (GNUNET_NO == h->in_receive);
645   h->in_receive = GNUNET_YES;
646   GNUNET_CLIENT_receive (h->client, &receive_cb, h,
647                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
648   GNUNET_STATISTICS_update (h->stats,
649                             gettext_noop ("# bytes sent to datastore"), 1,
650                             GNUNET_NO);
651   return msize;
652 }
653
654
655 /**
656  * Process entries in the queue (or do nothing if we are already
657  * doing so).
658  *
659  * @param h handle to the datastore
660  */
661 static void
662 process_queue (struct GNUNET_DATASTORE_Handle *h)
663 {
664   struct GNUNET_DATASTORE_QueueEntry *qe;
665
666   if (NULL == (qe = h->queue_head))
667   {
668 #if DEBUG_DATASTORE > 1
669     LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue empty\n");
670 #endif
671     return;                     /* no entry in queue */
672   }
673   if (qe->was_transmitted == GNUNET_YES)
674   {
675 #if DEBUG_DATASTORE > 1
676     LOG (GNUNET_ERROR_TYPE_DEBUG, "Head request already transmitted\n");
677 #endif
678     return;                     /* waiting for replies */
679   }
680   if (h->th != NULL)
681   {
682 #if DEBUG_DATASTORE > 1
683     LOG (GNUNET_ERROR_TYPE_DEBUG, "Pending transmission request\n");
684 #endif
685     return;                     /* request pending */
686   }
687   if (h->client == NULL)
688   {
689 #if DEBUG_DATASTORE > 1
690     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not connected\n");
691 #endif
692     return;                     /* waiting for reconnect */
693   }
694   if (GNUNET_YES == h->in_receive)
695   {
696     /* wait for response to previous query */
697     return;
698   }
699 #if DEBUG_DATASTORE
700   LOG (GNUNET_ERROR_TYPE_DEBUG, "Queueing %u byte request to DATASTORE\n",
701        qe->message_size);
702 #endif
703   h->th =
704       GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size,
705                                            GNUNET_TIME_absolute_get_remaining
706                                            (qe->timeout), GNUNET_YES,
707                                            &transmit_request, h);
708   GNUNET_assert (GNUNET_NO == h->in_receive);
709   GNUNET_break (NULL != h->th);
710 }
711
712
713 /**
714  * Dummy continuation used to do nothing (but be non-zero).
715  *
716  * @param cls closure
717  * @param result result
718  * @param min_expiration expiration time
719  * @param emsg error message
720  */
721 static void
722 drop_status_cont (void *cls, int32_t result, 
723                   struct GNUNET_TIME_Absolute min_expiration,
724                   const char *emsg)
725 {
726   /* do nothing */
727 }
728
729
730 /**
731  * Free a queue entry.  Removes the given entry from the
732  * queue and releases associated resources.  Does NOT
733  * call the callback.
734  *
735  * @param qe entry to free.
736  */
737 static void
738 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
739 {
740   struct GNUNET_DATASTORE_Handle *h = qe->h;
741
742   GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, qe);
743   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
744   {
745     GNUNET_SCHEDULER_cancel (qe->task);
746     qe->task = GNUNET_SCHEDULER_NO_TASK;
747   }
748   h->queue_size--;
749   qe->was_transmitted = GNUNET_SYSERR;  /* use-after-free warning */
750   GNUNET_free (qe);
751 }
752
753
754 /**
755  * Type of a function to call when we receive a message
756  * from the service.
757  *
758  * @param cls closure
759  * @param msg message received, NULL on timeout or fatal error
760  */
761 static void
762 process_status_message (void *cls, const struct GNUNET_MessageHeader *msg)
763 {
764   struct GNUNET_DATASTORE_Handle *h = cls;
765   struct GNUNET_DATASTORE_QueueEntry *qe;
766   struct StatusContext rc;
767   const struct StatusMessage *sm;
768   const char *emsg;
769   int32_t status;
770   int was_transmitted;
771
772   if (NULL == (qe = h->queue_head))
773   {
774     GNUNET_break (0);
775     do_disconnect (h);
776     return;
777   }
778   rc = qe->qc.sc;
779   if (msg == NULL)
780   {
781     was_transmitted = qe->was_transmitted;
782     free_queue_entry (qe);
783     if (was_transmitted == GNUNET_YES)
784       do_disconnect (h);
785     else
786       process_queue (h);
787     if (rc.cont != NULL)
788       rc.cont (rc.cont_cls, GNUNET_SYSERR,
789                GNUNET_TIME_UNIT_ZERO_ABS,
790                _("Failed to receive status response from database."));
791     return;
792   }
793   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
794   free_queue_entry (qe);
795   if ((ntohs (msg->size) < sizeof (struct StatusMessage)) ||
796       (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS))
797   {
798     GNUNET_break (0);
799     h->retry_time = GNUNET_TIME_UNIT_ZERO;
800     do_disconnect (h);
801     if (rc.cont != NULL)
802       rc.cont (rc.cont_cls, GNUNET_SYSERR,
803                GNUNET_TIME_UNIT_ZERO_ABS,
804                _("Error reading response from datastore service"));
805     return;
806   }
807   sm = (const struct StatusMessage *) msg;
808   status = ntohl (sm->status);
809   emsg = NULL;
810   if (ntohs (msg->size) > sizeof (struct StatusMessage))
811   {
812     emsg = (const char *) &sm[1];
813     if (emsg[ntohs (msg->size) - sizeof (struct StatusMessage) - 1] != '\0')
814     {
815       GNUNET_break (0);
816       emsg = _("Invalid error message received from datastore service");
817     }
818   }
819   if ((status == GNUNET_SYSERR) && (emsg == NULL))
820   {
821     GNUNET_break (0);
822     emsg = _("Invalid error message received from datastore service");
823   }
824 #if DEBUG_DATASTORE
825   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received status %d/%s\n", (int) status, emsg);
826 #endif
827   GNUNET_STATISTICS_update (h->stats,
828                             gettext_noop ("# status messages received"), 1,
829                             GNUNET_NO);
830   h->retry_time.rel_value = 0;
831   process_queue (h);
832   if (rc.cont != NULL)
833     rc.cont (rc.cont_cls, status, 
834              GNUNET_TIME_absolute_ntoh (sm->min_expiration),
835              emsg);
836 }
837
838
839 /**
840  * Store an item in the datastore.  If the item is already present,
841  * the priorities are summed up and the higher expiration time and
842  * lower anonymity level is used.
843  *
844  * @param h handle to the datastore
845  * @param rid reservation ID to use (from "reserve"); use 0 if no
846  *            prior reservation was made
847  * @param key key for the value
848  * @param size number of bytes in data
849  * @param data content stored
850  * @param type type of the content
851  * @param priority priority of the content
852  * @param anonymity anonymity-level for the content
853  * @param replication how often should the content be replicated to other peers?
854  * @param expiration expiration time for the content
855  * @param queue_priority ranking of this request in the priority queue
856  * @param max_queue_size at what queue size should this request be dropped
857  *        (if other requests of higher priority are in the queue)
858  * @param timeout timeout for the operation
859  * @param cont continuation to call when done
860  * @param cont_cls closure for cont
861  * @return NULL if the entry was not queued, otherwise a handle that can be used to
862  *         cancel; note that even if NULL is returned, the callback will be invoked
863  *         (or rather, will already have been invoked)
864  */
865 struct GNUNET_DATASTORE_QueueEntry *
866 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, uint32_t rid,
867                       const GNUNET_HashCode * key, size_t size,
868                       const void *data, enum GNUNET_BLOCK_Type type,
869                       uint32_t priority, uint32_t anonymity,
870                       uint32_t replication,
871                       struct GNUNET_TIME_Absolute expiration,
872                       unsigned int queue_priority, unsigned int max_queue_size,
873                       struct GNUNET_TIME_Relative timeout,
874                       GNUNET_DATASTORE_ContinuationWithStatus cont,
875                       void *cont_cls)
876 {
877   struct GNUNET_DATASTORE_QueueEntry *qe;
878   struct DataMessage *dm;
879   size_t msize;
880   union QueueContext qc;
881
882 #if DEBUG_DATASTORE
883   LOG (GNUNET_ERROR_TYPE_DEBUG,
884        "Asked to put %u bytes of data under key `%s' for %llu ms\n", size,
885        GNUNET_h2s (key),
886        GNUNET_TIME_absolute_get_remaining (expiration).rel_value);
887 #endif
888   msize = sizeof (struct DataMessage) + size;
889   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
890   qc.sc.cont = cont;
891   qc.sc.cont_cls = cont_cls;
892   qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout,
893                          &process_status_message, &qc);
894   if (qe == NULL)
895   {
896 #if DEBUG_DATASTORE
897     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for PUT\n");
898 #endif
899     return NULL;
900   }
901   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# PUT requests executed"),
902                             1, GNUNET_NO);
903   dm = (struct DataMessage *) &qe[1];
904   dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
905   dm->header.size = htons (msize);
906   dm->rid = htonl (rid);
907   dm->size = htonl ((uint32_t) size);
908   dm->type = htonl (type);
909   dm->priority = htonl (priority);
910   dm->anonymity = htonl (anonymity);
911   dm->replication = htonl (replication);
912   dm->reserved = htonl (0);
913   dm->uid = GNUNET_htonll (0);
914   dm->expiration = GNUNET_TIME_absolute_hton (expiration);
915   dm->key = *key;
916   memcpy (&dm[1], data, size);
917   process_queue (h);
918   return qe;
919 }
920
921
922 /**
923  * Reserve space in the datastore.  This function should be used
924  * to avoid "out of space" failures during a longer sequence of "put"
925  * operations (for example, when a file is being inserted).
926  *
927  * @param h handle to the datastore
928  * @param amount how much space (in bytes) should be reserved (for content only)
929  * @param entries how many entries will be created (to calculate per-entry overhead)
930  * @param queue_priority ranking of this request in the priority queue
931  * @param max_queue_size at what queue size should this request be dropped
932  *        (if other requests of higher priority are in the queue)
933  * @param timeout how long to wait at most for a response (or before dying in queue)
934  * @param cont continuation to call when done; "success" will be set to
935  *             a positive reservation value if space could be reserved.
936  * @param cont_cls closure for cont
937  * @return NULL if the entry was not queued, otherwise a handle that can be used to
938  *         cancel; note that even if NULL is returned, the callback will be invoked
939  *         (or rather, will already have been invoked)
940  */
941 struct GNUNET_DATASTORE_QueueEntry *
942 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount,
943                           uint32_t entries, unsigned int queue_priority,
944                           unsigned int max_queue_size,
945                           struct GNUNET_TIME_Relative timeout,
946                           GNUNET_DATASTORE_ContinuationWithStatus cont,
947                           void *cont_cls)
948 {
949   struct GNUNET_DATASTORE_QueueEntry *qe;
950   struct ReserveMessage *rm;
951   union QueueContext qc;
952
953   if (cont == NULL)
954     cont = &drop_status_cont;
955 #if DEBUG_DATASTORE
956   LOG (GNUNET_ERROR_TYPE_DEBUG,
957        "Asked to reserve %llu bytes of data and %u entries\n",
958        (unsigned long long) amount, (unsigned int) entries);
959 #endif
960   qc.sc.cont = cont;
961   qc.sc.cont_cls = cont_cls;
962   qe = make_queue_entry (h, sizeof (struct ReserveMessage), queue_priority,
963                          max_queue_size, timeout, &process_status_message, &qc);
964   if (qe == NULL)
965   {
966 #if DEBUG_DATASTORE
967     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry to reserve\n");
968 #endif
969     return NULL;
970   }
971   GNUNET_STATISTICS_update (h->stats,
972                             gettext_noop ("# RESERVE requests executed"), 1,
973                             GNUNET_NO);
974   rm = (struct ReserveMessage *) &qe[1];
975   rm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
976   rm->header.size = htons (sizeof (struct ReserveMessage));
977   rm->entries = htonl (entries);
978   rm->amount = GNUNET_htonll (amount);
979   process_queue (h);
980   return qe;
981 }
982
983
984 /**
985  * Signal that all of the data for which a reservation was made has
986  * been stored and that whatever excess space might have been reserved
987  * can now be released.
988  *
989  * @param h handle to the datastore
990  * @param rid reservation ID (value of "success" in original continuation
991  *        from the "reserve" function).
992  * @param queue_priority ranking of this request in the priority queue
993  * @param max_queue_size at what queue size should this request be dropped
994  *        (if other requests of higher priority are in the queue)
995  * @param queue_priority ranking of this request in the priority queue
996  * @param max_queue_size at what queue size should this request be dropped
997  *        (if other requests of higher priority are in the queue)
998  * @param timeout how long to wait at most for a response
999  * @param cont continuation to call when done
1000  * @param cont_cls closure for cont
1001  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1002  *         cancel; note that even if NULL is returned, the callback will be invoked
1003  *         (or rather, will already have been invoked)
1004  */
1005 struct GNUNET_DATASTORE_QueueEntry *
1006 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1007                                   uint32_t rid, unsigned int queue_priority,
1008                                   unsigned int max_queue_size,
1009                                   struct GNUNET_TIME_Relative timeout,
1010                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
1011                                   void *cont_cls)
1012 {
1013   struct GNUNET_DATASTORE_QueueEntry *qe;
1014   struct ReleaseReserveMessage *rrm;
1015   union QueueContext qc;
1016
1017   if (cont == NULL)
1018     cont = &drop_status_cont;
1019 #if DEBUG_DATASTORE
1020   LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to release reserve %d\n", rid);
1021 #endif
1022   qc.sc.cont = cont;
1023   qc.sc.cont_cls = cont_cls;
1024   qe = make_queue_entry (h, sizeof (struct ReleaseReserveMessage),
1025                          queue_priority, max_queue_size, timeout,
1026                          &process_status_message, &qc);
1027   if (qe == NULL)
1028   {
1029 #if DEBUG_DATASTORE
1030     LOG (GNUNET_ERROR_TYPE_DEBUG,
1031          "Could not create queue entry to release reserve\n");
1032 #endif
1033     return NULL;
1034   }
1035   GNUNET_STATISTICS_update (h->stats,
1036                             gettext_noop
1037                             ("# RELEASE RESERVE requests executed"), 1,
1038                             GNUNET_NO);
1039   rrm = (struct ReleaseReserveMessage *) &qe[1];
1040   rrm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1041   rrm->header.size = htons (sizeof (struct ReleaseReserveMessage));
1042   rrm->rid = htonl (rid);
1043   process_queue (h);
1044   return qe;
1045 }
1046
1047
1048 /**
1049  * Update a value in the datastore.
1050  *
1051  * @param h handle to the datastore
1052  * @param uid identifier for the value
1053  * @param priority how much to increase the priority of the value
1054  * @param expiration new expiration value should be MAX of existing and this argument
1055  * @param queue_priority ranking of this request in the priority queue
1056  * @param max_queue_size at what queue size should this request be dropped
1057  *        (if other requests of higher priority are in the queue)
1058  * @param timeout how long to wait at most for a response
1059  * @param cont continuation to call when done
1060  * @param cont_cls closure for cont
1061  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1062  *         cancel; note that even if NULL is returned, the callback will be invoked
1063  *         (or rather, will already have been invoked)
1064  */
1065 struct GNUNET_DATASTORE_QueueEntry *
1066 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, uint64_t uid,
1067                          uint32_t priority,
1068                          struct GNUNET_TIME_Absolute expiration,
1069                          unsigned int queue_priority,
1070                          unsigned int max_queue_size,
1071                          struct GNUNET_TIME_Relative timeout,
1072                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1073                          void *cont_cls)
1074 {
1075   struct GNUNET_DATASTORE_QueueEntry *qe;
1076   struct UpdateMessage *um;
1077   union QueueContext qc;
1078
1079   if (cont == NULL)
1080     cont = &drop_status_cont;
1081 #if DEBUG_DATASTORE
1082   LOG (GNUNET_ERROR_TYPE_DEBUG,
1083        "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
1084        uid, (unsigned int) priority, (unsigned long long) expiration.abs_value);
1085 #endif
1086   qc.sc.cont = cont;
1087   qc.sc.cont_cls = cont_cls;
1088   qe = make_queue_entry (h, sizeof (struct UpdateMessage), queue_priority,
1089                          max_queue_size, timeout, &process_status_message, &qc);
1090   if (qe == NULL)
1091   {
1092 #if DEBUG_DATASTORE
1093     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for UPDATE\n");
1094 #endif
1095     return NULL;
1096   }
1097   GNUNET_STATISTICS_update (h->stats,
1098                             gettext_noop ("# UPDATE requests executed"), 1,
1099                             GNUNET_NO);
1100   um = (struct UpdateMessage *) &qe[1];
1101   um->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1102   um->header.size = htons (sizeof (struct UpdateMessage));
1103   um->priority = htonl (priority);
1104   um->expiration = GNUNET_TIME_absolute_hton (expiration);
1105   um->uid = GNUNET_htonll (uid);
1106   process_queue (h);
1107   return qe;
1108 }
1109
1110
1111 /**
1112  * Explicitly remove some content from the database.
1113  * The "cont"inuation will be called with status
1114  * "GNUNET_OK" if content was removed, "GNUNET_NO"
1115  * if no matching entry was found and "GNUNET_SYSERR"
1116  * on all other types of errors.
1117  *
1118  * @param h handle to the datastore
1119  * @param key key for the value
1120  * @param size number of bytes in data
1121  * @param data content stored
1122  * @param queue_priority ranking of this request in the priority queue
1123  * @param max_queue_size at what queue size should this request be dropped
1124  *        (if other requests of higher priority are in the queue)
1125  * @param timeout how long to wait at most for a response
1126  * @param cont continuation to call when done
1127  * @param cont_cls closure for cont
1128  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1129  *         cancel; note that even if NULL is returned, the callback will be invoked
1130  *         (or rather, will already have been invoked)
1131  */
1132 struct GNUNET_DATASTORE_QueueEntry *
1133 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1134                          const GNUNET_HashCode * key, size_t size,
1135                          const void *data, unsigned int queue_priority,
1136                          unsigned int max_queue_size,
1137                          struct GNUNET_TIME_Relative timeout,
1138                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1139                          void *cont_cls)
1140 {
1141   struct GNUNET_DATASTORE_QueueEntry *qe;
1142   struct DataMessage *dm;
1143   size_t msize;
1144   union QueueContext qc;
1145
1146   if (cont == NULL)
1147     cont = &drop_status_cont;
1148 #if DEBUG_DATASTORE
1149   LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to remove %u bytes under key `%s'\n",
1150        size, GNUNET_h2s (key));
1151 #endif
1152   qc.sc.cont = cont;
1153   qc.sc.cont_cls = cont_cls;
1154   msize = sizeof (struct DataMessage) + size;
1155   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1156   qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout,
1157                          &process_status_message, &qc);
1158   if (qe == NULL)
1159   {
1160 #if DEBUG_DATASTORE
1161     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for REMOVE\n");
1162 #endif
1163     return NULL;
1164   }
1165   GNUNET_STATISTICS_update (h->stats,
1166                             gettext_noop ("# REMOVE requests executed"), 1,
1167                             GNUNET_NO);
1168   dm = (struct DataMessage *) &qe[1];
1169   dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1170   dm->header.size = htons (msize);
1171   dm->rid = htonl (0);
1172   dm->size = htonl (size);
1173   dm->type = htonl (0);
1174   dm->priority = htonl (0);
1175   dm->anonymity = htonl (0);
1176   dm->uid = GNUNET_htonll (0);
1177   dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS);
1178   dm->key = *key;
1179   memcpy (&dm[1], data, size);
1180   process_queue (h);
1181   return qe;
1182 }
1183
1184
1185 /**
1186  * Type of a function to call when we receive a message
1187  * from the service.
1188  *
1189  * @param cls closure
1190  * @param msg message received, NULL on timeout or fatal error
1191  */
1192 static void
1193 process_result_message (void *cls, const struct GNUNET_MessageHeader *msg)
1194 {
1195   struct GNUNET_DATASTORE_Handle *h = cls;
1196   struct GNUNET_DATASTORE_QueueEntry *qe;
1197   struct ResultContext rc;
1198   const struct DataMessage *dm;
1199   int was_transmitted;
1200
1201   if (msg == NULL)
1202   {
1203     qe = h->queue_head;
1204     GNUNET_assert (NULL != qe);
1205     rc = qe->qc.rc;
1206     was_transmitted = qe->was_transmitted;
1207     free_queue_entry (qe);
1208     if (was_transmitted == GNUNET_YES)
1209     {
1210       LOG (GNUNET_ERROR_TYPE_WARNING,
1211            _("Failed to receive response from database.\n"));
1212       do_disconnect (h);
1213     }
1214     else
1215     {
1216       process_queue (h);
1217     }
1218     if (rc.proc != NULL)
1219       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1220                0);
1221     return;
1222   }
1223   if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END)
1224   {
1225     GNUNET_break (ntohs (msg->size) == sizeof (struct GNUNET_MessageHeader));
1226     qe = h->queue_head;
1227     rc = qe->qc.rc;
1228     GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1229     free_queue_entry (qe);
1230 #if DEBUG_DATASTORE
1231     LOG (GNUNET_ERROR_TYPE_DEBUG,
1232          "Received end of result set, new queue size is %u\n", h->queue_size);
1233 #endif
1234     h->retry_time.rel_value = 0;
1235     h->result_count = 0;
1236     process_queue (h);
1237     if (rc.proc != NULL)
1238       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1239                0);
1240     return;
1241   }
1242   qe = h->queue_head;
1243   GNUNET_assert (NULL != qe);
1244   rc = qe->qc.rc;
1245   if (GNUNET_YES != qe->was_transmitted)
1246   {
1247     GNUNET_break (0);
1248     free_queue_entry (qe);
1249     h->retry_time = GNUNET_TIME_UNIT_ZERO;
1250     do_disconnect (h);
1251     if (rc.proc != NULL)
1252       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1253                0);
1254     return;
1255   }
1256   if ((ntohs (msg->size) < sizeof (struct DataMessage)) ||
1257       (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1258       (ntohs (msg->size) !=
1259        sizeof (struct DataMessage) +
1260        ntohl (((const struct DataMessage *) msg)->size)))
1261   {
1262     GNUNET_break (0);
1263     free_queue_entry (qe);
1264     h->retry_time = GNUNET_TIME_UNIT_ZERO;
1265     do_disconnect (h);
1266     if (rc.proc != NULL)
1267       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1268                0);
1269     return;
1270   }
1271   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# Results received"), 1,
1272                             GNUNET_NO);
1273   dm = (const struct DataMessage *) msg;
1274 #if DEBUG_DATASTORE
1275   LOG (GNUNET_ERROR_TYPE_DEBUG,
1276        "Received result %llu with type %u and size %u with key %s\n",
1277        (unsigned long long) GNUNET_ntohll (dm->uid), ntohl (dm->type),
1278        ntohl (dm->size), GNUNET_h2s (&dm->key));
1279 #endif
1280   free_queue_entry (qe);
1281   h->retry_time.rel_value = 0;
1282   process_queue (h);
1283   if (rc.proc != NULL)
1284     rc.proc (rc.proc_cls, &dm->key, ntohl (dm->size), &dm[1], ntohl (dm->type),
1285              ntohl (dm->priority), ntohl (dm->anonymity),
1286              GNUNET_TIME_absolute_ntoh (dm->expiration),
1287              GNUNET_ntohll (dm->uid));
1288 }
1289
1290
1291 /**
1292  * Get a random value from the datastore for content replication.
1293  * Returns a single, random value among those with the highest
1294  * replication score, lowering positive replication scores by one for
1295  * the chosen value (if only content with a replication score exists,
1296  * a random value is returned and replication scores are not changed).
1297  *
1298  * @param h handle to the datastore
1299  * @param queue_priority ranking of this request in the priority queue
1300  * @param max_queue_size at what queue size should this request be dropped
1301  *        (if other requests of higher priority are in the queue)
1302  * @param timeout how long to wait at most for a response
1303  * @param proc function to call on a random value; it
1304  *        will be called once with a value (if available)
1305  *        and always once with a value of NULL.
1306  * @param proc_cls closure for proc
1307  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1308  *         cancel
1309  */
1310 struct GNUNET_DATASTORE_QueueEntry *
1311 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1312                                       unsigned int queue_priority,
1313                                       unsigned int max_queue_size,
1314                                       struct GNUNET_TIME_Relative timeout,
1315                                       GNUNET_DATASTORE_DatumProcessor proc,
1316                                       void *proc_cls)
1317 {
1318   struct GNUNET_DATASTORE_QueueEntry *qe;
1319   struct GNUNET_MessageHeader *m;
1320   union QueueContext qc;
1321
1322   GNUNET_assert (NULL != proc);
1323 #if DEBUG_DATASTORE
1324   LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to get replication entry in %llu ms\n",
1325        (unsigned long long) timeout.rel_value);
1326 #endif
1327   qc.rc.proc = proc;
1328   qc.rc.proc_cls = proc_cls;
1329   qe = make_queue_entry (h, sizeof (struct GNUNET_MessageHeader),
1330                          queue_priority, max_queue_size, timeout,
1331                          &process_result_message, &qc);
1332   if (qe == NULL)
1333   {
1334 #if DEBUG_DATASTORE
1335     LOG (GNUNET_ERROR_TYPE_DEBUG,
1336          "Could not create queue entry for GET REPLICATION\n");
1337 #endif
1338     return NULL;
1339   }
1340   GNUNET_STATISTICS_update (h->stats,
1341                             gettext_noop
1342                             ("# GET REPLICATION requests executed"), 1,
1343                             GNUNET_NO);
1344   m = (struct GNUNET_MessageHeader *) &qe[1];
1345   m->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1346   m->size = htons (sizeof (struct GNUNET_MessageHeader));
1347   process_queue (h);
1348   return qe;
1349 }
1350
1351
1352 /**
1353  * Get a single zero-anonymity value from the datastore.
1354  *
1355  * @param h handle to the datastore
1356  * @param offset offset of the result (modulo num-results); set to
1357  *               a random 64-bit value initially; then increment by
1358  *               one each time; detect that all results have been found by uid
1359  *               being again the first uid ever returned.
1360  * @param queue_priority ranking of this request in the priority queue
1361  * @param max_queue_size at what queue size should this request be dropped
1362  *        (if other requests of higher priority are in the queue)
1363  * @param timeout how long to wait at most for a response
1364  * @param type allowed type for the operation (never zero)
1365  * @param proc function to call on a random value; it
1366  *        will be called once with a value (if available)
1367  *        or with NULL if none value exists.
1368  * @param proc_cls closure for proc
1369  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1370  *         cancel
1371  */
1372 struct GNUNET_DATASTORE_QueueEntry *
1373 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1374                                      uint64_t offset,
1375                                      unsigned int queue_priority,
1376                                      unsigned int max_queue_size,
1377                                      struct GNUNET_TIME_Relative timeout,
1378                                      enum GNUNET_BLOCK_Type type,
1379                                      GNUNET_DATASTORE_DatumProcessor proc,
1380                                      void *proc_cls)
1381 {
1382   struct GNUNET_DATASTORE_QueueEntry *qe;
1383   struct GetZeroAnonymityMessage *m;
1384   union QueueContext qc;
1385
1386   GNUNET_assert (NULL != proc);
1387   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1388 #if DEBUG_DATASTORE
1389   LOG (GNUNET_ERROR_TYPE_DEBUG,
1390        "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n",
1391        (unsigned long long) offset, type,
1392        (unsigned long long) timeout.rel_value);
1393 #endif
1394   qc.rc.proc = proc;
1395   qc.rc.proc_cls = proc_cls;
1396   qe = make_queue_entry (h, sizeof (struct GetZeroAnonymityMessage),
1397                          queue_priority, max_queue_size, timeout,
1398                          &process_result_message, &qc);
1399   if (qe == NULL)
1400   {
1401 #if DEBUG_DATASTORE
1402     LOG (GNUNET_ERROR_TYPE_DEBUG,
1403          "Could not create queue entry for zero-anonymity procation\n");
1404 #endif
1405     return NULL;
1406   }
1407   GNUNET_STATISTICS_update (h->stats,
1408                             gettext_noop
1409                             ("# GET ZERO ANONYMITY requests executed"), 1,
1410                             GNUNET_NO);
1411   m = (struct GetZeroAnonymityMessage *) &qe[1];
1412   m->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1413   m->header.size = htons (sizeof (struct GetZeroAnonymityMessage));
1414   m->type = htonl ((uint32_t) type);
1415   m->offset = GNUNET_htonll (offset);
1416   process_queue (h);
1417   return qe;
1418 }
1419
1420
1421 /**
1422  * Get a result for a particular key from the datastore.  The processor
1423  * will only be called once.
1424  *
1425  * @param h handle to the datastore
1426  * @param offset offset of the result (modulo num-results); set to
1427  *               a random 64-bit value initially; then increment by
1428  *               one each time; detect that all results have been found by uid
1429  *               being again the first uid ever returned.
1430  * @param key maybe NULL (to match all entries)
1431  * @param type desired type, 0 for any
1432  * @param queue_priority ranking of this request in the priority queue
1433  * @param max_queue_size at what queue size should this request be dropped
1434  *        (if other requests of higher priority are in the queue)
1435  * @param timeout how long to wait at most for a response
1436  * @param proc function to call on each matching value;
1437  *        will be called once with a NULL value at the end
1438  * @param proc_cls closure for proc
1439  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1440  *         cancel
1441  */
1442 struct GNUNET_DATASTORE_QueueEntry *
1443 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, uint64_t offset,
1444                           const GNUNET_HashCode * key,
1445                           enum GNUNET_BLOCK_Type type,
1446                           unsigned int queue_priority,
1447                           unsigned int max_queue_size,
1448                           struct GNUNET_TIME_Relative timeout,
1449                           GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls)
1450 {
1451   struct GNUNET_DATASTORE_QueueEntry *qe;
1452   struct GetMessage *gm;
1453   union QueueContext qc;
1454
1455   GNUNET_assert (NULL != proc);
1456 #if DEBUG_DATASTORE
1457   LOG (GNUNET_ERROR_TYPE_DEBUG,
1458        "Asked to look for data of type %u under key `%s'\n",
1459        (unsigned int) type, GNUNET_h2s (key));
1460 #endif
1461   qc.rc.proc = proc;
1462   qc.rc.proc_cls = proc_cls;
1463   qe = make_queue_entry (h, sizeof (struct GetMessage), queue_priority,
1464                          max_queue_size, timeout, &process_result_message, &qc);
1465   if (qe == NULL)
1466   {
1467 #if DEBUG_DATASTORE
1468     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not queue request for `%s'\n",
1469          GNUNET_h2s (key));
1470 #endif
1471     return NULL;
1472   }
1473   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# GET requests executed"),
1474                             1, GNUNET_NO);
1475   gm = (struct GetMessage *) &qe[1];
1476   gm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1477   gm->type = htonl (type);
1478   gm->offset = GNUNET_htonll (offset);
1479   if (key != NULL)
1480   {
1481     gm->header.size = htons (sizeof (struct GetMessage));
1482     gm->key = *key;
1483   }
1484   else
1485   {
1486     gm->header.size =
1487         htons (sizeof (struct GetMessage) - sizeof (GNUNET_HashCode));
1488   }
1489   process_queue (h);
1490   return qe;
1491 }
1492
1493
1494 /**
1495  * Cancel a datastore operation.  The final callback from the
1496  * operation must not have been done yet.
1497  *
1498  * @param qe operation to cancel
1499  */
1500 void
1501 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1502 {
1503   struct GNUNET_DATASTORE_Handle *h;
1504
1505   GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted);
1506   h = qe->h;
1507 #if DEBUG_DATASTORE
1508   LOG (GNUNET_ERROR_TYPE_DEBUG,
1509        "Pending DATASTORE request %p cancelled (%d, %d)\n", qe,
1510        qe->was_transmitted, h->queue_head == qe);
1511 #endif
1512   if (GNUNET_YES == qe->was_transmitted)
1513   {
1514     free_queue_entry (qe);
1515     h->skip_next_messages++;
1516     return;
1517   }
1518   free_queue_entry (qe);
1519   process_queue (h);
1520 }
1521
1522
1523 /* end of datastore_api.c */