2 This file is part of GNUnet.
3 Copyright (C) 2009-2016 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
21 * @file core/core_api.c
22 * @brief core service; this is the main API for encrypted P2P
24 * @author Christian Grothoff
27 #include "gnunet_util_lib.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_core_service.h"
32 #define LOG(kind,...) GNUNET_log_from (kind, "core-api",__VA_ARGS__)
36 * Information we track for each peer.
42 * Corresponding CORE handle.
44 struct GNUNET_CORE_Handle *h;
47 * Message queue for the peer.
49 struct GNUNET_MQ_Handle *mq;
52 * Message we are currently trying to pass to the CORE service
53 * for this peer (from @e mq).
55 struct GNUNET_MQ_Envelope *env;
58 * Value the client returned when we connected, used
59 * as the closure in various places.
64 * Peer the record is about.
66 struct GNUNET_PeerIdentity peer;
69 * SendMessageRequest ID generator for this peer.
77 * Context for the core service connection.
79 struct GNUNET_CORE_Handle
83 * Configuration we're using.
85 const struct GNUNET_CONFIGURATION_Handle *cfg;
88 * Closure for the various callbacks.
93 * Function to call once we've handshaked with the core service.
95 GNUNET_CORE_StartupCallback init;
98 * Function to call whenever we're notified about a peer connecting.
100 GNUNET_CORE_ConnectEventHandler connects;
103 * Function to call whenever we're notified about a peer disconnecting.
105 GNUNET_CORE_DisconnectEventHandler disconnects;
108 * Function handlers for messages of particular type.
110 struct GNUNET_MQ_MessageHandler *handlers;
113 * Our message queue for transmissions to the service.
115 struct GNUNET_MQ_Handle *mq;
118 * Hash map listing all of the peers that we are currently
121 struct GNUNET_CONTAINER_MultiPeerMap *peers;
124 * Identity of this peer.
126 struct GNUNET_PeerIdentity me;
129 * ID of reconnect task (if any).
131 struct GNUNET_SCHEDULER_Task *reconnect_task;
134 * Current delay we use for re-trying to connect to core.
136 struct GNUNET_TIME_Relative retry_backoff;
139 * Number of entries in the handlers array.
144 * Did we ever get INIT?
152 * Our current client connection went down. Clean it up
153 * and try to reconnect!
155 * @param h our handle to the core service
158 reconnect (struct GNUNET_CORE_Handle *h);
162 * Task schedule to try to re-connect to core.
164 * @param cls the `struct GNUNET_CORE_Handle`
165 * @param tc task context
168 reconnect_task (void *cls)
170 struct GNUNET_CORE_Handle *h = cls;
172 h->reconnect_task = NULL;
173 LOG (GNUNET_ERROR_TYPE_DEBUG,
174 "Connecting to CORE service after delay\n");
180 * Notify clients about disconnect and free the entry for connected
183 * @param cls the `struct GNUNET_CORE_Handle *`
184 * @param key the peer identity (not used)
185 * @param value the `struct PeerRecord` to free.
186 * @return #GNUNET_YES (continue)
189 disconnect_and_free_peer_entry (void *cls,
190 const struct GNUNET_PeerIdentity *key,
193 struct GNUNET_CORE_Handle *h = cls;
194 struct PeerRecord *pr = value;
196 GNUNET_assert (pr->h == h);
197 if (NULL != h->disconnects)
198 h->disconnects (h->cls,
201 GNUNET_assert (GNUNET_YES ==
202 GNUNET_CONTAINER_multipeermap_remove (h->peers,
205 GNUNET_MQ_destroy (pr->mq);
206 GNUNET_assert (NULL == pr->mq);
209 GNUNET_MQ_discard (pr->env);
218 * Close down any existing connection to the CORE service and
219 * try re-establishing it later.
221 * @param h our handle
224 reconnect_later (struct GNUNET_CORE_Handle *h)
226 GNUNET_assert (NULL == h->reconnect_task);
229 GNUNET_MQ_destroy (h->mq);
232 GNUNET_assert (NULL == h->reconnect_task);
234 GNUNET_SCHEDULER_add_delayed (h->retry_backoff,
237 GNUNET_CONTAINER_multipeermap_iterate (h->peers,
238 &disconnect_and_free_peer_entry,
240 h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff);
245 * Error handler for the message queue to the CORE service.
246 * On errors, we reconnect.
248 * @param cls closure, a `struct GNUNET_CORE_Handle *`
249 * @param error error code
252 handle_mq_error (void *cls,
253 enum GNUNET_MQ_Error error)
255 struct GNUNET_CORE_Handle *h = cls;
257 LOG (GNUNET_ERROR_TYPE_DEBUG,
265 * Inquire with CORE what options should be set for a message
266 * so that it is transmitted with the given @a priority and
267 * the given @a cork value.
269 * @param cork desired corking
270 * @param priority desired message priority
271 * @param[out] flags set to `flags` value for #GNUNET_MQ_set_options()
272 * @return `extra` argument to give to #GNUNET_MQ_set_options()
275 GNUNET_CORE_get_mq_options (int cork,
276 enum GNUNET_CORE_Priority priority,
279 *flags = ((uint64_t) priority) + (((uint64_t) cork) << 32);
285 * Implement sending functionality of a message queue for
286 * us sending messages to a peer.
288 * @param mq the message queue
289 * @param msg the message to send
290 * @param impl_state state of the implementation
293 core_mq_send_impl (struct GNUNET_MQ_Handle *mq,
294 const struct GNUNET_MessageHeader *msg,
297 struct PeerRecord *pr = impl_state;
298 struct GNUNET_CORE_Handle *h = pr->h;
299 struct SendMessageRequest *smr;
300 struct SendMessage *sm;
301 struct GNUNET_MQ_Envelope *env;
305 enum GNUNET_CORE_Priority priority;
309 /* We're currently reconnecting, pretend this worked */
310 GNUNET_MQ_impl_send_continue (mq);
313 GNUNET_assert (NULL == pr->env);
314 /* extract options from envelope */
315 env = GNUNET_MQ_get_current_envelope (mq);
316 GNUNET_break (NULL ==
317 GNUNET_MQ_env_get_options (env,
319 cork = (int) (flags >> 32);
320 priority = (uint32_t) flags;
322 /* check message size for sanity */
323 msize = ntohs (msg->size);
324 if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct SendMessage))
327 GNUNET_MQ_impl_send_continue (mq);
331 /* ask core for transmission */
332 LOG (GNUNET_ERROR_TYPE_DEBUG,
333 "Asking core for transmission of %u bytes to `%s'\n",
334 (unsigned int) msize,
335 GNUNET_i2s (&pr->peer));
336 env = GNUNET_MQ_msg (smr,
337 GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST);
338 smr->priority = htonl ((uint32_t) priority);
339 // smr->deadline = GNUNET_TIME_absolute_hton (deadline);
340 smr->peer = pr->peer;
341 smr->reserved = htonl (0);
342 smr->size = htons (msize);
343 smr->smr_id = htons (++pr->smr_id_gen);
344 GNUNET_MQ_send (h->mq,
347 /* prepare message with actual transmission data */
348 pr->env = GNUNET_MQ_msg_nested_mh (sm,
349 GNUNET_MESSAGE_TYPE_CORE_SEND,
351 sm->priority = htonl ((uint32_t) priority);
352 // sm->deadline = GNUNET_TIME_absolute_hton (deadline);
354 sm->cork = htonl ((uint32_t) cork);
355 sm->reserved = htonl (0);
356 LOG (GNUNET_ERROR_TYPE_DEBUG,
357 "Calling get_message with buffer of %u bytes (%s)\n",
358 (unsigned int) msize,
359 cork ? "corked" : "uncorked");
364 * Handle destruction of a message queue. Implementations must not
365 * free @a mq, but should take care of @a impl_state.
367 * @param mq the message queue to destroy
368 * @param impl_state state of the implementation
371 core_mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
374 struct PeerRecord *pr = impl_state;
376 GNUNET_assert (mq == pr->mq);
382 * Implementation function that cancels the currently sent message.
383 * Should basically undo whatever #mq_send_impl() did.
385 * @param mq message queue
386 * @param impl_state state specific to the implementation
389 core_mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
392 struct PeerRecord *pr = impl_state;
394 GNUNET_assert (NULL != pr->env);
395 GNUNET_MQ_discard (pr->env);
401 * We had an error processing a message we forwarded from a peer to
402 * the CORE service. We should just complain about it but otherwise
403 * continue processing.
406 * @param error error code
409 core_mq_error_handler (void *cls,
410 enum GNUNET_MQ_Error error)
412 /* struct PeerRecord *pr = cls; */
419 * Add the given peer to the list of our connected peers
420 * and create the respective data structures and notify
423 * @param h the core handle
424 * @param peer the peer that is connecting to us
427 connect_peer (struct GNUNET_CORE_Handle *h,
428 const struct GNUNET_PeerIdentity *peer)
430 struct PeerRecord *pr;
434 pr = GNUNET_new (struct PeerRecord);
437 GNUNET_assert (GNUNET_YES ==
438 GNUNET_CONTAINER_multipeermap_put (h->peers,
441 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
442 pr->mq = GNUNET_MQ_queue_for_callbacks (&core_mq_send_impl,
443 &core_mq_destroy_impl,
444 &core_mq_cancel_impl,
447 &core_mq_error_handler,
449 /* get our default options */
450 extra = GNUNET_CORE_get_mq_options (GNUNET_NO,
451 GNUNET_CORE_PRIO_BEST_EFFORT,
453 GNUNET_MQ_set_options (pr->mq,
456 if (NULL != h->connects)
458 pr->client_cls = h->connects (h->cls,
461 GNUNET_MQ_set_handlers_closure (pr->mq,
468 * Handle init reply message received from CORE service. Notify
469 * application that we are now connected to the CORE. Also fake
470 * loopback connection.
472 * @param cls the `struct GNUNET_CORE_Handle`
473 * @param m the init reply
476 handle_init_reply (void *cls,
477 const struct InitReplyMessage *m)
479 struct GNUNET_CORE_Handle *h = cls;
480 GNUNET_CORE_StartupCallback init;
482 GNUNET_break (0 == ntohl (m->reserved));
483 h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
484 if (NULL != (init = h->init))
486 /* mark so we don't call init on reconnect */
488 h->me = m->my_identity;
489 LOG (GNUNET_ERROR_TYPE_DEBUG,
490 "Connected to core service of peer `%s'.\n",
491 GNUNET_i2s (&h->me));
492 h->have_init = GNUNET_YES;
498 LOG (GNUNET_ERROR_TYPE_DEBUG,
499 "Successfully reconnected to core service.\n");
500 if (GNUNET_NO == h->have_init)
502 h->me = m->my_identity;
503 h->have_init = GNUNET_YES;
507 GNUNET_break (0 == memcmp (&h->me,
509 sizeof (struct GNUNET_PeerIdentity)));
512 /* fake 'connect to self' */
519 * Handle connect message received from CORE service.
520 * Notify the application about the new connection.
522 * @param cls the `struct GNUNET_CORE_Handle`
523 * @param cnm the connect message
526 handle_connect_notify (void *cls,
527 const struct ConnectNotifyMessage *cnm)
529 struct GNUNET_CORE_Handle *h = cls;
530 struct PeerRecord *pr;
532 LOG (GNUNET_ERROR_TYPE_DEBUG,
533 "Received notification about connection from `%s'.\n",
534 GNUNET_i2s (&cnm->peer));
535 if (0 == memcmp (&h->me,
537 sizeof (struct GNUNET_PeerIdentity)))
539 /* connect to self!? */
543 pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
557 * Handle disconnect message received from CORE service.
558 * Notify the application about the lost connection.
560 * @param cls the `struct GNUNET_CORE_Handle`
561 * @param dnm message about the disconnect event
564 handle_disconnect_notify (void *cls,
565 const struct DisconnectNotifyMessage *dnm)
567 struct GNUNET_CORE_Handle *h = cls;
568 struct PeerRecord *pr;
570 if (0 == memcmp (&h->me,
572 sizeof (struct GNUNET_PeerIdentity)))
574 /* disconnect from self!? */
578 GNUNET_break (0 == ntohl (dnm->reserved));
579 LOG (GNUNET_ERROR_TYPE_DEBUG,
580 "Received notification about disconnect from `%s'.\n",
581 GNUNET_i2s (&dnm->peer));
582 pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
590 disconnect_and_free_peer_entry (h,
597 * Check that message received from CORE service is well-formed.
599 * @param cls the `struct GNUNET_CORE_Handle`
600 * @param ntm the message we got
601 * @return #GNUNET_OK if the message is well-formed
604 check_notify_inbound (void *cls,
605 const struct NotifyTrafficMessage *ntm)
608 const struct GNUNET_MessageHeader *em;
610 msize = ntohs (ntm->header.size) - sizeof (struct NotifyTrafficMessage);
611 if (msize < sizeof (struct GNUNET_MessageHeader))
614 return GNUNET_SYSERR;
616 em = (const struct GNUNET_MessageHeader *) &ntm[1];
617 if (msize != ntohs (em->size))
620 return GNUNET_SYSERR;
627 * Handle inbound message received from CORE service. If applicable,
628 * notify the application.
630 * @param cls the `struct GNUNET_CORE_Handle`
631 * @param ntm the message we got from CORE.
634 handle_notify_inbound (void *cls,
635 const struct NotifyTrafficMessage *ntm)
637 struct GNUNET_CORE_Handle *h = cls;
638 const struct GNUNET_MessageHeader *em;
639 struct PeerRecord *pr;
641 LOG (GNUNET_ERROR_TYPE_DEBUG,
642 "Received inbound message from `%s'.\n",
643 GNUNET_i2s (&ntm->peer));
644 em = (const struct GNUNET_MessageHeader *) &ntm[1];
645 pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
653 GNUNET_MQ_inject_message (pr->mq,
659 * Handle message received from CORE service notifying us that we are
660 * now allowed to send a message to a peer. If that message is still
661 * pending, put it into the queue to be transmitted.
663 * @param cls the `struct GNUNET_CORE_Handle`
664 * @param smr the message we got
667 handle_send_ready (void *cls,
668 const struct SendMessageReady *smr)
670 struct GNUNET_CORE_Handle *h = cls;
671 struct PeerRecord *pr;
673 pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
681 LOG (GNUNET_ERROR_TYPE_DEBUG,
682 "Received notification about transmission readiness to `%s'.\n",
683 GNUNET_i2s (&smr->peer));
686 /* request must have been cancelled between the original request
687 * and the response from CORE, ignore CORE's readiness */
690 if (ntohs (smr->smr_id) != pr->smr_id_gen)
692 /* READY message is for expired or cancelled message,
693 * ignore! (we should have already sent another request) */
697 /* ok, all good, send message out! */
698 GNUNET_MQ_send (h->mq,
701 GNUNET_MQ_impl_send_continue (pr->mq);
706 * Our current client connection went down. Clean it up and try to
709 * @param h our handle to the core service
712 reconnect (struct GNUNET_CORE_Handle *h)
714 struct GNUNET_MQ_MessageHandler handlers[] = {
715 GNUNET_MQ_hd_fixed_size (init_reply,
716 GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY,
717 struct InitReplyMessage,
719 GNUNET_MQ_hd_fixed_size (connect_notify,
720 GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT,
721 struct ConnectNotifyMessage,
723 GNUNET_MQ_hd_fixed_size (disconnect_notify,
724 GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT,
725 struct DisconnectNotifyMessage,
727 GNUNET_MQ_hd_var_size (notify_inbound,
728 GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND,
729 struct NotifyTrafficMessage,
731 GNUNET_MQ_hd_fixed_size (send_ready,
732 GNUNET_MESSAGE_TYPE_CORE_SEND_READY,
733 struct SendMessageReady,
735 GNUNET_MQ_handler_end ()
737 struct InitMessage *init;
738 struct GNUNET_MQ_Envelope *env;
741 GNUNET_assert (NULL == h->mq);
742 h->mq = GNUNET_CLIENT_connect (h->cfg,
752 env = GNUNET_MQ_msg_extra (init,
753 sizeof (uint16_t) * h->hcnt,
754 GNUNET_MESSAGE_TYPE_CORE_INIT);
755 LOG (GNUNET_ERROR_TYPE_INFO,
756 "(Re)connecting to CORE service\n");
757 init->options = htonl (0);
758 ts = (uint16_t *) &init[1];
759 for (unsigned int hpos = 0; hpos < h->hcnt; hpos++)
760 ts[hpos] = htons (h->handlers[hpos].type);
761 GNUNET_MQ_send (h->mq,
767 * Connect to the core service. Note that the connection may complete
768 * (or fail) asynchronously.
770 * @param cfg configuration to use
771 * @param cls closure for the various callbacks that follow (including handlers in the handlers array)
772 * @param init callback to call once we have successfully
773 * connected to the core service
774 * @param connects function to call on peer connect, can be NULL
775 * @param disconnects function to call on peer disconnect / timeout, can be NULL
776 * @param handlers callbacks for messages we care about, NULL-terminated
777 * @return handle to the core service (only useful for disconnect until @a init is called);
778 * NULL on error (in this case, init is never called)
780 struct GNUNET_CORE_Handle *
781 GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
783 GNUNET_CORE_StartupCallback init,
784 GNUNET_CORE_ConnectEventHandler connects,
785 GNUNET_CORE_DisconnectEventHandler disconnects,
786 const struct GNUNET_MQ_MessageHandler *handlers)
788 struct GNUNET_CORE_Handle *h;
791 h = GNUNET_new (struct GNUNET_CORE_Handle);
795 h->connects = connects;
796 h->disconnects = disconnects;
797 h->peers = GNUNET_CONTAINER_multipeermap_create (128,
800 if (NULL != handlers)
801 while (NULL != handlers[hcnt].cb)
803 h->handlers = GNUNET_new_array (hcnt + 1,
804 struct GNUNET_MQ_MessageHandler);
805 if (NULL != handlers)
806 GNUNET_memcpy (h->handlers,
808 hcnt * sizeof (struct GNUNET_MQ_MessageHandler));
810 GNUNET_assert (hcnt <
811 (GNUNET_SERVER_MAX_MESSAGE_SIZE -
812 sizeof (struct InitMessage)) / sizeof (uint16_t));
813 LOG (GNUNET_ERROR_TYPE_DEBUG,
814 "Connecting to CORE service\n");
818 GNUNET_CORE_disconnect (h);
826 * Disconnect from the core service.
828 * @param handle connection to core to disconnect
831 GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle)
833 LOG (GNUNET_ERROR_TYPE_DEBUG,
834 "Disconnecting from CORE service\n");
835 GNUNET_CONTAINER_multipeermap_iterate (handle->peers,
836 &disconnect_and_free_peer_entry,
838 GNUNET_CONTAINER_multipeermap_destroy (handle->peers);
839 handle->peers = NULL;
840 if (NULL != handle->reconnect_task)
842 GNUNET_SCHEDULER_cancel (handle->reconnect_task);
843 handle->reconnect_task = NULL;
845 if (NULL != handle->mq)
847 GNUNET_MQ_destroy (handle->mq);
850 GNUNET_free (handle->handlers);
851 GNUNET_free (handle);
856 * Obtain the message queue for a connected peer.
858 * @param h the core handle
859 * @param pid the identity of the peer to check if it has been connected to us
860 * @return NULL if peer is not connected
862 struct GNUNET_MQ_Handle *
863 GNUNET_CORE_get_mq (const struct GNUNET_CORE_Handle *h,
864 const struct GNUNET_PeerIdentity *pid)
866 struct PeerRecord *pr;
868 pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
876 /* end of core_api.c */