2 This file is part of GNUnet.
3 Copyright (C) 2013-2016 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
19 * @file peerstore/peerstore_api.c
20 * @brief API for peerstore
21 * @author Omar Tarabai
22 * @author Christian Grothoff
25 #include "gnunet_util_lib.h"
26 #include "peerstore.h"
27 #include "peerstore_common.h"
29 #define LOG(kind,...) GNUNET_log_from (kind, "peerstore-api",__VA_ARGS__)
31 /******************************************************************************/
32 /************************ DATA STRUCTURES ****************************/
33 /******************************************************************************/
36 * Handle to the PEERSTORE service.
38 struct GNUNET_PEERSTORE_Handle
44 const struct GNUNET_CONFIGURATION_Handle *cfg;
49 struct GNUNET_MQ_Handle *mq;
52 * Head of active STORE requests.
54 struct GNUNET_PEERSTORE_StoreContext *store_head;
57 * Tail of active STORE requests.
59 struct GNUNET_PEERSTORE_StoreContext *store_tail;
62 * Head of active ITERATE requests.
64 struct GNUNET_PEERSTORE_IterateContext *iterate_head;
67 * Tail of active ITERATE requests.
69 struct GNUNET_PEERSTORE_IterateContext *iterate_tail;
72 * Hashmap of watch requests
74 struct GNUNET_CONTAINER_MultiHashMap *watches;
77 * Are we in the process of disconnecting but need to sync first?
84 * Context for a store request
86 struct GNUNET_PEERSTORE_StoreContext
91 struct GNUNET_PEERSTORE_StoreContext *next;
96 struct GNUNET_PEERSTORE_StoreContext *prev;
99 * Handle to the PEERSTORE service.
101 struct GNUNET_PEERSTORE_Handle *h;
104 * Continuation called with service response
106 GNUNET_PEERSTORE_Continuation cont;
109 * Closure for @e cont
114 * Which subsystem does the store?
119 * Key for the store operation.
124 * Contains @e size bytes.
129 * Peer the store is for.
131 struct GNUNET_PeerIdentity peer;
134 * Number of bytes in @e value.
139 * When does the value expire?
141 struct GNUNET_TIME_Absolute expiry;
144 * Options for the store operation.
146 enum GNUNET_PEERSTORE_StoreOption options;
151 * Context for an iterate request
153 struct GNUNET_PEERSTORE_IterateContext
158 struct GNUNET_PEERSTORE_IterateContext *next;
163 struct GNUNET_PEERSTORE_IterateContext *prev;
166 * Handle to the PEERSTORE service.
168 struct GNUNET_PEERSTORE_Handle *h;
171 * Which subsystem does the store?
176 * Peer the store is for.
178 struct GNUNET_PeerIdentity peer;
181 * Key for the store operation.
188 struct GNUNET_TIME_Relative timeout;
191 * Callback with each matching record
193 GNUNET_PEERSTORE_Processor callback;
196 * Closure for @e callback
201 * #GNUNET_YES if we are currently processing records.
206 * Task identifier for the function called
207 * on iterate request timeout
209 struct GNUNET_SCHEDULER_Task *timeout_task;
214 * Context for a watch request
216 struct GNUNET_PEERSTORE_WatchContext
221 struct GNUNET_PEERSTORE_WatchContext *next;
226 struct GNUNET_PEERSTORE_WatchContext *prev;
229 * Handle to the PEERSTORE service.
231 struct GNUNET_PEERSTORE_Handle *h;
234 * Callback with each record received
236 GNUNET_PEERSTORE_Processor callback;
239 * Closure for @e callback
244 * Hash of the combined key
246 struct GNUNET_HashCode keyhash;
250 /******************************************************************************/
251 /******************* DECLARATIONS *********************/
252 /******************************************************************************/
255 * Close the existing connection to PEERSTORE and reconnect.
257 * @param h handle to the service
260 reconnect (struct GNUNET_PEERSTORE_Handle *h);
264 * Callback after MQ envelope is sent
266 * @param cls a `struct GNUNET_PEERSTORE_StoreContext *`
269 store_request_sent (void *cls)
271 struct GNUNET_PEERSTORE_StoreContext *sc = cls;
272 GNUNET_PEERSTORE_Continuation cont;
276 cont_cls = sc->cont_cls;
277 GNUNET_PEERSTORE_store_cancel (sc);
279 cont (cont_cls, GNUNET_OK);
283 /******************************************************************************/
284 /******************* CONNECTION FUNCTIONS *********************/
285 /******************************************************************************/
289 * Function called when we had trouble talking to the service.
292 handle_client_error (void *cls,
293 enum GNUNET_MQ_Error error)
295 struct GNUNET_PEERSTORE_Handle *h = cls;
297 LOG (GNUNET_ERROR_TYPE_ERROR,
298 "Received an error notification from MQ of type: %d\n",
305 * Iterator over previous watches to resend them
307 * @param cls the `struct GNUNET_PEERSTORE_Handle`
308 * @param key key for the watch
309 * @param value the `struct GNUNET_PEERSTORE_WatchContext *`
310 * @return #GNUNET_YES (continue to iterate)
313 rewatch_it (void *cls,
314 const struct GNUNET_HashCode *key,
317 struct GNUNET_PEERSTORE_Handle *h = cls;
318 struct GNUNET_PEERSTORE_WatchContext *wc = value;
319 struct StoreKeyHashMessage *hm;
320 struct GNUNET_MQ_Envelope *ev;
322 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
323 hm->keyhash = wc->keyhash;
324 GNUNET_MQ_send (h->mq, ev);
330 * Called when the iterate request has timed out
332 * @param cls a `struct GNUNET_PEERSTORE_IterateContext *`
335 iterate_timeout (void *cls)
337 struct GNUNET_PEERSTORE_IterateContext *ic = cls;
338 GNUNET_PEERSTORE_Processor callback;
341 ic->timeout_task = NULL;
342 callback = ic->callback;
343 callback_cls = ic->callback_cls;
344 GNUNET_PEERSTORE_iterate_cancel (ic);
345 if (NULL != callback)
346 callback (callback_cls,
353 * Iterator over watch requests to cancel them.
356 * @param key key to the watch request
357 * @param value watch context
358 * @return #GNUNET_YES to continue iteration
361 destroy_watch (void *cls,
362 const struct GNUNET_HashCode *key,
365 struct GNUNET_PEERSTORE_WatchContext *wc = value;
367 GNUNET_PEERSTORE_watch_cancel (wc);
373 * Kill the connection to the service. This can be delayed in case of pending
374 * STORE requests and the user explicitly asked to sync first. Otherwise it is
375 * performed instantly.
377 * @param h Handle to the service.
380 do_disconnect (struct GNUNET_PEERSTORE_Handle *h)
384 GNUNET_MQ_destroy (h->mq);
392 * Connect to the PEERSTORE service.
394 * @param cfg configuration to use
395 * @return NULL on error
397 struct GNUNET_PEERSTORE_Handle *
398 GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
400 struct GNUNET_PEERSTORE_Handle *h;
402 h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
404 h->disconnecting = GNUNET_NO;
416 * Disconnect from the PEERSTORE service. Any pending ITERATE and WATCH requests
418 * Any pending STORE requests will depend on the @a sync_first flag.
420 * @param h handle to disconnect
421 * @param sync_first send any pending STORE requests before disconnecting
424 GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h,
427 struct GNUNET_PEERSTORE_IterateContext *ic;
428 struct GNUNET_PEERSTORE_StoreContext *sc;
430 LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
431 if (NULL != h->watches)
433 GNUNET_CONTAINER_multihashmap_iterate (h->watches, &destroy_watch, NULL);
434 GNUNET_CONTAINER_multihashmap_destroy (h->watches);
437 while (NULL != (ic = h->iterate_head))
440 GNUNET_PEERSTORE_iterate_cancel (ic);
442 if (NULL != h->store_head)
444 if (GNUNET_YES == sync_first)
446 LOG (GNUNET_ERROR_TYPE_DEBUG,
447 "Delaying disconnection due to pending store requests.\n");
448 h->disconnecting = GNUNET_YES;
451 while (NULL != (sc = h->store_head))
452 GNUNET_PEERSTORE_store_cancel (sc);
458 /******************************************************************************/
459 /******************* STORE FUNCTIONS *********************/
460 /******************************************************************************/
464 * Cancel a store request
466 * @param sc Store request context
469 GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
471 struct GNUNET_PEERSTORE_Handle *h = sc->h;
473 GNUNET_CONTAINER_DLL_remove (sc->h->store_head, sc->h->store_tail, sc);
474 GNUNET_free (sc->sub_system);
475 GNUNET_free (sc->value);
476 GNUNET_free (sc->key);
478 if ((GNUNET_YES == h->disconnecting) && (NULL == h->store_head))
484 * Store a new entry in the PEERSTORE.
485 * Note that stored entries can be lost in some cases
486 * such as power failure.
488 * @param h Handle to the PEERSTORE service
489 * @param sub_system name of the sub system
490 * @param peer Peer Identity
491 * @param key entry key
492 * @param value entry value BLOB
493 * @param size size of @e value
494 * @param expiry absolute time after which the entry is (possibly) deleted
495 * @param options options specific to the storage operation
496 * @param cont Continuation function after the store request is sent
497 * @param cont_cls Closure for @a cont
499 struct GNUNET_PEERSTORE_StoreContext *
500 GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
501 const char *sub_system,
502 const struct GNUNET_PeerIdentity *peer,
504 const void *value, size_t size,
505 struct GNUNET_TIME_Absolute expiry,
506 enum GNUNET_PEERSTORE_StoreOption options,
507 GNUNET_PEERSTORE_Continuation cont,
510 struct GNUNET_MQ_Envelope *ev;
511 struct GNUNET_PEERSTORE_StoreContext *sc;
513 LOG (GNUNET_ERROR_TYPE_DEBUG,
514 "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n",
515 size, sub_system, GNUNET_i2s (peer), key);
516 ev = PEERSTORE_create_record_mq_envelope (sub_system, peer, key, value, size,
518 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
519 sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext);
521 sc->sub_system = GNUNET_strdup (sub_system);
523 sc->key = GNUNET_strdup (key);
524 sc->value = GNUNET_memdup (value, size);
527 sc->options = options;
529 sc->cont_cls = cont_cls;
532 GNUNET_CONTAINER_DLL_insert_tail (h->store_head, h->store_tail, sc);
533 GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
534 GNUNET_MQ_send (h->mq, ev);
540 /******************************************************************************/
541 /******************* ITERATE FUNCTIONS *********************/
542 /******************************************************************************/
546 * When the end-of-iteration message for an iterate request is received
548 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
549 * @param msg message received
552 handle_iterate_end (void *cls,
553 const struct GNUNET_MessageHeader *msg)
555 struct GNUNET_PEERSTORE_Handle *h = cls;
556 struct GNUNET_PEERSTORE_IterateContext *ic;
557 GNUNET_PEERSTORE_Processor callback;
560 ic = h->iterate_head;
563 LOG (GNUNET_ERROR_TYPE_ERROR,
564 _("Unexpected iteration response, this should not happen.\n"));
568 callback = ic->callback;
569 callback_cls = ic->callback_cls;
570 ic->iterating = GNUNET_NO;
571 GNUNET_PEERSTORE_iterate_cancel (ic);
572 if (NULL != callback)
573 callback (callback_cls, NULL, NULL);
578 * When a response for an iterate request is received, check the
579 * message is well-formed.
581 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
582 * @param msg message received
585 check_iterate_result (void *cls,
586 const struct StoreRecordMessage *msg)
588 /* we defer validation to #handle_iterate_result */
594 * When a response for an iterate request is received, process it.
596 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
597 * @param msg message received
600 handle_iterate_result (void *cls,
601 const struct StoreRecordMessage *msg)
603 struct GNUNET_PEERSTORE_Handle *h = cls;
604 struct GNUNET_PEERSTORE_IterateContext *ic;
605 GNUNET_PEERSTORE_Processor callback;
607 struct GNUNET_PEERSTORE_Record *record;
609 ic = h->iterate_head;
612 LOG (GNUNET_ERROR_TYPE_ERROR,
613 _("Unexpected iteration response, this should not happen.\n"));
617 ic->iterating = GNUNET_YES;
618 callback = ic->callback;
619 callback_cls = ic->callback_cls;
620 if (NULL == callback)
622 record = PEERSTORE_parse_record_message (msg);
625 callback (callback_cls,
627 _("Received a malformed response from service."));
631 callback (callback_cls,
634 PEERSTORE_destroy_record (record);
640 * Cancel an iterate request
641 * Please do not call after the iterate request is done
643 * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
646 GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
648 if (NULL != ic->timeout_task)
650 GNUNET_SCHEDULER_cancel (ic->timeout_task);
651 ic->timeout_task = NULL;
653 if (GNUNET_NO == ic->iterating)
655 GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head,
658 GNUNET_free (ic->sub_system);
659 GNUNET_free_non_null (ic->key);
668 * Iterate over records matching supplied key information
670 * @param h handle to the PEERSTORE service
671 * @param sub_system name of sub system
672 * @param peer Peer identity (can be NULL)
673 * @param key entry key string (can be NULL)
674 * @param timeout time after which the iterate request is canceled
675 * @param callback function called with each matching record, all NULLs on end
676 * @param callback_cls closure for @a callback
677 * @return Handle to iteration request
679 struct GNUNET_PEERSTORE_IterateContext *
680 GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
681 const char *sub_system,
682 const struct GNUNET_PeerIdentity *peer,
684 struct GNUNET_TIME_Relative timeout,
685 GNUNET_PEERSTORE_Processor callback,
688 struct GNUNET_MQ_Envelope *ev;
689 struct GNUNET_PEERSTORE_IterateContext *ic;
691 ev = PEERSTORE_create_record_mq_envelope (sub_system,
695 GNUNET_TIME_UNIT_FOREVER_ABS,
697 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
698 ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
700 ic->callback = callback;
701 ic->callback_cls = callback_cls;
703 ic->sub_system = GNUNET_strdup (sub_system);
707 ic->key = GNUNET_strdup (key);
708 ic->timeout = timeout;
709 GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head,
712 LOG (GNUNET_ERROR_TYPE_DEBUG,
713 "Sending an iterate request for sub system `%s'\n",
715 GNUNET_MQ_send (h->mq, ev);
717 GNUNET_SCHEDULER_add_delayed (timeout,
724 /******************************************************************************/
725 /******************* WATCH FUNCTIONS *********************/
726 /******************************************************************************/
729 * When a watch record is received, validate it is well-formed.
731 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
732 * @param msg message received
735 check_watch_record (void *cls,
736 const struct StoreRecordMessage *msg)
738 /* we defer validation to #handle_watch_record */
744 * When a watch record is received, process it.
746 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
747 * @param msg message received
750 handle_watch_record (void *cls,
751 const struct StoreRecordMessage *msg)
753 struct GNUNET_PEERSTORE_Handle *h = cls;
754 struct GNUNET_PEERSTORE_Record *record;
755 struct GNUNET_HashCode keyhash;
756 struct GNUNET_PEERSTORE_WatchContext *wc;
758 LOG (GNUNET_ERROR_TYPE_DEBUG,
759 "Received a watch record from service.\n");
760 record = PEERSTORE_parse_record_message (msg);
766 PEERSTORE_hash_key (record->sub_system,
770 // FIXME: what if there are multiple watches for the same key?
771 wc = GNUNET_CONTAINER_multihashmap_get (h->watches,
775 LOG (GNUNET_ERROR_TYPE_ERROR,
776 _("Received a watch result for a non existing watch.\n"));
777 PEERSTORE_destroy_record (record);
781 if (NULL != wc->callback)
782 wc->callback (wc->callback_cls,
785 PEERSTORE_destroy_record (record);
790 * Close the existing connection to PEERSTORE and reconnect.
792 * @param h handle to the service
795 reconnect (struct GNUNET_PEERSTORE_Handle *h)
797 struct GNUNET_MQ_MessageHandler mq_handlers[] = {
798 GNUNET_MQ_hd_fixed_size (iterate_end,
799 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
800 struct GNUNET_MessageHeader,
802 GNUNET_MQ_hd_var_size (iterate_result,
803 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD,
804 struct StoreRecordMessage,
806 GNUNET_MQ_hd_var_size (watch_record,
807 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD,
808 struct StoreRecordMessage,
810 GNUNET_MQ_handler_end ()
812 struct GNUNET_PEERSTORE_IterateContext *ic;
813 struct GNUNET_PEERSTORE_IterateContext *next;
814 GNUNET_PEERSTORE_Processor icb;
816 struct GNUNET_PEERSTORE_StoreContext *sc;
817 struct GNUNET_MQ_Envelope *ev;
819 LOG (GNUNET_ERROR_TYPE_DEBUG,
820 "Reconnecting...\n");
821 for (ic = h->iterate_head; NULL != ic; ic = next)
824 if (GNUNET_YES == ic->iterating)
827 icb_cls = ic->callback_cls;
828 GNUNET_PEERSTORE_iterate_cancel (ic);
832 "Iteration canceled due to reconnection");
837 GNUNET_MQ_destroy (h->mq);
840 h->mq = GNUNET_CLIENT_connect (h->cfg,
843 &handle_client_error,
847 LOG (GNUNET_ERROR_TYPE_DEBUG,
848 "Resending pending requests after reconnect.\n");
849 if (NULL != h->watches)
850 GNUNET_CONTAINER_multihashmap_iterate (h->watches,
853 for (ic = h->iterate_head; NULL != ic; ic = ic->next)
855 ev = PEERSTORE_create_record_mq_envelope (ic->sub_system,
859 GNUNET_TIME_UNIT_FOREVER_ABS,
861 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
862 GNUNET_MQ_send (h->mq, ev);
863 if (NULL != ic->timeout_task)
864 GNUNET_SCHEDULER_cancel (ic->timeout_task);
866 = GNUNET_SCHEDULER_add_delayed (ic->timeout,
870 for (sc = h->store_head; NULL != sc; sc = sc->next)
872 ev = PEERSTORE_create_record_mq_envelope (sc->sub_system,
879 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
880 GNUNET_MQ_notify_sent (ev,
883 GNUNET_MQ_send (h->mq,
890 * Cancel a watch request
892 * @param wc handle to the watch request
895 GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
897 struct GNUNET_PEERSTORE_Handle *h = wc->h;
898 struct GNUNET_MQ_Envelope *ev;
899 struct StoreKeyHashMessage *hm;
901 LOG (GNUNET_ERROR_TYPE_DEBUG,
902 "Canceling watch.\n");
903 ev = GNUNET_MQ_msg (hm,
904 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
905 hm->keyhash = wc->keyhash;
906 GNUNET_MQ_send (h->mq, ev);
907 GNUNET_CONTAINER_multihashmap_remove (h->watches,
915 * Request watching a given key
916 * User will be notified with any new values added to key
918 * @param h handle to the PEERSTORE service
919 * @param sub_system name of sub system
920 * @param peer Peer identity
921 * @param key entry key string
922 * @param callback function called with each new value
923 * @param callback_cls closure for @a callback
924 * @return Handle to watch request
926 struct GNUNET_PEERSTORE_WatchContext *
927 GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
928 const char *sub_system,
929 const struct GNUNET_PeerIdentity *peer,
931 GNUNET_PEERSTORE_Processor callback,
934 struct GNUNET_MQ_Envelope *ev;
935 struct StoreKeyHashMessage *hm;
936 struct GNUNET_PEERSTORE_WatchContext *wc;
938 ev = GNUNET_MQ_msg (hm,
939 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
940 PEERSTORE_hash_key (sub_system,
944 wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext);
945 wc->callback = callback;
946 wc->callback_cls = callback_cls;
948 wc->keyhash = hm->keyhash;
949 if (NULL == h->watches)
950 h->watches = GNUNET_CONTAINER_multihashmap_create (5,
952 GNUNET_assert (GNUNET_OK ==
953 GNUNET_CONTAINER_multihashmap_put (h->watches,
956 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
957 LOG (GNUNET_ERROR_TYPE_DEBUG,
958 "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n",
962 GNUNET_MQ_send (h->mq,
967 /* end of peerstore_api.c */