peerstore: watch functionality
[oweals/gnunet.git] / src / peerstore / gnunet-service-peerstore.c
1 /*
2      This file is part of GNUnet.
3      (C) 
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file peerstore/gnunet-service-peerstore.c
23  * @brief peerstore service implementation
24  * @author Omar Tarabai
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "peerstore.h"
29 #include "gnunet_peerstore_plugin.h"
30 #include "peerstore_common.h"
31
32 /**
33  * Context of a PEERSTORE watch
34  */
35 struct WatchContext
36 {
37
38   /**
39    * Hash of key of watched record
40    */
41   struct GNUNET_HashCode keyhash;
42
43   /**
44    * Client requested the watch
45    */
46   struct GNUNET_SERVER_Client *client;
47
48 };
49
50 /**
51  * Interval for expired records cleanup (in seconds)
52  */
53 #define CLEANUP_INTERVAL 300 /* 5mins */
54
55 /**
56  * Our configuration.
57  */
58 static const struct GNUNET_CONFIGURATION_Handle *cfg;
59
60 /**
61  * Database plugin library name
62  */
63 char *db_lib_name;
64
65 /**
66  * Database handle
67  */
68 static struct GNUNET_PEERSTORE_PluginFunctions *db;
69
70 /**
71  * Hashmap with all watch requests
72  */
73 static struct GNUNET_CONTAINER_MultiHashMap *watchers;
74
75 /**
76  * Our notification context.
77  */
78 static struct GNUNET_SERVER_NotificationContext *nc;
79
80 /**
81  * Task run during shutdown.
82  *
83  * @param cls unused
84  * @param tc unused
85  */
86 static void
87 shutdown_task (void *cls,
88                const struct GNUNET_SCHEDULER_TaskContext *tc)
89 {
90   if(NULL != db_lib_name)
91   {
92     GNUNET_break (NULL == GNUNET_PLUGIN_unload (db_lib_name, db));
93     GNUNET_free (db_lib_name);
94     db_lib_name = NULL;
95   }
96   GNUNET_SERVER_notification_context_destroy(nc);
97   GNUNET_CONTAINER_multihashmap_destroy(watchers);
98   GNUNET_SCHEDULER_shutdown();
99 }
100
101 /**
102  * Deletes any expired records from storage
103  */
104 static void
105 cleanup_expired_records(void *cls,
106     const struct GNUNET_SCHEDULER_TaskContext *tc)
107 {
108   int deleted;
109
110   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
111     return;
112   GNUNET_assert(NULL != db);
113   deleted = db->expire_records(db->cls, GNUNET_TIME_absolute_get());
114   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "%d records expired.\n", deleted);
115   GNUNET_SCHEDULER_add_delayed(
116       GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, CLEANUP_INTERVAL),
117       &cleanup_expired_records, NULL);
118 }
119
120
121 /**
122  * A client disconnected.  Remove all of its data structure entries.
123  *
124  * @param cls closure, NULL
125  * @param client identification of the client
126  */
127 static void
128 handle_client_disconnect (void *cls,
129                           struct GNUNET_SERVER_Client
130                           * client)
131 {
132 }
133
134 /**
135  * Function called by for each matching record.
136  *
137  * @param cls closure
138  * @param peer peer identity
139  * @param sub_system name of the GNUnet sub system responsible
140  * @param value stored value
141  * @param size size of stored value
142  */
143 int record_iterator(void *cls,
144     struct GNUNET_PEERSTORE_Record *record,
145     char *emsg)
146 {
147   struct GNUNET_SERVER_TransmitContext *tc = cls;
148   struct StoreRecordMessage *srm;
149
150   srm = PEERSTORE_create_record_message(record->sub_system,
151       record->peer,
152       record->key,
153       record->value,
154       record->value_size,
155       record->expiry,
156       GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD);
157   GNUNET_SERVER_transmit_context_append_message(tc, (const struct GNUNET_MessageHeader *)srm);
158   return GNUNET_YES;
159 }
160
161 /**
162  * Iterator over all watcher clients
163  * to notify them of a new record
164  *
165  * @param cls closuer, a 'struct GNUNET_PEERSTORE_Record *'
166  * @param key hash of record key
167  * @param value the watcher client, a 'struct GNUNET_SERVER_Client *'
168  * @return #GNUNET_YES to continue iterating
169  */
170 int watch_notifier_it(void *cls,
171     const struct GNUNET_HashCode *key,
172     void *value)
173 {
174   struct GNUNET_PEERSTORE_Record *record = cls;
175   struct GNUNET_SERVER_Client *client = value;
176   struct StoreRecordMessage *srm;
177
178   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Found a watcher to update.\n");
179   if(NULL == value)
180   {
181     GNUNET_CONTAINER_multihashmap_remove(watchers, key, value);
182     return GNUNET_YES;
183   }
184   srm = PEERSTORE_create_record_message(record->sub_system,
185       record->peer,
186       record->key,
187       record->value,
188       record->value_size,
189       record->expiry,
190       GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD);
191   GNUNET_SERVER_notification_context_unicast(nc, client,
192       (const struct GNUNET_MessageHeader *)srm, GNUNET_YES);
193   return GNUNET_YES;
194 }
195
196 /**
197  * Given a new record, notifies watchers
198  *
199  * @cls closure, a 'struct GNUNET_PEERSTORE_Record *'
200  * @tc unused
201  */
202 void watch_notifier (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
203 {
204   struct GNUNET_PEERSTORE_Record *record = cls;
205   struct GNUNET_HashCode keyhash;
206
207   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Sending update to any watchers.\n");
208   PEERSTORE_hash_key(record->sub_system,
209       record->peer,
210       record->key,
211       &keyhash);
212   GNUNET_CONTAINER_multihashmap_get_multiple(watchers, &keyhash, &watch_notifier_it, record);
213 }
214
215 /**
216  * Handle a watch cancel request from client
217  *
218  * @param cls unused
219  * @param client identification of the client
220  * @param message the actual message
221  */
222 void handle_watch_cancel (void *cls,
223     struct GNUNET_SERVER_Client *client,
224     const struct GNUNET_MessageHeader *message)
225 {
226   struct StoreKeyHashMessage *hm;
227
228   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received a watch cancel request from client.\n");
229   hm = (struct StoreKeyHashMessage *) message;
230   GNUNET_CONTAINER_multihashmap_remove(watchers, &hm->keyhash, client);
231   GNUNET_SERVER_receive_done(client, GNUNET_OK);
232 }
233
234 /**
235  * Handle a watch request from client
236  *
237  * @param cls unused
238  * @param client identification of the client
239  * @param message the actual message
240  */
241 void handle_watch (void *cls,
242     struct GNUNET_SERVER_Client *client,
243     const struct GNUNET_MessageHeader *message)
244 {
245   struct StoreKeyHashMessage *hm;
246
247   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received a watch request from client.\n");
248   hm = (struct StoreKeyHashMessage *) message;
249   GNUNET_SERVER_client_mark_monitor(client);
250   GNUNET_SERVER_notification_context_add(nc, client);
251   GNUNET_CONTAINER_multihashmap_put(watchers, &hm->keyhash,
252      client, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
253   GNUNET_SERVER_receive_done(client, GNUNET_OK);
254 }
255
256 /**
257  * Handle an iterate request from client
258  *
259  * @param cls unused
260  * @param client identification of the client
261  * @param message the actual message
262  */
263 void handle_iterate (void *cls,
264     struct GNUNET_SERVER_Client *client,
265     const struct GNUNET_MessageHeader *message)
266 {
267   struct GNUNET_PEERSTORE_Record *record;
268   struct GNUNET_SERVER_TransmitContext *tc;
269
270   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received an iterate request from client.\n");
271   record = PEERSTORE_parse_record_message(message);
272   if(NULL == record)
273   {
274     GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Malformed iterate request from client\n");
275     GNUNET_SERVER_receive_done(client, GNUNET_SYSERR);
276     return;
277   }
278   if(NULL == record->sub_system)
279   {
280     GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Sub system not supplied in client iterate request\n");
281     GNUNET_SERVER_receive_done(client, GNUNET_SYSERR);
282     return;
283   }
284   tc = GNUNET_SERVER_transmit_context_create (client);
285   if(GNUNET_OK == db->iterate_records(db->cls,
286       record->sub_system,
287       record->peer,
288       record->key,
289       &record_iterator,
290       tc))
291   {
292     GNUNET_SERVER_transmit_context_append_data(tc, NULL, 0, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END);
293     GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL);
294   }
295   else
296   {
297     GNUNET_free(tc);
298     GNUNET_SERVER_receive_done(client, GNUNET_SYSERR);
299   }
300   GNUNET_free(record); /* FIXME: destroy record */
301 }
302
303 /**
304  * Handle a store request from client
305  *
306  * @param cls unused
307  * @param client identification of the client
308  * @param message the actual message
309  */
310 void handle_store (void *cls,
311     struct GNUNET_SERVER_Client *client,
312     const struct GNUNET_MessageHeader *message)
313 {
314   struct GNUNET_PEERSTORE_Record *record;
315   struct GNUNET_SERVER_TransmitContext *tc;
316
317   record = PEERSTORE_parse_record_message(message);
318   if(NULL == record)
319   {
320     GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Malformed store request from client\n");
321     GNUNET_SERVER_receive_done(client, GNUNET_SYSERR);
322     return;
323   }
324   if(NULL == record->sub_system
325       || NULL == record->peer
326       || NULL == record->key)
327   {
328     /* FIXME: Destroy record */
329     GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Full key not supplied in client store request\n");
330     GNUNET_SERVER_receive_done(client, GNUNET_SYSERR);
331     return;
332   }
333   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Received a store request (size: %lu) for sub system `%s', peer `%s', key `%s'\n",
334       record->value_size,
335       record->sub_system,
336       GNUNET_i2s (record->peer),
337       record->key);
338   if(GNUNET_OK != db->store_record(db->cls,
339       record->sub_system,
340       record->peer,
341       record->key,
342       record->value,
343       record->value_size,
344       *record->expiry))
345   {
346     /* FIXME: Destroy record */
347     GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Failed to store requested value, sqlite database error.");
348     GNUNET_SERVER_receive_done(client, GNUNET_SYSERR);
349     return;
350   }
351   tc = GNUNET_SERVER_transmit_context_create (client);
352   GNUNET_SERVER_transmit_context_append_data(tc, NULL, 0, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK);
353   GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL);
354   GNUNET_SCHEDULER_add_continuation(&watch_notifier, record, -1);
355 }
356
357 /**
358  * Peerstore service runner.
359  *
360  * @param cls closure
361  * @param server the initialized server
362  * @param c configuration to use
363  */
364 static void
365 run (void *cls,
366      struct GNUNET_SERVER_Handle *server,
367      const struct GNUNET_CONFIGURATION_Handle *c)
368 {
369   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
370       {&handle_store, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE, 0},
371       {&handle_iterate, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE, 0},
372       {&handle_watch, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH, sizeof(struct StoreKeyHashMessage)},
373       {&handle_watch_cancel, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL, sizeof(struct StoreKeyHashMessage)},
374       {NULL, NULL, 0, 0}
375   };
376   char *database;
377
378   cfg = c;
379   if (GNUNET_OK !=
380         GNUNET_CONFIGURATION_get_value_string (cfg, "peerstore", "DATABASE",
381                                                &database))
382     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "No database backend configured\n");
383
384   else
385   {
386     GNUNET_asprintf (&db_lib_name, "libgnunet_plugin_peerstore_%s", database);
387     db = GNUNET_PLUGIN_load(db_lib_name, (void *) cfg);
388     GNUNET_free(database);
389   }
390   if(NULL == db)
391           GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Could not load database backend `%s'\n", db_lib_name);
392   else
393   {
394     nc = GNUNET_SERVER_notification_context_create (server, 16);
395     watchers = GNUNET_CONTAINER_multihashmap_create(10, GNUNET_NO);
396     GNUNET_SCHEDULER_add_now(&cleanup_expired_records, NULL);
397     GNUNET_SERVER_add_handlers (server, handlers);
398     GNUNET_SERVER_disconnect_notify (server,
399              &handle_client_disconnect,
400              NULL);
401   }
402   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
403                                 &shutdown_task,
404                                 NULL);
405 }
406
407
408 /**
409  * The main function for the peerstore service.
410  *
411  * @param argc number of arguments from the command line
412  * @param argv command line arguments
413  * @return 0 ok, 1 on error
414  */
415 int
416 main (int argc, char *const *argv)
417 {
418   return (GNUNET_OK ==
419           GNUNET_SERVICE_run (argc,
420                               argv,
421                               "peerstore",
422                               GNUNET_SERVICE_OPTION_NONE,
423                               &run, NULL)) ? 0 : 1;
424 }
425
426 /* end of gnunet-service-peerstore.c */