From 0bcccf6577555ed605e12a02ee09ffa8595b8a51 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Fri, 22 Jan 2010 14:20:31 +0000 Subject: [PATCH] finishing nc implementation --- src/util/server_nc.c | 137 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 132 insertions(+), 5 deletions(-) diff --git a/src/util/server_nc.c b/src/util/server_nc.c index 89e59b799..e9c2f5a38 100644 --- a/src/util/server_nc.c +++ b/src/util/server_nc.c @@ -50,11 +50,15 @@ struct ClientList struct ClientList *next; + struct GNUNET_SERVER_NotificationContext *nc; + struct GNUNET_SERVER_Client *client; struct GNUNET_CONNECTION_TransmitHandle *th; - struct PendingMessageList *pending; + struct PendingMessageList *pending_head; + + struct PendingMessageList *pending_tail; unsigned int num_pending; @@ -105,9 +109,9 @@ handle_client_disconnect (void *cls, nc->clients = pos->next; else prev->next = pos->next; - while (NULL != (pml = pos->pending)) + while (NULL != (pml = pos->pending_head)) { - pos->pending = pml->next; + pos->pending_head = pml->next; GNUNET_free (pml); } GNUNET_free (pos); @@ -155,9 +159,9 @@ GNUNET_SERVER_notification_context_destroy (struct GNUNET_SERVER_NotificationCon nc->clients = pos->next; GNUNET_SERVER_receive_done (pos->client, GNUNET_NO); GNUNET_SERVER_client_drop (pos->client); - while (NULL != (pml = pos->pending)) + while (NULL != (pml = pos->pending_head)) { - pos->pending = pml->next; + pos->pending_head = pml->next; GNUNET_free (pml); } GNUNET_free (pos); @@ -179,9 +183,113 @@ void GNUNET_SERVER_notification_context_add (struct GNUNET_SERVER_NotificationContext *nc, struct GNUNET_SERVER_Client *client) { + struct ClientList *cl; + + cl = GNUNET_malloc (sizeof (struct ClientList)); + cl->next = nc->clients; + cl->nc = nc; + cl->client = client; + nc->clients = cl; } +/** + * Function called to notify a client about the socket begin ready to + * queue more data. "buf" will be NULL and "size" zero if the socket + * was closed for writing in the meantime. + * + * @param cls the 'struct ClientList *' + * @param size number of bytes available in buf + * @param buf where the callee should write the message + * @return number of bytes written to buf + */ +static size_t +transmit_message (void *cls, + size_t size, + void *buf) +{ + struct ClientList *cl = cls; + char *cbuf = buf; + struct PendingMessageList *pml; + uint16_t msize; + size_t ret; + + cl->th = NULL; + if (buf == NULL) + { + /* 'cl' should be freed via disconnect notification shortly */ + return 0; + } + ret = 0; + while (cl->pending_head != NULL) + { + pml = cl->pending_head; + cl->pending_head = pml->next; + if (pml->next == NULL) + cl->pending_tail = NULL; + msize = ntohs (pml->msg->size); + if (size < msize) + break; + memcpy (&cbuf[ret], pml->msg, msize); + ret += msize; + size -= msize; + GNUNET_free (pml); + } + if (cl->pending_head != NULL) + cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client, + ntohs (cl->pending_head->msg->size), + GNUNET_TIME_UNIT_FOREVER_REL, + &transmit_message, + cl); + return ret; +} + + +/** + * Send a message to a particular client. + * + * @param nc context to modify + * @param client client to transmit to + * @param msg message to send + * @param can_drop can this message be dropped due to queue length limitations + */ +static void +do_unicast (struct GNUNET_SERVER_NotificationContext *nc, + struct ClientList *client, + const struct GNUNET_MessageHeader *msg, + int can_drop) +{ + struct PendingMessageList *pml; + uint16_t size; + + if ( (client->num_pending > nc->queue_length) && + (GNUNET_YES == can_drop) ) + return; /* drop! */ + if (client->num_pending > nc->queue_length) + { + /* FIXME: consider checking for other messages in the + queue that are 'droppable' */ + } + size = ntohs (msg->size); + pml = GNUNET_malloc (sizeof (struct PendingMessageList) + size); + pml->msg = (const struct GNUNET_MessageHeader*) &pml[1]; + pml->can_drop = can_drop; + memcpy (&pml[1], msg, size); + /* append */ + if (client->pending_tail != NULL) + client->pending_tail->next = pml; + else + client->pending_head = pml; + client->pending_tail = pml; + if (client->th == NULL) + client->th = GNUNET_SERVER_notify_transmit_ready (client->client, + ntohs (client->pending_head->msg->size), + GNUNET_TIME_UNIT_FOREVER_REL, + &transmit_message, + client); +} + + /** * Send a message to a particular client; must have * already been added to the notification context. @@ -197,6 +305,17 @@ GNUNET_SERVER_notification_context_unicast (struct GNUNET_SERVER_NotificationCon const struct GNUNET_MessageHeader *msg, int can_drop) { + struct ClientList *pos; + + pos = nc->clients; + while (NULL != pos) + { + if (pos->client == client) + break; + pos = pos->next; + } + GNUNET_assert (pos != NULL); + do_unicast (nc, pos, msg, can_drop); } @@ -212,6 +331,14 @@ GNUNET_SERVER_notification_context_broadcast (struct GNUNET_SERVER_NotificationC const struct GNUNET_MessageHeader *msg, int can_drop) { + struct ClientList *pos; + + pos = nc->clients; + while (NULL != pos) + { + do_unicast (nc, pos, msg, can_drop); + pos = pos->next; + } } -- 2.25.1