};
+/**
+ * UDP ACK Message-Packet header (after defragmentation).
+ */
+struct UDP_ACK_Message
+{
+ /**
+ * Message header.
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Desired delay for flow control
+ */
+ uint32_t delay;
+
+ /**
+ * What is the identity of the sender
+ */
+ struct GNUNET_PeerIdentity sender;
+};
+
+
/**
* Network format for IPv4 addresses.
*/
struct GNUNET_TIME_Absolute valid_until;
GNUNET_SCHEDULER_TaskIdentifier invalidation_task;
+
+ /*
+ * Desired delay for next sending we send to other peer
+ */
+ struct GNUNET_TIME_Relative flow_delay_for_other_peer;
+
+ /*
+ * Desired delay for next sending we received from other peer
+ */
+ struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
};
*/
size_t addr_len;
+ struct GNUNET_PeerIdentity id;
+
};
return psc.result;
}
+int inbound_session_by_addr_iterator (void *cls,
+ const GNUNET_HashCode * key,
+ void *value)
+{
+ struct PeerSessionIteratorContext *psc = cls;
+ struct Session *s = value;
+ if (s->addrlen == psc->addrlen)
+ {
+ if (0 == memcmp (&s[1], psc->addr, s->addrlen))
+ psc->result = s;
+ }
+ if (psc->result != NULL)
+ return GNUNET_NO;
+ else
+ return GNUNET_YES;
+};
+
+/**
+ * Lookup the session for the given peer just by address.
+ *
+ * @param plugin the plugin
+ * @param addr address
+ * @param addrlen address length
+ * @return NULL if we have no session
+ */
+struct Session *
+find_inbound_session_by_addr (struct Plugin *plugin, const void * addr, size_t addrlen)
+{
+ struct PeerSessionIteratorContext psc;
+ psc.result = NULL;
+ psc.addrlen = addrlen;
+ psc.addr = addr;
+
+ GNUNET_CONTAINER_multihashmap_iterate (plugin->inbound_sessions, &inbound_session_by_addr_iterator, &psc);
+ return psc.result;
+}
+
/**
* Destroy a session, plugin is being unloaded.
if ((force_address == GNUNET_SYSERR) && (session == NULL))
return GNUNET_SYSERR;
+ s = NULL;
/* safety check: comparing address to address stored in session */
if ((session != NULL) && (addr != NULL) && (addrlen != 0))
{
udp->sender = *plugin->env->my_identity;
memcpy (&udp[1], msgbuf, msgbuf_size);
+ if (s != NULL)
+ {
+ struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get();
+ if (s->flow_delay_from_other_peer.abs_value > now.abs_value)
+ {
+ struct GNUNET_TIME_Relative delta = GNUNET_TIME_absolute_get_difference(now, s->flow_delay_from_other_peer);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "We try to send to early! Should in %llu!\n", delta.rel_value);
+ }
+ else
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "We can send!\n");
+ }
+ else
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "SENDING without session!\n");
if (mlen <= UDP_MTU)
{
mlen = udp_send (plugin, peer_session->sock_addr, &udp->header);
struct Plugin *plugin = cls;
struct SourceInformation *si = client;
struct GNUNET_ATS_Information distance;
+ struct GNUNET_TIME_Relative delay;
/* setup ATS */
distance.type = htonl (GNUNET_ATS_QUALITY_NET_DISTANCE);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Giving Session %X %s to transport\n", si->session, GNUNET_i2s(&si->session->target));
- plugin->env->receive (plugin->env->cls, &si->sender, hdr, &distance, 1, si->session,
+ delay = plugin->env->receive (plugin->env->cls, &si->sender, hdr, &distance, 1, si->session,
si->arg, si->args);
+ si->session->flow_delay_for_other_peer = delay;
}
static void
ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg)
{
struct ReceiveContext *rc = cls;
- size_t msize = sizeof (struct UDPMessage) + ntohs (msg->size);
+
+ size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size);
char buf[msize];
- struct UDPMessage *udp;
+ struct UDP_ACK_Message *udp_ack;
+ uint32_t delay = 0;
+
+ struct Session *s;
+ s = find_inbound_session_by_addr (rc->plugin, rc->src_addr, rc->addr_len);
+ if (s != NULL)
+ {
+ if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX)
+ delay = s->flow_delay_for_other_peer.rel_value;
+ else
+ delay = UINT32_MAX;
+ }
+
#if DEBUG_UDP
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending ACK to `%s'\n",
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending ACK to `%s' including delay of %u ms\n",
GNUNET_a2s (rc->src_addr,
(rc->src_addr->sa_family ==
AF_INET) ? sizeof (struct sockaddr_in) :
- sizeof (struct sockaddr_in6)));
+ sizeof (struct sockaddr_in6)),
+ delay);
#endif
- udp = (struct UDPMessage *) buf;
- udp->header.size = htons ((uint16_t) msize);
- udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
- udp->reserved = htonl (0);
- udp->sender = *rc->plugin->env->my_identity;
- memcpy (&udp[1], msg, ntohs (msg->size));
- (void) udp_send (rc->plugin, rc->src_addr, &udp->header);
+ udp_ack = (struct UDP_ACK_Message *) buf;
+ udp_ack->header.size = htons ((uint16_t) msize);
+ udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
+ udp_ack->delay = htonl (delay);
+ udp_ack->sender = *rc->plugin->env->my_identity;
+ memcpy (&udp_ack[1], msg, ntohs (msg->size));
+ (void) udp_send (rc->plugin, rc->src_addr, &udp_ack->header);
}
* Number of bytes in 'addr'.
*/
socklen_t addr_len;
+
+ struct Session * session;
};
const struct GNUNET_MessageHeader *msg;
const struct GNUNET_MessageHeader *ack;
struct Session *peer_session;
- const struct UDPMessage *udp;
+ const struct UDP_ACK_Message *udp_ack;
struct ReceiveContext *rc;
struct GNUNET_TIME_Absolute now;
struct FindReceiveContext frc;
+ struct Session * s = NULL;
+ struct GNUNET_TIME_Relative flow_delay;
fromlen = sizeof (addr);
memset (&addr, 0, sizeof (addr));
(const struct sockaddr *) addr, fromlen);
return;
case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
+
if (ntohs (msg->size) <
- sizeof (struct UDPMessage) + sizeof (struct GNUNET_MessageHeader))
+ sizeof (struct UDP_ACK_Message) + sizeof (struct GNUNET_MessageHeader))
{
GNUNET_break_op (0);
return;
}
- udp = (const struct UDPMessage *) msg;
- if (ntohl (udp->reserved) != 0)
+ udp_ack = (const struct UDP_ACK_Message *) msg;
+ s = find_inbound_session(plugin, &udp_ack->sender, addr, fromlen);
+ if (s != NULL)
{
- GNUNET_break_op (0);
- return;
+ flow_delay.rel_value = (uint64_t) ntohl(udp_ack->delay);
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "We received a sending delay of %llu\n", flow_delay.rel_value);
+
+ s->flow_delay_from_other_peer = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), flow_delay);
}
- ack = (const struct GNUNET_MessageHeader *) &udp[1];
- if (ntohs (ack->size) != ntohs (msg->size) - sizeof (struct UDPMessage))
+ ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
+ if (ntohs (ack->size) != ntohs (msg->size) - sizeof (struct UDP_ACK_Message))
{
GNUNET_break_op (0);
return;
GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
#endif
- peer_session = find_session (plugin, &udp->sender);
+ peer_session = find_session (plugin, &udp_ack->sender);
if (NULL == peer_session)
{
#if DEBUG_UDP
return;
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multihashmap_remove (plugin->sessions,
- &udp->
+ &udp_ack->
sender.hashPubKey,
peer_session));
plugin->last_expected_delay =
GNUNET_FRAGMENT_context_destroy (peer_session->frag);
if (peer_session->cont != NULL)
- peer_session->cont (peer_session->cont_cls, &udp->sender, GNUNET_OK);
+ peer_session->cont (peer_session->cont_cls, &udp_ack->sender, GNUNET_OK);
GNUNET_free (peer_session);
return;
case GNUNET_MESSAGE_TYPE_FRAGMENT:
return api;
}
-/*
-
-static void invalidation_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct Session * s = cls;
- struct Plugin * plugin = s->plugin;
-
- s->invalidation_task = GNUNET_SCHEDULER_NO_TASK;
-
- GNUNET_CONTAINER_multihashmap_remove (plugin->inbound_sessions, &s->target.hashPubKey, s);
-
-
- plugin->env->session_end (plugin->env->cls, &s->target, s);
- LOG (GNUNET_ERROR_TYPE_ERROR,
- "Session %X is now invalid\n", s);
- destroy_session(s, &s->target.hashPubKey, s);
-}
-*/
-
-
-
/**
* Shutdown the plugin.
*