*/
#include "platform.h"
-#include "gnunet_block_lib.h"
-#include "gnunet_util_lib.h"
+#include "gnunet_constants.h"
#include "gnunet_protocols.h"
-#include "gnunet_nse_service.h"
-#include "gnunet_core_service.h"
-#include "gnunet_datacache_lib.h"
-#include "gnunet_transport_service.h"
-#include "gnunet_hello_lib.h"
-#include "gnunet_dht_service.h"
#include "gnunet_statistics_service.h"
-#include "dht_new.h"
-#include <fenv.h>
+#include "gnunet-service-dht.h"
#include "gnunet-service-dht_clients.h"
#include "gnunet-service-dht_datacache.h"
#include "gnunet-service-dht_neighbours.h"
+#include "dht.h"
/**
/**
* The type for the data for the GET request.
*/
- enum GNUNET_BLOCK_Type msg_type;
+ enum GNUNET_BLOCK_Type type;
};
static struct ClientList *
find_active_client (struct GNUNET_SERVER_Client *client)
{
- struct ClientList *pos = client_list;
+ struct ClientList *pos = client_head;
struct ClientList *ret;
while (pos != NULL)
GNUNET_assert (GNUNET_YES ==
GNUNET_CONTAINER_multihashmap_remove (forward_map,
key, record));
- GNUNET_CONTAINER_heap_remove_node (record->hnode);
- GNUNET_ARRAY_append (record->seen_replies,
- record->seen_replies_count,
- 0);
+ if (NULL != record->hnode)
+ GNUNET_CONTAINER_heap_remove_node (record->hnode);
+ GNUNET_array_grow (record->seen_replies,
+ record->seen_replies_count,
+ 0);
GNUNET_free (record);
return GNUNET_YES;
}
handle_client_disconnect (void *cls,
struct GNUNET_SERVER_Client *client)
{
- struct ClientList *pos = client_list;
- struct ClientList *found;
+ struct ClientList *pos;
struct PendingMessage *reply;
- found = NULL;
- while (pos != NULL)
- {
- if (pos->client_handle == client)
- {
- GNUNET_CONTAINER_DLL_remove (client_head,
- client_tail,
- pos);
- found = pos;
- break;
- }
- pos = pos->next;
- }
- if (found == NULL)
- return;
- if (found->transmit_handle != NULL)
- GNUNET_CONNECTION_notify_transmit_ready_cancel (found->transmit_handle);
- while (NULL != (reply = found->pending_head))
+ pos = find_active_client (client);
+ GNUNET_CONTAINER_DLL_remove (client_head,
+ client_tail,
+ pos);
+ if (pos->transmit_handle != NULL)
+ GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->transmit_handle);
+ while (NULL != (reply = pos->pending_head))
{
- GNUNET_CONTAINER_DLL_remove (found->pending_head, found->pending_tail,
+ GNUNET_CONTAINER_DLL_remove (pos->pending_head, pos->pending_tail,
reply);
GNUNET_free (reply);
}
- GNUNET_CONTAINER_multihashmap_iterate (forward_list.hashmap,
- &remove_client_records, found);
- GNUNET_free (found);
+ GNUNET_CONTAINER_multihashmap_iterate (forward_map,
+ &remove_client_records, pos);
+ GNUNET_free (pos);
}
transmit_request (struct ClientQueryRecord *cqr)
{
int32_t reply_bf_mutator;
+ struct GNUNET_CONTAINER_BloomFilter *reply_bf;
+ struct GNUNET_CONTAINER_BloomFilter *peer_bf;
+ GNUNET_STATISTICS_update (GDS_stats,
+ gettext_noop ("# GET requests from clients injected"), 1,
+ GNUNET_NO);
reply_bf_mutator = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
UINT32_MAX);
reply_bf = GNUNET_BLOCK_construct_bloomfilter (reply_bf_mutator,
cqr->seen_replies,
cqr->seen_replies_count);
- GST_NEIGHBOURS_handle_get (cqr->msg_type,
+ peer_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
+ DHT_BLOOM_SIZE,
+ GNUNET_CONSTANTS_BLOOMFILTER_K);
+ GDS_NEIGHBOURS_handle_get (cqr->type,
cqr->msg_options,
cqr->replication,
+ 0 /* hop count */,
&cqr->key,
cqr->xquery,
cqr->xquery_size,
reply_bf,
reply_bf_mutator,
- NULL /* no peers blocked initially */);
- GNUNET_CONTAINER_bloomfilter_destroy (reply_bf);
+ peer_bf);
+ GNUNET_CONTAINER_bloomfilter_free (reply_bf);
+ GNUNET_CONTAINER_bloomfilter_free (peer_bf);
/* exponential back-off for retries, max 1h */
cqr->retry_frequency =
- GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_HOURS,
+ GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_HOURS,
GNUNET_TIME_relative_multiply (cqr->retry_frequency, 2));
cqr->retry_time = GNUNET_TIME_relative_to_absolute (cqr->retry_frequency);
}
{
cqr->hnode = NULL;
delay = GNUNET_TIME_absolute_get_remaining (cqr->retry_time);
- if (delay.value > 0)
+ if (delay.rel_value > 0)
{
cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr,
cqr->retry_time.abs_value);
return;
}
transmit_request (cqr);
+ cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr,
+ cqr->retry_time.abs_value);
}
}
const struct GNUNET_MessageHeader *message)
{
const struct GNUNET_DHT_ClientPutMessage *dht_msg;
+ struct GNUNET_CONTAINER_BloomFilter *peer_bf;
uint16_t size;
size = ntohs (message->size);
{
GNUNET_break (0);
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
}
+ GNUNET_STATISTICS_update (GDS_stats,
+ gettext_noop ("# PUT requests received from clients"), 1,
+ GNUNET_NO);
dht_msg = (const struct GNUNET_DHT_ClientPutMessage *) message;
/* give to local clients */
- GDS_CLIENT_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
+ GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
&dht_msg->key,
0, NULL,
0, NULL,
size - sizeof (struct GNUNET_DHT_ClientPutMessage),
&dht_msg[1]);
/* store locally */
- GST_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
+ GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
&dht_msg->key,
0, NULL,
ntohl (dht_msg->type),
size - sizeof (struct GNUNET_DHT_ClientPutMessage),
&dht_msg[1]);
/* route to other peers */
- GST_NEIGHBOURS_handle_put (ntohl (dht_msg->type),
+ peer_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
+ DHT_BLOOM_SIZE,
+ GNUNET_CONSTANTS_BLOOMFILTER_K);
+ GDS_NEIGHBOURS_handle_put (ntohl (dht_msg->type),
ntohl (dht_msg->options),
ntohl (dht_msg->desired_replication_level),
GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
+ 0 /* hop count */,
+ peer_bf,
&dht_msg->key,
+ 0, NULL,
&dht_msg[1],
size - sizeof (struct GNUNET_DHT_ClientPutMessage));
+ GNUNET_CONTAINER_bloomfilter_free (peer_bf);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
const struct GNUNET_MessageHeader *message)
{
const struct GNUNET_DHT_ClientGetMessage *get;
- const struct GNUNET_MessageHeader *enc_msg;
-
struct ClientQueryRecord *cqr;
size_t xquery_size;
const char* xquery;
xquery_size = size - sizeof (struct GNUNET_DHT_ClientGetMessage);
get = (const struct GNUNET_DHT_ClientGetMessage *) message;
xquery = (const char*) &get[1];
-
+ GNUNET_STATISTICS_update (GDS_stats,
+ gettext_noop ("# GET requests received from clients"), 1,
+ GNUNET_NO);
cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size);
cqr->key = get->key;
cqr->xquery_size = xquery_size;
cqr->replication = ntohl (get->desired_replication_level);
cqr->msg_options = ntohl (get->options);
- cqr->msg_type = ntohl (get->type);
- GNUNET_CONTAINER_multihashmap_put (forward_map, KEY, cqr,
+ cqr->type = ntohl (get->type);
+ GNUNET_CONTAINER_multihashmap_put (forward_map, &get->key, cqr,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
/* start remote requests */
if (GNUNET_SCHEDULER_NO_TASK != retry_task)
retry_task = GNUNET_SCHEDULER_add_now (&transmit_next_request_task, NULL);
/* perform local lookup */
GDS_DATACACHE_handle_get (&get->key,
- cqr->msg_type,
+ cqr->type,
cqr->xquery,
xquery_size,
NULL, 0);
/**
- * Closure for 'remove_by_uid'.
+ * Closure for 'remove_by_unique_id'.
*/
-struct RemoveByUidContext
+struct RemoveByUniqueIdContext
{
/**
* Client that issued the removal request.
/**
* Unique ID of the request.
*/
- uint64_t uid;
+ uint64_t unique_id;
};
/**
* Iterator over hash map entries that frees all entries
- * that match the given client and UID.
+ * that match the given client and unique ID.
*
- * @param cls UID and client to search for in source routes
+ * @param cls unique ID and client to search for in source routes
* @param key current key code
* @param value value in the hash map, a ClientQueryRecord
* @return GNUNET_YES (we should continue to iterate)
*/
static int
-remove_by_uid (void *cls, const GNUNET_HashCode * key, void *value)
+remove_by_unique_id (void *cls, const GNUNET_HashCode * key, void *value)
{
- const struct RemoveByUidContext *ctx = cls;
+ const struct RemoveByUniqueIdContext *ctx = cls;
struct ClientQueryRecord *record = value;
- if (record->uid != ctx->uid)
+ if (record->unique_id != ctx->unique_id)
return GNUNET_YES;
return remove_client_records (ctx->client, key, record);
}
{
const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg =
(const struct GNUNET_DHT_ClientGetStopMessage *) message;
+ struct RemoveByUniqueIdContext ctx;
+ GNUNET_STATISTICS_update (GDS_stats,
+ gettext_noop ("# GET STOP requests received from clients"), 1,
+ GNUNET_NO);
ctx.client = find_active_client (client);
- ctx.uid = &dht_stop_msg.unique_id);
+ ctx.unique_id = dht_stop_msg->unique_id;
GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
&dht_stop_msg->key,
- &remove_by_uid,
+ &remove_by_unique_id,
&ctx);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
struct ForwardReplyContext *frc = cls;
struct ClientQueryRecord *record = value;
struct PendingMessage *pm;
- struct ReplyMessage *reply;
+ struct GNUNET_DHT_ClientResultMessage *reply;
enum GNUNET_BLOCK_EvaluationResult eval;
int do_free;
GNUNET_HashCode ch;
if ( (record->type != GNUNET_BLOCK_TYPE_ANY) &&
(record->type != frc->type) )
- return GNUNET_YES; /* type mismatch */
+ {
+ GNUNET_STATISTICS_update (GDS_stats,
+ gettext_noop ("# Key match, type mismatches in REPLY to CLIENT"), 1,
+ GNUNET_NO);
+ return GNUNET_YES; /* type mismatch */
+ }
GNUNET_CRYPTO_hash (frc->data,
frc->data_size,
&ch);
if (0 == memcmp (&record->seen_replies[i],
&ch,
sizeof (GNUNET_HashCode)))
- return GNUNET_YES; /* duplicate */
+ {
+ GNUNET_STATISTICS_update (GDS_stats,
+ gettext_noop ("# Duplicate REPLIES to CLIENT request dropped"), 1,
+ GNUNET_NO);
+ return GNUNET_YES; /* duplicate */
+ }
eval =
GNUNET_BLOCK_evaluate (GDS_block_context,
record->type, key,
do_free = GNUNET_YES;
break;
case GNUNET_BLOCK_EVALUATION_OK_MORE:
- GNUNET_ARRAY_append (record->seen_replies,
+ GNUNET_array_append (record->seen_replies,
record->seen_replies_count,
ch);
do_free = GNUNET_NO;
return GNUNET_NO;
case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Unsupported block type (%u) in request!\n",
+ _("Unsupported block type (%u) in request!\n"),
record->type);
return GNUNET_NO;
+ default:
+ GNUNET_break (0);
+ return GNUNET_NO;
}
if (GNUNET_NO == frc->do_copy)
{
sizeof (struct PendingMessage) + ntohs (frc->pm->msg->size));
pm->next = pm->prev = NULL;
}
- reply = (struct ReplyMessage*) &pm[1];
+ GNUNET_STATISTICS_update (GDS_stats,
+ gettext_noop ("# RESULTS queued for clients"), 1,
+ GNUNET_NO);
+ reply = (struct GNUNET_DHT_ClientResultMessage*) &pm[1];
reply->unique_id = record->unique_id;
add_pending_message (record->client, pm);
if (GNUNET_YES == do_free)
* @param data application payload data
*/
void
-GDS_CLIENT_handle_reply (struct GNUNET_TIME_Absolute expiration,
+GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration,
const GNUNET_HashCode *key,
unsigned int get_path_length,
const struct GNUNET_PeerIdentity *get_path,
{
struct ForwardReplyContext frc;
struct PendingMessage *pm;
- struct ReplyMessage *reply;
+ struct GNUNET_DHT_ClientResultMessage *reply;
struct GNUNET_PeerIdentity *paths;
size_t msize;
if (NULL ==
- GNUNET_CONTAINER_multihashmap_get (foward_map, key))
+ GNUNET_CONTAINER_multihashmap_get (forward_map, key))
+ {
+ GNUNET_STATISTICS_update (GDS_stats,
+ gettext_noop ("# REPLIES ignored for CLIENTS (no match)"), 1,
+ GNUNET_NO);
return; /* no matching request, fast exit! */
- msize = sizeof(struct ReplyMessage) + data_size +
+ }
+ msize = sizeof(struct GNUNET_DHT_ClientResultMessage) + data_size +
(get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity);
if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
{
return;
}
pm = (struct PendingMessage *) GNUNET_malloc (msize + sizeof (struct PendingMessage));
- reply = (struct ReplyMessage*) &pm[1];
+ reply = (struct GNUNET_DHT_ClientResultMessage*) &pm[1];
pm->msg = &reply->header;
reply->header.size = htons ((uint16_t) msize);
reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT);
reply->expiration = GNUNET_TIME_absolute_hton (expiration);
reply->key = *key;
paths = (struct GNUNET_PeerIdentity*) &reply[1];
- mempcy (paths, get_path,
- sizeof (struct GNUNET_PeerIdentity) * get_path_length);
- mempcy (&paths[get_path_length],
- put_path, sizeof (struct GNUNET_PeerIdentity) * put_path_length);
+ memcpy (paths, put_path,
+ sizeof (struct GNUNET_PeerIdentity) * put_path_length);
+ memcpy (&paths[put_path_length],
+ get_path, sizeof (struct GNUNET_PeerIdentity) * get_path_length);
memcpy (&paths[get_path_length + put_path_length],
data,
data_size);
frc.data = data;
frc.data_size = data_size;
frc.type = type;
- GNUNET_CONTAINER_multihashmap_get_multiple (foward_map, key,
+ GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, key,
&forward_reply,
&frc);
if (GNUNET_NO == frc.do_copy)
{
/* did not match any of the requests, free! */
- GNUNET_free (buf);
+ GNUNET_STATISTICS_update (GDS_stats,
+ gettext_noop ("# REPLIES ignored for CLIENTS (no match)"), 1,
+ GNUNET_NO);
+ GNUNET_free (pm);
}
}
* @param server the initialized server
*/
void
-GDS_CLIENT_init (struct GNUNET_SERVER_Handle *server)
+GDS_CLIENTS_init (struct GNUNET_SERVER_Handle *server)
{
static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
{&handle_dht_local_put, NULL,
GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, 0},
{&handle_dht_local_get_stop, NULL,
GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP,
- sizeof (struct GNUNET_DHT_StopMessage) },
+ sizeof (struct GNUNET_DHT_ClientGetStopMessage) },
{NULL, NULL, 0, 0}
};
forward_map = GNUNET_CONTAINER_multihashmap_create (1024);
* Shutdown client subsystem.
*/
void
-GDS_CLIENT_done ()
+GDS_CLIENTS_done ()
{
GNUNET_assert (client_head == NULL);
GNUNET_assert (client_tail == NULL);