}
+/**
+ * We received some payload. Prepare to pass it on to our clients.
+ *
+ * @param peer (claimed) identity of the other peer
+ * @param message the message, NULL if we only care about
+ * learning about the delay until we should receive again -- FIXME!
+ * @param ats performance information
+ * @param ats_count number of records in ats
+ * @return how long the plugin should wait until receiving more data
+ */
+static struct GNUNET_TIME_Relative
+process_payload (const struct GNUNET_PeerIdentity *peer,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information *ats,
+ uint32_t ats_count)
+{
+ struct GNUNET_TIME_Relative ret;
+ int do_forward;
+ struct InboundMessage *im;
+ size_t size = sizeof (struct InboundMessage) + ntohs (message->size);
+ char buf[size];
+
+ ret = GNUNET_TIME_UNIT_ZERO;
+ do_forward = GNUNET_SYSERR;
+ ret =
+ GST_neighbours_calculate_receive_delay (peer,
+ (message ==
+ NULL) ? 0 :
+ ntohs (message->size),
+ &do_forward);
+ im = (struct InboundMessage*) buf;
+ im->header.size = htons (size);
+ im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
+ im->ats_count = htonl (0);
+ memcpy (&(im->peer), peer, sizeof (struct GNUNET_PeerIdentity));
+ memcpy (&im[1], message, ntohs (message->size));
+
+ switch (do_forward)
+ {
+ case GNUNET_YES:
+ GST_clients_broadcast (&im->header, GNUNET_YES);
+ break;
+ case GNUNET_NO:
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ _("Discarded %u bytes of type %u from %s: quota violated!\n"),
+ ntohs (message->size),
+ ntohs (message->type),
+ GNUNET_i2s (peer));
+ break;
+ case GNUNET_SYSERR:
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ _("Discarded %u bytes of type %u from %s: connection is down!\n"),
+ ntohs (message->size),
+ ntohs (message->type),
+ GNUNET_i2s (peer));
+ /* FIXME: store until connection is up? This is virtually always a SETKEY and a PING... */
+ break;
+ default:
+ GNUNET_break (0);
+ break;
+ }
+ return ret;
+}
+
+
/**
* Function called by the transport for each received message.
* This function should also be called with "NULL" for the
uint16_t sender_address_len)
{
const char *plugin_name = cls;
- int do_forward;
struct GNUNET_TIME_Relative ret;
uint16_t type;
ret = GNUNET_TIME_UNIT_ZERO;
- if (NULL != message)
+ if (NULL == message)
+ goto end;
+ type = ntohs (message->type);
+ switch (type)
{
- type = ntohs (message->type);
- switch (type)
- {
- case GNUNET_MESSAGE_TYPE_HELLO:
- GST_validation_handle_hello (message);
- return ret;
- case GNUNET_MESSAGE_TYPE_TRANSPORT_PING:
+ case GNUNET_MESSAGE_TYPE_HELLO:
+ GST_validation_handle_hello (message);
+ return ret;
+ case GNUNET_MESSAGE_TYPE_TRANSPORT_PING:
#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
- "Processing `%s' from `%s'\n", "PING",
- (sender_address != NULL) ? GST_plugins_a2s (plugin_name,
- sender_address,
- sender_address_len)
- : "<inbound>");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
+ "Processing `%s' from `%s'\n", "PING",
+ (sender_address != NULL) ? GST_plugins_a2s (plugin_name,
+ sender_address,
+ sender_address_len)
+ : "<inbound>");
#endif
- GST_validation_handle_ping (peer, message, plugin_name, session,
- sender_address, sender_address_len);
- break;
- case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG:
+ GST_validation_handle_ping (peer, message, plugin_name, session,
+ sender_address, sender_address_len);
+ break;
+ case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG:
#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
- "Processing `%s' from `%s'\n", "PONG",
- (sender_address != NULL) ? GST_plugins_a2s (plugin_name,
- sender_address,
- sender_address_len)
- : "<inbound>");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
+ "Processing `%s' from `%s'\n", "PONG",
+ (sender_address != NULL) ? GST_plugins_a2s (plugin_name,
+ sender_address,
+ sender_address_len)
+ : "<inbound>");
#endif
- GST_validation_handle_pong (peer, message);
- break;
- case GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT:
- (void) GST_blacklist_test_allowed (peer, NULL, &try_connect_if_allowed,
- NULL);
- /* TODO: if 'session != NULL', and timestamp more recent than the
- * previous one, maybe notify ATS that this is now the preferred
- * * way to communicate with this peer (other peer switched transport) */
- break;
- case GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT:
- /* FIXME: do some validation to prevent an attacker from sending
- * a fake disconnect message... */
- GST_neighbours_force_disconnect (peer);
- break;
- case GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE:
- GST_neighbours_keepalive (peer);
- break;
- default:
- /* should be payload */
- do_forward = GNUNET_SYSERR;
- ret =
- GST_neighbours_calculate_receive_delay (peer,
- (message ==
- NULL) ? 0 :
- ntohs (message->size),
- &do_forward);
- if (do_forward == GNUNET_YES)
- {
- struct InboundMessage *im;
- size_t size = sizeof (struct InboundMessage) + ntohs (message->size);
-
- im = GNUNET_malloc (size);
- im->header.size = htons (size);
- im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
- im->ats_count = htonl (0);
- memcpy (&(im->peer), peer, sizeof (struct GNUNET_PeerIdentity));
- memcpy (&im[1], message, ntohs (message->size));
- GST_clients_broadcast ((const struct GNUNET_MessageHeader *) im,
- GNUNET_YES);
-
- GNUNET_free (im);
- }
- else
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- _("Discarded %u bytes of type %u from %s via plugin %s: connection is down!\n"),
- ntohs (message->size),
- type,
- GNUNET_i2s (peer),
- plugin_name);
- break;
- }
+ GST_validation_handle_pong (peer, message);
+ break;
+ case GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT:
+ GST_neighbours_handle_connect (message,
+ peer,
+ plugin_name, sender_address, sender_address_len,
+ session, ats, ats_count);
+ (void) GST_blacklist_test_allowed (peer, NULL, &try_connect_if_allowed,
+ NULL);
+ break;
+ case GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT:
+ /* FIXME: do some validation to prevent an attacker from sending
+ * a fake disconnect message... */
+ GST_neighbours_force_disconnect (peer);
+ break;
+ case GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE:
+ GST_neighbours_keepalive (peer);
+ break;
+ default:
+ /* should be payload */
+ process_payload (peer,
+ message,
+ ats, ats_count);
+ break;
}
- /*
- FIXME: this gives an address that might not have been validated to
- ATS for 'selection', which is probably not what we want; this
- might be particularly wrong (as in, possibly hiding bugs with address
- validation) as 'GNUNET_ATS_address_update' currently ignores
- the expiration given.
- */
+ end:
+#if 1
+ /* FIXME: this should not be needed, and not sure it's good to have it, but without
+ this connections seem to go extra-slow */
if ((ats_count > 0) && (ats != NULL))
GNUNET_ATS_address_update (GST_ats, peer,
plugin_name, sender_address, sender_address_len,
session,
ats, ats_count);
+#endif
return ret;
}
*/
struct GNUNET_BANDWIDTH_Tracker in_tracker;
+ /**
+ * Timestamp of the 'SESSION_CONNECT' message we got from the other peer
+ */
+ struct GNUNET_TIME_Absolute connect_ts;
+
/**
* How often has the other peer (recently) violated the inbound
* traffic limit? Incremented by 10 per violation, decremented by 1
n = lookup_neighbour (peer);
if (NULL == n)
{
- /* FIXME: ATS not fully implemented, once ATS only generates
- these events for 'connected' addresses, things should be better... */
- // GNUNET_break (0);
+ GNUNET_ATS_address_destroyed (GST_ats,
+ peer,
+ plugin_name, address,
+ address_len, session);
return;
}
was_connected = n->is_connected;
n->is_connected = GNUNET_YES;
if (GNUNET_YES != was_connected)
- n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task,
- n);
+ n->keepalive_task = GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
+ &neighbour_keepalive_task,
+ n);
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
address_len),
session);
#endif
-
GNUNET_free_non_null (n->addr);
n->addr = GNUNET_malloc (address_len);
memcpy (n->addr, address, address_len);
GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
GST_neighbours_send (peer, &connect_msg, sizeof (connect_msg),
GNUNET_TIME_UNIT_FOREVER_REL, NULL, NULL);
-
if (GNUNET_YES == was_connected)
return;
/* First tell clients about connected neighbours...*/
connect_notify_cb (callback_cls, peer, ats, ats_count);
}
+
+/**
+ * Create an entry in the neighbour map for the given peer
+ *
+ * @param peer peer to create an entry for
+ * @return new neighbour map entry
+ */
+static struct NeighbourMapEntry *
+setup_neighbour (const struct GNUNET_PeerIdentity *peer)
+{
+ struct NeighbourMapEntry *n;
+
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Unknown peer `%s', creating new neighbour\n",
+ GNUNET_i2s (peer));
+#endif
+ n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
+ n->id = *peer;
+ GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
+ GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
+ MAX_BANDWIDTH_CARRY_S);
+ n->timeout_task =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+ &neighbour_timeout_task, n);
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_put (neighbours,
+ &n->id.hashPubKey, n,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ return n;
+}
+
+
/**
* Try to create a connection to the given target (eventually).
*
struct NeighbourMapEntry *n;
GNUNET_assert (neighbours != NULL);
-
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n",
GNUNET_i2s (target));
#endif
-
GNUNET_assert (0 !=
memcmp (target, &GST_my_identity,
sizeof (struct GNUNET_PeerIdentity)));
if ((NULL != n) && (GNUNET_YES == n->is_connected))
return; /* already connected */
if (n == NULL)
- {
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Unknown peer `%s', creating new neighbour\n",
- GNUNET_i2s (target));
-#endif
- n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
- n->id = *target;
- GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
- GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
- MAX_BANDWIDTH_CARRY_S);
- n->timeout_task =
- GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
- &neighbour_timeout_task, n);
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multihashmap_put (neighbours,
- &n->id.hashPubKey, n,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
- }
+ n = setup_neighbour (target);
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Asking ATS for suggested address to connect to peer `%s'\n",
- GNUNET_i2s (target));
+ GNUNET_i2s (&n->id));
#endif
- GNUNET_ATS_suggest_address (GST_ats, target);
+ GNUNET_ATS_suggest_address (GST_ats, &n->id);
}
"Session %X to peer `%s' ended \n",
session, GNUNET_i2s (peer));
#endif
-
+ GNUNET_ATS_address_destroyed (GST_ats,
+ peer,
+ NULL, NULL, 0,
+ session);
n = lookup_neighbour (peer);
if (NULL == n)
return;
* @param sender sender of the message
* @param size size of the message
* @param do_forward set to GNUNET_YES if the message should be forwarded to clients
- * GNUNET_NO if the neighbour is not connected or violates the quota
+ * GNUNET_NO if the neighbour is not connected or violates the quota,
+ * GNUNET_SYSERR if the connection is not fully up yet
* @return how long to wait before reading more from this sender
*/
struct GNUNET_TIME_Relative
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
_("Plugin gave us %d bytes of data but somehow the session is not marked as UP yet!\n"),
(int) size);
- *do_forward = GNUNET_NO;
+ *do_forward = GNUNET_SYSERR;
return GNUNET_TIME_UNIT_ZERO;
}
if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
}
+/**
+ * We received a 'SESSION_CONNECT' message from the other peer.
+ * Consider switching to it.
+ *
+ * @param message possibly a 'struct SessionConnectMessage' (check format)
+ * @param peer identity of the peer to switch the address for
+ * @param plugin_name name of transport that delivered the PONG
+ * @param address address of the other peer, NULL if other peer
+ * connected to us
+ * @param address_len number of bytes in address
+ * @param session session to use (or NULL)
+ * @param ats performance data
+ * @param ats_count number of entries in ats (excluding 0-termination)
+ */
+void
+GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_PeerIdentity *peer,
+ const char *plugin_name,
+ const char *sender_address, uint16_t sender_address_len,
+ struct Session *session,
+ const struct GNUNET_ATS_Information *ats,
+ uint32_t ats_count)
+{
+ const struct SessionConnectMessage *scm;
+ struct GNUNET_TIME_Absolute ts;
+ struct NeighbourMapEntry *n;
+
+ if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+ scm = (const struct SessionConnectMessage *) message;
+ GNUNET_break_op (ntohl (scm->reserved) == 0);
+ ts = GNUNET_TIME_absolute_ntoh (scm->timestamp);
+ n = lookup_neighbour (peer);
+ if (NULL == n)
+ n = setup_neighbour (peer);
+ if (ts.abs_value > n->connect_ts.abs_value)
+ {
+ GNUNET_ATS_address_update (GST_ats,
+ peer,
+ plugin_name, sender_address, sender_address_len,
+ session, ats, ats_count);
+ n->connect_ts = ts;
+ }
+}
+
+
/* end of file gnunet-service-transport_neighbours.c */
*ats, uint32_t ats_count);
+/**
+ * We received a 'SESSION_CONNECT' message from the other peer.
+ * Consider switching to it.
+ *
+ * @param message possibly a 'struct SessionConnectMessage' (check format)
+ * @param peer identity of the peer to switch the address for
+ * @param plugin_name name of transport that delivered the PONG
+ * @param address address of the other peer, NULL if other peer
+ * connected to us
+ * @param address_len number of bytes in address
+ * @param session session to use (or NULL)
+ * @param ats performance data
+ * @param ats_count number of entries in ats (excluding 0-termination)
+ */
+void
+GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_PeerIdentity *peer,
+ const char *plugin_name,
+ const char *sender_address, uint16_t sender_address_len,
+ struct Session *session,
+ const struct GNUNET_ATS_Information *ats,
+ uint32_t ats_count);
+
+
#endif
/* end of file gnunet-service-transport_neighbours.h */
GNUNET_CONTAINER_DLL_remove (session->pending_messages_head,
session->pending_messages_tail, pos);
GNUNET_assert (size >= pos->message_size);
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
+ "tcp",
+ "Transmitting message of type %u\n",
+ ntohs (((struct GNUNET_MessageHeader*)pos->msg)->type));
/* FIXME: this memcpy can be up to 7% of our total runtime */
memcpy (cbuf, pos->msg, pos->message_size);
cbuf += pos->message_size;
return;
}
session->last_activity = GNUNET_TIME_absolute_get ();
-#if DEBUG_TCP > 1
+#if DEBUG_TCP
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "tcp",
"Passing %u bytes of type %u from `%4s' to transport service.\n",
(unsigned int) ntohs (message->size),
*/
struct GNUNET_MessageHeader header;
- /**
- * Always zero.
- */
- uint32_t reserved GNUNET_PACKED;
-
/**
* Number of ATS key-value pairs that follow this struct
* (excluding the 0-terminator).
break;
}
im = (const struct InboundMessage *) msg;
- GNUNET_break (0 == ntohl (im->reserved));
ats_count = ntohl (im->ats_count);
imm = (const struct GNUNET_MessageHeader *) &((&(im->ats))[ats_count + 1]);