* @brief low-level P2P messaging
* @author Christian Grothoff
*
- * TODO:
- * - if we do not receive an ACK in response to our
- * HELLO, retransmit HELLO!
+ * NOTE:
+ * - This code uses 'GNUNET_a2s' for debug printing in many places,
+ * which is technically wrong since it assumes we have IP+Port
+ * (v4/v6) addresses. Once we add transports like http or smtp
+ * this will have to be changed!
*/
#include "platform.h"
#include "gnunet_client_lib.h"
+#include "gnunet_container_lib.h"
#include "gnunet_constants.h"
#include "gnunet_getopt_lib.h"
#include "gnunet_hello_lib.h"
#include "plugin_transport.h"
#include "transport.h"
+/**
+ * Should we do some additional checks (to validate behavior
+ * of clients)?
+ */
+#define EXTRA_CHECKS GNUNET_YES
+
/**
* How many messages can we have pending for a given client process
* before we start to drop incoming messages? We typically should
*/
#define MAX_CONNECT_RETRY 3
+/**
+ * Limit on the number of ready-to-run tasks when validating
+ * HELLOs. If more tasks are ready to run, we will drop
+ * HELLOs instead of validating them.
+ */
+#define MAX_HELLO_LOAD 4
+
/**
* How often must a peer violate bandwidth quotas before we start
* to simply drop its messages?
*/
-#define QUOTA_VIOLATION_DROP_THRESHOLD 100
+#define QUOTA_VIOLATION_DROP_THRESHOLD 10
/**
* How long until a HELLO verification attempt should time out?
* Besides, if a single request to an address takes a long time,
* then the peer is unlikely worthwhile anyway.
*/
-#define HELLO_VERIFICATION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
+#define HELLO_VERIFICATION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
/**
* How long will we allow sending of a ping to be delayed?
*/
#define TRANSPORT_DEFAULT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15)
-#define TRANSPORT_DEFAULT_PRIORITY 4 /* Tired of remembering arbitrary priority names */
+/**
+ * How often will we re-validate for latency information
+ */
+#define TRANSPORT_DEFAULT_REVALIDATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
+
+/**
+ * Priority to use for PONG messages.
+ */
+#define TRANSPORT_PONG_PRIORITY 4
/**
* How often do we re-add (cheaper) plugins to our list of plugins
#define PLUGIN_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
/**
- * After how long do we expire an address in a HELLO
- * that we just validated? This value is also used
- * for our own addresses when we create a HELLO.
+ * After how long do we expire an address in a HELLO that we just
+ * validated? This value is also used for our own addresses when we
+ * create a HELLO.
*/
#define HELLO_ADDRESS_EXPIRATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 12)
/**
- * Entry in linked list of network addresses.
+ * How long before an existing address expires should we again try to
+ * validate it? Must be (significantly) smaller than
+ * HELLO_ADDRESS_EXPIRATION.
+ */
+#define HELLO_REVALIDATION_START_TIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 1)
+
+
+/**
+ * List of addresses of other peers
+ */
+struct ForeignAddressList
+{
+ /**
+ * This is a linked list.
+ */
+ struct ForeignAddressList *next;
+
+ /**
+ * Which ready list does this entry belong to.
+ */
+ struct ReadyList *ready_list;
+
+ /**
+ * How long until we auto-expire this address (unless it is
+ * re-confirmed by the transport)?
+ */
+ struct GNUNET_TIME_Absolute expires;
+
+ /**
+ * Task used to re-validate addresses, updates latencies and
+ * verifies liveness.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier revalidate_task;
+
+ /**
+ * Length of addr.
+ */
+ size_t addrlen;
+
+ /**
+ * The address.
+ */
+ const void *addr;
+
+ /**
+ * What was the last latency observed for this plugin
+ * and peer? Invalid if connected is GNUNET_NO.
+ */
+ struct GNUNET_TIME_Relative latency;
+
+ /**
+ * If we did not successfully transmit a message to the given peer
+ * via this connection during the specified time, we should consider
+ * the connection to be dead. This is used in the case that a TCP
+ * transport simply stalls writing to the stream but does not
+ * formerly get a signal that the other peer died.
+ */
+ struct GNUNET_TIME_Absolute timeout;
+
+ /**
+ * Are we currently connected via this address? The first time we
+ * successfully transmit or receive data to a peer via a particular
+ * address, we set this to GNUNET_YES. If we later get an error
+ * (disconnect notification, transmission failure, timeout), we set
+ * it back to GNUNET_NO.
+ */
+ int connected;
+
+ /**
+ * Is this plugin currently busy transmitting to the specific target?
+ * GNUNET_NO if not (initial, default state is GNUNET_NO). Internal
+ * messages do not count as 'in transmit'.
+ */
+ int in_transmit;
+
+ /**
+ * Has this address been validated yet?
+ */
+ int validated;
+
+ /**
+ * How often have we tried to connect using this plugin? Used to
+ * discriminate against addresses that do not work well.
+ * FIXME: not yet used, but should be!
+ */
+ unsigned int connect_attempts;
+
+ /**
+ * DV distance to this peer (1 if no DV is used).
+ * FIXME: need to set this from transport plugins!
+ */
+ uint32_t distance;
+
+};
+
+
+/**
+ * Entry in linked list of network addresses for ourselves.
*/
-struct AddressList
+struct OwnAddressList
{
/**
* This is a linked list.
*/
- struct AddressList *next;
+ struct OwnAddressList *next;
/**
* The address, actually a pointer to the end
* of this struct. Do not free!
*/
- void *addr;
-
+ const void *addr;
+
/**
* How long until we auto-expire this address (unless it is
* re-confirmed by the transport)?
/**
* List of our known addresses for this transport.
*/
- struct AddressList *addresses;
+ struct OwnAddressList *addresses;
/**
* Environment this transport service is using
*/
GNUNET_SCHEDULER_TaskIdentifier address_update_task;
-
/**
* Set to GNUNET_YES if we need to scrap the existing
* list of "addresses" and start fresh when we receive
* to the list and wait for the commit call.
*/
int rebuild;
+
};
-struct NeighborList;
+struct NeighbourList;
/**
- * For each neighbor we keep a list of messages
- * that we still want to transmit to the neighbor.
+ * For each neighbour we keep a list of messages
+ * that we still want to transmit to the neighbour.
*/
struct MessageQueue
{
/**
- * This is a linked list.
+ * This is a doubly linked list.
*/
struct MessageQueue *next;
/**
- * The message we want to transmit.
+ * This is a doubly linked list.
+ */
+ struct MessageQueue *prev;
+
+ /**
+ * The message(s) we want to transmit, GNUNET_MessageHeader(s)
+ * stuck together in memory. Allocated at the end of this struct.
*/
- struct GNUNET_MessageHeader *message;
+ const char *message_buf;
+
+ /**
+ * Size of the message buf
+ */
+ size_t message_buf_size;
/**
* Client responsible for queueing the message;
- * used to check that a client has not two messages
+ * used to check that a client has no two messages
* pending for the same target. Can be NULL.
*/
struct TransportClient *client;
/**
- * Neighbor this entry belongs to.
+ * Using which specific address should we send this message?
+ */
+ struct ForeignAddressList *specific_address;
+
+ /**
+ * Peer ID of the Neighbour this entry belongs to.
*/
- struct NeighborList *neighbor;
+ struct GNUNET_PeerIdentity neighbour_id;
/**
* Plugin that we used for the transmission.
*/
struct TransportPlugin *plugin;
+ /**
+ * At what time should we fail?
+ */
+ struct GNUNET_TIME_Absolute timeout;
+
/**
* Internal message of the transport system that should not be
* included in the usual SEND-SEND_OK transmission confirmation
/**
- * For a given Neighbor, which plugins are available
+ * For a given Neighbour, which plugins are available
* to talk to this peer and what are their costs?
*/
struct ReadyList
{
-
/**
* This is a linked list.
*/
struct TransportPlugin *plugin;
/**
- * Neighbor this entry belongs to.
- */
- struct NeighborList *neighbor;
-
- /**
- * What was the last latency observed for this plugin
- * and peer? Invalid if connected is GNUNET_NO.
- */
- struct GNUNET_TIME_Relative latency;
-
- /**
- * If we did not successfully transmit a message to the given peer
- * via this connection during the specified time, we should consider
- * the connection to be dead. This is used in the case that a TCP
- * transport simply stalls writing to the stream but does not
- * formerly get a signal that the other peer died.
- */
- struct GNUNET_TIME_Absolute timeout;
-
- /**
- * Is this plugin currently connected? The first time
- * we transmit or send data to a peer via a particular
- * plugin, we set this to GNUNET_YES. If we later get
- * an error (disconnect notification or transmission
- * failure), we set it back to GNUNET_NO. Each time the
- * value is set to GNUNET_YES, we increment the
- * "connect_attempts" counter. If that one reaches a
- * particular threshold, we consider the plugin to not
- * be working properly at this time for the given peer
- * and remove it from the eligible list.
- */
- int connected;
-
- /**
- * How often have we tried to connect using this plugin?
- */
- unsigned int connect_attempts;
-
- /**
- * Is this plugin ready to transmit to the specific target?
- * GNUNET_NO if not. Initially, all plugins are marked ready. If a
- * transmission is in progress, "transmit_ready" is set to
- * GNUNET_NO.
+ * Transport addresses, latency, and readiness for
+ * this particular plugin.
*/
- int transmit_ready;
+ struct ForeignAddressList *addresses;
};
/**
- * Entry in linked list of all of our current neighbors.
+ * Entry in linked list of all of our current neighbours.
*/
-struct NeighborList
+struct NeighbourList
{
/**
* This is a linked list.
*/
- struct NeighborList *next;
+ struct NeighbourList *next;
/**
* Which of our transports is connected to this peer
struct ReadyList *plugins;
/**
- * List of messages we would like to send to this peer;
+ * Head of list of messages we would like to send to this peer;
* must contain at most one message per client.
*/
- struct MessageQueue *messages;
+ struct MessageQueue *messages_head;
/**
- * Identity of this neighbor.
- */
- struct GNUNET_PeerIdentity id;
-
- /*
- * Opaque addr of this peer, only known to the plugin
+ * Tail of list of messages we would like to send to this peer; must
+ * contain at most one message per client.
*/
- char *addr;
+ struct MessageQueue *messages_tail;
- /*
- * Size of addr
+ /**
+ * Identity of this neighbour.
*/
- size_t addr_len;
+ struct GNUNET_PeerIdentity id;
/**
* ID of task scheduled to run when this peer is about to
*/
GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+ /**
+ * ID of task scheduled to run when we should retry transmitting
+ * the head of the message queue.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier retry_task;
+
/**
* How long until we should consider this peer dead
* (if we don't receive another message in the
struct GNUNET_TIME_Absolute peer_timeout;
/**
- * At what time did we reset last_received last?
- */
- struct GNUNET_TIME_Absolute last_quota_update;
-
- /**
- * At what time should we try to again add plugins to
- * our ready list?
+ * Tracker for inbound bandwidth.
*/
- struct GNUNET_TIME_Absolute retry_plugins_time;
+ struct GNUNET_BANDWIDTH_Tracker in_tracker;
/**
- * How many bytes have we received since the "last_quota_update"
- * timestamp?
- */
- uint64_t last_received;
-
- /**
- * Global quota for inbound traffic for the neighbor in bytes/ms.
+ * The latency we have seen for this particular address for
+ * this particular peer. This latency may have been calculated
+ * over multiple transports. This value reflects how long it took
+ * us to receive a response when SENDING via this particular
+ * transport/neighbour/address combination!
+ *
+ * FIXME: we need to periodically send PINGs to update this
+ * latency (at least more often than the current "huge" (11h?)
+ * update interval).
*/
- uint32_t quota_in;
+ struct GNUNET_TIME_Relative latency;
/**
* How often has the other peer (recently) violated the
unsigned int quota_violation_count;
/**
- * Have we seen an ACK from this neighbor in the past?
- * (used to make up a fake ACK for clients connecting after
- * the neighbor connected to us).
+ * DV distance to this peer (1 if no DV is used).
*/
- int saw_ack;
+ uint32_t distance;
- /* The latency we have seen for this particular address for
- * this particular peer. This latency may have been calculated
- * over multiple transports. This value reflects how long it took
- * us to receive a response when SENDING via this particular
- * transport/neighbor/address combination!
+ /**
+ * Have we seen an PONG from this neighbour in the past (and
+ * not had a disconnect since)?
*/
- struct GNUNET_TIME_RelativeNBO latency;
+ int received_pong;
};
/**
* Message used to ask a peer to validate receipt (to check an address
- * from a HELLO). Followed by the address used. Note that the
- * recipients response does not affirm that he has this address,
- * only that he got the challenge message.
+ * from a HELLO).
*/
struct TransportPingMessage
{
*/
struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded signer;
+ /**
+ * Size of address appended to this message
+ */
+ size_t addrlen;
+
};
+
/**
- * Linked list of messages to be transmitted to
- * the client. Each entry is followed by the
- * actual message.
+ * Linked list of messages to be transmitted to the client. Each
+ * entry is followed by the actual message.
*/
struct ClientMessageQueueEntry
{
/**
- * This is a linked list.
+ * This is a doubly-linked list.
*/
struct ClientMessageQueueEntry *next;
+
+ /**
+ * This is a doubly-linked list.
+ */
+ struct ClientMessageQueueEntry *prev;
};
*/
struct ClientMessageQueueEntry *message_queue_tail;
+ /**
+ * Current transmit request handle.
+ */
+ struct GNUNET_CONNECTION_TransmitHandle *th;
+
/**
* Is a call to "transmit_send_continuation" pending? If so, we
* must not free this struct (even if the corresponding client
/**
- * For each HELLO, we may have to validate multiple addresses;
- * each address gets its own request entry.
+ * Entry in map of all HELLOs awaiting validation.
*/
-struct ValidationAddress
+struct ValidationEntry
{
+
/**
- * This is a linked list.
+ * The address, actually a pointer to the end
+ * of this struct. Do not free!
*/
- struct ValidationAddress *next;
+ const void *addr;
/**
* Name of the transport.
char *transport_name;
/**
- * When should this validated address expire?
+ * The public key of the peer.
+ */
+ struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded publicKey;
+
+ /**
+ * ID of task that will clean up this entry if we don't succeed
+ * with the validation first.
*/
- struct GNUNET_TIME_Absolute expiration;
+ GNUNET_SCHEDULER_TaskIdentifier timeout_task;
/**
- * Length of the address we are validating.
+ * At what time did we send this validation?
*/
- size_t addr_len;
+ struct GNUNET_TIME_Absolute send_time;
/**
- * Challenge number we used.
+ * Length of addr.
*/
- uint32_t challenge;
+ size_t addrlen;
/**
- * Set to GNUNET_YES if the challenge was met,
- * GNUNET_SYSERR if we know it failed, GNUNET_NO
- * if we are waiting on a response.
+ * Challenge number we used.
*/
- int ok;
+ uint32_t challenge;
+
};
/**
- * Entry in linked list of all HELLOs awaiting validation.
+ * Context of currently active requests to peerinfo
+ * for validation of HELLOs.
*/
-struct ValidationList
+struct CheckHelloValidatedContext
{
/**
- * This is a linked list.
+ * This is a doubly-linked list.
*/
- struct ValidationList *next;
+ struct CheckHelloValidatedContext *next;
/**
- * Linked list with one entry per address from the HELLO
- * that needs to be validated.
+ * This is a doubly-linked list.
*/
- struct ValidationAddress *addresses;
+ struct CheckHelloValidatedContext *prev;
/**
- * The public key of the peer.
+ * Hello that we are validating.
*/
- struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded publicKey;
+ const struct GNUNET_HELLO_Message *hello;
/**
- * When does this record time-out? (assuming the
- * challenge goes unanswered)
+ * Context for peerinfo iteration.
+ * NULL after we are done processing peerinfo's information.
*/
- struct GNUNET_TIME_Absolute timeout;
+ struct GNUNET_PEERINFO_IteratorContext *piter;
+
+ /**
+ * Was a HELLO known for this peer to peerinfo?
+ */
+ int hello_known;
};
-
/**
- * HELLOs awaiting validation.
+ * Struct for keeping information about addresses to validate
+ * so that we can re-use for sending around ping's and receiving
+ * pongs periodically to keep connections alive and also better
+ * estimate latency of connections.
+ *
*/
-static struct ValidationList *pending_validations;
+struct PeriodicValidationContext
+{
+
+ /**
+ * The address we are keeping alive
+ */
+ struct ForeignAddressList *foreign_address;
+
+ /**
+ * The name of the transport
+ */
+ char *transport;
+
+ /**
+ * Public Key of the peer to re-validate
+ */
+ struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded publicKey;
+
+};
/**
* Our HELLO message.
static struct GNUNET_HELLO_Message *our_hello;
/**
- * "version" of "our_hello". Used to see if a given
- * neighbor has already been sent the latest version
- * of our HELLO message.
+ * "version" of "our_hello". Used to see if a given neighbour has
+ * already been sent the latest version of our HELLO message.
*/
static unsigned int our_hello_version;
static struct GNUNET_SERVER_Handle *server;
/**
- * All known neighbors and their HELLOs.
+ * All known neighbours and their HELLOs.
*/
-static struct NeighborList *neighbors;
+static struct NeighbourList *neighbours;
/**
- * Number of neighbors we'd like to have.
+ * Number of neighbours we'd like to have.
*/
static uint32_t max_connect_per_transport;
+/**
+ * Head of linked list.
+ */
+static struct CheckHelloValidatedContext *chvc_head;
+
+/**
+ * Tail of linked list.
+ */
+static struct CheckHelloValidatedContext *chvc_tail;
+
+/**
+ * Map of PeerIdentities to 'struct ValidationEntry*'s (addresses
+ * of the given peer that we are currently validating).
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *validation_map;
+
+/**
+ * Handle for reporting statistics.
+ */
+static struct GNUNET_STATISTICS_Handle *stats;
+
+
+/**
+ * The peer specified by the given neighbour has timed-out or a plugin
+ * has disconnected. We may either need to do nothing (other plugins
+ * still up), or trigger a full disconnect and clean up. This
+ * function updates our state and do the necessary notifications.
+ * Also notifies our clients that the neighbour is now officially
+ * gone.
+ *
+ * @param n the neighbour list entry for the peer
+ * @param check should we just check if all plugins
+ * disconnected or must we ask all plugins to
+ * disconnect?
+ */
+static void disconnect_neighbour (struct NeighbourList *n, int check);
+
+/**
+ * Check the ready list for the given neighbour and if a plugin is
+ * ready for transmission (and if we have a message), do so!
+ *
+ * @param neighbour target peer for which to transmit
+ */
+static void try_transmission_to_peer (struct NeighbourList *neighbour);
+
/**
- * Find an entry in the neighbor list for a particular peer.
+ * Find an entry in the neighbour list for a particular peer.
* if sender_address is not specified (NULL) then return the
* first matching entry. If sender_address is specified, then
* make sure that the address and address_len also matches.
- *
+ *
+ * FIXME: This description does not fit the function.
+ *
* @return NULL if not found.
*/
-static struct NeighborList *
-find_neighbor (const struct GNUNET_PeerIdentity *key, const char *sender_address,
- size_t sender_address_len)
+static struct NeighbourList *
+find_neighbour (const struct GNUNET_PeerIdentity *key)
{
- struct NeighborList *head = neighbors;
- if (sender_address == NULL)
- {
- while ((head != NULL) &&
- (0 != memcmp (key, &head->id, sizeof (struct GNUNET_PeerIdentity))))
- head = head->next;
- }
- else
- {
- while ((head != NULL) &&
- (0 != memcmp (key, &head->id, sizeof (struct GNUNET_PeerIdentity))) &&
- (sender_address_len != head->addr_len) &&
- (0 != memcmp (sender_address, &head->addr, head->addr_len)))
- head = head->next;
- }
+ struct NeighbourList *head = neighbours;
+
+ while ((head != NULL) &&
+ (0 != memcmp (key, &head->id, sizeof (struct GNUNET_PeerIdentity))))
+ head = head->next;
return head;
}
/**
- * Update the quota values for the given neighbor now.
- */
-static void
-update_quota (struct NeighborList *n)
-{
- struct GNUNET_TIME_Relative delta;
- uint64_t allowed;
- uint64_t remaining;
-
- delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
- if (delta.value < MIN_QUOTA_REFRESH_TIME)
- return; /* not enough time passed for doing quota update */
- allowed = delta.value * n->quota_in;
- if (n->last_received < allowed)
- {
- remaining = allowed - n->last_received;
- if (n->quota_in > 0)
- remaining /= n->quota_in;
- else
- remaining = 0;
- if (remaining > MAX_BANDWIDTH_CARRY)
- remaining = MAX_BANDWIDTH_CARRY;
- n->last_received = 0;
- n->last_quota_update = GNUNET_TIME_absolute_get ();
- n->last_quota_update.value -= remaining;
- if (n->quota_violation_count > 0)
- n->quota_violation_count--;
- }
- else
- {
- n->last_received -= allowed;
- n->last_quota_update = GNUNET_TIME_absolute_get ();
- if (n->last_received > allowed)
- {
- /* more than twice the allowed rate! */
- n->quota_violation_count += 10;
- }
- }
-}
-
-
-/**
- * Function called to notify a client about the socket
- * being ready to queue more data. "buf" will be
- * NULL and "size" zero if the socket was closed for
- * writing in the meantime.
+ * Function called to notify a client about the socket being ready to
+ * queue more data. "buf" will be NULL and "size" zero if the socket
+ * was closed for writing in the meantime.
*
* @param cls closure
* @param size number of bytes available in buf
uint16_t msize;
size_t tsize;
const struct GNUNET_MessageHeader *msg;
- struct GNUNET_CONNECTION_TransmitHandle *th;
char *cbuf;
+ client->th = NULL;
if (buf == NULL)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
/* fatal error with client, free message queue! */
while (NULL != (q = client->message_queue_head))
{
- client->message_queue_head = q->next;
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes discarded (could not transmit to client)"),
+ ntohs (((const struct GNUNET_MessageHeader*)&q[1])->size),
+ GNUNET_NO);
+ GNUNET_CONTAINER_DLL_remove (client->message_queue_head,
+ client->message_queue_tail,
+ q);
GNUNET_free (q);
}
- client->message_queue_tail = NULL;
client->message_count = 0;
return 0;
}
"Transmitting message of type %u to client.\n",
ntohs (msg->type));
#endif
- client->message_queue_head = q->next;
- if (q->next == NULL)
- client->message_queue_tail = NULL;
+ GNUNET_CONTAINER_DLL_remove (client->message_queue_head,
+ client->message_queue_tail,
+ q);
memcpy (&cbuf[tsize], msg, msize);
tsize += msize;
GNUNET_free (q);
if (NULL != q)
{
GNUNET_assert (msize >= sizeof (struct GNUNET_MessageHeader));
- th = GNUNET_SERVER_notify_transmit_ready (client->client,
- msize,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &transmit_to_client_callback,
- client);
- GNUNET_assert (th != NULL);
+ client->th = GNUNET_SERVER_notify_transmit_ready (client->client,
+ msize,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &transmit_to_client_callback,
+ client);
+ GNUNET_assert (client->th != NULL);
}
return tsize;
}
{
struct ClientMessageQueueEntry *q;
uint16_t msize;
- struct GNUNET_CONNECTION_TransmitHandle *th;
if ((client->message_count >= MAX_PENDING) && (GNUNET_YES == may_drop))
{
/* TODO: call to statistics... */
return;
}
- client->message_count++;
msize = ntohs (msg->size);
GNUNET_assert (msize >= sizeof (struct GNUNET_MessageHeader));
q = GNUNET_malloc (sizeof (struct ClientMessageQueueEntry) + msize);
memcpy (&q[1], msg, msize);
- /* append to message queue */
- if (client->message_queue_tail == NULL)
+ GNUNET_CONTAINER_DLL_insert_after (client->message_queue_head,
+ client->message_queue_tail,
+ client->message_queue_tail,
+ q);
+ client->message_count++;
+ if (client->th == NULL)
{
- client->message_queue_tail = q;
- }
- else
- {
- client->message_queue_tail->next = q;
- client->message_queue_tail = q;
- }
- if (client->message_queue_head == NULL)
- {
- client->message_queue_head = q;
- th = GNUNET_SERVER_notify_transmit_ready (client->client,
- msize,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &transmit_to_client_callback,
- client);
- GNUNET_assert (th != NULL);
+ client->th = GNUNET_SERVER_notify_transmit_ready (client->client,
+ msize,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &transmit_to_client_callback,
+ client);
+ GNUNET_assert (client->th != NULL);
}
}
/**
- * Find alternative plugins for communication.
+ * Transmit a 'SEND_OK' notification to the given client for the
+ * given neighbour.
*
- * @param neighbor for which neighbor should we try to find
- * more plugins?
+ * @param client who to notify
+ * @param n neighbour to notify about
+ * @param result status code for the transmission request
*/
static void
-try_alternative_plugins (struct NeighborList *neighbor)
+transmit_send_ok (struct TransportClient *client,
+ struct NeighbourList *n,
+ int result)
{
- struct ReadyList *rl;
-
- if ((neighbor->plugins != NULL) &&
- (neighbor->retry_plugins_time.value >
- GNUNET_TIME_absolute_get ().value))
- return; /* don't try right now */
- neighbor->retry_plugins_time
- = GNUNET_TIME_relative_to_absolute (PLUGIN_RETRY_FREQUENCY);
-
- rl = neighbor->plugins;
- while (rl != NULL)
- {
- if (rl->connect_attempts > 0)
- rl->connect_attempts--; /* amnesty */
- rl = rl->next;
- }
+ struct SendOkMessage send_ok_msg;
+ send_ok_msg.header.size = htons (sizeof (send_ok_msg));
+ send_ok_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
+ send_ok_msg.success = htonl (result);
+ send_ok_msg.latency = GNUNET_TIME_relative_hton (n->latency);
+ send_ok_msg.peer = n->id;
+ transmit_to_client (client, &send_ok_msg.header, GNUNET_NO);
}
-/**
- * The peer specified by the given neighbor has timed-out or a plugin
- * has disconnected. We may either need to do nothing (other plugins
- * still up), or trigger a full disconnect and clean up. This
- * function updates our state and do the necessary notifications.
- * Also notifies our clients that the neighbor is now officially
- * gone.
- *
- * @param n the neighbor list entry for the peer
- * @param check should we just check if all plugins
- * disconnected or must we ask all plugins to
- * disconnect?
- */
-static void disconnect_neighbor (struct NeighborList *n, int check);
-
-
-/**
- * Check the ready list for the given neighbor and
- * if a plugin is ready for transmission (and if we
- * have a message), do so!
- *
- * @param neighbor target peer for which to check the plugins
- */
-static void try_transmission_to_peer (struct NeighborList *neighbor);
-
-
/**
* Function called by the GNUNET_TRANSPORT_TransmitFunction
* upon "completion" of a send request. This tells the API
int result)
{
struct MessageQueue *mq = cls;
- struct ReadyList *rl;
- struct SendOkMessage send_ok_msg;
- struct NeighborList *n;
+ struct NeighbourList *n;
- GNUNET_assert (mq != NULL);
- n = mq->neighbor;
- GNUNET_assert (n != NULL);
- GNUNET_assert (0 ==
- memcmp (&n->id, target,
- sizeof (struct GNUNET_PeerIdentity)));
- rl = n->plugins;
- while ((rl != NULL) && (rl->plugin != mq->plugin))
- rl = rl->next;
- GNUNET_assert (rl != NULL);
if (result == GNUNET_OK)
{
- rl->timeout =
- GNUNET_TIME_relative_to_absolute
- (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes successfully transmitted by plugins"),
+ mq->message_buf_size,
+ GNUNET_NO);
}
else
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmission to peer `%s' failed, marking connection as down.\n",
- GNUNET_i2s (target));
- rl->connected = GNUNET_NO;
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes with transmission failure by plugins"),
+ mq->message_buf_size,
+ GNUNET_NO);
+ }
+ n = find_neighbour(&mq->neighbour_id);
+ GNUNET_assert (n != NULL);
+ if (mq->specific_address != NULL)
+ {
+ if (result == GNUNET_OK)
+ {
+ mq->specific_address->timeout =
+ GNUNET_TIME_relative_to_absolute
+ (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+ mq->specific_address->connected = GNUNET_YES;
+ }
+ else
+ {
+ mq->specific_address->connected = GNUNET_NO;
+ }
+ if (! mq->internal_msg)
+ mq->specific_address->in_transmit = GNUNET_NO;
}
- if (!mq->internal_msg)
- rl->transmit_ready = GNUNET_YES;
if (mq->client != NULL)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Notifying client %p about failed transission to peer `%4s'.\n",
- mq->client, GNUNET_i2s (target));
- send_ok_msg.header.size = htons (sizeof (send_ok_msg));
- send_ok_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
- send_ok_msg.success = htonl (result);
- send_ok_msg.peer = n->id;
- transmit_to_client (mq->client, &send_ok_msg.header, GNUNET_NO);
- }
- GNUNET_free (mq->message);
+ transmit_send_ok (mq->client, n, result);
GNUNET_free (mq);
- /* one plugin just became ready again, try transmitting
- another message (if available) */
- if (result == GNUNET_OK)
- try_transmission_to_peer (n);
- else
- disconnect_neighbor (n, GNUNET_YES);
+ try_transmission_to_peer (n);
}
/**
- * Check the ready list for the given neighbor and
- * if a plugin is ready for transmission (and if we
- * have a message), do so!
+ * Find an address in any of the available transports for
+ * the given neighbour that would be good for message
+ * transmission. This is essentially the transport selection
+ * routine.
+ *
+ * @param neighbour for whom to select an address
+ * @return selected address, NULL if we have none
*/
-static void
-try_transmission_to_peer (struct NeighborList *neighbor)
+struct ForeignAddressList *
+find_ready_address(struct NeighbourList *neighbour)
{
- struct ReadyList *pos;
- struct GNUNET_TIME_Relative min_latency;
- struct ReadyList *rl;
- struct MessageQueue *mq;
- struct GNUNET_TIME_Absolute now;
+ struct ReadyList *head = neighbour->plugins;
+ struct ForeignAddressList *addresses;
+ struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get ();
+ struct ForeignAddressList *best_address;
- if (neighbor->messages == NULL)
- return; /* nothing to do */
- try_alternative_plugins (neighbor);
- min_latency = GNUNET_TIME_UNIT_FOREVER_REL;
- rl = NULL;
- mq = neighbor->messages;
- now = GNUNET_TIME_absolute_get ();
- pos = neighbor->plugins;
- while (pos != NULL)
+ best_address = NULL;
+ while (head != NULL)
{
- /* set plugins that are inactive for a long time back to disconnected */
- if ((pos->timeout.value < now.value) && (pos->connected == GNUNET_YES))
+ addresses = head->addresses;
+ while (addresses != NULL)
{
+ if ( (addresses->timeout.value < now.value) &&
+ (addresses->connected == GNUNET_YES) )
+ {
#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Marking long-time inactive connection to `%4s' as down.\n",
- GNUNET_i2s (&neighbor->id));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Marking long-time inactive connection to `%4s' as down.\n",
+ GNUNET_i2s (&neighbour->id));
#endif
- pos->connected = GNUNET_NO;
+ addresses->connected = GNUNET_NO;
+ }
+ addresses = addresses->next;
}
- if (((GNUNET_YES == pos->transmit_ready) ||
- (mq->internal_msg)) &&
- (pos->connect_attempts < MAX_CONNECT_RETRY) &&
- ((rl == NULL) || (min_latency.value > pos->latency.value)))
+
+ addresses = head->addresses;
+ while (addresses != NULL)
{
- rl = pos;
- min_latency = pos->latency;
+ if ( ( (best_address == NULL) ||
+ (addresses->connected == GNUNET_YES) ||
+ (best_address->connected == GNUNET_NO) ) &&
+ (addresses->in_transmit == GNUNET_NO) &&
+ ( (best_address == NULL) ||
+ (addresses->latency.value < best_address->latency.value)) )
+ best_address = addresses;
+ /* FIXME: also give lower-latency addresses that are not
+ connected a chance some times... */
+ addresses = addresses->next;
}
- pos = pos->next;
+ head = head->next;
}
- if (rl == NULL)
- {
#if DEBUG_TRANSPORT
+ if (best_address != NULL)
+ {
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "No plugin ready to transmit message\n");
-#endif
- return; /* nobody ready */
+ "Best address found has latency of %llu ms.\n",
+ best_address->latency.value);
}
- if (GNUNET_NO == rl->connected)
- {
- rl->connect_attempts++;
- rl->connected = GNUNET_YES;
+#endif
+ return best_address;
+
+}
+
+
+/**
+ * We should re-try transmitting to the given peer,
+ * hopefully we've learned something in the meantime.
+ */
+static void
+retry_transmission_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct NeighbourList *n = cls;
+
+ n->retry_task = GNUNET_SCHEDULER_NO_TASK;
+ try_transmission_to_peer (n);
+}
+
+
+/**
+ * Check the ready list for the given neighbour and if a plugin is
+ * ready for transmission (and if we have a message), do so!
+ *
+ * @param neighbour target peer for which to transmit
+ */
+static void
+try_transmission_to_peer (struct NeighbourList *neighbour)
+{
+ struct ReadyList *rl;
+ struct MessageQueue *mq;
+ struct GNUNET_TIME_Relative timeout;
+
+ if (neighbour->messages_head == NULL)
+ return; /* nothing to do */
+ rl = NULL;
+ mq = neighbour->messages_head;
+ /* FIXME: support bi-directional use of TCP */
+ if (mq->specific_address == NULL)
+ mq->specific_address = find_ready_address(neighbour);
+ if (mq->specific_address == NULL)
+ {
+ timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
+ if (timeout.value == 0)
+ {
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "No destination address available to transmit message of size %u to peer `%4s'\n",
+ mq->message_buf_size,
+ GNUNET_i2s (&mq->neighbour_id));
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes in message queue for other peers"),
+ -mq->message_buf_size,
+ GNUNET_NO);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes discarded (no destination address available)"),
+ mq->message_buf_size,
+ GNUNET_NO);
+ if (mq->client != NULL)
+ transmit_send_ok (mq->client, neighbour, GNUNET_NO);
+ GNUNET_CONTAINER_DLL_remove (neighbour->messages_head,
+ neighbour->messages_tail,
+ mq);
+ GNUNET_free (mq);
+ return; /* nobody ready */
+ }
+ if (neighbour->retry_task != GNUNET_SCHEDULER_NO_TASK)
+ GNUNET_SCHEDULER_cancel (sched,
+ neighbour->retry_task);
+ neighbour->retry_task = GNUNET_SCHEDULER_add_delayed (sched,
+ timeout,
+ &retry_transmission_task,
+ neighbour);
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Establishing fresh connection with `%4s' via plugin `%s'\n",
- GNUNET_i2s (&neighbor->id), rl->plugin->short_name);
+ "No validated destination address available to transmit message of size %u to peer `%4s', will wait %llums to find an address.\n",
+ mq->message_buf_size,
+ GNUNET_i2s (&mq->neighbour_id),
+ timeout.value);
#endif
- }
- neighbor->messages = mq->next;
+ /* FIXME: might want to trigger peerinfo lookup here
+ (unless that's already pending...) */
+ return;
+ }
+ GNUNET_CONTAINER_DLL_remove (neighbour->messages_head,
+ neighbour->messages_tail,
+ mq);
+ if (mq->specific_address->connected == GNUNET_NO)
+ mq->specific_address->connect_attempts++;
+ rl = mq->specific_address->ready_list;
mq->plugin = rl->plugin;
if (!mq->internal_msg)
- rl->transmit_ready = GNUNET_NO;
+ mq->specific_address->in_transmit = GNUNET_YES;
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Giving message of type `%u' for `%4s' to plugin `%s'\n",
- ntohs (mq->message->type),
- GNUNET_i2s (&neighbor->id), rl->plugin->short_name);
+ "Sending message of size %u for `%4s' to `%s' via plugin `%s'\n",
+ mq->message_buf_size,
+ GNUNET_i2s (&neighbour->id),
+ GNUNET_a2s (mq->specific_address->addr,
+ mq->specific_address->addrlen),
+ rl->plugin->short_name);
#endif
-
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes in message queue for other peers"),
+ -mq->message_buf_size,
+ GNUNET_NO);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes transmitted to other peers"),
+ mq->message_buf_size,
+ GNUNET_NO);
rl->plugin->api->send (rl->plugin->api->cls,
- &neighbor->id,
- mq->message,
- mq->priority,
- GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
- rl->neighbor->addr,
- rl->neighbor->addr_len,
- GNUNET_NO,
- &transmit_send_continuation, mq);
+ &mq->neighbour_id,
+ mq->message_buf,
+ mq->message_buf_size,
+ mq->priority,
+ GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+ mq->specific_address->addr,
+ mq->specific_address->addrlen,
+ GNUNET_YES /* FIXME: sometimes, we want to be more tolerant here! */,
+ &transmit_send_continuation, mq);
}
* Send the specified message to the specified peer.
*
* @param client source of the transmission request (can be NULL)
+ * @param peer_address ForeignAddressList where we should send this message
* @param priority how important is the message
- * @param msg message to send
- * @param is_internal is this an internal message
- * @param neighbor handle to the neighbor for transmission
+ * @param timeout how long do we have to transmit?
+ * @param message_buf message(s) to send GNUNET_MessageHeader(s)
+ * @param message_buf_size total size of all messages in message_buf
+ * @param is_internal is this an internal message; these are pre-pended and
+ * also do not count for plugins being "ready" to transmit
+ * @param neighbour handle to the neighbour for transmission
*/
static void
transmit_to_peer (struct TransportClient *client,
+ struct ForeignAddressList *peer_address,
unsigned int priority,
- const struct GNUNET_MessageHeader *msg,
- int is_internal, struct NeighborList *neighbor)
+ struct GNUNET_TIME_Relative timeout,
+ const char *message_buf,
+ size_t message_buf_size,
+ int is_internal, struct NeighbourList *neighbour)
{
struct MessageQueue *mq;
- struct MessageQueue *mqe;
- struct GNUNET_MessageHeader *m;
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- _("Sending message of type %u to peer `%4s'\n"),
- ntohs (msg->type), GNUNET_i2s (&neighbor->id));
-#endif
+#if EXTRA_CHECKS
if (client != NULL)
{
/* check for duplicate submission */
- mq = neighbor->messages;
+ mq = neighbour->messages_head;
while (NULL != mq)
{
if (mq->client == client)
{
/* client transmitted to same peer twice
- before getting SendOk! */
+ before getting SEND_OK! */
GNUNET_break (0);
return;
}
mq = mq->next;
}
}
- mq = GNUNET_malloc (sizeof (struct MessageQueue));
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes in message queue for other peers"),
+ message_buf_size,
+ GNUNET_NO);
+ mq = GNUNET_malloc (sizeof (struct MessageQueue) + message_buf_size);
+ mq->specific_address = peer_address;
mq->client = client;
- m = GNUNET_malloc (ntohs (msg->size));
- memcpy (m, msg, ntohs (msg->size));
- mq->message = m;
- mq->neighbor = neighbor;
+ memcpy (&mq[1], message_buf, message_buf_size);
+ mq->message_buf = (const char*) &mq[1];
+ mq->message_buf_size = message_buf_size;
+ memcpy(&mq->neighbour_id, &neighbour->id, sizeof(struct GNUNET_PeerIdentity));
mq->internal_msg = is_internal;
mq->priority = priority;
-
- /* find tail */
- mqe = neighbor->messages;
- if (mqe != NULL)
- while (mqe->next != NULL)
- mqe = mqe->next;
- if (mqe == NULL)
- {
- /* new head */
- neighbor->messages = mq;
- }
+ mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
+ if (is_internal)
+ GNUNET_CONTAINER_DLL_insert (neighbour->messages_head,
+ neighbour->messages_tail,
+ mq);
else
- {
- /* append */
- mqe->next = mq;
- }
- try_transmission_to_peer (neighbor);
+ GNUNET_CONTAINER_DLL_insert_after (neighbour->messages_head,
+ neighbour->messages_tail,
+ neighbour->messages_tail,
+ mq);
+ try_transmission_to_peer (neighbour);
}
struct GeneratorContext
{
struct TransportPlugin *plug_pos;
- struct AddressList *addr_pos;
+ struct OwnAddressList *addr_pos;
struct GNUNET_TIME_Absolute expiration;
};
gc->addr_pos = (gc->plug_pos != NULL) ? gc->plug_pos->addresses : NULL;
}
if (NULL == gc->plug_pos)
- return 0;
+ {
+
+ return 0;
+ }
ret = GNUNET_HELLO_add_address (gc->plug_pos->short_name,
gc->expiration,
gc->addr_pos->addr,
{
struct GNUNET_HELLO_Message *hello;
struct TransportClient *cpos;
- struct NeighborList *npos;
+ struct NeighbourList *npos;
struct GeneratorContext gc;
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
- "Refreshing my `%s'\n", "HELLO");
-#endif
gc.plug_pos = plugins;
gc.addr_pos = plugins != NULL ? plugins->addresses : NULL;
gc.expiration = GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION);
hello = GNUNET_HELLO_create (&my_public_key, &address_generator, &gc);
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
+ "Refreshed my `%s', new size is %d\n", "HELLO", GNUNET_HELLO_size(hello));
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# refreshed my HELLO"),
+ 1,
+ GNUNET_NO);
cpos = clients;
while (cpos != NULL)
{
our_hello = hello;
our_hello_version++;
GNUNET_PEERINFO_add_peer (cfg, sched, &my_identity, our_hello);
- npos = neighbors;
+ npos = neighbours;
while (npos != NULL)
{
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
- "Transmitting updated `%s' to neighbor `%4s'\n",
+ "Transmitting updated `%s' to neighbour `%4s'\n",
"HELLO", GNUNET_i2s (&npos->id));
#endif
- transmit_to_peer (NULL, 0,
- (const struct GNUNET_MessageHeader *) our_hello,
- GNUNET_YES, npos);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# transmitted my HELLO to other peers"),
+ 1,
+ GNUNET_NO);
+ transmit_to_peer (NULL, NULL, 0,
+ HELLO_ADDRESS_EXPIRATION,
+ (const char *) our_hello,
+ GNUNET_HELLO_size(our_hello),
+ GNUNET_NO, npos);
npos = npos->next;
}
}
struct GNUNET_TIME_Relative min_remaining;
struct GNUNET_TIME_Relative remaining;
struct GNUNET_TIME_Absolute now;
- struct AddressList *pos;
- struct AddressList *prev;
- struct AddressList *next;
+ struct OwnAddressList *pos;
+ struct OwnAddressList *prev;
+ struct OwnAddressList *next;
int expired;
if (plugin->address_update_task != GNUNET_SCHEDULER_NO_TASK)
struct GNUNET_TIME_Relative expires)
{
struct TransportPlugin *p = cls;
- struct AddressList *al;
+ struct OwnAddressList *al;
struct GNUNET_TIME_Absolute abex;
abex = GNUNET_TIME_relative_to_absolute (expires);
}
al = al->next;
}
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Plugin `%s' informs us about a new address `%s'\n", name,
- GNUNET_a2s (addr, addrlen));
-#endif
- al = GNUNET_malloc (sizeof (struct AddressList) + addrlen);
+
+ al = GNUNET_malloc (sizeof (struct OwnAddressList) + addrlen);
al->addr = &al[1];
al->next = p->addresses;
p->addresses = al;
*/
static void
notify_clients_connect (const struct GNUNET_PeerIdentity *peer,
- struct GNUNET_TIME_Relative latency)
+ struct GNUNET_TIME_Relative latency,
+ uint32_t distance)
{
struct ConnectInfoMessage cim;
struct TransportClient *cpos;
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Informing clients about peer `%4s' connecting to us\n",
- GNUNET_i2s (peer));
+ "Notifying clients about connection from `%s'\n",
+ GNUNET_i2s (peer));
#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# peers connected"),
+ 1,
+ GNUNET_NO);
cim.header.size = htons (sizeof (struct ConnectInfoMessage));
cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
- cim.quota_out = htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60 * 1000));
+ cim.distance = htonl (distance);
cim.latency = GNUNET_TIME_relative_hton (latency);
memcpy (&cim.id, peer, sizeof (struct GNUNET_PeerIdentity));
cpos = clients;
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Informing clients about peer `%4s' disconnecting\n",
- GNUNET_i2s (peer));
+ "Notifying clients about lost connection to `%s'\n",
+ GNUNET_i2s (peer));
#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# peers connected"),
+ -1,
+ GNUNET_NO);
dim.header.size = htons (sizeof (struct DisconnectInfoMessage));
dim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
dim.reserved = htonl (0);
/**
- * Copy any validated addresses to buf.
+ * Find a ForeignAddressList entry for the given neighbour
+ * that matches the given address and transport.
*
- * @return 0 once all addresses have been
- * returned
- */
-static size_t
-list_validated_addresses (void *cls, size_t max, void *buf)
+ * @param neighbour which peer we care about
+ * @param tname name of the transport plugin
+ * @param addr binary address
+ * @param addrlen length of addr
+ * @return NULL if no such entry exists
+ */
+static struct ForeignAddressList *
+find_peer_address(struct NeighbourList *neighbour,
+ const char *tname,
+ const char *addr,
+ size_t addrlen)
{
- struct ValidationAddress **va = cls;
- size_t ret;
+ struct ReadyList *head;
+ struct ForeignAddressList *address_head;
- while ((NULL != *va) && ((*va)->ok != GNUNET_YES))
- *va = (*va)->next;
- if (NULL == *va)
- return 0;
- ret = GNUNET_HELLO_add_address ((*va)->transport_name,
- (*va)->expiration,
- &(*va)[1], (*va)->addr_len, buf, max);
- *va = (*va)->next;
+ head = neighbour->plugins;
+ while (head != NULL)
+ {
+ if (0 == strcmp (tname, head->plugin->short_name))
+ break;
+ head = head->next;
+ }
+ if (head == NULL)
+ return NULL;
+
+ address_head = head->addresses;
+ while ( (address_head != NULL) &&
+ ( (address_head->addrlen != addrlen) ||
+ (memcmp(address_head->addr, addr, addrlen) != 0) ) )
+ address_head = address_head->next;
+ return address_head;
+}
+
+
+/**
+ * Get the peer address struct for the given neighbour and
+ * address. If it doesn't yet exist, create it.
+ *
+ * @param neighbour which peer we care about
+ * @param tname name of the transport plugin
+ * @param addr binary address
+ * @param addrlen length of addr
+ * @return NULL if we do not have a transport plugin for 'tname'
+ */
+static struct ForeignAddressList *
+add_peer_address(struct NeighbourList *neighbour,
+ const char *tname,
+ const char *addr,
+ size_t addrlen)
+{
+ struct ReadyList *head;
+ struct ForeignAddressList *ret;
+
+ ret = find_peer_address (neighbour, tname, addr, addrlen);
+ if (ret != NULL)
+ return ret;
+ head = neighbour->plugins;
+ while (head != NULL)
+ {
+ if (0 == strcmp (tname, head->plugin->short_name))
+ break;
+ head = head->next;
+ }
+ if (head == NULL)
+ return NULL;
+ ret = GNUNET_malloc(sizeof(struct ForeignAddressList) + addrlen);
+ ret->addr = (const char*) &ret[1];
+ memcpy (&ret[1], addr, addrlen);
+ ret->addrlen = addrlen;
+ ret->expires = GNUNET_TIME_relative_to_absolute
+ (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+ ret->latency = GNUNET_TIME_relative_get_forever();
+ ret->distance = -1;
+ ret->timeout = GNUNET_TIME_relative_to_absolute
+ (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+ ret->ready_list = head;
+ ret->next = head->addresses;
+ head->addresses = ret;
return ret;
}
/**
- * HELLO validation cleanup task.
+ * Closure for 'add_validated_address'.
*/
-static void
-cleanup_validation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+struct AddValidatedAddressContext
{
- struct ValidationAddress *va;
- struct ValidationList *pos;
- struct ValidationList *prev;
- struct GNUNET_TIME_Absolute now;
- struct GNUNET_TIME_Absolute first;
- struct GNUNET_HELLO_Message *hello;
- struct GNUNET_PeerIdentity pid;
- struct NeighborList *n;
+ /**
+ * Entry that has been validated.
+ */
+ const struct ValidationEntry *ve;
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
- "HELLO validation cleanup background task running...\n");
-#endif
- now = GNUNET_TIME_absolute_get ();
- prev = NULL;
- pos = pending_validations;
- while (pos != NULL)
- {
- if (pos->timeout.value < now.value)
- {
- if (prev == NULL)
- pending_validations = pos->next;
- else
- prev->next = pos->next;
- va = pos->addresses;
- hello = GNUNET_HELLO_create (&pos->publicKey,
- &list_validated_addresses, &va);
- GNUNET_CRYPTO_hash (&pos->publicKey,
- sizeof (struct
- GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
- &pid.hashPubKey);
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Creating persistent `%s' message for peer `%4s' based on confirmed addresses.\n",
- "HELLO", GNUNET_i2s (&pid));
-#endif
- GNUNET_PEERINFO_add_peer (cfg, sched, &pid, hello);
- n = find_neighbor (&pid, NULL, 0);
- if (NULL != n)
- try_transmission_to_peer (n);
- GNUNET_free (hello);
- while (NULL != (va = pos->addresses))
- {
- pos->addresses = va->next;
- GNUNET_free (va->transport_name);
- GNUNET_free (va);
- }
- GNUNET_free (pos);
- if (prev == NULL)
- pos = pending_validations;
- else
- pos = prev->next;
- continue;
- }
- prev = pos;
- pos = pos->next;
- }
+ /**
+ * Flag set after we have added the address so
+ * that we terminate the iteration next time.
+ */
+ int done;
+};
- /* finally, reschedule cleanup if needed; list is
- ordered by timeout, so we need the last element... */
- if (NULL != pending_validations)
- {
- first = pending_validations->timeout;
- pos = pending_validations;
- while (pos != NULL)
- {
- first = GNUNET_TIME_absolute_min (first, pos->timeout);
- pos = pos->next;
- }
- GNUNET_SCHEDULER_add_delayed (sched,
- GNUNET_TIME_absolute_get_remaining
- (first), &cleanup_validation, NULL);
- }
+
+/**
+ * Callback function used to fill a buffer of max bytes with a list of
+ * addresses in the format used by HELLOs. Should use
+ * "GNUNET_HELLO_add_address" as a helper function.
+ *
+ * @param cls the 'struct AddValidatedAddressContext' with the validated address
+ * @param max maximum number of bytes that can be written to buf
+ * @param buf where to write the address information
+ * @return number of bytes written, 0 to signal the
+ * end of the iteration.
+ */
+static size_t
+add_validated_address (void *cls,
+ size_t max, void *buf)
+{
+ struct AddValidatedAddressContext *avac = cls;
+ const struct ValidationEntry *ve = avac->ve;
+
+ if (GNUNET_YES == avac->done)
+ return 0;
+ avac->done = GNUNET_YES;
+ return GNUNET_HELLO_add_address (ve->transport_name,
+ GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION),
+ ve->addr,
+ ve->addrlen,
+ buf,
+ max);
}
-static struct GNUNET_MessageHeader *
-createPingMessage (struct GNUNET_PeerIdentity * target, struct ValidationAddress *va)
-{
+static void send_periodic_ping(void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc);
- struct TransportPingMessage *ping;
- ping = GNUNET_malloc(sizeof(struct TransportPingMessage));
- ping->challenge = htonl(va->challenge);
- ping->header.size = sizeof(struct TransportPingMessage);
- ping->header.type = htons(GNUNET_MESSAGE_TYPE_TRANSPORT_PING);
- memcpy(&ping->target, target, sizeof(struct GNUNET_PeerIdentity));
+/**
+ * Iterator over hash map entries. Checks if the given validation
+ * entry is for the same challenge as what is given in the PONG.
+ *
+ * @param cls the 'struct TransportPongMessage*'
+ * @param key peer identity
+ * @param value value in the hash map ('struct ValidationEntry')
+ * @return GNUNET_YES if we should continue to
+ * iterate (mismatch), GNUNET_NO if not (entry matched)
+ */
+static int
+check_pending_validation (void *cls,
+ const GNUNET_HashCode * key,
+ void *value)
+{
+ const struct TransportPongMessage *pong = cls;
+ struct ValidationEntry *ve = value;
+ struct AddValidatedAddressContext avac;
+ unsigned int challenge = ntohl(pong->challenge);
+ struct GNUNET_HELLO_Message *hello;
+ struct GNUNET_PeerIdentity target;
+ struct NeighbourList *n;
+ struct ForeignAddressList *fal;
+ struct PeriodicValidationContext *periodic_validation_context;
+
+ if (ve->challenge != challenge)
+ return GNUNET_YES;
- return &ping->header;
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Confirmed validity of address, peer `%4s' has address `%s' (%s).\n",
+ GNUNET_h2s (key),
+ GNUNET_a2s ((const struct sockaddr *) ve->addr,
+ ve->addrlen),
+ ve->transport_name);
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# address validation successes"),
+ 1,
+ GNUNET_NO);
+ /* create the updated HELLO */
+ GNUNET_CRYPTO_hash (&ve->publicKey,
+ sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
+ &target.hashPubKey);
+ avac.done = GNUNET_NO;
+ avac.ve = ve;
+ hello = GNUNET_HELLO_create (&ve->publicKey,
+ &add_validated_address,
+ &avac);
+ GNUNET_PEERINFO_add_peer (cfg, sched,
+ &target,
+ hello);
+ GNUNET_free (hello);
+ n = find_neighbour (&target);
+ if (n != NULL)
+ {
+ fal = add_peer_address (n,
+ ve->transport_name,
+ ve->addr,
+ ve->addrlen);
+ GNUNET_assert (fal != NULL);
+ fal->expires = GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION);
+ fal->validated = GNUNET_YES;
+ fal->latency = GNUNET_TIME_absolute_get_duration (ve->send_time);
+ periodic_validation_context = GNUNET_malloc(sizeof(struct PeriodicValidationContext));
+ periodic_validation_context->foreign_address = fal;
+ periodic_validation_context->transport = strdup(ve->transport_name);
+ memcpy(&periodic_validation_context->publicKey,
+ &ve->publicKey,
+ sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
+ /* FIXME: this causes all of the revalidation PINGs for the same HELLO
+ to be transmitted in bulk, which is not nice; also,
+ triggering these HERE means that revalidations do NOT happen AT ALL
+ for HELLOs a previous instance of this process validated (since
+ there is no "initial" validation PING => no revalidation => BUG! */
+ fal->revalidate_task = GNUNET_SCHEDULER_add_delayed(sched,
+ TRANSPORT_DEFAULT_REVALIDATION,
+ &send_periodic_ping,
+ periodic_validation_context);
+ if (n->latency.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
+ n->latency = fal->latency;
+ else
+ n->latency.value = (fal->latency.value + n->latency.value) / 2;
+ n->distance = fal->distance;
+ if (GNUNET_NO == n->received_pong)
+ {
+ notify_clients_connect (&target, n->latency, n->distance);
+ n->received_pong = GNUNET_YES;
+ }
+ if (n->retry_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (sched,
+ n->retry_task);
+ n->retry_task = GNUNET_SCHEDULER_NO_TASK;
+ try_transmission_to_peer (n);
+ }
+ }
+
+ /* clean up validation entry */
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_remove (validation_map,
+ key,
+ ve));
+ GNUNET_SCHEDULER_cancel (sched,
+ ve->timeout_task);
+ GNUNET_free (ve->transport_name);
+ GNUNET_free (ve);
+ return GNUNET_NO;
}
+
/**
* Function that will be called if we receive a validation
* of an address challenge that we transmitted to another
* (otherwise we may be seeing a MiM attack).
*
* @param cls closure
- * @param name name of the transport that generated the address
+ * @param message the pong message
* @param peer who responded to our challenge
- * @param challenge the challenge number we presumably used
- * @param sender_addr string describing our sender address (as observed
- * by the other peer in human-readable format)
+ * @param sender_address string describing our sender address (as observed
+ * by the other peer in binary format)
+ * @param sender_address_len number of bytes in 'sender_address'
*/
static void
handle_pong (void *cls, const struct GNUNET_MessageHeader *message,
const char *sender_address,
size_t sender_address_len)
{
- unsigned int not_done;
- int matched;
- struct ValidationList *pos;
- struct ValidationAddress *va;
- struct GNUNET_PeerIdentity id;
- struct TransportPongMessage *pong = (struct TransportPongMessage *)message;
-
- unsigned int challenge = ntohl(pong->challenge);
- pos = pending_validations;
- while (pos != NULL)
- {
- GNUNET_CRYPTO_hash (&pos->publicKey,
- sizeof (struct
- GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
- &id.hashPubKey);
- if (0 == memcmp (peer, &id, sizeof (struct GNUNET_PeerIdentity)))
- break;
- pos = pos->next;
- }
- if (pos == NULL)
- {
- /* TODO: call statistics (unmatched PONG) */
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- _
- ("Received validation response but have no record of any validation request for `%4s'. Ignoring.\n"),
- GNUNET_i2s (peer));
- return;
- }
- not_done = 0;
- matched = GNUNET_NO;
- va = pos->addresses;
- while (va != NULL)
- {
- if (va->challenge == challenge)
- {
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Confirmed validity of address, peer `%4s' has address `%s'.\n",
- GNUNET_i2s (peer),
- GNUNET_a2s ((const struct sockaddr *) sender_address,
- sender_address_len));
+#if DEBUG_TRANSPORT > 1
+ /* we get tons of these that just get discarded, only log
+ if we are quite verbose */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Receiving `%s' message from `%4s'.\n", "PONG",
+ GNUNET_i2s (peer));
#endif
- GNUNET_log (GNUNET_ERROR_TYPE_INFO | GNUNET_ERROR_TYPE_BULK,
- _
- ("Another peer saw us using the address `%s' via `FIXME'. If this is not plausible, this address should be listed in the configuration as implausible to avoid MiM attacks.\n"),
- GNUNET_a2s ((const struct sockaddr *) &va[1],
- va->addr_len));
- va->ok = GNUNET_YES;
- va->expiration =
- GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION);
- matched = GNUNET_YES;
- }
- if (va->ok != GNUNET_YES)
- not_done++;
- va = va->next;
- }
- if (GNUNET_NO == matched)
- {
- /* TODO: call statistics (unmatched PONG) */
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- _
- ("Received `%s' message but have no record of a matching `%s' message. Ignoring.\n"),
- "PONG", "PING");
- }
- if (0 == not_done)
- {
-#if DEBUG_TRANSPORT
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# PONG messages received"),
+ 1,
+ GNUNET_NO);
+ if (GNUNET_SYSERR !=
+ GNUNET_CONTAINER_multihashmap_get_multiple (validation_map,
+ &peer->hashPubKey,
+ &check_pending_validation,
+ (void*) message))
+ {
+ /* This is *expected* to happen a lot since we send
+ PONGs to *all* known addresses of the sender of
+ the PING, so most likely we get multiple PONGs
+ per PING, and all but the first PONG will end up
+ here. So really we should not print anything here
+ unless we want to be very, very verbose... */
+#if DEBUG_TRANSPORT > 2
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "All addresses validated, will now construct `%s' for `%4s'.\n",
- "HELLO", GNUNET_i2s (peer));
+ "Received `%s' message from `%4s' but have no record of a matching `%s' message. Ignoring.\n",
+ "PONG",
+ GNUNET_i2s (peer),
+ "PING");
#endif
- pos->timeout.value = 0;
- GNUNET_SCHEDULER_add_with_priority (sched,
- GNUNET_SCHEDULER_PRIORITY_IDLE,
- &cleanup_validation, NULL);
+ return;
}
- else
- {
+
+#if 0
+ /* FIXME: add given address to potential pool of our addresses
+ (for voting) */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO | GNUNET_ERROR_TYPE_BULK,
+ _("Another peer saw us using the address `%s' via `%s'.\n"),
+ GNUNET_a2s ((const struct sockaddr *) &pong[1],
+ ntohs(pong->addrlen)),
+ va->transport_name);
+#endif
+}
+
+
+static void
+neighbour_timeout_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct NeighbourList *n = cls;
+
#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Still waiting for %u additional `%s' messages before constructing `%s' for `%4s'.\n",
- not_done, "PONG", "HELLO", GNUNET_i2s (peer));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
+ "Neighbour `%4s' has timed out!\n", GNUNET_i2s (&n->id));
#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# disconnects due to timeout"),
+ 1,
+ GNUNET_NO);
+ n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ disconnect_neighbour (n, GNUNET_NO);
+}
+
+
+/**
+ * Create a fresh entry in our neighbour list for the given peer.
+ * Will try to transmit our current HELLO to the new neighbour. Also
+ * notifies our clients about the new "connection".
+ *
+ * @param peer the peer for which we create the entry
+ * @return the new neighbour list entry
+ */
+static struct NeighbourList *
+setup_new_neighbour (const struct GNUNET_PeerIdentity *peer)
+{
+ struct NeighbourList *n;
+ struct TransportPlugin *tp;
+ struct ReadyList *rl;
+
+ GNUNET_assert (our_hello != NULL);
+ n = GNUNET_malloc (sizeof (struct NeighbourList));
+ n->next = neighbours;
+ neighbours = n;
+ n->id = *peer;
+ n->peer_timeout =
+ GNUNET_TIME_relative_to_absolute
+ (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+ GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
+ GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
+ MAX_BANDWIDTH_CARRY_S);
+ tp = plugins;
+ while (tp != NULL)
+ {
+ if (tp->api->send != NULL)
+ {
+ rl = GNUNET_malloc (sizeof (struct ReadyList));
+ rl->next = n->plugins;
+ n->plugins = rl;
+ rl->plugin = tp;
+ rl->addresses = NULL;
+ }
+ tp = tp->next;
}
+ n->latency = GNUNET_TIME_UNIT_FOREVER_REL;
+ n->distance = -1;
+ n->timeout_task = GNUNET_SCHEDULER_add_delayed (sched,
+ GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+ &neighbour_timeout_task, n);
+ transmit_to_peer (NULL, NULL, 0,
+ HELLO_ADDRESS_EXPIRATION,
+ (const char *) our_hello, GNUNET_HELLO_size(our_hello),
+ GNUNET_NO, n);
+ return n;
}
-struct CheckHelloValidatedContext
+/**
+ * Closure for 'check_address_exists'.
+ */
+struct CheckAddressExistsClosure
{
/**
- * Plugin for which we are validating.
+ * Address to check for.
*/
- struct TransportPlugin *plugin;
+ const void *addr;
/**
- * Hello that we are validating.
+ * Name of the transport.
*/
- struct GNUNET_HELLO_Message *hello;
+ const char *tname;
/**
- * Validation list being build.
+ * Length of addr.
*/
- struct ValidationList *e;
+ size_t addrlen;
/**
- * Context for peerinfo iteration.
- * NULL after we are done processing peerinfo's information.
+ * Set to GNUNET_YES if the address exists.
*/
- struct GNUNET_PEERINFO_IteratorContext *piter;
+ int exists;
+};
+
+
+/**
+ * Iterator over hash map entries. Checks if the given
+ * validation entry is for the same address as what is given
+ * in the closure.
+ *
+ * @param cls the 'struct CheckAddressExistsClosure*'
+ * @param key current key code (ignored)
+ * @param value value in the hash map ('struct ValidationEntry')
+ * @return GNUNET_YES if we should continue to
+ * iterate (mismatch), GNUNET_NO if not (entry matched)
+ */
+static int
+check_address_exists (void *cls,
+ const GNUNET_HashCode * key,
+ void *value)
+{
+ struct CheckAddressExistsClosure *caec = cls;
+ struct ValidationEntry *ve = value;
+ if ( (0 == strcmp (caec->tname,
+ ve->transport_name)) &&
+ (caec->addrlen == ve->addrlen) &&
+ (0 == memcmp (caec->addr,
+ ve->addr,
+ caec->addrlen)) )
+ {
+ caec->exists = GNUNET_YES;
+ return GNUNET_NO;
+ }
+ return GNUNET_YES;
+}
+
+
+/**
+ * HELLO validation cleanup task (validation failed).
+ *
+ * @param cls the 'struct ValidationEntry' that failed
+ * @param tc scheduler context (unused)
+ */
+static void
+timeout_hello_validation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct ValidationEntry *va = cls;
+ struct GNUNET_PeerIdentity pid;
+
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# address validation timeouts"),
+ 1,
+ GNUNET_NO);
+ GNUNET_CRYPTO_hash (&va->publicKey,
+ sizeof (struct
+ GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
+ &pid.hashPubKey);
+ GNUNET_CONTAINER_multihashmap_remove (validation_map,
+ &pid.hashPubKey,
+ va);
+ GNUNET_free (va->transport_name);
+ GNUNET_free (va);
+}
+
+
+/**
+ * Check if the given address is already being validated; if not,
+ * append the given address to the list of entries that are being be
+ * validated and initiate validation.
+ *
+ * @param cls closure ('struct PeriodicValidationContext *')
+ * @param tname name of the transport
+ * @param expiration expiration time
+ * @param addr the address
+ * @param addrlen length of the address
+ * @return GNUNET_OK (always)
+ */
+static int
+rerun_validation (void *cls,
+ const char *tname,
+ struct GNUNET_TIME_Absolute expiration,
+ const void *addr, size_t addrlen)
+{
+ struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey = cls;
+ struct GNUNET_PeerIdentity id;
+ struct TransportPlugin *tp;
+ struct ValidationEntry *va;
+ struct NeighbourList *neighbour;
+ struct ForeignAddressList *peer_address;
+ struct TransportPingMessage ping;
+ /*struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pk;*/
+ struct CheckAddressExistsClosure caec;
+ char * message_buf;
+ uint16_t hello_size;
+ size_t tsize;
+
+ tp = find_transport (tname);
+ if (tp == NULL)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO |
+ GNUNET_ERROR_TYPE_BULK,
+ _
+ ("Transport `%s' not loaded, will not try to validate peer address using this transport.\n"),
+ tname);
+ return GNUNET_OK;
+ }
+
+ GNUNET_CRYPTO_hash (publicKey,
+ sizeof (struct
+ GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
+ &id.hashPubKey);
+ caec.addr = addr;
+ caec.addrlen = addrlen;
+ caec.tname = tname;
+ caec.exists = GNUNET_NO;
+ GNUNET_CONTAINER_multihashmap_iterate (validation_map,
+ &check_address_exists,
+ &caec);
+ if (caec.exists == GNUNET_YES)
+ {
+ /* During validation attempts we will likely trigger the other
+ peer trying to validate our address which in turn will cause
+ it to send us its HELLO, so we expect to hit this case rather
+ frequently. Only print something if we are very verbose. */
+#if DEBUG_TRANSPORT > 1
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Some validation of address `%s' via `%s' for peer `%4s' already in progress.\n",
+ GNUNET_a2s (addr, addrlen),
+ tname,
+ GNUNET_i2s (&id));
+#endif
+ return GNUNET_OK;
+ }
+ va = GNUNET_malloc (sizeof (struct ValidationEntry) + addrlen);
+ va->transport_name = GNUNET_strdup (tname);
+ va->challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ (unsigned int) -1);
+ va->send_time = GNUNET_TIME_absolute_get();
+ va->addr = (const void*) &va[1];
+ memcpy (&va[1], addr, addrlen);
+ va->addrlen = addrlen;
+ memcpy(&va->publicKey, publicKey, sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
+ va->timeout_task = GNUNET_SCHEDULER_add_delayed (sched,
+ HELLO_VERIFICATION_TIMEOUT,
+ &timeout_hello_validation,
+ va);
+ GNUNET_CONTAINER_multihashmap_put (validation_map,
+ &id.hashPubKey,
+ va,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ neighbour = find_neighbour(&id);
+ if (neighbour == NULL)
+ neighbour = setup_new_neighbour(&id);
+ peer_address = add_peer_address(neighbour, tname, addr, addrlen);
+ GNUNET_assert(peer_address != NULL);
+ hello_size = GNUNET_HELLO_size(our_hello);
+ tsize = sizeof(struct TransportPingMessage) + hello_size;
+ message_buf = GNUNET_malloc(tsize);
+ ping.challenge = htonl(va->challenge);
+ ping.header.size = htons(sizeof(struct TransportPingMessage));
+ ping.header.type = htons(GNUNET_MESSAGE_TYPE_TRANSPORT_PING);
+ memcpy(&ping.target, &id, sizeof(struct GNUNET_PeerIdentity));
+ memcpy(message_buf, our_hello, hello_size);
+ memcpy(&message_buf[hello_size],
+ &ping,
+ sizeof(struct TransportPingMessage));
+#if DEBUG_TRANSPORT_REVALIDATION
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Performing re-validation of address `%s' via `%s' for peer `%4s' sending `%s' (%u bytes) and `%s' (%u bytes)\n",
+ GNUNET_a2s (addr, addrlen),
+ tname,
+ GNUNET_i2s (&id),
+ "HELLO", hello_size,
+ "PING", sizeof (struct TransportPingMessage));
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# PING messages sent for re-validation"),
+ 1,
+ GNUNET_NO);
+ transmit_to_peer (NULL, peer_address,
+ GNUNET_SCHEDULER_PRIORITY_DEFAULT,
+ HELLO_VERIFICATION_TIMEOUT,
+ message_buf, tsize,
+ GNUNET_YES, neighbour);
+ GNUNET_free(message_buf);
+ return GNUNET_OK;
+}
-};
+
+/**
+ * Send periodic ping messages to a give foreign address.
+ *
+ * cls closure, can be safely cast to ForeignAddressList
+ * tc task context
+ *
+ * FIXME: Since a _billion_ pongs are sent for every ping,
+ * maybe this should be a special message type or something
+ * that gets discarded on the other side instead of initiating
+ * a flood.
+ */
+static void
+send_periodic_ping (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct PeriodicValidationContext *periodic_validation_context = cls;
+
+ if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
+ {
+ GNUNET_free(periodic_validation_context->transport);
+ GNUNET_free(periodic_validation_context);
+ return; /* We have been shutdown, don't do anything! */
+ }
+ rerun_validation(&periodic_validation_context->publicKey,
+ periodic_validation_context->transport,
+ periodic_validation_context->foreign_address->expires,
+ periodic_validation_context->foreign_address->addr,
+ periodic_validation_context->foreign_address->addrlen);
+ GNUNET_free(periodic_validation_context->transport);
+ GNUNET_free(periodic_validation_context);
+}
/**
- * Append the given address to the list of entries
- * that need to be validated.
+ * Check if the given address is already being validated; if not,
+ * append the given address to the list of entries that are being be
+ * validated and initiate validation.
+ *
+ * @param cls closure ('struct CheckHelloValidatedContext *')
+ * @param tname name of the transport
+ * @param expiration expiration time
+ * @param addr the address
+ * @param addrlen length of the address
+ * @return GNUNET_OK (always)
*/
static int
run_validation (void *cls,
struct GNUNET_TIME_Absolute expiration,
const void *addr, size_t addrlen)
{
- struct ValidationList *e = cls;
- struct TransportPlugin *tp;
- struct ValidationAddress *va;
+ struct CheckHelloValidatedContext *chvc = cls;
struct GNUNET_PeerIdentity id;
- struct GNUNET_MessageHeader *pingMessage;
- int sent;
+ struct TransportPlugin *tp;
+ struct ValidationEntry *va;
+ struct NeighbourList *neighbour;
+ struct ForeignAddressList *peer_address;
+ struct TransportPingMessage ping;
+ struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pk;
+ struct CheckAddressExistsClosure caec;
+ char * message_buf;
+ uint16_t hello_size;
+ size_t tsize;
+
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# peer addresses scheduled for validation"),
+ 1,
+ GNUNET_NO);
tp = find_transport (tname);
if (tp == NULL)
{
_
("Transport `%s' not loaded, will not try to validate peer address using this transport.\n"),
tname);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# peer addresses not validated (no applicable transport plugin available)"),
+ 1,
+ GNUNET_NO);
return GNUNET_OK;
}
- GNUNET_CRYPTO_hash (&e->publicKey,
+ GNUNET_HELLO_get_key (chvc->hello, &pk);
+ GNUNET_CRYPTO_hash (&pk,
sizeof (struct
GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
&id.hashPubKey);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Scheduling validation of address `%s' via `%s' for `%4s'\n",
- GNUNET_a2s (addr, addrlen), tname, GNUNET_i2s (&id));
-
- va = GNUNET_malloc (sizeof (struct ValidationAddress) + addrlen);
- va->next = e->addresses;
- e->addresses = va;
+ caec.addr = addr;
+ caec.addrlen = addrlen;
+ caec.tname = tname;
+ caec.exists = GNUNET_NO;
+ GNUNET_CONTAINER_multihashmap_iterate (validation_map,
+ &check_address_exists,
+ &caec);
+ if (caec.exists == GNUNET_YES)
+ {
+ /* During validation attempts we will likely trigger the other
+ peer trying to validate our address which in turn will cause
+ it to send us its HELLO, so we expect to hit this case rather
+ frequently. Only print something if we are very verbose. */
+#if DEBUG_TRANSPORT > 1
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Validation of address `%s' via `%s' for peer `%4s' already in progress.\n",
+ GNUNET_a2s (addr, addrlen),
+ tname,
+ GNUNET_i2s (&id));
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# peer addresses not validated (in progress)"),
+ 1,
+ GNUNET_NO);
+ return GNUNET_OK;
+ }
+ va = GNUNET_malloc (sizeof (struct ValidationEntry) + addrlen);
va->transport_name = GNUNET_strdup (tname);
- va->addr_len = addrlen;
va->challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
(unsigned int) -1);
+ va->send_time = GNUNET_TIME_absolute_get();
+ va->addr = (const void*) &va[1];
memcpy (&va[1], addr, addrlen);
-
- pingMessage = createPingMessage(&id, va);
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending ping message to address `%s' via `%s' for `%4s'\n",
- GNUNET_a2s (addr, addrlen), tname, GNUNET_i2s (&id));
-
-
- sent = tp->api->send(tp->api->cls, &id, pingMessage, GNUNET_SCHEDULER_PRIORITY_DEFAULT,
- TRANSPORT_DEFAULT_TIMEOUT, addr, addrlen, GNUNET_YES, NULL, NULL);
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transport returned %d from send!\n", sent);
-
- GNUNET_free(pingMessage);
-
+ va->addrlen = addrlen;
+ GNUNET_HELLO_get_key (chvc->hello,
+ &va->publicKey);
+ va->timeout_task = GNUNET_SCHEDULER_add_delayed (sched,
+ HELLO_VERIFICATION_TIMEOUT,
+ &timeout_hello_validation,
+ va);
+ GNUNET_CONTAINER_multihashmap_put (validation_map,
+ &id.hashPubKey,
+ va,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ neighbour = find_neighbour(&id);
+ if (neighbour == NULL)
+ neighbour = setup_new_neighbour(&id);
+ peer_address = add_peer_address(neighbour, tname, addr, addrlen);
+ GNUNET_assert(peer_address != NULL);
+ hello_size = GNUNET_HELLO_size(our_hello);
+ tsize = sizeof(struct TransportPingMessage) + hello_size;
+ message_buf = GNUNET_malloc(tsize);
+ ping.challenge = htonl(va->challenge);
+ ping.header.size = htons(sizeof(struct TransportPingMessage));
+ ping.header.type = htons(GNUNET_MESSAGE_TYPE_TRANSPORT_PING);
+ memcpy(&ping.target, &id, sizeof(struct GNUNET_PeerIdentity));
+ memcpy(message_buf, our_hello, hello_size);
+ memcpy(&message_buf[hello_size],
+ &ping,
+ sizeof(struct TransportPingMessage));
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Performing validation of address `%s' via `%s' for peer `%4s' sending `%s' (%u bytes) and `%s' (%u bytes)\n",
+ GNUNET_a2s (addr, addrlen),
+ tname,
+ GNUNET_i2s (&id),
+ "HELLO", hello_size,
+ "PING", sizeof (struct TransportPingMessage));
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# PING messages sent for initial validation"),
+ 1,
+ GNUNET_NO);
+ transmit_to_peer (NULL, peer_address,
+ GNUNET_SCHEDULER_PRIORITY_DEFAULT,
+ HELLO_VERIFICATION_TIMEOUT,
+ message_buf, tsize,
+ GNUNET_YES, neighbour);
+ GNUNET_free(message_buf);
return GNUNET_OK;
}
-/*
- * @param cls handle to the plugin (for sending)
- * @param target the peer identity of the peer we are sending to
- * @param challenge the challenge number
- * @param timeout how long to await validation?
- * @param addr the address to validate
- * @param addrlen the length of the address
- *
- * Perform address validation, which means sending a PING PONG to
- * the address via the transport plugin. If not validated, then
- * do not count this as a good peer/address...
+
+/**
+ * Add the given address to the list of foreign addresses
+ * available for the given peer (check for duplicates).
*
+ * @param cls the respective 'struct NeighbourList' to update
+ * @param tname name of the transport
+ * @param expiration expiration time
+ * @param addr the address
+ * @param addrlen length of the address
+ * @return GNUNET_OK (always)
*/
-static void
-validate_address (void *cls, struct ValidationAddress *va,
- const struct GNUNET_PeerIdentity *target,
- struct GNUNET_TIME_Relative timeout,
- const void *addr, size_t addrlen)
+static int
+add_to_foreign_address_list (void *cls,
+ const char *tname,
+ struct GNUNET_TIME_Absolute expiration,
+ const void *addr, size_t addrlen)
{
- /* struct Plugin *plugin = cls;
- int challenge = va->challenge; */
+ struct NeighbourList *n = cls;
+ struct ForeignAddressList *fal;
+ int try;
-
- return;
+ try = GNUNET_NO;
+ fal = find_peer_address (n, tname, addr, addrlen);
+ if (fal == NULL)
+ {
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Adding address `%s' (%s) for peer `%4s' due to peerinfo data for %llums.\n",
+ GNUNET_a2s (addr, addrlen),
+ tname,
+ GNUNET_i2s (&n->id),
+ expiration.value);
+#endif
+ fal = add_peer_address (n, tname, addr, addrlen);
+ try = GNUNET_YES;
+ }
+ if (fal == NULL)
+ return GNUNET_OK;
+ fal->expires = GNUNET_TIME_absolute_max (expiration,
+ fal->expires);
+ fal->validated = GNUNET_YES;
+ if (try == GNUNET_YES)
+ try_transmission_to_peer (n);
+ return GNUNET_OK;
}
/**
* Check if addresses in validated hello "h" overlap with
- * those in "chvc->hello" and update "chvc->hello" accordingly,
- * removing those addresses that have already been validated.
+ * those in "chvc->hello" and validate the rest.
+ *
+ * @param cls closure
+ * @param peer id of the peer, NULL for last call
+ * @param h hello message for the peer (can be NULL)
+ * @param trust amount of trust we have in the peer (not used)
*/
static void
check_hello_validated (void *cls,
const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_HELLO_Message *h, uint32_t trust)
+ const struct GNUNET_HELLO_Message *h,
+ uint32_t trust)
{
struct CheckHelloValidatedContext *chvc = cls;
- struct ValidationAddress *va;
- struct TransportPlugin *tp;
- int first_call;
- struct GNUNET_PeerIdentity apeer;
+ struct GNUNET_HELLO_Message *plain_hello;
+ struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pk;
+ struct GNUNET_PeerIdentity target;
+ struct NeighbourList *n;
- first_call = GNUNET_NO;
- if (chvc->e == NULL)
+ if (peer == NULL)
{
chvc->piter = NULL;
- first_call = GNUNET_YES;
- chvc->e = GNUNET_malloc (sizeof (struct ValidationList));
- GNUNET_assert (GNUNET_OK ==
- GNUNET_HELLO_get_key (h != NULL ? h : chvc->hello,
- &chvc->e->publicKey));
- chvc->e->timeout =
- GNUNET_TIME_relative_to_absolute (HELLO_VERIFICATION_TIMEOUT);
- chvc->e->next = pending_validations;
- pending_validations = chvc->e;
- }
- if (h != NULL)
- {
- GNUNET_HELLO_iterate_new_addresses (chvc->hello,
- h,
- GNUNET_TIME_absolute_get (),
- &run_validation, chvc->e);
- }
- else if (GNUNET_YES == first_call)
- {
- /* no existing HELLO, all addresses are new */
- GNUNET_HELLO_iterate_addresses (chvc->hello,
- GNUNET_NO, &run_validation, chvc->e);
- }
- if (h != NULL)
- return; /* wait for next call */
- /* finally, transmit validation attempts */
- GNUNET_assert (GNUNET_OK == GNUNET_HELLO_get_id (chvc->hello, &apeer));
+ GNUNET_CONTAINER_DLL_remove (chvc_head,
+ chvc_tail,
+ chvc);
+ if (GNUNET_NO == chvc->hello_known)
+ {
+ /* notify PEERINFO about the peer now, so that we at least
+ have the public key if some other component needs it */
+ GNUNET_HELLO_get_key (chvc->hello, &pk);
+ GNUNET_CRYPTO_hash (&pk,
+ sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
+ &target.hashPubKey);
+ plain_hello = GNUNET_HELLO_create (&pk,
+ NULL,
+ NULL);
+ GNUNET_PEERINFO_add_peer (cfg, sched, &target, plain_hello);
+ GNUNET_free (plain_hello);
#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Ready to validate addresses from `%s' message for peer `%4s'\n",
- "HELLO", GNUNET_i2s (&apeer));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Peerinfo had no `%s' message for peer `%4s', full validation needed.\n",
+ "HELLO",
+ GNUNET_i2s (&target));
#endif
- va = chvc->e->addresses;
- while (va != NULL)
- {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# new HELLOs requiring full validation"),
+ 1,
+ GNUNET_NO);
+ GNUNET_HELLO_iterate_addresses (chvc->hello,
+ GNUNET_NO,
+ &run_validation,
+ chvc);
+ }
+ else
+ {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# duplicate HELLO (peer known)"),
+ 1,
+ GNUNET_NO);
+ }
+ GNUNET_free (chvc);
+ return;
+ }
+ if (h == NULL)
+ return;
#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Establishing `%s' connection to validate `%s' address `%s' of `%4s'\n",
- va->transport_name,
- "HELLO",
- GNUNET_a2s ((const struct sockaddr *) &va[1],
- va->addr_len), GNUNET_i2s (&apeer));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Peerinfo had `%s' message for peer `%4s', validating only new addresses.\n",
+ "HELLO",
+ GNUNET_i2s (peer));
#endif
- tp = find_transport (va->transport_name);
- GNUNET_assert (tp != NULL);
- /* This validation should happen inside the transport, not from the plugin! */
- validate_address (tp->api->cls, va, &apeer,
- HELLO_VERIFICATION_TIMEOUT,
- &va[1], va->addr_len);
- /* va->ok = GNUNET_SYSERR; will be set by validate_address! */
- va = va->next;
- }
- GNUNET_SCHEDULER_add_delayed (sched,
- GNUNET_TIME_absolute_get_remaining (chvc->
- e->timeout),
- &cleanup_validation, NULL);
- GNUNET_free (chvc);
+ chvc->hello_known = GNUNET_YES;
+ n = find_neighbour (peer);
+ if (n != NULL)
+ {
+ GNUNET_HELLO_iterate_addresses (h,
+ GNUNET_NO,
+ &add_to_foreign_address_list,
+ n);
+ try_transmission_to_peer (n);
+ }
+ GNUNET_HELLO_iterate_new_addresses (chvc->hello,
+ h,
+ GNUNET_TIME_relative_to_absolute (HELLO_REVALIDATION_START_TIME),
+ &run_validation,
+ chvc);
}
-
/**
* Process HELLO-message.
*
process_hello (struct TransportPlugin *plugin,
const struct GNUNET_MessageHeader *message)
{
- struct ValidationList *e;
uint16_t hsize;
struct GNUNET_PeerIdentity target;
const struct GNUNET_HELLO_Message *hello;
return GNUNET_SYSERR;
}
/* first, check if load is too high */
- if (GNUNET_OS_load_cpu_get (cfg) > 100)
+ if (GNUNET_SCHEDULER_get_load (sched,
+ GNUNET_SCHEDULER_PRIORITY_BACKGROUND) > MAX_HELLO_LOAD)
{
- /* TODO: call to stats? */
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# HELLOs ignored due to high load"),
+ 1,
+ GNUNET_NO);
return GNUNET_OK;
}
hello = (const struct GNUNET_HELLO_Message *) message;
GNUNET_CRYPTO_hash (&publicKey,
sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
&target.hashPubKey);
-#if DEBUG_TRANSPORT
+ if (0 == memcmp (&my_identity,
+ &target,
+ sizeof (struct GNUNET_PeerIdentity)))
+ {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# HELLOs ignored for validation (is my own HELLO)"),
+ 1,
+ GNUNET_NO);
+ return GNUNET_OK;
+ }
+#if DEBUG_TRANSPORT > 1
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Processing `%s' message for `%4s'\n",
- "HELLO", GNUNET_i2s (&target));
-#endif
- /* check if a HELLO for this peer is already on the validation list */
- e = pending_validations;
- while (e != NULL)
- {
- if (0 == memcmp (&e->publicKey,
- &publicKey,
- sizeof (struct
- GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded)))
- {
- /* TODO: call to stats? */
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s' message for peer `%4s' is already pending; ignoring new message\n",
- "HELLO", GNUNET_i2s (&target));
+ "Processing `%s' message for `%4s' of size %u\n",
+ "HELLO",
+ GNUNET_i2s (&target),
+ GNUNET_HELLO_size(hello));
#endif
- return GNUNET_OK;
- }
- e = e->next;
- }
chvc = GNUNET_malloc (sizeof (struct CheckHelloValidatedContext) + hsize);
- chvc->plugin = plugin;
- chvc->hello = (struct GNUNET_HELLO_Message *) &chvc[1];
- memcpy (chvc->hello, hello, hsize);
+ chvc->hello = (const struct GNUNET_HELLO_Message *) &chvc[1];
+ memcpy (&chvc[1], hello, hsize);
+ GNUNET_CONTAINER_DLL_insert (chvc_head,
+ chvc_tail,
+ chvc);
/* finally, check if HELLO was previously validated
(continuation will then schedule actual validation) */
chvc->piter = GNUNET_PEERINFO_iterate (cfg,
/**
- * The peer specified by the given neighbor has timed-out or a plugin
+ * The peer specified by the given neighbour has timed-out or a plugin
* has disconnected. We may either need to do nothing (other plugins
* still up), or trigger a full disconnect and clean up. This
- * function updates our state and do the necessary notifications.
- * Also notifies our clients that the neighbor is now officially
+ * function updates our state and does the necessary notifications.
+ * Also notifies our clients that the neighbour is now officially
* gone.
*
- * @param n the neighbor list entry for the peer
+ * @param n the neighbour list entry for the peer
* @param check should we just check if all plugins
* disconnected or must we ask all plugins to
* disconnect?
*/
static void
-disconnect_neighbor (struct NeighborList *n, int check)
+disconnect_neighbour (struct NeighbourList *n, int check)
{
struct ReadyList *rpos;
- struct NeighborList *npos;
- struct NeighborList *nprev;
+ struct NeighbourList *npos;
+ struct NeighbourList *nprev;
struct MessageQueue *mq;
+ struct ForeignAddressList *peer_addresses;
+ struct ForeignAddressList *peer_pos;
if (GNUNET_YES == check)
{
rpos = n->plugins;
while (NULL != rpos)
{
- if (GNUNET_YES == rpos->connected)
- return; /* still connected */
+ peer_addresses = rpos->addresses;
+ while (peer_addresses != NULL)
+ {
+ if (GNUNET_YES == peer_addresses->connected)
+ return; /* still connected */
+ peer_addresses = peer_addresses->next;
+ }
rpos = rpos->next;
}
}
-
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
- "Disconnecting from `%4s'\n", GNUNET_i2s (&n->id));
+ "Disconnecting from `%4s'\n",
+ GNUNET_i2s (&n->id));
#endif
- /* remove n from neighbors list */
+ /* remove n from neighbours list */
nprev = NULL;
- npos = neighbors;
+ npos = neighbours;
while ((npos != NULL) && (npos != n))
{
nprev = npos;
}
GNUNET_assert (npos != NULL);
if (nprev == NULL)
- neighbors = n->next;
+ neighbours = n->next;
else
nprev->next = n->next;
/* notify all clients about disconnect */
- notify_clients_disconnect (&n->id);
+ if (GNUNET_YES == n->received_pong)
+ notify_clients_disconnect (&n->id);
/* clean up all plugins, cancel connections and pending transmissions */
while (NULL != (rpos = n->plugins))
{
n->plugins = rpos->next;
- GNUNET_assert (rpos->neighbor == n);
- if (GNUNET_YES == rpos->connected)
- rpos->plugin->api->disconnect (rpos->plugin->api->cls, &n->id);
+ rpos->plugin->api->disconnect (rpos->plugin->api->cls, &n->id);
+
+ while (rpos->addresses != NULL)
+ {
+ peer_pos = rpos->addresses;
+ rpos->addresses = peer_pos->next;
+ GNUNET_free(peer_pos);
+ }
GNUNET_free (rpos);
}
/* free all messages on the queue */
- while (NULL != (mq = n->messages))
- {
- n->messages = mq->next;
- GNUNET_assert (mq->neighbor == n);
+ while (NULL != (mq = n->messages_head))
+ {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes in message queue for other peers"),
+ -mq->message_buf_size,
+ GNUNET_NO);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes discarded due to disconnect"),
+ mq->message_buf_size,
+ GNUNET_NO);
+ GNUNET_CONTAINER_DLL_remove (n->messages_head,
+ n->messages_tail,
+ mq);
+ GNUNET_assert (0 == memcmp(&mq->neighbour_id,
+ &n->id,
+ sizeof(struct GNUNET_PeerIdentity)));
GNUNET_free (mq);
}
if (n->timeout_task != GNUNET_SCHEDULER_NO_TASK)
- GNUNET_SCHEDULER_cancel (sched, n->timeout_task);
- /* finally, free n itself */
- GNUNET_free (n);
-}
-
-
-/**
- * Add an entry for each of our transport plugins
- * (that are able to send) to the list of plugins
- * for this neighbor.
- *
- * @param neighbor to initialize
- */
-static void
-add_plugins (struct NeighborList *neighbor)
-{
- struct TransportPlugin *tp;
- struct ReadyList *rl;
-
- neighbor->retry_plugins_time
- = GNUNET_TIME_relative_to_absolute (PLUGIN_RETRY_FREQUENCY);
- tp = plugins;
- while (tp != NULL)
{
- if (tp->api->send != NULL)
- {
- rl = GNUNET_malloc (sizeof (struct ReadyList));
- rl->next = neighbor->plugins;
- neighbor->plugins = rl;
- rl->plugin = tp;
- rl->neighbor = neighbor;
- rl->transmit_ready = GNUNET_YES;
- }
- tp = tp->next;
+ GNUNET_SCHEDULER_cancel (sched, n->timeout_task);
+ n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
}
-}
-
-
-static void
-neighbor_timeout_task (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct NeighborList *n = cls;
-
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
- "Neighbor `%4s' has timed out!\n", GNUNET_i2s (&n->id));
-#endif
- n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
- disconnect_neighbor (n, GNUNET_NO);
+ if (n->retry_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (sched, n->retry_task);
+ n->retry_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ /* finally, free n itself */
+ GNUNET_free (n);
}
/**
- * Create a fresh entry in our neighbor list for the given peer.
- * Will try to transmit our current HELLO to the new neighbor. Also
- * notifies our clients about the new "connection".
- *
- * @param peer the peer for which we create the entry
- * @return the new neighbor list entry
- */
-static struct NeighborList *
-setup_new_neighbor (const struct GNUNET_PeerIdentity *peer, const char *addr, size_t sender_address_len)
-{
- struct NeighborList *n;
-
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
- "Setting up new neighbor `%4s', sending our HELLO to introduce ourselves\n",
- GNUNET_i2s (peer));
-#endif
- GNUNET_assert (our_hello != NULL);
- n = GNUNET_malloc (sizeof (struct NeighborList));
- n->next = neighbors;
- neighbors = n;
- n->id = *peer;
- n->last_quota_update = GNUNET_TIME_absolute_get ();
- n->peer_timeout =
- GNUNET_TIME_relative_to_absolute
- (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
- n->quota_in = (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000);
- add_plugins (n);
- n->timeout_task = GNUNET_SCHEDULER_add_delayed (sched,
- GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
- &neighbor_timeout_task, n);
- transmit_to_peer (NULL, 0,
- (const struct GNUNET_MessageHeader *) our_hello,
- GNUNET_YES, n);
- notify_clients_connect (peer, GNUNET_TIME_UNIT_FOREVER_REL);
- return n;
-}
-
-/*
* We have received a PING message from someone. Need to send a PONG message
- * in response to the peer by any means necessary. Of course, with something
- * like TCP where a connection exists, we may want to send it that way. But
- * we may not be able to make that distinction...
- */
-static int handle_ping(void *cls, const struct GNUNET_MessageHeader *message,
- const struct GNUNET_PeerIdentity *peer,
- const char *sender_address,
- size_t sender_address_len)
+ * in response to the peer by any means necessary.
+ *
+ * FIXME: With something like TCP where a connection exists, we may
+ * want to send it that way. But the current API does not seem to
+ * allow us to do so (can't tell this to the transport!)
+ */
+static int
+handle_ping(void *cls, const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_PeerIdentity *peer,
+ const char *sender_address,
+ size_t sender_address_len)
{
struct TransportPlugin *plugin = cls;
struct TransportPingMessage *ping;
struct TransportPongMessage *pong;
- uint16_t msize;
-
- pong = GNUNET_malloc(sizeof(struct TransportPongMessage));
-
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
- "Processing `%s' from `%s'\n",
- "PING", GNUNET_a2s ((const struct sockaddr *)sender_address, sender_address_len));
-#endif
+ struct NeighbourList *n;
+ struct ReadyList *rl;
+ struct ForeignAddressList *fal;
- msize = ntohs (message->size);
- if (msize < sizeof (struct TransportPingMessage))
+ if (ntohs (message->size) != sizeof (struct TransportPingMessage))
{
GNUNET_break_op (0);
return GNUNET_SYSERR;
sizeof (struct GNUNET_PeerIdentity)))
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- _("Received `%s' message not destined for me!\n"), "PING");
+ _("Received `%s' message not destined for me!\n"),
+ "PING");
return GNUNET_SYSERR;
}
-
- msize -= sizeof (struct TransportPingMessage);
-/*
- * if (GNUNET_OK != tcp_plugin_address_suggested (plugin, &vcm[1], msize))
- {
- GNUNET_break_op (0);
- GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
- return;
- }
-
- if (GNUNET_OK != GNUNET_SERVER_client_get_address (client, &addr, &addrlen))
- {
- GNUNET_break (0);
- GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
- return;
- }
-*/
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
+ "Processing `%s' from `%s'\n",
+ "PING",
+ GNUNET_a2s ((const struct sockaddr *)sender_address,
+ sender_address_len));
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# PING messages received"),
+ 1,
+ GNUNET_NO);
pong = GNUNET_malloc (sizeof (struct TransportPongMessage) + sender_address_len);
pong->header.size = htons (sizeof (struct TransportPongMessage) + sender_address_len);
pong->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_PONG);
sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) + sender_address_len);
pong->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_TCP_PING);
pong->challenge = ping->challenge;
-
- memcpy(&pong->signer, &my_public_key, sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
+ pong->addrlen = htons(sender_address_len);
+ memcpy(&pong->signer,
+ &my_public_key,
+ sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
memcpy (&pong[1], sender_address, sender_address_len);
GNUNET_assert (GNUNET_OK ==
GNUNET_CRYPTO_rsa_sign (my_private_key,
&pong->purpose, &pong->signature));
- transmit_to_peer(NULL, TRANSPORT_DEFAULT_PRIORITY, &pong->header, GNUNET_NO, find_neighbor(peer, NULL, 0));
- /* plugin->api->send(); */ /* We can't directly send back along received address, because
- the port we saw for the peer (for TCP especially) will not
- likely be the open port on the other side! */
+ n = find_neighbour(peer);
+ if (n == NULL)
+ n = setup_new_neighbour(peer);
+ /* broadcast 'PONG' to all available addresses */
+ rl = n->plugins;
+ while (rl != NULL)
+ {
+ fal = rl->addresses;
+ while (fal != NULL)
+ {
+ transmit_to_peer(NULL, fal,
+ TRANSPORT_PONG_PRIORITY,
+ HELLO_VERIFICATION_TIMEOUT,
+ (const char *)pong,
+ ntohs(pong->header.size),
+ GNUNET_YES,
+ n);
+ fal = fal->next;
+ }
+ rl = rl->next;
+ }
GNUNET_free(pong);
return GNUNET_OK;
}
+
/**
* Function called by the plugin for each received message.
* Update data volumes, possibly notify plugins about
* and generally forward to our receive callback.
*
* @param cls the "struct TransportPlugin *" we gave to the plugin
- * @param message the message, NULL if peer was disconnected
- * @param distance the transport cost to this peer (not latency!)
- * @param sender_address the address that the sender reported
- * (opaque to transport service)
- * @param sender_address_len the length of the sender address
* @param peer (claimed) identity of the other peer
- * @return the new service_context that the plugin should use
- * for future receive calls for messages from this
- * particular peer
- *
- */
-static void
+ * @param message the message, NULL if we only care about
+ * learning about the delay until we should receive again
+ * @param distance in overlay hops; use 1 unless DV (or 0 if message == NULL)
+ * @param sender_address binary address of the sender (if observed)
+ * @param sender_address_len number of bytes in sender_address
+ * @return how long the plugin should wait until receiving more data
+ * (plugins that do not support this, can ignore the return value)
+ */
+static struct GNUNET_TIME_Relative
plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer,
const struct GNUNET_MessageHeader *message,
unsigned int distance, const char *sender_address,
size_t sender_address_len)
{
- const struct GNUNET_MessageHeader ack = {
- htons (sizeof (struct GNUNET_MessageHeader)),
- htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ACK)
- };
struct ReadyList *service_context;
struct TransportPlugin *plugin = cls;
struct TransportClient *cpos;
struct InboundMessage *im;
+ struct ForeignAddressList *peer_address;
uint16_t msize;
- struct NeighborList *n;
+ struct NeighbourList *n;
+ struct GNUNET_TIME_Relative ret;
- n = find_neighbor (peer, sender_address, sender_address_len);
+ n = find_neighbour (peer);
if (n == NULL)
- {
- if (message == NULL)
- return; /* disconnect of peer already marked down */
- n = setup_new_neighbor (peer, sender_address, sender_address_len);
- }
+ n = setup_new_neighbour (peer);
service_context = n->plugins;
while ((service_context != NULL) && (plugin != service_context->plugin))
service_context = service_context->next;
GNUNET_assert ((plugin->api->send == NULL) || (service_context != NULL));
- if (message == NULL)
- {
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
- "Receive failed from `%4s', triggering disconnect\n",
- GNUNET_i2s (&n->id));
-#endif
- /* TODO: call stats */
- if (service_context != NULL)
- service_context->connected = GNUNET_NO;
- disconnect_neighbor (n, GNUNET_YES);
- return;
- }
+ if (message != NULL)
+ {
+ peer_address = add_peer_address(n,
+ plugin->short_name,
+ sender_address,
+ sender_address_len);
+ if (peer_address != NULL)
+ {
+ peer_address->distance = distance;
+ if (peer_address->connected == GNUNET_NO)
+ {
+ peer_address->connected = GNUNET_YES;
+ peer_address->connect_attempts++;
+ }
+ peer_address->timeout
+ =
+ GNUNET_TIME_relative_to_absolute
+ (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+ }
+ /* update traffic received amount ... */
+ msize = ntohs (message->size);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes received from other peers"),
+ msize,
+ GNUNET_NO);
+ n->distance = distance;
+ n->peer_timeout =
+ GNUNET_TIME_relative_to_absolute
+ (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+ GNUNET_SCHEDULER_cancel (sched,
+ n->timeout_task);
+ n->timeout_task =
+ GNUNET_SCHEDULER_add_delayed (sched,
+ GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+ &neighbour_timeout_task, n);
+ if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
+ {
+ /* dropping message due to frequent inbound volume violations! */
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING |
+ GNUNET_ERROR_TYPE_BULK,
+ _
+ ("Dropping incoming message due to repeated bandwidth quota (%u b/s) violations (total of %u).\n"),
+ n->in_tracker.available_bytes_per_s__,
+ n->quota_violation_count);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bandwidth quota violations by other peers"),
+ 1,
+ GNUNET_NO);
+ return GNUNET_TIME_UNIT_MINUTES; /* minimum penalty, likely ignored (UDP...) */
+ }
+ switch (ntohs (message->type))
+ {
+ case GNUNET_MESSAGE_TYPE_HELLO:
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# HELLO messages received from other peers"),
+ 1,
+ GNUNET_NO);
+ process_hello (plugin, message);
+ break;
+ case GNUNET_MESSAGE_TYPE_TRANSPORT_PING:
+ handle_ping(plugin, message, peer, sender_address, sender_address_len);
+ break;
+ case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG:
+ handle_pong(plugin, message, peer, sender_address, sender_address_len);
+ break;
+ default:
#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
- "Processing message of type `%u' received by plugin...\n",
- ntohs (message->type));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received message of type %u from `%4s', sending to all clients.\n",
+ ntohs (message->type), GNUNET_i2s (peer));
#endif
- if (service_context != NULL)
- {
- if (service_context->connected == GNUNET_NO)
- {
- service_context->connected = GNUNET_YES;
- service_context->transmit_ready = GNUNET_YES;
- service_context->connect_attempts++;
- }
- service_context->timeout
- =
- GNUNET_TIME_relative_to_absolute
- (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
- /* service_context->latency = latency; */ /* This value should be set by us! */
- }
- /* update traffic received amount ... */
- msize = ntohs (message->size);
- n->last_received += msize;
- GNUNET_SCHEDULER_cancel (sched, n->timeout_task);
- n->peer_timeout =
- GNUNET_TIME_relative_to_absolute
- (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
- n->timeout_task =
- GNUNET_SCHEDULER_add_delayed (sched,
- GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
- &neighbor_timeout_task, n);
- update_quota (n);
- if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
- {
- /* dropping message due to frequent inbound volume violations! */
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING |
- GNUNET_ERROR_TYPE_BULK,
- _
- ("Dropping incoming message due to repeated bandwidth quota violations.\n"));
- /* TODO: call stats */
- GNUNET_assert ((service_context == NULL) ||
- (NULL != service_context->neighbor));
- return;
- }
- switch (ntohs (message->type))
+ if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker,
+ msize))
+ n->quota_violation_count++;
+ else
+ n->quota_violation_count = 0; /* back within limits */
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# payload received from other peers"),
+ msize,
+ GNUNET_NO);
+ /* transmit message to all clients */
+ im = GNUNET_malloc (sizeof (struct InboundMessage) + msize);
+ im->header.size = htons (sizeof (struct InboundMessage) + msize);
+ im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
+ im->latency = GNUNET_TIME_relative_hton (n->latency);
+ im->peer = *peer;
+ memcpy (&im[1], message, msize);
+ cpos = clients;
+ while (cpos != NULL)
+ {
+ transmit_to_client (cpos, &im->header, GNUNET_YES);
+ cpos = cpos->next;
+ }
+ GNUNET_free (im);
+ }
+ }
+ ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 0);
+ if (ret.value > 0)
{
- case GNUNET_MESSAGE_TYPE_HELLO:
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Receiving `%s' message from `%4s'.\n", "HELLO",
- GNUNET_i2s (peer));
-#endif
- process_hello (plugin, message);
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Sending `%s' message to connecting peer `%4s'.\n", "ACK",
- GNUNET_i2s (peer));
-#endif
- transmit_to_peer (NULL, 0, &ack, GNUNET_YES, n);
- break;
- case GNUNET_MESSAGE_TYPE_TRANSPORT_PING:
- handle_ping(plugin, message, peer, sender_address, sender_address_len);
- break;
- case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG:
- handle_pong(plugin, message, peer, sender_address, sender_address_len);
- break;
- //plugin_env_notify_validation();
- case GNUNET_MESSAGE_TYPE_TRANSPORT_ACK:
- n->saw_ack = GNUNET_YES;
- /* intentional fall-through! */
- default:
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received message of type %u from `%4s', sending to all clients.\n",
- ntohs (message->type), GNUNET_i2s (peer));
-#endif
- /* transmit message to all clients */
- im = GNUNET_malloc (sizeof (struct InboundMessage) + msize);
- im->header.size = htons (sizeof (struct InboundMessage) + msize);
- im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
- im->latency = n->latency;
- im->peer = *peer;
- memcpy (&im[1], message, msize);
-
- cpos = clients;
- while (cpos != NULL)
- {
- transmit_to_client (cpos, &im->header, GNUNET_YES);
- cpos = cpos->next;
- }
- GNUNET_free (im);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Throttling read (%llu bytes excess at %u b/s), waiting %llums before reading more.\n",
+ (unsigned long long) n->in_tracker.consumption_since_last_update__,
+ (unsigned int) n->in_tracker.available_bytes_per_s__,
+ (unsigned long long) ret.value);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# ms throttling suggested"),
+ (int64_t) ret.value,
+ GNUNET_NO);
}
- GNUNET_assert ((service_context == NULL) ||
- (NULL != service_context->neighbor));
+ return ret;
}
{
struct TransportClient *c;
struct ConnectInfoMessage cim;
- struct NeighborList *n;
- struct InboundMessage *im;
- struct GNUNET_MessageHeader *ack;
+ struct NeighbourList *n;
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
/* tell new client about all existing connections */
cim.header.size = htons (sizeof (struct ConnectInfoMessage));
cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
- cim.quota_out =
- htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60 * 1000));
- cim.latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO); /* FIXME? */
- im = GNUNET_malloc (sizeof (struct InboundMessage) +
- sizeof (struct GNUNET_MessageHeader));
- im->header.size = htons (sizeof (struct InboundMessage) +
- sizeof (struct GNUNET_MessageHeader));
- im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
- im->latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO); /* FIXME? */
- ack = (struct GNUNET_MessageHeader *) &im[1];
- ack->size = htons (sizeof (struct GNUNET_MessageHeader));
- ack->type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ACK);
- for (n = neighbors; n != NULL; n = n->next)
- {
- cim.id = n->id;
- transmit_to_client (c, &cim.header, GNUNET_NO);
- if (n->saw_ack)
- {
- im->peer = n->id;
- transmit_to_client (c, &im->header, GNUNET_NO);
+ n = neighbours;
+ while (n != NULL)
+ {
+ if (GNUNET_YES == n->received_pong)
+ {
+ cim.id = n->id;
+ cim.latency = GNUNET_TIME_relative_hton (n->latency);
+ cim.distance = htonl (n->distance);
+ transmit_to_client (c, &cim.header, GNUNET_NO);
}
+ n = n->next;
}
- GNUNET_free (im);
- }
- else
- {
- fprintf(stderr, "Our hello is NULL!\n");
}
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
{
int ret;
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received `%s' request from client\n", "HELLO");
-#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# HELLOs received from clients"),
+ 1,
+ GNUNET_NO);
ret = process_hello (NULL, message);
GNUNET_SERVER_receive_done (client, ret);
}
const struct GNUNET_MessageHeader *message)
{
struct TransportClient *tc;
- struct NeighborList *n;
+ struct NeighbourList *n;
const struct OutboundMessage *obm;
const struct GNUNET_MessageHeader *obmm;
uint16_t size;
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# payload received for other peers"),
+ size,
+ GNUNET_NO);
obm = (const struct OutboundMessage *) message;
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}
- n = find_neighbor (&obm->peer, NULL, 0);
+ n = find_neighbour (&obm->peer);
if (n == NULL)
- n = setup_new_neighbor (&obm->peer, NULL, 0);
+ n = setup_new_neighbour (&obm->peer);
tc = clients;
while ((tc != NULL) && (tc->client != client))
tc = tc->next;
ntohs (obmm->size),
ntohs (obmm->type), GNUNET_i2s (&obm->peer));
#endif
- transmit_to_peer (tc, ntohl (obm->priority), obmm, GNUNET_NO, n);
+ transmit_to_peer (tc, NULL, ntohl (obm->priority),
+ GNUNET_TIME_relative_ntoh (obm->timeout),
+ (char *)obmm,
+ ntohs (obmm->size), GNUNET_NO, n);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
{
const struct QuotaSetMessage *qsm =
(const struct QuotaSetMessage *) message;
- struct NeighborList *n;
- struct TransportPlugin *p;
- struct ReadyList *rl;
-
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received `%s' request from client for peer `%4s'\n",
- "SET_QUOTA", GNUNET_i2s (&qsm->peer));
-#endif
- n = find_neighbor (&qsm->peer, NULL, 0);
+ struct NeighbourList *n;
+
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# SET QUOTA messages received"),
+ 1,
+ GNUNET_NO);
+ n = find_neighbour (&qsm->peer);
if (n == NULL)
{
GNUNET_SERVER_receive_done (client, GNUNET_OK);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# SET QUOTA messages ignored (no such peer)"),
+ 1,
+ GNUNET_NO);
return;
}
- update_quota (n);
- if (n->quota_in < ntohl (qsm->quota_in))
- n->last_quota_update = GNUNET_TIME_absolute_get ();
- n->quota_in = ntohl (qsm->quota_in);
- rl = n->plugins;
- while (rl != NULL)
- {
- p = rl->plugin;
- p->api->set_receive_quota (p->api->cls,
- &qsm->peer, ntohl (qsm->quota_in));
- rl = rl->next;
- }
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
-}
-
-
-/**
- * Handle TRY_CONNECT-message.
- *
- * @param cls closure (always NULL)
- * @param client identification of the client
- * @param message the actual message
- */
-static void
-handle_try_connect (void *cls,
- struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *message)
-{
- const struct TryConnectMessage *tcm;
-
- tcm = (const struct TryConnectMessage *) message;
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received `%s' request from client %p asking to connect to `%4s'\n",
- "TRY_CONNECT", client, GNUNET_i2s (&tcm->peer));
-#endif
- if (NULL == find_neighbor (&tcm->peer, NULL, 0))
- setup_new_neighbor (&tcm->peer, NULL, 0); /* Can we set up a truly _new_ neighbor without
- knowing its address? Should we ask the plugin
- for more information about this peer? I don't
- think we can... Or set up new peer should only
- happen when transport notifies us of an address,
- and this setup should check for an address in
- the existing list only */
-#if DEBUG_TRANSPORT
- else
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Client asked to connect to `%4s', but connection already exists\n",
- "TRY_CONNECT", GNUNET_i2s (&tcm->peer));
+ "Received `%s' request (new quota %u, old quota %u) from client for peer `%4s'\n",
+ "SET_QUOTA",
+ (unsigned int) ntohl (qsm->quota.value__),
+ (unsigned int) n->in_tracker.available_bytes_per_s__,
+ GNUNET_i2s (&qsm->peer));
#endif
+ GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker,
+ qsm->quota);
+ if (0 == ntohl (qsm->quota.value__))
+ disconnect_neighbour (n, GNUNET_NO);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
+
static void
transmit_address_to_client (void *cls, const char *address)
{
GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL);
}
+
/**
* Handle AddressLookup-message.
*
GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, 0},
{&handle_set_quota, NULL,
GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA, sizeof (struct QuotaSetMessage)},
- {&handle_try_connect, NULL,
- GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT,
- sizeof (struct TryConnectMessage)},
{&handle_address_lookup, NULL,
GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_LOOKUP,
0},
plug->env.cls = plug;
plug->env.receive = &plugin_env_receive;
plug->env.notify_address = &plugin_env_notify_address;
- plug->env.default_quota_in =
- (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000);
plug->env.max_connections = max_connect_per_transport;
+ plug->env.stats = stats;
}
struct TransportClient *prev;
struct ClientMessageQueueEntry *mqe;
- if (client == NULL)
+ if (client == NULL)
return;
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
return;
while (NULL != (mqe = pos->message_queue_head))
{
- pos->message_queue_head = mqe->next;
+ GNUNET_CONTAINER_DLL_remove (pos->message_queue_head,
+ pos->message_queue_tail,
+ mqe);
+ pos->message_count--;
GNUNET_free (mqe);
}
- pos->message_queue_head = NULL;
if (prev == NULL)
clients = pos->next;
else
pos->client = NULL;
return;
}
+ if (pos->th != NULL)
+ {
+ GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
+ pos->th = NULL;
+ }
+ GNUNET_break (0 == pos->message_count);
GNUNET_free (pos);
}
/**
- * Function called when the service shuts down. Unloads our plugins.
+ * Iterator to free entries in the validation_map.
+ *
+ * @param cls closure (unused)
+ * @param key current key code
+ * @param value value in the hash map (validation to abort)
+ * @return GNUNET_YES (always)
+ */
+static int
+abort_validation (void *cls,
+ const GNUNET_HashCode * key,
+ void *value)
+{
+ struct ValidationEntry *va = value;
+
+ GNUNET_SCHEDULER_cancel (sched, va->timeout_task);
+ GNUNET_free (va->transport_name);
+ GNUNET_free (va);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Function called when the service shuts down. Unloads our plugins
+ * and cancels pending validations.
*
* @param cls closure, unused
* @param tc task context (unused)
*/
static void
-unload_plugins (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct TransportPlugin *plug;
- struct AddressList *al;
+ struct OwnAddressList *al;
+ struct CheckHelloValidatedContext *chvc;
+ while (neighbours != NULL)
+ disconnect_neighbour (neighbours, GNUNET_NO);
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Transport service is unloading plugins...\n");
if (my_private_key != NULL)
GNUNET_CRYPTO_rsa_key_free (my_private_key);
GNUNET_free_non_null (our_hello);
+
+ /* free 'chvc' data structure */
+ while (NULL != (chvc = chvc_head))
+ {
+ chvc_head = chvc->next;
+ GNUNET_PEERINFO_iterate_cancel (chvc->piter);
+ GNUNET_free (chvc);
+ }
+ chvc_tail = NULL;
+
+ GNUNET_CONTAINER_multihashmap_iterate (validation_map,
+ &abort_validation,
+ NULL);
+ GNUNET_CONTAINER_multihashmap_destroy (validation_map);
+ validation_map = NULL;
+ if (stats != NULL)
+ {
+ GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
+ stats = NULL;
+ }
}
sched = s;
cfg = c;
+ stats = GNUNET_STATISTICS_create (sched, "transport", cfg);
+ validation_map = GNUNET_CONTAINER_multihashmap_create (64);
/* parse configuration */
if ((GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_number (c,
_
("Transport service is lacking key configuration settings. Exiting.\n"));
GNUNET_SCHEDULER_shutdown (s);
+ if (stats != NULL)
+ {
+ GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
+ stats = NULL;
+ }
+ GNUNET_CONTAINER_multihashmap_destroy (validation_map);
+ validation_map = NULL;
return;
}
max_connect_per_transport = (uint32_t) tneigh;
_
("Transport service could not access hostkey. Exiting.\n"));
GNUNET_SCHEDULER_shutdown (s);
+ if (stats != NULL)
+ {
+ GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
+ stats = NULL;
+ }
+ GNUNET_CONTAINER_multihashmap_destroy (validation_map);
+ validation_map = NULL;
return;
}
GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key);
}
GNUNET_SCHEDULER_add_delayed (sched,
GNUNET_TIME_UNIT_FOREVER_REL,
- &unload_plugins, NULL);
+ &shutdown_task, NULL);
if (no_transports)
refresh_hello ();