fs hackery
[oweals/gnunet.git] / src / fs / gnunet-service-fs_drq.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_drq.c
23  * @brief queueing of requests to the datastore service
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs_drq.h"
28
29 #define DEBUG_DRQ GNUNET_NO
30
31 /**
32  * Signature of a function that is called whenever a datastore
33  * request can be processed (or an entry put on the queue times out).
34  *
35  * @param cls closure
36  * @param ok GNUNET_OK if DS is ready, GNUNET_SYSERR on timeout
37  */
38 typedef void (*RequestFunction)(void *cls,
39                                 int ok);
40
41
42 /**
43  * Doubly-linked list of our requests for the datastore.
44  */
45 struct DatastoreRequestQueue
46 {
47
48   /**
49    * This is a doubly-linked list.
50    */
51   struct DatastoreRequestQueue *next;
52
53   /**
54    * This is a doubly-linked list.
55    */
56   struct DatastoreRequestQueue *prev;
57
58   /**
59    * Function to call for each entry.
60    */
61   GNUNET_DATASTORE_Iterator iter;
62
63   /**
64    * Closure for iter.
65    */
66   void *iter_cls;
67
68   /**
69    * Key we are doing the 'get' for.
70    */
71   GNUNET_HashCode key;
72
73   /**
74    * Timeout for this operation.
75    */
76   struct GNUNET_TIME_Absolute timeout;
77     
78   /**
79    * ID of task used for signaling timeout.
80    */
81   GNUNET_SCHEDULER_TaskIdentifier task;
82
83   /**
84    * Datastore entry type we are doing the 'get' for.
85    */
86   uint32_t type;
87
88   /**
89    * Is this request at the head of the queue irrespective of its
90    * timeout value?
91    */
92   int forced_head;
93
94 };
95
96 /**
97  * Our scheduler.
98  */
99 static struct GNUNET_SCHEDULER_Handle *sched;
100
101 /**
102  * Our configuration.
103  */
104 static const struct GNUNET_CONFIGURATION_Handle *cfg;
105
106 /**
107  * Head of request queue for the datastore, sorted by timeout.
108  */
109 static struct DatastoreRequestQueue *drq_head;
110
111 /**
112  * Tail of request queue for the datastore.
113  */
114 static struct DatastoreRequestQueue *drq_tail;
115
116 /**
117  * Our connection to the datastore.
118  */
119 static struct GNUNET_DATASTORE_Handle *dsh;
120
121 /**
122  * Pointer to the currently actively running request,
123  * NULL if none is running.
124  */
125 static struct DatastoreRequestQueue *drq_running;
126
127
128 /**
129  * Run the next DS request in our queue, we're done with the current
130  * one.
131  */
132 static void
133 next_ds_request ();
134
135
136 /**
137  * Wrapper for the datastore get operation.  Makes sure to trigger the
138  * next datastore operation in the queue once the operation is
139  * complete.
140  *
141  * @param cls our 'struct DatastoreRequestQueue*'
142  * @param key key for the content
143  * @param size number of bytes in data
144  * @param data content stored
145  * @param type type of the content
146  * @param priority priority of the content
147  * @param anonymity anonymity-level for the content
148  * @param expiration expiration time for the content
149  * @param uid unique identifier for the datum;
150  *        maybe 0 if no unique identifier is available
151  */
152 static void
153 get_iterator (void *cls,
154               const GNUNET_HashCode * key,
155               uint32_t size,
156               const void *data,
157               uint32_t type,
158               uint32_t priority,
159               uint32_t anonymity,
160               struct GNUNET_TIME_Absolute
161               expiration, 
162               uint64_t uid)
163 {
164   struct DatastoreRequestQueue *gc = cls;
165
166   if (gc->iter == NULL) 
167     {
168       /* stop the iteration */
169 #if DEBUG_DRQ
170       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
171                   "Iteration terminated\n");
172 #endif
173       if (key != NULL)
174         GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
175     }
176   else
177     {
178 #if DEBUG_DRQ
179       if (key != NULL)
180         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
181                     "Iteration produced %u-byte result for `%s'\n",
182                     size,
183                     GNUNET_h2s (key));
184 #endif
185       gc->iter (gc->iter_cls,
186                 key, size, data, type,
187                 priority, anonymity, expiration, uid);
188     }
189   if (key == NULL)
190     {
191 #if DEBUG_DRQ
192       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
193                   "Iteration completed\n");
194 #endif
195       GNUNET_assert (gc == drq_running);
196       GNUNET_free (gc);
197       drq_running = NULL;
198       next_ds_request ();
199     }
200 }
201
202
203 /**
204  * A datastore request can be run right now.  Run it.
205  *
206  * @param cls closure (of type "struct DatastoreRequestQueue*")
207  * @param tc task context, unused
208  */
209 static void
210 run_next_request (void *cls,
211                   const struct GNUNET_SCHEDULER_TaskContext *tc)
212 {
213   struct DatastoreRequestQueue *gc = cls;
214
215   gc->task = GNUNET_SCHEDULER_NO_TASK;
216 #if DEBUG_DRQ
217   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
218               "Running datastore request for `%s' of type %u\n",
219               GNUNET_h2s (&gc->key),
220               gc->type);
221 #endif
222   GNUNET_DATASTORE_get (dsh, 
223                         &gc->key,
224                         gc->type, 
225                         &get_iterator,
226                         gc,
227                         GNUNET_TIME_absolute_get_remaining(gc->timeout));
228 }
229
230
231 /**
232  * Run the next DS request in our queue, we're done with the current
233  * one.
234  */
235 static void
236 next_ds_request ()
237 {
238   struct DatastoreRequestQueue *e;
239
240   GNUNET_free_non_null (drq_running);
241   drq_running = NULL;
242   e = drq_head;
243   if (e == NULL)
244     return;
245   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
246   drq_running = e;
247   GNUNET_SCHEDULER_cancel (sched, e->task);
248   e->task = GNUNET_SCHEDULER_add_now (sched,
249                                       &run_next_request,
250                                       e);
251 }
252
253
254 /**
255  * A datastore request had to be timed out. 
256  *
257  * @param cls closure (unused)
258  * @param tc task context, unused
259  */
260 static void
261 timeout_ds_request (void *cls,
262                     const struct GNUNET_SCHEDULER_TaskContext *tc)
263 {
264   struct DatastoreRequestQueue *e = cls;
265
266 #if DEBUG_DRQ
267   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
268               "Datastore request timed out in queue before transmission\n");
269 #endif
270   e->task = GNUNET_SCHEDULER_NO_TASK;
271   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
272   if (e->iter != NULL)
273     e->iter (e->iter_cls,
274              NULL, 0, NULL, 0, 0, 0, 
275              GNUNET_TIME_UNIT_ZERO_ABS, 0);
276   GNUNET_free (e);  
277 }
278
279
280 /**
281  * Task run during shutdown.
282  *
283  * @param cls unused
284  * @param tc unused
285  */
286 static void
287 shutdown_task (void *cls,
288                const struct GNUNET_SCHEDULER_TaskContext *tc)
289 {
290   struct DatastoreRequestQueue *drq;
291
292 #if DEBUG_DRQ
293   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
294               "DRQ shutdown initiated\n");
295 #endif
296   GNUNET_assert (NULL != dsh);
297   GNUNET_DATASTORE_disconnect (dsh,
298                                GNUNET_NO);
299   dsh = NULL;
300   while (NULL != (drq = drq_head))
301     {
302       drq_head = drq->next;
303       GNUNET_SCHEDULER_cancel (sched, drq->task);
304       if (drq->iter != NULL)
305         drq->iter (drq->iter_cls,
306                    NULL, 0, NULL, 0, 0, 0, 
307                    GNUNET_TIME_UNIT_ZERO_ABS, 0);
308       GNUNET_free (drq);
309     }
310   drq_tail = NULL;
311   if (drq_running != NULL)
312     {
313       GNUNET_SCHEDULER_cancel (sched,
314                                drq_running->task);
315       drq_running->iter (drq_running->iter_cls,
316                          NULL, 0, NULL, 0, 0, 0, 
317                          GNUNET_TIME_UNIT_ZERO_ABS, 0);
318       GNUNET_free (drq_running);
319       drq_running = NULL;
320     }
321 }
322
323
324 /**
325  * Iterate over the results for a particular key
326  * in the datastore.  The iterator will only be called
327  * once initially; if the first call did contain a
328  * result, further results can be obtained by calling
329  * "GNUNET_DATASTORE_get_next" with the given argument.
330  *
331  * @param key maybe NULL (to match all entries)
332  * @param type desired type, 0 for any
333  * @param iter function to call on each matching value;
334  *        will be called once with a NULL value at the end
335  * @param iter_cls closure for iter
336  * @param timeout how long to wait at most for a response
337  * @param immediate should this be queued immediately at
338  *        the head of the queue (irrespecitive of the timeout)?
339  */
340 struct DatastoreRequestQueue *
341 GNUNET_FS_drq_get (const GNUNET_HashCode * key,
342                    uint32_t type,
343                    GNUNET_DATASTORE_Iterator iter, 
344                    void *iter_cls,
345                    struct GNUNET_TIME_Relative timeout,
346                    int immediate)
347 {
348   struct DatastoreRequestQueue *e;
349   struct DatastoreRequestQueue *bef;
350
351 #if DEBUG_DRQ
352   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
353               "DRQ receives request for `%s' of type %u\n",
354               GNUNET_h2s (key),
355               type);
356 #endif
357   e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue));
358   e->timeout = GNUNET_TIME_relative_to_absolute (timeout);
359   e->forced_head = immediate;
360   e->key = *key;
361   e->type = type;
362   e->iter = iter;
363   e->iter_cls = iter_cls;
364   e->timeout = GNUNET_TIME_relative_to_absolute (timeout);
365   if (GNUNET_YES == immediate)
366     {
367       /* local request, highest prio, put at head of queue
368          regardless of deadline */
369       bef = NULL;
370     }
371   else
372     {
373       bef = drq_tail;
374       while ( (NULL != bef) &&
375               (e->timeout.value < bef->timeout.value) &&
376               (GNUNET_YES != e->forced_head) )
377         bef = bef->prev;
378     }
379   GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e);
380   e->task = GNUNET_SCHEDULER_add_delayed (sched,
381                                           timeout,
382                                           &timeout_ds_request,
383                                           e);
384   if (drq_running == NULL)
385     next_ds_request ();
386   return e;                                    
387 }
388
389
390 /**
391  * Cancel the given operation.
392  *
393  * @param drq the queued operation (must not have been
394  *        triggered so far)
395  */
396 void
397 GNUNET_FS_drq_get_cancel (struct DatastoreRequestQueue *drq)
398 {
399 #if DEBUG_DRQ
400   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
401               "DRQ receives request cancellation request\n");
402 #endif
403   if (drq == drq_running)
404     {
405       /* 'DATASTORE_get' has already been started (and this call might
406          actually be be legal since it is possible that the client has
407          not yet received any calls to its the iterator; so we need to
408          cancel somehow; we do this by zeroing the 'iter' field, which
409          stops the iteration */
410       drq_running->iter = NULL;
411     }
412   else
413     {
414       GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, drq);
415       GNUNET_SCHEDULER_cancel (sched, drq->task);
416       GNUNET_free (drq);
417     }
418 }
419
420
421 /**
422  * Function called to trigger obtaining the next result
423  * from the datastore.
424  * 
425  * @param more GNUNET_YES to get more results, GNUNET_NO to abort
426  *        iteration (with a final call to "iter" with key/data == NULL).
427  */
428 void
429 GNUNET_FS_drq_get_next (int more)
430 {
431 #if DEBUG_DRQ
432   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
433               "DRQ receives request for next result (more is %d)\n",
434               more);
435 #endif
436   GNUNET_DATASTORE_get_next (dsh, more);
437 }
438
439
440 /**
441  * Closure for 'drq_remove_cont'.
442  */
443 struct RemoveContext
444 {
445   struct GNUNET_DATASTORE_Handle *rmdsh; 
446   GNUNET_DATASTORE_ContinuationWithStatus cont;
447   void *cont_cls;
448 };
449
450
451 static void 
452 drq_remove_cont (void *cls,
453                  int success,
454                  const char *msg)
455 {
456   struct RemoveContext *rc = cls;
457
458   rc->cont (rc->cont_cls,
459             success,
460             msg);
461   GNUNET_DATASTORE_disconnect (rc->rmdsh, GNUNET_NO);
462   GNUNET_free (rc);
463 }
464
465
466 /**
467  * Explicitly remove some content from the database.
468  * The "cont"inuation will be called with status
469  * "GNUNET_OK" if content was removed, "GNUNET_NO"
470  * if no matching entry was found and "GNUNET_SYSERR"
471  * on all other types of errors.
472  *
473  * @param key key for the value
474  * @param size number of bytes in data
475  * @param data content stored
476  * @param cont continuation to call when done
477  * @param cont_cls closure for cont
478  * @param timeout how long to wait at most for a response
479  */
480 void
481 GNUNET_FS_drq_remove (const GNUNET_HashCode *key,
482                       uint32_t size, const void *data,
483                       GNUNET_DATASTORE_ContinuationWithStatus cont,
484                       void *cont_cls,
485                       struct GNUNET_TIME_Relative timeout)
486 {
487   struct GNUNET_DATASTORE_Handle *rmdsh; 
488   struct RemoveContext *rc;
489
490   rmdsh = GNUNET_DATASTORE_connect (cfg,
491                                     sched);
492   if (rmdsh == NULL)
493     {
494       GNUNET_break (0);
495       cont (cont_cls,
496             GNUNET_SYSERR,
497             _("Failed to connect to datastore"));
498       return;
499     }
500   rc = GNUNET_malloc (sizeof (struct RemoveContext));
501   rc->cont = cont;
502   rc->cont_cls = cont_cls;
503   rc->rmdsh = rmdsh;
504   GNUNET_DATASTORE_remove (rmdsh, key, size, data,
505                            &drq_remove_cont, 
506                            rc, timeout);
507 }
508
509
510 /**
511  * Setup datastore request queues.
512  * 
513  * @param s scheduler to use
514  * @param c configuration to use
515  * @return GNUNET_OK on success
516  */
517 int 
518 GNUNET_FS_drq_init (struct GNUNET_SCHEDULER_Handle *s,
519                     const struct GNUNET_CONFIGURATION_Handle *c)
520 {
521   sched = s;
522   cfg = c;
523   dsh = GNUNET_DATASTORE_connect (cfg,
524                                   sched);
525   if (NULL == dsh)
526     {
527       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
528                   _("Failed to connect to `%s' service.\n"),
529                   "datastore");
530       return GNUNET_SYSERR;
531     }
532   GNUNET_SCHEDULER_add_delayed (sched,
533                                 GNUNET_TIME_UNIT_FOREVER_REL,
534                                 &shutdown_task,
535                                 NULL);
536   return GNUNET_OK;
537 }
538
539
540 /* end of gnunet-service-fs_drq.c */