From: Florian Dold Date: Sun, 16 Oct 2016 22:07:40 +0000 (+0000) Subject: implement impl_in_flight API for MQ, replacing evacuation X-Git-Tag: initial-import-from-subversion-38251~92 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=b368ebf988178ed83775a94b604885aa89e25406;p=oweals%2Fgnunet.git implement impl_in_flight API for MQ, replacing evacuation --- diff --git a/src/include/gnunet_mq_lib.h b/src/include/gnunet_mq_lib.h index 95afb6d75..16f1f531a 100644 --- a/src/include/gnunet_mq_lib.h +++ b/src/include/gnunet_mq_lib.h @@ -702,6 +702,9 @@ GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq, /** * Call the send implementation for the next queued message, if any. + * Calls the send notification for the current message unless + * #GNUNET_MQ_impl_send_in_flight was called for this envelope. + * * Only useful for implementing message queues, results in undefined * behavior if not used carefully. * @@ -712,35 +715,17 @@ GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq); /** - * Get the message that should currently be sent. The returned - * message is only valid until #GNUNET_MQ_impl_send_continue is - * called. Fails if there is no current message. Only useful for - * implementing message queues, results in undefined behavior if not - * used carefully. + * Call the send notification for the current message, but do not + * try to send the message until #gnunet_mq_impl_send_continue + * is called. * - * @param mq message queue with the current message, only valid - * until #GNUNET_MQ_impl_send_continue() is called - * @return message to send, never NULL - */ -const struct GNUNET_MessageHeader * -GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq); - - -/** - * Get the message that is currently being sent when cancellation of that - * message is requested. The returned buffer must be freed by the caller. - * - * This function may be called at most once in the cancel_impl - * function of a message queue. - * - * Use this function to avoid copying a half-sent message. + * only useful for implementing message queues, results in undefined + * behavior if not used carefully. * - * @param mq message queue - * @return pointer to store the message being canceled, - * must be freed by the caller + * @param mq message queue to send the next message with */ -struct GNUNET_MessageHeader * -GNUNET_MQ_impl_cancel_evacuate (struct GNUNET_MQ_Handle *mq); +void +GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq); /** diff --git a/src/util/mq.c b/src/util/mq.c index da6c0b86f..6d3517dae 100644 --- a/src/util/mq.c +++ b/src/util/mq.c @@ -144,15 +144,22 @@ struct GNUNET_MQ_Handle */ struct GNUNET_MQ_Envelope *current_envelope; + /** + * GNUNET_YES if the sent notification was called + * for the current envelope. + */ + int send_notification_called; + /** * Map of associations, lazily allocated */ struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map; /** - * Task scheduled during #GNUNET_MQ_impl_send_continue. + * Task scheduled during #GNUNET_MQ_impl_send_continue + * or #GNUNET_MQ_impl_send_in_flight */ - struct GNUNET_SCHEDULER_Task *continue_task; + struct GNUNET_SCHEDULER_Task *send_task; /** * Functions to call on queue destruction; kept in a DLL. @@ -344,8 +351,7 @@ void GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *ev) { GNUNET_assert (NULL == ev->parent_queue); - /* also frees ev */ - GNUNET_free (ev->mh); + GNUNET_free (ev); } @@ -421,6 +427,34 @@ GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq, } +/** + * Task run to call the send notification for the next queued + * message, if any. Only useful for implementing message queues, + * results in undefined behavior if not used carefully. + * + * @param cls message queue to send the next message with + */ +static void +impl_send_in_flight (void *cls) +{ + struct GNUNET_MQ_Handle *mq = cls; + struct GNUNET_MQ_Envelope *current_envelope; + + mq->send_task = NULL; + /* call is only valid if we're actually currently sending + * a message */ + current_envelope = mq->current_envelope; + GNUNET_assert (NULL != current_envelope); + /* can't call cancel from now on anymore */ + current_envelope->parent_queue = NULL; + if ( (GNUNET_NO == mq->send_notification_called) && + (NULL != current_envelope->sent_cb) ) + { + current_envelope->sent_cb (current_envelope->sent_cls); + } + mq->send_notification_called = GNUNET_YES; +} + /** * Task run to call the send implementation for the next queued @@ -435,12 +469,12 @@ impl_send_continue (void *cls) struct GNUNET_MQ_Handle *mq = cls; struct GNUNET_MQ_Envelope *current_envelope; - mq->continue_task = NULL; + mq->send_task = NULL; /* call is only valid if we're actually currently sending * a message */ current_envelope = mq->current_envelope; GNUNET_assert (NULL != current_envelope); - current_envelope->parent_queue = NULL; + impl_send_in_flight (mq); GNUNET_assert (0 < mq->queue_length); mq->queue_length--; if (NULL == mq->envelope_head) @@ -453,14 +487,12 @@ impl_send_continue (void *cls) GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, mq->current_envelope); + mq->send_notification_called = GNUNET_NO; mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state); } - if (NULL != current_envelope->sent_cb) - current_envelope->sent_cb (current_envelope->sent_cls); - /* also frees current_envelope */ - GNUNET_free (current_envelope->mh); + GNUNET_free (current_envelope); } @@ -474,9 +506,32 @@ impl_send_continue (void *cls) void GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq) { - GNUNET_assert (NULL == mq->continue_task); - mq->continue_task = GNUNET_SCHEDULER_add_now (&impl_send_continue, - mq); + /* maybe #GNUNET_MQ_impl_send_in_flight was called? */ + if (NULL != mq->send_task) + { + GNUNET_SCHEDULER_cancel (mq->send_task); + } + mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue, + mq); +} + + +/** + * Call the send notification for the current message, but do not + * try to send the message until #gnunet_mq_impl_send_continue + * is called. + * + * only useful for implementing message queues, results in undefined + * behavior if not used carefully. + * + * @param mq message queue to send the next message with + */ +void +GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq) +{ + GNUNET_assert (NULL == mq->send_task); + mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_in_flight, + mq); } @@ -592,11 +647,9 @@ GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t type) { struct GNUNET_MQ_Envelope *ev; - void *mem; - mem = GNUNET_malloc (size + sizeof (struct GNUNET_MQ_Envelope)); - ev = mem + size; - ev->mh = mem; + ev = GNUNET_malloc (size + sizeof (struct GNUNET_MQ_Envelope)); + ev->mh = (struct GNUNET_MessageHeader *) &ev[1]; ev->mh->size = htons (size); ev->mh->type = htons (type); if (NULL != mhp) @@ -867,10 +920,10 @@ connection_client_cancel_impl (struct GNUNET_MQ_Handle *mq, GNUNET_CLIENT_notify_transmit_ready_cancel (state->th); state->th = NULL; } - else if (NULL != mq->continue_task) + else if (NULL != mq->send_task) { - GNUNET_SCHEDULER_cancel (mq->continue_task); - mq->continue_task = NULL; + GNUNET_SCHEDULER_cancel (mq->send_task); + mq->send_task = NULL; } else GNUNET_assert (0); @@ -1028,10 +1081,10 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq) { mq->destroy_impl (mq, mq->impl_state); } - if (NULL != mq->continue_task) + if (NULL != mq->send_task) { - GNUNET_SCHEDULER_cancel (mq->continue_task); - mq->continue_task = NULL; + GNUNET_SCHEDULER_cancel (mq->send_task); + mq->send_task = NULL; } while (NULL != mq->envelope_head) { @@ -1136,6 +1189,7 @@ GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev) GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, mq->current_envelope); + mq->send_notification_called = GNUNET_NO; mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state); @@ -1154,8 +1208,9 @@ GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev) if (GNUNET_YES != mq->evacuate_called) { ev->parent_queue = NULL; + ev->mh = NULL; /* also frees ev */ - GNUNET_free (ev->mh); + GNUNET_free (ev); } } @@ -1299,34 +1354,4 @@ GNUNET_MQ_destroy_notify_cancel (struct GNUNET_MQ_DestroyNotificationHandle *dnh } -/** - * Get the message that is currently being sent when cancellation of that - * message is requested. The returned buffer must be freed by the caller. - * - * This function may be called at most once in the cancel_impl - * function of a message queue. - * - * Use this function to avoid copying a half-sent message. - * - * @param mq message queue - * @return pointer to store the message being canceled, - * must be freed by the caller - */ -struct GNUNET_MessageHeader * -GNUNET_MQ_impl_cancel_evacuate (struct GNUNET_MQ_Handle *mq) -{ - struct GNUNET_MessageHeader *mh; - - GNUNET_assert (GNUNET_NO == mq->evacuate_called); - GNUNET_assert (NULL != mq->current_envelope); - - mq->evacuate_called = GNUNET_YES; - mh = mq->current_envelope->mh; - mq->current_envelope->parent_queue = NULL; - mq->current_envelope = NULL; - - return mh; -} - - /* end of mq.c */