*/
uint32_t desired_replication_level GNUNET_PACKED;
- /**
- * Unique ID for the PUT message.
- */
- uint64_t unique_id GNUNET_PACKED;
-
/**
* How long should this data persist?
*/
};
-/**
- * Message to confirming receipt of PUT, sent from DHT service to clients.
- */
-struct GNUNET_DHT_ClientPutConfirmationMessage
-{
- /**
- * Type: #GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK
- */
- struct GNUNET_MessageHeader header;
-
- /**
- * Always zero.
- */
- uint32_t reserved GNUNET_PACKED;
-
- /**
- * Unique ID from the PUT message that is being confirmed.
- */
- uint64_t unique_id GNUNET_PACKED;
-
-};
-
-
-
/**
* Message to monitor put requests going through peer, DHT service -> clients.
*/
/**
* Continuation to call when done.
*/
- GNUNET_DHT_PutContinuation cont;
+ GNUNET_SCHEDULER_TaskCallback cont;
/**
* Main handle to this DHT api
void *cont_cls;
/**
- * Unique ID for the PUT operation.
+ * Envelope from the PUT operation.
*/
- uint64_t unique_id;
+ struct GNUNET_MQ_Envelope *env;
};
do_disconnect (struct GNUNET_DHT_Handle *h)
{
struct GNUNET_DHT_PutHandle *ph;
- GNUNET_DHT_PutContinuation cont;
+ GNUNET_SCHEDULER_TaskCallback cont;
void *cont_cls;
if (NULL == h->mq)
{
cont = ph->cont;
cont_cls = ph->cont_cls;
+ ph->env = NULL;
GNUNET_DHT_put_cancel (ph);
if (NULL != cont)
- cont (cont_cls,
- GNUNET_SYSERR);
+ cont (cont_cls);
}
GNUNET_assert (NULL == h->reconnect_task);
h->reconnect_task
/**
- * Process a put confirmation message from the service.
+ * Process a MQ PUT transmission notification.
*
* @param cls The DHT handle.
- * @param msg confirmation message from the service.
*/
static void
-handle_put_confirmation (void *cls,
- const struct GNUNET_DHT_ClientPutConfirmationMessage *msg)
+handle_put_cont (void *cls)
{
- struct GNUNET_DHT_Handle *handle = cls;
- struct GNUNET_DHT_PutHandle *ph;
- GNUNET_DHT_PutContinuation cont;
+ struct GNUNET_DHT_PutHandle *ph = cls;
+ GNUNET_SCHEDULER_TaskCallback cont;
void *cont_cls;
- for (ph = handle->put_head; NULL != ph; ph = ph->next)
- if (ph->unique_id == msg->unique_id)
- break;
- if (NULL == ph)
- return;
cont = ph->cont;
cont_cls = ph->cont_cls;
+ ph->env = NULL;
GNUNET_DHT_put_cancel (ph);
if (NULL != cont)
- cont (cont_cls,
- GNUNET_OK);
+ cont (cont_cls);
}
GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT,
struct GNUNET_DHT_ClientResultMessage,
h),
- GNUNET_MQ_hd_fixed_size (put_confirmation,
- GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK,
- struct GNUNET_DHT_ClientPutConfirmationMessage,
- h),
GNUNET_MQ_handler_end ()
};
if (NULL != h->mq)
while (NULL != (ph = handle->put_head))
{
if (NULL != ph->cont)
- ph->cont (ph->cont_cls,
- GNUNET_SYSERR);
+ ph->cont (ph->cont_cls);
GNUNET_DHT_put_cancel (ph);
}
if (NULL != handle->mq)
size_t size,
const void *data,
struct GNUNET_TIME_Absolute exp,
- GNUNET_DHT_PutContinuation cont,
+ GNUNET_SCHEDULER_TaskCallback cont,
void *cont_cls)
{
struct GNUNET_MQ_Envelope *env;
ph->dht_handle = handle;
ph->cont = cont;
ph->cont_cls = cont_cls;
- ph->unique_id = ++handle->uid_gen;
GNUNET_CONTAINER_DLL_insert_tail (handle->put_head,
handle->put_tail,
ph);
env = GNUNET_MQ_msg_extra (put_msg,
size,
GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT);
+ GNUNET_MQ_notify_sent (env,
+ &handle_put_cont,
+ ph);
+ ph->env = env;
put_msg->type = htonl ((uint32_t) type);
put_msg->options = htonl ((uint32_t) options);
put_msg->desired_replication_level = htonl (desired_replication_level);
- put_msg->unique_id = ph->unique_id;
put_msg->expiration = GNUNET_TIME_absolute_hton (exp);
put_msg->key = *key;
GNUNET_memcpy (&put_msg[1],
{
struct GNUNET_DHT_Handle *handle = ph->dht_handle;
+ if (NULL != ph->env)
+ GNUNET_MQ_notify_sent (ph->env,
+ NULL,
+ NULL);
GNUNET_CONTAINER_DLL_remove (handle->put_head,
handle->put_tail,
ph);
const char *emsg)
{
struct GNUNET_DHT_TEST_Context *ctx = cls;
- unsigned int i;
if (NULL != emsg)
{
GNUNET_SCHEDULER_shutdown ();
return;
}
- for (i=0;i<ctx->num_peers;i++)
+ for (unsigned int i=0;i<ctx->num_peers;i++)
if (op == ctx->ops[i])
ctx->dhts[i] = ca_result;
- for (i=0;i<ctx->num_peers;i++)
+ for (unsigned int i=0;i<ctx->num_peers;i++)
if (NULL == ctx->dhts[i])
return; /* still some DHT connections missing */
/* all DHT connections ready! */
void
GNUNET_DHT_TEST_cleanup (struct GNUNET_DHT_TEST_Context *ctx)
{
- unsigned int i;
-
- for (i=0;i<ctx->num_peers;i++)
+ for (unsigned int i=0;i<ctx->num_peers;i++)
GNUNET_TESTBED_operation_done (ctx->ops[i]);
GNUNET_free (ctx->ops);
GNUNET_free (ctx->dhts);
static void
dht_test_run (void *cls,
- struct GNUNET_TESTBED_RunHandle *h,
+ struct GNUNET_TESTBED_RunHandle *h,
unsigned int num_peers,
struct GNUNET_TESTBED_Peer **peers,
unsigned int links_succeeded,
unsigned int links_failed)
{
struct GNUNET_DHT_TEST_Context *ctx = cls;
- unsigned int i;
GNUNET_assert (num_peers == ctx->num_peers);
ctx->peers = peers;
- for (i=0;i<num_peers;i++)
+ for (unsigned int i=0;i<num_peers;i++)
ctx->ops[i] = GNUNET_TESTBED_service_connect (ctx,
peers[i],
"dht",
* Signature of the main function of a task.
*
* @param cls closure
- * @param success #GNUNET_OK if the PUT was transmitted,
- * #GNUNET_NO on timeout,
- * #GNUNET_SYSERR on disconnect from service
- * after the PUT message was transmitted
- * (so we don't know if it was received or not)
*/
static void
-message_sent_cont (void *cls, int success)
+message_sent_cont (void *cls)
{
- if (verbose)
- {
- switch (success)
- {
- case GNUNET_OK:
- FPRINTF (stderr, "%s `%s'!\n", _("PUT request sent with key"), GNUNET_h2s_full(&key));
- break;
- case GNUNET_NO:
- FPRINTF (stderr, "%s", _("Timeout sending PUT request!\n"));
- break;
- case GNUNET_SYSERR:
- FPRINTF (stderr, "%s", _("PUT request not confirmed!\n"));
- break;
- default:
- GNUNET_break (0);
- break;
- }
- }
- GNUNET_SCHEDULER_add_now (&shutdown_task, NULL);
+ GNUNET_SCHEDULER_add_now (&shutdown_task,
+ NULL);
}
if (NULL == (dht_handle = GNUNET_DHT_connect (cfg, 1)))
{
- FPRINTF (stderr, _("Could not connect to %s service!\n"), "DHT");
+ FPRINTF (stderr,
+ _("Could not connect to DHT service!\n"));
ret = 1;
return;
}
{
struct GNUNET_GETOPT_CommandLineOption options[] = {
-
GNUNET_GETOPT_option_string ('d',
"data",
"DATA",
gettext_noop ("the data to insert under the key"),
&data),
-
GNUNET_GETOPT_option_relative_time ('e',
- "expiration",
- "EXPIRATION",
- gettext_noop ("how long to store this entry in the dht (in seconds)"),
- &expiration),
-
+ "expiration",
+ "EXPIRATION",
+ gettext_noop ("how long to store this entry in the dht (in seconds)"),
+ &expiration),
GNUNET_GETOPT_option_string ('k',
"key",
"KEY",
gettext_noop ("the query key"),
&query_key),
-
GNUNET_GETOPT_option_flag ('x',
- "demultiplex",
- gettext_noop ("use DHT's demultiplex everywhere option"),
- &demultixplex_everywhere),
-
+ "demultiplex",
+ gettext_noop ("use DHT's demultiplex everywhere option"),
+ &demultixplex_everywhere),
GNUNET_GETOPT_option_uint ('r',
- "replication",
- "LEVEL",
- gettext_noop ("how many replicas to create"),
- &replication),
-
+ "replication",
+ "LEVEL",
+ gettext_noop ("how many replicas to create"),
+ &replication),
GNUNET_GETOPT_option_flag ('R',
- "record",
- gettext_noop ("use DHT's record route option"),
- &record_route),
-
+ "record",
+ gettext_noop ("use DHT's record route option"),
+ &record_route),
GNUNET_GETOPT_option_uint ('t',
- "type",
- "TYPE",
- gettext_noop ("the type to insert data as"),
- &query_type),
-
+ "type",
+ "TYPE",
+ gettext_noop ("the type to insert data as"),
+ &query_type),
GNUNET_GETOPT_option_verbose (&verbose),
-
GNUNET_GETOPT_OPTION_END
};
- if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv,
- &argc, &argv))
+ if (GNUNET_OK !=
+ GNUNET_STRINGS_get_utf8_args (argc, argv,
+ &argc, &argv))
return 2;
expiration = GNUNET_TIME_UNIT_HOURS;
return (GNUNET_OK ==
struct ClientHandle *ch = cls;
struct GNUNET_CONTAINER_BloomFilter *peer_bf;
uint16_t size;
- struct GNUNET_MQ_Envelope *env;
- struct GNUNET_DHT_ClientPutConfirmationMessage *conf;
size = ntohs (dht_msg->header.size);
GNUNET_STATISTICS_update (GDS_stats,
&dht_msg[1],
size - sizeof (struct GNUNET_DHT_ClientPutMessage));
GNUNET_CONTAINER_bloomfilter_free (peer_bf);
- env = GNUNET_MQ_msg (conf,
- GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK);
- conf->reserved = htonl (0);
- conf->unique_id = dht_msg->unique_id;
- GNUNET_MQ_send (ch->mq,
- env);
GNUNET_SERVICE_client_continue (ch->client);
}
* Signature of the main function of a task.
*
* @param cls closure
- * @param success result of PUT
*/
static void
-test_get (void *cls,
- int success)
+test_get (void *cls)
{
struct GNUNET_HashCode hash;
"Get successful\n");
#if 0
{
- int i;
-
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"PATH: (get %u, put %u)\n",
get_path_length,
put_path_length);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
" LOCAL\n");
- for (i = get_path_length - 1; i >= 0; i--)
+ for (int i = get_path_length - 1; i >= 0; i--)
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
" %s\n",
GNUNET_i2s (&get_path[i]));
- for (i = put_path_length - 1; i >= 0; i--)
+ for (int i = put_path_length - 1; i >= 0; i--)
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
" %s\n",
GNUNET_i2s (&put_path[i]));
struct GNUNET_DHT_Handle **hs = cls;
struct GNUNET_HashCode key;
struct GNUNET_HashCode value;
- unsigned int i;
put_task = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Putting values into DHT\n");
- for (i = 0; i < NUM_PEERS; i++)
+ for (unsigned int i = 0; i < NUM_PEERS; i++)
{
GNUNET_CRYPTO_hash (&i,
sizeof (i),
* Schedules the next PUT.
*
* @param cls closure, NULL
- * @param success #GNUNET_OK if the operation worked (unused)
*/
static void
-dht_put_cont (void *cls,
- int success)
+dht_put_cont (void *cls)
{
dht_put = NULL;
- dht_task = GNUNET_SCHEDULER_add_delayed (DHT_PUT_FREQUENCY,
- &do_dht_put,
- NULL);
}
{
struct GNUNET_TIME_Absolute expiration;
- dht_task = NULL;
+ dht_task = GNUNET_SCHEDULER_add_delayed (DHT_PUT_FREQUENCY,
+ &do_dht_put,
+ NULL);
expiration = GNUNET_TIME_absolute_ntoh (dns_advertisement.expiration_time);
if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us <
GNUNET_TIME_UNIT_HOURS.rel_value_us)
&dns_advertisement.purpose,
&dns_advertisement.signature));
}
+ if (NULL != dht_put)
+ GNUNET_DHT_put_cancel (dht_put);
dht_put = GNUNET_DHT_put (dht,
&dht_put_key,
1 /* replication */,
* Continuation called after DHT PUT operation has finished.
*
* @param cls type of blocks to gather
- * @param success GNUNET_OK if the PUT was transmitted,
- * GNUNET_NO on timeout,
- * GNUNET_SYSERR on disconnect from service
- * after the PUT message was transmitted
- * (so we don't know if it was received or not)
*/
static void
-delay_dht_put_blocks (void *cls, int success)
+delay_dht_put_blocks (void *cls)
{
struct PutOperator *po = cls;
struct GNUNET_DHT_PutHandle;
-/**
- * Type of a PUT continuation. You must not call
- * #GNUNET_DHT_disconnect in this continuation.
- *
- * @param cls closure
- * @param success #GNUNET_OK if the PUT was transmitted,
- * #GNUNET_NO on timeout,
- * #GNUNET_SYSERR on disconnect from service
- * after the PUT message was transmitted
- * (so we don't know if it was received or not)
- */
-typedef void
-(*GNUNET_DHT_PutContinuation)(void *cls,
- int success);
-
-
/**
* Perform a PUT operation storing data in the DHT.
*
size_t size,
const void *data,
struct GNUNET_TIME_Absolute exp,
- GNUNET_DHT_PutContinuation cont,
+ GNUNET_SCHEDULER_TaskCallback cont,
void *cont_cls);
*/
#define GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP 154
-/**
- * Acknowledge receiving PUT request
- */
-#define GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK 155
-
/**
* Certain results are already known to the client, filter those.
*/
GNUNET_SCHEDULER_TaskCallback cb,
void *cb_cls)
{
- GNUNET_assert (NULL == ev->sent_cb);
+ /* allow setting *OR* clearing callback */
+ GNUNET_assert ( (NULL == ev->sent_cb) ||
+ (NULL == cb) );
ev->sent_cb = cb;
ev->sent_cls = cb_cls;
}
* by a monitor is done.
*
* @param cls a `struct DhtPutActivity`
- * @param success #GNUNET_OK on success
*/
static void
-dht_put_monitor_continuation (void *cls,
- int success)
+dht_put_monitor_continuation (void *cls)
{
struct DhtPutActivity *ma = cls;
* Continuation called from DHT once the PUT operation is done.
*
* @param cls a `struct DhtPutActivity`
- * @param success #GNUNET_OK on success
*/
static void
-dht_put_continuation (void *cls,
- int success)
+dht_put_continuation (void *cls)
{
struct DhtPutActivity *ma = cls;
num_public_records++;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "PUT complete (%s)\n",
- (GNUNET_OK == success) ? "success" : "failure");
+ "PUT complete\n");
dht_queue_length--;
GNUNET_CONTAINER_DLL_remove (it_head,
it_tail,
ma);
GNUNET_free (ma);
- if (GNUNET_OK == success)
- {
- put_cnt++;
- if (0 == put_cnt % DELTA_INTERVAL)
- update_velocity ();
- }
+ put_cnt++;
+ if (0 == put_cnt % DELTA_INTERVAL)
+ update_velocity ();
}
const char *label,
const struct GNUNET_GNSRECORD_Data *rd_public,
unsigned int rd_public_count,
- GNUNET_DHT_PutContinuation cont,
+ GNUNET_SCHEDULER_TaskCallback cont,
void *cont_cls)
{
struct GNUNET_GNSRECORD_Block *block;