uncrustify as demanded.
[oweals/gnunet.git] / src / peerstore / gnunet-service-peerstore.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2014, 2015, 2016 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file peerstore/gnunet-service-peerstore.c
23  * @brief peerstore service implementation
24  * @author Omar Tarabai
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "peerstore.h"
29 #include "gnunet_peerstore_plugin.h"
30 #include "peerstore_common.h"
31
32
/**
 * Delay between two runs of the expired-records cleanup, in seconds.
 */
#define EXPIRED_RECORDS_CLEANUP_INTERVAL 300 /* 5 minutes */

/**
 * Configuration handle passed to run().
 */
static const struct GNUNET_CONFIGURATION_Handle *cfg;

/**
 * Name of the database plugin library we loaded.
 */
static char *db_lib_name;

/**
 * Function table of the loaded database plugin.
 */
static struct GNUNET_PEERSTORE_PluginFunctions *db;

/**
 * Map from record key hash to the clients watching that key.
 */
static struct GNUNET_CONTAINER_MultiHashMap *watchers;

/**
 * Pending task that will purge expired records, or NULL if none is scheduled.
 */
static struct GNUNET_SCHEDULER_Task *expire_task;

/**
 * #GNUNET_YES once shutdown has been requested, #GNUNET_NO otherwise.
 */
static int in_shutdown;

/**
 * Number of currently connected clients (watch requests are subtracted,
 * see handle_watch()).
 */
static unsigned int num_clients;
72
73
74 /**
75  * Perform the actual shutdown operations
76  */
77 static void
78 do_shutdown()
79 {
80   if (NULL != db_lib_name)
81     {
82       GNUNET_break(NULL == GNUNET_PLUGIN_unload(db_lib_name, db));
83       GNUNET_free(db_lib_name);
84       db_lib_name = NULL;
85     }
86   if (NULL != watchers)
87     {
88       GNUNET_CONTAINER_multihashmap_destroy(watchers);
89       watchers = NULL;
90     }
91   if (NULL != expire_task)
92     {
93       GNUNET_SCHEDULER_cancel(expire_task);
94       expire_task = NULL;
95     }
96   GNUNET_SCHEDULER_shutdown();
97 }
98
99
100 /**
101  * Task run during shutdown.
102  *
103  * @param cls unused
104  */
105 static void
106 shutdown_task(void *cls)
107 {
108   in_shutdown = GNUNET_YES;
109   if (0 == num_clients) /* Only when no connected clients. */
110     do_shutdown();
111 }
112
113
114 /* Forward declaration */
115 static void
116 expire_records_continuation(void *cls, int success);
117
118
119 /**
120  * Deletes any expired records from storage
121  */
122 static void
123 cleanup_expired_records(void *cls)
124 {
125   int ret;
126
127   expire_task = NULL;
128   GNUNET_assert(NULL != db);
129   ret = db->expire_records(db->cls,
130                            GNUNET_TIME_absolute_get(),
131                            &expire_records_continuation,
132                            NULL);
133   if (GNUNET_OK != ret)
134     {
135       GNUNET_assert(NULL == expire_task);
136       expire_task = GNUNET_SCHEDULER_add_delayed(
137         GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS,
138                                       EXPIRED_RECORDS_CLEANUP_INTERVAL),
139         &cleanup_expired_records,
140         NULL);
141     }
142 }
143
144
145 /**
146  * Continuation to expire_records called by the peerstore plugin
147  *
148  * @param cls unused
149  * @param success count of records deleted or #GNUNET_SYSERR
150  */
151 static void
152 expire_records_continuation(void *cls, int success)
153 {
154   if (success > 0)
155     GNUNET_log(GNUNET_ERROR_TYPE_INFO, "%d records expired.\n", success);
156   GNUNET_assert(NULL == expire_task);
157   expire_task = GNUNET_SCHEDULER_add_delayed(
158     GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS,
159                                   EXPIRED_RECORDS_CLEANUP_INTERVAL),
160     &cleanup_expired_records,
161     NULL);
162 }
163
164
165 /**
166  * A client disconnected.  Remove all of its data structure entries.
167  *
168  * @param cls closure, NULL
169  * @param client identification of the client
170  * @param mq the message queue
171  * @return
172  */
173 static void *
174 client_connect_cb(void *cls,
175                   struct GNUNET_SERVICE_Client *client,
176                   struct GNUNET_MQ_Handle *mq)
177 {
178   num_clients++;
179   return client;
180 }
181
182
183 /**
184  * Search for a disconnected client and remove it
185  *
186  * @param cls closuer, a `struct GNUNET_SERVICE_Client`
187  * @param key hash of record key
188  * @param value the watcher client, a `struct GNUNET_SERVICE_Client *`
189  * @return #GNUNET_OK to continue iterating
190  */
191 static int
192 client_disconnect_it(void *cls, const struct GNUNET_HashCode *key, void *value)
193 {
194   if (value == cls)
195     {
196       GNUNET_assert(GNUNET_YES ==
197                     GNUNET_CONTAINER_multihashmap_remove(watchers, key, value));
198       num_clients++;
199     }
200   return GNUNET_OK;
201 }
202
203
204 /**
205  * A client disconnected.  Remove all of its data structure entries.
206  *
207  * @param cls closure, NULL
208  * @param client identification of the client
209  */
210 static void
211 client_disconnect_cb(void *cls,
212                      struct GNUNET_SERVICE_Client *client,
213                      void *app_cls)
214 {
215   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "A client disconnected, cleaning up.\n");
216   if (NULL != watchers)
217     GNUNET_CONTAINER_multihashmap_iterate(watchers,
218                                           &client_disconnect_it,
219                                           client);
220   num_clients--;
221   if ((0 == num_clients) && in_shutdown)
222     do_shutdown();
223 }
224
225
226 /**
227  * Function called by for each matching record.
228  *
229  * @param cls closure
230  * @param record peerstore record found
231  * @param emsg error message or NULL if no errors
232  * @return #GNUNET_YES to continue iteration
233  */
234 static void
235 record_iterator(void *cls,
236                 const struct GNUNET_PEERSTORE_Record *record,
237                 const char *emsg)
238 {
239   struct GNUNET_PEERSTORE_Record *cls_record = cls;
240   struct GNUNET_MQ_Envelope *env;
241
242   if (NULL == record)
243     {
244       /* No more records */
245       struct GNUNET_MessageHeader *endmsg;
246
247       env = GNUNET_MQ_msg(endmsg, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END);
248       GNUNET_MQ_send(GNUNET_SERVICE_client_get_mq(cls_record->client), env);
249       if (NULL == emsg)
250         {
251           GNUNET_SERVICE_client_continue(cls_record->client);
252         }
253       else
254         {
255           GNUNET_break(0);
256           GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Failed to iterate: %s\n", emsg);
257           GNUNET_SERVICE_client_drop(cls_record->client);
258         }
259       PEERSTORE_destroy_record(cls_record);
260       return;
261     }
262
263   env = PEERSTORE_create_record_mq_envelope(
264     record->sub_system,
265     &record->peer,
266     record->key,
267     record->value,
268     record->value_size,
269     record->expiry,
270     0,
271     GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD);
272   GNUNET_MQ_send(GNUNET_SERVICE_client_get_mq(cls_record->client), env);
273 }
274
275
276 /**
277  * Iterator over all watcher clients
278  * to notify them of a new record
279  *
280  * @param cls closure, a `struct GNUNET_PEERSTORE_Record *`
281  * @param key hash of record key
282  * @param value the watcher client, a `struct GNUNET_SERVICE_Client *`
283  * @return #GNUNET_YES to continue iterating
284  */
285 static int
286 watch_notifier_it(void *cls, const struct GNUNET_HashCode *key, void *value)
287 {
288   struct GNUNET_PEERSTORE_Record *record = cls;
289   struct GNUNET_SERVICE_Client *client = value;
290   struct GNUNET_MQ_Envelope *env;
291
292   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Found a watcher to update.\n");
293   env = PEERSTORE_create_record_mq_envelope(
294     record->sub_system,
295     &record->peer,
296     record->key,
297     record->value,
298     record->value_size,
299     record->expiry,
300     0,
301     GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD);
302   GNUNET_MQ_send(GNUNET_SERVICE_client_get_mq(client), env);
303   return GNUNET_YES;
304 }
305
306
307 /**
308  * Given a new record, notifies watchers
309  *
310  * @param record changed record to update watchers with
311  */
312 static void
313 watch_notifier(struct GNUNET_PEERSTORE_Record *record)
314 {
315   struct GNUNET_HashCode keyhash;
316
317   PEERSTORE_hash_key(record->sub_system, &record->peer, record->key, &keyhash);
318   GNUNET_CONTAINER_multihashmap_get_multiple(watchers,
319                                              &keyhash,
320                                              &watch_notifier_it,
321                                              record);
322 }
323
324
325 /**
326  * Handle a watch cancel request from client
327  *
328  * @param cls identification of the client
329  * @param hm the actual message
330  */
331 static void
332 handle_watch_cancel(void *cls, const struct StoreKeyHashMessage *hm)
333 {
334   struct GNUNET_SERVICE_Client *client = cls;
335
336   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received a watch cancel request.\n");
337   if (GNUNET_OK !=
338       GNUNET_CONTAINER_multihashmap_remove(watchers, &hm->keyhash, client))
339     {
340       GNUNET_break(0);
341       GNUNET_SERVICE_client_drop(client);
342       return;
343     }
344   num_clients++;
345   GNUNET_SERVICE_client_continue(client);
346 }
347
348
349 /**
350  * Handle a watch request from client
351  *
352  * @param cls identification of the client
353  * @param hm the actual message
354  */
355 static void
356 handle_watch(void *cls, const struct StoreKeyHashMessage *hm)
357 {
358   struct GNUNET_SERVICE_Client *client = cls;
359
360   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received a watch request.\n");
361   num_clients--; /* do not count watchers */
362   GNUNET_SERVICE_client_mark_monitor(client);
363   GNUNET_CONTAINER_multihashmap_put(watchers,
364                                     &hm->keyhash,
365                                     client,
366                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
367   GNUNET_SERVICE_client_continue(client);
368 }
369
370
371 /**
372  * Check an iterate request from client
373  *
374  * @param cls client identification of the client
375  * @param srm the actual message
376  * @return #GNUNET_OK if @a srm is well-formed
377  */
378 static int
379 check_iterate(void *cls, const struct StoreRecordMessage *srm)
380 {
381   struct GNUNET_PEERSTORE_Record *record;
382
383   record = PEERSTORE_parse_record_message(srm);
384   if (NULL == record)
385     {
386       GNUNET_break(0);
387       return GNUNET_SYSERR;
388     }
389   if (NULL == record->sub_system)
390     {
391       GNUNET_break(0);
392       PEERSTORE_destroy_record(record);
393       return GNUNET_SYSERR;
394     }
395   PEERSTORE_destroy_record(record);
396   return GNUNET_OK;
397 }
398
399
400 /**
401  * Handle an iterate request from client
402  *
403  * @param cls identification of the client
404  * @param srm the actual message
405  */
406 static void
407 handle_iterate(void *cls, const struct StoreRecordMessage *srm)
408 {
409   struct GNUNET_SERVICE_Client *client = cls;
410   struct GNUNET_PEERSTORE_Record *record;
411
412   record = PEERSTORE_parse_record_message(srm);
413   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
414              "Iterate request: ss `%s', peer `%s', key `%s'\n",
415              record->sub_system,
416              GNUNET_i2s(&record->peer),
417              (NULL == record->key) ? "NULL" : record->key);
418   record->client = client;
419   if (GNUNET_OK !=
420       db->iterate_records(db->cls,
421                           record->sub_system,
422                           (ntohs(srm->peer_set)) ? &record->peer : NULL,
423                           record->key,
424                           &record_iterator,
425                           record))
426     {
427       GNUNET_break(0);
428       GNUNET_SERVICE_client_drop(client);
429       PEERSTORE_destroy_record(record);
430     }
431 }
432
433
434 /**
435  * Continuation of store_record called by the peerstore plugin
436  *
437  * @param cls closure
438  * @param success result
439  */
440 static void
441 store_record_continuation(void *cls, int success)
442 {
443   struct GNUNET_PEERSTORE_Record *record = cls;
444
445   if (GNUNET_OK == success)
446     {
447       watch_notifier(record);
448       GNUNET_SERVICE_client_continue(record->client);
449     }
450   else
451     {
452       GNUNET_break(0);
453       GNUNET_SERVICE_client_drop(record->client);
454     }
455   PEERSTORE_destroy_record(record);
456 }
457
458
459 /**
460  * Check a store request from client
461  *
462  * @param cls client identification of the client
463  * @param srm the actual message
464  * @return #GNUNET_OK if @a srm is well-formed
465  */
466 static int
467 check_store(void *cls, const struct StoreRecordMessage *srm)
468 {
469   struct GNUNET_PEERSTORE_Record *record;
470
471   record = PEERSTORE_parse_record_message(srm);
472   if (NULL == record)
473     {
474       GNUNET_break(0);
475       return GNUNET_SYSERR;
476     }
477   if ((NULL == record->sub_system) || (NULL == record->key))
478     {
479       GNUNET_break(0);
480       PEERSTORE_destroy_record(record);
481       return GNUNET_SYSERR;
482     }
483   PEERSTORE_destroy_record(record);
484   return GNUNET_OK;
485 }
486
487
488 /**
489  * Handle a store request from client
490  *
491  * @param cls client identification of the client
492  * @param srm the actual message
493  */
494 static void
495 handle_store(void *cls, const struct StoreRecordMessage *srm)
496 {
497   struct GNUNET_SERVICE_Client *client = cls;
498   struct GNUNET_PEERSTORE_Record *record;
499
500   record = PEERSTORE_parse_record_message(srm);
501   GNUNET_log(
502     GNUNET_ERROR_TYPE_INFO,
503     "Received a store request. Sub system `%s' Peer `%s Key `%s' Options: %u.\n",
504     record->sub_system,
505     GNUNET_i2s(&record->peer),
506     record->key,
507     (uint32_t)ntohl(srm->options));
508   record->client = client;
509   if (GNUNET_OK != db->store_record(db->cls,
510                                     record->sub_system,
511                                     &record->peer,
512                                     record->key,
513                                     record->value,
514                                     record->value_size,
515                                     record->expiry,
516                                     ntohl(srm->options),
517                                     &store_record_continuation,
518                                     record))
519     {
520       GNUNET_break(0);
521       PEERSTORE_destroy_record(record);
522       GNUNET_SERVICE_client_drop(client);
523       return;
524     }
525 }
526
527
528 /**
529  * Peerstore service runner.
530  *
531  * @param cls closure
532  * @param c configuration to use
533  * @param service the initialized service
534  */
535 static void
536 run(void *cls,
537     const struct GNUNET_CONFIGURATION_Handle *c,
538     struct GNUNET_SERVICE_Handle *service)
539 {
540   char *database;
541
542   in_shutdown = GNUNET_NO;
543   cfg = c;
544   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string(cfg,
545                                                          "peerstore",
546                                                          "DATABASE",
547                                                          &database))
548     {
549       GNUNET_log_config_missing(GNUNET_ERROR_TYPE_ERROR,
550                                 "peerstore",
551                                 "DATABASE");
552       GNUNET_SCHEDULER_shutdown();
553       return;
554     }
555   GNUNET_asprintf(&db_lib_name, "libgnunet_plugin_peerstore_%s", database);
556   db = GNUNET_PLUGIN_load(db_lib_name, (void *)cfg);
557   GNUNET_free(database);
558   if (NULL == db)
559     {
560       GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
561                  _("Could not load database backend `%s'\n"),
562                  db_lib_name);
563       GNUNET_SCHEDULER_shutdown();
564       return;
565     }
566   watchers = GNUNET_CONTAINER_multihashmap_create(10, GNUNET_NO);
567   expire_task = GNUNET_SCHEDULER_add_now(&cleanup_expired_records, NULL);
568   GNUNET_SCHEDULER_add_shutdown(&shutdown_task, NULL);
569 }
570
571
572 /**
573  * Define "main" method using service macro.
574  */
575 GNUNET_SERVICE_MAIN(
576   "peerstore",
577   GNUNET_SERVICE_OPTION_SOFT_SHUTDOWN,
578   &run,
579   &client_connect_cb,
580   &client_disconnect_cb,
581   NULL,
582   GNUNET_MQ_hd_var_size(store,
583                         GNUNET_MESSAGE_TYPE_PEERSTORE_STORE,
584                         struct StoreRecordMessage,
585                         NULL),
586   GNUNET_MQ_hd_var_size(iterate,
587                         GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE,
588                         struct StoreRecordMessage,
589                         NULL),
590   GNUNET_MQ_hd_fixed_size(watch,
591                           GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH,
592                           struct StoreKeyHashMessage,
593                           NULL),
594   GNUNET_MQ_hd_fixed_size(watch_cancel,
595                           GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL,
596                           struct StoreKeyHashMessage,
597                           NULL),
598   GNUNET_MQ_handler_end());
599
600
601 /* end of gnunet-service-peerstore.c */