shutdown callback
[oweals/gnunet.git] / src / fs / gnunet-service-fs_drq.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_drq.c
23  * @brief queueing of requests to the datastore service
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs_drq.h"
28
29 #define DEBUG_DRQ GNUNET_NO
30
31 /**
32  * Signature of a function that is called whenever a datastore
33  * request can be processed (or an entry put on the queue times out).
34  *
35  * @param cls closure
36  * @param ok GNUNET_OK if DS is ready, GNUNET_SYSERR on timeout
37  */
38 typedef void (*RequestFunction)(void *cls,
39                                 int ok);
40
41
42 /**
43  * Doubly-linked list of our requests for the datastore.
44  */
45 struct DatastoreRequestQueue
46 {
47
48   /**
49    * This is a doubly-linked list.
50    */
51   struct DatastoreRequestQueue *next;
52
53   /**
54    * This is a doubly-linked list.
55    */
56   struct DatastoreRequestQueue *prev;
57
58   /**
59    * Function to call for each entry.
60    */
61   GNUNET_DATASTORE_Iterator iter;
62
63   /**
64    * Closure for iter.
65    */
66   void *iter_cls;
67
68   /**
69    * Key we are doing the 'get' for.
70    */
71   GNUNET_HashCode key;
72
73   /**
74    * Timeout for this operation.
75    */
76   struct GNUNET_TIME_Absolute timeout;
77     
78   /**
79    * ID of task used for signaling timeout.
80    */
81   GNUNET_SCHEDULER_TaskIdentifier task;
82
83   /**
84    * Datastore entry type we are doing the 'get' for.
85    */
86   enum GNUNET_BLOCK_Type type;
87
88   /**
89    * Is this request at the head of the queue irrespective of its
90    * timeout value?
91    */
92   int forced_head;
93
94 };
95
96 /**
97  * Our scheduler.
98  */
99 static struct GNUNET_SCHEDULER_Handle *sched;
100
101 /**
102  * Our configuration.
103  */
104 static const struct GNUNET_CONFIGURATION_Handle *cfg;
105
106 /**
107  * Head of request queue for the datastore, sorted by timeout.
108  */
109 static struct DatastoreRequestQueue *drq_head;
110
111 /**
112  * Tail of request queue for the datastore.
113  */
114 static struct DatastoreRequestQueue *drq_tail;
115
116
117 /**
118  * Pointer to the currently actively running request,
119  * NULL if none is running.
120  */
121 static struct DatastoreRequestQueue *drq_running;
122
123
124 /**
125  * Run the next DS request in our queue, we're done with the current
126  * one.
127  */
128 static void
129 next_ds_request ();
130
131
132 /**
133  * Wrapper for the datastore get operation.  Makes sure to trigger the
134  * next datastore operation in the queue once the operation is
135  * complete.
136  *
137  * @param cls our 'struct DatastoreRequestQueue*'
138  * @param key key for the content
139  * @param size number of bytes in data
140  * @param data content stored
141  * @param type type of the content
142  * @param priority priority of the content
143  * @param anonymity anonymity-level for the content
144  * @param expiration expiration time for the content
145  * @param uid unique identifier for the datum;
146  *        maybe 0 if no unique identifier is available
147  */
148 static void
149 get_iterator (void *cls,
150               const GNUNET_HashCode * key,
151               uint32_t size,
152               const void *data,
153               enum GNUNET_BLOCK_Type type,
154               uint32_t priority,
155               uint32_t anonymity,
156               struct GNUNET_TIME_Absolute
157               expiration, 
158               uint64_t uid)
159 {
160   struct DatastoreRequestQueue *gc = cls;
161
162   if (gc->iter == NULL) 
163     {
164       /* stop the iteration */
165 #if DEBUG_DRQ
166       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
167                   "Iteration terminated\n");
168 #endif
169       if (key != NULL)
170         GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
171     }
172   else
173     {
174 #if DEBUG_DRQ
175       if (key != NULL)
176         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
177                     "Iteration produced %u-byte result for `%s'\n",
178                     size,
179                     GNUNET_h2s (key));
180 #endif
181       gc->iter (gc->iter_cls,
182                 key, size, data, type,
183                 priority, anonymity, expiration, uid);
184     }
185   if (key == NULL)
186     {
187 #if DEBUG_DRQ
188       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
189                   "Iteration completed\n");
190 #endif
191       GNUNET_assert (gc == drq_running);
192       GNUNET_free (gc);
193       drq_running = NULL;
194       next_ds_request ();
195     }
196 }
197
198
199 /**
200  * A datastore request can be run right now.  Run it.
201  *
202  * @param cls closure (of type "struct DatastoreRequestQueue*")
203  * @param tc task context, unused
204  */
205 static void
206 run_next_request (void *cls,
207                   const struct GNUNET_SCHEDULER_TaskContext *tc)
208 {
209   struct DatastoreRequestQueue *gc = cls;
210
211   gc->task = GNUNET_SCHEDULER_NO_TASK;
212 #if DEBUG_DRQ
213   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
214               "Running datastore request for `%s' of type %u\n",
215               GNUNET_h2s (&gc->key),
216               gc->type);
217 #endif
218   GNUNET_DATASTORE_get (dsh, 
219                         &gc->key,
220                         gc->type, 
221                         42 /* FIXME */, 64 /* FIXME */,
222                         GNUNET_TIME_absolute_get_remaining(gc->timeout),
223                         &get_iterator,
224                         gc);
225 }
226
227
228 /**
229  * Run the next DS request in our queue, we're done with the current
230  * one.
231  */
232 static void
233 next_ds_request ()
234 {
235   struct DatastoreRequestQueue *e;
236
237   GNUNET_free_non_null (drq_running);
238   drq_running = NULL;
239   e = drq_head;
240   if (e == NULL)
241     return;
242   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
243   drq_running = e;
244   GNUNET_SCHEDULER_cancel (sched, e->task);
245   e->task = GNUNET_SCHEDULER_add_now (sched,
246                                       &run_next_request,
247                                       e);
248 }
249
250
251 /**
252  * A datastore request had to be timed out. 
253  *
254  * @param cls closure (unused)
255  * @param tc task context, unused
256  */
257 static void
258 timeout_ds_request (void *cls,
259                     const struct GNUNET_SCHEDULER_TaskContext *tc)
260 {
261   struct DatastoreRequestQueue *e = cls;
262
263 #if DEBUG_DRQ
264   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
265               "Datastore request timed out in queue before transmission\n");
266 #endif
267   e->task = GNUNET_SCHEDULER_NO_TASK;
268   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
269   if (e->iter != NULL)
270     e->iter (e->iter_cls,
271              NULL, 0, NULL, 0, 0, 0, 
272              GNUNET_TIME_UNIT_ZERO_ABS, 0);
273   GNUNET_free (e);  
274 }
275
276
277 /**
278  * Task run during shutdown.
279  *
280  * @param cls unused
281  * @param tc unused
282  */
283 static void
284 shutdown_task (void *cls,
285                const struct GNUNET_SCHEDULER_TaskContext *tc)
286 {
287   struct DatastoreRequestQueue *drq;
288
289 #if DEBUG_DRQ
290   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
291               "DRQ shutdown initiated\n");
292 #endif
293   GNUNET_assert (NULL != dsh);
294   while (NULL != (drq = drq_head))
295     {
296       drq_head = drq->next;
297       GNUNET_SCHEDULER_cancel (sched, drq->task);
298       if (drq->iter != NULL)
299         drq->iter (drq->iter_cls,
300                    NULL, 0, NULL, 0, 0, 0, 
301                    GNUNET_TIME_UNIT_ZERO_ABS, 0);
302       GNUNET_free (drq);
303     }
304   drq_tail = NULL;
305   if (drq_running != NULL)
306     {
307       if (drq_running->task != GNUNET_SCHEDULER_NO_TASK)
308         {
309           GNUNET_SCHEDULER_cancel (sched,
310                                    drq_running->task);
311         }
312       if (drq_running->iter != NULL)
313         {
314           drq_running->iter (drq_running->iter_cls,
315                              NULL, 0, NULL, 0, 0, 0, 
316                              GNUNET_TIME_UNIT_ZERO_ABS, 0);
317         }
318       GNUNET_free (drq_running);
319       drq_running = NULL;
320     }
321 }
322
323
324 /**
325  * Iterate over the results for a particular key
326  * in the datastore.  The iterator will only be called
327  * once initially; if the first call did contain a
328  * result, further results can be obtained by calling
329  * "GNUNET_DATASTORE_get_next" with the given argument.
330  *
331  * @param key maybe NULL (to match all entries)
332  * @param type desired type, 0 for any
333  * @param iter function to call on each matching value;
334  *        will be called once with a NULL value at the end
335  * @param iter_cls closure for iter
336  * @param timeout how long to wait at most for a response
337  * @param immediate should this be queued immediately at
338  *        the head of the queue (irrespecitive of the timeout)?
339  */
340 struct DatastoreRequestQueue *
341 GNUNET_FS_drq_get (const GNUNET_HashCode * key,
342                    enum GNUNET_BLOCK_Type type,
343                    GNUNET_DATASTORE_Iterator iter, 
344                    void *iter_cls,
345                    struct GNUNET_TIME_Relative timeout,
346                    int immediate)
347 {
348   struct DatastoreRequestQueue *e;
349   struct DatastoreRequestQueue *bef;
350
351 #if DEBUG_DRQ
352   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
353               "DRQ receives request for `%s' of type %u\n",
354               GNUNET_h2s (key),
355               type);
356 #endif
357   e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue));
358   e->timeout = GNUNET_TIME_relative_to_absolute (timeout);
359   e->forced_head = immediate;
360   e->key = *key;
361   e->type = type;
362   e->iter = iter;
363   e->iter_cls = iter_cls;
364   e->timeout = GNUNET_TIME_relative_to_absolute (timeout);
365   if (GNUNET_YES == immediate)
366     {
367       /* local request, highest prio, put at head of queue
368          regardless of deadline */
369       bef = NULL;
370     }
371   else
372     {
373       bef = drq_tail;
374       while ( (NULL != bef) &&
375               (e->timeout.value < bef->timeout.value) &&
376               (GNUNET_YES != e->forced_head) )
377         bef = bef->prev;
378     }
379   GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e);
380   e->task = GNUNET_SCHEDULER_add_delayed (sched,
381                                           timeout,
382                                           &timeout_ds_request,
383                                           e);
384   if (drq_running == NULL)
385     next_ds_request ();
386   return e;                                    
387 }
388
389
390 /**
391  * Cancel the given operation.
392  *
393  * @param drq the queued operation (must not have been
394  *        triggered so far)
395  */
396 void
397 GNUNET_FS_drq_get_cancel (struct DatastoreRequestQueue *drq)
398 {
399 #if DEBUG_DRQ
400   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
401               "DRQ receives request cancellation request\n");
402 #endif
403   if (drq == drq_running)
404     {
405       /* 'DATASTORE_get' has already been started (and this call might
406          actually be be legal since it is possible that the client has
407          not yet received any calls to its the iterator; so we need to
408          cancel somehow; we do this by zeroing the 'iter' field, which
409          stops the iteration */
410       drq_running->iter = NULL;
411     }
412   else
413     {
414       GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, drq);
415       GNUNET_SCHEDULER_cancel (sched, drq->task);
416       GNUNET_free (drq);
417     }
418 }
419
420
421 /**
422  * Function called to trigger obtaining the next result
423  * from the datastore.
424  * 
425  * @param more GNUNET_YES to get more results, GNUNET_NO to abort
426  *        iteration (with a final call to "iter" with key/data == NULL).
427  */
428 void
429 GNUNET_FS_drq_get_next (int more)
430 {
431 #if DEBUG_DRQ
432   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
433               "DRQ receives request for next result (more is %d)\n",
434               more);
435 #endif
436   GNUNET_DATASTORE_get_next (dsh, more);
437 }
438
439
440 /**
441  * Closure for 'drq_remove_cont'.
442  */
443 struct RemoveContext
444 {
445   struct GNUNET_DATASTORE_Handle *rmdsh; 
446   GNUNET_DATASTORE_ContinuationWithStatus cont;
447   void *cont_cls;
448 };
449
450
451 static void 
452 drq_remove_cont (void *cls,
453                  int success,
454                  const char *msg)
455 {
456   struct RemoveContext *rc = cls;
457
458   rc->cont (rc->cont_cls,
459             success,
460             msg);
461   GNUNET_free (rc);
462 }
463
464
465 /**
466  * Explicitly remove some content from the database.
467  * The "cont"inuation will be called with status
468  * "GNUNET_OK" if content was removed, "GNUNET_NO"
469  * if no matching entry was found and "GNUNET_SYSERR"
470  * on all other types of errors.
471  *
472  * @param key key for the value
473  * @param size number of bytes in data
474  * @param data content stored
475  * @param cont continuation to call when done
476  * @param cont_cls closure for cont
477  * @param timeout how long to wait at most for a response
478  */
479 void
480 GNUNET_FS_drq_remove (const GNUNET_HashCode *key,
481                       uint32_t size, const void *data,
482                       GNUNET_DATASTORE_ContinuationWithStatus cont,
483                       void *cont_cls,
484                       struct GNUNET_TIME_Relative timeout)
485 {
486   struct GNUNET_DATASTORE_Handle *rmdsh; 
487   struct RemoveContext *rc;
488
489   if (rmdsh == NULL)
490     {
491       GNUNET_break (0);
492       cont (cont_cls,
493             GNUNET_SYSERR,
494             _("Failed to connect to datastore"));
495       return;
496     }
497   rc = GNUNET_malloc (sizeof (struct RemoveContext));
498   rc->cont = cont;
499   rc->cont_cls = cont_cls;
500   rc->rmdsh = rmdsh;
501   GNUNET_DATASTORE_remove (rmdsh, key, size, data,
502                            -3, 128,
503                            timeout,
504                            &drq_remove_cont, 
505                            rc);
506 }
507
508
509 /**
510  * Setup datastore request queues.
511  * 
512  * @param s scheduler to use
513  * @param c configuration to use
514  * @return GNUNET_OK on success
515  */
516 int 
517 GNUNET_FS_drq_init (struct GNUNET_SCHEDULER_Handle *s,
518                     const struct GNUNET_CONFIGURATION_Handle *c)
519 {
520   sched = s;
521   cfg = c;
522   dsh = GNUNET_DATASTORE_connect (cfg,
523                                   sched);
524   if (NULL == dsh)
525     {
526       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
527                   _("Failed to connect to `%s' service.\n"),
528                   "datastore");
529       return GNUNET_SYSERR;
530     }
531   GNUNET_SCHEDULER_add_delayed (sched,
532                                 GNUNET_TIME_UNIT_FOREVER_REL,
533                                 &shutdown_task,
534                                 NULL);
535   return GNUNET_OK;
536 }
537
538
539 /* end of gnunet-service-fs_drq.c */