#include "gnunet_peerinfo_service.h"
#include "gnunet_protocols.h"
#include "gnunet_signatures.h"
+#include "gnunet_statistics_service.h"
#include "gnunet_transport_service.h"
#include "core.h"
+#define DEBUG_HANDSHAKE GNUNET_NO
+
+#define DEBUG_CORE_QUOTA GNUNET_NO
+
/**
* Receive and send buffer windows grow over time. For
* how long can 'unused' bandwidth accumulate before we
- * need to cap it? (specified in ms).
+ * need to cap it? (specified in seconds).
*/
-#define MAX_WINDOW_TIME (5 * 60 * 1000)
+#define MAX_WINDOW_TIME_S (5 * 60)
/**
- * Minimum of bytes per minute (out) to assign to any connected peer.
- * Should be rather low; values larger than DEFAULT_BPM_IN_OUT make no
- * sense.
+ * How many messages do we queue up at most for optional
+ * notifications to a client? (this can cause notifications
+ * about outgoing messages to be dropped).
*/
-#define MIN_BPM_PER_PEER GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT
+#define MAX_NOTIFY_QUEUE 16
/**
- * What is the smallest change (in number of bytes per minute)
- * that we consider significant enough to bother triggering?
+ * Minimum bandwidth (out) to assign to any connected peer.
+ * Should be rather low; values larger than DEFAULT_BW_IN_OUT make no
+ * sense.
*/
-#define MIN_BPM_CHANGE 32
+#define MIN_BANDWIDTH_PER_PEER GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT
/**
* After how much time past the "official" expiration time do
*/
#define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS
+/**
+ * How long do we delay messages to get larger packet sizes (CORKing)?
+ */
+#define MAX_CORK_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
+
/**
* What is the maximum delay for a SET_KEY message?
*/
-#define MAX_SET_KEY_DELAY GNUNET_TIME_UNIT_SECONDS
+#define MAX_SET_KEY_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
/**
* What how long do we wait for SET_KEY confirmation initially?
*/
-#define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
+#define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (MAX_SET_KEY_DELAY, 3)
/**
* What is the maximum delay for a PING message?
*/
-#define MAX_PING_DELAY GNUNET_TIME_UNIT_SECONDS
+#define MAX_PING_DELAY GNUNET_TIME_relative_multiply (MAX_SET_KEY_DELAY, 2)
/**
* What is the maximum delay for a PONG message?
*/
-#define MAX_PONG_DELAY GNUNET_TIME_UNIT_SECONDS
+#define MAX_PONG_DELAY GNUNET_TIME_relative_multiply (MAX_PING_DELAY, 2)
+
+/**
+ * What is the minimum frequency for a PING message?
+ */
+#define MIN_PING_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
/**
* How often do we recalculate bandwidth quotas?
*/
-#define QUOTA_UPDATE_FREQUENCY GNUNET_TIME_UNIT_SECONDS
+#define QUOTA_UPDATE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
/**
* What is the priority for a SET_KEY message?
#define PONG_PRIORITY 0xFFFFFF
/**
- * How many messages do we queue per peer at most?
+ * How many messages do we queue per peer at most? Must be at
+ * least two.
*/
#define MAX_PEER_QUEUE_SIZE 16
* Number of bytes (at the beginning) of "struct EncryptedMessage"
* that are NOT encrypted.
*/
-#define ENCRYPTED_HEADER_SIZE (sizeof(struct GNUNET_MessageHeader) + sizeof(uint32_t) + sizeof(GNUNET_HashCode))
+#define ENCRYPTED_HEADER_SIZE (sizeof(struct GNUNET_MessageHeader) + sizeof(uint32_t))
/**
struct GNUNET_MessageHeader header;
/**
- * Always zero.
+ * Random value used for IV generation. ENCRYPTED_HEADER_SIZE must
+ * be set to the offset of the *next* field.
*/
- uint32_t reserved GNUNET_PACKED;
+ uint32_t iv_seed GNUNET_PACKED;
/**
- * Hash of the plaintext, used to verify message integrity;
- * ALSO used as the IV for the symmetric cipher! Everything
- * after this hash will be encrypted. ENCRYPTED_HEADER_SIZE
- * must be set to the offset of the next field.
+ * Hash of the plaintext (starting at 'sequence_number'), used to
+ * verify message integrity. Everything after this hash (including
+ * this hash itself) will be encrypted.
*/
GNUNET_HashCode plaintext_hash;
uint32_t sequence_number GNUNET_PACKED;
/**
- * Desired bandwidth (how much we should send to this
- * peer / how much is the sender willing to receive),
- * in bytes per minute.
+ * Desired bandwidth (how much we should send to this peer / how
+ * much is the sender willing to receive)?
*/
- uint32_t inbound_bpm_limit GNUNET_PACKED;
+ struct GNUNET_BANDWIDTH_Value32NBO inbound_bw_limit;
/**
* Timestamp. Used to prevent reply of ancient messages
};
+
/**
* We're sending an (encrypted) PING to the other peer to check if he
* can decrypt. The other peer should respond with a PONG with the
struct PingMessage
{
/**
- * Message type is either CORE_PING or CORE_PONG.
+ * Message type is CORE_PING.
*/
struct GNUNET_MessageHeader header;
};
+
+/**
+ * Response to a PING. Includes data from the original PING
+ * plus initial bandwidth quota information.
+ */
+struct PongMessage
+{
+ /**
+ * Message type is CORE_PONG.
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Random number proochosen to make reply harder. Must be
+ * first field after header (this is where we start to encrypt!).
+ */
+ uint32_t challenge GNUNET_PACKED;
+
+ /**
+ * Must be zero.
+ */
+ uint32_t reserved GNUNET_PACKED;
+
+ /**
+ * Desired bandwidth (how much we should send to this
+ * peer / how much is the sender willing to receive).
+ */
+ struct GNUNET_BANDWIDTH_Value32NBO inbound_bw_limit;
+
+ /**
+ * Intended target of the PING, used primarily to check
+ * that decryption actually worked.
+ */
+ struct GNUNET_PeerIdentity target;
+};
+
+
/**
* Message transmitted to set (or update) a session key.
*/
{
/**
- * We keep messages in a linked list (for now).
+ * We keep messages in a doubly linked list.
*/
struct MessageEntry *next;
+ /**
+ * We keep messages in a doubly linked list.
+ */
+ struct MessageEntry *prev;
+
/**
* By when are we supposed to transmit this message?
*/
struct GNUNET_TIME_Absolute deadline;
+ /**
+ * By when are we supposed to transmit this message (after
+ * giving slack)?
+ */
+ struct GNUNET_TIME_Absolute slack_deadline;
+
/**
* How important is this message to us?
*/
* Was this message selected for transmission in the
* current round? GNUNET_YES or GNUNET_NO.
*/
- int16_t do_transmit;
+ int8_t do_transmit;
+
+ /**
+ * Did we give this message some slack (delayed sending) previously
+ * (and hence should not give it any more slack)? GNUNET_YES or
+ * GNUNET_NO.
+ */
+ int8_t got_slack;
};
*/
struct PingMessage *pending_ping;
+ /**
+ * We received a PONG message before we got the "public_key"
+ * (or the SET_KEY). We keep it here until we have a key
+ * to decrypt it. NULL if no PONG is pending.
+ */
+ struct PongMessage *pending_pong;
+
+ /**
+ * Non-NULL if we are currently looking up HELLOs for this peer.
+ * for this peer.
+ */
+ struct GNUNET_PEERINFO_IteratorContext *pitr;
+
+ /**
+ * SetKeyMessage to transmit, NULL if we are not currently trying
+ * to send one.
+ */
+ struct SetKeyMessage *skm;
+
/**
* Identity of the neighbour.
*/
*/
GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
+ /**
+ * ID of task used for sending keep-alive pings.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier keep_alive_task;
+
+ /**
+ * ID of task used for cleaning up dead neighbour entries.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier dead_clean_task;
+
/**
* At what time did we generate our encryption key?
*/
struct GNUNET_TIME_Relative set_key_retry_frequency;
/**
- * Time of our last update to the "available_send_window".
- */
- struct GNUNET_TIME_Absolute last_asw_update;
-
- /**
- * Time of our last update to the "available_recv_window".
- */
- struct GNUNET_TIME_Absolute last_arw_update;
-
- /**
- * Number of bytes that we are eligible to transmit to this
- * peer at this point. Incremented every minute by max_out_bpm,
- * bounded by max_bpm (no back-log larger than MAX_BUF_FACT minutes,
- * bandwidth-hogs are sampled at a frequency of about 78s!);
- * may get negative if we have VERY high priority content.
+ * Tracking bandwidth for sending to this peer.
*/
- long long available_send_window;
+ struct GNUNET_BANDWIDTH_Tracker available_send_window;
/**
- * How much downstream capacity of this peer has been reserved for
- * our traffic? (Our clients can request that a certain amount of
- * bandwidth is available for replies to them; this value is used to
- * make sure that this reserved amount of bandwidth is actually
- * available).
+ * Tracking bandwidth for receiving from this peer.
*/
- long long available_recv_window;
+ struct GNUNET_BANDWIDTH_Tracker available_recv_window;
/**
* How valueable were the messages of this peer recently?
*/
unsigned int last_packets_bitmap;
- /**
- * Number of messages in the message queue for this peer.
- */
- unsigned int message_queue_size;
-
/**
* last sequence number received on this connection (highest)
*/
/**
* Available bandwidth in for this peer (current target).
*/
- uint32_t bpm_in;
+ struct GNUNET_BANDWIDTH_Value32NBO bw_in;
/**
* Available bandwidth out for this peer (current target).
*/
- uint32_t bpm_out;
+ struct GNUNET_BANDWIDTH_Value32NBO bw_out;
/**
- * Internal bandwidth limit set for this peer (initially
- * typically set to "-1"). "bpm_out" is MAX of
- * "bpm_out_internal_limit" and "bpm_out_external_limit".
+ * Internal bandwidth limit set for this peer (initially typically
+ * set to "-1"). Actual "bw_out" is MIN of
+ * "bpm_out_internal_limit" and "bw_out_external_limit".
*/
- uint32_t bpm_out_internal_limit;
+ struct GNUNET_BANDWIDTH_Value32NBO bw_out_internal_limit;
/**
* External bandwidth limit set for this peer by the
- * peer that we are communicating with. "bpm_out" is MAX of
- * "bpm_out_internal_limit" and "bpm_out_external_limit".
+ * peer that we are communicating with. "bw_out" is MIN of
+ * "bw_out_internal_limit" and "bw_out_external_limit".
*/
- uint32_t bpm_out_external_limit;
+ struct GNUNET_BANDWIDTH_Value32NBO bw_out_external_limit;
/**
* What was our PING challenge number (for this peer)?
uint32_t ping_challenge;
/**
- * What is our connection status?
- */
- enum PeerStateMachine status;
-
-};
-
-
-/**
- * Events are messages for clients. The struct
- * itself is followed by the actual message.
- */
-struct Event
-{
- /**
- * This is a linked list.
+ * What was the last distance to this peer as reported by the transports?
*/
- struct Event *next;
+ uint32_t last_distance;
/**
- * Size of the message.
+ * What is our connection status?
*/
- size_t size;
+ enum PeerStateMachine status;
/**
- * Could this event be dropped if this queue
- * is getting too large? (NOT YET USED!)
- */
- int can_drop;
+ * Are we currently connected to this neighbour?
+ */
+ int is_connected;
};
*/
struct GNUNET_SERVER_Client *client_handle;
- /**
- * Linked list of messages we still need to deliver to
- * this client.
- */
- struct Event *event_head;
-
- /**
- * Tail of the linked list of events.
- */
- struct Event *event_tail;
-
- /**
- * Current transmit handle, NULL if no transmission request
- * is pending.
- */
- struct GNUNET_NETWORK_TransmitHandle *th;
-
/**
* Array of the types of messages this peer cares
* about (with "tcnt" entries). Allocated as part
* of this client struct, do not free!
*/
- uint16_t *types;
+ const uint16_t *types;
/**
* Options for messages this client cares about,
*/
static struct Client *clients;
+/**
+ * Context for notifications we need to send to our clients.
+ */
+static struct GNUNET_SERVER_NotificationContext *notifier;
+
/**
* We keep neighbours in a linked list (for now).
*/
static struct Neighbour *neighbours;
+/**
+ * For creating statistics.
+ */
+static struct GNUNET_STATISTICS_Handle *stats;
+
/**
* Sum of all preferences among all neighbours.
*/
static unsigned int neighbour_count;
/**
- * How much inbound bandwidth are we supposed to be using?
+ * How much inbound bandwidth are we supposed to be using per second?
+ * FIXME: this value is not used!
*/
-static unsigned long long bandwidth_target_in;
+static unsigned long long bandwidth_target_in_bps;
/**
- * How much outbound bandwidth are we supposed to be using?
+ * How much outbound bandwidth are we supposed to be using per second?
*/
-static unsigned long long bandwidth_target_out;
+static unsigned long long bandwidth_target_out_bps;
preference_sum += n->current_preference;
n = n->next;
}
-}
-
-
-/**
- * Recalculate the number of bytes we expect to
- * receive or transmit in a given window.
- *
- * @param force force an update now (even if not much time has passed)
- * @param window pointer to the byte counter (updated)
- * @param ts pointer to the timestamp (updated)
- * @param bpm number of bytes per minute that should
- * be added to the window.
- */
-static void
-update_window (int force,
- long long *window,
- struct GNUNET_TIME_Absolute *ts, unsigned int bpm)
-{
- struct GNUNET_TIME_Relative since;
-
- since = GNUNET_TIME_absolute_get_duration (*ts);
- if ( (force == GNUNET_NO) &&
- (since.value < 60 * 1000) )
- return; /* not even a minute has passed */
- *ts = GNUNET_TIME_absolute_get ();
- *window += (bpm * since.value) / 60 / 1000;
- if (*window > MAX_WINDOW_TIME * bpm)
- *window = MAX_WINDOW_TIME * bpm;
+ GNUNET_STATISTICS_set (stats, gettext_noop ("# total peer preference"), preference_sum, GNUNET_NO);
}
}
-/**
- * Find the entry for the given client.
- *
- * @param client handle for the client
- * @return NULL if we are not connected, otherwise the
- * client's struct.
- */
-static struct Client *
-find_client (const struct GNUNET_SERVER_Client *client)
-{
- struct Client *ret;
-
- ret = clients;
- while ((ret != NULL) && (client != ret->client_handle))
- ret = ret->next;
- return ret;
-}
-
-
-/**
- * If necessary, initiate a request with the server to
- * transmit messages from the queue of the given client.
- * @param client who to transfer messages to
- */
-static void request_transmit (struct Client *client);
-
-
-/**
- * Client is ready to receive data, provide it.
- *
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-do_client_transmit (void *cls, size_t size, void *buf)
-{
- struct Client *client = cls;
- struct Event *e;
- char *tgt;
- size_t ret;
-
- client->th = NULL;
-#if DEBUG_CORE_CLIENT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Client ready to receive %u bytes.\n", size);
-#endif
- if (buf == NULL)
- {
-#if DEBUG_CORE
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Failed to transmit data to client (disconnect)?\n");
-#endif
- return 0; /* we'll surely get a disconnect soon... */
- }
- tgt = buf;
- ret = 0;
- while ((NULL != (e = client->event_head)) && (e->size <= size))
- {
- memcpy (&tgt[ret], &e[1], e->size);
- size -= e->size;
- ret += e->size;
- client->event_head = e->next;
- GNUNET_free (e);
- }
- GNUNET_assert (ret > 0);
- if (client->event_head == NULL)
- client->event_tail = NULL;
-#if DEBUG_CORE_CLIENT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmitting %u bytes to client\n", ret);
-#endif
- request_transmit (client);
- return ret;
-}
-
-
-/**
- * If necessary, initiate a request with the server to
- * transmit messages from the queue of the given client.
- * @param client who to transfer messages to
- */
-static void
-request_transmit (struct Client *client)
-{
-
- if (NULL != client->th)
- return; /* already pending */
- if (NULL == client->event_head)
- return; /* no more events pending */
-#if DEBUG_CORE_CLIENT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Asking server to transmit %u bytes to client\n",
- client->event_head->size);
-#endif
- client->th
- = GNUNET_SERVER_notify_transmit_ready (client->client_handle,
- client->event_head->size,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &do_client_transmit, client);
-}
-
-
/**
* Send a message to one of our clients.
+ *
* @param client target for the message
* @param msg message to transmit
* @param can_drop could this message be dropped if the
*/
static void
send_to_client (struct Client *client,
- const struct GNUNET_MessageHeader *msg, int can_drop)
+ const struct GNUNET_MessageHeader *msg,
+ int can_drop)
{
- struct Event *e;
- unsigned int queue_size;
- uint16_t msize;
-
#if DEBUG_CORE_CLIENT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Preparing to send message of type %u to client.\n",
ntohs (msg->type));
-#endif
- queue_size = 0;
- e = client->event_head;
- while (e != NULL)
- {
- queue_size++;
- e = e->next;
- }
- if ( (queue_size >= MAX_CLIENT_QUEUE_SIZE) &&
- (can_drop == GNUNET_YES) )
- {
-#if DEBUG_CORE
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Too many messages in queue for the client, dropping the new message.\n");
-#endif
- return;
- }
-
- msize = ntohs (msg->size);
- e = GNUNET_malloc (sizeof (struct Event) + msize);
- /* append */
- if (client->event_tail != NULL)
- client->event_tail->next = e;
- else
- client->event_head = e;
- client->event_tail = e;
- e->can_drop = can_drop;
- e->size = msize;
- memcpy (&e[1], msg, msize);
- request_transmit (client);
+#endif
+ GNUNET_SERVER_notification_context_unicast (notifier,
+ client->client_handle,
+ msg,
+ can_drop);
}
/**
- * Send a message to all of our current clients.
+ * Send a message to all of our current clients that have
+ * the right options set.
+ *
+ * @param msg message to multicast
+ * @param can_drop can this message be discarded if the queue is too long
+ * @param options mask to use
*/
static void
-send_to_all_clients (const struct GNUNET_MessageHeader *msg, int can_drop)
+send_to_all_clients (const struct GNUNET_MessageHeader *msg,
+ int can_drop,
+ int options)
{
struct Client *c;
c = clients;
while (c != NULL)
{
- send_to_client (c, msg, can_drop);
+ if (0 != (c->options & options))
+ {
+#if DEBUG_CORE_CLIENT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending message of type %u to client.\n",
+ ntohs (msg->type));
+#endif
+ send_to_client (c, msg, can_drop);
+ }
c = c->next;
}
}
struct Client *c;
uint16_t msize;
const uint16_t *types;
+ uint16_t *wtypes;
struct Neighbour *n;
struct ConnectNotifyMessage cnm;
+ unsigned int i;
#if DEBUG_CORE_CLIENT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}
+ GNUNET_SERVER_notification_context_add (notifier, client);
im = (const struct InitMessage *) message;
types = (const uint16_t *) &im[1];
msize -= sizeof (struct InitMessage);
c->client_handle = client;
c->next = clients;
clients = c;
- memcpy (&c[1], types, msize);
- c->types = (uint16_t *) & c[1];
- c->options = ntohl (im->options);
c->tcnt = msize / sizeof (uint16_t);
+ c->types = (const uint16_t *) &c[1];
+ wtypes = (uint16_t *) &c[1];
+ for (i=0;i<c->tcnt;i++)
+ wtypes[i] = ntohs (types[i]);
+ c->options = ntohl (im->options);
+#if DEBUG_CORE_CLIENT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Client %p is interested in %u message types\n",
+ c,
+ c->tcnt);
+#endif
/* send init reply message */
irm.header.size = htons (sizeof (struct InitReplyMessage));
irm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY);
"Sending `%s' message to client.\n", "INIT_REPLY");
#endif
send_to_client (c, &irm.header, GNUNET_NO);
- /* notify new client about existing neighbours */
- cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
- cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
- n = neighbours;
- while (n != NULL)
+ if (0 != (c->options & GNUNET_CORE_OPTION_SEND_CONNECT))
{
+ /* notify new client about existing neighbours */
+ cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
+ cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
+ n = neighbours;
+ while (n != NULL)
+ {
+ if (n->status == PEER_STATE_KEY_CONFIRMED)
+ {
#if DEBUG_CORE_CLIENT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Sending `%s' message to client.\n", "NOTIFY_CONNECT");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending `%s' message to client.\n", "NOTIFY_CONNECT");
#endif
- cnm.reserved = htonl (0);
- cnm.peer = n->peer;
- send_to_client (c, &cnm.header, GNUNET_NO);
- n = n->next;
+ cnm.distance = htonl (n->last_distance);
+ cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
+ cnm.peer = n->peer;
+ send_to_client (c, &cnm.header, GNUNET_NO);
+ }
+ n = n->next;
+ }
}
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
{
struct Client *pos;
struct Client *prev;
- struct Event *e;
+ if (client == NULL)
+ return;
#if DEBUG_CORE_CLIENT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Client has disconnected from core service.\n");
+ "Client %p has disconnected from core service.\n",
+ client);
#endif
prev = NULL;
pos = clients;
clients = pos->next;
else
prev->next = pos->next;
- if (pos->th != NULL)
- GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
- while (NULL != (e = pos->event_head))
- {
- pos->event_head = e->next;
- GNUNET_free (e);
- }
GNUNET_free (pos);
return;
}
/**
- * Handle REQUEST_CONFIGURE request.
+ * Handle REQUEST_INFO request.
*/
static void
-handle_client_request_configure (void *cls,
- struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *message)
+handle_client_request_info (void *cls,
+ struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *message)
{
- const struct RequestConfigureMessage *rcm;
+ const struct RequestInfoMessage *rcm;
struct Neighbour *n;
struct ConfigurationInfoMessage cim;
- struct Client *c;
- int reserv;
+ int32_t want_reserv;
+ int32_t got_reserv;
unsigned long long old_preference;
+ struct GNUNET_SERVER_TransmitContext *tc;
#if DEBUG_CORE_CLIENT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Core service receives `%s' request.\n", "CONFIGURE");
+ "Core service receives `%s' request.\n", "REQUEST_INFO");
#endif
- rcm = (const struct RequestConfigureMessage *) message;
+ rcm = (const struct RequestInfoMessage *) message;
n = find_neighbour (&rcm->peer);
memset (&cim, 0, sizeof (cim));
- if ((n != NULL) && (n->status == PEER_STATE_KEY_CONFIRMED))
- {
- update_window (GNUNET_YES,
- &n->available_send_window,
- &n->last_asw_update,
- n->bpm_out);
- n->bpm_out_internal_limit = ntohl (rcm->limit_outbound_bpm);
- n->bpm_out = GNUNET_MAX (n->bpm_out_internal_limit,
- n->bpm_out_external_limit);
- reserv = ntohl (rcm->reserve_inbound);
- if (reserv < 0)
+ if (n != NULL)
+ {
+ want_reserv = ntohl (rcm->reserve_inbound);
+ if (n->bw_out_internal_limit.value__ != rcm->limit_outbound.value__)
+ {
+ n->bw_out_internal_limit = rcm->limit_outbound;
+ n->bw_out = GNUNET_BANDWIDTH_value_min (n->bw_out_internal_limit,
+ n->bw_out_external_limit);
+ GNUNET_BANDWIDTH_tracker_update_quota (&n->available_recv_window,
+ n->bw_out);
+ GNUNET_TRANSPORT_set_quota (transport,
+ &n->peer,
+ n->bw_in,
+ n->bw_out,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ NULL, NULL);
+ }
+ if (want_reserv < 0)
{
- n->available_recv_window += reserv;
+ got_reserv = want_reserv;
}
- else if (reserv > 0)
+ else if (want_reserv > 0)
{
- update_window (GNUNET_NO,
- &n->available_recv_window,
- &n->last_arw_update, n->bpm_in);
- if (n->available_recv_window < reserv)
- reserv = n->available_recv_window;
- n->available_recv_window -= reserv;
+ if (GNUNET_BANDWIDTH_tracker_get_delay (&n->available_recv_window,
+ want_reserv).value == 0)
+ got_reserv = want_reserv;
+ else
+ got_reserv = 0; /* all or nothing */
}
+ else
+ got_reserv = 0;
+ GNUNET_BANDWIDTH_tracker_consume (&n->available_recv_window,
+ got_reserv);
old_preference = n->current_preference;
n->current_preference += GNUNET_ntohll(rcm->preference_change);
if (old_preference > n->current_preference)
n->current_preference = (unsigned long long) -1;
}
update_preference_sum (n->current_preference - old_preference);
- cim.reserved_amount = htonl (reserv);
- cim.bpm_in = htonl (n->bpm_in);
- cim.bpm_out = htonl (n->bpm_out);
- cim.latency = GNUNET_TIME_relative_hton (n->last_latency);
+#if DEBUG_CORE_QUOTA
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received reservation request for %d bytes for peer `%4s', reserved %d bytes\n",
+ (int) want_reserv,
+ GNUNET_i2s (&rcm->peer),
+ (int) got_reserv);
+#endif
+ cim.reserved_amount = htonl (got_reserv);
+ cim.bw_in = n->bw_in;
+ cim.bw_out = n->bw_out;
cim.preference = n->current_preference;
}
cim.header.size = htons (sizeof (struct ConfigurationInfoMessage));
cim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO);
cim.peer = rcm->peer;
- c = find_client (client);
- if (c == NULL)
- {
- GNUNET_break (0);
- return;
- }
+
#if DEBUG_CORE_CLIENT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Sending `%s' message to client.\n", "CONFIGURATION_INFO");
#endif
- send_to_client (c, &cim.header, GNUNET_NO);
+ tc = GNUNET_SERVER_transmit_context_create (client);
+ GNUNET_SERVER_transmit_context_append_message (tc, &cim.header);
+ GNUNET_SERVER_transmit_context_run (tc,
+ GNUNET_TIME_UNIT_FOREVER_REL);
}
/**
- * Check if we have encrypted messages for the specified neighbour
- * pending, and if so, check with the transport about sending them
- * out.
- *
- * @param n neighbour to check.
- */
-static void process_encrypted_neighbour_queue (struct Neighbour *n);
-
-
-/**
- * Function called when the transport service is ready to
- * receive an encrypted message for the respective peer
+ * Free the given entry for the neighbour (it has
+ * already been removed from the list at this point).
*
- * @param cls neighbour to use message from
- * @param size number of bytes we can transmit
- * @param buf where to copy the message
- * @return number of bytes transmitted
+ * @param n neighbour to free
*/
-static size_t
-notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
+static void
+free_neighbour (struct Neighbour *n)
{
- struct Neighbour *n = cls;
struct MessageEntry *m;
- size_t ret;
- char *cbuf;
- n->th = NULL;
- GNUNET_assert (NULL != (m = n->encrypted_head));
- n->encrypted_head = m->next;
- if (m->next == NULL)
- n->encrypted_tail = NULL;
- ret = 0;
- cbuf = buf;
- if (buf != NULL)
+ if (n->pitr != NULL)
{
- GNUNET_assert (size >= m->size);
- memcpy (cbuf, &m[1], m->size);
- ret = m->size;
- n->available_send_window -= m->size;
- process_encrypted_neighbour_queue (n);
+ GNUNET_PEERINFO_iterate_cancel (n->pitr);
+ n->pitr = NULL;
+ }
+ if (n->skm != NULL)
+ {
+ GNUNET_free (n->skm);
+ n->skm = NULL;
+ }
+ while (NULL != (m = n->messages))
+ {
+ n->messages = m->next;
+ GNUNET_free (m);
+ }
+ while (NULL != (m = n->encrypted_head))
+ {
+ GNUNET_CONTAINER_DLL_remove (n->encrypted_head,
+ n->encrypted_tail,
+ m);
+ GNUNET_free (m);
+ }
+ if (NULL != n->th)
+ {
+ GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
+ n->th = NULL;
+ }
+ if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
+ GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
+ if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
+ GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
+ if (n->quota_update_task != GNUNET_SCHEDULER_NO_TASK)
+ GNUNET_SCHEDULER_cancel (sched, n->quota_update_task);
+ if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK)
+ GNUNET_SCHEDULER_cancel (sched, n->dead_clean_task);
+ if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)
+ GNUNET_SCHEDULER_cancel (sched, n->keep_alive_task);
+ if (n->status == PEER_STATE_KEY_CONFIRMED)
+ GNUNET_STATISTICS_update (stats, gettext_noop ("# peers connected"), -1, GNUNET_NO);
+ GNUNET_free_non_null (n->public_key);
+ GNUNET_free_non_null (n->pending_ping);
+ GNUNET_free_non_null (n->pending_pong);
+ GNUNET_free (n);
+}
+
+
+/**
+ * Check if we have encrypted messages for the specified neighbour
+ * pending, and if so, check with the transport about sending them
+ * out.
+ *
+ * @param n neighbour to check.
+ */
+static void process_encrypted_neighbour_queue (struct Neighbour *n);
+
+
+/**
+ * Encrypt size bytes from in and write the result to out. Use the
+ * key for outbound traffic of the given neighbour.
+ *
+ * @param n neighbour we are sending to
+ * @param iv initialization vector to use
+ * @param in ciphertext
+ * @param out plaintext
+ * @param size size of in/out
+ * @return GNUNET_OK on success
+ */
+static int
+do_encrypt (struct Neighbour *n,
+ const GNUNET_HashCode * iv,
+ const void *in, void *out, size_t size)
+{
+ if (size != (uint16_t) size)
+ {
+ GNUNET_break (0);
+ return GNUNET_NO;
+ }
+ GNUNET_assert (size ==
+ GNUNET_CRYPTO_aes_encrypt (in,
+ (uint16_t) size,
+ &n->encrypt_key,
+ (const struct
+ GNUNET_CRYPTO_AesInitializationVector
+ *) iv, out));
+ GNUNET_STATISTICS_update (stats, gettext_noop ("# bytes encrypted"), size, GNUNET_NO);
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Encrypted %u bytes for `%4s' using key %u\n", size,
+ GNUNET_i2s (&n->peer), n->encrypt_key.crc32);
+#endif
+ return GNUNET_OK;
+}
+
+
+/**
+ * Consider freeing the given neighbour since we may not need
+ * to keep it around anymore.
+ *
+ * @param n neighbour to consider discarding
+ */
+static void
+consider_free_neighbour (struct Neighbour *n);
+
+
+/**
+ * Task triggered when a neighbour entry is about to time out
+ * (and we should prevent this by sending a PING).
+ *
+ * @param cls the 'struct Neighbour'
+ * @param tc scheduler context (not used)
+ */
+static void
+send_keep_alive (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct Neighbour *n = cls;
+ struct GNUNET_TIME_Relative retry;
+ struct GNUNET_TIME_Relative left;
+ struct MessageEntry *me;
+ struct PingMessage pp;
+ struct PingMessage *pm;
+
+ n->keep_alive_task = GNUNET_SCHEDULER_NO_TASK;
+ /* send PING */
+ me = GNUNET_malloc (sizeof (struct MessageEntry) +
+ sizeof (struct PingMessage));
+ me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PING_DELAY);
+ me->priority = PING_PRIORITY;
+ me->size = sizeof (struct PingMessage);
+ GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
+ n->encrypted_tail,
+ n->encrypted_tail,
+ me);
+ pm = (struct PingMessage *) &me[1];
+ pm->header.size = htons (sizeof (struct PingMessage));
+ pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
+ pp.challenge = htonl (n->ping_challenge);
+ pp.target = n->peer;
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Encrypting `%s' and `%s' messages for `%4s'.\n",
+ "SET_KEY", "PING", GNUNET_i2s (&n->peer));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending `%s' to `%4s' with challenge %u encrypted using key %u\n",
+ "PING",
+ GNUNET_i2s (&n->peer), n->ping_challenge, n->encrypt_key.crc32);
+#endif
+ do_encrypt (n,
+ &n->peer.hashPubKey,
+ &pp.challenge,
+ &pm->challenge,
+ sizeof (struct PingMessage) -
+ sizeof (struct GNUNET_MessageHeader));
+ process_encrypted_neighbour_queue (n);
+ /* reschedule PING job */
+ left = GNUNET_TIME_absolute_get_remaining (GNUNET_TIME_absolute_add (n->last_activity,
+ GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT));
+ retry = GNUNET_TIME_relative_max (GNUNET_TIME_relative_divide (left, 2),
+ MIN_PING_FREQUENCY);
+ n->keep_alive_task
+ = GNUNET_SCHEDULER_add_delayed (sched,
+ retry,
+ &send_keep_alive,
+ n);
+
+}
+
+
+/**
+ * Task triggered when a neighbour entry might have gotten stale.
+ *
+ * @param cls the 'struct Neighbour'
+ * @param tc scheduler context (not used)
+ */
+static void
+consider_free_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct Neighbour *n = cls;
+
+ n->dead_clean_task = GNUNET_SCHEDULER_NO_TASK;
+ consider_free_neighbour (n);
+}
+
+
+/**
+ * Consider freeing the given neighbour since we may not need
+ * to keep it around anymore.
+ *
+ * @param n neighbour to consider discarding
+ */
+static void
+consider_free_neighbour (struct Neighbour *n)
+{
+ struct Neighbour *pos;
+ struct Neighbour *prev;
+ struct GNUNET_TIME_Relative left;
+
+ if ( (n->th != NULL) ||
+ (n->pitr != NULL) ||
+ (n->status == PEER_STATE_KEY_CONFIRMED) ||
+ (GNUNET_YES == n->is_connected) )
+ return; /* no chance */
+
+ left = GNUNET_TIME_absolute_get_remaining (GNUNET_TIME_absolute_add (n->last_activity,
+ GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT));
+ if (left.value > 0)
+ {
+ if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK)
+ GNUNET_SCHEDULER_cancel (sched, n->dead_clean_task);
+ n->dead_clean_task = GNUNET_SCHEDULER_add_delayed (sched,
+ left,
+ &consider_free_task,
+ n);
+ return;
+ }
+ /* actually free the neighbour... */
+ prev = NULL;
+ pos = neighbours;
+ while (pos != n)
+ {
+ prev = pos;
+ pos = pos->next;
+ }
+ if (prev == NULL)
+ neighbours = n->next;
+ else
+ prev->next = n->next;
+ GNUNET_assert (neighbour_count > 0);
+ neighbour_count--;
+ GNUNET_STATISTICS_set (stats, gettext_noop ("# active neighbours"), neighbour_count, GNUNET_NO);
+ free_neighbour (n);
+}
+
+
+/**
+ * Function called when the transport service is ready to
+ * receive an encrypted message for the respective peer
+ *
+ * @param cls neighbour to use message from
+ * @param size number of bytes we can transmit
+ * @param buf where to copy the message
+ * @return number of bytes transmitted
+ */
+static size_t
+notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
+{
+ struct Neighbour *n = cls;
+ struct MessageEntry *m;
+ size_t ret;
+ char *cbuf;
+
+ n->th = NULL;
+ GNUNET_assert (NULL != (m = n->encrypted_head));
+ GNUNET_CONTAINER_DLL_remove (n->encrypted_head,
+ n->encrypted_tail,
+ m);
+ ret = 0;
+ cbuf = buf;
+ if (buf != NULL)
+ {
+ GNUNET_assert (size >= m->size);
+ memcpy (cbuf, &m[1], m->size);
+ ret = m->size;
+ GNUNET_BANDWIDTH_tracker_consume (&n->available_send_window,
+ m->size);
#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Copied message of type %u and size %u into transport buffer for `%4s'\n",
ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
ret, GNUNET_i2s (&n->peer));
#endif
+ process_encrypted_neighbour_queue (n);
}
else
{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Transmission for message of type %u and size %u failed\n",
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Transmission of message of type %u and size %u failed\n",
ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
m->size);
+#endif
}
GNUNET_free (m);
+ consider_free_neighbour (n);
return ret;
}
if (n->th != NULL)
return; /* request already pending */
- if (n->encrypted_head == NULL)
+ m = n->encrypted_head;
+ if (m == NULL)
{
/* encrypted queue empty, try plaintext instead */
process_plaintext_neighbour_queue (n);
#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
- n->encrypted_head->size,
+ m->size,
GNUNET_i2s (&n->peer),
- GNUNET_TIME_absolute_get_remaining (n->
- encrypted_head->deadline).
+ GNUNET_TIME_absolute_get_remaining (m->deadline).
value);
#endif
n->th =
GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer,
- n->encrypted_head->size,
- n->encrypted_head->priority,
+ m->size,
+ m->priority,
GNUNET_TIME_absolute_get_remaining
- (n->encrypted_head->deadline),
+ (m->deadline),
¬ify_encrypted_transmit_ready,
n);
if (n->th == NULL)
{
- /* message request too large (oops) */
+ /* message request too large or duplicate request */
GNUNET_break (0);
/* discard encrypted message */
- GNUNET_assert (NULL != (m = n->encrypted_head));
- n->encrypted_head = m->next;
- if (m->next == NULL)
- n->encrypted_tail = NULL;
+ GNUNET_CONTAINER_DLL_remove (n->encrypted_head,
+ n->encrypted_tail,
+ m);
GNUNET_free (m);
process_encrypted_neighbour_queue (n);
}
return GNUNET_SYSERR;
}
if (size !=
- GNUNET_CRYPTO_aes_decrypt (&n->decrypt_key,
- in,
+ GNUNET_CRYPTO_aes_decrypt (in,
(uint16_t) size,
- (const struct
+ &n->decrypt_key,
+ (const struct
GNUNET_CRYPTO_AesInitializationVector *) iv,
out))
{
GNUNET_break (0);
return GNUNET_SYSERR;
}
+ GNUNET_STATISTICS_update (stats, gettext_noop ("# bytes decrypted"), size, GNUNET_NO);
#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Decrypted %u bytes from `%4s' using key %u\n",
}
-/**
- * Encrypt size bytes from in and write the result to out. Use the
- * key for outbound traffic of the given neighbour.
- *
- * @param n neighbour we are sending to
- * @param iv initialization vector to use
- * @param in ciphertext
- * @param out plaintext
- * @param size size of in/out
- * @return GNUNET_OK on success
- */
-static int
-do_encrypt (struct Neighbour *n,
- const GNUNET_HashCode * iv,
- const void *in, void *out, size_t size)
-{
- if (size != (uint16_t) size)
- {
- GNUNET_break (0);
- return GNUNET_NO;
- }
- GNUNET_assert (size ==
- GNUNET_CRYPTO_aes_encrypt (in,
- (uint16_t) size,
- &n->encrypt_key,
- (const struct
- GNUNET_CRYPTO_AesInitializationVector
- *) iv, out));
-#if DEBUG_CORE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Encrypted %u bytes for `%4s' using key %u\n", size,
- GNUNET_i2s (&n->peer), n->encrypt_key.crc32);
-#endif
- return GNUNET_OK;
-}
-
-
/**
* Select messages for transmission. This heuristic uses a combination
* of earliest deadline first (EDF) scheduling (with bounded horizon)
unsigned int min_prio;
struct GNUNET_TIME_Absolute t;
struct GNUNET_TIME_Absolute now;
- uint64_t delta;
+ struct GNUNET_TIME_Relative delta;
uint64_t avail;
- unsigned long long slack; /* how long could we wait before missing deadlines? */
+ struct GNUNET_TIME_Relative slack; /* how long could we wait before missing deadlines? */
size_t off;
+ uint64_t tsize;
+ unsigned int queue_size;
int discard_low_prio;
GNUNET_assert (NULL != n->messages);
/* should we remove the entry with the lowest
priority from consideration for scheduling at the
end of the loop? */
+ queue_size = 0;
+ tsize = 0;
+ pos = n->messages;
+ while (pos != NULL)
+ {
+ queue_size++;
+ tsize += pos->size;
+ pos = pos->next;
+ }
discard_low_prio = GNUNET_YES;
while (GNUNET_YES == discard_low_prio)
{
min_prio = -1;
discard_low_prio = GNUNET_NO;
/* calculate number of bytes available for transmission at time "t" */
- update_window (GNUNET_NO,
- &n->available_send_window,
- &n->last_asw_update,
- n->bpm_out);
- avail = n->available_send_window;
- t = n->last_asw_update;
+ avail = GNUNET_BANDWIDTH_tracker_get_available (&n->available_send_window);
+ t = now;
/* how many bytes have we (hypothetically) scheduled so far */
off = 0;
/* maximum time we can wait before transmitting anything
and still make all of our deadlines */
- slack = -1;
-
+ slack = MAX_CORK_DELAY;
pos = n->messages;
/* note that we use "*2" here because we want to look
a bit further into the future; much more makes no
sense since new message might be scheduled in the
meantime... */
while ((pos != NULL) && (off < size * 2))
- {
+ {
if (pos->do_transmit == GNUNET_YES)
{
/* already removed from consideration */
}
if (discard_low_prio == GNUNET_NO)
{
- delta = pos->deadline.value;
- if (delta < t.value)
- delta = 0;
- else
- delta = t.value - delta;
- avail += delta * n->bpm_out / 1000 / 60;
+ delta = GNUNET_TIME_absolute_get_difference (t, pos->deadline);
+ if (delta.value > 0)
+ {
+ // FIXME: HUH? Check!
+ t = pos->deadline;
+ avail += GNUNET_BANDWIDTH_value_get_available_until (n->bw_out,
+ delta);
+ }
if (avail < pos->size)
{
+ // FIXME: HUH? Check!
discard_low_prio = GNUNET_YES; /* we could not schedule this one! */
}
else
/* update slack, considering both its absolute deadline
and relative deadlines caused by other messages
with their respective load */
- slack = GNUNET_MIN (slack, avail / n->bpm_out);
- if (pos->deadline.value < now.value)
- slack = 0;
+ slack = GNUNET_TIME_relative_min (slack,
+ GNUNET_BANDWIDTH_value_get_delay_for (n->bw_out,
+ avail));
+ if (pos->deadline.value <= now.value)
+ {
+ /* now or never */
+ slack = GNUNET_TIME_UNIT_ZERO;
+ }
+ else if (GNUNET_YES == pos->got_slack)
+ {
+ /* should be soon now! */
+ slack = GNUNET_TIME_relative_min (slack,
+ GNUNET_TIME_absolute_get_remaining (pos->slack_deadline));
+ }
else
- slack =
- GNUNET_MIN (slack, pos->deadline.value - now.value);
+ {
+ slack =
+ GNUNET_TIME_relative_min (slack,
+ GNUNET_TIME_absolute_get_difference (now, pos->deadline));
+ pos->got_slack = GNUNET_YES;
+ pos->slack_deadline = GNUNET_TIME_absolute_min (pos->deadline,
+ GNUNET_TIME_relative_to_absolute (MAX_CORK_DELAY));
+ }
}
}
off += pos->size;
- t.value = GNUNET_MAX (pos->deadline.value, t.value);
+ t = GNUNET_TIME_absolute_max (pos->deadline, t); // HUH? Check!
if (pos->priority <= min_prio)
{
/* update min for discard */
}
/* guard against sending "tiny" messages with large headers without
urgent deadlines */
- if ( (slack > 1000) && (size > 4 * off) )
- {
- /* less than 25% of message would be filled with
- deadlines still being met if we delay by one
- second or more; so just wait for more data */
- retry_time->value = slack / 2;
+ if ( (slack.value > 0) &&
+ (size > 4 * off) &&
+ (queue_size <= MAX_PEER_QUEUE_SIZE - 2) )
+ {
+ /* less than 25% of message would be filled with deadlines still
+ being met if we delay by one second or more; so just wait for
+ more data; but do not wait longer than 1s (since we don't want
+ to delay messages for a really long time either). */
+ *retry_time = MAX_CORK_DELAY;
/* reset do_transmit values for next time */
while (pos != last)
{
- pos->do_transmit = GNUNET_NO;
+ pos->do_transmit = GNUNET_NO;
pos = pos->next;
}
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Deferring transmission for %llums due to underfull message buffer size (%u/%u)\n",
+ (unsigned long long) slack.value,
+ (unsigned int) off,
+ (unsigned int) size);
+#endif
return 0;
}
/* select marked messages (up to size) for transmission */
}
#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Selected %u bytes of plaintext messages for transmission to `%4s'.\n",
- off, GNUNET_i2s (&n->peer));
+ "Selected %u/%u bytes of %u/%u plaintext messages for transmission to `%4s'.\n",
+ off, tsize,
+ queue_size, MAX_PEER_QUEUE_SIZE,
+ GNUNET_i2s (&n->peer));
#endif
return off;
}
struct GNUNET_TIME_Relative *retry_time,
unsigned int *priority)
{
+ char ntmb[GNUNET_SERVER_MAX_MESSAGE_SIZE];
+ struct NotifyTrafficMessage *ntm = (struct NotifyTrafficMessage*) ntmb;
struct MessageEntry *pos;
struct MessageEntry *prev;
struct MessageEntry *next;
size_t ret;
-
+
ret = 0;
*priority = 0;
*deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
*retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
if (0 == select_messages (n, size, retry_time))
{
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"No messages selected, will try again in %llu ms\n",
retry_time->value);
+#endif
return 0;
}
+ ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND);
+ ntm->distance = htonl (n->last_distance);
+ ntm->latency = GNUNET_TIME_relative_hton (n->last_latency);
+ ntm->peer = n->peer;
+
pos = n->messages;
prev = NULL;
while ((pos != NULL) && (size >= sizeof (struct GNUNET_MessageHeader)))
if (GNUNET_YES == pos->do_transmit)
{
GNUNET_assert (pos->size <= size);
+ /* do notifications */
+ /* FIXME: track if we have *any* client that wants
+ full notifications and only do this if that is
+ actually true */
+ if (pos->size < GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct NotifyTrafficMessage))
+ {
+ memcpy (&ntm[1], &pos[1], pos->size);
+ ntm->header.size = htons (sizeof (struct NotifyTrafficMessage) +
+ sizeof (struct GNUNET_MessageHeader));
+ send_to_all_clients (&ntm->header,
+ GNUNET_YES,
+ GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND);
+ }
+ else
+ {
+ /* message too large for 'full' notifications, we do at
+ least the 'hdr' type */
+ memcpy (&ntm[1],
+ &pos[1],
+ sizeof (struct GNUNET_MessageHeader));
+ }
+ ntm->header.size = htons (sizeof (struct NotifyTrafficMessage) +
+ pos->size);
+ send_to_all_clients (&ntm->header,
+ GNUNET_YES,
+ GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND);
+#if DEBUG_HANDSHAKE
+ fprintf (stderr,
+ "Encrypting message of type %u\n",
+ ntohs(((struct GNUNET_MessageHeader*)&pos[1])->type));
+#endif
+ /* copy for encrypted transmission */
memcpy (&buf[ret], &pos[1], pos->size);
ret += pos->size;
size -= pos->size;
*priority += pos->priority;
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Adding plaintext message of size %u with deadline %llu ms to batch\n",
+ pos->size,
+ GNUNET_TIME_absolute_get_remaining (pos->deadline).value);
+#endif
deadline->value = GNUNET_MIN (deadline->value, pos->deadline.value);
GNUNET_free (pos);
if (prev == NULL)
}
pos = next;
}
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Deadline for message batch is %llu ms\n",
+ GNUNET_TIME_absolute_get_remaining (*deadline).value);
+#endif
return ret;
}
*/
static void send_key (struct Neighbour *n);
+/**
+ * Task that will retry "send_key" if our previous attempt failed
+ * to yield a PONG.
+ */
+static void
+set_key_retry_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct Neighbour *n = cls;
+
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Retrying key transmission to `%4s'\n",
+ GNUNET_i2s (&n->peer));
+#endif
+ n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
+ n->set_key_retry_frequency =
+ GNUNET_TIME_relative_multiply (n->set_key_retry_frequency, 2);
+ send_key (n);
+}
+
/**
* Check if we have plaintext messages for the specified neighbour
unsigned int priority;
struct GNUNET_TIME_Absolute deadline;
struct GNUNET_TIME_Relative retry_time;
+ GNUNET_HashCode iv;
if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
{
#endif
return;
case PEER_STATE_KEY_SENT:
- GNUNET_assert (n->retry_set_key_task !=
- GNUNET_SCHEDULER_NO_TASK);
+ if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK)
+ n->retry_set_key_task
+ = GNUNET_SCHEDULER_add_delayed (sched,
+ n->set_key_retry_frequency,
+ &set_key_retry_task, n);
#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Not yet connected to `%4s', deferring processing of plaintext messages.\n",
#endif
return;
case PEER_STATE_KEY_RECEIVED:
- GNUNET_assert (n->retry_set_key_task !=
- GNUNET_SCHEDULER_NO_TASK);
+ if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK)
+ n->retry_set_key_task
+ = GNUNET_SCHEDULER_add_delayed (sched,
+ n->set_key_retry_frequency,
+ &set_key_retry_task, n);
#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Not yet connected to `%4s', deferring processing of plaintext messages.\n",
/* no messages selected for sending, try again later... */
n->retry_plaintext_task =
GNUNET_SCHEDULER_add_delayed (sched,
- GNUNET_NO,
- GNUNET_SCHEDULER_PRIORITY_IDLE,
- GNUNET_SCHEDULER_NO_TASK,
retry_time,
&retry_plaintext_processing, n);
return;
}
-
+#if DEBUG_CORE_QUOTA
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending %u b/s as new limit to peer `%4s'\n",
+ (unsigned int) ntohl (n->bw_in.value__),
+ GNUNET_i2s (&n->peer));
+#endif
+ ph->iv_seed = htonl (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1));
ph->sequence_number = htonl (++n->last_sequence_number_sent);
- ph->inbound_bpm_limit = htonl (n->bpm_in);
+ ph->inbound_bw_limit = n->bw_in;
ph->timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
/* setup encryption message header */
em = (struct EncryptedMessage *) &me[1];
em->header.size = htons (used);
em->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE);
- em->reserved = htonl (0);
+ em->iv_seed = ph->iv_seed;
esize = used - ENCRYPTED_HEADER_SIZE;
- GNUNET_CRYPTO_hash (&ph->sequence_number, esize, &em->plaintext_hash);
+ GNUNET_CRYPTO_hash (&ph->sequence_number,
+ esize - sizeof (GNUNET_HashCode),
+ &ph->plaintext_hash);
+ GNUNET_CRYPTO_hash (&ph->iv_seed, sizeof (uint32_t), &iv);
/* encrypt */
#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Encrypting %u bytes of plaintext messages for `%4s' for transmission.\n",
+ "Encrypting %u bytes of plaintext messages for `%4s' for transmission in %llums.\n",
esize,
- GNUNET_i2s(&n->peer));
+ GNUNET_i2s(&n->peer),
+ (unsigned long long) GNUNET_TIME_absolute_get_remaining (deadline).value);
#endif
GNUNET_assert (GNUNET_OK ==
do_encrypt (n,
- &em->plaintext_hash,
- &ph->sequence_number,
- &em->sequence_number, esize));
+ &iv,
+ &ph->plaintext_hash,
+ &em->plaintext_hash, esize));
/* append to transmission list */
- if (n->encrypted_tail == NULL)
- n->encrypted_head = me;
- else
- n->encrypted_tail->next = me;
- n->encrypted_tail = me;
+ GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
+ n->encrypted_tail,
+ n->encrypted_tail,
+ me);
process_encrypted_neighbour_queue (n);
}
/**
- * Handle CORE_SEND request.
+ * Function that recalculates the bandwidth quota for the
+ * given neighbour and transmits it to the transport service.
+ *
+ * @param cls neighbour for the quota update
+ * @param tc context
+ */
+static void
+neighbour_quota_update (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * Schedule the task that will recalculate the bandwidth
+ * quota for this peer (and possibly force a disconnect of
+ * idle peers by calculating a bandwidth of zero).
*/
static void
-handle_client_send (void *cls,
- struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *message);
+schedule_quota_update (struct Neighbour *n)
+{
+ GNUNET_assert (n->quota_update_task ==
+ GNUNET_SCHEDULER_NO_TASK);
+ n->quota_update_task
+ = GNUNET_SCHEDULER_add_delayed (sched,
+ QUOTA_UPDATE_FREQUENCY,
+ &neighbour_quota_update,
+ n);
+}
/**
- * Function called to notify us that we either succeeded
- * or failed to connect (at the transport level) to another
- * peer. We should either free the message we were asked
- * to transmit or re-try adding it to the queue.
+ * Initialize a new 'struct Neighbour'.
*
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
+ * @param pid ID of the new neighbour
+ * @return handle for the new neighbour
*/
-static size_t
-send_connect_continuation (void *cls, size_t size, void *buf)
+static struct Neighbour *
+create_neighbour (const struct GNUNET_PeerIdentity *pid)
{
- struct SendMessage *sm = cls;
+ struct Neighbour *n;
+ struct GNUNET_TIME_Absolute now;
- if (buf == NULL)
- {
-#if DEBUG_CORE
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Asked to send message to disconnected peer `%4s' and connection failed. Discarding message.\n",
- GNUNET_i2s (&sm->peer));
-#endif
- GNUNET_free (sm);
- return 0;
- }
-#if DEBUG_CORE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Connection to peer `%4s' succeeded, retrying original transmission request\n",
- GNUNET_i2s (&sm->peer));
-#endif
- handle_client_send (NULL, NULL, &sm->header);
- GNUNET_free (sm);
- return 0;
+ n = GNUNET_malloc (sizeof (struct Neighbour));
+ n->next = neighbours;
+ neighbours = n;
+ neighbour_count++;
+ GNUNET_STATISTICS_set (stats, gettext_noop ("# active neighbours"), neighbour_count, GNUNET_NO);
+ n->peer = *pid;
+ GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
+ now = GNUNET_TIME_absolute_get ();
+ n->encrypt_key_created = now;
+ n->last_activity = now;
+ n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
+ n->bw_in = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
+ n->bw_out = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
+ n->bw_out_internal_limit = GNUNET_BANDWIDTH_value_init ((uint32_t) - 1);
+ n->bw_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
+ n->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ (uint32_t) - 1);
+ neighbour_quota_update (n, NULL);
+ return n;
}
/**
* Handle CORE_SEND request.
+ *
+ * @param cls unused
+ * @param client the client issuing the request
+ * @param message the "struct SendMessage"
*/
static void
handle_client_send (void *cls,
const struct GNUNET_MessageHeader *message)
{
const struct SendMessage *sm;
- struct SendMessage *smc;
- const struct GNUNET_MessageHeader *mh;
struct Neighbour *n;
struct MessageEntry *prev;
struct MessageEntry *pos;
}
sm = (const struct SendMessage *) message;
msize -= sizeof (struct SendMessage);
- mh = (const struct GNUNET_MessageHeader *) &sm[1];
- if (msize != ntohs (mh->size))
- {
- GNUNET_break (0);
- if (client != NULL)
- GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
- return;
- }
n = find_neighbour (&sm->peer);
if (n == NULL)
- {
-#if DEBUG_CORE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Core received `%s' request for `%4s', will try to establish connection within %llu ms\n",
- "SEND",
- GNUNET_i2s (&sm->peer),
- GNUNET_TIME_absolute_get_remaining
- (GNUNET_TIME_absolute_ntoh(sm->deadline)).value);
-#endif
- msize += sizeof (struct SendMessage);
- /* ask transport to connect to the peer */
- smc = GNUNET_malloc (msize);
- memcpy (smc, sm, msize);
- if (NULL ==
- GNUNET_TRANSPORT_notify_transmit_ready (transport,
- &sm->peer,
- 0, 0,
- GNUNET_TIME_absolute_get_remaining
- (GNUNET_TIME_absolute_ntoh
- (sm->deadline)),
- &send_connect_continuation,
- smc))
- {
- /* transport has already a request pending for this peer! */
-#if DEBUG_CORE
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Dropped second message destined for `%4s' since connection is still down.\n",
- GNUNET_i2s(&sm->peer));
-#endif
- GNUNET_free (smc);
- }
- if (client != NULL)
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
- return;
- }
+ n = create_neighbour (&sm->peer);
#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n",
/* discard new entry */
#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Queue full, discarding new request\n");
+ "Queue full (%u/%u), discarding new request (%u bytes of type %u)\n",
+ queue_size,
+ MAX_PEER_QUEUE_SIZE,
+ msize,
+ ntohs (message->type));
#endif
if (client != NULL)
GNUNET_SERVER_receive_done (client, GNUNET_OK);
min_prio_prev->next = min_prio_entry->next;
GNUNET_free (min_prio_entry);
}
-
+
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Adding transmission request for `%4s' of size %u to queue\n",
+ GNUNET_i2s (&sm->peer),
+ msize);
+#endif
e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
e->priority = ntohl (sm->priority);
e->size = msize;
- memcpy (&e[1], mh, msize);
+ memcpy (&e[1], &sm[1], msize);
/* insert, keep list sorted by deadline */
prev = NULL;
}
+/**
+ * Function called when the transport service is ready to
+ * receive a message. Only resets 'n->th' to NULL.
+ *
+ * @param cls neighbour to use message from
+ * @param size number of bytes we can transmit
+ * @param buf where to copy the message
+ * @return number of bytes transmitted
+ */
+static size_t
+notify_transport_connect_done (void *cls, size_t size, void *buf)
+{
+ struct Neighbour *n = cls;
+
+ n->th = NULL;
+ if (buf == NULL)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ _("Failed to connect to `%4s': transport failed to connect\n"),
+ GNUNET_i2s (&n->peer));
+ return 0;
+ }
+ send_key (n);
+ return 0;
+}
+
+
+/**
+ * Handle CORE_REQUEST_CONNECT request.
+ *
+ * @param cls unused
+ * @param client the client issuing the request
+ * @param message the "struct ConnectMessage"
+ */
+static void
+handle_client_request_connect (void *cls,
+ struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *message)
+{
+ const struct ConnectMessage *cm = (const struct ConnectMessage*) message;
+ struct Neighbour *n;
+ struct GNUNET_TIME_Relative timeout;
+
+ if (0 == memcmp (&cm->peer, &my_identity, sizeof (struct GNUNET_PeerIdentity)))
+ {
+ GNUNET_break (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+ n = find_neighbour (&cm->peer);
+ if (n == NULL)
+ n = create_neighbour (&cm->peer);
+ if ( (n->is_connected) ||
+ (n->th != NULL) )
+ return; /* already connected, or at least trying */
+ GNUNET_STATISTICS_update (stats, gettext_noop ("# connection requests received"), 1, GNUNET_NO);
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Core received `%s' request for `%4s', will try to establish connection\n",
+ "REQUEST_CONNECT",
+ GNUNET_i2s (&cm->peer));
+#endif
+ timeout = GNUNET_TIME_relative_ntoh (cm->timeout);
+ /* ask transport to connect to the peer */
+ n->th = GNUNET_TRANSPORT_notify_transmit_ready (transport,
+ &cm->peer,
+ sizeof (struct GNUNET_MessageHeader), 0,
+ timeout,
+ ¬ify_transport_connect_done,
+ n);
+ GNUNET_break (NULL != n->th);
+}
+
+
/**
* List of handlers for the messages understood by this
* service.
static struct GNUNET_SERVER_MessageHandler handlers[] = {
{&handle_client_init, NULL,
GNUNET_MESSAGE_TYPE_CORE_INIT, 0},
- {&handle_client_request_configure, NULL,
- GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONFIGURE,
- sizeof (struct RequestConfigureMessage)},
+ {&handle_client_request_info, NULL,
+ GNUNET_MESSAGE_TYPE_CORE_REQUEST_INFO,
+ sizeof (struct RequestInfoMessage)},
{&handle_client_send, NULL,
GNUNET_MESSAGE_TYPE_CORE_SEND, 0},
+ {&handle_client_request_connect, NULL,
+ GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONNECT,
+ sizeof (struct ConnectMessage)},
{NULL, NULL, 0, 0}
};
/**
- * PEERINFO is giving us a HELLO for a peer. Add the
- * public key to the neighbour's struct and retry
- * send_key. Or, if we did not get a HELLO, just do
- * nothing.
+ * PEERINFO is giving us a HELLO for a peer. Add the public key to
+ * the neighbour's struct and retry send_key. Or, if we did not get a
+ * HELLO, just do nothing.
*
- * @param cls NULL
+ * @param cls the 'struct Neighbour' to retry sending the key for
* @param peer the peer for which this is the HELLO
* @param hello HELLO message of that peer
* @param trust amount of trust we currently have in that peer
const struct GNUNET_HELLO_Message *hello,
uint32_t trust)
{
- struct Neighbour *n;
+ struct Neighbour *n = cls;
if (peer == NULL)
- return;
- n = find_neighbour (peer);
- if (n == NULL)
- return;
+ {
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Entered `process_hello_retry_send_key' and `peer' is NULL!\n");
+#endif
+ n->pitr = NULL;
+ if (n->public_key != NULL)
+ {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# SETKEY messages deferred (need public key)"),
+ -1,
+ GNUNET_NO);
+ send_key (n);
+ }
+ else
+ {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# Delayed connecting due to lack of public key"),
+ 1,
+ GNUNET_NO);
+ if (GNUNET_SCHEDULER_NO_TASK == n->retry_set_key_task)
+ n->retry_set_key_task
+ = GNUNET_SCHEDULER_add_delayed (sched,
+ n->set_key_retry_frequency,
+ &set_key_retry_task, n);
+ }
+ return;
+ }
+
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Entered `process_hello_retry_send_key' for peer `%4s'\n",
+ GNUNET_i2s (peer));
+#endif
if (n->public_key != NULL)
- return;
+ {
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "already have public key for peer %s!! (so why are we here?)\n",
+ GNUNET_i2s (peer));
+#endif
+ return;
+ }
+
#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received new `%s' message for `%4s', initiating key exchange.\n",
GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
{
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# Error extracting public key from HELLO"),
+ 1,
+ GNUNET_NO);
GNUNET_free (n->public_key);
n->public_key = NULL;
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "GNUNET_HELLO_get_key returned awfully\n");
+#endif
return;
}
- send_key (n);
-}
-
-
-/**
- * Task that will retry "send_key" if our previous attempt failed
- * to yield a PONG.
- */
-static void
-set_key_retry_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct Neighbour *n = cls;
-
- GNUNET_assert (n->status != PEER_STATE_KEY_CONFIRMED);
- n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
- n->set_key_retry_frequency =
- GNUNET_TIME_relative_multiply (n->set_key_retry_frequency, 2);
- send_key (n);
}
struct PingMessage pp;
struct PingMessage *pm;
+ if ( (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK) ||
+ (n->pitr != NULL) )
+ {
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Key exchange in progress with `%4s'.\n",
+ GNUNET_i2s (&n->peer));
+#endif
+ return; /* already in progress */
+ }
+
#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Asked to perform key exchange with `%4s'.\n",
/* lookup n's public key, then try again */
#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Lacking public key for `%4s', trying to obtain one.\n",
+ "Lacking public key for `%4s', trying to obtain one (send_key).\n",
GNUNET_i2s (&n->peer));
#endif
- GNUNET_PEERINFO_for_all (cfg,
- sched,
- &n->peer,
- 0,
- GNUNET_TIME_UNIT_MINUTES,
- &process_hello_retry_send_key, NULL);
+ GNUNET_assert (n->pitr == NULL);
+ n->pitr = GNUNET_PEERINFO_iterate (cfg,
+ sched,
+ &n->peer,
+ 0,
+ GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 20),
+ &process_hello_retry_send_key, n);
return;
}
/* first, set key message */
me->deadline = GNUNET_TIME_relative_to_absolute (MAX_SET_KEY_DELAY);
me->priority = SET_KEY_PRIORITY;
me->size = sizeof (struct SetKeyMessage);
- if (n->encrypted_head == NULL)
- n->encrypted_head = me;
- else
- n->encrypted_tail->next = me;
- n->encrypted_tail = me;
+ GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
+ n->encrypted_tail,
+ n->encrypted_tail,
+ me);
sm = (struct SetKeyMessage *) &me[1];
sm->header.size = htons (sizeof (struct SetKeyMessage));
sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SET_KEY);
me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PING_DELAY);
me->priority = PING_PRIORITY;
me->size = sizeof (struct PingMessage);
- n->encrypted_tail->next = me;
- n->encrypted_tail = me;
+ GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
+ n->encrypted_tail,
+ n->encrypted_tail,
+ me);
pm = (struct PingMessage *) &me[1];
pm->header.size = htons (sizeof (struct PingMessage));
pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
case PEER_STATE_KEY_RECEIVED:
break;
case PEER_STATE_KEY_CONFIRMED:
- GNUNET_break (0);
break;
default:
GNUNET_break (0);
break;
}
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Have %llu ms left for `%s' transmission.\n",
+ (unsigned long long) GNUNET_TIME_absolute_get_remaining (me->deadline).value,
+ "SET_KEY");
+#endif
/* trigger queue processing */
process_encrypted_neighbour_queue (n);
- if (n->status != PEER_STATE_KEY_CONFIRMED)
+ if ( (n->status != PEER_STATE_KEY_CONFIRMED) &&
+ (GNUNET_SCHEDULER_NO_TASK == n->retry_set_key_task) )
n->retry_set_key_task
= GNUNET_SCHEDULER_add_delayed (sched,
- GNUNET_NO,
- GNUNET_SCHEDULER_PRIORITY_KEEP,
- GNUNET_SCHEDULER_NO_TASK,
- n->set_key_retry_frequency,
- &set_key_retry_task, n);
+ n->set_key_retry_frequency,
+ &set_key_retry_task, n);
}
const struct GNUNET_HELLO_Message *hello,
uint32_t trust)
{
- struct SetKeyMessage *sm = cls;
- struct Neighbour *n;
+ struct Neighbour *n = cls;
+ struct SetKeyMessage *sm = n->skm;
if (peer == NULL)
{
GNUNET_free (sm);
- return;
- }
- n = find_neighbour (peer);
- if (n == NULL)
- {
- GNUNET_break (0);
+ n->skm = NULL;
+ n->pitr = NULL;
return;
}
if (n->public_key != NULL)
handle_ping (struct Neighbour *n, const struct PingMessage *m)
{
struct PingMessage t;
- struct PingMessage *tp;
+ struct PongMessage tx;
+ struct PongMessage *tp;
struct MessageEntry *me;
#if DEBUG_CORE
"Target of `%s' request is `%4s'.\n",
"PING", GNUNET_i2s (&t.target));
#endif
+ GNUNET_STATISTICS_update (stats, gettext_noop ("# ping messages decrypted"), 1, GNUNET_NO);
if (0 != memcmp (&t.target,
&my_identity, sizeof (struct GNUNET_PeerIdentity)))
{
return;
}
me = GNUNET_malloc (sizeof (struct MessageEntry) +
- sizeof (struct PingMessage));
- if (n->encrypted_tail != NULL)
- n->encrypted_tail->next = me;
- else
- {
- n->encrypted_tail = me;
- n->encrypted_head = me;
- }
+ sizeof (struct PongMessage));
+ GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
+ n->encrypted_tail,
+ n->encrypted_tail,
+ me);
me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PONG_DELAY);
me->priority = PONG_PRIORITY;
- me->size = sizeof (struct PingMessage);
- tp = (struct PingMessage *) &me[1];
+ me->size = sizeof (struct PongMessage);
+ tx.reserved = htonl (0);
+ tx.inbound_bw_limit = n->bw_in;
+ tx.challenge = t.challenge;
+ tx.target = t.target;
+ tp = (struct PongMessage *) &me[1];
tp->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PONG);
- tp->header.size = htons (sizeof (struct PingMessage));
+ tp->header.size = htons (sizeof (struct PongMessage));
do_encrypt (n,
&my_identity.hashPubKey,
- &t.challenge,
+ &tx.challenge,
&tp->challenge,
- sizeof (struct PingMessage) -
+ sizeof (struct PongMessage) -
sizeof (struct GNUNET_MessageHeader));
#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
}
+/**
+ * We received a PONG message. Validate and update our status.
+ *
+ * @param n sender of the PONG
+ * @param m the encrypted PONG message itself
+ */
+static void
+handle_pong (struct Neighbour *n,
+ const struct PongMessage *m)
+{
+ struct PongMessage t;
+ struct ConnectNotifyMessage cnm;
+
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Core service receives `%s' request from `%4s'.\n",
+ "PONG", GNUNET_i2s (&n->peer));
+#endif
+ if (GNUNET_OK !=
+ do_decrypt (n,
+ &n->peer.hashPubKey,
+ &m->challenge,
+ &t.challenge,
+ sizeof (struct PongMessage) -
+ sizeof (struct GNUNET_MessageHeader)))
+ return;
+ if (0 != ntohl (t.reserved))
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Decrypted `%s' from `%4s' with challenge %u using key %u\n",
+ "PONG",
+ GNUNET_i2s (&t.target),
+ ntohl (t.challenge), n->decrypt_key.crc32);
+#endif
+ if ((0 != memcmp (&t.target,
+ &n->peer,
+ sizeof (struct GNUNET_PeerIdentity))) ||
+ (n->ping_challenge != ntohl (t.challenge)))
+ {
+ /* PONG malformed */
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received malformed `%s' wanted sender `%4s' with challenge %u\n",
+ "PONG", GNUNET_i2s (&n->peer), n->ping_challenge);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received malformed `%s' received from `%4s' with challenge %u\n",
+ "PONG", GNUNET_i2s (&t.target), ntohl (t.challenge));
+#endif
+ GNUNET_break_op (0);
+ return;
+ }
+ switch (n->status)
+ {
+ case PEER_STATE_DOWN:
+ GNUNET_break (0); /* should be impossible */
+ return;
+ case PEER_STATE_KEY_SENT:
+ GNUNET_break (0); /* should be impossible, how did we decrypt? */
+ return;
+ case PEER_STATE_KEY_RECEIVED:
+ n->status = PEER_STATE_KEY_CONFIRMED;
+ if (n->bw_out_external_limit.value__ != t.inbound_bw_limit.value__)
+ {
+ n->bw_out_external_limit = t.inbound_bw_limit;
+ n->bw_out = GNUNET_BANDWIDTH_value_min (n->bw_out_external_limit,
+ n->bw_out_internal_limit);
+ GNUNET_BANDWIDTH_tracker_update_quota (&n->available_send_window,
+ n->bw_out);
+ GNUNET_TRANSPORT_set_quota (transport,
+ &n->peer,
+ n->bw_in,
+ n->bw_out,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ NULL, NULL);
+ }
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Confirmed key via `%s' message for peer `%4s'\n",
+ "PONG", GNUNET_i2s (&n->peer));
+#endif
+ if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
+ n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
+ cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
+ cnm.distance = htonl (n->last_distance);
+ cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
+ cnm.peer = n->peer;
+ send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_CONNECT);
+ process_encrypted_neighbour_queue (n);
+ /* fall-through! */
+ case PEER_STATE_KEY_CONFIRMED:
+ n->last_activity = GNUNET_TIME_absolute_get ();
+ if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)
+ GNUNET_SCHEDULER_cancel (sched, n->keep_alive_task);
+ n->keep_alive_task
+ = GNUNET_SCHEDULER_add_delayed (sched,
+ GNUNET_TIME_relative_divide (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, 2),
+ &send_keep_alive,
+ n);
+ break;
+ default:
+ GNUNET_break (0);
+ break;
+ }
+}
+
+
/**
* We received a SET_KEY message. Validate and update
* our key material and status.
struct GNUNET_TIME_Absolute t;
struct GNUNET_CRYPTO_AesSessionKey k;
struct PingMessage *ping;
+ struct PongMessage *pong;
enum PeerStateMachine sender_status;
#if DEBUG_CORE
#endif
if (n->public_key == NULL)
{
+ if (n->pitr != NULL)
+ {
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Ignoring `%s' message due to lack of public key for peer (still trying to obtain one).\n",
+ "SET_KEY");
+#endif
+ return;
+ }
#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Lacking public key for peer, trying to obtain one.\n");
+ "Lacking public key for peer, trying to obtain one (handle_set_key).\n");
#endif
m_cpy = GNUNET_malloc (sizeof (struct SetKeyMessage));
memcpy (m_cpy, m, sizeof (struct SetKeyMessage));
/* lookup n's public key, then try again */
- GNUNET_PEERINFO_for_all (cfg,
- sched,
- &n->peer,
- 0,
- GNUNET_TIME_UNIT_MINUTES,
- &process_hello_retry_handle_set_key, m_cpy);
+ GNUNET_assert (n->skm == NULL);
+ n->skm = m_cpy;
+ n->pitr = GNUNET_PEERINFO_iterate (cfg,
+ sched,
+ &n->peer,
+ 0,
+ GNUNET_TIME_UNIT_MINUTES,
+ &process_hello_retry_handle_set_key, n);
+ GNUNET_STATISTICS_update (stats, gettext_noop ("# SETKEY messages deferred (need public key)"), 1, GNUNET_NO);
+ return;
+ }
+ if (0 != memcmp (&m->target,
+ &my_identity,
+ sizeof (struct GNUNET_PeerIdentity)))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ _("Received `%s' message that was for `%s', not for me. Ignoring.\n"),
+ "SET_KEY",
+ GNUNET_i2s (&m->target));
return;
}
if ((ntohl (m->purpose.size) !=
return;
}
#if DEBUG_CORE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Decrypting key material.\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Decrypting key material.\n");
#endif
if ((GNUNET_CRYPTO_rsa_decrypt (my_private_key,
&m->encrypted_key,
case PEER_STATE_KEY_CONFIRMED:
if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
(sender_status != PEER_STATE_KEY_CONFIRMED))
- {
+ {
#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Responding to `%s' with my own key (other peer has status %u), I was already fully up.\n",
handle_ping (n, ping);
GNUNET_free (ping);
}
-}
-
-
-/**
- * We received a PONG message. Validate and update
- * our status.
- *
- * @param n sender of the PONG
- * @param m the encrypted PONG message itself
- */
-static void
-handle_pong (struct Neighbour *n, const struct PingMessage *m)
-{
- struct PingMessage t;
-
-#if DEBUG_CORE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Core service receives `%s' request from `%4s'.\n",
- "PONG", GNUNET_i2s (&n->peer));
-#endif
- if (GNUNET_OK !=
- do_decrypt (n,
- &n->peer.hashPubKey,
- &m->challenge,
- &t.challenge,
- sizeof (struct PingMessage) -
- sizeof (struct GNUNET_MessageHeader)))
- return;
-#if DEBUG_CORE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Decrypted `%s' from `%4s' with challenge %u using key %u\n",
- "PONG",
- GNUNET_i2s (&t.target),
- ntohl (t.challenge), n->decrypt_key.crc32);
-#endif
- if ((0 != memcmp (&t.target,
- &n->peer,
- sizeof (struct GNUNET_PeerIdentity))) ||
- (n->ping_challenge != ntohl (t.challenge)))
- {
- /* PONG malformed */
-#if DEBUG_CORE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received malfromed `%s' wanted sender `%4s' with challenge %u\n",
- "PONG", GNUNET_i2s (&n->peer), n->ping_challenge);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received malfromed `%s' received from `%4s' with challenge %u\n",
- "PONG", GNUNET_i2s (&t.target), ntohl (t.challenge));
-#endif
- GNUNET_break_op (0);
- return;
- }
- switch (n->status)
+ if (n->pending_pong != NULL)
{
- case PEER_STATE_DOWN:
- GNUNET_break (0); /* should be impossible */
- return;
- case PEER_STATE_KEY_SENT:
- GNUNET_break (0); /* should be impossible, how did we decrypt? */
- return;
- case PEER_STATE_KEY_RECEIVED:
- n->status = PEER_STATE_KEY_CONFIRMED;
- if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
- {
- GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
- n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
- }
- process_encrypted_neighbour_queue (n);
- break;
- case PEER_STATE_KEY_CONFIRMED:
- /* duplicate PONG? */
- break;
- default:
- GNUNET_break (0);
- break;
+ pong = n->pending_pong;
+ n->pending_pong = NULL;
+ handle_pong (n, pong);
+ GNUNET_free (pong);
}
}
ntm = (struct NotifyTrafficMessage *) buf;
ntm->header.size = htons (msize + sizeof (struct NotifyTrafficMessage));
ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND);
- ntm->reserved = htonl (0);
+ ntm->distance = htonl (sender->last_distance);
+ ntm->latency = GNUNET_TIME_relative_hton (sender->last_latency);
ntm->peer = sender->peer;
memcpy (&ntm[1], m, msize);
send_to_client (client, &ntm->header, GNUNET_YES);
uint16_t type;
unsigned int tpos;
int deliver_full;
+ int dropped;
type = ntohs (m->type);
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received encapsulated message of type %u from `%4s'\n",
+ type,
+ GNUNET_i2s (&sender->peer));
+#endif
+ dropped = GNUNET_YES;
cpos = clients;
while (cpos != NULL)
{
deliver_full = GNUNET_NO;
- if (cpos->options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND)
+ if (0 != (cpos->options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND))
deliver_full = GNUNET_YES;
else
{
}
}
if (GNUNET_YES == deliver_full)
- send_p2p_message_to_client (sender, cpos, m, msize);
+ {
+ send_p2p_message_to_client (sender, cpos, m, msize);
+ dropped = GNUNET_NO;
+ }
else if (cpos->options & GNUNET_CORE_OPTION_SEND_HDR_INBOUND)
- send_p2p_message_to_client (sender, cpos, m,
- sizeof (struct GNUNET_MessageHeader));
+ {
+ send_p2p_message_to_client (sender, cpos, m,
+ sizeof (struct GNUNET_MessageHeader));
+ }
cpos = cpos->next;
}
+ if (dropped == GNUNET_YES)
+ {
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Message of type %u from `%4s' not delivered to any client.\n",
+ type,
+ GNUNET_i2s (&sender->peer));
+#endif
+ /* FIXME: stats... */
+ }
}
size_t off;
uint32_t snum;
struct GNUNET_TIME_Absolute t;
+ GNUNET_HashCode iv;
#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Core service receives `%s' request from `%4s'.\n",
"ENCRYPTED_MESSAGE", GNUNET_i2s (&n->peer));
#endif
+ GNUNET_CRYPTO_hash (&m->iv_seed, sizeof (uint32_t), &iv);
/* decrypt */
if (GNUNET_OK !=
do_decrypt (n,
+ &iv,
&m->plaintext_hash,
- &m->sequence_number,
- &buf[ENCRYPTED_HEADER_SIZE], size - ENCRYPTED_HEADER_SIZE))
+ &buf[ENCRYPTED_HEADER_SIZE],
+ size - ENCRYPTED_HEADER_SIZE))
return;
pt = (struct EncryptedMessage *) buf;
/* validate hash */
GNUNET_CRYPTO_hash (&pt->sequence_number,
- size - ENCRYPTED_HEADER_SIZE, &ph);
- if (0 != memcmp (&ph, &m->plaintext_hash, sizeof (GNUNET_HashCode)))
+ size - ENCRYPTED_HEADER_SIZE - sizeof (GNUNET_HashCode), &ph);
+ if (0 != memcmp (&ph,
+ &pt->plaintext_hash,
+ sizeof (GNUNET_HashCode)))
{
/* checksum failed */
GNUNET_break_op (0);
}
/* process decrypted message(s) */
- update_window (GNUNET_YES,
- &n->available_send_window,
- &n->last_asw_update,
- n->bpm_out);
- n->bpm_out_external_limit = ntohl (pt->inbound_bpm_limit);
- n->bpm_out = GNUNET_MAX (n->bpm_out_external_limit,
- n->bpm_out_internal_limit);
+ if (n->bw_out_external_limit.value__ != pt->inbound_bw_limit.value__)
+ {
+#if DEBUG_CORE_SET_QUOTA
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received %u b/s as new inbound limit for peer `%4s'\n",
+ (unsigned int) ntohl (pt->inbound_bw_limit.value__),
+ GNUNET_i2s (&n->peer));
+#endif
+ n->bw_out_external_limit = pt->inbound_bw_limit;
+ n->bw_out = GNUNET_BANDWIDTH_value_min (n->bw_out_external_limit,
+ n->bw_out_internal_limit);
+ GNUNET_BANDWIDTH_tracker_update_quota (&n->available_send_window,
+ n->bw_out);
+ GNUNET_TRANSPORT_set_quota (transport,
+ &n->peer,
+ n->bw_in,
+ n->bw_out,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ NULL, NULL);
+ }
n->last_activity = GNUNET_TIME_absolute_get ();
+ if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)
+ GNUNET_SCHEDULER_cancel (sched, n->keep_alive_task);
+ n->keep_alive_task
+ = GNUNET_SCHEDULER_add_delayed (sched,
+ GNUNET_TIME_relative_divide (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, 2),
+ &send_keep_alive,
+ n);
off = sizeof (struct EncryptedMessage);
deliver_messages (n, buf, size, off);
}
* Function called by the transport for each received message.
*
* @param cls closure
- * @param latency estimated latency for communicating with the
- * given peer
* @param peer (claimed) identity of the other peer
* @param message the message
+ * @param latency estimated latency for communicating with the
+ * given peer (round-trip)
+ * @param distance in overlay hops, as given by transport plugin
*/
static void
handle_transport_receive (void *cls,
- struct GNUNET_TIME_Relative latency,
const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *message)
+ const struct GNUNET_MessageHeader *message,
+ struct GNUNET_TIME_Relative latency,
+ unsigned int distance)
{
struct Neighbour *n;
struct GNUNET_TIME_Absolute now;
#endif
n = find_neighbour (peer);
if (n == NULL)
- {
- GNUNET_break (0);
- return;
- }
+ n = create_neighbour (peer);
+ if (n == NULL)
+ return;
n->last_latency = latency;
- up = n->status == PEER_STATE_KEY_CONFIRMED;
+ n->last_distance = distance;
+ up = (n->status == PEER_STATE_KEY_CONFIRMED);
type = ntohs (message->type);
size = ntohs (message->size);
+#if DEBUG_HANDSHAKE
+ fprintf (stderr,
+ "Received message of type %u from `%4s'\n",
+ type,
+ GNUNET_i2s (peer));
+#endif
switch (type)
{
case GNUNET_MESSAGE_TYPE_CORE_SET_KEY:
GNUNET_break_op (0);
return;
}
+ GNUNET_STATISTICS_update (stats, gettext_noop ("# session keys received"), 1, GNUNET_NO);
handle_set_key (n, (const struct SetKeyMessage *) message);
break;
case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
(n->status != PEER_STATE_KEY_CONFIRMED))
{
GNUNET_break_op (0);
+ /* blacklist briefly (?); might help recover (?) */
+ GNUNET_TRANSPORT_blacklist (sched, cfg,
+ &n->peer,
+ GNUNET_TIME_UNIT_SECONDS,
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
+ 5),
+ NULL, NULL);
return;
}
handle_encrypted_message (n, (const struct EncryptedMessage *) message);
GNUNET_break_op (0);
return;
}
+ GNUNET_STATISTICS_update (stats, gettext_noop ("# ping messages received"), 1, GNUNET_NO);
if ((n->status != PEER_STATE_KEY_RECEIVED) &&
(n->status != PEER_STATE_KEY_CONFIRMED))
{
handle_ping (n, (const struct PingMessage *) message);
break;
case GNUNET_MESSAGE_TYPE_CORE_PONG:
- if (size != sizeof (struct PingMessage))
+ if (size != sizeof (struct PongMessage))
{
GNUNET_break_op (0);
return;
}
- if ((n->status != PEER_STATE_KEY_SENT) &&
- (n->status != PEER_STATE_KEY_RECEIVED) &&
- (n->status != PEER_STATE_KEY_CONFIRMED))
+ GNUNET_STATISTICS_update (stats, gettext_noop ("# pong messages received"), 1, GNUNET_NO);
+ if ( (n->status != PEER_STATE_KEY_RECEIVED) &&
+ (n->status != PEER_STATE_KEY_CONFIRMED) )
{
- /* could not decrypt pong, oops! */
- GNUNET_break_op (0);
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
+ "PONG", GNUNET_i2s (&n->peer));
+#endif
+ GNUNET_free_non_null (n->pending_pong);
+ n->pending_pong = GNUNET_malloc (sizeof (struct PongMessage));
+ memcpy (n->pending_pong, message, sizeof (struct PongMessage));
return;
}
- handle_pong (n, (const struct PingMessage *) message);
+ handle_pong (n, (const struct PongMessage *) message);
break;
default:
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
now = GNUNET_TIME_absolute_get ();
n->last_activity = now;
if (!up)
- n->time_established = now;
+ {
+ GNUNET_STATISTICS_update (stats, gettext_noop ("# peers connected"), 1, GNUNET_NO);
+ n->time_established = now;
+ }
+ if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)
+ GNUNET_SCHEDULER_cancel (sched, n->keep_alive_task);
+ n->keep_alive_task
+ = GNUNET_SCHEDULER_add_delayed (sched,
+ GNUNET_TIME_relative_divide (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, 2),
+ &send_keep_alive,
+ n);
}
}
-/**
- * Function that recalculates the bandwidth quota for the
- * given neighbour and transmits it to the transport service.
- *
- * @param cls neighbour for the quota update
- * @param tc context
- */
-static void
-neighbour_quota_update (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc);
-
-
-/**
- * Schedule the task that will recalculate the bandwidth
- * quota for this peer (and possibly force a disconnect of
- * idle peers by calculating a bandwidth of zero).
- */
-static void
-schedule_quota_update (struct Neighbour *n)
-{
- GNUNET_assert (n->quota_update_task ==
- GNUNET_SCHEDULER_NO_TASK);
- n->quota_update_task
- = GNUNET_SCHEDULER_add_delayed (sched,
- GNUNET_NO,
- GNUNET_SCHEDULER_PRIORITY_IDLE,
- GNUNET_SCHEDULER_NO_TASK,
- QUOTA_UPDATE_FREQUENCY,
- &neighbour_quota_update,
- n);
-}
-
-
/**
* Function that recalculates the bandwidth quota for the
* given neighbour and transmits it to the transport service.
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct Neighbour *n = cls;
- uint32_t q_in;
+ struct GNUNET_BANDWIDTH_Value32NBO q_in;
double pref_rel;
double share;
unsigned long long distributable;
+ uint64_t need_per_peer;
+ uint64_t need_per_second;
n->quota_update_task = GNUNET_SCHEDULER_NO_TASK;
/* calculate relative preference among all neighbours;
divides by a bit more to avoid division by zero AND to
account for possibility of new neighbours joining any time
AND to convert to double... */
- pref_rel = n->current_preference / (1.0 + preference_sum);
- share = 0;
+ if (preference_sum == 0)
+ {
+ pref_rel = 1.0 / (double) neighbour_count;
+ }
+ else
+ {
+ pref_rel = n->current_preference / preference_sum;
+ }
+ need_per_peer = GNUNET_BANDWIDTH_value_get_available_until (MIN_BANDWIDTH_PER_PEER,
+ GNUNET_TIME_UNIT_SECONDS);
+ need_per_second = need_per_peer * neighbour_count;
distributable = 0;
- if (bandwidth_target_out > neighbour_count * MIN_BPM_PER_PEER)
- distributable = bandwidth_target_out - neighbour_count * MIN_BPM_PER_PEER;
+ if (bandwidth_target_out_bps > need_per_second)
+ distributable = bandwidth_target_out_bps - need_per_second;
share = distributable * pref_rel;
- q_in = MIN_BPM_PER_PEER + (unsigned long long) share;
+ if (share + need_per_peer > ( (uint32_t)-1))
+ q_in = GNUNET_BANDWIDTH_value_init ((uint32_t) -1);
+ else
+ q_in = GNUNET_BANDWIDTH_value_init (need_per_peer + (uint32_t) share);
/* check if we want to disconnect for good due to inactivity */
if ( (GNUNET_TIME_absolute_get_duration (n->last_activity).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) &&
(GNUNET_TIME_absolute_get_duration (n->time_established).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) )
- q_in = 0; /* force disconnect */
- if ( (n->bpm_in + MIN_BPM_CHANGE < q_in) ||
- (n->bpm_in - MIN_BPM_CHANGE > q_in) )
{
- n->bpm_in = q_in;
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Forcing disconnect of `%4s' due to inactivity (?).\n",
+ GNUNET_i2s (&n->peer));
+#endif
+ q_in = GNUNET_BANDWIDTH_value_init (0); /* force disconnect */
+ }
+#if DEBUG_CORE_QUOTA
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Current quota for `%4s' is %u/%llu b/s in (old: %u b/s) / %u out (%u internal)\n",
+ GNUNET_i2s (&n->peer),
+ (unsigned int) ntohl (q_in.value__),
+ bandwidth_target_out_bps,
+ (unsigned int) ntohl (n->bw_in.value__),
+ (unsigned int) ntohl (n->bw_out.value__),
+ (unsigned int) ntohl (n->bw_out_internal_limit.value__));
+#endif
+ if (n->bw_in.value__ != q_in.value__)
+ {
+ n->bw_in = q_in;
GNUNET_TRANSPORT_set_quota (transport,
&n->peer,
- n->bpm_in,
- n->bpm_out,
+ n->bw_in,
+ n->bw_out,
GNUNET_TIME_UNIT_FOREVER_REL,
NULL, NULL);
}
* @param cls closure
* @param peer the peer that connected
* @param latency current latency of the connection
+ * @param distance in overlay hops, as given by transport plugin
*/
static void
handle_transport_notify_connect (void *cls,
const struct GNUNET_PeerIdentity *peer,
- struct GNUNET_TIME_Relative latency)
+ struct GNUNET_TIME_Relative latency,
+ unsigned int distance)
{
struct Neighbour *n;
- struct GNUNET_TIME_Absolute now;
struct ConnectNotifyMessage cnm;
n = find_neighbour (peer);
if (n != NULL)
{
- /* duplicate connect notification!? */
- GNUNET_break (0);
- return;
+ if (n->is_connected)
+ {
+ /* duplicate connect notification!? */
+ GNUNET_break (0);
+ return;
+ }
}
- now = GNUNET_TIME_absolute_get ();
- n = GNUNET_malloc (sizeof (struct Neighbour));
- n->next = neighbours;
- neighbours = n;
- neighbour_count++;
- n->peer = *peer;
+ else
+ {
+ n = create_neighbour (peer);
+ }
+ n->is_connected = GNUNET_YES;
n->last_latency = latency;
- GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
- n->encrypt_key_created = now;
- n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
- n->last_asw_update = now;
- n->last_arw_update = now;
- n->bpm_in = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
- n->bpm_out = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
- n->bpm_out_internal_limit = (uint32_t) - 1;
- n->bpm_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
- n->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
- (uint32_t) - 1);
+ n->last_distance = distance;
+ GNUNET_BANDWIDTH_tracker_init (&n->available_send_window,
+ n->bw_out,
+ MAX_WINDOW_TIME_S);
+ GNUNET_BANDWIDTH_tracker_init (&n->available_recv_window,
+ n->bw_in,
+ MAX_WINDOW_TIME_S);
#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received connection from `%4s'.\n",
GNUNET_i2s (&n->peer));
#endif
- schedule_quota_update (n);
cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
- cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
- cnm.reserved = htonl (0);
+ cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_PRE_CONNECT);
+ cnm.distance = htonl (n->last_distance);
+ cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
cnm.peer = *peer;
- send_to_all_clients (&cnm.header, GNUNET_YES);
-}
-
-
-/**
- * Free the given entry for the neighbour (it has
- * already been removed from the list at this point).
- *
- * @param n neighbour to free
- */
-static void
-free_neighbour (struct Neighbour *n)
-{
- struct MessageEntry *m;
-
- while (NULL != (m = n->messages))
- {
- n->messages = m->next;
- GNUNET_free (m);
- }
- while (NULL != (m = n->encrypted_head))
- {
- n->encrypted_head = m->next;
- GNUNET_free (m);
- }
- if (NULL != n->th)
- GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
- if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
- GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
- if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
- GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
- if (n->quota_update_task != GNUNET_SCHEDULER_NO_TASK)
- GNUNET_SCHEDULER_cancel (sched, n->quota_update_task);
- GNUNET_free_non_null (n->public_key);
- GNUNET_free_non_null (n->pending_ping);
- GNUNET_free (n);
+ send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_PRE_CONNECT);
+ GNUNET_TRANSPORT_set_quota (transport,
+ &n->peer,
+ n->bw_in,
+ n->bw_out,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ NULL, NULL);
+ send_key (n);
}
handle_transport_notify_disconnect (void *cls,
const struct GNUNET_PeerIdentity *peer)
{
- struct ConnectNotifyMessage cnm;
+ struct DisconnectNotifyMessage cnm;
struct Neighbour *n;
- struct Neighbour *p;
#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Peer `%4s' disconnected from us.\n", GNUNET_i2s (peer));
#endif
- p = NULL;
- n = neighbours;
- while ((n != NULL) &&
- (0 != memcmp (&n->peer, peer, sizeof (struct GNUNET_PeerIdentity))))
- {
- p = n;
- n = n->next;
- }
+ n = find_neighbour (peer);
if (n == NULL)
{
GNUNET_break (0);
return;
}
- if (p == NULL)
- neighbours = n->next;
- else
- p->next = n->next;
- GNUNET_assert (neighbour_count > 0);
- neighbour_count--;
- cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
+ GNUNET_break (n->is_connected);
+ cnm.header.size = htons (sizeof (struct DisconnectNotifyMessage));
cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT);
- cnm.reserved = htonl (0);
cnm.peer = *peer;
- send_to_all_clients (&cnm.header, GNUNET_YES);
- free_neighbour (n);
+ send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_DISCONNECT);
+ n->is_connected = GNUNET_NO;
}
neighbour_count--;
free_neighbour (n);
}
+ GNUNET_STATISTICS_set (stats, gettext_noop ("# active neighbours"), neighbour_count, GNUNET_NO);
+ GNUNET_SERVER_notification_context_destroy (notifier);
+ notifier = NULL;
while (NULL != (c = clients))
handle_client_disconnect (NULL, c->client_handle);
+ if (my_private_key != NULL)
+ GNUNET_CRYPTO_rsa_key_free (my_private_key);
+ if (stats != NULL)
+ GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
}
struct GNUNET_SERVER_Handle *serv,
const struct GNUNET_CONFIGURATION_Handle *c)
{
-#if 0
- unsigned long long qin;
- unsigned long long qout;
- unsigned long long tneigh;
-#endif
char *keyfile;
sched = s;
- cfg = c;
+ cfg = c;
/* parse configuration */
if (
(GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_number (c,
"CORE",
"TOTAL_QUOTA_IN",
- &bandwidth_target_in)) ||
+ &bandwidth_target_in_bps)) ||
(GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_number (c,
"CORE",
"TOTAL_QUOTA_OUT",
- &bandwidth_target_out)) ||
-#if 0
- (GNUNET_OK !=
- GNUNET_CONFIGURATION_get_value_number (c,
- "CORE",
- "YY",
- &qout)) ||
- (GNUNET_OK !=
- GNUNET_CONFIGURATION_get_value_number (c,
- "CORE",
- "ZZ_LIMIT", &tneigh)) ||
-#endif
+ &bandwidth_target_out_bps)) ||
(GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_filename (c,
"GNUNETD",
sizeof (my_public_key), &my_identity.hashPubKey);
/* setup notification */
server = serv;
+ notifier = GNUNET_SERVER_notification_context_create (server,
+ MAX_NOTIFY_QUEUE);
GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
/* setup transport connection */
transport = GNUNET_TRANSPORT_connect (sched,
&handle_transport_notify_connect,
&handle_transport_notify_disconnect);
GNUNET_assert (NULL != transport);
+ stats = GNUNET_STATISTICS_create (sched, "core", cfg);
GNUNET_SCHEDULER_add_delayed (sched,
- GNUNET_YES,
- GNUNET_SCHEDULER_PRIORITY_IDLE,
- GNUNET_SCHEDULER_NO_TASK,
GNUNET_TIME_UNIT_FOREVER_REL,
&cleaning_task, NULL);
/* process client requests */
}
-/**
- * Function called during shutdown. Clean up our state.
- */
-static void
-cleanup (void *cls,
- const struct GNUNET_CONFIGURATION_Handle *cfg)
-{
- if (my_private_key != NULL)
- GNUNET_CRYPTO_rsa_key_free (my_private_key);
-}
-
/**
* The main function for the transport service.
return (GNUNET_OK ==
GNUNET_SERVICE_run (argc,
argv,
- "core", &run, NULL, &cleanup, NULL)) ? 0 : 1;
+ "core",
+ GNUNET_SERVICE_OPTION_NONE,
+ &run, NULL)) ? 0 : 1;
}
/* end of gnunet-service-core.c */