*/
struct MeshConnection *c;
+ /**
+ * Is FWD in c?
+ */
+ int fwd;
+
/**
* Channel this message belongs to, if known.
*/
struct MeshConnection *c;
/**
- * Transmission queue to core DLL head
- */
- struct MeshPeerQueue *queue_head;
-
- /**
- * Transmission queue to core DLL tail
- */
- struct MeshPeerQueue *queue_tail;
-
- /**
- * How many messages are in the queue to this peer.
+ * How many messages are in the queue on this connection.
*/
unsigned int queue_n;
*/
unsigned int queue_max;
- /**
- * Handle for queued transmissions
- */
- struct GNUNET_CORE_TransmitHandle *core_transmit;
-
/**
* ID of the last packet sent towards the peer.
*/
*/
struct MeshTunnel2 *tunnel;
+ /**
+ * Connections that go through this peer, indexed by tid;
+ */
+ struct GNUNET_CONTAINER_MultiHashMap *connections;
+
+ /**
+ * Handle for queued transmissions
+ */
+ struct GNUNET_CORE_TransmitHandle *core_transmit;
+
+ /**
+ * Transmission queue to core DLL head
+ */
+ struct MeshPeerQueue *queue_head;
+
+ /**
+ * Transmission queue to core DLL tail
+ */
+ struct MeshPeerQueue *queue_tail;
+
+ /**
+ * How many messages are in the queue to this peer.
+ */
+ unsigned int queue_n;
};
struct MeshReliableMessage *tail_sent;
/**
- * Messages pending
+ * Messages pending to send.
*/
unsigned int n_sent;
struct MeshReliableMessage *head_recv;
struct MeshReliableMessage *tail_recv;
+ /**
+ * Messages received.
+ */
+ unsigned int n_recv;
+
+ /**
+ * Can we send data to the client?
+ */
+ int client_ready;
+
/**
* Task to resend/poll in case no ACK is received.
*/
}
+/**
+ * Get the hop in a connection.
+ *
+ * @param c Connection.
+ * @param fwd Next hop?
+ *
+ * @return Next peer in the connection.
+ */
+static struct MeshPeer *
+connection_get_hop (struct MeshConnection *c, int fwd)
+{
+ if (fwd)
+ return connection_get_next_hop (c);
+ return connection_get_prev_hop (c);
+}
+
/**
* Check if client has registered with the service and has not disconnected
*
}
+/**
+ * Get the total buffer space for a tunnel
+ */
+static unsigned int
+tunnel_get_buffer (struct MeshTunnel2 *t, int fwd)
+{
+ struct MeshConnection *c;
+ struct MeshFlowControl *fc;
+ unsigned int buffer;
+
+ for (buffer = 0, c = t->connection_head; NULL != c; c = c->next)
+ {
+ if (c->state != MESH_CONNECTION_READY)
+ continue;
+
+ fc = fwd ? &c->fwd_fc : &c->bck_fc;
+ buffer += fc->last_ack_recv - fc->last_pid_sent;
+ }
+
+ return buffer;
+}
+
+
/**
* FIXME FIXME FIXME FIXME FIXME FIXME FIXME FIXME FIXME FIXME FIXME
* Encrypt data with the tunnel key.
size_t size;
uint16_t type;
- neighbor = fwd ? connection_get_next_hop (c) : connection_get_prev_hop (c);
+ neighbor = connection_get_hop (c, fwd);
if (NULL == neighbor)
{
GNUNET_break (0);
/**
- * Iterator over all the peers to remove the oldest not-used entry.
+ * Destroy the peer_info and free any allocated resources linked to it
+ *
+ * @param peer The peer_info to destroy.
+ *
+ * @return GNUNET_OK on success
+ */
+static int
+peer_destroy (struct MeshPeer *peer)
+{
+ struct GNUNET_PeerIdentity id;
+ struct MeshPeerPath *p;
+ struct MeshPeerPath *nextp;
+
+ GNUNET_PEER_resolve (peer->id, &id);
+ GNUNET_PEER_change_rc (peer->id, -1);
+
+ if (GNUNET_YES !=
+ GNUNET_CONTAINER_multihashmap_remove (peers, &id.hashPubKey, peer))
+ {
+ GNUNET_break (0);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "removing peer %s, not in hashmap\n", GNUNET_i2s (&id));
+ }
+ if (NULL != peer->dhtget)
+ {
+ GNUNET_DHT_get_stop (peer->dhtget);
+ }
+ p = peer->path_head;
+ while (NULL != p)
+ {
+ nextp = p->next;
+ GNUNET_CONTAINER_DLL_remove (peer->path_head, peer->path_tail, p);
+ path_destroy (p);
+ p = nextp;
+ }
+ tunnel_destroy_empty (peer->tunnel);
+ GNUNET_free (peer);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Returns if peer is used (has a tunnel, is neighbor).
+ *
+ * @peer Peer to check.
+ *
+ * @return GNUNET_YES if peer is in use.
+ */
+static int
+peer_is_used (struct MeshPeer *peer)
+{
+ struct MeshPeerPath *p;
+
+ if (NULL != peer->tunnel)
+ return GNUNET_YES;
+
+ for (p = peer->path_head; NULL != p; p = p->next)
+ {
+ if (p->length < 3)
+ return GNUNET_YES;
+ }
+ return GNUNET_NO;
+}
+
+/**
+ * Iterator over all the peers to get the oldest timestamp.
*
* @param cls Closure (unsued).
* @param key ID of the peer.
* @param value Peer_Info of the peer.
+ */
+static int
+peer_get_oldest (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct MeshPeer *p = value;
+ struct GNUNET_TIME_Absolute *abs = cls;
+
+ /* Don't count active peers */
+ if (GNUNET_YES == peer_is_used (p))
+ return GNUNET_YES;
+
+ if (abs->abs_value < p->last_contact.abs_value)
+ abs->abs_value = p->last_contact.abs_value;
+
+ return GNUNET_YES;
+}
+
+
+/**
+ * Iterator over all the peers to remove the oldest entry.
*
- * FIXME implement
+ * @param cls Closure (unsued).
+ * @param key ID of the peer.
+ * @param value Peer_Info of the peer.
*/
static int
peer_timeout (void *cls,
const struct GNUNET_HashCode *key,
void *value)
{
+ struct MeshPeer *p = value;
+ struct GNUNET_TIME_Absolute *abs = cls;
+
+ if (p->last_contact.abs_value == abs->abs_value &&
+ GNUNET_NO == peer_is_used (p))
+ {
+ peer_destroy (p);
+ return GNUNET_NO;
+ }
return GNUNET_YES;
}
+/**
+ * Delete oldest unused peer.
+ */
+static void
+peer_delete_oldest (void)
+{
+ struct GNUNET_TIME_Absolute abs;
+
+ abs = GNUNET_TIME_UNIT_FOREVER_ABS;
+
+ GNUNET_CONTAINER_multihashmap_iterate (peers,
+ &peer_get_oldest,
+ &abs);
+ GNUNET_CONTAINER_multihashmap_iterate (peers,
+ &peer_timeout,
+ &abs);
+}
+
+
/**
* Retrieve the MeshPeer stucture associated with the peer, create one
* and insert it in the appropriate structures if the peer is not known yet.
peer = GNUNET_new (struct MeshPeer);
if (GNUNET_CONTAINER_multihashmap_size (peers) > max_peers)
{
- GNUNET_CONTAINER_multihashmap_iterate (peers,
- &peer_timeout,
- NULL);
+ peer_delete_oldest ();
}
GNUNET_CONTAINER_multihashmap_put (peers, &peer_id->hashPubKey, peer,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
return best_p;
}
+static int
+queue_is_sendable (struct MeshPeerQueue *q)
+{
+ struct MeshFlowControl *fc;
+
+ /* Is PID-independent? */
+ switch (q->type)
+ {
+ case GNUNET_MESSAGE_TYPE_MESH_ACK:
+ case GNUNET_MESSAGE_TYPE_MESH_POLL:
+ return GNUNET_YES;
+ }
+
+ /* Is PID allowed? */
+ fc = q->fwd ? &q->c->fwd_fc : &q->c->bck_fc;
+ if (GMC_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
+ return GNUNET_YES;
+
+ return GNUNET_NO;
+}
+
+
+/**
+ * Get first sendable message.
+ *
+ * @param peer The destination peer.
+ *
+ * @return Best current known path towards the peer, if any.
+ */
+static struct MeshPeerQueue *
+peer_get_first_message (const struct MeshPeer *peer)
+{
+ struct MeshPeerQueue *q;
+
+ for (q = peer->queue_head; NULL != q; q = q->next)
+ {
+ if (queue_is_sendable (q))
+ return q;
+ }
+
+ return NULL;
+}
+
/**
* Try to establish a new connection to this peer in the given tunnel.
}
+/**
+ * Get the first transmittable message for a connection.
+ *
+ * @param c Connection.
+ * @param fwd Is this FWD?
+ *
+ * @return First transmittable message.
+ */
+static struct MeshPeerQueue *
+connection_get_first_message (struct MeshConnection *c, int fwd)
+{
+ struct MeshPeerQueue *q;
+ struct MeshPeer *p;
+
+ p = connection_get_hop (c, fwd);
+
+ for (q = p->queue_head; NULL != q; q = q->next)
+ {
+ if (q->c != c)
+ continue;
+ if (queue_is_sendable (q))
+ return q;
+ }
+
+ return NULL;
+}
+
+
+
/**
* @brief Re-initiate traffic on this connection if necessary.
*
static void
connection_unlock_queue (struct MeshConnection *c, int fwd)
{
- struct MeshFlowControl *fc;
struct MeshPeer *peer;
struct MeshPeerQueue *q;
size_t size;
- peer = fwd ? connection_get_next_hop(c) : connection_get_prev_hop(c);
- fc = fwd ? &c->fwd_fc : &c->bck_fc;
+ peer = connection_get_hop (c, fwd);
- if (NULL != fc->core_transmit)
+ if (NULL != peer->core_transmit)
return; /* Already unlocked */
- q = fc->queue_head;
+ q = connection_get_first_message (c, fwd);
if (NULL == q)
return; /* Nothing to transmit */
size = q->size;
- fc->core_transmit =
+ peer->core_transmit =
GNUNET_CORE_notify_transmit_ready (core_handle,
GNUNET_NO,
0,
struct MeshPeerQueue *q;
struct MeshPeerQueue *next;
struct MeshFlowControl *fc;
+ struct MeshPeer *peer;
if (NULL == c)
{
return;
}
fc = fwd ? &c->fwd_fc : &c->bck_fc;
- for (q = fc->queue_head; NULL != q; q = next)
+ peer = connection_get_hop (c, fwd);
+
+ for (q = peer->queue_head; NULL != q; q = next)
{
next = q->next;
if (q->c == c)
queue_destroy (q, GNUNET_YES);
}
}
- if (NULL == fc->queue_head)
+ if (NULL == peer->queue_head)
{
- if (NULL != fc->core_transmit)
+ if (NULL != peer->core_transmit)
{
- GNUNET_CORE_notify_transmit_ready_cancel (fc->core_transmit);
- fc->core_transmit = NULL;
+ GNUNET_CORE_notify_transmit_ready_cancel (peer->core_transmit);
+ peer->core_transmit = NULL;
}
if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task)
{
}
-/**
- * Destroy the peer_info and free any allocated resources linked to it
- *
- * @param peer The peer_info to destroy.
- *
- * @return GNUNET_OK on success
- */
-static int
-peer_destroy (struct MeshPeer *peer)
-{
- struct GNUNET_PeerIdentity id;
- struct MeshPeerPath *p;
- struct MeshPeerPath *nextp;
-
- GNUNET_PEER_resolve (peer->id, &id);
- GNUNET_PEER_change_rc (peer->id, -1);
-
- if (GNUNET_YES !=
- GNUNET_CONTAINER_multihashmap_remove (peers, &id.hashPubKey, peer))
- {
- GNUNET_break (0);
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "removing peer %s, not in hashmap\n", GNUNET_i2s (&id));
- }
- if (NULL != peer->dhtget)
- {
- GNUNET_DHT_get_stop (peer->dhtget);
- }
- p = peer->path_head;
- while (NULL != p)
- {
- nextp = p->next;
- GNUNET_CONTAINER_DLL_remove (peer->path_head, peer->path_tail, p);
- path_destroy (p);
- p = nextp;
- }
- tunnel_destroy_empty (peer->tunnel);
- GNUNET_free (peer);
- return GNUNET_OK;
-}
-
-
/**
* Remove all paths that rely on a direct connection between p1 and p2
* from the peer itself and notify all tunnels about it.
/**
- * Send up to 64 buffered messages to the client for in order delivery.
+ * Send a buffered message to the client, for in order delivery or
+ * as result of client ACK.
*
* @param ch Channel on which to empty the message buffer.
* @param c Client to send to.
struct MeshChannelReliability *rel)
{
struct MeshReliableMessage *copy;
- struct MeshReliableMessage *next;
uint32_t *mid;
- if (GNUNET_NO == ch->reliable)
+ if (GNUNET_NO == rel->client_ready)
{
- GNUNET_break (0);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client not ready\n");
return;
}
mid = rel == ch->bck_rel ? &ch->mid_recv_fwd : &ch->mid_recv_bck;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "send_buffered_data\n");
- for (copy = rel->head_recv; NULL != copy; copy = next)
+ copy = rel->head_recv;
+ if (NULL != copy)
{
- next = copy->next;
- if (copy->mid == *mid)
+ if (copy->mid == *mid || GNUNET_NO == ch->reliable)
{
struct GNUNET_MESH_Data *msg = (struct GNUNET_MESH_Data *) ©[1];
" have %u! now expecting %u\n",
copy->mid, *mid + 1);
channel_send_client_data (ch, msg, (rel == ch->bck_rel));
+ rel->n_recv--;
*mid = *mid + 1;
GNUNET_CONTAINER_DLL_remove (rel->head_recv, rel->tail_recv, copy);
GNUNET_free (copy);
else
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- " don't have %u, next is %u\n",
+ " reliable && don't have %u, next is %u\n",
*mid,
copy->mid);
return;
/**
- * We have received a message out of order, buffer it until we receive
- * the missing one and we can feed the rest to the client.
+ * We have received a message out of order, or the client is not ready.
+ * Buffer it until we receive an ACK from the client or the missing
+ * message from the channel.
*
* @param msg Message to buffer.
* @param rel Reliability data to the corresponding direction.
copy->rel = rel;
memcpy (©[1], msg, size);
+ rel->n_recv++;
+
// FIXME do something better than O(n), although n < 64...
// FIXME start from the end (most messages are the latest ones)
for (prev = rel->head_recv; NULL != prev; prev = prev->next)
struct MeshChannelReliability *rel = cls;
struct MeshReliableMessage *copy;
struct MeshPeerQueue *q;
- struct MeshFlowControl *fc;
struct MeshChannel *ch;
struct MeshConnection *c;
struct GNUNET_MESH_Data *payload;
+ struct MeshPeer *hop;
int fwd;
rel->retry_task = GNUNET_SCHEDULER_NO_TASK;
payload = (struct GNUNET_MESH_Data *) ©[1];
fwd = (rel == ch->fwd_rel);
c = tunnel_get_connection (ch->t, fwd);
- fc = fwd ? &c->fwd_fc : &c->bck_fc;
- for (q = fc->queue_head; NULL != q; q = q->next)
+ hop = connection_get_hop (c, fwd);
+ for (q = hop->queue_head; NULL != q; q = q->next)
{
if (ntohs (payload->header.type) == q->type && ch == q->ch)
{
}
+
+/**
+ * Send ACK on one or more connections due to buffer space to the client.
+ */
+static void
+channel_send_ack (struct MeshChannel *ch, uint32_t buffer, int fwd)
+{
+ struct MeshTunnel2 *t = ch->t;
+ struct MeshConnection *c;
+ struct MeshFlowControl *fc;
+ uint32_t allowed;
+ uint32_t to_allow;
+ unsigned int cs;
+
+ /* Count connections, how many messages are already allowed */
+ for (cs = 0, allowed = 0, c = t->connection_head; NULL != c; c = c->next)
+ {
+ fc = fwd ? &c->fwd_fc : &c->bck_fc;
+ if (GMC_is_pid_bigger(fc->last_pid_recv, fc->last_ack_sent))
+ {
+ GNUNET_break (0);
+ continue;
+ }
+ allowed += fc->last_ack_sent - fc->last_pid_recv;
+ cs++;
+ }
+
+ /* Make sure there is no overflow */
+ if (allowed > buffer)
+ {
+ GNUNET_break (0);
+ return;
+ }
+
+ /* Authorize connections to send more data */
+ to_allow = buffer - allowed;
+ for (c = t->connection_head; NULL != c && to_allow > 0; c = c->next)
+ {
+ fc = fwd ? &c->fwd_fc : &c->bck_fc;
+ if (fc->last_ack_sent - fc->last_pid_recv > 64 / 3)
+ {
+ continue;
+ }
+ send_ack (c, fc->last_ack_sent + 1, fwd);
+ to_allow--;
+ }
+
+ GNUNET_break (to_allow == 0);
+}
+
+
/**
* Send keepalive packets for a connection.
*
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
- connection_keepalive (c, GNUNET_YES);
+ connection_maintain (c, GNUNET_YES);
c->fwd_maintenance_task = GNUNET_SCHEDULER_add_delayed (refresh_connection_time,
&connection_fwd_keepalive,
c);
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
- connection_keepalive (c, GNUNET_NO);
+ connection_maintain (c, GNUNET_NO);
c->bck_maintenance_task = GNUNET_SCHEDULER_add_delayed (refresh_connection_time,
&connection_bck_keepalive,
c);
return;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying tunnel %s\n",
- GNUNET_i2s (GNUNET_PEER_resolve2 (c->t->peer->id)));
+ GNUNET_i2s (GNUNET_PEER_resolve2 (t->peer->id)));
if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_remove (tunnels, &t->id, t))
GNUNET_break (0);
}
+/**
+ * Iterator to notify all connections of a broken link. Mark connections
+ * to destroy after all traffic has been sent.
+ *
+ * @param cls Closure (peer disconnected).
+ * @param key Current key code (tid).
+ * @param value Value in the hash map (connection).
+ *
+ * @return GNUNET_YES if we should continue to iterate,
+ * GNUNET_NO if not.
+ */
+static int
+connection_broken (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct MeshPeer *peer = cls;
+ struct MeshConnection *c = value;
+ struct GNUNET_MESH_ConnectionBroken msg;
+ int fwd;
+
+ fwd = peer == connection_get_prev_hop (c);
+ connection_cancel_queues (c, !fwd);
+
+ msg.header.size = htons (sizeof (struct GNUNET_MESH_ConnectionBroken));
+ msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CONNECTION_BROKEN);
+ msg.cid = htonl (c->id);
+ msg.tid = c->t->id;
+ msg.peer1 = my_full_id;
+ msg.peer2 = *GNUNET_PEER_resolve2 (peer->id);
+ send_prebuilt_message_connection (&msg.header, c, NULL, fwd);
+ c->destroy = GNUNET_YES;
+
+ return GNUNET_YES;
+}
+
/******************************************************************************/
/**************** MESH NETWORK HANDLER HELPERS ***********************/
/******************************************************************************/
static void
queue_destroy (struct MeshPeerQueue *queue, int clear_cls)
{
+ struct MeshPeer *peer;
struct MeshFlowControl *fc;
int fwd;
- fwd = (queue->peer == connection_get_next_hop (queue->c));
+ fwd = queue->fwd;
+ peer = queue->peer;
fc = fwd ? &queue->c->fwd_fc : &queue->c->bck_fc;
if (GNUNET_YES == clear_cls)
}
GNUNET_free_non_null (queue->cls);
}
- GNUNET_CONTAINER_DLL_remove (fc->queue_head, fc->queue_tail, queue);
+ GNUNET_CONTAINER_DLL_remove (peer->queue_head, peer->queue_tail, queue);
fc->queue_n--;
+ peer->queue_n--;
+ if (NULL != queue->c)
+ {
+ queue->c->pending_messages--;
+ if (NULL != queue->c->t)
+ {
+ queue->c->t->pending_messages--;
+ }
+ }
GNUNET_free (queue);
}
queue_send (void *cls, size_t size, void *buf)
{
struct MeshPeer *peer = cls;
- const struct GNUNET_PeerIdentity *dst_id;
+ struct MeshFlowControl *fc;
+ struct MeshConnection *c;
struct GNUNET_MessageHeader *msg;
struct MeshPeerQueue *queue;
struct MeshTunnel2 *t;
- struct MeshFlowControl *fc;
- struct MeshConnection *c;
+ const struct GNUNET_PeerIdentity *dst_id;
size_t data_size;
uint32_t pid;
uint16_t type;
int fwd;
- c = queue->c;
- fwd = (queue->peer == connection_get_next_hop (c));
- fc = fwd ? &c->fwd_fc : &c->bck_fc;
-
- if (NULL == fc)
- {
- GNUNET_break (0);
- return 0;
- }
- fc->core_transmit = NULL;
-
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* Queue send\n");
if (NULL == buf || 0 == size)
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* Buffer size 0.\n");
return 0;
}
- queue = fc->queue_head;
- /* Queue has no traffic */
+ /* Initialize */
+ queue = peer_get_first_message (peer);
if (NULL == queue)
{
GNUNET_break (0); /* Core tmt_rdy should've been canceled */
return 0;
}
+ queue->peer->core_transmit = NULL;
+ c = queue->c;
+ fwd = queue->fwd;
+ fc = fwd ? &c->fwd_fc : &c->bck_fc;
+
dst_id = GNUNET_PEER_resolve2 (peer->id);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* towards %s\n", GNUNET_i2s (dst_id));
if (queue->size > size)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* not enough room, reissue\n");
- fc->core_transmit =
+ peer->core_transmit =
GNUNET_CORE_notify_transmit_ready (core_handle,
GNUNET_NO,
0,
data_size = 0;
}
- fc->queue_n--;
-
if (0 < drop_percent &&
GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, 101) < drop_percent)
{
}
/* If more data in queue, send next */
- queue = fc->queue_head;
+ queue = peer_get_first_message (peer);
if (NULL != queue)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* more data!\n");
- if (NULL == fc->core_transmit) {
- fc->core_transmit =
+ if (NULL == peer->core_transmit) {
+ peer->core_transmit =
GNUNET_CORE_notify_transmit_ready(core_handle,
0,
0,
return; /* Drop this message */
}
- fc->queue_n++;
if (GMC_is_pid_bigger(fc->last_pid_sent + 1, fc->last_ack_recv) &&
GNUNET_SCHEDULER_NO_TASK == fc->poll_task)
fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
queue->peer = dst;
queue->c = c;
queue->ch = ch;
+ queue->fwd = fwd;
if (100 <= priority)
- GNUNET_CONTAINER_DLL_insert (fc->queue_head, fc->queue_tail, queue);
+ GNUNET_CONTAINER_DLL_insert (dst->queue_head, dst->queue_tail, queue);
else
- GNUNET_CONTAINER_DLL_insert_tail (fc->queue_head, fc->queue_tail, queue);
+ GNUNET_CONTAINER_DLL_insert_tail (dst->queue_head, dst->queue_tail, queue);
- if (NULL == fc->core_transmit)
+ if (NULL == dst->core_transmit)
{
- fc->core_transmit =
+ dst->core_transmit =
GNUNET_CORE_notify_transmit_ready (core_handle,
0,
0,
}
c->pending_messages++;
c->t->pending_messages++;
+ fc->queue_n++;
+ dst->queue_n++;
}
fc = fwd ? &c->fwd_fc : &c->bck_fc;
/* Check if origin is as expected */
- neighbor = fwd ? connection_get_prev_hop (c) : connection_get_next_hop (c);
+ neighbor = connection_get_hop (c, fwd);
if (peer_get (peer)->id != neighbor->id)
{
GNUNET_break_op (0);
GNUNET_YES : GNUNET_NO;
/* Check if origin is as expected */
- neighbor = fwd ? connection_get_prev_hop (c) : connection_get_next_hop (c);
+ neighbor = connection_get_hop (c, fwd);
if (peer_get (peer)->id != neighbor->id)
{
GNUNET_break_op (0);
copy = GNUNET_malloc (sizeof (struct MeshReliableMessage)
+ sizeof(struct GNUNET_MESH_Data)
+ size);
-
copy->mid = *mid;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! DATA %u\n", copy->mid);
copy->timestamp = GNUNET_TIME_absolute_get ();
payload->chid = htonl (ch->gid);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
" calling generic handler...\n");
- if (chid < GNUNET_MESH_LOCAL_CHANNEL_ID_SERV)
- handle_data (ch->t, payload, GNUNET_YES);
- else
- handle_data (ch->t, payload, GNUNET_NO);
+ send_prebuilt_message_channel (&payload->header, ch, fwd);
}
+ if (tunnel_get_buffer (ch->t, fwd) > 0)
+ send_local_ack (ch, c, fwd);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "receive done OK\n");
GNUNET_SERVER_receive_done (client, GNUNET_OK);
const struct GNUNET_MessageHeader *message)
{
struct GNUNET_MESH_LocalAck *msg;
+ struct MeshChannelReliability *rel;
struct MeshChannel *ch;
struct MeshClient *c;
MESH_ChannelNumber chid;
+ int fwd;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got a local ACK\n");
+
/* Sanity check for client registration */
if (NULL == (c = client_get (client)))
{
msg = (struct GNUNET_MESH_LocalAck *) message;
- /* Tunnel exists? */
+ /* Channel exists? */
chid = ntohl (msg->channel_id);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " on channel %X\n", chid);
ch = channel_get_by_local_id (c, chid);
return;
}
- /* Does client own tunnel? I.E: Is this an ACK for BCK traffic? */
- if (chid < GNUNET_MESH_LOCAL_CHANNEL_ID_SERV)
- {
- /* The client owns the channel, ACK is for data to_origin, send BCK ACK. */
- ch->prev_fc.last_ack_recv++;
- tunnel_send_ack (t, GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK, GNUNET_NO);
- }
- else
- {
- /* The client doesn't own the channel, this ACK is for FWD traffic. */
- t->next_fc.last_ack_recv++;
- tunnel_send_ack (t, GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK, GNUNET_YES);
- }
+ fwd = chid < GNUNET_MESH_LOCAL_CHANNEL_ID_SERV;
+ rel = fwd ? ch->fwd_rel : ch->bck_rel;
+
+ rel->client_ready = GNUNET_YES;
+ channel_send_client_buffered_data (ch, c, rel);
+ channel_send_ack (ch, 64 - rel->n_recv, fwd);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
struct GNUNET_MESH_LocalMonitor *msg;
msg = GNUNET_malloc (sizeof(struct GNUNET_MESH_LocalMonitor));
- msg->channel_id = htonl (ch->id);
+ msg->channel_id = htonl (ch->gid);
msg->header.size = htons (sizeof (struct GNUNET_MESH_LocalMonitor));
msg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_INFO_TUNNELS);
c->id,
&msg->owner,
ntohl (msg->channel_id));
- ch = channel_get (&msg->owner, ntohl (msg->channel_id));
+// ch = channel_get (&msg->owner, ntohl (msg->channel_id));
+ ch = NULL; // FIXME
if (NULL == ch)
{
/* We don't know the tunnel */
static void
core_connect (void *cls, const struct GNUNET_PeerIdentity *peer)
{
- struct MeshPeer *peer_info;
+ struct MeshPeer *pi;
struct MeshPeerPath *path;
DEBUG_CONN ("Peer connected\n");
DEBUG_CONN (" %s\n", GNUNET_i2s (&my_full_id));
- peer_info = peer_get (peer);
- if (myid == peer_info->id)
+ pi = peer_get (peer);
+ if (myid == pi->id)
{
DEBUG_CONN (" (self)\n");
path = path_new (1);
{
DEBUG_CONN (" %s\n", GNUNET_i2s (peer));
path = path_new (2);
- path->peers[1] = peer_info->id;
- GNUNET_PEER_change_rc (peer_info->id, 1);
+ path->peers[1] = pi->id;
+ GNUNET_PEER_change_rc (pi->id, 1);
GNUNET_STATISTICS_update (stats, "# peers", 1, GNUNET_NO);
}
path->peers[0] = myid;
GNUNET_PEER_change_rc (myid, 1);
- peer_add_path (peer_info, path, GNUNET_YES);
+ peer_add_path (pi, path, GNUNET_YES);
+
+ pi->connections = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_YES);
return;
}
core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
{
struct MeshPeer *pi;
- struct MeshPeerQueue *q;
- struct MeshPeerQueue *n;
- struct MeshFlowControl *fc;
DEBUG_CONN ("Peer disconnected\n");
pi = GNUNET_CONTAINER_multihashmap_get (peers, &peer->hashPubKey);
GNUNET_break (0);
return;
}
- fc = pi->fc;
- if (NULL != fc)
- {
- GNUNET_break (0);
- return;
- }
- pi->fc = NULL;
- q = fc->queue_head;
- while (NULL != q)
- {
- n = q->next;
- queue_destroy (q, GNUNET_YES);
- q = n;
- }
- if (NULL != fc->core_transmit)
- GNUNET_CORE_notify_transmit_ready_cancel (fc->core_transmit);
- if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task)
- GNUNET_SCHEDULER_cancel (fc->poll_task);
+ peer_remove_path (pi, myid, pi->id);
- peer_remove_path (pi, pi->id, myid);
+ GNUNET_CONTAINER_multihashmap_iterate (pi->connections,
+ connection_broken,
+ pi);
+ GNUNET_CONTAINER_multihashmap_destroy (pi->connections);
+ pi->connections = NULL;
if (myid == pi->id)
{
DEBUG_CONN (" (self)\n");
}
GNUNET_STATISTICS_update (stats, "# peers", -1, GNUNET_NO);
- GNUNET_free (fc);
return;
}