2 This file is part of GNUnet.
3 Copyright (C) 2013-2016 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
16 * @file peerstore/peerstore_api.c
17 * @brief API for peerstore
18 * @author Omar Tarabai
19 * @author Christian Grothoff
22 #include "gnunet_util_lib.h"
23 #include "peerstore.h"
24 #include "peerstore_common.h"
26 #define LOG(kind,...) GNUNET_log_from (kind, "peerstore-api",__VA_ARGS__)
28 /******************************************************************************/
29 /************************ DATA STRUCTURES ****************************/
30 /******************************************************************************/
33 * Handle to the PEERSTORE service.
35 struct GNUNET_PEERSTORE_Handle
41 const struct GNUNET_CONFIGURATION_Handle *cfg;
46 struct GNUNET_MQ_Handle *mq;
49 * Head of active STORE requests.
51 struct GNUNET_PEERSTORE_StoreContext *store_head;
54 * Tail of active STORE requests.
56 struct GNUNET_PEERSTORE_StoreContext *store_tail;
59 * Head of active ITERATE requests.
61 struct GNUNET_PEERSTORE_IterateContext *iterate_head;
64 * Tail of active ITERATE requests.
66 struct GNUNET_PEERSTORE_IterateContext *iterate_tail;
69 * Hashmap of watch requests
71 struct GNUNET_CONTAINER_MultiHashMap *watches;
74 * Are we in the process of disconnecting but need to sync first?
81 * Context for a store request
83 struct GNUNET_PEERSTORE_StoreContext
88 struct GNUNET_PEERSTORE_StoreContext *next;
93 struct GNUNET_PEERSTORE_StoreContext *prev;
96 * Handle to the PEERSTORE service.
98 struct GNUNET_PEERSTORE_Handle *h;
101 * Continuation called with service response
103 GNUNET_PEERSTORE_Continuation cont;
106 * Closure for @e cont
111 * Which subsystem does the store?
116 * Key for the store operation.
121 * Contains @e size bytes.
126 * Peer the store is for.
128 struct GNUNET_PeerIdentity peer;
131 * Number of bytes in @e value.
136 * When does the value expire?
138 struct GNUNET_TIME_Absolute expiry;
141 * Options for the store operation.
143 enum GNUNET_PEERSTORE_StoreOption options;
148 * Context for a iterate request
150 struct GNUNET_PEERSTORE_IterateContext
155 struct GNUNET_PEERSTORE_IterateContext *next;
160 struct GNUNET_PEERSTORE_IterateContext *prev;
163 * Handle to the PEERSTORE service.
165 struct GNUNET_PEERSTORE_Handle *h;
168 * Which subsystem does the store?
173 * Peer the store is for.
175 struct GNUNET_PeerIdentity peer;
178 * Key for the store operation.
185 struct GNUNET_TIME_Relative timeout;
188 * Callback with each matching record
190 GNUNET_PEERSTORE_Processor callback;
193 * Closure for @e callback
198 * #GNUNET_YES if we are currently processing records.
203 * Task identifier for the function called
204 * on iterate request timeout
206 struct GNUNET_SCHEDULER_Task *timeout_task;
211 * Context for a watch request
213 struct GNUNET_PEERSTORE_WatchContext
218 struct GNUNET_PEERSTORE_WatchContext *next;
223 struct GNUNET_PEERSTORE_WatchContext *prev;
226 * Handle to the PEERSTORE service.
228 struct GNUNET_PEERSTORE_Handle *h;
231 * Callback with each record received
233 GNUNET_PEERSTORE_Processor callback;
236 * Closure for @e callback
241 * Hash of the combined key
243 struct GNUNET_HashCode keyhash;
247 /******************************************************************************/
248 /******************* DECLARATIONS *********************/
249 /******************************************************************************/
252 * Close the existing connection to PEERSTORE and reconnect.
254 * @param h handle to the service
257 reconnect (struct GNUNET_PEERSTORE_Handle *h);
261 * Callback after MQ envelope is sent
263 * @param cls a `struct GNUNET_PEERSTORE_StoreContext *`
266 store_request_sent (void *cls)
268 struct GNUNET_PEERSTORE_StoreContext *sc = cls;
269 GNUNET_PEERSTORE_Continuation cont;
273 cont_cls = sc->cont_cls;
274 GNUNET_PEERSTORE_store_cancel (sc);
276 cont (cont_cls, GNUNET_OK);
280 /******************************************************************************/
281 /******************* CONNECTION FUNCTIONS *********************/
282 /******************************************************************************/
286 * Function called when we had trouble talking to the service.
289 handle_client_error (void *cls,
290 enum GNUNET_MQ_Error error)
292 struct GNUNET_PEERSTORE_Handle *h = cls;
294 LOG (GNUNET_ERROR_TYPE_ERROR,
295 "Received an error notification from MQ of type: %d\n",
302 * Iterator over previous watches to resend them
304 * @param cls the `struct GNUNET_PEERSTORE_Handle`
305 * @param key key for the watch
306 * @param value the `struct GNUNET_PEERSTORE_WatchContext *`
307 * @return #GNUNET_YES (continue to iterate)
310 rewatch_it (void *cls,
311 const struct GNUNET_HashCode *key,
314 struct GNUNET_PEERSTORE_Handle *h = cls;
315 struct GNUNET_PEERSTORE_WatchContext *wc = value;
316 struct StoreKeyHashMessage *hm;
317 struct GNUNET_MQ_Envelope *ev;
319 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
320 hm->keyhash = wc->keyhash;
321 GNUNET_MQ_send (h->mq, ev);
327 * Called when the iterate request is timedout
329 * @param cls a `struct GNUNET_PEERSTORE_IterateContext *`
332 iterate_timeout (void *cls)
334 struct GNUNET_PEERSTORE_IterateContext *ic = cls;
335 GNUNET_PEERSTORE_Processor callback;
338 ic->timeout_task = NULL;
339 callback = ic->callback;
340 callback_cls = ic->callback_cls;
341 GNUNET_PEERSTORE_iterate_cancel (ic);
342 if (NULL != callback)
343 callback (callback_cls,
350 * Iterator over watch requests to cancel them.
353 * @param key key to the watch request
354 * @param value watch context
355 * @return #GNUNET_YES to continue iteration
358 destroy_watch (void *cls,
359 const struct GNUNET_HashCode *key,
362 struct GNUNET_PEERSTORE_WatchContext *wc = value;
364 GNUNET_PEERSTORE_watch_cancel (wc);
370 * Kill the connection to the service. This can be delayed in case of pending
371 * STORE requests and the user explicitly asked to sync first. Otherwise it is
372 * performed instantly.
374 * @param h Handle to the service.
377 do_disconnect (struct GNUNET_PEERSTORE_Handle *h)
381 GNUNET_MQ_destroy (h->mq);
389 * Connect to the PEERSTORE service.
391 * @param cfg configuration to use
392 * @return NULL on error
394 struct GNUNET_PEERSTORE_Handle *
395 GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
397 struct GNUNET_PEERSTORE_Handle *h;
399 h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
401 h->disconnecting = GNUNET_NO;
413 * Disconnect from the PEERSTORE service. Any pending ITERATE and WATCH requests
415 * Any pending STORE requests will depend on @e snyc_first flag.
417 * @param h handle to disconnect
418 * @param sync_first send any pending STORE requests before disconnecting
421 GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h,
424 struct GNUNET_PEERSTORE_IterateContext *ic;
425 struct GNUNET_PEERSTORE_StoreContext *sc;
427 LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
428 if (NULL != h->watches)
430 GNUNET_CONTAINER_multihashmap_iterate (h->watches, &destroy_watch, NULL);
431 GNUNET_CONTAINER_multihashmap_destroy (h->watches);
434 while (NULL != (ic = h->iterate_head))
437 GNUNET_PEERSTORE_iterate_cancel (ic);
439 if (NULL != h->store_head)
441 if (GNUNET_YES == sync_first)
443 LOG (GNUNET_ERROR_TYPE_DEBUG,
444 "Delaying disconnection due to pending store requests.\n");
445 h->disconnecting = GNUNET_YES;
448 while (NULL != (sc = h->store_head))
449 GNUNET_PEERSTORE_store_cancel (sc);
455 /******************************************************************************/
456 /******************* STORE FUNCTIONS *********************/
457 /******************************************************************************/
461 * Cancel a store request
463 * @param sc Store request context
466 GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
468 struct GNUNET_PEERSTORE_Handle *h = sc->h;
470 GNUNET_CONTAINER_DLL_remove (sc->h->store_head, sc->h->store_tail, sc);
471 GNUNET_free (sc->sub_system);
472 GNUNET_free (sc->value);
473 GNUNET_free (sc->key);
475 if ((GNUNET_YES == h->disconnecting) && (NULL == h->store_head))
481 * Store a new entry in the PEERSTORE.
482 * Note that stored entries can be lost in some cases
483 * such as power failure.
485 * @param h Handle to the PEERSTORE service
486 * @param sub_system name of the sub system
487 * @param peer Peer Identity
488 * @param key entry key
489 * @param value entry value BLOB
490 * @param size size of @e value
491 * @param expiry absolute time after which the entry is (possibly) deleted
492 * @param options options specific to the storage operation
493 * @param cont Continuation function after the store request is sent
494 * @param cont_cls Closure for @a cont
496 struct GNUNET_PEERSTORE_StoreContext *
497 GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
498 const char *sub_system,
499 const struct GNUNET_PeerIdentity *peer,
501 const void *value, size_t size,
502 struct GNUNET_TIME_Absolute expiry,
503 enum GNUNET_PEERSTORE_StoreOption options,
504 GNUNET_PEERSTORE_Continuation cont,
507 struct GNUNET_MQ_Envelope *ev;
508 struct GNUNET_PEERSTORE_StoreContext *sc;
510 LOG (GNUNET_ERROR_TYPE_DEBUG,
511 "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n",
512 size, sub_system, GNUNET_i2s (peer), key);
513 ev = PEERSTORE_create_record_mq_envelope (sub_system, peer, key, value, size,
515 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
516 sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext);
518 sc->sub_system = GNUNET_strdup (sub_system);
520 sc->key = GNUNET_strdup (key);
521 sc->value = GNUNET_memdup (value, size);
524 sc->options = options;
526 sc->cont_cls = cont_cls;
529 GNUNET_CONTAINER_DLL_insert_tail (h->store_head, h->store_tail, sc);
530 GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
531 GNUNET_MQ_send (h->mq, ev);
537 /******************************************************************************/
538 /******************* ITERATE FUNCTIONS *********************/
539 /******************************************************************************/
543 * When a response for iterate request is received
545 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
546 * @param msg message received
549 handle_iterate_end (void *cls,
550 const struct GNUNET_MessageHeader *msg)
552 struct GNUNET_PEERSTORE_Handle *h = cls;
553 struct GNUNET_PEERSTORE_IterateContext *ic;
554 GNUNET_PEERSTORE_Processor callback;
557 ic = h->iterate_head;
560 LOG (GNUNET_ERROR_TYPE_ERROR,
561 _("Unexpected iteration response, this should not happen.\n"));
565 callback = ic->callback;
566 callback_cls = ic->callback_cls;
567 ic->iterating = GNUNET_NO;
568 GNUNET_PEERSTORE_iterate_cancel (ic);
569 if (NULL != callback)
570 callback (callback_cls, NULL, NULL);
575 * When a response for iterate request is received, check the
576 * message is well-formed.
578 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
579 * @param msg message received
582 check_iterate_result (void *cls,
583 const struct StoreRecordMessage *msg)
585 /* we defer validation to #handle_iterate_result */
591 * When a response for iterate request is received
593 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
594 * @param msg message received
597 handle_iterate_result (void *cls,
598 const struct StoreRecordMessage *msg)
600 struct GNUNET_PEERSTORE_Handle *h = cls;
601 struct GNUNET_PEERSTORE_IterateContext *ic;
602 GNUNET_PEERSTORE_Processor callback;
604 struct GNUNET_PEERSTORE_Record *record;
606 ic = h->iterate_head;
609 LOG (GNUNET_ERROR_TYPE_ERROR,
610 _("Unexpected iteration response, this should not happen.\n"));
614 ic->iterating = GNUNET_YES;
615 callback = ic->callback;
616 callback_cls = ic->callback_cls;
617 if (NULL == callback)
619 record = PEERSTORE_parse_record_message (msg);
622 callback (callback_cls,
624 _("Received a malformed response from service."));
628 callback (callback_cls,
631 PEERSTORE_destroy_record (record);
637 * Cancel an iterate request
638 * Please do not call after the iterate request is done
640 * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
643 GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
645 if (NULL != ic->timeout_task)
647 GNUNET_SCHEDULER_cancel (ic->timeout_task);
648 ic->timeout_task = NULL;
650 if (GNUNET_NO == ic->iterating)
652 GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head,
655 GNUNET_free (ic->sub_system);
656 GNUNET_free_non_null (ic->key);
665 * Iterate over records matching supplied key information
667 * @param h handle to the PEERSTORE service
668 * @param sub_system name of sub system
669 * @param peer Peer identity (can be NULL)
670 * @param key entry key string (can be NULL)
671 * @param timeout time after which the iterate request is canceled
672 * @param callback function called with each matching record, all NULL's on end
673 * @param callback_cls closure for @a callback
674 * @return Handle to iteration request
676 struct GNUNET_PEERSTORE_IterateContext *
677 GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
678 const char *sub_system,
679 const struct GNUNET_PeerIdentity *peer,
681 struct GNUNET_TIME_Relative timeout,
682 GNUNET_PEERSTORE_Processor callback,
685 struct GNUNET_MQ_Envelope *ev;
686 struct GNUNET_PEERSTORE_IterateContext *ic;
688 ev = PEERSTORE_create_record_mq_envelope (sub_system,
692 GNUNET_TIME_UNIT_FOREVER_ABS,
694 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
695 ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
697 ic->callback = callback;
698 ic->callback_cls = callback_cls;
700 ic->sub_system = GNUNET_strdup (sub_system);
704 ic->key = GNUNET_strdup (key);
705 ic->timeout = timeout;
706 GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head,
709 LOG (GNUNET_ERROR_TYPE_DEBUG,
710 "Sending an iterate request for sub system `%s'\n",
712 GNUNET_MQ_send (h->mq, ev);
714 GNUNET_SCHEDULER_add_delayed (timeout,
721 /******************************************************************************/
722 /******************* WATCH FUNCTIONS *********************/
723 /******************************************************************************/
726 * When a watch record is received, validate it is well-formed.
728 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
729 * @param msg message received
732 check_watch_record (void *cls,
733 const struct StoreRecordMessage *msg)
735 /* we defer validation to #handle_watch_result */
741 * When a watch record is received, process it.
743 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
744 * @param msg message received
747 handle_watch_record (void *cls,
748 const struct StoreRecordMessage *msg)
750 struct GNUNET_PEERSTORE_Handle *h = cls;
751 struct GNUNET_PEERSTORE_Record *record;
752 struct GNUNET_HashCode keyhash;
753 struct GNUNET_PEERSTORE_WatchContext *wc;
755 LOG (GNUNET_ERROR_TYPE_DEBUG,
756 "Received a watch record from service.\n");
757 record = PEERSTORE_parse_record_message (msg);
763 PEERSTORE_hash_key (record->sub_system,
767 // FIXME: what if there are multiple watches for the same key?
768 wc = GNUNET_CONTAINER_multihashmap_get (h->watches,
772 LOG (GNUNET_ERROR_TYPE_ERROR,
773 _("Received a watch result for a non existing watch.\n"));
774 PEERSTORE_destroy_record (record);
778 if (NULL != wc->callback)
779 wc->callback (wc->callback_cls,
782 PEERSTORE_destroy_record (record);
787 * Close the existing connection to PEERSTORE and reconnect.
789 * @param h handle to the service
792 reconnect (struct GNUNET_PEERSTORE_Handle *h)
794 struct GNUNET_MQ_MessageHandler mq_handlers[] = {
795 GNUNET_MQ_hd_fixed_size (iterate_end,
796 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
797 struct GNUNET_MessageHeader,
799 GNUNET_MQ_hd_var_size (iterate_result,
800 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD,
801 struct StoreRecordMessage,
803 GNUNET_MQ_hd_var_size (watch_record,
804 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD,
805 struct StoreRecordMessage,
807 GNUNET_MQ_handler_end ()
809 struct GNUNET_PEERSTORE_IterateContext *ic;
810 struct GNUNET_PEERSTORE_IterateContext *next;
811 GNUNET_PEERSTORE_Processor icb;
813 struct GNUNET_PEERSTORE_StoreContext *sc;
814 struct GNUNET_MQ_Envelope *ev;
816 LOG (GNUNET_ERROR_TYPE_DEBUG,
817 "Reconnecting...\n");
818 for (ic = h->iterate_head; NULL != ic; ic = next)
821 if (GNUNET_YES == ic->iterating)
824 icb_cls = ic->callback_cls;
825 GNUNET_PEERSTORE_iterate_cancel (ic);
829 "Iteration canceled due to reconnection");
834 GNUNET_MQ_destroy (h->mq);
837 h->mq = GNUNET_CLIENT_connect (h->cfg,
840 &handle_client_error,
844 LOG (GNUNET_ERROR_TYPE_DEBUG,
845 "Resending pending requests after reconnect.\n");
846 if (NULL != h->watches)
847 GNUNET_CONTAINER_multihashmap_iterate (h->watches,
850 for (ic = h->iterate_head; NULL != ic; ic = ic->next)
852 ev = PEERSTORE_create_record_mq_envelope (ic->sub_system,
856 GNUNET_TIME_UNIT_FOREVER_ABS,
858 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
859 GNUNET_MQ_send (h->mq, ev);
860 if (NULL != ic->timeout_task)
861 GNUNET_SCHEDULER_cancel (ic->timeout_task);
863 = GNUNET_SCHEDULER_add_delayed (ic->timeout,
867 for (sc = h->store_head; NULL != sc; sc = sc->next)
869 ev = PEERSTORE_create_record_mq_envelope (sc->sub_system,
876 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
877 GNUNET_MQ_notify_sent (ev,
880 GNUNET_MQ_send (h->mq,
887 * Cancel a watch request
889 * @param wc handle to the watch request
892 GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
894 struct GNUNET_PEERSTORE_Handle *h = wc->h;
895 struct GNUNET_MQ_Envelope *ev;
896 struct StoreKeyHashMessage *hm;
898 LOG (GNUNET_ERROR_TYPE_DEBUG,
899 "Canceling watch.\n");
900 ev = GNUNET_MQ_msg (hm,
901 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
902 hm->keyhash = wc->keyhash;
903 GNUNET_MQ_send (h->mq, ev);
904 GNUNET_CONTAINER_multihashmap_remove (h->watches,
912 * Request watching a given key
913 * User will be notified with any new values added to key
915 * @param h handle to the PEERSTORE service
916 * @param sub_system name of sub system
917 * @param peer Peer identity
918 * @param key entry key string
919 * @param callback function called with each new value
920 * @param callback_cls closure for @a callback
921 * @return Handle to watch request
923 struct GNUNET_PEERSTORE_WatchContext *
924 GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
925 const char *sub_system,
926 const struct GNUNET_PeerIdentity *peer,
928 GNUNET_PEERSTORE_Processor callback,
931 struct GNUNET_MQ_Envelope *ev;
932 struct StoreKeyHashMessage *hm;
933 struct GNUNET_PEERSTORE_WatchContext *wc;
935 ev = GNUNET_MQ_msg (hm,
936 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
937 PEERSTORE_hash_key (sub_system,
941 wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext);
942 wc->callback = callback;
943 wc->callback_cls = callback_cls;
945 wc->keyhash = hm->keyhash;
946 if (NULL == h->watches)
947 h->watches = GNUNET_CONTAINER_multihashmap_create (5,
949 GNUNET_assert (GNUNET_OK ==
950 GNUNET_CONTAINER_multihashmap_put (h->watches,
953 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
954 LOG (GNUNET_ERROR_TYPE_DEBUG,
955 "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n",
959 GNUNET_MQ_send (h->mq,
964 /* end of peerstore_api.c */