From acb0b0077234212372463cc24366ac79e4bcceb6 Mon Sep 17 00:00:00 2001 From: Matthias Wachs Date: Fri, 16 Sep 2011 14:24:29 +0000 Subject: [PATCH] client sending & receiving --- src/transport/plugin_transport_http.h | 9 +- src/transport/plugin_transport_http_client.c | 162 +++++++++++++++++-- src/transport/plugin_transport_http_server.c | 4 + 3 files changed, 162 insertions(+), 13 deletions(-) diff --git a/src/transport/plugin_transport_http.h b/src/transport/plugin_transport_http.h index 11b369d40..b7b89e6e6 100644 --- a/src/transport/plugin_transport_http.h +++ b/src/transport/plugin_transport_http.h @@ -82,6 +82,7 @@ struct Plugin */ struct GNUNET_NAT_Handle *nat; + /** * ipv4 DLL head */ @@ -124,7 +125,6 @@ struct Plugin int cur_connections; uint32_t last_tag; - /* * Server handles */ @@ -180,9 +180,9 @@ struct Session struct Plugin *plugin; /** - * The client (used to identify this connection) + * message stream tokenizer for incoming data */ - /* void *client; */ + struct GNUNET_SERVER_MessageStreamTokenizer *msg_tk; /** * Continuation function to call once the transmission buffer @@ -232,7 +232,8 @@ struct Session void *server_recv; void *server_send; - + struct GNUNET_TIME_Absolute delay; + GNUNET_SCHEDULER_TaskIdentifier reset_task; uint32_t tag; }; diff --git a/src/transport/plugin_transport_http_client.c b/src/transport/plugin_transport_http_client.c index c2962394f..04a985906 100644 --- a/src/transport/plugin_transport_http_client.c +++ b/src/transport/plugin_transport_http_client.c @@ -183,7 +183,9 @@ client_run (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) "Connection to '%s' %s ended\n", GNUNET_i2s(&s->target), http_plugin_address_to_string(plugin, s->addr, s->addrlen)); #endif client_disconnect(s); - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,"Notifying about ended session to peer `%s' `%s'\n", GNUNET_i2s (&s->target), GNUNET_a2s (s->addr, s->addrlen)); + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,"Notifying about ended session to peer `%s' `%s'\n", GNUNET_i2s (&s->target), http_plugin_address_to_string (plugin, s->addr, s->addrlen)); + if (s->msg_tk != NULL) + GNUNET_SERVER_mst_destroy (s->msg_tk); notify_session_end (plugin, &s->target, s); } } @@ -252,6 +254,148 @@ client_disconnect (struct Session *s) return res; } +static void +curl_receive_mst_cb (void *cls, void *client, + const struct GNUNET_MessageHeader *message) +{ + struct Session *s = cls; + struct Plugin *plugin = s->plugin; + struct GNUNET_TRANSPORT_ATS_Information distance[2]; + struct GNUNET_TIME_Relative delay; + + distance[0].type = htonl (GNUNET_TRANSPORT_ATS_QUALITY_NET_DISTANCE); + distance[0].value = htonl (1); + distance[1].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR); + distance[1].value = htonl (0); + + delay = plugin->env->receive (plugin->env->cls, &s->target, message, (const struct GNUNET_TRANSPORT_ATS_Information*) &distance, 2, s, s->addr, s->addrlen); + s->delay = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), delay); + + if (GNUNET_TIME_absolute_get().abs_value < s->delay.abs_value) + { +#if VERBOSE_CLIENT + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "Client: peer `%s' address `%s' next read delayed for %llu ms\n", + GNUNET_i2s (&s->target), GNUNET_a2s (s->addr, s->addrlen), delay); +#endif + } +} + +/** +* Callback method used with libcurl +* Method is called when libcurl needs to write data during sending +* @param stream pointer where to write data +* @param size size of an individual element +* @param nmemb count of elements that can be written to the buffer +* @param ptr destination pointer, passed to the libcurl handle +* @return bytes read from stream +*/ +static size_t +curl_receive_cb (void *stream, size_t size, size_t nmemb, void *cls) +{ + struct Session *s = cls; + struct Plugin *plugin = s->plugin; + + if (GNUNET_TIME_absolute_get().abs_value < s->delay.abs_value) + { +#if DEBUG_HTTP + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Connection %X: no inbound bandwidth available! Next read was delayed for %llu ms\n", + s, GNUNET_TIME_absolute_get_difference(s->delay, GNUNET_TIME_absolute_get()).rel_value); +#endif + return 0; + } + + if (s->msg_tk == NULL) + s->msg_tk = GNUNET_SERVER_mst_create (&curl_receive_mst_cb, s); + + GNUNET_SERVER_mst_receive (s->msg_tk, s, stream, size * nmemb, GNUNET_NO, + GNUNET_NO); + +#if VERBOSE_CLIENT + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "Received %u bytes from peer `%s'\n", + size * nmemb, + GNUNET_i2s (&s->target)); +#endif + return (size * nmemb); +} + +/** + * Callback method used with libcurl + * Method is called when libcurl needs to read data during sending + * @param stream pointer where to write data + * @param size size of an individual element + * @param nmemb count of elements that can be written to the buffer + * @param ptr source pointer, passed to the libcurl handle + * @return bytes written to stream + */ +static size_t +curl_send_cb (void *stream, size_t size, size_t nmemb, void *ptr) +{ + size_t bytes_sent = 0; + +#if 0 + struct Session *ps = ptr; + struct HTTP_Message *msg = ps->pending_msgs_tail; + + size_t len; + + if (ps->send_active == GNUNET_NO) + return CURL_READFUNC_PAUSE; + if ((ps->pending_msgs_tail == NULL) && (ps->send_active == GNUNET_YES)) + { +#if DEBUG_CONNECTIONS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Connection %X: No Message to send, pausing connection\n", ps); +#endif + ps->send_active = GNUNET_NO; + return CURL_READFUNC_PAUSE; + } + + GNUNET_assert (msg != NULL); + + /* data to send */ + if (msg->pos < msg->size) + { + /* data fit in buffer */ + if ((msg->size - msg->pos) <= (size * nmemb)) + { + len = (msg->size - msg->pos); + memcpy (stream, &msg->buf[msg->pos], len); + msg->pos += len; + bytes_sent = len; + } + else + { + len = size * nmemb; + memcpy (stream, &msg->buf[msg->pos], len); + msg->pos += len; + bytes_sent = len; + } + } + /* no data to send */ + else + { + bytes_sent = 0; + } + + if (msg->pos == msg->size) + { +#if DEBUG_CONNECTIONS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Connection %X: Message with %u bytes sent, removing message from queue\n", + ps, msg->pos); +#endif + /* Calling transmit continuation */ + if (NULL != ps->pending_msgs_tail->transmit_cont) + msg->transmit_cont (ps->pending_msgs_tail->transmit_cont_cls, + &(ps->peercontext)->identity, GNUNET_OK); + ps->queue_length_cur -= msg->size; + remove_http_message (ps, msg); + } + +#endif + return bytes_sent; +} int client_connect (struct Session *s) @@ -292,10 +436,10 @@ client_connect (struct Session *s) curl_easy_setopt (s->client_get, CURLOPT_URL, url); //curl_easy_setopt (s->client_get, CURLOPT_HEADERFUNCTION, &curl_get_header_cb); //curl_easy_setopt (s->client_get, CURLOPT_WRITEHEADER, ps); - //curl_easy_setopt (s->client_get, CURLOPT_READFUNCTION, curl_send_cb); - //curl_easy_setopt (s->client_get, CURLOPT_READDATA, ps); - //curl_easy_setopt (s->client_get, CURLOPT_WRITEFUNCTION, curl_receive_cb); - //curl_easy_setopt (s->client_get, CURLOPT_WRITEDATA, ps); + curl_easy_setopt (s->client_get, CURLOPT_READFUNCTION, curl_send_cb); + curl_easy_setopt (s->client_get, CURLOPT_READDATA, s); + curl_easy_setopt (s->client_get, CURLOPT_WRITEFUNCTION, curl_receive_cb); + curl_easy_setopt (s->client_get, CURLOPT_WRITEDATA, s); curl_easy_setopt (s->client_get, CURLOPT_TIMEOUT_MS, (long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); curl_easy_setopt (s->client_get, CURLOPT_PRIVATE, s); @@ -323,10 +467,10 @@ client_connect (struct Session *s) curl_easy_setopt (s->client_put, CURLOPT_PUT, 1L); //curl_easy_setopt (s->client_put, CURLOPT_HEADERFUNCTION, &curl_put_header_cb); //curl_easy_setopt (s->client_put, CURLOPT_WRITEHEADER, ps); - //curl_easy_setopt (s->client_put, CURLOPT_READFUNCTION, curl_send_cb); - //curl_easy_setopt (s->client_put, CURLOPT_READDATA, ps); - //curl_easy_setopt (s->client_put, CURLOPT_WRITEFUNCTION, curl_receive_cb); - //curl_easy_setopt (s->client_put, CURLOPT_WRITEDATA, ps); + curl_easy_setopt (s->client_put, CURLOPT_READFUNCTION, curl_send_cb); + curl_easy_setopt (s->client_put, CURLOPT_READDATA, s); + curl_easy_setopt (s->client_put, CURLOPT_WRITEFUNCTION, curl_receive_cb); + curl_easy_setopt (s->client_put, CURLOPT_WRITEDATA, s); curl_easy_setopt (s->client_put, CURLOPT_TIMEOUT_MS, (long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); curl_easy_setopt (s->client_put, CURLOPT_PRIVATE, s); diff --git a/src/transport/plugin_transport_http_server.c b/src/transport/plugin_transport_http_server.c index df164aa4b..43d9171eb 100644 --- a/src/transport/plugin_transport_http_server.c +++ b/src/transport/plugin_transport_http_server.c @@ -292,6 +292,9 @@ server_access_cb (void *cls, struct MHD_Connection *mhd_connection, if (check == GNUNET_NO) goto error; + + plugin->cur_connections++; + #if VERBOSE_SERVER GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "server: New inbound connection from %s with tag %u\n", GNUNET_h2s_full(&(target.hashPubKey)), tag); #endif @@ -495,6 +498,7 @@ server_disconnect_cb (void *cls, struct MHD_Connection *connection, } t = t->next; } + plugin->cur_connections--; if ((s->server_send == NULL) && (s->server_recv == NULL)) { -- 2.25.1