/*
This file is part of GNUnet.
- Copyright (C) 2013 Christian Grothoff (and other contributing authors)
+ Copyright (C) 2013 GNUnet e.V.
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
You should have received a copy of the GNU General Public License
along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
*/
/**
#define LOG(kind,...) GNUNET_log_from (kind, "util-client-mgr", __VA_ARGS__)
+struct OperationListItem
+{
+ struct OperationListItem *prev;
+ struct OperationListItem *next;
+
+ /**
+ * Operation ID.
+ */
+ uint64_t op_id;
+
+ /**
+ * Continuation to invoke with the result of an operation.
+ */
+ GNUNET_ResultCallback result_cb;
+
+ /**
+ * Closure for @a result_cb.
+ */
+ void *cls;
+};
+
+
/**
* List of arrays of message handlers.
*/
*/
const struct GNUNET_CLIENT_MANAGER_MessageHandler *handlers;
+ /**
+ * First operation in the linked list.
+ */
+ struct OperationListItem *op_head;
+
+ /**
+ * Last operation in the linked list.
+ */
+ struct OperationListItem *op_tail;
+
+ /**
+ * Last operation ID used.
+ */
+ uint64_t last_op_id;
+
/**
* Disconnect callback.
*/
size = ntohs (msg->size);
/* FIXME: decrease reconnect_delay gradually after a successful reconnection */
}
+ else /* disconnected */
+ {
+ mgr->client_tmit = NULL;
+ }
+
+ if (GNUNET_YES == mgr->is_disconnecting)
+ return;
size_t i = 0;
while (NULL != mgr->handlers[i].callback)
static void
-schedule_disconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+schedule_disconnect (void *cls)
{
struct GNUNET_CLIENT_MANAGER_Connection *mgr = cls;
+
GNUNET_CLIENT_MANAGER_disconnect (mgr, GNUNET_NO,
- mgr->disconnect_cb, mgr->disconnect_cls);
+ mgr->disconnect_cb,
+ mgr->disconnect_cls);
}
*
* @param cls
* struct GNUNET_CLIENT_MANAGER_Connection
- * @param size
+ * @param buf_size
* Number of bytes available in @a buf.
* @param buf
* Where to copy the message.
static size_t
send_next_message (void *cls, size_t buf_size, void *buf)
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n");
struct GNUNET_CLIENT_MANAGER_Connection *mgr = cls;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "send_next_message()\n");
if (NULL == buf)
{
/* disconnected */
uint16_t size = ntohs (mqi->msg->size);
mgr->client_tmit = NULL;
GNUNET_assert (size <= buf_size);
- memcpy (buf, mqi->msg, size);
+ GNUNET_memcpy (buf, mqi->msg, size);
- GNUNET_CONTAINER_DLL_remove (mgr->tmit_head, mgr->tmit_tail, mqi);
+ GNUNET_CONTAINER_DLL_remove (mgr->tmit_head,
+ mgr->tmit_tail,
+ mqi);
GNUNET_free (mqi->msg);
GNUNET_free (mqi);
}
else if (GNUNET_YES == mgr->is_disconnecting)
{
- GNUNET_SCHEDULER_add_now (&schedule_disconnect, mgr);
+ (void) GNUNET_SCHEDULER_add_now (&schedule_disconnect, mgr);
return size;
}
{
if (GNUNET_YES == mgr->is_disconnecting)
GNUNET_CLIENT_MANAGER_disconnect (mgr, GNUNET_NO,
- mgr->disconnect_cb, mgr->disconnect_cls);
+ mgr->disconnect_cb,
+ mgr->disconnect_cls);
return;
}
mgr->client_tmit
= GNUNET_CLIENT_notify_transmit_ready (mgr->client,
- ntohs (mgr->tmit_head->msg->size),
+ GNUNET_SERVER_MAX_MESSAGE_SIZE - 1,
GNUNET_TIME_UNIT_FOREVER_REL,
GNUNET_NO,
&send_next_message,
*
* @param cls
* Channel handle.
- * @param tc
- * Scheduler context.
*/
static void
-schedule_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+schedule_reconnect (void *cls)
{
struct GNUNET_CLIENT_MANAGER_Connection *mgr = cls;
- mgr->reconnect_task = NULL;
+ mgr->reconnect_task = NULL;
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Connecting to %s service.\n", mgr->service_name);
+ "Connecting to %s service.\n",
+ mgr->service_name);
GNUNET_assert (NULL == mgr->client);
- mgr->client = GNUNET_CLIENT_connect (mgr->service_name, mgr->cfg);
+ mgr->client = GNUNET_CLIENT_connect (mgr->service_name,
+ mgr->cfg);
GNUNET_assert (NULL != mgr->client);
-
transmit_next (mgr);
}
struct GNUNET_CLIENT_MANAGER_Connection *
GNUNET_CLIENT_MANAGER_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
const char *service_name,
- const struct
- GNUNET_CLIENT_MANAGER_MessageHandler *handlers)
+ const struct GNUNET_CLIENT_MANAGER_MessageHandler *handlers)
{
- struct GNUNET_CLIENT_MANAGER_Connection *
- mgr = GNUNET_malloc (sizeof (*mgr));
+ struct GNUNET_CLIENT_MANAGER_Connection *mgr;
+
+ mgr = GNUNET_new (struct GNUNET_CLIENT_MANAGER_Connection);
mgr->cfg = cfg;
mgr->service_name = service_name;
mgr->handlers = handlers;
mgr->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
- mgr->reconnect_task = GNUNET_SCHEDULER_add_now (&schedule_reconnect, mgr);
+ mgr->reconnect_task = GNUNET_SCHEDULER_add_now (&schedule_reconnect,
+ mgr);
return mgr;
}
GNUNET_ContinuationCallback disconnect_cb,
void *cls)
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting (%d)\n", transmit_queue);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Disconnecting (%d)\n",
+ transmit_queue);
mgr->disconnect_cb = disconnect_cb;
mgr->disconnect_cls = cls;
if (NULL != mgr->tmit_head)
GNUNET_CLIENT_MANAGER_drop_queue (mgr);
}
}
- if (mgr->reconnect_task != NULL)
+ if (NULL != mgr->reconnect_task)
{
GNUNET_SCHEDULER_cancel (mgr->reconnect_task);
mgr->reconnect_task = NULL;
"Scheduling task to reconnect to service in %s.\n",
GNUNET_STRINGS_relative_time_to_string (mgr->reconnect_delay, GNUNET_YES));
mgr->reconnect_task =
- GNUNET_SCHEDULER_add_delayed (mgr->reconnect_delay, &schedule_reconnect, mgr);
+ GNUNET_SCHEDULER_add_delayed (mgr->reconnect_delay,
+ &schedule_reconnect,
+ mgr);
mgr->reconnect_delay = GNUNET_TIME_STD_BACKOFF (mgr->reconnect_delay);
}
GNUNET_CLIENT_MANAGER_transmit (struct GNUNET_CLIENT_MANAGER_Connection *mgr,
struct GNUNET_MessageHeader *msg)
{
- struct MessageQueueItem *mqi = GNUNET_malloc (sizeof (*mqi));
- mqi->msg = msg;
- GNUNET_CONTAINER_DLL_insert_tail (mgr->tmit_head, mgr->tmit_tail, mqi);
+ struct MessageQueueItem *mqi;
+
+ mqi = GNUNET_new (struct MessageQueueItem);
+ mqi->msg = GNUNET_copy_message (msg);
+ GNUNET_CONTAINER_DLL_insert_tail (mgr->tmit_head,
+ mgr->tmit_tail,
+ mqi);
transmit_next (mgr);
}
GNUNET_CLIENT_MANAGER_transmit_now (struct GNUNET_CLIENT_MANAGER_Connection *mgr,
struct GNUNET_MessageHeader *msg)
{
- struct MessageQueueItem *mqi = GNUNET_malloc (sizeof (*mqi));
- mqi->msg = msg;
- GNUNET_CONTAINER_DLL_insert (mgr->tmit_head, mgr->tmit_tail, mqi);
+ struct MessageQueueItem *mqi;
+
+ mqi = GNUNET_new (struct MessageQueueItem);
+ mqi->msg = GNUNET_copy_message (msg);
+ GNUNET_CONTAINER_DLL_insert (mgr->tmit_head,
+ mgr->tmit_tail,
+ mqi);
transmit_next (mgr);
}
void
GNUNET_CLIENT_MANAGER_drop_queue (struct GNUNET_CLIENT_MANAGER_Connection *mgr)
{
- struct MessageQueueItem *cur, *next = mgr->tmit_head;
+ struct MessageQueueItem *cur;
+ struct MessageQueueItem *next;
+
+ next = mgr->tmit_head;
while (NULL != next)
{
cur = next;
mgr->user_ctx_size = size;
mgr->user_ctx = ctx;
}
+
+
+/**
+ * Get a unique operation ID to distinguish between asynchronous requests.
+ *
+ * @param mgr
+ * Client manager connection.
+ *
+ * @return Operation ID to use.
+ */
+uint64_t
+GNUNET_CLIENT_MANAGER_op_get_next_id (struct GNUNET_CLIENT_MANAGER_Connection *mgr)
+{
+ return ++mgr->last_op_id;
+}
+
+
+/**
+ * Find operation by ID.
+ *
+ * @param mgr
+ * Client manager connection.
+ * @param op_id
+ * Operation ID to look up.
+ *
+ * @return Operation, or NULL if not found.
+ */
+static struct OperationListItem *
+op_find (struct GNUNET_CLIENT_MANAGER_Connection *mgr,
+ uint64_t op_id)
+{
+ struct OperationListItem *op;
+
+ for (op = mgr->op_head; NULL != op; op = op->next)
+ if (op->op_id == op_id)
+ return op;
+ return NULL;
+}
+
+
+/**
+ * Find operation by ID.
+ *
+ * @param mgr
+ * Client manager connection.
+ * @param op_id
+ * Operation ID to look up.
+ * @param[out] result_cb
+ * If an operation was found, its result callback is returned here.
+ * @param[out] cls
+ * If an operation was found, its closure is returned here.
+ *
+ * @return #GNUNET_YES if an operation was found,
+ * #GNUNET_NO if not found.
+ */
+int
+GNUNET_CLIENT_MANAGER_op_find (struct GNUNET_CLIENT_MANAGER_Connection *mgr,
+ uint64_t op_id,
+ GNUNET_ResultCallback *result_cb,
+ void **cls)
+{
+ struct OperationListItem *op = op_find (mgr, op_id);
+ if (NULL != op)
+ {
+ *result_cb = op->result_cb;
+ *cls = op->cls;
+ return GNUNET_YES;
+ }
+ return GNUNET_NO;
+}
+
+
+/**
+ * Add a new operation.
+ *
+ * @param mgr
+ * Client manager connection.
+ * @param result_cb
+ * Function to call with the result of the operation.
+ * @param cls
+ * Closure for @a result_cb.
+ *
+ * @return ID of the new operation.
+ */
+uint64_t
+GNUNET_CLIENT_MANAGER_op_add (struct GNUNET_CLIENT_MANAGER_Connection *mgr,
+ GNUNET_ResultCallback result_cb,
+ void *cls)
+{
+ struct OperationListItem *op;
+
+ if (NULL == result_cb)
+ return 0;
+ op = GNUNET_new (struct OperationListItem);
+ op->op_id = GNUNET_CLIENT_MANAGER_op_get_next_id (mgr);
+ op->result_cb = result_cb;
+ op->cls = cls;
+ GNUNET_CONTAINER_DLL_insert_tail (mgr->op_head,
+ mgr->op_tail,
+ op);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Added operation #%" PRIu64 "\n",
+ mgr,
+ op->op_id);
+ return op->op_id;
+}
+
+
+/**
+ * Remove an operation, and call its result callback (unless it was cancelled).
+ *
+ *
+ * @param mgr
+ * Client manager connection.
+ * @param op_id
+ * Operation ID.
+ * @param result_code
+ * Result of the operation.
+ * @param data
+ * Data result of the operation.
+ * @param data_size
+ * Size of @a data.
+ * @param cancel
+ * Is the operation cancelled?
+ * #GNUNET_NO Not cancelled, result callback is called.
+ * #GNUNET_YES Cancelled, result callback is not called.
+ *
+ * @return #GNUNET_YES if the operation was found and removed,
+ * #GNUNET_NO if the operation was not found.
+ */
+static int
+op_result (struct GNUNET_CLIENT_MANAGER_Connection *mgr,
+ uint64_t op_id,
+ int64_t result_code,
+ const void *data,
+ uint16_t data_size,
+ uint8_t cancel)
+{
+ if (0 == op_id)
+ return GNUNET_NO;
+
+ struct OperationListItem *op = op_find (mgr, op_id);
+ if (NULL == op)
+ {
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Could not find operation #%" PRIu64 "\n", op_id);
+ return GNUNET_NO;
+ }
+
+ GNUNET_CONTAINER_DLL_remove (mgr->op_head,
+ mgr->op_tail,
+ op);
+
+ if ( (GNUNET_YES != cancel) &&
+ (NULL != op->result_cb) )
+ op->result_cb (op->cls,
+ result_code, data,
+ data_size);
+ GNUNET_free (op);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Call the result callback of an operation and remove it.
+ *
+ * @param mgr
+ * Client manager connection.
+ * @param op_id
+ * Operation ID.
+ * @param result_code
+ * Result of the operation.
+ * @param data
+ * Data result of the operation.
+ * @param data_size
+ * Size of @a data.
+ *
+ * @return #GNUNET_YES if the operation was found and removed,
+ * #GNUNET_NO if the operation was not found.
+ */
+int
+GNUNET_CLIENT_MANAGER_op_result (struct GNUNET_CLIENT_MANAGER_Connection *mgr,
+ uint64_t op_id,
+ int64_t result_code,
+ const void *data,
+ uint16_t data_size)
+{
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Received result for operation #%" PRIu64 ": %" PRId64 " (size: %u)\n",
+ mgr, op_id, result_code, data_size);
+ return op_result (mgr, op_id, result_code, data, data_size, GNUNET_NO);
+}
+
+
+/**
+ * Cancel an operation.
+ *
+ * @param mgr
+ * Client manager connection.
+ * @param op_id
+ * Operation ID.
+ *
+ * @return #GNUNET_YES if the operation was found and removed,
+ * #GNUNET_NO if the operation was not found.
+ */
+int
+GNUNET_CLIENT_MANAGER_op_cancel (struct GNUNET_CLIENT_MANAGER_Connection *mgr,
+ uint64_t op_id)
+{
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Cancelling operation #%" PRIu64 "\n",
+ mgr,
+ op_id);
+ return op_result (mgr, op_id, 0, NULL, 0, GNUNET_YES);
+}