text[size] = '\n';
text[size + 1] = '\0';
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client: %X - %s", cls, text);
+#if BUILD_HTTPS
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "transport-https", "Client: %X - %s", cls, text);
+#else
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "transport-http", "Client: %X - %s", cls, text);
+#endif
}
return 0;
}
#endif
-int
-client_send (struct Session *s, struct HTTP_Message *msg)
-{
- GNUNET_CONTAINER_DLL_insert (s->msg_head, s->msg_tail, msg);
- return GNUNET_OK;
-}
-
/**
* Task performing curl operations
* @param cls plugin as closure
}
+int
+client_send (struct Session *s, struct HTTP_Message *msg)
+{
+ GNUNET_CONTAINER_DLL_insert (s->msg_head, s->msg_tail, msg);
+
+ if ((s != NULL) && (s->client_put_paused == GNUNET_YES))
+ {
+#if VERBOSE_CLIENT
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, s->plugin->name, "Client: %X was suspended, unpausing\n", s->client_put);
+#endif
+ s->client_put_paused = GNUNET_NO;
+ curl_easy_pause(s->client_put, CURLPAUSE_CONT);
+ }
+
+ client_schedule (s->plugin);
+
+ return GNUNET_OK;
+}
+
+
+
/**
* Task performing curl operations
* @param cls plugin as closure
{
#if DEBUG_HTTP
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
- "Connection to '%s' %s ended\n", GNUNET_i2s(&s->target), GNUNET_a2s (s->addr, s->addrlen));
+ "Client: %X connection to '%s' %s ended\n", msg->easy_handle, GNUNET_i2s(&s->target), GNUNET_a2s (s->addr, s->addrlen));
#endif
client_disconnect(s);
//GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,"Notifying about ended session to peer `%s' `%s'\n", GNUNET_i2s (&s->target), http_plugin_address_to_string (plugin, s->addr, s->addrlen));
struct HTTP_Message * msg;
struct HTTP_Message * t;
+
+
+ if (s->client_put != NULL)
+ {
#if DEBUG_HTTP
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
- "Client: Deleting outbound PUT session to peer `%s'\n",
+ "Client: %X Deleting outbound PUT session to peer `%s'\n",
+ s->client_put,
GNUNET_i2s (&s->target));
#endif
- if (s->client_put != NULL)
- {
mret = curl_multi_remove_handle (plugin->client_mh, s->client_put);
if (mret != CURLM_OK)
{
s->client_put = NULL;
}
+
+ if (s->recv_wakeup_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (s->recv_wakeup_task);
+ s->recv_wakeup_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+
+ if (s->client_get != NULL)
+ {
#if DEBUG_HTTP
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
- "Client: Deleting outbound GET session to peer `%s'\n",
+ "Client: %X Deleting outbound GET session to peer `%s'\n",
+ s->client_get,
GNUNET_i2s (&s->target));
#endif
- if (s->client_get != NULL)
- {
mret = curl_multi_remove_handle (plugin->client_mh, s->client_get);
if (mret != CURLM_OK)
{
struct GNUNET_TIME_Relative delay;
delay = http_plugin_receive (s, &s->target, message, s, s->addr, s->addrlen);
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "CLIENT: CLIENT DELAY %llu ms\n",
+ delay.rel_value);
- s->delay = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), delay);
- if (GNUNET_TIME_absolute_get().abs_value < s->delay.abs_value)
+ s->next_receive = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), delay);
+
+ if (GNUNET_TIME_absolute_get().abs_value < s->next_receive.abs_value)
{
#if VERBOSE_CLIENT
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "Client: peer `%s' address `%s' next read delayed for %llu ms\n",
#endif
}
}
+static void
+client_wake_up (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct Session *s = cls;
+
+ s->recv_wakeup_task = GNUNET_SCHEDULER_NO_TASK;
+ if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+ return;
+
+ if (s->client_get != NULL)
+ curl_easy_pause(s->client_get, CURLPAUSE_CONT);
+}
/**
* Callback method used with libcurl
client_receive (void *stream, size_t size, size_t nmemb, void *cls)
{
struct Session *s = cls;
- struct Plugin *plugin = s->plugin;
+ struct GNUNET_TIME_Absolute now;
+ size_t len = size * nmemb;
+
#if VERBOSE_CLIENT
+ struct Plugin *plugin = s->plugin;
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "Client: Received %Zu bytes from peer `%s'\n",
- size * nmemb,
+ len,
GNUNET_i2s (&s->target));
#endif
- if (GNUNET_TIME_absolute_get().abs_value < s->delay.abs_value)
+ now = GNUNET_TIME_absolute_get();
+ if (now.abs_value < s->next_receive.abs_value)
{
#if DEBUG_CLIENT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "no inbound bandwidth available! Next read was delayed for %llu ms\n",
- s, GNUNET_TIME_absolute_get_difference(s->delay, GNUNET_TIME_absolute_get()).rel_value);
+ "No inbound bandwidth available! Next read was delayed for %llu ms\n",
+ s, GNUNET_TIME_absolute_get_difference(s->next_receive, GNUNET_TIME_absolute_get()).rel_value);
+#endif
+#if 0
+ if (s->recv_wakeup_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (s->recv_wakeup_task);
+ s->recv_wakeup_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ s->recv_wakeup_task = GNUNET_SCHEDULER_add_delayed( GNUNET_TIME_absolute_get_difference(s->next_receive, now), &client_wake_up, s);
+ return CURLPAUSE_ALL;
#endif
- return 0;
}
if (s->msg_tk == NULL)
s->msg_tk = GNUNET_SERVER_mst_create (&client_receive_mst_cb, s);
- GNUNET_SERVER_mst_receive (s->msg_tk, s, stream, size * nmemb, GNUNET_NO,
+ GNUNET_SERVER_mst_receive (s->msg_tk, s, stream, len, GNUNET_NO,
GNUNET_NO);
- return (size * nmemb);
+ return len;
+
+ client_wake_up (NULL, NULL);
}
/**
* @param size size of an individual element
* @param nmemb count of elements that can be written to the buffer
* @param ptr source pointer, passed to the libcurl handle
- * @return bytes written to stream
+ * @return bytes written to stream, returning 0 will terminate connection!
*/
static size_t
client_send_cb (void *stream, size_t size, size_t nmemb, void *cls)
{
struct Session *s = cls;
- //struct Plugin *plugin = s->plugin;
+#if VERBOSE_CLIENT
+ struct Plugin *plugin = s->plugin;
+#endif
size_t bytes_sent = 0;
size_t len;
struct HTTP_Message *msg = s->msg_head;
-/*
- if (s->put_paused == GNUNET_NO)
- return CURL_READFUNC_PAUSE;
- if ((s->msg_head == NULL) && (s->put_paused == GNUNET_YES))
+
+ if (msg == NULL)
{
#if VERBOSE_CLIENT
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "Suspending handle `%s' `%s'\n",
- GNUNET_i2s (&s->target),GNUNET_a2s (s->addr, s->addrlen));
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "Client: %X Nothing to send! Suspending PUT handle!\n", s->client_put);
#endif
- s->put_paused = GNUNET_NO;
+ s->client_put_paused = GNUNET_YES;
return CURL_READFUNC_PAUSE;
}
-*/
- if (msg == NULL)
- return bytes_sent;
+
GNUNET_assert (msg != NULL);
/* data to send */
if (msg->pos < msg->size)
/* no data to send */
else
{
+ GNUNET_assert (0);
bytes_sent = 0;
}
if (msg->pos == msg->size)
{
#if VERBOSE_CLIENT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Message with %u bytes sent, removing message from queue\n",
- s, msg->pos);
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
+ "Client: %X Message with %u bytes sent, removing message from queue\n",
+ s->client_put, msg->size, msg->pos);
#endif
/* Calling transmit continuation */
if (NULL != msg->transmit_cont)
plugin->last_tag++;
/* create url */
GNUNET_asprintf (&url, "%s%s;%u", http_plugin_address_to_string (plugin, s->addr, s->addrlen), GNUNET_h2s_full (&plugin->env->my_identity->hashPubKey),plugin->last_tag);
- //GNUNET_asprintf (&url, "http://www.heise.de", http_plugin_address_to_string (plugin, s->addr, s->addrlen), GNUNET_h2s_full (&plugin->env->my_identity->hashPubKey),plugin->last_tag);
#if 0
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
"URL `%s'\n",