-fix leak
[oweals/gnunet.git] / src / peerstore / gnunet-service-peerstore.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2014, 2015, 2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file peerstore/gnunet-service-peerstore.c
23  * @brief peerstore service implementation
24  * @author Omar Tarabai
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "peerstore.h"
29 #include "gnunet_peerstore_plugin.h"
30 #include "peerstore_common.h"
31
32
33 /**
34  * Interval for expired records cleanup (in seconds)
35  */
36 #define EXPIRED_RECORDS_CLEANUP_INTERVAL 300    /* 5mins */
37
38 /**
39  * Our configuration.
40  */
41 static const struct GNUNET_CONFIGURATION_Handle *cfg;
42
43 /**
44  * Database plugin library name
45  */
46 static char *db_lib_name;
47
48 /**
49  * Database handle
50  */
51 static struct GNUNET_PEERSTORE_PluginFunctions *db;
52
53 /**
54  * Hashmap with all watch requests
55  */
56 static struct GNUNET_CONTAINER_MultiHashMap *watchers;
57
58 /**
59  * Task run to clean up expired records.
60  */
61 static struct GNUNET_SCHEDULER_Task *expire_task;
62
63 /**
64  * Are we in the process of shutting down the service? #GNUNET_YES / #GNUNET_NO
65  */
66 static int in_shutdown;
67
68 /**
69  * Number of connected clients.
70  */
71 static unsigned int num_clients;
72
73
74 /**
75  * Perform the actual shutdown operations
76  */
77 static void
78 do_shutdown ()
79 {
80   if (NULL != db_lib_name)
81   {
82     GNUNET_break (NULL ==
83                   GNUNET_PLUGIN_unload (db_lib_name,
84                                         db));
85     GNUNET_free (db_lib_name);
86     db_lib_name = NULL;
87   }
88   if (NULL != watchers)
89   {
90     GNUNET_CONTAINER_multihashmap_destroy (watchers);
91     watchers = NULL;
92   }
93   if (NULL != expire_task)
94   {
95     GNUNET_SCHEDULER_cancel (expire_task);
96     expire_task = NULL;
97   }
98   GNUNET_SCHEDULER_shutdown ();
99 }
100
101
102 /**
103  * Task run during shutdown.
104  *
105  * @param cls unused
106  */
107 static void
108 shutdown_task (void *cls)
109 {
110   in_shutdown = GNUNET_YES;
111   if (0 == num_clients)      /* Only when no connected clients. */
112     do_shutdown ();
113 }
114
115
116 /* Forward declaration */
117 static void
118 expire_records_continuation (void *cls,
119                              int success);
120
121
122 /**
123  * Deletes any expired records from storage
124  */
125 static void
126 cleanup_expired_records (void *cls)
127 {
128   int ret;
129
130   expire_task = NULL;
131   GNUNET_assert (NULL != db);
132   ret = db->expire_records (db->cls,
133                             GNUNET_TIME_absolute_get (),
134                             &expire_records_continuation,
135                             NULL);
136   if (GNUNET_OK != ret)
137   {
138     GNUNET_assert (NULL == expire_task);
139     expire_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
140                                                 (GNUNET_TIME_UNIT_SECONDS,
141                                                  EXPIRED_RECORDS_CLEANUP_INTERVAL),
142                                                 &cleanup_expired_records,
143                                                 NULL);
144   }
145 }
146
147
148 /**
149  * Continuation to expire_records called by the peerstore plugin
150  *
151  * @param cls unused
152  * @param success count of records deleted or #GNUNET_SYSERR
153  */
154 static void
155 expire_records_continuation (void *cls,
156                              int success)
157 {
158   if (success > 0)
159     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
160                 "%d records expired.\n",
161                 success);
162   GNUNET_assert (NULL == expire_task);
163   expire_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
164                                               (GNUNET_TIME_UNIT_SECONDS,
165                                                EXPIRED_RECORDS_CLEANUP_INTERVAL),
166                                               &cleanup_expired_records,
167                                               NULL);
168 }
169
170
171 /**
172  * A client disconnected.  Remove all of its data structure entries.
173  *
174  * @param cls closure, NULL
175  * @param client identification of the client
176  * @param mq the message queue
177  * @return
178  */
179 static void *
180 client_connect_cb (void *cls,
181                    struct GNUNET_SERVICE_Client *client,
182                    struct GNUNET_MQ_Handle *mq)
183 {
184   num_clients++;
185   return client;
186 }
187
188
189 /**
190  * Search for a disconnected client and remove it
191  *
192  * @param cls closuer, a `struct GNUNET_SERVICE_Client`
193  * @param key hash of record key
194  * @param value the watcher client, a `struct GNUNET_SERVICE_Client *`
195  * @return #GNUNET_OK to continue iterating
196  */
197 static int
198 client_disconnect_it (void *cls,
199                       const struct GNUNET_HashCode *key,
200                       void *value)
201 {
202   if (value == cls)
203   {
204     GNUNET_CONTAINER_multihashmap_remove (watchers,
205                                           key,
206                                           value);
207     num_clients++;
208   }
209   return GNUNET_OK;
210 }
211
212
213 /**
214  * A client disconnected.  Remove all of its data structure entries.
215  *
216  * @param cls closure, NULL
217  * @param client identification of the client
218  */
219 static void
220 client_disconnect_cb (void *cls,
221                       struct GNUNET_SERVICE_Client *client,
222                       void *app_cls)
223 {
224   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
225               "A client disconnected, cleaning up.\n");
226   if (NULL != watchers)
227     GNUNET_CONTAINER_multihashmap_iterate (watchers,
228                                            &client_disconnect_it,
229                                            client);
230   num_clients--;
231   if ( (0 == num_clients) &&
232        in_shutdown)
233     do_shutdown ();
234 }
235
236
237 /**
238  * Function called by for each matching record.
239  *
240  * @param cls closure
241  * @param record peerstore record found
242  * @param emsg error message or NULL if no errors
243  * @return #GNUNET_YES to continue iteration
244  */
245 static void
246 record_iterator (void *cls,
247                  const struct GNUNET_PEERSTORE_Record *record,
248                  const char *emsg)
249 {
250   struct GNUNET_PEERSTORE_Record *cls_record = cls;
251   struct GNUNET_MQ_Envelope *env;
252
253   if (NULL == record)
254   {
255     /* No more records */
256     struct GNUNET_MessageHeader *endmsg;
257
258     env = GNUNET_MQ_msg (endmsg,
259                          GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END);
260     GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cls_record->client),
261                     env);
262     if (NULL == emsg)
263       GNUNET_SERVICE_client_continue (cls_record->client);
264     else
265       GNUNET_SERVICE_client_drop (cls_record->client);
266     PEERSTORE_destroy_record (cls_record);
267     return;
268   }
269
270   env = PEERSTORE_create_record_mq_envelope (record->sub_system,
271                                              record->peer,
272                                              record->key,
273                                              record->value,
274                                              record->value_size,
275                                              record->expiry,
276                                              0,
277                                              GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD);
278   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cls_record->client),
279                   env);
280 }
281
282
283 /**
284  * Iterator over all watcher clients
285  * to notify them of a new record
286  *
287  * @param cls closure, a `struct GNUNET_PEERSTORE_Record *`
288  * @param key hash of record key
289  * @param value the watcher client, a `struct GNUNET_SERVICE_Client *`
290  * @return #GNUNET_YES to continue iterating
291  */
292 static int
293 watch_notifier_it (void *cls,
294                    const struct GNUNET_HashCode *key,
295                    void *value)
296 {
297   struct GNUNET_PEERSTORE_Record *record = cls;
298   struct GNUNET_SERVICE_Client *client = value;
299   struct GNUNET_MQ_Envelope *env;
300
301   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
302               "Found a watcher to update.\n");
303   env = PEERSTORE_create_record_mq_envelope (record->sub_system,
304                                              record->peer,
305                                              record->key,
306                                              record->value,
307                                              record->value_size,
308                                              record->expiry,
309                                              0,
310                                              GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD);
311   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
312                   env);
313   return GNUNET_YES;
314 }
315
316
317 /**
318  * Given a new record, notifies watchers
319  *
320  * @param record changed record to update watchers with
321  */
322 static void
323 watch_notifier (struct GNUNET_PEERSTORE_Record *record)
324 {
325   struct GNUNET_HashCode keyhash;
326
327   PEERSTORE_hash_key (record->sub_system,
328                       record->peer,
329                       record->key,
330                       &keyhash);
331   GNUNET_CONTAINER_multihashmap_get_multiple (watchers,
332                                               &keyhash,
333                                               &watch_notifier_it,
334                                               record);
335 }
336
337
338 /**
339  * Handle a watch cancel request from client
340  *
341  * @param cls identification of the client
342  * @param hm the actual message
343  */
344 static void
345 handle_watch_cancel (void *cls,
346                      const struct StoreKeyHashMessage *hm)
347 {
348   struct GNUNET_SERVICE_Client *client = cls;
349
350   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
351               "Received a watch cancel request.\n");
352   if (GNUNET_OK !=
353       GNUNET_CONTAINER_multihashmap_remove (watchers,
354                                             &hm->keyhash,
355                                             client))
356   {
357     GNUNET_break (0);
358     GNUNET_SERVICE_client_drop (client);
359     return;
360   }
361   num_clients++;
362   GNUNET_SERVICE_client_continue (client);
363 }
364
365
366 /**
367  * Handle a watch request from client
368  *
369  * @param cls identification of the client
370  * @param hm the actual message
371  */
372 static void
373 handle_watch (void *cls,
374               const struct StoreKeyHashMessage *hm)
375 {
376   struct GNUNET_SERVICE_Client *client = cls;
377
378   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
379               "Received a watch request.\n");
380   num_clients--; /* do not count watchers */
381   GNUNET_SERVICE_client_mark_monitor (client);
382   GNUNET_CONTAINER_multihashmap_put (watchers,
383                                      &hm->keyhash,
384                                      client,
385                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
386   GNUNET_SERVICE_client_continue (client);
387 }
388
389
390 /**
391  * Check an iterate request from client
392  *
393  * @param cls client identification of the client
394  * @param srm the actual message
395  * @return #GNUNET_OK if @a srm is well-formed
396  */
397 static int
398 check_iterate (void *cls,
399                const struct StoreRecordMessage *srm)
400 {
401   struct GNUNET_PEERSTORE_Record *record;
402
403   record = PEERSTORE_parse_record_message (srm);
404   if (NULL == record)
405   {
406     GNUNET_break (0);
407     return GNUNET_SYSERR;
408   }
409   if (NULL == record->sub_system)
410   {
411     GNUNET_break (0);
412     PEERSTORE_destroy_record (record);
413     return GNUNET_SYSERR;
414   }
415   PEERSTORE_destroy_record (record);
416   return GNUNET_OK;
417 }
418
419
420 /**
421  * Handle an iterate request from client
422  *
423  * @param cls identification of the client
424  * @param srm the actual message
425  */
426 static void
427 handle_iterate (void *cls,
428                 const struct StoreRecordMessage *srm)
429 {
430   struct GNUNET_SERVICE_Client *client = cls;
431   struct GNUNET_PEERSTORE_Record *record;
432
433   record = PEERSTORE_parse_record_message (srm);
434   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
435               "Iterate request: ss `%s', peer `%s', key `%s'\n",
436               record->sub_system,
437               (NULL == record->peer) ? "NULL" : GNUNET_i2s (record->peer),
438               (NULL == record->key) ? "NULL" : record->key);
439   record->client = client;
440   if (GNUNET_OK !=
441       db->iterate_records (db->cls,
442                            record->sub_system,
443                            record->peer,
444                            record->key,
445                            &record_iterator,
446                            record))
447   {
448     GNUNET_SERVICE_client_drop (client);
449     PEERSTORE_destroy_record (record);
450   }
451 }
452
453
454 /**
455  * Continuation of store_record called by the peerstore plugin
456  *
457  * @param cls closure
458  * @param success result
459  */
460 static void
461 store_record_continuation (void *cls,
462                            int success)
463 {
464   struct GNUNET_PEERSTORE_Record *record = cls;
465
466   if (GNUNET_OK == success)
467   {
468     watch_notifier (record);
469     GNUNET_SERVICE_client_continue (record->client);
470   }
471   else
472   {
473     GNUNET_SERVICE_client_drop (record->client);
474   }
475   PEERSTORE_destroy_record (record);
476 }
477
478
479 /**
480  * Check a store request from client
481  *
482  * @param cls client identification of the client
483  * @param srm the actual message
484  * @return #GNUNET_OK if @a srm is well-formed
485  */
486 static int
487 check_store (void *cls,
488               const struct StoreRecordMessage *srm)
489 {
490   struct GNUNET_PEERSTORE_Record *record;
491
492   record = PEERSTORE_parse_record_message (srm);
493   if (NULL == record)
494   {
495     GNUNET_break (0);
496     return GNUNET_SYSERR;
497   }
498   if ( (NULL == record->sub_system) ||
499        (NULL == record->peer) ||
500        (NULL == record->key) )
501   {
502     GNUNET_break (0);
503     PEERSTORE_destroy_record (record);
504     return GNUNET_SYSERR;
505   }
506   PEERSTORE_destroy_record (record);
507   return GNUNET_OK;
508 }
509
510
511 /**
512  * Handle a store request from client
513  *
514  * @param cls client identification of the client
515  * @param srm the actual message
516  */
517 static void
518 handle_store (void *cls,
519               const struct StoreRecordMessage *srm)
520 {
521   struct GNUNET_SERVICE_Client *client = cls;
522   struct GNUNET_PEERSTORE_Record *record;
523
524   record = PEERSTORE_parse_record_message (srm);
525   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
526               "Received a store request. Sub system `%s' Peer `%s Key `%s' Options: %d.\n",
527               record->sub_system,
528               GNUNET_i2s (record->peer),
529               record->key,
530               ntohl (srm->options));
531   record->client = client;
532   if (GNUNET_OK !=
533       db->store_record (db->cls,
534                         record->sub_system,
535                         record->peer,
536                         record->key,
537                         record->value,
538                         record->value_size,
539                         *record->expiry,
540                         ntohl (srm->options),
541                         &store_record_continuation,
542                         record))
543   {
544     PEERSTORE_destroy_record (record);
545     GNUNET_SERVICE_client_drop (client);
546     return;
547   }
548 }
549
550
551 /**
552  * Peerstore service runner.
553  *
554  * @param cls closure
555  * @param c configuration to use
556  * @param service the initialized service
557  */
558 static void
559 run (void *cls,
560      const struct GNUNET_CONFIGURATION_Handle *c,
561      struct GNUNET_SERVICE_Handle *service)
562 {
563   char *database;
564
565   in_shutdown = GNUNET_NO;
566   cfg = c;
567   if (GNUNET_OK !=
568       GNUNET_CONFIGURATION_get_value_string (cfg,
569                                              "peerstore",
570                                              "DATABASE",
571                                              &database))
572   {
573     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
574                                "peerstore",
575                                "DATABASE");
576     GNUNET_SCHEDULER_shutdown ();
577     return;
578   }
579   GNUNET_asprintf (&db_lib_name,
580                    "libgnunet_plugin_peerstore_%s",
581                    database);
582   db = GNUNET_PLUGIN_load (db_lib_name,
583                            (void *) cfg);
584   GNUNET_free (database);
585   if (NULL == db)
586   {
587     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
588                 _("Could not load database backend `%s'\n"),
589                 db_lib_name);
590     GNUNET_SCHEDULER_shutdown ();
591     return;
592   }
593   watchers = GNUNET_CONTAINER_multihashmap_create (10,
594                                                    GNUNET_NO);
595   expire_task = GNUNET_SCHEDULER_add_now (&cleanup_expired_records,
596                                           NULL);
597   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
598                                  NULL);
599 }
600
601
602 /**
603  * Define "main" method using service macro.
604  */
605 GNUNET_SERVICE_MAIN
606 ("peerstore",
607  GNUNET_SERVICE_OPTION_SOFT_SHUTDOWN,
608  &run,
609  &client_connect_cb,
610  &client_disconnect_cb,
611  NULL,
612  GNUNET_MQ_hd_var_size (store,
613                         GNUNET_MESSAGE_TYPE_PEERSTORE_STORE,
614                         struct StoreRecordMessage,
615                         NULL),
616  GNUNET_MQ_hd_var_size (iterate,
617                         GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE,
618                         struct StoreRecordMessage,
619                         NULL),
620  GNUNET_MQ_hd_fixed_size (watch,
621                           GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH,
622                           struct StoreKeyHashMessage,
623                           NULL),
624  GNUNET_MQ_hd_fixed_size (watch_cancel,
625                           GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL,
626                           struct StoreKeyHashMessage,
627                           NULL),
628  GNUNET_MQ_handler_end ());
629
630
631 /* end of gnunet-service-peerstore.c */