* @author Christian Grothoff
*/
#include "platform.h"
+#include "gnunet_util_lib.h"
#include "gnunet_constants.h"
#include "gnunet_core_service.h"
#include "core.h"
};
+struct CoreMQState
+{
+ struct GNUNET_PeerIdentity target;
+ struct GNUNET_CORE_Handle *core;
+ struct GNUNET_CORE_TransmitHandle *th;
+};
+
/**
* Type of function called upon completion.
/**
* Task schedule to try to re-connect to core.
*
- * @param cls the 'struct GNUNET_CORE_Handle'
+ * @param cls the `struct GNUNET_CORE_Handle`
* @param tc task context
*/
static void
struct GNUNET_CORE_Handle *h = cls;
h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to CORE service after delay\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Connecting to CORE service after delay\n");
reconnect (h);
}
* Notify clients about disconnect and free
* the entry for connected peer.
*
- * @param cls the 'struct GNUNET_CORE_Handle*'
+ * @param cls the `struct GNUNET_CORE_Handle *`
* @param key the peer identity (not used)
- * @param value the 'struct PeerRecord' to free.
+ * @param value the `struct PeerRecord` to free.
* @return #GNUNET_YES (continue)
*/
static int
h->currently_down = GNUNET_YES;
GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
h->reconnect_task =
- GNUNET_SCHEDULER_add_delayed (h->retry_backoff, &reconnect_task, h);
+ GNUNET_SCHEDULER_add_delayed (h->retry_backoff,
+ &reconnect_task, h);
while (NULL != (cm = h->control_pending_head))
{
GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
* @param tc context, can be NULL (!)
*/
static void
-transmission_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+transmission_timeout (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc);
/**
* @param tc context, can be NULL (!)
*/
static void
-transmission_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+transmission_timeout (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct PeerRecord *pr = cls;
struct GNUNET_CORE_Handle *h = pr->ch;
/**
* Transmit the next message to the core service.
*
- * @param cls closure with the 'struct GNUNET_CORE_Handle'
+ * @param cls closure with the `struct GNUNET_CORE_Handle`
* @param size number of bytes available in @a buf
* @param buf where the callee should write the message
- * @return number of bytes written to buf
+ * @return number of bytes written to @a buf
*/
static size_t
transmit_message (void *cls, size_t size, void *buf)
/**
* Handler for notification messages received from the core.
*
- * @param cls our "struct GNUNET_CORE_Handle"
+ * @param cls our `struct GNUNET_CORE_Handle`
* @param msg the message received from the core service
*/
static void
unsigned int hpos;
GNUNET_assert (NULL == h->client);
- GNUNET_assert (h->currently_down == GNUNET_YES);
+ GNUNET_assert (GNUNET_YES == h->currently_down);
+ GNUNET_assert (NULL != h->cfg);
h->client = GNUNET_CLIENT_connect ("core", h->cfg);
if (NULL == h->client)
{
GNUNET_assert (h->hcnt <
(GNUNET_SERVER_MAX_MESSAGE_SIZE -
sizeof (struct InitMessage)) / sizeof (uint16_t));
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to CORE service\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Connecting to CORE service\n");
reconnect (h);
return h;
}
}
+/**
+ * Function called to notify a client about the connection
+ * begin ready to queue more data. "buf" will be
+ * NULL and "size" zero if the connection was closed for
+ * writing in the meantime.
+ *
+ * @param cls closure
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+core_mq_ntr (void *cls, size_t size,
+ void *buf)
+{
+ struct GNUNET_MQ_Handle *mq = cls;
+ struct CoreMQState *mqs = GNUNET_MQ_impl_state (mq);
+ const struct GNUNET_MessageHeader *mh = GNUNET_MQ_impl_current (mq);
+ size_t msg_size = ntohs (mh->size);
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "core-mq", "ntr called (size %u, type %u)\n",
+ msg_size, ntohs (mh->type));
+ mqs->th = NULL;
+ if (NULL == buf)
+ {
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "core-mq", "send error\n");
+ GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE);
+ return 0;
+ }
+ memcpy (buf, mh, msg_size);
+ GNUNET_MQ_impl_send_continue (mq);
+ return msg_size;
+}
+
+
+/**
+ * Signature of functions implementing the
+ * sending functionality of a message queue.
+ *
+ * @param mq the message queue
+ * @param msg the message to send
+ * @param impl_state state of the implementation
+ */
+static void
+core_mq_send (struct GNUNET_MQ_Handle *mq,
+ const struct GNUNET_MessageHeader *msg,
+ void *impl_state)
+{
+ struct CoreMQState *mqs = impl_state;
+ GNUNET_assert (NULL == mqs->th);
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "core-mq", "Sending queued message (size %u)\n",
+ ntohs (msg->size));
+ mqs->th = GNUNET_CORE_notify_transmit_ready (mqs->core, GNUNET_YES, 0,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &mqs->target,
+ ntohs (msg->size), core_mq_ntr, mq);
+}
+
+
+/**
+ * Signature of functions implementing the
+ * destruction of a message queue.
+ * Implementations must not free @a mq, but should
+ * take care of @a impl_state.
+ *
+ * @param mq the message queue to destroy
+ * @param impl_state state of the implementation
+ */
+static void
+core_mq_destroy (struct GNUNET_MQ_Handle *mq, void *impl_state)
+{
+ struct CoreMQState *mqs = impl_state;
+ if (NULL != mqs->th)
+ {
+ GNUNET_CORE_notify_transmit_ready_cancel (mqs->th);
+ mqs->th = NULL;
+ }
+ GNUNET_free (mqs);
+}
+
+
+/**
+ * Implementation function that cancels the currently sent message.
+ *
+ * @param mq message queue
+ * @param impl_state state specific to the implementation
+ */
+static void
+core_mq_cancel (struct GNUNET_MQ_Handle *mq, void *impl_state)
+{
+ struct CoreMQState *mqs = impl_state;
+ GNUNET_assert (NULL != mqs->th);
+ GNUNET_CORE_notify_transmit_ready_cancel (mqs->th);
+}
+
+
+/**
+ * Create a message queue for sending messages to a peer with CORE.
+ * Messages may only be queued with #GNUNET_MQ_send once the init callback has
+ * been called for the given handle.
+ * There must only be one queue per peer for each core handle.
+ * The message queue can only be used to transmit messages,
+ * not to receive them.
+ *
+ * @param h the core handle
+ * @param target the target peer for this queue, may not be NULL
+ * @return a message queue for sending messages over the core handle
+ * to the target peer
+ */
+struct GNUNET_MQ_Handle *
+GNUNET_CORE_mq_create (struct GNUNET_CORE_Handle *h,
+ const struct GNUNET_PeerIdentity *target)
+{
+ struct CoreMQState *mqs = GNUNET_new (struct CoreMQState);
+ mqs->core = h;
+ mqs->target = *target;
+ return GNUNET_MQ_queue_for_callbacks (core_mq_send, core_mq_destroy,
+ core_mq_cancel, mqs,
+ NULL, NULL, NULL);
+}
+
/* end of core_api.c */