additional tests
[oweals/gnunet.git] / src / fs / gnunet-service-fs_drq.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_drq.c
23  * @brief queueing of requests to the datastore service
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs_drq.h"
28
29 #define DEBUG_DRQ GNUNET_YES
30
31 /**
32  * Signature of a function that is called whenever a datastore
33  * request can be processed (or an entry put on the queue times out).
34  *
35  * @param cls closure
36  * @param ok GNUNET_OK if DS is ready, GNUNET_SYSERR on timeout
37  */
38 typedef void (*RequestFunction)(void *cls,
39                                 int ok);
40
41
42 /**
43  * Doubly-linked list of our requests for the datastore.
44  */
45 struct DatastoreRequestQueue
46 {
47
48   /**
49    * This is a doubly-linked list.
50    */
51   struct DatastoreRequestQueue *next;
52
53   /**
54    * This is a doubly-linked list.
55    */
56   struct DatastoreRequestQueue *prev;
57
58   /**
59    * Function to call for each entry.
60    */
61   GNUNET_DATASTORE_Iterator iter;
62
63   /**
64    * Closure for iter.
65    */
66   void *iter_cls;
67
68   /**
69    * Key we are doing the 'get' for.
70    */
71   GNUNET_HashCode key;
72
73   /**
74    * Timeout for this operation.
75    */
76   struct GNUNET_TIME_Absolute timeout;
77     
78   /**
79    * ID of task used for signaling timeout.
80    */
81   GNUNET_SCHEDULER_TaskIdentifier task;
82
83   /**
84    * Datastore entry type we are doing the 'get' for.
85    */
86   uint32_t type;
87
88   /**
89    * Is this request at the head of the queue irrespective of its
90    * timeout value?
91    */
92   int forced_head;
93
94 };
95
96 /**
97  * Our scheduler.
98  */
99 static struct GNUNET_SCHEDULER_Handle *sched;
100
101 /**
102  * Our configuration.
103  */
104 static const struct GNUNET_CONFIGURATION_Handle *cfg;
105
106 /**
107  * Head of request queue for the datastore, sorted by timeout.
108  */
109 static struct DatastoreRequestQueue *drq_head;
110
111 /**
112  * Tail of request queue for the datastore.
113  */
114 static struct DatastoreRequestQueue *drq_tail;
115
116 /**
117  * Our connection to the datastore.
118  */
119 static struct GNUNET_DATASTORE_Handle *dsh;
120
121 /**
122  * Pointer to the currently actively running request,
123  * NULL if none is running.
124  */
125 static struct DatastoreRequestQueue *drq_running;
126
127
128 /**
129  * Run the next DS request in our queue, we're done with the current
130  * one.
131  */
132 static void
133 next_ds_request ();
134
135
136 /**
137  * Wrapper for the datastore get operation.  Makes sure to trigger the
138  * next datastore operation in the queue once the operation is
139  * complete.
140  *
141  * @param cls our 'struct DatastoreRequestQueue*'
142  * @param key key for the content
143  * @param size number of bytes in data
144  * @param data content stored
145  * @param type type of the content
146  * @param priority priority of the content
147  * @param anonymity anonymity-level for the content
148  * @param expiration expiration time for the content
149  * @param uid unique identifier for the datum;
150  *        maybe 0 if no unique identifier is available
151  */
152 static void
153 get_iterator (void *cls,
154               const GNUNET_HashCode * key,
155               uint32_t size,
156               const void *data,
157               uint32_t type,
158               uint32_t priority,
159               uint32_t anonymity,
160               struct GNUNET_TIME_Absolute
161               expiration, 
162               uint64_t uid)
163 {
164   struct DatastoreRequestQueue *gc = cls;
165
166   if (gc->iter == NULL) 
167     {
168       /* stop the iteration */
169 #if DEBUG_DRQ
170       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
171                   "Iteration terminated\n");
172 #endif
173       if (key != NULL)
174         GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
175     }
176   else
177     {
178 #if DEBUG_DRQ
179       if (key != NULL)
180         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
181                     "Iteration produced %u-byte result for `%s'\n",
182                     size,
183                     GNUNET_h2s (key));
184 #endif
185       gc->iter (gc->iter_cls,
186                 key, size, data, type,
187                 priority, anonymity, expiration, uid);
188     }
189   if (key == NULL)
190     {
191 #if DEBUG_DRQ
192       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
193                   "Iteration completed\n");
194 #endif
195       GNUNET_assert (gc == drq_running);
196       GNUNET_free (gc);
197       drq_running = NULL;
198       next_ds_request ();
199     }
200 }
201
202
203 /**
204  * A datastore request can be run right now.  Run it.
205  *
206  * @param cls closure (of type "struct DatastoreRequestQueue*")
207  * @param tc task context, unused
208  */
209 static void
210 run_next_request (void *cls,
211                   const struct GNUNET_SCHEDULER_TaskContext *tc)
212 {
213   struct DatastoreRequestQueue *gc = cls;
214
215   gc->task = GNUNET_SCHEDULER_NO_TASK;
216 #if DEBUG_DRQ
217   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
218               "Running datastore request for `%s' of type %u\n",
219               GNUNET_h2s (&gc->key),
220               gc->type);
221 #endif
222   GNUNET_DATASTORE_get (dsh, &gc->key, gc->type, 
223                         &get_iterator,
224                         gc,
225                         GNUNET_TIME_absolute_get_remaining(gc->timeout));
226 }
227
228
229 /**
230  * Run the next DS request in our queue, we're done with the current
231  * one.
232  */
233 static void
234 next_ds_request ()
235 {
236   struct DatastoreRequestQueue *e;
237
238   GNUNET_free_non_null (drq_running);
239   drq_running = NULL;
240   e = drq_head;
241   if (e == NULL)
242     return;
243   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
244   drq_running = e;
245   GNUNET_SCHEDULER_cancel (sched, e->task);
246   e->task = GNUNET_SCHEDULER_add_now (sched,
247                                       &run_next_request,
248                                       e);
249 }
250
251
252 /**
253  * A datastore request had to be timed out. 
254  *
255  * @param cls closure (unused)
256  * @param tc task context, unused
257  */
258 static void
259 timeout_ds_request (void *cls,
260                     const struct GNUNET_SCHEDULER_TaskContext *tc)
261 {
262   struct DatastoreRequestQueue *e = cls;
263
264 #if DEBUG_DRQ
265   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
266               "Datastore request timed out in queue before transmission\n");
267 #endif
268   e->task = GNUNET_SCHEDULER_NO_TASK;
269   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
270   if (e->iter != NULL)
271     e->iter (e->iter_cls,
272              NULL, 0, NULL, 0, 0, 0, 
273              GNUNET_TIME_UNIT_ZERO_ABS, 0);
274   GNUNET_free (e);  
275 }
276
277
278 /**
279  * Task run during shutdown.
280  *
281  * @param cls unused
282  * @param tc unused
283  */
284 static void
285 shutdown_task (void *cls,
286                const struct GNUNET_SCHEDULER_TaskContext *tc)
287 {
288   struct DatastoreRequestQueue *drq;
289
290 #if DEBUG_DRQ
291   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
292               "DRQ shutdown initiated\n");
293 #endif
294   GNUNET_assert (NULL != dsh);
295   GNUNET_DATASTORE_disconnect (dsh,
296                                GNUNET_NO);
297   dsh = NULL;
298   while (NULL != (drq = drq_head))
299     {
300       drq_head = drq->next;
301       GNUNET_SCHEDULER_cancel (sched, drq->task);
302       if (drq->iter != NULL)
303         drq->iter (drq->iter_cls,
304                    NULL, 0, NULL, 0, 0, 0, 
305                    GNUNET_TIME_UNIT_ZERO_ABS, 0);
306       GNUNET_free (drq);
307     }
308   drq_tail = NULL;
309 }
310
311
312 /**
313  * Iterate over the results for a particular key
314  * in the datastore.  The iterator will only be called
315  * once initially; if the first call did contain a
316  * result, further results can be obtained by calling
317  * "GNUNET_DATASTORE_get_next" with the given argument.
318  *
319  * @param key maybe NULL (to match all entries)
320  * @param type desired type, 0 for any
321  * @param iter function to call on each matching value;
322  *        will be called once with a NULL value at the end
323  * @param iter_cls closure for iter
324  * @param timeout how long to wait at most for a response
325  * @param immediate should this be queued immediately at
326  *        the head of the queue (irrespecitive of the timeout)?
327  */
328 struct DatastoreRequestQueue *
329 GNUNET_FS_drq_get (const GNUNET_HashCode * key,
330                    uint32_t type,
331                    GNUNET_DATASTORE_Iterator iter, 
332                    void *iter_cls,
333                    struct GNUNET_TIME_Relative timeout,
334                    int immediate)
335 {
336   struct DatastoreRequestQueue *e;
337   struct DatastoreRequestQueue *bef;
338
339 #if DEBUG_DRQ
340   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
341               "DRQ receives request for `%s' of type %u\n",
342               GNUNET_h2s (key),
343               type);
344 #endif
345   e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue));
346   e->timeout = GNUNET_TIME_relative_to_absolute (timeout);
347   e->forced_head = immediate;
348   e->key = *key;
349   e->type = type;
350   e->iter = iter;
351   e->iter_cls = iter_cls;
352   e->timeout = GNUNET_TIME_relative_to_absolute (timeout);
353   if (GNUNET_YES == immediate)
354     {
355       /* local request, highest prio, put at head of queue
356          regardless of deadline */
357       bef = NULL;
358     }
359   else
360     {
361       bef = drq_tail;
362       while ( (NULL != bef) &&
363               (e->timeout.value < bef->timeout.value) &&
364               (GNUNET_YES != e->forced_head) )
365         bef = bef->prev;
366     }
367   GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e);
368   e->task = GNUNET_SCHEDULER_add_delayed (sched,
369                                           timeout,
370                                           &timeout_ds_request,
371                                           e);
372   if (drq_running == NULL)
373     next_ds_request ();
374   return e;                                    
375 }
376
377
378 /**
379  * Cancel the given operation.
380  *
381  * @param drq the queued operation (must not have been
382  *        triggered so far)
383  */
384 void
385 GNUNET_FS_drq_get_cancel (struct DatastoreRequestQueue *drq)
386 {
387 #if DEBUG_DRQ
388   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
389               "DRQ receives request cancellation request\n");
390 #endif
391   if (drq == drq_running)
392     {
393       /* 'DATASTORE_get' has already been started (and this call might
394          actually be be legal since it is possible that the client has
395          not yet received any calls to its the iterator; so we need to
396          cancel somehow; we do this by zeroing the 'iter' field, which
397          stops the iteration */
398       drq_running->iter = NULL;
399     }
400   else
401     {
402       GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, drq);
403       GNUNET_SCHEDULER_cancel (sched, drq->task);
404       GNUNET_free (drq);
405     }
406 }
407
408
409 /**
410  * Function called to trigger obtaining the next result
411  * from the datastore.
412  * 
413  * @param more GNUNET_YES to get more results, GNUNET_NO to abort
414  *        iteration (with a final call to "iter" with key/data == NULL).
415  */
416 void
417 GNUNET_FS_drq_get_next (int more)
418 {
419 #if DEBUG_DRQ
420   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
421               "DRQ receives request for next result (more is %d)\n",
422               more);
423 #endif
424   GNUNET_DATASTORE_get_next (dsh, more);
425 }
426
427
428 /**
429  * Closure for 'drq_remove_cont'.
430  */
431 struct RemoveContext
432 {
433   struct GNUNET_DATASTORE_Handle *rmdsh; 
434   GNUNET_DATASTORE_ContinuationWithStatus cont;
435   void *cont_cls;
436 };
437
438
439 static void 
440 drq_remove_cont (void *cls,
441                  int success,
442                  const char *msg)
443 {
444   struct RemoveContext *rc = cls;
445
446   rc->cont (rc->cont_cls,
447             success,
448             msg);
449   GNUNET_DATASTORE_disconnect (rc->rmdsh, GNUNET_NO);
450   GNUNET_free (rc);
451 }
452
453
454 /**
455  * Explicitly remove some content from the database.
456  * The "cont"inuation will be called with status
457  * "GNUNET_OK" if content was removed, "GNUNET_NO"
458  * if no matching entry was found and "GNUNET_SYSERR"
459  * on all other types of errors.
460  *
461  * @param key key for the value
462  * @param size number of bytes in data
463  * @param data content stored
464  * @param cont continuation to call when done
465  * @param cont_cls closure for cont
466  * @param timeout how long to wait at most for a response
467  */
468 void
469 GNUNET_FS_drq_remove (const GNUNET_HashCode *key,
470                       uint32_t size, const void *data,
471                       GNUNET_DATASTORE_ContinuationWithStatus cont,
472                       void *cont_cls,
473                       struct GNUNET_TIME_Relative timeout)
474 {
475   struct GNUNET_DATASTORE_Handle *rmdsh; 
476   struct RemoveContext *rc;
477
478   rmdsh = GNUNET_DATASTORE_connect (cfg,
479                                     sched);
480   if (rmdsh == NULL)
481     {
482       GNUNET_break (0);
483       cont (cont_cls,
484             GNUNET_SYSERR,
485             _("Failed to connect to datastore"));
486       return;
487     }
488   rc = GNUNET_malloc (sizeof (struct RemoveContext));
489   rc->cont = cont;
490   rc->cont_cls = cont_cls;
491   rc->rmdsh = rmdsh;
492   GNUNET_DATASTORE_remove (rmdsh, key, size, data,
493                            &drq_remove_cont, 
494                            rc, timeout);
495 }
496
497
498 /**
499  * Setup datastore request queues.
500  * 
501  * @param s scheduler to use
502  * @param c configuration to use
503  * @return GNUNET_OK on success
504  */
505 int 
506 GNUNET_FS_drq_init (struct GNUNET_SCHEDULER_Handle *s,
507                     const struct GNUNET_CONFIGURATION_Handle *c)
508 {
509   sched = s;
510   cfg = c;
511   dsh = GNUNET_DATASTORE_connect (cfg,
512                                   sched);
513   if (NULL == dsh)
514     {
515       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
516                   _("Failed to connect to `%s' service.\n"),
517                   "datastore");
518       return GNUNET_SYSERR;
519     }
520   GNUNET_SCHEDULER_add_delayed (sched,
521                                 GNUNET_TIME_UNIT_FOREVER_REL,
522                                 &shutdown_task,
523                                 NULL);
524   return GNUNET_OK;
525 }
526
527
528 /* end of gnunet-service-fs_drq.c */