fixing drq code
[oweals/gnunet.git] / src / fs / gnunet-service-fs_drq.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_drq.c
23  * @brief queueing of requests to the datastore service
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs_drq.h"
28
29
30 /**
31  * Signature of a function that is called whenever a datastore
32  * request can be processed (or an entry put on the queue times out).
33  *
34  * @param cls closure
35  * @param ok GNUNET_OK if DS is ready, GNUNET_SYSERR on timeout
36  */
37 typedef void (*RequestFunction)(void *cls,
38                                 int ok);
39
40
41 /**
42  * Doubly-linked list of our requests for the datastore.
43  */
44 struct DatastoreRequestQueue
45 {
46
47   /**
48    * This is a doubly-linked list.
49    */
50   struct DatastoreRequestQueue *next;
51
52   /**
53    * This is a doubly-linked list.
54    */
55   struct DatastoreRequestQueue *prev;
56
57   /**
58    * Function to call (will issue the request).
59    */
60   RequestFunction req;
61
62   /**
63    * Closure for req.
64    */
65   void *req_cls;
66
67   /**
68    * When should this request time-out because we don't care anymore?
69    */
70   struct GNUNET_TIME_Absolute timeout;
71     
72   /**
73    * ID of task used for signaling timeout.
74    */
75   GNUNET_SCHEDULER_TaskIdentifier task;
76
77   /**
78    * Is this request at the head of the queue irrespective of its
79    * timeout value?
80    */
81   int forced_head;
82
83 };
84
85 /**
86  * Our scheduler.
87  */
88 static struct GNUNET_SCHEDULER_Handle *sched;
89
90 /**
91  * Our configuration.
92  */
93 const struct GNUNET_CONFIGURATION_Handle *cfg;
94
95 /**
96  * Head of request queue for the datastore, sorted by timeout.
97  */
98 static struct DatastoreRequestQueue *drq_head;
99
100 /**
101  * Tail of request queue for the datastore.
102  */
103 static struct DatastoreRequestQueue *drq_tail;
104
105 /**
106  * Our connection to the datastore.
107  */
108 static struct GNUNET_DATASTORE_Handle *dsh;
109
110 /**
111  * Pointer to the currently actively running request,
112  * NULL if none is running.
113  */
114 static struct DatastoreRequestQueue *drq_running;
115
116
117 /**
118  * A datastore request had to be timed out. 
119  *
120  * @param cls closure (unused)
121  * @param tc task context, unused
122  */
123 static void
124 timeout_ds_request (void *cls,
125                     const struct GNUNET_SCHEDULER_TaskContext *tc)
126 {
127   struct DatastoreRequestQueue *e = cls;
128
129   e->task = GNUNET_SCHEDULER_NO_TASK;
130   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
131   e->req (e->req_cls, GNUNET_NO);
132   GNUNET_free (e);  
133 }
134
135
136 /**
137  * A datastore request can be run right now.  Run it.
138  *
139  * @param cls closure (of type "struct DatastoreRequestQueue*")
140  * @param tc task context, unused
141  */
142 static void
143 run_next_request (void *cls,
144                   const struct GNUNET_SCHEDULER_TaskContext *tc)
145 {
146   struct DatastoreRequestQueue *e = cls;
147
148   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
149   drq_running = e;
150   e->req (e->req_cls, GNUNET_YES);
151 }
152
153
154 /**
155  * Run the next DS request in our queue, we're done with the current
156  * one.
157  */
158 static void
159 next_ds_request ()
160 {
161   struct DatastoreRequestQueue *e;
162
163   GNUNET_free_non_null (drq_running);
164   drq_running = NULL;
165   e = drq_head;
166   if (e == NULL)
167     return;
168   GNUNET_SCHEDULER_cancel (sched, e->task);
169   e->task = GNUNET_SCHEDULER_add_now (sched,
170                                       &run_next_request,
171                                       e);
172 }
173
174
175 /**
176  * Remove a pending request from the request queue.
177  *
178  * @param req request to remove
179  */
180 static void
181 dequeue_ds_request (struct DatastoreRequestQueue *req)  
182 {
183   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, req);
184   GNUNET_SCHEDULER_cancel (sched, req->task);
185   GNUNET_free (req);
186 }
187
188
189 /**
190  * Queue a request for the datastore.
191  *
192  * @param deadline by when the request should run
193  * @param fun function to call once the request can be run
194  * @param fun_cls closure for fun
195  * @param immediate should this be queued immediately at
196  *        the head of the queue (irrespecitive of the deadline)?
197  * @return handle that can be used to dequeue the request
198  */
199 static struct DatastoreRequestQueue *
200 queue_ds_request (struct GNUNET_TIME_Relative deadline,
201                   RequestFunction fun,
202                   void *fun_cls,
203                   int immediate)
204 {
205   struct DatastoreRequestQueue *e;
206   struct DatastoreRequestQueue *bef;
207
208   e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue));
209   e->timeout = GNUNET_TIME_relative_to_absolute (deadline);
210   e->req = fun;
211   e->req_cls = fun_cls;
212   e->forced_head = immediate;
213   if (GNUNET_YES == immediate)
214     {
215       /* local request, highest prio, put at head of queue
216          regardless of deadline */
217       bef = NULL;
218     }
219   else
220     {
221       bef = drq_tail;
222       while ( (NULL != bef) &&
223               (e->timeout.value < bef->timeout.value) &&
224               (GNUNET_YES != e->forced_head) )
225         bef = bef->prev;
226     }
227   GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e);
228   e->task = GNUNET_SCHEDULER_add_delayed (sched,
229                                           deadline,
230                                           &timeout_ds_request,
231                                           e);
232   if (drq_running == NULL)
233     next_ds_request ();
234   return e;                                    
235 }
236
237
238 /**
239  * Task run during shutdown.
240  *
241  * @param cls unused
242  * @param tc unused
243  */
244 static void
245 shutdown_task (void *cls,
246                const struct GNUNET_SCHEDULER_TaskContext *tc)
247 {
248   struct DatastoreRequestQueue *drq;
249
250   GNUNET_assert (NULL != dsh);
251   GNUNET_DATASTORE_disconnect (dsh,
252                                GNUNET_NO);
253   dsh = NULL;
254   while (NULL != (drq = drq_head))
255     {
256       drq_head = drq->next;
257       GNUNET_SCHEDULER_cancel (sched, drq->task);
258       drq->req (drq->req_cls, GNUNET_NO);
259       GNUNET_free (drq);
260     }
261   drq_tail = NULL;
262 }
263
264
265 /**
266  * Closure for 'do_get' and 'get_iterator'.
267  */
268 struct GetClosure
269 {
270   /**
271    * Key we are doing the 'get' for.
272    */
273   GNUNET_HashCode key;
274
275   /**
276    * Datastore entry type we are doing the 'get' for.
277    */
278   uint32_t type;
279
280   /**
281    * Function to call for each entry.
282    */
283   GNUNET_DATASTORE_Iterator iter;
284
285   /**
286    * Closure for iter.
287    */
288   void *iter_cls;
289
290   /**
291    * Timeout for this operation.
292    */
293   struct GNUNET_TIME_Absolute timeout;
294 };
295
296
297 /**
298  * Wrapper for the datastore get operation.  Makes sure to trigger the
299  * next datastore operation in the queue once the operation is
300  * complete.
301  *
302  * @param cls our 'struct GetClosure*'
303  */
304 static void
305 get_iterator (void *cls,
306               const GNUNET_HashCode * key,
307               uint32_t size,
308               const void *data,
309               uint32_t type,
310               uint32_t priority,
311               uint32_t anonymity,
312               struct GNUNET_TIME_Absolute
313               expiration, 
314               uint64_t uid)
315 {
316   struct GetClosure *gc = cls;
317
318   if (gc->iter == NULL)
319     {
320       /* stop the iteration */
321       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
322     }
323   else
324     {
325       gc->iter (gc->iter_cls,
326                 key, size, data, type,
327                 priority, anonymity, expiration, uid);
328     }
329   if (key == NULL)
330     {
331       next_ds_request ();
332       GNUNET_free (gc);
333     }
334 }
335
336
337 /**
338  * We're at the head of the reqeust queue, execute the
339  * get operation (or signal error).
340  *
341  * @param cls the 'struct GetClosure'
342  * @param ok GNUNET_OK if we can run the GET, otherwise
343  *        we need to time out
344  */
345 static void
346 do_get (void *cls,
347         int ok)
348 {
349   struct GetClosure *gc = cls;
350
351   if (ok != GNUNET_OK)
352     {
353       if (gc->iter != NULL)
354         gc->iter (gc->iter_cls,
355                   NULL, 0, NULL, 0, 0, 0, 
356                   GNUNET_TIME_UNIT_ZERO_ABS, 0);
357       GNUNET_free (gc);
358       next_ds_request ();
359       return;
360     }
361   GNUNET_DATASTORE_get (dsh, &gc->key, gc->type, 
362                         &get_iterator,
363                         gc,
364                         GNUNET_TIME_absolute_get_remaining(gc->timeout));
365 }
366
367
368 /**
369  * Iterate over the results for a particular key
370  * in the datastore.  The iterator will only be called
371  * once initially; if the first call did contain a
372  * result, further results can be obtained by calling
373  * "GNUNET_DATASTORE_get_next" with the given argument.
374  *
375  * @param key maybe NULL (to match all entries)
376  * @param type desired type, 0 for any
377  * @param iter function to call on each matching value;
378  *        will be called once with a NULL value at the end
379  * @param iter_cls closure for iter
380  * @param timeout how long to wait at most for a response
381  * @param immediate should this be queued immediately at
382  *        the head of the queue (irrespecitive of the timeout)?
383  */
384 struct DatastoreRequestQueue *
385 GNUNET_FS_drq_get (const GNUNET_HashCode * key,
386                    uint32_t type,
387                    GNUNET_DATASTORE_Iterator iter, 
388                    void *iter_cls,
389                    struct GNUNET_TIME_Relative timeout,
390                    int immediate)
391 {
392   struct GetClosure *gc;
393
394   gc = GNUNET_malloc (sizeof (struct GetClosure));
395   gc->key = *key;
396   gc->type = type;
397   gc->iter = iter;
398   gc->iter_cls = iter_cls;
399   gc->timeout = GNUNET_TIME_relative_to_absolute (timeout);
400   return queue_ds_request (timeout,
401                            &do_get,
402                            gc,
403                            immediate);
404 }
405
406
407 /**
408  * Cancel the given operation.
409  *
410  * @param drq the queued operation (must not have been
411  *        triggered so far)
412  */
413 void
414 GNUNET_FS_drq_get_cancel (struct DatastoreRequestQueue *drq)
415 {
416   struct GetClosure *gc;
417   if (drq == drq_running)
418     {
419       /* 'DATASTORE_get' has already been started (and this call might
420          actually be be legal since it is possible that the client has
421          not yet received any calls to its the iterator; so we need
422          to cancel somehow; we do this by getting to the 'GetClosure'
423          and zeroing the 'iter' field, which stops the iteration */
424       gc = drq_running->req_cls;
425       gc->iter = NULL;
426     }
427   else
428     {
429       dequeue_ds_request (drq);  
430     }
431 }
432
433
434 /**
435  * Function called to trigger obtaining the next result
436  * from the datastore.
437  * 
438  * @param more GNUNET_YES to get more results, GNUNET_NO to abort
439  *        iteration (with a final call to "iter" with key/data == NULL).
440  */
441 void
442 GNUNET_FS_drq_get_next (int more)
443 {
444   GNUNET_DATASTORE_get_next (dsh, more);
445 }
446
447
448 /**
449  * Explicitly remove some content from the database.
450  * The "cont"inuation will be called with status
451  * "GNUNET_OK" if content was removed, "GNUNET_NO"
452  * if no matching entry was found and "GNUNET_SYSERR"
453  * on all other types of errors.
454  *
455  * @param key key for the value
456  * @param size number of bytes in data
457  * @param data content stored
458  * @param cont continuation to call when done
459  * @param cont_cls closure for cont
460  * @param timeout how long to wait at most for a response
461  */
462 void
463 GNUNET_FS_drq_remove (const GNUNET_HashCode *key,
464                       uint32_t size, const void *data,
465                       GNUNET_DATASTORE_ContinuationWithStatus cont,
466                       void *cont_cls,
467                       struct GNUNET_TIME_Relative timeout)
468 {
469   if (dsh == NULL)
470     {
471       GNUNET_break (0);
472       return;
473     }
474   GNUNET_DATASTORE_remove (dsh, key, size, data,
475                            cont, cont_cls, timeout);
476 }
477
478
479 /**
480  * Setup datastore request queues.
481  * 
482  * @param s scheduler to use
483  * @param c configuration to use
484  * @return GNUNET_OK on success
485  */
486 int 
487 GNUNET_FS_drq_init (struct GNUNET_SCHEDULER_Handle *s,
488                     const struct GNUNET_CONFIGURATION_Handle *c)
489 {
490   sched = s;
491   cfg = c;
492   dsh = GNUNET_DATASTORE_connect (cfg,
493                                   sched);
494   if (NULL == dsh)
495     {
496       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
497                   _("Failed to connect to `%s' service.\n"),
498                   "datastore");
499       return GNUNET_SYSERR;
500     }
501   GNUNET_SCHEDULER_add_delayed (sched,
502                                 GNUNET_TIME_UNIT_FOREVER_REL,
503                                 &shutdown_task,
504                                 NULL);
505   return GNUNET_OK;
506 }
507
508
509 /* end of gnunet-service-fs_drq.c */