*/
static unsigned int newly_found_peers;
-/**
- * Handle to the datacache service (for inserting/retrieving data)
- */
-static struct GNUNET_DATACACHE_Handle *datacache;
-
/**
* Handle for the statistics service.
*/
}
-/**
- * Iterator for local get request results,
- *
- * @param cls closure for iterator, a DatacacheGetContext
- * @param exp when does this value expire?
- * @param key the key this data is stored under
- * @param size the size of the data identified by key
- * @param data the actual data
- * @param type the type of the data
- *
- * @return GNUNET_OK to continue iteration, anything else
- * to stop iteration.
- */
-static int
-datacache_get_iterator (void *cls, struct GNUNET_TIME_Absolute exp,
- const GNUNET_HashCode * key, size_t size,
- const char *data, enum GNUNET_BLOCK_Type type)
-{
- struct DHT_MessageContext *msg_ctx = cls;
- struct DHT_MessageContext new_msg_ctx;
- struct GNUNET_DHT_GetResultMessage *get_result;
- enum GNUNET_BLOCK_EvaluationResult eval;
- const struct DHTPutEntry *put_entry;
- int get_size;
- char *path_offset;
-
-#if DEBUG_DHT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s:%s': Received `%s' response from datacache\n", my_short_id,
- "DHT", "GET");
-#endif
-
- put_entry = (const struct DHTPutEntry *) data;
-
- if (size !=
- sizeof (struct DHTPutEntry) + put_entry->data_size +
- (put_entry->path_length * sizeof (struct GNUNET_PeerIdentity)))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Path + data size doesn't add up for data inserted into datacache!\nData size %d, path length %d, expected %d, got %d\n",
- put_entry->data_size, put_entry->path_length,
- sizeof (struct DHTPutEntry) + put_entry->data_size +
- (put_entry->path_length * sizeof (struct GNUNET_PeerIdentity)),
- size);
- msg_ctx->do_forward = GNUNET_NO;
- return GNUNET_OK;
- }
-
- eval =
- GNUNET_BLOCK_evaluate (block_context, type, key, &msg_ctx->reply_bf,
- msg_ctx->reply_bf_mutator, msg_ctx->xquery,
- msg_ctx->xquery_size, &put_entry[1],
- put_entry->data_size);
-
- switch (eval)
- {
- case GNUNET_BLOCK_EVALUATION_OK_LAST:
- msg_ctx->do_forward = GNUNET_NO;
- case GNUNET_BLOCK_EVALUATION_OK_MORE:
- memcpy (&new_msg_ctx, msg_ctx, sizeof (struct DHT_MessageContext));
- if (GNUNET_DHT_RO_RECORD_ROUTE ==
- (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
- {
- new_msg_ctx.msg_options = GNUNET_DHT_RO_RECORD_ROUTE;
- }
-
- get_size =
- sizeof (struct GNUNET_DHT_GetResultMessage) + put_entry->data_size +
- (put_entry->path_length * sizeof (struct GNUNET_PeerIdentity));
- get_result = GNUNET_malloc (get_size);
- get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT);
- get_result->header.size = htons (get_size);
- get_result->expiration = GNUNET_TIME_absolute_hton (exp);
- get_result->type = htons (type);
- get_result->put_path_length = htons (put_entry->path_length);
- path_offset = (char *) &put_entry[1];
- path_offset += put_entry->data_size;
- /* Copy the actual data and the path_history to the end of the get result */
- memcpy (&get_result[1], &put_entry[1],
- put_entry->data_size +
- (put_entry->path_length * sizeof (struct GNUNET_PeerIdentity)));
- new_msg_ctx.peer = my_identity;
- new_msg_ctx.bloom = NULL;
- new_msg_ctx.hop_count = 0;
- new_msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE + 2; /* Make result routing a higher priority */
- new_msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT;
- increment_stats (STAT_GET_RESPONSE_START);
- route_result_message (&get_result->header, &new_msg_ctx);
- GNUNET_free (get_result);
- break;
- case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
-#if DEBUG_DHT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': Duplicate block error\n",
- my_short_id, "DHT");
-#endif
- break;
- case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
-#if DEBUG_DHT
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "`%s:%s': Invalid request error\n",
- my_short_id, "DHT");
-#endif
- break;
- case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
-#if DEBUG_DHT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s:%s': Valid request, no results.\n", my_short_id, "DHT");
-#endif
- GNUNET_break (0);
- break;
- case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
- GNUNET_break_op (0);
- msg_ctx->do_forward = GNUNET_NO;
- break;
- case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
-#if DEBUG_DHT
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "`%s:%s': Unsupported block type (%u) in response!\n",
- my_short_id, "DHT", type);
-#endif
- /* msg_ctx->do_forward = GNUNET_NO; // not sure... */
- break;
- }
- return GNUNET_OK;
-}
/**
increment_stats (STAT_GETS);
results = 0;
msg_ctx->do_forward = GNUNET_YES;
- if (datacache != NULL)
- results =
- GNUNET_DATACACHE_get (datacache, &msg_ctx->key, type,
- &datacache_get_iterator, msg_ctx);
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"`%s:%s': Found %d results for `%s' request uid %llu\n",
#endif
increment_stats (STAT_PUTS_INSERTED);
- if (datacache != NULL)
- {
- /* Put size is actual data size plus struct overhead plus path length (if any) */
- put_size =
- data_size + sizeof (struct DHTPutEntry) +
- (msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
- put_entry = GNUNET_malloc (put_size);
- put_entry->data_size = data_size;
- put_entry->path_length = msg_ctx->path_history_len;
- /* Copy data to end of put entry */
- memcpy (&put_entry[1], &put_msg[1], data_size);
- if (msg_ctx->path_history_len > 0)
- {
- /* Copy path after data */
- path_offset = (char *) &put_entry[1];
- path_offset += data_size;
- memcpy (path_offset, msg_ctx->path_history,
- msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
- }
-
- ret =
- GNUNET_DATACACHE_put (datacache, &msg_ctx->key, put_size,
- (const char *) put_entry, put_type,
- GNUNET_TIME_absolute_ntoh (put_msg->expiration));
- GNUNET_free (put_entry);
- }
- else
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s:%s': %s request received, but have no datacache!\n",
- my_short_id, "DHT", "PUT");
route_message (msg, msg_ctx);
}
transport_handle = NULL;
}
GDS_NEIGHBOURS_done ();
+ GDS_DATACACHE_done ();
GDS_NSE_done ();
for (bucket_count = lowest_bucket; bucket_count < MAX_BUCKETS; bucket_count++)
{
delete_peer (pos, bucket_count);
}
}
- if (datacache != NULL)
- {
-#if DEBUG_DHT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s:%s Destroying datacache!\n",
- my_short_id, "DHT");
-#endif
- GNUNET_DATACACHE_destroy (datacache);
- datacache = NULL;
- }
if (stats != NULL)
{
GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
unsigned long long temp_config_num;
cfg = c;
- datacache = GNUNET_DATACACHE_create (cfg, "dhtcache");
+ GDS_DATACACHE_init ();
coreAPI = GNUNET_CORE_connect (cfg, /* Main configuration */
DEFAULT_CORE_QUEUE_SIZE, /* queue size */
NULL, /* Closure passed to DHT functions */
--- /dev/null
+/*
+ This file is part of GNUnet.
+ (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file dht/gnunet-service-dht_datacache.c
+ * @brief GNUnet DHT service's datacache integration
+ * @author Christian Grothoff
+ * @author Nathan Evans
+ */
+#include "gnunet-service-dht_datacache.h"
+
+
+/**
+ * Handle to the datacache service (for inserting/retrieving data)
+ */
+static struct GNUNET_DATACACHE_Handle *datacache;
+
+
+/**
+ * Entry for inserting data into datacache from the DHT.
+ */
+struct DHTPutEntry
+{
+ /**
+ * Size of data.
+ */
+ uint16_t data_size;
+
+ /**
+ * Length of recorded path.
+ */
+ uint16_t path_length;
+
+ /* PATH ENTRIES */
+
+ /* PUT DATA */
+
+};
+
+
+/**
+ * Handle a datum we've received from another peer. Cache if
+ * possible.
+ *
+ * @param expiration when will the reply expire
+ * @param key the query this reply is for
+ * @param put_path_length number of peers in 'put_path'
+ * @param put_path path the reply took on put
+ * @param type type of the reply
+ * @param data_size number of bytes in 'data'
+ * @param data application payload data
+ */
+void
+GDS_DATACACHE_handle_put (struct GNUNET_TIME_Absolute expiration,
+ const GNUNET_HashCode *key,
+ unsigned int put_path_length,
+ const struct GNUNET_PeerIdentity *put_path,
+ uint32_t type,
+ size_t data_size,
+ const void *data)
+{
+ size_t plen = data_size + put_path_length * sizeof(struct GNUNET_PeerIdentity) + sizeof(struct DHTPutEntry);
+ char buf[plen];
+ struct DHTPutEntry *pe;
+ struct GNUNET_PeerIdentity *pp;
+ char *path_offset;
+
+ if (datacache == NULL)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%s request received, but have no datacache!\n",
+ "PUT");
+ return;
+ }
+ if (data_size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+ {
+ GNUNET_break (0);
+ return;
+ }
+ /* Put size is actual data size plus struct overhead plus path length (if any) */
+ pe = (struct DHTPutEntry *) buf;
+ pe->data_size = htons (data_size);
+ pe->path_length = htons ((uint16_t) put_path_length);
+ pp = (struct GNUNET_PeerIdentity *) &pe[1];
+ memcpy (pp, put_path, put_path_length * sizeof (struct GNUNET_PeerIdentity));
+ memcpy (&pp[put_path_length],
+ data, data_size);
+ (void) GNUNET_DATACACHE_put (datacache, key,
+ plen, (const char *) pe, type,
+ expiration);
+}
+
+
+/**
+ * Context containing information about a GET request.
+ */
+struct GetRequestContext
+{
+ /**
+ * extended query (see gnunet_block_lib.h).
+ */
+ const void *xquery;
+
+ /**
+ * Bloomfilter to filter out duplicate replies (updated)
+ */
+ struct GNUNET_CONTAINER_BloomFilter **reply_bf;
+
+ /**
+ * The key this request was about
+ */
+ GNUNET_HashCode key;
+
+ /**
+ * Number of bytes in xquery.
+ */
+ size_t xquery_size;
+
+ /**
+ * Mutator value for the reply_bf, see gnunet_block_lib.h
+ */
+ uint32_t reply_bf_mutator;
+
+};
+
+
+/**
+ * Iterator for local get request results,
+ *
+ * @param cls closure for iterator, a DatacacheGetContext
+ * @param exp when does this value expire?
+ * @param key the key this data is stored under
+ * @param size the size of the data identified by key
+ * @param data the actual data
+ * @param type the type of the data
+ *
+ * @return GNUNET_OK to continue iteration, anything else
+ * to stop iteration.
+ */
+static int
+datacache_get_iterator (void *cls, struct GNUNET_TIME_Absolute exp,
+ const GNUNET_HashCode * key, size_t size,
+ const char *data, enum GNUNET_BLOCK_Type type)
+{
+ struct GetRequestContext *ctx = cls;
+ const struct DHTPutEntry *pe;
+ const struct GNUNET_PeerIdentity *pp;
+ const char *data;
+ size_t data_size;
+ uint16_t put_path_length;
+ enum GNUNET_BLOCK_EvaluationResult eval;
+
+ pe = (const struct DHTPutEntry *) data;
+ put_path_length = ntohs (pe->path_length);
+ data_size = ntohs (pe->data_size);
+
+ if (size !=
+ sizeof (struct DHTPutEntry) + data_size +
+ (put_path_length * sizeof (struct GNUNET_PeerIdentity)))
+ {
+ GNUNET_break (0);
+ return GNUNET_OK;
+ }
+ pp = (const struct GNUNET_PeerIdentity *) &pe[1];
+ data = (const char*) &pp[put_path_length];
+ eval =
+ GNUNET_BLOCK_evaluate (block_context, type, key,
+ ctx->reply_bf,
+ ctx->reply_bf_mutator,
+ ctx->xquery,
+ ctx->xquery_size,
+ data,
+ data_size);
+ switch (eval)
+ {
+ case GNUNET_BLOCK_EVALUATION_OK_LAST:
+ case GNUNET_BLOCK_EVALUATION_OK_MORE:
+ /* forward to local clients */
+ GDS_CLIENT_handle_reply (exp,
+ key,
+ 0, NULL,
+ put_path_length, pp,
+ type, data_size, data);
+ /* forward to other peers */
+ GDS_NEIGHBOURS_handle_reply (type, exp,
+ key, put_path_length, pp,
+ 0, NULL, data, data_size);
+ break;
+ case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
+ break;
+ case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
+ break;
+ case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
+ GNUNET_break (0);
+ break;
+ case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Unsupported block type (%u) in local response!\n",
+ type);
+ break;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Context containing information about a GET request.
+ */
+struct GetRequestContext
+{
+ /**
+ * extended query (see gnunet_block_lib.h).
+ */
+ const void *xquery;
+
+ /**
+ * Bloomfilter to filter out duplicate replies (updated)
+ */
+ struct GNUNET_CONTAINER_BloomFilter **reply_bf;
+
+ /**
+ * The key this request was about
+ */
+ GNUNET_HashCode key;
+
+ /**
+ * Number of bytes in xquery.
+ */
+ size_t xquery_size;
+
+ /**
+ * Mutator value for the reply_bf, see gnunet_block_lib.h
+ */
+ uint32_t reply_bf_mutator;
+
+};
+
+
+/**
+ * Handle a GET request we've received from another peer.
+ *
+ * @param key the query
+ * @param type requested data type
+ * @param xquery extended query
+ * @param xquery_size number of bytes in xquery
+ * @param reply_bf where the reply bf is (to be) stored, possibly updated, can be NULL
+ * @param reply_bf_mutator mutation value for reply_bf
+ */
+void
+GDS_DATACACHE_handle_get (const GNUNET_HashCode *key,
+ uint32_t type,
+ const void *xquery,
+ size_t xquery_size,
+ struct GNUNET_CONTAINER_BloomFilter **reply_bf,
+ uint32_t reply_bf_mutator)
+{
+ struct GetRequestContext ctx;
+
+ if (datacache == NULL)
+ return;
+ ctx.key = *key;
+ ctx.xquery = xquery;
+ ctx.xquery_size = xquery_size;
+ ctx.reply_bf = reply_bf;
+ ctx.reply_bf_mutator = reply_bf_mutator;
+ (void) GNUNET_DATACACHE_get (datacache, &msg_ctx->key, type,
+ &datacache_get_iterator, &ctx);
+}
+
+
+/**
+ * Initialize datacache subsystem.
+ */
+void
+GDS_DATACACHE_init ()
+{
+ datacache = GNUNET_DATACACHE_create (cfg, "dhtcache");
+}
+
+
+/**
+ * Shutdown datacache subsystem.
+ */
+void
+GDS_DATACACHE_done ()
+{
+ if (datacache != NULL)
+ {
+ GNUNET_DATACACHE_destroy (datacache);
+ datacache = NULL;
+ }
+}
+
+
+/* end of gnunet-service-dht_datacache.c */
#include "gnunet_dht_service.h"
#include "gnunet_statistics_service.h"
#include "dht.h"
+#include "gnunet-service-dht_datacache.h"
#include <fenv.h>
/**
uint32_t desired_replication_level GNUNET_PACKED;
/**
- * Generic route path length for a message in the
- * DHT that arrived at a peer and generated
- * a reply. Copied to the end of this message.
+ * Length of the PUT path that follows (if tracked).
*/
- uint32_t outgoing_path_length GNUNET_PACKED;
+ uint32_t put_path_length GNUNET_PACKED;
+
+ /**
+ * When does the content expire?
+ */
+ struct GNUNET_TIME_AbsoluteNBO expiration_time;
/**
* Bloomfilter (for peer identities) to stop circular routes
};
+/**
+ * P2P Result message
+ */
+struct PeerResultMessage
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Content type.
+ */
+ uint32_t type GNUNET_PACKED;
+
+ /**
+ * Length of the PUT path that follows (if tracked).
+ */
+ uint32_t put_path_length GNUNET_PACKED;
+
+ /**
+ * Length of the GET path that follows (if tracked).
+ */
+ uint32_t get_path_length GNUNET_PACKED;
+
+ /**
+ * The key of the corresponding GET request.
+ */
+ GNUNET_HashCode key;
+
+ /* put path (if tracked) */
+
+ /* get path (if tracked) */
+
+ /* Payload */
+
+};
+
+
/**
* P2P GET message
*/
struct PeerGetMessage
{
/**
- * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
+ * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_GET
*/
struct GNUNET_MessageHeader header;
struct P2PPendingMessage *prev;
/**
- * Message importance level.
+ * Message importance level. FIXME: used? useful?
*/
unsigned int importance;
/**
- * Time when this request was scheduled to be sent.
- */
- struct GNUNET_TIME_Absolute scheduled;
-
- /**
- * How long to wait before sending message.
+ * When does this message time out?
*/
- struct GNUNET_TIME_Relative timeout;
+ struct GNUNET_TIME_Absolute timeout;
/**
* Actual message to be sent, allocated at the end of the struct:
struct PeerInfo *prev;
/**
- * Count of outstanding messages for peer.
+ * Count of outstanding messages for peer. FIXME: NEEDED?
+ * FIXME: bound queue size!?
*/
unsigned int pending_count;
/**
- * Perform a PUT operation. // FIXME: document if this is only
- * routing or also storage and/or even local client notification!
+ * Called when core is ready to send a message we asked for
+ * out to the destination.
+ *
+ * @param cls the 'struct PeerInfo' of the target peer
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+core_transmit_notify (void *cls, size_t size, void *buf)
+{
+ struct PeerInfo *peer = cls;
+ char *cbuf = buf;
+ struct P2PPendingMessage *pending;
+ size_t off;
+ size_t msize;
+
+ peer->th = NULL;
+ if (buf == NULL)
+ {
+ /* client disconnected */
+ return 0;
+ }
+ if (peer->head == NULL)
+ {
+ /* no messages pending */
+ return 0;
+ }
+ off = 0;
+ while ( (NULL != (pending = peer->head)) &&
+ (size - off >= (msize = ntohs (pending->msg->size))) )
+ {
+ memcpy (&cbuf[off], pending->msg, msize);
+ off += msize;
+ peer->pending_count--;
+ GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
+ GNUNET_free (pending);
+ }
+ if (peer->head != NULL)
+ peer->th
+ = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
+ pending->importance,
+ pending->timeout, &peer->id, msize,
+ &core_transmit_notify, peer);
+
+ return off;
+}
+
+
+/**
+ * Transmit all messages in the peer's message queue.
+ *
+ * @param peer message queue to process
+ */
+static void
+process_peer_queue (struct PeerInfo *peer)
+{
+ struct P2PPendingMessage *pending;
+
+ if (NULL != (pending = peer->head))
+ return;
+ if (NULL != peer->th)
+ return;
+ peer->th
+ = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
+ pending->importance,
+ pending->timeout, &peer->id,
+ ntohs (pending->msg->size),
+ &core_transmit_notify, peer);
+}
+
+
+/**
+ * To how many peers should we (on average) forward the request to
+ * obtain the desired target_replication count (on average).
+ *
+ * FIXME: double-check that this is fine
+ *
+ * @param hop_count number of hops the message has traversed
+ * @param target_replication the number of total paths desired
+ * @return Some number of peers to forward the message to
+ */
+static unsigned int
+get_forward_count (uint32_t hop_count,
+ uint32_t target_replication)
+{
+ uint32_t random_value;
+ uint32_t forward_count;
+ float target_value;
+
+ /* bound by system-wide maximum */
+ target_replication = GNUNET_MIN (16 /* FIXME: use named constant */,
+ target_replication);
+ if (hop_count > log_of_network_size_estimate * 2.0)
+ {
+ /* Once we have reached our ideal number of hops, only forward to 1 peer */
+ return 1;
+ }
+ target_value =
+ 1 + (target_replication - 1.0) / (log_of_network_size_estimate +
+ ((float) (target_replication - 1.0) *
+ hop_count));
+ /* Set forward count to floor of target_value */
+ forward_count = (uint32_t) target_value;
+ /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
+ target_value = target_value - forward_count;
+ random_value =
+ GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, UINT32_MAX);
+ if (random_value < (target_value * UINT32_MAX))
+ forward_count++;
+ return forward_count;
+}
+
+
+/**
+ * Check whether my identity is closer than any known peers. If a
+ * non-null bloomfilter is given, check if this is the closest peer
+ * that hasn't already been routed to.
+ * FIXME: needed?
+ *
+ * @param key hash code to check closeness to
+ * @param bloom bloomfilter, exclude these entries from the decision
+ * @return GNUNET_YES if node location is closest,
+ * GNUNET_NO otherwise.
+ */
+static int
+am_closest_peer (const GNUNET_HashCode *key,
+ const struct GNUNET_CONTAINER_BloomFilter *bloom)
+{
+ int bits;
+ int other_bits;
+ int bucket_num;
+ int count;
+ struct PeerInfo *pos;
+ unsigned int my_distance;
+
+ if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (GNUNET_HashCode)))
+ return GNUNET_YES;
+ bucket_num = find_current_bucket (key);
+ bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, key);
+ my_distance = distance (&my_identity.hashPubKey, key);
+ pos = k_buckets[bucket_num].head;
+ count = 0;
+ while ((pos != NULL) && (count < bucket_size))
+ {
+ if ((bloom != NULL) &&
+ (GNUNET_YES ==
+ GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
+ {
+ pos = pos->next;
+ continue; /* Skip already checked entries */
+ }
+ other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->id.hashPubKey, key);
+ if (other_bits > bits)
+ return GNUNET_NO;
+ if (other_bits == bits) /* We match the same number of bits */
+ return GNUNET_YES;
+ pos = pos->next;
+ }
+ /* No peers closer, we are the closest! */
+ return GNUNET_YES;
+}
+
+
+/**
+ * Select a peer from the routing table that would be a good routing
+ * destination for sending a message for "key". The resulting peer
+ * must not be in the set of blocked peers.<p>
+ *
+ * Note that we should not ALWAYS select the closest peer to the
+ * target, peers further away from the target should be chosen with
+ * exponentially declining probability.
+ *
+ * FIXME: double-check that this is fine
+ *
+ *
+ * @param key the key we are selecting a peer to route to
+ * @param bloom a bloomfilter containing entries this request has seen already
+ * @param hops how many hops has this message traversed thus far
+ * @return Peer to route to, or NULL on error
+ */
+static struct PeerInfo *
+select_peer (const GNUNET_HashCode *key,
+ const struct GNUNET_CONTAINER_BloomFilter *bloom,
+ uint32_t hops)
+{
+ unsigned int bc;
+ unsigned int count;
+ unsigned int selected;
+ struct PeerInfo *pos;
+ unsigned int distance;
+ unsigned int largest_distance;
+ struct PeerInfo *chosen;
+
+ if (hops >= log_of_network_size_estimate)
+ {
+ /* greedy selection (closest peer that is not in bloomfilter) */
+ largest_distance = 0;
+ chosen = NULL;
+ for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
+ {
+ pos = k_buckets[bc].head;
+ count = 0;
+ while ((pos != NULL) && (count < bucket_size))
+ {
+ /* If we are doing strict Kademlia routing, then checking the bloomfilter is basically cheating! */
+ if (GNUNET_NO ==
+ GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
+ {
+ distance = inverse_distance (key, &pos->id.hashPubKey);
+ if (distance > largest_distance)
+ {
+ chosen = pos;
+ largest_distance = distance;
+ }
+ }
+ count++;
+ pos = pos->next;
+ }
+ }
+ return chosen;
+ }
+
+ /* select "random" peer */
+ /* count number of peers that are available and not filtered */
+ count = 0;
+ for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
+ {
+ pos = k_buckets[bc].head;
+ while ((pos != NULL) && (count < bucket_size))
+ {
+ if (GNUNET_YES ==
+ GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
+ {
+ pos = pos->next;
+ continue; /* Ignore bloomfiltered peers */
+ }
+ count++;
+ pos = pos->next;
+ }
+ }
+ if (count == 0) /* No peers to select from! */
+ {
+ return NULL;
+ }
+ /* Now actually choose a peer */
+ selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count);
+ count = 0;
+ for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
+ {
+ pos = k_buckets[bc].head;
+ while ((pos != NULL) && (count < bucket_size))
+ {
+ if (GNUNET_YES ==
+ GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
+ {
+ pos = pos->next;
+ continue; /* Ignore bloomfiltered peers */
+ }
+ if (0 == selected--)
+ return pos;
+ pos = pos->next;
+ }
+ }
+ GNUNET_break (0);
+ return NULL;
+}
+
+
+/**
+ * Compute the set of peers that the given request should be
+ * forwarded to.
+ *
+ * @param key routing key
+ * @param bloom bloom filter excluding peers as targets, all selected
+ * peers will be added to the bloom filter
+ * @param hop_count number of hops the request has traversed so far
+ * @param target_replication desired number of replicas
+ * @param targets where to store an array of target peers (to be
+ * free'd by the caller)
+ * @return number of peers returned in 'targets'.
+ */
+static unsigned int
+get_target_peers (const GNUNET_HashCode *key,
+ struct GNUNET_CONTAINER_BloomFilter *bloom,
+ uint32_t hop_count,
+ uint32_t target_replication,
+ struct PeerInfo ***targets)
+{
+ unsigned int ret;
+ unsigned int off;
+ struct PeerInfo **rtargets;
+ struct PeerInfo *nxt;
+
+ ret = get_forward_count (hop_count, target_replication);
+ if (ret == 0)
+ {
+ *targets = NULL;
+ return 0;
+ }
+ rtargets = GNUNET_malloc (sizeof (struct PeerInfo*) * ret);
+ off = 0;
+ while (ret-- > 0)
+ {
+ nxt = select_peer (key, bloom, hop_count);
+ if (nxt == NULL)
+ break;
+ rtargets[off++] = nxt;
+ GNUNET_CONTAINER_bloomfilter_add (bloom, &nxt->id.hashPubKey);
+ }
+ if (0 == off)
+ {
+ GNUNET_free (rtargets);
+ *targets = NULL;
+ return 0;
+ }
+ *targets = rtargets;
+ return off;
+}
+
+
+/**
+ * Perform a PUT operation. Forwards the given request to other
+ * peers. Does not store the data locally. Does not give the
+ * data to local clients. May do nothing if this is the only
+ * peer in the network (or if we are the closest peer in the
+ * network).
*
* @param type type of the block
* @param options routing options
* @param desired_replication_level desired replication count
* @param expiration_time when does the content expire
+ * @param hop_count how many hops has this message traversed so far
+ * @param bf Bloom filter of peers this PUT has already traversed
* @param key key for the content
* @param put_path_length number of entries in put_path
* @param put_path peers this request has traversed so far (if tracked)
* @param data_size number of bytes in data
*/
void
-GST_NEIGHBOURS_handle_put (uint32_t type,
+GDS_NEIGHBOURS_handle_put (uint32_t type,
uint32_t options,
uint32_t desired_replication_level,
GNUNET_TIME_Absolute expiration_time,
+ uint32_t hop_count,
+ struct GNUNET_CONTAINER_BloomFilter *bf,
const GNUNET_HashCode *key,
unsigned int put_path_length,
struct GNUNET_PeerIdentity *put_path,
const void *data,
size_t data_size)
{
- // FIXME
+ unsigned int target_count;
+ unsigned int i;
+ struct PeerInfo **targets;
+ struct PeerInfo *target;
+ struct P2PPendingMessage *pending;
+ size_t msize;
+ struct PeerPutMessage *ppm;
+ struct GNUNET_PeerIdentity *pp;
+
+ target_count = get_target_peers (key, bf, hop_count,
+ desired_replication_level,
+ &targets);
+ if (0 == target_count)
+ return;
+ msize = put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size + sizeof (struct PeerPutMessage);
+ if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+ {
+ put_path_length = 0;
+ msize = data_size + sizeof (struct PeerPutMessage);
+ }
+ if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+ {
+ GNUNET_break (0);
+ return;
+ }
+ for (i=0;i<target_count;i++)
+ {
+ target = targets[i];
+ pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
+ pending->importance = 0; /* FIXME */
+ pending->timeout = expiration_time;
+ ppm = (struct PeerPutMessage*) &pending[1];
+ pending->msg = &ppm->header;
+ ppm->header.size = htons (msize);
+ ppm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
+ ppm->options = htonl (options);
+ ppm->type = htonl (type);
+ ppm->hop_count = htonl (hop_count + 1);
+ ppm->desired_replication_level = htonl (desired_replication_level);
+ ppm->put_path_length = htonl (put_path_length);
+ ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_bloomfilter_get_raw_data (bf,
+ ppm->bloomfilter,
+ DHT_BLOOM_SIZE));
+ ppm->key = *key;
+ pp = (const struct GNUNET_PeerIdentity*) &ppm[1];
+ memcpy (pp, put_path, sizeof (struct GNUNET_PeerIdentity) * put_path_length);
+ memcpy (&pp[put_path_length], data, data_size);
+ GNUNET_CONTAINER_DLL_insert (target->head,
+ target->tail,
+ pending);
+ target->pending_count++;
+ process_peer_queue (target);
+ }
+ GNUNET_free (targets);
}
/**
- * Perform a GET operation. // FIXME: document if this is only
- * routing or also state-tracking and/or even local lookup!
+ * Perform a GET operation. Forwards the given request to other
+ * peers. Does not lookup the key locally. May do nothing if this is
+ * the only peer in the network (or if we are the closest peer in the
+ * network).
*
* @param type type of the block
* @param options routing options
* @param desired_replication_level desired replication count
+ * @param hop_count how many hops did this request traverse so far?
* @param key key for the content
* @param xquery extended query
* @param xquery_size number of bytes in xquery
* @param peer_bf filter for peers not to select (again)
*/
void
-GST_NEIGHBOURS_handle_get (uint32_t type,
+GDS_NEIGHBOURS_handle_get (uint32_t type,
uint32_t options,
uint32_t desired_replication_level,
+ uint32_t hop_count,
const GNUNET_HashCode *key,
const void *xquery,
size_t xquery_size,
uint32_t reply_bf_mutator,
const struct GNUNET_CONTAINER_BloomFilter *peer_bf)
{
- // FIXME
+ unsigned int target_count;
+ unsigned int i;
+ struct PeerInfo **targets;
+ struct PeerInfo *target;
+ struct P2PPendingMessage *pending;
+ size_t msize;
+ struct PeerGetMessage *pgm;
+ char *xq;
+ size_t reply_bf_size;
+
+ target_count = get_target_peers (key, peer_bf, hop_count,
+ desired_replication_level,
+ &targets);
+ if (0 == target_count)
+ return;
+ reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf);
+ msize = xquery_size + sizeof (struct PeerGetMessage) + reply_bf_size;
+ if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+ {
+ GNUNET_break (0);
+ return;
+ }
+ for (i=0;i<target_count;i++)
+ {
+ target = targets[i];
+ pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
+ pending->importance = 0; /* FIXME */
+ pending->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_HOURS); /* FIXME */
+ pgm = (struct PeerGetMessage*) &pending[1];
+ pending->msg = &pgm->header;
+ pgm->header.size = htons (msize);
+ pgm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
+ pgm->options = htonl (options);
+ pgm->type = htonl (type);
+ pgm->hop_count = htonl (hop_count + 1);
+ pgm->desired_replication_level = htonl (desired_replication_level);
+ pgm->xquery_size = htonl (xquery_size);
+ pgm->bf_mutator = reply_bf_mutator;
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf,
+ pgm->bloomfilter,
+ DHT_BLOOM_SIZE));
+ pgm->key = *key;
+ xq = (const struct GNUNET_PeerIdentity*) &ppm[1];
+ memcpy (xq, xquery, xquery_size);
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_bloomfilter_get_raw_data (reply_bf,
+ &xq[xquery_size],
+ reply_bf_size));
+ GNUNET_CONTAINER_DLL_insert (target->head,
+ target->tail,
+ pending);
+ target->pending_count++;
+ process_peer_queue (target);
+ }
+ GNUNET_free (targets);
}
/**
- * Handle a reply (route to origin). FIXME: should this be here?
- * (reply-routing table might be better done elsewhere).
+ * Handle a reply (route to origin). Only forwards the reply back to
+ * other peers waiting for it. Does not do local caching or
+ * forwarding to local clients.
*
* @param type type of the block
* @param options routing options
* @param data_size number of bytes in data
*/
void
-GST_NEIGHBOURS_handle_reply (uint32_t type,
+GDS_NEIGHBOURS_handle_reply (uint32_t type,
uint32_t options,
GNUNET_TIME_Absolute expiration_time,
const GNUNET_HashCode *key,
}
+/**
+ * Closure for 'add_known_to_bloom'.
+ */
+struct BloomConstructorContext
+{
+ /**
+ * Bloom filter under construction.
+ */
+ struct GNUNET_CONTAINER_BloomFilter *bloom;
+
+ /**
+ * Mutator to use.
+ */
+ uint32_t bf_mutator;
+};
+
+
/**
* Add each of the peers we already know to the bloom filter of
* the request so that we don't get duplicate HELLOs.
*
- * @param cls the 'struct GNUNET_CONTAINER_BloomFilter' we're building
+ * @param cls the 'struct BloomConstructorContext'.
* @param key peer identity to add to the bloom filter
* @param value value the peer information (unused)
* @return GNUNET_YES (we should continue to iterate)
static int
add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value)
{
- struct GNUNET_CONTAINER_BloomFilter *bloom = cls;
+ struct BloomConstructorContext *ctx = cls;
+ GNUNET_HashCode mh;
- GNUNET_CONTAINER_bloomfilter_add (bloom, key);
+ GNUNET_BLOCK_mingle_hash (key, ctx->bf_mutator, &mh);
+ GNUNET_CONTAINER_bloomfilter_add (ctx->bloom, &mh);
return GNUNET_YES;
}
struct GNUNET_DHT_FindPeerMessage *find_peer_msg;
struct DHT_MessageContext msg_ctx;
struct GNUNET_TIME_Relative next_send_time;
- struct GNUNET_CONTAINER_BloomFilter *temp_bloom;
+ struct BloomConstructorContext bcc;
find_peer_task = GNUNET_SCHEDULER_NO_TASK;
if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
newly_found_peers = 0;
return;
}
-
- // FIXME: build message...
- find_peer_msg = GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerMessage));
- find_peer_msg->header.size =
- htons (sizeof (struct GNUNET_DHT_FindPeerMessage));
- find_peer_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
- temp_bloom =
- GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
- GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, &add_known_to_bloom,
- temp_bloom);
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_bloomfilter_get_raw_data (temp_bloom,
- find_peer_msg->
- bloomfilter,
- DHT_BLOOM_SIZE));
- GNUNET_CONTAINER_bloomfilter_free (temp_bloom);
-
- memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext));
- memcpy (&msg_ctx.key, &my_identity.hashPubKey, sizeof (GNUNET_HashCode));
- msg_ctx.unique_id =
- GNUNET_ntohll (GNUNET_CRYPTO_random_u64
- (GNUNET_CRYPTO_QUALITY_STRONG, UINT64_MAX));
- msg_ctx.replication = DHT_DEFAULT_FIND_PEER_REPLICATION;
- msg_ctx.msg_options = GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE;
- msg_ctx.network_size = log_of_network_size_estimate;
- msg_ctx.peer = my_identity;
- msg_ctx.importance = DHT_DEFAULT_FIND_PEER_IMPORTANCE;
- msg_ctx.timeout = DHT_DEFAULT_FIND_PEER_TIMEOUT;
- // FIXME: transmit message...
- demultiplex_message (&find_peer_msg->header, &msg_ctx);
- GNUNET_free (find_peer_msg);
-
+ bcc.bf_mutator = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
+ bcc.bloom =
+ GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
+ GNUNET_CONTAINER_multihashmap_iterate (all_known_peers,
+ &add_known_to_bloom,
+ &bcc);
+ // FIXME: pass priority!?
+ GDS_NEIGHBOURS_handle_get (GNUNET_BLOCK_TYPE_DHT_HELLO,
+ GNUNET_DHT_RO_FIND_PEER,
+ 16 /* FIXME: replication level? */,
+ 0,
+ &my_identity.hashPubKey,
+ NULL, 0,
+ bcc.bloom, bcc.bf_mutator, NULL);
+ GNUNET_CONTAINER_bloomfilter_free (bcc.bloom);
/* schedule next round */
newly_found_peers = 0;
next_send_time.rel_value =
/**
- * Core handler for p2p get requests.
+ * Core handler for p2p put requests.
*
* @param cls closure
+ * @param peer sender of the request
* @param message message
* @param peer peer identity this notification is about
* @param atsi performance data
* GNUNET_SYSERR to close it (signal serious error)
*/
static int
-handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
+handle_dht_p2p_put (void *cls,
+ const struct GNUNET_PeerIdentity *peer,
const struct GNUNET_MessageHeader *message,
const struct GNUNET_TRANSPORT_ATS_Information
*atsi)
{
- struct GNUNET_DHT_P2PRouteMessage *incoming =
- (struct GNUNET_DHT_P2PRouteMessage *) message;
- struct GNUNET_MessageHeader *enc_msg =
- (struct GNUNET_MessageHeader *) &incoming[1];
- struct DHT_MessageContext *msg_ctx;
- char *route_path;
- int path_size;
-
- if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
+ const struct PeerPutMessage *put;
+ const struct GNUNET_PeerIdentity *put_path;
+ const void *payload;
+ uint32_t putlen;
+ uint16_t msize;
+ size_t payload_size;
+ struct GNUNET_CONTAINER_BloomFilter *bf;
+ GNUNET_HashCode test_key;
+
+ msize = ntohs (message->size);
+ if (msize < sizeof (struct PeerPutMessage))
{
GNUNET_break_op (0);
return GNUNET_YES;
}
-
- if (get_max_send_delay ().rel_value > MAX_REQUEST_TIME.rel_value)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Sending of previous replies took too long, backing off!\n");
- increment_stats ("# route requests dropped due to high load");
- decrease_max_send_delay (get_max_send_delay ());
- return GNUNET_YES;
- }
- msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext));
- msg_ctx->bloom =
- GNUNET_CONTAINER_bloomfilter_init (incoming->bloomfilter, DHT_BLOOM_SIZE,
- DHT_BLOOM_K);
- GNUNET_assert (msg_ctx->bloom != NULL);
- msg_ctx->hop_count = ntohl (incoming->hop_count);
- memcpy (&msg_ctx->key, &incoming->key, sizeof (GNUNET_HashCode));
- msg_ctx->replication = ntohl (incoming->desired_replication_level);
- msg_ctx->msg_options = ntohl (incoming->options);
- if (GNUNET_DHT_RO_RECORD_ROUTE ==
- (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
+ put = (const struct PeerPutMessage*) message;
+ putlen = ntohl (put->put_path_length);
+ if ( (msize < sizeof (struct PeerPutMessage) + putlen * sizeof (struct GNUNET_PeerIdentity)) ||
+ (putlen > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) )
+ {
+ GNUNET_break_op (0);
+ return GNUNET_YES;
+ }
+ put_path = (const struct GNUNET_PeerIdentity*) &put[1];
+ payload = &put_path[putlen];
+ payload_size = msize - (sizeof (struct PeerPutMessage) +
+ putlen * sizeof (struct GNUNET_PeerIdentity));
+ switch (GNUNET_BLOCK_get_key (block_context,
+ ntohl (put->type),
+ payload, payload_size,
+ &test_key))
{
- path_size =
- ntohl (incoming->outgoing_path_length) *
- sizeof (struct GNUNET_PeerIdentity);
- if (ntohs (message->size) !=
- (sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs (enc_msg->size) +
- path_size))
+ case GNUNET_YES:
+ if (0 != memcmp (&test_key, key, sizeof (GNUNET_HashCode)))
{
GNUNET_break_op (0);
- GNUNET_free (msg_ctx);
return GNUNET_YES;
}
- route_path = (char *) &incoming[1];
- route_path = route_path + ntohs (enc_msg->size);
- msg_ctx->path_history =
- GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity) + path_size);
- memcpy (msg_ctx->path_history, route_path, path_size);
- memcpy (&msg_ctx->path_history[path_size], &my_identity,
- sizeof (struct GNUNET_PeerIdentity));
- msg_ctx->path_history_len = ntohl (incoming->outgoing_path_length) + 1;
+ break;
+ case GNUNET_NO:
+ GNUNET_break_op (0);
+ return GNUNET_YES;
+ case GNUNET_SYSERR:
+ /* cannot verify, good luck */
+ break;
}
- msg_ctx->network_size = ntohl (incoming->network_size);
- msg_ctx->peer = *peer;
- msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE;
- msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
- demultiplex_message (enc_msg, msg_ctx);
- if (msg_ctx->bloom != NULL)
+ bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter,
+ DHT_BLOOM_SIZE,
+ DHT_BLOOM_K);
{
- GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom);
- msg_ctx->bloom = NULL;
+ struct GNUNET_PeerIdentity pp[putlen+1];
+
+ /* extend 'put path' by sender */
+ memcpy (pp, put_path, putlen * sizeof (struct GNUNET_PeerIdentity));
+ pp[putlen] = *sender;
+
+ /* give to local clients */
+ GDS_CLIENT_handle_reply (GNUNET_TIME_absolute_ntoh (put->expiration_time),
+ &put->key,
+ 0, NULL,
+ putlen + 1,
+ pp,
+ ntohl (put->type),
+ payload_size,
+ payload);
+ /* store locally */
+ GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (put->expiration_time),
+ &put->key,
+ putlen + 1, pp,
+ ntohl (put->type),
+ payload_size,
+ payload);
+ /* route to other peers */
+ GDS_NEIGHBOURS_handle_put (ntohl (put->type),
+ ntohl (put->options),
+ ntohl (put->desired_replication_level),
+ GNUNET_TIME_absolute_ntoh (put->expiration_time),
+ ntohl (put->hop_count),
+ bf,
+ putlen + 1, pp,
+ payload,
+ payload_size);
}
- GNUNET_free (msg_ctx);
+ GNUNET_CONTAINER_bloomfilter_free (bf);
return GNUNET_YES;
}
/**
- * Core handler for p2p put requests.
+ * Core handler for p2p get requests.
*
* @param cls closure
+ * @param peer sender of the request
* @param message message
* @param peer peer identity this notification is about
* @param atsi performance data
* GNUNET_SYSERR to close it (signal serious error)
*/
static int
-handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer,
+handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
const struct GNUNET_MessageHeader *message,
const struct GNUNET_TRANSPORT_ATS_Information
*atsi)
{
+ // 1) validate GET
+ // 2) store in routing table
+ // 3) check options (i.e. FIND PEER)
+ // 4) local lookup (=> need eval result!)
+ // 5) p2p forwarding
+
+
struct GNUNET_DHT_P2PRouteMessage *incoming =
(struct GNUNET_DHT_P2PRouteMessage *) message;
struct GNUNET_MessageHeader *enc_msg =
char *route_path;
int path_size;
+ // FIXME
if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
{
GNUNET_break_op (0);
/**
- * Core handler for p2p route results.
+ * Core handler for p2p result messages.
*
* @param cls closure
* @param message message
const struct GNUNET_TRANSPORT_ATS_Information
*atsi)
{
+ // 1) validate result format
+ // 2) append 'peer' to put path
+ // 3) forward to local clients
+ // 4) p2p routing
const struct GNUNET_DHT_P2PRouteResultMessage *incoming =
(const struct GNUNET_DHT_P2PRouteResultMessage *) message;
struct GNUNET_MessageHeader *enc_msg =
(struct GNUNET_MessageHeader *) &incoming[1];
struct DHT_MessageContext msg_ctx;
+ // FIXME
if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
{
GNUNET_break_op (0);
* Initialize neighbours subsystem.
*/
int
-GST_NEIGHBOURS_init ()
+GDS_NEIGHBOURS_init ()
{
static struct GNUNET_CORE_MessageHandler core_handlers[] = {
{&handle_dht_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0},
GNUNET_CONFIGURATION_get_value_number (cfg, "DHT", "bucket_size",
&temp_config_num))
bucket_size = (unsigned int) temp_config_num;
- coreAPI = GNUNET_CORE_connect (GDS_cfg, /* Main configuration */
- DEFAULT_CORE_QUEUE_SIZE, /* queue size */
- NULL, /* Closure passed to DHT functions */
- &core_init, /* Call core_init once connected */
- &handle_core_connect, /* Handle connects */
- &handle_core_disconnect, /* remove peers on disconnects */
+ coreAPI = GNUNET_CORE_connect (GDS_cfg,
+ DEFAULT_CORE_QUEUE_SIZE,
+ NULL,
+ &core_init,
+ &handle_core_connect,
+ &handle_core_disconnect,
NULL, /* Do we care about "status" updates? */
- NULL, /* Don't want notified about all incoming messages */
- GNUNET_NO, /* For header only inbound notification */
- NULL, /* Don't want notified about all outbound messages */
- GNUNET_NO, /* For header only outbound notification */
- core_handlers); /* Register these handlers */
+ NULL, GNUNET_NO,
+ NULL, GNUNET_NO,
+ core_handlers);
if (coreAPI == NULL)
return GNUNET_SYSERR;
all_known_peers = GNUNET_CONTAINER_multihashmap_create (256);
* Shutdown neighbours subsystem.
*/
void
-GST_NEIGHBOURS_done ()
+GDS_NEIGHBOURS_done ()
{
GNUNET_assert (coreAPI != NULL);
GNUNET_CORE_disconnect (coreAPI);