#include "gnunet_statistics_service.h"
#include "mesh_path.h"
-#include "mesh_protocol_enc.h"
-#include "mesh_enc.h"
+#include "mesh_protocol.h"
+#include "mesh.h"
#include "gnunet-service-mesh_connection.h"
#include "gnunet-service-mesh_peer.h"
#include "gnunet-service-mesh_tunnel.h"
*/
struct MeshConnectionQueue
{
+ /**
+ * Peer queue handle, to cancel if necessary.
+ */
struct MeshPeerQueue *q;
+
+ /**
+ * Continuation to call once sent.
+ */
GMC_sent cont;
+
+ /**
+ * Closure for @c cont.
+ */
void *cont_cls;
};
*/
static struct GNUNET_TIME_Relative refresh_connection_time;
+/**
+ * How often to send path create / ACKs.
+ */
+static struct GNUNET_TIME_Relative create_connection_time;
+
/******************************************************************************/
/******************************** STATIC ***********************************/
return "MESH_CONNECTION_ACK";
case MESH_CONNECTION_READY:
return "MESH_CONNECTION_READY";
+ case MESH_CONNECTION_DESTROYED:
+ return "MESH_CONNECTION_DESTROYED";
default:
return "MESH_CONNECTION_STATE_ERROR";
}
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Connection %s state was %s\n",
GMC_2s (c), GMC_state2s (c->state));
+ if (MESH_CONNECTION_DESTROYED == c->state)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "state not changing anymore\n");
+ return;
+ }
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Connection %s state is now %s\n",
GMC_2s (c), GMC_state2s (state));
*
* @param cls Closure (FC).
* @param c Connection this message was on.
+ * @param q Queue handler this call invalidates.
* @param type Type of message sent.
* @param fwd Was this a FWD going message?
* @param size Size of the message.
* @param size Size of the message.
* @param wait Time spent waiting for core (only the time for THIS message)
*/
-static void
+static void
message_sent (void *cls,
struct MeshConnection *c, uint16_t type,
int fwd, size_t size,
* #GNUNET_NO for BCK.
* #GNUNET_SYSERR for errors.
*/
-static int
+static int
is_fwd (const struct MeshConnection *c,
const struct GNUNET_PeerIdentity *sender)
{
*
* @param c Connection to keep alive..
* @param fwd Is this a FWD keepalive? (owner -> dest).
+ *
+ * FIXME use only one type, register in GMC_send_prebuilt_message()
*/
static void
connection_keepalive (struct MeshConnection *c, int fwd)
{
case MESH_CONNECTION_NEW:
GNUNET_break (0);
+ /* fall-through */
case MESH_CONNECTION_SENT:
connection_recreate (c, fwd);
break;
connection_fwd_keepalive (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct MeshConnection *c = cls;
+ struct GNUNET_TIME_Relative delay;
c->fwd_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
connection_maintain (c, GNUNET_YES);
- c->fwd_maintenance_task = GNUNET_SCHEDULER_add_delayed (refresh_connection_time,
+ delay = c->state == MESH_CONNECTION_READY ?
+ refresh_connection_time : create_connection_time;
+ c->fwd_maintenance_task = GNUNET_SCHEDULER_add_delayed (delay,
&connection_fwd_keepalive,
c);
}
connection_bck_keepalive (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct MeshConnection *c = cls;
+ struct GNUNET_TIME_Relative delay;
c->bck_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
connection_maintain (c, GNUNET_NO);
- c->bck_maintenance_task = GNUNET_SCHEDULER_add_delayed (refresh_connection_time,
+ delay = c->state == MESH_CONNECTION_READY ?
+ refresh_connection_time : create_connection_time;
+ c->bck_maintenance_task = GNUNET_SCHEDULER_add_delayed (delay,
&connection_bck_keepalive,
c);
}
/**
* Cancel all transmissions that belong to a certain connection.
- *
+ *
* If the connection is scheduled for destruction and no more messages are left,
* the connection will be destroyed by the continuation call.
*
struct MeshFlowControl *fc;
struct MeshPeer *peer;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ " *** Cancel %s queues for connection %s\n",
+ fwd ? "FWD" : "BCK", GMC_2s (c));
if (NULL == c)
{
GNUNET_break (0);
{
GNUNET_SCHEDULER_cancel (fc->poll_task);
fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " *** Cancel POLL in ccq for fc %p\n", fc);
}
peer = get_hop (c, fwd);
GMP_queue_cancel (peer, c);
*
* @param cls Closure (FC).
* @param c Connection this message was on.
+ * @param q Queue handler this call invalidates.
* @param type Type of message sent.
* @param fwd Was this a FWD going message?
* @param size Size of the message.
{
struct MeshFlowControl *fc = cls;
- LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL sent, scheduling new one!\n");
+ if (2 == c->destroy)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL canceled on shutdown\n");
+ return;
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ " *** POLL sent for , scheduling new one!\n");
fc->poll_msg = NULL;
fc->poll_time = GNUNET_TIME_STD_BACKOFF (fc->poll_time);
fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
&connection_poll, fc);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " task %u\n", fc->poll_task);
+
}
/**
if (GNUNET_SCHEDULER_NO_TASK != *ti)
GNUNET_SCHEDULER_cancel (*ti);
- if (GMC_is_origin (c, fwd)) /* Endpoint */
+ if (GMC_is_origin (c, fwd)) /* Startpoint */
{
f = fwd ? &connection_fwd_keepalive : &connection_bck_keepalive;
*ti = GNUNET_SCHEDULER_add_delayed (refresh_connection_time, f, c);
}
- else /* Relay */
+ else /* Relay, endpoint. */
{
struct GNUNET_TIME_Relative delay;
* Add the connection to the list of both neighbors.
*
* @param c Connection.
+ *
+ * @return #GNUNET_OK if everything went fine
+ * #GNUNET_SYSERR if the was an error and @c c is malformed.
*/
-static void
+static int
register_neighbors (struct MeshConnection *c)
{
- struct MeshPeer *peer;
+ struct MeshPeer *next_peer;
+ struct MeshPeer *prev_peer;
- peer = get_next_hop (c);
- if (GNUNET_NO == GMP_is_neighbor (peer))
- {
- GMC_destroy (c);
- return;
- }
- GMP_add_connection (peer, c);
- peer = get_prev_hop (c);
- if (GNUNET_NO == GMP_is_neighbor (peer))
- {
- GMC_destroy (c);
- return;
- }
- GMP_add_connection (peer, c);
+ next_peer = get_next_hop (c);
+ prev_peer = get_prev_hop (c);
+
+ if (GNUNET_NO == GMP_is_neighbor (next_peer)
+ || GNUNET_NO == GMP_is_neighbor (prev_peer))
+ return GNUNET_SYSERR;
+
+ GMP_add_connection (next_peer, c);
+ GMP_add_connection (prev_peer, c);
+
+ return GNUNET_OK;
}
LOG (GNUNET_ERROR_TYPE_DEBUG, " Creating connection\n");
c = GMC_new (cid, NULL, path_duplicate (path), own_pos);
if (NULL == c)
+ {
+ path_destroy (path);
return GNUNET_OK;
+ }
connection_reset_timeout (c, GNUNET_YES);
}
else
{
- path = NULL;
+ path = path_duplicate (c->path);
}
if (MESH_CONNECTION_NEW == c->state)
connection_change_state (c, MESH_CONNECTION_SENT);
if (c->own_pos == size - 1)
{
LOG (GNUNET_ERROR_TYPE_DEBUG, " It's for us!\n");
- GMP_add_path_to_origin (orig_peer, path, GNUNET_YES);
+ GMP_add_path_to_origin (orig_peer, path_duplicate (path), GNUNET_YES);
add_to_peer (c, orig_peer);
if (MESH_TUNNEL3_NEW == GMT_get_state (c->t))
connection_change_state (c, MESH_CONNECTION_ACK);
/* Keep tunnel alive in direction dest->owner*/
- connection_reset_timeout (c, GNUNET_NO);
+ c->bck_maintenance_task =
+ GNUNET_SCHEDULER_add_delayed (create_connection_time,
+ &connection_bck_keepalive, c);
}
else
{
/* It's for somebody else! Retransmit. */
LOG (GNUNET_ERROR_TYPE_DEBUG, " Retransmitting.\n");
GMP_add_path (dest_peer, path_duplicate (path), GNUNET_NO);
- GMP_add_path_to_origin (orig_peer, path, GNUNET_NO);
+ GMP_add_path_to_origin (orig_peer, path_duplicate (path), GNUNET_NO);
GMC_send_prebuilt_message (message, c, GNUNET_YES, NULL, NULL);
}
+ path_destroy (path);
return GNUNET_OK;
}
struct MeshConnection *c;
struct MeshPeerPath *p;
struct MeshPeer *pi;
+ enum MeshConnectionState oldstate;
int fwd;
LOG (GNUNET_ERROR_TYPE_DEBUG, "\n\n");
return GNUNET_OK;
}
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, " via peer %s\n",
- GNUNET_i2s (peer));
+ oldstate = c->state;
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " via peer %s\n", GNUNET_i2s (peer));
pi = GMP_get (peer);
if (get_next_hop (c) == pi)
{
LOG (GNUNET_ERROR_TYPE_DEBUG, " SYNACK\n");
fwd = GNUNET_NO;
- if (MESH_CONNECTION_SENT == c->state)
+ if (MESH_CONNECTION_SENT == oldstate)
connection_change_state (c, MESH_CONNECTION_ACK);
}
else if (get_prev_hop (c) == pi)
GNUNET_break_op (0);
return GNUNET_OK;
}
+
connection_reset_timeout (c, fwd);
/* Add path to peers? */
/* Message for us as creator? */
if (GMC_is_origin (c, GNUNET_YES))
{
+ if (GNUNET_NO != fwd)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_OK;
+ }
LOG (GNUNET_ERROR_TYPE_DEBUG, " Connection (SYN)ACK for us!\n");
+
+ /* If just created, cancel the short timeout and start a long one */
+ if (MESH_CONNECTION_SENT == oldstate)
+ connection_reset_timeout (c, GNUNET_YES);
+
+ /* Change connection and tunnel state */
connection_change_state (c, MESH_CONNECTION_READY);
if (MESH_TUNNEL3_WAITING == GMT_get_state (c->t))
GMT_change_state (c->t, MESH_TUNNEL3_READY);
+
+ /* Send ACK (~TCP ACK)*/
send_connection_ack (c, GNUNET_YES);
return GNUNET_OK;
}
/* Message for us as destination? */
if (GMC_is_terminal (c, GNUNET_YES))
{
+ if (GNUNET_YES != fwd)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_OK;
+ }
LOG (GNUNET_ERROR_TYPE_DEBUG, " Connection ACK for us!\n");
- connection_change_state (c, MESH_CONNECTION_READY);
+
+ /* If just created, cancel the short timeout and start a long one */
+ if (MESH_CONNECTION_ACK == oldstate)
+ connection_reset_timeout (c, GNUNET_NO);
+
+ /* Change tunnel state */
if (MESH_TUNNEL3_WAITING == GMT_get_state (c->t))
GMT_change_state (c->t, MESH_TUNNEL3_READY);
+
return GNUNET_OK;
}
*/
GNUNET_STATISTICS_update (stats, "# control on unknown tunnel",
1, GNUNET_NO);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " connection unknown: already destroyed?\n");
return GNUNET_OK;
}
fwd = is_fwd (c, peer);
}
GMC_send_prebuilt_message (message, c, fwd, NULL, NULL);
c->destroy = GNUNET_YES;
+ c->state = MESH_CONNECTION_DESTROYED;
return GNUNET_OK;
}
if (NULL == c)
{
GNUNET_STATISTICS_update (stats, "# unknown connection", 1, GNUNET_NO);
- LOG (GNUNET_ERROR_TYPE_DEBUG, "WARNING connection unknown\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "WARNING connection %s unknown\n",
+ GNUNET_h2s (&msg->cid));
return GNUNET_OK;
}
return;
}
+ if (GNUNET_NO != c->destroy)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " being destroyed, why bother...\n");
+ return;
+ }
+
/* Get available buffer space */
if (GMC_is_terminal (c, fwd))
{
GNUNET_SCHEDULER_shutdown ();
return;
}
+ create_connection_time = GNUNET_TIME_UNIT_SECONDS;
connections = GNUNET_CONTAINER_multihashmap_create (1024, GNUNET_YES);
}
+
+/**
+ * Destroy each connection on shutdown.
+ *
+ * @param cls Closure (unused).
+ * @param key Current key code (CID, unused).
+ * @param value Value in the hash map (connection)
+ *
+ * @return #GNUNET_YES, because we should continue to iterate,
+ */
+static int
+shutdown_iterator (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct MeshConnection *c = value;
+
+ GMC_destroy (c);
+ return GNUNET_YES;
+}
+
+
/**
* Shut down the connections subsystem.
*/
void
GMC_shutdown (void)
{
+ GNUNET_CONTAINER_multihashmap_iterate (connections, &shutdown_iterator, NULL);
GNUNET_CONTAINER_multihashmap_destroy (connections);
+ connections = NULL;
}
if (0 == own_pos)
{
c->fwd_maintenance_task =
- GNUNET_SCHEDULER_add_delayed (refresh_connection_time,
+ GNUNET_SCHEDULER_add_delayed (create_connection_time,
&connection_fwd_keepalive, c);
}
- register_neighbors (c);
+ if (GNUNET_OK != register_neighbors (c))
+ {
+ GMC_destroy (c);
+ return NULL;
+ }
+
return c;
}
c->destroy = 2;
LOG (GNUNET_ERROR_TYPE_DEBUG, "destroying connection %s\n", GMC_2s (c));
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " fc's f: %p, b: %p\n",
+ &c->fwd_fc, &c->bck_fc);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " fc tasks f: %u, b: %u\n",
+ c->fwd_fc.poll_task, c->bck_fc.poll_task);
/* Cancel all traffic */
connection_cancel_queues (c, GNUNET_YES);
connection_cancel_queues (c, GNUNET_NO);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " fc tasks f: %u, b: %u\n",
+ c->fwd_fc.poll_task, c->bck_fc.poll_task);
+
/* Cancel maintainance task (keepalive/timeout) */
if (GNUNET_SCHEDULER_NO_TASK != c->fwd_maintenance_task)
GNUNET_SCHEDULER_cancel (c->fwd_maintenance_task);
if (GNUNET_SCHEDULER_NO_TASK != c->bck_maintenance_task)
GNUNET_SCHEDULER_cancel (c->bck_maintenance_task);
+ if (GNUNET_SCHEDULER_NO_TASK != c->fwd_fc.poll_task)
+ {
+ GNUNET_SCHEDULER_cancel (c->fwd_fc.poll_task);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL FWD canceled\n");
+ }
+ if (GNUNET_SCHEDULER_NO_TASK != c->bck_fc.poll_task)
+ {
+ GNUNET_SCHEDULER_cancel (c->bck_fc.poll_task);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL BCK canceled\n");
+ }
+ if (NULL != c->fwd_fc.poll_msg)
+ {
+ GMC_cancel (c->fwd_fc.poll_msg);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL msg FWD canceled\n");
+ }
+ if (NULL != c->bck_fc.poll_msg)
+ {
+ GMC_cancel (c->bck_fc.poll_msg);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL msg BCK canceled\n");
+ }
/* Unregister from neighbors */
unregister_neighbors (c);
if (GNUNET_NO == GMC_is_origin (c, GNUNET_YES))
path_destroy (c->path);
+ GNUNET_break (GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_remove (connections, &c->id, c));
+
GNUNET_free (c);
}
{
int fwd;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ " notify broken on %s due to %s disconnect\n",
+ GMC_2s (c), GMP_2s (peer));
+
fwd = peer == get_prev_hop (c);
if (GNUNET_YES == GMC_is_terminal (c, fwd))
* (the one we just scheduled), so no point in checking whether to
* destroy immediately. */
c->destroy = GNUNET_YES;
+ c->state = MESH_CONNECTION_DESTROYED;
/**
* Cancel all queues, if no message is left, connection will be destroyed.
* @param cont Continuation called once message is sent. Can be NULL.
* @param cont_cls Closure for @c cont.
*
- * @return Handle to cancel the message before it's sent. NULL on error.
+ * @return Handle to cancel the message before it's sent.
+ * NULL on error or if @c cont is NULL.
* Invalid on @c cont call.
*/
struct MeshConnectionQueue *
if (0 == ttl)
{
GNUNET_break_op (0);
+ GNUNET_free (data);
return NULL;
}
emsg->cid = c->id;
fc->queue_n--;
fc->next_pid--;
}
+ GNUNET_free (data);
return NULL; /* Drop this message */
}
LOG (GNUNET_ERROR_TYPE_DEBUG, " C_P+ %p %u\n", c, c->pending_messages);
c->pending_messages++;
+ if (NULL == cont)
+ {
+ (void) GMP_queue_add (get_hop (c, fwd), data, type, size, c, fwd,
+ &message_sent, NULL);
+ return NULL;
+ }
+
q = GNUNET_new (struct MeshConnectionQueue);
q->q = GMP_queue_add (get_hop (c, fwd), data, type, size, c, fwd,
&message_sent, q);
if (NULL == q->q)
{
GNUNET_break (0);
+ GNUNET_free (data);
GNUNET_free (q);
return NULL;
}
* is called. Once the continuation is called, the message is no longer in the
* queue.
*
- * If the send function was given no continuation, GMC_cancel should
- * NOT be called, since it's not possible to determine if the message has
- * already been sent.
- *
* @param q Handle to the queue.
*/
void
if (GNUNET_NO == GMC_is_terminal (c, GNUNET_NO))
GMC_send_prebuilt_message (&msg.header, c, GNUNET_NO, NULL, NULL);
c->destroy = GNUNET_YES;
+ c->state = MESH_CONNECTION_DESTROYED;
}
struct MeshFlowControl *fc;
fc = fwd ? &c->fwd_fc : &c->bck_fc;
- if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task && NULL != fc->poll_msg)
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL %s requested\n",
+ fwd ? "FWD" : "BCK");
+ if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task || NULL != fc->poll_msg)
{
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " *** not needed (%u, %p)\n",
+ fc->poll_task, fc->poll_msg);
return;
}
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL started on request\n");
fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
&connection_poll,
fc);
return buf;
}
return GNUNET_h2s (&c->id);
-}
\ No newline at end of file
+}