From: Matthias Wachs Date: Fri, 24 Sep 2010 12:40:31 +0000 (+0000) Subject: Introduced limited per connection queue size X-Git-Tag: initial-import-from-subversion-38251~20255 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=85588b1effe525b6de67d2cb3a6bc24424a1e3f7;p=oweals%2Fgnunet.git Introduced limited per connection queue size --- diff --git a/src/transport/plugin_transport_http.c b/src/transport/plugin_transport_http.c index 4389bbefc..0238746bc 100644 --- a/src/transport/plugin_transport_http.c +++ b/src/transport/plugin_transport_http.c @@ -316,6 +316,17 @@ struct Session * inbound session: mhd_connection * */ void * recv_endpoint; + + /** + * Current queue size + */ + size_t queue_length_cur; + + /** + * Max queue size + */ + size_t queue_length_max; + }; /** @@ -503,8 +514,8 @@ static int remove_http_message (struct Session * ps, struct HTTP_Message * msg) /** * Iterator to remove peer context * @param cls the plugin - * @key the peers public key hashcode - * @value the peer context + * @param key the peers public key hashcode + * @param value the peer context * @return GNUNET_YES on success */ int remove_peer_context_Iterator (void *cls, const GNUNET_HashCode *key, void *value) @@ -858,6 +869,7 @@ mhd_send_callback (void *cls, uint64_t pos, char *buf, size_t max) { if (NULL!=msg->transmit_cont) msg->transmit_cont (msg->transmit_cont_cls,&pc->identity,GNUNET_OK); + ps->queue_length_cur -= msg->size; remove_http_message(ps,msg); } } @@ -1017,6 +1029,8 @@ mdh_access_cb (void *cls, ps->recv_active=GNUNET_NO; ps->peercontext=pc; ps->session_id =id_num; + ps->queue_length_cur = 0; + ps->queue_length_max = GNUNET_SERVER_MAX_MESSAGE_SIZE; ps->url = create_url (plugin, ps->addr, ps->addrlen, ps->session_id); GNUNET_CONTAINER_DLL_insert(pc->head,pc->tail,ps); GNUNET_STATISTICS_update (plugin->env->stats, @@ -1436,6 +1450,7 @@ static size_t curl_send_cb(void *stream, size_t size, size_t nmemb, void *ptr) /* Calling transmit continuation */ if (NULL != ps->pending_msgs_tail->transmit_cont) msg->transmit_cont (ps->pending_msgs_tail->transmit_cont_cls,&(ps->peercontext)->identity,GNUNET_OK); + ps->queue_length_cur -= msg->size; remove_http_message(ps, msg); } return bytes_sent; @@ -1533,9 +1548,14 @@ static void curl_handle_finished (struct Plugin *plugin) curl_multi_remove_handle(plugin->multi_handle,ps->send_endpoint); //curl_easy_cleanup(ps->send_endpoint); //ps->send_endpoint=NULL; - cur_msg = ps->pending_msgs_tail; - if (( NULL != cur_msg) && ( NULL != cur_msg->transmit_cont)) - cur_msg->transmit_cont (cur_msg->transmit_cont_cls,&pc->identity,GNUNET_SYSERR); + while (ps->pending_msgs_tail != NULL) + { + cur_msg = ps->pending_msgs_tail; + if ( NULL != cur_msg->transmit_cont) + cur_msg->transmit_cont (cur_msg->transmit_cont_cls,&pc->identity,GNUNET_SYSERR); + ps->queue_length_cur -= cur_msg->size; + remove_http_message(ps,cur_msg); + } } /* GET connection failed */ if (msg->easy_handle == ps->recv_endpoint) @@ -1570,19 +1590,25 @@ static void curl_handle_finished (struct Plugin *plugin) http_result); #endif /* Calling transmit continuation */ - cur_msg = ps->pending_msgs_tail; - if (( NULL != cur_msg) && (NULL != cur_msg->transmit_cont)) + while (ps->pending_msgs_tail != NULL) { - /* HTTP 1xx : Last message before here was informational */ - if ((http_result >=100) && (http_result < 200)) - cur_msg->transmit_cont (cur_msg->transmit_cont_cls,&pc->identity,GNUNET_OK); - /* HTTP 2xx: successful operations */ - if ((http_result >=200) && (http_result < 300)) - cur_msg->transmit_cont (cur_msg->transmit_cont_cls,&pc->identity,GNUNET_OK); - /* HTTP 3xx..5xx: error */ - if ((http_result >=300) && (http_result < 600)) - cur_msg->transmit_cont (cur_msg->transmit_cont_cls,&pc->identity,GNUNET_SYSERR); + cur_msg = ps->pending_msgs_tail; + if ( NULL != cur_msg->transmit_cont) + { + /* HTTP 1xx : Last message before here was informational */ + if ((http_result >=100) && (http_result < 200)) + cur_msg->transmit_cont (cur_msg->transmit_cont_cls,&pc->identity,GNUNET_OK); + /* HTTP 2xx: successful operations */ + if ((http_result >=200) && (http_result < 300)) + cur_msg->transmit_cont (cur_msg->transmit_cont_cls,&pc->identity,GNUNET_OK); + /* HTTP 3xx..5xx: error */ + if ((http_result >=300) && (http_result < 600)) + cur_msg->transmit_cont (cur_msg->transmit_cont_cls,&pc->identity,GNUNET_SYSERR); + } + ps->queue_length_cur -= cur_msg->size; + remove_http_message(ps,cur_msg); } + ps->send_connected = GNUNET_NO; ps->send_active = GNUNET_NO; curl_multi_remove_handle(plugin->multi_handle,ps->send_endpoint); @@ -1651,7 +1677,7 @@ static void curl_perform (void *cls, /** * Function setting up file descriptors and scheduling task to run * - * @param cls plugin as closure + * @param plugin plugin as closure * @return GNUNET_SYSERR for hard failure, GNUNET_OK for ok */ static int curl_schedule(struct Plugin *plugin) @@ -1925,7 +1951,6 @@ static int send_check_connections (struct Plugin *plugin, struct Session *ps) /** * select best session to transmit data to peer * - * @param cls closure * @param pc peer context of target peer * @param addr address of target peer * @param addrlen address length @@ -2168,6 +2193,8 @@ http_plugin_send (void *cls, ps->pending_msgs_tail = NULL; ps->peercontext=pc; ps->session_id = pc->session_id_counter; + ps->queue_length_cur = 0; + ps->queue_length_max = GNUNET_SERVER_MAX_MESSAGE_SIZE; pc->session_id_counter++; ps->url = create_url (plugin, ps->addr, ps->addrlen, ps->session_id); if (ps->msgtok == NULL) @@ -2187,18 +2214,26 @@ http_plugin_send (void *cls, } } - /* create msg */ - msg = GNUNET_malloc (sizeof (struct HTTP_Message) + msgbuf_size); - msg->next = NULL; - msg->size = msgbuf_size; - msg->pos = 0; - msg->buf = (char *) &msg[1]; - msg->transmit_cont = cont; - msg->transmit_cont_cls = cont_cls; - memcpy (msg->buf,msgbuf, msgbuf_size); - GNUNET_CONTAINER_DLL_insert(ps->pending_msgs_head,ps->pending_msgs_tail,msg); - - if (send_check_connections (plugin, ps) == GNUNET_SYSERR) + if (msgbuf_size >= (ps->queue_length_max - ps->queue_length_cur)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR,"Queue %X full: (%u) bytes in queue, would discard message (%u)\n", ps, (ps->queue_length_max - ps->queue_length_cur), msgbuf_size); + //return GNUNET_SYSERR; + } + + /* create msg */ + msg = GNUNET_malloc (sizeof (struct HTTP_Message) + msgbuf_size); + msg->next = NULL; + msg->size = msgbuf_size; + msg->pos = 0; + msg->buf = (char *) &msg[1]; + msg->transmit_cont = cont; + msg->transmit_cont_cls = cont_cls; + memcpy (msg->buf,msgbuf, msgbuf_size); + + GNUNET_CONTAINER_DLL_insert(ps->pending_msgs_head,ps->pending_msgs_tail,msg); + ps->queue_length_cur += msgbuf_size; + + if (send_check_connections (plugin, ps) == GNUNET_SYSERR) return GNUNET_SYSERR; if (force_address != GNUNET_YES) pc->last_session = ps;