From: Christian Grothoff Date: Thu, 23 Jun 2016 19:14:56 +0000 (+0000) Subject: converting core monitor to MQ X-Git-Tag: initial-import-from-subversion-38251~726 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=44ad80664bfcb495eb82599680bde78ccb02cd6d;p=oweals%2Fgnunet.git converting core monitor to MQ --- diff --git a/src/core/core_api_monitor_peers.c b/src/core/core_api_monitor_peers.c index 419670d7c..bafcd3e94 100644 --- a/src/core/core_api_monitor_peers.c +++ b/src/core/core_api_monitor_peers.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2009-2014 GNUnet e.V. + Copyright (C) 2009-2014, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -43,12 +43,7 @@ struct GNUNET_CORE_MonitorHandle /** * Our connection to the service. */ - struct GNUNET_CLIENT_Connection *client; - - /** - * Handle for transmitting a request. - */ - struct GNUNET_CLIENT_TransmitHandle *th; + struct GNUNET_MQ_Handle *mq; /** * Function called with the peer. @@ -64,47 +59,30 @@ struct GNUNET_CORE_MonitorHandle /** - * Transmits the monitor request to the CORE service. - * - * Function is called to notify a client about the socket begin ready - * to queue more data. @a buf will be NULL and @a size zero if the - * socket was closed for writing in the meantime. + * Protocol error, reconnect to CORE service and notify + * client. * - * @param cls closure, our `struct GNUNET_CORE_MonitorHandle *` - * @param size number of bytes available in @a buf - * @param buf where the callee should write the message - * @return number of bytes written to @a buf + * @param mh monitoring session to reconnect to CORE */ -static size_t -transmit_monitor_request (void *cls, - size_t size, - void *buf); +static void +reconnect (struct GNUNET_CORE_MonitorHandle *mh); /** - * Protocol error, reconnect to CORE service and notify - * client. + * Generic error handler, called with the appropriate error code and + * the same closure specified at the creation of the message queue. + * Not every message queue implementation supports an error handler. * - * @param mh monitoring session to reconnect to CORE + * @param cls closure, a `struct GNUNET_CORE_MonitorHandle *` + * @param error error code */ static void -reconnect (struct GNUNET_CORE_MonitorHandle *mh) +handle_mq_error (void *cls, + enum GNUNET_MQ_Error error) { - GNUNET_CLIENT_disconnect (mh->client); - /* FIXME: use backoff? */ - mh->client = GNUNET_CLIENT_connect ("core", mh->cfg); - GNUNET_assert (NULL != mh->client); - mh->th = - GNUNET_CLIENT_notify_transmit_ready (mh->client, - sizeof (struct GNUNET_MessageHeader), - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_YES, - &transmit_monitor_request, mh); - /* notify callback about reconnect */ - mh->peer_cb (mh->peer_cb_cls, - NULL, - GNUNET_CORE_KX_CORE_DISCONNECT, - GNUNET_TIME_UNIT_FOREVER_ABS); + struct GNUNET_CORE_MonitorHandle *mh = cls; + + reconnect (mh); } @@ -112,34 +90,14 @@ reconnect (struct GNUNET_CORE_MonitorHandle *mh) * Receive reply from CORE service with information about a peer. * * @param cls our `struct GNUNET_CORE_MonitorHandle *` - * @param msg NULL on error or last entry + * @param mon_message monitor message */ static void -receive_info (void *cls, - const struct GNUNET_MessageHeader *msg) +handle_receive_info (void *cls, + const struct MonitorNotifyMessage *mon_message) { struct GNUNET_CORE_MonitorHandle *mh = cls; - const struct MonitorNotifyMessage *mon_message; - uint16_t msize; - if (NULL == msg) - { - reconnect (mh); - return; - } - msize = ntohs (msg->size); - /* Handle incorrect message type or size, disconnect and clean up */ - if ((ntohs (msg->type) != GNUNET_MESSAGE_TYPE_CORE_MONITOR_NOTIFY) || - (sizeof (struct MonitorNotifyMessage) != msize)) - { - GNUNET_break (0); - reconnect (mh); - return; - } - mon_message = (const struct MonitorNotifyMessage *) msg; - GNUNET_CLIENT_receive (mh->client, - &receive_info, mh, - GNUNET_TIME_UNIT_FOREVER_REL); mh->peer_cb (mh->peer_cb_cls, &mon_message->peer, (enum GNUNET_CORE_KxState) ntohl (mon_message->state), @@ -148,40 +106,43 @@ receive_info (void *cls, /** - * Transmits the monitor request to the CORE service. - * - * Function is called to notify a client about the socket begin ready - * to queue more data. @a buf will be NULL and @a size zero if the - * socket was closed for writing in the meantime. + * Protocol error, reconnect to CORE service and notify + * client. * - * @param cls closure, our `struct GNUNET_CORE_MonitorHandle *` - * @param size number of bytes available in @a buf - * @param buf where the callee should write the message - * @return number of bytes written to @a buf + * @param mh monitoring session to reconnect to CORE */ -static size_t -transmit_monitor_request (void *cls, - size_t size, - void *buf) +static void +reconnect (struct GNUNET_CORE_MonitorHandle *mh) { - struct GNUNET_CORE_MonitorHandle *mh = cls; + GNUNET_MQ_hd_fixed_size (receive_info, + GNUNET_MESSAGE_TYPE_CORE_MONITOR_NOTIFY, + struct MonitorNotifyMessage); + struct GNUNET_MQ_MessageHandler handlers[] = { + make_receive_info_handler (mh), + GNUNET_MQ_handler_end () + }; + struct GNUNET_MQ_Envelope *env; struct GNUNET_MessageHeader *msg; - int msize; - mh->th = NULL; - msize = sizeof (struct GNUNET_MessageHeader); - if ((size < msize) || (NULL == buf)) - { - reconnect (mh); - return 0; - } - msg = (struct GNUNET_MessageHeader *) buf; - msg->size = htons (msize); - msg->type = htons (GNUNET_MESSAGE_TYPE_CORE_MONITOR_PEERS); - GNUNET_CLIENT_receive (mh->client, - &receive_info, mh, - GNUNET_TIME_UNIT_FOREVER_REL); - return msize; + if (NULL != mh->mq) + GNUNET_MQ_destroy (mh->mq); + /* FIXME: use backoff? */ + mh->mq = GNUNET_CLIENT_connecT (mh->cfg, + "core", + handlers, + &handle_mq_error, + mh); + if (NULL == mh->mq) + return; + /* notify callback about reconnect */ + mh->peer_cb (mh->peer_cb_cls, + NULL, + GNUNET_CORE_KX_CORE_DISCONNECT, + GNUNET_TIME_UNIT_FOREVER_ABS); + env = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_CORE_MONITOR_PEERS); + GNUNET_MQ_send (mh->mq, + env); } @@ -207,23 +168,18 @@ GNUNET_CORE_monitor_start (const struct GNUNET_CONFIGURATION_Handle *cfg, void *peer_cb_cls) { struct GNUNET_CORE_MonitorHandle *mh; - struct GNUNET_CLIENT_Connection *client; GNUNET_assert (NULL != peer_cb); - client = GNUNET_CLIENT_connect ("core", cfg); - if (NULL == client) - return NULL; mh = GNUNET_new (struct GNUNET_CORE_MonitorHandle); mh->cfg = cfg; - mh->client = client; mh->peer_cb = peer_cb; mh->peer_cb_cls = peer_cb_cls; - mh->th = - GNUNET_CLIENT_notify_transmit_ready (client, - sizeof (struct GNUNET_MessageHeader), - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_YES, - &transmit_monitor_request, mh); + reconnect (mh); + if (NULL == mh->mq) + { + GNUNET_free (mh); + return NULL; + } return mh; } @@ -236,15 +192,10 @@ GNUNET_CORE_monitor_start (const struct GNUNET_CONFIGURATION_Handle *cfg, void GNUNET_CORE_monitor_stop (struct GNUNET_CORE_MonitorHandle *mh) { - if (NULL != mh->th) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (mh->th); - mh->th = NULL; - } - if (NULL != mh->client) + if (NULL != mh->mq) { - GNUNET_CLIENT_disconnect (mh->client); - mh->client = NULL; + GNUNET_MQ_destroy (mh->mq); + mh->mq = NULL; } GNUNET_free (mh); }