X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Futil%2Fserver_nc.c;h=1cf3df8bdc6966b38867b7186eefa283ed45e344;hb=0ea8e006d5f5ef84e31e000607bd24a23f8fc1ed;hp=89e59b7997df4356bee4a82afbf858831fcd89d9;hpb=1e83b104b323d90664e90af7a12166308ba6ee7a;p=oweals%2Fgnunet.git diff --git a/src/util/server_nc.c b/src/util/server_nc.c index 89e59b799..1cf3df8bd 100644 --- a/src/util/server_nc.c +++ b/src/util/server_nc.c @@ -33,29 +33,72 @@ #include "gnunet_time_lib.h" +#define DEBUG_SERVER_NC GNUNET_NO + +/** + * Entry in list of messages pending to be transmitted. + */ struct PendingMessageList { + /** + * This is a linked list. + */ struct PendingMessageList *next; + /** + * Message to transmit (allocated at the end of this + * struct, do not free) + */ const struct GNUNET_MessageHeader *msg; + /** + * Can this message be dropped? + */ int can_drop; }; +/** + * Lists of clients we manage for notifications. + */ struct ClientList { + /** + * This is a linked list. + */ struct ClientList *next; + /** + * Overall context this client belongs to. + */ + struct GNUNET_SERVER_NotificationContext *nc; + + /** + * Handle to the client. + */ struct GNUNET_SERVER_Client *client; + /** + * Handle for pending transmission request to the client (or NULL). + */ struct GNUNET_CONNECTION_TransmitHandle *th; - struct PendingMessageList *pending; + /** + * Head of linked list of requests queued for transmission. + */ + struct PendingMessageList *pending_head; + /** + * Tail of linked list of requests queued for transmission. + */ + struct PendingMessageList *pending_tail; + + /** + * Number of messages currently in the list. + */ unsigned int num_pending; }; @@ -72,15 +115,30 @@ struct ClientList struct GNUNET_SERVER_NotificationContext { + /** + * Server we do notifications for. + */ struct GNUNET_SERVER_Handle *server; + /** + * List of clients receiving notifications. + */ struct ClientList *clients; + /** + * Maximum number of optional messages to queue per client. + */ unsigned int queue_length; }; +/** + * Client has disconnected, clean up. + * + * @param cls our 'struct GNUNET_SERVER_NotificationContext *' + * @param client handle of client that disconnected + */ static void handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) @@ -90,6 +148,11 @@ handle_client_disconnect (void *cls, struct ClientList *prev; struct PendingMessageList *pml; + if (client == NULL) + { + nc->server = NULL; + return; + } prev = NULL; pos = nc->clients; while (NULL != pos) @@ -105,11 +168,17 @@ handle_client_disconnect (void *cls, nc->clients = pos->next; else prev->next = pos->next; - while (NULL != (pml = pos->pending)) + while (NULL != (pml = pos->pending_head)) { - pos->pending = pml->next; + pos->pending_head = pml->next; GNUNET_free (pml); } + GNUNET_SERVER_client_drop (client); + if (pos->th != NULL) + { + GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th); + pos->th = NULL; + } GNUNET_free (pos); } @@ -153,18 +222,19 @@ GNUNET_SERVER_notification_context_destroy (struct GNUNET_SERVER_NotificationCon while (NULL != (pos = nc->clients)) { nc->clients = pos->next; - GNUNET_SERVER_receive_done (pos->client, GNUNET_NO); GNUNET_SERVER_client_drop (pos->client); - while (NULL != (pml = pos->pending)) + GNUNET_SERVER_receive_done (pos->client, GNUNET_NO); + while (NULL != (pml = pos->pending_head)) { - pos->pending = pml->next; + pos->pending_head = pml->next; GNUNET_free (pml); } GNUNET_free (pos); } - GNUNET_SERVER_disconnect_notify_cancel (nc->server, - &handle_client_disconnect, - nc); + if (nc->server != NULL) + GNUNET_SERVER_disconnect_notify_cancel (nc->server, + &handle_client_disconnect, + nc); GNUNET_free (nc); } @@ -179,9 +249,136 @@ void GNUNET_SERVER_notification_context_add (struct GNUNET_SERVER_NotificationContext *nc, struct GNUNET_SERVER_Client *client) { + struct ClientList *cl; + + cl = GNUNET_malloc (sizeof (struct ClientList)); + cl->next = nc->clients; + cl->nc = nc; + cl->client = client; + GNUNET_SERVER_client_keep (client); + nc->clients = cl; +} + + +/** + * Function called to notify a client about the socket begin ready to + * queue more data. "buf" will be NULL and "size" zero if the socket + * was closed for writing in the meantime. + * + * @param cls the 'struct ClientList *' + * @param size number of bytes available in buf + * @param buf where the callee should write the message + * @return number of bytes written to buf + */ +static size_t +transmit_message (void *cls, + size_t size, + void *buf) +{ + struct ClientList *cl = cls; + char *cbuf = buf; + struct PendingMessageList *pml; + uint16_t msize; + size_t ret; + + cl->th = NULL; + if (buf == NULL) + { + /* 'cl' should be freed via disconnect notification shortly */ + return 0; + } + ret = 0; + while (cl->pending_head != NULL) + { + pml = cl->pending_head; + msize = ntohs (pml->msg->size); + if (size < msize) + break; + cl->pending_head = pml->next; + if (pml->next == NULL) + cl->pending_tail = NULL; +#if DEBUG_SERVER_NC + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Copying message of type %u and size %u from pending queue to transmission buffer\n", + ntohs (pml->msg->type), + msize); +#endif + memcpy (&cbuf[ret], pml->msg, msize); + ret += msize; + size -= msize; + GNUNET_free (pml); + cl->num_pending--; + } + if (cl->pending_head != NULL) + cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client, + ntohs (cl->pending_head->msg->size), + GNUNET_TIME_UNIT_FOREVER_REL, + &transmit_message, + cl); + return ret; } +/** + * Send a message to a particular client. + * + * @param nc context to modify + * @param client client to transmit to + * @param msg message to send + * @param can_drop can this message be dropped due to queue length limitations + */ +static void +do_unicast (struct GNUNET_SERVER_NotificationContext *nc, + struct ClientList *client, + const struct GNUNET_MessageHeader *msg, + int can_drop) +{ + struct PendingMessageList *pml; + uint16_t size; + + if ( (client->num_pending > nc->queue_length) && + (GNUNET_YES == can_drop) ) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Dropping message of type %u and size %u due to full queue (%u entries)\n", + ntohs (msg->type), + ntohs (msg->size), + (unsigned int) nc->queue_length); + return; /* drop! */ + } + if (client->num_pending > nc->queue_length) + { + /* FIXME: consider checking for other messages in the + queue that are 'droppable' */ + } + client->num_pending++; + size = ntohs (msg->size); + pml = GNUNET_malloc (sizeof (struct PendingMessageList) + size); + pml->msg = (const struct GNUNET_MessageHeader*) &pml[1]; + pml->can_drop = can_drop; +#if DEBUG_SERVER_NC + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Adding message of type %u and size %u to pending queue (which has %u entries)\n", + ntohs (msg->type), + ntohs (msg->size), + (unsigned int) nc->queue_length); +#endif + memcpy (&pml[1], msg, size); + /* append */ + if (client->pending_tail != NULL) + client->pending_tail->next = pml; + else + client->pending_head = pml; + client->pending_tail = pml; + if (client->th == NULL) + client->th = GNUNET_SERVER_notify_transmit_ready (client->client, + ntohs (client->pending_head->msg->size), + GNUNET_TIME_UNIT_FOREVER_REL, + &transmit_message, + client); +} + + /** * Send a message to a particular client; must have * already been added to the notification context. @@ -197,6 +394,17 @@ GNUNET_SERVER_notification_context_unicast (struct GNUNET_SERVER_NotificationCon const struct GNUNET_MessageHeader *msg, int can_drop) { + struct ClientList *pos; + + pos = nc->clients; + while (NULL != pos) + { + if (pos->client == client) + break; + pos = pos->next; + } + GNUNET_assert (pos != NULL); + do_unicast (nc, pos, msg, can_drop); } @@ -212,6 +420,14 @@ GNUNET_SERVER_notification_context_broadcast (struct GNUNET_SERVER_NotificationC const struct GNUNET_MessageHeader *msg, int can_drop) { + struct ClientList *pos; + + pos = nc->clients; + while (NULL != pos) + { + do_unicast (nc, pos, msg, can_drop); + pos = pos->next; + } }