}
+/**
+ * A message from the handler's message queue to a neighbour was
+ * transmitted. Now trigger (possibly delayed) notification of the
+ * neighbour's message queue that we are done and thus ready for
+ * the next message.
+ *
+ * @param cls the `struct Neighbour` where the message was sent
+ */
+static void
+notify_send_done_fin (void *cls)
+{
+ struct Neighbour *n = cls;
+
+ n->timeout_task = NULL;
+ n->is_ready = GNUNET_YES;
+ GNUNET_MQ_impl_send_continue (n->mq);
+}
+
+
/**
* A message from the handler's message queue to a neighbour was
* transmitted. Now trigger (possibly delayed) notification of the
{
GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker,
n->env_size + n->traffic_overhead);
- n->traffic_overhead = 0;
n->env = NULL;
+ n->traffic_overhead = 0;
}
delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
128);
GNUNET_MQ_impl_send_continue (n->mq);
return;
}
+ GNUNET_MQ_impl_send_in_flight (n->mq);
/* cannot send even a small message without violating
- quota, wait a before notifying MQ */
+ quota, wait a before allowing MQ to send next message */
n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay,
- ¬ify_send_done,
+ ¬ify_send_done_fin,
n);
}
GNUNET_MQ_impl_send_continue (mq);
return;
}
+ GNUNET_assert (NULL == n->env);
n->env = GNUNET_MQ_msg_nested_mh (obm,
GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
msg);
static void
connect_fail_continuation (struct ClientState *cstate)
{
- LOG (GNUNET_ERROR_TYPE_INFO,
- "Failed to establish TCP connection to `%s:%u', no further addresses to try.\n",
- cstate->hostname,
- cstate->port);
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to establish connection to `%s', no further addresses to try.\n",
+ cstate->service_name);
GNUNET_break (NULL == cstate->ap_head);
GNUNET_break (NULL == cstate->ap_tail);
GNUNET_break (NULL == cstate->dns_active);
ssize_t ret;
size_t len;
const char *pos;
+ int notify_in_flight;
cstate->send_task = NULL;
pos = (const char *) cstate->msg;
GNUNET_MQ_ERROR_WRITE);
return;
}
- if (0 == cstate->msg_off)
- {
- GNUNET_MQ_impl_send_in_flight (cstate->mq);
- }
+ notify_in_flight = (0 == cstate->msg_off);
cstate->msg_off += ret;
if (cstate->msg_off < len)
{
cstate->sock,
&transmit_ready,
cstate);
+ if (notify_in_flight)
+ GNUNET_MQ_impl_send_in_flight (cstate->mq);
return;
}
cstate->msg = NULL;
{
/* defer destruction */
cstate->in_destroy = GNUNET_YES;
+ cstate->mq = NULL;
return;
}
if (NULL != cstate->dns_active)
GNUNET_NO);
if (GNUNET_SYSERR == ret)
{
- GNUNET_MQ_inject_error (cstate->mq,
- GNUNET_MQ_ERROR_READ);
+ if (NULL != cstate->mq)
+ GNUNET_MQ_inject_error (cstate->mq,
+ GNUNET_MQ_ERROR_READ);
+ if (GNUNET_YES == cstate->in_destroy)
+ connection_client_destroy_impl (cstate->mq,
+ cstate);
return;
}
if (GNUNET_YES == cstate->in_destroy)
#endif
if ( (0 == (cstate->attempts++ % 2)) ||
- (0 == cstate->port) )
+ (0 == cstate->port) ||
+ (NULL == cstate->hostname) )
{
- /* on even rounds, try UNIX first */
+ /* on even rounds, try UNIX first, or always
+ if we do not have a DNS name and TCP port. */
cstate->sock = try_unixpath (cstate->service_name,
cstate->cfg);
if (NULL != cstate->sock)
{
connect_success_continuation (cstate);
return;
- }
+ }
+ }
+ if ( (NULL == cstate->hostname) ||
+ (0 == cstate->port) )
+ {
+ /* All options failed. Boo! */
+ connect_fail_continuation (cstate);
+ return;
}
cstate->dns_active
= GNUNET_RESOLVER_ip_get (cstate->hostname,
* @return the message queue, NULL on error
*/
struct GNUNET_MQ_Handle *
-GNUNET_CLIENT_connecT2 (const struct GNUNET_CONFIGURATION_Handle *cfg,
- const char *service_name,
- const struct GNUNET_MQ_MessageHandler *handlers,
- GNUNET_MQ_ErrorHandler error_handler,
- void *error_handler_cls)
+GNUNET_CLIENT_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg,
+ const char *service_name,
+ const struct GNUNET_MQ_MessageHandler *handlers,
+ GNUNET_MQ_ErrorHandler error_handler,
+ void *error_handler_cls)
{
struct ClientState *cstate;
*/
void *error_handler_cls;
+ /**
+ * Task to asynchronously run #impl_send_continue().
+ */
+ struct GNUNET_SCHEDULER_Task *send_task;
+
/**
* Linked list of messages pending to be sent
*/
*/
struct GNUNET_MQ_Envelope *current_envelope;
- /**
- * GNUNET_YES if the sent notification was called
- * for the current envelope.
- */
- int send_notification_called;
-
/**
* Map of associations, lazily allocated
*/
struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
- /**
- * Task scheduled during #GNUNET_MQ_impl_send_continue
- * or #GNUNET_MQ_impl_send_in_flight
- */
- struct GNUNET_SCHEDULER_Task *send_task;
-
/**
* Functions to call on queue destruction; kept in a DLL.
*/
unsigned int queue_length;
/**
- * GNUNET_YES if GNUNET_MQ_impl_evacuate was called.
+ * #GNUNET_YES if GNUNET_MQ_impl_evacuate was called.
+ * FIXME: is this dead?
*/
int evacuate_called;
+
+ /**
+ * #GNUNET_YES if GNUNET_MQ_impl_send_in_flight() was called.
+ */
+ int in_flight;
};
unsigned int
GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq)
{
- return mq->queue_length;
+ return mq->queue_length - (GNUNET_YES == mq->in_flight) ? 1 : 0;
}
mq->queue_length++;
ev->parent_queue = mq;
/* is the implementation busy? queue it! */
- if (NULL != mq->current_envelope)
+ if ( (NULL != mq->current_envelope) ||
+ (NULL != mq->send_task) )
{
GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head,
mq->envelope_tail,
}
-/**
- * Task run to call the send notification for the next queued
- * message, if any. Only useful for implementing message queues,
- * results in undefined behavior if not used carefully.
- *
- * @param cls message queue to send the next message with
- */
-static void
-impl_send_in_flight (void *cls)
-{
- struct GNUNET_MQ_Handle *mq = cls;
- struct GNUNET_MQ_Envelope *current_envelope;
-
- mq->send_task = NULL;
- /* call is only valid if we're actually currently sending
- * a message */
- current_envelope = mq->current_envelope;
- GNUNET_assert (NULL != current_envelope);
- /* can't call cancel from now on anymore */
- current_envelope->parent_queue = NULL;
- if ( (GNUNET_NO == mq->send_notification_called) &&
- (NULL != current_envelope->sent_cb) )
- {
- current_envelope->sent_cb (current_envelope->sent_cls);
- }
- mq->send_notification_called = GNUNET_YES;
-}
-
-
/**
* Task run to call the send implementation for the next queued
* message, if any. Only useful for implementing message queues,
impl_send_continue (void *cls)
{
struct GNUNET_MQ_Handle *mq = cls;
- struct GNUNET_MQ_Envelope *current_envelope;
-
+
mq->send_task = NULL;
/* call is only valid if we're actually currently sending
* a message */
- current_envelope = mq->current_envelope;
- GNUNET_assert (NULL != current_envelope);
- impl_send_in_flight (mq);
- GNUNET_assert (0 < mq->queue_length);
- mq->queue_length--;
if (NULL == mq->envelope_head)
- {
- mq->current_envelope = NULL;
- }
- else
- {
- mq->current_envelope = mq->envelope_head;
- GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
- mq->envelope_tail,
- mq->current_envelope);
- mq->send_notification_called = GNUNET_NO;
- mq->send_impl (mq,
- mq->current_envelope->mh,
- mq->impl_state);
- }
- GNUNET_free (current_envelope);
+ return;
+ mq->current_envelope = mq->envelope_head;
+ GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
+ mq->envelope_tail,
+ mq->current_envelope);
+ mq->send_impl (mq,
+ mq->current_envelope->mh,
+ mq->impl_state);
}
void
GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
{
- /* maybe #GNUNET_MQ_impl_send_in_flight was called? */
- if (NULL != mq->send_task)
- {
- GNUNET_SCHEDULER_cancel (mq->send_task);
- }
+ struct GNUNET_MQ_Envelope *current_envelope;
+ GNUNET_MQ_NotifyCallback cb;
+
+ GNUNET_assert (0 < mq->queue_length);
+ mq->queue_length--;
+ current_envelope = mq->current_envelope;
+ current_envelope->parent_queue = NULL;
+ mq->current_envelope = NULL;
+ GNUNET_assert (NULL == mq->send_task);
mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue,
- mq);
+ mq);
+ if (NULL != (cb = current_envelope->sent_cb))
+ {
+ current_envelope->sent_cb = NULL;
+ cb (current_envelope->sent_cls);
+ }
+ GNUNET_free (current_envelope);
}
/**
* Call the send notification for the current message, but do not
- * try to send the next message until #gnunet_mq_impl_send_continue
+ * try to send the next message until #GNUNET_MQ_impl_send_continue
* is called.
*
- * only useful for implementing message queues, results in undefined
+ * Only useful for implementing message queues, results in undefined
* behavior if not used carefully.
*
* @param mq message queue to send the next message with
void
GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq)
{
- GNUNET_assert (NULL == mq->send_task);
- mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_in_flight,
- mq);
+ struct GNUNET_MQ_Envelope *current_envelope;
+ GNUNET_MQ_NotifyCallback cb;
+
+ mq->in_flight = GNUNET_YES;
+ /* call is only valid if we're actually currently sending
+ * a message */
+ current_envelope = mq->current_envelope;
+ GNUNET_assert (NULL != current_envelope);
+ /* can't call cancel from now on anymore */
+ current_envelope->parent_queue = NULL;
+ if (NULL != (cb = current_envelope->sent_cb))
+ {
+ current_envelope->sent_cb = NULL;
+ cb (current_envelope->sent_cls);
+ }
}
GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
mq->envelope_tail,
mq->current_envelope);
- mq->send_notification_called = GNUNET_NO;
mq->send_impl (mq,
mq->current_envelope->mh,
mq->impl_state);