#include "gnunet_time_lib.h"
#include "gnunet_dv_service.h"
#include "dv.h"
+#include "../transport/plugin_transport.h"
struct PendingMessages
*/
void *receive_cls;
+ /**
+ * Current unique ID
+ */
+ uint32_t uid_gen;
+
+ /**
+ * Hashmap containing outstanding send requests awaiting confirmation.
+ */
+ struct GNUNET_CONTAINER_MultiHashMap *send_callbacks;
+
};
struct StartContext
{
-
/**
* Start message
*/
struct GNUNET_DV_Handle *handle;
};
+struct SendCallbackContext
+{
+ /**
+ * The continuation to call once a message is confirmed sent (or failed)
+ */
+ GNUNET_TRANSPORT_TransmitContinuation cont;
+
+ /**
+ * Closure to call with send continuation.
+ */
+ void *cont_cls;
+
+ /**
+ * Target of the message.
+ */
+ struct GNUNET_PeerIdentity target;
+};
+
+/**
+ * Convert unique ID to hash code.
+ *
+ * @param uid unique ID to convert
+ * @param hash set to uid (extended with zeros)
+ */
+static void
+hash_from_uid (uint32_t uid,
+ GNUNET_HashCode *hash)
+{
+ memset (hash, 0, sizeof(GNUNET_HashCode));
+ *((uint32_t*)hash) = uid;
+}
/**
* Try to (re)connect to the dv service.
process_pending_message(handle);
}
-
+/**
+ * Handles a message sent from the DV service to us.
+ * Parse it out and give it to the plugin.
+ *
+ * @param cls the handle to the DV API
+ * @param msg the message that was received
+ */
void handle_message_receipt (void *cls,
const struct GNUNET_MessageHeader * msg)
{
struct GNUNET_DV_Handle *handle = cls;
struct GNUNET_DV_MessageReceived *received_msg;
+ struct GNUNET_DV_SendResultMessage *send_result_msg;
size_t packed_msg_len;
size_t sender_address_len;
char *sender_address;
char *packed_msg;
char *packed_msg_start;
+ GNUNET_HashCode uidhash;
+ struct SendCallbackContext *send_ctx;
if (msg == NULL)
{
return; /* Connection closed? */
}
- GNUNET_assert(ntohs(msg->type) == GNUNET_MESSAGE_TYPE_TRANSPORT_DV_RECEIVE);
+ GNUNET_assert((ntohs(msg->type) == GNUNET_MESSAGE_TYPE_TRANSPORT_DV_RECEIVE) || (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_TRANSPORT_DV_SEND_RESULT));
- if (ntohs(msg->size) < sizeof(struct GNUNET_DV_MessageReceived))
- return;
+ switch (ntohs(msg->type))
+ {
+ case GNUNET_MESSAGE_TYPE_TRANSPORT_DV_RECEIVE:
+ if (ntohs(msg->size) < sizeof(struct GNUNET_DV_MessageReceived))
+ return;
- received_msg = (struct GNUNET_DV_MessageReceived *)msg;
- packed_msg_len = ntohl(received_msg->msg_len);
- sender_address_len = ntohl(received_msg->sender_address_len);
+ received_msg = (struct GNUNET_DV_MessageReceived *)msg;
+ packed_msg_len = ntohl(received_msg->msg_len);
+ sender_address_len = ntohl(received_msg->sender_address_len);
- GNUNET_assert(ntohs(msg->size) == (sizeof(struct GNUNET_DV_MessageReceived) + packed_msg_len + sender_address_len));
+ GNUNET_assert(ntohs(msg->size) == (sizeof(struct GNUNET_DV_MessageReceived) + packed_msg_len + sender_address_len));
#if DEBUG_DV
- GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "dv api receives message, size checks out!\n");
+ GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "dv api receives message, size checks out!\n");
#endif
- sender_address = GNUNET_malloc(sender_address_len);
- memcpy(sender_address, &received_msg[1], sender_address_len);
- packed_msg_start = (char *)&received_msg[1];
- packed_msg = GNUNET_malloc(packed_msg_len);
- memcpy(packed_msg, &packed_msg_start[sender_address_len], packed_msg_len);
+ sender_address = GNUNET_malloc(sender_address_len);
+ memcpy(sender_address, &received_msg[1], sender_address_len);
+ packed_msg_start = (char *)&received_msg[1];
+ packed_msg = GNUNET_malloc(packed_msg_len);
+ memcpy(packed_msg, &packed_msg_start[sender_address_len], packed_msg_len);
#if DEBUG_DV
- GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV_API receive: packed message type: %d or %d\n", ntohs(((struct GNUNET_MessageHeader *)packed_msg)->type), ((struct GNUNET_MessageHeader *)packed_msg)->type);
- GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV_API receive: message sender reported as %s\n", GNUNET_i2s(&received_msg->sender));
- GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV_API receive: distance is %u\n", ntohl(received_msg->distance));
+ GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV_API receive: packed message type: %d or %d\n", ntohs(((struct GNUNET_MessageHeader *)packed_msg)->type), ((struct GNUNET_MessageHeader *)packed_msg)->type);
+ GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV_API receive: message sender reported as %s\n", GNUNET_i2s(&received_msg->sender));
+ GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV_API receive: distance is %u\n", ntohl(received_msg->distance));
#endif
- handle->receive_handler(handle->receive_cls,
- &received_msg->sender,
- packed_msg,
- packed_msg_len,
- ntohl(received_msg->distance),
- sender_address,
- sender_address_len);
+ handle->receive_handler(handle->receive_cls,
+ &received_msg->sender,
+ packed_msg,
+ packed_msg_len,
+ ntohl(received_msg->distance),
+ sender_address,
+ sender_address_len);
+
+ GNUNET_free(sender_address);
+ break;
+ case GNUNET_MESSAGE_TYPE_TRANSPORT_DV_SEND_RESULT:
+ if (ntohs(msg->size) < sizeof(struct GNUNET_DV_SendResultMessage))
+ return;
- GNUNET_free(sender_address);
+ send_result_msg = (struct GNUNET_DV_SendResultMessage *)msg;
+ hash_from_uid(ntohl(send_result_msg->uid), &uidhash);
+ send_ctx = GNUNET_CONTAINER_multihashmap_get(handle->send_callbacks, &uidhash);
+ GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "got uid of %u or %u, hash of %s !!!!\n", ntohl(send_result_msg->uid), send_result_msg->uid, GNUNET_h2s(&uidhash));
+ if ((send_ctx != NULL) && (send_ctx->cont != NULL))
+ {
+#if DEBUG_DV
+ GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "dv api notifies transport of send result (%u)!\n", ntohl(send_result_msg->result));
+#endif
+ if (ntohl(send_result_msg->result) == 0)
+ {
+ send_ctx->cont(send_ctx->cont_cls, &send_ctx->target, GNUNET_OK);
+ }
+ else
+ {
+ send_ctx->cont(send_ctx->cont_cls, &send_ctx->target, GNUNET_SYSERR);
+ }
+ }
+ GNUNET_free_non_null(send_ctx);
+ break;
+ default:
+ break;
+ }
GNUNET_CLIENT_receive (handle->client,
&handle_message_receipt,
handle, GNUNET_TIME_UNIT_FOREVER_REL);
unsigned int priority,
struct GNUNET_TIME_Relative timeout,
const void *addr,
- size_t addrlen)
+ size_t addrlen,
+ GNUNET_TRANSPORT_TransmitContinuation
+ cont, void *cont_cls)
{
struct GNUNET_DV_SendMessage *msg;
+ struct SendCallbackContext *send_ctx;
char *end_of_message;
- /* FIXME: Copy message to end of thingy, can't just allocate dummy! */
+ GNUNET_HashCode uidhash;
#if DEBUG_DV
GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV SEND called with message of size %d, address size %d, total size to send is %d\n", msgbuf_size, addrlen, sizeof(struct GNUNET_DV_SendMessage) + msgbuf_size + addrlen);
#endif
+ dv_handle->uid_gen++;
msg = GNUNET_malloc(sizeof(struct GNUNET_DV_SendMessage) + addrlen + msgbuf_size);
msg->header.size = htons(sizeof(struct GNUNET_DV_SendMessage) + addrlen + msgbuf_size);
msg->header.type = htons(GNUNET_MESSAGE_TYPE_TRANSPORT_DV_SEND);
msg->priority = htonl(priority);
msg->timeout = timeout;
msg->addrlen = htonl(addrlen);
+ msg->uid = htonl(dv_handle->uid_gen);
memcpy(&msg[1], addr, addrlen);
end_of_message = (char *)&msg[1];
end_of_message = &end_of_message[addrlen];
memcpy(end_of_message, msgbuf, msgbuf_size);
add_pending(dv_handle, msg);
+ send_ctx = GNUNET_malloc(sizeof(struct SendCallbackContext));
+
+ send_ctx->cont = cont;
+ if (cont == NULL)
+ fprintf(stderr, "DV_SEND called with null continuation!\n");
+ send_ctx->cont_cls = cont_cls;
+ memcpy(&send_ctx->target, target, sizeof(struct GNUNET_PeerIdentity));
+
+ hash_from_uid(dv_handle->uid_gen, &uidhash);
+
+ GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "set uid of %u or %u, hash of %s !!!!\n", dv_handle->uid_gen, htonl(dv_handle->uid_gen), GNUNET_h2s(&uidhash));
+ GNUNET_CONTAINER_multihashmap_put(dv_handle->send_callbacks, &uidhash, send_ctx, GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
+
return GNUNET_OK;
}
GNUNET_YES,
&transmit_start, start_context);
+ handle->send_callbacks = GNUNET_CONTAINER_multihashmap_create(100);
+
GNUNET_CLIENT_receive (handle->client,
&handle_message_receipt,
handle, GNUNET_TIME_UNIT_FOREVER_REL);
* How often do we check about sending out more peer information (if
* we are connected to no peers previously).
*/
-#define GNUNET_DV_DEFAULT_SEND_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 500)
+#define GNUNET_DV_DEFAULT_SEND_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 50000)
/**
* How long do we wait at most between sending out information?
*/
-#define GNUNET_DV_MAX_SEND_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 500)
+#define GNUNET_DV_MAX_SEND_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 50000)
/**
* How long can we have not heard from a peer and
#define DIRECT_NEIGHBOR_COST 1
/**
- * The client, should be the DV plugin connected to us. Hopefully
+ * The default number of direct connections to store in DV (max)
+ */
+#define DEFAULT_DIRECT_CONNECTIONS 50
+
+/**
+ * The default size of direct + extended peers in DV (max)
+ */
+#define DEFAULT_DV_SIZE 100
+
+/**
+ * The default fisheye depth, from how many hops away will
+ * we keep peers?
+ */
+#define DEFAULT_FISHEYE_DEPTH 4
+
+/**
+ * The client, the DV plugin connected to us. Hopefully
* this client will never change, although if the plugin dies
* and returns for some reason it may happen.
*/
*/
struct DistantNeighbor *referees;
-static struct GNUNET_TIME_Relative client_transmit_timeout;
-
-static struct GNUNET_TIME_Relative default_dv_delay;
-
static size_t default_dv_priority = 0;
*/
struct PendingMessage *prev;
+ struct GNUNET_DV_SendResultMessage *send_result;
/**
* Actual message to be sent; // avoid allocation
*/
struct PendingMessage *core_pending_tail;
+struct FastGossipNeighborList
+{
+ /**
+ * Next element of DLL
+ */
+ struct FastGossipNeighborList *next;
+
+ /**
+ * Prev element of DLL
+ */
+ struct FastGossipNeighborList *prev;
+
+ /**
+ * The neighbor to gossip about
+ */
+ struct DistantNeighbor *about;
+};
+
/**
* Context created whenever a direct peer connects to us,
* used to gossip other peers to it.
struct DirectNeighbor *toNeighbor;
/**
- * The timeout for this task.
+ * The task associated with this context.
*/
- struct GNUNET_TIME_Relative timeout;
+ GNUNET_SCHEDULER_TaskIdentifier task;
/**
- * The task associated with this context.
+ * Head of DLL of peers to gossip about
+ * as fast as possible to this peer, for initial
+ * set up.
*/
- GNUNET_SCHEDULER_TaskIdentifier task;
+ struct FastGossipNeighborList *fast_gossip_list_head;
+
+ /**
+ * Tail of DLL of peers to gossip about
+ * as fast as possible to this peer, for initial
+ * set up.
+ */
+ struct FastGossipNeighborList *fast_gossip_list_tail;
};
*/
struct GNUNET_MessageHeader *message;
+ /**
+ * The pre-built send result message. Simply needs to be queued
+ * and freed once send has been called!
+ */
+ struct GNUNET_DV_SendResultMessage *send_result;
+
/**
* The size of the message being sent, may be larger
* than message->header.size because it's multiple
struct DistantNeighbor *dest;
};
+struct DisconnectContext
+{
+ /**
+ * Distant neighbor to get pid from.
+ */
+ struct DistantNeighbor *distant;
+
+ /**
+ * Direct neighbor that disconnected.
+ */
+ struct DirectNeighbor *direct;
+};
/**
* We've been given a target ID based on the random numbers that
return GNUNET_NO;
}
+/**
+ * Find a distant peer whose referrer_id matches what we're
+ * looking for. For looking up a peer we've gossipped about
+ * but is now disconnected. Need to do this because we don't
+ * want to remove those that may be accessible via a different
+ * route.
+ */
+static int find_distant_peer (void *cls,
+ const GNUNET_HashCode * key,
+ void *value)
+{
+ struct FindDestinationContext *fdc = cls;
+ struct DistantNeighbor *distant = value;
+
+ if (fdc->tid == distant->referrer_id)
+ {
+ fdc->dest = distant;
+ return GNUNET_NO;
+ }
+ return GNUNET_YES;
+}
+
/**
* Function called to notify a client about the socket
* begin ready to queue more data. "buf" will be
return off;
}
-
+/**
+ * Send a message to the dv plugin.
+ *
+ * @param sender the direct sender of the message
+ * @param message the message to send to the plugin
+ * (may be an encapsulated type)
+ * @param message_size the size of the message to be sent
+ * @param distant_neighbor the original sender of the message
+ * @param cost the cost to the original sender of the message
+ */
void send_to_plugin(const struct GNUNET_PeerIdentity * sender,
const struct GNUNET_MessageHeader *message,
size_t message_size,
/**
* Function called to notify a client about the socket
- * begin ready to queue more data. "buf" will be
+ * being ready to queue more data. "buf" will be
* NULL and "size" zero if the socket was closed for
* writing in the meantime.
*
{
char *cbuf = buf;
struct PendingMessage *reply;
+ struct PendingMessage *client_reply;
size_t off;
size_t msize;
GNUNET_CONTAINER_DLL_remove (core_pending_head,
core_pending_tail,
reply);
+ if (reply->send_result != NULL) /* Will only be non-null if a real client asked for this send */
+ {
+ client_reply = GNUNET_malloc(sizeof(struct PendingMessage) + sizeof(struct GNUNET_DV_SendResultMessage));
+ client_reply->msg = (struct GNUNET_MessageHeader *)&client_reply[1];
+ memcpy(&client_reply[1], reply->send_result, sizeof(struct GNUNET_DV_SendResultMessage));
+ GNUNET_free(reply->send_result);
+
+ GNUNET_CONTAINER_DLL_insert_after(plugin_pending_head, plugin_pending_tail, plugin_pending_tail, client_reply);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Queued client send receipt success message!\n");
+ if (client_handle != NULL)
+ {
+ if (plugin_transmit_handle == NULL)
+ {
+ plugin_transmit_handle = GNUNET_SERVER_notify_transmit_ready (client_handle,
+ sizeof(struct GNUNET_DV_SendResultMessage),
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &transmit_to_plugin, NULL);
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Failed to queue message for plugin, must be one in progress already!!\n");
+ }
+ }
+ }
memcpy (&cbuf[off], reply->msg, msize);
GNUNET_free (reply);
off += msize;
cost = specific_neighbor->cost;
pending_message = GNUNET_malloc(sizeof(struct PendingMessage) + msg_size);
pending_message->msg = (struct GNUNET_MessageHeader *)&pending_message[1];
+ pending_message->send_result = send_context->send_result;
toSend = (p2p_dv_MESSAGE_Data *)pending_message->msg;
toSend->header.size = htons (msg_size);
toSend->header.type = htons (GNUNET_MESSAGE_TYPE_DV_DATA);
cost = target->cost;
pending_message = GNUNET_malloc(sizeof(struct PendingMessage) + msg_size);
pending_message->msg = (struct GNUNET_MessageHeader *)&pending_message[1];
+ pending_message->send_result = NULL;
toSend = (p2p_dv_MESSAGE_Data *)pending_message->msg;
toSend->header.size = htons (msg_size);
toSend->header.type = htons (GNUNET_MESSAGE_TYPE_DV_DATA);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%s: Sends message size %d on!\n", "dv", packed_message_size);
#endif
- ret = send_message(&destination, &original_sender, NULL, packed_message, packed_message_size, default_dv_priority, default_dv_delay);
+ ret = send_message(&destination, &original_sender, NULL, packed_message, packed_message_size, default_dv_priority, GNUNET_TIME_relative_get_forever());
if (ret != GNUNET_SYSERR)
return GNUNET_OK;
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct NeighborSendContext *send_context = cls;
-#if DEBUG_DV_GOSSIP
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Entering neighbor_send_task...\n",
- GNUNET_i2s(&my_identity));
+#if DEBUG_DV_GOSSIP_SEND
char * encPeerAbout;
char * encPeerTo;
#endif
struct DistantNeighbor *about;
struct DirectNeighbor *to;
+ struct FastGossipNeighborList *about_list;
p2p_dv_MESSAGE_NeighborInfo *message;
struct PendingMessage *pending_message;
"%s: Called with reason shutdown, shutting down!\n",
GNUNET_i2s(&my_identity));
#endif
- send_context->toNeighbor->send_context = NULL;
- GNUNET_free(send_context);
+ send_context->task = GNUNET_SCHEDULER_NO_TASK;
return;
}
-
- /* FIXME: this may become a problem, because the heap walk has only one internal "walker". This means
- * that if two neighbor_send_tasks are operating in lockstep (which is quite possible, given default
- * values for all connected peers) there may be a serious bias as to which peers get gossiped about!
- * Probably the *best* way to fix would be to have an opaque pointer to the walk position passed as
- * part of the walk_get_next call. Then the heap would have to keep a list of walks, or reset the walk
- * whenever a modification has been detected. Yuck either way. Perhaps we could iterate over the heap
- * once to get a list of peers to gossip about and gossip them over time... But then if one goes away
- * in the mean time that becomes nasty. For now we'll just assume that the walking is done
- * asynchronously enough to avoid major problems (-;
- */
- about = GNUNET_CONTAINER_heap_walk_get_next (ctx.neighbor_min_heap);
+ if (send_context->fast_gossip_list_head != NULL)
+ {
+ about_list = send_context->fast_gossip_list_head;
+ about = send_context->fast_gossip_list_head->about;
+ GNUNET_CONTAINER_DLL_remove(send_context->fast_gossip_list_head,
+ send_context->fast_gossip_list_tail,
+ about_list);
+ GNUNET_free(about_list);
+ }
+ else
+ {
+ /* FIXME: this may become a problem, because the heap walk has only one internal "walker". This means
+ * that if two neighbor_send_tasks are operating in lockstep (which is quite possible, given default
+ * values for all connected peers) there may be a serious bias as to which peers get gossiped about!
+ * Probably the *best* way to fix would be to have an opaque pointer to the walk position passed as
+ * part of the walk_get_next call. Then the heap would have to keep a list of walks, or reset the walk
+ * whenever a modification has been detected. Yuck either way. Perhaps we could iterate over the heap
+ * once to get a list of peers to gossip about and gossip them over time... But then if one goes away
+ * in the mean time that becomes nasty. For now we'll just assume that the walking is done
+ * asynchronously enough to avoid major problems (-;
+ */
+ about = GNUNET_CONTAINER_heap_walk_get_next (ctx.neighbor_min_heap);
+ }
to = send_context->toNeighbor;
if ((about != NULL) && (to != about->referrer /* split horizon */ ) &&
&to->identity, sizeof (struct GNUNET_PeerIdentity))) &&
(about->pkey != NULL))
{
-#if DEBUG_DV_GOSSIP
+#if DEBUG_DV_GOSSIP_SEND
encPeerAbout = GNUNET_strdup(GNUNET_i2s(&about->identity));
encPeerTo = GNUNET_strdup(GNUNET_i2s(&to->identity));
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"%s: Sending info about peer %s to directly connected peer %s\n",
GNUNET_i2s(&my_identity),
encPeerAbout, encPeerTo);
pending_message);
if (core_transmit_handle == NULL)
- core_transmit_handle = GNUNET_CORE_notify_transmit_ready(coreAPI, default_dv_priority, send_context->timeout, &to->identity, sizeof(p2p_dv_MESSAGE_NeighborInfo), &core_transmit_notify, NULL);
+ core_transmit_handle = GNUNET_CORE_notify_transmit_ready(coreAPI, default_dv_priority, GNUNET_TIME_relative_get_forever(), &to->identity, sizeof(p2p_dv_MESSAGE_NeighborInfo), &core_transmit_notify, NULL);
}
- send_context->task = GNUNET_SCHEDULER_add_delayed(sched, GNUNET_DV_DEFAULT_SEND_INTERVAL, &neighbor_send_task, send_context);
+ if (send_context->fast_gossip_list_head != NULL) /* If there are other peers in the fast list, schedule right away */
+ {
+ GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "DV SERVICE: still in fast send mode\n");
+ send_context->task = GNUNET_SCHEDULER_add_now(sched, &neighbor_send_task, send_context);
+ }
+ else
+ {
+ GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "DV SERVICE: entering slow send mode\n");
+ send_context->task = GNUNET_SCHEDULER_add_delayed(sched, GNUNET_DV_DEFAULT_SEND_INTERVAL, &neighbor_send_task, send_context);
+ }
+
return;
}
* @param message the actual message
*/
void handle_dv_send_message (void *cls,
- struct GNUNET_SERVER_Client * client,
- const struct GNUNET_MessageHeader * message)
+ struct GNUNET_SERVER_Client * client,
+ const struct GNUNET_MessageHeader * message)
{
struct GNUNET_DV_SendMessage *send_msg;
+ struct GNUNET_DV_SendResultMessage *send_result_msg;
+ struct PendingMessage *pending_message;
size_t address_len;
size_t message_size;
struct GNUNET_PeerIdentity *destination;
GNUNET_assert(address_len == sizeof(struct GNUNET_PeerIdentity) * 2);
message_size = ntohl(send_msg->msgbuf_size);
-#if DEBUG_DV
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+#if 1
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"%s: Receives %s message size %u!\n\n\n", "dv", "SEND", message_size);
#endif
GNUNET_assert(ntohs(message->size) == sizeof(struct GNUNET_DV_SendMessage) + address_len + message_size);
GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s: asked to send message to `%s', but address is for `%s'!", "DV SERVICE", GNUNET_i2s(&send_msg->target), (const char *)&dest_hash.encoding);
}
-#if DEBUG_DV
+#if 1
GNUNET_CRYPTO_hash_to_enc (&destination->hashPubKey, &dest_hash); /* GNUNET_i2s won't properly work, need to hash one ourselves */
dest_hash.encoding[4] = '\0';
- GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV SEND called with message of size %d type %d, destination `%s' via `%s'\n", message_size, ntohs(message_buf->type), (const char *)&dest_hash.encoding, GNUNET_i2s(direct));
+ GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "DV SEND called with message of size %d type %d, destination `%s' via `%s'\n", message_size, ntohs(message_buf->type), (const char *)&dest_hash.encoding, GNUNET_i2s(direct));
#endif
send_context = GNUNET_malloc(sizeof(struct DV_SendContext));
+ send_result_msg = GNUNET_malloc(sizeof(struct GNUNET_DV_SendResultMessage));
+ send_result_msg->header.size = htons(sizeof(struct GNUNET_DV_SendResultMessage));
+ send_result_msg->header.type = htons(GNUNET_MESSAGE_TYPE_TRANSPORT_DV_SEND_RESULT);
+ send_result_msg->uid = send_msg->uid; /* No need to ntohl->htonl this */
+
send_context->importance = ntohl(send_msg->priority);
send_context->timeout = send_msg->timeout;
send_context->direct_peer = direct;
send_context->distant_peer = destination;
send_context->message = message_buf;
send_context->message_size = message_size;
+ send_context->send_result = send_result_msg;
/* In bizarro world GNUNET_SYSERR indicates that we succeeded */
if (GNUNET_SYSERR != GNUNET_CONTAINER_multihashmap_get_multiple(ctx.extended_neighbors, &destination->hashPubKey, &send_iterator, send_context))
{
+ send_result_msg->result = htons(1);
+ pending_message = GNUNET_malloc(sizeof(struct PendingMessage) + sizeof(struct GNUNET_DV_SendResultMessage));
+ pending_message->msg = (struct GNUNET_MessageHeader *)&pending_message[1];
+ memcpy(&pending_message[1], send_result_msg, sizeof(struct GNUNET_DV_SendResultMessage));
+ GNUNET_free(send_result_msg);
+
+ GNUNET_CONTAINER_DLL_insert_after(plugin_pending_head, plugin_pending_tail, plugin_pending_tail, pending_message);
+
+ if (client_handle != NULL)
+ {
+ if (plugin_transmit_handle == NULL)
+ {
+ plugin_transmit_handle = GNUNET_SERVER_notify_transmit_ready (client_handle,
+ sizeof(struct GNUNET_DV_SendResultMessage),
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &transmit_to_plugin, NULL);
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Failed to queue message for plugin, must be one in progress already!!\n");
+ }
+ }
GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "DV SEND failed to send message to destination `%s' via `%s'\n", (const char *)&dest_hash.encoding, GNUNET_i2s(direct));
}
+ else
+ {
+
+ }
GNUNET_free(message_buf);
GNUNET_free(send_context);
GNUNET_SERVER_receive_done(client, GNUNET_OK);
}
+/** Forward declarations **/
static int handle_dv_gossip_message (void *cls,
const struct GNUNET_PeerIdentity *peer,
const struct GNUNET_MessageHeader *message,
struct GNUNET_TIME_Relative latency,
uint32_t distance);
+static int handle_dv_disconnect_message (void *cls,
+ const struct GNUNET_PeerIdentity *peer,
+ const struct GNUNET_MessageHeader *message,
+ struct GNUNET_TIME_Relative latency,
+ uint32_t distance);
+/** End forward declarations **/
+
+
/**
* List of handlers for the messages understood by this
* service.
static struct GNUNET_CORE_MessageHandler core_handlers[] = {
{&handle_dv_data_message, GNUNET_MESSAGE_TYPE_DV_DATA, 0},
{&handle_dv_gossip_message, GNUNET_MESSAGE_TYPE_DV_GOSSIP, 0},
+ {&handle_dv_disconnect_message, GNUNET_MESSAGE_TYPE_DV_DISCONNECT, 0},
{NULL, 0, 0}
};
{NULL, NULL, 0, 0}
};
+/**
+ * Free a DistantNeighbor node, including removing it
+ * from the referer's list.
+ */
+static void
+distant_neighbor_free (struct DistantNeighbor *referee)
+{
+ struct DirectNeighbor *referrer;
+
+ referrer = referee->referrer;
+ if (referrer != NULL)
+ {
+ GNUNET_CONTAINER_DLL_remove (referrer->referee_head,
+ referrer->referee_tail, referee);
+ }
+ GNUNET_CONTAINER_heap_remove_node (ctx.neighbor_max_heap, referee->max_loc);
+ GNUNET_CONTAINER_heap_remove_node (ctx.neighbor_min_heap, referee->min_loc);
+ GNUNET_CONTAINER_multihashmap_remove_all (ctx.extended_neighbors,
+ &referee->identity.hashPubKey);
+ GNUNET_free (referee->pkey);
+ GNUNET_free (referee);
+}
+
+/**
+ * Free a DirectNeighbor node, including removing it
+ * from the referer's list.
+ */
+static void
+direct_neighbor_free (struct DirectNeighbor *direct)
+{
+ struct NeighborSendContext *send_context;
+ struct FastGossipNeighborList *about_list;
+ struct FastGossipNeighborList *prev_about;
+
+ send_context = direct->send_context;
+
+ if (send_context->task != GNUNET_SCHEDULER_NO_TASK)
+ GNUNET_SCHEDULER_cancel(sched, send_context->task);
+
+ about_list = send_context->fast_gossip_list_head;
+ while (about_list != NULL)
+ {
+ GNUNET_CONTAINER_DLL_remove(send_context->fast_gossip_list_head, send_context->fast_gossip_list_tail, about_list);
+ prev_about = about_list;
+ about_list = about_list->next;
+ GNUNET_free(prev_about);
+ }
+ GNUNET_free(send_context);
+ GNUNET_free(direct);
+}
+
+/**
+ * Multihashmap iterator for sending out disconnect messages
+ * for a peer.
+ *
+ * @param cls the peer that was disconnected
+ * @param key key value stored under
+ * @param value the direct neighbor to send disconnect to
+ *
+ * @return GNUNET_YES to continue iteration, GNUNET_NO to stop
+ */
+static int schedule_disconnect_messages (void *cls,
+ const GNUNET_HashCode * key,
+ void *value)
+{
+ struct DisconnectContext *disconnect_context = cls;
+ struct DirectNeighbor *disconnected = disconnect_context->direct;
+ struct DirectNeighbor *notify = value;
+ struct PendingMessage *pending_message;
+ p2p_dv_MESSAGE_Disconnect *disconnect_message;
+
+ if (memcmp(¬ify->identity, &disconnected->identity, sizeof(struct GNUNET_PeerIdentity)) == 0)
+ return GNUNET_YES; /* Don't send disconnect message to peer that disconnected! */
+
+ pending_message = GNUNET_malloc(sizeof(struct PendingMessage) + sizeof(p2p_dv_MESSAGE_Disconnect));
+ pending_message->msg = (struct GNUNET_MessageHeader *)&pending_message[1];
+ disconnect_message = (p2p_dv_MESSAGE_Disconnect *)pending_message->msg;
+ disconnect_message->header.size = htons (sizeof (p2p_dv_MESSAGE_Disconnect));
+ disconnect_message->header.type = htons (GNUNET_MESSAGE_TYPE_DV_DISCONNECT);
+ disconnect_message->peer_id = htonl(disconnect_context->distant->our_id);
+
+ GNUNET_CONTAINER_DLL_insert_after (core_pending_head,
+ core_pending_tail,
+ core_pending_tail,
+ pending_message);
+
+ if (core_transmit_handle == NULL)
+ core_transmit_handle = GNUNET_CORE_notify_transmit_ready(coreAPI, default_dv_priority, GNUNET_TIME_relative_get_forever(), ¬ify->identity, sizeof(p2p_dv_MESSAGE_Disconnect), &core_transmit_notify, NULL);
+
+ return GNUNET_YES;
+}
+
+/**
+ * Multihashmap iterator for freeing extended neighbors.
+ *
+ * @param cls NULL
+ * @param key key value stored under
+ * @param value the distant neighbor to be freed
+ *
+ * @return GNUNET_YES to continue iteration, GNUNET_NO to stop
+ */
+static int free_extended_neighbors (void *cls,
+ const GNUNET_HashCode * key,
+ void *value)
+{
+ struct DistantNeighbor *distant = value;
+ distant_neighbor_free(distant);
+ return GNUNET_YES;
+}
+
+/**
+ * Multihashmap iterator for freeing direct neighbors.
+ *
+ * @param cls NULL
+ * @param key key value stored under
+ * @param value the direct neighbor to be freed
+ *
+ * @return GNUNET_YES to continue iteration, GNUNET_NO to stop
+ */
+static int free_direct_neighbors (void *cls,
+ const GNUNET_HashCode * key,
+ void *value)
+{
+ struct DirectNeighbor *direct = value;
+ direct_neighbor_free(direct);
+ return GNUNET_YES;
+}
/**
* Task run during shutdown.
#if DEBUG_DV
GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "calling CORE_DISCONNECT\n");
#endif
+ GNUNET_CONTAINER_multihashmap_iterate(ctx.extended_neighbors, &free_extended_neighbors, NULL);
+ GNUNET_CONTAINER_multihashmap_destroy(ctx.extended_neighbors);
+ GNUNET_CONTAINER_multihashmap_iterate(ctx.direct_neighbors, &free_direct_neighbors, NULL);
+ GNUNET_CONTAINER_multihashmap_destroy(ctx.direct_neighbors);
+
+ GNUNET_CONTAINER_heap_destroy(ctx.neighbor_max_heap);
+ GNUNET_CONTAINER_heap_destroy(ctx.neighbor_min_heap);
+
GNUNET_CORE_disconnect (coreAPI);
GNUNET_PEERINFO_disconnect(peerinfo_handle);
#if DEBUG_DV
}
-/**
- * Free a DistantNeighbor node, including removing it
- * from the referer's list.
- */
-static void
-distant_neighbor_free (struct DistantNeighbor *referee)
-{
- struct DirectNeighbor *referrer;
-
- referrer = referee->referrer;
- if (referrer != NULL)
- {
- GNUNET_CONTAINER_DLL_remove (referrer->referee_head,
- referrer->referee_tail, referee);
- }
- GNUNET_CONTAINER_heap_remove_node (ctx.neighbor_max_heap, referee->max_loc);
- GNUNET_CONTAINER_heap_remove_node (ctx.neighbor_min_heap, referee->min_loc);
- GNUNET_CONTAINER_multihashmap_remove_all (ctx.extended_neighbors,
- &referee->identity.hashPubKey);
- GNUNET_free (referee);
-}
-
-
#if DEBUG_DV_GOSSIP
/**
* Iterator over hash map entries.
}
+/**
+ * Core handler for dv disconnect messages. These will be used
+ * by us to tell transport via the dv plugin that a peer can
+ * no longer be contacted by us via a certain address. We should
+ * then propagate these messages on, given that the distance to
+ * the peer indicates we would have gossiped about it to others.
+ *
+ * @param cls closure
+ * @param peer peer which sent the message (immediate sender)
+ * @param message the message
+ * @param latency the latency of the connection we received the message from
+ * @param distance the distance to the immediate peer
+ */
+static int handle_dv_disconnect_message (void *cls,
+ const struct GNUNET_PeerIdentity *peer,
+ const struct GNUNET_MessageHeader *message,
+ struct GNUNET_TIME_Relative latency,
+ uint32_t distance)
+{
+ struct DirectNeighbor *referrer;
+ struct DistantNeighbor *distant;
+ p2p_dv_MESSAGE_Disconnect *enc_message = (p2p_dv_MESSAGE_Disconnect *)message;
+
+ if (ntohs (message->size) < sizeof (p2p_dv_MESSAGE_Disconnect))
+ {
+ return GNUNET_SYSERR; /* invalid message */
+ }
+
+ referrer = GNUNET_CONTAINER_multihashmap_get (ctx.direct_neighbors,
+ &peer->hashPubKey);
+ if (referrer == NULL)
+ return GNUNET_OK;
+
+ distant = referrer->referee_head;
+ while (distant != NULL)
+ {
+ if (distant->referrer_id == ntohl(enc_message->peer_id))
+ {
+ distant_neighbor_free(distant);
+ }
+ }
+
+ return GNUNET_OK;
+}
+
+
/**
* Core handler for dv gossip messages. These will be used
* by us to create a HELLO message for the newly peer containing
return GNUNET_OK;
}
+
+/**
+ * Iterate over all currently known peers, add them to the
+ * fast gossip list for this peer so we get DV routing information
+ * out as fast as possible!
+ *
+ * @param cls the direct neighbor we will gossip to
+ * @param key the hashcode of the peer
+ * @param value the distant neighbor we should add to the list
+ *
+ * @return GNUNET_YES to continue iteration, GNUNET_NO otherwise
+ */
+static int add_all_extended_peers (void *cls,
+ const GNUNET_HashCode * key,
+ void *value)
+{
+ struct NeighborSendContext *send_context = (struct NeighborSendContext *)cls;
+ struct DistantNeighbor *distant = (struct DistantNeighbor *)value;
+ struct FastGossipNeighborList *gossip_entry;
+
+ if (memcmp(&send_context->toNeighbor->identity, &distant->identity, sizeof(struct GNUNET_PeerIdentity)) == 0)
+ return GNUNET_YES; /* Don't gossip to a peer about itself! */
+
+ GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "DV SERVICE: adding extended neighbor to fast send list\n");
+#if SUPPORT_HIDING
+ if (distant->hidden == GNUNET_YES)
+ return GNUNET_YES; /* This peer should not be gossipped about (hidden) */
+#endif
+ gossip_entry = GNUNET_malloc(sizeof(struct FastGossipNeighborList));
+ gossip_entry->about = distant;
+
+ GNUNET_CONTAINER_DLL_insert_after(send_context->fast_gossip_list_head,
+ send_context->fast_gossip_list_tail,
+ send_context->fast_gossip_list_tail,
+ gossip_entry);
+
+ return GNUNET_YES;
+}
+
+
+/**
+ * Iterate over all current direct peers, add newly connected peer
+ * to the fast gossip list for that peer so we get DV routing
+ * information out as fast as possible!
+ *
+ * @param cls the newly connected neighbor we will gossip about
+ * @param key the hashcode of the peer
+ * @param value the direct neighbor we should gossip to
+ *
+ * @return GNUNET_YES to continue iteration, GNUNET_NO otherwise
+ */
+static int add_all_direct_neighbors (void *cls,
+ const GNUNET_HashCode * key,
+ void *value)
+{
+ struct DirectNeighbor *direct = (struct DirectNeighbor *)value;
+ struct DirectNeighbor *to = (struct DirectNeighbor *)cls;
+ struct DistantNeighbor *distant;
+ struct NeighborSendContext *send_context = direct->send_context;
+ struct FastGossipNeighborList *gossip_entry;
+
+ distant = GNUNET_CONTAINER_multihashmap_get(ctx.extended_neighbors, &to->identity.hashPubKey);
+ if (distant == NULL)
+ return GNUNET_YES;
+
+ if (memcmp(&direct->identity, &to->identity, sizeof(struct GNUNET_PeerIdentity)) == 0)
+ return GNUNET_YES; /* Don't gossip to a peer about itself! */
+
+ GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "DV SERVICE: adding new DISTANT neighbor to fast send list\n");
+#if SUPPORT_HIDING
+ if (distant->hidden == GNUNET_YES)
+ return GNUNET_YES; /* This peer should not be gossipped about (hidden) */
+#endif
+ gossip_entry = GNUNET_malloc(sizeof(struct FastGossipNeighborList));
+ gossip_entry->about = distant;
+
+ GNUNET_CONTAINER_DLL_insert_after(send_context->fast_gossip_list_head,
+ send_context->fast_gossip_list_tail,
+ send_context->fast_gossip_list_tail,
+ gossip_entry);
+ if (send_context->task != GNUNET_SCHEDULER_NO_TASK)
+ GNUNET_SCHEDULER_cancel(sched, send_context->task);
+
+ send_context->task = GNUNET_SCHEDULER_add_now(sched, &neighbor_send_task, send_context);
+ return GNUNET_YES;
+}
+
+
static void
process_peerinfo (void *cls,
const struct GNUNET_PeerIdentity *peer,
&peer->hashPubKey,
&add_pkey_to_extended,
&neighbor->pkey);
+
+ GNUNET_CONTAINER_multihashmap_iterate (ctx.extended_neighbors, &add_all_extended_peers, neighbor->send_context);
+
+ GNUNET_CONTAINER_multihashmap_iterate (ctx.direct_neighbors, &add_all_direct_neighbors, neighbor);
neighbor->send_context->task = GNUNET_SCHEDULER_add_now(sched, &neighbor_send_task, neighbor->send_context);
}
}
+
/**
* Method called whenever a peer connects.
*
uint32_t distance)
{
struct DirectNeighbor *neighbor;
+ struct DistantNeighbor *about;
struct PeerIteratorContext *peerinfo_iterator;
#if DEBUG_DV
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
neighbor = GNUNET_malloc (sizeof (struct DirectNeighbor));
neighbor->send_context = GNUNET_malloc(sizeof(struct NeighborSendContext));
neighbor->send_context->toNeighbor = neighbor;
- neighbor->send_context->timeout = default_dv_delay; /* FIXME: base this on total gossip tasks, or bandwidth */
memcpy (&neighbor->identity, peer, sizeof (struct GNUNET_PeerIdentity));
- /*memcpy (&neighbor->pkey, ,sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));*/
+
GNUNET_CONTAINER_multihashmap_put (ctx.direct_neighbors,
&peer->hashPubKey,
neighbor, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
GNUNET_TIME_UNIT_FOREVER_REL,
&process_peerinfo,
peerinfo_iterator);
+
/* Only add once we get the publicKey of this guy
*
* neighbor->send_context->task = GNUNET_SCHEDULER_add_now(sched, &neighbor_send_task, neighbor->send_context);
}
else
{
+ about = GNUNET_CONTAINER_multihashmap_get(ctx.extended_neighbors, &peer->hashPubKey);
+ if ((GNUNET_CONTAINER_multihashmap_get(ctx.direct_neighbors, &peer->hashPubKey) == NULL) && (about != NULL))
+ GNUNET_CONTAINER_multihashmap_iterate(ctx.direct_neighbors, &add_all_direct_neighbors, about);
+
#if DEBUG_DV
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%s: Distance (%d) greater than %d or already know about peer (%s), not re-adding!\n", "dv", distance, DIRECT_NEIGHBOR_COST, GNUNET_i2s(peer));
{
struct DirectNeighbor *neighbor;
struct DistantNeighbor *referee;
-
+ struct FindDestinationContext fdc;
+ struct DisconnectContext disconnect_context;
#if DEBUG_DV
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%s: Receives core peer disconnect message!\n", "dv");
}
while (NULL != (referee = neighbor->referee_head))
distant_neighbor_free (referee);
+
+ fdc.dest = NULL;
+ fdc.tid = 0;
+
+ GNUNET_CONTAINER_multihashmap_iterate (ctx.extended_neighbors, &find_distant_peer, &fdc);
+
+ if (fdc.dest != NULL)
+ {
+ disconnect_context.direct = neighbor;
+ disconnect_context.distant = fdc.dest;
+ GNUNET_CONTAINER_multihashmap_iterate (ctx.direct_neighbors, &schedule_disconnect_messages, &disconnect_context);
+ }
+
GNUNET_assert (neighbor->referee_tail == NULL);
GNUNET_CONTAINER_multihashmap_remove (ctx.direct_neighbors,
- &peer->hashPubKey, neighbor);
+ &peer->hashPubKey, neighbor);
if ((neighbor->send_context != NULL) && (neighbor->send_context->task != GNUNET_SCHEDULER_NO_TASK))
GNUNET_SCHEDULER_cancel(sched, neighbor->send_context->task);
GNUNET_free (neighbor);
struct GNUNET_SERVER_Handle *server,
const struct GNUNET_CONFIGURATION_Handle *c)
{
- struct GNUNET_TIME_Relative timeout;
unsigned long long max_hosts;
- timeout = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5);
sched = scheduler;
cfg = c;
/* FIXME: Read from config, or calculate, or something other than this! */
- max_hosts = 50;
- ctx.max_table_size = 100;
- ctx.fisheye_depth = 3;
+ max_hosts = DEFAULT_DIRECT_CONNECTIONS;
+ ctx.max_table_size = DEFAULT_DV_SIZE;
+ ctx.fisheye_depth = DEFAULT_FISHEYE_DEPTH;
+
+ if (GNUNET_CONFIGURATION_have_value(cfg, "dv", "max_direct_connections"))
+ {
+ GNUNET_CONFIGURATION_get_value_number(cfg, "dv", "max_direct_connections", &max_hosts);
+ }
+
+ if (GNUNET_CONFIGURATION_have_value(cfg, "dv", "max_total_connections"))
+ {
+ GNUNET_CONFIGURATION_get_value_number(cfg, "dv", "max_total_connections", &ctx.max_table_size);
+ }
+
+ if (GNUNET_CONFIGURATION_have_value(cfg, "dv", "fisheye_depth"))
+ {
+ GNUNET_CONFIGURATION_get_value_number(cfg, "dv", "fisheye_depth", &ctx.fisheye_depth);
+ }
ctx.neighbor_min_heap =
GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
ctx.direct_neighbors = GNUNET_CONTAINER_multihashmap_create (max_hosts);
ctx.extended_neighbors =
GNUNET_CONTAINER_multihashmap_create (ctx.max_table_size * 3);
- client_transmit_timeout = GNUNET_TIME_relative_get_forever(); /* Only timeout on disconnect */
- default_dv_delay = GNUNET_TIME_relative_get_forever(); /* Only timeout on disconnect */
GNUNET_SERVER_add_handlers (server, plugin_handlers);
coreAPI =
GNUNET_CORE_connect (sched,
cfg,
- timeout,
+ GNUNET_TIME_relative_get_forever(),
NULL, /* FIXME: anything we want to pass around? */
&core_init,
&handle_core_connect,