From 5d702d02c5f2cf86400c469f4eb162a0f63d6cd8 Mon Sep 17 00:00:00 2001 From: Matthias Wachs Date: Mon, 21 Jun 2010 16:29:45 +0000 Subject: [PATCH] added support for multiple messages in a transfer --- src/transport/plugin_transport_http.c | 98 +++++++++++++++------- src/transport/test_plugin_transport_http.c | 36 +++++++- 2 files changed, 101 insertions(+), 33 deletions(-) diff --git a/src/transport/plugin_transport_http.c b/src/transport/plugin_transport_http.c index 2bf21e02d..628fff100 100644 --- a/src/transport/plugin_transport_http.c +++ b/src/transport/plugin_transport_http.c @@ -39,7 +39,7 @@ #include -#define DEBUG_CURL GNUNET_YES +#define DEBUG_CURL GNUNET_NO #define DEBUG_HTTP GNUNET_NO /** @@ -394,7 +394,7 @@ static void requestCompletedCallback (void *cls, struct MHD_Connection * connect cs = *httpSessionCache; if (cs != NULL) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"Connection from peer `%s' was terminated\n",GNUNET_i2s(&cs->sender)); + /*GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"Connection from peer `%s' was terminated\n",GNUNET_i2s(&cs->sender));*/ /* session set to inactive */ cs->is_active = GNUNET_NO; cs->is_put_in_progress = GNUNET_NO; @@ -438,10 +438,11 @@ accessHandlerCallback (void *cls, char address[INET6_ADDRSTRLEN+14]; struct GNUNET_PeerIdentity pi_in; int res = GNUNET_NO; - struct GNUNET_MessageHeader *gn_msg; + struct GNUNET_MessageHeader *cur_msg; int send_error_to_client; - gn_msg = NULL; + + cur_msg = NULL; send_error_to_client = GNUNET_NO; if ( NULL == *httpSessionCache) @@ -493,7 +494,7 @@ accessHandlerCallback (void *cls, if ( GNUNET_YES == res) { /* existing session for this address found */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"Session for peer `%s' found\n",GNUNET_i2s(&cs->sender)); + /*GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"Session for peer `%s' found\n",GNUNET_i2s(&cs->sender));*/ break; } cs = cs->next; @@ -564,7 +565,6 @@ accessHandlerCallback (void *cls, /* copy uploaded data to buffer */ memcpy(&cs->pending_inbound_msg->buf[cs->pending_inbound_msg->pos],upload_data,*upload_data_size); cs->pending_inbound_msg->pos += *upload_data_size; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"%u bytes added to message of %u bytes\n",*upload_data_size, cs->pending_inbound_msg->pos); *upload_data_size = 0; return MHD_YES; } @@ -596,45 +596,85 @@ accessHandlerCallback (void *cls, if ((*upload_data_size == 0) && (cs->is_put_in_progress == GNUNET_YES) && (cs->is_bad_request == GNUNET_NO)) { send_error_to_client = GNUNET_YES; - struct GNUNET_MessageHeader * gn_msg = NULL; - /*check message and forward here */ + cur_msg = NULL; + /* split and check messages and forward here */ /* checking size */ if (cs->pending_inbound_msg->pos >= sizeof (struct GNUNET_MessageHeader)) { + cur_msg = (struct GNUNET_MessageHeader *) cs->pending_inbound_msg->buf; + unsigned int len = ntohs (cur_msg->size); - gn_msg = GNUNET_malloc (cs->pending_inbound_msg->pos); - memcpy (gn_msg,cs->pending_inbound_msg->buf,cs->pending_inbound_msg->pos); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"msg->size: %u \n",ntohs (gn_msg->size)); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"msg->type: %u \n",ntohs (gn_msg->type)); - - //MY VERSION: if ((ntohs(gn_msg->size) == cs->pending_inbound_msg->pos)) - if ((ntohs(gn_msg->size) <= cs->pending_inbound_msg->pos)) + if (len == cs->pending_inbound_msg->pos) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"Recieved GNUnet message type %u size %u and payload %u \n",ntohs (gn_msg->type), ntohs (gn_msg->size), ntohs (gn_msg->size)-sizeof(struct GNUNET_MessageHeader)); - /* forwarding message to transport */ - char * tmp = NULL; + /* one message in recieved data, can pass directly*/ if ( AF_INET == cs->addr_inbound->sin_family) { - tmp = GNUNET_malloc (INET_ADDRSTRLEN + 14); inet_ntop(AF_INET, &(cs->addr_inbound)->sin_addr,address,INET_ADDRSTRLEN); GNUNET_asprintf(&tmp,"%s:%u",address,ntohs(cs->addr_inbound->sin_port)); } - /* Incoming IPv6 connection */ + if ( AF_INET6 == cs->addr_inbound->sin_family) { - tmp = GNUNET_malloc (INET6_ADDRSTRLEN + 14); inet_ntop(AF_INET6, &((struct sockaddr_in6 *) cs->addr_inbound)->sin6_addr,address,INET6_ADDRSTRLEN); GNUNET_asprintf(&tmp,"[%s]:%u",address,ntohs(cs->addr_inbound->sin_port)); } - if (NULL != tmp) + plugin->env->receive(plugin->env->cls, &(cs->sender), cur_msg, 1, NULL , tmp, strlen(tmp)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"Forwarded 1 message with %u bytes of data to transport service\n", cs->pending_inbound_msg->pos); + GNUNET_free(tmp); + send_error_to_client = GNUNET_NO; + } + if (len < cs->pending_inbound_msg->pos) + { + /* more than one message in recieved data, have to split up*/ + char * tmp = NULL; + unsigned int bytes_proc = 0; + unsigned int c_msgs = 0; + unsigned int len; + /* one message in recieved data, can pass directly*/ + if ( AF_INET == cs->addr_inbound->sin_family) { - plugin->env->receive(plugin->env, &(cs->sender), gn_msg, 1, cs , tmp, strlen(tmp)); - GNUNET_free_non_null(tmp); + inet_ntop(AF_INET, &(cs->addr_inbound)->sin_addr,address,INET_ADDRSTRLEN); + GNUNET_asprintf(&tmp,"%s:%u",address,ntohs(cs->addr_inbound->sin_port)); + } + + if ( AF_INET6 == cs->addr_inbound->sin_family) + { + inet_ntop(AF_INET6, &((struct sockaddr_in6 *) cs->addr_inbound)->sin6_addr,address,INET6_ADDRSTRLEN); + GNUNET_asprintf(&tmp,"[%s]:%u",address,ntohs(cs->addr_inbound->sin_port)); + } send_error_to_client = GNUNET_NO; + while (bytes_proc < cs->pending_inbound_msg->pos) + { + cur_msg = (struct GNUNET_MessageHeader *) &cs->pending_inbound_msg->buf[bytes_proc]; + len = ntohs (cur_msg->size); + if ((bytes_proc+len) <=cs->pending_inbound_msg->pos) + { + plugin->env->receive(plugin->env->cls, &(cs->sender), cur_msg, 1, NULL , tmp, strlen(tmp)); + bytes_proc += ntohs(cur_msg->size); + c_msgs++; + } + else + { + send_error_to_client = GNUNET_YES; + break; + } + } + if (send_error_to_client == GNUNET_NO) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"Forwarded %u messages with %u bytes of data to transport service\n", + c_msgs, bytes_proc); + else + GNUNET_log (GNUNET_ERROR_TYPE_ERROR,"Forwarded %u messages with %u bytes, last msg was inconsistent, %u bytes left\n", + c_msgs, bytes_proc,cs->pending_inbound_msg->pos-bytes_proc); + GNUNET_free(tmp); + } + if (len > cs->pending_inbound_msg->pos) + { + /* message size bigger than data recieved -> malformed */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"Recieved malformed message: size in header %u bytes, recieved: %u \n", len, cs->pending_inbound_msg->pos); } } @@ -647,14 +687,13 @@ accessHandlerCallback (void *cls, } else { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"Recieved malformed message with %u bytes\n", cs->pending_inbound_msg->pos); response = MHD_create_response_from_data (strlen (HTTP_PUT_RESPONSE),HTTP_PUT_RESPONSE, MHD_NO, MHD_NO); res = MHD_queue_response (session, MHD_HTTP_BAD_REQUEST, response); MHD_destroy_response (response); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"Sent HTTP/1.1: 400 BAD REQUEST as PUT Response\n"); } - GNUNET_free_non_null (gn_msg); + //GNUNET_free_non_null (cur_msg); cs->is_put_in_progress = GNUNET_NO; cs->is_bad_request = GNUNET_NO; cs->pending_inbound_msg->pos = 0; @@ -1185,9 +1224,7 @@ http_plugin_send (void *cls, address = NULL; /* find session for peer */ ses = find_session_by_pi (target); - if (NULL != ses ) - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Existing session for peer `%s' found\n", GNUNET_i2s(target)); + /* if (NULL != ses ) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Existing session for peer `%s' found\n", GNUNET_i2s(target));*/ if ( ses == NULL) { /* create new session object */ @@ -1271,6 +1308,7 @@ http_plugin_send (void *cls, tmp->next = msg; } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"HTTP Plugin: sending %u bytes of data from peer `%s' to peer `%s'\n",msgbuf_size,GNUNET_i2s(plugin->env->my_identity),GNUNET_i2s(&ses->sender)); if (msg == ses->pending_outbound_msg) { bytes_sent = send_select_init (ses); @@ -1635,8 +1673,6 @@ libgnunet_plugin_transport_http_init (void *cls) long long unsigned int port; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"Starting http plugin...\n"); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"size_t %u\n",sizeof(size_t)); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"uint64_t %u\n",sizeof(uint64_t)); plugin = GNUNET_malloc (sizeof (struct Plugin)); plugin->env = env; diff --git a/src/transport/test_plugin_transport_http.c b/src/transport/test_plugin_transport_http.c index 3d1c8d5e5..63e8be3a1 100644 --- a/src/transport/test_plugin_transport_http.c +++ b/src/transport/test_plugin_transport_http.c @@ -314,6 +314,11 @@ static int fail_msg_transmited_bigger_max_size; */ static int fail_msg_transmited_max_size; +/** + * Test: transmit 2 msgs. in in send operation + */ +static int fail_multiple_msgs_in_transmission; + /** * Test: connect to peer without peer identification */ @@ -377,7 +382,7 @@ shutdown_clean () GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test connect with wrong data failed\n"); fail = 1; } - if ((fail_msgs_transmited_to_local_addrs != count_str_addr) || (fail_msg_transmited_max_size == GNUNET_YES) || (fail_msg_transmited_bigger_max_size == GNUNET_YES)) + if ((fail_msgs_transmited_to_local_addrs != count_str_addr) || (fail_msg_transmited_max_size == GNUNET_YES) || (fail_msg_transmited_bigger_max_size == GNUNET_YES) || (fail_multiple_msgs_in_transmission != GNUNET_NO)) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test sending with plugin failed\n"); fail = 1; @@ -513,6 +518,10 @@ receive (void *cls, uint16_t sender_address_len) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Testcase recieved new message from peer `%s' (`%s') with type %u and length %u\n", GNUNET_i2s(peer), sender_address, ntohs(message->type), ntohs(message->size)); + if ((ntohs(message->type) == 40) && (fail_multiple_msgs_in_transmission == 1)) + fail_multiple_msgs_in_transmission++; + if ((ntohs(message->type) == 41) && (fail_multiple_msgs_in_transmission == 2)) + fail_multiple_msgs_in_transmission = GNUNET_NO; return GNUNET_TIME_UNIT_ZERO; } @@ -993,6 +1002,29 @@ static void run_connection_tests( ) type ++; } + /* send a multiple GNUNET_messages at a time*/ + GNUNET_free(tmp); + tmp = GNUNET_malloc(4 * sizeof(struct GNUNET_MessageHeader)); + struct GNUNET_MessageHeader * msg1 = (struct GNUNET_MessageHeader *) tmp; + msg1->size = htons(2 * sizeof(struct GNUNET_MessageHeader)); + msg1->type = htons(40); + struct GNUNET_MessageHeader * msg2 = &msg1[2]; + msg2->size = htons(2 * sizeof(struct GNUNET_MessageHeader)); + msg2->type = htons(41); + api->send(api->cls, &my_identity, tmp, 4 * sizeof(struct GNUNET_MessageHeader), 0, TIMEOUT, NULL,addr_head->addr, addr_head->addrlen, GNUNET_YES, &task_send_cont, &fail_multiple_msgs_in_transmission); + + + /* send a multiple GNUNET_messages at a time, second message has incorrect size*/ + GNUNET_free(tmp); + tmp = GNUNET_malloc(4 * sizeof(struct GNUNET_MessageHeader)); + msg1 = (struct GNUNET_MessageHeader *) tmp; + msg1->size = htons(2 * sizeof(struct GNUNET_MessageHeader)); + msg1->type = htons(40); + msg2 = &msg1[2]; + msg2->size = htons(3 * sizeof(struct GNUNET_MessageHeader)); + msg2->type = htons(41); + api->send(api->cls, &my_identity, tmp, 4 * sizeof(struct GNUNET_MessageHeader), 0, TIMEOUT, NULL,addr_head->addr, addr_head->addrlen, GNUNET_YES, &task_send_cont, NULL); + /* send a message with size GNUNET_SERVER_MAX_MESSAGE_SIZE )*/ GNUNET_free(tmp); tmp = GNUNET_malloc(GNUNET_SERVER_MAX_MESSAGE_SIZE); @@ -1001,7 +1033,6 @@ static void run_connection_tests( ) memcpy(tmp,&msg,sizeof(struct GNUNET_MessageHeader)); api->send(api->cls, &my_identity, tmp, GNUNET_SERVER_MAX_MESSAGE_SIZE, 0, TIMEOUT, NULL,addr_head->addr, addr_head->addrlen, GNUNET_YES, &task_send_cont, &fail_msg_transmited_bigger_max_size); - /* send a message with size GNUNET_SERVER_MAX_MESSAGE_SIZE-1 */ GNUNET_free(tmp); tmp = GNUNET_malloc(GNUNET_SERVER_MAX_MESSAGE_SIZE-1); @@ -1043,6 +1074,7 @@ run (void *cls, fail_addr_to_str = GNUNET_YES; fail_msgs_transmited_to_local_addrs = 0; fail_msg_transmited_max_size = GNUNET_YES; + fail_multiple_msgs_in_transmission = GNUNET_YES; addr_head = NULL; count_str_addr = 0; -- 2.25.1