ds
[oweals/gnunet.git] / src / fs / gnunet-service-fs_drq.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_drq.c
23  * @brief queueing of requests to the datastore service
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs_drq.h"
28
29 #define DEBUG_DRQ GNUNET_NO
30
31 /**
32  * Signature of a function that is called whenever a datastore
33  * request can be processed (or an entry put on the queue times out).
34  *
35  * @param cls closure
36  * @param ok GNUNET_OK if DS is ready, GNUNET_SYSERR on timeout
37  */
38 typedef void (*RequestFunction)(void *cls,
39                                 int ok);
40
41
42 /**
43  * Doubly-linked list of our requests for the datastore.
44  */
45 struct DatastoreRequestQueue
46 {
47
48   /**
49    * This is a doubly-linked list.
50    */
51   struct DatastoreRequestQueue *next;
52
53   /**
54    * This is a doubly-linked list.
55    */
56   struct DatastoreRequestQueue *prev;
57
58   /**
59    * Function to call for each entry.
60    */
61   GNUNET_DATASTORE_Iterator iter;
62
63   /**
64    * Closure for iter.
65    */
66   void *iter_cls;
67
68   /**
69    * Key we are doing the 'get' for.
70    */
71   GNUNET_HashCode key;
72
73   /**
74    * Timeout for this operation.
75    */
76   struct GNUNET_TIME_Absolute timeout;
77     
78   /**
79    * ID of task used for signaling timeout.
80    */
81   GNUNET_SCHEDULER_TaskIdentifier task;
82
83   /**
84    * Datastore entry type we are doing the 'get' for.
85    */
86   enum GNUNET_BLOCK_Type type;
87
88   /**
89    * Is this request at the head of the queue irrespective of its
90    * timeout value?
91    */
92   int forced_head;
93
94 };
95
96 /**
97  * Our scheduler.
98  */
99 static struct GNUNET_SCHEDULER_Handle *sched;
100
101 /**
102  * Our configuration.
103  */
104 static const struct GNUNET_CONFIGURATION_Handle *cfg;
105
106 /**
107  * Head of request queue for the datastore, sorted by timeout.
108  */
109 static struct DatastoreRequestQueue *drq_head;
110
111 /**
112  * Tail of request queue for the datastore.
113  */
114 static struct DatastoreRequestQueue *drq_tail;
115
116 /**
117  * Our connection to the datastore.
118  */
119 static struct GNUNET_DATASTORE_Handle *dsh;
120
121 /**
122  * Pointer to the currently actively running request,
123  * NULL if none is running.
124  */
125 static struct DatastoreRequestQueue *drq_running;
126
127
128 /**
129  * Run the next DS request in our queue, we're done with the current
130  * one.
131  */
132 static void
133 next_ds_request ();
134
135
136 /**
137  * Wrapper for the datastore get operation.  Makes sure to trigger the
138  * next datastore operation in the queue once the operation is
139  * complete.
140  *
141  * @param cls our 'struct DatastoreRequestQueue*'
142  * @param key key for the content
143  * @param size number of bytes in data
144  * @param data content stored
145  * @param type type of the content
146  * @param priority priority of the content
147  * @param anonymity anonymity-level for the content
148  * @param expiration expiration time for the content
149  * @param uid unique identifier for the datum;
150  *        maybe 0 if no unique identifier is available
151  */
152 static void
153 get_iterator (void *cls,
154               const GNUNET_HashCode * key,
155               uint32_t size,
156               const void *data,
157               enum GNUNET_BLOCK_Type type,
158               uint32_t priority,
159               uint32_t anonymity,
160               struct GNUNET_TIME_Absolute
161               expiration, 
162               uint64_t uid)
163 {
164   struct DatastoreRequestQueue *gc = cls;
165
166   if (gc->iter == NULL) 
167     {
168       /* stop the iteration */
169 #if DEBUG_DRQ
170       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
171                   "Iteration terminated\n");
172 #endif
173       if (key != NULL)
174         GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
175     }
176   else
177     {
178 #if DEBUG_DRQ
179       if (key != NULL)
180         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
181                     "Iteration produced %u-byte result for `%s'\n",
182                     size,
183                     GNUNET_h2s (key));
184 #endif
185       gc->iter (gc->iter_cls,
186                 key, size, data, type,
187                 priority, anonymity, expiration, uid);
188     }
189   if (key == NULL)
190     {
191 #if DEBUG_DRQ
192       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
193                   "Iteration completed\n");
194 #endif
195       GNUNET_assert (gc == drq_running);
196       GNUNET_free (gc);
197       drq_running = NULL;
198       next_ds_request ();
199     }
200 }
201
202
203 /**
204  * A datastore request can be run right now.  Run it.
205  *
206  * @param cls closure (of type "struct DatastoreRequestQueue*")
207  * @param tc task context, unused
208  */
209 static void
210 run_next_request (void *cls,
211                   const struct GNUNET_SCHEDULER_TaskContext *tc)
212 {
213   struct DatastoreRequestQueue *gc = cls;
214
215   gc->task = GNUNET_SCHEDULER_NO_TASK;
216 #if DEBUG_DRQ
217   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
218               "Running datastore request for `%s' of type %u\n",
219               GNUNET_h2s (&gc->key),
220               gc->type);
221 #endif
222   GNUNET_DATASTORE_get (dsh, 
223                         &gc->key,
224                         gc->type, 
225                         42 /* FIXME */, 64 /* FIXME */,
226                         GNUNET_TIME_absolute_get_remaining(gc->timeout),
227                         &get_iterator,
228                         gc);
229 }
230
231
232 /**
233  * Run the next DS request in our queue, we're done with the current
234  * one.
235  */
236 static void
237 next_ds_request ()
238 {
239   struct DatastoreRequestQueue *e;
240
241   GNUNET_free_non_null (drq_running);
242   drq_running = NULL;
243   e = drq_head;
244   if (e == NULL)
245     return;
246   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
247   drq_running = e;
248   GNUNET_SCHEDULER_cancel (sched, e->task);
249   e->task = GNUNET_SCHEDULER_add_now (sched,
250                                       &run_next_request,
251                                       e);
252 }
253
254
255 /**
256  * A datastore request had to be timed out. 
257  *
258  * @param cls closure (unused)
259  * @param tc task context, unused
260  */
261 static void
262 timeout_ds_request (void *cls,
263                     const struct GNUNET_SCHEDULER_TaskContext *tc)
264 {
265   struct DatastoreRequestQueue *e = cls;
266
267 #if DEBUG_DRQ
268   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
269               "Datastore request timed out in queue before transmission\n");
270 #endif
271   e->task = GNUNET_SCHEDULER_NO_TASK;
272   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
273   if (e->iter != NULL)
274     e->iter (e->iter_cls,
275              NULL, 0, NULL, 0, 0, 0, 
276              GNUNET_TIME_UNIT_ZERO_ABS, 0);
277   GNUNET_free (e);  
278 }
279
280
281 /**
282  * Task run during shutdown.
283  *
284  * @param cls unused
285  * @param tc unused
286  */
287 static void
288 shutdown_task (void *cls,
289                const struct GNUNET_SCHEDULER_TaskContext *tc)
290 {
291   struct DatastoreRequestQueue *drq;
292
293 #if DEBUG_DRQ
294   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
295               "DRQ shutdown initiated\n");
296 #endif
297   GNUNET_assert (NULL != dsh);
298   GNUNET_DATASTORE_disconnect (dsh,
299                                GNUNET_NO);
300   dsh = NULL;
301   while (NULL != (drq = drq_head))
302     {
303       drq_head = drq->next;
304       GNUNET_SCHEDULER_cancel (sched, drq->task);
305       if (drq->iter != NULL)
306         drq->iter (drq->iter_cls,
307                    NULL, 0, NULL, 0, 0, 0, 
308                    GNUNET_TIME_UNIT_ZERO_ABS, 0);
309       GNUNET_free (drq);
310     }
311   drq_tail = NULL;
312   if (drq_running != NULL)
313     {
314       if (drq_running->task != GNUNET_SCHEDULER_NO_TASK)
315         {
316           GNUNET_SCHEDULER_cancel (sched,
317                                    drq_running->task);
318         }
319       if (drq_running->iter != NULL)
320         {
321           drq_running->iter (drq_running->iter_cls,
322                              NULL, 0, NULL, 0, 0, 0, 
323                              GNUNET_TIME_UNIT_ZERO_ABS, 0);
324         }
325       GNUNET_free (drq_running);
326       drq_running = NULL;
327     }
328 }
329
330
331 /**
332  * Iterate over the results for a particular key
333  * in the datastore.  The iterator will only be called
334  * once initially; if the first call did contain a
335  * result, further results can be obtained by calling
336  * "GNUNET_DATASTORE_get_next" with the given argument.
337  *
338  * @param key maybe NULL (to match all entries)
339  * @param type desired type, 0 for any
340  * @param iter function to call on each matching value;
341  *        will be called once with a NULL value at the end
342  * @param iter_cls closure for iter
343  * @param timeout how long to wait at most for a response
344  * @param immediate should this be queued immediately at
345  *        the head of the queue (irrespecitive of the timeout)?
346  */
347 struct DatastoreRequestQueue *
348 GNUNET_FS_drq_get (const GNUNET_HashCode * key,
349                    enum GNUNET_BLOCK_Type type,
350                    GNUNET_DATASTORE_Iterator iter, 
351                    void *iter_cls,
352                    struct GNUNET_TIME_Relative timeout,
353                    int immediate)
354 {
355   struct DatastoreRequestQueue *e;
356   struct DatastoreRequestQueue *bef;
357
358 #if DEBUG_DRQ
359   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
360               "DRQ receives request for `%s' of type %u\n",
361               GNUNET_h2s (key),
362               type);
363 #endif
364   e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue));
365   e->timeout = GNUNET_TIME_relative_to_absolute (timeout);
366   e->forced_head = immediate;
367   e->key = *key;
368   e->type = type;
369   e->iter = iter;
370   e->iter_cls = iter_cls;
371   e->timeout = GNUNET_TIME_relative_to_absolute (timeout);
372   if (GNUNET_YES == immediate)
373     {
374       /* local request, highest prio, put at head of queue
375          regardless of deadline */
376       bef = NULL;
377     }
378   else
379     {
380       bef = drq_tail;
381       while ( (NULL != bef) &&
382               (e->timeout.value < bef->timeout.value) &&
383               (GNUNET_YES != e->forced_head) )
384         bef = bef->prev;
385     }
386   GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e);
387   e->task = GNUNET_SCHEDULER_add_delayed (sched,
388                                           timeout,
389                                           &timeout_ds_request,
390                                           e);
391   if (drq_running == NULL)
392     next_ds_request ();
393   return e;                                    
394 }
395
396
397 /**
398  * Cancel the given operation.
399  *
400  * @param drq the queued operation (must not have been
401  *        triggered so far)
402  */
403 void
404 GNUNET_FS_drq_get_cancel (struct DatastoreRequestQueue *drq)
405 {
406 #if DEBUG_DRQ
407   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
408               "DRQ receives request cancellation request\n");
409 #endif
410   if (drq == drq_running)
411     {
412       /* 'DATASTORE_get' has already been started (and this call might
413          actually be be legal since it is possible that the client has
414          not yet received any calls to its the iterator; so we need to
415          cancel somehow; we do this by zeroing the 'iter' field, which
416          stops the iteration */
417       drq_running->iter = NULL;
418     }
419   else
420     {
421       GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, drq);
422       GNUNET_SCHEDULER_cancel (sched, drq->task);
423       GNUNET_free (drq);
424     }
425 }
426
427
428 /**
429  * Function called to trigger obtaining the next result
430  * from the datastore.
431  * 
432  * @param more GNUNET_YES to get more results, GNUNET_NO to abort
433  *        iteration (with a final call to "iter" with key/data == NULL).
434  */
435 void
436 GNUNET_FS_drq_get_next (int more)
437 {
438 #if DEBUG_DRQ
439   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
440               "DRQ receives request for next result (more is %d)\n",
441               more);
442 #endif
443   GNUNET_DATASTORE_get_next (dsh, more);
444 }
445
446
447 /**
448  * Closure for 'drq_remove_cont'.
449  */
450 struct RemoveContext
451 {
452   struct GNUNET_DATASTORE_Handle *rmdsh; 
453   GNUNET_DATASTORE_ContinuationWithStatus cont;
454   void *cont_cls;
455 };
456
457
458 static void 
459 drq_remove_cont (void *cls,
460                  int success,
461                  const char *msg)
462 {
463   struct RemoveContext *rc = cls;
464
465   rc->cont (rc->cont_cls,
466             success,
467             msg);
468   GNUNET_DATASTORE_disconnect (rc->rmdsh, GNUNET_NO);
469   GNUNET_free (rc);
470 }
471
472
473 /**
474  * Explicitly remove some content from the database.
475  * The "cont"inuation will be called with status
476  * "GNUNET_OK" if content was removed, "GNUNET_NO"
477  * if no matching entry was found and "GNUNET_SYSERR"
478  * on all other types of errors.
479  *
480  * @param key key for the value
481  * @param size number of bytes in data
482  * @param data content stored
483  * @param cont continuation to call when done
484  * @param cont_cls closure for cont
485  * @param timeout how long to wait at most for a response
486  */
487 void
488 GNUNET_FS_drq_remove (const GNUNET_HashCode *key,
489                       uint32_t size, const void *data,
490                       GNUNET_DATASTORE_ContinuationWithStatus cont,
491                       void *cont_cls,
492                       struct GNUNET_TIME_Relative timeout)
493 {
494   struct GNUNET_DATASTORE_Handle *rmdsh; 
495   struct RemoveContext *rc;
496
497   rmdsh = GNUNET_DATASTORE_connect (cfg,
498                                     sched);
499   if (rmdsh == NULL)
500     {
501       GNUNET_break (0);
502       cont (cont_cls,
503             GNUNET_SYSERR,
504             _("Failed to connect to datastore"));
505       return;
506     }
507   rc = GNUNET_malloc (sizeof (struct RemoveContext));
508   rc->cont = cont;
509   rc->cont_cls = cont_cls;
510   rc->rmdsh = rmdsh;
511   GNUNET_DATASTORE_remove (rmdsh, key, size, data,
512                            -3, 128,
513                            timeout,
514                            &drq_remove_cont, 
515                            rc);
516 }
517
518
519 /**
520  * Setup datastore request queues.
521  * 
522  * @param s scheduler to use
523  * @param c configuration to use
524  * @return GNUNET_OK on success
525  */
526 int 
527 GNUNET_FS_drq_init (struct GNUNET_SCHEDULER_Handle *s,
528                     const struct GNUNET_CONFIGURATION_Handle *c)
529 {
530   sched = s;
531   cfg = c;
532   dsh = GNUNET_DATASTORE_connect (cfg,
533                                   sched);
534   if (NULL == dsh)
535     {
536       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
537                   _("Failed to connect to `%s' service.\n"),
538                   "datastore");
539       return GNUNET_SYSERR;
540     }
541   GNUNET_SCHEDULER_add_delayed (sched,
542                                 GNUNET_TIME_UNIT_FOREVER_REL,
543                                 &shutdown_task,
544                                 NULL);
545   return GNUNET_OK;
546 }
547
548
549 /* end of gnunet-service-fs_drq.c */