From 53f7395bed9850cab4c1ec98a565f190e138dbb2 Mon Sep 17 00:00:00 2001 From: Matthias Wachs Date: Mon, 5 Jul 2010 12:24:56 +0000 Subject: [PATCH] --- src/transport/plugin_transport_http.c | 273 +++++++++++++++----------- 1 file changed, 155 insertions(+), 118 deletions(-) diff --git a/src/transport/plugin_transport_http.c b/src/transport/plugin_transport_http.c index 7a9e788a9..db846094c 100644 --- a/src/transport/plugin_transport_http.c +++ b/src/transport/plugin_transport_http.c @@ -43,6 +43,9 @@ #define DEBUG_CURL GNUNET_YES #define DEBUG_HTTP GNUNET_NO +#define INBOUND GNUNET_NO +#define OUTBOUND GNUNET_YES + /** * Text of the response sent back after the last bytes of a PUT * request have been received (just to formally obey the HTTP @@ -198,15 +201,20 @@ struct HTTP_Session */ unsigned int direction; + unsigned int send_connected; + unsigned int send_active; + unsigned int recv_connected; + unsigned int recv_active; + /** * entity managing sending data */ - void * send_connection_endpoint; + void * send_endpoint; /** * entity managing recieving data */ - void * recieve_connection_endpoint; + void * recv_endpoint; }; @@ -505,9 +513,9 @@ static char * create_url(void * cls, const void * addr, size_t addrlen) * @return GNUNET_SYSERR if msg not found, GNUNET_OK on success */ -static int remove_http_message(struct HTTP_Connection * con, struct HTTP_Message * msg) +static int remove_http_message(struct HTTP_Session * ps, struct HTTP_Message * msg) { - GNUNET_CONTAINER_DLL_remove(con->pending_msgs_head,con->pending_msgs_tail,msg); + GNUNET_CONTAINER_DLL_remove(ps->pending_msgs_head,ps->pending_msgs_tail,msg); GNUNET_free(msg); return GNUNET_OK; } @@ -1108,19 +1116,19 @@ static size_t curl_header_function( void *ptr, size_t size, size_t nmemb, void * */ static size_t send_curl_read_callback(void *stream, size_t size, size_t nmemb, void *ptr) { - struct HTTP_Connection * con = ptr; - struct HTTP_Message * msg = con->pending_msgs_tail; + struct HTTP_Session * ps = ptr; + struct HTTP_Message * msg = ps->pending_msgs_tail; size_t bytes_sent; size_t len; - if (con->pending_msgs_tail == NULL) + if (ps->pending_msgs_tail == NULL) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"Connection %X: No Message to send, pausing connection\n",con); - con->put_send_paused = GNUNET_YES; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"Connection %X: No Message to send, pausing connection\n",ps); + ps->send_active = GNUNET_NO; return CURL_READFUNC_PAUSE; } - msg = con->pending_msgs_tail; + msg = ps->pending_msgs_tail; /* data to send */ if (msg->pos < msg->size) { @@ -1148,11 +1156,11 @@ static size_t send_curl_read_callback(void *stream, size_t size, size_t nmemb, v if ( msg->pos == msg->size) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"Connection %X: Message with %u bytes sent, removing message from queue \n",con, msg->pos); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"Connection %X: Message with %u bytes sent, removing message from queue \n",ps, msg->pos); /* Calling transmit continuation */ - if (( NULL != con->pending_msgs_tail) && (NULL != con->pending_msgs_tail->transmit_cont)) - msg->transmit_cont (con->pending_msgs_tail->transmit_cont_cls,&(con->session)->identity,GNUNET_OK); - remove_http_message(con, msg); + if (( NULL != ps->pending_msgs_tail) && (NULL != ps->pending_msgs_tail->transmit_cont)) + msg->transmit_cont (ps->pending_msgs_tail->transmit_cont_cls,&(ps->peercontext)->identity,GNUNET_OK); + remove_http_message(ps, msg); } return bytes_sent; } @@ -1194,7 +1202,7 @@ static size_t send_schedule(void *cls, struct Session* ses ); * @param con connection * @return bytes sent to peer */ -static ssize_t send_check_connections (void *cls, struct Session* ses , struct HTTP_Connection *con) +static ssize_t send_check_connections (void *cls, struct Session* ses , struct HTTP_Session *ps) { struct Plugin *plugin = cls; int bytes_sent = 0; @@ -1204,96 +1212,106 @@ static ssize_t send_check_connections (void *cls, struct Session* ses , struct H GNUNET_assert(cls !=NULL); - if (con->get_connected == GNUNET_NO) + if (ps->direction == OUTBOUND) { - if (con->get_curl_handle == NULL) - { - con->get_curl_handle = curl_easy_init(); + /* Check if session is connected to receive data, otherwise connect to peer */ + if (ps->recv_connected == GNUNET_NO) + { + if (ps->recv_endpoint == NULL) + { + ps->recv_endpoint = curl_easy_init(); #if DEBUG_CURL - curl_easy_setopt(con->get_curl_handle, CURLOPT_VERBOSE, 1L); + curl_easy_setopt(ps->recv_endpoint, CURLOPT_VERBOSE, 1L); #endif - curl_easy_setopt(con->get_curl_handle, CURLOPT_URL, con->url); - //curl_easy_setopt(con->put_curl_handle, CURLOPT_PUT, 1L); - curl_easy_setopt(con->get_curl_handle, CURLOPT_HEADERFUNCTION, &curl_header_function); - curl_easy_setopt(con->get_curl_handle, CURLOPT_WRITEHEADER, con); - curl_easy_setopt(con->get_curl_handle, CURLOPT_READFUNCTION, send_curl_read_callback); - curl_easy_setopt(con->get_curl_handle, CURLOPT_READDATA, con); - curl_easy_setopt(con->get_curl_handle, CURLOPT_WRITEFUNCTION, send_curl_write_callback); - curl_easy_setopt(con->get_curl_handle, CURLOPT_WRITEDATA, con); - curl_easy_setopt(con->get_curl_handle, CURLOPT_TIMEOUT, (long) timeout.value); - curl_easy_setopt(con->get_curl_handle, CURLOPT_PRIVATE, con); - curl_easy_setopt(con->get_curl_handle, CURLOPT_CONNECTTIMEOUT, HTTP_CONNECT_TIMEOUT); - curl_easy_setopt(con->get_curl_handle, CURLOPT_BUFFERSIZE, GNUNET_SERVER_MAX_MESSAGE_SIZE); - - mret = curl_multi_add_handle(plugin->multi_handle, con->get_curl_handle); - if (mret != CURLM_OK) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - _("%s failed at %s:%d: `%s'\n"), - "curl_multi_add_handle", __FILE__, __LINE__, - curl_multi_strerror (mret)); - return -1; - } - - if (con->get_msgtok != NULL) - con->get_msgtok = GNUNET_SERVER_mst_create (&curl_write_mst_cb, con); + curl_easy_setopt(ps->recv_endpoint, CURLOPT_URL, ps->url); + curl_easy_setopt(ps->recv_endpoint, CURLOPT_HEADERFUNCTION, &curl_header_function); + curl_easy_setopt(ps->recv_endpoint, CURLOPT_WRITEHEADER, ps); + curl_easy_setopt(ps->recv_endpoint, CURLOPT_READFUNCTION, send_curl_read_callback); + curl_easy_setopt(ps->recv_endpoint, CURLOPT_READDATA, ps); + curl_easy_setopt(ps->recv_endpoint, CURLOPT_WRITEFUNCTION, send_curl_write_callback); + curl_easy_setopt(ps->recv_endpoint, CURLOPT_WRITEDATA, ps); + curl_easy_setopt(ps->recv_endpoint, CURLOPT_TIMEOUT, (long) timeout.value); + curl_easy_setopt(ps->recv_endpoint, CURLOPT_PRIVATE, ps); + curl_easy_setopt(ps->recv_endpoint, CURLOPT_CONNECTTIMEOUT, HTTP_CONNECT_TIMEOUT); + curl_easy_setopt(ps->recv_endpoint, CURLOPT_BUFFERSIZE, GNUNET_SERVER_MAX_MESSAGE_SIZE); + + mret = curl_multi_add_handle(plugin->multi_handle, ps->recv_endpoint); + if (mret != CURLM_OK) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("%s failed at %s:%d: `%s'\n"), + "curl_multi_add_handle", __FILE__, __LINE__, + curl_multi_strerror (mret)); + return -1; + } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"Connection %X: inbound not connected, initiating connection\n",con); + if (ps->msgtok != NULL) + ps->msgtok = GNUNET_SERVER_mst_create (&curl_write_mst_cb, ps); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"Connection %X: inbound not connected, initiating connection\n",ps); + } } - } - /* PUT already connected, no need to initiate connection */ - if ((con->put_connected == GNUNET_YES) && (con->put_curl_handle != NULL)) - { - if (con->put_send_paused == GNUNET_NO) + /* Check if session is connected to send data, otherwise connect to peer */ + if ((ps->send_connected == GNUNET_YES) && (ps->send_endpoint!= NULL)) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"Connection %X: outbound active, enqueueing message\n",con); - return bytes_sent; + if (ps->send_connected == GNUNET_NO) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"Connection %X: outbound active, enqueueing message\n",ps); + return bytes_sent; + } + if (ps->send_active == GNUNET_NO) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"Connection %X: outbound paused, unpausing existing connection and enqueueing message\n",ps); + curl_easy_pause(ps->send_endpoint,CURLPAUSE_CONT); + ps->send_active=GNUNET_YES; + return bytes_sent; + } } - if (con->put_send_paused == GNUNET_YES) + + /* not connected, initiate connection */ + if ( NULL == ps->send_endpoint) + ps->send_endpoint = curl_easy_init(); + GNUNET_assert (ps->send_endpoint != NULL); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"Connection %X: outbound not connected, initiating connection\n",ps); + + GNUNET_assert (NULL != ps->pending_msgs_tail); + msg = ps->pending_msgs_tail; + + #if DEBUG_CURL + curl_easy_setopt(ps->send_endpoint, CURLOPT_VERBOSE, 1L); + #endif + curl_easy_setopt(ps->send_endpoint, CURLOPT_URL, ps->url); + curl_easy_setopt(ps->send_endpoint, CURLOPT_PUT, 1L); + curl_easy_setopt(ps->send_endpoint, CURLOPT_READFUNCTION, send_curl_read_callback); + curl_easy_setopt(ps->send_endpoint, CURLOPT_READDATA, ps); + curl_easy_setopt(ps->send_endpoint, CURLOPT_WRITEFUNCTION, send_curl_write_callback); + curl_easy_setopt(ps->send_endpoint, CURLOPT_READDATA, ps); + curl_easy_setopt(ps->send_endpoint, CURLOPT_TIMEOUT, (long) timeout.value); + curl_easy_setopt(ps->send_endpoint, CURLOPT_PRIVATE, ps); + curl_easy_setopt(ps->send_endpoint, CURLOPT_CONNECTTIMEOUT, HTTP_CONNECT_TIMEOUT); + curl_easy_setopt(ps->send_endpoint, CURLOPT_BUFFERSIZE, GNUNET_SERVER_MAX_MESSAGE_SIZE); + + mret = curl_multi_add_handle(plugin->multi_handle, ps->send_endpoint); + if (mret != CURLM_OK) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"Connection %X: outbound paused, unpausing existing connection and enqueueing message\n",con); - curl_easy_pause(con->put_curl_handle,CURLPAUSE_CONT); - con->put_send_paused=GNUNET_NO; - return bytes_sent; + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("%s failed at %s:%d: `%s'\n"), + "curl_multi_add_handle", __FILE__, __LINE__, + curl_multi_strerror (mret)); + return -1; } + ps->send_connected = GNUNET_YES; + bytes_sent = send_schedule (plugin, NULL); + return bytes_sent; } - - /* not connected, initiate connection */ - if ( NULL == con->put_curl_handle) - con->put_curl_handle = curl_easy_init(); - GNUNET_assert (con->put_curl_handle != NULL); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"Connection %X: outbound not connected, initiating connection\n",con); - - GNUNET_assert (NULL != con->pending_msgs_tail); - msg = con->pending_msgs_tail; - -#if DEBUG_CURL - curl_easy_setopt(con->put_curl_handle, CURLOPT_VERBOSE, 1L); -#endif - curl_easy_setopt(con->put_curl_handle, CURLOPT_URL, con->url); - curl_easy_setopt(con->put_curl_handle, CURLOPT_PUT, 1L); - curl_easy_setopt(con->put_curl_handle, CURLOPT_READFUNCTION, send_curl_read_callback); - curl_easy_setopt(con->put_curl_handle, CURLOPT_READDATA, con); - curl_easy_setopt(con->put_curl_handle, CURLOPT_WRITEFUNCTION, send_curl_write_callback); - curl_easy_setopt(con->put_curl_handle, CURLOPT_READDATA, con); - curl_easy_setopt(con->put_curl_handle, CURLOPT_TIMEOUT, (long) timeout.value); - curl_easy_setopt(con->put_curl_handle, CURLOPT_PRIVATE, con); - curl_easy_setopt(con->put_curl_handle, CURLOPT_CONNECTTIMEOUT, HTTP_CONNECT_TIMEOUT); - curl_easy_setopt(con->put_curl_handle, CURLOPT_BUFFERSIZE, GNUNET_SERVER_MAX_MESSAGE_SIZE); - - mret = curl_multi_add_handle(plugin->multi_handle, con->put_curl_handle); - if (mret != CURLM_OK) + if (ps->direction == INBOUND) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - _("%s failed at %s:%d: `%s'\n"), - "curl_multi_add_handle", __FILE__, __LINE__, - curl_multi_strerror (mret)); - return -1; + GNUNET_assert (NULL != ps->pending_msgs_tail); + msg = ps->pending_msgs_tail; + if ((ps->recv_connected==GNUNET_YES) && (ps->recv_connected==GNUNET_YES)) + bytes_sent = msg->size; + return bytes_sent; } - con->put_connected = GNUNET_YES; - bytes_sent = send_schedule (plugin, ses); - return bytes_sent; } static void send_execute (void *cls, @@ -1546,9 +1564,9 @@ http_plugin_send (void *cls, void *cont_cls) { struct Plugin *plugin = cls; - struct Session *cs; + //struct Session *cs; struct HTTP_Message *msg; - struct HTTP_Connection *con; + //struct HTTP_Connection *con; struct HTTP_PeerContext * pc; @@ -1558,8 +1576,8 @@ http_plugin_send (void *cls, GNUNET_assert ((addr!=NULL) && (addrlen != 0)); /* get session from hashmap */ - cs = session_get(plugin, target); - con = session_check_outbound_address(plugin, cs, addr, addrlen); + //cs = session_get(plugin, target); + //con = session_check_outbound_address(plugin, cs, addr, addrlen); pc = GNUNET_CONTAINER_multihashmap_get (plugin->peers, &target->hashPubKey); @@ -1576,22 +1594,54 @@ http_plugin_send (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"SEND: PEER CONTEXT FOUND\n"); } ps = get_HTTP_Session(plugin, pc, addr, addrlen); - if (ps==NULL) + /* session not existing, but address forced -> creating new session */ + if ((ps==NULL) && (force_address == GNUNET_YES)) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"SEND: CREATING NEW SESSION %s\n",http_plugin_address_to_string(NULL, addr, addrlen)); ps = GNUNET_malloc(sizeof (struct HTTP_Session)); ps->addr = GNUNET_malloc(addrlen); memcpy(ps->addr,addr,addrlen); ps->addrlen = addrlen; - ps->direction=GNUNET_YES; + ps->direction=OUTBOUND; + ps->recv_connected = GNUNET_NO; + ps->send_connected = GNUNET_NO; ps->pending_msgs_head = NULL; ps->pending_msgs_tail = NULL; ps->url = create_url (plugin, ps->addr, ps->addrlen); GNUNET_CONTAINER_DLL_insert(pc->head,pc->tail,ps); } - else + /* session not existing, address not forced -> looking for other session */ + if ((ps==NULL) && (force_address == GNUNET_NO)) + { + /* FIXME: CREATING SESSION, SHOULD CHOOSE EXISTING */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"SEND: CREATING NEW SESSION %s\n",http_plugin_address_to_string(NULL, addr, addrlen)); + ps = GNUNET_malloc(sizeof (struct HTTP_Session)); + ps->addr = GNUNET_malloc(addrlen); + memcpy(ps->addr,addr,addrlen); + ps->addrlen = addrlen; + ps->direction=OUTBOUND; + ps->recv_connected = GNUNET_NO; + ps->send_connected = GNUNET_NO; + ps->pending_msgs_head = NULL; + ps->pending_msgs_tail = NULL; + ps->url = create_url (plugin, ps->addr, ps->addrlen); + GNUNET_CONTAINER_DLL_insert(pc->head,pc->tail,ps); + } + if ((ps==NULL) && (force_address == GNUNET_SYSERR)) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"SEND: SESSION CONTEXT FOUND\n"); + /* FIXME: CREATING SESSION, SHOULD CHOOSE EXISTING */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"SEND: CREATING NEW SESSION %s\n",http_plugin_address_to_string(NULL, addr, addrlen)); + ps = GNUNET_malloc(sizeof (struct HTTP_Session)); + ps->addr = GNUNET_malloc(addrlen); + memcpy(ps->addr,addr,addrlen); + ps->addrlen = addrlen; + ps->direction=OUTBOUND; + ps->recv_connected = GNUNET_NO; + ps->send_connected = GNUNET_NO; + ps->pending_msgs_head = NULL; + ps->pending_msgs_tail = NULL; + ps->url = create_url (plugin, ps->addr, ps->addrlen); + GNUNET_CONTAINER_DLL_insert(pc->head,pc->tail,ps); } char * force = GNUNET_malloc(30); @@ -1603,12 +1653,12 @@ http_plugin_send (void *cls, strcpy(force,"reliable bi-direc. address addr."); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"Transport tells me to send %u bytes to `%s' %s (%s), session: %X\n", msgbuf_size, - GNUNET_i2s(&cs->identity), + GNUNET_i2s(&pc->identity), force, http_plugin_address_to_string(NULL, addr, addrlen), - session); + ps); - //GNUNET_free(force); + GNUNET_free(force); /* create msg */ msg = GNUNET_malloc (sizeof (struct HTTP_Message) + msgbuf_size); msg->next = NULL; @@ -1618,22 +1668,9 @@ http_plugin_send (void *cls, msg->transmit_cont = cont; msg->transmit_cont_cls = cont_cls; memcpy (msg->buf,msgbuf, msgbuf_size); + GNUNET_CONTAINER_DLL_insert(ps->pending_msgs_head,ps->pending_msgs_tail,msg); - /* must use this address */ - if (force_address == GNUNET_YES) - { - /* enqueue in connection message queue */ - GNUNET_CONTAINER_DLL_insert(con->pending_msgs_head,con->pending_msgs_tail,msg); - //GNUNET_CONTAINER_DLL_insert(ps->pending_msgs_head,ps->pending_msgs_tail,msg); - } - /* can use existing connection to send */ - else - { - /* enqueue in connection message queue */ - GNUNET_CONTAINER_DLL_insert(con->pending_msgs_head,con->pending_msgs_tail,msg); - //GNUNET_CONTAINER_DLL_insert(ps->pending_msgs_head,ps->pending_msgs_tail,msg); - } - return send_check_connections (plugin, cs, con); + return send_check_connections (plugin, session, ps); } -- 2.25.1