remove send on connect
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
33
34 /**
35  * If a client stopped asking for more results, how many more do
36  * we receive from the DB before killing the connection?  Trade-off
37  * between re-doing TCP handshakes and (needlessly) receiving 
38  * useless results.
39  */
40 #define MAX_EXCESS_RESULTS 8
41
42 /**
43  * Context for processing status messages.
44  */
45 struct StatusContext
46 {
47   /**
48    * Continuation to call with the status.
49    */
50   GNUNET_DATASTORE_ContinuationWithStatus cont;
51
52   /**
53    * Closure for cont.
54    */
55   void *cont_cls;
56
57 };
58
59
60 /**
61  * Context for processing result messages.
62  */
63 struct ResultContext
64 {
65   /**
66    * Function to call with the result.
67    */
68   GNUNET_DATASTORE_DatumProcessor proc;
69
70   /**
71    * Closure for proc.
72    */
73   void *proc_cls;
74
75 };
76
77
78 /**
79  *  Context for a queue operation.
80  */
81 union QueueContext
82 {
83
84   struct StatusContext sc;
85   
86   struct ResultContext rc;
87
88 };
89
90
91
92 /**
93  * Entry in our priority queue.
94  */
95 struct GNUNET_DATASTORE_QueueEntry
96 {
97
98   /**
99    * This is a linked list.
100    */
101   struct GNUNET_DATASTORE_QueueEntry *next;
102
103   /**
104    * This is a linked list.
105    */
106   struct GNUNET_DATASTORE_QueueEntry *prev;
107
108   /**
109    * Handle to the master context.
110    */
111   struct GNUNET_DATASTORE_Handle *h;
112
113   /**
114    * Response processor (NULL if we are not waiting for a response).
115    * This struct should be used for the closure, function-specific
116    * arguments can be passed via 'qc'.
117    */
118   GNUNET_CLIENT_MessageHandler response_proc;
119
120   /**
121    * Function to call after transmission of the request.
122    */
123   GNUNET_DATASTORE_ContinuationWithStatus cont;
124    
125   /**
126    * Closure for 'cont'.
127    */
128   void *cont_cls;
129
130   /**
131    * Context for the operation.
132    */
133   union QueueContext qc;
134
135   /**
136    * Task for timeout signalling.
137    */
138   GNUNET_SCHEDULER_TaskIdentifier task;
139
140   /**
141    * Timeout for the current operation.
142    */
143   struct GNUNET_TIME_Absolute timeout;
144
145   /**
146    * Priority in the queue.
147    */
148   unsigned int priority;
149
150   /**
151    * Maximum allowed length of queue (otherwise
152    * this request should be discarded).
153    */
154   unsigned int max_queue;
155
156   /**
157    * Number of bytes in the request message following
158    * this struct.  32-bit value for nicer memory
159    * access (and overall struct alignment).
160    */
161   uint32_t message_size;
162
163   /**
164    * Has this message been transmitted to the service?
165    * Only ever GNUNET_YES for the head of the queue.
166    * Note that the overall struct should end at a 
167    * multiple of 64 bits.
168    */
169   int was_transmitted;
170   
171 };
172
173 /**
174  * Handle to the datastore service. 
175  */
176 struct GNUNET_DATASTORE_Handle
177 {
178
179   /**
180    * Our configuration.
181    */
182   const struct GNUNET_CONFIGURATION_Handle *cfg;
183
184   /**
185    * Current connection to the datastore service.
186    */
187   struct GNUNET_CLIENT_Connection *client;
188
189   /**
190    * Handle for statistics.
191    */
192   struct GNUNET_STATISTICS_Handle *stats;
193
194   /**
195    * Current transmit handle.
196    */
197   struct GNUNET_CLIENT_TransmitHandle *th;
198
199   /**
200    * Current head of priority queue.
201    */
202   struct GNUNET_DATASTORE_QueueEntry *queue_head;
203
204   /**
205    * Current tail of priority queue.
206    */
207   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
208
209   /**
210    * Task for trying to reconnect.
211    */
212   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
213
214   /**
215    * How quickly should we retry?  Used for exponential back-off on
216    * connect-errors.
217    */
218   struct GNUNET_TIME_Relative retry_time;
219
220   /**
221    * Number of entries in the queue.
222    */
223   unsigned int queue_size;
224
225   /**
226    * Number of results we're receiving for the current query
227    * after application stopped to care.  Used to determine when
228    * to reset the connection.
229    */
230   unsigned int result_count;
231
232   /**
233    * Are we currently trying to receive from the service?
234    */
235   int in_receive;
236
237   /**
238    * We should ignore the next message(s) from the service.
239    */
240   unsigned int skip_next_messages;
241
242 };
243
244
245
246 /**
247  * Connect to the datastore service.
248  *
249  * @param cfg configuration to use
250  * @return handle to use to access the service
251  */
252 struct GNUNET_DATASTORE_Handle *
253 GNUNET_DATASTORE_connect (const struct
254                           GNUNET_CONFIGURATION_Handle
255                           *cfg)
256 {
257   struct GNUNET_CLIENT_Connection *c;
258   struct GNUNET_DATASTORE_Handle *h;
259   
260   c = GNUNET_CLIENT_connect ("datastore", cfg);
261   if (c == NULL)
262     return NULL; /* oops */
263   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
264                      GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
265   h->client = c;
266   h->cfg = cfg;
267   h->stats = GNUNET_STATISTICS_create ("datastore-api",
268                                        cfg);
269   return h;
270 }
271
272
273 /**
274  * Transmit DROP message to datastore service.
275  *
276  * @param cls the 'struct GNUNET_DATASTORE_Handle'
277  * @param size number of bytes that can be copied to buf
278  * @param buf where to copy the drop message
279  * @return number of bytes written to buf
280  */
281 static size_t
282 transmit_drop (void *cls,
283                size_t size, 
284                void *buf)
285 {
286   struct GNUNET_DATASTORE_Handle *h = cls;
287   struct GNUNET_MessageHeader *hdr;
288   
289   if (buf == NULL)
290     {
291       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
292                   _("Failed to transmit request to drop database.\n"));
293       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
294       return 0;
295     }
296   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
297   hdr = buf;
298   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
299   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
300   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
301   return sizeof(struct GNUNET_MessageHeader);
302 }
303
304
305 /**
306  * Disconnect from the datastore service (and free
307  * associated resources).
308  *
309  * @param h handle to the datastore
310  * @param drop set to GNUNET_YES to delete all data in datastore (!)
311  */
312 void
313 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
314                              int drop)
315 {
316   struct GNUNET_DATASTORE_QueueEntry *qe;
317
318 #if DEBUG_DATASTORE
319   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
320               "Datastore disconnect\n");
321 #endif
322   if (NULL != h->th)
323     {
324       GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
325       h->th = NULL;
326     }
327   if (h->client != NULL)
328     {
329       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
330       h->client = NULL;
331     }
332   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
333     {
334       GNUNET_SCHEDULER_cancel (h->reconnect_task);
335       h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
336     }
337   while (NULL != (qe = h->queue_head))
338     {
339       GNUNET_assert (NULL != qe->response_proc);
340       qe->response_proc (h, NULL);
341     }
342   if (GNUNET_YES == drop) 
343     {
344       h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
345       if (h->client != NULL)
346         {
347           if (NULL != 
348               GNUNET_CLIENT_notify_transmit_ready (h->client,
349                                                    sizeof(struct GNUNET_MessageHeader),
350                                                    GNUNET_TIME_UNIT_MINUTES,
351                                                    GNUNET_YES,
352                                                    &transmit_drop,
353                                                    h))
354             return;
355           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
356           h->client = NULL;
357         }
358       GNUNET_break (0);
359     }
360   GNUNET_STATISTICS_destroy (h->stats,
361                              GNUNET_NO);
362   h->stats = NULL;
363   GNUNET_free (h);
364 }
365
366
367 /**
368  * A request has timed out (before being transmitted to the service).
369  *
370  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
371  * @param tc scheduler context
372  */
373 static void
374 timeout_queue_entry (void *cls,
375                      const struct GNUNET_SCHEDULER_TaskContext *tc)
376 {
377   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
378
379   GNUNET_STATISTICS_update (qe->h->stats,
380                             gettext_noop ("# queue entry timeouts"),
381                             1,
382                             GNUNET_NO);
383   qe->task = GNUNET_SCHEDULER_NO_TASK;
384   GNUNET_assert (qe->was_transmitted == GNUNET_NO); 
385 #if DEBUG_DATASTORE
386   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
387               "Timeout of request in datastore queue\n");
388 #endif
389   qe->response_proc (qe->h, NULL);
390 }
391
392
393 /**
394  * Create a new entry for our priority queue (and possibly discard other entires if
395  * the queue is getting too long).
396  *
397  * @param h handle to the datastore
398  * @param msize size of the message to queue
399  * @param queue_priority priority of the entry
400  * @param max_queue_size at what queue size should this request be dropped
401  *        (if other requests of higher priority are in the queue)
402  * @param timeout timeout for the operation
403  * @param response_proc function to call with replies (can be NULL)
404  * @param qc client context (NOT a closure for response_proc)
405  * @return NULL if the queue is full 
406  */
407 static struct GNUNET_DATASTORE_QueueEntry *
408 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
409                   size_t msize,
410                   unsigned int queue_priority,
411                   unsigned int max_queue_size,
412                   struct GNUNET_TIME_Relative timeout,
413                   GNUNET_CLIENT_MessageHandler response_proc,            
414                   const union QueueContext *qc)
415 {
416   struct GNUNET_DATASTORE_QueueEntry *ret;
417   struct GNUNET_DATASTORE_QueueEntry *pos;
418   unsigned int c;
419
420   c = 0;
421   pos = h->queue_head;
422   while ( (pos != NULL) &&
423           (c < max_queue_size) &&
424           (pos->priority >= queue_priority) )
425     {
426       c++;
427       pos = pos->next;
428     }
429   if (c >= max_queue_size)
430     {
431       GNUNET_STATISTICS_update (h->stats,
432                                 gettext_noop ("# queue overflows"),
433                                 1,
434                                 GNUNET_NO);
435       return NULL;
436     }
437   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
438   ret->h = h;
439   ret->response_proc = response_proc;
440   ret->qc = *qc;
441   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
442   ret->priority = queue_priority;
443   ret->max_queue = max_queue_size;
444   ret->message_size = msize;
445   ret->was_transmitted = GNUNET_NO;
446   if (pos == NULL)
447     {
448       /* append at the tail */
449       pos = h->queue_tail;
450     }
451   else
452     {
453       pos = pos->prev; 
454       /* do not insert at HEAD if HEAD query was already
455          transmitted and we are still receiving replies! */
456       if ( (pos == NULL) &&
457            (h->queue_head->was_transmitted) )
458         pos = h->queue_head;
459     }
460   c++;
461   GNUNET_STATISTICS_update (h->stats,
462                             gettext_noop ("# queue entries created"),
463                             1,
464                             GNUNET_NO);
465   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
466                                      h->queue_tail,
467                                      pos,
468                                      ret);
469   h->queue_size++;
470   ret->task = GNUNET_SCHEDULER_add_delayed (timeout,
471                                             &timeout_queue_entry,
472                                             ret);
473   pos = ret->next;
474   while (pos != NULL) 
475     {
476       if ( (pos->max_queue < h->queue_size) &&
477            (pos->was_transmitted == GNUNET_NO) )
478         {
479           GNUNET_assert (pos->response_proc != NULL);
480           /* move 'pos' element to head so that it will be 
481              killed on 'NULL' call below */
482 #if DEBUG_DATASTORE
483           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
484                       "Dropping request from datastore queue\n");
485 #endif
486           GNUNET_CONTAINER_DLL_remove (h->queue_head,
487                                        h->queue_tail,
488                                        pos);
489           GNUNET_CONTAINER_DLL_insert (h->queue_head,
490                                        h->queue_tail,
491                                        pos);
492           GNUNET_STATISTICS_update (h->stats,
493                                     gettext_noop ("# Requests dropped from datastore queue"),
494                                     1,
495                                     GNUNET_NO);
496           GNUNET_assert (h->queue_head == pos);
497           pos->response_proc (h, NULL);
498           break;
499         }
500       pos = pos->next;
501     }
502   return ret;
503 }
504
505
506 /**
507  * Process entries in the queue (or do nothing if we are already
508  * doing so).
509  * 
510  * @param h handle to the datastore
511  */
512 static void
513 process_queue (struct GNUNET_DATASTORE_Handle *h);
514
515
516 /**
517  * Try reconnecting to the datastore service.
518  *
519  * @param cls the 'struct GNUNET_DATASTORE_Handle'
520  * @param tc scheduler context
521  */
522 static void
523 try_reconnect (void *cls,
524                const struct GNUNET_SCHEDULER_TaskContext *tc)
525 {
526   struct GNUNET_DATASTORE_Handle *h = cls;
527
528   if (h->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
529     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
530   else
531     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
532   if (h->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
533     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
534   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
535   h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
536   if (h->client == NULL)
537     {
538       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
539                   "DATASTORE reconnect failed (fatally)\n");
540       return;
541     }
542   GNUNET_STATISTICS_update (h->stats,
543                             gettext_noop ("# datastore connections (re)created"),
544                             1,
545                             GNUNET_NO);
546 #if DEBUG_DATASTORE
547   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
548               "Reconnected to DATASTORE\n");
549 #endif
550   process_queue (h);
551 }
552
553
554 /**
555  * Disconnect from the service and then try reconnecting to the datastore service
556  * after some delay.
557  *
558  * @param h handle to datastore to disconnect and reconnect
559  */
560 static void
561 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
562 {
563   if (h->client == NULL)
564     {
565 #if DEBUG_DATASTORE
566       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
567                   "client NULL in disconnect, will not try to reconnect\n");
568 #endif
569       return;
570     }
571 #if 0
572   GNUNET_STATISTICS_update (stats,
573                             gettext_noop ("# reconnected to DATASTORE"),
574                             1,
575                             GNUNET_NO);
576 #endif
577   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
578   h->skip_next_messages = 0;
579   h->client = NULL;
580   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_time,
581                                                     &try_reconnect,
582                                                     h);      
583 }
584
585
586 /**
587  * Function called whenever we receive a message from
588  * the service.  Calls the appropriate handler.
589  *
590  * @param cls the 'struct GNUNET_DATASTORE_Handle'
591  * @param msg the received message
592  */
593 static void 
594 receive_cb (void *cls,
595             const struct GNUNET_MessageHeader *msg)
596 {
597   struct GNUNET_DATASTORE_Handle *h = cls;
598   struct GNUNET_DATASTORE_QueueEntry *qe;
599
600   h->in_receive = GNUNET_NO;
601 #if DEBUG_DATASTORE
602   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
603               "Receiving reply from datastore\n");
604 #endif
605   if (h->skip_next_messages > 0)
606     {
607       h->skip_next_messages--;
608       process_queue (h);
609       return;
610    } 
611   if (NULL == (qe = h->queue_head))
612     {
613       GNUNET_break (0);
614       process_queue (h);
615       return; 
616     }
617   qe->response_proc (h, msg);
618 }
619
620
621 /**
622  * Transmit request from queue to datastore service.
623  *
624  * @param cls the 'struct GNUNET_DATASTORE_Handle'
625  * @param size number of bytes that can be copied to buf
626  * @param buf where to copy the drop message
627  * @return number of bytes written to buf
628  */
629 static size_t
630 transmit_request (void *cls,
631                   size_t size, 
632                   void *buf)
633 {
634   struct GNUNET_DATASTORE_Handle *h = cls;
635   struct GNUNET_DATASTORE_QueueEntry *qe;
636   size_t msize;
637
638   h->th = NULL;
639   if (NULL == (qe = h->queue_head))
640     return 0; /* no entry in queue */
641   if (buf == NULL)
642     {
643       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
644                   _("Failed to transmit request to DATASTORE.\n"));
645       GNUNET_STATISTICS_update (h->stats,
646                                 gettext_noop ("# transmission request failures"),
647                                 1,
648                                 GNUNET_NO);
649       do_disconnect (h);
650       return 0;
651     }
652   if (size < (msize = qe->message_size))
653     {
654       process_queue (h);
655       return 0;
656     }
657  #if DEBUG_DATASTORE
658   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
659               "Transmitting %u byte request to DATASTORE\n",
660               msize);
661 #endif
662   memcpy (buf, &qe[1], msize);
663   qe->was_transmitted = GNUNET_YES;
664   GNUNET_SCHEDULER_cancel (qe->task);
665   qe->task = GNUNET_SCHEDULER_NO_TASK;
666   GNUNET_assert (GNUNET_NO == h->in_receive);
667   h->in_receive = GNUNET_YES;
668   GNUNET_CLIENT_receive (h->client,
669                          &receive_cb,
670                          h,
671                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
672   GNUNET_STATISTICS_update (h->stats,
673                             gettext_noop ("# bytes sent to datastore"),
674                             1,
675                             GNUNET_NO);
676   return msize;
677 }
678
679
680 /**
681  * Process entries in the queue (or do nothing if we are already
682  * doing so).
683  * 
684  * @param h handle to the datastore
685  */
686 static void
687 process_queue (struct GNUNET_DATASTORE_Handle *h)
688 {
689   struct GNUNET_DATASTORE_QueueEntry *qe;
690
691   if (NULL == (qe = h->queue_head))
692     {
693 #if DEBUG_DATASTORE > 1
694       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
695                   "Queue empty\n");
696 #endif
697       return; /* no entry in queue */
698     }
699   if (qe->was_transmitted == GNUNET_YES)
700     {
701 #if DEBUG_DATASTORE > 1
702       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
703                   "Head request already transmitted\n");
704 #endif
705       return; /* waiting for replies */
706     }
707   if (h->th != NULL)
708     {
709 #if DEBUG_DATASTORE > 1
710       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
711                   "Pending transmission request\n");
712 #endif
713       return; /* request pending */
714     }
715   if (h->client == NULL)
716     {
717 #if DEBUG_DATASTORE > 1
718       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
719                   "Not connected\n");
720 #endif
721       return; /* waiting for reconnect */
722     }
723   if (GNUNET_YES == h->in_receive)
724     {
725       /* wait for response to previous query */
726       return; 
727     }
728 #if DEBUG_DATASTORE
729   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
730               "Queueing %u byte request to DATASTORE\n",
731               qe->message_size);
732 #endif
733   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
734                                                qe->message_size,
735                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
736                                                GNUNET_YES,
737                                                &transmit_request,
738                                                h);
739   GNUNET_assert (GNUNET_NO == h->in_receive);
740   GNUNET_break (NULL != h->th);
741 }
742
743
744 /**
745  * Dummy continuation used to do nothing (but be non-zero).
746  *
747  * @param cls closure
748  * @param result result 
749  * @param emsg error message
750  */
751 static void
752 drop_status_cont (void *cls, int32_t result, const char *emsg)
753 {
754   /* do nothing */
755 }
756
757
758 /**
759  * Free a queue entry.  Removes the given entry from the
760  * queue and releases associated resources.  Does NOT
761  * call the callback.
762  * 
763  * @param qe entry to free.
764  */
765 static void
766 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
767 {
768   struct GNUNET_DATASTORE_Handle *h = qe->h;
769
770   GNUNET_CONTAINER_DLL_remove (h->queue_head,
771                                h->queue_tail,
772                                qe);
773   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
774     {
775       GNUNET_SCHEDULER_cancel (qe->task);
776       qe->task = GNUNET_SCHEDULER_NO_TASK;
777     }
778   h->queue_size--;
779   qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */
780   GNUNET_free (qe);
781 }
782
783
784 /**
785  * Type of a function to call when we receive a message
786  * from the service.
787  *
788  * @param cls closure
789  * @param msg message received, NULL on timeout or fatal error
790  */
791 static void 
792 process_status_message (void *cls,
793                         const struct
794                         GNUNET_MessageHeader * msg)
795 {
796   struct GNUNET_DATASTORE_Handle *h = cls;
797   struct GNUNET_DATASTORE_QueueEntry *qe;
798   struct StatusContext rc;
799   const struct StatusMessage *sm;
800   const char *emsg;
801   int32_t status;
802   int was_transmitted;
803
804   if (NULL == (qe = h->queue_head))
805     {
806       GNUNET_break (0);
807       do_disconnect (h);
808       return;
809     }
810   rc = qe->qc.sc;
811   if (msg == NULL)
812     {      
813       was_transmitted = qe->was_transmitted;
814       free_queue_entry (qe);
815       if (was_transmitted == GNUNET_YES)
816         do_disconnect (h);
817       else
818         process_queue (h);
819       if (rc.cont != NULL)
820         rc.cont (rc.cont_cls, 
821                  GNUNET_SYSERR,
822                  _("Failed to receive status response from database."));
823       return;
824     }
825   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
826   free_queue_entry (qe);
827   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
828        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
829     {
830       GNUNET_break (0);
831       h->retry_time = GNUNET_TIME_UNIT_ZERO;
832       do_disconnect (h);
833       if (rc.cont != NULL)
834         rc.cont (rc.cont_cls, 
835                  GNUNET_SYSERR,
836                  _("Error reading response from datastore service"));
837       return;
838     }
839   sm = (const struct StatusMessage*) msg;
840   status = ntohl(sm->status);
841   emsg = NULL;
842   if (ntohs(msg->size) > sizeof(struct StatusMessage))
843     {
844       emsg = (const char*) &sm[1];
845       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
846         {
847           GNUNET_break (0);
848           emsg = _("Invalid error message received from datastore service");
849         }
850     }  
851   if ( (status == GNUNET_SYSERR) &&
852        (emsg == NULL) )
853     {
854       GNUNET_break (0);
855       emsg = _("Invalid error message received from datastore service");
856     }
857 #if DEBUG_DATASTORE
858   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
859               "Received status %d/%s\n",
860               (int) status,
861               emsg);
862 #endif
863   GNUNET_STATISTICS_update (h->stats,
864                             gettext_noop ("# status messages received"),
865                             1,
866                             GNUNET_NO);
867   h->retry_time.rel_value = 0;
868   process_queue (h);
869   if (rc.cont != NULL)
870     rc.cont (rc.cont_cls, 
871              status,
872              emsg);
873 }
874
875
876 /**
877  * Store an item in the datastore.  If the item is already present,
878  * the priorities are summed up and the higher expiration time and
879  * lower anonymity level is used.
880  *
881  * @param h handle to the datastore
882  * @param rid reservation ID to use (from "reserve"); use 0 if no
883  *            prior reservation was made
884  * @param key key for the value
885  * @param size number of bytes in data
886  * @param data content stored
887  * @param type type of the content
888  * @param priority priority of the content
889  * @param anonymity anonymity-level for the content
890  * @param replication how often should the content be replicated to other peers?
891  * @param expiration expiration time for the content
892  * @param queue_priority ranking of this request in the priority queue
893  * @param max_queue_size at what queue size should this request be dropped
894  *        (if other requests of higher priority are in the queue)
895  * @param timeout timeout for the operation
896  * @param cont continuation to call when done
897  * @param cont_cls closure for cont
898  * @return NULL if the entry was not queued, otherwise a handle that can be used to
899  *         cancel; note that even if NULL is returned, the callback will be invoked
900  *         (or rather, will already have been invoked)
901  */
902 struct GNUNET_DATASTORE_QueueEntry *
903 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
904                       uint32_t rid,
905                       const GNUNET_HashCode * key,
906                       size_t size,
907                       const void *data,
908                       enum GNUNET_BLOCK_Type type,
909                       uint32_t priority,
910                       uint32_t anonymity,
911                       uint32_t replication,
912                       struct GNUNET_TIME_Absolute expiration,
913                       unsigned int queue_priority,
914                       unsigned int max_queue_size,
915                       struct GNUNET_TIME_Relative timeout,
916                       GNUNET_DATASTORE_ContinuationWithStatus cont,
917                       void *cont_cls)
918 {
919   struct GNUNET_DATASTORE_QueueEntry *qe;
920   struct DataMessage *dm;
921   size_t msize;
922   union QueueContext qc;
923
924 #if DEBUG_DATASTORE
925   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
926               "Asked to put %u bytes of data under key `%s' for %llu ms\n",
927               size,
928               GNUNET_h2s (key),
929               GNUNET_TIME_absolute_get_remaining (expiration).rel_value);
930 #endif
931   msize = sizeof(struct DataMessage) + size;
932   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
933   qc.sc.cont = cont;
934   qc.sc.cont_cls = cont_cls;
935   qe = make_queue_entry (h, msize,
936                          queue_priority, max_queue_size, timeout,
937                          &process_status_message, &qc);
938   if (qe == NULL)
939     {
940 #if DEBUG_DATASTORE
941       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
942                   "Could not create queue entry for PUT\n");
943 #endif
944       return NULL;
945     }
946   GNUNET_STATISTICS_update (h->stats,
947                             gettext_noop ("# PUT requests executed"),
948                             1,
949                             GNUNET_NO);
950   dm = (struct DataMessage* ) &qe[1];
951   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
952   dm->header.size = htons(msize);
953   dm->rid = htonl(rid);
954   dm->size = htonl( (uint32_t) size);
955   dm->type = htonl(type);
956   dm->priority = htonl(priority);
957   dm->anonymity = htonl(anonymity);
958   dm->replication = htonl (replication);
959   dm->reserved = htonl (0);
960   dm->uid = GNUNET_htonll(0);
961   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
962   dm->key = *key;
963   memcpy (&dm[1], data, size);
964   process_queue (h);
965   return qe;
966 }
967
968
969 /**
970  * Reserve space in the datastore.  This function should be used
971  * to avoid "out of space" failures during a longer sequence of "put"
972  * operations (for example, when a file is being inserted).
973  *
974  * @param h handle to the datastore
975  * @param amount how much space (in bytes) should be reserved (for content only)
976  * @param entries how many entries will be created (to calculate per-entry overhead)
977  * @param queue_priority ranking of this request in the priority queue
978  * @param max_queue_size at what queue size should this request be dropped
979  *        (if other requests of higher priority are in the queue)
980  * @param timeout how long to wait at most for a response (or before dying in queue)
981  * @param cont continuation to call when done; "success" will be set to
982  *             a positive reservation value if space could be reserved.
983  * @param cont_cls closure for cont
984  * @return NULL if the entry was not queued, otherwise a handle that can be used to
985  *         cancel; note that even if NULL is returned, the callback will be invoked
986  *         (or rather, will already have been invoked)
987  */
988 struct GNUNET_DATASTORE_QueueEntry *
989 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
990                           uint64_t amount,
991                           uint32_t entries,
992                           unsigned int queue_priority,
993                           unsigned int max_queue_size,
994                           struct GNUNET_TIME_Relative timeout,
995                           GNUNET_DATASTORE_ContinuationWithStatus cont,
996                           void *cont_cls)
997 {
998   struct GNUNET_DATASTORE_QueueEntry *qe;
999   struct ReserveMessage *rm;
1000   union QueueContext qc;
1001
1002   if (cont == NULL)
1003     cont = &drop_status_cont;
1004 #if DEBUG_DATASTORE
1005   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1006               "Asked to reserve %llu bytes of data and %u entries\n",
1007               (unsigned long long) amount,
1008               (unsigned int) entries);
1009 #endif
1010   qc.sc.cont = cont;
1011   qc.sc.cont_cls = cont_cls;
1012   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
1013                          queue_priority, max_queue_size, timeout,
1014                          &process_status_message, &qc);
1015   if (qe == NULL)
1016     {
1017 #if DEBUG_DATASTORE
1018       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1019                   "Could not create queue entry to reserve\n");
1020 #endif
1021       return NULL;
1022     }
1023   GNUNET_STATISTICS_update (h->stats,
1024                             gettext_noop ("# RESERVE requests executed"),
1025                             1,
1026                             GNUNET_NO);
1027   rm = (struct ReserveMessage*) &qe[1];
1028   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
1029   rm->header.size = htons(sizeof (struct ReserveMessage));
1030   rm->entries = htonl(entries);
1031   rm->amount = GNUNET_htonll(amount);
1032   process_queue (h);
1033   return qe;
1034 }
1035
1036
1037 /**
1038  * Signal that all of the data for which a reservation was made has
1039  * been stored and that whatever excess space might have been reserved
1040  * can now be released.
1041  *
1042  * @param h handle to the datastore
1043  * @param rid reservation ID (value of "success" in original continuation
1044  *        from the "reserve" function).
1045  * @param queue_priority ranking of this request in the priority queue
1046  * @param max_queue_size at what queue size should this request be dropped
1047  *        (if other requests of higher priority are in the queue)
1048  * @param queue_priority ranking of this request in the priority queue
1049  * @param max_queue_size at what queue size should this request be dropped
1050  *        (if other requests of higher priority are in the queue)
1051  * @param timeout how long to wait at most for a response
1052  * @param cont continuation to call when done
1053  * @param cont_cls closure for cont
1054  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1055  *         cancel; note that even if NULL is returned, the callback will be invoked
1056  *         (or rather, will already have been invoked)
1057  */
1058 struct GNUNET_DATASTORE_QueueEntry *
1059 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1060                                   uint32_t rid,
1061                                   unsigned int queue_priority,
1062                                   unsigned int max_queue_size,
1063                                   struct GNUNET_TIME_Relative timeout,
1064                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
1065                                   void *cont_cls)
1066 {
1067   struct GNUNET_DATASTORE_QueueEntry *qe;
1068   struct ReleaseReserveMessage *rrm;
1069   union QueueContext qc;
1070
1071   if (cont == NULL)
1072     cont = &drop_status_cont;
1073 #if DEBUG_DATASTORE
1074   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1075               "Asked to release reserve %d\n",
1076               rid);
1077 #endif
1078   qc.sc.cont = cont;
1079   qc.sc.cont_cls = cont_cls;
1080   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
1081                          queue_priority, max_queue_size, timeout,
1082                          &process_status_message, &qc);
1083   if (qe == NULL)
1084     {
1085 #if DEBUG_DATASTORE
1086       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1087                   "Could not create queue entry to release reserve\n");
1088 #endif
1089       return NULL;
1090     }
1091   GNUNET_STATISTICS_update (h->stats,
1092                             gettext_noop ("# RELEASE RESERVE requests executed"),
1093                             1,
1094                             GNUNET_NO);
1095   rrm = (struct ReleaseReserveMessage*) &qe[1];
1096   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1097   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
1098   rrm->rid = htonl(rid);
1099   process_queue (h);
1100   return qe;
1101 }
1102
1103
1104 /**
1105  * Update a value in the datastore.
1106  *
1107  * @param h handle to the datastore
1108  * @param uid identifier for the value
1109  * @param priority how much to increase the priority of the value
1110  * @param expiration new expiration value should be MAX of existing and this argument
1111  * @param queue_priority ranking of this request in the priority queue
1112  * @param max_queue_size at what queue size should this request be dropped
1113  *        (if other requests of higher priority are in the queue)
1114  * @param timeout how long to wait at most for a response
1115  * @param cont continuation to call when done
1116  * @param cont_cls closure for cont
1117  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1118  *         cancel; note that even if NULL is returned, the callback will be invoked
1119  *         (or rather, will already have been invoked)
1120  */
1121 struct GNUNET_DATASTORE_QueueEntry *
1122 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1123                          uint64_t uid,
1124                          uint32_t priority,
1125                          struct GNUNET_TIME_Absolute expiration,
1126                          unsigned int queue_priority,
1127                          unsigned int max_queue_size,
1128                          struct GNUNET_TIME_Relative timeout,
1129                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1130                          void *cont_cls)
1131 {
1132   struct GNUNET_DATASTORE_QueueEntry *qe;
1133   struct UpdateMessage *um;
1134   union QueueContext qc;
1135
1136   if (cont == NULL)
1137     cont = &drop_status_cont;
1138 #if DEBUG_DATASTORE
1139   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1140               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
1141               uid,
1142               (unsigned int) priority,
1143               (unsigned long long) expiration.abs_value);
1144 #endif
1145   qc.sc.cont = cont;
1146   qc.sc.cont_cls = cont_cls;
1147   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
1148                          queue_priority, max_queue_size, timeout,
1149                          &process_status_message, &qc);
1150   if (qe == NULL)
1151     {
1152 #if DEBUG_DATASTORE
1153       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1154                   "Could not create queue entry for UPDATE\n");
1155 #endif
1156       return NULL;
1157     }
1158   GNUNET_STATISTICS_update (h->stats,
1159                             gettext_noop ("# UPDATE requests executed"),
1160                             1,
1161                             GNUNET_NO);
1162   um = (struct UpdateMessage*) &qe[1];
1163   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1164   um->header.size = htons(sizeof (struct UpdateMessage));
1165   um->priority = htonl(priority);
1166   um->expiration = GNUNET_TIME_absolute_hton(expiration);
1167   um->uid = GNUNET_htonll(uid);
1168   process_queue (h);
1169   return qe;
1170 }
1171
1172
1173 /**
1174  * Explicitly remove some content from the database.
1175  * The "cont"inuation will be called with status
1176  * "GNUNET_OK" if content was removed, "GNUNET_NO"
1177  * if no matching entry was found and "GNUNET_SYSERR"
1178  * on all other types of errors.
1179  *
1180  * @param h handle to the datastore
1181  * @param key key for the value
1182  * @param size number of bytes in data
1183  * @param data content stored
1184  * @param queue_priority ranking of this request in the priority queue
1185  * @param max_queue_size at what queue size should this request be dropped
1186  *        (if other requests of higher priority are in the queue)
1187  * @param timeout how long to wait at most for a response
1188  * @param cont continuation to call when done
1189  * @param cont_cls closure for cont
1190  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1191  *         cancel; note that even if NULL is returned, the callback will be invoked
1192  *         (or rather, will already have been invoked)
1193  */
1194 struct GNUNET_DATASTORE_QueueEntry *
1195 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1196                          const GNUNET_HashCode *key,
1197                          size_t size,
1198                          const void *data,
1199                          unsigned int queue_priority,
1200                          unsigned int max_queue_size,
1201                          struct GNUNET_TIME_Relative timeout,
1202                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1203                          void *cont_cls)
1204 {
1205   struct GNUNET_DATASTORE_QueueEntry *qe;
1206   struct DataMessage *dm;
1207   size_t msize;
1208   union QueueContext qc;
1209
1210   if (cont == NULL)
1211     cont = &drop_status_cont;
1212 #if DEBUG_DATASTORE
1213   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1214               "Asked to remove %u bytes under key `%s'\n",
1215               size,
1216               GNUNET_h2s (key));
1217 #endif
1218   qc.sc.cont = cont;
1219   qc.sc.cont_cls = cont_cls;
1220   msize = sizeof(struct DataMessage) + size;
1221   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1222   qe = make_queue_entry (h, msize,
1223                          queue_priority, max_queue_size, timeout,
1224                          &process_status_message, &qc);
1225   if (qe == NULL)
1226     {
1227 #if DEBUG_DATASTORE
1228       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1229                   "Could not create queue entry for REMOVE\n");
1230 #endif
1231       return NULL;
1232     }
1233   GNUNET_STATISTICS_update (h->stats,
1234                             gettext_noop ("# REMOVE requests executed"),
1235                             1,
1236                             GNUNET_NO);
1237   dm = (struct DataMessage*) &qe[1];
1238   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1239   dm->header.size = htons(msize);
1240   dm->rid = htonl(0);
1241   dm->size = htonl(size);
1242   dm->type = htonl(0);
1243   dm->priority = htonl(0);
1244   dm->anonymity = htonl(0);
1245   dm->uid = GNUNET_htonll(0);
1246   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
1247   dm->key = *key;
1248   memcpy (&dm[1], data, size);
1249   process_queue (h);
1250   return qe;
1251 }
1252
1253
1254 /**
1255  * Type of a function to call when we receive a message
1256  * from the service.
1257  *
1258  * @param cls closure
1259  * @param msg message received, NULL on timeout or fatal error
1260  */
1261 static void 
1262 process_result_message (void *cls,
1263                         const struct GNUNET_MessageHeader *msg)
1264 {
1265   struct GNUNET_DATASTORE_Handle *h = cls;
1266   struct GNUNET_DATASTORE_QueueEntry *qe;
1267   struct ResultContext rc;
1268   const struct DataMessage *dm;
1269   int was_transmitted;
1270
1271   if (msg == NULL)
1272     {
1273       qe = h->queue_head;
1274       GNUNET_assert (NULL != qe);
1275       rc = qe->qc.rc;
1276       was_transmitted = qe->was_transmitted;
1277       free_queue_entry (qe);
1278       if (was_transmitted == GNUNET_YES)
1279         {
1280           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1281                       _("Failed to receive response from database.\n"));
1282           do_disconnect (h);
1283         }
1284       else
1285         {
1286           process_queue (h);
1287         }
1288       if (rc.proc != NULL)
1289         rc.proc (rc.proc_cls,
1290                  NULL, 0, NULL, 0, 0, 0, 
1291                  GNUNET_TIME_UNIT_ZERO_ABS, 0);         
1292       return;
1293     }
1294   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
1295     {
1296       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1297       qe = h->queue_head;
1298       rc = qe->qc.rc;
1299       GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1300       free_queue_entry (qe);
1301 #if DEBUG_DATASTORE
1302       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1303                   "Received end of result set, new queue size is %u\n",
1304                   h->queue_size);
1305 #endif
1306       if (rc.proc != NULL)
1307         rc.proc (rc.proc_cls,
1308                  NULL, 0, NULL, 0, 0, 0, 
1309                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1310       h->retry_time.rel_value = 0;
1311       h->result_count = 0;
1312       process_queue (h);
1313       return;
1314     }
1315   qe = h->queue_head;
1316   GNUNET_assert (NULL != qe);
1317   rc = qe->qc.rc;
1318   if (GNUNET_YES != qe->was_transmitted)
1319     {
1320       GNUNET_break (0);
1321       free_queue_entry (qe);
1322       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1323       do_disconnect (h);
1324       if (rc.proc != NULL)
1325         rc.proc (rc.proc_cls,
1326                  NULL, 0, NULL, 0, 0, 0, 
1327                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
1328       return;
1329     }
1330   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1331        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1332        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1333     {
1334       GNUNET_break (0);
1335       free_queue_entry (qe);
1336       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1337       do_disconnect (h);
1338       if (rc.proc != NULL)
1339         rc.proc (rc.proc_cls,
1340                  NULL, 0, NULL, 0, 0, 0, 
1341                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
1342       return;
1343     }
1344   GNUNET_STATISTICS_update (h->stats,
1345                             gettext_noop ("# Results received"),
1346                             1,
1347                             GNUNET_NO);
1348   dm = (const struct DataMessage*) msg;
1349 #if DEBUG_DATASTORE
1350   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1351               "Received result %llu with type %u and size %u with key %s\n",
1352               (unsigned long long) GNUNET_ntohll(dm->uid),
1353               ntohl(dm->type),
1354               ntohl(dm->size),
1355               GNUNET_h2s(&dm->key));
1356 #endif
1357   free_queue_entry (qe);
1358   h->retry_time.rel_value = 0;
1359   process_queue (h);
1360   if (rc.proc != NULL)
1361     rc.proc (rc.proc_cls,
1362              &dm->key,
1363              ntohl(dm->size),
1364              &dm[1],
1365              ntohl(dm->type),
1366              ntohl(dm->priority),
1367              ntohl(dm->anonymity),
1368              GNUNET_TIME_absolute_ntoh(dm->expiration), 
1369              GNUNET_ntohll(dm->uid));
1370 }
1371
1372
1373 /**
1374  * Get a random value from the datastore for content replication.
1375  * Returns a single, random value among those with the highest
1376  * replication score, lowering positive replication scores by one for
1377  * the chosen value (if only content with a replication score exists,
1378  * a random value is returned and replication scores are not changed).
1379  *
1380  * @param h handle to the datastore
1381  * @param queue_priority ranking of this request in the priority queue
1382  * @param max_queue_size at what queue size should this request be dropped
1383  *        (if other requests of higher priority are in the queue)
1384  * @param timeout how long to wait at most for a response
1385  * @param proc function to call on a random value; it
1386  *        will be called once with a value (if available)
1387  *        and always once with a value of NULL.
1388  * @param proc_cls closure for proc
1389  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1390  *         cancel
1391  */
1392 struct GNUNET_DATASTORE_QueueEntry *
1393 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1394                                       unsigned int queue_priority,
1395                                       unsigned int max_queue_size,
1396                                       struct GNUNET_TIME_Relative timeout,
1397                                       GNUNET_DATASTORE_DatumProcessor proc, 
1398                                       void *proc_cls)
1399 {
1400   struct GNUNET_DATASTORE_QueueEntry *qe;
1401   struct GNUNET_MessageHeader *m;
1402   union QueueContext qc;
1403
1404   GNUNET_assert (NULL != proc);
1405 #if DEBUG_DATASTORE
1406   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1407               "Asked to get replication entry in %llu ms\n",
1408               (unsigned long long) timeout.rel_value);
1409 #endif
1410   qc.rc.proc = proc;
1411   qc.rc.proc_cls = proc_cls;
1412   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1413                          queue_priority, max_queue_size, timeout,
1414                          &process_result_message, &qc);
1415   if (qe == NULL)
1416     {
1417 #if DEBUG_DATASTORE
1418       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1419                   "Could not create queue entry for GET REPLICATION\n");
1420 #endif
1421       return NULL;    
1422     }
1423   GNUNET_STATISTICS_update (h->stats,
1424                             gettext_noop ("# GET REPLICATION requests executed"),
1425                             1,
1426                             GNUNET_NO);
1427   m = (struct GNUNET_MessageHeader*) &qe[1];
1428   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1429   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1430   process_queue (h);
1431   return qe;
1432 }
1433
1434
1435 /**
1436  * Get a single zero-anonymity value from the datastore.
1437  *
1438  * @param h handle to the datastore
1439  * @param offset offset of the result (modulo num-results); set to
1440  *               a random 64-bit value initially; then increment by
1441  *               one each time; detect that all results have been found by uid
1442  *               being again the first uid ever returned.
1443  * @param queue_priority ranking of this request in the priority queue
1444  * @param max_queue_size at what queue size should this request be dropped
1445  *        (if other requests of higher priority are in the queue)
1446  * @param timeout how long to wait at most for a response
1447  * @param type allowed type for the operation (never zero)
1448  * @param proc function to call on a random value; it
1449  *        will be called once with a value (if available)
1450  *        or with NULL if none value exists.
1451  * @param proc_cls closure for proc
1452  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1453  *         cancel
1454  */
1455 struct GNUNET_DATASTORE_QueueEntry *
1456 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1457                                      uint64_t offset,
1458                                      unsigned int queue_priority,
1459                                      unsigned int max_queue_size,
1460                                      struct GNUNET_TIME_Relative timeout,
1461                                      enum GNUNET_BLOCK_Type type,
1462                                      GNUNET_DATASTORE_DatumProcessor proc, 
1463                                      void *proc_cls)
1464 {
1465   struct GNUNET_DATASTORE_QueueEntry *qe;
1466   struct GetZeroAnonymityMessage *m;
1467   union QueueContext qc;
1468
1469   GNUNET_assert (NULL != proc);
1470   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1471 #if DEBUG_DATASTORE
1472   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1473               "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n",
1474               (unsigned long long) offset,
1475               type,
1476               (unsigned long long) timeout.rel_value);
1477 #endif
1478   qc.rc.proc = proc;
1479   qc.rc.proc_cls = proc_cls;
1480   qe = make_queue_entry (h, sizeof(struct GetZeroAnonymityMessage),
1481                          queue_priority, max_queue_size, timeout,
1482                          &process_result_message, &qc);
1483   if (qe == NULL)
1484     {
1485 #if DEBUG_DATASTORE
1486       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1487                   "Could not create queue entry for zero-anonymity procation\n");
1488 #endif
1489       return NULL;    
1490     }
1491   GNUNET_STATISTICS_update (h->stats,
1492                             gettext_noop ("# GET ZERO ANONYMITY requests executed"),
1493                             1,
1494                             GNUNET_NO);
1495   m = (struct GetZeroAnonymityMessage*) &qe[1];
1496   m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1497   m->header.size = htons(sizeof (struct GetZeroAnonymityMessage));
1498   m->type = htonl ((uint32_t) type);
1499   m->offset = GNUNET_htonll (offset);
1500   process_queue (h);
1501   return qe;
1502 }
1503
1504
1505 /**
1506  * Get a result for a particular key from the datastore.  The processor
1507  * will only be called once.
1508  *
1509  * @param h handle to the datastore
1510  * @param offset offset of the result (modulo num-results); set to
1511  *               a random 64-bit value initially; then increment by
1512  *               one each time; detect that all results have been found by uid
1513  *               being again the first uid ever returned.
1514  * @param key maybe NULL (to match all entries)
1515  * @param type desired type, 0 for any
1516  * @param queue_priority ranking of this request in the priority queue
1517  * @param max_queue_size at what queue size should this request be dropped
1518  *        (if other requests of higher priority are in the queue)
1519  * @param timeout how long to wait at most for a response
1520  * @param proc function to call on each matching value;
1521  *        will be called once with a NULL value at the end
1522  * @param proc_cls closure for proc
1523  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1524  *         cancel
1525  */
1526 struct GNUNET_DATASTORE_QueueEntry *
1527 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1528                           uint64_t offset,
1529                           const GNUNET_HashCode * key,
1530                           enum GNUNET_BLOCK_Type type,
1531                           unsigned int queue_priority,
1532                           unsigned int max_queue_size,
1533                           struct GNUNET_TIME_Relative timeout,
1534                           GNUNET_DATASTORE_DatumProcessor proc, 
1535                           void *proc_cls)
1536 {
1537   struct GNUNET_DATASTORE_QueueEntry *qe;
1538   struct GetMessage *gm;
1539   union QueueContext qc;
1540
1541   GNUNET_assert (NULL != proc);
1542 #if DEBUG_DATASTORE
1543   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1544               "Asked to look for data of type %u under key `%s'\n",
1545               (unsigned int) type,
1546               GNUNET_h2s (key));
1547 #endif
1548   qc.rc.proc = proc;
1549   qc.rc.proc_cls = proc_cls;
1550   qe = make_queue_entry (h, sizeof(struct GetMessage),
1551                          queue_priority, max_queue_size, timeout,
1552                          &process_result_message, &qc);
1553   if (qe == NULL)
1554     {
1555 #if DEBUG_DATASTORE
1556       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1557                   "Could not queue request for `%s'\n",
1558                   GNUNET_h2s (key));
1559 #endif
1560       return NULL;
1561     }
1562   GNUNET_STATISTICS_update (h->stats,
1563                             gettext_noop ("# GET requests executed"),
1564                             1,
1565                             GNUNET_NO);
1566   gm = (struct GetMessage*) &qe[1];
1567   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1568   gm->type = htonl(type);
1569   gm->offset = GNUNET_htonll (offset);
1570   if (key != NULL)
1571     {
1572       gm->header.size = htons(sizeof (struct GetMessage));
1573       gm->key = *key;
1574     }
1575   else
1576     {
1577       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1578     }
1579   process_queue (h);
1580   return qe;
1581 }
1582
1583
1584 /**
1585  * Cancel a datastore operation.  The final callback from the
1586  * operation must not have been done yet.
1587  * 
1588  * @param qe operation to cancel
1589  */
1590 void
1591 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1592 {
1593   struct GNUNET_DATASTORE_Handle *h;
1594
1595   GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted);
1596   h = qe->h;
1597 #if DEBUG_DATASTORE
1598   GNUNET_log  (GNUNET_ERROR_TYPE_DEBUG,
1599                "Pending DATASTORE request %p cancelled (%d, %d)\n",
1600                qe,
1601                qe->was_transmitted,
1602                h->queue_head == qe);
1603 #endif
1604   if (GNUNET_YES == qe->was_transmitted) 
1605     {
1606       free_queue_entry (qe);
1607       h->skip_next_messages++;
1608       return;
1609     }
1610   free_queue_entry (qe);
1611   process_queue (h);
1612 }
1613
1614
1615 /* end of datastore_api.c */