* How often do we check about sending out more peer information (if
* we are connected to no peers previously).
*/
-#define GNUNET_DV_DEFAULT_SEND_INTERVAL (500 * GNUNET_CRON_MILLISECONDS)
+#define GNUNET_DV_DEFAULT_SEND_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 500))
/**
* How long do we wait at most between sending out information?
*/
-#define GNUNET_DV_MAX_SEND_INTERVAL (5000 * GNUNET_CRON_MILLISECONDS)
+#define GNUNET_DV_MAX_SEND_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5))
/**
* How long can we have not heard from a peer and
* still have it in our tables?
*/
-#define GNUNET_DV_PEER_EXPIRATION_TIME (3000 * GNUNET_CRON_SECONDS)
+#define GNUNET_DV_PEER_EXPIRATION_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 1000))
/**
* Priority for gossip.
* How often should we check if expiration time has elapsed for
* some peer?
*/
-#define GNUNET_DV_MAINTAIN_FREQUENCY (5 * GNUNET_CRON_SECONDS)
+#define GNUNET_DV_MAINTAIN_FREQUENCY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5))
/**
* How long to allow a message to be delayed?
*/
-#define DV_DELAY (5000 * GNUNET_CRON_MILLISECONDS)
+#define DV_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5))
/**
* Priority to use for DV data messages.
*/
static struct GNUNET_SERVER_Client * client_handle;
+/**
+ * Task to run when we shut down, cleaning up all our trash
+ */
GNUNET_SCHEDULER_TaskIdentifier cleanup_task;
+/**
+ * Task to run to gossip about peers. Will reschedule itself forever until shutdown!
+ */
+GNUNET_SCHEDULER_TaskIdentifier gossip_task;
+
/**
* Struct where neighbor information is stored.
*/
struct DistantNeighbor *referees;
+static struct GNUNET_TIME_Relative client_transmit_timeout;
+
+static struct GNUNET_TIME_Relative default_dv_delay;
+
+static size_t default_dv_priority = 0;
+
+
+/**
+ * Pending message struct, also a test to see if these can
+ * safely ONLY be freed by callback.
+ */
+struct PendingMessage
+{
+ /**
+ * Copy of message to be sent
+ */
+ struct GNUNET_MessageHeader *msg;
+
+ /**
+ * Size of message to be sent
+ */
+ size_t msg_size;
+
+ /**
+ * Transmit handle, for cancellation if necessary.
+ */
+ struct GNUNET_CORE_TransmitHandle *transmit_handle;
+
+};
+
+
/**
* Struct to hold information for updating existing neighbors
*/
unsigned long long max_table_size;
- unsigned int send_interval;
+ struct GNUNET_TIME_Relative send_interval;
unsigned int neighbor_id_loc;
int closing;
-};
-static char shortID[5];
+};
static struct GNUNET_DV_Context ctx;
struct GNUNET_DV_MessageReceived *received_msg;
int size;
- if (ntohs(msg->size) < sizeof(struct GNUNET_DV_MessageReceived))
- return;
-
- size = sizeof(struct GNUNET_DV_MessageReceived) + message_size + sizeof(struct GNUNET_PeerIdentity);
+ size = sizeof(struct GNUNET_DV_MessageReceived) + sizeof(struct GNUNET_PeerIdentity) + message_size;
received_msg = GNUNET_malloc(size);
received_msg->header.size = htons(size);
received_msg->header.type = htons(GNUNET_MESSAGE_TYPE_TRANSPORT_DV_RECEIVE);
received_msg->sender_address_len = sizeof(struct GNUNET_PeerIdentity);
received_msg->distance = htonl(distant_neighbor->cost);
+ received_msg->msg_len = htons(message_size);
/* Set the sender in this message to be the original sender! */
memcpy(&received_msg->sender, &distant_neighbor->identity, sizeof(struct GNUNET_PeerIdentity));
/* Copy the intermediate sender to the end of the message, this is how the transport identifies this peer */
memcpy(&received_msg[1], sender, sizeof(struct GNUNET_PeerIdentity));
+ /* Copy the actual message after the sender */
+ memcpy(&received_msg[1 + sizeof(struct GNUNET_PeerIdentity)], message, message_size);
/* FIXME: Send to the client please */
GNUNET_SERVER_notify_transmit_ready (client_handle,
- size, CLIENT_TRANSMIT_TIMEOUT,
+ size, client_transmit_timeout,
&transmit_to_plugin, &received_msg);
}
+
+/**
+ * Function called to notify a client about the socket
+ * begin ready to queue more data. "buf" will be
+ * NULL and "size" zero if the socket was closed for
+ * writing in the meantime.
+ *
+ * @param cls closure
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+size_t core_transmit_notify (void *cls,
+ size_t size, void *buf)
+{
+ struct PendingMessage *pending_message = cls;
+ char *send_buf = buf;
+ if ((buf == NULL) || (size < pending_message->msg_size))
+ {
+ return 0;
+ }
+
+ memcpy(send_buf, pending_message->msg, pending_message->msg_size);
+
+ GNUNET_free(pending_message->msg);
+ GNUNET_free(pending_message);
+
+ return size;
+}
+
+/**
+ * Send a DV data message via DV.
+ *
+ * @param recipient the ultimate recipient of this message
+ * @param the original sender of the message
+ * @param message the packed message
+ * @param importance what priority to send this message with
+ * @param timeout how long to possibly delay sending this message
+ */
+static int
+send_message (const struct GNUNET_PeerIdentity * recipient,
+ const struct GNUNET_PeerIdentity * sender,
+ const struct GNUNET_MessageHeader * message,
+ unsigned int importance, struct GNUNET_TIME_Relative timeout)
+{
+ p2p_dv_MESSAGE_Data *toSend;
+ unsigned int msg_size;
+ unsigned int cost;
+ unsigned int recipient_id;
+ unsigned int sender_id;
+ struct DistantNeighbor *target;
+ struct DistantNeighbor *source;
+ struct PendingMessage *pending_message;
+
+ msg_size = ntohs (message->size) + sizeof (p2p_dv_MESSAGE_Data);
+
+ target = GNUNET_CONTAINER_multihashmap_get (ctx.extended_neighbors,
+ &recipient->hashPubKey);
+ if (target == NULL)
+ {
+ /* target unknown to us, drop! */
+ return GNUNET_SYSERR;
+ }
+ recipient_id = target->referrer_id;
+
+ source = GNUNET_CONTAINER_multihashmap_get (ctx.extended_neighbors,
+ &sender->hashPubKey);
+ if (source == NULL)
+ {
+ if (0 != (memcmp (my_identity,
+ sender, sizeof (struct GNUNET_PeerIdentity))))
+ {
+ /* sender unknown to us, drop! */
+ return GNUNET_SYSERR;
+ }
+ sender_id = 0; /* 0 == us */
+ }
+ else
+ {
+ /* find out the number that we use when we gossip about
+ the sender */
+ sender_id = source->our_id;
+ }
+
+ cost = target->cost;
+ pending_message = GNUNET_malloc(sizeof(struct PendingMessage));
+ pending_message->msg = GNUNET_malloc (msg_size);
+ toSend = (p2p_dv_MESSAGE_Data *)pending_message->msg;
+ toSend->header.size = htons (msg_size);
+ toSend->header.type = htons (GNUNET_MESSAGE_TYPE_DV_DATA);
+ toSend->sender = htonl (sender_id);
+ toSend->recipient = htonl (recipient_id);
+ memcpy (&toSend[1], message, ntohs (message->size));
+ pending_message->msg_size = msg_size;
+
+ pending_message->transmit_handle = GNUNET_CORE_notify_transmit_ready(coreAPI, importance, timeout, &target->referrer->identity, msg_size, &core_transmit_notify, pending_message);
+ if (NULL == pending_message->transmit_handle)
+ {
+ GNUNET_free (pending_message->msg);
+ GNUNET_free (pending_message);
+ return GNUNET_SYSERR;
+ }
+
+ /*coreAPI->ciphertext_send (&target->referrer->identity,
+ &toSend->header, importance, maxdelay);*/
+ return (int) cost;
+}
+
+
/**
* Core handler for dv data messages. Whatever this message
* contains all we really have to do is rip it out of its
if ( (ntohs (packed_message->type) != GNUNET_MESSAGE_TYPE_DV_GOSSIP) &&
(ntohs (packed_message->type) != GNUNET_MESSAGE_TYPE_DV_DATA) )
{
- /* FIXME: send the message, wrap it up and return it to the DV plugin */
- /*coreAPI->loopback_send (&original_sender, (const char *) packed_message,
- ntohs (packed_message->size), GNUNET_YES, NULL);*/
send_to_plugin(peer, packed_message, ntohs(packed_message->size), pos);
}
return GNUNET_OK;
}
+ /* At this point we have a message, and we need to forward it on to the
+ * next DV hop.
+ */
/* FIXME: Can't send message on, we have to behave.
* We have to tell core we have a message for the next peer, and let
* transport do transport selection on how to get this message to 'em */
/*ret = send_message (&destination,
&original_sender,
packed_message, DV_PRIORITY, DV_DELAY);*/
- send_to_core(&destination, &original_sender, packed_message, DV_PRIORITY, DV_DELAY);
+ ret = send_message(&destination, &original_sender, packed_message, default_dv_priority, default_dv_delay);
- return GNUNET_OK;
+ if (ret != GNUNET_SYSERR)
+ return GNUNET_OK;
+ else
+ return GNUNET_SYSERR;
+}
+
+
+/**
+ * Thread which chooses a peer to gossip about and a peer to gossip
+ * to, then constructs the message and sends it out. Will run until
+ * done_module_dv is called.
+ */
+static void
+neighbor_send_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+#if DEBUG_DV
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Entering neighbor_send_thread...\n",
+ GNUNET_i2s(my_identity));
+ char * encPeerAbout;
+ char * encPeerTo;
+#endif
+ struct DistantNeighbor *about;
+ struct DirectNeighbor *to;
+
+ p2p_dv_MESSAGE_NeighborInfo *message;
+ struct PendingMessage *pending_message;
+
+ if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
+ return;
+
+ about = GNUNET_CONTAINER_heap_walk_get_next (ctx.neighbor_min_heap);
+ to = GNUNET_CONTAINER_heap_get_random (ctx.neighbor_min_heap, GNUNET_CONTAINER_multihashmap_size(ctx.direct_neighbors));
+
+ if ((about != NULL) && (to != about->referrer /* split horizon */ ) &&
+#if SUPPORT_HIDING
+ (about->hidden == GNUNET_NO) &&
+#endif
+ (to != NULL) &&
+ (0 != memcmp (&about->identity,
+ &to->identity, sizeof (struct GNUNET_PeerIdentity))))
+ {
+#if DEBUG_DV
+ encPeerAbout = GNUNET_strdup(GNUNET_i2s(&about->identity));
+ encPeerTo = GNUNET_strdup(GNUNET_i2s(&to->identity));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Sending info about peer %s to directly connected peer %s\n",
+ GNUNET_i2s(my_identity),
+ encPeerAbout, encPeerTo);
+#endif
+ pending_message = GNUNET_malloc(sizeof(struct PendingMessage));
+ pending_message->msg = GNUNET_malloc(sizeof(p2p_dv_MESSAGE_NeighborInfo));
+ message = (p2p_dv_MESSAGE_NeighborInfo *)pending_message->msg;
+ message->header.size = htons (sizeof (p2p_dv_MESSAGE_NeighborInfo));
+ message->header.type = htons (GNUNET_MESSAGE_TYPE_DV_GOSSIP);
+ message->cost = htonl (about->cost);
+ message->neighbor_id = htonl (about->our_id);
+ memcpy (&message->neighbor,
+ &about->identity, sizeof (struct GNUNET_PeerIdentity));
+
+ pending_message->transmit_handle = GNUNET_CORE_notify_transmit_ready(coreAPI, default_dv_priority, default_dv_delay, &to->identity, sizeof(p2p_dv_MESSAGE_NeighborInfo), &core_transmit_notify, pending_message);
+
+ if (NULL == pending_message->transmit_handle)
+ {
+ GNUNET_free (pending_message->msg);
+ GNUNET_free (pending_message);
+ return;
+ }
+ /*coreAPI->ciphertext_send (&to->identity, &message.header,
+ GNUNET_DV_DHT_GOSSIP_PRIORITY,
+ ctx.send_interval);*/
+ }
+
+ gossip_task = GNUNET_SCHEDULER_add_delayed(sched, ctx.send_interval, &neighbor_send_task, NULL);
+ return;
}
/**
timeout = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5);
sched = scheduler;
cfg = c;
+
+ client_transmit_timeout = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5);
+ default_dv_delay = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5);
GNUNET_SERVER_add_handlers (server, plugin_handlers);
coreAPI =
GNUNET_CORE_connect (sched,
/* Scheduled the task to clean up when shutdown is called */
+ gossip_task = GNUNET_SCHEDULER_add_delayed(sched, ctx.send_interval, &neighbor_send_task, NULL);
cleanup_task = GNUNET_SCHEDULER_add_delayed (sched,
GNUNET_TIME_UNIT_FOREVER_REL,
&shutdown_task,