fix
[oweals/gnunet.git] / src / fs / gnunet-service-fs_drq.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_drq.c
23  * @brief queueing of requests to the datastore service
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs_drq.h"
28
29
30 /**
31  * Signature of a function that is called whenever a datastore
32  * request can be processed (or an entry put on the queue times out).
33  *
34  * @param cls closure
35  * @param ok GNUNET_OK if DS is ready, GNUNET_SYSERR on timeout
36  */
37 typedef void (*RequestFunction)(void *cls,
38                                 int ok);
39
40
41 /**
42  * Doubly-linked list of our requests for the datastore.
43  */
44 struct DatastoreRequestQueue
45 {
46
47   /**
48    * This is a doubly-linked list.
49    */
50   struct DatastoreRequestQueue *next;
51
52   /**
53    * This is a doubly-linked list.
54    */
55   struct DatastoreRequestQueue *prev;
56
57   /**
58    * Function to call for each entry.
59    */
60   GNUNET_DATASTORE_Iterator iter;
61
62   /**
63    * Closure for iter.
64    */
65   void *iter_cls;
66
67   /**
68    * Key we are doing the 'get' for.
69    */
70   GNUNET_HashCode key;
71
72   /**
73    * Timeout for this operation.
74    */
75   struct GNUNET_TIME_Absolute timeout;
76     
77   /**
78    * ID of task used for signaling timeout.
79    */
80   GNUNET_SCHEDULER_TaskIdentifier task;
81
82   /**
83    * Datastore entry type we are doing the 'get' for.
84    */
85   uint32_t type;
86
87   /**
88    * Is this request at the head of the queue irrespective of its
89    * timeout value?
90    */
91   int forced_head;
92
93 };
94
95 /**
96  * Our scheduler.
97  */
98 static struct GNUNET_SCHEDULER_Handle *sched;
99
100 /**
101  * Our configuration.
102  */
103 static const struct GNUNET_CONFIGURATION_Handle *cfg;
104
105 /**
106  * Head of request queue for the datastore, sorted by timeout.
107  */
108 static struct DatastoreRequestQueue *drq_head;
109
110 /**
111  * Tail of request queue for the datastore.
112  */
113 static struct DatastoreRequestQueue *drq_tail;
114
115 /**
116  * Our connection to the datastore.
117  */
118 static struct GNUNET_DATASTORE_Handle *dsh;
119
120 /**
121  * Pointer to the currently actively running request,
122  * NULL if none is running.
123  */
124 static struct DatastoreRequestQueue *drq_running;
125
126
127 /**
128  * Run the next DS request in our queue, we're done with the current
129  * one.
130  */
131 static void
132 next_ds_request ();
133
134
135 /**
136  * Wrapper for the datastore get operation.  Makes sure to trigger the
137  * next datastore operation in the queue once the operation is
138  * complete.
139  *
140  * @param cls our 'struct DatastoreRequestQueue*'
141  * @param key key for the content
142  * @param size number of bytes in data
143  * @param data content stored
144  * @param type type of the content
145  * @param priority priority of the content
146  * @param anonymity anonymity-level for the content
147  * @param expiration expiration time for the content
148  * @param uid unique identifier for the datum;
149  *        maybe 0 if no unique identifier is available
150  */
151 static void
152 get_iterator (void *cls,
153               const GNUNET_HashCode * key,
154               uint32_t size,
155               const void *data,
156               uint32_t type,
157               uint32_t priority,
158               uint32_t anonymity,
159               struct GNUNET_TIME_Absolute
160               expiration, 
161               uint64_t uid)
162 {
163   struct DatastoreRequestQueue *gc = cls;
164
165   if (gc->iter == NULL) 
166     {
167       /* stop the iteration */
168       if (key != NULL)
169         GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
170     }
171   else
172     {
173       gc->iter (gc->iter_cls,
174                 key, size, data, type,
175                 priority, anonymity, expiration, uid);
176     }
177   if (key == NULL)
178     {
179       GNUNET_assert (gc == drq_running);
180       GNUNET_free (gc);
181       drq_running = NULL;
182       next_ds_request ();
183     }
184 }
185
186
187 /**
188  * A datastore request can be run right now.  Run it.
189  *
190  * @param cls closure (of type "struct DatastoreRequestQueue*")
191  * @param tc task context, unused
192  */
193 static void
194 run_next_request (void *cls,
195                   const struct GNUNET_SCHEDULER_TaskContext *tc)
196 {
197   struct DatastoreRequestQueue *gc = cls;
198
199   gc->task = GNUNET_SCHEDULER_NO_TASK;
200   GNUNET_DATASTORE_get (dsh, &gc->key, gc->type, 
201                         &get_iterator,
202                         gc,
203                         GNUNET_TIME_absolute_get_remaining(gc->timeout));
204 }
205
206
207 /**
208  * Run the next DS request in our queue, we're done with the current
209  * one.
210  */
211 static void
212 next_ds_request ()
213 {
214   struct DatastoreRequestQueue *e;
215
216   GNUNET_free_non_null (drq_running);
217   drq_running = NULL;
218   e = drq_head;
219   if (e == NULL)
220     return;
221   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
222   drq_running = e;
223   GNUNET_SCHEDULER_cancel (sched, e->task);
224   e->task = GNUNET_SCHEDULER_add_now (sched,
225                                       &run_next_request,
226                                       e);
227 }
228
229
230 /**
231  * A datastore request had to be timed out. 
232  *
233  * @param cls closure (unused)
234  * @param tc task context, unused
235  */
236 static void
237 timeout_ds_request (void *cls,
238                     const struct GNUNET_SCHEDULER_TaskContext *tc)
239 {
240   struct DatastoreRequestQueue *e = cls;
241
242   e->task = GNUNET_SCHEDULER_NO_TASK;
243   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
244   if (e->iter != NULL)
245     e->iter (e->iter_cls,
246              NULL, 0, NULL, 0, 0, 0, 
247              GNUNET_TIME_UNIT_ZERO_ABS, 0);
248   GNUNET_free (e);  
249 }
250
251
252 /**
253  * Task run during shutdown.
254  *
255  * @param cls unused
256  * @param tc unused
257  */
258 static void
259 shutdown_task (void *cls,
260                const struct GNUNET_SCHEDULER_TaskContext *tc)
261 {
262   struct DatastoreRequestQueue *drq;
263
264   GNUNET_assert (NULL != dsh);
265   GNUNET_DATASTORE_disconnect (dsh,
266                                GNUNET_NO);
267   dsh = NULL;
268   while (NULL != (drq = drq_head))
269     {
270       drq_head = drq->next;
271       GNUNET_SCHEDULER_cancel (sched, drq->task);
272       if (drq->iter != NULL)
273         drq->iter (drq->iter_cls,
274                    NULL, 0, NULL, 0, 0, 0, 
275                    GNUNET_TIME_UNIT_ZERO_ABS, 0);
276       GNUNET_free (drq);
277     }
278   drq_tail = NULL;
279 }
280
281
282 /**
283  * Iterate over the results for a particular key
284  * in the datastore.  The iterator will only be called
285  * once initially; if the first call did contain a
286  * result, further results can be obtained by calling
287  * "GNUNET_DATASTORE_get_next" with the given argument.
288  *
289  * @param key maybe NULL (to match all entries)
290  * @param type desired type, 0 for any
291  * @param iter function to call on each matching value;
292  *        will be called once with a NULL value at the end
293  * @param iter_cls closure for iter
294  * @param timeout how long to wait at most for a response
295  * @param immediate should this be queued immediately at
296  *        the head of the queue (irrespecitive of the timeout)?
297  */
298 struct DatastoreRequestQueue *
299 GNUNET_FS_drq_get (const GNUNET_HashCode * key,
300                    uint32_t type,
301                    GNUNET_DATASTORE_Iterator iter, 
302                    void *iter_cls,
303                    struct GNUNET_TIME_Relative timeout,
304                    int immediate)
305 {
306   struct DatastoreRequestQueue *e;
307   struct DatastoreRequestQueue *bef;
308
309   e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue));
310   e->timeout = GNUNET_TIME_relative_to_absolute (timeout);
311   e->forced_head = immediate;
312   e->key = *key;
313   e->type = type;
314   e->iter = iter;
315   e->iter_cls = iter_cls;
316   e->timeout = GNUNET_TIME_relative_to_absolute (timeout);
317   if (GNUNET_YES == immediate)
318     {
319       /* local request, highest prio, put at head of queue
320          regardless of deadline */
321       bef = NULL;
322     }
323   else
324     {
325       bef = drq_tail;
326       while ( (NULL != bef) &&
327               (e->timeout.value < bef->timeout.value) &&
328               (GNUNET_YES != e->forced_head) )
329         bef = bef->prev;
330     }
331   GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e);
332   e->task = GNUNET_SCHEDULER_add_delayed (sched,
333                                           timeout,
334                                           &timeout_ds_request,
335                                           e);
336   if (drq_running == NULL)
337     next_ds_request ();
338   return e;                                    
339 }
340
341
342 /**
343  * Cancel the given operation.
344  *
345  * @param drq the queued operation (must not have been
346  *        triggered so far)
347  */
348 void
349 GNUNET_FS_drq_get_cancel (struct DatastoreRequestQueue *drq)
350 {
351   if (drq == drq_running)
352     {
353       /* 'DATASTORE_get' has already been started (and this call might
354          actually be be legal since it is possible that the client has
355          not yet received any calls to its the iterator; so we need to
356          cancel somehow; we do this by zeroing the 'iter' field, which
357          stops the iteration */
358       drq_running->iter = NULL;
359     }
360   else
361     {
362       GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, drq);
363       GNUNET_SCHEDULER_cancel (sched, drq->task);
364       GNUNET_free (drq);
365     }
366 }
367
368
369 /**
370  * Function called to trigger obtaining the next result
371  * from the datastore.
372  * 
373  * @param more GNUNET_YES to get more results, GNUNET_NO to abort
374  *        iteration (with a final call to "iter" with key/data == NULL).
375  */
376 void
377 GNUNET_FS_drq_get_next (int more)
378 {
379   GNUNET_DATASTORE_get_next (dsh, more);
380 }
381
382
383 /**
384  * Explicitly remove some content from the database.
385  * The "cont"inuation will be called with status
386  * "GNUNET_OK" if content was removed, "GNUNET_NO"
387  * if no matching entry was found and "GNUNET_SYSERR"
388  * on all other types of errors.
389  *
390  * @param key key for the value
391  * @param size number of bytes in data
392  * @param data content stored
393  * @param cont continuation to call when done
394  * @param cont_cls closure for cont
395  * @param timeout how long to wait at most for a response
396  */
397 void
398 GNUNET_FS_drq_remove (const GNUNET_HashCode *key,
399                       uint32_t size, const void *data,
400                       GNUNET_DATASTORE_ContinuationWithStatus cont,
401                       void *cont_cls,
402                       struct GNUNET_TIME_Relative timeout)
403 {
404   if (dsh == NULL)
405     {
406       GNUNET_break (0);
407       return;
408     }
409   GNUNET_DATASTORE_remove (dsh, key, size, data,
410                            cont, cont_cls, timeout);
411 }
412
413
414 /**
415  * Setup datastore request queues.
416  * 
417  * @param s scheduler to use
418  * @param c configuration to use
419  * @return GNUNET_OK on success
420  */
421 int 
422 GNUNET_FS_drq_init (struct GNUNET_SCHEDULER_Handle *s,
423                     const struct GNUNET_CONFIGURATION_Handle *c)
424 {
425   sched = s;
426   cfg = c;
427   dsh = GNUNET_DATASTORE_connect (cfg,
428                                   sched);
429   if (NULL == dsh)
430     {
431       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
432                   _("Failed to connect to `%s' service.\n"),
433                   "datastore");
434       return GNUNET_SYSERR;
435     }
436   GNUNET_SCHEDULER_add_delayed (sched,
437                                 GNUNET_TIME_UNIT_FOREVER_REL,
438                                 &shutdown_task,
439                                 NULL);
440   return GNUNET_OK;
441 }
442
443
444 /* end of gnunet-service-fs_drq.c */