2 This file is part of GNUnet.
3 Copyright (C) 2013-2016 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 SPDX-License-Identifier: AGPL3.0-or-later
21 * @file peerstore/peerstore_api.c
22 * @brief API for peerstore
23 * @author Omar Tarabai
24 * @author Christian Grothoff
27 #include "gnunet_util_lib.h"
28 #include "peerstore.h"
29 #include "peerstore_common.h"
31 #define LOG(kind,...) GNUNET_log_from (kind, "peerstore-api",__VA_ARGS__)
33 /******************************************************************************/
34 /************************ DATA STRUCTURES ****************************/
35 /******************************************************************************/
38 * Handle to the PEERSTORE service.
/*
 * Per-connection client state for the PEERSTORE service.
 * NOTE(review): this listing is lossy -- the embedded source numbers skip
 * (e.g. 40 -> 46), so braces, comment delimiters and at least one member are
 * not shown. The comments added here describe only what is visible.
 */
40 struct GNUNET_PEERSTORE_Handle
/* Configuration; reconnect() hands it to GNUNET_CLIENT_connect(). */
46 const struct GNUNET_CONFIGURATION_Handle *cfg;
/* Message queue to the service. Every visible GNUNET_MQ_send() in this file
   uses it; do_disconnect() and reconnect() call GNUNET_MQ_destroy() on it. */
51 struct GNUNET_MQ_Handle *mq;
54 * Head of active STORE requests.
/* Appended to in GNUNET_PEERSTORE_store(), unlinked in
   GNUNET_PEERSTORE_store_cancel(). */
56 struct GNUNET_PEERSTORE_StoreContext *store_head;
59 * Tail of active STORE requests.
61 struct GNUNET_PEERSTORE_StoreContext *store_tail;
64 * Head of active ITERATE requests.
/* The iterate response handlers always take the head of this list as the
   context the incoming message belongs to. */
66 struct GNUNET_PEERSTORE_IterateContext *iterate_head;
69 * Tail of active ITERATE requests.
71 struct GNUNET_PEERSTORE_IterateContext *iterate_tail;
74 * Hashmap of watch requests
/* Created on first use in GNUNET_PEERSTORE_watch() (it is checked against
   NULL there); destroyed in GNUNET_PEERSTORE_disconnect(). */
76 struct GNUNET_CONTAINER_MultiHashMap *watches;
79 * Are we in the process of disconnecting but need to sync first?
/* NOTE(review): the member this line describes is not visible in the listing;
   other code reads and writes h->disconnecting with GNUNET_YES / GNUNET_NO. */
86 * Context for a store request
/*
 * State kept for one STORE request.
 * NOTE(review): several member declarations are missing from this listing
 * (their description lines remain). Other code in this file accesses
 * sc->cont_cls, sc->sub_system, sc->key and sc->value.
 */
88 struct GNUNET_PEERSTORE_StoreContext
/* Links for the handle's STORE list (h->store_head / h->store_tail). */
93 struct GNUNET_PEERSTORE_StoreContext *next;
98 struct GNUNET_PEERSTORE_StoreContext *prev;
101 * Handle to the PEERSTORE service.
103 struct GNUNET_PEERSTORE_Handle *h;
106 * Continuation called with service response
/* NOTE(review): in the visible code, cont is invoked from
   store_request_sent(), which is registered through GNUNET_MQ_notify_sent();
   that is when the envelope has been sent, not when the service answers. */
108 GNUNET_PEERSTORE_Continuation cont;
111 * Closure for @e cont
116 * Which subsystem does the store?
/* sub_system, key and value are filled with GNUNET_strdup()/GNUNET_memdup()
   in GNUNET_PEERSTORE_store() and released with GNUNET_free() in
   GNUNET_PEERSTORE_store_cancel(). */
121 * Key for the store operation.
126 * Contains @e size bytes.
131 * Peer the store is for.
133 struct GNUNET_PeerIdentity peer;
136 * Number of bytes in @e value.
141 * When does the value expire?
143 struct GNUNET_TIME_Absolute expiry;
146 * Options for the store operation.
148 enum GNUNET_PEERSTORE_StoreOption options;
153 * Context for an iterate request
/*
 * State kept for one ITERATE request.
 * NOTE(review): some member declarations are missing from this listing.
 * Other code in this file accesses ic->sub_system, ic->key, ic->callback_cls
 * and ic->iterating.
 */
155 struct GNUNET_PEERSTORE_IterateContext
/* Links for the handle's ITERATE list (h->iterate_head / h->iterate_tail). */
160 struct GNUNET_PEERSTORE_IterateContext *next;
165 struct GNUNET_PEERSTORE_IterateContext *prev;
168 * Handle to the PEERSTORE service.
170 struct GNUNET_PEERSTORE_Handle *h;
173 * Which subsystem does the store?
178 * Peer the store is for.
180 struct GNUNET_PeerIdentity peer;
183 * Key for the store operation.
/* key is released with GNUNET_free_non_null() in
   GNUNET_PEERSTORE_iterate_cancel(), i.e. it is allowed to be NULL. */
188 * Callback with each matching record
190 GNUNET_PEERSTORE_Processor callback;
193 * Closure for @e callback
198 * #GNUNET_YES if we are currently processing records.
/* Set to GNUNET_YES in handle_iterate_result() and to GNUNET_NO in
   handle_iterate_end(); GNUNET_PEERSTORE_iterate_cancel() branches on it. */
205 * Context for a watch request
/*
 * State kept for one WATCH request.
 * NOTE(review): the callback_cls member declaration is not visible in this
 * listing, although wc->callback_cls is used elsewhere in the file.
 */
207 struct GNUNET_PEERSTORE_WatchContext
/* NOTE(review): in the visible code watch contexts are stored in the
   h->watches multihashmap; no list insertion using next/prev is shown. */
212 struct GNUNET_PEERSTORE_WatchContext *next;
217 struct GNUNET_PEERSTORE_WatchContext *prev;
220 * Handle to the PEERSTORE service.
222 struct GNUNET_PEERSTORE_Handle *h;
225 * Callback with each record received
227 GNUNET_PEERSTORE_Processor callback;
230 * Closure for @e callback
235 * Hash of the combined key
/* Copied from the outgoing WATCH message in GNUNET_PEERSTORE_watch() and
   re-sent in rewatch_it() and GNUNET_PEERSTORE_watch_cancel(). */
237 struct GNUNET_HashCode keyhash;
241 /******************************************************************************/
242 /******************* DECLARATIONS *********************/
243 /******************************************************************************/
246 * Close the existing connection to PEERSTORE and reconnect.
248 * @param h handle to the service
/* Forward declaration; the definition appears further down in this file.
   NOTE(review): the return-type/storage-class line is missing here. */
251 reconnect (struct GNUNET_PEERSTORE_Handle *h);
255 * Callback after MQ envelope is sent
257 * @param cls a `struct GNUNET_PEERSTORE_StoreContext *`
/*
 * Notify-sent callback for a STORE envelope; GNUNET_PEERSTORE_store() and
 * reconnect() register it with the store context as closure.
 * NOTE(review): lines are missing from this listing (numbering jumps
 * 263 -> 267 and 268 -> 270), so the declaration of cont_cls, the assignment
 * of cont and any guard around the final call are not shown.
 */
260 store_request_sent (void *cls)
262 struct GNUNET_PEERSTORE_StoreContext *sc = cls;
263 GNUNET_PEERSTORE_Continuation cont;
/* Read what is needed from sc before the cancel call below, which unlinks sc
   and frees its sub_system/value/key. */
267 cont_cls = sc->cont_cls;
268 GNUNET_PEERSTORE_store_cancel (sc);
/* Report success to the continuation. */
270 cont (cont_cls, GNUNET_OK);
274 /******************************************************************************/
275 /******************* CONNECTION FUNCTIONS *********************/
276 /******************************************************************************/
280 * Function called when we had trouble talking to the service.
/*
 * Error callback for the message queue; reconnect() passes its address to
 * GNUNET_CLIENT_connect().
 * NOTE(review): the listing ends inside the LOG call; its last argument and
 * anything the function does after logging are not visible.
 */
283 handle_client_error (void *cls,
284 enum GNUNET_MQ_Error error)
/* cls is the PEERSTORE handle. */
286 struct GNUNET_PEERSTORE_Handle *h = cls;
288 LOG (GNUNET_ERROR_TYPE_ERROR,
289 "Received an error notification from MQ of type: %d\n",
296 * Iterator over previous watches to resend them
298 * @param cls the `struct GNUNET_PEERSTORE_Handle`
299 * @param key key for the watch
300 * @param value the `struct GNUNET_PEERSTORE_WatchContext *`
301 * @return #GNUNET_YES (continue to iterate)
/*
 * Hashmap iterator: builds a fresh WATCH message carrying the watch context's
 * key hash and queues it on the handle's message queue.
 * NOTE(review): the third parameter line and the return statement are missing
 * from this listing; `value` is used below as the watch context.
 */
304 rewatch_it (void *cls,
305 const struct GNUNET_HashCode *key,
308 struct GNUNET_PEERSTORE_Handle *h = cls;
309 struct GNUNET_PEERSTORE_WatchContext *wc = value;
310 struct StoreKeyHashMessage *hm;
311 struct GNUNET_MQ_Envelope *ev;
/* The stored keyhash is used; the `key` argument is not read in the visible
   lines. */
313 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
314 hm->keyhash = wc->keyhash;
315 GNUNET_MQ_send (h->mq, ev);
321 * Iterator over watch requests to cancel them.
324 * @param key key to the watch request
325 * @param value watch context
326 * @return #GNUNET_YES to continue iteration
/*
 * Hashmap iterator used by GNUNET_PEERSTORE_disconnect() (which passes NULL
 * as closure): cancels the watch held in `value`.
 * NOTE(review): the third parameter line and the return statement are missing
 * from this listing.
 */
329 destroy_watch (void *cls,
330 const struct GNUNET_HashCode *key,
333 struct GNUNET_PEERSTORE_WatchContext *wc = value;
/* GNUNET_PEERSTORE_watch_cancel() also sends a WATCH_CANCEL message and
   removes the entry from h->watches. */
335 GNUNET_PEERSTORE_watch_cancel (wc);
341 * Kill the connection to the service. This can be delayed in case of pending
342 * STORE requests and the user explicitly asked to sync first. Otherwise it is
343 * performed instantly.
345 * @param h Handle to the service.
/*
 * Final teardown step for a handle.
 * NOTE(review): only the GNUNET_MQ_destroy() call is visible; the lines
 * around it (source 349-351 and 353 onward) are missing from this listing, so
 * any NULL check on h->mq or release of h itself cannot be confirmed here.
 */
348 do_disconnect (struct GNUNET_PEERSTORE_Handle *h)
352 GNUNET_MQ_destroy (h->mq);
360 * Connect to the PEERSTORE service.
362 * @param cfg configuration to use
363 * @return NULL on error
/*
 * Allocates a new handle and marks it as not disconnecting.
 * NOTE(review): source lines 371 and 373 onward are missing from this
 * listing; storing cfg, establishing the connection and the return statement
 * are not visible.
 */
365 struct GNUNET_PEERSTORE_Handle *
366 GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
368 struct GNUNET_PEERSTORE_Handle *h;
370 h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
372 h->disconnecting = GNUNET_NO;
384 * Disconnect from the PEERSTORE service. Any pending ITERATE and WATCH requests
386 * Any pending STORE requests will depend on the @a sync_first flag.
388 * @param h handle to disconnect
389 * @param sync_first send any pending STORE requests before disconnecting
/*
 * Public teardown entry point. Visible order of work: cancel and drop all
 * watches, cancel all iterations, then either defer (sync_first with STORE
 * requests still pending) or cancel the pending STORE requests.
 * NOTE(review): the second parameter line, braces, the return after deferring
 * and the final teardown call are missing from this listing.
 */
392 GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h,
395 struct GNUNET_PEERSTORE_IterateContext *ic;
396 struct GNUNET_PEERSTORE_StoreContext *sc;
398 LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
/* h->watches is only allocated once a watch has been requested. */
399 if (NULL != h->watches)
401 GNUNET_CONTAINER_multihashmap_iterate (h->watches, &destroy_watch, NULL);
402 GNUNET_CONTAINER_multihashmap_destroy (h->watches);
/* Drain the ITERATE list from the head.
   NOTE(review): GNUNET_PEERSTORE_iterate_cancel() unlinks a context only
   under its `GNUNET_NO == ic->iterating` check; confirm this loop cannot spin
   on a context that is still marked as iterating. */
405 while (NULL != (ic = h->iterate_head))
408 GNUNET_PEERSTORE_iterate_cancel (ic);
410 if (NULL != h->store_head)
412 if (GNUNET_YES == sync_first)
414 LOG (GNUNET_ERROR_TYPE_DEBUG,
415 "Delaying disconnection due to pending store requests.\n");
/* GNUNET_PEERSTORE_store_cancel() checks this flag once the STORE list has
   become empty. */
416 h->disconnecting = GNUNET_YES;
/* Otherwise drop every pending STORE request. */
419 while (NULL != (sc = h->store_head))
420 GNUNET_PEERSTORE_store_cancel (sc);
426 /******************************************************************************/
427 /******************* STORE FUNCTIONS *********************/
428 /******************************************************************************/
432 * Cancel a store request
434 * @param sc Store request context
/*
 * Unlinks a STORE context from its handle and frees the strings/value it
 * owns. Also called internally from store_request_sent().
 * NOTE(review): source line 445 and the body of the final `if` are missing
 * from this listing.
 * NOTE(review): GNUNET_PEERSTORE_store() registers store_request_sent with
 * this context as closure on the envelope; nothing visible here withdraws
 * that registration. Confirm a caller-initiated cancel before the envelope is
 * sent cannot leave that closure dangling.
 */
437 GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
/* Keep the handle in a local: it is still needed after sc's members are
   freed. */
439 struct GNUNET_PEERSTORE_Handle *h = sc->h;
441 GNUNET_CONTAINER_DLL_remove (sc->h->store_head, sc->h->store_tail, sc);
442 GNUNET_free (sc->sub_system);
443 GNUNET_free (sc->value);
444 GNUNET_free (sc->key);
/* True when a deferred disconnect was requested and this was the last
   pending STORE request. */
446 if ((GNUNET_YES == h->disconnecting) && (NULL == h->store_head))
452 * Store a new entry in the PEERSTORE.
453 * Note that stored entries can be lost in some cases
454 * such as power failure.
456 * @param h Handle to the PEERSTORE service
457 * @param sub_system name of the sub system
458 * @param peer Peer Identity
459 * @param key entry key
460 * @param value entry value BLOB
461 * @param size size of @e value
462 * @param expiry absolute time after which the entry is (possibly) deleted
463 * @param options options specific to the storage operation
464 * @param cont Continuation function after the store request is sent
465 * @param cont_cls Closure for @a cont
/*
 * Queues a STORE request and returns a context tracking it.
 * NOTE(review): several lines are missing from this listing (source 471,
 * 476-477, 485, 490, 493-494, 496, 498, 503 onward), including two parameter
 * lines, some member assignments and the return statement.
 */
467 struct GNUNET_PEERSTORE_StoreContext *
468 GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
469 const char *sub_system,
470 const struct GNUNET_PeerIdentity *peer,
472 const void *value, size_t size,
473 struct GNUNET_TIME_Absolute expiry,
474 enum GNUNET_PEERSTORE_StoreOption options,
475 GNUNET_PEERSTORE_Continuation cont,
478 struct GNUNET_MQ_Envelope *ev;
479 struct GNUNET_PEERSTORE_StoreContext *sc;
/* NOTE(review): `%lu` is paired with `size`, which is declared size_t in the
   parameter list above; `%zu` or an explicit cast to unsigned long would
   match the argument type on every platform. */
481 LOG (GNUNET_ERROR_TYPE_DEBUG,
482 "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n",
483 size, sub_system, GNUNET_i2s (peer), key);
484 ev = PEERSTORE_create_record_mq_envelope (sub_system, peer, key, value, size,
486 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
487 sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext);
/* Private copies of the inputs: reconnect() rebuilds the STORE envelope from
   the context, and GNUNET_PEERSTORE_store_cancel() frees these copies. */
489 sc->sub_system = GNUNET_strdup (sub_system);
491 sc->key = GNUNET_strdup (key);
492 sc->value = GNUNET_memdup (value, size);
495 sc->options = options;
497 sc->cont_cls = cont_cls;
500 GNUNET_CONTAINER_DLL_insert_tail (h->store_head, h->store_tail, sc);
/* store_request_sent() runs with sc as closure once the envelope is sent. */
501 GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
502 GNUNET_MQ_send (h->mq, ev);
508 /******************************************************************************/
509 /******************* ITERATE FUNCTIONS *********************/
510 /******************************************************************************/
514 * When a response for iterate request is received
516 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
517 * @param msg message received
/*
 * Handler for the ITERATE_END message: finishes the iteration at the head of
 * the handle's ITERATE list and tells its callback that no more records
 * follow.
 * NOTE(review): lines are missing from this listing (source 526-527, 529-530,
 * 533-535), including the callback_cls declaration and the condition and
 * follow-up around the error log.
 */
520 handle_iterate_end (void *cls,
521 const struct GNUNET_MessageHeader *msg)
523 struct GNUNET_PEERSTORE_Handle *h = cls;
524 struct GNUNET_PEERSTORE_IterateContext *ic;
525 GNUNET_PEERSTORE_Processor callback;
/* The head of the list is taken to be the request this message answers. */
528 ic = h->iterate_head;
531 LOG (GNUNET_ERROR_TYPE_ERROR,
532 _("Unexpected iteration response, this should not happen.\n"));
/* Copy callback and closure out of ic before cancelling it. */
536 callback = ic->callback;
537 callback_cls = ic->callback_cls;
/* Clearing the flag makes the cancel call below take the branch that unlinks
   ic and frees its strings. */
538 ic->iterating = GNUNET_NO;
539 GNUNET_PEERSTORE_iterate_cancel (ic);
/* End of iteration is signalled with both remaining arguments NULL. */
540 if (NULL != callback)
541 callback (callback_cls, NULL, NULL);
546 * When a response for iterate request is received, check the
547 * message is well-formed.
549 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
550 * @param msg message received
/*
 * Validation hook for ITERATE_RECORD messages; performs no checks itself.
 * NOTE(review): the return statement is missing from this listing.
 */
553 check_iterate_result (void *cls,
554 const struct StoreRecordMessage *msg)
556 /* we defer validation to #handle_iterate_result */
562 * When a response for iterate request is received
564 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
565 * @param msg message received
/*
 * Handler for one ITERATE_RECORD message: parses the record and passes it to
 * the callback of the iteration at the head of the handle's ITERATE list.
 * NOTE(review): many lines are missing from this listing (e.g. source 574,
 * 578-579, 582-584, 589, 591-592, 594, 596-598, 600-601), including the
 * conditions that choose between the two callback invocations below and
 * their middle arguments.
 */
568 handle_iterate_result (void *cls,
569 const struct StoreRecordMessage *msg)
571 struct GNUNET_PEERSTORE_Handle *h = cls;
572 struct GNUNET_PEERSTORE_IterateContext *ic;
573 GNUNET_PEERSTORE_Processor callback;
575 struct GNUNET_PEERSTORE_Record *record;
/* The head of the list is taken to be the request this message answers. */
577 ic = h->iterate_head;
580 LOG (GNUNET_ERROR_TYPE_ERROR,
581 _("Unexpected iteration response, this should not happen.\n"));
/* While this flag is set, GNUNET_PEERSTORE_iterate_cancel() does not take its
   unlink-and-free branch. */
585 ic->iterating = GNUNET_YES;
586 callback = ic->callback;
587 callback_cls = ic->callback_cls;
588 if (NULL == callback)
/* Message validation happens here (check_iterate_result defers to this
   handler). */
590 record = PEERSTORE_parse_record_message (msg);
/* Call whose last argument is the malformed-response message. */
593 callback (callback_cls,
595 _("Received a malformed response from service."));
/* Second call; its remaining arguments are not visible in this listing. */
599 callback (callback_cls,
/* The record is destroyed here, after the callback calls above. */
602 PEERSTORE_destroy_record (record);
608 * Cancel an iterate request
609 * Please do not call after the iterate request is done
611 * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
/*
 * Cancels an ITERATE request. When the context is not marked as iterating it
 * is unlinked from the handle's list and its strings are freed.
 * NOTE(review): lines are missing from this listing (source 619-620 and
 * 623 onward); the remaining DLL_remove arguments, any further frees and
 * whatever happens when ic->iterating is not GNUNET_NO are not visible.
 */
614 GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
616 if (GNUNET_NO == ic->iterating)
618 GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head,
621 GNUNET_free (ic->sub_system);
/* The _non_null variant is used for key, unlike sub_system above, so key is
   allowed to be NULL. */
622 GNUNET_free_non_null (ic->key);
631 * Iterate over records matching supplied key information
633 * @param h handle to the PEERSTORE service
634 * @param sub_system name of sub system
635 * @param peer Peer identity (can be NULL)
636 * @param key entry key string (can be NULL)
637 * @param callback function called with each matching record, all NULLs on end
638 * @param callback_cls closure for @a callback
639 * @return Handle to iteration request
/*
 * Queues an ITERATE request and returns a context tracking it.
 * NOTE(review): several lines are missing from this listing (e.g. source 645,
 * 647-648, 653-655, 657, 662, 664-666, 669-670, 673, 675 onward), including
 * two parameter lines, most envelope arguments, the handling of peer/key and
 * the return statement.
 */
641 struct GNUNET_PEERSTORE_IterateContext *
642 GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
643 const char *sub_system,
644 const struct GNUNET_PeerIdentity *peer,
646 GNUNET_PEERSTORE_Processor callback,
649 struct GNUNET_MQ_Envelope *ev;
650 struct GNUNET_PEERSTORE_IterateContext *ic;
/* The request reuses the record envelope builder;
   GNUNET_TIME_UNIT_FOREVER_ABS is among the arguments passed. */
652 ev = PEERSTORE_create_record_mq_envelope (sub_system,
656 GNUNET_TIME_UNIT_FOREVER_ABS,
658 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
659 ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
660 ic->callback = callback;
661 ic->callback_cls = callback_cls;
/* Private copies: reconnect() rebuilds the ITERATE envelope from the context,
   and GNUNET_PEERSTORE_iterate_cancel() frees these copies. */
663 ic->sub_system = GNUNET_strdup (sub_system);
667 ic->key = GNUNET_strdup (key);
/* Appended at the tail; the response handlers consume from the head. */
668 GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head,
671 LOG (GNUNET_ERROR_TYPE_DEBUG,
672 "Sending an iterate request for sub system `%s'\n",
674 GNUNET_MQ_send (h->mq, ev);
679 /******************************************************************************/
680 /******************* WATCH FUNCTIONS *********************/
681 /******************************************************************************/
684 * When a watch record is received, validate it is well-formed.
686 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
687 * @param msg message received
/*
 * Validation hook for WATCH_RECORD messages; performs no checks itself.
 * NOTE(review): the return statement is missing from this listing.
 */
690 check_watch_record (void *cls,
691 const struct StoreRecordMessage *msg)
693 /* we defer validation to #handle_watch_record */
699 * When a watch record is received, process it.
701 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
702 * @param msg message received
/*
 * Handler for one WATCH_RECORD message: parses the record, hashes its key
 * fields, looks up the matching watch context in h->watches and invokes that
 * watch's callback.
 * NOTE(review): lines are missing from this listing (e.g. source 716-720,
 * 722-724, 727-729, 733-735, 738-739), including the handling of a failed
 * parse, the remaining hash/lookup arguments and the callback arguments.
 */
705 handle_watch_record (void *cls,
706 const struct StoreRecordMessage *msg)
708 struct GNUNET_PEERSTORE_Handle *h = cls;
709 struct GNUNET_PEERSTORE_Record *record;
710 struct GNUNET_HashCode keyhash;
711 struct GNUNET_PEERSTORE_WatchContext *wc;
713 LOG (GNUNET_ERROR_TYPE_DEBUG,
714 "Received a watch record from service.\n");
/* Message validation happens here (check_watch_record defers to this
   handler). */
715 record = PEERSTORE_parse_record_message (msg);
721 PEERSTORE_hash_key (record->sub_system,
/* GNUNET_PEERSTORE_watch() inserts with the MULTIPLE option, so several
   contexts can share a key; this call stores a single result in wc. */
725 // FIXME: what if there are multiple watches for the same key?
726 wc = GNUNET_CONTAINER_multihashmap_get (h->watches,
730 LOG (GNUNET_ERROR_TYPE_ERROR,
731 _("Received a watch result for a non existing watch.\n"));
/* The record is destroyed on this error path as well as after the callback
   call below. */
732 PEERSTORE_destroy_record (record);
736 if (NULL != wc->callback)
737 wc->callback (wc->callback_cls,
740 PEERSTORE_destroy_record (record);
745 * Close the existing connection to PEERSTORE and reconnect.
747 * @param h handle to the service
/*
 * Drops the current connection, opens a new one and replays pending state.
 * Visible order of work: handle iterations that are mid-delivery, destroy the
 * old message queue, connect again, then resend every watch, every pending
 * ITERATE request and every pending STORE request.
 * NOTE(review): many lines are missing from this listing (e.g. source 756,
 * 760, 764, 766, 770, 777-778, 780-781, 784-786, 788-791, 793-794, 796-797,
 * 799-801, 806-807), including closures of the handler entries, the
 * assignment of `next`, any check of the new connection and the remaining
 * envelope arguments.
 */
750 reconnect (struct GNUNET_PEERSTORE_Handle *h)
/* Dispatch table for service messages. The entries name iterate_end,
   iterate_result and watch_record; presumably the GNUNET_MQ_hd_* macros bind
   these to the handle_ and check_ functions defined above -- confirm against
   the MQ API documentation. */
752 struct GNUNET_MQ_MessageHandler mq_handlers[] = {
753 GNUNET_MQ_hd_fixed_size (iterate_end,
754 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
755 struct GNUNET_MessageHeader,
757 GNUNET_MQ_hd_var_size (iterate_result,
758 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD,
759 struct StoreRecordMessage,
761 GNUNET_MQ_hd_var_size (watch_record,
762 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD,
763 struct StoreRecordMessage,
765 GNUNET_MQ_handler_end ()
767 struct GNUNET_PEERSTORE_IterateContext *ic;
768 struct GNUNET_PEERSTORE_IterateContext *next;
769 GNUNET_PEERSTORE_Processor icb;
771 struct GNUNET_PEERSTORE_StoreContext *sc;
772 struct GNUNET_MQ_Envelope *ev;
774 LOG (GNUNET_ERROR_TYPE_DEBUG,
775 "Reconnecting...\n");
/* Pass 1: iterations that already started delivering records are cancelled,
   and the message below is among the arguments of a call in that branch.
   NOTE(review): GNUNET_PEERSTORE_iterate_cancel() unlinks a context only
   under its `GNUNET_NO == ic->iterating` check; confirm what state such a
   context is left in, since the resend loop below walks h->iterate_head
   again. */
776 for (ic = h->iterate_head; NULL != ic; ic = next)
779 if (GNUNET_YES == ic->iterating)
782 icb_cls = ic->callback_cls;
783 GNUNET_PEERSTORE_iterate_cancel (ic);
787 "Iteration canceled due to reconnection");
/* Replace the message queue. */
792 GNUNET_MQ_destroy (h->mq);
795 h->mq = GNUNET_CLIENT_connect (h->cfg,
798 &handle_client_error,
802 LOG (GNUNET_ERROR_TYPE_DEBUG,
803 "Resending pending requests after reconnect.\n");
/* Pass 2: replay watches (only if any were ever registered). */
804 if (NULL != h->watches)
805 GNUNET_CONTAINER_multihashmap_iterate (h->watches,
/* Pass 3: replay pending ITERATE requests from their saved parameters. */
808 for (ic = h->iterate_head; NULL != ic; ic = ic->next)
810 ev = PEERSTORE_create_record_mq_envelope (ic->sub_system,
814 GNUNET_TIME_UNIT_FOREVER_ABS,
816 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
817 GNUNET_MQ_send (h->mq, ev);
/* Pass 4: replay pending STORE requests; a notify-sent callback is registered
   on each envelope again. */
819 for (sc = h->store_head; NULL != sc; sc = sc->next)
821 ev = PEERSTORE_create_record_mq_envelope (sc->sub_system,
828 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
829 GNUNET_MQ_notify_sent (ev,
832 GNUNET_MQ_send (h->mq,
839 * Cancel a watch request
841 * @param wc handle to the watch request
/*
 * Cancels a watch: sends a WATCH_CANCEL message carrying the watch's key hash
 * and removes the context from the handle's watch map.
 * NOTE(review): source lines 857 onward are missing from this listing; the
 * remaining arguments of the remove call and any release of wc are not
 * visible.
 */
844 GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
846 struct GNUNET_PEERSTORE_Handle *h = wc->h;
847 struct GNUNET_MQ_Envelope *ev;
848 struct StoreKeyHashMessage *hm;
850 LOG (GNUNET_ERROR_TYPE_DEBUG,
851 "Canceling watch.\n");
852 ev = GNUNET_MQ_msg (hm,
853 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
854 hm->keyhash = wc->keyhash;
/* The message is queued first, then the map entry is removed. */
855 GNUNET_MQ_send (h->mq, ev);
856 GNUNET_CONTAINER_multihashmap_remove (h->watches,
864 * Request watching a given key
865 * User will be notified with any new values added to key
867 * @param h handle to the PEERSTORE service
868 * @param sub_system name of sub system
869 * @param peer Peer identity
870 * @param key entry key string
871 * @param callback function called with each new value
872 * @param callback_cls closure for @a callback
873 * @return Handle to watch request
/*
 * Registers a watch: builds a WATCH message, records a context under the
 * message's key hash in h->watches and queues the message.
 * NOTE(review): several lines are missing from this listing (e.g. source 879,
 * 881-882, 890-892, 896, 900, 903-904, 908-910, 912 onward), including two
 * parameter lines, the remaining hash/put arguments, the assignment of wc->h
 * and the return statement.
 */
875 struct GNUNET_PEERSTORE_WatchContext *
876 GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
877 const char *sub_system,
878 const struct GNUNET_PeerIdentity *peer,
880 GNUNET_PEERSTORE_Processor callback,
883 struct GNUNET_MQ_Envelope *ev;
884 struct StoreKeyHashMessage *hm;
885 struct GNUNET_PEERSTORE_WatchContext *wc;
887 ev = GNUNET_MQ_msg (hm,
888 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
889 PEERSTORE_hash_key (sub_system,
893 wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext);
894 wc->callback = callback;
895 wc->callback_cls = callback_cls;
/* Keep the hash so rewatch_it() and GNUNET_PEERSTORE_watch_cancel() can
   resend it later. */
897 wc->keyhash = hm->keyhash;
/* The watch map is created on first use. */
898 if (NULL == h->watches)
899 h->watches = GNUNET_CONTAINER_multihashmap_create (5,
/* The MULTIPLE option permits more than one context under the same key hash;
   handle_watch_record() carries a FIXME about that case. */
901 GNUNET_assert (GNUNET_OK ==
902 GNUNET_CONTAINER_multihashmap_put (h->watches,
905 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
906 LOG (GNUNET_ERROR_TYPE_DEBUG,
907 "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n",
911 GNUNET_MQ_send (h->mq,
916 /* end of peerstore_api.c */