};
+
/**
- * Linked list of messages to be transmitted to
- * the client. Each entry is followed by the
- * actual message.
+ * Linked list of messages to be transmitted to the client. Each
+ * entry is followed by the actual message.
*/
struct ClientMessageQueueEntry
{
/**
- * This is a linked list.
+ * This is a doubly-linked list.
*/
struct ClientMessageQueueEntry *next;
+
+ /**
+ * This is a doubly-linked list.
+ */
+ struct ClientMessageQueueEntry *prev;
};
*/
struct ClientMessageQueueEntry *message_queue_tail;
+ /**
+ * Current transmit request handle.
+ */
+ struct GNUNET_CONNECTION_TransmitHandle *th;
+
/**
* Is a call to "transmit_send_continuation" pending? If so, we
* must not free this struct (even if the corresponding client
uint16_t msize;
size_t tsize;
const struct GNUNET_MessageHeader *msg;
- struct GNUNET_CONNECTION_TransmitHandle *th;
char *cbuf;
+ client->th = NULL;
if (buf == NULL)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
/* fatal error with client, free message queue! */
while (NULL != (q = client->message_queue_head))
{
- client->message_queue_head = q->next;
+ GNUNET_CONTAINER_DLL_remove (client->message_queue_head,
+ client->message_queue_tail,
+ q);
GNUNET_free (q);
}
- client->message_queue_tail = NULL;
client->message_count = 0;
return 0;
}
"Transmitting message of type %u to client.\n",
ntohs (msg->type));
#endif
- client->message_queue_head = q->next;
- if (q->next == NULL)
- client->message_queue_tail = NULL;
+ GNUNET_CONTAINER_DLL_remove (client->message_queue_head,
+ client->message_queue_tail,
+ q);
memcpy (&cbuf[tsize], msg, msize);
tsize += msize;
GNUNET_free (q);
if (NULL != q)
{
GNUNET_assert (msize >= sizeof (struct GNUNET_MessageHeader));
- th = GNUNET_SERVER_notify_transmit_ready (client->client,
- msize,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &transmit_to_client_callback,
- client);
- GNUNET_assert (th != NULL);
+ client->th = GNUNET_SERVER_notify_transmit_ready (client->client,
+ msize,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &transmit_to_client_callback,
+ client);
+ GNUNET_assert (client->th != NULL);
}
return tsize;
}
{
struct ClientMessageQueueEntry *q;
uint16_t msize;
- struct GNUNET_CONNECTION_TransmitHandle *th;
if ((client->message_count >= MAX_PENDING) && (GNUNET_YES == may_drop))
{
/* TODO: call to statistics... */
return;
}
- client->message_count++;
msize = ntohs (msg->size);
GNUNET_assert (msize >= sizeof (struct GNUNET_MessageHeader));
q = GNUNET_malloc (sizeof (struct ClientMessageQueueEntry) + msize);
memcpy (&q[1], msg, msize);
- /* append to message queue */
- if (client->message_queue_tail == NULL)
- {
- client->message_queue_tail = q;
- }
- else
- {
- client->message_queue_tail->next = q;
- client->message_queue_tail = q;
- }
- if (client->message_queue_head == NULL)
+ GNUNET_CONTAINER_DLL_insert_after (client->message_queue_head,
+ client->message_queue_tail,
+ client->message_queue_tail,
+ q);
+ client->message_count++;
+ if (client->th == NULL)
{
- client->message_queue_head = q;
- th = GNUNET_SERVER_notify_transmit_ready (client->client,
- msize,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &transmit_to_client_callback,
- client);
- GNUNET_assert (th != NULL);
+ client->th = GNUNET_SERVER_notify_transmit_ready (client->client,
+ msize,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &transmit_to_client_callback,
+ client);
+ GNUNET_assert (client->th != NULL);
}
}
return;
while (NULL != (mqe = pos->message_queue_head))
{
- pos->message_queue_head = mqe->next;
+ GNUNET_CONTAINER_DLL_remove (pos->message_queue_head,
+ pos->message_queue_tail,
+ mqe);
+ pos->message_count--;
GNUNET_free (mqe);
}
- pos->message_queue_head = NULL;
if (prev == NULL)
clients = pos->next;
else
pos->client = NULL;
return;
}
+ if (pos->th != NULL)
+ {
+ GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
+ pos->th = NULL;
+ }
+ GNUNET_break (0 == pos->message_count);
GNUNET_free (pos);
}