From 426be645faa129f78dc8484f8c596a7cb00549b2 Mon Sep 17 00:00:00 2001 From: Matthias Wachs Date: Fri, 16 Sep 2011 19:44:24 +0000 Subject: [PATCH] sending and receiving implemented --- src/transport/plugin_transport_http.h | 2 +- src/transport/plugin_transport_http_client.c | 25 +++--- src/transport/plugin_transport_http_new.c | 94 +++++++++++++++++--- src/transport/plugin_transport_http_server.c | 92 ++++++++++++------- 4 files changed, 157 insertions(+), 56 deletions(-) diff --git a/src/transport/plugin_transport_http.h b/src/transport/plugin_transport_http.h index 5e452ba84..d11bbd15c 100644 --- a/src/transport/plugin_transport_http.h +++ b/src/transport/plugin_transport_http.h @@ -45,7 +45,7 @@ #define DEBUG_HTTP GNUNET_YES #define VERBOSE_SERVER GNUNET_YES #define VERBOSE_CLIENT GNUNET_YES -#define VERBOSE_CURL GNUNET_NO +#define VERBOSE_CURL GNUNET_YES #if BUILD_HTTPS #define LIBGNUNET_PLUGIN_TRANSPORT_INIT libgnunet_plugin_transport_https_init diff --git a/src/transport/plugin_transport_http_client.c b/src/transport/plugin_transport_http_client.c index 3b3a4705b..224adb474 100644 --- a/src/transport/plugin_transport_http_client.c +++ b/src/transport/plugin_transport_http_client.c @@ -181,7 +181,7 @@ client_run (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { #if DEBUG_HTTP GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, - "Connection to '%s' %s ended\n", GNUNET_i2s(&s->target), http_plugin_address_to_string(plugin, s->addr, s->addrlen)); + "Connection to '%s' %s ended\n", GNUNET_i2s(&s->target), GNUNET_a2s (s->addr, s->addrlen)); #endif client_disconnect(s); //GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,"Notifying about ended session to peer `%s' `%s'\n", GNUNET_i2s (&s->target), http_plugin_address_to_string (plugin, s->addr, s->addrlen)); @@ -206,9 +206,9 @@ client_disconnect (struct Session *s) struct HTTP_Message * msg; struct HTTP_Message * t; -#if 0 +#if DEBUG_HTTP GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, - "Deleting outbound PUT session to peer `%s'\n", + "Client: Deleting outbound PUT session to peer `%s'\n", GNUNET_i2s (&s->target)); #endif @@ -227,7 +227,7 @@ client_disconnect (struct Session *s) #if DEBUG_HTTP GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, - "Deleting outbound GET session to peer `%s'\n", + "Client: Deleting outbound GET session to peer `%s'\n", GNUNET_i2s (&s->target)); #endif @@ -304,6 +304,12 @@ client_receive (void *stream, size_t size, size_t nmemb, void *cls) struct Session *s = cls; struct Plugin *plugin = s->plugin; +#if VERBOSE_CLIENT + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "Client: Received %Zu bytes from peer `%s'\n", + size * nmemb, + GNUNET_i2s (&s->target)); +#endif + if (GNUNET_TIME_absolute_get().abs_value < s->delay.abs_value) { #if DEBUG_CLIENT @@ -314,17 +320,13 @@ client_receive (void *stream, size_t size, size_t nmemb, void *cls) return 0; } + if (s->msg_tk == NULL) s->msg_tk = GNUNET_SERVER_mst_create (&client_receive_mst_cb, s); GNUNET_SERVER_mst_receive (s->msg_tk, s, stream, size * nmemb, GNUNET_NO, GNUNET_NO); -#if VERBOSE_CLIENT - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "Received %u bytes from peer `%s'\n", - size * nmemb, - GNUNET_i2s (&s->target)); -#endif return (size * nmemb); } @@ -389,9 +391,9 @@ client_send_cb (void *stream, size_t size, size_t nmemb, void *cls) if (msg->pos == msg->size) { -#if DEBUG_CONNECTIONS +#if VERBOSE_CLIENT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Connection %X: Message with %u bytes sent, removing message from queue\n", + "Message with %u bytes sent, removing message from queue\n", s, msg->pos); #endif /* Calling transmit continuation */ @@ -422,6 +424,7 @@ client_connect (struct Session *s) plugin->last_tag++; /* create url */ GNUNET_asprintf (&url, "%s%s;%u", http_plugin_address_to_string (plugin, s->addr, s->addrlen), GNUNET_h2s_full (&plugin->env->my_identity->hashPubKey),plugin->last_tag); + //GNUNET_asprintf (&url, "http://www.heise.de", http_plugin_address_to_string (plugin, s->addr, s->addrlen), GNUNET_h2s_full (&plugin->env->my_identity->hashPubKey),plugin->last_tag); #if 0 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "URL `%s'\n", diff --git a/src/transport/plugin_transport_http_new.c b/src/transport/plugin_transport_http_new.c index 48477b809..e998636b2 100644 --- a/src/transport/plugin_transport_http_new.c +++ b/src/transport/plugin_transport_http_new.c @@ -375,6 +375,7 @@ http_plugin_address_to_string (void *cls, const void *addr, size_t addrlen) struct Session * lookup_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target, + struct Session * session, const void *addr, size_t addrlen, int force_address) { struct Session *s = NULL; @@ -385,39 +386,79 @@ lookup_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target, t = plugin->head; if (t == NULL) return NULL; - while (t->next != NULL) + while (t != NULL) { e_peer = GNUNET_NO; e_addr = GNUNET_NO; + +#if DEBUG_HTTP + GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, plugin->name, + "Comparing session %X <-> %X\n", session, t); +#endif + if (0 == memcmp (target, &t->target, sizeof (struct GNUNET_PeerIdentity))) { e_peer = GNUNET_YES; if (addrlen == t->addrlen) { if (0 == memcmp (addr, &t->addr, addrlen)) + { e_addr = GNUNET_YES; + } + } + if ((t == session)) + { +#if DEBUG_HTTP + GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, plugin->name, + "Session %X: %s: \n", t, GNUNET_a2s (t->addr, t->addrlen)); + GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, plugin->name, + "Session %X: %s: \n", session, GNUNET_a2s (session->addr, session->addrlen)); + +#endif + if(t->addrlen == session->addrlen) + { + GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, plugin->name, + "length ok\n"); + if (0 == memcmp (session->addr, t->addr, t->addrlen)) + { + GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, plugin->name, + "equal\n"); + e_addr = GNUNET_YES; + } + } } } +#if DEBUG_HTTP + GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, plugin->name, + "Session %X: E_PEER YES : %i E_ADDR: %i force %u: \n", t, e_peer, e_addr, force_address); +#endif + if ((e_peer == GNUNET_YES) && (force_address == GNUNET_NO)) { s = t; break; } - else if ((e_peer == GNUNET_YES) && (force_address == GNUNET_YES) && - (e_addr == GNUNET_YES)) + if ((e_peer == GNUNET_YES) && (force_address == GNUNET_YES) && (e_addr == GNUNET_YES)) { s = t; +#if DEBUG_HTTP + GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, plugin->name, + "Session %X: HERE!\n", t, e_addr, s); +#endif break; } - else if ((e_peer == GNUNET_YES) && (force_address == GNUNET_SYSERR)) + if ((e_peer == GNUNET_YES) && (force_address == GNUNET_SYSERR)) { s = t; break; } + if (s != NULL) + break; t = t->next; } + return s; } @@ -514,15 +555,22 @@ http_plugin_send (void *cls, const struct GNUNET_PeerIdentity *target, #if DEBUG_HTTP GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, plugin->name, - "Sending %u bytes to peer `%s'\n", msgbuf_size, - GNUNET_i2s (target)); + "Sending %u bytes to peer `%s' on address `%s' %X %i\n", msgbuf_size, + GNUNET_i2s (target), GNUNET_a2s (addr, addrlen), session, force_address); #endif struct Session *s = NULL; /* look for existing connection */ - s = lookup_session (plugin, target, addr, addrlen, force_address); - + s = lookup_session (plugin, target, session, addr, addrlen, 1); +#if DEBUG_HTTP + GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, plugin->name, + "%s exisiting session\n", (s!=NULL) ? "Found" : "NOT Found"); +#endif + // FIXME DEBUGGING + if (session != NULL) + s= session; + //FIXME END /* create new outbound connection */ if (s == NULL) { @@ -561,10 +609,26 @@ http_plugin_send (void *cls, const struct GNUNET_PeerIdentity *target, memcpy (msg->buf, msgbuf, msgbuf_size); if (s->inbound == GNUNET_NO) - res = client_send (s, msg); + { +#if DEBUG_HTTP + GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, plugin->name, + "Using client session to send to `%s'\n", + GNUNET_i2s (target)); +#endif + client_send (s, msg); + res = msgbuf_size; + } if (s->inbound == GNUNET_YES) - res = server_send (s, msg); + { + server_send (s, msg); + res = msgbuf_size; +#if DEBUG_HTTP + GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, plugin->name, + "Using server session to send to `%s'\n", + GNUNET_i2s (target)); +#endif + } return res; } @@ -614,6 +678,8 @@ nat_connection_reversal (void *cls, const struct sockaddr *addr, socklen_t addrlen) { + + } static void @@ -771,7 +837,7 @@ nat_port_map_callback (void *cls, int add_remove, const struct sockaddr *addr, { GNUNET_assert (cls != NULL); struct Plugin *plugin = cls; - + static int limit; #if DEBUG_HTTP GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "NPMC called %s to address `%s'\n", @@ -782,7 +848,11 @@ nat_port_map_callback (void *cls, int add_remove, const struct sockaddr *addr, switch (add_remove) { case GNUNET_YES: - nat_add_address (cls, add_remove, addr, addrlen); + // FIXME DEBUGGING + if (limit < 1) + nat_add_address (cls, add_remove, addr, addrlen); + limit++; + // FIXME END break; case GNUNET_NO: nat_remove_address (cls, add_remove, addr, addrlen); diff --git a/src/transport/plugin_transport_http_server.c b/src/transport/plugin_transport_http_server.c index 98fbfda1c..0c2ab54cf 100644 --- a/src/transport/plugin_transport_http_server.c +++ b/src/transport/plugin_transport_http_server.c @@ -37,7 +37,7 @@ server_log (void *arg, const char *fmt, va_list ap) vsnprintf (text, sizeof (text), fmt, ap); va_end (ap); - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "server: %s\n", text); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Server: %s\n", text); } struct ServerConnection @@ -70,7 +70,7 @@ server_accept_cb (void *cls, const struct sockaddr *addr, socklen_t addr_len) return MHD_YES; else { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "server: Cannot accept new connections\n"); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Server: Cannot accept new connections\n"); return MHD_NO; } } @@ -267,12 +267,12 @@ server_receive_mst_cb (void *cls, void *client, * @return bytes written to buffer */ static ssize_t -mhd_send_callback (void *cls, uint64_t pos, char *buf, size_t max) +server_send_callback (void *cls, uint64_t pos, char *buf, size_t max) { struct Session *s = cls; struct HTTP_Message *msg; int bytes_read = 0; - + //static int c = 0; msg = s->msg_head; if (msg != NULL) { @@ -299,7 +299,8 @@ mhd_send_callback (void *cls, uint64_t pos, char *buf, size_t max) GNUNET_free (msg); } } -#if DEBUG_CONNECTIONS + +#if VERBOSE_CLIENT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connection %X: MHD has sent %u bytes\n", s, bytes_read); #endif @@ -371,7 +372,7 @@ server_access_cb (void *cls, struct MHD_Connection *mhd_connection, plugin->cur_connections++; #if VERBOSE_SERVER - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "server: New inbound connection from %s with tag %u\n", GNUNET_h2s_full(&(target.hashPubKey)), tag); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Server: New inbound connection from %s with tag %u\n", GNUNET_i2s(&target), tag); #endif /* find duplicate session */ @@ -388,7 +389,7 @@ server_access_cb (void *cls, struct MHD_Connection *mhd_connection, if (t != NULL) { #if VERBOSE_SERVER - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "server: Duplicate session, dismissing new connection from peer `%s'\n", GNUNET_i2s (&target)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Server: Duplicate session, dismissing new connection from peer `%s'\n", GNUNET_i2s (&target)); #endif goto error; } @@ -411,13 +412,13 @@ server_access_cb (void *cls, struct MHD_Connection *mhd_connection, goto create; #if VERBOSE_SERVER - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "server: Found existing semi-session for `%s'\n", GNUNET_i2s (&target)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Server: Found existing semi-session for `%s'\n", GNUNET_i2s (&target)); #endif if ((direction == _SEND) && (t->server_send != NULL)) { #if VERBOSE_SERVER - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "server: Duplicate GET session, dismissing new connection from peer `%s'\n", GNUNET_i2s (&target)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Server: Duplicate GET session, dismissing new connection from peer `%s'\n", GNUNET_i2s (&target)); #endif goto error; } @@ -427,7 +428,7 @@ server_access_cb (void *cls, struct MHD_Connection *mhd_connection, GNUNET_CONTAINER_DLL_remove(plugin->server_semi_head, plugin->server_semi_tail, s); GNUNET_CONTAINER_DLL_insert(plugin->head, plugin->tail, s); #if VERBOSE_SERVER - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "server: Found matching semi-session, merging session for peer `%s'\n", GNUNET_i2s (&target)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Server: Found matching semi-session, merging session for peer `%s'\n", GNUNET_i2s (&target)); #endif goto found; @@ -435,7 +436,7 @@ server_access_cb (void *cls, struct MHD_Connection *mhd_connection, if ((direction == _RECEIVE) && (t->server_recv != NULL)) { #if VERBOSE_SERVER - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "server: Duplicate PUT session, dismissing new connection from peer `%s'\n", GNUNET_i2s (&target)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Server: Duplicate PUT session, dismissing new connection from peer `%s'\n", GNUNET_i2s (&target)); #endif goto error; } @@ -445,7 +446,7 @@ server_access_cb (void *cls, struct MHD_Connection *mhd_connection, GNUNET_CONTAINER_DLL_remove(plugin->server_semi_head, plugin->server_semi_tail, s); GNUNET_CONTAINER_DLL_insert(plugin->head, plugin->tail, s); #if VERBOSE_SERVER - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "server: Found matching semi-session, merging session for peer `%s'\n", GNUNET_i2s (&target)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Server: Found matching semi-session, merging session for peer `%s'\n", GNUNET_i2s (&target)); #endif goto found; } @@ -453,7 +454,7 @@ server_access_cb (void *cls, struct MHD_Connection *mhd_connection, create: /* create new session */ #if VERBOSE_SERVER - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "server: Creating new session for peer `%s' \n", GNUNET_i2s (&target)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Server: Creating new session for peer `%s' \n", GNUNET_i2s (&target)); #endif s = create_session(plugin, @@ -473,7 +474,7 @@ create: goto found; error: - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "server: Invalid connection request\n"); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Server: Invalid connection request\n"); response = MHD_create_response_from_data (strlen (HTTP_ERROR_RESPONSE),HTTP_ERROR_RESPONSE, MHD_NO, MHD_NO); res = MHD_queue_response (mhd_connection, MHD_HTTP_NOT_FOUND, response); MHD_destroy_response (response); @@ -488,6 +489,10 @@ found: if (direction == _RECEIVE) s->server_recv = sc; + int to = (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value / 1000); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Server: Setting Timeout to %u\n", to); + //MHD_set_connection_option (mhd_connection, MHD_CONNECTION_OPTION_TIMEOUT, to); + (*httpSessionCache) = sc; } @@ -512,8 +517,7 @@ found: GNUNET_assert (s != NULL); if (sc->direction == _SEND) { - response = - MHD_create_response_from_callback (-1, 32 * 1024, &mhd_send_callback, + response = MHD_create_response_from_callback (-1, 32 * 1024, &server_send_callback, s, NULL); res = MHD_queue_response (mhd_connection, MHD_HTTP_OK, response); MHD_destroy_response (response); @@ -536,8 +540,8 @@ found: { #if VERBOSE_SERVER GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, - "Server: peer `%s' PUT on address `%s' received %u bytes\n", - GNUNET_i2s (&s->target), GNUNET_a2s (s->addr, s->addrlen)); + "Server: peer `%s' PUT on address `%s' received %Zu bytes\n", + GNUNET_i2s (&s->target), GNUNET_a2s (s->addr, s->addrlen), *upload_data_size); #endif if ((GNUNET_TIME_absolute_get().abs_value < s->delay.abs_value)) { @@ -551,7 +555,7 @@ found: } res = GNUNET_SERVER_mst_receive (s->msg_tk, s, upload_data, *upload_data_size, GNUNET_NO, GNUNET_NO); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Server: Received %u bytes\n", + "Server: Received %Zu bytes\n", *upload_data_size); (*upload_data_size) = 0; } @@ -573,6 +577,16 @@ found: return res; } +/** + * Function that queries MHD's select sets and + * starts the task waiting for them. + * @param plugin plugin + * @param daemon_handle the MHD daemon handle + * @return gnunet task identifier + */ +static GNUNET_SCHEDULER_TaskIdentifier +server_schedule (struct Plugin *plugin, struct MHD_Daemon *daemon_handle); + static void server_disconnect_cb (void *cls, struct MHD_Connection *connection, void **httpSessionCache) @@ -592,33 +606,37 @@ server_disconnect_cb (void *cls, struct MHD_Connection *connection, { #if VERBOSE_SERVER GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, - "Server: peer `%s' PUT on address `%s' disconnected\n", + "Server: peer `%s' GET on address `%s' disconnected\n", GNUNET_i2s (&s->target), GNUNET_a2s (s->addr, s->addrlen)); #endif s->server_send = NULL; - /* FIXME miminimize timeout here */ + if (s->server_recv != NULL) { tc = s->server_recv; tc->disconnect = GNUNET_YES; + MHD_set_connection_option (sc->mhd_conn, MHD_CONNECTION_OPTION_TIMEOUT, 1); } } if (sc->direction == _RECEIVE) { #if VERBOSE_SERVER GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, - "Server: peer `%s' GET on address `%s' disconnected\n", + "Server: peer `%s' PUT on address `%s' disconnected\n", GNUNET_i2s (&s->target), GNUNET_a2s (s->addr, s->addrlen)); #endif s->server_recv = NULL; - //MHD_ if (s->server_send != NULL) { tc = s->server_send; tc->disconnect = GNUNET_YES; + MHD_set_connection_option (sc->mhd_conn, MHD_CONNECTION_OPTION_TIMEOUT, 1); } if (s->msg_tk != NULL) + { GNUNET_SERVER_mst_destroy(s->msg_tk); + s->msg_tk = NULL; + } } GNUNET_free (sc); @@ -635,6 +653,19 @@ server_disconnect_cb (void *cls, struct MHD_Connection *connection, } plugin->cur_connections--; + if (plugin->server_v4_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel(plugin->server_v4_task); + plugin->server_v4_task = GNUNET_SCHEDULER_NO_TASK; + } + plugin->server_v4_task = server_schedule (plugin, plugin->server_v4); + + if (plugin->server_v6_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel(plugin->server_v6_task); + plugin->server_v6_task = GNUNET_SCHEDULER_NO_TASK; + } + plugin->server_v6_task = server_schedule (plugin, plugin->server_v6); if ((s->server_send == NULL) && (s->server_recv == NULL)) { @@ -643,6 +674,11 @@ server_disconnect_cb (void *cls, struct MHD_Connection *connection, "Server: peer `%s' on address `%s' disconnected\n", GNUNET_i2s (&s->target), GNUNET_a2s (s->addr, s->addrlen)); #endif + if (s->msg_tk != NULL) + { + GNUNET_SERVER_mst_destroy(s->msg_tk); + s->msg_tk = NULL; + } notify_session_end(s->plugin, &s->target, s); } @@ -681,15 +717,7 @@ server_send (struct Session *s, struct HTTP_Message * msg) return GNUNET_OK; } -/** - * Function that queries MHD's select sets and - * starts the task waiting for them. - * @param plugin plugin - * @param daemon_handle the MHD daemon handle - * @return gnunet task identifier - */ -static GNUNET_SCHEDULER_TaskIdentifier -server_schedule (struct Plugin *plugin, struct MHD_Daemon *daemon_handle); + /** * Call MHD IPv4 to process pending requests and then go back -- 2.25.1