3ab3d2503f2f2e167744942ae89e5f85bc3e1a84
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__)
35
36 /**
37  * If a client stopped asking for more results, how many more do
38  * we receive from the DB before killing the connection?  Trade-off
39  * between re-doing TCP handshakes and (needlessly) receiving
40  * useless results.
41  */
42 #define MAX_EXCESS_RESULTS 8
43
44 /**
45  * Context for processing status messages.
46  */
47 struct StatusContext
48 {
49   /**
50    * Continuation to call with the status.
51    */
52   GNUNET_DATASTORE_ContinuationWithStatus cont;
53
54   /**
55    * Closure for cont.
56    */
57   void *cont_cls;
58
59 };
60
61
62 /**
63  * Context for processing result messages.
64  */
65 struct ResultContext
66 {
67   /**
68    * Function to call with the result.
69    */
70   GNUNET_DATASTORE_DatumProcessor proc;
71
72   /**
73    * Closure for proc.
74    */
75   void *proc_cls;
76
77 };
78
79
80 /**
81  *  Context for a queue operation.
82  */
83 union QueueContext
84 {
85
86   struct StatusContext sc;
87
88   struct ResultContext rc;
89
90 };
91
92
93
94 /**
95  * Entry in our priority queue.
96  */
97 struct GNUNET_DATASTORE_QueueEntry
98 {
99
100   /**
101    * This is a linked list.
102    */
103   struct GNUNET_DATASTORE_QueueEntry *next;
104
105   /**
106    * This is a linked list.
107    */
108   struct GNUNET_DATASTORE_QueueEntry *prev;
109
110   /**
111    * Handle to the master context.
112    */
113   struct GNUNET_DATASTORE_Handle *h;
114
115   /**
116    * Response processor (NULL if we are not waiting for a response).
117    * This struct should be used for the closure, function-specific
118    * arguments can be passed via 'qc'.
119    */
120   GNUNET_CLIENT_MessageHandler response_proc;
121
122   /**
123    * Function to call after transmission of the request.
124    */
125   GNUNET_DATASTORE_ContinuationWithStatus cont;
126
127   /**
128    * Closure for 'cont'.
129    */
130   void *cont_cls;
131
132   /**
133    * Context for the operation.
134    */
135   union QueueContext qc;
136
137   /**
138    * Task for timeout signalling.
139    */
140   GNUNET_SCHEDULER_TaskIdentifier task;
141
142   /**
143    * Timeout for the current operation.
144    */
145   struct GNUNET_TIME_Absolute timeout;
146
147   /**
148    * Priority in the queue.
149    */
150   unsigned int priority;
151
152   /**
153    * Maximum allowed length of queue (otherwise
154    * this request should be discarded).
155    */
156   unsigned int max_queue;
157
158   /**
159    * Number of bytes in the request message following
160    * this struct.  32-bit value for nicer memory
161    * access (and overall struct alignment).
162    */
163   uint32_t message_size;
164
165   /**
166    * Has this message been transmitted to the service?
167    * Only ever GNUNET_YES for the head of the queue.
168    * Note that the overall struct should end at a
169    * multiple of 64 bits.
170    */
171   int was_transmitted;
172
173 };
174
175 /**
176  * Handle to the datastore service.
177  */
178 struct GNUNET_DATASTORE_Handle
179 {
180
181   /**
182    * Our configuration.
183    */
184   const struct GNUNET_CONFIGURATION_Handle *cfg;
185
186   /**
187    * Current connection to the datastore service.
188    */
189   struct GNUNET_CLIENT_Connection *client;
190
191   /**
192    * Handle for statistics.
193    */
194   struct GNUNET_STATISTICS_Handle *stats;
195
196   /**
197    * Current transmit handle.
198    */
199   struct GNUNET_CLIENT_TransmitHandle *th;
200
201   /**
202    * Current head of priority queue.
203    */
204   struct GNUNET_DATASTORE_QueueEntry *queue_head;
205
206   /**
207    * Current tail of priority queue.
208    */
209   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
210
211   /**
212    * Task for trying to reconnect.
213    */
214   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
215
216   /**
217    * How quickly should we retry?  Used for exponential back-off on
218    * connect-errors.
219    */
220   struct GNUNET_TIME_Relative retry_time;
221
222   /**
223    * Number of entries in the queue.
224    */
225   unsigned int queue_size;
226
227   /**
228    * Number of results we're receiving for the current query
229    * after application stopped to care.  Used to determine when
230    * to reset the connection.
231    */
232   unsigned int result_count;
233
234   /**
235    * Are we currently trying to receive from the service?
236    */
237   int in_receive;
238
239   /**
240    * We should ignore the next message(s) from the service.
241    */
242   unsigned int skip_next_messages;
243
244 };
245
246
247
248 /**
249  * Connect to the datastore service.
250  *
251  * @param cfg configuration to use
252  * @return handle to use to access the service
253  */
254 struct GNUNET_DATASTORE_Handle *
255 GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
256 {
257   struct GNUNET_CLIENT_Connection *c;
258   struct GNUNET_DATASTORE_Handle *h;
259
260   c = GNUNET_CLIENT_connect ("datastore", cfg);
261   if (c == NULL)
262     return NULL;                /* oops */
263   h = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_Handle) +
264                      GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
265   h->client = c;
266   h->cfg = cfg;
267   h->stats = GNUNET_STATISTICS_create ("datastore-api", cfg);
268   return h;
269 }
270
271
272 /**
273  * Transmit DROP message to datastore service.
274  *
275  * @param cls the 'struct GNUNET_DATASTORE_Handle'
276  * @param size number of bytes that can be copied to buf
277  * @param buf where to copy the drop message
278  * @return number of bytes written to buf
279  */
280 static size_t
281 transmit_drop (void *cls, size_t size, void *buf)
282 {
283   struct GNUNET_DATASTORE_Handle *h = cls;
284   struct GNUNET_MessageHeader *hdr;
285
286   if (buf == NULL)
287   {
288     LOG (GNUNET_ERROR_TYPE_WARNING,
289          _("Failed to transmit request to drop database.\n"));
290     GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
291     return 0;
292   }
293   GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
294   hdr = buf;
295   hdr->size = htons (sizeof (struct GNUNET_MessageHeader));
296   hdr->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
297   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
298   return sizeof (struct GNUNET_MessageHeader);
299 }
300
301
302 /**
303  * Disconnect from the datastore service (and free
304  * associated resources).
305  *
306  * @param h handle to the datastore
307  * @param drop set to GNUNET_YES to delete all data in datastore (!)
308  */
309 void
310 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, int drop)
311 {
312   struct GNUNET_DATASTORE_QueueEntry *qe;
313
314 #if DEBUG_DATASTORE
315   LOG (GNUNET_ERROR_TYPE_DEBUG, "Datastore disconnect\n");
316 #endif
317   if (NULL != h->th)
318   {
319     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
320     h->th = NULL;
321   }
322   if (h->client != NULL)
323   {
324     GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
325     h->client = NULL;
326   }
327   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
328   {
329     GNUNET_SCHEDULER_cancel (h->reconnect_task);
330     h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
331   }
332   while (NULL != (qe = h->queue_head))
333   {
334     GNUNET_assert (NULL != qe->response_proc);
335     qe->response_proc (h, NULL);
336   }
337   if (GNUNET_YES == drop)
338   {
339     h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
340     if (h->client != NULL)
341     {
342       if (NULL !=
343           GNUNET_CLIENT_notify_transmit_ready (h->client,
344                                                sizeof (struct
345                                                        GNUNET_MessageHeader),
346                                                GNUNET_TIME_UNIT_MINUTES,
347                                                GNUNET_YES, &transmit_drop, h))
348         return;
349       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
350       h->client = NULL;
351     }
352     GNUNET_break (0);
353   }
354   GNUNET_STATISTICS_destroy (h->stats, GNUNET_NO);
355   h->stats = NULL;
356   GNUNET_free (h);
357 }
358
359
360 /**
361  * A request has timed out (before being transmitted to the service).
362  *
363  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
364  * @param tc scheduler context
365  */
366 static void
367 timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
368 {
369   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
370
371   GNUNET_STATISTICS_update (qe->h->stats,
372                             gettext_noop ("# queue entry timeouts"), 1,
373                             GNUNET_NO);
374   qe->task = GNUNET_SCHEDULER_NO_TASK;
375   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
376 #if DEBUG_DATASTORE
377   LOG (GNUNET_ERROR_TYPE_DEBUG, "Timeout of request in datastore queue\n");
378 #endif
379   qe->response_proc (qe->h, NULL);
380 }
381
382
383 /**
384  * Create a new entry for our priority queue (and possibly discard other entires if
385  * the queue is getting too long).
386  *
387  * @param h handle to the datastore
388  * @param msize size of the message to queue
389  * @param queue_priority priority of the entry
390  * @param max_queue_size at what queue size should this request be dropped
391  *        (if other requests of higher priority are in the queue)
392  * @param timeout timeout for the operation
393  * @param response_proc function to call with replies (can be NULL)
394  * @param qc client context (NOT a closure for response_proc)
395  * @return NULL if the queue is full
396  */
397 static struct GNUNET_DATASTORE_QueueEntry *
398 make_queue_entry (struct GNUNET_DATASTORE_Handle *h, size_t msize,
399                   unsigned int queue_priority, unsigned int max_queue_size,
400                   struct GNUNET_TIME_Relative timeout,
401                   GNUNET_CLIENT_MessageHandler response_proc,
402                   const union QueueContext *qc)
403 {
404   struct GNUNET_DATASTORE_QueueEntry *ret;
405   struct GNUNET_DATASTORE_QueueEntry *pos;
406   unsigned int c;
407
408   c = 0;
409   pos = h->queue_head;
410   while ((pos != NULL) && (c < max_queue_size) &&
411          (pos->priority >= queue_priority))
412   {
413     c++;
414     pos = pos->next;
415   }
416   if (c >= max_queue_size)
417   {
418     GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue overflows"), 1,
419                               GNUNET_NO);
420     return NULL;
421   }
422   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
423   ret->h = h;
424   ret->response_proc = response_proc;
425   ret->qc = *qc;
426   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
427   ret->priority = queue_priority;
428   ret->max_queue = max_queue_size;
429   ret->message_size = msize;
430   ret->was_transmitted = GNUNET_NO;
431   if (pos == NULL)
432   {
433     /* append at the tail */
434     pos = h->queue_tail;
435   }
436   else
437   {
438     pos = pos->prev;
439     /* do not insert at HEAD if HEAD query was already
440      * transmitted and we are still receiving replies! */
441     if ((pos == NULL) && (h->queue_head->was_transmitted))
442       pos = h->queue_head;
443   }
444   c++;
445   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue entries created"),
446                             1, GNUNET_NO);
447   GNUNET_CONTAINER_DLL_insert_after (h->queue_head, h->queue_tail, pos, ret);
448   h->queue_size++;
449   ret->task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_queue_entry, ret);
450   pos = ret->next;
451   while (pos != NULL)
452   {
453     if ((pos->max_queue < h->queue_size) && (pos->was_transmitted == GNUNET_NO))
454     {
455       GNUNET_assert (pos->response_proc != NULL);
456       /* move 'pos' element to head so that it will be
457        * killed on 'NULL' call below */
458 #if DEBUG_DATASTORE
459       LOG (GNUNET_ERROR_TYPE_DEBUG, "Dropping request from datastore queue\n");
460 #endif
461       GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, pos);
462       GNUNET_CONTAINER_DLL_insert (h->queue_head, h->queue_tail, pos);
463       GNUNET_STATISTICS_update (h->stats,
464                                 gettext_noop
465                                 ("# Requests dropped from datastore queue"), 1,
466                                 GNUNET_NO);
467       GNUNET_assert (h->queue_head == pos);
468       pos->response_proc (h, NULL);
469       break;
470     }
471     pos = pos->next;
472   }
473   return ret;
474 }
475
476
477 /**
478  * Process entries in the queue (or do nothing if we are already
479  * doing so).
480  *
481  * @param h handle to the datastore
482  */
483 static void
484 process_queue (struct GNUNET_DATASTORE_Handle *h);
485
486
487 /**
488  * Try reconnecting to the datastore service.
489  *
490  * @param cls the 'struct GNUNET_DATASTORE_Handle'
491  * @param tc scheduler context
492  */
493 static void
494 try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
495 {
496   struct GNUNET_DATASTORE_Handle *h = cls;
497
498   if (h->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
499     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
500   else
501     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
502   if (h->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
503     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
504   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
505   h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
506   if (h->client == NULL)
507   {
508     LOG (GNUNET_ERROR_TYPE_ERROR, "DATASTORE reconnect failed (fatally)\n");
509     return;
510   }
511   GNUNET_STATISTICS_update (h->stats,
512                             gettext_noop
513                             ("# datastore connections (re)created"), 1,
514                             GNUNET_NO);
515 #if DEBUG_DATASTORE
516   LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnected to DATASTORE\n");
517 #endif
518   process_queue (h);
519 }
520
521
522 /**
523  * Disconnect from the service and then try reconnecting to the datastore service
524  * after some delay.
525  *
526  * @param h handle to datastore to disconnect and reconnect
527  */
528 static void
529 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
530 {
531   if (h->client == NULL)
532   {
533 #if DEBUG_DATASTORE
534     LOG (GNUNET_ERROR_TYPE_DEBUG,
535          "client NULL in disconnect, will not try to reconnect\n");
536 #endif
537     return;
538   }
539 #if 0
540   GNUNET_STATISTICS_update (stats, gettext_noop ("# reconnected to DATASTORE"),
541                             1, GNUNET_NO);
542 #endif
543   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
544   h->skip_next_messages = 0;
545   h->client = NULL;
546   h->reconnect_task =
547       GNUNET_SCHEDULER_add_delayed (h->retry_time, &try_reconnect, h);
548 }
549
550
551 /**
552  * Function called whenever we receive a message from
553  * the service.  Calls the appropriate handler.
554  *
555  * @param cls the 'struct GNUNET_DATASTORE_Handle'
556  * @param msg the received message
557  */
558 static void
559 receive_cb (void *cls, const struct GNUNET_MessageHeader *msg)
560 {
561   struct GNUNET_DATASTORE_Handle *h = cls;
562   struct GNUNET_DATASTORE_QueueEntry *qe;
563
564   h->in_receive = GNUNET_NO;
565 #if DEBUG_DATASTORE
566   LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving reply from datastore\n");
567 #endif
568   if (h->skip_next_messages > 0)
569   {
570     h->skip_next_messages--;
571     process_queue (h);
572     return;
573   }
574   if (NULL == (qe = h->queue_head))
575   {
576     GNUNET_break (0);
577     process_queue (h);
578     return;
579   }
580   qe->response_proc (h, msg);
581 }
582
583
584 /**
585  * Transmit request from queue to datastore service.
586  *
587  * @param cls the 'struct GNUNET_DATASTORE_Handle'
588  * @param size number of bytes that can be copied to buf
589  * @param buf where to copy the drop message
590  * @return number of bytes written to buf
591  */
592 static size_t
593 transmit_request (void *cls, size_t size, void *buf)
594 {
595   struct GNUNET_DATASTORE_Handle *h = cls;
596   struct GNUNET_DATASTORE_QueueEntry *qe;
597   size_t msize;
598
599   h->th = NULL;
600   if (NULL == (qe = h->queue_head))
601     return 0;                   /* no entry in queue */
602   if (buf == NULL)
603   {
604     LOG (GNUNET_ERROR_TYPE_WARNING,
605          _("Failed to transmit request to DATASTORE.\n"));
606     GNUNET_STATISTICS_update (h->stats,
607                               gettext_noop ("# transmission request failures"),
608                               1, GNUNET_NO);
609     do_disconnect (h);
610     return 0;
611   }
612   if (size < (msize = qe->message_size))
613   {
614     process_queue (h);
615     return 0;
616   }
617 #if DEBUG_DATASTORE
618   LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u byte request to DATASTORE\n",
619        msize);
620 #endif
621   memcpy (buf, &qe[1], msize);
622   qe->was_transmitted = GNUNET_YES;
623   GNUNET_SCHEDULER_cancel (qe->task);
624   qe->task = GNUNET_SCHEDULER_NO_TASK;
625   GNUNET_assert (GNUNET_NO == h->in_receive);
626   h->in_receive = GNUNET_YES;
627   GNUNET_CLIENT_receive (h->client, &receive_cb, h,
628                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
629   GNUNET_STATISTICS_update (h->stats,
630                             gettext_noop ("# bytes sent to datastore"), 1,
631                             GNUNET_NO);
632   return msize;
633 }
634
635
636 /**
637  * Process entries in the queue (or do nothing if we are already
638  * doing so).
639  *
640  * @param h handle to the datastore
641  */
642 static void
643 process_queue (struct GNUNET_DATASTORE_Handle *h)
644 {
645   struct GNUNET_DATASTORE_QueueEntry *qe;
646
647   if (NULL == (qe = h->queue_head))
648   {
649 #if DEBUG_DATASTORE > 1
650     LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue empty\n");
651 #endif
652     return;                     /* no entry in queue */
653   }
654   if (qe->was_transmitted == GNUNET_YES)
655   {
656 #if DEBUG_DATASTORE > 1
657     LOG (GNUNET_ERROR_TYPE_DEBUG, "Head request already transmitted\n");
658 #endif
659     return;                     /* waiting for replies */
660   }
661   if (h->th != NULL)
662   {
663 #if DEBUG_DATASTORE > 1
664     LOG (GNUNET_ERROR_TYPE_DEBUG, "Pending transmission request\n");
665 #endif
666     return;                     /* request pending */
667   }
668   if (h->client == NULL)
669   {
670 #if DEBUG_DATASTORE > 1
671     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not connected\n");
672 #endif
673     return;                     /* waiting for reconnect */
674   }
675   if (GNUNET_YES == h->in_receive)
676   {
677     /* wait for response to previous query */
678     return;
679   }
680 #if DEBUG_DATASTORE
681   LOG (GNUNET_ERROR_TYPE_DEBUG, "Queueing %u byte request to DATASTORE\n",
682        qe->message_size);
683 #endif
684   h->th =
685       GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size,
686                                            GNUNET_TIME_absolute_get_remaining
687                                            (qe->timeout), GNUNET_YES,
688                                            &transmit_request, h);
689   GNUNET_assert (GNUNET_NO == h->in_receive);
690   GNUNET_break (NULL != h->th);
691 }
692
693
694 /**
695  * Dummy continuation used to do nothing (but be non-zero).
696  *
697  * @param cls closure
698  * @param result result
699  * @param emsg error message
700  */
701 static void
702 drop_status_cont (void *cls, int32_t result, const char *emsg)
703 {
704   /* do nothing */
705 }
706
707
708 /**
709  * Free a queue entry.  Removes the given entry from the
710  * queue and releases associated resources.  Does NOT
711  * call the callback.
712  *
713  * @param qe entry to free.
714  */
715 static void
716 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
717 {
718   struct GNUNET_DATASTORE_Handle *h = qe->h;
719
720   GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, qe);
721   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
722   {
723     GNUNET_SCHEDULER_cancel (qe->task);
724     qe->task = GNUNET_SCHEDULER_NO_TASK;
725   }
726   h->queue_size--;
727   qe->was_transmitted = GNUNET_SYSERR;  /* use-after-free warning */
728   GNUNET_free (qe);
729 }
730
731
732 /**
733  * Type of a function to call when we receive a message
734  * from the service.
735  *
736  * @param cls closure
737  * @param msg message received, NULL on timeout or fatal error
738  */
739 static void
740 process_status_message (void *cls, const struct GNUNET_MessageHeader *msg)
741 {
742   struct GNUNET_DATASTORE_Handle *h = cls;
743   struct GNUNET_DATASTORE_QueueEntry *qe;
744   struct StatusContext rc;
745   const struct StatusMessage *sm;
746   const char *emsg;
747   int32_t status;
748   int was_transmitted;
749
750   if (NULL == (qe = h->queue_head))
751   {
752     GNUNET_break (0);
753     do_disconnect (h);
754     return;
755   }
756   rc = qe->qc.sc;
757   if (msg == NULL)
758   {
759     was_transmitted = qe->was_transmitted;
760     free_queue_entry (qe);
761     if (was_transmitted == GNUNET_YES)
762       do_disconnect (h);
763     else
764       process_queue (h);
765     if (rc.cont != NULL)
766       rc.cont (rc.cont_cls, GNUNET_SYSERR,
767                _("Failed to receive status response from database."));
768     return;
769   }
770   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
771   free_queue_entry (qe);
772   if ((ntohs (msg->size) < sizeof (struct StatusMessage)) ||
773       (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS))
774   {
775     GNUNET_break (0);
776     h->retry_time = GNUNET_TIME_UNIT_ZERO;
777     do_disconnect (h);
778     if (rc.cont != NULL)
779       rc.cont (rc.cont_cls, GNUNET_SYSERR,
780                _("Error reading response from datastore service"));
781     return;
782   }
783   sm = (const struct StatusMessage *) msg;
784   status = ntohl (sm->status);
785   emsg = NULL;
786   if (ntohs (msg->size) > sizeof (struct StatusMessage))
787   {
788     emsg = (const char *) &sm[1];
789     if (emsg[ntohs (msg->size) - sizeof (struct StatusMessage) - 1] != '\0')
790     {
791       GNUNET_break (0);
792       emsg = _("Invalid error message received from datastore service");
793     }
794   }
795   if ((status == GNUNET_SYSERR) && (emsg == NULL))
796   {
797     GNUNET_break (0);
798     emsg = _("Invalid error message received from datastore service");
799   }
800 #if DEBUG_DATASTORE
801   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received status %d/%s\n", (int) status, emsg);
802 #endif
803   GNUNET_STATISTICS_update (h->stats,
804                             gettext_noop ("# status messages received"), 1,
805                             GNUNET_NO);
806   h->retry_time.rel_value = 0;
807   process_queue (h);
808   if (rc.cont != NULL)
809     rc.cont (rc.cont_cls, status, emsg);
810 }
811
812
813 /**
814  * Store an item in the datastore.  If the item is already present,
815  * the priorities are summed up and the higher expiration time and
816  * lower anonymity level is used.
817  *
818  * @param h handle to the datastore
819  * @param rid reservation ID to use (from "reserve"); use 0 if no
820  *            prior reservation was made
821  * @param key key for the value
822  * @param size number of bytes in data
823  * @param data content stored
824  * @param type type of the content
825  * @param priority priority of the content
826  * @param anonymity anonymity-level for the content
827  * @param replication how often should the content be replicated to other peers?
828  * @param expiration expiration time for the content
829  * @param queue_priority ranking of this request in the priority queue
830  * @param max_queue_size at what queue size should this request be dropped
831  *        (if other requests of higher priority are in the queue)
832  * @param timeout timeout for the operation
833  * @param cont continuation to call when done
834  * @param cont_cls closure for cont
835  * @return NULL if the entry was not queued, otherwise a handle that can be used to
836  *         cancel; note that even if NULL is returned, the callback will be invoked
837  *         (or rather, will already have been invoked)
838  */
839 struct GNUNET_DATASTORE_QueueEntry *
840 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, uint32_t rid,
841                       const GNUNET_HashCode * key, size_t size,
842                       const void *data, enum GNUNET_BLOCK_Type type,
843                       uint32_t priority, uint32_t anonymity,
844                       uint32_t replication,
845                       struct GNUNET_TIME_Absolute expiration,
846                       unsigned int queue_priority, unsigned int max_queue_size,
847                       struct GNUNET_TIME_Relative timeout,
848                       GNUNET_DATASTORE_ContinuationWithStatus cont,
849                       void *cont_cls)
850 {
851   struct GNUNET_DATASTORE_QueueEntry *qe;
852   struct DataMessage *dm;
853   size_t msize;
854   union QueueContext qc;
855
856 #if DEBUG_DATASTORE
857   LOG (GNUNET_ERROR_TYPE_DEBUG,
858        "Asked to put %u bytes of data under key `%s' for %llu ms\n", size,
859        GNUNET_h2s (key),
860        GNUNET_TIME_absolute_get_remaining (expiration).rel_value);
861 #endif
862   msize = sizeof (struct DataMessage) + size;
863   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
864   qc.sc.cont = cont;
865   qc.sc.cont_cls = cont_cls;
866   qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout,
867                          &process_status_message, &qc);
868   if (qe == NULL)
869   {
870 #if DEBUG_DATASTORE
871     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for PUT\n");
872 #endif
873     return NULL;
874   }
875   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# PUT requests executed"),
876                             1, GNUNET_NO);
877   dm = (struct DataMessage *) &qe[1];
878   dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
879   dm->header.size = htons (msize);
880   dm->rid = htonl (rid);
881   dm->size = htonl ((uint32_t) size);
882   dm->type = htonl (type);
883   dm->priority = htonl (priority);
884   dm->anonymity = htonl (anonymity);
885   dm->replication = htonl (replication);
886   dm->reserved = htonl (0);
887   dm->uid = GNUNET_htonll (0);
888   dm->expiration = GNUNET_TIME_absolute_hton (expiration);
889   dm->key = *key;
890   memcpy (&dm[1], data, size);
891   process_queue (h);
892   return qe;
893 }
894
895
896 /**
897  * Reserve space in the datastore.  This function should be used
898  * to avoid "out of space" failures during a longer sequence of "put"
899  * operations (for example, when a file is being inserted).
900  *
901  * @param h handle to the datastore
902  * @param amount how much space (in bytes) should be reserved (for content only)
903  * @param entries how many entries will be created (to calculate per-entry overhead)
904  * @param queue_priority ranking of this request in the priority queue
905  * @param max_queue_size at what queue size should this request be dropped
906  *        (if other requests of higher priority are in the queue)
907  * @param timeout how long to wait at most for a response (or before dying in queue)
908  * @param cont continuation to call when done; "success" will be set to
909  *             a positive reservation value if space could be reserved.
910  * @param cont_cls closure for cont
911  * @return NULL if the entry was not queued, otherwise a handle that can be used to
912  *         cancel; note that even if NULL is returned, the callback will be invoked
913  *         (or rather, will already have been invoked)
914  */
915 struct GNUNET_DATASTORE_QueueEntry *
916 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount,
917                           uint32_t entries, unsigned int queue_priority,
918                           unsigned int max_queue_size,
919                           struct GNUNET_TIME_Relative timeout,
920                           GNUNET_DATASTORE_ContinuationWithStatus cont,
921                           void *cont_cls)
922 {
923   struct GNUNET_DATASTORE_QueueEntry *qe;
924   struct ReserveMessage *rm;
925   union QueueContext qc;
926
927   if (cont == NULL)
928     cont = &drop_status_cont;
929 #if DEBUG_DATASTORE
930   LOG (GNUNET_ERROR_TYPE_DEBUG,
931        "Asked to reserve %llu bytes of data and %u entries\n",
932        (unsigned long long) amount, (unsigned int) entries);
933 #endif
934   qc.sc.cont = cont;
935   qc.sc.cont_cls = cont_cls;
936   qe = make_queue_entry (h, sizeof (struct ReserveMessage), queue_priority,
937                          max_queue_size, timeout, &process_status_message, &qc);
938   if (qe == NULL)
939   {
940 #if DEBUG_DATASTORE
941     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry to reserve\n");
942 #endif
943     return NULL;
944   }
945   GNUNET_STATISTICS_update (h->stats,
946                             gettext_noop ("# RESERVE requests executed"), 1,
947                             GNUNET_NO);
948   rm = (struct ReserveMessage *) &qe[1];
949   rm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
950   rm->header.size = htons (sizeof (struct ReserveMessage));
951   rm->entries = htonl (entries);
952   rm->amount = GNUNET_htonll (amount);
953   process_queue (h);
954   return qe;
955 }
956
957
958 /**
959  * Signal that all of the data for which a reservation was made has
960  * been stored and that whatever excess space might have been reserved
961  * can now be released.
962  *
963  * @param h handle to the datastore
964  * @param rid reservation ID (value of "success" in original continuation
965  *        from the "reserve" function).
966  * @param queue_priority ranking of this request in the priority queue
967  * @param max_queue_size at what queue size should this request be dropped
968  *        (if other requests of higher priority are in the queue)
969  * @param queue_priority ranking of this request in the priority queue
970  * @param max_queue_size at what queue size should this request be dropped
971  *        (if other requests of higher priority are in the queue)
972  * @param timeout how long to wait at most for a response
973  * @param cont continuation to call when done
974  * @param cont_cls closure for cont
975  * @return NULL if the entry was not queued, otherwise a handle that can be used to
976  *         cancel; note that even if NULL is returned, the callback will be invoked
977  *         (or rather, will already have been invoked)
978  */
979 struct GNUNET_DATASTORE_QueueEntry *
980 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
981                                   uint32_t rid, unsigned int queue_priority,
982                                   unsigned int max_queue_size,
983                                   struct GNUNET_TIME_Relative timeout,
984                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
985                                   void *cont_cls)
986 {
987   struct GNUNET_DATASTORE_QueueEntry *qe;
988   struct ReleaseReserveMessage *rrm;
989   union QueueContext qc;
990
991   if (cont == NULL)
992     cont = &drop_status_cont;
993 #if DEBUG_DATASTORE
994   LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to release reserve %d\n", rid);
995 #endif
996   qc.sc.cont = cont;
997   qc.sc.cont_cls = cont_cls;
998   qe = make_queue_entry (h, sizeof (struct ReleaseReserveMessage),
999                          queue_priority, max_queue_size, timeout,
1000                          &process_status_message, &qc);
1001   if (qe == NULL)
1002   {
1003 #if DEBUG_DATASTORE
1004     LOG (GNUNET_ERROR_TYPE_DEBUG,
1005          "Could not create queue entry to release reserve\n");
1006 #endif
1007     return NULL;
1008   }
1009   GNUNET_STATISTICS_update (h->stats,
1010                             gettext_noop
1011                             ("# RELEASE RESERVE requests executed"), 1,
1012                             GNUNET_NO);
1013   rrm = (struct ReleaseReserveMessage *) &qe[1];
1014   rrm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1015   rrm->header.size = htons (sizeof (struct ReleaseReserveMessage));
1016   rrm->rid = htonl (rid);
1017   process_queue (h);
1018   return qe;
1019 }
1020
1021
1022 /**
1023  * Update a value in the datastore.
1024  *
1025  * @param h handle to the datastore
1026  * @param uid identifier for the value
1027  * @param priority how much to increase the priority of the value
1028  * @param expiration new expiration value should be MAX of existing and this argument
1029  * @param queue_priority ranking of this request in the priority queue
1030  * @param max_queue_size at what queue size should this request be dropped
1031  *        (if other requests of higher priority are in the queue)
1032  * @param timeout how long to wait at most for a response
1033  * @param cont continuation to call when done
1034  * @param cont_cls closure for cont
1035  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1036  *         cancel; note that even if NULL is returned, the callback will be invoked
1037  *         (or rather, will already have been invoked)
1038  */
1039 struct GNUNET_DATASTORE_QueueEntry *
1040 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, uint64_t uid,
1041                          uint32_t priority,
1042                          struct GNUNET_TIME_Absolute expiration,
1043                          unsigned int queue_priority,
1044                          unsigned int max_queue_size,
1045                          struct GNUNET_TIME_Relative timeout,
1046                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1047                          void *cont_cls)
1048 {
1049   struct GNUNET_DATASTORE_QueueEntry *qe;
1050   struct UpdateMessage *um;
1051   union QueueContext qc;
1052
1053   if (cont == NULL)
1054     cont = &drop_status_cont;
1055 #if DEBUG_DATASTORE
1056   LOG (GNUNET_ERROR_TYPE_DEBUG,
1057        "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
1058        uid, (unsigned int) priority, (unsigned long long) expiration.abs_value);
1059 #endif
1060   qc.sc.cont = cont;
1061   qc.sc.cont_cls = cont_cls;
1062   qe = make_queue_entry (h, sizeof (struct UpdateMessage), queue_priority,
1063                          max_queue_size, timeout, &process_status_message, &qc);
1064   if (qe == NULL)
1065   {
1066 #if DEBUG_DATASTORE
1067     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for UPDATE\n");
1068 #endif
1069     return NULL;
1070   }
1071   GNUNET_STATISTICS_update (h->stats,
1072                             gettext_noop ("# UPDATE requests executed"), 1,
1073                             GNUNET_NO);
1074   um = (struct UpdateMessage *) &qe[1];
1075   um->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1076   um->header.size = htons (sizeof (struct UpdateMessage));
1077   um->priority = htonl (priority);
1078   um->expiration = GNUNET_TIME_absolute_hton (expiration);
1079   um->uid = GNUNET_htonll (uid);
1080   process_queue (h);
1081   return qe;
1082 }
1083
1084
1085 /**
1086  * Explicitly remove some content from the database.
1087  * The "cont"inuation will be called with status
1088  * "GNUNET_OK" if content was removed, "GNUNET_NO"
1089  * if no matching entry was found and "GNUNET_SYSERR"
1090  * on all other types of errors.
1091  *
1092  * @param h handle to the datastore
1093  * @param key key for the value
1094  * @param size number of bytes in data
1095  * @param data content stored
1096  * @param queue_priority ranking of this request in the priority queue
1097  * @param max_queue_size at what queue size should this request be dropped
1098  *        (if other requests of higher priority are in the queue)
1099  * @param timeout how long to wait at most for a response
1100  * @param cont continuation to call when done
1101  * @param cont_cls closure for cont
1102  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1103  *         cancel; note that even if NULL is returned, the callback will be invoked
1104  *         (or rather, will already have been invoked)
1105  */
1106 struct GNUNET_DATASTORE_QueueEntry *
1107 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1108                          const GNUNET_HashCode * key, size_t size,
1109                          const void *data, unsigned int queue_priority,
1110                          unsigned int max_queue_size,
1111                          struct GNUNET_TIME_Relative timeout,
1112                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1113                          void *cont_cls)
1114 {
1115   struct GNUNET_DATASTORE_QueueEntry *qe;
1116   struct DataMessage *dm;
1117   size_t msize;
1118   union QueueContext qc;
1119
1120   if (cont == NULL)
1121     cont = &drop_status_cont;
1122 #if DEBUG_DATASTORE
1123   LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to remove %u bytes under key `%s'\n",
1124        size, GNUNET_h2s (key));
1125 #endif
1126   qc.sc.cont = cont;
1127   qc.sc.cont_cls = cont_cls;
1128   msize = sizeof (struct DataMessage) + size;
1129   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1130   qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout,
1131                          &process_status_message, &qc);
1132   if (qe == NULL)
1133   {
1134 #if DEBUG_DATASTORE
1135     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for REMOVE\n");
1136 #endif
1137     return NULL;
1138   }
1139   GNUNET_STATISTICS_update (h->stats,
1140                             gettext_noop ("# REMOVE requests executed"), 1,
1141                             GNUNET_NO);
1142   dm = (struct DataMessage *) &qe[1];
1143   dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1144   dm->header.size = htons (msize);
1145   dm->rid = htonl (0);
1146   dm->size = htonl (size);
1147   dm->type = htonl (0);
1148   dm->priority = htonl (0);
1149   dm->anonymity = htonl (0);
1150   dm->uid = GNUNET_htonll (0);
1151   dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS);
1152   dm->key = *key;
1153   memcpy (&dm[1], data, size);
1154   process_queue (h);
1155   return qe;
1156 }
1157
1158
1159 /**
1160  * Type of a function to call when we receive a message
1161  * from the service.
1162  *
1163  * @param cls closure
1164  * @param msg message received, NULL on timeout or fatal error
1165  */
1166 static void
1167 process_result_message (void *cls, const struct GNUNET_MessageHeader *msg)
1168 {
1169   struct GNUNET_DATASTORE_Handle *h = cls;
1170   struct GNUNET_DATASTORE_QueueEntry *qe;
1171   struct ResultContext rc;
1172   const struct DataMessage *dm;
1173   int was_transmitted;
1174
1175   if (msg == NULL)
1176   {
1177     qe = h->queue_head;
1178     GNUNET_assert (NULL != qe);
1179     rc = qe->qc.rc;
1180     was_transmitted = qe->was_transmitted;
1181     free_queue_entry (qe);
1182     if (was_transmitted == GNUNET_YES)
1183     {
1184       LOG (GNUNET_ERROR_TYPE_WARNING,
1185            _("Failed to receive response from database.\n"));
1186       do_disconnect (h);
1187     }
1188     else
1189     {
1190       process_queue (h);
1191     }
1192     if (rc.proc != NULL)
1193       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1194                0);
1195     return;
1196   }
1197   if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END)
1198   {
1199     GNUNET_break (ntohs (msg->size) == sizeof (struct GNUNET_MessageHeader));
1200     qe = h->queue_head;
1201     rc = qe->qc.rc;
1202     GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1203     free_queue_entry (qe);
1204 #if DEBUG_DATASTORE
1205     LOG (GNUNET_ERROR_TYPE_DEBUG,
1206          "Received end of result set, new queue size is %u\n", h->queue_size);
1207 #endif
1208     if (rc.proc != NULL)
1209       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1210                0);
1211     h->retry_time.rel_value = 0;
1212     h->result_count = 0;
1213     process_queue (h);
1214     return;
1215   }
1216   qe = h->queue_head;
1217   GNUNET_assert (NULL != qe);
1218   rc = qe->qc.rc;
1219   if (GNUNET_YES != qe->was_transmitted)
1220   {
1221     GNUNET_break (0);
1222     free_queue_entry (qe);
1223     h->retry_time = GNUNET_TIME_UNIT_ZERO;
1224     do_disconnect (h);
1225     if (rc.proc != NULL)
1226       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1227                0);
1228     return;
1229   }
1230   if ((ntohs (msg->size) < sizeof (struct DataMessage)) ||
1231       (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1232       (ntohs (msg->size) !=
1233        sizeof (struct DataMessage) +
1234        ntohl (((const struct DataMessage *) msg)->size)))
1235   {
1236     GNUNET_break (0);
1237     free_queue_entry (qe);
1238     h->retry_time = GNUNET_TIME_UNIT_ZERO;
1239     do_disconnect (h);
1240     if (rc.proc != NULL)
1241       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1242                0);
1243     return;
1244   }
1245   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# Results received"), 1,
1246                             GNUNET_NO);
1247   dm = (const struct DataMessage *) msg;
1248 #if DEBUG_DATASTORE
1249   LOG (GNUNET_ERROR_TYPE_DEBUG,
1250        "Received result %llu with type %u and size %u with key %s\n",
1251        (unsigned long long) GNUNET_ntohll (dm->uid), ntohl (dm->type),
1252        ntohl (dm->size), GNUNET_h2s (&dm->key));
1253 #endif
1254   free_queue_entry (qe);
1255   h->retry_time.rel_value = 0;
1256   process_queue (h);
1257   if (rc.proc != NULL)
1258     rc.proc (rc.proc_cls, &dm->key, ntohl (dm->size), &dm[1], ntohl (dm->type),
1259              ntohl (dm->priority), ntohl (dm->anonymity),
1260              GNUNET_TIME_absolute_ntoh (dm->expiration),
1261              GNUNET_ntohll (dm->uid));
1262 }
1263
1264
1265 /**
1266  * Get a random value from the datastore for content replication.
1267  * Returns a single, random value among those with the highest
1268  * replication score, lowering positive replication scores by one for
1269  * the chosen value (if only content with a replication score exists,
1270  * a random value is returned and replication scores are not changed).
1271  *
1272  * @param h handle to the datastore
1273  * @param queue_priority ranking of this request in the priority queue
1274  * @param max_queue_size at what queue size should this request be dropped
1275  *        (if other requests of higher priority are in the queue)
1276  * @param timeout how long to wait at most for a response
1277  * @param proc function to call on a random value; it
1278  *        will be called once with a value (if available)
1279  *        and always once with a value of NULL.
1280  * @param proc_cls closure for proc
1281  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1282  *         cancel
1283  */
1284 struct GNUNET_DATASTORE_QueueEntry *
1285 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1286                                       unsigned int queue_priority,
1287                                       unsigned int max_queue_size,
1288                                       struct GNUNET_TIME_Relative timeout,
1289                                       GNUNET_DATASTORE_DatumProcessor proc,
1290                                       void *proc_cls)
1291 {
1292   struct GNUNET_DATASTORE_QueueEntry *qe;
1293   struct GNUNET_MessageHeader *m;
1294   union QueueContext qc;
1295
1296   GNUNET_assert (NULL != proc);
1297 #if DEBUG_DATASTORE
1298   LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to get replication entry in %llu ms\n",
1299        (unsigned long long) timeout.rel_value);
1300 #endif
1301   qc.rc.proc = proc;
1302   qc.rc.proc_cls = proc_cls;
1303   qe = make_queue_entry (h, sizeof (struct GNUNET_MessageHeader),
1304                          queue_priority, max_queue_size, timeout,
1305                          &process_result_message, &qc);
1306   if (qe == NULL)
1307   {
1308 #if DEBUG_DATASTORE
1309     LOG (GNUNET_ERROR_TYPE_DEBUG,
1310          "Could not create queue entry for GET REPLICATION\n");
1311 #endif
1312     return NULL;
1313   }
1314   GNUNET_STATISTICS_update (h->stats,
1315                             gettext_noop
1316                             ("# GET REPLICATION requests executed"), 1,
1317                             GNUNET_NO);
1318   m = (struct GNUNET_MessageHeader *) &qe[1];
1319   m->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1320   m->size = htons (sizeof (struct GNUNET_MessageHeader));
1321   process_queue (h);
1322   return qe;
1323 }
1324
1325
1326 /**
1327  * Get a single zero-anonymity value from the datastore.
1328  *
1329  * @param h handle to the datastore
1330  * @param offset offset of the result (modulo num-results); set to
1331  *               a random 64-bit value initially; then increment by
1332  *               one each time; detect that all results have been found by uid
1333  *               being again the first uid ever returned.
1334  * @param queue_priority ranking of this request in the priority queue
1335  * @param max_queue_size at what queue size should this request be dropped
1336  *        (if other requests of higher priority are in the queue)
1337  * @param timeout how long to wait at most for a response
1338  * @param type allowed type for the operation (never zero)
1339  * @param proc function to call on a random value; it
1340  *        will be called once with a value (if available)
1341  *        or with NULL if none value exists.
1342  * @param proc_cls closure for proc
1343  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1344  *         cancel
1345  */
1346 struct GNUNET_DATASTORE_QueueEntry *
1347 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1348                                      uint64_t offset,
1349                                      unsigned int queue_priority,
1350                                      unsigned int max_queue_size,
1351                                      struct GNUNET_TIME_Relative timeout,
1352                                      enum GNUNET_BLOCK_Type type,
1353                                      GNUNET_DATASTORE_DatumProcessor proc,
1354                                      void *proc_cls)
1355 {
1356   struct GNUNET_DATASTORE_QueueEntry *qe;
1357   struct GetZeroAnonymityMessage *m;
1358   union QueueContext qc;
1359
1360   GNUNET_assert (NULL != proc);
1361   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1362 #if DEBUG_DATASTORE
1363   LOG (GNUNET_ERROR_TYPE_DEBUG,
1364        "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n",
1365        (unsigned long long) offset, type,
1366        (unsigned long long) timeout.rel_value);
1367 #endif
1368   qc.rc.proc = proc;
1369   qc.rc.proc_cls = proc_cls;
1370   qe = make_queue_entry (h, sizeof (struct GetZeroAnonymityMessage),
1371                          queue_priority, max_queue_size, timeout,
1372                          &process_result_message, &qc);
1373   if (qe == NULL)
1374   {
1375 #if DEBUG_DATASTORE
1376     LOG (GNUNET_ERROR_TYPE_DEBUG,
1377          "Could not create queue entry for zero-anonymity procation\n");
1378 #endif
1379     return NULL;
1380   }
1381   GNUNET_STATISTICS_update (h->stats,
1382                             gettext_noop
1383                             ("# GET ZERO ANONYMITY requests executed"), 1,
1384                             GNUNET_NO);
1385   m = (struct GetZeroAnonymityMessage *) &qe[1];
1386   m->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1387   m->header.size = htons (sizeof (struct GetZeroAnonymityMessage));
1388   m->type = htonl ((uint32_t) type);
1389   m->offset = GNUNET_htonll (offset);
1390   process_queue (h);
1391   return qe;
1392 }
1393
1394
1395 /**
1396  * Get a result for a particular key from the datastore.  The processor
1397  * will only be called once.
1398  *
1399  * @param h handle to the datastore
1400  * @param offset offset of the result (modulo num-results); set to
1401  *               a random 64-bit value initially; then increment by
1402  *               one each time; detect that all results have been found by uid
1403  *               being again the first uid ever returned.
1404  * @param key maybe NULL (to match all entries)
1405  * @param type desired type, 0 for any
1406  * @param queue_priority ranking of this request in the priority queue
1407  * @param max_queue_size at what queue size should this request be dropped
1408  *        (if other requests of higher priority are in the queue)
1409  * @param timeout how long to wait at most for a response
1410  * @param proc function to call on each matching value;
1411  *        will be called once with a NULL value at the end
1412  * @param proc_cls closure for proc
1413  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1414  *         cancel
1415  */
1416 struct GNUNET_DATASTORE_QueueEntry *
1417 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, uint64_t offset,
1418                           const GNUNET_HashCode * key,
1419                           enum GNUNET_BLOCK_Type type,
1420                           unsigned int queue_priority,
1421                           unsigned int max_queue_size,
1422                           struct GNUNET_TIME_Relative timeout,
1423                           GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls)
1424 {
1425   struct GNUNET_DATASTORE_QueueEntry *qe;
1426   struct GetMessage *gm;
1427   union QueueContext qc;
1428
1429   GNUNET_assert (NULL != proc);
1430 #if DEBUG_DATASTORE
1431   LOG (GNUNET_ERROR_TYPE_DEBUG,
1432        "Asked to look for data of type %u under key `%s'\n",
1433        (unsigned int) type, GNUNET_h2s (key));
1434 #endif
1435   qc.rc.proc = proc;
1436   qc.rc.proc_cls = proc_cls;
1437   qe = make_queue_entry (h, sizeof (struct GetMessage), queue_priority,
1438                          max_queue_size, timeout, &process_result_message, &qc);
1439   if (qe == NULL)
1440   {
1441 #if DEBUG_DATASTORE
1442     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not queue request for `%s'\n",
1443          GNUNET_h2s (key));
1444 #endif
1445     return NULL;
1446   }
1447   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# GET requests executed"),
1448                             1, GNUNET_NO);
1449   gm = (struct GetMessage *) &qe[1];
1450   gm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1451   gm->type = htonl (type);
1452   gm->offset = GNUNET_htonll (offset);
1453   if (key != NULL)
1454   {
1455     gm->header.size = htons (sizeof (struct GetMessage));
1456     gm->key = *key;
1457   }
1458   else
1459   {
1460     gm->header.size =
1461         htons (sizeof (struct GetMessage) - sizeof (GNUNET_HashCode));
1462   }
1463   process_queue (h);
1464   return qe;
1465 }
1466
1467
1468 /**
1469  * Cancel a datastore operation.  The final callback from the
1470  * operation must not have been done yet.
1471  *
1472  * @param qe operation to cancel
1473  */
1474 void
1475 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1476 {
1477   struct GNUNET_DATASTORE_Handle *h;
1478
1479   GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted);
1480   h = qe->h;
1481 #if DEBUG_DATASTORE
1482   LOG (GNUNET_ERROR_TYPE_DEBUG,
1483        "Pending DATASTORE request %p cancelled (%d, %d)\n", qe,
1484        qe->was_transmitted, h->queue_head == qe);
1485 #endif
1486   if (GNUNET_YES == qe->was_transmitted)
1487   {
1488     free_queue_entry (qe);
1489     h->skip_next_messages++;
1490     return;
1491   }
1492   free_queue_entry (qe);
1493   process_queue (h);
1494 }
1495
1496
1497 /* end of datastore_api.c */