#define DEBUG_HTTP GNUNET_YES
#define VERBOSE_SERVER GNUNET_YES
#define VERBOSE_CLIENT GNUNET_YES
+#define VERBOSE_CURL GNUNET_NO
#if BUILD_HTTPS
#define LIBGNUNET_PLUGIN_TRANSPORT_INIT libgnunet_plugin_transport_https_init
*/
struct Plugin *plugin;
+ /**
+ * next pointer for double linked list
+ */
+ struct HTTP_Message *msg_head;
+
+ /**
+ * previous pointer for double linked list
+ */
+ struct HTTP_Message *msg_tail;
+
+
/**
* message stream tokenizer for incoming data
*/
void *client_put;
void *client_get;
+ int put_paused;
void *server_recv;
void *server_send;
};
+/**
+ * Message to send using http
+ */
+struct HTTP_Message
+{
+ /**
+ * next pointer for double linked list
+ */
+ struct HTTP_Message *next;
+
+ /**
+ * previous pointer for double linked list
+ */
+ struct HTTP_Message *prev;
+
+ /**
+ * buffer containing data to send
+ */
+ char *buf;
+
+ /**
+ * amount of data already sent
+ */
+ size_t pos;
+
+ /**
+ * buffer length
+ */
+ size_t size;
+
+ /**
+ * Continuation function to call once the transmission buffer
+ * has again space available. NULL if there is no
+ * continuation to call.
+ */
+ GNUNET_TRANSPORT_TransmitContinuation transmit_cont;
+
+ /**
+ * Closure for transmit_cont.
+ */
+ void *transmit_cont_cls;
+};
+
void
delete_session (struct Session *s);
const void *addr, size_t addrlen,
GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls);
+struct GNUNET_TIME_Relative
+http_plugin_receive (void *cls, const struct GNUNET_PeerIdentity * peer,
+ const struct GNUNET_MessageHeader * message,
+ struct Session * session,
+ const char *sender_address,
+ uint16_t sender_address_len);
+
const char *
http_plugin_address_to_string (void *cls, const void *addr, size_t addrlen);
client_connect (struct Session *s);
int
-client_send (struct Session *s, const char *msgbuf, size_t msgbuf_size);
+client_send (struct Session *s, struct HTTP_Message *msg);
int
client_start (struct Plugin *plugin);
server_disconnect (struct Session *s);
int
-server_send (struct Session *s, const char *msgbuf, size_t msgbuf_size);
+server_send (struct Session *s, struct HTTP_Message * msg);
int
server_start (struct Plugin *plugin);
#include "plugin_transport_http.h"
-#if VERBOSE_CLIENT
+#if VERBOSE_CURL
/**
* Function to log curl debug messages with GNUNET_log
* @param curl handle
#endif
int
-client_send (struct Session *s, const char *msgbuf, size_t msgbuf_size)
+client_send (struct Session *s, struct HTTP_Message *msg)
{
+ GNUNET_CONTAINER_DLL_insert (s->msg_head, s->msg_tail, msg);
return GNUNET_OK;
}
"Connection to '%s' %s ended\n", GNUNET_i2s(&s->target), http_plugin_address_to_string(plugin, s->addr, s->addrlen));
#endif
client_disconnect(s);
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,"Notifying about ended session to peer `%s' `%s'\n", GNUNET_i2s (&s->target), http_plugin_address_to_string (plugin, s->addr, s->addrlen));
+ //GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,"Notifying about ended session to peer `%s' `%s'\n", GNUNET_i2s (&s->target), http_plugin_address_to_string (plugin, s->addr, s->addrlen));
if (s->msg_tk != NULL)
GNUNET_SERVER_mst_destroy (s->msg_tk);
notify_session_end (plugin, &s->target, s);
int res = GNUNET_OK;
CURLMcode mret;
struct Plugin *plugin = s->plugin;
+ struct HTTP_Message * msg;
+ struct HTTP_Message * t;
#if 0
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
s->client_get = NULL;
}
+ msg = s->msg_head;
+ while (msg != NULL)
+ {
+ t = msg->next;
+ if (NULL != msg->transmit_cont)
+ msg->transmit_cont (msg->transmit_cont_cls, &s->target, GNUNET_SYSERR);
+ GNUNET_CONTAINER_DLL_remove(s->msg_head, s->msg_tail, msg);
+ GNUNET_free (msg);
+ msg = t;
+ }
+
plugin->cur_connections -= 2;
/* Re-schedule since handles have changed */
if (plugin->client_perform_task != GNUNET_SCHEDULER_NO_TASK)
}
static void
-curl_receive_mst_cb (void *cls, void *client,
+client_receive_mst_cb (void *cls, void *client,
const struct GNUNET_MessageHeader *message)
{
struct Session *s = cls;
struct Plugin *plugin = s->plugin;
- struct GNUNET_TRANSPORT_ATS_Information distance[2];
struct GNUNET_TIME_Relative delay;
- distance[0].type = htonl (GNUNET_TRANSPORT_ATS_QUALITY_NET_DISTANCE);
- distance[0].value = htonl (1);
- distance[1].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR);
- distance[1].value = htonl (0);
+ delay = http_plugin_receive (s, &s->target, message, s, s->addr, s->addrlen);
- delay = plugin->env->receive (plugin->env->cls, &s->target, message, (const struct GNUNET_TRANSPORT_ATS_Information*) &distance, 2, s, s->addr, s->addrlen);
s->delay = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), delay);
if (GNUNET_TIME_absolute_get().abs_value < s->delay.abs_value)
* @return bytes read from stream
*/
static size_t
-curl_receive_cb (void *stream, size_t size, size_t nmemb, void *cls)
+client_receive (void *stream, size_t size, size_t nmemb, void *cls)
{
struct Session *s = cls;
struct Plugin *plugin = s->plugin;
if (GNUNET_TIME_absolute_get().abs_value < s->delay.abs_value)
{
-#if DEBUG_HTTP
+#if DEBUG_CLIENT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Connection %X: no inbound bandwidth available! Next read was delayed for %llu ms\n",
+ "no inbound bandwidth available! Next read was delayed for %llu ms\n",
s, GNUNET_TIME_absolute_get_difference(s->delay, GNUNET_TIME_absolute_get()).rel_value);
#endif
return 0;
}
if (s->msg_tk == NULL)
- s->msg_tk = GNUNET_SERVER_mst_create (&curl_receive_mst_cb, s);
+ s->msg_tk = GNUNET_SERVER_mst_create (&client_receive_mst_cb, s);
GNUNET_SERVER_mst_receive (s->msg_tk, s, stream, size * nmemb, GNUNET_NO,
GNUNET_NO);
* @return bytes written to stream
*/
static size_t
-curl_send_cb (void *stream, size_t size, size_t nmemb, void *ptr)
+client_send_cb (void *stream, size_t size, size_t nmemb, void *cls)
{
+ struct Session *s = cls;
+ //struct Plugin *plugin = s->plugin;
size_t bytes_sent = 0;
-
-#if 0
- struct Session *ps = ptr;
- struct HTTP_Message *msg = ps->pending_msgs_tail;
-
size_t len;
- if (ps->send_active == GNUNET_NO)
+ struct HTTP_Message *msg = s->msg_head;
+/*
+ if (s->put_paused == GNUNET_NO)
return CURL_READFUNC_PAUSE;
- if ((ps->pending_msgs_tail == NULL) && (ps->send_active == GNUNET_YES))
+ if ((s->msg_head == NULL) && (s->put_paused == GNUNET_YES))
{
-#if DEBUG_CONNECTIONS
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Connection %X: No Message to send, pausing connection\n", ps);
+#if VERBOSE_CLIENT
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "Suspending handle `%s' `%s'\n",
+ GNUNET_i2s (&s->target),GNUNET_a2s (s->addr, s->addrlen));
#endif
- ps->send_active = GNUNET_NO;
+ s->put_paused = GNUNET_NO;
return CURL_READFUNC_PAUSE;
}
-
+*/
+ if (msg == NULL)
+ return bytes_sent;
GNUNET_assert (msg != NULL);
-
/* data to send */
if (msg->pos < msg->size)
{
#if DEBUG_CONNECTIONS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Connection %X: Message with %u bytes sent, removing message from queue\n",
- ps, msg->pos);
+ s, msg->pos);
#endif
/* Calling transmit continuation */
- if (NULL != ps->pending_msgs_tail->transmit_cont)
- msg->transmit_cont (ps->pending_msgs_tail->transmit_cont_cls,
- &(ps->peercontext)->identity, GNUNET_OK);
- ps->queue_length_cur -= msg->size;
- remove_http_message (ps, msg);
+ if (NULL != msg->transmit_cont)
+ msg->transmit_cont (msg->transmit_cont_cls, &s->target, GNUNET_OK);
+ GNUNET_CONTAINER_DLL_remove(s->msg_head, s->msg_tail, msg);
+ GNUNET_free (msg);
}
-
-#endif
return bytes_sent;
}
#endif
/* create get connection */
s->client_get = curl_easy_init ();
-#if VERBOSE_CLIENT
+#if VERBOSE_CURL
curl_easy_setopt (s->client_get, CURLOPT_VERBOSE, 1L);
curl_easy_setopt (s->client_get, CURLOPT_DEBUGFUNCTION, &client_log);
curl_easy_setopt (s->client_get, CURLOPT_DEBUGDATA, s->client_get);
curl_easy_setopt (s->client_get, CURLOPT_URL, url);
//curl_easy_setopt (s->client_get, CURLOPT_HEADERFUNCTION, &curl_get_header_cb);
//curl_easy_setopt (s->client_get, CURLOPT_WRITEHEADER, ps);
- curl_easy_setopt (s->client_get, CURLOPT_READFUNCTION, curl_send_cb);
+ curl_easy_setopt (s->client_get, CURLOPT_READFUNCTION, client_send_cb);
curl_easy_setopt (s->client_get, CURLOPT_READDATA, s);
- curl_easy_setopt (s->client_get, CURLOPT_WRITEFUNCTION, curl_receive_cb);
+ curl_easy_setopt (s->client_get, CURLOPT_WRITEFUNCTION, client_receive);
curl_easy_setopt (s->client_get, CURLOPT_WRITEDATA, s);
curl_easy_setopt (s->client_get, CURLOPT_TIMEOUT_MS,
(long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
/* create put connection */
s->client_put = curl_easy_init ();
-#if VERBOSE_CLIENT
+#if VERBOSE_CURL
curl_easy_setopt (s->client_put, CURLOPT_VERBOSE, 1L);
curl_easy_setopt (s->client_put, CURLOPT_DEBUGFUNCTION, &client_log);
curl_easy_setopt (s->client_put, CURLOPT_DEBUGDATA, s->client_put);
curl_easy_setopt (s->client_put, CURLOPT_PUT, 1L);
//curl_easy_setopt (s->client_put, CURLOPT_HEADERFUNCTION, &curl_put_header_cb);
//curl_easy_setopt (s->client_put, CURLOPT_WRITEHEADER, ps);
- curl_easy_setopt (s->client_put, CURLOPT_READFUNCTION, curl_send_cb);
+ curl_easy_setopt (s->client_put, CURLOPT_READFUNCTION, client_send_cb);
curl_easy_setopt (s->client_put, CURLOPT_READDATA, s);
- curl_easy_setopt (s->client_put, CURLOPT_WRITEFUNCTION, curl_receive_cb);
+ curl_easy_setopt (s->client_put, CURLOPT_WRITEFUNCTION, client_receive);
curl_easy_setopt (s->client_put, CURLOPT_WRITEDATA, s);
curl_easy_setopt (s->client_put, CURLOPT_TIMEOUT_MS,
(long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
return GNUNET_SYSERR;
}
+struct GNUNET_TIME_Relative
+http_plugin_receive (void *cls, const struct GNUNET_PeerIdentity * peer,
+ const struct GNUNET_MessageHeader * message,
+ struct Session * session,
+ const char *sender_address,
+ uint16_t sender_address_len)
+{
+ struct Session *s = cls;
+ struct Plugin *plugin = s->plugin;
+ struct GNUNET_TRANSPORT_ATS_Information distance[2];
+ struct GNUNET_TIME_Relative delay;
+
+ distance[0].type = htonl (GNUNET_TRANSPORT_ATS_QUALITY_NET_DISTANCE);
+ distance[0].value = htonl (1);
+ distance[1].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR);
+ distance[1].value = htonl (0);
+
+ delay = plugin->env->receive (plugin->env->cls, &s->target, message, (const struct GNUNET_TRANSPORT_ATS_Information*) &distance, 2, s, s->addr, s->addrlen);
+ return delay;
+}
+
/**
* Function called for a quick conversion of the binary address to
* a numeric address. Note that the caller must not free the
s->transmit_cont = cont;
s->transmit_cont_cls = cont_cls;
s->next = NULL;
-
+ s->delay = GNUNET_TIME_absolute_get_forever();
return s;
}
GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
{
struct Plugin *plugin = cls;
-
+ struct HTTP_Message *msg;
GNUNET_assert (plugin != NULL);
int res = GNUNET_SYSERR;
return GNUNET_SYSERR;
}
}
- else if (s->inbound == GNUNET_NO)
- res = client_send (s, msgbuf, msgbuf_size);
- else if (s->inbound == GNUNET_YES)
- res = server_send (s, msgbuf, msgbuf_size);
+
+ msg = GNUNET_malloc (sizeof (struct HTTP_Message) + msgbuf_size);
+ msg->next = NULL;
+ msg->size = msgbuf_size;
+ msg->pos = 0;
+ msg->buf = (char *) &msg[1];
+ msg->transmit_cont = cont;
+ msg->transmit_cont_cls = cont_cls;
+ memcpy (msg->buf, msgbuf, msgbuf_size);
+
+ if (s->inbound == GNUNET_NO)
+ res = client_send (s, msg);
+ if (s->inbound == GNUNET_YES)
+ res = server_send (s, msg);
return res;
}
#endif
+/**
+ * Callback called by MessageStreamTokenizer when a message has arrived
+ * @param cls current session as closure
+ * @param client clien
+ * @param message the message to be forwarded to transport service
+ */
+static void
+server_receive_mst_cb (void *cls, void *client,
+ const struct GNUNET_MessageHeader *message)
+{
+ struct Session *s = cls;
+ struct Plugin *plugin = s->plugin;
+ struct GNUNET_TIME_Relative delay;
+
+ delay = http_plugin_receive (s, &s->target, message, s, s->addr, s->addrlen);
+
+ s->delay = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), delay);
+
+ if (GNUNET_TIME_absolute_get().abs_value < s->delay.abs_value)
+ {
+#if VERBOSE_CLIENT
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "Server: peer `%s' address `%s' next read delayed for %llu ms\n",
+ GNUNET_i2s (&s->target), GNUNET_a2s (s->addr, s->addrlen), delay);
+#endif
+ }
+}
+
+/**
+ * Callback called by MHD when it needs data to send
+ * @param cls current session
+ * @param pos position in buffer
+ * @param buf the buffer to write data to
+ * @param max max number of bytes available in buffer
+ * @return bytes written to buffer
+ */
+static ssize_t
+mhd_send_callback (void *cls, uint64_t pos, char *buf, size_t max)
+{
+ struct Session *s = cls;
+ struct HTTP_Message *msg;
+ int bytes_read = 0;
+
+ msg = s->msg_head;
+ if (msg != NULL)
+ {
+ /* sending */
+ if ((msg->size - msg->pos) <= max)
+ {
+ memcpy (buf, &msg->buf[msg->pos], (msg->size - msg->pos));
+ bytes_read = msg->size - msg->pos;
+ msg->pos += (msg->size - msg->pos);
+ }
+ else
+ {
+ memcpy (buf, &msg->buf[msg->pos], max);
+ msg->pos += max;
+ bytes_read = max;
+ }
+
+ /* removing message */
+ if (msg->pos == msg->size)
+ {
+ if (NULL != msg->transmit_cont)
+ msg->transmit_cont (msg->transmit_cont_cls, &s->target, GNUNET_OK);
+ GNUNET_CONTAINER_DLL_remove(s->msg_head, s->msg_tail, msg);
+ GNUNET_free (msg);
+ }
+ }
+#if DEBUG_CONNECTIONS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connection %X: MHD has sent %u bytes\n",
+ s, bytes_read);
+#endif
+ return bytes_read;
+}
+
/**
* Process GET or PUT request received via MHD. For
* GET, queue response that will send back our pending
res = MHD_queue_response (mhd_connection, MHD_HTTP_NOT_FOUND, response);
MHD_destroy_response (response);
return res;
-
-
found:
-
-
sc = GNUNET_malloc (sizeof (struct ServerConnection));
sc->mhd_conn = mhd_connection;
sc->direction = direction;
s->server_recv = sc;
(*httpSessionCache) = sc;
- return MHD_YES;
}
+
+
/* existing connection */
sc = (*httpSessionCache);
s = sc->session;
return MHD_YES;
}
+ GNUNET_assert (s != NULL);
+ if (sc->direction == _SEND)
+ {
+ response =
+ MHD_create_response_from_callback (-1, 32 * 1024, &mhd_send_callback,
+ s, NULL);
+ res = MHD_queue_response (mhd_connection, MHD_HTTP_OK, response);
+ MHD_destroy_response (response);
+ return MHD_YES;
+ }
+ if (sc->direction == _RECEIVE)
+ {
+ if (*upload_data_size == 0)
+ {
+#if VERBOSE_SERVER
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
+ "Server: peer `%s' PUT on address `%s' connected\n",
+ GNUNET_i2s (&s->target), GNUNET_a2s (s->addr, s->addrlen));
+#endif
+ return MHD_YES;
+ }
+
+ /* Recieving data */
+ if ((*upload_data_size > 0))
+ {
+#if VERBOSE_SERVER
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
+ "Server: peer `%s' PUT on address `%s' received %u bytes\n",
+ GNUNET_i2s (&s->target), GNUNET_a2s (s->addr, s->addrlen));
+#endif
+ if ((GNUNET_TIME_absolute_get().abs_value < s->delay.abs_value))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Connection %X: PUT with %u bytes forwarded to MST\n", s,
+ *upload_data_size);
+
+ if (s->msg_tk == NULL)
+ {
+ s->msg_tk = GNUNET_SERVER_mst_create (&server_receive_mst_cb, s);
+ }
+ res = GNUNET_SERVER_mst_receive (s->msg_tk, s, upload_data, *upload_data_size, GNUNET_NO, GNUNET_NO);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Server: Received %u bytes\n",
+ *upload_data_size);
+ (*upload_data_size) = 0;
+ }
+ else
+ {
+/*
+#if DEBUG_HTTP
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Connection %X: no inbound bandwidth available! Next read was delayed for %llu ms\n",
+ s, ps->peercontext->delay.rel_value);
+#endif
+*/
+ }
+ return MHD_YES;
+ }
+ else
+ return MHD_NO;
+ }
return res;
}
tc = s->server_send;
tc->disconnect = GNUNET_YES;
}
+ if (s->msg_tk != NULL)
+ GNUNET_SERVER_mst_destroy(s->msg_tk);
}
GNUNET_free (sc);
}
plugin->cur_connections--;
+
if ((s->server_send == NULL) && (s->server_recv == NULL))
{
#if VERBOSE_SERVER
"Server: peer `%s' on address `%s' disconnected\n",
GNUNET_i2s (&s->target), GNUNET_a2s (s->addr, s->addrlen));
#endif
+
notify_session_end(s->plugin, &s->target, s);
}
}
}
int
-server_send (struct Session *s, const char *msgbuf, size_t msgbuf_size)
+server_send (struct Session *s, struct HTTP_Message * msg)
{
+ GNUNET_CONTAINER_DLL_insert (s->msg_head, s->msg_tail, msg);
return GNUNET_OK;
}
while (s != NULL)
{
t = s->next;
+ if (s->msg_tk != NULL)
+ GNUNET_SERVER_mst_destroy(s->msg_tk);
delete_session (s);
s = t;
}