reorder
[oweals/gnunet.git] / src / fs / gnunet-service-fs_drq.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_drq.c
23  * @brief queueing of requests to the datastore service
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs_drq.h"
28
29
30 /**
31  * Signature of a function that is called whenever a datastore
32  * request can be processed (or an entry put on the queue times out).
33  *
34  * @param cls closure
35  * @param ok GNUNET_OK if DS is ready, GNUNET_SYSERR on timeout
36  */
37 typedef void (*RequestFunction)(void *cls,
38                                 int ok);
39
40
41 /**
42  * Doubly-linked list of our requests for the datastore.
43  */
44 struct DatastoreRequestQueue
45 {
46
47   /**
48    * This is a doubly-linked list.
49    */
50   struct DatastoreRequestQueue *next;
51
52   /**
53    * This is a doubly-linked list.
54    */
55   struct DatastoreRequestQueue *prev;
56
57   /**
58    * Function to call (will issue the request).
59    */
60   RequestFunction req;
61
62   /**
63    * Closure for req.
64    */
65   void *req_cls;
66
67   /**
68    * When should this request time-out because we don't care anymore?
69    */
70   struct GNUNET_TIME_Absolute timeout;
71     
72   /**
73    * ID of task used for signaling timeout.
74    */
75   GNUNET_SCHEDULER_TaskIdentifier task;
76
77   /**
78    * Is this request at the head of the queue irrespective of its
79    * timeout value?
80    */
81   int forced_head;
82
83 };
84
85 /**
86  * Our scheduler.
87  */
88 static struct GNUNET_SCHEDULER_Handle *sched;
89
90 /**
91  * Our configuration.
92  */
93 static const struct GNUNET_CONFIGURATION_Handle *cfg;
94
95 /**
96  * Head of request queue for the datastore, sorted by timeout.
97  */
98 static struct DatastoreRequestQueue *drq_head;
99
100 /**
101  * Tail of request queue for the datastore.
102  */
103 static struct DatastoreRequestQueue *drq_tail;
104
105 /**
106  * Our connection to the datastore.
107  */
108 static struct GNUNET_DATASTORE_Handle *dsh;
109
110 /**
111  * Pointer to the currently actively running request,
112  * NULL if none is running.
113  */
114 static struct DatastoreRequestQueue *drq_running;
115
116
117 /**
118  * A datastore request had to be timed out. 
119  *
120  * @param cls closure (unused)
121  * @param tc task context, unused
122  */
123 static void
124 timeout_ds_request (void *cls,
125                     const struct GNUNET_SCHEDULER_TaskContext *tc)
126 {
127   struct DatastoreRequestQueue *e = cls;
128
129   e->task = GNUNET_SCHEDULER_NO_TASK;
130   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
131   e->req (e->req_cls, GNUNET_NO);
132   GNUNET_free (e);  
133 }
134
135
136 /**
137  * A datastore request can be run right now.  Run it.
138  *
139  * @param cls closure (of type "struct DatastoreRequestQueue*")
140  * @param tc task context, unused
141  */
142 static void
143 run_next_request (void *cls,
144                   const struct GNUNET_SCHEDULER_TaskContext *tc)
145 {
146   struct DatastoreRequestQueue *e = cls;
147   
148   e->req (e->req_cls, GNUNET_YES);
149 }
150
151
152 /**
153  * Run the next DS request in our queue, we're done with the current
154  * one.
155  */
156 static void
157 next_ds_request ()
158 {
159   struct DatastoreRequestQueue *e;
160
161   GNUNET_free_non_null (drq_running);
162   drq_running = NULL;
163   e = drq_head;
164   if (e == NULL)
165     return;
166   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
167   drq_running = e;
168   GNUNET_SCHEDULER_cancel (sched, e->task);
169   e->task = GNUNET_SCHEDULER_add_now (sched,
170                                       &run_next_request,
171                                       e);
172 }
173
174
175 /**
176  * Remove a pending request from the request queue.
177  *
178  * @param req request to remove
179  */
180 static void
181 dequeue_ds_request (struct DatastoreRequestQueue *req)  
182 {
183   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, req);
184   GNUNET_SCHEDULER_cancel (sched, req->task);
185   GNUNET_free (req);
186 }
187
188
189 /**
190  * Queue a request for the datastore.
191  *
192  * @param deadline by when the request should run
193  * @param fun function to call once the request can be run
194  * @param fun_cls closure for fun
195  * @param immediate should this be queued immediately at
196  *        the head of the queue (irrespecitive of the deadline)?
197  * @return handle that can be used to dequeue the request
198  */
199 static struct DatastoreRequestQueue *
200 queue_ds_request (struct GNUNET_TIME_Relative deadline,
201                   RequestFunction fun,
202                   void *fun_cls,
203                   int immediate)
204 {
205   struct DatastoreRequestQueue *e;
206   struct DatastoreRequestQueue *bef;
207
208   e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue));
209   e->timeout = GNUNET_TIME_relative_to_absolute (deadline);
210   e->req = fun;
211   e->req_cls = fun_cls;
212   e->forced_head = immediate;
213   if (GNUNET_YES == immediate)
214     {
215       /* local request, highest prio, put at head of queue
216          regardless of deadline */
217       bef = NULL;
218     }
219   else
220     {
221       bef = drq_tail;
222       while ( (NULL != bef) &&
223               (e->timeout.value < bef->timeout.value) &&
224               (GNUNET_YES != e->forced_head) )
225         bef = bef->prev;
226     }
227   GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e);
228   e->task = GNUNET_SCHEDULER_add_delayed (sched,
229                                           deadline,
230                                           &timeout_ds_request,
231                                           e);
232   if (drq_running == NULL)
233     next_ds_request ();
234   return e;                                    
235 }
236
237
238 /**
239  * Task run during shutdown.
240  *
241  * @param cls unused
242  * @param tc unused
243  */
244 static void
245 shutdown_task (void *cls,
246                const struct GNUNET_SCHEDULER_TaskContext *tc)
247 {
248   struct DatastoreRequestQueue *drq;
249
250   GNUNET_assert (NULL != dsh);
251   GNUNET_DATASTORE_disconnect (dsh,
252                                GNUNET_NO);
253   dsh = NULL;
254   while (NULL != (drq = drq_head))
255     {
256       drq_head = drq->next;
257       GNUNET_SCHEDULER_cancel (sched, drq->task);
258       drq->req (drq->req_cls, GNUNET_NO);
259       GNUNET_free (drq);
260     }
261   drq_tail = NULL;
262 }
263
264
265 /**
266  * Closure for 'do_get' and 'get_iterator'.
267  */
268 struct GetClosure
269 {
270   /**
271    * Key we are doing the 'get' for.
272    */
273   GNUNET_HashCode key;
274
275   /**
276    * Datastore entry type we are doing the 'get' for.
277    */
278   uint32_t type;
279
280   /**
281    * Function to call for each entry.
282    */
283   GNUNET_DATASTORE_Iterator iter;
284
285   /**
286    * Closure for iter.
287    */
288   void *iter_cls;
289
290   /**
291    * Timeout for this operation.
292    */
293   struct GNUNET_TIME_Absolute timeout;
294 };
295
296
297 /**
298  * Wrapper for the datastore get operation.  Makes sure to trigger the
299  * next datastore operation in the queue once the operation is
300  * complete.
301  *
302  * @param cls our 'struct GetClosure*'
303  * @param key key for the content
304  * @param size number of bytes in data
305  * @param data content stored
306  * @param type type of the content
307  * @param priority priority of the content
308  * @param anonymity anonymity-level for the content
309  * @param expiration expiration time for the content
310  * @param uid unique identifier for the datum;
311  *        maybe 0 if no unique identifier is available
312  */
313 static void
314 get_iterator (void *cls,
315               const GNUNET_HashCode * key,
316               uint32_t size,
317               const void *data,
318               uint32_t type,
319               uint32_t priority,
320               uint32_t anonymity,
321               struct GNUNET_TIME_Absolute
322               expiration, 
323               uint64_t uid)
324 {
325   struct GetClosure *gc = cls;
326
327   if (gc->iter == NULL) 
328     {
329       /* stop the iteration */
330       if (key != NULL)
331         GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
332     }
333   else
334     {
335       gc->iter (gc->iter_cls,
336                 key, size, data, type,
337                 priority, anonymity, expiration, uid);
338     }
339   if (key == NULL)
340     {
341       next_ds_request ();
342       GNUNET_free (gc);
343     }
344 }
345
346
347 /**
348  * We're at the head of the reqeust queue, execute the
349  * get operation (or signal error).
350  *
351  * @param cls the 'struct GetClosure'
352  * @param ok GNUNET_OK if we can run the GET, otherwise
353  *        we need to time out
354  */
355 static void
356 do_get (void *cls,
357         int ok)
358 {
359   struct GetClosure *gc = cls;
360
361   if (ok != GNUNET_OK)
362     {
363       if (gc->iter != NULL)
364         gc->iter (gc->iter_cls,
365                   NULL, 0, NULL, 0, 0, 0, 
366                   GNUNET_TIME_UNIT_ZERO_ABS, 0);
367       GNUNET_free (gc);
368       next_ds_request ();
369       return;
370     }
371   GNUNET_DATASTORE_get (dsh, &gc->key, gc->type, 
372                         &get_iterator,
373                         gc,
374                         GNUNET_TIME_absolute_get_remaining(gc->timeout));
375 }
376
377
378 /**
379  * Iterate over the results for a particular key
380  * in the datastore.  The iterator will only be called
381  * once initially; if the first call did contain a
382  * result, further results can be obtained by calling
383  * "GNUNET_DATASTORE_get_next" with the given argument.
384  *
385  * @param key maybe NULL (to match all entries)
386  * @param type desired type, 0 for any
387  * @param iter function to call on each matching value;
388  *        will be called once with a NULL value at the end
389  * @param iter_cls closure for iter
390  * @param timeout how long to wait at most for a response
391  * @param immediate should this be queued immediately at
392  *        the head of the queue (irrespecitive of the timeout)?
393  */
394 struct DatastoreRequestQueue *
395 GNUNET_FS_drq_get (const GNUNET_HashCode * key,
396                    uint32_t type,
397                    GNUNET_DATASTORE_Iterator iter, 
398                    void *iter_cls,
399                    struct GNUNET_TIME_Relative timeout,
400                    int immediate)
401 {
402   struct GetClosure *gc;
403
404   gc = GNUNET_malloc (sizeof (struct GetClosure));
405   gc->key = *key;
406   gc->type = type;
407   gc->iter = iter;
408   gc->iter_cls = iter_cls;
409   gc->timeout = GNUNET_TIME_relative_to_absolute (timeout);
410   return queue_ds_request (timeout,
411                            &do_get,
412                            gc,
413                            immediate);
414 }
415
416
417 /**
418  * Cancel the given operation.
419  *
420  * @param drq the queued operation (must not have been
421  *        triggered so far)
422  */
423 void
424 GNUNET_FS_drq_get_cancel (struct DatastoreRequestQueue *drq)
425 {
426   struct GetClosure *gc;
427   if (drq == drq_running)
428     {
429       /* 'DATASTORE_get' has already been started (and this call might
430          actually be be legal since it is possible that the client has
431          not yet received any calls to its the iterator; so we need
432          to cancel somehow; we do this by getting to the 'GetClosure'
433          and zeroing the 'iter' field, which stops the iteration */
434       gc = drq_running->req_cls;
435       gc->iter = NULL;
436     }
437   else
438     {
439       dequeue_ds_request (drq);  
440     }
441 }
442
443
444 /**
445  * Function called to trigger obtaining the next result
446  * from the datastore.
447  * 
448  * @param more GNUNET_YES to get more results, GNUNET_NO to abort
449  *        iteration (with a final call to "iter" with key/data == NULL).
450  */
451 void
452 GNUNET_FS_drq_get_next (int more)
453 {
454   GNUNET_DATASTORE_get_next (dsh, more);
455 }
456
457
458 /**
459  * Explicitly remove some content from the database.
460  * The "cont"inuation will be called with status
461  * "GNUNET_OK" if content was removed, "GNUNET_NO"
462  * if no matching entry was found and "GNUNET_SYSERR"
463  * on all other types of errors.
464  *
465  * @param key key for the value
466  * @param size number of bytes in data
467  * @param data content stored
468  * @param cont continuation to call when done
469  * @param cont_cls closure for cont
470  * @param timeout how long to wait at most for a response
471  */
472 void
473 GNUNET_FS_drq_remove (const GNUNET_HashCode *key,
474                       uint32_t size, const void *data,
475                       GNUNET_DATASTORE_ContinuationWithStatus cont,
476                       void *cont_cls,
477                       struct GNUNET_TIME_Relative timeout)
478 {
479   if (dsh == NULL)
480     {
481       GNUNET_break (0);
482       return;
483     }
484   GNUNET_DATASTORE_remove (dsh, key, size, data,
485                            cont, cont_cls, timeout);
486 }
487
488
489 /**
490  * Setup datastore request queues.
491  * 
492  * @param s scheduler to use
493  * @param c configuration to use
494  * @return GNUNET_OK on success
495  */
496 int 
497 GNUNET_FS_drq_init (struct GNUNET_SCHEDULER_Handle *s,
498                     const struct GNUNET_CONFIGURATION_Handle *c)
499 {
500   sched = s;
501   cfg = c;
502   dsh = GNUNET_DATASTORE_connect (cfg,
503                                   sched);
504   if (NULL == dsh)
505     {
506       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
507                   _("Failed to connect to `%s' service.\n"),
508                   "datastore");
509       return GNUNET_SYSERR;
510     }
511   GNUNET_SCHEDULER_add_delayed (sched,
512                                 GNUNET_TIME_UNIT_FOREVER_REL,
513                                 &shutdown_task,
514                                 NULL);
515   return GNUNET_OK;
516 }
517
518
519 /* end of gnunet-service-fs_drq.c */