2 This file is part of GNUnet.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file peerstore/peerstore_api.c
23 * @brief API for peerstore
24 * @author Omar Tarabai
27 #include "gnunet_util_lib.h"
28 #include "peerstore.h"
29 #include "peerstore_common.h"
31 #define LOG(kind,...) GNUNET_log_from (kind, "peerstore-api",__VA_ARGS__)
33 /******************************************************************************/
34 /************************ DATA STRUCTURES ****************************/
35 /******************************************************************************/
38 * Handle to the PEERSTORE service.
/* State for one client connection to the PEERSTORE service: the
   configuration, the service connection and its message queue, plus
   the bookkeeping for outstanding STORE, ITERATE and WATCH requests. */
40 struct GNUNET_PEERSTORE_Handle
/* Configuration handle; reconnect() passes it to GNUNET_CLIENT_connect(). */
46 const struct GNUNET_CONFIGURATION_Handle *cfg;
49 * Connection to the service.
51 struct GNUNET_CLIENT_Connection *client;
/* Message queue built on top of `client`; the requests in this file are
   submitted through it with GNUNET_MQ_send(). */
56 struct GNUNET_MQ_Handle *mq;
59 * Head of active STORE requests.
61 struct GNUNET_PEERSTORE_StoreContext *store_head;
64 * Tail of active STORE requests.
66 struct GNUNET_PEERSTORE_StoreContext *store_tail;
69 * Head of active ITERATE requests.
71 struct GNUNET_PEERSTORE_IterateContext *iterate_head;
74 * Tail of active ITERATE requests.
76 struct GNUNET_PEERSTORE_IterateContext *iterate_tail;
79 * Hashmap of watch requests
/* Created lazily by GNUNET_PEERSTORE_watch() on the first watch request
   and destroyed in GNUNET_PEERSTORE_disconnect(); entries are keyed by
   the watch context's keyhash. */
81 struct GNUNET_CONTAINER_MultiHashMap *watches;
86 * Context for a store request
/* Bookkeeping for one pending STORE request; allocated by
   GNUNET_PEERSTORE_store() and linked into the handle's store list. */
88 struct GNUNET_PEERSTORE_StoreContext
/* Links for the doubly-linked list rooted at the handle's
   store_head / store_tail. */
93 struct GNUNET_PEERSTORE_StoreContext *next;
98 struct GNUNET_PEERSTORE_StoreContext *prev;
101 * Handle to the PEERSTORE service.
103 struct GNUNET_PEERSTORE_Handle *h;
106 * MQ Envelope with store request message
/* GNUNET_PEERSTORE_store_cancel() hands this to GNUNET_MQ_send_cancel()
   when the request has not been sent yet. */
108 struct GNUNET_MQ_Envelope *ev;
111 * Continuation called with service response
113 GNUNET_PEERSTORE_Continuation cont;
/* The text below documents the request_sent flag, which
   store_request_sent() sets to GNUNET_YES once the envelope has gone out. */
121 * #GNUNET_YES / #GNUNET_NO
122 * if sent, cannot be canceled
129 * Context for an iterate request
/* Bookkeeping for one pending ITERATE request; allocated by
   GNUNET_PEERSTORE_iterate() and linked into the handle's iterate list. */
131 struct GNUNET_PEERSTORE_IterateContext
/* Links for the doubly-linked list rooted at the handle's
   iterate_head / iterate_tail. */
136 struct GNUNET_PEERSTORE_IterateContext *next;
141 struct GNUNET_PEERSTORE_IterateContext *prev;
144 * Handle to the PEERSTORE service.
146 struct GNUNET_PEERSTORE_Handle *h;
149 * MQ Envelope with iterate request message
/* GNUNET_PEERSTORE_iterate_cancel() hands this to GNUNET_MQ_send_cancel()
   when the request has not been sent yet. */
151 struct GNUNET_MQ_Envelope *ev;
154 * Callback with each matching record
156 GNUNET_PEERSTORE_Processor callback;
159 * Closure for 'callback'
/* The text below documents the request_sent flag, which
   iterate_request_sent() sets to GNUNET_YES. */
164 * #GNUNET_YES / #GNUNET_NO
165 * if sent, cannot be canceled
170 * Task identifier for the function called
171 * on iterate request timeout
/* Holds GNUNET_SCHEDULER_NO_TASK whenever no timeout is scheduled;
   iterate_timeout() and GNUNET_PEERSTORE_iterate_cancel() reset it. */
173 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
178 * Context for a watch request
/* Bookkeeping for one WATCH request; allocated by GNUNET_PEERSTORE_watch()
   and stored in the handle's `watches` hashmap under `keyhash`. */
180 struct GNUNET_PEERSTORE_WatchContext
/* NOTE(review): no code visible in this file links watch contexts into a
   list (they go into the hashmap instead) -- confirm whether next/prev
   are still needed. */
185 struct GNUNET_PEERSTORE_WatchContext *next;
190 struct GNUNET_PEERSTORE_WatchContext *prev;
193 * Handle to the PEERSTORE service.
195 struct GNUNET_PEERSTORE_Handle *h;
198 * MQ Envelope with watch request message
/* GNUNET_PEERSTORE_watch_cancel() hands this to GNUNET_MQ_send_cancel(). */
200 struct GNUNET_MQ_Envelope *ev;
203 * Callback with each record received
205 GNUNET_PEERSTORE_Processor callback;
208 * Closure for 'callback'
213 * Hash of the combined key
/* Computed by PEERSTORE_hash_key() from sub system, peer and key in
   GNUNET_PEERSTORE_watch(); also the hashmap key for this context. */
215 struct GNUNET_HashCode keyhash;
/* The text below documents the request_sent flag, which
   watch_request_sent() sets to GNUNET_YES. */
218 * #GNUNET_YES / #GNUNET_NO
219 * if sent, cannot be canceled
225 /******************************************************************************/
226 /******************* DECLARATIONS *********************/
227 /******************************************************************************/
230 * When a response for store request is received
232 * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
233 * @param msg message received, NULL on timeout or fatal error
/* Forward declarations: the three handlers are referenced by the
   mq_handlers table before their definitions appear, and reconnect()
   is defined further down in the connection section. */
235 void handle_store_result (void *cls, const struct GNUNET_MessageHeader *msg);
238 * When a response for iterate request is received
240 * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
241 * @param msg message received, NULL on timeout or fatal error
243 void handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg);
246 * When a watch record is received
248 * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
249 * @param msg message received, NULL on timeout or fatal error
251 void handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg);
254 * Close the existing connection to PEERSTORE and reconnect.
256 * @param h handle to the service
259 reconnect (struct GNUNET_PEERSTORE_Handle *h);
262 * MQ message handlers
/* Dispatch table handed to the message queue: each row names the handler,
   the message type it serves and the expected message size.  The result
   and end messages are expected to be exactly one message header long.
   NOTE(review): the two *_RECORD rows give 0 as the size, presumably
   meaning "variable length" -- confirm against the GNUNET_MQ_MessageHandler
   documentation. */
264 static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
265 {&handle_store_result, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK, sizeof(struct GNUNET_MessageHeader)},
266 {&handle_store_result, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL, sizeof(struct GNUNET_MessageHeader)},
267 {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, 0},
268 {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END, sizeof(struct GNUNET_MessageHeader)},
269 {&handle_watch_result, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, 0},
270 GNUNET_MQ_HANDLERS_END
273 /******************************************************************************/
274 /******************* CONNECTION FUNCTIONS *********************/
275 /******************************************************************************/
/* Error callback given to GNUNET_MQ_queue_for_connection_client() in both
   GNUNET_PEERSTORE_connect() and reconnect().  `cls` is the PEERSTORE
   handle; `error` is the MQ error code, which is logged as an integer.
   Any recovery step beyond logging is not visible in this listing. */
278 handle_client_error (void *cls, enum GNUNET_MQ_Error error)
280 struct GNUNET_PEERSTORE_Handle *h = cls;
282 LOG(GNUNET_ERROR_TYPE_ERROR, "Received an error notification from MQ of type: %d\n", error);
287 * Close the existing connection to PEERSTORE and reconnect.
289 * @param h handle to the service
/* Tear down the current message queue and client connection, then open a
   fresh connection to the "peerstore" service and build a new queue on it.
   `h` keeps its configuration and request lists across the reconnect. */
292 reconnect (struct GNUNET_PEERSTORE_Handle *h)
294 LOG(GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n");
/* The queue is destroyed before the connection it was built on. */
297 GNUNET_MQ_destroy(h->mq);
300 if (NULL != h->client)
302 GNUNET_CLIENT_disconnect (h->client);
305 h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg);
306 //FIXME: retry connecting if fails again (client == NULL)
/* As the FIXME above says, h->client is used here without a visible
   NULL check. */
307 h->mq = GNUNET_MQ_queue_for_connection_client(h->client,
309 &handle_client_error,
311 //FIXME: resend pending requests after reconnecting
316 * Connect to the PEERSTORE service.
318 * @return NULL on error
/* Allocate a new handle, connect to the "peerstore" service using `cfg`
   and create the message queue on that connection, with
   handle_client_error() as the error callback.
   The body of the NULL == h->client branch is not visible in this listing;
   the function's doc comment states that NULL is returned on error. */
320 struct GNUNET_PEERSTORE_Handle *
321 GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
323 struct GNUNET_PEERSTORE_Handle *h;
325 h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
326 h->client = GNUNET_CLIENT_connect ("peerstore", cfg);
327 if(NULL == h->client)
333 h->mq = GNUNET_MQ_queue_for_connection_client(h->client,
335 &handle_client_error,
342 LOG(GNUNET_ERROR_TYPE_DEBUG, "New connection created\n");
347 * Disconnect from the PEERSTORE service
348 * Do not call in case of pending requests
350 * @param h handle to disconnect
/* Release the resources held by `h` in this order: the watch hashmap (if
   one was ever created), the message queue, then the client connection.
   NOTE(review): on the visible lines only the map itself is destroyed;
   nothing here releases the watch contexts stored in it -- confirm that
   callers must cancel their watches first. */
353 GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h)
355 if(NULL != h->watches)
357 GNUNET_CONTAINER_multihashmap_destroy(h->watches);
362 GNUNET_MQ_destroy(h->mq);
365 if (NULL != h->client)
367 GNUNET_CLIENT_disconnect (h->client);
371 LOG(GNUNET_ERROR_TYPE_DEBUG, "Disconnected, BYE!\n");
375 /******************************************************************************/
376 /******************* STORE FUNCTIONS *********************/
377 /******************************************************************************/
380 * When a response for store request is received
382 * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
383 * @param msg message received, NULL on timeout or fatal error
/* MQ handler for STORE_RESULT_OK / STORE_RESULT_FAIL.  `cls` is the
   PEERSTORE handle.  The finished store context is unlinked from the
   handle's store list and its continuation is called with GNUNET_OK or
   GNUNET_SYSERR depending on the message type; a NULL `msg` is treated
   as a connection error and reported as GNUNET_SYSERR.
   NOTE(review): the line that selects `sc` is not visible in this
   listing -- presumably the head of the store list, as the iterate
   handler does with iterate_head; confirm. */
385 void handle_store_result (void *cls, const struct GNUNET_MessageHeader *msg)
387 struct GNUNET_PEERSTORE_Handle *h = cls;
388 struct GNUNET_PEERSTORE_StoreContext *sc;
390 GNUNET_PEERSTORE_Continuation cont;
396 LOG(GNUNET_ERROR_TYPE_ERROR, "Unexpected store response, this should not happen.\n");
/* The closure is copied out before the context is unlinked. */
401 cont_cls = sc->cont_cls;
402 GNUNET_CONTAINER_DLL_remove(h->store_head, h->store_tail, sc);
404 if(NULL == msg) /* Connection error */
407 cont(cont_cls, GNUNET_SYSERR);
411 if(NULL != cont) /* Run continuation */
/* Message header fields are in network byte order. */
413 msg_type = ntohs(msg->type);
414 if(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK == msg_type)
415 cont(cont_cls, GNUNET_OK);
416 else if(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL == msg_type)
417 cont(cont_cls, GNUNET_SYSERR);
423 * Callback after MQ envelope is sent
425 * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *'
/* Notification registered through GNUNET_MQ_notify_sent() in
   GNUNET_PEERSTORE_store(): marks the store context as sent, which
   switches GNUNET_PEERSTORE_store_cancel() to its "already sent" branch. */
427 void store_request_sent (void *cls)
429 struct GNUNET_PEERSTORE_StoreContext *sc = cls;
431 sc->request_sent = GNUNET_YES;
436 * Cancel a store request
438 * @param sc Store request context
/* Cancel a pending store request.  If the request has not been sent yet,
   its envelope is withdrawn from the message queue and the context is
   unlinked from the handle's store list.  If it has already been sent,
   the other branch is taken; that branch's statements are not visible in
   this listing. */
441 GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
443 LOG(GNUNET_ERROR_TYPE_DEBUG,
444 "Canceling store request.\n");
445 if(GNUNET_NO == sc->request_sent)
449 GNUNET_MQ_send_cancel(sc->ev);
452 GNUNET_CONTAINER_DLL_remove(sc->h->store_head, sc->h->store_tail, sc);
456 { /* request already sent, will have to wait for response */
463 * Store a new entry in the PEERSTORE
465 * @param h Handle to the PEERSTORE service
466 * @param sub_system name of the sub system
467 * @param peer Peer Identity
468 * @param key entry key
469 * @param value entry value BLOB
470 * @param size size of 'value'
471 * @param expiry absolute time after which the entry is (possibly) deleted
472 * @param cont Continuation function after the store request is processed
473 * @param cont_cls Closure for 'cont'
/* Queue a STORE request.  Builds the record envelope, allocates a store
   context carrying the continuation closure, appends it to the handle's
   store list, registers store_request_sent() as the sent-notification and
   hands the envelope to the message queue. */
475 struct GNUNET_PEERSTORE_StoreContext *
476 GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
477 const char *sub_system,
478 const struct GNUNET_PeerIdentity *peer,
482 struct GNUNET_TIME_Absolute expiry,
483 GNUNET_PEERSTORE_Continuation cont,
486 struct GNUNET_MQ_Envelope *ev; //FIXME: add 'replace' flag in store function (similar to multihashmap)
487 struct GNUNET_PEERSTORE_StoreContext *sc;
/* NOTE(review): `size` is printed with %lu below.  Its declaration is not
   visible in this listing; if it is a size_t, the specifier should be %zu
   (or the argument cast to unsigned long) -- confirm.  The log text also
   spells "subsytem". */
489 LOG (GNUNET_ERROR_TYPE_DEBUG,
490 "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n",
491 size, sub_system, GNUNET_i2s (peer), key);
492 ev = PEERSTORE_create_record_mq_envelope(sub_system,
498 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
499 sc = GNUNET_new(struct GNUNET_PEERSTORE_StoreContext);
502 sc->cont_cls = cont_cls;
504 sc->request_sent = GNUNET_NO;
505 GNUNET_CONTAINER_DLL_insert(h->store_head, h->store_tail, sc);
/* The notification is attached before the envelope is handed over. */
506 GNUNET_MQ_notify_sent(ev, &store_request_sent, sc);
507 GNUNET_MQ_send(h->mq, ev);
512 /******************************************************************************/
513 /******************* ITERATE FUNCTIONS *********************/
514 /******************************************************************************/
517 * When a response for iterate request is received
519 * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
520 * @param msg message received, NULL on timeout or fatal error
/* MQ handler for ITERATE_RECORD / ITERATE_END.  `cls` is the PEERSTORE
   handle.  The response is attributed to the request at the head of the
   handle's iterate list.  A NULL `msg` is reported to the callback as a
   communication error; ITERATE_END cancels the request and calls the
   callback with a NULL record and NULL error message; any other message is
   parsed as a record and passed to the callback, whose return value
   decides whether iteration continues. */
522 void handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg)
524 struct GNUNET_PEERSTORE_Handle *h = cls;
525 struct GNUNET_PEERSTORE_IterateContext *ic;
526 GNUNET_PEERSTORE_Processor callback;
529 struct GNUNET_PEERSTORE_Record *record;
532 ic = h->iterate_head;
535 LOG(GNUNET_ERROR_TYPE_ERROR, "Unexpected iteration response, this should not happen.\n");
/* Callback and closure are copied to locals so they remain usable after
   the context is cancelled below. */
539 callback = ic->callback;
540 callback_cls = ic->callback_cls;
541 if(NULL == msg) /* Connection error */
545 callback(callback_cls, NULL,
546 _("Error communicating with `PEERSTORE' service."));
/* Message header fields are in network byte order. */
550 msg_type = ntohs(msg->type);
551 if(GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END == msg_type)
553 GNUNET_PEERSTORE_iterate_cancel(ic);
555 callback(callback_cls, NULL, NULL);
560 record = PEERSTORE_parse_record_message(msg);
/* Two alternative callback invocations follow: one with a "malformed
   response" error text, one without.  The condition choosing between them
   is not visible in this listing. */
562 continue_iter = callback(callback_cls, record, _("Received a malformed response from service."));
564 continue_iter = callback(callback_cls, record, NULL);
565 if(GNUNET_NO == continue_iter)
572 * Callback after MQ envelope is sent
574 * @param cls a 'struct GNUNET_PEERSTORE_IterateContext *'
/* Notification registered through GNUNET_MQ_notify_sent() in
   GNUNET_PEERSTORE_iterate(): logs the event and marks the iterate
   context as sent, so GNUNET_PEERSTORE_iterate_cancel() no longer tries to
   withdraw the envelope from the queue. */
576 void iterate_request_sent (void *cls)
578 struct GNUNET_PEERSTORE_IterateContext *ic = cls;
580 LOG(GNUNET_ERROR_TYPE_DEBUG, "Iterate request sent to service.\n");
581 ic->request_sent = GNUNET_YES;
586 * Called when the iterate request times out
588 * @param cls a 'struct GNUNET_PEERSTORE_IterateContext *'
/* Scheduler task started by GNUNET_PEERSTORE_iterate() with the caller's
   timeout.  The stored task id is cleared first, so the cancel routine
   does not try to cancel the task that is currently running; then the
   request is cancelled.
   NOTE(review): on the visible lines the user's callback is not told
   about the timeout -- confirm whether that is intended. */
590 void iterate_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
592 struct GNUNET_PEERSTORE_IterateContext *ic = cls;
594 ic->timeout_task = GNUNET_SCHEDULER_NO_TASK;
595 GNUNET_PEERSTORE_iterate_cancel(ic);
599 * Cancel an iterate request
600 * Please do not call after the iterate request is done
602 * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
/* Cancel an iterate request.  A pending timeout task is cancelled and its
   id reset.  If the request has not been sent yet, the envelope is
   withdrawn from the message queue and the context is unlinked from the
   handle's iterate list.  What happens for an already-sent request is not
   visible in this listing. */
605 GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
607 LOG(GNUNET_ERROR_TYPE_DEBUG, "Canceling iterate request.\n");
608 if(GNUNET_SCHEDULER_NO_TASK != ic->timeout_task)
610 GNUNET_SCHEDULER_cancel(ic->timeout_task);
611 ic->timeout_task = GNUNET_SCHEDULER_NO_TASK;
613 if(GNUNET_NO == ic->request_sent)
617 GNUNET_MQ_send_cancel(ic->ev);
620 GNUNET_CONTAINER_DLL_remove(ic->h->iterate_head, ic->h->iterate_tail, ic);
628 * Iterate over records matching supplied key information
630 * @param h handle to the PEERSTORE service
631 * @param sub_system name of sub system
632 * @param peer Peer identity (can be NULL)
633 * @param key entry key string (can be NULL)
634 * @param timeout time after which the iterate request is canceled
635 * @param callback function called with each matching record, all NULL's on end
636 * @param callback_cls closure for @a callback
637 * @return Handle to iteration request
/* Queue an ITERATE request.  Builds the record envelope, allocates an
   iterate context holding the callback and its closure, appends it to the
   handle's iterate list, registers iterate_request_sent() as the
   sent-notification, sends the envelope and finally schedules
   iterate_timeout() to run after `timeout`. */
639 struct GNUNET_PEERSTORE_IterateContext *
640 GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
641 const char *sub_system,
642 const struct GNUNET_PeerIdentity *peer,
644 struct GNUNET_TIME_Relative timeout,
645 GNUNET_PEERSTORE_Processor callback, void *callback_cls)
647 struct GNUNET_MQ_Envelope *ev;
648 struct GNUNET_PEERSTORE_IterateContext *ic;
650 ev = PEERSTORE_create_record_mq_envelope(sub_system,
656 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
657 ic = GNUNET_new(struct GNUNET_PEERSTORE_IterateContext);
658 ic->callback = callback;
659 ic->callback_cls = callback_cls;
662 ic->request_sent = GNUNET_NO;
/* handle_iterate_result() attributes responses to the head of this list. */
663 GNUNET_CONTAINER_DLL_insert(h->iterate_head, h->iterate_tail, ic);
664 LOG(GNUNET_ERROR_TYPE_DEBUG,
665 "Sending an iterate request for sub system `%s'\n", sub_system);
666 GNUNET_MQ_notify_sent(ev, &iterate_request_sent, ic);
667 GNUNET_MQ_send(h->mq, ev);
668 ic->timeout_task = GNUNET_SCHEDULER_add_delayed(timeout, &iterate_timeout, ic);
672 /******************************************************************************/
673 /******************* WATCH FUNCTIONS *********************/
674 /******************************************************************************/
677 * When a watch record is received
679 * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
680 * @param msg message received, NULL on timeout or fatal error
/* MQ handler registered for WATCH_RECORD messages.
   NOTE(review): everything after the signature is inside a block comment
   (it opens right before the first declaration, and the inner comment
   markers were reduced to bare asterisks), so on the visible lines this
   handler does nothing with incoming watch records.  The disabled text
   also still refers to the iterate list and the ITERATE_END message type,
   i.e. it looks like an unadapted copy of the iterate handler -- confirm
   and implement the lookup in the handle's `watches` map. */
682 void handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg)
684 /*struct GNUNET_PEERSTORE_Handle *h = cls;
685 struct GNUNET_PEERSTORE_WatchContext *wc;
686 GNUNET_PEERSTORE_Processor callback;
691 struct GNUNET_PEERSTORE_IterateContext *ic;
693 struct GNUNET_PEERSTORE_Record *record;
696 ic = h->iterate_head;
699 LOG(GNUNET_ERROR_TYPE_ERROR, "Unexpected iteration response, this should not happen.\n");
703 callback = ic->callback;
704 callback_cls = ic->callback_cls;
705 if(NULL == msg) * Connection error *
709 callback(callback_cls, NULL,
710 _("Error communicating with `PEERSTORE' service."));
714 msg_type = ntohs(msg->type);
715 if(GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END == msg_type)
717 GNUNET_PEERSTORE_iterate_cancel(ic);
719 callback(callback_cls, NULL, NULL);
724 record = PEERSTORE_parse_record_message(msg);
726 continue_iter = callback(callback_cls, record, _("Received a malformed response from service."));
728 continue_iter = callback(callback_cls, record, NULL);
729 if(GNUNET_NO == continue_iter)
735 * Callback after MQ envelope is sent
737 * @param cls a 'struct GNUNET_PEERSTORE_WatchContext *'
/* Notification registered through GNUNET_MQ_notify_sent() in
   GNUNET_PEERSTORE_watch(): marks the watch context as sent, which makes
   GNUNET_PEERSTORE_watch_cancel() send an explicit cancel message. */
739 void watch_request_sent (void *cls)
741 struct GNUNET_PEERSTORE_WatchContext *wc = cls;
743 wc->request_sent = GNUNET_YES;
748 * Cancel a watch request
750 * @param wc handle to the watch request
/* Cancel a watch.  If the watch request already reached the service, a
   WATCH_CANCEL message is sent and the context's closure is cleared.
   The watch envelope is also handed to GNUNET_MQ_send_cancel() and the
   context is removed from the handle's `watches` map; the conditions
   around those two steps are not visible in this listing.
   NOTE(review): between creating the WATCH_CANCEL message and sending it,
   no visible line fills in hm->keyhash (GNUNET_PEERSTORE_watch() does so
   via PEERSTORE_hash_key()), so the service may receive a cancel without
   the key it refers to -- confirm against the complete file. */
753 GNUNET_PEERSTORE_watch_cancel(struct GNUNET_PEERSTORE_WatchContext *wc)
755 struct GNUNET_PEERSTORE_Handle *h = wc->h;
756 struct GNUNET_MQ_Envelope *ev;
757 struct StoreKeyHashMessage *hm;
759 LOG(GNUNET_ERROR_TYPE_DEBUG, "Canceling watch.\n");
760 if(GNUNET_YES == wc->request_sent) /* If request already sent to service, send a cancel request. */
762 ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
763 GNUNET_MQ_send(h->mq, ev);
765 wc->callback_cls = NULL;
769 GNUNET_MQ_send_cancel(wc->ev);
772 GNUNET_CONTAINER_multihashmap_remove(h->watches, &wc->keyhash, wc);
778 * Request watching a given key
779 * User will be notified of any new values added to the key
781 * @param h handle to the PEERSTORE service
782 * @param sub_system name of sub system
783 * @param peer Peer identity
784 * @param key entry key string
785 * @param callback function called with each new value
786 * @param callback_cls closure for @a callback
787 * @return Handle to watch request
/* Register a watch.  Creates a WATCH message whose key hash is computed
   from sub system, peer and key, allocates a watch context holding the
   callback, its closure and a copy of that hash, stores the context in
   the handle's `watches` map, registers watch_request_sent() as the
   sent-notification and sends the message. */
789 struct GNUNET_PEERSTORE_WatchContext *
790 GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
791 const char *sub_system,
792 const struct GNUNET_PeerIdentity *peer,
794 GNUNET_PEERSTORE_Processor callback, void *callback_cls)
796 struct GNUNET_MQ_Envelope *ev;
797 struct StoreKeyHashMessage *hm;
798 struct GNUNET_PEERSTORE_WatchContext *wc;
800 ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
/* The hash is written straight into the outgoing message ... */
801 PEERSTORE_hash_key(sub_system, peer, key, &hm->keyhash);
802 wc = GNUNET_new(struct GNUNET_PEERSTORE_WatchContext);
803 wc->callback = callback;
804 wc->callback_cls = callback_cls;
807 wc->request_sent = GNUNET_NO;
/* ... and copied into the context, where it is used as the map key. */
808 wc->keyhash = hm->keyhash;
/* The map is created on first use. */
809 if(NULL == h->watches)
810 h->watches = GNUNET_CONTAINER_multihashmap_create(5, GNUNET_NO);
/* The MULTIPLE option is used so several watch contexts can be stored
   under the same key hash. */
811 GNUNET_CONTAINER_multihashmap_put(h->watches, &wc->keyhash,
812 wc, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
813 LOG(GNUNET_ERROR_TYPE_DEBUG,
814 "Sending a watch request for sub system `%s'.\n", sub_system);
815 GNUNET_MQ_notify_sent(ev, &watch_request_sent, wc);
816 GNUNET_MQ_send(h->mq, ev);
820 /* end of peerstore_api.c */