* @brief low-level P2P messaging
* @author Christian Grothoff
*
- * TODO:
- * - remove AddressValidations, incorporate them into the PeerAddressLists
+ * NOTE:
+ * - This code uses 'GNUNET_a2s' for debug printing in many places,
+ * which is technically wrong since it assumes we have IP+Port
+ * (v4/v6) addresses. Once we add transports like http or smtp
+ * this will have to be changed!
*/
#include "platform.h"
#include "gnunet_client_lib.h"
+#include "gnunet_container_lib.h"
#include "gnunet_constants.h"
#include "gnunet_getopt_lib.h"
#include "gnunet_hello_lib.h"
#include "plugin_transport.h"
#include "transport.h"
+/**
+ * Should we do some additional checks (to validate behavior
+ * of clients)?
+ */
+#define EXTRA_CHECKS GNUNET_YES
+
/**
* How many messages can we have pending for a given client process
* before we start to drop incoming messages? We typically should
*/
#define MAX_CONNECT_RETRY 3
+/**
+ * Limit on the number of ready-to-run tasks when validating
+ * HELLOs. If more tasks are ready to run, we will drop
+ * HELLOs instead of validating them.
+ */
+#define MAX_HELLO_LOAD 4
+
/**
* How often must a peer violate bandwidth quotas before we start
* to simply drop its messages?
*/
-#define QUOTA_VIOLATION_DROP_THRESHOLD 100
+#define QUOTA_VIOLATION_DROP_THRESHOLD 10
/**
* How long until a HELLO verification attempt should time out?
*/
#define TRANSPORT_DEFAULT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15)
-#define TRANSPORT_DEFAULT_PRIORITY 4 /* Tired of remembering arbitrary priority names */
+/**
+ * How often will we re-validate for latency information
+ */
+#define TRANSPORT_DEFAULT_REVALIDATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
+
+/**
+ * Priority to use for PONG messages.
+ */
+#define TRANSPORT_PONG_PRIORITY 4
/**
* How often do we re-add (cheaper) plugins to our list of plugins
#define PLUGIN_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
/**
- * After how long do we expire an address in a HELLO
- * that we just validated? This value is also used
- * for our own addresses when we create a HELLO.
+ * After how long do we expire an address in a HELLO that we just
+ * validated? This value is also used for our own addresses when we
+ * create a HELLO.
*/
#define HELLO_ADDRESS_EXPIRATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 12)
+/**
+ * How long before an existing address expires should we again try to
+ * validate it? Must be (significantly) smaller than
+ * HELLO_ADDRESS_EXPIRATION.
+ */
+#define HELLO_REVALIDATION_START_TIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 1)
+
+
/**
* List of addresses of other peers
*/
-struct PeerAddressList
+struct ForeignAddressList
{
/**
* This is a linked list.
*/
- struct PeerAddressList *next;
-
- /*
- * Pointer to the validation associated with this
- * address. May be NULL if already validated!
- */
- struct ValidationAddress *validation;
-
- /**
- * Which of our transport plugins does this entry
- * belong to?
- */
- struct TransportPlugin *plugin;
+ struct ForeignAddressList *next;
/**
- * Neighbor this entry belongs to.
- */
- struct NeighborList *neighbor;
-
- /*
- * Ready list (transport) that this peer belongs to
+ * Which ready list does this entry belong to.
*/
struct ReadyList *ready_list;
+
/**
* How long until we auto-expire this address (unless it is
* re-confirmed by the transport)?
struct GNUNET_TIME_Absolute expires;
/**
- * Length of addr.
+ * Task used to re-validate addresses, updates latencies and
+ * verifies liveness.
*/
- size_t addrlen;
+ GNUNET_SCHEDULER_TaskIdentifier revalidate_task;
/**
- * The address
+ * Length of addr.
*/
- char *addr;
+ size_t addrlen;
/**
- * Is this plugin ready to transmit to the specific target?
- * GNUNET_NO if not. Initially, all plugins are marked ready. If a
- * transmission is in progress, "transmit_ready" is set to
- * GNUNET_NO.
+ * The address.
*/
- int transmit_ready;
+ const void *addr;
/**
* What was the last latency observed for this plugin
struct GNUNET_TIME_Absolute timeout;
/**
- * Is this plugin currently connected? The first time
- * we transmit or send data to a peer via a particular
- * plugin, we set this to GNUNET_YES. If we later get
- * an error (disconnect notification or transmission
- * failure), we set it back to GNUNET_NO. Each time the
- * value is set to GNUNET_YES, we increment the
- * "connect_attempts" counter. If that one reaches a
- * particular threshold, we consider the plugin to not
- * be working properly at this time for the given peer
- * and remove it from the eligible list.
+ * Are we currently connected via this address? The first time we
+ * successfully transmit or receive data to a peer via a particular
+ * address, we set this to GNUNET_YES. If we later get an error
+ * (disconnect notification, transmission failure, timeout), we set
+ * it back to GNUNET_NO.
*/
int connected;
/**
- * How often have we tried to connect using this plugin?
+ * Is this plugin currently busy transmitting to the specific target?
+ * GNUNET_NO if not (initial, default state is GNUNET_NO). Internal
+ * messages do not count as 'in transmit'.
+ */
+ int in_transmit;
+
+ /**
+ * Has this address been validated yet?
+ */
+ int validated;
+
+ /**
+ * How often have we tried to connect using this plugin? Used to
+ * discriminate against addresses that do not work well.
+ * FIXME: not yet used, but should be!
*/
unsigned int connect_attempts;
+ /**
+ * DV distance to this peer (1 if no DV is used).
+ * FIXME: need to set this from transport plugins!
+ */
+ uint32_t distance;
+
};
/**
- * Entry in linked list of network addresses.
+ * Entry in linked list of network addresses for ourselves.
*/
-struct AddressList
+struct OwnAddressList
{
/**
* This is a linked list.
*/
- struct AddressList *next;
+ struct OwnAddressList *next;
/**
* The address, actually a pointer to the end
* of this struct. Do not free!
*/
- void *addr;
-
+ const void *addr;
+
/**
* How long until we auto-expire this address (unless it is
* re-confirmed by the transport)?
/**
* List of our known addresses for this transport.
*/
- struct AddressList *addresses;
+ struct OwnAddressList *addresses;
/**
* Environment this transport service is using
*/
GNUNET_SCHEDULER_TaskIdentifier address_update_task;
-
/**
* Set to GNUNET_YES if we need to scrap the existing
* list of "addresses" and start fresh when we receive
* to the list and wait for the commit call.
*/
int rebuild;
+
};
-struct NeighborList;
+struct NeighbourList;
/**
- * For each neighbor we keep a list of messages
- * that we still want to transmit to the neighbor.
+ * For each neighbour we keep a list of messages
+ * that we still want to transmit to the neighbour.
*/
struct MessageQueue
{
/**
- * This is a linked list.
+ * This is a doubly linked list.
*/
struct MessageQueue *next;
+ /**
+ * This is a doubly linked list.
+ */
+ struct MessageQueue *prev;
+
/**
* The message(s) we want to transmit, GNUNET_MessageHeader(s)
- * stuck together in memory.
+ * stuck together in memory. Allocated at the end of this struct.
*/
- char *message_buf;
+ const char *message_buf;
- /*
+ /**
* Size of the message buf
*/
size_t message_buf_size;
struct TransportClient *client;
/**
- * Neighbor this entry belongs to.
+ * Using which specific address should we send this message?
*/
- /*struct NeighborList *neighbor;*/
+ struct ForeignAddressList *specific_address;
/**
- * Peer ID of the Neighbor this entry belongs to.
+ * Peer ID of the Neighbour this entry belongs to.
*/
- struct GNUNET_PeerIdentity *neighbor_id;
+ struct GNUNET_PeerIdentity neighbour_id;
/**
* Plugin that we used for the transmission.
*/
struct TransportPlugin *plugin;
+ /**
+ * At what time should we fail?
+ */
+ struct GNUNET_TIME_Absolute timeout;
+
/**
* Internal message of the transport system that should not be
* included in the usual SEND-SEND_OK transmission confirmation
*/
unsigned int priority;
- /*
- * Using which specific address should we send this message?
- */
- struct PeerAddressList *specific_peer;
-
};
/**
- * For a given Neighbor, which plugins are available
+ * For a given Neighbour, which plugins are available
* to talk to this peer and what are their costs?
*/
struct ReadyList
struct TransportPlugin *plugin;
/**
- * Neighbor this entry belongs to.
- */
- struct NeighborList *neighbor;
-
- /*
* Transport addresses, latency, and readiness for
* this particular plugin.
*/
- struct PeerAddressList *addresses;
+ struct ForeignAddressList *addresses;
- /**
- * Is this plugin ready to transmit to the specific target?
- * GNUNET_NO if not. Initially, all plugins are marked ready. If a
- * transmission is in progress, "transmit_ready" is set to
- * GNUNET_NO.
- */
- int plugin_transmit_ready;
-
- /*
- * Are any of our PeerAddressList addresses still connected?
- */
- int connected; /* FIXME: dynamically check PeerAddressList addresses when asked to! */
};
/**
- * Entry in linked list of all of our current neighbors.
+ * Entry in linked list of all of our current neighbours.
*/
-struct NeighborList
+struct NeighbourList
{
/**
* This is a linked list.
*/
- struct NeighborList *next;
+ struct NeighbourList *next;
/**
* Which of our transports is connected to this peer
struct ReadyList *plugins;
/**
- * List of messages we would like to send to this peer;
+ * Head of list of messages we would like to send to this peer;
* must contain at most one message per client.
*/
- struct MessageQueue *messages;
+ struct MessageQueue *messages_head;
/**
- * Identity of this neighbor.
+ * Tail of list of messages we would like to send to this peer; must
+ * contain at most one message per client.
+ */
+ struct MessageQueue *messages_tail;
+
+ /**
+ * Identity of this neighbour.
*/
struct GNUNET_PeerIdentity id;
*/
GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+ /**
+ * ID of task scheduled to run when we should retry transmitting
+ * the head of the message queue.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier retry_task;
+
/**
* How long until we should consider this peer dead
* (if we don't receive another message in the
struct GNUNET_TIME_Absolute peer_timeout;
/**
- * At what time did we reset last_received last?
- */
- struct GNUNET_TIME_Absolute last_quota_update;
-
- /**
- * At what time should we try to again add plugins to
- * our ready list?
- */
- struct GNUNET_TIME_Absolute retry_plugins_time;
-
- /**
- * How many bytes have we received since the "last_quota_update"
- * timestamp?
+ * Tracker for inbound bandwidth.
*/
- uint64_t last_received;
+ struct GNUNET_BANDWIDTH_Tracker in_tracker;
/**
- * Global quota for inbound traffic for the neighbor in bytes/ms.
+ * The latency we have seen for this particular address for
+ * this particular peer. This latency may have been calculated
+ * over multiple transports. This value reflects how long it took
+ * us to receive a response when SENDING via this particular
+ * transport/neighbour/address combination!
+ *
+ * FIXME: we need to periodically send PINGs to update this
+ * latency (at least more often than the current "huge" (11h?)
+ * update interval).
*/
- uint32_t quota_in;
+ struct GNUNET_TIME_Relative latency;
/**
* How often has the other peer (recently) violated the
unsigned int quota_violation_count;
/**
- * Have we seen an ACK from this neighbor in the past?
- * (used to make up a fake ACK for clients connecting after
- * the neighbor connected to us).
+ * DV distance to this peer (1 if no DV is used).
*/
- int received_pong;
+ uint32_t distance;
- /* The latency we have seen for this particular address for
- * this particular peer. This latency may have been calculated
- * over multiple transports. This value reflects how long it took
- * us to receive a response when SENDING via this particular
- * transport/neighbor/address combination!
+ /**
+ * Have we seen an PONG from this neighbour in the past (and
+ * not had a disconnect since)?
*/
- struct GNUNET_TIME_RelativeNBO latency;
+ int received_pong;
};
/**
* Message used to ask a peer to validate receipt (to check an address
- * from a HELLO). Followed by the address used. Note that the
- * recipients response does not affirm that he has this address,
- * only that he got the challenge message.
+ * from a HELLO).
*/
struct TransportPingMessage
{
*/
struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded signer;
- /*
+ /**
* Size of address appended to this message
*/
size_t addrlen;
};
+
/**
- * Linked list of messages to be transmitted to
- * the client. Each entry is followed by the
- * actual message.
+ * Linked list of messages to be transmitted to the client. Each
+ * entry is followed by the actual message.
*/
struct ClientMessageQueueEntry
{
/**
- * This is a linked list.
+ * This is a doubly-linked list.
*/
struct ClientMessageQueueEntry *next;
+
+ /**
+ * This is a doubly-linked list.
+ */
+ struct ClientMessageQueueEntry *prev;
};
*/
struct ClientMessageQueueEntry *message_queue_tail;
+ /**
+ * Current transmit request handle.
+ */
+ struct GNUNET_CONNECTION_TransmitHandle *th;
+
/**
* Is a call to "transmit_send_continuation" pending? If so, we
* must not free this struct (even if the corresponding client
/**
- * For each HELLO, we may have to validate multiple addresses;
- * each address gets its own request entry.
+ * Entry in map of all HELLOs awaiting validation.
*/
-struct ValidationAddress
+struct ValidationEntry
{
- /**
- * This is a linked list.
- */
- struct ValidationAddress *next;
- /*
- * What peer_address does this validation belong to?
+ /**
+ * The address, actually a pointer to the end
+ * of this struct. Do not free!
*/
- struct PeerAddressList *peer_address;
+ const void *addr;
/**
* Name of the transport.
char *transport_name;
/**
- * When should this validated address expire?
+ * The public key of the peer.
*/
- struct GNUNET_TIME_Absolute expiration;
+ struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded publicKey;
+
+ /**
+ * ID of task that will clean up this entry if we don't succeed
+ * with the validation first.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier timeout_task;
- /*
+ /**
* At what time did we send this validation?
*/
struct GNUNET_TIME_Absolute send_time;
/**
- * Challenge number we used.
+ * Length of addr.
*/
- uint32_t challenge;
+ size_t addrlen;
/**
- * Set to GNUNET_YES if the challenge was met,
- * GNUNET_SYSERR if we know it failed, GNUNET_NO
- * if we are waiting on a response.
+ * Challenge number we used.
*/
- int ok;
+ uint32_t challenge;
+
};
/**
- * Entry in linked list of all HELLOs awaiting validation.
+ * Context of currently active requests to peerinfo
+ * for validation of HELLOs.
*/
-struct ValidationList
+struct CheckHelloValidatedContext
{
/**
- * This is a linked list.
+ * This is a doubly-linked list.
*/
- struct ValidationList *next;
+ struct CheckHelloValidatedContext *next;
/**
- * Linked list with one entry per address from the HELLO
- * that needs to be validated.
+ * This is a doubly-linked list.
*/
- struct ValidationAddress *addresses;
+ struct CheckHelloValidatedContext *prev;
/**
- * The public key of the peer.
+ * Hello that we are validating.
*/
- struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded publicKey;
+ const struct GNUNET_HELLO_Message *hello;
/**
- * When does this record time-out? (assuming the
- * challenge goes unanswered)
+ * Context for peerinfo iteration.
+ * NULL after we are done processing peerinfo's information.
+ */
+ struct GNUNET_PEERINFO_IteratorContext *piter;
+
+ /**
+ * Was a HELLO known for this peer to peerinfo?
*/
- struct GNUNET_TIME_Absolute timeout;
+ int hello_known;
};
-
-struct CheckHelloValidatedContext
+/**
+ * Struct for keeping information about addresses to validate
+ * so that we can re-use for sending around ping's and receiving
+ * pongs periodically to keep connections alive and also better
+ * estimate latency of connections.
+ *
+ */
+struct PeriodicValidationContext
{
- /**
- * Plugin for which we are validating.
- */
- struct TransportPlugin *plugin;
/**
- * Hello that we are validating.
+ * The address we are keeping alive
*/
- struct GNUNET_HELLO_Message *hello;
+ struct ForeignAddressList *foreign_address;
/**
- * Validation list being built.
+ * The name of the transport
*/
- struct ValidationList *e;
+ char *transport;
/**
- * Context for peerinfo iteration.
- * NULL after we are done processing peerinfo's information.
+ * Public Key of the peer to re-validate
*/
- struct GNUNET_PEERINFO_IteratorContext *piter;
+ struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded publicKey;
};
-
-
-/**
- * HELLOs awaiting validation.
- */
-static struct ValidationList *pending_validations;
-
/**
* Our HELLO message.
*/
static struct GNUNET_HELLO_Message *our_hello;
/**
- * "version" of "our_hello". Used to see if a given
- * neighbor has already been sent the latest version
- * of our HELLO message.
+ * "version" of "our_hello". Used to see if a given neighbour has
+ * already been sent the latest version of our HELLO message.
*/
static unsigned int our_hello_version;
static struct GNUNET_SERVER_Handle *server;
/**
- * All known neighbors and their HELLOs.
+ * All known neighbours and their HELLOs.
*/
-static struct NeighborList *neighbors;
+static struct NeighbourList *neighbours;
/**
- * Number of neighbors we'd like to have.
+ * Number of neighbours we'd like to have.
*/
static uint32_t max_connect_per_transport;
/**
- * The peer specified by the given neighbor has timed-out or a plugin
+ * Head of linked list.
+ */
+static struct CheckHelloValidatedContext *chvc_head;
+
+/**
+ * Tail of linked list.
+ */
+static struct CheckHelloValidatedContext *chvc_tail;
+
+/**
+ * Map of PeerIdentities to 'struct ValidationEntry*'s (addresses
+ * of the given peer that we are currently validating).
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *validation_map;
+
+/**
+ * Handle for reporting statistics.
+ */
+static struct GNUNET_STATISTICS_Handle *stats;
+
+
+/**
+ * The peer specified by the given neighbour has timed-out or a plugin
* has disconnected. We may either need to do nothing (other plugins
* still up), or trigger a full disconnect and clean up. This
* function updates our state and do the necessary notifications.
- * Also notifies our clients that the neighbor is now officially
+ * Also notifies our clients that the neighbour is now officially
* gone.
*
- * @param n the neighbor list entry for the peer
+ * @param n the neighbour list entry for the peer
* @param check should we just check if all plugins
* disconnected or must we ask all plugins to
* disconnect?
*/
-static void disconnect_neighbor (struct NeighborList *n, int check);
-
+static void disconnect_neighbour (struct NeighbourList *n, int check);
/**
- * Check the ready list for the given neighbor and
- * if a plugin is ready for transmission (and if we
- * have a message), do so!
+ * Check the ready list for the given neighbour and if a plugin is
+ * ready for transmission (and if we have a message), do so!
*
- * @param neighbor target peer for which to check the plugins
+ * @param neighbour target peer for which to transmit
*/
-static ssize_t try_transmission_to_peer (struct NeighborList *neighbor);
+static void try_transmission_to_peer (struct NeighbourList *neighbour);
/**
- * Find an entry in the neighbor list for a particular peer.
+ * Find an entry in the neighbour list for a particular peer.
* if sender_address is not specified (NULL) then return the
* first matching entry. If sender_address is specified, then
* make sure that the address and address_len also matches.
- *
+ *
+ * FIXME: This description does not fit the function.
+ *
* @return NULL if not found.
*/
-static struct NeighborList *
-find_neighbor (const struct GNUNET_PeerIdentity *key)
+static struct NeighbourList *
+find_neighbour (const struct GNUNET_PeerIdentity *key)
{
- struct NeighborList *head = neighbors;
+ struct NeighbourList *head = neighbours;
while ((head != NULL) &&
(0 != memcmp (key, &head->id, sizeof (struct GNUNET_PeerIdentity))))
head = head->next;
-
return head;
}
/**
- * Update the quota values for the given neighbor now.
- */
-static void
-update_quota (struct NeighborList *n)
-{
- struct GNUNET_TIME_Relative delta;
- uint64_t allowed;
- uint64_t remaining;
-
- delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
- if (delta.value < MIN_QUOTA_REFRESH_TIME)
- return; /* not enough time passed for doing quota update */
- allowed = delta.value * n->quota_in;
-
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING |
- GNUNET_ERROR_TYPE_BULK,
- _
- ("Update quota: last received is %llu, allowed is %llu\n"), n->last_received, allowed);
-
- if (n->last_received < allowed)
- {
- remaining = allowed - n->last_received;
- if (n->quota_in > 0)
- remaining /= n->quota_in;
- else
- remaining = 0;
- if (remaining > MAX_BANDWIDTH_CARRY)
- remaining = MAX_BANDWIDTH_CARRY;
- n->last_received = 0;
- n->last_quota_update = GNUNET_TIME_absolute_get ();
- n->last_quota_update.value -= remaining;
- if (n->quota_violation_count > 0)
- n->quota_violation_count--;
- }
- else
- {
- n->last_received -= allowed;
- n->last_quota_update = GNUNET_TIME_absolute_get ();
- if (n->last_received > allowed)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING |
- GNUNET_ERROR_TYPE_BULK,
- _
- ("LAST RECEIVED: %llu greater than allowed : %llu\n"), n->last_received, allowed);
- /* more than twice the allowed rate! */
- n->quota_violation_count += 10;
- }
- }
-}
-
-
-/**
- * Function called to notify a client about the socket
- * being ready to queue more data. "buf" will be
- * NULL and "size" zero if the socket was closed for
- * writing in the meantime.
+ * Function called to notify a client about the socket being ready to
+ * queue more data. "buf" will be NULL and "size" zero if the socket
+ * was closed for writing in the meantime.
*
* @param cls closure
* @param size number of bytes available in buf
uint16_t msize;
size_t tsize;
const struct GNUNET_MessageHeader *msg;
- struct GNUNET_CONNECTION_TransmitHandle *th;
char *cbuf;
+ client->th = NULL;
if (buf == NULL)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
/* fatal error with client, free message queue! */
while (NULL != (q = client->message_queue_head))
{
- client->message_queue_head = q->next;
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes discarded (could not transmit to client)"),
+ ntohs (((const struct GNUNET_MessageHeader*)&q[1])->size),
+ GNUNET_NO);
+ GNUNET_CONTAINER_DLL_remove (client->message_queue_head,
+ client->message_queue_tail,
+ q);
GNUNET_free (q);
}
- client->message_queue_tail = NULL;
client->message_count = 0;
return 0;
}
"Transmitting message of type %u to client.\n",
ntohs (msg->type));
#endif
- client->message_queue_head = q->next;
- if (q->next == NULL)
- client->message_queue_tail = NULL;
+ GNUNET_CONTAINER_DLL_remove (client->message_queue_head,
+ client->message_queue_tail,
+ q);
memcpy (&cbuf[tsize], msg, msize);
tsize += msize;
GNUNET_free (q);
if (NULL != q)
{
GNUNET_assert (msize >= sizeof (struct GNUNET_MessageHeader));
- th = GNUNET_SERVER_notify_transmit_ready (client->client,
- msize,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &transmit_to_client_callback,
- client);
- GNUNET_assert (th != NULL);
+ client->th = GNUNET_SERVER_notify_transmit_ready (client->client,
+ msize,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &transmit_to_client_callback,
+ client);
+ GNUNET_assert (client->th != NULL);
}
return tsize;
}
{
struct ClientMessageQueueEntry *q;
uint16_t msize;
- struct GNUNET_CONNECTION_TransmitHandle *th;
if ((client->message_count >= MAX_PENDING) && (GNUNET_YES == may_drop))
{
/* TODO: call to statistics... */
return;
}
- client->message_count++;
msize = ntohs (msg->size);
GNUNET_assert (msize >= sizeof (struct GNUNET_MessageHeader));
q = GNUNET_malloc (sizeof (struct ClientMessageQueueEntry) + msize);
memcpy (&q[1], msg, msize);
- /* append to message queue */
- if (client->message_queue_tail == NULL)
- {
- client->message_queue_tail = q;
- }
- else
- {
- client->message_queue_tail->next = q;
- client->message_queue_tail = q;
- }
- if (client->message_queue_head == NULL)
+ GNUNET_CONTAINER_DLL_insert_after (client->message_queue_head,
+ client->message_queue_tail,
+ client->message_queue_tail,
+ q);
+ client->message_count++;
+ if (client->th == NULL)
{
- client->message_queue_head = q;
- th = GNUNET_SERVER_notify_transmit_ready (client->client,
- msize,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &transmit_to_client_callback,
- client);
- GNUNET_assert (th != NULL);
+ client->th = GNUNET_SERVER_notify_transmit_ready (client->client,
+ msize,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &transmit_to_client_callback,
+ client);
+ GNUNET_assert (client->th != NULL);
}
}
/**
- * Find alternative plugins for communication.
+ * Transmit a 'SEND_OK' notification to the given client for the
+ * given neighbour.
*
- * @param neighbor for which neighbor should we try to find
- * more plugins?
+ * @param client who to notify
+ * @param n neighbour to notify about
+ * @param result status code for the transmission request
*/
static void
-try_alternative_plugins (struct NeighborList *neighbor)
+transmit_send_ok (struct TransportClient *client,
+ struct NeighbourList *n,
+ int result)
{
- struct ReadyList *rl;
-
- if ((neighbor->plugins != NULL) &&
- (neighbor->retry_plugins_time.value >
- GNUNET_TIME_absolute_get ().value))
- return; /* don't try right now */
- neighbor->retry_plugins_time
- = GNUNET_TIME_relative_to_absolute (PLUGIN_RETRY_FREQUENCY);
+ struct SendOkMessage send_ok_msg;
- rl = neighbor->plugins;
-#if WTF /* FIXME: What is this supposed to do? */
- while (rl != NULL)
- {
- if (rl->connect_attempts > 0)
- rl->connect_attempts--; /* amnesty */
- rl = rl->next;
- }
-#endif
+ send_ok_msg.header.size = htons (sizeof (send_ok_msg));
+ send_ok_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
+ send_ok_msg.success = htonl (result);
+ send_ok_msg.latency = GNUNET_TIME_relative_hton (n->latency);
+ send_ok_msg.peer = n->id;
+ transmit_to_client (client, &send_ok_msg.header, GNUNET_NO);
}
int result)
{
struct MessageQueue *mq = cls;
- /*struct ReadyList *rl;*/ /* We no longer use the ReadyList for anything here, safe to remove? */
- struct SendOkMessage send_ok_msg;
- struct NeighborList *n;
-
- GNUNET_assert (mq != NULL);
- n = find_neighbor(mq->neighbor_id);
- if (n == NULL) /* Neighbor must have been removed asynchronously! */
- return;
-
- /* Otherwise, let's make sure we've got the right peer */
- GNUNET_assert (0 ==
- memcmp (&n->id, target,
- sizeof (struct GNUNET_PeerIdentity)));
+ struct NeighbourList *n;
if (result == GNUNET_OK)
{
- mq->specific_peer->timeout =
- GNUNET_TIME_relative_to_absolute
- (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes successfully transmitted by plugins"),
+ mq->message_buf_size,
+ GNUNET_NO);
}
else
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmission to peer `%s' failed, marking connection as down.\n",
- GNUNET_i2s (target));
- mq->specific_peer->connected = GNUNET_NO;
- }
- if (!mq->internal_msg)
- {
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Setting transmit_ready on transport!\n");
-#endif
- mq->specific_peer->transmit_ready = GNUNET_YES;
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes with transmission failure by plugins"),
+ mq->message_buf_size,
+ GNUNET_NO);
+ }
+ n = find_neighbour(&mq->neighbour_id);
+ GNUNET_assert (n != NULL);
+ if (mq->specific_address != NULL)
+ {
+ if (result == GNUNET_OK)
+ {
+ mq->specific_address->timeout =
+ GNUNET_TIME_relative_to_absolute
+ (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+ mq->specific_address->connected = GNUNET_YES;
+ }
+ else
+ {
+ mq->specific_address->connected = GNUNET_NO;
+ }
+ if (! mq->internal_msg)
+ mq->specific_address->in_transmit = GNUNET_NO;
}
-
if (mq->client != NULL)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Notifying client %p about transmission to peer `%4s'.\n",
- mq->client, GNUNET_i2s (target));
- send_ok_msg.header.size = htons (sizeof (send_ok_msg));
- send_ok_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
- send_ok_msg.success = htonl (result);
- send_ok_msg.peer = n->id;
- transmit_to_client (mq->client, &send_ok_msg.header, GNUNET_NO);
- }
- GNUNET_free (mq->message_buf);
- GNUNET_free (mq->neighbor_id);
+ transmit_send_ok (mq->client, n, result);
GNUNET_free (mq);
- /* one plugin just became ready again, try transmitting
- another message (if available) */
- if (result == GNUNET_OK)
- try_transmission_to_peer (n);
- else
- disconnect_neighbor (n, GNUNET_YES);
+ try_transmission_to_peer (n);
}
-
-
-struct PeerAddressList *
-find_ready_address(struct NeighborList *neighbor)
+/**
+ * Find an address in any of the available transports for
+ * the given neighbour that would be good for message
+ * transmission. This is essentially the transport selection
+ * routine.
+ *
+ * @param neighbour for whom to select an address
+ * @return selected address, NULL if we have none
+ */
+struct ForeignAddressList *
+find_ready_address(struct NeighbourList *neighbour)
{
- struct ReadyList *head = neighbor->plugins;
- struct PeerAddressList *addresses;
+ struct ReadyList *head = neighbour->plugins;
+ struct ForeignAddressList *addresses;
struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get ();
- struct GNUNET_TIME_Relative min_latency = GNUNET_TIME_relative_get_forever();
- struct PeerAddressList *best_address;
+ struct ForeignAddressList *best_address;
best_address = NULL;
while (head != NULL)
{
addresses = head->addresses;
-
while (addresses != NULL)
{
- if ((addresses->timeout.value < now.value) && (addresses->connected == GNUNET_YES))
+ if ( (addresses->timeout.value < now.value) &&
+ (addresses->connected == GNUNET_YES) )
{
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Marking long-time inactive connection to `%4s' as down.\n",
- GNUNET_i2s (&addresses->ready_list->neighbor->id));
+ GNUNET_i2s (&neighbour->id));
#endif
addresses->connected = GNUNET_NO;
}
addresses = head->addresses;
while (addresses != NULL)
{
- if ((addresses->connected == GNUNET_YES) &&
- (addresses->transmit_ready == GNUNET_YES) &&
- ((best_address == NULL) || (addresses->latency.value < best_address->latency.value)))
- {
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Found address with latency %llu (previous best was %llu), setting as best found yet!\n",
- addresses->latency.value, best_address == NULL ? min_latency.value : best_address->latency.value);
-#endif
- best_address = addresses;
- }
+ if ( ( (best_address == NULL) ||
+ (addresses->connected == GNUNET_YES) ||
+ (best_address->connected == GNUNET_NO) ) &&
+ (addresses->in_transmit == GNUNET_NO) &&
+ ( (best_address == NULL) ||
+ (addresses->latency.value < best_address->latency.value)) )
+ best_address = addresses;
+ /* FIXME: also give lower-latency addresses that are not
+ connected a chance some times... */
addresses = addresses->next;
}
head = head->next;
if (best_address != NULL)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Best address found has latency of %llu!\n",
+ "Best address found has latency of %llu ms.\n",
best_address->latency.value);
}
#endif
}
+
+/**
+ * We should re-try transmitting to the given peer,
+ * hopefully we've learned something in the meantime.
+ */
+static void
+retry_transmission_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct NeighbourList *n = cls;
+
+ n->retry_task = GNUNET_SCHEDULER_NO_TASK;
+ try_transmission_to_peer (n);
+}
+
+
/**
- * Check the ready list for the given neighbor and
- * if a plugin is ready for transmission (and if we
- * have a message), do so!
+ * Check the ready list for the given neighbour and if a plugin is
+ * ready for transmission (and if we have a message), do so!
+ *
+ * @param neighbour target peer for which to transmit
*/
-static ssize_t
-try_transmission_to_peer (struct NeighborList *neighbor)
+static void
+try_transmission_to_peer (struct NeighbourList *neighbour)
{
- struct GNUNET_TIME_Relative min_latency;
struct ReadyList *rl;
struct MessageQueue *mq;
- struct GNUNET_TIME_Absolute now;
+ struct GNUNET_TIME_Relative timeout;
- if (neighbor->messages == NULL)
- return 0; /* nothing to do */
- try_alternative_plugins (neighbor);
- min_latency = GNUNET_TIME_UNIT_FOREVER_REL;
+ if (neighbour->messages_head == NULL)
+ return; /* nothing to do */
rl = NULL;
- mq = neighbor->messages;
- now = GNUNET_TIME_absolute_get ();
-
- if (mq->specific_peer == NULL)
- mq->specific_peer = find_ready_address(neighbor); /* Find first available (or best!) address to transmit to */
-
- if (mq->specific_peer == NULL)
- {
+ mq = neighbour->messages_head;
+ /* FIXME: support bi-directional use of TCP */
+ if (mq->specific_address == NULL)
+ mq->specific_address = find_ready_address(neighbour);
+ if (mq->specific_address == NULL)
+ {
+ timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
+ if (timeout.value == 0)
+ {
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "No destination address available to transmit message of size %u to peer `%4s'\n",
+ mq->message_buf_size,
+ GNUNET_i2s (&mq->neighbour_id));
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes in message queue for other peers"),
+ -mq->message_buf_size,
+ GNUNET_NO);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes discarded (no destination address available)"),
+ mq->message_buf_size,
+ GNUNET_NO);
+ if (mq->client != NULL)
+ transmit_send_ok (mq->client, neighbour, GNUNET_NO);
+ GNUNET_CONTAINER_DLL_remove (neighbour->messages_head,
+ neighbour->messages_tail,
+ mq);
+ GNUNET_free (mq);
+ return; /* nobody ready */
+ }
+ if (neighbour->retry_task != GNUNET_SCHEDULER_NO_TASK)
+ GNUNET_SCHEDULER_cancel (sched,
+ neighbour->retry_task);
+ neighbour->retry_task = GNUNET_SCHEDULER_add_delayed (sched,
+ timeout,
+ &retry_transmission_task,
+ neighbour);
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "No plugin ready to transmit message\n");
+ "No validated destination address available to transmit message of size %u to peer `%4s', will wait %llums to find an address.\n",
+ mq->message_buf_size,
+ GNUNET_i2s (&mq->neighbour_id),
+ timeout.value);
#endif
- return 0; /* nobody ready */
- }
-
- rl = mq->specific_peer->ready_list;
- neighbor->messages = mq->next;
+ /* FIXME: might want to trigger peerinfo lookup here
+ (unless that's already pending...) */
+ return;
+ }
+ GNUNET_CONTAINER_DLL_remove (neighbour->messages_head,
+ neighbour->messages_tail,
+ mq);
+ if (mq->specific_address->connected == GNUNET_NO)
+ mq->specific_address->connect_attempts++;
+ rl = mq->specific_address->ready_list;
mq->plugin = rl->plugin;
if (!mq->internal_msg)
- mq->specific_peer->transmit_ready = GNUNET_NO;
+ mq->specific_address->in_transmit = GNUNET_YES;
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Giving message of size `%u' for `%4s' to plugin `%s'\n",
+ "Sending message of size %u for `%4s' to `%s' via plugin `%s'\n",
mq->message_buf_size,
- GNUNET_i2s (&neighbor->id), rl->plugin->short_name);
+ GNUNET_i2s (&neighbour->id),
+ GNUNET_a2s (mq->specific_address->addr,
+ mq->specific_address->addrlen),
+ rl->plugin->short_name);
#endif
-
- return rl->plugin->api->send (rl->plugin->api->cls,
- mq->neighbor_id,
- mq->message_buf,
- mq->message_buf_size,
- mq->priority,
- GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
- mq->specific_peer->addr,
- mq->specific_peer->addrlen,
- GNUNET_YES,
- &transmit_send_continuation, mq);
-
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes in message queue for other peers"),
+ -mq->message_buf_size,
+ GNUNET_NO);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes transmitted to other peers"),
+ mq->message_buf_size,
+ GNUNET_NO);
+ rl->plugin->api->send (rl->plugin->api->cls,
+ &mq->neighbour_id,
+ mq->message_buf,
+ mq->message_buf_size,
+ mq->priority,
+ GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+ mq->specific_address->addr,
+ mq->specific_address->addrlen,
+ GNUNET_YES /* FIXME: sometimes, we want to be more tolerant here! */,
+ &transmit_send_continuation, mq);
}
* Send the specified message to the specified peer.
*
* @param client source of the transmission request (can be NULL)
- * @param peer_address PeerAddressList where we should send this message
+ * @param peer_address ForeignAddressList where we should send this message
* @param priority how important is the message
+ * @param timeout how long do we have to transmit?
* @param message_buf message(s) to send GNUNET_MessageHeader(s)
* @param message_buf_size total size of all messages in message_buf
- * @param is_internal is this an internal message
- * @param neighbor handle to the neighbor for transmission
+ * @param is_internal is this an internal message; these are pre-pended and
+ * also do not count for plugins being "ready" to transmit
+ * @param neighbour handle to the neighbour for transmission
*/
-static ssize_t
+static void
transmit_to_peer (struct TransportClient *client,
- struct PeerAddressList *peer_address,
+ struct ForeignAddressList *peer_address,
unsigned int priority,
+ struct GNUNET_TIME_Relative timeout,
const char *message_buf,
size_t message_buf_size,
- int is_internal, struct NeighborList *neighbor)
+ int is_internal, struct NeighbourList *neighbour)
{
struct MessageQueue *mq;
- struct MessageQueue *mqe;
- char *m;
+#if EXTRA_CHECKS
if (client != NULL)
{
/* check for duplicate submission */
- mq = neighbor->messages;
+ mq = neighbour->messages_head;
while (NULL != mq)
{
if (mq->client == client)
{
/* client transmitted to same peer twice
- before getting SendOk! */
+ before getting SEND_OK! */
GNUNET_break (0);
- return 0;
+ return;
}
mq = mq->next;
}
}
- mq = GNUNET_malloc (sizeof (struct MessageQueue));
- mq->specific_peer = peer_address;
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes in message queue for other peers"),
+ message_buf_size,
+ GNUNET_NO);
+ mq = GNUNET_malloc (sizeof (struct MessageQueue) + message_buf_size);
+ mq->specific_address = peer_address;
mq->client = client;
- m = GNUNET_malloc (message_buf_size);
- memcpy (m, message_buf, message_buf_size);
- mq->message_buf = m;
+ memcpy (&mq[1], message_buf, message_buf_size);
+ mq->message_buf = (const char*) &mq[1];
mq->message_buf_size = message_buf_size;
- mq->neighbor_id = GNUNET_malloc(sizeof (struct GNUNET_PeerIdentity));
-
- memcpy(mq->neighbor_id, &neighbor->id, sizeof(struct GNUNET_PeerIdentity));
+ memcpy(&mq->neighbour_id, &neighbour->id, sizeof(struct GNUNET_PeerIdentity));
mq->internal_msg = is_internal;
mq->priority = priority;
-
- /* find tail */
- mqe = neighbor->messages;
- if (mqe != NULL)
- while (mqe->next != NULL)
- mqe = mqe->next;
- if (mqe == NULL)
- {
- /* new head */
- neighbor->messages = mq;
- }
+ mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
+ if (is_internal)
+ GNUNET_CONTAINER_DLL_insert (neighbour->messages_head,
+ neighbour->messages_tail,
+ mq);
else
- {
- /* append */
- mqe->next = mq;
- }
- return try_transmission_to_peer (neighbor);
+ GNUNET_CONTAINER_DLL_insert_after (neighbour->messages_head,
+ neighbour->messages_tail,
+ neighbour->messages_tail,
+ mq);
+ try_transmission_to_peer (neighbour);
}
struct GeneratorContext
{
struct TransportPlugin *plug_pos;
- struct AddressList *addr_pos;
+ struct OwnAddressList *addr_pos;
struct GNUNET_TIME_Absolute expiration;
};
{
struct GNUNET_HELLO_Message *hello;
struct TransportClient *cpos;
- struct NeighborList *npos;
+ struct NeighbourList *npos;
struct GeneratorContext gc;
gc.plug_pos = plugins;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
"Refreshed my `%s', new size is %d\n", "HELLO", GNUNET_HELLO_size(hello));
#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# refreshed my HELLO"),
+ 1,
+ GNUNET_NO);
cpos = clients;
while (cpos != NULL)
{
our_hello = hello;
our_hello_version++;
GNUNET_PEERINFO_add_peer (cfg, sched, &my_identity, our_hello);
- npos = neighbors;
+ npos = neighbours;
while (npos != NULL)
{
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
- "Transmitting updated `%s' to neighbor `%4s'\n",
+ "Transmitting updated `%s' to neighbour `%4s'\n",
"HELLO", GNUNET_i2s (&npos->id));
-#endif // FIXME: just testing
- //transmit_to_peer (NULL, NULL, 0,
- // (const char *) our_hello, GNUNET_HELLO_size(our_hello),
- // GNUNET_YES, npos);
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# transmitted my HELLO to other peers"),
+ 1,
+ GNUNET_NO);
+ transmit_to_peer (NULL, NULL, 0,
+ HELLO_ADDRESS_EXPIRATION,
+ (const char *) our_hello,
+ GNUNET_HELLO_size(our_hello),
+ GNUNET_NO, npos);
npos = npos->next;
}
}
struct GNUNET_TIME_Relative min_remaining;
struct GNUNET_TIME_Relative remaining;
struct GNUNET_TIME_Absolute now;
- struct AddressList *pos;
- struct AddressList *prev;
- struct AddressList *next;
+ struct OwnAddressList *pos;
+ struct OwnAddressList *prev;
+ struct OwnAddressList *next;
int expired;
if (plugin->address_update_task != GNUNET_SCHEDULER_NO_TASK)
struct GNUNET_TIME_Relative expires)
{
struct TransportPlugin *p = cls;
- struct AddressList *al;
+ struct OwnAddressList *al;
struct GNUNET_TIME_Absolute abex;
abex = GNUNET_TIME_relative_to_absolute (expires);
al = al->next;
}
- al = GNUNET_malloc (sizeof (struct AddressList) + addrlen);
+ al = GNUNET_malloc (sizeof (struct OwnAddressList) + addrlen);
al->addr = &al[1];
al->next = p->addresses;
p->addresses = al;
*/
static void
notify_clients_connect (const struct GNUNET_PeerIdentity *peer,
- struct GNUNET_TIME_Relative latency)
+ struct GNUNET_TIME_Relative latency,
+ uint32_t distance)
{
struct ConnectInfoMessage cim;
struct TransportClient *cpos;
- cim.header.size = htons (sizeof (struct ConnectInfoMessage));
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Notifying clients about connection from `%s'\n",
+ GNUNET_i2s (peer));
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# peers connected"),
+ 1,
+ GNUNET_NO);
+ cim.header.size = htons (sizeof (struct ConnectInfoMessage));
cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
- cim.quota_out = htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60 * 1000));
+ cim.distance = htonl (distance);
cim.latency = GNUNET_TIME_relative_hton (latency);
memcpy (&cim.id, peer, sizeof (struct GNUNET_PeerIdentity));
cpos = clients;
struct DisconnectInfoMessage dim;
struct TransportClient *cpos;
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Notifying clients about lost connection to `%s'\n",
+ GNUNET_i2s (peer));
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# peers connected"),
+ -1,
+ GNUNET_NO);
dim.header.size = htons (sizeof (struct DisconnectInfoMessage));
dim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
dim.reserved = htonl (0);
/**
- * Copy any validated addresses to buf.
+ * Find a ForeignAddressList entry for the given neighbour
+ * that matches the given address and transport.
+ *
+ * @param neighbour which peer we care about
+ * @param tname name of the transport plugin
+ * @param addr binary address
+ * @param addrlen length of addr
+ * @return NULL if no such entry exists
+ */
+static struct ForeignAddressList *
+find_peer_address(struct NeighbourList *neighbour,
+ const char *tname,
+ const char *addr,
+ size_t addrlen)
+{
+ struct ReadyList *head;
+ struct ForeignAddressList *address_head;
+
+ head = neighbour->plugins;
+ while (head != NULL)
+ {
+ if (0 == strcmp (tname, head->plugin->short_name))
+ break;
+ head = head->next;
+ }
+ if (head == NULL)
+ return NULL;
+
+ address_head = head->addresses;
+ while ( (address_head != NULL) &&
+ ( (address_head->addrlen != addrlen) ||
+ (memcmp(address_head->addr, addr, addrlen) != 0) ) )
+ address_head = address_head->next;
+ return address_head;
+}
+
+
+/**
+ * Get the peer address struct for the given neighbour and
+ * address. If it doesn't yet exist, create it.
+ *
+ * @param neighbour which peer we care about
+ * @param tname name of the transport plugin
+ * @param addr binary address
+ * @param addrlen length of addr
+ * @return NULL if we do not have a transport plugin for 'tname'
+ */
+static struct ForeignAddressList *
+add_peer_address(struct NeighbourList *neighbour,
+ const char *tname,
+ const char *addr,
+ size_t addrlen)
+{
+ struct ReadyList *head;
+ struct ForeignAddressList *ret;
+
+ ret = find_peer_address (neighbour, tname, addr, addrlen);
+ if (ret != NULL)
+ return ret;
+ head = neighbour->plugins;
+ while (head != NULL)
+ {
+ if (0 == strcmp (tname, head->plugin->short_name))
+ break;
+ head = head->next;
+ }
+ if (head == NULL)
+ return NULL;
+ ret = GNUNET_malloc(sizeof(struct ForeignAddressList) + addrlen);
+ ret->addr = (const char*) &ret[1];
+ memcpy (&ret[1], addr, addrlen);
+ ret->addrlen = addrlen;
+ ret->expires = GNUNET_TIME_relative_to_absolute
+ (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+ ret->latency = GNUNET_TIME_relative_get_forever();
+ ret->distance = -1;
+ ret->timeout = GNUNET_TIME_relative_to_absolute
+ (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+ ret->ready_list = head;
+ ret->next = head->addresses;
+ head->addresses = ret;
+ return ret;
+}
+
+
+/**
+ * Closure for 'add_validated_address'.
+ */
+struct AddValidatedAddressContext
+{
+ /**
+ * Entry that has been validated.
+ */
+ const struct ValidationEntry *ve;
+
+ /**
+ * Flag set after we have added the address so
+ * that we terminate the iteration next time.
+ */
+ int done;
+};
+
+
+/**
+ * Callback function used to fill a buffer of max bytes with a list of
+ * addresses in the format used by HELLOs. Should use
+ * "GNUNET_HELLO_add_address" as a helper function.
*
- * @return 0 once all addresses have been
- * returned
+ * @param cls the 'struct AddValidatedAddressContext' with the validated address
+ * @param max maximum number of bytes that can be written to buf
+ * @param buf where to write the address information
+ * @return number of bytes written, 0 to signal the
+ * end of the iteration.
*/
static size_t
-list_validated_addresses (void *cls, size_t max, void *buf)
+add_validated_address (void *cls,
+ size_t max, void *buf)
{
- struct ValidationAddress **va = cls;
- size_t ret;
+ struct AddValidatedAddressContext *avac = cls;
+ const struct ValidationEntry *ve = avac->ve;
- while ((NULL != *va) && ((*va)->ok != GNUNET_YES))
- *va = (*va)->next;
- if (NULL == *va)
+ if (GNUNET_YES == avac->done)
return 0;
- ret = GNUNET_HELLO_add_address ((*va)->transport_name,
- (*va)->expiration,
- (*va)->peer_address->addr, (*va)->peer_address->addrlen, buf, max);
- *va = (*va)->next;
- return ret;
+ avac->done = GNUNET_YES;
+ return GNUNET_HELLO_add_address (ve->transport_name,
+ GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION),
+ ve->addr,
+ ve->addrlen,
+ buf,
+ max);
}
+static void send_periodic_ping(void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
/**
- * HELLO validation cleanup task.
+ * Iterator over hash map entries. Checks if the given validation
+ * entry is for the same challenge as what is given in the PONG.
+ *
+ * @param cls the 'struct TransportPongMessage*'
+ * @param key peer identity
+ * @param value value in the hash map ('struct ValidationEntry')
+ * @return GNUNET_YES if we should continue to
+ * iterate (mismatch), GNUNET_NO if not (entry matched)
*/
-static void
-cleanup_validation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+static int
+check_pending_validation (void *cls,
+ const GNUNET_HashCode * key,
+ void *value)
{
- struct ValidationAddress *va;
- struct ValidationList *pos;
- struct ValidationList *prev;
- struct GNUNET_TIME_Absolute now;
- struct GNUNET_TIME_Absolute first;
+ const struct TransportPongMessage *pong = cls;
+ struct ValidationEntry *ve = value;
+ struct AddValidatedAddressContext avac;
+ unsigned int challenge = ntohl(pong->challenge);
struct GNUNET_HELLO_Message *hello;
- struct GNUNET_PeerIdentity pid;
- struct NeighborList *n;
+ struct GNUNET_PeerIdentity target;
+ struct NeighbourList *n;
+ struct ForeignAddressList *fal;
+ struct PeriodicValidationContext *periodic_validation_context;
+
+ if (ve->challenge != challenge)
+ return GNUNET_YES;
- now = GNUNET_TIME_absolute_get ();
- prev = NULL;
- pos = pending_validations;
- while (pos != NULL)
- {
- if (pos->timeout.value < now.value)
- {
- if (prev == NULL)
- pending_validations = pos->next;
- else
- prev->next = pos->next;
- va = pos->addresses;
- hello = GNUNET_HELLO_create (&pos->publicKey,
- &list_validated_addresses, &va);
- GNUNET_CRYPTO_hash (&pos->publicKey,
- sizeof (struct
- GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
- &pid.hashPubKey);
#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Creating persistent `%s' message for peer `%4s' based on confirmed addresses.\n",
- "HELLO", GNUNET_i2s (&pid));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Confirmed validity of address, peer `%4s' has address `%s' (%s).\n",
+ GNUNET_h2s (key),
+ GNUNET_a2s ((const struct sockaddr *) ve->addr,
+ ve->addrlen),
+ ve->transport_name);
#endif
- GNUNET_PEERINFO_add_peer (cfg, sched, &pid, hello);
- n = find_neighbor (&pid);
- if (NULL != n)
- {
- try_transmission_to_peer (n);
- }
- GNUNET_free (hello);
- while (NULL != (va = pos->addresses))
- {
- pos->addresses = va->next;
- GNUNET_free (va->transport_name);
- GNUNET_free (va);
- }
- GNUNET_free (pos);
- if (prev == NULL)
- pos = pending_validations;
- else
- pos = prev->next;
- continue;
- }
- prev = pos;
- pos = pos->next;
- }
-
- /* finally, reschedule cleanup if needed; list is
- ordered by timeout, so we need the last element... */
- if (NULL != pending_validations)
- {
- first = pending_validations->timeout;
- pos = pending_validations;
- while (pos != NULL)
- {
- first = GNUNET_TIME_absolute_min (first, pos->timeout);
- pos = pos->next;
- }
- if (tc->reason != GNUNET_SCHEDULER_REASON_SHUTDOWN)
- {
- GNUNET_SCHEDULER_add_delayed (sched,
- GNUNET_TIME_absolute_get_remaining
- (first), &cleanup_validation, NULL);
- }
- }
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# address validation successes"),
+ 1,
+ GNUNET_NO);
+ /* create the updated HELLO */
+ GNUNET_CRYPTO_hash (&ve->publicKey,
+ sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
+ &target.hashPubKey);
+ avac.done = GNUNET_NO;
+ avac.ve = ve;
+ hello = GNUNET_HELLO_create (&ve->publicKey,
+ &add_validated_address,
+ &avac);
+ GNUNET_PEERINFO_add_peer (cfg, sched,
+ &target,
+ hello);
+ GNUNET_free (hello);
+ n = find_neighbour (&target);
+ if (n != NULL)
+ {
+ fal = add_peer_address (n,
+ ve->transport_name,
+ ve->addr,
+ ve->addrlen);
+ GNUNET_assert (fal != NULL);
+ fal->expires = GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION);
+ fal->validated = GNUNET_YES;
+ fal->latency = GNUNET_TIME_absolute_get_duration (ve->send_time);
+ periodic_validation_context = GNUNET_malloc(sizeof(struct PeriodicValidationContext));
+ periodic_validation_context->foreign_address = fal;
+ periodic_validation_context->transport = strdup(ve->transport_name);
+ memcpy(&periodic_validation_context->publicKey,
+ &ve->publicKey,
+ sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
+ /* FIXME: this causes all of the revalidation PINGs for the same HELLO
+ to be transmitted in bulk, which is not nice; also,
+ triggering these HERE means that revalidations do NOT happen AT ALL
+ for HELLOs a previous instance of this process validated (since
+ there is no "initial" validation PING => no revalidation => BUG! */
+ fal->revalidate_task = GNUNET_SCHEDULER_add_delayed(sched,
+ TRANSPORT_DEFAULT_REVALIDATION,
+ &send_periodic_ping,
+ periodic_validation_context);
+ if (n->latency.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
+ n->latency = fal->latency;
+ else
+ n->latency.value = (fal->latency.value + n->latency.value) / 2;
+ n->distance = fal->distance;
+ if (GNUNET_NO == n->received_pong)
+ {
+ notify_clients_connect (&target, n->latency, n->distance);
+ n->received_pong = GNUNET_YES;
+ }
+ if (n->retry_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (sched,
+ n->retry_task);
+ n->retry_task = GNUNET_SCHEDULER_NO_TASK;
+ try_transmission_to_peer (n);
+ }
+ }
+
+ /* clean up validation entry */
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_remove (validation_map,
+ key,
+ ve));
+ GNUNET_SCHEDULER_cancel (sched,
+ ve->timeout_task);
+ GNUNET_free (ve->transport_name);
+ GNUNET_free (ve);
+ return GNUNET_NO;
}
* (otherwise we may be seeing a MiM attack).
*
* @param cls closure
- * @param name name of the transport that generated the address
+ * @param message the pong message
* @param peer who responded to our challenge
- * @param challenge the challenge number we presumably used
- * @param sender_addr string describing our sender address (as observed
- * by the other peer in human-readable format)
+ * @param sender_address string describing our sender address (as observed
+ * by the other peer in binary format)
+ * @param sender_address_len number of bytes in 'sender_address'
*/
static void
handle_pong (void *cls, const struct GNUNET_MessageHeader *message,
const char *sender_address,
size_t sender_address_len)
{
- unsigned int not_done;
- int matched;
- struct ValidationList *pos;
- struct ValidationAddress *va;
- struct GNUNET_PeerIdentity id;
- const struct TransportPongMessage *pong = (const struct TransportPongMessage *)message;
- int count = 0;
- unsigned int challenge = ntohl(pong->challenge);
- pos = pending_validations;
-
- while (pos != NULL)
- {
- GNUNET_CRYPTO_hash (&pos->publicKey,
- sizeof (struct
- GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
- &id.hashPubKey);
- if (0 == memcmp (peer, &id, sizeof (struct GNUNET_PeerIdentity)))
- break;
- pos = pos->next;
- count++;
- }
- if (pos == NULL)
- {
- /* TODO: call statistics (unmatched PONG) */
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- _
- ("Received validation response but have no record of any validation request for `%4s' (out of %d). Ignoring.\n"),
- GNUNET_i2s (peer), count);
- return;
- }
- not_done = 0;
- matched = GNUNET_NO;
- va = pos->addresses;
- while (va != NULL)
- {
- if (va->challenge == challenge)
- {
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Confirmed validity of address, peer `%4s' has address `%s'.\n",
- GNUNET_i2s (peer),
- GNUNET_a2s ((const struct sockaddr *) va->peer_address->addr,
- va->peer_address->addrlen));
-#endif
- GNUNET_log (GNUNET_ERROR_TYPE_INFO | GNUNET_ERROR_TYPE_BULK,
- _
- ("Another peer saw us using the address `%s' via `%s'. If this is not plausible, this address should be listed in the configuration as implausible to avoid MiM attacks.\n"),
- GNUNET_a2s ((const struct sockaddr *) &pong[1],
- ntohs(pong->addrlen)), va->transport_name);
- va->ok = GNUNET_YES;
- va->expiration =
- GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION);
- matched = GNUNET_YES;
- va->peer_address->connected = GNUNET_YES;
- va->peer_address->latency = GNUNET_TIME_absolute_get_difference(va->send_time, GNUNET_TIME_absolute_get());
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Confirmed validity of address, peer `%4s' has address `%s', latency of %llu\n",
- GNUNET_i2s (peer),
- GNUNET_a2s ((const struct sockaddr *) va->peer_address->addr,
- va->peer_address->addrlen), (unsigned long long)va->peer_address->latency.value);
+#if DEBUG_TRANSPORT > 1
+ /* we get tons of these that just get discarded, only log
+ if we are quite verbose */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Receiving `%s' message from `%4s'.\n", "PONG",
+ GNUNET_i2s (peer));
#endif
- va->peer_address->transmit_ready = GNUNET_YES;
- va->peer_address->expires = GNUNET_TIME_relative_to_absolute
- (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
- }
- if (va->ok != GNUNET_YES)
- not_done++;
- va = va->next;
- }
- if (GNUNET_NO == matched)
- {
- /* TODO: call statistics (unmatched PONG) */
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- _
- ("Received `%s' message but have no record of a matching `%s' message. Ignoring.\n"),
- "PONG", "PING");
- }
- if (0 == not_done)
- {
-#if DEBUG_TRANSPORT
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# PONG messages received"),
+ 1,
+ GNUNET_NO);
+ if (GNUNET_SYSERR !=
+ GNUNET_CONTAINER_multihashmap_get_multiple (validation_map,
+ &peer->hashPubKey,
+ &check_pending_validation,
+ (void*) message))
+ {
+ /* This is *expected* to happen a lot since we send
+ PONGs to *all* known addresses of the sender of
+ the PING, so most likely we get multiple PONGs
+ per PING, and all but the first PONG will end up
+ here. So really we should not print anything here
+ unless we want to be very, very verbose... */
+#if DEBUG_TRANSPORT > 2
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "All addresses validated, will now construct `%s' for `%4s'.\n",
- "HELLO", GNUNET_i2s (peer));
+ "Received `%s' message from `%4s' but have no record of a matching `%s' message. Ignoring.\n",
+ "PONG",
+ GNUNET_i2s (peer),
+ "PING");
#endif
- pos->timeout.value = 0;
- GNUNET_SCHEDULER_add_with_priority (sched,
- GNUNET_SCHEDULER_PRIORITY_IDLE,
- &cleanup_validation, NULL);
+ return;
}
+#if 0
+ /* FIXME: add given address to potential pool of our addresses
+ (for voting) */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO | GNUNET_ERROR_TYPE_BULK,
+ _("Another peer saw us using the address `%s' via `%s'.\n"),
+ GNUNET_a2s ((const struct sockaddr *) &pong[1],
+ ntohs(pong->addrlen)),
+ va->transport_name);
+#endif
}
-/**
- * Add an entry for each of our transport plugins
- * (that are able to send) to the list of plugins
- * for this neighbor.
- *
- * @param neighbor to initialize
- */
-static void
-add_plugins (struct NeighborList *neighbor)
-{
- struct TransportPlugin *tp;
- struct ReadyList *rl;
-
- neighbor->retry_plugins_time
- = GNUNET_TIME_relative_to_absolute (PLUGIN_RETRY_FREQUENCY);
- tp = plugins;
- while (tp != NULL)
- {
- if (tp->api->send != NULL)
- {
- rl = GNUNET_malloc (sizeof (struct ReadyList));
- rl->next = neighbor->plugins;
- neighbor->plugins = rl;
- rl->plugin = tp;
- rl->neighbor = neighbor;
- rl->addresses = NULL;
- }
- tp = tp->next;
- }
-}
static void
-neighbor_timeout_task (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+neighbour_timeout_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- struct NeighborList *n = cls;
+ struct NeighbourList *n = cls;
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
- "Neighbor `%4s' has timed out!\n", GNUNET_i2s (&n->id));
+ "Neighbour `%4s' has timed out!\n", GNUNET_i2s (&n->id));
#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# disconnects due to timeout"),
+ 1,
+ GNUNET_NO);
n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
- disconnect_neighbor (n, GNUNET_NO);
+ disconnect_neighbour (n, GNUNET_NO);
}
+
/**
- * Create a fresh entry in our neighbor list for the given peer.
- * Will try to transmit our current HELLO to the new neighbor. Also
+ * Create a fresh entry in our neighbour list for the given peer.
+ * Will try to transmit our current HELLO to the new neighbour. Also
* notifies our clients about the new "connection".
*
* @param peer the peer for which we create the entry
- * @return the new neighbor list entry
+ * @return the new neighbour list entry
*/
-static struct NeighborList *
-setup_new_neighbor (const struct GNUNET_PeerIdentity *peer)
+static struct NeighbourList *
+setup_new_neighbour (const struct GNUNET_PeerIdentity *peer)
{
- struct NeighborList *n;
+ struct NeighbourList *n;
+ struct TransportPlugin *tp;
+ struct ReadyList *rl;
GNUNET_assert (our_hello != NULL);
- n = GNUNET_malloc (sizeof (struct NeighborList));
- n->next = neighbors;
- neighbors = n;
+ n = GNUNET_malloc (sizeof (struct NeighbourList));
+ n->next = neighbours;
+ neighbours = n;
n->id = *peer;
- n->last_quota_update = GNUNET_TIME_absolute_get ();
n->peer_timeout =
GNUNET_TIME_relative_to_absolute
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
- n->quota_in = (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000);
- add_plugins (n);
+ GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
+ GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
+ MAX_BANDWIDTH_CARRY_S);
+ tp = plugins;
+ while (tp != NULL)
+ {
+ if (tp->api->send != NULL)
+ {
+ rl = GNUNET_malloc (sizeof (struct ReadyList));
+ rl->next = n->plugins;
+ n->plugins = rl;
+ rl->plugin = tp;
+ rl->addresses = NULL;
+ }
+ tp = tp->next;
+ }
+ n->latency = GNUNET_TIME_UNIT_FOREVER_REL;
+ n->distance = -1;
n->timeout_task = GNUNET_SCHEDULER_add_delayed (sched,
GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
- &neighbor_timeout_task, n);
+ &neighbour_timeout_task, n);
transmit_to_peer (NULL, NULL, 0,
+ HELLO_ADDRESS_EXPIRATION,
(const char *) our_hello, GNUNET_HELLO_size(our_hello),
- GNUNET_YES, n);
- notify_clients_connect (peer, GNUNET_TIME_UNIT_FOREVER_REL);
+ GNUNET_NO, n);
return n;
}
-static struct PeerAddressList *
-add_peer_address(struct NeighborList *neighbor, const char *addr, size_t addrlen)
+
+/**
+ * Closure for 'check_address_exists'.
+ */
+struct CheckAddressExistsClosure
{
- /* FIXME: should return a list of PeerAddressLists, support for multiple transports! */
- struct ReadyList *head = neighbor->plugins;
- struct PeerAddressList * new_address;
+ /**
+ * Address to check for.
+ */
+ const void *addr;
- GNUNET_assert(addr != NULL);
+ /**
+ * Name of the transport.
+ */
+ const char *tname;
- new_address = NULL;
- while (head != NULL)
- {
- new_address = GNUNET_malloc(sizeof(struct PeerAddressList));
- new_address->addr = GNUNET_malloc(addrlen);
- memcpy(new_address->addr, addr, addrlen);
- new_address->addrlen = addrlen;
- new_address->connect_attempts = 0;
- new_address->connected = GNUNET_YES; /* Set connected to GNUNET_YES, assuming that we're good */
- new_address->expires = GNUNET_TIME_relative_to_absolute
- (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
- new_address->latency = GNUNET_TIME_relative_get_forever();
- new_address->neighbor = neighbor;
- new_address->plugin = head->plugin;
- new_address->transmit_ready = GNUNET_YES;
- new_address->timeout = GNUNET_TIME_relative_to_absolute
- (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); /* FIXME: Do we need this? */
- new_address->ready_list = head;
- new_address->next = head->addresses;
- head->addresses = new_address;
- head = head->next;
- }
+ /**
+ * Length of addr.
+ */
+ size_t addrlen;
- return new_address;
+ /**
+ * Set to GNUNET_YES if the address exists.
+ */
+ int exists;
+};
+
+
+/**
+ * Iterator over hash map entries. Checks if the given
+ * validation entry is for the same address as what is given
+ * in the closure.
+ *
+ * @param cls the 'struct CheckAddressExistsClosure*'
+ * @param key current key code (ignored)
+ * @param value value in the hash map ('struct ValidationEntry')
+ * @return GNUNET_YES if we should continue to
+ * iterate (mismatch), GNUNET_NO if not (entry matched)
+ */
+static int
+check_address_exists (void *cls,
+ const GNUNET_HashCode * key,
+ void *value)
+{
+ struct CheckAddressExistsClosure *caec = cls;
+ struct ValidationEntry *ve = value;
+ if ( (0 == strcmp (caec->tname,
+ ve->transport_name)) &&
+ (caec->addrlen == ve->addrlen) &&
+ (0 == memcmp (caec->addr,
+ ve->addr,
+ caec->addrlen)) )
+ {
+ caec->exists = GNUNET_YES;
+ return GNUNET_NO;
+ }
+ return GNUNET_YES;
}
-static struct PeerAddressList *
-find_peer_address(struct NeighborList *neighbor, const char *addr, size_t addrlen)
+
+/**
+ * HELLO validation cleanup task (validation failed).
+ *
+ * @param cls the 'struct ValidationEntry' that failed
+ * @param tc scheduler context (unused)
+ */
+static void
+timeout_hello_validation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- struct ReadyList *head = neighbor->plugins;
- struct PeerAddressList *address_head;
- while (head != NULL)
- {
- address_head = head->addresses;
- while ((address_head != NULL) &&
- (address_head->addrlen != addrlen) &&
- (memcmp(address_head->addr, addr, addrlen) != 0))
- {
- address_head = address_head->next;
- }
- if (address_head != NULL)
- return address_head;
+ struct ValidationEntry *va = cls;
+ struct GNUNET_PeerIdentity pid;
- head = head->next;
- }
- return NULL;
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# address validation timeouts"),
+ 1,
+ GNUNET_NO);
+ GNUNET_CRYPTO_hash (&va->publicKey,
+ sizeof (struct
+ GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
+ &pid.hashPubKey);
+ GNUNET_CONTAINER_multihashmap_remove (validation_map,
+ &pid.hashPubKey,
+ va);
+ GNUNET_free (va->transport_name);
+ GNUNET_free (va);
}
+
/**
- * Append the given address to the list of entries
- * that need to be validated.
+ * Check if the given address is already being validated; if not,
+ * append the given address to the list of entries that are being be
+ * validated and initiate validation.
+ *
+ * @param cls closure ('struct PeriodicValidationContext *')
+ * @param tname name of the transport
+ * @param expiration expiration time
+ * @param addr the address
+ * @param addrlen length of the address
+ * @return GNUNET_OK (always)
*/
static int
-run_validation (void *cls,
+rerun_validation (void *cls,
const char *tname,
struct GNUNET_TIME_Absolute expiration,
const void *addr, size_t addrlen)
{
- struct ValidationList *e = cls;
- struct TransportPlugin *tp;
- struct ValidationAddress *va;
+ struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey = cls;
struct GNUNET_PeerIdentity id;
- struct NeighborList *neighbor;
- struct PeerAddressList *peer_address;
- int sent;
- struct TransportPingMessage *ping;
+ struct TransportPlugin *tp;
+ struct ValidationEntry *va;
+ struct NeighbourList *neighbour;
+ struct ForeignAddressList *peer_address;
+ struct TransportPingMessage ping;
+ /*struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pk;*/
+ struct CheckAddressExistsClosure caec;
char * message_buf;
- int hello_size;
- int tsize;
+ uint16_t hello_size;
+ size_t tsize;
tp = find_transport (tname);
if (tp == NULL)
tname);
return GNUNET_OK;
}
- GNUNET_CRYPTO_hash (&e->publicKey,
+
+ GNUNET_CRYPTO_hash (publicKey,
sizeof (struct
GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
&id.hashPubKey);
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Scheduling validation of address `%s' via `%s' for `%4s'\n",
- GNUNET_a2s (addr, addrlen), tname, GNUNET_i2s (&id));
+ caec.addr = addr;
+ caec.addrlen = addrlen;
+ caec.tname = tname;
+ caec.exists = GNUNET_NO;
+ GNUNET_CONTAINER_multihashmap_iterate (validation_map,
+ &check_address_exists,
+ &caec);
+ if (caec.exists == GNUNET_YES)
+ {
+ /* During validation attempts we will likely trigger the other
+ peer trying to validate our address which in turn will cause
+ it to send us its HELLO, so we expect to hit this case rather
+ frequently. Only print something if we are very verbose. */
+#if DEBUG_TRANSPORT > 1
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Some validation of address `%s' via `%s' for peer `%4s' already in progress.\n",
+ GNUNET_a2s (addr, addrlen),
+ tname,
+ GNUNET_i2s (&id));
#endif
- va = GNUNET_malloc (sizeof (struct ValidationAddress));
- va->next = e->addresses;
- e->addresses = va;
+ return GNUNET_OK;
+ }
+ va = GNUNET_malloc (sizeof (struct ValidationEntry) + addrlen);
va->transport_name = GNUNET_strdup (tname);
va->challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
(unsigned int) -1);
va->send_time = GNUNET_TIME_absolute_get();
+ va->addr = (const void*) &va[1];
+ memcpy (&va[1], addr, addrlen);
+ va->addrlen = addrlen;
+ memcpy(&va->publicKey, publicKey, sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
+ va->timeout_task = GNUNET_SCHEDULER_add_delayed (sched,
+ HELLO_VERIFICATION_TIMEOUT,
+ &timeout_hello_validation,
+ va);
+ GNUNET_CONTAINER_multihashmap_put (validation_map,
+ &id.hashPubKey,
+ va,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ neighbour = find_neighbour(&id);
+ if (neighbour == NULL)
+ neighbour = setup_new_neighbour(&id);
+ peer_address = add_peer_address(neighbour, tname, addr, addrlen);
+ GNUNET_assert(peer_address != NULL);
+ hello_size = GNUNET_HELLO_size(our_hello);
+ tsize = sizeof(struct TransportPingMessage) + hello_size;
+ message_buf = GNUNET_malloc(tsize);
+ ping.challenge = htonl(va->challenge);
+ ping.header.size = htons(sizeof(struct TransportPingMessage));
+ ping.header.type = htons(GNUNET_MESSAGE_TYPE_TRANSPORT_PING);
+ memcpy(&ping.target, &id, sizeof(struct GNUNET_PeerIdentity));
+ memcpy(message_buf, our_hello, hello_size);
+ memcpy(&message_buf[hello_size],
+ &ping,
+ sizeof(struct TransportPingMessage));
+#if DEBUG_TRANSPORT_REVALIDATION
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Performing re-validation of address `%s' via `%s' for peer `%4s' sending `%s' (%u bytes) and `%s' (%u bytes)\n",
+ GNUNET_a2s (addr, addrlen),
+ tname,
+ GNUNET_i2s (&id),
+ "HELLO", hello_size,
+ "PING", sizeof (struct TransportPingMessage));
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# PING messages sent for re-validation"),
+ 1,
+ GNUNET_NO);
+ transmit_to_peer (NULL, peer_address,
+ GNUNET_SCHEDULER_PRIORITY_DEFAULT,
+ HELLO_VERIFICATION_TIMEOUT,
+ message_buf, tsize,
+ GNUNET_YES, neighbour);
+ GNUNET_free(message_buf);
+ return GNUNET_OK;
+}
- neighbor = find_neighbor(&id);
- if (neighbor == NULL)
- neighbor = setup_new_neighbor(&id);
+/**
+ * Send periodic ping messages to a give foreign address.
+ *
+ * cls closure, can be safely cast to ForeignAddressList
+ * tc task context
+ *
+ * FIXME: Since a _billion_ pongs are sent for every ping,
+ * maybe this should be a special message type or something
+ * that gets discarded on the other side instead of initiating
+ * a flood.
+ */
+static void
+send_periodic_ping (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct PeriodicValidationContext *periodic_validation_context = cls;
- peer_address = find_peer_address(neighbor, addr, addrlen);
- if (peer_address == NULL)
+ if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
{
- peer_address = add_peer_address(neighbor, addr, addrlen);
+ GNUNET_free(periodic_validation_context->transport);
+ GNUNET_free(periodic_validation_context);
+ return; /* We have been shutdown, don't do anything! */
}
+ rerun_validation(&periodic_validation_context->publicKey,
+ periodic_validation_context->transport,
+ periodic_validation_context->foreign_address->expires,
+ periodic_validation_context->foreign_address->addr,
+ periodic_validation_context->foreign_address->addrlen);
+ GNUNET_free(periodic_validation_context->transport);
+ GNUNET_free(periodic_validation_context);
+}
- GNUNET_assert(peer_address != NULL);
- va->peer_address = peer_address; /* Back pointer FIXME: remove this nonsense! */
- peer_address->validation = va;
+/**
+ * Check if the given address is already being validated; if not,
+ * append the given address to the list of entries that are being be
+ * validated and initiate validation.
+ *
+ * @param cls closure ('struct CheckHelloValidatedContext *')
+ * @param tname name of the transport
+ * @param expiration expiration time
+ * @param addr the address
+ * @param addrlen length of the address
+ * @return GNUNET_OK (always)
+ */
+static int
+run_validation (void *cls,
+ const char *tname,
+ struct GNUNET_TIME_Absolute expiration,
+ const void *addr, size_t addrlen)
+{
+ struct CheckHelloValidatedContext *chvc = cls;
+ struct GNUNET_PeerIdentity id;
+ struct TransportPlugin *tp;
+ struct ValidationEntry *va;
+ struct NeighbourList *neighbour;
+ struct ForeignAddressList *peer_address;
+ struct TransportPingMessage ping;
+ struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pk;
+ struct CheckAddressExistsClosure caec;
+ char * message_buf;
+ uint16_t hello_size;
+ size_t tsize;
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# peer addresses scheduled for validation"),
+ 1,
+ GNUNET_NO);
+ tp = find_transport (tname);
+ if (tp == NULL)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO |
+ GNUNET_ERROR_TYPE_BULK,
+ _
+ ("Transport `%s' not loaded, will not try to validate peer address using this transport.\n"),
+ tname);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# peer addresses not validated (no applicable transport plugin available)"),
+ 1,
+ GNUNET_NO);
+ return GNUNET_OK;
+ }
+ GNUNET_HELLO_get_key (chvc->hello, &pk);
+ GNUNET_CRYPTO_hash (&pk,
+ sizeof (struct
+ GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
+ &id.hashPubKey);
+ caec.addr = addr;
+ caec.addrlen = addrlen;
+ caec.tname = tname;
+ caec.exists = GNUNET_NO;
+ GNUNET_CONTAINER_multihashmap_iterate (validation_map,
+ &check_address_exists,
+ &caec);
+ if (caec.exists == GNUNET_YES)
+ {
+ /* During validation attempts we will likely trigger the other
+ peer trying to validate our address which in turn will cause
+ it to send us its HELLO, so we expect to hit this case rather
+ frequently. Only print something if we are very verbose. */
+#if DEBUG_TRANSPORT > 1
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Validation of address `%s' via `%s' for peer `%4s' already in progress.\n",
+ GNUNET_a2s (addr, addrlen),
+ tname,
+ GNUNET_i2s (&id));
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# peer addresses not validated (in progress)"),
+ 1,
+ GNUNET_NO);
+ return GNUNET_OK;
+ }
+ va = GNUNET_malloc (sizeof (struct ValidationEntry) + addrlen);
+ va->transport_name = GNUNET_strdup (tname);
+ va->challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ (unsigned int) -1);
+ va->send_time = GNUNET_TIME_absolute_get();
+ va->addr = (const void*) &va[1];
+ memcpy (&va[1], addr, addrlen);
+ va->addrlen = addrlen;
+ GNUNET_HELLO_get_key (chvc->hello,
+ &va->publicKey);
+ va->timeout_task = GNUNET_SCHEDULER_add_delayed (sched,
+ HELLO_VERIFICATION_TIMEOUT,
+ &timeout_hello_validation,
+ va);
+ GNUNET_CONTAINER_multihashmap_put (validation_map,
+ &id.hashPubKey,
+ va,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ neighbour = find_neighbour(&id);
+ if (neighbour == NULL)
+ neighbour = setup_new_neighbour(&id);
+ peer_address = add_peer_address(neighbour, tname, addr, addrlen);
+ GNUNET_assert(peer_address != NULL);
hello_size = GNUNET_HELLO_size(our_hello);
tsize = sizeof(struct TransportPingMessage) + hello_size;
-
message_buf = GNUNET_malloc(tsize);
-
- ping = GNUNET_malloc(sizeof(struct TransportPingMessage));
- ping->challenge = htonl(va->challenge);
- ping->header.size = htons(sizeof(struct TransportPingMessage));
- ping->header.type = htons(GNUNET_MESSAGE_TYPE_TRANSPORT_PING);
- memcpy(&ping->target, &id, sizeof(struct GNUNET_PeerIdentity));
-
-#if DEBUG_TRANSPORT
- GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "hello size is %d, ping size is %d, total size is %d\n", hello_size, sizeof(struct TransportPingMessage), tsize);
-#endif
+ ping.challenge = htonl(va->challenge);
+ ping.header.size = htons(sizeof(struct TransportPingMessage));
+ ping.header.type = htons(GNUNET_MESSAGE_TYPE_TRANSPORT_PING);
+ memcpy(&ping.target, &id, sizeof(struct GNUNET_PeerIdentity));
memcpy(message_buf, our_hello, hello_size);
- memcpy(&message_buf[hello_size], ping, sizeof(struct TransportPingMessage));
-
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending ping message of size %d to address `%s' via `%s' for `%4s'\n",
- tsize, GNUNET_a2s (addr, addrlen), tname, GNUNET_i2s (&id));
-#endif
- sent = transmit_to_peer(NULL, peer_address, GNUNET_SCHEDULER_PRIORITY_DEFAULT,
- message_buf, tsize, GNUNET_NO, neighbor);
-
+ memcpy(&message_buf[hello_size],
+ &ping,
+ sizeof(struct TransportPingMessage));
#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transport returned %d from send!\n", sent);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Performing validation of address `%s' via `%s' for peer `%4s' sending `%s' (%u bytes) and `%s' (%u bytes)\n",
+ GNUNET_a2s (addr, addrlen),
+ tname,
+ GNUNET_i2s (&id),
+ "HELLO", hello_size,
+ "PING", sizeof (struct TransportPingMessage));
#endif
-
- GNUNET_free(ping);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# PING messages sent for initial validation"),
+ 1,
+ GNUNET_NO);
+ transmit_to_peer (NULL, peer_address,
+ GNUNET_SCHEDULER_PRIORITY_DEFAULT,
+ HELLO_VERIFICATION_TIMEOUT,
+ message_buf, tsize,
+ GNUNET_YES, neighbour);
GNUNET_free(message_buf);
return GNUNET_OK;
}
-#if WHY
-/*
- * @param cls handle to the plugin (for sending)
- * @param target the peer identity of the peer we are sending to
- * @param challenge the challenge number
- * @param timeout how long to await validation?
- * @param addr the address to validate
- * @param addrlen the length of the address
- *
- * Perform address validation, which means sending a PING PONG to
- * the address via the transport plugin. If not validated, then
- * do not count this as a good peer/address...
+
+/**
+ * Add the given address to the list of foreign addresses
+ * available for the given peer (check for duplicates).
*
- * Currently this function is not used, ping/pongs get sent from the
- * run_validation function. Haven't decided yet how to do this.
+ * @param cls the respective 'struct NeighbourList' to update
+ * @param tname name of the transport
+ * @param expiration expiration time
+ * @param addr the address
+ * @param addrlen length of the address
+ * @return GNUNET_OK (always)
*/
-static void
-validate_address (void *cls, struct ValidationAddress *va,
- const struct GNUNET_PeerIdentity *target,
- struct GNUNET_TIME_Relative timeout,
- const void *addr, size_t addrlen)
+static int
+add_to_foreign_address_list (void *cls,
+ const char *tname,
+ struct GNUNET_TIME_Absolute expiration,
+ const void *addr, size_t addrlen)
{
- /* struct Plugin *plugin = cls;
- int challenge = va->challenge; */
+ struct NeighbourList *n = cls;
+ struct ForeignAddressList *fal;
+ int try;
-
- return;
-}
+ try = GNUNET_NO;
+ fal = find_peer_address (n, tname, addr, addrlen);
+ if (fal == NULL)
+ {
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Adding address `%s' (%s) for peer `%4s' due to peerinfo data for %llums.\n",
+ GNUNET_a2s (addr, addrlen),
+ tname,
+ GNUNET_i2s (&n->id),
+ expiration.value);
#endif
+ fal = add_peer_address (n, tname, addr, addrlen);
+ try = GNUNET_YES;
+ }
+ if (fal == NULL)
+ return GNUNET_OK;
+ fal->expires = GNUNET_TIME_absolute_max (expiration,
+ fal->expires);
+ fal->validated = GNUNET_YES;
+ if (try == GNUNET_YES)
+ try_transmission_to_peer (n);
+ return GNUNET_OK;
+}
+
/**
* Check if addresses in validated hello "h" overlap with
- * those in "chvc->hello" and update "chvc->hello" accordingly,
- * removing those addresses that have already been validated.
+ * those in "chvc->hello" and validate the rest.
+ *
+ * @param cls closure
+ * @param peer id of the peer, NULL for last call
+ * @param h hello message for the peer (can be NULL)
+ * @param trust amount of trust we have in the peer (not used)
*/
static void
check_hello_validated (void *cls,
const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_HELLO_Message *h, uint32_t trust)
+ const struct GNUNET_HELLO_Message *h,
+ uint32_t trust)
{
struct CheckHelloValidatedContext *chvc = cls;
- struct ValidationAddress *va;
- struct TransportPlugin *tp;
- int first_call;
- int count;
- struct GNUNET_PeerIdentity apeer;
+ struct GNUNET_HELLO_Message *plain_hello;
+ struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pk;
+ struct GNUNET_PeerIdentity target;
+ struct NeighbourList *n;
- first_call = GNUNET_NO;
- if (chvc->e == NULL)
+ if (peer == NULL)
{
chvc->piter = NULL;
- first_call = GNUNET_YES;
- chvc->e = GNUNET_malloc (sizeof (struct ValidationList));
- GNUNET_assert (GNUNET_OK ==
- GNUNET_HELLO_get_key (h != NULL ? h : chvc->hello,
- &chvc->e->publicKey));
- chvc->e->timeout =
- GNUNET_TIME_relative_to_absolute (HELLO_VERIFICATION_TIMEOUT);
- chvc->e->next = pending_validations;
- pending_validations = chvc->e;
- }
-
- if (h != NULL)
- {
- GNUNET_HELLO_iterate_new_addresses (chvc->hello,
- h,
- GNUNET_TIME_absolute_get (),
- &run_validation, chvc->e);
- }
- else if (GNUNET_YES == first_call)
- {
- /* no existing HELLO, all addresses are new */
- GNUNET_HELLO_iterate_addresses (chvc->hello,
- GNUNET_NO, &run_validation, chvc->e);
- }
-
- if (h != NULL)
- return; /* wait for next call */
- /* finally, transmit validation attempts */
- GNUNET_assert (GNUNET_OK == GNUNET_HELLO_get_id (chvc->hello, &apeer));
-
- va = chvc->e->addresses;
- count = 0;
- while (va != NULL)
- {
+ GNUNET_CONTAINER_DLL_remove (chvc_head,
+ chvc_tail,
+ chvc);
+ if (GNUNET_NO == chvc->hello_known)
+ {
+ /* notify PEERINFO about the peer now, so that we at least
+ have the public key if some other component needs it */
+ GNUNET_HELLO_get_key (chvc->hello, &pk);
+ GNUNET_CRYPTO_hash (&pk,
+ sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
+ &target.hashPubKey);
+ plain_hello = GNUNET_HELLO_create (&pk,
+ NULL,
+ NULL);
+ GNUNET_PEERINFO_add_peer (cfg, sched, &target, plain_hello);
+ GNUNET_free (plain_hello);
#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Establishing `%s' connection to validate `%s' address `%s' of `%4s'\n",
- va->transport_name,
- "HELLO",
- GNUNET_a2s ((const struct sockaddr *) va->peer_address->addr,
- va->peer_address->addrlen), GNUNET_i2s (&apeer));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Peerinfo had no `%s' message for peer `%4s', full validation needed.\n",
+ "HELLO",
+ GNUNET_i2s (&target));
#endif
- tp = find_transport (va->transport_name);
- GNUNET_assert (tp != NULL);
- /* This validation should happen inside the transport, not from the plugin! */
- va->ok = GNUNET_SYSERR;
- va = va->next;
- count++;
- }
-
- GNUNET_SCHEDULER_add_delayed (sched,
- GNUNET_TIME_absolute_get_remaining (chvc->
- e->timeout),
- &cleanup_validation, NULL);
- GNUNET_free (chvc);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# new HELLOs requiring full validation"),
+ 1,
+ GNUNET_NO);
+ GNUNET_HELLO_iterate_addresses (chvc->hello,
+ GNUNET_NO,
+ &run_validation,
+ chvc);
+ }
+ else
+ {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# duplicate HELLO (peer known)"),
+ 1,
+ GNUNET_NO);
+ }
+ GNUNET_free (chvc);
+ return;
+ }
+ if (h == NULL)
+ return;
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Peerinfo had `%s' message for peer `%4s', validating only new addresses.\n",
+ "HELLO",
+ GNUNET_i2s (peer));
+#endif
+ chvc->hello_known = GNUNET_YES;
+ n = find_neighbour (peer);
+ if (n != NULL)
+ {
+ GNUNET_HELLO_iterate_addresses (h,
+ GNUNET_NO,
+ &add_to_foreign_address_list,
+ n);
+ try_transmission_to_peer (n);
+ }
+ GNUNET_HELLO_iterate_new_addresses (chvc->hello,
+ h,
+ GNUNET_TIME_relative_to_absolute (HELLO_REVALIDATION_START_TIME),
+ &run_validation,
+ chvc);
}
-
/**
* Process HELLO-message.
*
process_hello (struct TransportPlugin *plugin,
const struct GNUNET_MessageHeader *message)
{
- struct ValidationList *e;
uint16_t hsize;
struct GNUNET_PeerIdentity target;
const struct GNUNET_HELLO_Message *hello;
return GNUNET_SYSERR;
}
/* first, check if load is too high */
- if (GNUNET_OS_load_cpu_get (cfg) > 100)
+ if (GNUNET_SCHEDULER_get_load (sched,
+ GNUNET_SCHEDULER_PRIORITY_BACKGROUND) > MAX_HELLO_LOAD)
{
- /* TODO: call to stats? */
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# HELLOs ignored due to high load"),
+ 1,
+ GNUNET_NO);
return GNUNET_OK;
}
hello = (const struct GNUNET_HELLO_Message *) message;
GNUNET_CRYPTO_hash (&publicKey,
sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
&target.hashPubKey);
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Processing `%s' message for `%4s' of size %d (hsize is %d)\n",
- "HELLO", GNUNET_i2s (&target), GNUNET_HELLO_size(hello), hsize);
-#endif
-
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Notifying peerinfo about peer %s\n",
- GNUNET_i2s (&target));
-#endif
-
- /* check if a HELLO for this peer is already on the validation list */
- e = pending_validations;
- while (e != NULL)
+ if (0 == memcmp (&my_identity,
+ &target,
+ sizeof (struct GNUNET_PeerIdentity)))
{
- if (0 == memcmp (&e->publicKey,
- &publicKey,
- sizeof (struct
- GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded)))
- {
- /* TODO: call to stats? */
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s' message for peer `%4s' is already pending; ignoring new message\n",
- "HELLO", GNUNET_i2s (&target));
-#endif
- return GNUNET_OK;
- }
- e = e->next;
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# HELLOs ignored for validation (is my own HELLO)"),
+ 1,
+ GNUNET_NO);
+ return GNUNET_OK;
}
+#if DEBUG_TRANSPORT > 1
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Processing `%s' message for `%4s' of size %u\n",
+ "HELLO",
+ GNUNET_i2s (&target),
+ GNUNET_HELLO_size(hello));
+#endif
chvc = GNUNET_malloc (sizeof (struct CheckHelloValidatedContext) + hsize);
- chvc->plugin = plugin;
- chvc->hello = (struct GNUNET_HELLO_Message *) &chvc[1];
- chvc->e = NULL;
- memcpy (chvc->hello, hello, hsize);
+ chvc->hello = (const struct GNUNET_HELLO_Message *) &chvc[1];
+ memcpy (&chvc[1], hello, hsize);
+ GNUNET_CONTAINER_DLL_insert (chvc_head,
+ chvc_tail,
+ chvc);
/* finally, check if HELLO was previously validated
(continuation will then schedule actual validation) */
chvc->piter = GNUNET_PEERINFO_iterate (cfg,
/**
- * The peer specified by the given neighbor has timed-out or a plugin
+ * The peer specified by the given neighbour has timed-out or a plugin
* has disconnected. We may either need to do nothing (other plugins
* still up), or trigger a full disconnect and clean up. This
* function updates our state and does the necessary notifications.
- * Also notifies our clients that the neighbor is now officially
+ * Also notifies our clients that the neighbour is now officially
* gone.
*
- * @param n the neighbor list entry for the peer
+ * @param n the neighbour list entry for the peer
* @param check should we just check if all plugins
* disconnected or must we ask all plugins to
* disconnect?
*/
static void
-disconnect_neighbor (struct NeighborList *current_handle, int check)
+disconnect_neighbour (struct NeighbourList *n, int check)
{
struct ReadyList *rpos;
- struct NeighborList *npos;
- struct NeighborList *nprev;
- struct NeighborList *n;
+ struct NeighbourList *npos;
+ struct NeighbourList *nprev;
struct MessageQueue *mq;
- struct PeerAddressList *peer_addresses;
- struct PeerAddressList *peer_pos;
-
- if (neighbors == NULL)
- return; /* We don't have any neighbors, so client has an already removed handle! */
-
- npos = neighbors;
- while ((npos != NULL) && (current_handle != npos))
- npos = npos->next;
-
- if (npos == NULL)
- return; /* Couldn't find neighbor in existing list, must have been already removed! */
- else
- n = npos;
+ struct ForeignAddressList *peer_addresses;
+ struct ForeignAddressList *peer_pos;
if (GNUNET_YES == check)
{
rpos = rpos->next;
}
}
-
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
- "Disconnecting from `%4s'\n", GNUNET_i2s (&n->id));
+ "Disconnecting from `%4s'\n",
+ GNUNET_i2s (&n->id));
#endif
- /* remove n from neighbors list */
+ /* remove n from neighbours list */
nprev = NULL;
- npos = neighbors;
+ npos = neighbours;
while ((npos != NULL) && (npos != n))
{
nprev = npos;
}
GNUNET_assert (npos != NULL);
if (nprev == NULL)
- neighbors = n->next;
+ neighbours = n->next;
else
nprev->next = n->next;
/* notify all clients about disconnect */
- notify_clients_disconnect (&n->id);
+ if (GNUNET_YES == n->received_pong)
+ notify_clients_disconnect (&n->id);
/* clean up all plugins, cancel connections and pending transmissions */
while (NULL != (rpos = n->plugins))
{
n->plugins = rpos->next;
- GNUNET_assert (rpos->neighbor == n);
- if (GNUNET_YES == rpos->connected)
- rpos->plugin->api->disconnect (rpos->plugin->api->cls, &n->id);
+ rpos->plugin->api->disconnect (rpos->plugin->api->cls, &n->id);
- peer_pos = rpos->addresses;
- rpos->addresses = peer_pos->next;
- while (peer_pos != NULL)
+ while (rpos->addresses != NULL)
{
- GNUNET_free(peer_pos->addr);
- GNUNET_free(peer_pos);
peer_pos = rpos->addresses;
rpos->addresses = peer_pos->next;
+ GNUNET_free(peer_pos);
}
GNUNET_free (rpos);
}
/* free all messages on the queue */
- while (NULL != (mq = n->messages))
- {
- n->messages = mq->next;
- GNUNET_assert (0 == memcmp(mq->neighbor_id, &n->id, sizeof(struct GNUNET_PeerIdentity)));
- GNUNET_free (mq->neighbor_id);
+ while (NULL != (mq = n->messages_head))
+ {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes in message queue for other peers"),
+ -mq->message_buf_size,
+ GNUNET_NO);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes discarded due to disconnect"),
+ mq->message_buf_size,
+ GNUNET_NO);
+ GNUNET_CONTAINER_DLL_remove (n->messages_head,
+ n->messages_tail,
+ mq);
+ GNUNET_assert (0 == memcmp(&mq->neighbour_id,
+ &n->id,
+ sizeof(struct GNUNET_PeerIdentity)));
GNUNET_free (mq);
}
if (n->timeout_task != GNUNET_SCHEDULER_NO_TASK)
- GNUNET_SCHEDULER_cancel (sched, n->timeout_task);
+ {
+ GNUNET_SCHEDULER_cancel (sched, n->timeout_task);
+ n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ if (n->retry_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (sched, n->retry_task);
+ n->retry_task = GNUNET_SCHEDULER_NO_TASK;
+ }
/* finally, free n itself */
GNUNET_free (n);
}
-/*
+/**
* We have received a PING message from someone. Need to send a PONG message
- * in response to the peer by any means necessary. Of course, with something
- * like TCP where a connection exists, we may want to send it that way. But
- * we may not be able to make that distinction...
- */
-static int handle_ping(void *cls, const struct GNUNET_MessageHeader *message,
- const struct GNUNET_PeerIdentity *peer,
- const char *sender_address,
- size_t sender_address_len)
+ * in response to the peer by any means necessary.
+ *
+ * FIXME: With something like TCP where a connection exists, we may
+ * want to send it that way. But the current API does not seem to
+ * allow us to do so (can't tell this to the transport!)
+ */
+static int
+handle_ping(void *cls, const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_PeerIdentity *peer,
+ const char *sender_address,
+ size_t sender_address_len)
{
struct TransportPlugin *plugin = cls;
struct TransportPingMessage *ping;
struct TransportPongMessage *pong;
- struct PeerAddressList *peer_address;
- uint16_t msize;
- struct NeighborList *n;
-
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
- "Processing `%s' from `%s'\n",
- "PING", GNUNET_a2s ((const struct sockaddr *)sender_address, sender_address_len));
-#endif
+ struct NeighbourList *n;
+ struct ReadyList *rl;
+ struct ForeignAddressList *fal;
- msize = ntohs (message->size);
- if (msize < sizeof (struct TransportPingMessage))
+ if (ntohs (message->size) != sizeof (struct TransportPingMessage))
{
GNUNET_break_op (0);
return GNUNET_SYSERR;
sizeof (struct GNUNET_PeerIdentity)))
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- _("Received `%s' message not destined for me!\n"), "PING");
+ _("Received `%s' message not destined for me!\n"),
+ "PING");
return GNUNET_SYSERR;
}
-
- msize -= sizeof (struct TransportPingMessage);
-
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
+ "Processing `%s' from `%s'\n",
+ "PING",
+ GNUNET_a2s ((const struct sockaddr *)sender_address,
+ sender_address_len));
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# PING messages received"),
+ 1,
+ GNUNET_NO);
pong = GNUNET_malloc (sizeof (struct TransportPongMessage) + sender_address_len);
pong->header.size = htons (sizeof (struct TransportPongMessage) + sender_address_len);
pong->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_PONG);
pong->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_TCP_PING);
pong->challenge = ping->challenge;
pong->addrlen = htons(sender_address_len);
-
- memcpy(&pong->signer, &my_public_key, sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
+ memcpy(&pong->signer,
+ &my_public_key,
+ sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
memcpy (&pong[1], sender_address, sender_address_len);
GNUNET_assert (GNUNET_OK ==
GNUNET_CRYPTO_rsa_sign (my_private_key,
&pong->purpose, &pong->signature));
- n = find_neighbor(peer);
+ n = find_neighbour(peer);
if (n == NULL)
- n = setup_new_neighbor(peer);
-
- peer_address = find_peer_address(n, sender_address, sender_address_len);
- if (peer_address == NULL)
- peer_address = add_peer_address(n, sender_address, sender_address_len);
-
- peer_address->timeout = GNUNET_TIME_relative_to_absolute(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
-
- /* We don't use the peer_address because the address we received the message from may not
- * be a reliable way to send it back! We add it to the list which should queue up a separate
- * ping to determine if the address is viable.
- */
- transmit_to_peer(NULL, NULL, TRANSPORT_DEFAULT_PRIORITY, (char *)pong, ntohs(pong->header.size), GNUNET_NO, n);
-
+ n = setup_new_neighbour(peer);
+ /* broadcast 'PONG' to all available addresses */
+ rl = n->plugins;
+ while (rl != NULL)
+ {
+ fal = rl->addresses;
+ while (fal != NULL)
+ {
+ transmit_to_peer(NULL, fal,
+ TRANSPORT_PONG_PRIORITY,
+ HELLO_VERIFICATION_TIMEOUT,
+ (const char *)pong,
+ ntohs(pong->header.size),
+ GNUNET_YES,
+ n);
+ fal = fal->next;
+ }
+ rl = rl->next;
+ }
GNUNET_free(pong);
return GNUNET_OK;
}
+
/**
* Function called by the plugin for each received message.
* Update data volumes, possibly notify plugins about
* and generally forward to our receive callback.
*
* @param cls the "struct TransportPlugin *" we gave to the plugin
- * @param message the message, NULL if peer was disconnected
- * @param distance the transport cost to this peer (not latency!)
- * @param sender_address the address that the sender reported
- * (opaque to transport service)
- * @param sender_address_len the length of the sender address
* @param peer (claimed) identity of the other peer
- * @return the new service_context that the plugin should use
- * for future receive calls for messages from this
- * particular peer
- *
- */
-static void
+ * @param message the message, NULL if we only care about
+ * learning about the delay until we should receive again
+ * @param distance in overlay hops; use 1 unless DV (or 0 if message == NULL)
+ * @param sender_address binary address of the sender (if observed)
+ * @param sender_address_len number of bytes in sender_address
+ * @return how long the plugin should wait until receiving more data
+ * (plugins that do not support this, can ignore the return value)
+ */
+static struct GNUNET_TIME_Relative
plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer,
const struct GNUNET_MessageHeader *message,
unsigned int distance, const char *sender_address,
struct TransportPlugin *plugin = cls;
struct TransportClient *cpos;
struct InboundMessage *im;
- struct PeerAddressList *peer_address;
+ struct ForeignAddressList *peer_address;
uint16_t msize;
- struct NeighborList *n;
+ struct NeighbourList *n;
+ struct GNUNET_TIME_Relative ret;
- n = find_neighbor (peer);
+ n = find_neighbour (peer);
if (n == NULL)
- {
- if (message == NULL)
- return; /* disconnect of peer already marked down */
- n = setup_new_neighbor (peer);
-
- }
-
- peer_address = find_peer_address(n, sender_address, sender_address_len);
- if (peer_address == NULL)
- peer_address = add_peer_address(n, sender_address, sender_address_len);
-
+ n = setup_new_neighbour (peer);
service_context = n->plugins;
while ((service_context != NULL) && (plugin != service_context->plugin))
service_context = service_context->next;
GNUNET_assert ((plugin->api->send == NULL) || (service_context != NULL));
- if (message == NULL)
- {
+ if (message != NULL)
+ {
+ peer_address = add_peer_address(n,
+ plugin->short_name,
+ sender_address,
+ sender_address_len);
+ if (peer_address != NULL)
+ {
+ peer_address->distance = distance;
+ if (peer_address->connected == GNUNET_NO)
+ {
+ peer_address->connected = GNUNET_YES;
+ peer_address->connect_attempts++;
+ }
+ peer_address->timeout
+ =
+ GNUNET_TIME_relative_to_absolute
+ (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+ }
+ /* update traffic received amount ... */
+ msize = ntohs (message->size);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes received from other peers"),
+ msize,
+ GNUNET_NO);
+ n->distance = distance;
+ n->peer_timeout =
+ GNUNET_TIME_relative_to_absolute
+ (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+ GNUNET_SCHEDULER_cancel (sched,
+ n->timeout_task);
+ n->timeout_task =
+ GNUNET_SCHEDULER_add_delayed (sched,
+ GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+ &neighbour_timeout_task, n);
+ if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
+ {
+ /* dropping message due to frequent inbound volume violations! */
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING |
+ GNUNET_ERROR_TYPE_BULK,
+ _
+ ("Dropping incoming message due to repeated bandwidth quota (%u b/s) violations (total of %u).\n"),
+ n->in_tracker.available_bytes_per_s__,
+ n->quota_violation_count);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bandwidth quota violations by other peers"),
+ 1,
+ GNUNET_NO);
+ return GNUNET_TIME_UNIT_MINUTES; /* minimum penalty, likely ignored (UDP...) */
+ }
+ switch (ntohs (message->type))
+ {
+ case GNUNET_MESSAGE_TYPE_HELLO:
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# HELLO messages received from other peers"),
+ 1,
+ GNUNET_NO);
+ process_hello (plugin, message);
+ break;
+ case GNUNET_MESSAGE_TYPE_TRANSPORT_PING:
+ handle_ping(plugin, message, peer, sender_address, sender_address_len);
+ break;
+ case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG:
+ handle_pong(plugin, message, peer, sender_address, sender_address_len);
+ break;
+ default:
#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
- "Receive failed from `%4s', triggering disconnect\n",
- GNUNET_i2s (&n->id));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received message of type %u from `%4s', sending to all clients.\n",
+ ntohs (message->type), GNUNET_i2s (peer));
#endif
- /* TODO: call stats */
- if (service_context != NULL)
- service_context->connected = GNUNET_NO;
- disconnect_neighbor (n, GNUNET_YES);
- return;
- }
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
- "Processing message of type `%u' received by plugin...\n",
- ntohs (message->type));
-#endif
- if (service_context != NULL)
- {
- if (service_context->connected == GNUNET_NO)
- {
- /*service_context->connected = GNUNET_YES;*/
- /* FIXME: What to do here? Should we use these as well, to specify some Address
- * in the AddressList should be available?
- */
- peer_address->transmit_ready = GNUNET_YES;
- peer_address->connect_attempts++;
- }
- peer_address->timeout
- =
- GNUNET_TIME_relative_to_absolute
- (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
- }
- /* update traffic received amount ... */
- msize = ntohs (message->size);
- n->last_received += msize;
- GNUNET_SCHEDULER_cancel (sched, n->timeout_task);
- n->peer_timeout =
- GNUNET_TIME_relative_to_absolute
- (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
- n->timeout_task =
- GNUNET_SCHEDULER_add_delayed (sched,
- GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
- &neighbor_timeout_task, n);
- update_quota (n);
- if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
- {
- /* dropping message due to frequent inbound volume violations! */
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING |
- GNUNET_ERROR_TYPE_BULK,
- _
- ("Dropping incoming message due to repeated bandwidth quota violations (total of %u).\n"), n->quota_violation_count);
- /* TODO: call stats */
- GNUNET_assert ((service_context == NULL) ||
- (NULL != service_context->neighbor));
-
- return;
- }
- switch (ntohs (message->type))
+ if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker,
+ msize))
+ n->quota_violation_count++;
+ else
+ n->quota_violation_count = 0; /* back within limits */
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# payload received from other peers"),
+ msize,
+ GNUNET_NO);
+ /* transmit message to all clients */
+ im = GNUNET_malloc (sizeof (struct InboundMessage) + msize);
+ im->header.size = htons (sizeof (struct InboundMessage) + msize);
+ im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
+ im->latency = GNUNET_TIME_relative_hton (n->latency);
+ im->peer = *peer;
+ memcpy (&im[1], message, msize);
+ cpos = clients;
+ while (cpos != NULL)
+ {
+ transmit_to_client (cpos, &im->header, GNUNET_YES);
+ cpos = cpos->next;
+ }
+ GNUNET_free (im);
+ }
+ }
+ ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 0);
+ if (ret.value > 0)
{
- case GNUNET_MESSAGE_TYPE_HELLO:
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Receiving `%s' message from `%4s'.\n", "HELLO",
- GNUNET_i2s (peer));
-#endif
- process_hello (plugin, message);
- break;
- case GNUNET_MESSAGE_TYPE_TRANSPORT_PING:
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Receiving `%s' message from `%4s'.\n", "PING",
- GNUNET_i2s (peer));
-#endif
- handle_ping(plugin, message, peer, sender_address, sender_address_len);
- break;
- case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG:
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Receiving `%s' message from `%4s'.\n", "PONG",
- GNUNET_i2s (peer));
-#endif
- handle_pong(plugin, message, peer, sender_address, sender_address_len);
- break;
- default:
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received REAL MESSAGE type %u from `%4s', sending to all clients.\n",
- ntohs (message->type), GNUNET_i2s (peer));
-#endif
- /* transmit message to all clients */
- im = GNUNET_malloc (sizeof (struct InboundMessage) + msize);
- im->header.size = htons (sizeof (struct InboundMessage) + msize);
- im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
- im->latency = n->latency;
- im->peer = *peer;
- memcpy (&im[1], message, msize);
-
- cpos = clients;
- while (cpos != NULL)
- {
- transmit_to_client (cpos, &im->header, GNUNET_YES);
- cpos = cpos->next;
- }
- GNUNET_free (im);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Throttling read (%llu bytes excess at %u b/s), waiting %llums before reading more.\n",
+ (unsigned long long) n->in_tracker.consumption_since_last_update__,
+ (unsigned int) n->in_tracker.available_bytes_per_s__,
+ (unsigned long long) ret.value);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# ms throttling suggested"),
+ (int64_t) ret.value,
+ GNUNET_NO);
}
- GNUNET_assert ((service_context == NULL) ||
- (NULL != service_context->neighbor));
+ return ret;
}
{
struct TransportClient *c;
struct ConnectInfoMessage cim;
- struct NeighborList *n;
- struct InboundMessage *im;
- struct GNUNET_MessageHeader *ack;
+ struct NeighbourList *n;
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
/* tell new client about all existing connections */
cim.header.size = htons (sizeof (struct ConnectInfoMessage));
cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
- cim.quota_out =
- htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60 * 1000));
- cim.latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO); /* FIXME? */
- im = GNUNET_malloc (sizeof (struct InboundMessage) +
- sizeof (struct GNUNET_MessageHeader));
- im->header.size = htons (sizeof (struct InboundMessage) +
- sizeof (struct GNUNET_MessageHeader));
- im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
- im->latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO); /* FIXME? */
- ack = (struct GNUNET_MessageHeader *) &im[1];
- ack->size = htons (sizeof (struct GNUNET_MessageHeader));
- ack->type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ACK);
- for (n = neighbors; n != NULL; n = n->next)
- {
- cim.id = n->id;
- transmit_to_client (c, &cim.header, GNUNET_NO);
- if (n->received_pong)
- {
- im->peer = n->id;
- transmit_to_client (c, &im->header, GNUNET_NO);
+ n = neighbours;
+ while (n != NULL)
+ {
+ if (GNUNET_YES == n->received_pong)
+ {
+ cim.id = n->id;
+ cim.latency = GNUNET_TIME_relative_hton (n->latency);
+ cim.distance = htonl (n->distance);
+ transmit_to_client (c, &cim.header, GNUNET_NO);
}
+ n = n->next;
}
- GNUNET_free (im);
- }
- else
- {
- fprintf(stderr, "Our hello is NULL!\n");
}
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
{
int ret;
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received `%s' request from client\n", "HELLO");
-#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# HELLOs received from clients"),
+ 1,
+ GNUNET_NO);
ret = process_hello (NULL, message);
GNUNET_SERVER_receive_done (client, ret);
}
const struct GNUNET_MessageHeader *message)
{
struct TransportClient *tc;
- struct NeighborList *n;
+ struct NeighbourList *n;
const struct OutboundMessage *obm;
const struct GNUNET_MessageHeader *obmm;
uint16_t size;
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# payload received for other peers"),
+ size,
+ GNUNET_NO);
obm = (const struct OutboundMessage *) message;
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}
- n = find_neighbor (&obm->peer);
+ n = find_neighbour (&obm->peer);
if (n == NULL)
- n = setup_new_neighbor (&obm->peer); /* But won't ever add address, we have none! */
+ n = setup_new_neighbour (&obm->peer);
tc = clients;
while ((tc != NULL) && (tc->client != client))
tc = tc->next;
ntohs (obmm->size),
ntohs (obmm->type), GNUNET_i2s (&obm->peer));
#endif
- transmit_to_peer (tc, NULL, ntohl (obm->priority), (char *)obmm, ntohs (obmm->size), GNUNET_NO, n);
+ transmit_to_peer (tc, NULL, ntohl (obm->priority),
+ GNUNET_TIME_relative_ntoh (obm->timeout),
+ (char *)obmm,
+ ntohs (obmm->size), GNUNET_NO, n);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
{
const struct QuotaSetMessage *qsm =
(const struct QuotaSetMessage *) message;
- struct NeighborList *n;
- struct TransportPlugin *p;
- struct ReadyList *rl;
-
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received `%s' request from client for peer `%4s'\n",
- "SET_QUOTA", GNUNET_i2s (&qsm->peer));
-#endif
- n = find_neighbor (&qsm->peer);
+ struct NeighbourList *n;
+
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# SET QUOTA messages received"),
+ 1,
+ GNUNET_NO);
+ n = find_neighbour (&qsm->peer);
if (n == NULL)
{
GNUNET_SERVER_receive_done (client, GNUNET_OK);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# SET QUOTA messages ignored (no such peer)"),
+ 1,
+ GNUNET_NO);
return;
}
- update_quota (n);
- if (n->quota_in < ntohl (qsm->quota_in))
- n->last_quota_update = GNUNET_TIME_absolute_get ();
- n->quota_in = ntohl (qsm->quota_in);
- rl = n->plugins;
- while (rl != NULL)
- {
- p = rl->plugin;
- p->api->set_receive_quota (p->api->cls,
- &qsm->peer, ntohl (qsm->quota_in));
- rl = rl->next;
- }
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
-}
-
-
-/**
- * Handle TRY_CONNECT-message.
- *
- * @param cls closure (always NULL)
- * @param client identification of the client
- * @param message the actual message
- */
-static void
-handle_try_connect (void *cls,
- struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *message)
-{
- const struct TryConnectMessage *tcm;
- struct NeighborList *neighbor;
- tcm = (const struct TryConnectMessage *) message;
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received `%s' request from client %p asking to connect to `%4s'\n",
- "TRY_CONNECT", client, GNUNET_i2s (&tcm->peer));
-#endif
- neighbor = find_neighbor(&tcm->peer);
-
- if (neighbor == NULL)
- setup_new_neighbor (&tcm->peer);
- else
- {
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Client asked to connect to `%4s', but connection already exists\n",
- "TRY_CONNECT", GNUNET_i2s (&tcm->peer));
+ "Received `%s' request (new quota %u, old quota %u) from client for peer `%4s'\n",
+ "SET_QUOTA",
+ (unsigned int) ntohl (qsm->quota.value__),
+ (unsigned int) n->in_tracker.available_bytes_per_s__,
+ GNUNET_i2s (&qsm->peer));
#endif
- transmit_to_peer (NULL, NULL, 0,
- (const char *) our_hello, GNUNET_HELLO_size(our_hello),
- GNUNET_YES, neighbor);
- notify_clients_connect (&tcm->peer, GNUNET_TIME_UNIT_FOREVER_REL);
- }
+ GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker,
+ qsm->quota);
+ if (0 == ntohl (qsm->quota.value__))
+ disconnect_neighbour (n, GNUNET_NO);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
+
static void
transmit_address_to_client (void *cls, const char *address)
{
GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL);
}
+
/**
* Handle AddressLookup-message.
*
GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, 0},
{&handle_set_quota, NULL,
GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA, sizeof (struct QuotaSetMessage)},
- {&handle_try_connect, NULL,
- GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT,
- sizeof (struct TryConnectMessage)},
{&handle_address_lookup, NULL,
GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_LOOKUP,
0},
plug->env.cls = plug;
plug->env.receive = &plugin_env_receive;
plug->env.notify_address = &plugin_env_notify_address;
- plug->env.default_quota_in =
- (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000);
plug->env.max_connections = max_connect_per_transport;
+ plug->env.stats = stats;
}
return;
while (NULL != (mqe = pos->message_queue_head))
{
- pos->message_queue_head = mqe->next;
+ GNUNET_CONTAINER_DLL_remove (pos->message_queue_head,
+ pos->message_queue_tail,
+ mqe);
+ pos->message_count--;
GNUNET_free (mqe);
}
- pos->message_queue_head = NULL;
if (prev == NULL)
clients = pos->next;
else
pos->client = NULL;
return;
}
+ if (pos->th != NULL)
+ {
+ GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
+ pos->th = NULL;
+ }
+ GNUNET_break (0 == pos->message_count);
GNUNET_free (pos);
}
/**
- * Function called when the service shuts down. Unloads our plugins.
+ * Iterator to free entries in the validation_map.
+ *
+ * @param cls closure (unused)
+ * @param key current key code
+ * @param value value in the hash map (validation to abort)
+ * @return GNUNET_YES (always)
+ */
+static int
+abort_validation (void *cls,
+ const GNUNET_HashCode * key,
+ void *value)
+{
+ struct ValidationEntry *va = value;
+
+ GNUNET_SCHEDULER_cancel (sched, va->timeout_task);
+ GNUNET_free (va->transport_name);
+ GNUNET_free (va);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Function called when the service shuts down. Unloads our plugins
+ * and cancels pending validations.
*
* @param cls closure, unused
* @param tc task context (unused)
*/
static void
-unload_plugins (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct TransportPlugin *plug;
- struct AddressList *al;
+ struct OwnAddressList *al;
+ struct CheckHelloValidatedContext *chvc;
+ while (neighbours != NULL)
+ disconnect_neighbour (neighbours, GNUNET_NO);
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Transport service is unloading plugins...\n");
if (my_private_key != NULL)
GNUNET_CRYPTO_rsa_key_free (my_private_key);
GNUNET_free_non_null (our_hello);
+
+ /* free 'chvc' data structure */
+ while (NULL != (chvc = chvc_head))
+ {
+ chvc_head = chvc->next;
+ GNUNET_PEERINFO_iterate_cancel (chvc->piter);
+ GNUNET_free (chvc);
+ }
+ chvc_tail = NULL;
+
+ GNUNET_CONTAINER_multihashmap_iterate (validation_map,
+ &abort_validation,
+ NULL);
+ GNUNET_CONTAINER_multihashmap_destroy (validation_map);
+ validation_map = NULL;
+ if (stats != NULL)
+ {
+ GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
+ stats = NULL;
+ }
}
sched = s;
cfg = c;
+ stats = GNUNET_STATISTICS_create (sched, "transport", cfg);
+ validation_map = GNUNET_CONTAINER_multihashmap_create (64);
/* parse configuration */
if ((GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_number (c,
_
("Transport service is lacking key configuration settings. Exiting.\n"));
GNUNET_SCHEDULER_shutdown (s);
+ if (stats != NULL)
+ {
+ GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
+ stats = NULL;
+ }
+ GNUNET_CONTAINER_multihashmap_destroy (validation_map);
+ validation_map = NULL;
return;
}
max_connect_per_transport = (uint32_t) tneigh;
_
("Transport service could not access hostkey. Exiting.\n"));
GNUNET_SCHEDULER_shutdown (s);
+ if (stats != NULL)
+ {
+ GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
+ stats = NULL;
+ }
+ GNUNET_CONTAINER_multihashmap_destroy (validation_map);
+ validation_map = NULL;
return;
}
GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key);
}
GNUNET_SCHEDULER_add_delayed (sched,
GNUNET_TIME_UNIT_FOREVER_REL,
- &unload_plugins, NULL);
+ &shutdown_task, NULL);
if (no_transports)
refresh_hello ();