/*
This file is part of GNUnet.
- Copyright (C) 2009-2014 GNUnet e.V.
+ Copyright (C) 2009-2014, 2016 GNUnet e.V.
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
/**
* Our connection to the service.
*/
- struct GNUNET_CLIENT_Connection *client;
-
- /**
- * Handle for transmitting a request.
- */
- struct GNUNET_CLIENT_TransmitHandle *th;
+ struct GNUNET_MQ_Handle *mq;
/**
* Function called with the peer.
/**
- * Transmits the monitor request to the CORE service.
- *
- * Function is called to notify a client about the socket begin ready
- * to queue more data. @a buf will be NULL and @a size zero if the
- * socket was closed for writing in the meantime.
+ * Protocol error, reconnect to CORE service and notify
+ * client.
*
- * @param cls closure, our `struct GNUNET_CORE_MonitorHandle *`
- * @param size number of bytes available in @a buf
- * @param buf where the callee should write the message
- * @return number of bytes written to @a buf
+ * @param mh monitoring session to reconnect to CORE
*/
-static size_t
-transmit_monitor_request (void *cls,
- size_t size,
- void *buf);
+static void
+reconnect (struct GNUNET_CORE_MonitorHandle *mh);
/**
- * Protocol error, reconnect to CORE service and notify
- * client.
+ * Generic error handler, called with the appropriate error code and
+ * the same closure specified at the creation of the message queue.
+ * Not every message queue implementation supports an error handler.
*
- * @param mh monitoring session to reconnect to CORE
+ * @param cls closure, a `struct GNUNET_CORE_MonitorHandle *`
+ * @param error error code
*/
static void
-reconnect (struct GNUNET_CORE_MonitorHandle *mh)
+handle_mq_error (void *cls,
+ enum GNUNET_MQ_Error error)
{
- GNUNET_CLIENT_disconnect (mh->client);
- /* FIXME: use backoff? */
- mh->client = GNUNET_CLIENT_connect ("core", mh->cfg);
- GNUNET_assert (NULL != mh->client);
- mh->th =
- GNUNET_CLIENT_notify_transmit_ready (mh->client,
- sizeof (struct GNUNET_MessageHeader),
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_YES,
- &transmit_monitor_request, mh);
- /* notify callback about reconnect */
- mh->peer_cb (mh->peer_cb_cls,
- NULL,
- GNUNET_CORE_KX_CORE_DISCONNECT,
- GNUNET_TIME_UNIT_FOREVER_ABS);
+ struct GNUNET_CORE_MonitorHandle *mh = cls;
+
+ reconnect (mh);
}
* Receive reply from CORE service with information about a peer.
*
* @param cls our `struct GNUNET_CORE_MonitorHandle *`
- * @param msg NULL on error or last entry
+ * @param mon_message monitor message
*/
static void
-receive_info (void *cls,
- const struct GNUNET_MessageHeader *msg)
+handle_receive_info (void *cls,
+ const struct MonitorNotifyMessage *mon_message)
{
struct GNUNET_CORE_MonitorHandle *mh = cls;
- const struct MonitorNotifyMessage *mon_message;
- uint16_t msize;
- if (NULL == msg)
- {
- reconnect (mh);
- return;
- }
- msize = ntohs (msg->size);
- /* Handle incorrect message type or size, disconnect and clean up */
- if ((ntohs (msg->type) != GNUNET_MESSAGE_TYPE_CORE_MONITOR_NOTIFY) ||
- (sizeof (struct MonitorNotifyMessage) != msize))
- {
- GNUNET_break (0);
- reconnect (mh);
- return;
- }
- mon_message = (const struct MonitorNotifyMessage *) msg;
- GNUNET_CLIENT_receive (mh->client,
- &receive_info, mh,
- GNUNET_TIME_UNIT_FOREVER_REL);
mh->peer_cb (mh->peer_cb_cls,
&mon_message->peer,
(enum GNUNET_CORE_KxState) ntohl (mon_message->state),
/**
- * Transmits the monitor request to the CORE service.
- *
- * Function is called to notify a client about the socket begin ready
- * to queue more data. @a buf will be NULL and @a size zero if the
- * socket was closed for writing in the meantime.
+ * Protocol error, reconnect to CORE service and notify
+ * client.
*
- * @param cls closure, our `struct GNUNET_CORE_MonitorHandle *`
- * @param size number of bytes available in @a buf
- * @param buf where the callee should write the message
- * @return number of bytes written to @a buf
+ * @param mh monitoring session to reconnect to CORE
*/
-static size_t
-transmit_monitor_request (void *cls,
- size_t size,
- void *buf)
+static void
+reconnect (struct GNUNET_CORE_MonitorHandle *mh)
{
- struct GNUNET_CORE_MonitorHandle *mh = cls;
+ GNUNET_MQ_hd_fixed_size (receive_info,
+ GNUNET_MESSAGE_TYPE_CORE_MONITOR_NOTIFY,
+ struct MonitorNotifyMessage);
+ struct GNUNET_MQ_MessageHandler handlers[] = {
+ make_receive_info_handler (mh),
+ GNUNET_MQ_handler_end ()
+ };
+ struct GNUNET_MQ_Envelope *env;
struct GNUNET_MessageHeader *msg;
- int msize;
- mh->th = NULL;
- msize = sizeof (struct GNUNET_MessageHeader);
- if ((size < msize) || (NULL == buf))
- {
- reconnect (mh);
- return 0;
- }
- msg = (struct GNUNET_MessageHeader *) buf;
- msg->size = htons (msize);
- msg->type = htons (GNUNET_MESSAGE_TYPE_CORE_MONITOR_PEERS);
- GNUNET_CLIENT_receive (mh->client,
- &receive_info, mh,
- GNUNET_TIME_UNIT_FOREVER_REL);
- return msize;
+ if (NULL != mh->mq)
+ GNUNET_MQ_destroy (mh->mq);
+ /* FIXME: use backoff? */
+ mh->mq = GNUNET_CLIENT_connecT (mh->cfg,
+ "core",
+ handlers,
+ &handle_mq_error,
+ mh);
+ if (NULL == mh->mq)
+ return;
+ /* notify callback about reconnect */
+ mh->peer_cb (mh->peer_cb_cls,
+ NULL,
+ GNUNET_CORE_KX_CORE_DISCONNECT,
+ GNUNET_TIME_UNIT_FOREVER_ABS);
+ env = GNUNET_MQ_msg (msg,
+ GNUNET_MESSAGE_TYPE_CORE_MONITOR_PEERS);
+ GNUNET_MQ_send (mh->mq,
+ env);
}
void *peer_cb_cls)
{
struct GNUNET_CORE_MonitorHandle *mh;
- struct GNUNET_CLIENT_Connection *client;
GNUNET_assert (NULL != peer_cb);
- client = GNUNET_CLIENT_connect ("core", cfg);
- if (NULL == client)
- return NULL;
mh = GNUNET_new (struct GNUNET_CORE_MonitorHandle);
mh->cfg = cfg;
- mh->client = client;
mh->peer_cb = peer_cb;
mh->peer_cb_cls = peer_cb_cls;
- mh->th =
- GNUNET_CLIENT_notify_transmit_ready (client,
- sizeof (struct GNUNET_MessageHeader),
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_YES,
- &transmit_monitor_request, mh);
+ reconnect (mh);
+ if (NULL == mh->mq)
+ {
+ GNUNET_free (mh);
+ return NULL;
+ }
return mh;
}
void
GNUNET_CORE_monitor_stop (struct GNUNET_CORE_MonitorHandle *mh)
{
- if (NULL != mh->th)
- {
- GNUNET_CLIENT_notify_transmit_ready_cancel (mh->th);
- mh->th = NULL;
- }
- if (NULL != mh->client)
+ if (NULL != mh->mq)
{
- GNUNET_CLIENT_disconnect (mh->client);
- mh->client = NULL;
+ GNUNET_MQ_destroy (mh->mq);
+ mh->mq = NULL;
}
GNUNET_free (mh);
}