/**
* Peer queue handle, to cancel if necessary.
*/
- struct CadetPeerQueue *q;
+ struct CadetPeerQueue *peer_q;
/**
* Continuation to call once sent.
/**
* How many connections are we willing to maintain.
- * Local connections are always allowed, even if there are more connections than max.
+ * Local connections are always allowed,
+ * even if there are more connections than max.
*/
static unsigned long long max_connections;
}
+/**
+ * Update performance information if we are a connection's endpoint.
+ *
+ * @param c Connection to update.
+ * @param wait How much time did we wait to send the last message.
+ * @param size Size of the last message.
+ */
+static void
+update_perf (struct CadetConnection *c,
+ struct GNUNET_TIME_Relative wait,
+ uint16_t size)
+{
+ struct CadetConnectionPerformance *p;
+ double usecsperbyte;
+
+ if (NULL == c->perf)
+ return; /* Only endpoints are interested in timing. */
+
+ p = c->perf;
+ usecsperbyte = ((double) wait.rel_value_us) / size;
+ if (p->size == AVG_MSGS)
+ {
+ /* Array is full. Substract oldest value, add new one and store. */
+ p->avg -= (p->usecsperbyte[p->idx] / AVG_MSGS);
+ p->usecsperbyte[p->idx] = usecsperbyte;
+ p->avg += (p->usecsperbyte[p->idx] / AVG_MSGS);
+ }
+ else
+ {
+ /* Array not yet full. Add current value to avg and store. */
+ p->usecsperbyte[p->idx] = usecsperbyte;
+ p->avg *= p->size;
+ p->avg += p->usecsperbyte[p->idx];
+ p->size++;
+ p->avg /= p->size;
+ }
+ p->idx = (p->idx + 1) % AVG_MSGS;
+}
+
+
/**
* Callback called when a connection queued message is sent.
*
* Calculates the average time and connection packet tracking.
*
- * @param cls Closure (ConnectionQueue Handle).
+ * @param cls Closure (ConnectionQueue Handle), can be NULL.
* @param c Connection this message was on.
+ * @param fwd Was this a FWD going message?
* @param sent Was it really sent? (Could have been canceled)
* @param type Type of message sent.
- * @param pid Packet ID, or 0 if not applicable (create, destroy, etc).
- * @param fwd Was this a FWD going message?
+ * @param payload_type Type of payload, if applicable.
+ * @param pid Message ID, or 0 if not applicable (create, destroy, etc).
* @param size Size of the message.
* @param wait Time spent waiting for core (only the time for THIS message)
- * @return #GNUNET_YES if connection was destroyed, #GNUNET_NO otherwise.
*/
-static int
+static void
conn_message_sent (void *cls,
- struct CadetConnection *c, int sent,
- uint16_t type, uint32_t pid, int fwd, size_t size,
+ struct CadetConnection *c, int fwd, int sent,
+ uint16_t type, uint16_t payload_type, uint32_t pid,
+ size_t size,
struct GNUNET_TIME_Relative wait)
{
- struct CadetConnectionPerformance *p;
- struct CadetFlowControl *fc;
struct CadetConnectionQueue *q = cls;
- double usecsperbyte;
+ struct CadetFlowControl *fc;
int forced;
GCC_check_connections ();
- LOG (GNUNET_ERROR_TYPE_DEBUG, "connection message_sent\n");
+ /* If c is NULL, nothing to update. */
+ if (NULL == c)
+ {
+ if (type != GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN
+ && type != GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY)
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR, "Message %s sent on NULL connection!\n",
+ GC_m2s (type));
+ }
+ GCC_check_connections ();
+ return;
+ }
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "connection message_sent\n");
GCC_debug (c, GNUNET_ERROR_TYPE_DEBUG);
+ /* Update flow control info. */
fc = fwd ? &c->fwd_fc : &c->bck_fc;
LOG (GNUNET_ERROR_TYPE_DEBUG, " %ssent %s %s pid %u\n",
- sent ? "" : "not ", GC_f2s (fwd), GC_m2s (type), pid);
+ sent ? "" : "not ", GC_f2s (fwd),
+ GC_m2s (type), GC_m2s (payload_type), pid);
if (NULL != q)
{
forced = q->forced;
{
forced = GNUNET_NO;
}
- if (NULL == c)
- {
- if (type != GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN
- && type != GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY)
- {
- LOG (GNUNET_ERROR_TYPE_ERROR, "Message %s sent on NULL connection!\n",
- GC_m2s (type));
- }
- GCC_check_connections ();
- return GNUNET_NO;
- }
+
LOG (GNUNET_ERROR_TYPE_DEBUG, " C_P- %p %u\n", c, c->pending_messages);
c->pending_messages--;
if ( (GNUNET_YES == c->destroy) &&
"! destroying connection!\n");
GCC_destroy (c);
GCC_check_connections ();
- return GNUNET_YES;
+ return;
}
+
/* Send ACK if needed, after accounting for sent ID in fc->queue_n */
switch (type)
{
}
LOG (GNUNET_ERROR_TYPE_DEBUG, "! message sent!\n");
- if (NULL == c->perf)
- return GNUNET_NO; /* Only endpoints are interested in timing. */
-
- p = c->perf;
- usecsperbyte = ((double) wait.rel_value_us) / size;
- if (p->size == AVG_MSGS)
- {
- /* Array is full. Substract oldest value, add new one and store. */
- p->avg -= (p->usecsperbyte[p->idx] / AVG_MSGS);
- p->usecsperbyte[p->idx] = usecsperbyte;
- p->avg += (p->usecsperbyte[p->idx] / AVG_MSGS);
- }
- else
- {
- /* Array not yet full. Add current value to avg and store. */
- p->usecsperbyte[p->idx] = usecsperbyte;
- p->avg *= p->size;
- p->avg += p->usecsperbyte[p->idx];
- p->size++;
- p->avg /= p->size;
- }
- p->idx = (p->idx + 1) % AVG_MSGS;
+ update_perf (c, wait, size);
GCC_check_connections ();
- return GNUNET_NO;
}
* Is traffic coming from this sender 'FWD' traffic?
*
* @param c Connection to check.
- * @param sender Peer identity of neighbor.
+ * @param sender Short peer identity of neighbor.
*
* @return #GNUNET_YES in case the sender is the 'prev' hop and therefore
* the traffic is 'FWD'.
* #GNUNET_NO for BCK.
- * #GNUNET_SYSERR for errors.
+ * #GNUNET_SYSERR for errors (sender isn't a hop in the connection).
*/
static int
is_fwd (const struct CadetConnection *c,
- const struct GNUNET_PeerIdentity *sender)
+ const struct CadetPeer *sender)
{
GNUNET_PEER_Id id;
- id = GNUNET_PEER_search (sender);
+ id = GCP_get_short_id (sender);
if (GCP_get_short_id (get_prev_hop (c)) == id)
return GNUNET_YES;
if (GCP_get_short_id (get_next_hop (c)) == id)
return GNUNET_NO;
- GNUNET_break (0);
return GNUNET_SYSERR;
}
* Sends a CONNECTION ACK message in reponse to a received CONNECTION_CREATE
* or a first CONNECTION_ACK directed to us.
*
- * @param connection Connection to confirm.
+ * @param c Connection to confirm.
* @param fwd Should we send it FWD? (root->dest)
* (First (~SYNACK) goes BCK, second (~ACK) goes FWD)
*/
static void
-send_connection_ack (struct CadetConnection *connection, int fwd)
+send_connection_ack (struct CadetConnection *c, int fwd)
{
+ struct GNUNET_CADET_ConnectionACK msg;
struct CadetTunnel *t;
size_t size = sizeof (struct GNUNET_CADET_ConnectionACK);
GCC_check_connections ();
- t = connection->t;
+ t = c->t;
LOG (GNUNET_ERROR_TYPE_INFO,
"==> { C %s ACK} %19s on conn %s (%p) %s [%5u]\n",
- GC_f2s (!fwd), "", GCC_2s (connection), connection, GC_f2s (fwd), size);
- GCP_queue_add (get_hop (connection, fwd), NULL,
- GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK, UINT16_MAX, 0,
- size, connection, fwd, &conn_message_sent, NULL);
- connection->pending_messages++;
+ GC_f2s (!fwd), "", GCC_2s (c), c, GC_f2s (fwd), size);
+
+ msg.header.size = htons (sizeof (struct GNUNET_CADET_ConnectionACK));
+ msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK);
+ msg.cid = c->id;
+
+ GNUNET_assert (NULL == c->maintenance_q);
+ c->maintenance_q = GCP_send (get_hop (c, fwd), &msg.header,
+ GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK, 0,
+ c, fwd,
+ &conn_message_sent, NULL);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " C_P+ %p %u (conn`ACK)\n",
+ c, c->pending_messages);
+ c->pending_messages++;
+
if (CADET_TUNNEL_NEW == GCT_get_cstate (t))
GCT_change_cstate (t, CADET_TUNNEL_WAITING);
- if (CADET_CONNECTION_READY != connection->state)
- connection_change_state (connection, CADET_CONNECTION_SENT);
+ if (CADET_CONNECTION_READY != c->state)
+ connection_change_state (c, CADET_CONNECTION_SENT);
GCC_check_connections ();
}
* @param connection_id Connection ID.
* @param id1 Peer that has disconnected, probably local peer.
* @param id2 Peer that has disconnected can be NULL if unknown.
- * @param peer Peer to notify (neighbor who sent the connection).
+ * @param neighbor Peer to notify (neighbor who sent the connection).
*/
static void
send_broken_unknown (const struct GNUNET_CADET_Hash *connection_id,
const struct GNUNET_PeerIdentity *id1,
const struct GNUNET_PeerIdentity *id2,
- const struct GNUNET_PeerIdentity *peer_id)
+ struct CadetPeer *neighbor)
{
struct GNUNET_CADET_ConnectionBroken *msg;
- struct CadetPeerQueue *q;
- struct CadetPeer *neighbor;
GCC_check_connections ();
LOG (GNUNET_ERROR_TYPE_INFO, "--> BROKEN on unknown connection %s\n",
msg->peer2 = *id2;
else
memset (&msg->peer2, 0, sizeof (msg->peer2));
- neighbor = GCP_get (peer_id, GNUNET_NO); /* We MUST know neighbor. */
- GNUNET_assert (NULL != neighbor);
- q = GCP_queue_add (neighbor, msg, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN,
- UINT16_MAX, 2,
- sizeof (struct GNUNET_CADET_ConnectionBroken),
- NULL, GNUNET_SYSERR, /* connection, fwd */
- NULL, NULL); /* continuation */
- GNUNET_assert (NULL != q);
+ GNUNET_assert (NULL != GCP_send (neighbor, &msg->header,
+ UINT16_MAX, 2,
+ NULL, GNUNET_SYSERR, /* connection, fwd */
+ NULL, NULL)); /* continuation */
GCC_check_connections ();
}
}
-/**
- * @brief Re-initiate traffic on this connection if necessary.
- *
- * Check if there is traffic queued towards this peer
- * and the core transmit handle is NULL (traffic was stalled).
- * If so, call core tmt rdy.
- *
- * @param c Connection on which initiate traffic.
- * @param fwd Is this about fwd traffic?
- */
-static void
-connection_unlock_queue (struct CadetConnection *c, int fwd)
-{
- struct CadetPeer *peer;
-
- GCC_check_connections ();
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "connection_unlock_queue %s on %s\n",
- GC_f2s (fwd), GCC_2s (c));
-
- if (GCC_is_terminal (c, fwd))
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " is terminal, can unlock!\n");
- return;
- }
-
- peer = get_hop (c, fwd);
- GCP_queue_unlock (peer, c);
- GCC_check_connections ();
-}
-
-
/**
* Cancel all transmissions that belong to a certain connection.
*
int fwd)
{
struct CadetFlowControl *fc;
- struct CadetPeer *peer;
GCC_check_connections ();
LOG (GNUNET_ERROR_TYPE_DEBUG,
GCC_cancel (fc->poll_msg);
LOG (GNUNET_ERROR_TYPE_DEBUG, " cancelled POLL msg for fc %p\n", fc);
}
- peer = get_hop (c, fwd);
- GCP_queue_cancel (peer, c);
GCC_check_connections ();
}
}
-/**
- * Resend all queued messages for a connection on other connections of the
- * same tunnel, if possible. The connection WILL BE DESTROYED by this function.
- *
- * @param c Connection whose messages to resend.
- * @param fwd Resend fwd messages?
- */
-static void
-resend_messages_and_destroy (struct CadetConnection *c, int fwd)
-{
- struct GNUNET_MessageHeader *out_msg;
- struct CadetTunnel *t = c->t;
- struct CadetPeer *neighbor;
- unsigned int pending;
- int destroyed;
-
- GCC_check_connections ();
- mark_destroyed (c);
-
- destroyed = GNUNET_NO;
- neighbor = get_hop (c, fwd);
- pending = c->pending_messages;
-
- while (NULL != (out_msg = GCP_connection_pop (neighbor, c, &destroyed)))
- {
- if (NULL != t)
- GCT_resend_message (out_msg, t);
- GNUNET_free (out_msg);
- }
-
- /* All pending messages should have been popped,
- * and the connection destroyed by the continuation.
- */
- if (GNUNET_YES != destroyed)
- {
- if (0 != pending)
- {
- GNUNET_break (0);
- GCC_debug (c, GNUNET_ERROR_TYPE_ERROR);
- if (NULL != t) GCT_debug (t, GNUNET_ERROR_TYPE_ERROR);
- }
- GCC_destroy (c);
- }
- GCC_check_connections ();
-}
-
-
/**
* Generic connection timeout implementation.
*
static void
connection_timeout (struct CadetConnection *c, int fwd)
{
- struct CadetFlowControl *reverse_fc;
-
GCC_check_connections ();
- reverse_fc = fwd ? &c->bck_fc : &c->fwd_fc;
LOG (GNUNET_ERROR_TYPE_INFO,
"Connection %s %s timed out. Destroying.\n",
return;
}
- /* If dest, salvage queued traffic. */
+ /* If dest, send "broken" notification. */
if (GCC_is_terminal (c, fwd))
{
- const struct GNUNET_PeerIdentity *next_hop;
+ struct CadetPeer *next_hop;
- next_hop = GCP_get_id (fwd ? get_prev_hop (c) : get_next_hop (c));
+ next_hop = fwd ? get_prev_hop (c) : get_next_hop (c);
send_broken_unknown (&c->id, &my_full_id, NULL, next_hop);
- if (0 < reverse_fc->queue_n)
- resend_messages_and_destroy (c, !fwd);
- GCC_check_connections ();
- return;
}
GCC_destroy (c);
* Log receipt of message on stderr (INFO level).
*
* @param message Message received.
- * @param peer Peer who sent the message.
- * @param hash Connection ID.
+ * @param peer Peer who sent the message.
+ * @param conn_id Connection ID of the message.
*/
static void
log_message (const struct GNUNET_MessageHeader *message,
- const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_CADET_Hash *hash)
+ const struct CadetPeer *peer,
+ const struct GNUNET_CADET_Hash *conn_id)
{
uint16_t size;
uint16_t type;
arrow = "--";
}
LOG (GNUNET_ERROR_TYPE_INFO, "<%s %s on conn %s from %s, %6u bytes\n",
- arrow, GC_m2s (type), GNUNET_h2s (GC_h2hc (hash)),
- GNUNET_i2s (peer), (unsigned int) size);
+ arrow, GC_m2s (type), GNUNET_h2s (GC_h2hc (conn_id)),
+ GCP_2s(peer), (unsigned int) size);
}
/******************************************************************************/
/******************************************************************************/
/**
- * Core handler for connection creation.
+ * Handler for connection creation.
*
- * @param cls Closure (unused).
- * @param peer Sender (neighbor).
- * @param message Message.
- * @return #GNUNET_OK to keep the connection open,
- * #GNUNET_SYSERR to close it (signal serious error)
+ * @param peer Message sender (neighbor).
+ * @param msg Message itself.
*/
-int
-GCC_handle_create (void *cls,
- const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *message)
+void
+GCC_handle_create (struct CadetPeer *peer,
+ const struct GNUNET_CADET_ConnectionCreate *msg)
{
- struct GNUNET_CADET_ConnectionCreate *msg;
+ const struct GNUNET_CADET_Hash *cid;
struct GNUNET_PeerIdentity *id;
- struct GNUNET_CADET_Hash *cid;
struct CadetPeerPath *path;
struct CadetPeer *dest_peer;
struct CadetPeer *orig_peer;
uint16_t size;
GCC_check_connections ();
- /* Check size */
- size = ntohs (message->size);
- if (size < sizeof (struct GNUNET_CADET_ConnectionCreate))
- {
- GNUNET_break_op (0);
- return GNUNET_OK;
- }
+ size = ntohs (msg->header.size);
/* Calculate hops */
size -= sizeof (struct GNUNET_CADET_ConnectionCreate);
- if (size % sizeof (struct GNUNET_PeerIdentity))
- {
- GNUNET_break_op (0);
- return GNUNET_OK;
- }
if (0 != size % sizeof (struct GNUNET_PeerIdentity))
{
GNUNET_break_op (0);
- return GNUNET_OK;
+ return;
}
size /= sizeof (struct GNUNET_PeerIdentity);
if (1 > size)
{
GNUNET_break_op (0);
- return GNUNET_OK;
+ return;
}
LOG (GNUNET_ERROR_TYPE_DEBUG, " path has %u hops.\n", size);
/* Get parameters */
- msg = (struct GNUNET_CADET_ConnectionCreate *) message;
cid = &msg->cid;
- log_message (message, peer, cid);
+ log_message (&msg->header, peer, cid);
id = (struct GNUNET_PeerIdentity *) &msg[1];
LOG (GNUNET_ERROR_TYPE_DEBUG, " origin: %s\n", GNUNET_i2s (id));
/* Path was malformed, probably our own ID was not in it. */
GNUNET_STATISTICS_update (stats, "# malformed paths", 1, GNUNET_NO);
GNUNET_break_op (0);
- return GNUNET_OK;
+ return;
}
-
if (0 == own_pos)
{
/* We received this request from a neighbor, we cannot be origin */
GNUNET_STATISTICS_update (stats, "# fake paths", 1, GNUNET_NO);
GNUNET_break_op (0);
path_destroy (path);
- return GNUNET_OK;
+ return;
}
LOG (GNUNET_ERROR_TYPE_DEBUG, " Own position: %u\n", own_pos);
GNUNET_break (0);
path_destroy (path);
GCC_check_connections ();
- return GNUNET_OK;
+ return;
}
send_broken_unknown (cid, &my_full_id,
GNUNET_PEER_resolve2 (path->peers[own_pos + 1]),
peer);
path_destroy (path);
GCC_check_connections ();
- return GNUNET_OK;
+ return;
}
GCP_add_path_to_all (path, GNUNET_NO);
connection_reset_timeout (c, GNUNET_YES);
LOG (GNUNET_ERROR_TYPE_DEBUG, " Retransmitting.\n");
GCP_add_path (dest_peer, path_duplicate (path), GNUNET_NO);
GCP_add_path_to_origin (orig_peer, path_duplicate (path), GNUNET_NO);
- GNUNET_assert (NULL == GCC_send_prebuilt_message (message, 0, 0, c,
- GNUNET_YES, GNUNET_YES,
- NULL, NULL));
+ GNUNET_assert (NULL ==
+ GCC_send_prebuilt_message (&msg->header, 0, 0, c,
+ GNUNET_YES, GNUNET_YES,
+ NULL, NULL));
}
path_destroy (path);
GCC_check_connections ();
- return GNUNET_OK;
}
/**
- * Core handler for path confirmations.
+ * Handler for connection confirmations.
*
- * @param cls closure
- * @param message message
- * @param peer peer identity this notification is about
- * @return #GNUNET_OK to keep the connection open,
- * #GNUNET_SYSERR to close it (signal serious error)
+ * @param peer Message sender (neighbor).
+ * @param msg Message itself.
*/
-int
-GCC_handle_confirm (void *cls,
- const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *message)
+void
+GCC_handle_confirm (struct CadetPeer *peer,
+ const struct GNUNET_CADET_ConnectionACK *msg)
{
- struct GNUNET_CADET_ConnectionACK *msg;
struct CadetConnection *c;
- struct CadetPeerPath *p;
- struct CadetPeer *pi;
enum CadetConnectionState oldstate;
int fwd;
GCC_check_connections ();
- msg = (struct GNUNET_CADET_ConnectionACK *) message;
- log_message (message, peer, &msg->cid);
+ log_message (&msg->header, peer, &msg->cid);
c = connection_get (&msg->cid);
if (NULL == c)
{
" don't know the connection!\n");
send_broken_unknown (&msg->cid, &my_full_id, NULL, peer);
GCC_check_connections ();
- return GNUNET_OK;
+ return;
}
-
if (GNUNET_NO != c->destroy)
{
GNUNET_assert (CADET_CONNECTION_DESTROYED == c->state);
+ GNUNET_STATISTICS_update (stats, "# control on dying connection",
+ 1, GNUNET_NO);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"connection %s being destroyed, ignoring confirm\n",
GCC_2s (c));
GCC_check_connections ();
- return GNUNET_OK;
+ return;
}
oldstate = c->state;
- LOG (GNUNET_ERROR_TYPE_DEBUG, " via peer %s\n", GNUNET_i2s (peer));
- pi = GCP_get (peer, GNUNET_YES);
- if (get_next_hop (c) == pi)
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " via peer %s\n", GCP_2s (peer));
+ if (get_next_hop (c) == peer)
{
LOG (GNUNET_ERROR_TYPE_DEBUG, " SYNACK\n");
fwd = GNUNET_NO;
if (CADET_CONNECTION_SENT == oldstate)
connection_change_state (c, CADET_CONNECTION_ACK);
}
- else if (get_prev_hop (c) == pi)
+ else if (get_prev_hop (c) == peer)
{
LOG (GNUNET_ERROR_TYPE_DEBUG, " FINAL ACK\n");
fwd = GNUNET_YES;
}
else
{
+ GNUNET_STATISTICS_update (stats, "# control on connection from wrong peer",
+ 1, GNUNET_NO);
GNUNET_break_op (0);
- return GNUNET_OK;
+ return;
}
connection_reset_timeout (c, fwd);
/* Add path to peers? */
- p = c->path;
- if (NULL != p)
+ if (NULL != c->path)
{
- GCP_add_path_to_all (p, GNUNET_YES);
+ GCP_add_path_to_all (c->path, GNUNET_YES);
}
else
{
}
/* Message for us as creator? */
- if (GCC_is_origin (c, GNUNET_YES))
+ if (GNUNET_YES == GCC_is_origin (c, GNUNET_YES))
{
if (GNUNET_NO != fwd)
{
- GNUNET_break_op (0);
- return GNUNET_OK;
+ GNUNET_break (0);
+ return;
}
LOG (GNUNET_ERROR_TYPE_DEBUG, " Connection (SYN)ACK for us!\n");
if (CADET_CONNECTION_SENT == oldstate)
connection_reset_timeout (c, GNUNET_YES);
- /* Change connection state */
+ /* Change connection state, send ACK */
connection_change_state (c, CADET_CONNECTION_READY);
send_connection_ack (c, GNUNET_YES);
if (CADET_TUNNEL_WAITING == GCT_get_cstate (c->t))
GCT_change_cstate (c->t, CADET_TUNNEL_READY);
GCC_check_connections ();
- return GNUNET_OK;
+ return;
}
/* Message for us as destination? */
{
if (GNUNET_YES != fwd)
{
- GNUNET_break_op (0);
- return GNUNET_OK;
+ GNUNET_break (0);
+ return;
}
LOG (GNUNET_ERROR_TYPE_DEBUG, " Connection ACK for us!\n");
if (CADET_TUNNEL_WAITING == GCT_get_cstate (c->t))
GCT_change_cstate (c->t, CADET_TUNNEL_READY);
GCC_check_connections ();
- return GNUNET_OK;
+ return;
}
LOG (GNUNET_ERROR_TYPE_DEBUG, " not for us, retransmitting...\n");
GNUNET_assert (NULL ==
- GCC_send_prebuilt_message (message, 0, 0, c, fwd,
+ GCC_send_prebuilt_message (&msg->header, 0, 0, c, fwd,
GNUNET_YES, NULL, NULL));
GCC_check_connections ();
- return GNUNET_OK;
+ return;
}
/**
- * Core handler for notifications of broken connections.
+ * Handler for notifications of broken connections.
*
- * @param cls Closure (unused).
- * @param id Peer identity of sending neighbor.
- * @param message Message.
- * @return #GNUNET_OK to keep the connection open,
- * #GNUNET_SYSERR to close it (signal serious error)
+ * @param peer Message sender (neighbor).
+ * @param msg Message itself.
*/
-int
-GCC_handle_broken (void* cls,
- const struct GNUNET_PeerIdentity* id,
- const struct GNUNET_MessageHeader* message)
+void
+GCC_handle_broken (struct CadetPeer *peer,
+ const struct GNUNET_CADET_ConnectionBroken *msg)
{
- struct GNUNET_CADET_ConnectionBroken *msg;
struct CadetConnection *c;
struct CadetTunnel *t;
- int pending;
int fwd;
GCC_check_connections ();
- msg = (struct GNUNET_CADET_ConnectionBroken *) message;
- log_message (message, id, &msg->cid);
+ log_message (&msg->header, peer, &msg->cid);
LOG (GNUNET_ERROR_TYPE_DEBUG, " regarding %s\n",
GNUNET_i2s (&msg->peer1));
LOG (GNUNET_ERROR_TYPE_DEBUG, " regarding %s\n",
if (NULL == c)
{
LOG (GNUNET_ERROR_TYPE_DEBUG, " duplicate CONNECTION_BROKEN\n");
+ GNUNET_STATISTICS_update (stats, "# duplicate CONNECTION_BROKEN",
+ 1, GNUNET_NO);
GCC_check_connections ();
- return GNUNET_OK;
+ return;
}
t = c->t;
- fwd = is_fwd (c, id);
+ fwd = is_fwd (c, peer);
+ if (GNUNET_SYSERR == fwd)
+ {
+ GNUNET_break_op (0);
+ GCC_check_connections ();
+ return;
+ }
mark_destroyed (c);
if (GCC_is_terminal (c, fwd))
{
/* A terminal connection should not have 't' set to NULL. */
GNUNET_break (0);
GCC_debug (c, GNUNET_ERROR_TYPE_ERROR);
- return GNUNET_OK;
+ return;
}
endpoint = GCP_get_short (c->path->peers[c->path->length - 1], GNUNET_YES);
if (2 < c->path->length)
GCT_remove_connection (t, c);
c->t = NULL;
- pending = c->pending_messages;
- if (0 < pending)
- resend_messages_and_destroy (c, !fwd);
- else
- GCC_destroy (c);
+ GCC_destroy (c);
}
else
{
- GNUNET_assert (NULL == GCC_send_prebuilt_message (message, 0, 0, c, fwd,
- GNUNET_YES, NULL, NULL));
+ GNUNET_assert (NULL ==
+ GCC_send_prebuilt_message (&msg->header, 0, 0, c, fwd,
+ GNUNET_YES, NULL, NULL));
connection_cancel_queues (c, !fwd);
}
GCC_check_connections ();
- return GNUNET_OK;
+ return;
}
/**
- * Core handler for tunnel destruction
+ * Handler for notifications of destroyed connections.
*
- * @param cls Closure (unused).
- * @param peer Peer identity of sending neighbor.
- * @param message Message.
- * @return #GNUNET_OK to keep the connection open,
- * #GNUNET_SYSERR to close it (signal serious error)
+ * @param peer Message sender (neighbor).
+ * @param msg Message itself.
*/
-int
-GCC_handle_destroy (void *cls,
- const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *message)
+void
+GCC_handle_destroy (struct CadetPeer *peer,
+ const struct GNUNET_CADET_ConnectionDestroy *msg)
{
- const struct GNUNET_CADET_ConnectionDestroy *msg;
struct CadetConnection *c;
int fwd;
GCC_check_connections ();
- msg = (const struct GNUNET_CADET_ConnectionDestroy *) message;
- log_message (message, peer, &msg->cid);
+ log_message (&msg->header, peer, &msg->cid);
c = connection_get (&msg->cid);
if (NULL == c)
{
"# control on unknown connection",
1, GNUNET_NO);
LOG (GNUNET_ERROR_TYPE_DEBUG,
- " connection unknown: already destroyed?\n");
+ " connection unknown destroyed: previously destroyed?\n");
GCC_check_connections ();
- return GNUNET_OK;
+ return;
}
+
fwd = is_fwd (c, peer);
if (GNUNET_SYSERR == fwd)
{
- GNUNET_break_op (0); /* FIXME */
- return GNUNET_OK;
+ GNUNET_break_op (0);
+ GCC_check_connections ();
+ return;
}
+
if (GNUNET_NO == GCC_is_terminal (c, fwd))
{
GNUNET_assert (NULL ==
- GCC_send_prebuilt_message (message, 0, 0, c, fwd,
+ GCC_send_prebuilt_message (&msg->header, 0, 0, c, fwd,
GNUNET_YES, NULL, NULL));
}
else if (0 == c->pending_messages)
LOG (GNUNET_ERROR_TYPE_DEBUG, " directly destroying connection!\n");
GCC_destroy (c);
GCC_check_connections ();
- return GNUNET_OK;
+ return;
}
mark_destroyed (c);
if (NULL != c->t)
c->t = NULL;
}
GCC_check_connections ();
- return GNUNET_OK;
+ return;
}
/**
- * Check the message against internal state and test if it goes FWD or BCK.
- *
- * Updates the PID, state and timeout values for the connection.
+ * Handler for cadet network traffic hop-by-hop acks.
*
- * @param message Message to check. It must belong to an existing connection.
- * @param minimum_size The message cannot be smaller than this value.
- * @param cid Connection ID (even if @a c is NULL, the ID is still needed).
- * @param c Connection this message should belong. If NULL, check fails.
- * @param neighbor Neighbor that sent the message.
+ * @param peer Message sender (neighbor).
+ * @param msg Message itself.
*/
-static int
-check_message (const struct GNUNET_MessageHeader *message,
- size_t minimum_size,
- const struct GNUNET_CADET_Hash* cid,
- struct CadetConnection *c,
- const struct GNUNET_PeerIdentity *neighbor,
- uint32_t pid)
+void
+GCC_handle_ack (struct CadetPeer *peer,
+ const struct GNUNET_CADET_ACK *msg)
{
- GNUNET_PEER_Id neighbor_id;
+ struct CadetConnection *c;
struct CadetFlowControl *fc;
- struct CadetPeer *hop;
+ uint32_t ack;
int fwd;
- uint16_t type;
-
- /* Check size */
- if (ntohs (message->size) < minimum_size)
- {
- GNUNET_break_op (0);
- LOG (GNUNET_ERROR_TYPE_WARNING, "Size %u < %u\n",
- ntohs (message->size), minimum_size);
- return GNUNET_SYSERR;
- }
- /* Check connection */
+ GCC_check_connections ();
+ log_message (&msg->header, peer, &msg->cid);
+ c = connection_get (&msg->cid);
if (NULL == c)
{
GNUNET_STATISTICS_update (stats,
- "# unknown connection",
- 1, GNUNET_NO);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s on unknown connection %s\n",
- GC_m2s (ntohs (message->type)),
- GNUNET_h2s (GC_h2hc (cid)));
- send_broken_unknown (cid,
+ "# ack on unknown connection",
+ 1,
+ GNUNET_NO);
+ send_broken_unknown (&msg->cid,
&my_full_id,
NULL,
- neighbor);
- return GNUNET_SYSERR;
+ peer);
+ GCC_check_connections ();
+ return;
+ }
+
+ /* Is this a forward or backward ACK? */
+ if (get_next_hop (c) == peer)
+ {
+ fc = &c->fwd_fc;
+ fwd = GNUNET_YES;
+ }
+ else if (get_prev_hop (c) == peer)
+ {
+ fc = &c->bck_fc;
+ fwd = GNUNET_NO;
+ }
+ else
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+
+ ack = ntohl (msg->ack);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " %s ACK %u (was %u)\n",
+ GC_f2s (fwd), ack, fc->last_ack_recv);
+ if (GC_is_pid_bigger (ack, fc->last_ack_recv))
+ fc->last_ack_recv = ack;
+
+ /* Cancel polling if the ACK is big enough. */
+ if (NULL != fc->poll_task &&
+ GC_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " Cancel poll\n");
+ GNUNET_SCHEDULER_cancel (fc->poll_task);
+ fc->poll_task = NULL;
+ fc->poll_time = GNUNET_TIME_UNIT_SECONDS;
+ }
+
+ GCC_check_connections ();
+}
+
+
+/**
+ * Handler for cadet network traffic hop-by-hop data counter polls.
+ *
+ * @param peer Message sender (neighbor).
+ * @param msg Message itself.
+ */
+void
+GCC_handle_poll (struct CadetPeer *peer,
+ const struct GNUNET_CADET_Poll *msg)
+{
+ struct CadetConnection *c;
+ struct CadetFlowControl *fc;
+ uint32_t pid;
+ int fwd;
+
+ GCC_check_connections ();
+ log_message (&msg->header, peer, &msg->cid);
+ c = connection_get (&msg->cid);
+ if (NULL == c)
+ {
+ GNUNET_STATISTICS_update (stats, "# poll on unknown connection", 1,
+ GNUNET_NO);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "POLL message on unknown connection %s!\n",
+ GNUNET_h2s (GC_h2hc (&msg->cid)));
+ send_broken_unknown (&msg->cid,
+ &my_full_id,
+ NULL,
+ peer);
+ GCC_check_connections ();
+ return;
+ }
+
+ /* Is this a forward or backward ACK?
+ * Note: a poll should never be needed in a loopback case,
+ * since there is no possiblility of packet loss there, so
+ * this way of discerining FWD/BCK should not be a problem.
+ */
+ if (get_next_hop (c) == peer)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " FWD FC\n");
+ fc = &c->fwd_fc;
+ }
+ else if (get_prev_hop (c) == peer)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " BCK FC\n");
+ fc = &c->bck_fc;
+ }
+ else
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+
+ pid = ntohl (msg->pid);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " PID %u, OLD %u\n", pid, fc->last_pid_recv);
+ fc->last_pid_recv = pid;
+ fwd = fc == &c->bck_fc;
+ GCC_send_ack (c, fwd, GNUNET_YES);
+ GCC_check_connections ();
+}
+
+
+/**
+ * Check the message against internal state and test if it goes FWD or BCK.
+ *
+ * Updates the PID, state and timeout values for the connection.
+ *
+ * @param message Message to check. It must belong to an existing connection.
+ * @param cid Connection ID (even if @a c is NULL, the ID is still needed).
+ * @param c Connection this message should belong. If NULL, check fails.
+ * @param sender Neighbor that sent the message.
+ *
+ * @return #GNUNET_YES if the message goes FWD.
+ * #GNUNET_NO if it goes BCK.
+ * #GNUNET_SYSERR if there is an error (unauthorized sender, ...).
+ */
+static int
+check_message (const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_CADET_Hash* cid,
+ struct CadetConnection *c,
+ struct CadetPeer *sender,
+ uint32_t pid)
+{
+ struct CadetFlowControl *fc;
+ struct CadetPeer *hop;
+ int fwd;
+ uint16_t type;
+
+ /* Check connection */
+ if (NULL == c)
+ {
+ GNUNET_STATISTICS_update (stats,
+ "# unknown connection",
+ 1, GNUNET_NO);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s on unknown connection %s\n",
+ GC_m2s (ntohs (message->type)),
+ GNUNET_h2s (GC_h2hc (cid)));
+ send_broken_unknown (cid,
+ &my_full_id,
+ NULL,
+ sender);
+ return GNUNET_SYSERR;
}
/* Check if origin is as expected */
- neighbor_id = GNUNET_PEER_search (neighbor);
hop = get_prev_hop (c);
- if (neighbor_id == GCP_get_short_id (hop))
+ if (sender == hop)
{
fwd = GNUNET_YES;
}
{
hop = get_next_hop (c);
GNUNET_break (hop == c->next_peer);
- if (neighbor_id == GCP_get_short_id (hop))
+ if (sender == hop)
{
fwd = GNUNET_NO;
}
/**
- * Generic handler for cadet network encrypted traffic.
+ * Handler for key exchange traffic (Axolotl KX).
*
- * @param peer Peer identity this notification is about.
- * @param msg Encrypted message.
- * @return #GNUNET_OK to keep the connection open,
- * #GNUNET_SYSERR to close it (signal serious error)
+ * @param peer Message sender (neighbor).
+ * @param msg Message itself.
*/
-static int
-handle_cadet_encrypted (const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *message)
+void
+GCC_handle_kx (struct CadetPeer *peer,
+ const struct GNUNET_CADET_KX *msg)
{
- const struct GNUNET_CADET_AX *ax_msg;
const struct GNUNET_CADET_Hash* cid;
struct CadetConnection *c;
- size_t minimum_size;
- size_t overhead;
- uint32_t pid;
int fwd;
GCC_check_connections ();
- GNUNET_assert (GNUNET_MESSAGE_TYPE_CADET_AX == ntohs (message->type));
- overhead = sizeof (struct GNUNET_CADET_AX);
- ax_msg = (const struct GNUNET_CADET_AX *) message;
- cid = &ax_msg->cid;
- pid = ntohl (ax_msg->pid);
- log_message (message, peer, cid);
-
- minimum_size = sizeof (struct GNUNET_MessageHeader) + overhead;
+ cid = &msg->cid;
+ log_message (&msg->header, peer, cid);
+
c = connection_get (cid);
- fwd = check_message (message,
- minimum_size,
+ fwd = check_message (&msg->header,
cid,
c,
peer,
- pid);
+ 0);
/* If something went wrong, discard message. */
if (GNUNET_SYSERR == fwd)
{
+ GNUNET_break_op (0);
GCC_check_connections ();
- return GNUNET_OK;
+ return;
}
/* Is this message for us? */
if (GCC_is_terminal (c, fwd))
{
- GNUNET_STATISTICS_update (stats, "# received encrypted", 1, GNUNET_NO);
-
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " message for us!\n");
+ GNUNET_STATISTICS_update (stats, "# received KX", 1, GNUNET_NO);
if (NULL == c->t)
{
- GNUNET_break (GNUNET_NO != c->destroy);
- return GNUNET_OK;
+ GNUNET_break (0);
+ return;
}
- GCT_handle_encrypted (c->t, message);
- GCC_send_ack (c, fwd, GNUNET_NO);
+ GCT_handle_kx (c->t, &msg[1].header);
GCC_check_connections ();
- return GNUNET_OK;
+ return;
}
/* Message not for us: forward to next hop */
LOG (GNUNET_ERROR_TYPE_DEBUG, " not for us, retransmitting...\n");
GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO);
- GNUNET_assert (NULL == GCC_send_prebuilt_message (message, 0, 0, c, fwd,
+ GNUNET_assert (NULL == GCC_send_prebuilt_message (&msg->header, 0, 0, c, fwd,
GNUNET_NO, NULL, NULL));
GCC_check_connections ();
- return GNUNET_OK;
}
/**
- * Generic handler for cadet network encrypted traffic.
+ * Handler for encrypted cadet network traffic (channel mgmt, data).
*
- * @param peer Peer identity this notification is about.
- * @param msg Encrypted message.
- * @return #GNUNET_OK to keep the connection open,
- * #GNUNET_SYSERR to close it (signal serious error)
+ * @param peer Message sender (neighbor).
+ * @param msg Message itself.
*/
-static int
-handle_cadet_kx (const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_CADET_KX *msg)
+void
+GCC_handle_encrypted (struct CadetPeer *peer,
+ const struct GNUNET_CADET_AX *msg)
{
const struct GNUNET_CADET_Hash* cid;
struct CadetConnection *c;
- size_t expected_size;
+ uint32_t pid;
int fwd;
GCC_check_connections ();
cid = &msg->cid;
+ pid = ntohl (msg->pid);
log_message (&msg->header, peer, cid);
- expected_size = sizeof (struct GNUNET_CADET_KX)
- + sizeof (struct GNUNET_MessageHeader);
c = connection_get (cid);
fwd = check_message (&msg->header,
- expected_size,
cid,
c,
peer,
- 0);
+ pid);
/* If something went wrong, discard message. */
if (GNUNET_SYSERR == fwd)
- return GNUNET_OK;
+ {
+ GNUNET_break_op (0);
+ GCC_check_connections ();
+ return;
+ }
/* Is this message for us? */
if (GCC_is_terminal (c, fwd))
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, " message for us!\n");
- GNUNET_STATISTICS_update (stats, "# received KX", 1, GNUNET_NO);
+ GNUNET_STATISTICS_update (stats, "# received encrypted", 1, GNUNET_NO);
+
if (NULL == c->t)
{
- GNUNET_break (0);
- return GNUNET_OK;
+ GNUNET_break (GNUNET_NO != c->destroy);
+ return;
}
- GCT_handle_kx (c->t, &msg[1].header);
+ GCT_handle_encrypted (c->t, &msg->header);
+ GCC_send_ack (c, fwd, GNUNET_NO);
GCC_check_connections ();
- return GNUNET_OK;
+ return;
}
/* Message not for us: forward to next hop */
GNUNET_assert (NULL == GCC_send_prebuilt_message (&msg->header, 0, 0, c, fwd,
GNUNET_NO, NULL, NULL));
GCC_check_connections ();
- return GNUNET_OK;
-}
-
-
-/**
- * Core handler for key exchange traffic (ephemeral key, ping, pong).
- *
- * @param cls Closure (unused).
- * @param message Message received.
- * @param peer Peer who sent the message.
- * @return #GNUNET_OK to keep the connection open,
- * #GNUNET_SYSERR to close it (signal serious error)
- */
-int
-GCC_handle_kx (void *cls,
- const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *message)
-{
- GCC_check_connections ();
- return handle_cadet_kx (peer, (struct GNUNET_CADET_KX *) message);
-}
-
-
-/**
- * Core handler for encrypted cadet network traffic (channel mgmt, data).
- *
- * @param cls Closure (unused).
- * @param message Message received.
- * @param peer Peer who sent the message.
- * @return #GNUNET_OK to keep the connection open,
- * #GNUNET_SYSERR to close it (signal serious error)
- */
-int
-GCC_handle_encrypted (void *cls, const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *message)
-{
- GCC_check_connections ();
- return handle_cadet_encrypted (peer, message);
-}
-
-
-/**
- * Core handler for cadet network traffic point-to-point acks.
- *
- * @param cls closure
- * @param message message
- * @param peer peer identity this notification is about
- * @return #GNUNET_OK to keep the connection open,
- * #GNUNET_SYSERR to close it (signal serious error)
- */
-int
-GCC_handle_ack (void *cls, const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *message)
-{
- struct GNUNET_CADET_ACK *msg;
- struct CadetConnection *c;
- struct CadetFlowControl *fc;
- GNUNET_PEER_Id id;
- uint32_t ack;
- int fwd;
-
- GCC_check_connections ();
- msg = (struct GNUNET_CADET_ACK *) message;
- log_message (message, peer, &msg->cid);
- c = connection_get (&msg->cid);
- if (NULL == c)
- {
- GNUNET_STATISTICS_update (stats,
- "# ack on unknown connection",
- 1,
- GNUNET_NO);
- send_broken_unknown (&msg->cid,
- &my_full_id,
- NULL,
- peer);
- GCC_check_connections ();
- return GNUNET_OK;
- }
-
- /* Is this a forward or backward ACK? */
- id = GNUNET_PEER_search (peer);
- if (GCP_get_short_id (get_next_hop (c)) == id)
- {
- fc = &c->fwd_fc;
- fwd = GNUNET_YES;
- }
- else if (GCP_get_short_id (get_prev_hop (c)) == id)
- {
- fc = &c->bck_fc;
- fwd = GNUNET_NO;
- }
- else
- {
- GNUNET_break_op (0);
- return GNUNET_OK;
- }
-
- ack = ntohl (msg->ack);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " %s ACK %u (was %u)\n",
- GC_f2s (fwd), ack, fc->last_ack_recv);
- if (GC_is_pid_bigger (ack, fc->last_ack_recv))
- fc->last_ack_recv = ack;
-
- /* Cancel polling if the ACK is big enough. */
- if (NULL != fc->poll_task &&
- GC_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " Cancel poll\n");
- GNUNET_SCHEDULER_cancel (fc->poll_task);
- fc->poll_task = NULL;
- fc->poll_time = GNUNET_TIME_UNIT_SECONDS;
- }
-
- connection_unlock_queue (c, fwd);
- GCC_check_connections ();
- return GNUNET_OK;
-}
-
-
-/**
- * Core handler for cadet network traffic point-to-point ack polls.
- *
- * @param cls closure
- * @param message message
- * @param peer peer identity this notification is about
- * @return #GNUNET_OK to keep the connection open,
- * #GNUNET_SYSERR to close it (signal serious error)
- */
-int
-GCC_handle_poll (void *cls,
- const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *message)
-{
- struct GNUNET_CADET_Poll *msg;
- struct CadetConnection *c;
- struct CadetFlowControl *fc;
- GNUNET_PEER_Id id;
- uint32_t pid;
- int fwd;
-
- GCC_check_connections ();
- msg = (struct GNUNET_CADET_Poll *) message;
- log_message (message, peer, &msg->cid);
- c = connection_get (&msg->cid);
- if (NULL == c)
- {
- GNUNET_STATISTICS_update (stats, "# poll on unknown connection", 1,
- GNUNET_NO);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "POLL message on unknown connection %s!\n",
- GNUNET_h2s (GC_h2hc (&msg->cid)));
- send_broken_unknown (&msg->cid,
- &my_full_id,
- NULL,
- peer);
- GCC_check_connections ();
- return GNUNET_OK;
- }
-
- /* Is this a forward or backward ACK?
- * Note: a poll should never be needed in a loopback case,
- * since there is no possiblility of packet loss there, so
- * this way of discerining FWD/BCK should not be a problem.
- */
- id = GNUNET_PEER_search (peer);
- if (GCP_get_short_id (get_next_hop (c)) == id)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " FWD FC\n");
- fc = &c->fwd_fc;
- }
- else if (GCP_get_short_id (get_prev_hop (c)) == id)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " BCK FC\n");
- fc = &c->bck_fc;
- }
- else
- {
- GNUNET_break_op (0);
- return GNUNET_OK;
- }
-
- pid = ntohl (msg->pid);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " PID %u, OLD %u\n", pid, fc->last_pid_recv);
- fc->last_pid_recv = pid;
- fwd = fc == &c->bck_fc;
- GCC_send_ack (c, fwd, GNUNET_YES);
- GCC_check_connections ();
-
- return GNUNET_OK;
-}
-
-
-/**
- * Send an ACK on the appropriate connection/channel, depending on
- * the direction and the position of the peer.
- *
- * @param c Which connection to send the hop-by-hop ACK.
- * @param fwd Is this a fwd ACK? (will go dest->root).
- * @param force Send the ACK even if suboptimal (e.g. requested by POLL).
- */
-void
-GCC_send_ack (struct CadetConnection *c, int fwd, int force)
-{
- unsigned int buffer;
-
- GCC_check_connections ();
- LOG (GNUNET_ERROR_TYPE_DEBUG, "GCC send %s ACK on %s\n",
- GC_f2s (fwd), GCC_2s (c));
-
- if (NULL == c)
- {
- GNUNET_break (0);
- return;
- }
-
- if (GNUNET_NO != c->destroy)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " being destroyed, why bother...\n");
- GCC_check_connections ();
- return;
- }
-
- /* Get available buffer space */
- if (GCC_is_terminal (c, fwd))
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " getting from all channels\n");
- buffer = GCT_get_channels_buffer (c->t);
- }
- else
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " getting from one connection\n");
- buffer = GCC_get_buffer (c, fwd);
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG, " buffer available: %u\n", buffer);
- if (0 == buffer && GNUNET_NO == force)
- {
- GCC_check_connections ();
- return;
- }
-
- /* Send available buffer space */
- if (GCC_is_origin (c, fwd))
- {
- GNUNET_assert (NULL != c->t);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " sending on channels...\n");
- GCT_unchoke_channels (c->t);
- }
- else
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " sending on connection\n");
- send_ack (c, buffer, fwd, force);
- }
- GCC_check_connections ();
}
* Create a connection.
*
* @param cid Connection ID (either created locally or imposed remotely).
- * @param t Tunnel this connection belongs to (or NULL);
+ * @param t Tunnel this connection belongs to (or NULL for transit connections);
* @param path Path this connection has to use (copy is made).
* @param own_pos Own position in the @c path path.
*
- * @return Newly created connection, NULL in case of error (own id not in path).
- */
+ * @return Newly created connection.
+ * NULL in case of error: own id not in path, wrong neighbors, ...
+*/
struct CadetConnection *
GCC_new (const struct GNUNET_CADET_Hash *cid,
struct CadetTunnel *t,
}
+/**
+ * Connection is no longer needed: destroy it.
+ *
+ * Cancels all pending traffic (including possible DESTROY messages), all
+ * maintenance tasks and removes the connection from neighbor peers and tunnel.
+ *
+ * @param c Connection to destroy.
+ */
void
GCC_destroy (struct CadetConnection *c)
{
* @param message Message to send. Function makes a copy of it.
* If message is not hop-by-hop, decrements TTL of copy.
* @param payload_type Type of payload, in case the message is encrypted.
+ * @param payload_id ID of the payload (PID, ACK, ...).
* @param c Connection on which this message is transmitted.
* @param fwd Is this a fwd message?
* @param force Force the connection to accept the message (buffer overfill).
{
struct CadetFlowControl *fc;
struct CadetConnectionQueue *q;
- void *data;
+ struct GNUNET_MessageHeader *copy;
size_t size;
uint16_t type;
int droppable;
}
size = ntohs (message->size);
- data = GNUNET_malloc (size);
- GNUNET_memcpy (data, message, size);
+ copy = GNUNET_malloc (size);
+ GNUNET_memcpy (copy, message, size);
type = ntohs (message->type);
LOG (GNUNET_ERROR_TYPE_INFO,
"--> %s (%s %4u) on conn %s (%p) %s [%5u]\n",
struct GNUNET_CADET_ConnectionBroken *bmsg;
case GNUNET_MESSAGE_TYPE_CADET_AX:
- axmsg = (struct GNUNET_CADET_AX *) data;
+ axmsg = (struct GNUNET_CADET_AX *) copy;
axmsg->cid = c->id;
LOG (GNUNET_ERROR_TYPE_DEBUG, " Q_N+ %p %u\n", fc, fc->queue_n);
LOG (GNUNET_ERROR_TYPE_DEBUG, "last pid sent %u\n", fc->last_pid_sent);
break;
case GNUNET_MESSAGE_TYPE_CADET_KX:
- kmsg = (struct GNUNET_CADET_KX *) data;
+ kmsg = (struct GNUNET_CADET_KX *) copy;
kmsg->cid = c->id;
break;
case GNUNET_MESSAGE_TYPE_CADET_ACK:
- amsg = (struct GNUNET_CADET_ACK *) data;
+ amsg = (struct GNUNET_CADET_ACK *) copy;
amsg->cid = c->id;
LOG (GNUNET_ERROR_TYPE_DEBUG, " ack %u\n", ntohl (amsg->ack));
droppable = GNUNET_NO;
break;
case GNUNET_MESSAGE_TYPE_CADET_POLL:
- pmsg = (struct GNUNET_CADET_Poll *) data;
+ pmsg = (struct GNUNET_CADET_Poll *) copy;
pmsg->cid = c->id;
LOG (GNUNET_ERROR_TYPE_DEBUG, " POLL %u\n", ntohl (pmsg->pid));
droppable = GNUNET_NO;
break;
case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
- dmsg = (struct GNUNET_CADET_ConnectionDestroy *) data;
+ dmsg = (struct GNUNET_CADET_ConnectionDestroy *) copy;
dmsg->cid = c->id;
break;
case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
- bmsg = (struct GNUNET_CADET_ConnectionBroken *) data;
+ bmsg = (struct GNUNET_CADET_ConnectionBroken *) copy;
bmsg->cid = c->id;
break;
case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE:
case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK:
+ GNUNET_break (0); /* Should've used specific functions. */
break;
default:
GNUNET_break (0);
- GNUNET_free (data);
+ GNUNET_free (copy);
return NULL;
}
{
fc->queue_n--;
}
- GNUNET_free (data);
+ GNUNET_free (copy);
return NULL; /* Drop this message */
}
q = GNUNET_new (struct CadetConnectionQueue);
q->forced = !droppable;
- q->q = GCP_queue_add (get_hop (c, fwd), data, type, payload_type, payload_id,
- size, c, fwd, &conn_message_sent, q);
- if (NULL == q->q)
+ q->peer_q = GCP_send (get_hop (c, fwd), copy,
+ payload_type, payload_id,
+ c, fwd,
+ &conn_message_sent, q);
+ if (NULL == q->peer_q)
{
LOG (GNUNET_ERROR_TYPE_DEBUG, "dropping msg on %s, NULL q\n", GCC_2s (c));
- GNUNET_free (data);
+ GNUNET_free (copy);
GNUNET_free (q);
GCC_check_connections ();
return NULL;
{
LOG (GNUNET_ERROR_TYPE_DEBUG, "! GCC cancel message\n");
- /* queue destroy calls message_sent, which calls q->cont and frees q */
- GCP_queue_destroy (q->q, GNUNET_YES, GNUNET_NO, 0);
+ /* send_cancel calls message_sent, which calls q->cont and frees q */
+ GCP_send_cancel (q->peer_q);
GCC_check_connections ();
}
* Sends a CREATE CONNECTION message for a path to a peer.
* Changes the connection and tunnel states if necessary.
*
- * @param connection Connection to create.
+ * @param c Connection to create.
*/
void
-GCC_send_create (struct CadetConnection *connection)
+GCC_send_create (struct CadetConnection *c)
{
enum CadetTunnelCState state;
size_t size;
GCC_check_connections ();
size = sizeof (struct GNUNET_CADET_ConnectionCreate);
- size += connection->path->length * sizeof (struct GNUNET_PeerIdentity);
+ size += c->path->length * sizeof (struct GNUNET_PeerIdentity);
+ {
+ /* Allocate message on the stack */
+ unsigned char cbuf[size];
+ struct GNUNET_CADET_ConnectionCreate *msg;
+ struct GNUNET_PeerIdentity *peers;
+
+ msg = (struct GNUNET_CADET_ConnectionCreate *) cbuf;
+ msg->header.size = htons (size);
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE);
+ msg->cid = *GCC_get_id (c);
+ peers = (struct GNUNET_PeerIdentity *) &msg[1];
+ for (int i = 0; i < c->path->length; i++)
+ {
+ GNUNET_PEER_resolve (c->path->peers[i], peers++);
+ }
+ GNUNET_assert (NULL == c->maintenance_q);
+ c->maintenance_q = GCP_send (get_next_hop (c),
+ &msg->header,
+ GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE, 0,
+ c, GNUNET_YES,
+ &conn_message_sent, NULL);
+ }
LOG (GNUNET_ERROR_TYPE_INFO, "==> %s %19s on conn %s (%p) FWD [%5u]\n",
GC_m2s (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE), "",
- GCC_2s (connection), connection, size);
+ GCC_2s (c), c, size);
LOG (GNUNET_ERROR_TYPE_DEBUG, " C_P+ %p %u (create)\n",
- connection, connection->pending_messages);
- connection->pending_messages++;
-
- connection->maintenance_q =
- GCP_queue_add (get_next_hop (connection), NULL,
- GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE, UINT16_MAX, 0,
- size, connection, GNUNET_YES, &conn_message_sent, NULL);
+ c, c->pending_messages);
+ c->pending_messages++;
- state = GCT_get_cstate (connection->t);
+ state = GCT_get_cstate (c->t);
if (CADET_TUNNEL_SEARCHING == state || CADET_TUNNEL_NEW == state)
- GCT_change_cstate (connection->t, CADET_TUNNEL_WAITING);
- if (CADET_CONNECTION_NEW == connection->state)
- connection_change_state (connection, CADET_CONNECTION_SENT);
+ GCT_change_cstate (c->t, CADET_TUNNEL_WAITING);
+ if (CADET_CONNECTION_NEW == c->state)
+ connection_change_state (c, CADET_CONNECTION_SENT);
+ GCC_check_connections ();
+}
+
+
+/**
+ * Send an ACK on the appropriate connection/channel, depending on
+ * the direction and the position of the peer.
+ *
+ * @param c Which connection to send the hop-by-hop ACK.
+ * @param fwd Is this a fwd ACK? (will go dest->root).
+ * @param force Send the ACK even if suboptimal (e.g. requested by POLL).
+ */
+void
+GCC_send_ack (struct CadetConnection *c, int fwd, int force)
+{
+ unsigned int buffer;
+
+ GCC_check_connections ();
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "GCC send %s ACK on %s\n",
+ GC_f2s (fwd), GCC_2s (c));
+
+ if (NULL == c)
+ {
+ GNUNET_break (0);
+ return;
+ }
+
+ if (GNUNET_NO != c->destroy)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " being destroyed, why bother...\n");
+ GCC_check_connections ();
+ return;
+ }
+
+ /* Get available buffer space */
+ if (GCC_is_terminal (c, fwd))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " getting from all channels\n");
+ buffer = GCT_get_channels_buffer (c->t);
+ }
+ else
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " getting from one connection\n");
+ buffer = GCC_get_buffer (c, fwd);
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " buffer available: %u\n", buffer);
+ if (0 == buffer && GNUNET_NO == force)
+ {
+ GCC_check_connections ();
+ return;
+ }
+
+ /* Send available buffer space */
+ if (GNUNET_YES == GCC_is_origin (c, fwd))
+ {
+ GNUNET_assert (NULL != c->t);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " sending on channels...\n");
+ GCT_unchoke_channels (c->t);
+ }
+ else
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " sending on connection\n");
+ send_ack (c, buffer, fwd, force);
+ }
GCC_check_connections ();
}
/**
- * Core handler for connection creation.
+ * Handler for connection creation.
*
- * @param cls Closure (unused).
- * @param peer Sender (neighbor).
- * @param message Message.
- * @return #GNUNET_OK to keep the connection open,
- * #GNUNET_SYSERR to close it (signal serious error)
+ * @param peer Message sender (neighbor).
+ * @param msg Message itself.
*/
-int
-GCC_handle_create (void *cls,
- const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *message);
+void
+GCC_handle_create (struct CadetPeer *peer,
+ const struct GNUNET_CADET_ConnectionCreate *msg);
/**
- * Core handler for path confirmations.
+ * Handler for connection confirmations.
*
- * @param cls closure
- * @param message message
- * @param peer peer identity this notification is about
- * @return #GNUNET_OK to keep the connection open,
- * #GNUNET_SYSERR to close it (signal serious error)
+ * @param peer Message sender (neighbor).
+ * @param msg Message itself.
*/
-int
-GCC_handle_confirm (void *cls,
- const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *message);
+void
+GCC_handle_confirm (struct CadetPeer *peer,
+ const struct GNUNET_CADET_ConnectionACK *msg);
/**
- * Core handler for notifications of broken paths
+ * Handler for notifications of broken connections.
*
- * @param cls Closure (unused).
- * @param id Peer identity of sending neighbor.
- * @param message Message.
- * @return #GNUNET_OK to keep the connection open,
- * #GNUNET_SYSERR to close it (signal serious error)
+ * @param peer Message sender (neighbor).
+ * @param msg Message itself.
*/
-int
-GCC_handle_broken (void* cls,
- const struct GNUNET_PeerIdentity* id,
- const struct GNUNET_MessageHeader* message);
+void
+GCC_handle_broken (struct CadetPeer *peer,
+ const struct GNUNET_CADET_ConnectionBroken *msg);
/**
- * Core handler for tunnel destruction
+ * Handler for notifications of destroyed connections.
*
- * @param cls Closure (unused).
- * @param peer Peer identity of sending neighbor.
- * @param message Message.
- *
- * @return GNUNET_OK to keep the connection open,
- * GNUNET_SYSERR to close it (signal serious error)
+ * @param peer Message sender (neighbor).
+ * @param msg Message itself.
*/
-int
-GCC_handle_destroy (void *cls, const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *message);
+void
+GCC_handle_destroy (struct CadetPeer *peer,
+ const struct GNUNET_CADET_ConnectionDestroy *msg);
/**
- * Core handler for key exchange traffic (ephemeral key, ping, pong).
+ * Handler for cadet network traffic hop-by-hop acks.
*
- * @param cls Closure (unused).
- * @param message Message received.
- * @param peer Peer who sent the message.
+ * @param peer Message sender (neighbor).
+ * @param msg Message itself.
+ */
+void
+GCC_handle_ack (struct CadetPeer *peer,
+ const struct GNUNET_CADET_ACK *msg);
+
+/**
+ * Handler for cadet network traffic hop-by-hop data counter polls.
*
- * @return GNUNET_OK to keep the connection open,
- * GNUNET_SYSERR to close it (signal serious error)
+ * @param peer Message sender (neighbor).
+ * @param msg Message itself.
*/
-int
-GCC_handle_kx (void *cls, const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *message);
+void
+GCC_handle_poll (struct CadetPeer *peer,
+ const struct GNUNET_CADET_Poll *msg);
/**
- * Core handler for encrypted cadet network traffic (channel mgmt, data).
+ * Handler for key exchange traffic (Axolotl KX).
*
- * @param cls Closure (unused).
- * @param message Message received.
- * @param peer Peer who sent the message.
+ * @param peer Message sender (neighbor).
+ * @param msg Message itself.
+ */
+void
+GCC_handle_kx (struct CadetPeer *peer,
+ const struct GNUNET_CADET_KX *msg);
+
+/**
+ * Handler for encrypted cadet network traffic (channel mgmt, data).
*
- * @return GNUNET_OK to keep the connection open,
- * GNUNET_SYSERR to close it (signal serious error)
+ * @param peer Message sender (neighbor).
+ * @param msg Message itself.
*/
-int
-GCC_handle_encrypted (void *cls, const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *message);
+void
+GCC_handle_encrypted (struct CadetPeer *peer,
+ const struct GNUNET_CADET_AX *msg);
/**
* Core handler for axolotl key exchange traffic.
GCC_handle_ax (void *cls, const struct GNUNET_PeerIdentity *peer,
struct GNUNET_MessageHeader *message);
-/**
- * Core handler for cadet network traffic point-to-point acks.
- *
- * @param cls closure
- * @param message message
- * @param peer peer identity this notification is about
- *
- * @return GNUNET_OK to keep the connection open,
- * GNUNET_SYSERR to close it (signal serious error)
- */
-int
-GCC_handle_ack (void *cls, const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *message);
-
-/**
- * Core handler for cadet network traffic point-to-point ack polls.
- *
- * @param cls closure
- * @param message message
- * @param peer peer identity this notification is about
- *
- * @return GNUNET_OK to keep the connection open,
- * GNUNET_SYSERR to close it (signal serious error)
- */
-int
-GCC_handle_poll (void *cls, const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *message);
-
/**
* Core handler for cadet keepalives.
*
* Create a connection.
*
* @param cid Connection ID (either created locally or imposed remotely).
- * @param t Tunnel this connection belongs to (or NULL);
+ * @param t Tunnel this connection belongs to (or NULL for transit connections);
* @param path Path this connection has to use (copy is made).
* @param own_pos Own position in the @c path path.
*
- * @return Newly created connection, NULL in case of error (own id not in path).
+ * @return Newly created connection.
+ * NULL in case of error: own id not in path, wrong neighbors, ...
*/
struct CadetConnection *
GCC_new (const struct GNUNET_CADET_Hash *cid,
* @param message Message to send. Function makes a copy of it.
* If message is not hop-by-hop, decrements TTL of copy.
* @param payload_type Type of payload, in case the message is encrypted.
+ * @param payload_id ID of the payload (PID, ACK, ...).
* @param c Connection on which this message is transmitted.
* @param fwd Is this a fwd message?
* @param force Force the connection to accept the message (buffer overfill).
struct CadetPeer *p = value;
struct CadetTunnel *t;
- GCP_debug (p, GNUNET_ERROR_TYPE_ERROR);
-
t = GCP_get_tunnel (p);
if (NULL != t)
GCT_debug (t, GNUNET_ERROR_TYPE_ERROR);
/******************************** STRUCTS **********************************/
/******************************************************************************/
+
/**
- * Struct containing info about a queued transmission to this peer
+ * Struct containing all information regarding a given peer
*/
-struct CadetPeerQueue
+struct CadetPeer
{
/**
- * DLL next
+ * ID of the peer
*/
- struct CadetPeerQueue *next;
+ GNUNET_PEER_Id id;
/**
- * DLL previous
+ * Last time we heard from this peer
*/
- struct CadetPeerQueue *prev;
+ struct GNUNET_TIME_Absolute last_contact;
/**
- * Peer this transmission is directed to.
+ * Paths to reach the peer, ordered by ascending hop count
*/
- struct CadetPeer *peer;
+ struct CadetPeerPath *path_head;
/**
- * Connection this message belongs to.
+ * Paths to reach the peer, ordered by ascending hop count
*/
- struct CadetConnection *c;
+ struct CadetPeerPath *path_tail;
/**
- * Is FWD in c?
+ * Handle to stop the DHT search for paths to this peer
*/
- int fwd;
+ struct GCD_search_handle *search_h;
/**
- * Pointer to info stucture used as cls.
+ * Handle to stop the DHT search for paths to this peer
*/
- void *cls;
+ struct GNUNET_SCHEDULER_Task *search_delayed;
/**
- * Type of message
+ * Tunnel to this peer, if any.
*/
- uint16_t type;
+ struct CadetTunnel *tunnel;
/**
- * Type of message
+ * Connections that go through this peer; indexed by tid.
*/
- uint16_t payload_type;
+ struct GNUNET_CONTAINER_MultiHashMap *connections;
/**
- * Type of message
+ * Handle for core transmissions.
*/
- uint32_t payload_id;
+ struct GNUNET_MQ_Handle *core_mq;
/**
- * Size of the message
+ * How many messages are in the queue to this peer.
*/
- size_t size;
+ unsigned int queue_n;
/**
- * Set when this message starts waiting for CORE.
+ * Hello message.
*/
- struct GNUNET_TIME_Absolute start_waiting;
+ struct GNUNET_HELLO_Message* hello;
/**
- * Function to call on sending.
+ * Handle to us offering the HELLO to the transport.
*/
- GCP_sent cont;
+ struct GNUNET_TRANSPORT_OfferHelloHandle *hello_offer;
/**
- * Closure for callback.
+ * Handle to our ATS request asking ATS to suggest an address
+ * to TRANSPORT for this peer (to establish a direct link).
*/
- void *cont_cls;
+ struct GNUNET_ATS_ConnectivitySuggestHandle *connectivity_suggestion;
+
};
/**
- * Struct containing all information regarding a given peer
+ * Information about a queued message on the peer level.
*/
-struct CadetPeer
-{
- /**
- * ID of the peer
- */
- GNUNET_PEER_Id id;
-
- /**
- * Last time we heard from this peer
- */
- struct GNUNET_TIME_Absolute last_contact;
+struct CadetPeerQueue {
/**
- * Paths to reach the peer, ordered by ascending hop count
- */
- struct CadetPeerPath *path_head;
-
- /**
- * Paths to reach the peer, ordered by ascending hop count
- */
- struct CadetPeerPath *path_tail;
-
- /**
- * Handle to stop the DHT search for paths to this peer
+ * Envelope to cancel message before MQ sends it.
*/
- struct GCD_search_handle *search_h;
+ struct GNUNET_MQ_Envelope *env;
/**
- * Handle to stop the DHT search for paths to this peer
+ * Peer (neighbor) this message is being sent to.
*/
- struct GNUNET_SCHEDULER_Task *search_delayed;
+ struct CadetPeer *peer;
/**
- * Tunnel to this peer, if any.
+ * Continuation to call to notify higher layers about message sent.
*/
- struct CadetTunnel *tunnel;
+ GCP_sent cont;
/**
- * Connections that go through this peer; indexed by tid.
+ * Closure for @a cont.
*/
- struct GNUNET_CONTAINER_MultiHashMap *connections;
+ void *cont_cls;
/**
- * Handle for queued transmissions
+ * Time when message was queued for sending.
*/
- struct GNUNET_CORE_TransmitHandle *core_transmit;
+ struct GNUNET_TIME_Absolute queue_timestamp;
/**
- * Timestamp
+ * #GNUNET_YES if message was management traffic (POLL, ACK, ...).
*/
- struct GNUNET_TIME_Absolute tmt_time;
+ int management_traffic;
/**
- * Transmission queue to core DLL head
+ * Message type.
*/
- struct CadetPeerQueue *queue_head;
+ uint16_t type;
/**
- * Transmission queue to core DLL tail
+ * Message size.
*/
- struct CadetPeerQueue *queue_tail;
+ uint16_t size;
/**
- * How many messages are in the queue to this peer.
+ * Type of the message's payload, if it was encrypted data.
*/
- unsigned int queue_n;
+ uint16_t payload_type;
/**
- * Hello message.
+ *ID of the payload (PID, ACK #, ...).
*/
- struct GNUNET_HELLO_Message* hello;
+ uint16_t payload_id;
/**
- * Handle to us offering the HELLO to the transport.
+ * Connection this message was sent on.
*/
- struct GNUNET_TRANSPORT_OfferHelloHandle *hello_offer;
+ struct CadetConnection *c;
/**
- * Handle to our ATS request asking ATS to suggest an address
- * to TRANSPORT for this peer (to establish a direct link).
+ * Direction in @a c this message was send on (#GNUNET_YES = FWD).
*/
- struct GNUNET_ATS_ConnectivitySuggestHandle *connectivity_suggestion;
-
+ int c_fwd;
};
static int in_shutdown;
-/******************************************************************************/
-/***************************** DEBUG *********************************/
-/******************************************************************************/
-
-/**
- * Log all kinds of info about the queueing status of a peer.
- *
- * @param p Peer whose queue to show.
- * @param level Error level to use for logging.
- */
-static void
-queue_debug (const struct CadetPeer *p, enum GNUNET_ErrorType level)
-{
- struct GNUNET_TIME_Relative core_wait_time;
- struct CadetPeerQueue *q;
- int do_log;
-
- do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
- "cadet-p2p",
- __FILE__, __FUNCTION__, __LINE__);
- if (0 == do_log)
- return;
-
- LOG2 (level, "QQQ Message queue towards %s\n", GCP_2s (p));
- LOG2 (level, "QQQ queue length: %u\n", p->queue_n);
- LOG2 (level, "QQQ core tmt rdy: %p\n", p->core_transmit);
- if (NULL != p->core_transmit)
- {
- core_wait_time = GNUNET_TIME_absolute_get_duration (p->tmt_time);
- LOG2 (level, "QQQ core called %s ago\n",
- GNUNET_STRINGS_relative_time_to_string (core_wait_time, GNUNET_NO));
- }
- for (q = p->queue_head; NULL != q; q = q->next)
- {
- LOG2 (level, "QQQ - %s %s on %s\n",
- GC_m2s (q->type), GC_f2s (q->fwd), GCC_2s (q->c));
- LOG2 (level, "QQQ payload %s, %u\n",
- GC_m2s (q->payload_type), q->payload_id);
- LOG2 (level, "QQQ size: %u bytes\n", q->size);
- }
-
- LOG2 (level, "QQQ End queue towards %s\n", GCP_2s (p));
-}
-
-
-/**
- * Log all kinds of info about a peer.
- *
- * @param peer Peer.
- */
-void
-GCP_debug (const struct CadetPeer *p, enum GNUNET_ErrorType level)
-{
- struct CadetPeerPath *path;
- unsigned int conns;
- int do_log;
-
- do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
- "cadet-p2p",
- __FILE__, __FUNCTION__, __LINE__);
- if (0 == do_log)
- return;
-
- if (NULL == p)
- {
- LOG2 (level, "PPP DEBUG PEER NULL\n");
- return;
- }
-
- LOG2 (level, "PPP DEBUG PEER %s\n", GCP_2s (p));
- LOG2 (level, "PPP last contact %s\n",
- GNUNET_STRINGS_absolute_time_to_string (p->last_contact));
- for (path = p->path_head; NULL != path; path = path->next)
- {
- char *s;
-
- s = path_2s (path);
- LOG2 (level, "PPP path: %s\n", s);
- GNUNET_free (s);
- }
-
- LOG2 (level, "PPP core transmit handle %p\n", p->core_transmit);
- LOG2 (level, "PPP DHT GET handle %p\n", p->search_h);
- conns = 0;
- if (NULL != p->connections)
- conns += GNUNET_CONTAINER_multihashmap_size (p->connections);
- LOG2 (level, "PPP # connections over link to peer: %u\n", conns);
- queue_debug (p, level);
- LOG2 (level, "PPP DEBUG END\n");
-}
-
-
/******************************************************************************/
/***************************** CORE HELPERS *********************************/
/******************************************************************************/
/**
* Method called whenever a given peer connects.
*
- * @param cls closure
- * @param peer peer identity this notification is about
+ * @param cls Core closure (unused).
+ * @param peer Peer identity this notification is about
+ * @param mq Message Queue to this peer.
+ *
+ * @return Internal closure for handlers (CadetPeer struct).
*/
-static void
-core_connect (void *cls,
- const struct GNUNET_PeerIdentity *peer)
+static void *
+core_connect_handler (void *cls,
+ const struct GNUNET_PeerIdentity *peer,
+ struct GNUNET_MQ_Handle *mq)
{
struct CadetPeer *neighbor;
struct CadetPeerPath *path;
sizeof (own_id),
"%s",
GNUNET_i2s (&my_full_id));
+
+ /* Save a path to the neighbor */
neighbor = GCP_get (peer, GNUNET_YES);
if (myid == neighbor->id)
{
path = path_new (2);
path->peers[1] = neighbor->id;
GNUNET_PEER_change_rc (neighbor->id, 1);
+ GNUNET_assert (NULL == neighbor->core_mq);
+ neighbor->core_mq = mq;
}
path->peers[0] = myid;
GNUNET_PEER_change_rc (myid, 1);
GCP_add_path (neighbor, path, GNUNET_YES);
+ /* Create the connections hashmap */
GNUNET_assert (NULL == neighbor->connections);
neighbor->connections = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_NO);
GNUNET_STATISTICS_update (stats,
if ( (NULL != GCP_get_tunnel (neighbor)) &&
(0 > GNUNET_CRYPTO_cmp_peer_identity (&my_full_id, peer)) )
+ {
GCP_connect (neighbor);
+ }
GCC_check_connections ();
+
+ return neighbor;
}
/**
* Method called whenever a peer disconnects.
*
- * @param cls closure
- * @param peer peer identity this notification is about
+ * @param cls Core closure (unused).
+ * @param peer Peer identity this notification is about.
+ * @param internal_cls Internal closure (CadetPeer struct).
*/
static void
-core_disconnect (void *cls,
- const struct GNUNET_PeerIdentity *peer)
+core_disconnect_handler (void *cls,
+ const struct GNUNET_PeerIdentity *peer,
+ void *internal_cls)
{
- struct CadetPeer *p;
+ struct CadetPeer *p = internal_cls;
struct CadetPeerPath *direct_path;
char own_id[16];
GCC_check_connections ();
strncpy (own_id, GNUNET_i2s (&my_full_id), 16);
own_id[15] = '\0';
- p = GNUNET_CONTAINER_multipeermap_get (peers, peer);
- if (NULL == p)
- {
- GNUNET_break (GNUNET_YES == in_shutdown);
- return;
- }
if (myid == p->id)
+ {
LOG (GNUNET_ERROR_TYPE_INFO,
"DISCONNECTED %s (self)\n",
own_id);
+ }
else
+ {
LOG (GNUNET_ERROR_TYPE_INFO,
"DISCONNECTED %s <= %s\n",
own_id, GNUNET_i2s (peer));
+ p->core_mq = NULL;
+ }
direct_path = pop_direct_path (p);
if (NULL != p->connections)
{
GNUNET_CONTAINER_multihashmap_destroy (p->connections);
p->connections = NULL;
}
- if (NULL != p->core_transmit)
- {
- GNUNET_CORE_notify_transmit_ready_cancel (p->core_transmit);
- p->core_transmit = NULL;
- p->tmt_time.abs_value_us = 0;
- }
GNUNET_STATISTICS_update (stats,
"# peers",
-1,
}
-/**
- * Functions to handle messages from core
- */
-static struct GNUNET_CORE_MessageHandler core_handlers[] = {
- {&GCC_handle_create, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE, 0},
- {&GCC_handle_confirm, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK,
- sizeof (struct GNUNET_CADET_ConnectionACK)},
- {&GCC_handle_broken, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN,
- sizeof (struct GNUNET_CADET_ConnectionBroken)},
- {&GCC_handle_destroy, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY,
- sizeof (struct GNUNET_CADET_ConnectionDestroy)},
- {&GCC_handle_ack, GNUNET_MESSAGE_TYPE_CADET_ACK,
- sizeof (struct GNUNET_CADET_ACK)},
- {&GCC_handle_poll, GNUNET_MESSAGE_TYPE_CADET_POLL,
- sizeof (struct GNUNET_CADET_Poll)},
- {&GCC_handle_kx, GNUNET_MESSAGE_TYPE_CADET_KX, 0},
- {&GCC_handle_encrypted, GNUNET_MESSAGE_TYPE_CADET_AX, 0},
- {NULL, 0, 0}
-};
-
+/******************************************************************************/
+/******************************************************************************/
+/******************************************************************************/
+/******************************************************************************/
+/******************************************************************************/
/**
- * To be called on core init/fail.
+ * Check if the create_connection message has the appropriate size.
*
- * @param cls Closure (config)
- * @param identity the public identity of this peer
+ * @param cls Closure (unused).
+ * @param msg Message to check.
+ *
+ * @return #GNUNET_YES if size is correct, #GNUNET_NO otherwise.
*/
-static void
-core_init (void *cls,
- const struct GNUNET_PeerIdentity *identity)
+static int
+check_create (void *cls, const struct GNUNET_CADET_ConnectionCreate *msg)
{
- const struct GNUNET_CONFIGURATION_Handle *c = cls;
- static int i = 0;
+ uint16_t size;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Core init\n");
- if (0 != memcmp (identity, &my_full_id, sizeof (my_full_id)))
+ size = ntohs (msg->header.size);
+ if (size < sizeof (*msg))
{
- LOG (GNUNET_ERROR_TYPE_ERROR, _("Wrong CORE service\n"));
- LOG (GNUNET_ERROR_TYPE_ERROR, " core id %s\n", GNUNET_i2s (identity));
- LOG (GNUNET_ERROR_TYPE_ERROR, " my id %s\n", GNUNET_i2s (&my_full_id));
- GNUNET_CORE_disconnect (core_handle);
- core_handle = GNUNET_CORE_connect (c, /* Main configuration */
- NULL, /* Closure passed to CADET functions */
- &core_init, /* Call core_init once connected */
- &core_connect, /* Handle connects */
- &core_disconnect, /* remove peers on disconnects */
- NULL, /* Don't notify about all incoming messages */
- GNUNET_NO, /* For header only in notification */
- NULL, /* Don't notify about all outbound messages */
- GNUNET_NO, /* For header-only out notification */
- core_handlers); /* Register these handlers */
- if (10 < i++)
- GNUNET_assert (0);
+ GNUNET_break_op (0);
+ return GNUNET_NO;
}
- GML_start ();
+ return GNUNET_YES;
}
-
/**
- * Core callback to write a pre-constructed data packet to core buffer
- *
- * @param cls Closure (CadetTransmissionDescriptor with data in "data" member).
- * @param size Number of bytes available in buf.
- * @param buf Where the to write the message.
- *
- * @return number of bytes written to buf
- */
-static size_t
-send_core_data_raw (void *cls, size_t size, void *buf)
+ * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE
+ *
+ * @param cls Closure (CadetPeer for neighbor that sent the message).
+ * @param msg Message itself.
+ */
+static void
+handle_create (void *cls, const struct GNUNET_CADET_ConnectionCreate *msg)
{
- struct GNUNET_MessageHeader *msg = cls;
- size_t total_size;
-
- GNUNET_assert (NULL != msg);
- total_size = ntohs (msg->size);
-
- if (total_size > size)
- {
- GNUNET_break (0);
- return 0;
- }
- GNUNET_memcpy (buf, msg, total_size);
- GNUNET_free (cls);
- return total_size;
+ struct CadetPeer *peer = cls;
+ GCC_handle_create (peer, msg);
}
/**
- * Function to send a create connection message to a peer.
+ * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK
*
- * @param c Connection to create.
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
+ * @param cls Closure (CadetPeer for neighbor that sent the message).
+ * @param msg Message itself.
*/
-static size_t
-send_core_connection_create (struct CadetConnection *c, size_t size, void *buf)
+static void
+handle_confirm (void *cls, const struct GNUNET_CADET_ConnectionACK *msg)
{
- struct GNUNET_CADET_ConnectionCreate *msg;
- struct GNUNET_PeerIdentity *peer_ptr;
- const struct CadetPeerPath *p = GCC_get_path (c);
- size_t size_needed;
- int i;
-
- if (NULL == p)
- return 0;
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending CONNECTION CREATE...\n");
- size_needed =
- sizeof (struct GNUNET_CADET_ConnectionCreate) +
- p->length * sizeof (struct GNUNET_PeerIdentity);
-
- if (size < size_needed || NULL == buf)
- {
- GNUNET_break (0);
- return 0;
- }
- msg = (struct GNUNET_CADET_ConnectionCreate *) buf;
- msg->header.size = htons (size_needed);
- msg->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE);
- msg->cid = *GCC_get_id (c);
+ struct CadetPeer *peer = cls;
+ GCC_handle_confirm (peer, msg);
+}
- peer_ptr = (struct GNUNET_PeerIdentity *) &msg[1];
- for (i = 0; i < p->length; i++)
- {
- GNUNET_PEER_resolve (p->peers[i], peer_ptr++);
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "CONNECTION CREATE (%u bytes long) sent!\n",
- size_needed);
- return size_needed;
+/**
+ * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN
+ *
+ * @param cls Closure (CadetPeer for neighbor that sent the message).
+ * @param msg Message itself.
+ */
+static void
+handle_broken (void *cls, const struct GNUNET_CADET_ConnectionBroken *msg)
+{
+ struct CadetPeer *peer = cls;
+ GCC_handle_broken (peer, msg);
}
/**
- * Creates a path ack message in buf and frees all unused resources.
- *
- * @param c Connection to send an ACK on.
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
+ * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY
*
- * @return number of bytes written to buf
+ * @param cls Closure (CadetPeer for neighbor that sent the message).
+ * @param msg Message itself.
*/
-static size_t
-send_core_connection_ack (struct CadetConnection *c, size_t size, void *buf)
+static void
+handle_destroy (void *cls, const struct GNUNET_CADET_ConnectionDestroy *msg)
{
- struct GNUNET_CADET_ConnectionACK *msg = buf;
+ struct CadetPeer *peer = cls;
+ GCC_handle_destroy (peer, msg);
+}
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending CONNECTION ACK...\n");
- if (sizeof (struct GNUNET_CADET_ConnectionACK) > size)
- {
- GNUNET_break (0);
- return 0;
- }
- msg->header.size = htons (sizeof (struct GNUNET_CADET_ConnectionACK));
- msg->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK);
- msg->cid = *GCC_get_id (c);
- LOG (GNUNET_ERROR_TYPE_DEBUG, "CONNECTION ACK sent!\n");
- return sizeof (struct GNUNET_CADET_ConnectionACK);
+/**
+ * Handle for #GNUNET_MESSAGE_TYPE_CADET_ACK
+ *
+ * @param cls Closure (CadetPeer for neighbor that sent the message).
+ * @param msg Message itself.
+ */
+static void
+handle_ack (void *cls, const struct GNUNET_CADET_ACK *msg)
+{
+ struct CadetPeer *peer = cls;
+ GCC_handle_ack (peer, msg);
}
-/******************************************************************************/
-/******************************** STATIC ***********************************/
-/******************************************************************************/
+/**
+ * Handle for #GNUNET_MESSAGE_TYPE_CADET_POLL
+ *
+ * @param cls Closure (CadetPeer for neighbor that sent the message).
+ * @param msg Message itself.
+ */
+static void
+handle_poll (void *cls, const struct GNUNET_CADET_Poll *msg)
+{
+ struct CadetPeer *peer = cls;
+ GCC_handle_poll (peer, msg);
+}
/**
- * Get priority for a queued message.
+ * Check if the Key eXchange message has the appropriate size.
*
- * @param q Queued message
+ * @param cls Closure (unused).
+ * @param msg Message to check.
*
- * @return CORE priority to use.
+ * @return #GNUNET_YES if size is correct, #GNUNET_NO otherwise.
*/
-static enum GNUNET_CORE_Priority
-get_priority (struct CadetPeerQueue *q)
+static int
+check_kx (void *cls, const struct GNUNET_CADET_KX *msg)
{
- enum GNUNET_CORE_Priority low;
- enum GNUNET_CORE_Priority high;
+ uint16_t size;
+ uint16_t expected_size;
- if (NULL == q)
- {
- GNUNET_break (0);
- return GNUNET_CORE_PRIO_BACKGROUND;
- }
+ size = ntohs (msg->header.size);
+ expected_size = sizeof (struct GNUNET_CADET_KX)
+ + sizeof (struct GNUNET_MessageHeader);
- /* Relayed traffic has lower priority, our own traffic has higher */
- if (NULL == q->c || GNUNET_NO == GCC_is_origin (q->c, q->fwd))
- {
- low = GNUNET_CORE_PRIO_BEST_EFFORT;
- high = GNUNET_CORE_PRIO_URGENT;
- }
- else
+ if (size < expected_size)
{
- low = GNUNET_CORE_PRIO_URGENT;
- high = GNUNET_CORE_PRIO_CRITICAL_CONTROL;
+ GNUNET_break_op (0);
+ return GNUNET_NO;
}
+ return GNUNET_YES;
+}
- /* Bulky payload has lower priority, control traffic has higher. */
- if (GNUNET_MESSAGE_TYPE_CADET_AX == q->type)
- return low;
- return high;
+/**
+ * Handle for #GNUNET_MESSAGE_TYPE_CADET_KX
+ *
+ * @param cls Closure (CadetPeer for neighbor that sent the message).
+ * @param msg Message itself.
+ */
+static void
+handle_kx (void *cls, const struct GNUNET_CADET_KX *msg)
+{
+ struct CadetPeer *peer = cls;
+ GCC_handle_kx (peer, msg);
}
/**
- * Destroy the peer_info and free any allocated resources linked to it
+ * Check if the encrypted message has the appropriate size.
*
- * @param peer The peer_info to destroy.
- * @return #GNUNET_OK on success
+ * @param cls Closure (unused).
+ * @param msg Message to check.
+ *
+ * @return #GNUNET_YES if size is correct, #GNUNET_NO otherwise.
*/
static int
-peer_destroy (struct CadetPeer *peer)
+check_encrypted (void *cls, const struct GNUNET_CADET_AX *msg)
{
- struct GNUNET_PeerIdentity id;
- struct CadetPeerPath *p;
- struct CadetPeerPath *nextp;
+ uint16_t size;
+ uint16_t minimum_size;
- GNUNET_PEER_resolve (peer->id, &id);
- GNUNET_PEER_change_rc (peer->id, -1);
+ size = ntohs (msg->header.size);
+ minimum_size = sizeof (struct GNUNET_CADET_AX)
+ + sizeof (struct GNUNET_MessageHeader);
- LOG (GNUNET_ERROR_TYPE_INFO,
+ if (size < minimum_size)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_NO;
+ }
+ return GNUNET_YES;
+}
+
+/**
+ * Handle for #GNUNET_MESSAGE_TYPE_CADET_AX (AXolotl encrypted traffic).
+ *
+ * @param cls Closure (CadetPeer for neighbor that sent the message).
+ * @param msg Message itself.
+ */
+static void
+handle_encrypted (void *cls, const struct GNUNET_CADET_AX *msg)
+{
+ struct CadetPeer *peer = cls;
+ GCC_handle_encrypted (peer, msg);
+}
+
+
+/**
+ * To be called on core init/fail.
+ *
+ * @param cls Closure (config)
+ * @param identity The public identity of this peer.
+ */
+static void
+core_init_notify (void *cls,
+ const struct GNUNET_PeerIdentity *identity);
+
+
+static void
+connect_to_core (const struct GNUNET_CONFIGURATION_Handle *c)
+{
+ struct GNUNET_MQ_MessageHandler core_handlers[] = {
+ GNUNET_MQ_hd_var_size (create,
+ GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE,
+ struct GNUNET_CADET_ConnectionCreate,
+ NULL),
+ GNUNET_MQ_hd_fixed_size (confirm,
+ GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK,
+ struct GNUNET_CADET_ConnectionACK,
+ NULL),
+ GNUNET_MQ_hd_fixed_size (broken,
+ GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN,
+ struct GNUNET_CADET_ConnectionBroken,
+ NULL),
+ GNUNET_MQ_hd_fixed_size (destroy,
+ GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY,
+ struct GNUNET_CADET_ConnectionDestroy,
+ NULL),
+ GNUNET_MQ_hd_fixed_size (ack,
+ GNUNET_MESSAGE_TYPE_CADET_ACK,
+ struct GNUNET_CADET_ACK,
+ NULL),
+ GNUNET_MQ_hd_fixed_size (poll,
+ GNUNET_MESSAGE_TYPE_CADET_POLL,
+ struct GNUNET_CADET_Poll,
+ NULL),
+ GNUNET_MQ_hd_var_size (kx,
+ GNUNET_MESSAGE_TYPE_CADET_KX,
+ struct GNUNET_CADET_KX,
+ NULL),
+ GNUNET_MQ_hd_var_size (encrypted,
+ GNUNET_MESSAGE_TYPE_CADET_AX,
+ struct GNUNET_CADET_AX,
+ NULL),
+ GNUNET_MQ_handler_end ()
+ };
+ core_handle = GNUNET_CORE_connecT (c, NULL,
+ &core_init_notify,
+ &core_connect_handler,
+ &core_disconnect_handler,
+ core_handlers);
+}
+
+/******************************************************************************/
+/******************************************************************************/
+/******************************************************************************/
+/******************************************************************************/
+/******************************************************************************/
+
+/**
+ * To be called on core init/fail.
+ *
+ * @param cls Closure (config)
+ * @param identity The public identity of this peer.
+ */
+static void
+core_init_notify (void *cls,
+ const struct GNUNET_PeerIdentity *core_identity)
+{
+ const struct GNUNET_CONFIGURATION_Handle *c = cls;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Core init\n");
+ if (0 != memcmp (core_identity, &my_full_id, sizeof (my_full_id)))
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR, _("Wrong CORE service\n"));
+ LOG (GNUNET_ERROR_TYPE_ERROR, " core id %s\n", GNUNET_i2s (core_identity));
+ LOG (GNUNET_ERROR_TYPE_ERROR, " my id %s\n", GNUNET_i2s (&my_full_id));
+ GNUNET_CORE_disconnecT (core_handle);
+ connect_to_core (c);
+ return;
+ }
+ GML_start ();
+}
+
+
+/******************************************************************************/
+/******************************** STATIC ***********************************/
+/******************************************************************************/
+
+
+/**
+ * Get priority for a queued message.
+ *
+ * @param q Queued message
+ *
+ * @return CORE priority to use.
+ *
+ * FIXME make static
+ * FIXME use when sending
+ */
+enum GNUNET_CORE_Priority
+get_priority (struct CadetPeerQueue *q)
+{
+ enum GNUNET_CORE_Priority low;
+ enum GNUNET_CORE_Priority high;
+
+ if (NULL == q)
+ {
+ GNUNET_break (0);
+ return GNUNET_CORE_PRIO_BACKGROUND;
+ }
+
+ /* Relayed traffic has lower priority, our own traffic has higher */
+ if (NULL == q->c || GNUNET_NO == GCC_is_origin (q->c, q->c_fwd))
+ {
+ low = GNUNET_CORE_PRIO_BEST_EFFORT;
+ high = GNUNET_CORE_PRIO_URGENT;
+ }
+ else
+ {
+ low = GNUNET_CORE_PRIO_URGENT;
+ high = GNUNET_CORE_PRIO_CRITICAL_CONTROL;
+ }
+
+ /* Bulky payload has lower priority, control traffic has higher. */
+ if (GNUNET_MESSAGE_TYPE_CADET_AX == q->type)
+ return low;
+ return high;
+}
+
+
+/**
+ * Destroy the peer_info and free any allocated resources linked to it
+ *
+ * @param peer The peer_info to destroy.
+ * @return #GNUNET_OK on success
+ */
+static int
+peer_destroy (struct CadetPeer *peer)
+{
+ struct GNUNET_PeerIdentity id;
+ struct CadetPeerPath *p;
+ struct CadetPeerPath *nextp;
+
+ GNUNET_PEER_resolve (peer->id, &id);
+ GNUNET_PEER_change_rc (peer->id, -1);
+
+ LOG (GNUNET_ERROR_TYPE_INFO,
"destroying peer %s\n",
GNUNET_i2s (&id));
GNUNET_ATS_connectivity_suggest_cancel (peer->connectivity_suggestion);
peer->connectivity_suggestion = NULL;
}
- while (NULL != peer->queue_head)
- {
- /* This function destroys the current peer->queue_head but
- * replaces it with the next in the queue, so it is correct
- * to while() here.
- */
- GCP_queue_destroy (peer->queue_head, GNUNET_YES, GNUNET_NO, 0);
- }
- if (NULL != peer->core_transmit)
- {
- GNUNET_break (0); /* GCP_queue_destroy should've cancelled it! */
- GNUNET_CORE_notify_transmit_ready_cancel (peer->core_transmit);
- peer->core_transmit = NULL;
- }
GNUNET_free_non_null (peer->hello);
GNUNET_free (peer);
}
-
/**
* Check if peer is searching for a path (either active or delayed search).
*
}
-/**
- * Is this queue element sendable?
- *
- * - All management traffic is always sendable.
- * - For payload traffic, check the connection flow control.
- *
- * @param q Queue element to inspect.
- * @return #GNUNET_YES if it is sendable, #GNUNET_NO otherwise.
- */
-static int
-queue_is_sendable (struct CadetPeerQueue *q)
-{
- /* Is PID-independent? */
- switch (q->type)
- {
- case GNUNET_MESSAGE_TYPE_CADET_ACK:
- case GNUNET_MESSAGE_TYPE_CADET_POLL:
- case GNUNET_MESSAGE_TYPE_CADET_KX:
- case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE:
- case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK:
- case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
- case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
- return GNUNET_YES;
-
- case GNUNET_MESSAGE_TYPE_CADET_AX:
- break;
-
- default:
- GNUNET_break (0);
- }
-
- return GCC_is_sendable (q->c, q->fwd);
-}
-
-
-/**
- * Get first sendable message.
- *
- * @param peer The destination peer.
- *
- * @return First transmittable message, if any. Otherwise, NULL.
- */
-static struct CadetPeerQueue *
-peer_get_first_message (const struct CadetPeer *peer)
-{
- struct CadetPeerQueue *q;
-
- for (q = peer->queue_head; NULL != q; q = q->next)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Checking q:%p on c:%s\n", q, GCC_2s (q->c));
- if (queue_is_sendable (q))
- return q;
- }
-
- return NULL;
-}
-
-
/**
* Function to process paths received for a new peer addition. The recorded
* paths form the initial tunnel, which can be optimized later.
}
-/**
- * Adjust core requested size to accomodate an ACK.
- *
- * @param message_size Requested size.
- *
- * @return Size enough to fit @c message_size and an ACK.
- */
-static size_t
-get_core_size (size_t message_size)
-{
- return message_size + sizeof (struct GNUNET_CADET_ACK);
-}
-
/**
* Test if a message type is connection management traffic
* or regular payload traffic.
}
-/**
- * Fill a core buffer with the appropriate data for the queued message.
- *
- * @param queue Queue element for the message.
- * @param buf Core buffer to fill.
- * @param size Size remaining in @c buf.
- * @param[out] pid In case its an encrypted payload, set payload.
- *
- * @return Bytes written to @c buf.
- */
-static size_t
-fill_buf (struct CadetPeerQueue *queue, void *buf, size_t size, uint32_t *pid)
-{
- struct CadetConnection *c = queue->c;
- size_t msg_size;
-
- switch (queue->type)
- {
- case GNUNET_MESSAGE_TYPE_CADET_AX:
- *pid = GCC_get_pid (queue->c, queue->fwd);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " ax payload ID %u\n", *pid);
- msg_size = send_core_data_raw (queue->cls, size, buf);
- ((struct GNUNET_CADET_AX *) buf)->pid = htonl (*pid);
- break;
- case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
- case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
- case GNUNET_MESSAGE_TYPE_CADET_KX:
- case GNUNET_MESSAGE_TYPE_CADET_ACK:
- case GNUNET_MESSAGE_TYPE_CADET_POLL:
- LOG (GNUNET_ERROR_TYPE_DEBUG, " raw %s\n", GC_m2s (queue->type));
- msg_size = send_core_data_raw (queue->cls, size, buf);
- break;
- case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE:
- LOG (GNUNET_ERROR_TYPE_DEBUG, " path create\n");
- if (GCC_is_origin (c, GNUNET_YES))
- msg_size = send_core_connection_create (c, size, buf);
- else
- msg_size = send_core_data_raw (queue->cls, size, buf);
- break;
- case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK:
- LOG (GNUNET_ERROR_TYPE_DEBUG, " path ack\n");
- if (GCC_is_origin (c, GNUNET_NO) ||
- GCC_is_origin (c, GNUNET_YES))
- {
- msg_size = send_core_connection_ack (c, size, buf);
- }
- else
- {
- msg_size = send_core_data_raw (queue->cls, size, buf);
- }
- break;
- case GNUNET_MESSAGE_TYPE_CADET_DATA:
- case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE:
- case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY:
- /* This should be encapsulted */
- msg_size = 0;
- GNUNET_assert (0);
- break;
- default:
- GNUNET_break (0);
- LOG (GNUNET_ERROR_TYPE_WARNING, " type unknown: %u\n", queue->type);
- msg_size = 0;
- }
-
- GNUNET_assert (size >= msg_size);
-
- return msg_size;
-}
-
-
/**
* Debug function should NEVER return true in production code, useful to
* simulate losses for testcases.
*
- * @param q Queue handle with info about the message.
- *
* @return #GNUNET_YES or #GNUNET_NO with the decision to drop.
*/
static int
-should_I_drop (struct CadetPeerQueue *q)
+should_I_drop (void)
{
if (0 == drop_percent)
return GNUNET_NO;
}
-/**
- * Core callback to write a queued packet to core buffer
- *
- * @param cls Closure (peer info).
- * @param size Number of bytes available in buf.
- * @param buf Where the to write the message.
- *
- * @return number of bytes written to buf
- */
-static size_t
-queue_send (void *cls, size_t size, void *buf)
-{
- struct CadetPeer *peer = cls;
- struct CadetConnection *c;
- struct CadetPeerQueue *queue;
- struct GNUNET_TIME_Relative core_wait_time;
- const char *wait_s;
- const struct GNUNET_PeerIdentity *dst_id;
- size_t msg_size;
- size_t total_size;
- size_t rest;
- char *dst;
- uint32_t pid;
-
- GCC_check_connections ();
- LOG (GNUNET_ERROR_TYPE_DEBUG, "\n");
- LOG (GNUNET_ERROR_TYPE_DEBUG, "\n");
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue send towards %s (max %u)\n",
- GCP_2s (peer), size);
-
- /* Sanity checking */
- if (NULL == buf || 0 == size)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " not allowed/\n");
- if (GNUNET_NO == in_shutdown)
- {
- queue = peer_get_first_message (peer);
- if (NULL == queue)
- {
- peer->core_transmit = NULL;
- peer->tmt_time.abs_value_us = 0;
- GCC_check_connections ();
- return 0;
- }
- dst_id = GNUNET_PEER_resolve2 (peer->id);
- peer->core_transmit =
- GNUNET_CORE_notify_transmit_ready (core_handle,
- GNUNET_NO, get_priority (queue),
- GNUNET_TIME_UNIT_FOREVER_REL,
- dst_id,
- get_core_size (queue->size),
- &queue_send,
- peer);
- peer->tmt_time = GNUNET_TIME_absolute_get ();
- }
- else
- {
- peer->core_transmit = NULL;
- peer->tmt_time.abs_value_us = 0;
- }
- GCC_check_connections ();
- return 0;
- }
-
- /* Init */
- rest = size;
- total_size = 0;
- dst = (char *) buf;
- pid = 0;
- peer->core_transmit = NULL;
- queue = peer_get_first_message (peer);
- if (NULL == queue)
- {
- GNUNET_break (0); /* Core tmt_rdy should've been canceled */
- peer->tmt_time.abs_value_us = 0;
- return 0;
- }
- core_wait_time = GNUNET_TIME_absolute_get_duration (peer->tmt_time);
- wait_s = GNUNET_STRINGS_relative_time_to_string (core_wait_time, GNUNET_YES);
- if (core_wait_time.rel_value_us >= 1000000)
- {
- LOG (GNUNET_ERROR_TYPE_WARNING,
- " %s: core wait time %s (> 1 second) for %u bytes\n",
- GCP_2s (peer), wait_s, queue->size);
- }
- peer->tmt_time.abs_value_us = 0;
-
- /* Copy all possible messages to the core buffer */
- while (NULL != queue && rest >= queue->size)
- {
- c = queue->c;
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, " on conn %s %s\n",
- GCC_2s (c), GC_f2s(queue->fwd));
- LOG (GNUNET_ERROR_TYPE_DEBUG, " size %u ok (%u/%u)\n",
- queue->size, total_size, size);
-
- msg_size = fill_buf (queue, (void *) dst, size, &pid);
-
- if (should_I_drop (queue))
- {
- LOG (GNUNET_ERROR_TYPE_WARNING, "DD %s (%s %u) on conn %s %s\n",
- GC_m2s (queue->type), GC_m2s (queue->payload_type),
- queue->payload_id, GCC_2s (c), GC_f2s (queue->fwd));
- msg_size = 0;
- }
- else
- {
- LOG (GNUNET_ERROR_TYPE_INFO,
- ">>> %s (%s %4u) on conn %s (%p) %s [%5u], after %s\n",
- GC_m2s (queue->type), GC_m2s (queue->payload_type),
- queue->payload_id, GCC_2s (c), c,
- GC_f2s (queue->fwd), msg_size, wait_s);
- }
- total_size += msg_size;
- rest -= msg_size;
- dst = &dst[msg_size];
- msg_size = 0;
-
- /* Free queue, but cls was freed by send_core_* in fill_buf. */
- (void) GCP_queue_destroy (queue, GNUNET_NO, GNUNET_YES, pid);
-
- /* Next! */
- queue = peer_get_first_message (peer);
- }
-
- /* If more data in queue, send next */
- if (NULL != queue)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " more data! (%u)\n", queue->size);
- if (NULL == peer->core_transmit)
- {
- dst_id = GNUNET_PEER_resolve2 (peer->id);
- peer->core_transmit =
- GNUNET_CORE_notify_transmit_ready (core_handle,
- GNUNET_NO, get_priority (queue),
- GNUNET_TIME_UNIT_FOREVER_REL,
- dst_id,
- get_core_size (queue->size),
- &queue_send,
- peer);
- peer->tmt_time = GNUNET_TIME_absolute_get ();
- queue->start_waiting = GNUNET_TIME_absolute_get ();
- }
- else
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, "* tmt rdy called somewhere else\n");
- }
-// GCC_start_poll (); FIXME needed?
- }
- else
- {
-// GCC_stop_poll(); FIXME needed?
- }
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, " return %d\n", total_size);
- queue_debug (peer, GNUNET_ERROR_TYPE_DEBUG);
- GCC_check_connections ();
- return total_size;
-}
-
-
/******************************************************************************/
/******************************** API ***********************************/
/******************************************************************************/
-
/**
- * Free a transmission that was already queued with all resources
- * associated to the request.
+ * Call the continuation after a message has been sent or dropped.
*
- * If connection was marked to be destroyed, and this was the last queued
- * message on it, the connection will be free'd as a result.
- *
- * @param queue Queue handler to cancel.
- * @param clear_cls Is it necessary to free associated cls?
- * @param sent Was it really sent? (Could have been canceled)
- * @param pid PID, if relevant (was sent and was a payload message).
- *
- * @return #GNUNET_YES if connection was destroyed as a result,
- * #GNUNET_NO otherwise.
+ * @param q Queue handle.
+ * @param sent #GNUNET_YES if was sent to CORE, #GNUNET_NO if dropped.
*/
-int
-GCP_queue_destroy (struct CadetPeerQueue *queue,
- int clear_cls,
- int sent,
- uint32_t pid)
+static void
+call_peer_cont (const struct CadetPeerQueue *q, int sent)
{
- struct CadetPeer *peer;
- int connection_destroyed;
-
- GCC_check_connections ();
- peer = queue->peer;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "queue destroy %s\n", GC_m2s (queue->type));
- if (GNUNET_YES == clear_cls)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " free cls\n");
- switch (queue->type)
- {
- case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
- LOG (GNUNET_ERROR_TYPE_INFO, "destroying a DESTROY message\n");
- /* fall through */
- case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK:
- case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE:
- case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
- case GNUNET_MESSAGE_TYPE_CADET_KX:
- case GNUNET_MESSAGE_TYPE_CADET_AX:
- case GNUNET_MESSAGE_TYPE_CADET_ACK:
- case GNUNET_MESSAGE_TYPE_CADET_POLL:
- GNUNET_free_non_null (queue->cls);
- break;
-
- default:
- GNUNET_break (0);
- LOG (GNUNET_ERROR_TYPE_ERROR, " type %s unknown!\n",
- GC_m2s (queue->type));
- }
- }
- GNUNET_CONTAINER_DLL_remove (peer->queue_head, peer->queue_tail, queue);
-
- if (!is_connection_management (queue->type))
- {
- peer->queue_n--;
- }
-
- if (NULL != queue->cont)
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " core mq just sent %s\n", GC_m2s (q->type));
+ if (NULL != q->cont)
{
struct GNUNET_TIME_Relative wait_time;
- wait_time = GNUNET_TIME_absolute_get_duration (queue->start_waiting);
+ wait_time = GNUNET_TIME_absolute_get_duration (q->queue_timestamp);
LOG (GNUNET_ERROR_TYPE_DEBUG, " calling callback, time elapsed %s\n",
GNUNET_STRINGS_relative_time_to_string (wait_time, GNUNET_NO));
- connection_destroyed = queue->cont (queue->cont_cls,
- queue->c, sent, queue->type, pid,
- queue->fwd, queue->size, wait_time);
- }
- else
- {
- connection_destroyed = GNUNET_NO;
+ q->cont (q->cont_cls,
+ q->c, q->c_fwd, sent,
+ q->type, q->payload_type, q->payload_id,
+ q->size, wait_time);
}
+}
+
+
+/**
+ * Function called by MQ when a message is sent to CORE.
+ *
+ * @param cls Closure (queue handle).
+ */
+static void
+mq_sent (void *cls)
+{
+ struct CadetPeerQueue *q = cls;
- if (NULL == peer_get_first_message (peer) && NULL != peer->core_transmit)
+ if (GNUNET_NO == q->management_traffic)
{
- GNUNET_CORE_notify_transmit_ready_cancel (peer->core_transmit);
- peer->core_transmit = NULL;
- peer->tmt_time.abs_value_us = 0;
+ q->peer->queue_n--;
}
-
- GNUNET_free (queue);
- GCC_check_connections ();
- return connection_destroyed;
+ call_peer_cont (q, GNUNET_YES);
+ GNUNET_free (q);
}
/**
- * @brief Queue and pass message to core when possible.
+ * @brief Send a message to another peer (using CORE).
*
* @param peer Peer towards which to queue the message.
- * @param cls Closure (@c type dependant). It will be used by queue_send to
- * build the message to be sent if not already prebuilt.
- * @param type Type of the message.
- * @param payload_type Type of the message's payload
+ * @param message Message to send.
+ * @param payload_type Type of the message's payload, for debug messages.
* 0 if the message is a retransmission (unknown payload).
* UINT16_MAX if the message does not have payload.
* @param payload_id ID of the payload (MID, ACK #, etc)
- * @param size Size of the message.
* @param c Connection this message belongs to (can be NULL).
* @param fwd Is this a message going root->dest? (FWD ACK are NOT FWD!)
- * @param cont Continuation to be called once CORE has taken the message.
+ * @param cont Continuation to be called once CORE has sent the message.
* @param cont_cls Closure for @c cont.
*
- * @return Handle to cancel the message before it is sent. Once cont is called
- * message has been sent and therefore the handle is no longer valid.
+ * @return A handle to the message in the queue or NULL (if dropped).
*/
struct CadetPeerQueue *
-GCP_queue_add (struct CadetPeer *peer,
- void *cls,
- uint16_t type,
- uint16_t payload_type,
- uint32_t payload_id,
- size_t size,
- struct CadetConnection *c,
- int fwd,
- GCP_sent cont,
- void *cont_cls)
+GCP_send (struct CadetPeer *peer,
+ const struct GNUNET_MessageHeader *message,
+ uint16_t payload_type,
+ uint32_t payload_id,
+ struct CadetConnection *c,
+ int fwd,
+ GCP_sent cont,
+ void *cont_cls)
{
struct CadetPeerQueue *q;
- int priority;
- int call_core;
+ uint16_t type;
+ uint16_t size;
GCC_check_connections ();
+ type = ntohs (message->type);
+ size = ntohs (message->size);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"que %s (%s %4u) on conn %s (%p) %s towards %s (size %u)\n",
GC_m2s (type), GC_m2s (payload_type), payload_id,
if (NULL == peer->connections)
{
/* We are not connected to this peer, ignore request. */
+ GNUNET_break (0);
LOG (GNUNET_ERROR_TYPE_INFO, "%s not a neighbor\n", GCP_2s (peer));
GNUNET_STATISTICS_update (stats, "# messages dropped due to wrong hop", 1,
GNUNET_NO);
return NULL;
}
- priority = 0;
- if (is_connection_management (type))
- {
- priority = 100;
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG, "priority %d\n", priority);
-
- call_core = (NULL == c || GNUNET_MESSAGE_TYPE_CADET_KX == type) ?
- GNUNET_YES : GCC_is_sendable (c, fwd);
q = GNUNET_new (struct CadetPeerQueue);
- q->cls = cls;
+ q->env = GNUNET_MQ_msg_copy (message);
+ q->peer = peer;
+ q->cont = cont;
+ q->cont_cls = cont_cls;
+ q->queue_timestamp = GNUNET_TIME_absolute_get ();
+ q->management_traffic = is_connection_management (type);
q->type = type;
+ q->size = size;
q->payload_type = payload_type;
q->payload_id = payload_id;
- q->size = size;
- q->peer = peer;
q->c = c;
- q->fwd = fwd;
- q->cont = cont;
- q->cont_cls = cont_cls;
- if (100 > priority)
- {
- GNUNET_CONTAINER_DLL_insert_tail (peer->queue_head, peer->queue_tail, q);
- peer->queue_n++;
- }
- else
- {
- GNUNET_CONTAINER_DLL_insert (peer->queue_head, peer->queue_tail, q);
- call_core = GNUNET_YES;
- }
+ q->c_fwd = fwd;
+ GNUNET_MQ_notify_sent (q->env, mq_sent, q);
- q->start_waiting = GNUNET_TIME_absolute_get ();
- if (NULL == peer->core_transmit && GNUNET_YES == call_core)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "calling core tmt rdy towards %s for %u bytes\n",
- GCP_2s (peer), size);
- peer->core_transmit =
- GNUNET_CORE_notify_transmit_ready (core_handle,
- GNUNET_NO, get_priority (q),
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_PEER_resolve2 (peer->id),
- get_core_size (size),
- &queue_send, peer);
- peer->tmt_time = GNUNET_TIME_absolute_get ();
- }
- else if (GNUNET_NO == call_core)
+ if (GNUNET_YES == q->management_traffic)
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "core tmt rdy towards %s not needed\n",
- GCP_2s (peer));
-
+ GNUNET_MQ_send (peer->core_mq, q->env); // FIXME implement "_urgent", use
}
else
{
- struct GNUNET_TIME_Relative elapsed;
- elapsed = GNUNET_TIME_absolute_get_duration (peer->tmt_time);
- LOG (GNUNET_ERROR_TYPE_DEBUG, "core tmt rdy towards %s already called %s\n",
- GCP_2s (peer),
- GNUNET_STRINGS_relative_time_to_string (elapsed, GNUNET_NO));
-
- }
- queue_debug (peer, GNUNET_ERROR_TYPE_DEBUG);
- GCC_check_connections ();
- return q;
-}
-
-
-/**
- * Cancel all queued messages to a peer that belong to a certain connection.
- *
- * @param peer Peer towards whom to cancel.
- * @param c Connection whose queued messages to cancel. Might be destroyed by
- * the sent continuation call.
- */
-void
-GCP_queue_cancel (struct CadetPeer *peer,
- struct CadetConnection *c)
-{
- struct CadetPeerQueue *q;
- struct CadetPeerQueue *next;
- struct CadetPeerQueue *prev;
- int connection_destroyed;
-
- GCC_check_connections ();
- connection_destroyed = GNUNET_NO;
- for (q = peer->queue_head; NULL != q; q = next)
- {
- prev = q->prev;
- if (q->c == c)
+ if (GNUNET_YES == should_I_drop())
{
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "GMP queue cancel %s\n",
- GC_m2s (q->type));
- GNUNET_assert (GNUNET_NO == connection_destroyed);
- if (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY == q->type)
- {
- q->c = NULL;
- }
- else
- {
- connection_destroyed = GCP_queue_destroy (q, GNUNET_YES, GNUNET_NO, 0);
- }
-
- /* Get next from prev, q->next might be already freed:
- * queue destroy -> callback -> GCC_destroy -> cancel_queues -> here
- */
- if (NULL == prev)
- next = peer->queue_head;
- else
- next = prev->next;
- }
- else
- {
- next = q->next;
- }
- }
-
- if ( (NULL == peer->queue_head) &&
- (NULL != peer->core_transmit) )
- {
- GNUNET_CORE_notify_transmit_ready_cancel (peer->core_transmit);
- peer->core_transmit = NULL;
- peer->tmt_time.abs_value_us = 0;
- }
- GCC_check_connections ();
-}
-
-
-/**
- * Get the first transmittable message for a connection.
- *
- * @param peer Neighboring peer.
- * @param c Connection.
- *
- * @return First transmittable message.
- */
-static struct CadetPeerQueue *
-connection_get_first_message (struct CadetPeer *peer, struct CadetConnection *c)
-{
- struct CadetPeerQueue *q;
-
- for (q = peer->queue_head; NULL != q; q = q->next)
- {
- if (q->c != c)
- continue;
- if (queue_is_sendable (q))
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " sendable!!\n");
- return q;
+ LOG (GNUNET_ERROR_TYPE_WARNING, "DD %s (%s %u) on conn %s %s\n",
+ GC_m2s (q->type), GC_m2s (q->payload_type),
+ q->payload_id, GCC_2s (c), GC_f2s (q->c_fwd));
+ GNUNET_MQ_discard (q->env);
+ call_peer_cont (q, GNUNET_NO);
+ GNUNET_free (q);
+ return NULL;
}
- LOG (GNUNET_ERROR_TYPE_DEBUG, " not sendable\n");
+ GNUNET_MQ_send (peer->core_mq, q->env);
+ peer->queue_n++;
}
- return NULL;
-}
-
-
-/**
- * Get the first message for a connection and unqueue it.
- *
- * Only tunnel (or higher) level messages are unqueued. Connection specific
- * messages are silently destroyed upon encounter.
- *
- * @param peer Neighboring peer.
- * @param c Connection.
- * @param destroyed[in/out] Was the connection destroyed (prev/as a result)?.
- * Can NOT be NULL.
- *
- * @return First message for this connection.
- */
-struct GNUNET_MessageHeader *
-GCP_connection_pop (struct CadetPeer *peer,
- struct CadetConnection *c,
- int *destroyed)
-{
- struct CadetPeerQueue *q;
- struct CadetPeerQueue *next;
- struct GNUNET_MessageHeader *msg;
- int dest;
-
GCC_check_connections ();
- GNUNET_assert (NULL != destroyed);
- LOG (GNUNET_ERROR_TYPE_DEBUG, "connection_pop on conn %p\n", c);
- for (q = peer->queue_head; NULL != q; q = next)
- {
- next = q->next;
- if (q->c != c)
- continue;
- LOG (GNUNET_ERROR_TYPE_DEBUG, " - queued: %s (%s %u), cont: %p\n",
- GC_m2s (q->type), GC_m2s (q->payload_type), q->payload_id,
- q->cont);
- switch (q->type)
- {
- case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE:
- case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK:
- case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
- case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
- case GNUNET_MESSAGE_TYPE_CADET_ACK:
- case GNUNET_MESSAGE_TYPE_CADET_POLL:
- dest = GCP_queue_destroy (q, GNUNET_YES, GNUNET_NO, 0);
- if (GNUNET_YES == dest)
- {
- GNUNET_break (GNUNET_NO == *destroyed);
- *destroyed = GNUNET_YES;
- }
- continue;
-
- case GNUNET_MESSAGE_TYPE_CADET_KX:
- case GNUNET_MESSAGE_TYPE_CADET_AX:
- case GNUNET_MESSAGE_TYPE_CADET_AX_KX:
- msg = (struct GNUNET_MessageHeader *) q->cls;
- dest = GCP_queue_destroy (q, GNUNET_NO, GNUNET_NO, 0);
- if (GNUNET_YES == dest)
- {
- GNUNET_break (GNUNET_NO == *destroyed);
- *destroyed = GNUNET_YES;
- }
- return msg;
-
- default:
- GNUNET_break (0);
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Unknown message %s\n", GC_m2s (q->type));
- }
- }
- GCC_check_connections ();
- return NULL;
+ return q;
}
/**
- * Unlock a possibly locked queue for a connection.
+ * Cancel sending a message. Message must have been sent with
+ * #GCP_send before. May not be called after the notify sent
+ * callback has been called.
*
- * If there is a message that can be sent on this connection, call core for it.
- * Otherwise (if core transmit is already called or there is no sendable
- * message) do nothing.
+ * It DOES call the continuation given to #GCP_send.
*
- * @param peer Peer who keeps the queue.
- * @param c Connection whose messages to unlock.
+ * @param q Queue handle to cancel
*/
void
-GCP_queue_unlock (struct CadetPeer *peer, struct CadetConnection *c)
+GCP_send_cancel (struct CadetPeerQueue *q)
{
- struct CadetPeerQueue *q;
- size_t size;
-
- GCC_check_connections ();
- if (NULL != peer->core_transmit)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " already unlocked!\n");
- return; /* Already unlocked */
- }
-
- q = connection_get_first_message (peer, c);
- if (NULL == q)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " queue empty!\n");
- return; /* Nothing to transmit */
- }
-
- size = q->size;
- peer->core_transmit =
- GNUNET_CORE_notify_transmit_ready (core_handle,
- GNUNET_NO, get_priority (q),
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_PEER_resolve2 (peer->id),
- get_core_size (size),
- &queue_send,
- peer);
- peer->tmt_time = GNUNET_TIME_absolute_get ();
- GCC_check_connections ();
+ call_peer_cont (q, GNUNET_NO);
+ GNUNET_MQ_send_cancel (q->env);
+ GNUNET_free (q);
}
LOG (GNUNET_ERROR_TYPE_WARNING, "**************************************\n");
}
ats_ch = GNUNET_ATS_connectivity_init (c);
- core_handle = GNUNET_CORE_connect (c, /* Main configuration */
- NULL, /* Closure passed to CADET functions */
- &core_init, /* Call core_init once connected */
- &core_connect, /* Handle connects */
- &core_disconnect, /* remove peers on disconnects */
- NULL, /* Don't notify about all incoming messages */
- GNUNET_NO, /* For header only in notification */
- NULL, /* Don't notify about all outbound messages */
- GNUNET_NO, /* For header-only out notification */
- core_handlers); /* Register these handlers */
+ connect_to_core (c);
if (NULL == core_handle)
{
GNUNET_break (0);
GNUNET_SCHEDULER_shutdown ();
- return;
}
-
}
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Shutting down peer subsystem\n");
in_shutdown = GNUNET_YES;
- GNUNET_CONTAINER_multipeermap_iterate (peers,
- &shutdown_peer,
- NULL);
if (NULL != core_handle)
{
- GNUNET_CORE_disconnect (core_handle);
- core_handle = NULL;
+ GNUNET_CORE_disconnecT (core_handle);
+ core_handle = NULL;
}
if (NULL != ats_ch)
{
ats_ch = NULL;
}
GNUNET_PEER_change_rc (myid, -1);
+ /* With MQ API, CORE calls the disconnect handler for every peer
+ * after calling GNUNET_CORE_disconnecT, shutdown must occur *after* that.
+ */
+ GNUNET_CONTAINER_multipeermap_iterate (peers,
+ &shutdown_peer,
+ NULL);
GNUNET_CONTAINER_multipeermap_destroy (peers);
peers = NULL;
}
}
/* Is not a neighbor but connections is not NULL, probably disconnecting */
- GNUNET_break (0);
return GNUNET_NO;
}
{
unsigned int i;
- /* TODO: invert and add */
+ /* TODO: invert and add to origin */
+ /* TODO: replace all "GCP_add_path" with this, make the other one static */
GCC_check_connections ();
for (i = 0; i < p->length && p->peers[i] != myid; i++) /* skip'em */ ;
for (i++; i < p->length; i++)
struct CadetPeer;
/**
- * Struct containing info about a queued transmission to this peer
+ * Handle to queued messages on a peer level.
*/
struct CadetPeerQueue;
*
* @param cls Closure.
* @param c Connection this message was on.
+ * @param fwd Was this a FWD going message?
* @param sent Was it really sent? (Could have been canceled)
* @param type Type of message sent.
- * @param pid Packet ID, or 0 if not applicable (create, destroy, etc).
- * @param fwd Was this a FWD going message?
+ * @param payload_type Type of payload, if applicable.
+ * @param pid Message ID, or 0 if not applicable (create, destroy, etc).
* @param size Size of the message.
* @param wait Time spent waiting for core (only the time for THIS message)
- * @return #GNUNET_YES if connection was destroyed, #GNUNET_NO otherwise.
*/
-typedef int
+typedef void
(*GCP_sent) (void *cls,
- struct CadetConnection *c, int sent,
- uint16_t type, uint32_t pid, int fwd, size_t size,
+ struct CadetConnection *c, int fwd, int sent,
+ uint16_t type, uint16_t payload_type, uint32_t pid,
+ size_t size,
struct GNUNET_TIME_Relative wait);
/**
GCP_connect (struct CadetPeer *peer);
/**
- * Free a transmission that was already queued with all resources
- * associated to the request.
- *
- * If connection was marked to be destroyed, and this was the last queued
- * message on it, the connection will be free'd as a result.
- *
- * @param queue Queue handler to cancel.
- * @param clear_cls Is it necessary to free associated cls?
- * @param sent Was it really sent? (Could have been canceled)
- * @param pid PID, if relevant (was sent and was a payload message).
- *
- * @return #GNUNET_YES if connection was destroyed as a result,
- * #GNUNET_NO otherwise.
- */
-int
-GCP_queue_destroy (struct CadetPeerQueue *queue, int clear_cls,
- int sent, uint32_t pid);
-
-/**
- * @brief Queue and pass message to core when possible.
+ * @brief Send a message to another peer (using CORE).
*
* @param peer Peer towards which to queue the message.
- * @param cls Closure (@c type dependant). It will be used by queue_send to
- * build the message to be sent if not already prebuilt.
- * @param type Type of the message.
- * @param payload_type Type of the message's payload
+ * @param message Message to send.
+ * @param payload_type Type of the message's payload, for debug messages.
* 0 if the message is a retransmission (unknown payload).
* UINT16_MAX if the message does not have payload.
* @param payload_id ID of the payload (MID, ACK #, etc)
- * @param size Size of the message.
* @param c Connection this message belongs to (can be NULL).
* @param fwd Is this a message going root->dest? (FWD ACK are NOT FWD!)
- * @param cont Continuation to be called once CORE has taken the message.
+ * @param cont Continuation to be called once CORE has sent the message.
* @param cont_cls Closure for @c cont.
- *
- * @return Handle to cancel the message before it is sent. Once cont is called
- * message has been sent and therefore the handle is no longer valid.
*/
struct CadetPeerQueue *
-GCP_queue_add (struct CadetPeer *peer,
- void *cls,
- uint16_t type,
- uint16_t payload_type,
- uint32_t payload_id,
- size_t size,
- struct CadetConnection *c,
- int fwd,
- GCP_sent cont,
- void *cont_cls);
-
-/**
- * Cancel all queued messages to a peer that belong to a certain connection.
- *
- * @param peer Peer towards whom to cancel.
- * @param c Connection whose queued messages to cancel. Might be destroyed by
- * the sent continuation call.
- */
-void
-GCP_queue_cancel (struct CadetPeer *peer, struct CadetConnection *c);
-
-/**
- * Get the first message for a connection and unqueue it.
- *
- * Only tunnel (or higher) level messages are unqueued. Connection specific
- * messages are silently destroyed upon encounter.
- *
- * @param peer Neighboring peer.
- * @param c Connection.
- * @param destroyed[in/out] Was the connection destroyed as a result?.
- * Can NOT be NULL.
- *
- *
- * @return First message for this connection.
- */
-struct GNUNET_MessageHeader *
-GCP_connection_pop (struct CadetPeer *peer,
- struct CadetConnection *c,
- int *destroyed);
+GCP_send (struct CadetPeer *peer,
+ const struct GNUNET_MessageHeader *message,
+ uint16_t payload_type,
+ uint32_t payload_id,
+ struct CadetConnection *c,
+ int fwd,
+ GCP_sent cont,
+ void *cont_cls);
/**
- * Unlock a possibly locked queue for a connection.
+ * Cancel sending a message. Message must have been sent with
+ * #GCP_send before. May not be called after the notify sent
+ * callback has been called.
*
- * If there is a message that can be sent on this connection, call core for it.
- * Otherwise (if core transmit is already called or there is no sendable
- * message) do nothing.
+ * It does NOT call the continuation given to #GCP_send.
*
- * @param peer Peer who keeps the queue.
- * @param c Connection whose messages to unlock.
+ * @param q Queue handle to cancel
*/
void
-GCP_queue_unlock (struct CadetPeer *peer, struct CadetConnection *c);
+GCP_send_cancel (struct CadetPeerQueue *q);
/**
* Set tunnel.
{
GNUNET_break (0);
GCT_debug (t, GNUNET_ERROR_TYPE_ERROR);
- GCP_debug (t->peer, GNUNET_ERROR_TYPE_ERROR);
}
return NULL;
}
*
* @param t Tunnel on which the message came.
* @param message Payload of KX message.
+ *
+ * FIXME: not needed anymore
+ * - substitute with call to kx_ax
+ * - eliminate encapsulation
*/
void
GCT_handle_kx (struct CadetTunnel *t,
}
-/**
- * Sends an already built and encrypted message on a tunnel, choosing the best
- * connection. Useful for re-queueing messages queued on a destroyed connection.
- *
- * @param message Message to send. Function modifies it.
- * @param t Tunnel on which this message is transmitted.
- */
-void
-GCT_resend_message (const struct GNUNET_MessageHeader *message,
- struct CadetTunnel *t)
-{
- struct CadetConnection *c;
- int fwd;
-
- c = tunnel_get_connection (t);
- if (NULL == c)
- {
- /* TODO queue in tunnel, marked as encrypted */
- LOG (GNUNET_ERROR_TYPE_DEBUG, "No connection available, dropping.\n");
- return;
- }
- fwd = GCC_is_origin (c, GNUNET_YES);
- GNUNET_break (NULL == GCC_send_prebuilt_message (message, UINT16_MAX, 0,
- c, fwd,
- GNUNET_YES, NULL, NULL));
-}
-
-
/**
* Is the tunnel directed towards the local peer?
*
GCT_send_ax_kx (struct CadetTunnel *t, int force_reply);
-/**
- * Sends an already built and encrypted message on a tunnel, choosing the best
- * connection. Useful for re-queueing messages queued on a destroyed connection.
- *
- * @param message Message to send. Function modifies it.
- * @param t Tunnel on which this message is transmitted.
- */
-void
-GCT_resend_message (const struct GNUNET_MessageHeader *message,
- struct CadetTunnel *t);
-
-
/**
* Is the tunnel directed towards the local peer?
*