From c2a2296221c174ae68fb82f3373a5594391474cb Mon Sep 17 00:00:00 2001 From: Matthias Wachs Date: Thu, 15 Sep 2011 09:31:18 +0000 Subject: [PATCH] more functionality --- src/transport/plugin_transport_http.h | 10 ++ src/transport/plugin_transport_http_client.c | 125 ++++++++++++++- src/transport/plugin_transport_http_new.c | 26 +++- src/transport/plugin_transport_http_server.c | 151 ++++++++++++++++++- 4 files changed, 305 insertions(+), 7 deletions(-) diff --git a/src/transport/plugin_transport_http.h b/src/transport/plugin_transport_http.h index 84af33a59..3cba2ace0 100644 --- a/src/transport/plugin_transport_http.h +++ b/src/transport/plugin_transport_http.h @@ -114,13 +114,21 @@ struct Plugin int max_connections; + + /* Plugin values */ + + + int cur_connections; + /* * Server handles */ struct MHD_Daemon *server_v4; + GNUNET_SCHEDULER_TaskIdentifier server_v4_task; struct MHD_Daemon *server_v6; + GNUNET_SCHEDULER_TaskIdentifier server_v6_task; char *crypto_init; char *key; @@ -135,6 +143,8 @@ struct Plugin */ CURLM *client_mh; + GNUNET_SCHEDULER_TaskIdentifier client_perform_task; + }; /** diff --git a/src/transport/plugin_transport_http_client.c b/src/transport/plugin_transport_http_client.c index 9a5d4b261..41300285a 100644 --- a/src/transport/plugin_transport_http_client.c +++ b/src/transport/plugin_transport_http_client.c @@ -96,6 +96,116 @@ client_send (struct Session *s, const char *msgbuf, size_t msgbuf_size) return GNUNET_OK; } +/** + * Task performing curl operations + * @param cls plugin as closure + * @param tc gnunet scheduler task context + */ +static void +client_perform (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); + +/** + * Function setting up file descriptors and scheduling task to run + * + * @param plugin plugin as closure + * @return GNUNET_SYSERR for hard failure, GNUNET_OK for ok + */ +static int +client_schedule_next_perform (struct Plugin *plugin) +{ + fd_set rs; + fd_set ws; + fd_set es; + int max; + struct GNUNET_NETWORK_FDSet *grs; + struct GNUNET_NETWORK_FDSet *gws; + long to; + CURLMcode mret; + struct GNUNET_TIME_Relative timeout; + + /* Cancel previous scheduled task */ + if (plugin->client_perform_task!= GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (plugin->client_perform_task); + plugin->client_perform_task = GNUNET_SCHEDULER_NO_TASK; + } + + max = -1; + FD_ZERO (&rs); + FD_ZERO (&ws); + FD_ZERO (&es); + mret = curl_multi_fdset (plugin->client_mh, &rs, &ws, &es, &max); + if (mret != CURLM_OK) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("%s failed at %s:%d: `%s'\n"), + "curl_multi_fdset", __FILE__, __LINE__, + curl_multi_strerror (mret)); + return GNUNET_SYSERR; + } + mret = curl_multi_timeout (plugin->client_mh, &to); + if (to == -1) + timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5); + else + timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, to); + if (mret != CURLM_OK) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("%s failed at %s:%d: `%s'\n"), + "curl_multi_timeout", __FILE__, __LINE__, + curl_multi_strerror (mret)); + return GNUNET_SYSERR; + } + + grs = GNUNET_NETWORK_fdset_create (); + gws = GNUNET_NETWORK_fdset_create (); + GNUNET_NETWORK_fdset_copy_native (grs, &rs, max + 1); + GNUNET_NETWORK_fdset_copy_native (gws, &ws, max + 1); + plugin->client_perform_task = + GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT, + GNUNET_SCHEDULER_NO_TASK, + timeout, + grs, + gws, + &client_perform, + plugin); + GNUNET_NETWORK_fdset_destroy (gws); + GNUNET_NETWORK_fdset_destroy (grs); + return GNUNET_OK; +} + + +/** + * Task performing curl operations + * @param cls plugin as closure + * @param tc gnunet scheduler task context + */ +static void +client_perform (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct Plugin *plugin = cls; + static unsigned int handles_last_run; + int running; + CURLMcode mret; + + GNUNET_assert (cls != NULL); + + plugin->client_perform_task = GNUNET_SCHEDULER_NO_TASK; + if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) + return; + do + { + running = 0; + mret = curl_multi_perform (plugin->client_mh, &running); + if ((running < handles_last_run) && (running > 0)) + { + + } + //curl_handle_finished (plugin); + handles_last_run = running; + } + while (mret == CURLM_CALL_MULTI_PERFORM); + client_schedule_next_perform (plugin); +} + int client_connect (struct Session *s) { @@ -138,10 +248,10 @@ client_connect (struct Session *s) //curl_easy_setopt (s->client_get, CURLOPT_READDATA, ps); //curl_easy_setopt (s->client_get, CURLOPT_WRITEFUNCTION, curl_receive_cb); //curl_easy_setopt (s->client_get, CURLOPT_WRITEDATA, ps); - curl_easy_setopt (s->client_get, CURLOPT_TIMEOUT, + curl_easy_setopt (s->client_get, CURLOPT_TIMEOUT_MS, (long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); //curl_easy_setopt (s->client_get, CURLOPT_PRIVATE, ps); - curl_easy_setopt (s->client_get, CURLOPT_CONNECTTIMEOUT, + curl_easy_setopt (s->client_get, CURLOPT_CONNECTTIMEOUT_MS, (long) HTTP_NOT_VALIDATED_TIMEOUT.rel_value); curl_easy_setopt (s->client_get, CURLOPT_BUFFERSIZE, 2 * GNUNET_SERVER_MAX_MESSAGE_SIZE); @@ -169,10 +279,10 @@ client_connect (struct Session *s) //curl_easy_setopt (s->client_put, CURLOPT_READDATA, ps); //curl_easy_setopt (s->client_put, CURLOPT_WRITEFUNCTION, curl_receive_cb); //curl_easy_setopt (s->client_put, CURLOPT_WRITEDATA, ps); - curl_easy_setopt (s->client_put, CURLOPT_TIMEOUT, + curl_easy_setopt (s->client_put, CURLOPT_TIMEOUT_MS, (long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); //curl_easy_setopt (s->client_put, CURLOPT_PRIVATE, ps); - curl_easy_setopt (s->client_put, CURLOPT_CONNECTTIMEOUT, + curl_easy_setopt (s->client_put, CURLOPT_CONNECTTIMEOUT_MS, (long) HTTP_NOT_VALIDATED_TIMEOUT.rel_value); curl_easy_setopt (s->client_put, CURLOPT_BUFFERSIZE, 2 * GNUNET_SERVER_MAX_MESSAGE_SIZE); @@ -201,6 +311,7 @@ client_connect (struct Session *s) } /* Perform connect */ + s->plugin->client_perform_task = GNUNET_SCHEDULER_add_now (client_perform, s->plugin); return res; } @@ -227,6 +338,12 @@ client_start (struct Plugin *plugin) void client_stop (struct Plugin *plugin) { + if (plugin->client_perform_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (plugin->client_perform_task); + plugin->client_perform_task = GNUNET_SCHEDULER_NO_TASK; + } + curl_multi_cleanup (plugin->client_mh); curl_global_cleanup (); } diff --git a/src/transport/plugin_transport_http_new.c b/src/transport/plugin_transport_http_new.c index 3ad7dbac4..8fdce38e3 100644 --- a/src/transport/plugin_transport_http_new.c +++ b/src/transport/plugin_transport_http_new.c @@ -481,9 +481,18 @@ http_plugin_send (void *cls, const struct GNUNET_PeerIdentity *target, /* look for existing connection */ s = lookup_session (plugin, target, addr, addrlen, force_address); - /* create new connection */ + /* create new outbound connection */ if (s == NULL) { + if (plugin->max_connections <= plugin->cur_connections) + { + GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, plugin->name, + "Maximum number of connections reached, " + "cannot connect to peer `%s'\n", + GNUNET_i2s (target)); + return res; + } + #if DEBUG_HTTP GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, plugin->name, "Initiiating new connection to peer `%s'\n", @@ -492,7 +501,12 @@ http_plugin_send (void *cls, const struct GNUNET_PeerIdentity *target, s = create_session (plugin, target, addr, addrlen, cont, cont_cls); GNUNET_CONTAINER_DLL_insert (plugin->head, plugin->tail, s); // initiate new connection - client_connect (s); + if (GNUNET_SYSERR == (res = client_connect (s))) + { + GNUNET_CONTAINER_DLL_remove (plugin->head, plugin->tail, s); + delete_session (s); + return GNUNET_SYSERR; + } } else if (s->inbound == GNUNET_NO) res = client_send (s, msgbuf, msgbuf_size); @@ -839,6 +853,14 @@ configure_plugin (struct Plugin *plugin) } plugin->port = port; + /* Optional parameters */ + unsigned long long maxneigh; + if (GNUNET_OK != + GNUNET_CONFIGURATION_get_value_number (plugin->env->cfg, plugin->name, + "MAX_CONNECTIONS", &maxneigh)) + maxneigh = 128; + plugin->max_connections = maxneigh; + return res; } diff --git a/src/transport/plugin_transport_http_server.c b/src/transport/plugin_transport_http_server.c index 96d3e27ae..b74f68b96 100644 --- a/src/transport/plugin_transport_http_server.c +++ b/src/transport/plugin_transport_http_server.c @@ -48,7 +48,12 @@ server_log (void *arg, const char *fmt, va_list ap) static int server_accept_cb (void *cls, const struct sockaddr *addr, socklen_t addr_len) { - return 0; + struct Plugin * plugin = cls; + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "server: server_accept_cb\n"); + if (plugin->cur_connections <= plugin->max_connections) + return MHD_YES; + else + return MHD_NO; } @@ -220,6 +225,7 @@ server_access_cb (void *cls, struct MHD_Connection *mhd_connection, const char *upload_data, size_t * upload_data_size, void **httpSessionCache) { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "server: server_access_cb\n"); return 0; } @@ -227,6 +233,7 @@ static void server_disconnect_cb (void *cls, struct MHD_Connection *connection, void **httpSessionCache) { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "server: server_disconnect_cb\n"); } int @@ -241,6 +248,132 @@ server_send (struct Session *s, const char *msgbuf, size_t msgbuf_size) return GNUNET_OK; } +/** + * Function that queries MHD's select sets and + * starts the task waiting for them. + * @param plugin plugin + * @param daemon_handle the MHD daemon handle + * @return gnunet task identifier + */ +static GNUNET_SCHEDULER_TaskIdentifier +server_schedule_daemon (struct Plugin *plugin, struct MHD_Daemon *daemon_handle); + +/** + * Call MHD IPv4 to process pending requests and then go back + * and schedule the next run. + * @param cls plugin as closure + * @param tc task context + */ +static void +http_server_daemon_v4_run (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct Plugin *plugin = cls; + GNUNET_assert (cls != NULL); + + plugin->server_v4_task = GNUNET_SCHEDULER_NO_TASK; + + if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) + return; + + GNUNET_assert (MHD_YES == MHD_run (plugin->server_v4)); + plugin->server_v4_task = server_schedule_daemon (plugin, plugin->server_v4); +} + + +/** + * Call MHD IPv6 to process pending requests and then go back + * and schedule the next run. + * @param cls plugin as closure + * @param tc task context + */ +static void +http_server_daemon_v6_run (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct Plugin *plugin = cls; + GNUNET_assert (cls != NULL); + + plugin->server_v6_task = GNUNET_SCHEDULER_NO_TASK; + + if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) + return; + + GNUNET_assert (MHD_YES == MHD_run (plugin->server_v6)); + plugin->server_v6_task = server_schedule_daemon (plugin, plugin->server_v6); +} + +/** + * Function that queries MHD's select sets and + * starts the task waiting for them. + * @param plugin plugin + * @param daemon_handle the MHD daemon handle + * @return gnunet task identifier + */ +static GNUNET_SCHEDULER_TaskIdentifier +server_schedule_daemon (struct Plugin *plugin, struct MHD_Daemon *daemon_handle) +{ + GNUNET_SCHEDULER_TaskIdentifier ret; + fd_set rs; + fd_set ws; + fd_set es; + struct GNUNET_NETWORK_FDSet *wrs; + struct GNUNET_NETWORK_FDSet *wws; + struct GNUNET_NETWORK_FDSet *wes; + int max; + unsigned long long timeout; + int haveto; + struct GNUNET_TIME_Relative tv; + + ret = GNUNET_SCHEDULER_NO_TASK; + FD_ZERO (&rs); + FD_ZERO (&ws); + FD_ZERO (&es); + wrs = GNUNET_NETWORK_fdset_create (); + wes = GNUNET_NETWORK_fdset_create (); + wws = GNUNET_NETWORK_fdset_create (); + max = -1; + GNUNET_assert (MHD_YES == MHD_get_fdset (daemon_handle, &rs, &ws, &es, &max)); + haveto = MHD_get_timeout (daemon_handle, &timeout); + if (haveto == MHD_YES) + tv.rel_value = (uint64_t) timeout; + else + tv = GNUNET_TIME_UNIT_SECONDS; + GNUNET_NETWORK_fdset_copy_native (wrs, &rs, max + 1); + GNUNET_NETWORK_fdset_copy_native (wws, &ws, max + 1); + GNUNET_NETWORK_fdset_copy_native (wes, &es, max + 1); + if (daemon_handle == plugin->server_v4) + { + if (plugin->server_v4_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (plugin->server_v4_task); + plugin->server_v4_task = GNUNET_SCHEDULER_NO_TASK; + } + + ret = + GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT, + GNUNET_SCHEDULER_NO_TASK, tv, wrs, wws, + &http_server_daemon_v4_run, plugin); + } + if (daemon_handle == plugin->server_v6) + { + if (plugin->server_v6_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (plugin->server_v6_task); + plugin->server_v6_task = GNUNET_SCHEDULER_NO_TASK; + } + + ret = + GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT, + GNUNET_SCHEDULER_NO_TASK, tv, wrs, wws, + &http_server_daemon_v6_run, plugin); + } + GNUNET_NETWORK_fdset_destroy (wrs); + GNUNET_NETWORK_fdset_destroy (wws); + GNUNET_NETWORK_fdset_destroy (wes); + return ret; +} + int server_start (struct Plugin *plugin) { @@ -334,6 +467,11 @@ server_start (struct Plugin *plugin) res = GNUNET_SYSERR; } + if (plugin->server_v4 != NULL) + plugin->server_v4_task = server_schedule_daemon (plugin, plugin->server_v4); + if (plugin->server_v6 != NULL) + plugin->server_v6_task = server_schedule_daemon (plugin, plugin->server_v6); + #if DEBUG_HTTP GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "%s server component started on port %u\n", plugin->name, @@ -345,6 +483,17 @@ server_start (struct Plugin *plugin) void server_stop (struct Plugin *plugin) { + if (plugin->server_v4_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (plugin->server_v4_task); + plugin->server_v4_task = GNUNET_SCHEDULER_NO_TASK; + } + + if (plugin->server_v6_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (plugin->server_v6_task); + plugin->server_v6_task = GNUNET_SCHEDULER_NO_TASK; + } if (plugin->server_v4 != NULL) { -- 2.25.1