};
+/**
+ * UDP ACK Message-Packet header (after defragmentation).
+ */
+struct UDP_ACK_Message
+{
+ /**
+ * Message header.
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Desired delay for flow control
+ */
+ uint32_t delay;
+
+ /**
+ * What is the identity of the sender
+ */
+ struct GNUNET_PeerIdentity sender;
+};
+
+
/**
* Network format for IPv4 addresses.
*/
struct GNUNET_TIME_Absolute valid_until;
GNUNET_SCHEDULER_TaskIdentifier invalidation_task;
+
+ GNUNET_SCHEDULER_TaskIdentifier delayed_cont_task;
+
+ /**
+ * Desired delay for next sending we send to other peer
+ */
+ struct GNUNET_TIME_Relative flow_delay_for_other_peer;
+
+ /**
+ * Desired delay for next sending we received from other peer
+ */
+ struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
};
*/
size_t addr_len;
+ struct GNUNET_PeerIdentity id;
+
};
* @param peer peer's identity
* @return NULL if we have no session
*/
-struct Session *
+static struct Session *
find_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *peer)
{
return GNUNET_CONTAINER_multihashmap_get (plugin->sessions,
&peer->hashPubKey);
}
-int inbound_session_iterator (void *cls,
- const GNUNET_HashCode * key,
- void *value)
+
+static int
+inbound_session_iterator (void *cls,
+ const GNUNET_HashCode * key,
+ void *value)
{
struct PeerSessionIteratorContext *psc = cls;
struct Session *s = value;
}
if (psc->result != NULL)
return GNUNET_NO;
- else
- return GNUNET_YES;
-};
+ return GNUNET_YES;
+}
+
/**
* Lookup the session for the given peer.
* @param peer peer's identity
* @return NULL if we have no session
*/
-struct Session *
+static struct Session *
find_inbound_session (struct Plugin *plugin,
const struct GNUNET_PeerIdentity *peer,
const void * addr, size_t addrlen)
}
+static int
+inbound_session_by_addr_iterator (void *cls,
+ const GNUNET_HashCode * key,
+ void *value)
+{
+ struct PeerSessionIteratorContext *psc = cls;
+ struct Session *s = value;
+ if (s->addrlen == psc->addrlen)
+ {
+ if (0 == memcmp (&s[1], psc->addr, s->addrlen))
+ psc->result = s;
+ }
+ if (psc->result != NULL)
+ return GNUNET_NO;
+ else
+ return GNUNET_YES;
+};
+
+/**
+ * Lookup the session for the given peer just by address.
+ *
+ * @param plugin the plugin
+ * @param addr address
+ * @param addrlen address length
+ * @return NULL if we have no session
+ */
+static struct Session *
+find_inbound_session_by_addr (struct Plugin *plugin, const void * addr, size_t addrlen)
+{
+ struct PeerSessionIteratorContext psc;
+ psc.result = NULL;
+ psc.addrlen = addrlen;
+ psc.addr = addr;
+
+ GNUNET_CONTAINER_multihashmap_iterate (plugin->inbound_sessions, &inbound_session_by_addr_iterator, &psc);
+ return psc.result;
+}
+
+
/**
* Destroy a session, plugin is being unloaded.
*
if (peer_session->frag != NULL)
GNUNET_FRAGMENT_context_destroy (peer_session->frag);
+ if (GNUNET_SCHEDULER_NO_TASK != peer_session->delayed_cont_task)
+ GNUNET_SCHEDULER_cancel (peer_session->delayed_cont_task);
GNUNET_free (peer_session);
return GNUNET_OK;
}
if (s->invalidation_task != GNUNET_SCHEDULER_NO_TASK)
GNUNET_SCHEDULER_cancel(s->invalidation_task);
+ if (GNUNET_SCHEDULER_NO_TASK != s->delayed_cont_task)
+ GNUNET_SCHEDULER_cancel (s->delayed_cont_task);
GNUNET_free (s);
return GNUNET_OK;
}
"UDP DISCONNECT\n");
plugin->last_expected_delay = GNUNET_FRAGMENT_context_destroy (session->frag);
+ if (GNUNET_SCHEDULER_NO_TASK != session->delayed_cont_task)
+ GNUNET_SCHEDULER_cancel (session->delayed_cont_task);
if (session->cont != NULL)
session->cont (session->cont_cls, target, GNUNET_SYSERR);
GNUNET_free (session);
GNUNET_FRAGMENT_context_transmission_done (session->frag);
}
+
static struct Session *
create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target,
const void *addr, size_t addrlen,
static const char *
udp_address_to_string (void *cls, const void *addr, size_t addrlen);
+
+static void
+udp_call_continuation (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct Session *s = cls;
+ GNUNET_TRANSPORT_TransmitContinuation cont = s->cont;
+
+ s->delayed_cont_task = GNUNET_SCHEDULER_NO_TASK;
+ s->cont = NULL;
+ cont (s->cont_cls, &s->target, GNUNET_OK);
+}
+
+
/**
* Function that can be used by the transport service to transmit
* a message using the plugin.
size_t mlen = msgbuf_size + sizeof (struct UDPMessage);
char mbuf[mlen];
struct UDPMessage *udp;
+ struct GNUNET_TIME_Relative delta;
if (mlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
{
if ((force_address == GNUNET_SYSERR) && (session == NULL))
return GNUNET_SYSERR;
+ s = NULL;
/* safety check: comparing address to address stored in session */
if ((session != NULL) && (addr != NULL) && (addrlen != 0))
{
}
}
//session_invalid:
+ if ((addr == NULL) || (addrlen == 0))
+ return GNUNET_SYSERR;
peer_session = create_session (plugin, target, addr, addrlen, cont, cont_cls);
if (peer_session == NULL)
{
if (cont != NULL)
cont (cont_cls, target, GNUNET_SYSERR);
+ return GNUNET_SYSERR;;
}
/* Message */
udp->sender = *plugin->env->my_identity;
memcpy (&udp[1], msgbuf, msgbuf_size);
+ if (s != NULL)
+ delta = GNUNET_TIME_absolute_get_remaining (s->flow_delay_from_other_peer);
+ else
+ delta = GNUNET_TIME_UNIT_ZERO;
if (mlen <= UDP_MTU)
{
mlen = udp_send (plugin, peer_session->sock_addr, &udp->header);
if (cont != NULL)
- cont (cont_cls, target, (mlen > 0) ? GNUNET_OK : GNUNET_SYSERR);
+ {
+ if ( (delta.rel_value > 0) &&
+ (mlen > 0) )
+ {
+ s->cont = cont;
+ s->cont_cls = cont_cls;
+ s->delayed_cont_task = GNUNET_SCHEDULER_add_delayed (delta,
+ &udp_call_continuation,
+ s);
+ }
+ else
+ cont (cont_cls, target, (mlen > 0) ? GNUNET_OK : GNUNET_SYSERR);
+ }
GNUNET_free_non_null (peer_session);
}
else
struct Plugin *plugin = cls;
struct SourceInformation *si = client;
struct GNUNET_ATS_Information distance;
+ struct GNUNET_TIME_Relative delay;
/* setup ATS */
distance.type = htonl (GNUNET_ATS_QUALITY_NET_DISTANCE);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Giving Session %X %s to transport\n", si->session, GNUNET_i2s(&si->session->target));
- plugin->env->receive (plugin->env->cls, &si->sender, hdr, &distance, 1, si->session,
+ delay = plugin->env->receive (plugin->env->cls, &si->sender, hdr, &distance, 1, si->session,
si->arg, si->args);
+ si->session->flow_delay_for_other_peer = delay;
}
static void
s,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
}
- s->valid_until = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+ s->valid_until = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
if (s->invalidation_task != GNUNET_SCHEDULER_NO_TASK)
{
GNUNET_SCHEDULER_cancel(s->invalidation_task);
ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg)
{
struct ReceiveContext *rc = cls;
- size_t msize = sizeof (struct UDPMessage) + ntohs (msg->size);
+
+ size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size);
char buf[msize];
- struct UDPMessage *udp;
+ struct UDP_ACK_Message *udp_ack;
+ uint32_t delay = 0;
+
+ struct Session *s;
+ s = find_inbound_session_by_addr (rc->plugin, rc->src_addr, rc->addr_len);
+ if (s != NULL)
+ {
+ if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX)
+ delay = s->flow_delay_for_other_peer.rel_value;
+ else
+ delay = UINT32_MAX;
+ }
+
#if DEBUG_UDP
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending ACK to `%s'\n",
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending ACK to `%s' including delay of %u ms\n",
GNUNET_a2s (rc->src_addr,
(rc->src_addr->sa_family ==
AF_INET) ? sizeof (struct sockaddr_in) :
- sizeof (struct sockaddr_in6)));
+ sizeof (struct sockaddr_in6)),
+ delay);
#endif
- udp = (struct UDPMessage *) buf;
- udp->header.size = htons ((uint16_t) msize);
- udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
- udp->reserved = htonl (0);
- udp->sender = *rc->plugin->env->my_identity;
- memcpy (&udp[1], msg, ntohs (msg->size));
- (void) udp_send (rc->plugin, rc->src_addr, &udp->header);
+ udp_ack = (struct UDP_ACK_Message *) buf;
+ udp_ack->header.size = htons ((uint16_t) msize);
+ udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
+ udp_ack->delay = htonl (delay);
+ udp_ack->sender = *rc->plugin->env->my_identity;
+ memcpy (&udp_ack[1], msg, ntohs (msg->size));
+ (void) udp_send (rc->plugin, rc->src_addr, &udp_ack->header);
}
* Number of bytes in 'addr'.
*/
socklen_t addr_len;
+
+ struct Session * session;
};
const struct GNUNET_MessageHeader *msg;
const struct GNUNET_MessageHeader *ack;
struct Session *peer_session;
- const struct UDPMessage *udp;
+ const struct UDP_ACK_Message *udp_ack;
struct ReceiveContext *rc;
struct GNUNET_TIME_Absolute now;
struct FindReceiveContext frc;
+ struct Session * s = NULL;
+ struct GNUNET_TIME_Relative flow_delay;
fromlen = sizeof (addr);
memset (&addr, 0, sizeof (addr));
(const struct sockaddr *) addr, fromlen);
return;
case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
+
if (ntohs (msg->size) <
- sizeof (struct UDPMessage) + sizeof (struct GNUNET_MessageHeader))
+ sizeof (struct UDP_ACK_Message) + sizeof (struct GNUNET_MessageHeader))
{
GNUNET_break_op (0);
return;
}
- udp = (const struct UDPMessage *) msg;
- if (ntohl (udp->reserved) != 0)
+ udp_ack = (const struct UDP_ACK_Message *) msg;
+ s = find_inbound_session(plugin, &udp_ack->sender, addr, fromlen);
+ if (s != NULL)
{
- GNUNET_break_op (0);
- return;
+ flow_delay.rel_value = (uint64_t) ntohl(udp_ack->delay);
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "We received a sending delay of %llu\n", flow_delay.rel_value);
+
+ s->flow_delay_from_other_peer = GNUNET_TIME_relative_to_absolute (flow_delay);
}
- ack = (const struct GNUNET_MessageHeader *) &udp[1];
- if (ntohs (ack->size) != ntohs (msg->size) - sizeof (struct UDPMessage))
+ ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
+ if (ntohs (ack->size) != ntohs (msg->size) - sizeof (struct UDP_ACK_Message))
{
GNUNET_break_op (0);
return;
#if DEBUG_UDP
LOG (GNUNET_ERROR_TYPE_DEBUG,
"UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
- (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp->sender),
+ (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
#endif
- peer_session = find_session (plugin, &udp->sender);
+ peer_session = find_session (plugin, &udp_ack->sender);
if (NULL == peer_session)
{
#if DEBUG_UDP
return;
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multihashmap_remove (plugin->sessions,
- &udp->
+ &udp_ack->
sender.hashPubKey,
peer_session));
plugin->last_expected_delay =
GNUNET_FRAGMENT_context_destroy (peer_session->frag);
if (peer_session->cont != NULL)
- peer_session->cont (peer_session->cont_cls, &udp->sender, GNUNET_OK);
+ peer_session->cont (peer_session->cont_cls, &udp_ack->sender, GNUNET_OK);
GNUNET_free (peer_session);
return;
case GNUNET_MESSAGE_TYPE_FRAGMENT:
return api;
}
-/*
-
-static void invalidation_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct Session * s = cls;
- struct Plugin * plugin = s->plugin;
-
- s->invalidation_task = GNUNET_SCHEDULER_NO_TASK;
-
- GNUNET_CONTAINER_multihashmap_remove (plugin->inbound_sessions, &s->target.hashPubKey, s);
-
-
- plugin->env->session_end (plugin->env->cls, &s->target, s);
- LOG (GNUNET_ERROR_TYPE_ERROR,
- "Session %X is now invalid\n", s);
- destroy_session(s, &s->target.hashPubKey, s);
-}
-*/
-
-
-
/**
* Shutdown the plugin.
*