#include "plugin_transport_http.h"
-#if VERBOSE_CLIENT
+#if VERBOSE_CURL
/**
* Function to log curl debug messages with GNUNET_log
* @param curl handle
text[size] = '\n';
text[size + 1] = '\0';
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client: %X - %s", cls, text);
+#if BUILD_HTTPS
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
+ "transport-https", "Client: %X - %s", cls, text);
+#else
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
+ "transport-http", "Client: %X - %s", cls, text);
+#endif
}
return 0;
}
#endif
-int
-client_send (struct Session *s, const char *msgbuf, size_t msgbuf_size)
-{
- return GNUNET_OK;
-}
-
/**
* Task performing curl operations
* @param cls plugin as closure
* @return GNUNET_SYSERR for hard failure, GNUNET_OK for ok
*/
static int
-client_schedule (struct Plugin *plugin)
+client_schedule (struct Plugin *plugin, int now)
{
fd_set rs;
fd_set ws;
}
mret = curl_multi_timeout (plugin->client_mh, &to);
if (to == -1)
- timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
+ timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1);
else
timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, to);
+ if (now == GNUNET_YES)
+ timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 1);
+
if (mret != CURLM_OK)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("%s failed at %s:%d: `%s'\n"),
gws = GNUNET_NETWORK_fdset_create ();
GNUNET_NETWORK_fdset_copy_native (grs, &rs, max + 1);
GNUNET_NETWORK_fdset_copy_native (gws, &ws, max + 1);
+
plugin->client_perform_task =
GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
GNUNET_SCHEDULER_NO_TASK,
}
+int
+client_send (struct Session *s, struct HTTP_Message *msg)
+{
+ GNUNET_CONTAINER_DLL_insert (s->msg_head, s->msg_tail, msg);
+
+ if ((s != NULL) && (s->client_put_paused == GNUNET_YES))
+ {
+#if VERBOSE_CLIENT
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, s->plugin->name,
+ "Client: %X was suspended, unpausing\n", s->client_put);
+#endif
+ s->client_put_paused = GNUNET_NO;
+ curl_easy_pause(s->client_put, CURLPAUSE_CONT);
+ }
+
+ client_schedule (s->plugin, GNUNET_YES);
+
+ return GNUNET_OK;
+}
+
+
+
/**
* Task performing curl operations
* @param cls plugin as closure
client_run (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct Plugin *plugin = cls;
- static unsigned int handles_last_run;
int running;
CURLMcode mret;
plugin->client_perform_task = GNUNET_SCHEDULER_NO_TASK;
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
+
do
{
running = 0;
mret = curl_multi_perform (plugin->client_mh, &running);
- if ((running < handles_last_run) && (running > 0))
- {
- }
- //curl_handle_finished (plugin);
- handles_last_run = running;
+ CURLMsg * msg;
+ int msgs_left;
+ while ((msg = curl_multi_info_read(plugin->client_mh, &msgs_left)))
+ {
+ CURL *easy_h = msg->easy_handle;
+ struct Session *s = NULL;
+ char * d = (char *) s;
+
+
+ //GNUNET_assert (easy_h != NULL);
+ if (easy_h == NULL)
+ {
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
+ "Client: connection to ended with reason %i: `%s', %i handles running\n",
+ msg->data.result,
+ curl_easy_strerror(msg->data.result),
+ running);
+ continue;
+ }
+
+ GNUNET_assert (CURLE_OK == curl_easy_getinfo(easy_h, CURLINFO_PRIVATE, &d));
+ s = (struct Session *) d;
+ GNUNET_assert (s != NULL);
+
+ if (msg->msg == CURLMSG_DONE)
+ {
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
+ "Client: %X connection to '%s' %s ended with reason %i: `%s'\n",
+ msg->easy_handle, GNUNET_i2s(&s->target),
+ http_plugin_address_to_string (NULL, s->addr, s->addrlen),
+ msg->data.result,
+ curl_easy_strerror(msg->data.result));
+
+ client_disconnect(s);
+ //GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,"Notifying about ended session to peer `%s' `%s'\n", GNUNET_i2s (&s->target), http_plugin_address_to_string (plugin, s->addr, s->addrlen));
+ notify_session_end (plugin, &s->target, s);
+ }
+ }
}
while (mret == CURLM_CALL_MULTI_PERFORM);
- client_schedule (plugin);
+ client_schedule (plugin, GNUNET_NO);
}
int
int res = GNUNET_OK;
CURLMcode mret;
struct Plugin *plugin = s->plugin;
+ struct HTTP_Message * msg;
+ struct HTTP_Message * t;
+
+
+ if (s->client_put != NULL)
+ {
#if DEBUG_HTTP
- GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, plugin->name,
- "Deleting outbound session peer `%s'\n",
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
+ "Client: %X Deleting outbound PUT session to peer `%s'\n",
+ s->client_put,
GNUNET_i2s (&s->target));
#endif
- mret = curl_multi_remove_handle (plugin->client_mh, s->client_put);
- if (mret != CURLM_OK)
- {
+ mret = curl_multi_remove_handle (plugin->client_mh, s->client_put);
+ if (mret != CURLM_OK)
+ {
+ curl_easy_cleanup (s->client_put);
+ res = GNUNET_SYSERR;
+ GNUNET_break (0);
+ }
curl_easy_cleanup (s->client_put);
- res = GNUNET_SYSERR;
- GNUNET_break (0);
+ s->client_put = NULL;
}
- curl_easy_cleanup (s->client_put);
- mret = curl_multi_remove_handle (plugin->client_mh, s->client_get);
- if (mret != CURLM_OK)
+
+ if (s->recv_wakeup_task != GNUNET_SCHEDULER_NO_TASK)
{
+ GNUNET_SCHEDULER_cancel (s->recv_wakeup_task);
+ s->recv_wakeup_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+
+ if (s->client_get != NULL)
+ {
+#if DEBUG_HTTP
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
+ "Client: %X Deleting outbound GET session to peer `%s'\n",
+ s->client_get,
+ GNUNET_i2s (&s->target));
+#endif
+
+ mret = curl_multi_remove_handle (plugin->client_mh, s->client_get);
+ if (mret != CURLM_OK)
+ {
+ curl_easy_cleanup (s->client_get);
+ res = GNUNET_SYSERR;
+ GNUNET_break (0);
+ }
curl_easy_cleanup (s->client_get);
- res = GNUNET_SYSERR;
- GNUNET_break (0);
+ s->client_get = NULL;
}
- curl_easy_cleanup (s->client_get);
+ msg = s->msg_head;
+ while (msg != NULL)
+ {
+ t = msg->next;
+ if (NULL != msg->transmit_cont)
+ msg->transmit_cont (msg->transmit_cont_cls, &s->target, GNUNET_SYSERR);
+ GNUNET_CONTAINER_DLL_remove(s->msg_head, s->msg_tail, msg);
+ GNUNET_free (msg);
+ msg = t;
+ }
+
+ plugin->cur_connections -= 2;
/* Re-schedule since handles have changed */
- if (plugin->client_perform_task!= GNUNET_SCHEDULER_NO_TASK)
+ if (plugin->client_perform_task != GNUNET_SCHEDULER_NO_TASK)
{
GNUNET_SCHEDULER_cancel (plugin->client_perform_task);
plugin->client_perform_task = GNUNET_SCHEDULER_NO_TASK;
}
- plugin->client_perform_task = GNUNET_SCHEDULER_add_now(client_run, plugin);
+ client_schedule (plugin, GNUNET_YES);
return res;
}
+static void
+client_receive_mst_cb (void *cls, void *client,
+ const struct GNUNET_MessageHeader *message)
+{
+ struct Session *s = cls;
+ struct GNUNET_TIME_Relative delay;
+
+ delay = http_plugin_receive (s, &s->target, message, s, s->addr, s->addrlen);
+ s->next_receive = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), delay);
+
+ if (GNUNET_TIME_absolute_get().abs_value < s->next_receive.abs_value)
+ {
+#if VERBOSE_CLIENT
+ struct Plugin *plugin = s->plugin;
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
+ "Client: peer `%s' address `%s' next read delayed for %llu ms\n",
+ GNUNET_i2s (&s->target), GNUNET_a2s (s->addr, s->addrlen), delay);
+#endif
+ }
+}
+static void
+client_wake_up (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct Session *s = cls;
+
+ s->recv_wakeup_task = GNUNET_SCHEDULER_NO_TASK;
+
+ if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+ return;
+
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, s->plugin->name,
+ "Client: %X Waking up receive handle\n",
+ s->client_get);
+
+ if (s->client_get != NULL)
+ curl_easy_pause(s->client_get, CURLPAUSE_CONT);
+
+}
+
+/**
+* Callback method used with libcurl
+* Method is called when libcurl needs to write data during sending
+* @param stream pointer where to write data
+* @param size size of an individual element
+* @param nmemb count of elements that can be written to the buffer
+* @param ptr destination pointer, passed to the libcurl handle
+* @return bytes read from stream
+*/
+static size_t
+client_receive (void *stream, size_t size, size_t nmemb, void *cls)
+{
+ struct Session *s = cls;
+ struct GNUNET_TIME_Absolute now;
+ size_t len = size * nmemb;
+
+
+#if VERBOSE_CLIENT
+ struct Plugin *plugin = s->plugin;
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
+ "Client: Received %Zu bytes from peer `%s'\n",
+ len,
+ GNUNET_i2s (&s->target));
+#endif
+
+ now = GNUNET_TIME_absolute_get();
+ if (now.abs_value < s->next_receive.abs_value)
+ {
+ struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get();
+ struct GNUNET_TIME_Relative delta = GNUNET_TIME_absolute_get_difference(now, s->next_receive);
+#if DEBUG_CLIENT
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
+ "Client: %X No inbound bandwidth available! Next read was delayed for %llu ms\n",
+ s->client_get, delta.rel_value);
+#endif
+ if (s->recv_wakeup_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (s->recv_wakeup_task);
+ s->recv_wakeup_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ s->recv_wakeup_task = GNUNET_SCHEDULER_add_delayed (delta, &client_wake_up, s);
+ return CURLPAUSE_ALL;
+ }
+
+
+ if (s->msg_tk == NULL)
+ s->msg_tk = GNUNET_SERVER_mst_create (&client_receive_mst_cb, s);
+
+ GNUNET_SERVER_mst_receive (s->msg_tk, s, stream, len, GNUNET_NO,
+ GNUNET_NO);
+
+ return len;
+}
+
+/**
+ * Callback method used with libcurl
+ * Method is called when libcurl needs to read data during sending
+ * @param stream pointer where to write data
+ * @param size size of an individual element
+ * @param nmemb count of elements that can be written to the buffer
+ * @param ptr source pointer, passed to the libcurl handle
+ * @return bytes written to stream, returning 0 will terminate connection!
+ */
+static size_t
+client_send_cb (void *stream, size_t size, size_t nmemb, void *cls)
+{
+ struct Session *s = cls;
+#if VERBOSE_CLIENT
+ struct Plugin *plugin = s->plugin;
+#endif
+ size_t bytes_sent = 0;
+ size_t len;
+
+ struct HTTP_Message *msg = s->msg_head;
+
+ if (msg == NULL)
+ {
+#if VERBOSE_CLIENT
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
+ "Client: %X Nothing to send! Suspending PUT handle!\n", s->client_put);
+#endif
+ s->client_put_paused = GNUNET_YES;
+ return CURL_READFUNC_PAUSE;
+ }
+
+ GNUNET_assert (msg != NULL);
+ /* data to send */
+ if (msg->pos < msg->size)
+ {
+ /* data fit in buffer */
+ if ((msg->size - msg->pos) <= (size * nmemb))
+ {
+ len = (msg->size - msg->pos);
+ memcpy (stream, &msg->buf[msg->pos], len);
+ msg->pos += len;
+ bytes_sent = len;
+ }
+ else
+ {
+ len = size * nmemb;
+ memcpy (stream, &msg->buf[msg->pos], len);
+ msg->pos += len;
+ bytes_sent = len;
+ }
+ }
+ /* no data to send */
+ else
+ {
+ GNUNET_assert (0);
+ bytes_sent = 0;
+ }
+
+ if (msg->pos == msg->size)
+ {
+#if VERBOSE_CLIENT
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
+ "Client: %X Message with %u bytes sent, removing message from queue\n",
+ s->client_put, msg->size, msg->pos);
+#endif
+ /* Calling transmit continuation */
+ if (NULL != msg->transmit_cont)
+ msg->transmit_cont (msg->transmit_cont_cls, &s->target, GNUNET_OK);
+ GNUNET_CONTAINER_DLL_remove(s->msg_head, s->msg_tail, msg);
+ GNUNET_free (msg);
+ }
+ return bytes_sent;
+}
int
client_connect (struct Session *s)
{
+ struct Plugin *plugin = s->plugin;
int res = GNUNET_OK;
char *url;
CURLMcode mret;
-#if DEBUG_HTTP
- GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, s->plugin->name,
+#if VERBOSE_CLIENT
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
"Initiating outbound session peer `%s'\n",
GNUNET_i2s (&s->target));
#endif
s->inbound = GNUNET_NO;
+ plugin->last_tag++;
/* create url */
- GNUNET_asprintf (&url, "%s://%s/", s->plugin->protocol,
- http_plugin_address_to_string (NULL, s->addr, s->addrlen));
-
-#if DEBUG_HTTP
- GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, s->plugin->name, "URL `%s'\n", url);
+ GNUNET_asprintf (&url, "%s%s;%u",
+ http_plugin_address_to_string (plugin, s->addr, s->addrlen),
+ GNUNET_h2s_full (&plugin->env->my_identity->hashPubKey),plugin->last_tag);
+#if 0
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
+ "URL `%s'\n",
+ url);
#endif
-
/* create get connection */
s->client_get = curl_easy_init ();
-#if VERBOSE_CLIENT
+#if VERBOSE_CURL
curl_easy_setopt (s->client_get, CURLOPT_VERBOSE, 1L);
curl_easy_setopt (s->client_get, CURLOPT_DEBUGFUNCTION, &client_log);
curl_easy_setopt (s->client_get, CURLOPT_DEBUGDATA, s->client_get);
curl_easy_setopt (s->client_get, CURLOPT_URL, url);
//curl_easy_setopt (s->client_get, CURLOPT_HEADERFUNCTION, &curl_get_header_cb);
//curl_easy_setopt (s->client_get, CURLOPT_WRITEHEADER, ps);
- //curl_easy_setopt (s->client_get, CURLOPT_READFUNCTION, curl_send_cb);
- //curl_easy_setopt (s->client_get, CURLOPT_READDATA, ps);
- //curl_easy_setopt (s->client_get, CURLOPT_WRITEFUNCTION, curl_receive_cb);
- //curl_easy_setopt (s->client_get, CURLOPT_WRITEDATA, ps);
+ curl_easy_setopt (s->client_get, CURLOPT_READFUNCTION, client_send_cb);
+ curl_easy_setopt (s->client_get, CURLOPT_READDATA, s);
+ curl_easy_setopt (s->client_get, CURLOPT_WRITEFUNCTION, client_receive);
+ curl_easy_setopt (s->client_get, CURLOPT_WRITEDATA, s);
curl_easy_setopt (s->client_get, CURLOPT_TIMEOUT_MS,
(long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
- //curl_easy_setopt (s->client_get, CURLOPT_PRIVATE, ps);
+ curl_easy_setopt (s->client_get, CURLOPT_PRIVATE, s);
curl_easy_setopt (s->client_get, CURLOPT_CONNECTTIMEOUT_MS,
(long) HTTP_NOT_VALIDATED_TIMEOUT.rel_value);
curl_easy_setopt (s->client_get, CURLOPT_BUFFERSIZE,
/* create put connection */
s->client_put = curl_easy_init ();
-#if VERBOSE_CLIENT
+#if VERBOSE_CURL
curl_easy_setopt (s->client_put, CURLOPT_VERBOSE, 1L);
curl_easy_setopt (s->client_put, CURLOPT_DEBUGFUNCTION, &client_log);
curl_easy_setopt (s->client_put, CURLOPT_DEBUGDATA, s->client_put);
curl_easy_setopt (s->client_put, CURLOPT_PUT, 1L);
//curl_easy_setopt (s->client_put, CURLOPT_HEADERFUNCTION, &curl_put_header_cb);
//curl_easy_setopt (s->client_put, CURLOPT_WRITEHEADER, ps);
- //curl_easy_setopt (s->client_put, CURLOPT_READFUNCTION, curl_send_cb);
- //curl_easy_setopt (s->client_put, CURLOPT_READDATA, ps);
- //curl_easy_setopt (s->client_put, CURLOPT_WRITEFUNCTION, curl_receive_cb);
- //curl_easy_setopt (s->client_put, CURLOPT_WRITEDATA, ps);
+ curl_easy_setopt (s->client_put, CURLOPT_READFUNCTION, client_send_cb);
+ curl_easy_setopt (s->client_put, CURLOPT_READDATA, s);
+ curl_easy_setopt (s->client_put, CURLOPT_WRITEFUNCTION, client_receive);
+ curl_easy_setopt (s->client_put, CURLOPT_WRITEDATA, s);
curl_easy_setopt (s->client_put, CURLOPT_TIMEOUT_MS,
(long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
- //curl_easy_setopt (s->client_put, CURLOPT_PRIVATE, ps);
+ curl_easy_setopt (s->client_put, CURLOPT_PRIVATE, s);
curl_easy_setopt (s->client_put, CURLOPT_CONNECTTIMEOUT_MS,
(long) HTTP_NOT_VALIDATED_TIMEOUT.rel_value);
curl_easy_setopt (s->client_put, CURLOPT_BUFFERSIZE,
GNUNET_free (url);
- mret = curl_multi_add_handle (s->plugin->client_mh, s->client_get);
+ mret = curl_multi_add_handle (plugin->client_mh, s->client_get);
if (mret != CURLM_OK)
{
curl_easy_cleanup (s->client_get);
GNUNET_break (0);
}
- mret = curl_multi_add_handle (s->plugin->client_mh, s->client_put);
+ mret = curl_multi_add_handle (plugin->client_mh, s->client_put);
if (mret != CURLM_OK)
{
- curl_multi_remove_handle (s->plugin->client_mh, s->client_get);
+ curl_multi_remove_handle (plugin->client_mh, s->client_get);
curl_easy_cleanup (s->client_get);
curl_easy_cleanup (s->client_put);
res = GNUNET_SYSERR;
}
/* Perform connect */
- s->plugin->client_perform_task = GNUNET_SCHEDULER_add_now (client_run, s->plugin);
+ plugin->cur_connections += 2;
+
+ /* Re-schedule since handles have changed */
+ if (plugin->client_perform_task!= GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (plugin->client_perform_task);
+ plugin->client_perform_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ plugin->client_perform_task = GNUNET_SCHEDULER_add_now (client_run, plugin);
return res;
}