00a443fe481cf5ec5aba5e1533749d0a5e7801f1
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004-2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__)
35
36 /**
37  * Collect an instane number of statistics?  May cause excessive IPC.
38  */
39 #define INSANE_STATISTICS GNUNET_NO
40
41 /**
42  * If a client stopped asking for more results, how many more do
43  * we receive from the DB before killing the connection?  Trade-off
44  * between re-doing TCP handshakes and (needlessly) receiving
45  * useless results.
46  */
47 #define MAX_EXCESS_RESULTS 8
48
49 /**
50  * Context for processing status messages.
51  */
52 struct StatusContext
53 {
54   /**
55    * Continuation to call with the status.
56    */
57   GNUNET_DATASTORE_ContinuationWithStatus cont;
58
59   /**
60    * Closure for cont.
61    */
62   void *cont_cls;
63
64 };
65
66
67 /**
68  * Context for processing result messages.
69  */
70 struct ResultContext
71 {
72   /**
73    * Function to call with the result.
74    */
75   GNUNET_DATASTORE_DatumProcessor proc;
76
77   /**
78    * Closure for proc.
79    */
80   void *proc_cls;
81
82 };
83
84
85 /**
86  *  Context for a queue operation.
87  */
88 union QueueContext
89 {
90
91   struct StatusContext sc;
92
93   struct ResultContext rc;
94
95 };
96
97
98
99 /**
100  * Entry in our priority queue.
101  */
102 struct GNUNET_DATASTORE_QueueEntry
103 {
104
105   /**
106    * This is a linked list.
107    */
108   struct GNUNET_DATASTORE_QueueEntry *next;
109
110   /**
111    * This is a linked list.
112    */
113   struct GNUNET_DATASTORE_QueueEntry *prev;
114
115   /**
116    * Handle to the master context.
117    */
118   struct GNUNET_DATASTORE_Handle *h;
119
120   /**
121    * Response processor (NULL if we are not waiting for a response).
122    * This struct should be used for the closure, function-specific
123    * arguments can be passed via 'qc'.
124    */
125   GNUNET_CLIENT_MessageHandler response_proc;
126
127   /**
128    * Function to call after transmission of the request.
129    */
130   GNUNET_DATASTORE_ContinuationWithStatus cont;
131
132   /**
133    * Closure for 'cont'.
134    */
135   void *cont_cls;
136
137   /**
138    * Context for the operation.
139    */
140   union QueueContext qc;
141
142   /**
143    * Task for timeout signalling.
144    */
145   GNUNET_SCHEDULER_TaskIdentifier task;
146
147   /**
148    * Timeout for the current operation.
149    */
150   struct GNUNET_TIME_Absolute timeout;
151
152   /**
153    * Priority in the queue.
154    */
155   unsigned int priority;
156
157   /**
158    * Maximum allowed length of queue (otherwise
159    * this request should be discarded).
160    */
161   unsigned int max_queue;
162
163   /**
164    * Number of bytes in the request message following
165    * this struct.  32-bit value for nicer memory
166    * access (and overall struct alignment).
167    */
168   uint32_t message_size;
169
170   /**
171    * Has this message been transmitted to the service?
172    * Only ever GNUNET_YES for the head of the queue.
173    * Note that the overall struct should end at a
174    * multiple of 64 bits.
175    */
176   int was_transmitted;
177
178 };
179
180 /**
181  * Handle to the datastore service.
182  */
183 struct GNUNET_DATASTORE_Handle
184 {
185
186   /**
187    * Our configuration.
188    */
189   const struct GNUNET_CONFIGURATION_Handle *cfg;
190
191   /**
192    * Current connection to the datastore service.
193    */
194   struct GNUNET_CLIENT_Connection *client;
195
196   /**
197    * Handle for statistics.
198    */
199   struct GNUNET_STATISTICS_Handle *stats;
200
201   /**
202    * Current transmit handle.
203    */
204   struct GNUNET_CLIENT_TransmitHandle *th;
205
206   /**
207    * Current head of priority queue.
208    */
209   struct GNUNET_DATASTORE_QueueEntry *queue_head;
210
211   /**
212    * Current tail of priority queue.
213    */
214   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
215
216   /**
217    * Task for trying to reconnect.
218    */
219   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
220
221   /**
222    * How quickly should we retry?  Used for exponential back-off on
223    * connect-errors.
224    */
225   struct GNUNET_TIME_Relative retry_time;
226
227   /**
228    * Number of entries in the queue.
229    */
230   unsigned int queue_size;
231
232   /**
233    * Number of results we're receiving for the current query
234    * after application stopped to care.  Used to determine when
235    * to reset the connection.
236    */
237   unsigned int result_count;
238
239   /**
240    * Are we currently trying to receive from the service?
241    */
242   int in_receive;
243
244   /**
245    * We should ignore the next message(s) from the service.
246    */
247   unsigned int skip_next_messages;
248
249 };
250
251
252
253 /**
254  * Connect to the datastore service.
255  *
256  * @param cfg configuration to use
257  * @return handle to use to access the service
258  */
259 struct GNUNET_DATASTORE_Handle *
260 GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
261 {
262   struct GNUNET_CLIENT_Connection *c;
263   struct GNUNET_DATASTORE_Handle *h;
264
265   c = GNUNET_CLIENT_connect ("datastore", cfg);
266   if (c == NULL)
267     return NULL;                /* oops */
268   h = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_Handle) +
269                      GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
270   h->client = c;
271   h->cfg = cfg;
272   h->stats = GNUNET_STATISTICS_create ("datastore-api", cfg);
273   return h;
274 }
275
276
277 /**
278  * Task used by 'transmit_drop' to disconnect the datastore.
279  *
280  * @param cls the datastore handle
281  * @param tc scheduler context
282  */
283 static void
284 disconnect_after_drop (void *cls,
285                        const struct GNUNET_SCHEDULER_TaskContext *tc)
286 {
287   struct GNUNET_DATASTORE_Handle *h = cls;
288
289   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
290 }
291
292
293 /**
294  * Transmit DROP message to datastore service.
295  *
296  * @param cls the 'struct GNUNET_DATASTORE_Handle'
297  * @param size number of bytes that can be copied to buf
298  * @param buf where to copy the drop message
299  * @return number of bytes written to buf
300  */
301 static size_t
302 transmit_drop (void *cls, size_t size, void *buf)
303 {
304   struct GNUNET_DATASTORE_Handle *h = cls;
305   struct GNUNET_MessageHeader *hdr;
306
307   if (buf == NULL)
308   {
309     LOG (GNUNET_ERROR_TYPE_WARNING,
310          _("Failed to transmit request to drop database.\n"));
311     GNUNET_SCHEDULER_add_continuation (&disconnect_after_drop, h,
312                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
313     return 0;
314   }
315   GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
316   hdr = buf;
317   hdr->size = htons (sizeof (struct GNUNET_MessageHeader));
318   hdr->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
319   GNUNET_SCHEDULER_add_continuation (&disconnect_after_drop, h,
320                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
321   return sizeof (struct GNUNET_MessageHeader);
322 }
323
324
325 /**
326  * Disconnect from the datastore service (and free
327  * associated resources).
328  *
329  * @param h handle to the datastore
330  * @param drop set to GNUNET_YES to delete all data in datastore (!)
331  */
332 void
333 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, int drop)
334 {
335   struct GNUNET_DATASTORE_QueueEntry *qe;
336
337   LOG (GNUNET_ERROR_TYPE_DEBUG, "Datastore disconnect\n");
338   if (NULL != h->th)
339   {
340     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
341     h->th = NULL;
342   }
343   if (h->client != NULL)
344   {
345     GNUNET_CLIENT_disconnect (h->client);
346     h->client = NULL;
347   }
348   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
349   {
350     GNUNET_SCHEDULER_cancel (h->reconnect_task);
351     h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
352   }
353   while (NULL != (qe = h->queue_head))
354   {
355     GNUNET_assert (NULL != qe->response_proc);
356     qe->response_proc (h, NULL);
357   }
358   if (GNUNET_YES == drop)
359   {
360     h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
361     if (h->client != NULL)
362     {
363       if (NULL !=
364           GNUNET_CLIENT_notify_transmit_ready (h->client,
365                                                sizeof (struct
366                                                        GNUNET_MessageHeader),
367                                                GNUNET_TIME_UNIT_MINUTES,
368                                                GNUNET_YES, &transmit_drop, h))
369         return;
370       GNUNET_CLIENT_disconnect (h->client);
371       h->client = NULL;
372     }
373     GNUNET_break (0);
374   }
375   GNUNET_STATISTICS_destroy (h->stats, GNUNET_NO);
376   h->stats = NULL;
377   GNUNET_free (h);
378 }
379
380
381 /**
382  * A request has timed out (before being transmitted to the service).
383  *
384  * @param cls the `struct GNUNET_DATASTORE_QueueEntry`
385  * @param tc scheduler context
386  */
387 static void
388 timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
389 {
390   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
391
392   GNUNET_STATISTICS_update (qe->h->stats,
393                             gettext_noop ("# queue entry timeouts"), 1,
394                             GNUNET_NO);
395   qe->task = GNUNET_SCHEDULER_NO_TASK;
396   GNUNET_assert (GNUNET_NO == qe->was_transmitted);
397   LOG (GNUNET_ERROR_TYPE_DEBUG,
398        "Timeout of request in datastore queue\n");
399   /* response_proc's expect request at the head of the queue! */
400   GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, qe);
401   GNUNET_CONTAINER_DLL_insert (h->queue_head, h->queue_tail, qe);
402   GNUNET_assert (h->queue_head == qe);
403   qe->response_proc (qe->h, NULL);
404 }
405
406
407 /**
408  * Create a new entry for our priority queue (and possibly discard other entires if
409  * the queue is getting too long).
410  *
411  * @param h handle to the datastore
412  * @param msize size of the message to queue
413  * @param queue_priority priority of the entry
414  * @param max_queue_size at what queue size should this request be dropped
415  *        (if other requests of higher priority are in the queue)
416  * @param timeout timeout for the operation
417  * @param response_proc function to call with replies (can be NULL)
418  * @param qc client context (NOT a closure for @a response_proc)
419  * @return NULL if the queue is full
420  */
421 static struct GNUNET_DATASTORE_QueueEntry *
422 make_queue_entry (struct GNUNET_DATASTORE_Handle *h, size_t msize,
423                   unsigned int queue_priority, unsigned int max_queue_size,
424                   struct GNUNET_TIME_Relative timeout,
425                   GNUNET_CLIENT_MessageHandler response_proc,
426                   const union QueueContext *qc)
427 {
428   struct GNUNET_DATASTORE_QueueEntry *ret;
429   struct GNUNET_DATASTORE_QueueEntry *pos;
430   unsigned int c;
431
432   c = 0;
433   pos = h->queue_head;
434   while ((pos != NULL) && (c < max_queue_size) &&
435          (pos->priority >= queue_priority))
436   {
437     c++;
438     pos = pos->next;
439   }
440   if (c >= max_queue_size)
441   {
442     GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue overflows"), 1,
443                               GNUNET_NO);
444     return NULL;
445   }
446   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
447   ret->h = h;
448   ret->response_proc = response_proc;
449   ret->qc = *qc;
450   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
451   ret->priority = queue_priority;
452   ret->max_queue = max_queue_size;
453   ret->message_size = msize;
454   ret->was_transmitted = GNUNET_NO;
455   if (pos == NULL)
456   {
457     /* append at the tail */
458     pos = h->queue_tail;
459   }
460   else
461   {
462     pos = pos->prev;
463     /* do not insert at HEAD if HEAD query was already
464      * transmitted and we are still receiving replies! */
465     if ((pos == NULL) && (h->queue_head->was_transmitted))
466       pos = h->queue_head;
467   }
468   c++;
469 #if INSANE_STATISTICS
470   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue entries created"),
471                             1, GNUNET_NO);
472 #endif
473   GNUNET_CONTAINER_DLL_insert_after (h->queue_head, h->queue_tail, pos, ret);
474   h->queue_size++;
475   ret->task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_queue_entry, ret);
476   for (pos = ret->next; NULL != pos; pos = pos->next)
477   {
478     if ((pos->max_queue < h->queue_size) && (pos->was_transmitted == GNUNET_NO))
479     {
480       GNUNET_assert (NULL != pos->response_proc);
481       /* move 'pos' element to head so that it will be
482        * killed on 'NULL' call below */
483       LOG (GNUNET_ERROR_TYPE_DEBUG,
484            "Dropping request from datastore queue\n");
485       /* response_proc's expect request at the head of the queue! */
486       GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, pos);
487       GNUNET_CONTAINER_DLL_insert (h->queue_head, h->queue_tail, pos);
488       GNUNET_STATISTICS_update (h->stats,
489                                 gettext_noop
490                                 ("# Requests dropped from datastore queue"), 1,
491                                 GNUNET_NO);
492       GNUNET_assert (h->queue_head == pos);
493       pos->response_proc (h, NULL);
494       break;
495     }
496   }
497   return ret;
498 }
499
500
501 /**
502  * Process entries in the queue (or do nothing if we are already
503  * doing so).
504  *
505  * @param h handle to the datastore
506  */
507 static void
508 process_queue (struct GNUNET_DATASTORE_Handle *h);
509
510
511 /**
512  * Try reconnecting to the datastore service.
513  *
514  * @param cls the 'struct GNUNET_DATASTORE_Handle'
515  * @param tc scheduler context
516  */
517 static void
518 try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
519 {
520   struct GNUNET_DATASTORE_Handle *h = cls;
521
522   h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
523   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
524   h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
525   if (h->client == NULL)
526   {
527     LOG (GNUNET_ERROR_TYPE_ERROR, "DATASTORE reconnect failed (fatally)\n");
528     return;
529   }
530   GNUNET_STATISTICS_update (h->stats,
531                             gettext_noop
532                             ("# datastore connections (re)created"), 1,
533                             GNUNET_NO);
534   LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnected to DATASTORE\n");
535   process_queue (h);
536 }
537
538
539 /**
540  * Disconnect from the service and then try reconnecting to the datastore service
541  * after some delay.
542  *
543  * @param h handle to datastore to disconnect and reconnect
544  */
545 static void
546 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
547 {
548   if (h->client == NULL)
549   {
550     LOG (GNUNET_ERROR_TYPE_DEBUG,
551          "client NULL in disconnect, will not try to reconnect\n");
552     return;
553   }
554   GNUNET_CLIENT_disconnect (h->client);
555   h->skip_next_messages = 0;
556   h->client = NULL;
557   h->reconnect_task =
558       GNUNET_SCHEDULER_add_delayed (h->retry_time, &try_reconnect, h);
559 }
560
561
562 /**
563  * Function called whenever we receive a message from
564  * the service.  Calls the appropriate handler.
565  *
566  * @param cls the 'struct GNUNET_DATASTORE_Handle'
567  * @param msg the received message
568  */
569 static void
570 receive_cb (void *cls, const struct GNUNET_MessageHeader *msg)
571 {
572   struct GNUNET_DATASTORE_Handle *h = cls;
573   struct GNUNET_DATASTORE_QueueEntry *qe;
574
575   h->in_receive = GNUNET_NO;
576   LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving reply from datastore\n");
577   if (h->skip_next_messages > 0)
578   {
579     h->skip_next_messages--;
580     process_queue (h);
581     return;
582   }
583   if (NULL == (qe = h->queue_head))
584   {
585     GNUNET_break (0);
586     process_queue (h);
587     return;
588   }
589   qe->response_proc (h, msg);
590 }
591
592
593 /**
594  * Transmit request from queue to datastore service.
595  *
596  * @param cls the 'struct GNUNET_DATASTORE_Handle'
597  * @param size number of bytes that can be copied to buf
598  * @param buf where to copy the drop message
599  * @return number of bytes written to buf
600  */
601 static size_t
602 transmit_request (void *cls, size_t size, void *buf)
603 {
604   struct GNUNET_DATASTORE_Handle *h = cls;
605   struct GNUNET_DATASTORE_QueueEntry *qe;
606   size_t msize;
607
608   h->th = NULL;
609   if (NULL == (qe = h->queue_head))
610     return 0;                   /* no entry in queue */
611   if (buf == NULL)
612   {
613     LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to DATASTORE.\n");
614     GNUNET_STATISTICS_update (h->stats,
615                               gettext_noop ("# transmission request failures"),
616                               1, GNUNET_NO);
617     do_disconnect (h);
618     return 0;
619   }
620   if (size < (msize = qe->message_size))
621   {
622     process_queue (h);
623     return 0;
624   }
625   LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u byte request to DATASTORE\n",
626        msize);
627   memcpy (buf, &qe[1], msize);
628   qe->was_transmitted = GNUNET_YES;
629   GNUNET_SCHEDULER_cancel (qe->task);
630   qe->task = GNUNET_SCHEDULER_NO_TASK;
631   GNUNET_assert (GNUNET_NO == h->in_receive);
632   h->in_receive = GNUNET_YES;
633   GNUNET_CLIENT_receive (h->client, &receive_cb, h,
634                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
635 #if INSANE_STATISTICS
636   GNUNET_STATISTICS_update (h->stats,
637                             gettext_noop ("# bytes sent to datastore"), 1,
638                             GNUNET_NO);
639 #endif
640   return msize;
641 }
642
643
644 /**
645  * Process entries in the queue (or do nothing if we are already
646  * doing so).
647  *
648  * @param h handle to the datastore
649  */
650 static void
651 process_queue (struct GNUNET_DATASTORE_Handle *h)
652 {
653   struct GNUNET_DATASTORE_QueueEntry *qe;
654
655   if (NULL == (qe = h->queue_head))
656   {
657     LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue empty\n");
658     return;                     /* no entry in queue */
659   }
660   if (qe->was_transmitted == GNUNET_YES)
661   {
662     LOG (GNUNET_ERROR_TYPE_DEBUG, "Head request already transmitted\n");
663     return;                     /* waiting for replies */
664   }
665   if (h->th != NULL)
666   {
667     LOG (GNUNET_ERROR_TYPE_DEBUG, "Pending transmission request\n");
668     return;                     /* request pending */
669   }
670   if (h->client == NULL)
671   {
672     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not connected\n");
673     return;                     /* waiting for reconnect */
674   }
675   if (GNUNET_YES == h->in_receive)
676   {
677     /* wait for response to previous query */
678     return;
679   }
680   LOG (GNUNET_ERROR_TYPE_DEBUG, "Queueing %u byte request to DATASTORE\n",
681        qe->message_size);
682   h->th =
683       GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size,
684                                            GNUNET_TIME_absolute_get_remaining
685                                            (qe->timeout), GNUNET_YES,
686                                            &transmit_request, h);
687   GNUNET_assert (GNUNET_NO == h->in_receive);
688   GNUNET_break (NULL != h->th);
689 }
690
691
692 /**
693  * Dummy continuation used to do nothing (but be non-zero).
694  *
695  * @param cls closure
696  * @param result result
697  * @param min_expiration expiration time
698  * @param emsg error message
699  */
700 static void
701 drop_status_cont (void *cls, int32_t result,
702                   struct GNUNET_TIME_Absolute min_expiration,
703                   const char *emsg)
704 {
705   /* do nothing */
706 }
707
708
709 /**
710  * Free a queue entry.  Removes the given entry from the
711  * queue and releases associated resources.  Does NOT
712  * call the callback.
713  *
714  * @param qe entry to free.
715  */
716 static void
717 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
718 {
719   struct GNUNET_DATASTORE_Handle *h = qe->h;
720
721   GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, qe);
722   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
723   {
724     GNUNET_SCHEDULER_cancel (qe->task);
725     qe->task = GNUNET_SCHEDULER_NO_TASK;
726   }
727   h->queue_size--;
728   qe->was_transmitted = GNUNET_SYSERR;  /* use-after-free warning */
729   GNUNET_free (qe);
730 }
731
732
733 /**
734  * Type of a function to call when we receive a message
735  * from the service.
736  *
737  * @param cls closure
738  * @param msg message received, NULL on timeout or fatal error
739  */
740 static void
741 process_status_message (void *cls, const struct GNUNET_MessageHeader *msg)
742 {
743   struct GNUNET_DATASTORE_Handle *h = cls;
744   struct GNUNET_DATASTORE_QueueEntry *qe;
745   struct StatusContext rc;
746   const struct StatusMessage *sm;
747   const char *emsg;
748   int32_t status;
749   int was_transmitted;
750
751   if (NULL == (qe = h->queue_head))
752   {
753     GNUNET_break (0);
754     do_disconnect (h);
755     return;
756   }
757   rc = qe->qc.sc;
758   if (msg == NULL)
759   {
760     was_transmitted = qe->was_transmitted;
761     free_queue_entry (qe);
762     if (was_transmitted == GNUNET_YES)
763       do_disconnect (h);
764     else
765       process_queue (h);
766     if (rc.cont != NULL)
767       rc.cont (rc.cont_cls, GNUNET_SYSERR,
768                GNUNET_TIME_UNIT_ZERO_ABS,
769                _("Failed to receive status response from database."));
770     return;
771   }
772   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
773   free_queue_entry (qe);
774   if ((ntohs (msg->size) < sizeof (struct StatusMessage)) ||
775       (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS))
776   {
777     GNUNET_break (0);
778     h->retry_time = GNUNET_TIME_UNIT_ZERO;
779     do_disconnect (h);
780     if (rc.cont != NULL)
781       rc.cont (rc.cont_cls, GNUNET_SYSERR,
782                GNUNET_TIME_UNIT_ZERO_ABS,
783                _("Error reading response from datastore service"));
784     return;
785   }
786   sm = (const struct StatusMessage *) msg;
787   status = ntohl (sm->status);
788   emsg = NULL;
789   if (ntohs (msg->size) > sizeof (struct StatusMessage))
790   {
791     emsg = (const char *) &sm[1];
792     if (emsg[ntohs (msg->size) - sizeof (struct StatusMessage) - 1] != '\0')
793     {
794       GNUNET_break (0);
795       emsg = _("Invalid error message received from datastore service");
796     }
797   }
798   if ((status == GNUNET_SYSERR) && (emsg == NULL))
799   {
800     GNUNET_break (0);
801     emsg = _("Invalid error message received from datastore service");
802   }
803   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received status %d/%s\n", (int) status, emsg);
804   GNUNET_STATISTICS_update (h->stats,
805                             gettext_noop ("# status messages received"), 1,
806                             GNUNET_NO);
807   h->retry_time = GNUNET_TIME_UNIT_ZERO;
808   process_queue (h);
809   if (rc.cont != NULL)
810     rc.cont (rc.cont_cls, status,
811              GNUNET_TIME_absolute_ntoh (sm->min_expiration),
812              emsg);
813 }
814
815
816 /**
817  * Store an item in the datastore.  If the item is already present,
818  * the priorities are summed up and the higher expiration time and
819  * lower anonymity level is used.
820  *
821  * @param h handle to the datastore
822  * @param rid reservation ID to use (from "reserve"); use 0 if no
823  *            prior reservation was made
824  * @param key key for the value
825  * @param size number of bytes in data
826  * @param data content stored
827  * @param type type of the content
828  * @param priority priority of the content
829  * @param anonymity anonymity-level for the content
830  * @param replication how often should the content be replicated to other peers?
831  * @param expiration expiration time for the content
832  * @param queue_priority ranking of this request in the priority queue
833  * @param max_queue_size at what queue size should this request be dropped
834  *        (if other requests of higher priority are in the queue)
835  * @param timeout timeout for the operation
836  * @param cont continuation to call when done
837  * @param cont_cls closure for cont
838  * @return NULL if the entry was not queued, otherwise a handle that can be used to
839  *         cancel; note that even if NULL is returned, the callback will be invoked
840  *         (or rather, will already have been invoked)
841  */
842 struct GNUNET_DATASTORE_QueueEntry *
843 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, uint32_t rid,
844                       const struct GNUNET_HashCode * key, size_t size,
845                       const void *data, enum GNUNET_BLOCK_Type type,
846                       uint32_t priority, uint32_t anonymity,
847                       uint32_t replication,
848                       struct GNUNET_TIME_Absolute expiration,
849                       unsigned int queue_priority, unsigned int max_queue_size,
850                       struct GNUNET_TIME_Relative timeout,
851                       GNUNET_DATASTORE_ContinuationWithStatus cont,
852                       void *cont_cls)
853 {
854   struct GNUNET_DATASTORE_QueueEntry *qe;
855   struct DataMessage *dm;
856   size_t msize;
857   union QueueContext qc;
858
859   LOG (GNUNET_ERROR_TYPE_DEBUG,
860        "Asked to put %u bytes of data under key `%s' for %s\n", size,
861        GNUNET_h2s (key),
862        GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
863                                                GNUNET_YES));
864   msize = sizeof (struct DataMessage) + size;
865   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
866   qc.sc.cont = cont;
867   qc.sc.cont_cls = cont_cls;
868   qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout,
869                          &process_status_message, &qc);
870   if (qe == NULL)
871   {
872     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for PUT\n");
873     return NULL;
874   }
875   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# PUT requests executed"),
876                             1, GNUNET_NO);
877   dm = (struct DataMessage *) &qe[1];
878   dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
879   dm->header.size = htons (msize);
880   dm->rid = htonl (rid);
881   dm->size = htonl ((uint32_t) size);
882   dm->type = htonl (type);
883   dm->priority = htonl (priority);
884   dm->anonymity = htonl (anonymity);
885   dm->replication = htonl (replication);
886   dm->reserved = htonl (0);
887   dm->uid = GNUNET_htonll (0);
888   dm->expiration = GNUNET_TIME_absolute_hton (expiration);
889   dm->key = *key;
890   memcpy (&dm[1], data, size);
891   process_queue (h);
892   return qe;
893 }
894
895
896 /**
897  * Reserve space in the datastore.  This function should be used
898  * to avoid "out of space" failures during a longer sequence of "put"
899  * operations (for example, when a file is being inserted).
900  *
901  * @param h handle to the datastore
902  * @param amount how much space (in bytes) should be reserved (for content only)
903  * @param entries how many entries will be created (to calculate per-entry overhead)
904  * @param queue_priority ranking of this request in the priority queue
905  * @param max_queue_size at what queue size should this request be dropped
906  *        (if other requests of higher priority are in the queue)
907  * @param timeout how long to wait at most for a response (or before dying in queue)
908  * @param cont continuation to call when done; "success" will be set to
909  *             a positive reservation value if space could be reserved.
910  * @param cont_cls closure for cont
911  * @return NULL if the entry was not queued, otherwise a handle that can be used to
912  *         cancel; note that even if NULL is returned, the callback will be invoked
913  *         (or rather, will already have been invoked)
914  */
915 struct GNUNET_DATASTORE_QueueEntry *
916 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount,
917                           uint32_t entries, unsigned int queue_priority,
918                           unsigned int max_queue_size,
919                           struct GNUNET_TIME_Relative timeout,
920                           GNUNET_DATASTORE_ContinuationWithStatus cont,
921                           void *cont_cls)
922 {
923   struct GNUNET_DATASTORE_QueueEntry *qe;
924   struct ReserveMessage *rm;
925   union QueueContext qc;
926
927   if (cont == NULL)
928     cont = &drop_status_cont;
929   LOG (GNUNET_ERROR_TYPE_DEBUG,
930        "Asked to reserve %llu bytes of data and %u entries\n",
931        (unsigned long long) amount, (unsigned int) entries);
932   qc.sc.cont = cont;
933   qc.sc.cont_cls = cont_cls;
934   qe = make_queue_entry (h, sizeof (struct ReserveMessage), queue_priority,
935                          max_queue_size, timeout, &process_status_message, &qc);
936   if (qe == NULL)
937   {
938     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry to reserve\n");
939     return NULL;
940   }
941   GNUNET_STATISTICS_update (h->stats,
942                             gettext_noop ("# RESERVE requests executed"), 1,
943                             GNUNET_NO);
944   rm = (struct ReserveMessage *) &qe[1];
945   rm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
946   rm->header.size = htons (sizeof (struct ReserveMessage));
947   rm->entries = htonl (entries);
948   rm->amount = GNUNET_htonll (amount);
949   process_queue (h);
950   return qe;
951 }
952
953
954 /**
955  * Signal that all of the data for which a reservation was made has
956  * been stored and that whatever excess space might have been reserved
957  * can now be released.
958  *
959  * @param h handle to the datastore
960  * @param rid reservation ID (value of "success" in original continuation
961  *        from the "reserve" function).
962  * @param queue_priority ranking of this request in the priority queue
963  * @param max_queue_size at what queue size should this request be dropped
964  *        (if other requests of higher priority are in the queue)
965  * @param queue_priority ranking of this request in the priority queue
966  * @param max_queue_size at what queue size should this request be dropped
967  *        (if other requests of higher priority are in the queue)
968  * @param timeout how long to wait at most for a response
969  * @param cont continuation to call when done
970  * @param cont_cls closure for cont
971  * @return NULL if the entry was not queued, otherwise a handle that can be used to
972  *         cancel; note that even if NULL is returned, the callback will be invoked
973  *         (or rather, will already have been invoked)
974  */
975 struct GNUNET_DATASTORE_QueueEntry *
976 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
977                                   uint32_t rid, unsigned int queue_priority,
978                                   unsigned int max_queue_size,
979                                   struct GNUNET_TIME_Relative timeout,
980                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
981                                   void *cont_cls)
982 {
983   struct GNUNET_DATASTORE_QueueEntry *qe;
984   struct ReleaseReserveMessage *rrm;
985   union QueueContext qc;
986
987   if (cont == NULL)
988     cont = &drop_status_cont;
989   LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to release reserve %d\n", rid);
990   qc.sc.cont = cont;
991   qc.sc.cont_cls = cont_cls;
992   qe = make_queue_entry (h, sizeof (struct ReleaseReserveMessage),
993                          queue_priority, max_queue_size, timeout,
994                          &process_status_message, &qc);
995   if (qe == NULL)
996   {
997     LOG (GNUNET_ERROR_TYPE_DEBUG,
998          "Could not create queue entry to release reserve\n");
999     return NULL;
1000   }
1001   GNUNET_STATISTICS_update (h->stats,
1002                             gettext_noop
1003                             ("# RELEASE RESERVE requests executed"), 1,
1004                             GNUNET_NO);
1005   rrm = (struct ReleaseReserveMessage *) &qe[1];
1006   rrm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1007   rrm->header.size = htons (sizeof (struct ReleaseReserveMessage));
1008   rrm->rid = htonl (rid);
1009   process_queue (h);
1010   return qe;
1011 }
1012
1013
1014 /**
1015  * Update a value in the datastore.
1016  *
1017  * @param h handle to the datastore
1018  * @param uid identifier for the value
1019  * @param priority how much to increase the priority of the value
1020  * @param expiration new expiration value should be MAX of existing and this argument
1021  * @param queue_priority ranking of this request in the priority queue
1022  * @param max_queue_size at what queue size should this request be dropped
1023  *        (if other requests of higher priority are in the queue)
1024  * @param timeout how long to wait at most for a response
1025  * @param cont continuation to call when done
1026  * @param cont_cls closure for cont
1027  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1028  *         cancel; note that even if NULL is returned, the callback will be invoked
1029  *         (or rather, will already have been invoked)
1030  */
1031 struct GNUNET_DATASTORE_QueueEntry *
1032 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, uint64_t uid,
1033                          uint32_t priority,
1034                          struct GNUNET_TIME_Absolute expiration,
1035                          unsigned int queue_priority,
1036                          unsigned int max_queue_size,
1037                          struct GNUNET_TIME_Relative timeout,
1038                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1039                          void *cont_cls)
1040 {
1041   struct GNUNET_DATASTORE_QueueEntry *qe;
1042   struct UpdateMessage *um;
1043   union QueueContext qc;
1044
1045   if (cont == NULL)
1046     cont = &drop_status_cont;
1047   LOG (GNUNET_ERROR_TYPE_DEBUG,
1048        "Asked to update entry %llu raising priority by %u and expiration to %s\n",
1049        uid,
1050        (unsigned int) priority,
1051        GNUNET_STRINGS_absolute_time_to_string (expiration));
1052   qc.sc.cont = cont;
1053   qc.sc.cont_cls = cont_cls;
1054   qe = make_queue_entry (h, sizeof (struct UpdateMessage), queue_priority,
1055                          max_queue_size, timeout, &process_status_message, &qc);
1056   if (qe == NULL)
1057   {
1058     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for UPDATE\n");
1059     return NULL;
1060   }
1061   GNUNET_STATISTICS_update (h->stats,
1062                             gettext_noop ("# UPDATE requests executed"), 1,
1063                             GNUNET_NO);
1064   um = (struct UpdateMessage *) &qe[1];
1065   um->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1066   um->header.size = htons (sizeof (struct UpdateMessage));
1067   um->priority = htonl (priority);
1068   um->expiration = GNUNET_TIME_absolute_hton (expiration);
1069   um->uid = GNUNET_htonll (uid);
1070   process_queue (h);
1071   return qe;
1072 }
1073
1074
1075 /**
1076  * Explicitly remove some content from the database.
1077  * The "cont"inuation will be called with status
1078  * "GNUNET_OK" if content was removed, "GNUNET_NO"
1079  * if no matching entry was found and "GNUNET_SYSERR"
1080  * on all other types of errors.
1081  *
1082  * @param h handle to the datastore
1083  * @param key key for the value
1084  * @param size number of bytes in data
1085  * @param data content stored
1086  * @param queue_priority ranking of this request in the priority queue
1087  * @param max_queue_size at what queue size should this request be dropped
1088  *        (if other requests of higher priority are in the queue)
1089  * @param timeout how long to wait at most for a response
1090  * @param cont continuation to call when done
1091  * @param cont_cls closure for cont
1092  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1093  *         cancel; note that even if NULL is returned, the callback will be invoked
1094  *         (or rather, will already have been invoked)
1095  */
1096 struct GNUNET_DATASTORE_QueueEntry *
1097 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1098                          const struct GNUNET_HashCode * key, size_t size,
1099                          const void *data, unsigned int queue_priority,
1100                          unsigned int max_queue_size,
1101                          struct GNUNET_TIME_Relative timeout,
1102                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1103                          void *cont_cls)
1104 {
1105   struct GNUNET_DATASTORE_QueueEntry *qe;
1106   struct DataMessage *dm;
1107   size_t msize;
1108   union QueueContext qc;
1109
1110   if (cont == NULL)
1111     cont = &drop_status_cont;
1112   LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to remove %u bytes under key `%s'\n",
1113        size, GNUNET_h2s (key));
1114   qc.sc.cont = cont;
1115   qc.sc.cont_cls = cont_cls;
1116   msize = sizeof (struct DataMessage) + size;
1117   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1118   qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout,
1119                          &process_status_message, &qc);
1120   if (qe == NULL)
1121   {
1122     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for REMOVE\n");
1123     return NULL;
1124   }
1125   GNUNET_STATISTICS_update (h->stats,
1126                             gettext_noop ("# REMOVE requests executed"), 1,
1127                             GNUNET_NO);
1128   dm = (struct DataMessage *) &qe[1];
1129   dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1130   dm->header.size = htons (msize);
1131   dm->rid = htonl (0);
1132   dm->size = htonl (size);
1133   dm->type = htonl (0);
1134   dm->priority = htonl (0);
1135   dm->anonymity = htonl (0);
1136   dm->uid = GNUNET_htonll (0);
1137   dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS);
1138   dm->key = *key;
1139   memcpy (&dm[1], data, size);
1140   process_queue (h);
1141   return qe;
1142 }
1143
1144
1145 /**
1146  * Type of a function to call when we receive a message
1147  * from the service.
1148  *
1149  * @param cls closure
1150  * @param msg message received, NULL on timeout or fatal error
1151  */
1152 static void
1153 process_result_message (void *cls, const struct GNUNET_MessageHeader *msg)
1154 {
1155   struct GNUNET_DATASTORE_Handle *h = cls;
1156   struct GNUNET_DATASTORE_QueueEntry *qe;
1157   struct ResultContext rc;
1158   const struct DataMessage *dm;
1159   int was_transmitted;
1160
1161   if (NULL == msg)
1162   {
1163     qe = h->queue_head;
1164     GNUNET_assert (NULL != qe);
1165     rc = qe->qc.rc;
1166     was_transmitted = qe->was_transmitted;
1167     free_queue_entry (qe);
1168     if (GNUNET_YES == was_transmitted)
1169     {
1170       LOG (GNUNET_ERROR_TYPE_WARNING,
1171            _("Failed to receive response from database.\n"));
1172       do_disconnect (h);
1173     }
1174     else
1175     {
1176       process_queue (h);
1177     }
1178     if (NULL != rc.proc)
1179       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1180                0);
1181     return;
1182   }
1183   if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END)
1184   {
1185     GNUNET_break (ntohs (msg->size) == sizeof (struct GNUNET_MessageHeader));
1186     qe = h->queue_head;
1187     rc = qe->qc.rc;
1188     GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1189     free_queue_entry (qe);
1190     LOG (GNUNET_ERROR_TYPE_DEBUG,
1191          "Received end of result set, new queue size is %u\n", h->queue_size);
1192     h->retry_time = GNUNET_TIME_UNIT_ZERO;
1193     h->result_count = 0;
1194     process_queue (h);
1195     if (NULL != rc.proc)
1196       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1197                0);
1198     return;
1199   }
1200   qe = h->queue_head;
1201   GNUNET_assert (NULL != qe);
1202   rc = qe->qc.rc;
1203   if (GNUNET_YES != qe->was_transmitted)
1204   {
1205     GNUNET_break (0);
1206     free_queue_entry (qe);
1207     h->retry_time = GNUNET_TIME_UNIT_ZERO;
1208     do_disconnect (h);
1209     if (rc.proc != NULL)
1210       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1211                0);
1212     return;
1213   }
1214   if ((ntohs (msg->size) < sizeof (struct DataMessage)) ||
1215       (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1216       (ntohs (msg->size) !=
1217        sizeof (struct DataMessage) +
1218        ntohl (((const struct DataMessage *) msg)->size)))
1219   {
1220     GNUNET_break (0);
1221     free_queue_entry (qe);
1222     h->retry_time = GNUNET_TIME_UNIT_ZERO;
1223     do_disconnect (h);
1224     if (rc.proc != NULL)
1225       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1226                0);
1227     return;
1228   }
1229 #if INSANE_STATISTICS
1230   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# Results received"), 1,
1231                             GNUNET_NO);
1232 #endif
1233   dm = (const struct DataMessage *) msg;
1234   LOG (GNUNET_ERROR_TYPE_DEBUG,
1235        "Received result %llu with type %u and size %u with key %s\n",
1236        (unsigned long long) GNUNET_ntohll (dm->uid), ntohl (dm->type),
1237        ntohl (dm->size), GNUNET_h2s (&dm->key));
1238   free_queue_entry (qe);
1239   h->retry_time = GNUNET_TIME_UNIT_ZERO;
1240   process_queue (h);
1241   if (rc.proc != NULL)
1242     rc.proc (rc.proc_cls, &dm->key, ntohl (dm->size), &dm[1], ntohl (dm->type),
1243              ntohl (dm->priority), ntohl (dm->anonymity),
1244              GNUNET_TIME_absolute_ntoh (dm->expiration),
1245              GNUNET_ntohll (dm->uid));
1246 }
1247
1248
1249 /**
1250  * Get a random value from the datastore for content replication.
1251  * Returns a single, random value among those with the highest
1252  * replication score, lowering positive replication scores by one for
1253  * the chosen value (if only content with a replication score exists,
1254  * a random value is returned and replication scores are not changed).
1255  *
1256  * @param h handle to the datastore
1257  * @param queue_priority ranking of this request in the priority queue
1258  * @param max_queue_size at what queue size should this request be dropped
1259  *        (if other requests of higher priority are in the queue)
1260  * @param timeout how long to wait at most for a response
1261  * @param proc function to call on a random value; it
1262  *        will be called once with a value (if available)
1263  *        and always once with a value of NULL.
1264  * @param proc_cls closure for proc
1265  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1266  *         cancel
1267  */
1268 struct GNUNET_DATASTORE_QueueEntry *
1269 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1270                                       unsigned int queue_priority,
1271                                       unsigned int max_queue_size,
1272                                       struct GNUNET_TIME_Relative timeout,
1273                                       GNUNET_DATASTORE_DatumProcessor proc,
1274                                       void *proc_cls)
1275 {
1276   struct GNUNET_DATASTORE_QueueEntry *qe;
1277   struct GNUNET_MessageHeader *m;
1278   union QueueContext qc;
1279
1280   GNUNET_assert (NULL != proc);
1281   LOG (GNUNET_ERROR_TYPE_DEBUG,
1282        "Asked to get replication entry in %s\n",
1283        GNUNET_STRINGS_relative_time_to_string (timeout, GNUNET_YES));
1284   qc.rc.proc = proc;
1285   qc.rc.proc_cls = proc_cls;
1286   qe = make_queue_entry (h, sizeof (struct GNUNET_MessageHeader),
1287                          queue_priority, max_queue_size, timeout,
1288                          &process_result_message, &qc);
1289   if (NULL == qe)
1290   {
1291     LOG (GNUNET_ERROR_TYPE_DEBUG,
1292          "Could not create queue entry for GET REPLICATION\n");
1293     return NULL;
1294   }
1295   GNUNET_STATISTICS_update (h->stats,
1296                             gettext_noop
1297                             ("# GET REPLICATION requests executed"), 1,
1298                             GNUNET_NO);
1299   m = (struct GNUNET_MessageHeader *) &qe[1];
1300   m->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1301   m->size = htons (sizeof (struct GNUNET_MessageHeader));
1302   process_queue (h);
1303   return qe;
1304 }
1305
1306
1307 /**
1308  * Get a single zero-anonymity value from the datastore.
1309  *
1310  * @param h handle to the datastore
1311  * @param offset offset of the result (modulo num-results); set to
1312  *               a random 64-bit value initially; then increment by
1313  *               one each time; detect that all results have been found by uid
1314  *               being again the first uid ever returned.
1315  * @param queue_priority ranking of this request in the priority queue
1316  * @param max_queue_size at what queue size should this request be dropped
1317  *        (if other requests of higher priority are in the queue)
1318  * @param timeout how long to wait at most for a response
1319  * @param type allowed type for the operation (never zero)
1320  * @param proc function to call on a random value; it
1321  *        will be called once with a value (if available)
1322  *        or with NULL if none value exists.
1323  * @param proc_cls closure for proc
1324  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1325  *         cancel
1326  */
1327 struct GNUNET_DATASTORE_QueueEntry *
1328 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1329                                      uint64_t offset,
1330                                      unsigned int queue_priority,
1331                                      unsigned int max_queue_size,
1332                                      struct GNUNET_TIME_Relative timeout,
1333                                      enum GNUNET_BLOCK_Type type,
1334                                      GNUNET_DATASTORE_DatumProcessor proc,
1335                                      void *proc_cls)
1336 {
1337   struct GNUNET_DATASTORE_QueueEntry *qe;
1338   struct GetZeroAnonymityMessage *m;
1339   union QueueContext qc;
1340
1341   GNUNET_assert (NULL != proc);
1342   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1343   LOG (GNUNET_ERROR_TYPE_DEBUG,
1344        "Asked to get %llu-th zero-anonymity entry of type %d in %s\n",
1345        (unsigned long long) offset, type,
1346        GNUNET_STRINGS_relative_time_to_string (timeout, GNUNET_YES));
1347   qc.rc.proc = proc;
1348   qc.rc.proc_cls = proc_cls;
1349   qe = make_queue_entry (h, sizeof (struct GetZeroAnonymityMessage),
1350                          queue_priority, max_queue_size, timeout,
1351                          &process_result_message, &qc);
1352   if (NULL == qe)
1353   {
1354     LOG (GNUNET_ERROR_TYPE_DEBUG,
1355          "Could not create queue entry for zero-anonymity procation\n");
1356     return NULL;
1357   }
1358   GNUNET_STATISTICS_update (h->stats,
1359                             gettext_noop
1360                             ("# GET ZERO ANONYMITY requests executed"), 1,
1361                             GNUNET_NO);
1362   m = (struct GetZeroAnonymityMessage *) &qe[1];
1363   m->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1364   m->header.size = htons (sizeof (struct GetZeroAnonymityMessage));
1365   m->type = htonl ((uint32_t) type);
1366   m->offset = GNUNET_htonll (offset);
1367   process_queue (h);
1368   return qe;
1369 }
1370
1371
1372 /**
1373  * Get a result for a particular key from the datastore.  The processor
1374  * will only be called once.
1375  *
1376  * @param h handle to the datastore
1377  * @param offset offset of the result (modulo num-results); set to
1378  *               a random 64-bit value initially; then increment by
1379  *               one each time; detect that all results have been found by uid
1380  *               being again the first uid ever returned.
1381  * @param key maybe NULL (to match all entries)
1382  * @param type desired type, 0 for any
1383  * @param queue_priority ranking of this request in the priority queue
1384  * @param max_queue_size at what queue size should this request be dropped
1385  *        (if other requests of higher priority are in the queue)
1386  * @param timeout how long to wait at most for a response
1387  * @param proc function to call on each matching value;
1388  *        will be called once with a NULL value at the end
1389  * @param proc_cls closure for proc
1390  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1391  *         cancel
1392  */
1393 struct GNUNET_DATASTORE_QueueEntry *
1394 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, uint64_t offset,
1395                           const struct GNUNET_HashCode * key,
1396                           enum GNUNET_BLOCK_Type type,
1397                           unsigned int queue_priority,
1398                           unsigned int max_queue_size,
1399                           struct GNUNET_TIME_Relative timeout,
1400                           GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls)
1401 {
1402   struct GNUNET_DATASTORE_QueueEntry *qe;
1403   struct GetMessage *gm;
1404   union QueueContext qc;
1405
1406   GNUNET_assert (NULL != proc);
1407   LOG (GNUNET_ERROR_TYPE_DEBUG,
1408        "Asked to look for data of type %u under key `%s'\n",
1409        (unsigned int) type, GNUNET_h2s (key));
1410   qc.rc.proc = proc;
1411   qc.rc.proc_cls = proc_cls;
1412   qe = make_queue_entry (h, sizeof (struct GetMessage), queue_priority,
1413                          max_queue_size, timeout, &process_result_message, &qc);
1414   if (qe == NULL)
1415   {
1416     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not queue request for `%s'\n",
1417          GNUNET_h2s (key));
1418     return NULL;
1419   }
1420 #if INSANE_STATISTICS
1421   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# GET requests executed"),
1422                             1, GNUNET_NO);
1423 #endif
1424   gm = (struct GetMessage *) &qe[1];
1425   gm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1426   gm->type = htonl (type);
1427   gm->offset = GNUNET_htonll (offset);
1428   if (key != NULL)
1429   {
1430     gm->header.size = htons (sizeof (struct GetMessage));
1431     gm->key = *key;
1432   }
1433   else
1434   {
1435     gm->header.size =
1436         htons (sizeof (struct GetMessage) - sizeof (struct GNUNET_HashCode));
1437   }
1438   process_queue (h);
1439   return qe;
1440 }
1441
1442
1443 /**
1444  * Cancel a datastore operation.  The final callback from the
1445  * operation must not have been done yet.
1446  *
1447  * @param qe operation to cancel
1448  */
1449 void
1450 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1451 {
1452   struct GNUNET_DATASTORE_Handle *h;
1453
1454   GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted);
1455   h = qe->h;
1456   LOG (GNUNET_ERROR_TYPE_DEBUG,
1457        "Pending DATASTORE request %p cancelled (%d, %d)\n", qe,
1458        qe->was_transmitted, h->queue_head == qe);
1459   if (GNUNET_YES == qe->was_transmitted)
1460   {
1461     free_queue_entry (qe);
1462     h->skip_next_messages++;
1463     return;
1464   }
1465   free_queue_entry (qe);
1466   process_queue (h);
1467 }
1468
1469
1470 /* end of datastore_api.c */