From f05ce1ce047dbc9755b0db38d41c3ef538b74527 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 30 Jan 2011 23:23:13 +0000 Subject: [PATCH] stuff --- src/fs/gnunet-service-fs_cp.c | 466 +++++++++++++++++++++++++++++++++- src/fs/gnunet-service-fs_cp.h | 18 ++ 2 files changed, 474 insertions(+), 10 deletions(-) diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c index 6f3f06d3f..48e850cab 100644 --- a/src/fs/gnunet-service-fs_cp.c +++ b/src/fs/gnunet-service-fs_cp.c @@ -27,15 +27,65 @@ #include "gnunet-service-fs.h" #include "gnunet-service-fs_cp.h" +/** + * How often do we flush trust values to disk? + */ +#define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5) + struct GSF_PeerTransmitHandle { + /** + * Handle for an active request for transmission to this + * peer, or NULL (if core queue was full). + */ + struct GNUNET_CORE_TransmitHandle *cth; + /** * Time when this transmission request was issued. */ struct GNUNET_TIME_Absolute transmission_request_start_time; + /** + * Timeout for this request. + */ + struct GNUNET_TIME_Absolute timeout; + + /** + * Task called on timeout, or 0 for none. + */ + GNUNET_SCHEDULER_TaskIdentifier timeout_task; + + /** + * Function to call to get the actual message. + */ + GSF_GetMessageCallback gmc; + + /** + * Peer this request targets. + */ + struct GSF_ConnectedPeer *cp; + + /** + * Closure for 'gmc'. + */ + void *gmc_cls; + + /** + * Size of the message to be transmitted. + */ + size_t size; + + /** + * GNUNET_YES if this is a query, GNUNET_NO for content. + */ + int is_query; + + /** + * Priority of this request. + */ + uint32_t priority; }; @@ -57,12 +107,6 @@ struct GSF_ConnectedPeer */ struct GNUNET_TIME_Absolute last_migration_block; - /** - * Handle for an active request for transmission to this - * peer, or NULL. - */ - struct GNUNET_CORE_TransmitHandle *cth; - /** * Messages (replies, queries, content migration) we would like to * send to this peer in the near future. Sorted by priority, head. @@ -126,6 +170,74 @@ struct GSF_ConnectedPeer }; +/** + * Map from peer identities to 'struct GSF_ConnectPeer' entries. + */ +static struct GNUNET_CONTAINER_MultiHashMap *cp_map; + + +/** + * Where do we store trust information? + */ +static char *trustDirectory; + + +/** + * Get the filename under which we would store the GNUNET_HELLO_Message + * for the given host and protocol. + * @return filename of the form DIRECTORY/HOSTID + */ +static char * +get_trust_filename (const struct GNUNET_PeerIdentity *id) +{ + struct GNUNET_CRYPTO_HashAsciiEncoded fil; + char *fn; + + GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil); + GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil); + return fn; +} + + +/** + * Find latency information in 'atsi'. + * + * @param atsi performance data + * @return connection latency + */ +static struct GNUNET_TIME_Relative +get_latency (const struct GNUNET_TRANSPORT_ATS_Information *atsi) +{ + if (atsi == NULL) + return GNUNET_TIME_UNIT_SECONDS; + while ( (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) && + (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_QUALITY_NET_DELAY) ) + atsi++; + if (ntohl (atsi->type) == GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) + { + GNUNET_break (0); + /* how can we not have latency data? */ + return GNUNET_TIME_UNIT_SECONDS; + } + return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, + ntohl (atsi->value)); +} + + +/** + * Update the performance information kept for the given peer. + * + * @param cp peer record to update + * @param atsi transport performance data + */ +static void +update_atsi (struct GSF_ConnectedPeer *cp, + const struct GNUNET_TRANSPORT_ATS_Information *atsi) +{ + // FIXME: merge atsi into cp's performance data! +} + + /** * A peer connected to us. Setup the connected peer * records. @@ -138,8 +250,85 @@ struct GSF_ConnectedPeer * GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer, const struct GNUNET_TRANSPORT_ATS_Information *atsi) { - // FIXME - return NULL; + struct GSF_ConnectedPeer *cp; + char *fn; + uint32_t trust; + struct GNUNET_TIME_Relative latency; + + cp = GNUNET_malloc (sizeof (struct GSF_ConnectedPeer)); + cp->transmission_delay = GNUNET_LOAD_value_init (latency); + cp->pid = GNUNET_PEER_intern (peer); + fn = get_trust_filename (peer); + if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) && + (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust)))) + cp->disk_trust = cp->trust = ntohl (trust); + GNUNET_free (fn); + GNUNET_break (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_put (cp_map, + &peer->hashPubKey, + cp, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + update_atsi (cp, atsi); + + + // FIXME: notify plan & migration about new peer! + + return cp; +} + + +/** + * Core is ready to transmit to a peer, get the message. + * + * @param cls the 'struct GSF_PeerTransmitHandle' of the message + * @param size number of bytes core is willing to take + * @param buf where to copy the message + * @return number of bytes copied to buf + */ +static size_t +peer_transmit_ready_cb (void *cls, + size_t size, + void *buf) +{ + struct GSF_PeerTransmitHandle *pth = cls; + struct GSF_ConnectedPeer *cp; + size_t ret; + + cp = pth->cp; + GNUNET_CONTAINER_DLL_remove (cp->pth_head, + cp->pth_tail, + pth); + // FIXME: update 'cp' counters! + ret = pth->gmc (pth->gmc_cls, + 0, NULL); + GNUNET_free (pth); + return ret; +} + + +/** + * Function called if there has been a timeout trying to satisfy + * a transmission request. + * + * @param cls the 'struct GSF_PeerTransmitHandle' of the request + * @param tc scheduler context + */ +static void +peer_transmit_timeout (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GSF_PeerTransmitHandle *pth = cls; + struct GSF_ConnectedPeer *cp; + + pth->timeout_task = GNUNET_SCHEDULER_NO_TASK; + cp = pth->cp; + GNUNET_CONTAINER_DLL_remove (cp->pth_head, + cp->pth_tail, + pth); + // FIXME: update 'cp' counters! + pth->gmc (pth->gmc_cls, + 0, NULL); + GNUNET_free (pth); } @@ -166,17 +355,83 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *peer, GSF_GetMessageCallback gmc, void *gmc_cls) { - // FIXME - return NULL; + struct GSF_ConnectedPeer *cp; + struct GSF_PeerTransmitHandle *pth; + struct GSF_PeerTransmitHandle *pos; + struct GSF_PeerTransmitHandle *prev; + struct GNUNET_PeerIdentity target; + + cp = GNUNET_CONTAINER_multihashmap_get (cp_map, + &peer->hashPubKey); + GNUNET_assert (NULL != cp); + pth = GNUNET_malloc (sizeof (struct GSF_PeerTransmitHandle)); + pth->transmission_request_start_time = GNUNET_TIME_absolute_now (); + pth->timeout = GNUNET_TIME_relative_to_absolute (timeout); + pth->gmc = gmc; + pth->gmc_cls = gmc_cls; + pth->size = size; + pth->is_query = is_query; + pth->priority = priority; + pth->cp = cp; + /* insertion sort (by priority, descending) */ + prev = NULL; + pos = cp->pth_head; + while ( (pos != NULL) && + (pos->priority > priority) ) + { + prev = pos; + pos = pos->next; + } + if (prev == NULL) + GNUNET_CONTAINER_DLL_insert_head (cp->pth_head, + cp->pth_tail, + pth); + else + GNUNET_CONTAINER_DLL_insert_after (cp->pth_head, + cp->pth_tail, + prev, + pth); + GNUNET_PEER_resolve (cp->pid, + &target); + pth->cth = GNUNET_CORE_notify_transmit_ready (core, + priority, + timeout, + &target, + size, + &peer_transmit_ready_cb, + pth); + /* pth->cth could be NULL here, that's OK, we'll try again + later... */ + if (pth->cth == NULL) + pth->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, + &peer_transmit_timeout, + pth); + return pth; } /** * Cancel an earlier request for transmission. + * + * @param pth request to cancel */ void GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth) { + struct GSF_PeerTransmitHandle *pth = cls; + struct GSF_ConnectedPeer *cp; + + if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (pth->timeout_task); + pth->timeout_task = GNUNET_SCHEDULER_NO_TASK; + } + cp = pth->cp; + GNUNET_CONTAINER_DLL_remove (cp->pth_head, + cp->pth_tail, + pth); + // FIXME: update 'cp' counters! + GNUNET_free (pth); } @@ -196,6 +451,7 @@ GSF_peer_update_performance_ (struct GSF_ConnectedPeer *peer, const struct GSF_LocalClient *initiator_client, const struct GSF_ConnectedPeer *initiator_peer) { + // FIXME... } @@ -218,6 +474,12 @@ GSF_peer_status_handler_ (void *cls, struct GNUNET_TIME_Absolute timeout, const struct GNUNET_TRANSPORT_ATS_Information *atsi) { + struct GSF_ConnectedPeer *cp; + + cp = GNUNET_CONTAINER_multihashmap_get (cp_map, + &peer->hashPubKey); + GNUNET_assert (NULL != cp); + update_atsi (cp, atsi); } @@ -232,6 +494,57 @@ void GSF_peer_disconnect_handler_ (void *cls, const struct GNUNET_PeerIdentity *peer) { + struct GSF_ConnectedPeer *cp; + + cp = GNUNET_CONTAINER_multihashmap_get (cp_map, + &peer->hashPubKey); + GNUNET_assert (NULL != cp); + GNUNET_CONTAINER_multihashmap_remove (cp_map, + &peer->hashPubKey, + cp); + // FIXME: more cleanup + GNUNET_free (cp); +} + + +/** + * Closure for 'call_iterator'. + */ +struct IterationContext +{ + /** + * Function to call on each entry. + */ + GSF_ConnectedPeerIterator it; + + /** + * Closure for 'it'. + */ + void *it_cls; +}; + + +/** + * Function that calls the callback for each peer. + * + * @param cls the 'struct IterationContext*' + * @param key identity of the peer + * @param value the 'struct GSF_ConnectedPeer*' + * @return GNUNET_YES to continue iteration + */ +static int +call_iterator (void *cls, + const GNUNET_HashCode *key, + void *value) +{ + struct IterationContext *ic = cls; + struct GSF_ConnectedPeer *cp = value; + + ic->it (ic->it_cls, + (const struct GNUNET_PeerIdentity*) key, + cp, + &cp->ppd); + return GNUNET_YES; } @@ -245,6 +558,13 @@ void GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it, void *it_cls) { + struct IterationContext ic; + + ic.it = it; + ic.it_cls = it_cls; + GNUNET_CONTAINER_multihashmap_iterate (cp_map, + &call_iterator, + &ic); } @@ -280,5 +600,131 @@ GSF_connected_peer_reserve_ (struct GSF_ConnectedPeer *cp, } +/** + * Write host-trust information to a file - flush the buffer entry! + * + * @param cls closure, not used + * @param key host identity + * @param value the 'struct GSF_ConnectedPeer' to flush + * @return GNUNET_OK to continue iteration + */ +static int +flush_trust (void *cls, + const GNUNET_HashCode *key, + void *value) +{ + struct GSF_ConnectedPeer *cp = value; + char *fn; + uint32_t trust; + struct GNUNET_PeerIdentity pid; + + if (cp->trust == cp->disk_trust) + return GNUNET_OK; /* unchanged */ + GNUNET_PEER_resolve (cp->pid, + &pid); + fn = get_trust_filename (&pid); + if (cp->trust == 0) + { + if ((0 != UNLINK (fn)) && (errno != ENOENT)) + GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING | + GNUNET_ERROR_TYPE_BULK, "unlink", fn); + } + else + { + trust = htonl (cp->trust); + if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, + sizeof(uint32_t), + GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE + | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ)) + cp->disk_trust = cp->trust; + } + GNUNET_free (fn); + return GNUNET_OK; +} + + +/** + * Call this method periodically to flush trust information to disk. + * + * @param cls closure, not used + * @param tc task context, not used + */ +static void +cron_flush_trust (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + + if (NULL == cp_map) + return; + GNUNET_CONTAINER_multihashmap_iterate (cp_map, + &flush_trust, + NULL); + if (NULL == tc) + return; + if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) + return; + GNUNET_SCHEDULER_add_delayed (TRUST_FLUSH_FREQ, + &cron_flush_trust, + NULL); +} + + +/** + * Initialize peer management subsystem. + * + * @param cfg configuration to use + */ +void +GSF_connected_peer_init_ (struct GNUNET_CONFIGURATION_Handle *cfg) +{ + cp_map = GNUNET_CONTAINER_multihashmap_create (128); + GNUNET_assert (GNUNET_OK == + GNUNET_CONFIGURATION_get_value_filename (cfg, + "fs", + "TRUST", + &trustDirectory)); + GNUNET_DISK_directory_create (trustDirectory); + GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH, + &cron_flush_trust, NULL); +} + + +/** + * Iterator to free peer entries. + * + * @param cls closure, unused + * @param key current key code + * @param value value in the hash map (peer entry) + * @return GNUNET_YES (we should continue to iterate) + */ +static int +clean_peer (void *cls, + const GNUNET_HashCode * key, + void *value) +{ + GSF_peer_disconnect_handler_ (NULL, + (const struct GNUNET_PeerIdentity*) key); + return GNUNET_YES; +} + + +/** + * Shutdown peer management subsystem. + */ +void +GSF_connected_peer_done_ () +{ + cron_flush_trust (NULL, NULL); + GNUNET_CONTAINER_multihashmap_iterate (cp_peers, + &clean_peer, + NULL); + GNUNET_CONTAINER_multihashmap_destroy (cp_map); + cp_map = NULL; + GNUNET_free (trustDirectory); + trustDirectory = NULL; +} + + + #endif /* end of gnunet-service-fs_cp.h */ diff --git a/src/fs/gnunet-service-fs_cp.h b/src/fs/gnunet-service-fs_cp.h index 8886199b5..0c3652ae2 100644 --- a/src/fs/gnunet-service-fs_cp.h +++ b/src/fs/gnunet-service-fs_cp.h @@ -186,6 +186,8 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *peer, /** * Cancel an earlier request for transmission. + * + * @param pth request to cancel */ void GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth); @@ -285,5 +287,21 @@ GSF_connected_peer_reserve_ (struct GSF_ConnectedPeer *cp, void *rc_cls); +/** + * Initialize peer management subsystem. + * + * @param cfg configuration to use + */ +void +GSF_connected_peer_init_ (struct GNUNET_CONFIGURATION_Handle *cfg); + + +/** + * Shutdown peer management subsystem. + */ +void +GSF_connected_peer_done_ (void); + + #endif /* end of gnunet-service-fs_cp.h */ -- 2.25.1