#include "gnunet_statistics_service.h"
#include "mesh_path.h"
-#include "mesh_protocol_enc.h"
-#include "mesh_enc.h"
+#include "mesh_protocol.h"
+#include "mesh.h"
#include "gnunet-service-mesh_connection.h"
#include "gnunet-service-mesh_peer.h"
#include "gnunet-service-mesh_tunnel.h"
-#include "gnunet-service-mesh_channel.h"
#define LOG(level, ...) GNUNET_log_from (level,"mesh-con",__VA_ARGS__)
* How frequently to poll for ACKs.
*/
struct GNUNET_TIME_Relative poll_time;
+
+ /**
+ * Queued poll message, to cancel if not necessary anymore (got ACK).
+ */
+ struct MeshConnectionQueue *poll_msg;
+
+ /**
+ * Queued poll message, to cancel if not necessary anymore (got ACK).
+ */
+ struct MeshConnectionQueue *ack_msg;
};
/**
int destroy;
};
+/**
+ * Handle for messages queued but not yet sent.
+ */
+struct MeshConnectionQueue
+{
+ /**
+ * Peer queue handle, to cancel if necessary.
+ */
+ struct MeshPeerQueue *q;
+
+ /**
+ * Was this a forced message? (Do not account for it)
+ */
+ int forced;
+
+ /**
+ * Continuation to call once sent.
+ */
+ GMC_sent cont;
+
+ /**
+ * Closure for @c cont.
+ */
+ void *cont_cls;
+};
+
/******************************************************************************/
/******************************* GLOBALS ***********************************/
/******************************************************************************/
*/
static struct GNUNET_TIME_Relative refresh_connection_time;
+/**
+ * How often to send path create / ACKs.
+ */
+static struct GNUNET_TIME_Relative create_connection_time;
+
/******************************************************************************/
/******************************** STATIC ***********************************/
return "MESH_CONNECTION_ACK";
case MESH_CONNECTION_READY:
return "MESH_CONNECTION_READY";
+ case MESH_CONNECTION_DESTROYED:
+ return "MESH_CONNECTION_DESTROYED";
default:
return "MESH_CONNECTION_STATE_ERROR";
}
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Connection %s state was %s\n",
GMC_2s (c), GMC_state2s (c->state));
+ if (MESH_CONNECTION_DESTROYED == c->state)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "state not changing anymore\n");
+ return;
+ }
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Connection %s state is now %s\n",
GMC_2s (c), GMC_state2s (state));
}
+/**
+ * Callback called when a queued ACK message is sent.
+ *
+ * @param cls Closure (FC).
+ * @param c Connection this message was on.
+ * @param q Queue handler this call invalidates.
+ * @param type Type of message sent.
+ * @param fwd Was this a FWD going message?
+ * @param size Size of the message.
+ */
+static void
+ack_sent (void *cls,
+ struct MeshConnection *c,
+ struct MeshConnectionQueue *q,
+ uint16_t type, int fwd, size_t size)
+{
+ struct MeshFlowControl *fc = cls;
+
+ fc->ack_msg = NULL;
+}
+
+
/**
* Send an ACK on the connection, informing the predecessor about
* the available buffer space. Should not be called in case the peer
- * is origin (no predecessor).
+ * is origin (no predecessor) in the @c fwd direction.
*
* Note that for fwd ack, the FWD mean forward *traffic* (root->dest),
* the ACK itself goes "back" (dest->root).
*
* @param c Connection on which to send the ACK.
* @param buffer How much space free to advertise?
- * @param fwd Is this FWD ACK? (Going dest->owner)
+ * @param fwd Is this FWD ACK? (Going dest -> root)
+ * @param force Don't optimize out.
*/
static void
-send_ack (struct MeshConnection *c, unsigned int buffer, int fwd)
+send_ack (struct MeshConnection *c, unsigned int buffer, int fwd, int force)
{
struct MeshFlowControl *next_fc;
struct MeshFlowControl *prev_fc;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"connection send %s ack on %s\n",
- fwd ? "FWD" : "BCK", GMC_2s (c));
+ GM_f2s (fwd), GMC_2s (c));
- /* Check if we need to transmit the ACK */
+ /* Check if we need to transmit the ACK. */
delta = prev_fc->last_ack_sent - prev_fc->last_pid_recv;
- if (3 < delta && buffer < delta)
+ if (3 < delta && buffer < delta && GNUNET_NO == force)
{
LOG (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, buffer > 3\n");
LOG (GNUNET_ERROR_TYPE_DEBUG,
" last pid %u, last ack %u, qmax %u, q %u\n",
prev_fc->last_pid_recv, prev_fc->last_ack_sent,
next_fc->queue_max, next_fc->queue_n);
- if (ack == prev_fc->last_ack_sent)
+ if (ack == prev_fc->last_ack_sent && GNUNET_NO == force)
{
LOG (GNUNET_ERROR_TYPE_DEBUG, "Not sending FWD ACK, not needed\n");
return;
}
+ /* Check if message is already in queue */
+ if (NULL != prev_fc->ack_msg)
+ {
+ if (GM_is_pid_bigger (ack, prev_fc->last_ack_sent))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " canceling old ACK\n");
+ GMC_cancel (prev_fc->ack_msg);
+ /* GMC_cancel triggers ack_sent(), which clears fc->ack_msg */
+ }
+ else
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " same ACK already in queue\n");
+ return;
+ }
+ }
+
prev_fc->last_ack_sent = ack;
/* Build ACK message and send on connection */
msg.ack = htonl (ack);
msg.cid = c->id;
- GMC_send_prebuilt_message (&msg.header, c, NULL, !fwd);
+ prev_fc->ack_msg = GMC_send_prebuilt_message (&msg.header, c,
+ !fwd, GNUNET_YES,
+ &ack_sent, prev_fc);
}
*
* Calculates the average time and connection packet tracking.
*
- * @param cls Closure.
+ * @param cls Closure (ConnectionQueue Handle).
* @param c Connection this message was on.
* @param type Type of message sent.
* @param fwd Was this a FWD going message?
* @param size Size of the message.
* @param wait Time spent waiting for core (only the time for THIS message)
*/
-static void
+static void
message_sent (void *cls,
struct MeshConnection *c, uint16_t type,
int fwd, size_t size,
{
struct MeshConnectionPerformance *p;
struct MeshFlowControl *fc;
+ struct MeshConnectionQueue *q = cls;
double usecsperbyte;
+ int forced;
fc = fwd ? &c->fwd_fc : &c->bck_fc;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "! sent %s\n", GNUNET_MESH_DEBUG_M2S (type));
- LOG (GNUNET_ERROR_TYPE_DEBUG, "! Q_N- %p %u\n", fc, fc->queue_n);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "! sent %s %s\n",
+ GM_f2s (fwd),
+ GM_m2s (type));
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "! C_P- %p %u\n", c, c->pending_messages);
+ if (NULL != q)
+ {
+ forced = q->forced;
+ if (NULL != q->cont)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "! calling cont\n");
+ q->cont (q->cont_cls, c, q, type, fwd, size);
+ }
+ GNUNET_free (q);
+ }
+ else if (type == GNUNET_MESSAGE_TYPE_MESH_ENCRYPTED)
+ {
+ /* If NULL == q and ENCRYPTED == type, message must have been ch_mngmnt */
+ forced = GNUNET_YES;
+ }
+ else
+ {
+ forced = GNUNET_NO;
+ }
c->pending_messages--;
if (GNUNET_YES == c->destroy && 0 == c->pending_messages)
{
{
case GNUNET_MESSAGE_TYPE_MESH_ENCRYPTED:
fc->last_pid_sent++;
- fc->queue_n--;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "! accounting pid %u\n",
- fc->last_pid_sent);
- GMC_send_ack (c, fwd);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "! Q_N- %p %u\n", fc, fc->queue_n);
+ if (GNUNET_NO == forced)
+ {
+ fc->queue_n--;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "! accounting pid %u\n",
+ fc->last_pid_sent);
+ }
+ else
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "! forced, Q_N not accounting pid %u\n",
+ fc->last_pid_sent);
+ }
+ GMC_send_ack (c, fwd, GNUNET_NO);
+ break;
+
+ case GNUNET_MESSAGE_TYPE_MESH_POLL:
+ fc->poll_msg = NULL;
+ break;
+
+ case GNUNET_MESSAGE_TYPE_MESH_ACK:
+ fc->ack_msg = NULL;
break;
+
default:
break;
}
p->avg /= p->size;
}
p->idx = (p->idx + 1) % AVG_MSGS;
-
-// if (NULL != c->t)
-// {
-// c->t->pending_messages--;
-// if (GNUNET_YES == c->t->destroy && 0 == t->pending_messages)
-// {
-// LOG (GNUNET_ERROR_TYPE_DEBUG, "* destroying tunnel!\n");
-// GMT_destroy (c->t);
-// }
-// }
}
{
GNUNET_PEER_Id id;
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Get prev hop, own pos %u\n", c->own_pos);
if (0 == c->own_pos || c->path->length < 2)
id = c->path->peers[0];
else
* @param c Connection to check.
* @param sender Peer identity of neighbor.
*
- * @return GNUNET_YES in case the sender is the 'prev' hop and therefore
- * the traffic is 'FWD'. GNUNET_NO for BCK. GNUNET_SYSERR for errors.
+ * @return #GNUNET_YES in case the sender is the 'prev' hop and therefore
+ * the traffic is 'FWD'.
+ * #GNUNET_NO for BCK.
+ * #GNUNET_SYSERR for errors.
*/
-static int
+static int
is_fwd (const struct MeshConnection *c,
const struct GNUNET_PeerIdentity *sender)
{
* or a first CONNECTION_ACK directed to us.
*
* @param connection Connection to confirm.
- * @param fwd Should we send it FWD?
+ * @param fwd Should we send it FWD? (root->dest)
* (First (~SYNACK) goes BCK, second (~ACK) goes FWD)
*/
static void
struct MeshTunnel3 *t;
t = connection->t;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Send connection ack\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Send connection %s ACK\n",
+ !GM_f2s (fwd));
GMP_queue_add (get_hop (connection, fwd), NULL,
GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK,
sizeof (struct GNUNET_MESH_ConnectionACK),
- connection, NULL, fwd,
- &message_sent, NULL);
- if (MESH_TUNNEL3_NEW == GMT_get_state (t))
- GMT_change_state (t, MESH_TUNNEL3_WAITING);
+ connection, fwd, &message_sent, NULL);
+ connection->pending_messages++;
+ if (MESH_TUNNEL3_NEW == GMT_get_cstate (t))
+ GMT_change_cstate (t, MESH_TUNNEL3_WAITING);
if (MESH_CONNECTION_READY != connection->state)
connection_change_state (connection, MESH_CONNECTION_SENT);
}
msg.cid = c->id;
msg.peer1 = *id1;
msg.peer2 = *id2;
- GMC_send_prebuilt_message (&msg.header, c, NULL, fwd);
+ GMC_send_prebuilt_message (&msg.header, c, fwd, GNUNET_YES, NULL, NULL);
}
-
/**
* Send keepalive packets for a connection.
*
struct GNUNET_MESH_ConnectionKeepAlive *msg;
size_t size = sizeof (struct GNUNET_MESH_ConnectionKeepAlive);
char cbuf[size];
- uint16_t type;
-
- type = fwd ? GNUNET_MESSAGE_TYPE_MESH_FWD_KEEPALIVE :
- GNUNET_MESSAGE_TYPE_MESH_BCK_KEEPALIVE;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"sending %s keepalive for connection %s]\n",
- fwd ? "FWD" : "BCK", GMC_2s (c));
+ GM_f2s (fwd), GMC_2s (c));
msg = (struct GNUNET_MESH_ConnectionKeepAlive *) cbuf;
msg->header.size = htons (size);
- msg->header.type = htons (type);
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_KEEPALIVE);
msg->cid = c->id;
+ msg->reserved = htonl (0);
- GMC_send_prebuilt_message (&msg->header, c, NULL, fwd);
+ GMC_send_prebuilt_message (&msg->header, c, fwd, GNUNET_YES, NULL, NULL);
}
* Send CONNECTION_{CREATE/ACK} packets for a connection.
*
* @param c Connection for which to send the message.
- * @param fwd If GNUNET_YES, send CREATE, otherwise send ACK.
+ * @param fwd If #GNUNET_YES, send CREATE, otherwise send ACK.
*/
static void
connection_recreate (struct MeshConnection *c, int fwd)
static void
connection_maintain (struct MeshConnection *c, int fwd)
{
- if (MESH_TUNNEL3_SEARCHING == GMT_get_state (c->t))
+ if (MESH_TUNNEL3_SEARCHING == GMT_get_cstate (c->t))
{
/* TODO DHT GET with RO_BART */
return;
{
case MESH_CONNECTION_NEW:
GNUNET_break (0);
+ /* fall-through */
case MESH_CONNECTION_SENT:
connection_recreate (c, fwd);
break;
connection_fwd_keepalive (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct MeshConnection *c = cls;
+ struct GNUNET_TIME_Relative delay;
c->fwd_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
connection_maintain (c, GNUNET_YES);
- c->fwd_maintenance_task = GNUNET_SCHEDULER_add_delayed (refresh_connection_time,
+ delay = c->state == MESH_CONNECTION_READY ?
+ refresh_connection_time : create_connection_time;
+ c->fwd_maintenance_task = GNUNET_SCHEDULER_add_delayed (delay,
&connection_fwd_keepalive,
c);
}
connection_bck_keepalive (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct MeshConnection *c = cls;
+ struct GNUNET_TIME_Relative delay;
c->bck_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
connection_maintain (c, GNUNET_NO);
- c->bck_maintenance_task = GNUNET_SCHEDULER_add_delayed (refresh_connection_time,
+ delay = c->state == MESH_CONNECTION_READY ?
+ refresh_connection_time : create_connection_time;
+ c->bck_maintenance_task = GNUNET_SCHEDULER_add_delayed (delay,
&connection_bck_keepalive,
c);
}
LOG (GNUNET_ERROR_TYPE_DEBUG,
"connection_unlock_queue %s on %s\n",
- fwd ? "FWD" : "BCK", GMC_2s (c));
+ GM_f2s (fwd), GMC_2s (c));
if (GMC_is_terminal (c, fwd))
{
/**
* Cancel all transmissions that belong to a certain connection.
*
- * @param c Connection which to cancel.
+ * If the connection is scheduled for destruction and no more messages are left,
+ * the connection will be destroyed by the continuation call.
+ *
+ * @param c Connection which to cancel. Might be destroyed during this call.
* @param fwd Cancel fwd traffic?
*/
static void
connection_cancel_queues (struct MeshConnection *c, int fwd)
{
-
struct MeshFlowControl *fc;
struct MeshPeer *peer;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ " *** Cancel %s queues for connection %s\n",
+ GM_f2s (fwd), GMC_2s (c));
if (NULL == c)
{
GNUNET_break (0);
{
GNUNET_SCHEDULER_cancel (fc->poll_task);
fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " *** Cancel POLL in ccq for fc %p\n", fc);
}
peer = get_hop (c, fwd);
GMP_queue_cancel (peer, c);
}
+/**
+ * Function called if a connection has been stalled for a while,
+ * possibly due to a missed ACK. Poll the neighbor about its ACK status.
+ *
+ * @param cls Closure (poll ctx).
+ * @param tc TaskContext.
+ */
+static void
+connection_poll (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * Callback called when a queued POLL message is sent.
+ *
+ * @param cls Closure (FC).
+ * @param c Connection this message was on.
+ * @param q Queue handler this call invalidates.
+ * @param type Type of message sent.
+ * @param fwd Was this a FWD going message?
+ * @param size Size of the message.
+ */
+static void
+poll_sent (void *cls,
+ struct MeshConnection *c,
+ struct MeshConnectionQueue *q,
+ uint16_t type, int fwd, size_t size)
+{
+ struct MeshFlowControl *fc = cls;
+
+ if (2 == c->destroy)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL canceled on shutdown\n");
+ return;
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ " *** POLL sent for , scheduling new one!\n");
+ fc->poll_msg = NULL;
+ fc->poll_time = GNUNET_TIME_STD_BACKOFF (fc->poll_time);
+ fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
+ &connection_poll, fc);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " task %u\n", fc->poll_task);
+
+}
+
/**
* Function called if a connection has been stalled for a while,
* possibly due to a missed ACK. Poll the neighbor about its ACK status.
LOG (GNUNET_ERROR_TYPE_DEBUG, " *** Polling!\n");
LOG (GNUNET_ERROR_TYPE_DEBUG, " *** connection [%s]\n", GMC_2s (c));
LOG (GNUNET_ERROR_TYPE_DEBUG, " *** %s\n",
- fc == &c->fwd_fc ? "FWD" : "BCK");
+ fc == &c->fwd_fc ? "FWD" : "BCK");
msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_POLL);
msg.header.size = htons (sizeof (msg));
- LOG (GNUNET_ERROR_TYPE_DEBUG, " *** pid (%u)!\n", fc->last_pid_sent);
- GMC_send_prebuilt_message (&msg.header, c, NULL, fc == &c->fwd_fc);
- fc->poll_time = GNUNET_TIME_STD_BACKOFF (fc->poll_time);
- fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
- &connection_poll, fc);
+ msg.pid = htonl (fc->last_pid_sent);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " *** last pid sent: %u!\n", fc->last_pid_sent);
+ fc->poll_msg = GMC_send_prebuilt_message (&msg.header, c,
+ fc == &c->fwd_fc, GNUNET_YES,
+ &poll_sent, fc);
}
* a keepalive or a path confirmation message (either create or ACK).
* - For all other peers, this means to destroy the connection,
* due to lack of activity.
- * Starts the tiemout if no timeout was running (connection just created).
+ * Starts the timeout if no timeout was running (connection just created).
*
* @param c Connection whose timeout to reset.
* @param fwd Is this forward?
ti = fwd ? &c->fwd_maintenance_task : &c->bck_maintenance_task;
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s reset timeout\n", GM_f2s (fwd));
+
if (GNUNET_SCHEDULER_NO_TASK != *ti)
GNUNET_SCHEDULER_cancel (*ti);
- if (GMC_is_origin (c, fwd)) /* Endpoint */
+ if (GMC_is_origin (c, fwd)) /* Startpoint */
{
f = fwd ? &connection_fwd_keepalive : &connection_bck_keepalive;
*ti = GNUNET_SCHEDULER_add_delayed (refresh_connection_time, f, c);
}
- else /* Relay */
+ else /* Relay, endpoint. */
{
struct GNUNET_TIME_Relative delay;
* Add the connection to the list of both neighbors.
*
* @param c Connection.
+ *
+ * @return #GNUNET_OK if everything went fine
+ * #GNUNET_SYSERR if the was an error and @c c is malformed.
*/
-static void
+static int
register_neighbors (struct MeshConnection *c)
{
- struct MeshPeer *peer;
+ struct MeshPeer *next_peer;
+ struct MeshPeer *prev_peer;
- peer = get_next_hop (c);
- if (GNUNET_NO == GMP_is_neighbor (peer))
- {
- GMC_destroy (c);
- return;
- }
- GMP_add_connection (peer, c);
- peer = get_prev_hop (c);
- if (GNUNET_NO == GMP_is_neighbor (peer))
+ next_peer = get_next_hop (c);
+ prev_peer = get_prev_hop (c);
+
+ if (GNUNET_NO == GMP_is_neighbor (next_peer)
+ || GNUNET_NO == GMP_is_neighbor (prev_peer))
{
- GMC_destroy (c);
- return;
+ if (GMC_is_origin (c, GNUNET_YES))
+ GNUNET_break (0);
+ else
+ GNUNET_break_op (0);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " register neighbors failed\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " prev: %s, %d\n",
+ GMP_2s (prev_peer), GMP_is_neighbor (prev_peer));
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " next: %s, %d\n",
+ GMP_2s (next_peer), GMP_is_neighbor (next_peer));
+ return GNUNET_SYSERR;
}
- GMP_add_connection (peer, c);
+
+ GMP_add_connection (next_peer, c);
+ GMP_add_connection (prev_peer, c);
+
+ return GNUNET_OK;
}
LOG (GNUNET_ERROR_TYPE_DEBUG, " Creating connection\n");
c = GMC_new (cid, NULL, path_duplicate (path), own_pos);
if (NULL == c)
+ {
+ path_destroy (path);
return GNUNET_OK;
+ }
connection_reset_timeout (c, GNUNET_YES);
}
else
{
- path = NULL;
+ path = path_duplicate (c->path);
}
if (MESH_CONNECTION_NEW == c->state)
connection_change_state (c, MESH_CONNECTION_SENT);
if (c->own_pos == size - 1)
{
LOG (GNUNET_ERROR_TYPE_DEBUG, " It's for us!\n");
- GMP_add_path_to_origin (orig_peer, path, GNUNET_YES);
+ GMP_add_path_to_origin (orig_peer, path_duplicate (path), GNUNET_YES);
add_to_peer (c, orig_peer);
- if (MESH_TUNNEL3_NEW == GMT_get_state (c->t))
- GMT_change_state (c->t, MESH_TUNNEL3_WAITING);
+ if (MESH_TUNNEL3_NEW == GMT_get_cstate (c->t))
+ GMT_change_cstate (c->t, MESH_TUNNEL3_WAITING);
send_connection_ack (c, GNUNET_NO);
if (MESH_CONNECTION_SENT == c->state)
connection_change_state (c, MESH_CONNECTION_ACK);
/* Keep tunnel alive in direction dest->owner*/
- connection_reset_timeout (c, GNUNET_NO);
+ if (GNUNET_SCHEDULER_NO_TASK == c->bck_maintenance_task)
+ {
+ c->bck_maintenance_task =
+ GNUNET_SCHEDULER_add_delayed (create_connection_time,
+ &connection_bck_keepalive, c);
+ }
}
else
{
/* It's for somebody else! Retransmit. */
LOG (GNUNET_ERROR_TYPE_DEBUG, " Retransmitting.\n");
GMP_add_path (dest_peer, path_duplicate (path), GNUNET_NO);
- GMP_add_path_to_origin (orig_peer, path, GNUNET_NO);
- GMC_send_prebuilt_message (message, c, NULL, GNUNET_YES);
+ GMP_add_path_to_origin (orig_peer, path_duplicate (path), GNUNET_NO);
+ GMC_send_prebuilt_message (message, c, GNUNET_YES, GNUNET_YES, NULL, NULL);
}
+ path_destroy (path);
return GNUNET_OK;
}
struct MeshConnection *c;
struct MeshPeerPath *p;
struct MeshPeer *pi;
+ enum MeshConnectionState oldstate;
int fwd;
LOG (GNUNET_ERROR_TYPE_DEBUG, "\n\n");
return GNUNET_OK;
}
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, " via peer %s\n",
- GNUNET_i2s (peer));
+ oldstate = c->state;
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " via peer %s\n", GNUNET_i2s (peer));
pi = GMP_get (peer);
if (get_next_hop (c) == pi)
{
LOG (GNUNET_ERROR_TYPE_DEBUG, " SYNACK\n");
fwd = GNUNET_NO;
- if (MESH_CONNECTION_SENT == c->state)
+ if (MESH_CONNECTION_SENT == oldstate)
connection_change_state (c, MESH_CONNECTION_ACK);
}
else if (get_prev_hop (c) == pi)
GNUNET_break_op (0);
return GNUNET_OK;
}
+
connection_reset_timeout (c, fwd);
/* Add path to peers? */
/* Message for us as creator? */
if (GMC_is_origin (c, GNUNET_YES))
{
+ if (GNUNET_NO != fwd)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_OK;
+ }
LOG (GNUNET_ERROR_TYPE_DEBUG, " Connection (SYN)ACK for us!\n");
+
+ /* If just created, cancel the short timeout and start a long one */
+ if (MESH_CONNECTION_SENT == oldstate)
+ connection_reset_timeout (c, GNUNET_YES);
+
+ /* Change connection state */
connection_change_state (c, MESH_CONNECTION_READY);
- GMT_change_state (c->t, MESH_TUNNEL3_READY);
send_connection_ack (c, GNUNET_YES);
- GMT_send_queued_data (c->t, GNUNET_YES);
+
+ /* Change tunnel state, trigger KX */
+ if (MESH_TUNNEL3_WAITING == GMT_get_cstate (c->t))
+ GMT_change_cstate (c->t, MESH_TUNNEL3_READY);
+
return GNUNET_OK;
}
/* Message for us as destination? */
if (GMC_is_terminal (c, GNUNET_YES))
{
+ if (GNUNET_YES != fwd)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_OK;
+ }
LOG (GNUNET_ERROR_TYPE_DEBUG, " Connection ACK for us!\n");
- connection_change_state (c, MESH_CONNECTION_READY);
- GMT_change_state (c->t, MESH_TUNNEL3_READY);
- GMT_send_queued_data (c->t, GNUNET_NO);
+
+ /* If just created, cancel the short timeout and start a long one */
+ if (MESH_CONNECTION_ACK == oldstate)
+ connection_reset_timeout (c, GNUNET_NO);
+
+ /* Change tunnel state */
+ if (MESH_TUNNEL3_WAITING == GMT_get_cstate (c->t))
+ GMT_change_cstate (c->t, MESH_TUNNEL3_READY);
+
return GNUNET_OK;
}
LOG (GNUNET_ERROR_TYPE_DEBUG, " not for us, retransmitting...\n");
- GMC_send_prebuilt_message (message, c, NULL, fwd);
+ GMC_send_prebuilt_message (message, c, fwd, GNUNET_YES, NULL, NULL);
return GNUNET_OK;
}
}
fwd = is_fwd (c, id);
- connection_cancel_queues (c, !fwd);
if (GMC_is_terminal (c, fwd))
{
if (0 < c->pending_messages)
}
else
{
- GMC_send_prebuilt_message (message, c, NULL, fwd);
+ GMC_send_prebuilt_message (message, c, fwd, GNUNET_YES, NULL, NULL);
c->destroy = GNUNET_YES;
+ connection_cancel_queues (c, !fwd);
}
return GNUNET_OK;
*/
GNUNET_STATISTICS_update (stats, "# control on unknown tunnel",
1, GNUNET_NO);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " connection unknown: already destroyed?\n");
return GNUNET_OK;
}
fwd = is_fwd (c, peer);
GNUNET_break_op (0);
return GNUNET_OK;
}
- GMC_send_prebuilt_message (message, c, NULL, fwd);
+ GMC_send_prebuilt_message (message, c, fwd, GNUNET_YES, NULL, NULL);
c->destroy = GNUNET_YES;
+ c->state = MESH_CONNECTION_DESTROYED;
return GNUNET_OK;
}
}
type = ntohs (msg->header.type);
LOG (GNUNET_ERROR_TYPE_DEBUG, "\n\n");
- LOG (GNUNET_ERROR_TYPE_DEBUG, "got a %s message from %s\n",
- GNUNET_MESH_DEBUG_M2S (type), GNUNET_i2s (peer));
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "got a %s message (#%u) from %s\n",
+ GM_m2s (type), ntohl (msg->pid), GNUNET_i2s (peer));
/* Check connection */
c = connection_get (&msg->cid);
if (NULL == c)
{
GNUNET_STATISTICS_update (stats, "# unknown connection", 1, GNUNET_NO);
- LOG (GNUNET_ERROR_TYPE_DEBUG, "WARNING connection unknown\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "WARNING connection %s unknown\n",
+ GNUNET_h2s (&msg->cid));
return GNUNET_OK;
}
/* Check PID */
fc = fwd ? &c->bck_fc : &c->fwd_fc;
pid = ntohl (msg->pid);
- if (GMC_is_pid_bigger (pid, fc->last_ack_sent))
+ if (GM_is_pid_bigger (pid, fc->last_ack_sent))
{
GNUNET_STATISTICS_update (stats, "# unsolicited message", 1, GNUNET_NO);
LOG (GNUNET_ERROR_TYPE_DEBUG,
pid, fc->last_pid_recv, fc->last_ack_sent);
return GNUNET_OK;
}
- if (GNUNET_NO == GMC_is_pid_bigger (pid, fc->last_pid_recv))
+ if (GNUNET_NO == GM_is_pid_bigger (pid, fc->last_pid_recv))
{
GNUNET_STATISTICS_update (stats, "# duplicate PID", 1, GNUNET_NO);
LOG (GNUNET_ERROR_TYPE_DEBUG,
return GNUNET_OK;
}
fc->last_pid_recv = pid;
- GMT_handle_encrypted (c->t, msg, fwd);
- GMC_send_ack (c, fwd);
+ GMT_handle_encrypted (c->t, msg);
+ GMC_send_ack (c, fwd, GNUNET_NO);
return GNUNET_OK;
}
{
GNUNET_STATISTICS_update (stats, "# TTL drops", 1, GNUNET_NO);
LOG (GNUNET_ERROR_TYPE_WARNING, " TTL is 0, DROPPING!\n");
- GMC_send_ack (c, fwd);
+ GMC_send_ack (c, fwd, GNUNET_NO);
return GNUNET_OK;
}
+
GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO);
+ GMC_send_prebuilt_message (&msg->header, c, fwd, GNUNET_NO, NULL, NULL);
+
+ return GNUNET_OK;
+}
+
+/**
+ * Generic handler for mesh network encrypted traffic.
+ *
+ * @param peer Peer identity this notification is about.
+ * @param msg Encrypted message.
+ *
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+handle_mesh_kx (const struct GNUNET_PeerIdentity *peer,
+ const struct GNUNET_MESH_KX *msg)
+{
+ struct MeshConnection *c;
+ struct MeshPeer *neighbor;
+ GNUNET_PEER_Id peer_id;
+ size_t size;
+ uint16_t type;
+ int fwd;
+
+ /* Check size */
+ size = ntohs (msg->header.size);
+ if (size <
+ sizeof (struct GNUNET_MESH_Encrypted) +
+ sizeof (struct GNUNET_MessageHeader))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_OK;
+ }
+ type = ntohs (msg->header.type);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "\n\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "got a %s message from %s\n",
+ GM_m2s (type), GNUNET_i2s (peer));
+
+ /* Check connection */
+ c = connection_get (&msg->cid);
+ if (NULL == c)
+ {
+ GNUNET_STATISTICS_update (stats, "# unknown connection", 1, GNUNET_NO);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "WARNING connection unknown\n");
+ return GNUNET_OK;
+ }
+
+ /* Check if origin is as expected */
+ neighbor = get_prev_hop (c);
+ peer_id = GNUNET_PEER_search (peer);
+ if (peer_id == GMP_get_short_id (neighbor))
+ {
+ fwd = GNUNET_YES;
+ }
+ else
+ {
+ neighbor = get_next_hop (c);
+ if (peer_id == GMP_get_short_id (neighbor))
+ {
+ fwd = GNUNET_NO;
+ }
+ else
+ {
+ /* Unexpected peer sending traffic on a connection. */
+ GNUNET_break_op (0);
+ return GNUNET_OK;
+ }
+ }
- GMC_send_prebuilt_message (&msg->header, c, NULL, fwd);
+ /* Count as connection confirmation. */
+ if (MESH_CONNECTION_SENT == c->state || MESH_CONNECTION_ACK == c->state)
+ connection_change_state (c, MESH_CONNECTION_READY);
+ connection_reset_timeout (c, fwd);
+ if (NULL != c->t)
+ {
+ if (MESH_TUNNEL3_WAITING == GMT_get_cstate (c->t))
+ GMT_change_cstate (c->t, MESH_TUNNEL3_READY);
+ }
+
+ /* Is this message for us? */
+ if (GMC_is_terminal (c, fwd))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " message for us!\n");
+ GNUNET_STATISTICS_update (stats, "# messages received", 1, GNUNET_NO);
+ if (NULL == c->t)
+ {
+ GNUNET_break (0);
+ return GNUNET_OK;
+ }
+ GMT_handle_kx (c->t, &msg[1].header);
+ return GNUNET_OK;
+ }
+
+ /* Message not for us: forward to next hop */
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " not for us, retransmitting...\n");
+ GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO);
+ GMC_send_prebuilt_message (&msg->header, c, fwd, GNUNET_NO, NULL, NULL);
return GNUNET_OK;
}
}
+/**
+ * Core handler for key exchange traffic (ephemeral key, ping, pong).
+ *
+ * @param cls Closure (unused).
+ * @param message Message received.
+ * @param peer Peer who sent the message.
+ *
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+int
+GMC_handle_kx (void *cls, const struct GNUNET_PeerIdentity *peer,
+ const struct GNUNET_MessageHeader *message)
+{
+ return handle_mesh_kx (peer,
+ (struct GNUNET_MESH_KX *) message);
+}
+
+
/**
* Core handler for mesh network traffic point-to-point acks.
*
ack = ntohl (msg->ack);
LOG (GNUNET_ERROR_TYPE_DEBUG, " ACK %u (was %u)\n",
ack, fc->last_ack_recv);
- if (GMC_is_pid_bigger (ack, fc->last_ack_recv))
+ if (GM_is_pid_bigger (ack, fc->last_ack_recv))
fc->last_ack_recv = ack;
/* Cancel polling if the ACK is big enough. */
if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task &&
- GMC_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
+ GM_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
{
LOG (GNUNET_ERROR_TYPE_DEBUG, " Cancel poll\n");
GNUNET_SCHEDULER_cancel (fc->poll_task);
int fwd;
LOG (GNUNET_ERROR_TYPE_DEBUG, "\n\n");
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Got a POLL packet from %s!\n",
- GNUNET_i2s (peer));
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Got a POLL packet from %s!\n",
+ GNUNET_i2s (peer));
msg = (struct GNUNET_MESH_Poll *) message;
id = GNUNET_PEER_search (peer);
if (GMP_get_short_id (get_next_hop (c)) == id)
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, " FWD ACK\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " FWD FC\n");
fc = &c->fwd_fc;
}
else if (GMP_get_short_id (get_prev_hop (c)) == id)
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, " BCK ACK\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " BCK FC\n");
fc = &c->bck_fc;
}
else
}
pid = ntohl (msg->pid);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " PID %u, OLD %u\n",
- pid, fc->last_pid_recv);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " PID %u, OLD %u\n", pid, fc->last_pid_recv);
fc->last_pid_recv = pid;
- fwd = fc == &c->fwd_fc;
- GMC_send_ack (c, fwd);
+ fwd = fc == &c->bck_fc;
+ GMC_send_ack (c, fwd, GNUNET_YES);
return GNUNET_OK;
}
struct GNUNET_MESH_ConnectionKeepAlive *msg;
struct MeshConnection *c;
struct MeshPeer *neighbor;
+ GNUNET_PEER_Id peer_id;
int fwd;
msg = (struct GNUNET_MESH_ConnectionKeepAlive *) message;
return GNUNET_OK;
}
- fwd = GNUNET_MESSAGE_TYPE_MESH_FWD_KEEPALIVE == ntohs (message->type) ?
- GNUNET_YES : GNUNET_NO;
-
- /* Check if origin is as expected */
- neighbor = get_hop (c, fwd);
- if (GNUNET_PEER_search (peer) != GMP_get_short_id (neighbor))
+ /* Check if origin is as expected TODO refactor and reuse */
+ peer_id = GNUNET_PEER_search (peer);
+ neighbor = get_prev_hop (c);
+ if (peer_id == GMP_get_short_id (neighbor))
{
- GNUNET_break_op (0);
- return GNUNET_OK;
+ fwd = GNUNET_YES;
+ }
+ else
+ {
+ neighbor = get_next_hop (c);
+ if (peer_id == GMP_get_short_id (neighbor))
+ {
+ fwd = GNUNET_NO;
+ }
+ else
+ {
+ GNUNET_break_op (0);
+ return GNUNET_OK;
+ }
}
connection_change_state (c, MESH_CONNECTION_READY);
return GNUNET_OK;
GNUNET_STATISTICS_update (stats, "# keepalives forwarded", 1, GNUNET_NO);
- GMC_send_prebuilt_message (message, c, NULL, fwd);
+ GMC_send_prebuilt_message (message, c, fwd, GNUNET_YES, NULL, NULL);
return GNUNET_OK;
}
* the direction and the position of the peer.
*
* @param c Which connection to send the hop-by-hop ACK.
- * @param fwd Is this a fwd ACK? (will go dest->root)
+ * @param fwd Is this a fwd ACK? (will go dest->root).
+ * @param force Send the ACK even if suboptimal (e.g. requested by POLL).
*/
void
-GMC_send_ack (struct MeshConnection *c, int fwd)
+GMC_send_ack (struct MeshConnection *c, int fwd, int force)
{
unsigned int buffer;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"GMC send %s ACK on %s\n",
- fwd ? "FWD" : "BCK", GMC_2s (c));
+ GM_f2s (fwd), GMC_2s (c));
if (NULL == c)
{
return;
}
+ if (GNUNET_NO != c->destroy)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " being destroyed, why bother...\n");
+ return;
+ }
+
/* Get available buffer space */
if (GMC_is_terminal (c, fwd))
{
LOG (GNUNET_ERROR_TYPE_DEBUG, " getting from all channels\n");
- buffer = GMT_get_buffer (c->t, fwd);
+ buffer = GMT_get_channels_buffer (c->t);
}
else
{
buffer = GMC_get_buffer (c, fwd);
}
LOG (GNUNET_ERROR_TYPE_DEBUG, " buffer available: %u\n", buffer);
+ if (0 == buffer && GNUNET_NO == force)
+ return;
/* Send available buffer space */
if (GMC_is_origin (c, fwd))
{
GNUNET_assert (NULL != c->t);
LOG (GNUNET_ERROR_TYPE_DEBUG, " sending on channels...\n");
- if (0 < buffer)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " really sending!\n");
- GMT_unchoke_channels (c->t, fwd);
- }
+ GMT_unchoke_channels (c->t);
}
else
{
LOG (GNUNET_ERROR_TYPE_DEBUG, " sending on connection\n");
- send_ack (c, buffer, fwd);
+ send_ack (c, buffer, fwd, force);
}
}
GNUNET_SCHEDULER_shutdown ();
return;
}
+ create_connection_time = GNUNET_TIME_UNIT_SECONDS;
connections = GNUNET_CONTAINER_multihashmap_create (1024, GNUNET_YES);
}
+
+/**
+ * Destroy each connection on shutdown.
+ *
+ * @param cls Closure (unused).
+ * @param key Current key code (CID, unused).
+ * @param value Value in the hash map (connection)
+ *
+ * @return #GNUNET_YES, because we should continue to iterate,
+ */
+static int
+shutdown_iterator (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct MeshConnection *c = value;
+
+ GMC_destroy (c);
+ return GNUNET_YES;
+}
+
+
/**
* Shut down the connections subsystem.
*/
void
GMC_shutdown (void)
{
+ GNUNET_CONTAINER_multihashmap_iterate (connections, &shutdown_iterator, NULL);
GNUNET_CONTAINER_multihashmap_destroy (connections);
+ connections = NULL;
}
if (0 == own_pos)
{
c->fwd_maintenance_task =
- GNUNET_SCHEDULER_add_delayed (refresh_connection_time,
- &connection_fwd_keepalive, c);
+ GNUNET_SCHEDULER_add_delayed (create_connection_time,
+ &connection_fwd_keepalive, c);
+ }
+ if (GNUNET_OK != register_neighbors (c))
+ {
+ GMC_destroy (c);
+ return NULL;
}
- register_neighbors (c);
+
return c;
}
if (NULL == c)
return;
+ if (2 == c->destroy) /* cancel queues -> GMP_queue_cancel -> q_destroy -> */
+ return; /* -> message_sent -> GMC_destroy. Don't loop. */
+ c->destroy = 2;
+
LOG (GNUNET_ERROR_TYPE_DEBUG, "destroying connection %s\n", GMC_2s (c));
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " fc's f: %p, b: %p\n",
+ &c->fwd_fc, &c->bck_fc);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " fc tasks f: %u, b: %u\n",
+ c->fwd_fc.poll_task, c->bck_fc.poll_task);
/* Cancel all traffic */
connection_cancel_queues (c, GNUNET_YES);
connection_cancel_queues (c, GNUNET_NO);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " fc tasks f: %u, b: %u\n",
+ c->fwd_fc.poll_task, c->bck_fc.poll_task);
+
/* Cancel maintainance task (keepalive/timeout) */
- if (GNUNET_SCHEDULER_NO_TASK != c->fwd_maintenance_task)
- GNUNET_SCHEDULER_cancel (c->fwd_maintenance_task);
- if (GNUNET_SCHEDULER_NO_TASK != c->bck_maintenance_task)
- GNUNET_SCHEDULER_cancel (c->bck_maintenance_task);
+ if (NULL != c->fwd_fc.poll_msg)
+ {
+ GMC_cancel (c->fwd_fc.poll_msg);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL msg FWD canceled\n");
+ }
+ if (NULL != c->bck_fc.poll_msg)
+ {
+ GMC_cancel (c->bck_fc.poll_msg);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL msg BCK canceled\n");
+ }
/* Unregister from neighbors */
unregister_neighbors (c);
GNUNET_STATISTICS_update (stats, "# connections", -1, GNUNET_NO);
if (NULL != c->t)
GMT_remove_connection (c->t, c);
+
+ if (GNUNET_NO == GMC_is_origin (c, GNUNET_YES))
+ path_destroy (c->path);
+ if (GNUNET_SCHEDULER_NO_TASK != c->fwd_maintenance_task)
+ GNUNET_SCHEDULER_cancel (c->fwd_maintenance_task);
+ if (GNUNET_SCHEDULER_NO_TASK != c->bck_maintenance_task)
+ GNUNET_SCHEDULER_cancel (c->bck_maintenance_task);
+ if (GNUNET_SCHEDULER_NO_TASK != c->fwd_fc.poll_task)
+ {
+ GNUNET_SCHEDULER_cancel (c->fwd_fc.poll_task);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL FWD canceled\n");
+ }
+ if (GNUNET_SCHEDULER_NO_TASK != c->bck_fc.poll_task)
+ {
+ GNUNET_SCHEDULER_cancel (c->bck_fc.poll_task);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL BCK canceled\n");
+ }
+
+ GNUNET_break (GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_remove (connections, &c->id, c));
+
GNUNET_free (c);
}
const struct MeshPeerPath *
GMC_get_path (const struct MeshConnection *c)
{
- return c->path;
+ if (GNUNET_NO == c->destroy)
+ return c->path;
+ return NULL;
}
struct MeshFlowControl *fc;
fc = fwd ? &c->fwd_fc : &c->bck_fc;
- if (GMC_is_pid_bigger(fc->last_pid_recv, fc->last_ack_sent))
+ if (GM_is_pid_bigger(fc->last_pid_recv, fc->last_ack_sent))
{
return 0;
}
void
GMC_allow (struct MeshConnection *c, unsigned int buffer, int fwd)
{
- send_ack (c, buffer, fwd);
+ send_ack (c, buffer, fwd, GNUNET_NO);
}
{
int fwd;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ " notify broken on %s due to %s disconnect\n",
+ GMC_2s (c), GMP_2s (peer));
+
fwd = peer == get_prev_hop (c);
- connection_cancel_queues (c, !fwd);
- if (GMC_is_terminal (c, fwd))
+ if (GNUNET_YES == GMC_is_terminal (c, fwd))
{
/* Local shutdown, no one to notify about this. */
GMC_destroy (c);
return;
}
-
- send_broken (c, &my_full_id, GMP_get_id (peer), fwd);
+ if (GNUNET_NO == c->destroy)
+ send_broken (c, &my_full_id, GMP_get_id (peer), fwd);
/* Connection will have at least one pending message
* (the one we just scheduled), so no point in checking whether to
* destroy immediately. */
c->destroy = GNUNET_YES;
+ c->state = MESH_CONNECTION_DESTROYED;
+
+ /**
+ * Cancel all queues, if no message is left, connection will be destroyed.
+ */
+ connection_cancel_queues (c, !fwd);
return;
}
* @param c Connection.
* @param fwd Is this about fwd traffic?
*
- * @return GNUNET_YES if origin, GNUNET_NO if relay/terminal.
+ * @return #GNUNET_YES if origin, #GNUNET_NO if relay/terminal.
*/
int
GMC_is_origin (struct MeshConnection *c, int fwd)
* @param fwd Is this about fwd traffic?
* Note that the ROOT is the terminal for BCK traffic!
*
- * @return GNUNET_YES if terminal, GNUNET_NO if relay/origin.
+ * @return #GNUNET_YES if terminal, #GNUNET_NO if relay/origin.
*/
int
GMC_is_terminal (struct MeshConnection *c, int fwd)
* @param c Connection.
* @param fwd Is this about fwd traffic?
*
- * @return GNUNET_YES in case it's OK.
+ * @return #GNUNET_YES in case it's OK to send.
*/
int
GMC_is_sendable (struct MeshConnection *c, int fwd)
struct MeshFlowControl *fc;
fc = fwd ? &c->fwd_fc : &c->bck_fc;
- if (GMC_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
+ if (GM_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
return GNUNET_YES;
return GNUNET_NO;
}
* @param message Message to send. Function makes a copy of it.
* If message is not hop-by-hop, decrements TTL of copy.
* @param c Connection on which this message is transmitted.
- * @param ch Channel on which this message is transmitted, or NULL.
* @param fwd Is this a fwd message?
+ * @param force Force the connection to accept the message (buffer overfill).
+ * @param cont Continuation called once message is sent. Can be NULL.
+ * @param cont_cls Closure for @c cont.
+ *
+ * @return Handle to cancel the message before it's sent.
+ * NULL on error or if @c cont is NULL.
+ * Invalid on @c cont call.
*/
-void
+struct MeshConnectionQueue *
GMC_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
- struct MeshConnection *c,
- struct MeshChannel *ch,
- int fwd)
+ struct MeshConnection *c, int fwd, int force,
+ GMC_sent cont, void *cont_cls)
{
struct MeshFlowControl *fc;
+ struct MeshConnectionQueue *q;
void *data;
size_t size;
uint16_t type;
data = GNUNET_malloc (size);
memcpy (data, message, size);
type = ntohs (message->type);
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Send %s (%u) on connection %s\n",
- GNUNET_MESH_DEBUG_M2S (type), size, GMC_2s (c));
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Send %s (%u bytes) on connection %s\n",
+ GM_m2s (type), size, GMC_2s (c));
fc = fwd ? &c->fwd_fc : &c->bck_fc;
- droppable = GNUNET_YES;
+ droppable = GNUNET_NO == force;
switch (type)
{
struct GNUNET_MESH_Encrypted *emsg;
+ struct GNUNET_MESH_KX *kmsg;
struct GNUNET_MESH_ACK *amsg;
struct GNUNET_MESH_Poll *pmsg;
struct GNUNET_MESH_ConnectionDestroy *dmsg;
if (0 == ttl)
{
GNUNET_break_op (0);
- return;
+ GNUNET_free (data);
+ return NULL;
}
emsg->cid = c->id;
emsg->ttl = htonl (ttl - 1);
- emsg->pid = htonl (fwd ? c->fwd_fc.next_pid++ : c->bck_fc.next_pid++);
+ emsg->pid = htonl (fc->next_pid++);
LOG (GNUNET_ERROR_TYPE_DEBUG, " Q_N+ %p %u\n", fc, fc->queue_n);
- fc->queue_n++;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "pid %u\n", ntohl (emsg->pid));
- LOG (GNUNET_ERROR_TYPE_DEBUG, "last pid %u\n", fc->last_pid_sent);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " ack %u\n", fc->last_ack_recv);
- if (GMC_is_pid_bigger (fc->last_pid_sent + 1, fc->last_ack_recv))
+ if (GNUNET_YES == droppable)
+ {
+ fc->queue_n++;
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "pid %u\n", ntohl (emsg->pid));
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "last pid sent %u\n", fc->last_pid_sent);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " ack recv %u\n", fc->last_ack_recv);
+ }
+ else
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " not droppable, Q_N stays the same\n");
+ }
+ if (GM_is_pid_bigger (fc->last_pid_sent + 1, fc->last_ack_recv))
{
GMC_start_poll (c, fwd);
}
break;
+ case GNUNET_MESSAGE_TYPE_MESH_KX:
+ kmsg = (struct GNUNET_MESH_KX *) data;
+ kmsg->cid = c->id;
+ break;
+
case GNUNET_MESSAGE_TYPE_MESH_ACK:
amsg = (struct GNUNET_MESH_ACK *) data;
amsg->cid = c->id;
case GNUNET_MESSAGE_TYPE_MESH_POLL:
pmsg = (struct GNUNET_MESH_Poll *) data;
pmsg->cid = c->id;
- pmsg->pid = htonl (fwd ? c->fwd_fc.last_pid_sent : c->bck_fc.last_pid_sent);
LOG (GNUNET_ERROR_TYPE_DEBUG, " poll %u\n", ntohl (pmsg->pid));
droppable = GNUNET_NO;
break;
case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE:
case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK:
+ case GNUNET_MESSAGE_TYPE_MESH_KEEPALIVE:
break;
default:
GNUNET_break (0);
+ GNUNET_free (data);
+ return NULL;
}
if (fc->queue_n > fc->queue_max && droppable)
"queue full: %u/%u\n",
fc->queue_n, fc->queue_max);
if (GNUNET_MESSAGE_TYPE_MESH_ENCRYPTED == type)
+ {
fc->queue_n--;
- return; /* Drop this message */
+ fc->next_pid--;
+ }
+ GNUNET_free (data);
+ return NULL; /* Drop this message */
}
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " C_P+ %p %u\n", c, c->pending_messages);
c->pending_messages++;
- GMP_queue_add (get_hop (c, fwd), data, type, size, c, ch, fwd,
- &message_sent, NULL);
+ q = GNUNET_new (struct MeshConnectionQueue);
+ q->forced = !droppable;
+ q->q = GMP_queue_add (get_hop (c, fwd), data, type, size, c, fwd,
+ &message_sent, q);
+ if (NULL == q->q)
+ {
+ GNUNET_break (0);
+ GNUNET_free (data);
+ GNUNET_free (q);
+ return NULL;
+ }
+ q->cont = cont;
+ q->cont_cls = cont_cls;
+ return q;
+}
+
+
+/**
+ * Cancel a previously sent message while it's in the queue.
+ *
+ * ONLY can be called before the continuation given to the send function
+ * is called. Once the continuation is called, the message is no longer in the
+ * queue.
+ *
+ * @param q Handle to the queue.
+ */
+void
+GMC_cancel (struct MeshConnectionQueue *q)
+{
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "! GMC cancel message\n");
+
+ /* queue destroy calls message_sent, which calls q->cont and frees q */
+ GMP_queue_destroy (q->q, GNUNET_YES);
}
void
GMC_send_create (struct MeshConnection *connection)
{
- enum MeshTunnel3State state;
+ enum MeshTunnel3CState state;
size_t size;
size = sizeof (struct GNUNET_MESH_ConnectionCreate);
size += connection->path->length * sizeof (struct GNUNET_PeerIdentity);
+
LOG (GNUNET_ERROR_TYPE_DEBUG, "Send connection create\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " C_P+ %p %u (create)\n",
+ connection, connection->pending_messages);
+ connection->pending_messages++;
+
GMP_queue_add (get_next_hop (connection), NULL,
GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE,
- size, connection, NULL,
- GNUNET_YES, &message_sent, NULL);
- state = GMT_get_state (connection->t);
+ size, connection, GNUNET_YES, &message_sent, NULL);
+
+ state = GMT_get_cstate (connection->t);
if (MESH_TUNNEL3_SEARCHING == state || MESH_TUNNEL3_NEW == state)
- GMT_change_state (connection->t, MESH_TUNNEL3_WAITING);
+ GMT_change_cstate (connection->t, MESH_TUNNEL3_WAITING);
if (MESH_CONNECTION_NEW == connection->state)
connection_change_state (connection, MESH_CONNECTION_SENT);
}
GMC_2s (c));
if (GNUNET_NO == GMC_is_terminal (c, GNUNET_YES))
- GMC_send_prebuilt_message (&msg.header, c, NULL, GNUNET_YES);
+ GMC_send_prebuilt_message (&msg.header, c,
+ GNUNET_YES, GNUNET_YES, NULL, NULL);
if (GNUNET_NO == GMC_is_terminal (c, GNUNET_NO))
- GMC_send_prebuilt_message (&msg.header, c, NULL, GNUNET_NO);
+ GMC_send_prebuilt_message (&msg.header, c,
+ GNUNET_NO, GNUNET_YES, NULL, NULL);
c->destroy = GNUNET_YES;
+ c->state = MESH_CONNECTION_DESTROYED;
}
struct MeshFlowControl *fc;
fc = fwd ? &c->fwd_fc : &c->bck_fc;
- if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task)
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL %s requested\n",
+ GM_f2s (fwd));
+ if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task || NULL != fc->poll_msg)
{
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " *** not needed (%u, %p)\n",
+ fc->poll_task, fc->poll_msg);
return;
}
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL started on request\n");
fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
&connection_poll,
fc);
const char *
GMC_2s (struct MeshConnection *c)
{
+ if (NULL == c)
+ return "NULL";
+
if (NULL != c->t)
{
static char buf[128];
return buf;
}
return GNUNET_h2s (&c->id);
-}
\ No newline at end of file
+}