working version
[oweals/gnunet.git] / src / transport / plugin_transport_http_client.c
index 715125dd0f2b9c19841cb32f7c361749d24ec0ea..bc2051eef16a1d9fb9901d5eb6b1a213019aeea5 100644 (file)
@@ -51,19 +51,16 @@ client_log (CURL * curl, curl_infotype type, char *data, size_t size, void *cls)
       text[size] = '\n';
       text[size + 1] = '\0';
     }
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client: %X - %s", cls, text);
+#if BUILD_HTTPS
+    GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "transport-https", "Client: %X - %s", cls, text);
+#else
+    GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "transport-http", "Client: %X - %s", cls, text);
+#endif
   }
   return 0;
 }
 #endif
 
-int
-client_send (struct Session *s, struct HTTP_Message *msg)
-{
-  GNUNET_CONTAINER_DLL_insert (s->msg_head, s->msg_tail, msg);
-  return GNUNET_OK;
-}
-
 /**
  * Task performing curl operations
  * @param cls plugin as closure
@@ -142,6 +139,27 @@ client_schedule (struct Plugin *plugin)
 }
 
 
+int
+client_send (struct Session *s, struct HTTP_Message *msg)
+{
+  GNUNET_CONTAINER_DLL_insert (s->msg_head, s->msg_tail, msg);
+
+  if ((s != NULL) && (s->client_put_paused == GNUNET_YES))
+  {
+#if VERBOSE_CLIENT
+    GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, s->plugin->name, "Client: %X was suspended, unpausing\n", s->client_put);
+#endif
+    s->client_put_paused = GNUNET_NO;
+    curl_easy_pause(s->client_put, CURLPAUSE_CONT);
+  }
+
+  client_schedule (s->plugin);
+
+  return GNUNET_OK;
+}
+
+
+
 /**
  * Task performing curl operations
  * @param cls plugin as closure
@@ -183,7 +201,7 @@ client_run (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
        {
 #if DEBUG_HTTP
          GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
-                   "Connection to '%s'  %s ended\n", GNUNET_i2s(&s->target), GNUNET_a2s (s->addr, s->addrlen));
+                   "Client: %X connection to '%s'  %s ended\n", msg->easy_handle, GNUNET_i2s(&s->target), GNUNET_a2s (s->addr, s->addrlen));
 #endif
          client_disconnect(s);
          //GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,"Notifying about ended session to peer `%s' `%s'\n", GNUNET_i2s (&s->target), http_plugin_address_to_string (plugin, s->addr, s->addrlen));
@@ -208,14 +226,17 @@ client_disconnect (struct Session *s)
   struct HTTP_Message * msg;
   struct HTTP_Message * t;
 
+
+
+  if (s->client_put != NULL)
+  {
 #if DEBUG_HTTP
   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
-                   "Client: Deleting outbound PUT session to peer `%s'\n",
+                   "Client: %X Deleting outbound PUT session to peer `%s'\n",
+                   s->client_put,
                    GNUNET_i2s (&s->target));
 #endif
 
-  if (s->client_put != NULL)
-  {
     mret = curl_multi_remove_handle (plugin->client_mh, s->client_put);
     if (mret != CURLM_OK)
     {
@@ -227,14 +248,22 @@ client_disconnect (struct Session *s)
     s->client_put = NULL;
   }
 
+
+  if (s->recv_wakeup_task != GNUNET_SCHEDULER_NO_TASK)
+  {
+   GNUNET_SCHEDULER_cancel (s->recv_wakeup_task);
+   s->recv_wakeup_task = GNUNET_SCHEDULER_NO_TASK;
+  }
+
+  if (s->client_get != NULL)
+  {
 #if DEBUG_HTTP
   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
-                   "Client: Deleting outbound GET session to peer `%s'\n",
+                   "Client: %X Deleting outbound GET session to peer `%s'\n",
+                   s->client_get,
                    GNUNET_i2s (&s->target));
 #endif
 
-  if (s->client_get != NULL)
-  {
     mret = curl_multi_remove_handle (plugin->client_mh, s->client_get);
     if (mret != CURLM_OK)
     {
@@ -279,10 +308,13 @@ client_receive_mst_cb (void *cls, void *client,
   struct GNUNET_TIME_Relative delay;
 
   delay = http_plugin_receive (s, &s->target, message, s, s->addr, s->addrlen);
+  GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "CLIENT: CLIENT DELAY %llu ms\n",
+              delay.rel_value);
 
-  s->delay = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), delay);
 
-  if (GNUNET_TIME_absolute_get().abs_value < s->delay.abs_value)
+  s->next_receive = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), delay);
+
+  if (GNUNET_TIME_absolute_get().abs_value < s->next_receive.abs_value)
   {
 #if VERBOSE_CLIENT
     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "Client: peer `%s' address `%s' next read delayed for %llu ms\n",
@@ -290,6 +322,18 @@ client_receive_mst_cb (void *cls, void *client,
 #endif
   }
 }
+static void
+client_wake_up (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct Session *s = cls;
+
+  s->recv_wakeup_task = GNUNET_SCHEDULER_NO_TASK;
+  if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+    return;
+
+  if (s->client_get != NULL)
+    curl_easy_pause(s->client_get, CURLPAUSE_CONT);
+}
 
 /**
 * Callback method used with libcurl
@@ -304,32 +348,46 @@ static size_t
 client_receive (void *stream, size_t size, size_t nmemb, void *cls)
 {
   struct Session *s = cls;
-  struct Plugin *plugin = s->plugin;
+  struct GNUNET_TIME_Absolute now;
+  size_t len = size * nmemb;
+
 
 #if VERBOSE_CLIENT
+  struct Plugin *plugin = s->plugin;
   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "Client: Received %Zu bytes from peer `%s'\n",
-                   size * nmemb,
+                   len,
                    GNUNET_i2s (&s->target));
 #endif
 
-  if (GNUNET_TIME_absolute_get().abs_value < s->delay.abs_value)
+  now = GNUNET_TIME_absolute_get();
+  if (now.abs_value < s->next_receive.abs_value)
   {
 #if DEBUG_CLIENT
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "no inbound bandwidth available! Next read was delayed for  %llu ms\n",
-                s, GNUNET_TIME_absolute_get_difference(s->delay, GNUNET_TIME_absolute_get()).rel_value);
+                "No inbound bandwidth available! Next read was delayed for  %llu ms\n",
+                s, GNUNET_TIME_absolute_get_difference(s->next_receive, GNUNET_TIME_absolute_get()).rel_value);
+#endif
+#if 0
+    if (s->recv_wakeup_task != GNUNET_SCHEDULER_NO_TASK)
+    {
+      GNUNET_SCHEDULER_cancel (s->recv_wakeup_task);
+      s->recv_wakeup_task = GNUNET_SCHEDULER_NO_TASK;
+    }
+    s->recv_wakeup_task = GNUNET_SCHEDULER_add_delayed( GNUNET_TIME_absolute_get_difference(s->next_receive, now), &client_wake_up, s);
+    return CURLPAUSE_ALL;
 #endif
-    return 0;
   }
 
 
   if (s->msg_tk == NULL)
       s->msg_tk = GNUNET_SERVER_mst_create (&client_receive_mst_cb, s);
 
-  GNUNET_SERVER_mst_receive (s->msg_tk, s, stream, size * nmemb, GNUNET_NO,
+  GNUNET_SERVER_mst_receive (s->msg_tk, s, stream, len, GNUNET_NO,
                              GNUNET_NO);
 
-  return (size * nmemb);
+  return len;
+
+  client_wake_up (NULL, NULL);
 }
 
 /**
@@ -339,32 +397,29 @@ client_receive (void *stream, size_t size, size_t nmemb, void *cls)
  * @param size size of an individual element
  * @param nmemb count of elements that can be written to the buffer
  * @param ptr source pointer, passed to the libcurl handle
- * @return bytes written to stream
+ * @return bytes written to stream, returning 0 will terminate connection!
  */
 static size_t
 client_send_cb (void *stream, size_t size, size_t nmemb, void *cls)
 {
   struct Session *s = cls;
-  //struct Plugin *plugin = s->plugin;
+#if VERBOSE_CLIENT
+  struct Plugin *plugin = s->plugin;
+#endif
   size_t bytes_sent = 0;
   size_t len;
 
   struct HTTP_Message *msg = s->msg_head;
-/*
-  if (s->put_paused == GNUNET_NO)
-    return CURL_READFUNC_PAUSE;
-  if ((s->msg_head == NULL) && (s->put_paused == GNUNET_YES))
+
+  if (msg == NULL)
   {
 #if VERBOSE_CLIENT
-    GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "Suspending handle `%s' `%s'\n",
-                     GNUNET_i2s (&s->target),GNUNET_a2s (s->addr, s->addrlen));
+    GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "Client: %X Nothing to send! Suspending PUT handle!\n", s->client_put);
 #endif
-    s->put_paused = GNUNET_NO;
+    s->client_put_paused = GNUNET_YES;
     return CURL_READFUNC_PAUSE;
   }
-*/
-  if (msg == NULL)
-    return bytes_sent;
+
   GNUNET_assert (msg != NULL);
   /* data to send */
   if (msg->pos < msg->size)
@@ -388,15 +443,16 @@ client_send_cb (void *stream, size_t size, size_t nmemb, void *cls)
   /* no data to send */
   else
   {
+    GNUNET_assert (0);
     bytes_sent = 0;
   }
 
   if (msg->pos == msg->size)
   {
 #if VERBOSE_CLIENT
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Message with %u bytes sent, removing message from queue\n",
-                s, msg->pos);
+    GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
+                "Client: %X Message with %u bytes sent, removing message from queue\n",
+                s->client_put, msg->size, msg->pos);
 #endif
     /* Calling transmit continuation  */
     if (NULL != msg->transmit_cont)
@@ -426,7 +482,6 @@ client_connect (struct Session *s)
   plugin->last_tag++;
   /* create url */
   GNUNET_asprintf (&url, "%s%s;%u", http_plugin_address_to_string (plugin, s->addr, s->addrlen), GNUNET_h2s_full (&plugin->env->my_identity->hashPubKey),plugin->last_tag);
-  //GNUNET_asprintf (&url, "http://www.heise.de", http_plugin_address_to_string (plugin, s->addr, s->addrlen), GNUNET_h2s_full (&plugin->env->my_identity->hashPubKey),plugin->last_tag);
 #if 0
   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
                    "URL `%s'\n",