GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 2, or (at your
+ by the Free Software Foundation; either version 3, or (at your
option) any later version.
GNUnet is distributed in the hope that it will be useful, but
* @file util/mq.c
* @brief general purpose request queue
*/
-
#include "platform.h"
-#include "gnunet_common.h"
#include "gnunet_util_lib.h"
#define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
};
-
-
/**
- * Call the right callback for a message.
+ * Call the message message handler that was registered
+ * for the type of the given message in the given message queue.
+ *
+ * This function is indended to be used for the implementation
+ * of message queues.
*
* @param mq message queue with the handlers
* @param mh message to dispatch
handled = GNUNET_YES;
}
}
-
+
if (GNUNET_NO == handled)
LOG (GNUNET_ERROR_TYPE_WARNING, "no handler for message of type %d\n", ntohs (mh->type));
}
/**
- * Call the right callback for an error condition.
+ * Call the error handler of a message queue with the given
+ * error code. If there is no error handler, log a warning.
+ *
+ * This function is intended to be used by the implementation
+ * of message queues.
*
* @param mq message queue
+ * @param error the error type
*/
void
GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
/**
* Send a message with the give message queue.
* May only be called once per message.
- *
+ *
* @param mq message queue
- * @param ev the message to send.
+ * @param ev the envelope with the message to send.
*/
void
GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
{
GNUNET_assert (NULL != mq);
GNUNET_assert (NULL == ev->parent_queue);
-
+
/* is the implementation busy? queue it! */
if (NULL != mq->current_envelope)
{
*
* @param send function the implements sending messages
* @param destroy function that implements destroying the queue
- * @param destroy function that implements canceling a message
- * @param state for the queue, passed to 'send' and 'destroy'
+ * @param cancel function that implements canceling a message
+ * @param impl_state for the queue, passed to 'send' and 'destroy'
* @param handlers array of message handlers
* @param error_handler handler for read and write errors
+ * @param cls closure for message handlers and error handler
* @return a new message queue
*/
struct GNUNET_MQ_Handle *
}
+/**
+ * Implementation of the GNUNET_MQ_msg_nested_mh macro.
+ *
+ * @param mhp pointer to the message header pointer that will be changed to allocate at
+ * the newly allocated space for the message.
+ * @param base_size size of the data before the nested message
+ * @param type type of the message in the envelope
+ * @param nested_mh the message to append to the message after base_size
+ */
struct GNUNET_MQ_Envelope *
GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type,
const struct GNUNET_MessageHeader *nested_mh)
void *impl_state)
{
struct ServerClientSocketState *state = impl_state;
-
+
GNUNET_assert (NULL != mq);
GNUNET_assert (NULL != state);
GNUNET_SERVER_client_drop (state->client);
GNUNET_MQ_impl_send_commit (mq);
- state->th =
+ state->th =
GNUNET_SERVER_notify_transmit_ready (state->client, ntohs (msg->size),
GNUNET_TIME_UNIT_FOREVER_REL,
&transmit_queued, mq);
struct ClientConnectionState *state;
state = mq->impl_state;
-
+
if (NULL == msg)
{
GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
GNUNET_MQ_impl_send_commit (mq);
- state->th =
- GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (msg->size),
+ state->th =
+ GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (msg->size),
GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
&connection_client_transmit_queued, mq);
}
struct GNUNET_MQ_Handle *
GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
const struct GNUNET_MQ_MessageHandler *handlers,
+ GNUNET_MQ_ErrorHandler error_handler,
void *cls)
{
struct GNUNET_MQ_Handle *mq;
mq = GNUNET_new (struct GNUNET_MQ_Handle);
mq->handlers = handlers;
+ mq->error_handler = error_handler;
mq->handlers_cls = cls;
state = GNUNET_new (struct ClientConnectionState);
state->connection = connection;
* Associate the assoc_data in mq with a unique request id.
*
* @param mq message queue, id will be unique for the queue
- * @param mqm message to associate
* @param assoc_data to associate
*/
uint32_t
if (NULL == mq->assoc_map)
return NULL;
val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
- GNUNET_assert (NULL != val);
- GNUNET_CONTAINER_multihashmap32_remove (mq->assoc_map, request_id, val);
+ GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map, request_id);
return val;
}
void
GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
{
- /* FIXME: destroy all pending messages in the queue */
-
if (NULL != mq->destroy_impl)
{
mq->destroy_impl (mq, mq->impl_state);
}
+ while (NULL != mq->envelope_head)
+ {
+ struct GNUNET_MQ_Envelope *ev;
+ ev = mq->envelope_head;
+ GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev);
+ GNUNET_MQ_discard (ev);
+ }
+
+ if (NULL != mq->current_envelope)
+ {
+ GNUNET_MQ_discard (mq->current_envelope);
+ mq->current_envelope = NULL;
+ }
+
+ if (NULL != mq->assoc_map)
+ {
+ GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map);
+ mq->assoc_map = NULL;
+ }
+
GNUNET_free (mq);
}
-
struct GNUNET_MessageHeader *
GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size)
{