From 14ddd612091d3a894901bdf6213db7487178f6e2 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Sun, 6 Oct 2013 22:54:21 +0000 Subject: [PATCH] MQ for CORE --- src/core/Makefile.am | 10 ++- src/core/core_api.c | 129 ++++++++++++++++++++++++++++++ src/include/gnunet_core_service.h | 18 +++++ src/include/gnunet_mesh_service.h | 4 +- src/util/mq.c | 1 - 5 files changed, 158 insertions(+), 4 deletions(-) diff --git a/src/core/Makefile.am b/src/core/Makefile.am index e0c34f54c..9a68925b2 100644 --- a/src/core/Makefile.am +++ b/src/core/Makefile.am @@ -60,7 +60,7 @@ gnunet_core_DEPENDENCIES = \ libgnunetcore.la if HAVE_TESTING - TESTING_TESTS = test_core_api_send_to_self + TESTING_TESTS = test_core_api_send_to_self test_core_api_mq endif check_PROGRAMS = \ @@ -98,6 +98,14 @@ test_core_api_send_to_self_LDADD = \ $(top_builddir)/src/transport/libgnunettransport.la \ $(top_builddir)/src/util/libgnunetutil.la +test_core_api_mq_SOURCES = \ + test_core_api_mq.c +test_core_api_mq_LDADD = \ + $(top_builddir)/src/core/libgnunetcore.la \ + $(top_builddir)/src/testing/libgnunettesting.la \ + $(top_builddir)/src/transport/libgnunettransport.la \ + $(top_builddir)/src/util/libgnunetutil.la + test_core_api_start_only_SOURCES = \ test_core_api_start_only.c test_core_api_start_only_LDADD = \ diff --git a/src/core/core_api.c b/src/core/core_api.c index 7aa3e0519..34c235bbd 100644 --- a/src/core/core_api.c +++ b/src/core/core_api.c @@ -25,6 +25,7 @@ * @author Christian Grothoff */ #include "platform.h" +#include "gnunet_util_lib.h" #include "gnunet_constants.h" #include "gnunet_core_service.h" #include "core.h" @@ -144,6 +145,13 @@ struct PeerRecord }; +struct CoreMQState +{ + struct GNUNET_PeerIdentity target; + struct GNUNET_CORE_Handle *core; + struct GNUNET_CORE_TransmitHandle *th; +}; + /** * Type of function called upon completion. @@ -1387,4 +1395,125 @@ GNUNET_CORE_is_peer_connected_sync (const struct GNUNET_CORE_Handle *h, } +/** + * Function called to notify a client about the connection + * begin ready to queue more data. "buf" will be + * NULL and "size" zero if the connection was closed for + * writing in the meantime. + * + * @param cls closure + * @param size number of bytes available in buf + * @param buf where the callee should write the message + * @return number of bytes written to buf + */ +static size_t +core_mq_ntr (void *cls, size_t size, + void *buf) +{ + struct GNUNET_MQ_Handle *mq = cls; + struct CoreMQState *mqs = GNUNET_MQ_impl_state (mq); + const struct GNUNET_MessageHeader *mh = GNUNET_MQ_impl_current (mq); + size_t msg_size = ntohs (mh->size); + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "core-mq", "ntr called (size %u, type %u)\n", + msg_size, ntohs (mh->type)); + mqs->th = NULL; + if (NULL == buf) + { + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "core-mq", "send error\n"); + GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE); + return 0; + } + memcpy (buf, mh, msg_size); + GNUNET_MQ_impl_send_commit (mq); + GNUNET_MQ_impl_send_continue (mq); + return msg_size; +} + + +/** + * Signature of functions implementing the + * sending functionality of a message queue. + * + * @param mq the message queue + * @param msg the message to send + * @param impl_state state of the implementation + */ +static void +core_mq_send (struct GNUNET_MQ_Handle *mq, + const struct GNUNET_MessageHeader *msg, + void *impl_state) +{ + struct CoreMQState *mqs = impl_state; + GNUNET_assert (NULL == mqs->th); + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "core-mq", "Sending queued message (size %u)\n", + ntohs (msg->size)); + mqs->th = GNUNET_CORE_notify_transmit_ready (mqs->core, GNUNET_YES, 0, + GNUNET_TIME_UNIT_FOREVER_REL, + &mqs->target, + ntohs (msg->size), core_mq_ntr, mq); +} + + +/** + * Signature of functions implementing the + * destruction of a message queue. + * Implementations must not free @a mq, but should + * take care of @a impl_state. + * + * @param mq the message queue to destroy + * @param impl_state state of the implementation + */ +static void +core_mq_destroy (struct GNUNET_MQ_Handle *mq, void *impl_state) +{ + struct CoreMQState *mqs = impl_state; + if (NULL != mqs->th) + { + GNUNET_CORE_notify_transmit_ready_cancel (mqs->th); + mqs->th = NULL; + } + GNUNET_free (mqs); +} + + +/** + * Implementation function that cancels the currently sent message. + * + * @param mq message queue + * @param impl_state state specific to the implementation + */ +static void +core_mq_cancel (struct GNUNET_MQ_Handle *mq, void *impl_state) +{ + struct CoreMQState *mqs = impl_state; + GNUNET_assert (NULL != mqs->th); + GNUNET_CORE_notify_transmit_ready_cancel (mqs->th); +} + + +/** + * Create a message queue for sending messages to a peer with CORE. + * Messages may only be queued with #GNUNET_MQ_send once the init callback has + * been called for the given handle. + * There must only be one queue per peer for each core handle. + * The message queue can only be used to transmit messages, + * not to receive them. + * + * @param h the core handle + * @param target the target peer for this queue, may not be NULL + * @return a message queue for sending messages over the core handle + * to the target peer + */ +struct GNUNET_MQ_Handle * +GNUNET_CORE_mq_create (struct GNUNET_CORE_Handle *h, + const struct GNUNET_PeerIdentity *target) +{ + struct CoreMQState *mqs = GNUNET_new (struct CoreMQState); + mqs->core = h; + mqs->target = *target; + return GNUNET_MQ_queue_for_callbacks (core_mq_send, core_mq_destroy, + core_mq_cancel, mqs, + NULL, NULL, NULL); +} + /* end of core_api.c */ diff --git a/src/include/gnunet_core_service.h b/src/include/gnunet_core_service.h index 64daadcea..4eca0cc03 100644 --- a/src/include/gnunet_core_service.h +++ b/src/include/gnunet_core_service.h @@ -292,6 +292,24 @@ GNUNET_CORE_is_peer_connected_sync (const struct GNUNET_CORE_Handle *h, const struct GNUNET_PeerIdentity *pid); +/** + * Create a message queue for sending messages to a peer with CORE. + * Messages may only be queued with #GNUNET_MQ_send once the init callback has + * been called for the given handle. + * There must only be one queue per peer for each core handle. + * The message queue can only be used to transmit messages, + * not to receive them. + * + * @param h the core handle + * @param target the target peer for this queue, may not be NULL + * @return a message queue for sending messages over the core handle + * to the target peer + */ +struct GNUNET_MQ_Handle * +GNUNET_CORE_mq_create (struct GNUNET_CORE_Handle *h, + const struct GNUNET_PeerIdentity *target); + + #if 0 /* keep Emacsens' auto-indent happy */ { #endif diff --git a/src/include/gnunet_mesh_service.h b/src/include/gnunet_mesh_service.h index 08748d87b..fa6382683 100644 --- a/src/include/gnunet_mesh_service.h +++ b/src/include/gnunet_mesh_service.h @@ -438,8 +438,8 @@ GNUNET_MESH_get_tunnels_cancel (struct GNUNET_MESH_Handle *h); * The message queue can only be used to transmit messages, * not to receive them. * - * @param tunnel the tunnel to create the message qeue for - * @return a message queue to messages over the tunnel + * @param tunnel the tunnel to create the message queue for + * @return a message queue for sending messages over the tunnel */ struct GNUNET_MQ_Handle * GNUNET_MESH_mq_create (struct GNUNET_MESH_Tunnel *tunnel); diff --git a/src/util/mq.c b/src/util/mq.c index c4f9e0d0b..1a3374087 100644 --- a/src/util/mq.c +++ b/src/util/mq.c @@ -278,7 +278,6 @@ GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq) return; } - GNUNET_assert (NULL != mq->envelope_tail); GNUNET_assert (NULL != mq->envelope_head); mq->current_envelope = mq->envelope_head; -- 2.25.1