struct GNUNET_MQ_Envelope *prev;
/**
- * Actual allocated message header,
- * usually points to the end of the containing GNUNET_MQ_Envelope
+ * Actual allocated message header.
+ * The GNUNET_MQ_Envelope header is allocated at
+ * the end of the message.
*/
struct GNUNET_MessageHeader *mh;
/**
* Called after the message was sent irrevocably.
*/
- GNUNET_MQ_NotifyCallback sent_cb;
+ GNUNET_SCHEDULER_TaskCallback sent_cb;
/**
* Closure for @e send_cb
* Did the application call #GNUNET_MQ_env_set_options()?
*/
int have_custom_options;
-
};
*/
void *error_handler_cls;
+ /**
+ * Task to asynchronously run #impl_send_continue().
+ */
+ struct GNUNET_SCHEDULER_Task *send_task;
+
/**
* Linked list of messages pending to be sent
*/
*/
struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
- /**
- * Task scheduled during #GNUNET_MQ_impl_send_continue.
- */
- struct GNUNET_SCHEDULER_Task *continue_task;
-
/**
* Functions to call on queue destruction; kept in a DLL.
*/
* Number of entries we have in the envelope-DLL.
*/
unsigned int queue_length;
-};
-
-/**
- * Implementation-specific state for connection to
- * client (MQ for server).
- */
-struct ServerClientSocketState
-{
/**
- * Handle of the client that connected to the server.
+ * #GNUNET_YES if GNUNET_MQ_impl_evacuate was called.
+ * FIXME: is this dead?
*/
- struct GNUNET_SERVER_Client *client;
+ int evacuate_called;
/**
- * Active transmission request to the client.
+ * #GNUNET_YES if GNUNET_MQ_impl_send_in_flight() was called.
*/
- struct GNUNET_SERVER_TransmitHandle *th;
+ int in_flight;
};
/**
* Implementation-specific state for connection to
- * service (MQ for clients).
+ * client (MQ for server).
*/
-struct ClientConnectionState
+struct ServerClientSocketState
{
/**
- * Did we call receive alread alreadyy?
- */
- int receive_active;
-
- /**
- * Do we also want to receive?
- */
- int receive_requested;
-
- /**
- * Connection to the service.
+ * Handle of the client that connected to the server.
*/
- struct GNUNET_CLIENT_Connection *connection;
+ struct GNUNET_SERVER_Client *client;
/**
- * Active transmission request (or NULL).
+ * Active transmission request to the client.
*/
- struct GNUNET_CLIENT_TransmitHandle *th;
+ struct GNUNET_SERVER_TransmitHandle *th;
};
}
done:
if (GNUNET_NO == handled)
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "No handler for message of type %d\n",
- ntohs (mh->type));
+ LOG (GNUNET_ERROR_TYPE_INFO,
+ "No handler for message of type %d and size %d\n",
+ ntohs (mh->type),
+ ntohs (mh->size));
}
* @param mqm the message to discard
*/
void
-GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm)
+GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *ev)
{
- GNUNET_assert (NULL == mqm->parent_queue);
- GNUNET_free (mqm);
+ GNUNET_assert (NULL == ev->parent_queue);
+ GNUNET_free (ev);
}
unsigned int
GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq)
{
- return mq->queue_length;
+ if (GNUNET_YES != mq->in_flight)
+ {
+ return mq->queue_length;
+ }
+ return mq->queue_length - 1;
}
mq->queue_length++;
ev->parent_queue = mq;
/* is the implementation busy? queue it! */
- if (NULL != mq->current_envelope)
+ if ( (NULL != mq->current_envelope) ||
+ (NULL != mq->send_task) )
{
GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head,
mq->envelope_tail,
}
-
/**
* Task run to call the send implementation for the next queued
* message, if any. Only useful for implementing message queues,
impl_send_continue (void *cls)
{
struct GNUNET_MQ_Handle *mq = cls;
- struct GNUNET_MQ_Envelope *current_envelope;
- mq->continue_task = NULL;
+ mq->send_task = NULL;
/* call is only valid if we're actually currently sending
* a message */
- current_envelope = mq->current_envelope;
- GNUNET_assert (NULL != current_envelope);
- current_envelope->parent_queue = NULL;
+ if (NULL == mq->envelope_head)
+ return;
+ mq->current_envelope = mq->envelope_head;
+ GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
+ mq->envelope_tail,
+ mq->current_envelope);
+ mq->send_impl (mq,
+ mq->current_envelope->mh,
+ mq->impl_state);
+}
+
+
+/**
+ * Call the send implementation for the next queued message, if any.
+ * Only useful for implementing message queues, results in undefined
+ * behavior if not used carefully.
+ *
+ * @param mq message queue to send the next message with
+ */
+void
+GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
+{
+ struct GNUNET_MQ_Envelope *current_envelope;
+ GNUNET_SCHEDULER_TaskCallback cb;
+
GNUNET_assert (0 < mq->queue_length);
mq->queue_length--;
- if (NULL == mq->envelope_head)
- {
- mq->current_envelope = NULL;
- }
- else
+ mq->in_flight = GNUNET_NO;
+ current_envelope = mq->current_envelope;
+ current_envelope->parent_queue = NULL;
+ mq->current_envelope = NULL;
+ GNUNET_assert (NULL == mq->send_task);
+ mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue,
+ mq);
+ if (NULL != (cb = current_envelope->sent_cb))
{
- mq->current_envelope = mq->envelope_head;
- GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
- mq->envelope_tail,
- mq->current_envelope);
- mq->send_impl (mq,
- mq->current_envelope->mh,
- mq->impl_state);
+ current_envelope->sent_cb = NULL;
+ cb (current_envelope->sent_cls);
}
- if (NULL != current_envelope->sent_cb)
- current_envelope->sent_cb (current_envelope->sent_cls);
GNUNET_free (current_envelope);
}
/**
- * Call the send implementation for the next queued message, if any.
+ * Call the send notification for the current message, but do not
+ * try to send the next message until #GNUNET_MQ_impl_send_continue
+ * is called.
+ *
* Only useful for implementing message queues, results in undefined
* behavior if not used carefully.
*
* @param mq message queue to send the next message with
*/
void
-GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
+GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq)
{
- GNUNET_assert (NULL == mq->continue_task);
- mq->continue_task = GNUNET_SCHEDULER_add_now (&impl_send_continue,
- mq);
+ struct GNUNET_MQ_Envelope *current_envelope;
+ GNUNET_SCHEDULER_TaskCallback cb;
+
+ mq->in_flight = GNUNET_YES;
+ /* call is only valid if we're actually currently sending
+ * a message */
+ current_envelope = mq->current_envelope;
+ GNUNET_assert (NULL != current_envelope);
+ /* can't call cancel from now on anymore */
+ current_envelope->parent_queue = NULL;
+ if (NULL != (cb = current_envelope->sent_cb))
+ {
+ current_envelope->sent_cb = NULL;
+ cb (current_envelope->sent_cls);
+ }
}
const struct GNUNET_MessageHeader *
GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
{
- if (NULL == mq->current_envelope)
- GNUNET_assert (0);
- if (NULL == mq->current_envelope->mh)
- GNUNET_assert (0);
+ GNUNET_assert (NULL != mq->current_envelope);
+ GNUNET_assert (NULL != mq->current_envelope->mh);
return mq->current_envelope->mh;
}
uint16_t size,
uint16_t type)
{
- struct GNUNET_MQ_Envelope *mqm;
+ struct GNUNET_MQ_Envelope *ev;
- mqm = GNUNET_malloc (sizeof *mqm + size);
- mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
- mqm->mh->size = htons (size);
- mqm->mh->type = htons (type);
+ ev = GNUNET_malloc (size + sizeof (struct GNUNET_MQ_Envelope));
+ ev->mh = (struct GNUNET_MessageHeader *) &ev[1];
+ ev->mh->size = htons (size);
+ ev->mh->type = htons (type);
if (NULL != mhp)
- *mhp = mqm->mh;
- return mqm;
+ *mhp = ev->mh;
+ return ev;
}
}
-/**
- * Type of a function to call when we receive a message
- * from the service.
- *
- * @param cls closure
- * @param msg message received, NULL on timeout or fatal error
- */
-static void
-handle_client_message (void *cls,
- const struct GNUNET_MessageHeader *msg)
-{
- struct GNUNET_MQ_Handle *mq = cls;
- struct ClientConnectionState *state;
-
- state = mq->impl_state;
- if (NULL == msg)
- {
- GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
- return;
- }
- GNUNET_CLIENT_receive (state->connection,
- &handle_client_message,
- mq,
- GNUNET_TIME_UNIT_FOREVER_REL);
- GNUNET_MQ_inject_message (mq, msg);
-}
-
-
-/**
- * Transmit a queued message to the session's client.
- *
- * @param cls consensus session
- * @param size number of bytes available in @a buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-connection_client_transmit_queued (void *cls,
- size_t size,
- void *buf)
-{
- struct GNUNET_MQ_Handle *mq = cls;
- const struct GNUNET_MessageHeader *msg;
- struct ClientConnectionState *state = mq->impl_state;
- size_t msg_size;
-
- GNUNET_assert (NULL != mq);
- state->th = NULL;
- msg = GNUNET_MQ_impl_current (mq);
-
- if (NULL == buf)
- {
- GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
- return 0;
- }
-
- if ( (GNUNET_YES == state->receive_requested) &&
- (GNUNET_NO == state->receive_active) )
- {
- state->receive_active = GNUNET_YES;
- GNUNET_CLIENT_receive (state->connection,
- &handle_client_message,
- mq,
- GNUNET_TIME_UNIT_FOREVER_REL);
- }
-
- msg_size = ntohs (msg->size);
- GNUNET_assert (size >= msg_size);
- GNUNET_memcpy (buf, msg, msg_size);
- state->th = NULL;
-
- GNUNET_MQ_impl_send_continue (mq);
-
- return msg_size;
-}
-
-
-static void
-connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
- void *impl_state)
-{
- struct ClientConnectionState *state = impl_state;
-
- if (NULL != state->th)
- {
- GNUNET_CLIENT_notify_transmit_ready_cancel (state->th);
- state->th = NULL;
- }
- GNUNET_CLIENT_disconnect (state->connection);
- GNUNET_free (impl_state);
-}
-
-
-static void
-connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
- const struct GNUNET_MessageHeader *msg,
- void *impl_state)
-{
- struct ClientConnectionState *state = impl_state;
-
- GNUNET_assert (NULL != state);
- GNUNET_assert (NULL == state->th);
- state->th =
- GNUNET_CLIENT_notify_transmit_ready (state->connection,
- ntohs (msg->size),
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_NO,
- &connection_client_transmit_queued,
- mq);
- GNUNET_assert (NULL != state->th);
-}
-
-
-static void
-connection_client_cancel_impl (struct GNUNET_MQ_Handle *mq,
- void *impl_state)
-{
- struct ClientConnectionState *state = impl_state;
-
- if (NULL != state->th)
- {
- GNUNET_CLIENT_notify_transmit_ready_cancel (state->th);
- state->th = NULL;
- }
- else if (NULL != mq->continue_task)
- {
- GNUNET_SCHEDULER_cancel (mq->continue_task);
- mq->continue_task = NULL;
- }
- else
- GNUNET_assert (0);
-}
-
-
-struct GNUNET_MQ_Handle *
-GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
- const struct GNUNET_MQ_MessageHandler *handlers,
- GNUNET_MQ_ErrorHandler error_handler,
- void *error_handler_cls)
-{
- struct GNUNET_MQ_Handle *mq;
- struct ClientConnectionState *state;
- unsigned int i;
-
- mq = GNUNET_new (struct GNUNET_MQ_Handle);
- if (NULL != handlers)
- {
- for (i=0;NULL != handlers[i].cb; i++) ;
- mq->handlers = GNUNET_new_array (i + 1,
- struct GNUNET_MQ_MessageHandler);
- GNUNET_memcpy (mq->handlers,
- handlers,
- i * sizeof (struct GNUNET_MQ_MessageHandler));
- }
- mq->error_handler = error_handler;
- mq->error_handler_cls = error_handler_cls;
- state = GNUNET_new (struct ClientConnectionState);
- state->connection = connection;
- mq->impl_state = state;
- mq->send_impl = &connection_client_send_impl;
- mq->destroy_impl = &connection_client_destroy_impl;
- mq->cancel_impl = &connection_client_cancel_impl;
- if (NULL != handlers)
- state->receive_requested = GNUNET_YES;
-
- return mq;
-}
-
-
/**
* Associate the assoc_data in mq with a unique request id.
*
* @param cb_cls closure for the callback
*/
void
-GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *mqm,
- GNUNET_MQ_NotifyCallback cb,
+GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *ev,
+ GNUNET_SCHEDULER_TaskCallback cb,
void *cb_cls)
{
- mqm->sent_cb = cb;
- mqm->sent_cls = cb_cls;
+ GNUNET_assert (NULL == ev->sent_cb);
+ ev->sent_cb = cb;
+ ev->sent_cls = cb_cls;
}
{
mq->destroy_impl (mq, mq->impl_state);
}
- if (NULL != mq->continue_task)
+ if (NULL != mq->send_task)
{
- GNUNET_SCHEDULER_cancel (mq->continue_task);
- mq->continue_task = NULL;
+ GNUNET_SCHEDULER_cancel (mq->send_task);
+ mq->send_task = NULL;
}
while (NULL != mq->envelope_head)
{
GNUNET_assert (NULL != mq);
GNUNET_assert (NULL != mq->cancel_impl);
+ mq->evacuate_called = GNUNET_NO;
+
if (mq->current_envelope == ev)
{
// complex case, we already started with transmitting
mq->queue_length--;
}
- ev->parent_queue = NULL;
- ev->mh = NULL;
- GNUNET_free (ev);
+ if (GNUNET_YES != mq->evacuate_called)
+ {
+ ev->parent_queue = NULL;
+ ev->mh = NULL;
+ /* also frees ev */
+ GNUNET_free (ev);
+ }
}
}
+/**
+ * Insert @a env into the envelope DLL starting at @a env_head
+ * Note that @a env must not be in any MQ while this function
+ * is used with DLLs defined outside of the MQ module. This
+ * is just in case some application needs to also manage a
+ * FIFO of envelopes independent of MQ itself and wants to
+ * re-use the pointers internal to @a env. Use with caution.
+ *
+ * @param[in|out] env_head of envelope DLL
+ * @param[in|out] env_tail tail of envelope DLL
+ * @param[in|out] env element to insert at the tail
+ */
+void
+GNUNET_MQ_dll_insert_tail (struct GNUNET_MQ_Envelope **env_head,
+ struct GNUNET_MQ_Envelope **env_tail,
+ struct GNUNET_MQ_Envelope *env)
+{
+ GNUNET_CONTAINER_DLL_insert_tail (*env_head,
+ *env_tail,
+ env);
+}
+
+
+/**
+ * Remove @a env from the envelope DLL starting at @a env_head.
+ * Note that @a env must not be in any MQ while this function
+ * is used with DLLs defined outside of the MQ module. This
+ * is just in case some application needs to also manage a
+ * FIFO of envelopes independent of MQ itself and wants to
+ * re-use the pointers internal to @a env. Use with caution.
+ *
+ * @param[in|out] env_head of envelope DLL
+ * @param[in|out] env_tail tail of envelope DLL
+ * @param[in|out] env element to remove from the DLL
+ */
+void
+GNUNET_MQ_dll_remove (struct GNUNET_MQ_Envelope **env_head,
+ struct GNUNET_MQ_Envelope **env_tail,
+ struct GNUNET_MQ_Envelope *env)
+{
+ GNUNET_CONTAINER_DLL_remove (*env_head,
+ *env_tail,
+ env);
+}
+
+
/* end of mq.c */