X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ftransport%2Fplugin_transport_http_client.c;h=62be281abb93710a5d427d52ee4f2dbbcbda8064;hb=83b19539f4d322b43683f5838b72e9ec2c8e6073;hp=3b3a4705b34bc033fc425b21dade8a321dc26519;hpb=74b34ed9b400f74a7977e268626b85b51acfedd4;p=oweals%2Fgnunet.git diff --git a/src/transport/plugin_transport_http_client.c b/src/transport/plugin_transport_http_client.c index 3b3a4705b..62be281ab 100644 --- a/src/transport/plugin_transport_http_client.c +++ b/src/transport/plugin_transport_http_client.c @@ -51,19 +51,18 @@ client_log (CURL * curl, curl_infotype type, char *data, size_t size, void *cls) text[size] = '\n'; text[size + 1] = '\0'; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client: %X - %s", cls, text); +#if BUILD_HTTPS + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "transport-https", + "Client: %X - %s", cls, text); +#else + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "transport-http", + "Client: %X - %s", cls, text); +#endif } return 0; } #endif -int -client_send (struct Session *s, struct HTTP_Message *msg) -{ - GNUNET_CONTAINER_DLL_insert (s->msg_head, s->msg_tail, msg); - return GNUNET_OK; -} - /** * Task performing curl operations * @param cls plugin as closure @@ -79,7 +78,7 @@ client_run (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); * @return GNUNET_SYSERR for hard failure, GNUNET_OK for ok */ static int -client_schedule (struct Plugin *plugin) +client_schedule (struct Plugin *plugin, int now) { fd_set rs; fd_set ws; @@ -92,7 +91,7 @@ client_schedule (struct Plugin *plugin) struct GNUNET_TIME_Relative timeout; /* Cancel previous scheduled task */ - if (plugin->client_perform_task!= GNUNET_SCHEDULER_NO_TASK) + if (plugin->client_perform_task != GNUNET_SCHEDULER_NO_TASK) { GNUNET_SCHEDULER_cancel (plugin->client_perform_task); plugin->client_perform_task = GNUNET_SCHEDULER_NO_TASK; @@ -112,9 +111,12 @@ client_schedule (struct Plugin *plugin) } mret = curl_multi_timeout (plugin->client_mh, &to); if (to == -1) - timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5); + timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1); else timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, to); + if (now == GNUNET_YES) + timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 1); + if (mret != CURLM_OK) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("%s failed at %s:%d: `%s'\n"), @@ -130,18 +132,36 @@ client_schedule (struct Plugin *plugin) plugin->client_perform_task = GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT, - GNUNET_SCHEDULER_NO_TASK, - timeout, - grs, - gws, - &client_run, - plugin); + GNUNET_SCHEDULER_NO_TASK, timeout, grs, gws, + &client_run, plugin); GNUNET_NETWORK_fdset_destroy (gws); GNUNET_NETWORK_fdset_destroy (grs); return GNUNET_OK; } +int +client_send (struct Session *s, struct HTTP_Message *msg) +{ + GNUNET_CONTAINER_DLL_insert (s->msg_head, s->msg_tail, msg); + + if ((s != NULL) && (s->client_put_paused == GNUNET_YES)) + { +#if VERBOSE_CLIENT + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, s->plugin->name, + "Client: %X was suspended, unpausing\n", s->client_put); +#endif + s->client_put_paused = GNUNET_NO; + curl_easy_pause (s->client_put, CURLPAUSE_CONT); + } + + client_schedule (s->plugin, GNUNET_YES); + + return GNUNET_OK; +} + + + /** * Task performing curl operations * @param cls plugin as closure @@ -151,7 +171,6 @@ static void client_run (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct Plugin *plugin = cls; - static unsigned int handles_last_run; int running; CURLMcode mret; @@ -166,35 +185,49 @@ client_run (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) running = 0; mret = curl_multi_perform (plugin->client_mh, &running); - CURLMsg * msg; + CURLMsg *msg; int msgs_left; - while ((msg = curl_multi_info_read(plugin->client_mh, &msgs_left))) - { - CURL *easy_h = msg->easy_handle; - struct Session *s; - GNUNET_assert (easy_h != NULL); - - GNUNET_assert (CURLE_OK == curl_easy_getinfo(easy_h, CURLINFO_PRIVATE, &s)); - GNUNET_assert (s != NULL); - if (msg->msg == CURLMSG_DONE) - { -#if DEBUG_HTTP - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, - "Connection to '%s' %s ended\n", GNUNET_i2s(&s->target), http_plugin_address_to_string(plugin, s->addr, s->addrlen)); -#endif - client_disconnect(s); - //GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,"Notifying about ended session to peer `%s' `%s'\n", GNUNET_i2s (&s->target), http_plugin_address_to_string (plugin, s->addr, s->addrlen)); - if (s->msg_tk != NULL) - GNUNET_SERVER_mst_destroy (s->msg_tk); - notify_session_end (plugin, &s->target, s); - } + while ((msg = curl_multi_info_read (plugin->client_mh, &msgs_left))) + { + CURL *easy_h = msg->easy_handle; + struct Session *s = NULL; + char *d = (char *) s; + + + //GNUNET_assert (easy_h != NULL); + if (easy_h == NULL) + { + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, + "Client: connection to ended with reason %i: `%s', %i handles running\n", + msg->data.result, + curl_easy_strerror (msg->data.result), running); + continue; + } + + GNUNET_assert (CURLE_OK == + curl_easy_getinfo (easy_h, CURLINFO_PRIVATE, &d)); + s = (struct Session *) d; + GNUNET_assert (s != NULL); + + if (msg->msg == CURLMSG_DONE) + { + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, + "Client: %X connection to '%s' %s ended with reason %i: `%s'\n", + msg->easy_handle, GNUNET_i2s (&s->target), + http_plugin_address_to_string (NULL, s->addr, + s->addrlen), + msg->data.result, + curl_easy_strerror (msg->data.result)); + + client_disconnect (s); + //GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,"Notifying about ended session to peer `%s' `%s'\n", GNUNET_i2s (&s->target), http_plugin_address_to_string (plugin, s->addr, s->addrlen)); + notify_session_end (plugin, &s->target, s); + } } - - handles_last_run = running; } while (mret == CURLM_CALL_MULTI_PERFORM); - client_schedule (plugin); + client_schedule (plugin, GNUNET_NO); } int @@ -203,17 +236,19 @@ client_disconnect (struct Session *s) int res = GNUNET_OK; CURLMcode mret; struct Plugin *plugin = s->plugin; - struct HTTP_Message * msg; - struct HTTP_Message * t; + struct HTTP_Message *msg; + struct HTTP_Message *t; + -#if 0 - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, - "Deleting outbound PUT session to peer `%s'\n", - GNUNET_i2s (&s->target)); -#endif if (s->client_put != NULL) { +#if DEBUG_HTTP + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, + "Client: %X Deleting outbound PUT session to peer `%s'\n", + s->client_put, GNUNET_i2s (&s->target)); +#endif + mret = curl_multi_remove_handle (plugin->client_mh, s->client_put); if (mret != CURLM_OK) { @@ -225,14 +260,21 @@ client_disconnect (struct Session *s) s->client_put = NULL; } -#if DEBUG_HTTP - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, - "Deleting outbound GET session to peer `%s'\n", - GNUNET_i2s (&s->target)); -#endif + + if (s->recv_wakeup_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (s->recv_wakeup_task); + s->recv_wakeup_task = GNUNET_SCHEDULER_NO_TASK; + } if (s->client_get != NULL) { +#if DEBUG_HTTP + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, + "Client: %X Deleting outbound GET session to peer `%s'\n", + s->client_get, GNUNET_i2s (&s->target)); +#endif + mret = curl_multi_remove_handle (plugin->client_mh, s->client_get); if (mret != CURLM_OK) { @@ -250,7 +292,7 @@ client_disconnect (struct Session *s) t = msg->next; if (NULL != msg->transmit_cont) msg->transmit_cont (msg->transmit_cont_cls, &s->target, GNUNET_SYSERR); - GNUNET_CONTAINER_DLL_remove(s->msg_head, s->msg_tail, msg); + GNUNET_CONTAINER_DLL_remove (s->msg_head, s->msg_tail, msg); GNUNET_free (msg); msg = t; } @@ -263,32 +305,53 @@ client_disconnect (struct Session *s) plugin->client_perform_task = GNUNET_SCHEDULER_NO_TASK; } - plugin->client_perform_task = GNUNET_SCHEDULER_add_now(client_run, plugin); + client_schedule (plugin, GNUNET_YES); return res; } static void client_receive_mst_cb (void *cls, void *client, - const struct GNUNET_MessageHeader *message) + const struct GNUNET_MessageHeader *message) { struct Session *s = cls; - struct Plugin *plugin = s->plugin; struct GNUNET_TIME_Relative delay; delay = http_plugin_receive (s, &s->target, message, s, s->addr, s->addrlen); + s->next_receive = + GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (), delay); - s->delay = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), delay); - - if (GNUNET_TIME_absolute_get().abs_value < s->delay.abs_value) + if (GNUNET_TIME_absolute_get ().abs_value < s->next_receive.abs_value) { #if VERBOSE_CLIENT - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "Client: peer `%s' address `%s' next read delayed for %llu ms\n", - GNUNET_i2s (&s->target), GNUNET_a2s (s->addr, s->addrlen), delay); + struct Plugin *plugin = s->plugin; + + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, + "Client: peer `%s' address `%s' next read delayed for %llu ms\n", + GNUNET_i2s (&s->target), GNUNET_a2s (s->addr, s->addrlen), + delay); #endif } } +static void +client_wake_up (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct Session *s = cls; + + s->recv_wakeup_task = GNUNET_SCHEDULER_NO_TASK; + + if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) + return; + + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, s->plugin->name, + "Client: %X Waking up receive handle\n", s->client_get); + + if (s->client_get != NULL) + curl_easy_pause (s->client_get, CURLPAUSE_CONT); + +} + /** * Callback method used with libcurl * Method is called when libcurl needs to write data during sending @@ -302,30 +365,46 @@ static size_t client_receive (void *stream, size_t size, size_t nmemb, void *cls) { struct Session *s = cls; + struct GNUNET_TIME_Absolute now; + size_t len = size * nmemb; + + +#if VERBOSE_CLIENT struct Plugin *plugin = s->plugin; - if (GNUNET_TIME_absolute_get().abs_value < s->delay.abs_value) + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, + "Client: Received %Zu bytes from peer `%s'\n", len, + GNUNET_i2s (&s->target)); +#endif + + now = GNUNET_TIME_absolute_get (); + if (now.abs_value < s->next_receive.abs_value) { + struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get (); + struct GNUNET_TIME_Relative delta = + GNUNET_TIME_absolute_get_difference (now, s->next_receive); #if DEBUG_CLIENT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "no inbound bandwidth available! Next read was delayed for %llu ms\n", - s, GNUNET_TIME_absolute_get_difference(s->delay, GNUNET_TIME_absolute_get()).rel_value); + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, + "Client: %X No inbound bandwidth available! Next read was delayed for %llu ms\n", + s->client_get, delta.rel_value); #endif - return 0; + if (s->recv_wakeup_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (s->recv_wakeup_task); + s->recv_wakeup_task = GNUNET_SCHEDULER_NO_TASK; + } + s->recv_wakeup_task = + GNUNET_SCHEDULER_add_delayed (delta, &client_wake_up, s); + return CURLPAUSE_ALL; } + if (s->msg_tk == NULL) - s->msg_tk = GNUNET_SERVER_mst_create (&client_receive_mst_cb, s); + s->msg_tk = GNUNET_SERVER_mst_create (&client_receive_mst_cb, s); - GNUNET_SERVER_mst_receive (s->msg_tk, s, stream, size * nmemb, GNUNET_NO, - GNUNET_NO); + GNUNET_SERVER_mst_receive (s->msg_tk, s, stream, len, GNUNET_NO, GNUNET_NO); -#if VERBOSE_CLIENT - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "Received %u bytes from peer `%s'\n", - size * nmemb, - GNUNET_i2s (&s->target)); -#endif - return (size * nmemb); + return len; } /** @@ -335,32 +414,32 @@ client_receive (void *stream, size_t size, size_t nmemb, void *cls) * @param size size of an individual element * @param nmemb count of elements that can be written to the buffer * @param ptr source pointer, passed to the libcurl handle - * @return bytes written to stream + * @return bytes written to stream, returning 0 will terminate connection! */ static size_t client_send_cb (void *stream, size_t size, size_t nmemb, void *cls) { struct Session *s = cls; - //struct Plugin *plugin = s->plugin; + +#if VERBOSE_CLIENT + struct Plugin *plugin = s->plugin; +#endif size_t bytes_sent = 0; size_t len; struct HTTP_Message *msg = s->msg_head; -/* - if (s->put_paused == GNUNET_NO) - return CURL_READFUNC_PAUSE; - if ((s->msg_head == NULL) && (s->put_paused == GNUNET_YES)) + + if (msg == NULL) { #if VERBOSE_CLIENT - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "Suspending handle `%s' `%s'\n", - GNUNET_i2s (&s->target),GNUNET_a2s (s->addr, s->addrlen)); + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, + "Client: %X Nothing to send! Suspending PUT handle!\n", + s->client_put); #endif - s->put_paused = GNUNET_NO; + s->client_put_paused = GNUNET_YES; return CURL_READFUNC_PAUSE; } -*/ - if (msg == NULL) - return bytes_sent; + GNUNET_assert (msg != NULL); /* data to send */ if (msg->pos < msg->size) @@ -384,20 +463,21 @@ client_send_cb (void *stream, size_t size, size_t nmemb, void *cls) /* no data to send */ else { + GNUNET_assert (0); bytes_sent = 0; } if (msg->pos == msg->size) { -#if DEBUG_CONNECTIONS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Connection %X: Message with %u bytes sent, removing message from queue\n", - s, msg->pos); +#if VERBOSE_CLIENT + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, + "Client: %X Message with %u bytes sent, removing message from queue\n", + s->client_put, msg->size, msg->pos); #endif /* Calling transmit continuation */ if (NULL != msg->transmit_cont) msg->transmit_cont (msg->transmit_cont_cls, &s->target, GNUNET_OK); - GNUNET_CONTAINER_DLL_remove(s->msg_head, s->msg_tail, msg); + GNUNET_CONTAINER_DLL_remove (s->msg_head, s->msg_tail, msg); GNUNET_free (msg); } return bytes_sent; @@ -421,11 +501,12 @@ client_connect (struct Session *s) plugin->last_tag++; /* create url */ - GNUNET_asprintf (&url, "%s%s;%u", http_plugin_address_to_string (plugin, s->addr, s->addrlen), GNUNET_h2s_full (&plugin->env->my_identity->hashPubKey),plugin->last_tag); + GNUNET_asprintf (&url, "%s%s;%u", + http_plugin_address_to_string (plugin, s->addr, s->addrlen), + GNUNET_h2s_full (&plugin->env->my_identity->hashPubKey), + plugin->last_tag); #if 0 - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, - "URL `%s'\n", - url); + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "URL `%s'\n", url); #endif /* create get connection */ s->client_get = curl_easy_init (); @@ -512,7 +593,7 @@ client_connect (struct Session *s) plugin->cur_connections += 2; /* Re-schedule since handles have changed */ - if (plugin->client_perform_task!= GNUNET_SCHEDULER_NO_TASK) + if (plugin->client_perform_task != GNUNET_SCHEDULER_NO_TASK) { GNUNET_SCHEDULER_cancel (plugin->client_perform_task); plugin->client_perform_task = GNUNET_SCHEDULER_NO_TASK;