*/
static void
transmit_our_hello (void *cls, const struct GNUNET_PeerIdentity *target,
- const struct GNUNET_TRANSPORT_ATS_Information *ats,
+ const struct GNUNET_ATS_Information *ats,
uint32_t ats_count,
const char * transport,
const void * addr,
}
+/**
+ * We received some payload. Prepare to pass it on to our clients.
+ *
+ * @param peer (claimed) identity of the other peer
+ * @param message the message, NULL if we only care about
+ * learning about the delay until we should receive again -- FIXME!
+ * @param ats performance information
+ * @param ats_count number of records in ats
+ * @return how long the plugin should wait until receiving more data
+ */
+static struct GNUNET_TIME_Relative
+process_payload (const struct GNUNET_PeerIdentity *peer,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information *ats,
+ uint32_t ats_count)
+{
+ struct GNUNET_TIME_Relative ret;
+ int do_forward;
+ struct InboundMessage *im;
+ size_t size = sizeof (struct InboundMessage) + ntohs (message->size);
+ char buf[size];
+
+ ret = GNUNET_TIME_UNIT_ZERO;
+ do_forward = GNUNET_SYSERR;
+ ret =
+ GST_neighbours_calculate_receive_delay (peer,
+ (message ==
+ NULL) ? 0 :
+ ntohs (message->size),
+ &do_forward);
+ im = (struct InboundMessage*) buf;
+ im->header.size = htons (size);
+ im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
+ im->ats_count = htonl (0);
+ memcpy (&(im->peer), peer, sizeof (struct GNUNET_PeerIdentity));
+ memcpy (&im[1], message, ntohs (message->size));
+
+ switch (do_forward)
+ {
+ case GNUNET_YES:
+ GST_clients_broadcast (&im->header, GNUNET_YES);
+ break;
+ case GNUNET_NO:
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ _("Discarded %u bytes of type %u from %s: quota violated!\n"),
+ ntohs (message->size),
+ ntohs (message->type),
+ GNUNET_i2s (peer));
+ break;
+ case GNUNET_SYSERR:
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ _("Discarded %u bytes of type %u from %s: connection is down!\n"),
+ ntohs (message->size),
+ ntohs (message->type),
+ GNUNET_i2s (peer));
+ /* FIXME: store until connection is up? This is virtually always a SETKEY and a PING... */
+ break;
+ default:
+ GNUNET_break (0);
+ break;
+ }
+ return ret;
+}
+
+
/**
* Function called by the transport for each received message.
* This function should also be called with "NULL" for the
static struct GNUNET_TIME_Relative
plugin_env_receive_callback (void *cls, const struct GNUNET_PeerIdentity *peer,
const struct GNUNET_MessageHeader *message,
- const struct GNUNET_TRANSPORT_ATS_Information *ats,
+ const struct GNUNET_ATS_Information *ats,
uint32_t ats_count, struct Session *session,
const char *sender_address,
uint16_t sender_address_len)
{
const char *plugin_name = cls;
- int do_forward;
struct GNUNET_TIME_Relative ret;
uint16_t type;
-
+
ret = GNUNET_TIME_UNIT_ZERO;
- if (NULL != message)
+ if (NULL == message)
+ goto end;
+ type = ntohs (message->type);
+ switch (type)
{
- type = ntohs (message->type);
- switch (type)
- {
- case GNUNET_MESSAGE_TYPE_HELLO:
- GST_validation_handle_hello (message);
- return ret;
- case GNUNET_MESSAGE_TYPE_TRANSPORT_PING:
+ case GNUNET_MESSAGE_TYPE_HELLO:
+ GST_validation_handle_hello (message);
+ return ret;
+ case GNUNET_MESSAGE_TYPE_TRANSPORT_PING:
#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
- "Processing `%s' from `%s'\n", "PING",
- (sender_address != NULL) ? GST_plugins_a2s (plugin_name,
- sender_address,
- sender_address_len)
- : "<inbound>");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
+ "Processing `%s' from `%s'\n", "PING",
+ (sender_address != NULL) ? GST_plugins_a2s (plugin_name,
+ sender_address,
+ sender_address_len)
+ : "<inbound>");
#endif
- GST_validation_handle_ping (peer, message, plugin_name, session,
- sender_address, sender_address_len);
- break;
- case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG:
+ GST_validation_handle_ping (peer, message, plugin_name, session,
+ sender_address, sender_address_len);
+ break;
+ case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG:
#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
- "Processing `%s' from `%s'\n", "PONG",
- (sender_address != NULL) ? GST_plugins_a2s (plugin_name,
- sender_address,
- sender_address_len)
- : "<inbound>");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
+ "Processing `%s' from `%s'\n", "PONG",
+ (sender_address != NULL) ? GST_plugins_a2s (plugin_name,
+ sender_address,
+ sender_address_len)
+ : "<inbound>");
#endif
- GST_validation_handle_pong (peer, message);
- break;
- case GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT:
- (void) GST_blacklist_test_allowed (peer, NULL, &try_connect_if_allowed,
- NULL);
- /* TODO: if 'session != NULL', and timestamp more recent than the
- * previous one, maybe notify ATS that this is now the preferred
- * * way to communicate with this peer (other peer switched transport) */
- break;
- case GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT:
- /* FIXME: do some validation to prevent an attacker from sending
- * a fake disconnect message... */
- GST_neighbours_force_disconnect (peer);
- break;
- case GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE:
- GST_neighbours_keepalive (peer);
- break;
- default:
- /* should be payload */
- do_forward = GNUNET_SYSERR;
- ret =
- GST_neighbours_calculate_receive_delay (peer,
- (message ==
- NULL) ? 0 :
- ntohs (message->size),
- &do_forward);
- if (do_forward == GNUNET_YES)
- {
- struct InboundMessage *im;
- size_t size = sizeof (struct InboundMessage) + ntohs (message->size);
-
- im = GNUNET_malloc (size);
- im->header.size = htons (size);
- im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
- im->ats_count = htonl (0);
- memcpy (&(im->peer), peer, sizeof (struct GNUNET_PeerIdentity));
- memcpy (&im[1], message, ntohs (message->size));
- GST_clients_broadcast ((const struct GNUNET_MessageHeader *) im,
- GNUNET_YES);
-
- GNUNET_free (im);
- }
- break;
- }
+ GST_validation_handle_pong (peer, message);
+ break;
+ case GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT:
+ GST_neighbours_handle_connect (message,
+ peer,
+ plugin_name, sender_address, sender_address_len,
+ session, ats, ats_count);
+ (void) GST_blacklist_test_allowed (peer, NULL, &try_connect_if_allowed,
+ NULL);
+ break;
+ case GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT:
+ /* FIXME: do some validation to prevent an attacker from sending
+ * a fake disconnect message... */
+ GST_neighbours_force_disconnect (peer);
+ break;
+ case GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE:
+ GST_neighbours_keepalive (peer);
+ break;
+ default:
+ /* should be payload */
+ process_payload (peer,
+ message,
+ ats, ats_count);
+ break;
+ }
+ end:
+#if 1
+ /* FIXME: this should not be needed, and not sure it's good to have it, but without
+ this connections seem to go extra-slow */
+ if ((ats_count > 0) && (ats != NULL))
+ {
+ if (NULL != session)
+ GNUNET_log_from (GNUNET_ERROR_TYPE_INFO | GNUNET_ERROR_TYPE_BULK,
+ "transport-ats",
+ "Giving ATS session %p of plugin %s for peer %s\n",
+ session,
+ plugin_name,
+ GNUNET_i2s (peer));
+ GNUNET_ATS_address_update (GST_ats, peer,
+ plugin_name, sender_address, sender_address_len,
+ session,
+ ats, ats_count);
}
- GNUNET_assert ((ats_count > 0) && (ats != NULL));
- /*
- FIXME: this gives an address that might not have been validated to
- ATS for 'selection', which is probably not what we want; this
- might be particularly wrong (as in, possibly hiding bugs with address
- validation) as 'GNUNET_ATS_address_update' currently ignores
- the expiration given.
- */
- GNUNET_ATS_address_update (GST_ats, peer, GNUNET_TIME_absolute_get (), /* valid at least until right now... */
- plugin_name, session, sender_address,
- sender_address_len, ats, ats_count);
+#endif
return ret;
}
"Session %X to peer `%s' ended \n",
session, GNUNET_i2s (peer));
#endif
- GNUNET_ATS_session_destroyed(GST_ats, peer, session);
+ if (NULL != session)
+ GNUNET_log_from (GNUNET_ERROR_TYPE_INFO | GNUNET_ERROR_TYPE_BULK,
+ "transport-ats",
+ "Telling ATS to destroy session %p from peer %s\n",
+ session,
+ GNUNET_i2s (peer));
+ GNUNET_ATS_address_destroyed(GST_ats, peer, NULL, NULL, 0, session);
GST_neighbours_session_terminated (peer, session);
}
* @param session session to use (if available)
* @param plugin_addr address to use (if available)
* @param plugin_addr_len number of bytes in addr
- * @param bandwidth_out assigned outbound bandwidth for the connection
- * @param bandwidth_in assigned inbound bandwidth for the connection
+ * @param bandwidth_out assigned outbound bandwidth for the connection, 0 to disconnect from peer
+ * @param bandwidth_in assigned inbound bandwidth for the connection, 0 to disconnect from peer
*/
static void
ats_request_address_change (void *cls, const struct GNUNET_PeerIdentity *peer,
- const char *plugin_name, struct Session *session,
+ const char *plugin_name,
const void *plugin_addr, size_t plugin_addr_len,
+ struct Session *session,
struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
- struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in)
+ struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in,
+ const struct GNUNET_ATS_Information * ats,
+ uint32_t ats_count)
{
+ uint32_t bw_in = ntohl (bandwidth_in.value__);
+ uint32_t bw_out = ntohl (bandwidth_out.value__);
+
+ /* ATS tells me to disconnect from peer*/
+ if ((bw_in == 0) && (bw_out == 0))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "ATS tells me to disconnect from peer `%s'\n",
+ GNUNET_i2s (peer));
+ GST_neighbours_force_disconnect(peer);
+ return;
+ }
+
GST_neighbours_switch_to_address (peer, plugin_name, plugin_addr,
- plugin_addr_len, session, NULL, 0);
+ plugin_addr_len, session, ats, ats_count);
+
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
+ ntohl (bandwidth_out.value__), GNUNET_i2s (peer));
+#endif
+ struct QuotaSetMessage msg;
+ msg.header.size = htons (sizeof (struct QuotaSetMessage));
+ msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
+ msg.quota = bandwidth_out;
+ msg.peer = (*peer);
+ GST_clients_broadcast ((struct GNUNET_MessageHeader *) &msg, GNUNET_NO);
+
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Setting inbound quota of %u for peer `%s' to \n",
+ ntohl (bandwidth_in.value__), GNUNET_i2s (peer));
+#endif
GST_neighbours_set_incoming_quota (peer, bandwidth_in);
- // FIXME: use 'bandwidth_out'!
}
* @param cls closure
* @param peer the peer that connected
* @param ats performance data
- * @param ats_count number of entries in ats (excluding 0-termination)
+ * @param ats_count number of entries in ats
*/
static void
neighbours_connect_notification (void *cls,
const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_TRANSPORT_ATS_Information
+ const struct GNUNET_ATS_Information
*ats, uint32_t ats_count)
{
char buf[sizeof (struct ConnectInfoMessage) +
- ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)];
+ ats_count * sizeof (struct GNUNET_ATS_Information)];
struct ConnectInfoMessage *connect_msg = (struct ConnectInfoMessage *) buf;
- struct GNUNET_TRANSPORT_ATS_Information *atsm = &connect_msg->ats;
connect_msg->header.size = htons (sizeof (buf));
connect_msg->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
connect_msg->ats_count = htonl (ats_count);
connect_msg->id = *peer;
- memcpy (&connect_msg->ats, ats,
- ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
- atsm[ats_count].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR);
- atsm[ats_count].value = htonl (0);
+ memcpy (&connect_msg->ats, &connect_msg->ats,
+ ats_count * sizeof (struct GNUNET_ATS_Information));
GST_clients_broadcast (&connect_msg->header, GNUNET_NO);
}
GST_validation_stop ();
GST_plugins_unload ();
GST_neighbours_stop ();
- GNUNET_ATS_shutdown (GST_ats);
+ GNUNET_ATS_scheduling_done (GST_ats);
GST_ats = NULL;
GST_clients_stop ();
GST_blacklist_stop ();
GST_plugins_load (&plugin_env_receive_callback,
&plugin_env_address_change_notification,
&plugin_env_session_end);
- GST_ats = GNUNET_ATS_init (GST_cfg, &ats_request_address_change, NULL);
+ GST_ats = GNUNET_ATS_scheduling_init (GST_cfg, &ats_request_address_change, NULL);
GST_neighbours_start (NULL, &neighbours_connect_notification,
&neighbours_disconnect_notification);
GST_clients_start (server);