#include "gnunet-service-fs.h"
#include "gnunet-service-fs_cp.h"
+/**
+ * How often do we flush trust values to disk?
+ */
+#define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
+
struct GSF_PeerTransmitHandle
{
+ /**
+ * Handle for an active request for transmission to this
+ * peer, or NULL (if core queue was full).
+ */
+ struct GNUNET_CORE_TransmitHandle *cth;
+
/**
* Time when this transmission request was issued.
*/
struct GNUNET_TIME_Absolute transmission_request_start_time;
+ /**
+ * Timeout for this request.
+ */
+ struct GNUNET_TIME_Absolute timeout;
+
+ /**
+ * Task called on timeout, or 0 for none.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+
+ /**
+ * Function to call to get the actual message.
+ */
+ GSF_GetMessageCallback gmc;
+
+ /**
+ * Peer this request targets.
+ */
+ struct GSF_ConnectedPeer *cp;
+
+ /**
+ * Closure for 'gmc'.
+ */
+ void *gmc_cls;
+
+ /**
+ * Size of the message to be transmitted.
+ */
+ size_t size;
+
+ /**
+ * GNUNET_YES if this is a query, GNUNET_NO for content.
+ */
+ int is_query;
+
+ /**
+ * Priority of this request.
+ */
+ uint32_t priority;
};
*/
struct GNUNET_TIME_Absolute last_migration_block;
- /**
- * Handle for an active request for transmission to this
- * peer, or NULL.
- */
- struct GNUNET_CORE_TransmitHandle *cth;
-
/**
* Messages (replies, queries, content migration) we would like to
* send to this peer in the near future. Sorted by priority, head.
};
+/**
+ * Map from peer identities to 'struct GSF_ConnectPeer' entries.
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *cp_map;
+
+
+/**
+ * Where do we store trust information?
+ */
+static char *trustDirectory;
+
+
+/**
+ * Get the filename under which we would store the GNUNET_HELLO_Message
+ * for the given host and protocol.
+ * @return filename of the form DIRECTORY/HOSTID
+ */
+static char *
+get_trust_filename (const struct GNUNET_PeerIdentity *id)
+{
+ struct GNUNET_CRYPTO_HashAsciiEncoded fil;
+ char *fn;
+
+ GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
+ GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
+ return fn;
+}
+
+
+/**
+ * Find latency information in 'atsi'.
+ *
+ * @param atsi performance data
+ * @return connection latency
+ */
+static struct GNUNET_TIME_Relative
+get_latency (const struct GNUNET_TRANSPORT_ATS_Information *atsi)
+{
+ if (atsi == NULL)
+ return GNUNET_TIME_UNIT_SECONDS;
+ while ( (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) &&
+ (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_QUALITY_NET_DELAY) )
+ atsi++;
+ if (ntohl (atsi->type) == GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR)
+ {
+ GNUNET_break (0);
+ /* how can we not have latency data? */
+ return GNUNET_TIME_UNIT_SECONDS;
+ }
+ return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
+ ntohl (atsi->value));
+}
+
+
+/**
+ * Update the performance information kept for the given peer.
+ *
+ * @param cp peer record to update
+ * @param atsi transport performance data
+ */
+static void
+update_atsi (struct GSF_ConnectedPeer *cp,
+ const struct GNUNET_TRANSPORT_ATS_Information *atsi)
+{
+ // FIXME: merge atsi into cp's performance data!
+}
+
+
/**
* A peer connected to us. Setup the connected peer
* records.
GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
const struct GNUNET_TRANSPORT_ATS_Information *atsi)
{
- // FIXME
- return NULL;
+ struct GSF_ConnectedPeer *cp;
+ char *fn;
+ uint32_t trust;
+ struct GNUNET_TIME_Relative latency;
+
+ cp = GNUNET_malloc (sizeof (struct GSF_ConnectedPeer));
+ cp->transmission_delay = GNUNET_LOAD_value_init (latency);
+ cp->pid = GNUNET_PEER_intern (peer);
+ fn = get_trust_filename (peer);
+ if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
+ (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
+ cp->disk_trust = cp->trust = ntohl (trust);
+ GNUNET_free (fn);
+ GNUNET_break (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_put (cp_map,
+ &peer->hashPubKey,
+ cp,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ update_atsi (cp, atsi);
+
+
+ // FIXME: notify plan & migration about new peer!
+
+ return cp;
+}
+
+
+/**
+ * Core is ready to transmit to a peer, get the message.
+ *
+ * @param cls the 'struct GSF_PeerTransmitHandle' of the message
+ * @param size number of bytes core is willing to take
+ * @param buf where to copy the message
+ * @return number of bytes copied to buf
+ */
+static size_t
+peer_transmit_ready_cb (void *cls,
+ size_t size,
+ void *buf)
+{
+ struct GSF_PeerTransmitHandle *pth = cls;
+ struct GSF_ConnectedPeer *cp;
+ size_t ret;
+
+ cp = pth->cp;
+ GNUNET_CONTAINER_DLL_remove (cp->pth_head,
+ cp->pth_tail,
+ pth);
+ // FIXME: update 'cp' counters!
+ ret = pth->gmc (pth->gmc_cls,
+ 0, NULL);
+ GNUNET_free (pth);
+ return ret;
+}
+
+
+/**
+ * Function called if there has been a timeout trying to satisfy
+ * a transmission request.
+ *
+ * @param cls the 'struct GSF_PeerTransmitHandle' of the request
+ * @param tc scheduler context
+ */
+static void
+peer_transmit_timeout (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GSF_PeerTransmitHandle *pth = cls;
+ struct GSF_ConnectedPeer *cp;
+
+ pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ cp = pth->cp;
+ GNUNET_CONTAINER_DLL_remove (cp->pth_head,
+ cp->pth_tail,
+ pth);
+ // FIXME: update 'cp' counters!
+ pth->gmc (pth->gmc_cls,
+ 0, NULL);
+ GNUNET_free (pth);
}
GSF_GetMessageCallback gmc,
void *gmc_cls)
{
- // FIXME
- return NULL;
+ struct GSF_ConnectedPeer *cp;
+ struct GSF_PeerTransmitHandle *pth;
+ struct GSF_PeerTransmitHandle *pos;
+ struct GSF_PeerTransmitHandle *prev;
+ struct GNUNET_PeerIdentity target;
+
+ cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
+ &peer->hashPubKey);
+ GNUNET_assert (NULL != cp);
+ pth = GNUNET_malloc (sizeof (struct GSF_PeerTransmitHandle));
+ pth->transmission_request_start_time = GNUNET_TIME_absolute_now ();
+ pth->timeout = GNUNET_TIME_relative_to_absolute (timeout);
+ pth->gmc = gmc;
+ pth->gmc_cls = gmc_cls;
+ pth->size = size;
+ pth->is_query = is_query;
+ pth->priority = priority;
+ pth->cp = cp;
+ /* insertion sort (by priority, descending) */
+ prev = NULL;
+ pos = cp->pth_head;
+ while ( (pos != NULL) &&
+ (pos->priority > priority) )
+ {
+ prev = pos;
+ pos = pos->next;
+ }
+ if (prev == NULL)
+ GNUNET_CONTAINER_DLL_insert_head (cp->pth_head,
+ cp->pth_tail,
+ pth);
+ else
+ GNUNET_CONTAINER_DLL_insert_after (cp->pth_head,
+ cp->pth_tail,
+ prev,
+ pth);
+ GNUNET_PEER_resolve (cp->pid,
+ &target);
+ pth->cth = GNUNET_CORE_notify_transmit_ready (core,
+ priority,
+ timeout,
+ &target,
+ size,
+ &peer_transmit_ready_cb,
+ pth);
+ /* pth->cth could be NULL here, that's OK, we'll try again
+ later... */
+ if (pth->cth == NULL)
+ pth->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
+ &peer_transmit_timeout,
+ pth);
+ return pth;
}
/**
* Cancel an earlier request for transmission.
+ *
+ * @param pth request to cancel
*/
void
GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
{
+ struct GSF_PeerTransmitHandle *pth = cls;
+ struct GSF_ConnectedPeer *cp;
+
+ if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (pth->timeout_task);
+ pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ cp = pth->cp;
+ GNUNET_CONTAINER_DLL_remove (cp->pth_head,
+ cp->pth_tail,
+ pth);
+ // FIXME: update 'cp' counters!
+ GNUNET_free (pth);
}
const struct GSF_LocalClient *initiator_client,
const struct GSF_ConnectedPeer *initiator_peer)
{
+ // FIXME...
}
struct GNUNET_TIME_Absolute timeout,
const struct GNUNET_TRANSPORT_ATS_Information *atsi)
{
+ struct GSF_ConnectedPeer *cp;
+
+ cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
+ &peer->hashPubKey);
+ GNUNET_assert (NULL != cp);
+ update_atsi (cp, atsi);
}
GSF_peer_disconnect_handler_ (void *cls,
const struct GNUNET_PeerIdentity *peer)
{
+ struct GSF_ConnectedPeer *cp;
+
+ cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
+ &peer->hashPubKey);
+ GNUNET_assert (NULL != cp);
+ GNUNET_CONTAINER_multihashmap_remove (cp_map,
+ &peer->hashPubKey,
+ cp);
+ // FIXME: more cleanup
+ GNUNET_free (cp);
+}
+
+
+/**
+ * Closure for 'call_iterator'.
+ */
+struct IterationContext
+{
+ /**
+ * Function to call on each entry.
+ */
+ GSF_ConnectedPeerIterator it;
+
+ /**
+ * Closure for 'it'.
+ */
+ void *it_cls;
+};
+
+
+/**
+ * Function that calls the callback for each peer.
+ *
+ * @param cls the 'struct IterationContext*'
+ * @param key identity of the peer
+ * @param value the 'struct GSF_ConnectedPeer*'
+ * @return GNUNET_YES to continue iteration
+ */
+static int
+call_iterator (void *cls,
+ const GNUNET_HashCode *key,
+ void *value)
+{
+ struct IterationContext *ic = cls;
+ struct GSF_ConnectedPeer *cp = value;
+
+ ic->it (ic->it_cls,
+ (const struct GNUNET_PeerIdentity*) key,
+ cp,
+ &cp->ppd);
+ return GNUNET_YES;
}
GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it,
void *it_cls)
{
+ struct IterationContext ic;
+
+ ic.it = it;
+ ic.it_cls = it_cls;
+ GNUNET_CONTAINER_multihashmap_iterate (cp_map,
+ &call_iterator,
+ &ic);
}
}
+/**
+ * Write host-trust information to a file - flush the buffer entry!
+ *
+ * @param cls closure, not used
+ * @param key host identity
+ * @param value the 'struct GSF_ConnectedPeer' to flush
+ * @return GNUNET_OK to continue iteration
+ */
+static int
+flush_trust (void *cls,
+ const GNUNET_HashCode *key,
+ void *value)
+{
+ struct GSF_ConnectedPeer *cp = value;
+ char *fn;
+ uint32_t trust;
+ struct GNUNET_PeerIdentity pid;
+
+ if (cp->trust == cp->disk_trust)
+ return GNUNET_OK; /* unchanged */
+ GNUNET_PEER_resolve (cp->pid,
+ &pid);
+ fn = get_trust_filename (&pid);
+ if (cp->trust == 0)
+ {
+ if ((0 != UNLINK (fn)) && (errno != ENOENT))
+ GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
+ GNUNET_ERROR_TYPE_BULK, "unlink", fn);
+ }
+ else
+ {
+ trust = htonl (cp->trust);
+ if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust,
+ sizeof(uint32_t),
+ GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
+ | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
+ cp->disk_trust = cp->trust;
+ }
+ GNUNET_free (fn);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Call this method periodically to flush trust information to disk.
+ *
+ * @param cls closure, not used
+ * @param tc task context, not used
+ */
+static void
+cron_flush_trust (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+
+ if (NULL == cp_map)
+ return;
+ GNUNET_CONTAINER_multihashmap_iterate (cp_map,
+ &flush_trust,
+ NULL);
+ if (NULL == tc)
+ return;
+ if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+ return;
+ GNUNET_SCHEDULER_add_delayed (TRUST_FLUSH_FREQ,
+ &cron_flush_trust,
+ NULL);
+}
+
+
+/**
+ * Initialize peer management subsystem.
+ *
+ * @param cfg configuration to use
+ */
+void
+GSF_connected_peer_init_ (struct GNUNET_CONFIGURATION_Handle *cfg)
+{
+ cp_map = GNUNET_CONTAINER_multihashmap_create (128);
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONFIGURATION_get_value_filename (cfg,
+ "fs",
+ "TRUST",
+ &trustDirectory));
+ GNUNET_DISK_directory_create (trustDirectory);
+ GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
+ &cron_flush_trust, NULL);
+}
+
+
+/**
+ * Iterator to free peer entries.
+ *
+ * @param cls closure, unused
+ * @param key current key code
+ * @param value value in the hash map (peer entry)
+ * @return GNUNET_YES (we should continue to iterate)
+ */
+static int
+clean_peer (void *cls,
+ const GNUNET_HashCode * key,
+ void *value)
+{
+ GSF_peer_disconnect_handler_ (NULL,
+ (const struct GNUNET_PeerIdentity*) key);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Shutdown peer management subsystem.
+ */
+void
+GSF_connected_peer_done_ ()
+{
+ cron_flush_trust (NULL, NULL);
+ GNUNET_CONTAINER_multihashmap_iterate (cp_peers,
+ &clean_peer,
+ NULL);
+ GNUNET_CONTAINER_multihashmap_destroy (cp_map);
+ cp_map = NULL;
+ GNUNET_free (trustDirectory);
+ trustDirectory = NULL;
+}
+
+
+
#endif
/* end of gnunet-service-fs_cp.h */