- Do search with a maximum parallel factor, starting a new one when an old one is done or times out
#include "gnunet_dht_service.h"
#include "gnunet_testbed_service.h"
#include "gnunet_dht_service.h"
#include "gnunet_testbed_service.h"
+#define FIND_TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 90)
+#define SEARCHES_IN_PARALLEL 10
+
/**
* DLL of operations
*/
/**
* DLL of operations
*/
* The starting time of a profiling step.
*/
struct GNUNET_TIME_Absolute prof_start_time;
* The starting time of a profiling step.
*/
struct GNUNET_TIME_Absolute prof_start_time;
+
+ /**
+ * Operation timeout
+ */
+ GNUNET_SCHEDULER_TaskIdentifier timeout;
*/
static int num_search_strings;
*/
static int num_search_strings;
+/**
+ * Index of peer/string search.
+ */
+static unsigned int peer_cnt;
+
/**
* Number of peers found with search strings.
*/
/**
* Number of peers found with search strings.
*/
*/
static unsigned int max_path_compression;
*/
static unsigned int max_path_compression;
-/**
- * If we should distribute the search evenly throught all peers (each
- * peer searches for a string) or if only one peer should search for
- * all strings.
- */
-static int no_distributed_search;
-
/**
* Prefix used for regex announcing. We need to prefix the search
* strings with it, in order to find something.
/**
* Prefix used for regex announcing. We need to prefix the search
* strings with it, in order to find something.
/******************************************************************************/
/******************************************************************************/
-/************************ MESH SERVICE CONNECTIONS **************************/
+/************************ REGEX FIND CONNECTIONS **************************/
/******************************************************************************/
/******************************************************************************/
+
+/**
+ * Start searching for the next string in the DHT.
+ *
+ * @param cls Index of the next peer in the peers array.
+ * @param tc TaskContext.
+ */
+static void
+find_next_string (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
/**
* Method called when we've found a peer that announced a regex
* that matches our search string. Now get the statistics.
/**
* Method called when we've found a peer that announced a regex
* that matches our search string. Now get the statistics.
+ if (GNUNET_SCHEDULER_NO_TASK != peer->timeout)
+ {
+ GNUNET_SCHEDULER_cancel (peer->timeout);
+ peer->timeout = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_SCHEDULER_add_now (&find_next_string, NULL);
+ }
+
if (NULL == id)
{
// FIXME not possible right now
if (NULL == id)
{
// FIXME not possible right now
do_connect_by_string (void *cls,
const struct GNUNET_SCHEDULER_TaskContext * tc)
{
do_connect_by_string (void *cls,
const struct GNUNET_SCHEDULER_TaskContext * tc)
{
printf ("Starting string search.\n");
fflush (stdout);
printf ("Starting string search.\n");
fflush (stdout);
- peers[0].search_str = search_strings[0];
- peers[0].search_str_matched = GNUNET_NO;
+ search_timeout_task = GNUNET_SCHEDULER_add_delayed (search_timeout,
+ &do_connect_by_string_timeout, NULL);
+ for (i = 0; i < SEARCHES_IN_PARALLEL; i++)
+ GNUNET_SCHEDULER_add_now (&find_next_string, NULL);
+}
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Searching for string \"%s\" on peer %d with file %s\n",
- peers[0].search_str, 0, peers[0].policy_file);
- /* First connect to mesh service, then search for string. Next
- connect will be in mesh_connect_cb */
- peers[0].op_handle =
- GNUNET_TESTBED_service_connect (NULL,
- peers[0].peer_handle,
- "dht",
- &dht_connect_cb,
- &peers[0],
- &dht_ca,
- &dht_da,
- &peers[0]);
+/**
+ * Search timed out. It might still complete in the future,
+ * but we should start another one.
+ *
+ * @param cls Index of the next peer in the peers array.
+ * @param tc TaskContext.
+ */
+static void
+find_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct RegexPeer *p = cls;
- search_timeout_task = GNUNET_SCHEDULER_add_delayed (search_timeout,
- &do_connect_by_string_timeout, NULL);
+ p->timeout = GNUNET_SCHEDULER_NO_TASK;
+
+ if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
+ return;
+ GNUNET_SCHEDULER_add_now (&find_next_string, NULL);
/**
* Start searching for the next string in the DHT.
*
* @param cls Index of the next peer in the peers array.
* @param tc TaskContext.
*/
/**
* Start searching for the next string in the DHT.
*
* @param cls Index of the next peer in the peers array.
* @param tc TaskContext.
*/
find_next_string (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
find_next_string (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- long next_p = (long) cls;
-
- if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+ if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) ||
+ peer_cnt >= (num_search_strings - 1))
+ peers[peer_cnt].search_str = search_strings[peer_cnt];
+ peers[peer_cnt].search_str_matched = GNUNET_NO;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Searching for string \"%s\" on peer %d with file %s\n",
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Searching for string \"%s\" on peer %d with file %s\n",
- peers[next_p].search_str, next_p, peers[next_p].policy_file);
+ peers[peer_cnt].search_str,
+ peer_cnt,
+ peers[peer_cnt].policy_file);
- /* FIXME
- * dont connect to a new dht for each peer, we might want to seach for n
- * strings on m peers where n > m
- */
- peers[next_p].op_handle =
+ peers[peer_cnt].op_handle =
GNUNET_TESTBED_service_connect (NULL,
GNUNET_TESTBED_service_connect (NULL,
- peers[next_p].peer_handle,
+ peers[peer_cnt].peer_handle,
+ &peers[peer_cnt]);
+ peers[peer_cnt].timeout = GNUNET_SCHEDULER_add_delayed (FIND_TIMEOUT,
+ &find_timeout,
+ &peers[peer_cnt]);
+ peer_cnt++;
/**
* DHT connect callback. Called when we are connected to the dht service for
* the peer in 'cls'. If successfull we connect to the stats service of this
/**
* DHT connect callback. Called when we are connected to the dht service for
* the peer in 'cls'. If successfull we connect to the stats service of this
void *ca_result, const char *emsg)
{
struct RegexPeer *peer = (struct RegexPeer *) cls;
void *ca_result, const char *emsg)
{
struct RegexPeer *peer = (struct RegexPeer *) cls;
- static unsigned int peer_cnt;
- unsigned int next_p;
if (NULL != emsg || NULL == op || NULL == ca_result)
{
if (NULL != emsg || NULL == op || NULL == ca_result)
{
®ex_found_handler, peer,
NULL);
peer->prof_start_time = GNUNET_TIME_absolute_get ();
®ex_found_handler, peer,
NULL);
peer->prof_start_time = GNUNET_TIME_absolute_get ();
-
- if (peer_cnt < (num_search_strings - 1))
- {
- if (GNUNET_YES == no_distributed_search)
- next_p = 0;
- else
- next_p = (++peer_cnt % num_peers);
-
- peers[next_p].search_str = search_strings[next_p];
- peers[next_p].search_str_matched = GNUNET_NO;
-
- /* Don't start all searches at once */
- /* TODO add some intelligence to the timeout */
- GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
- &find_next_string,
- (void *) (long) next_p);
- }
{'p', "max-path-compression", "MAX_PATH_COMPRESSION",
gettext_noop ("maximum path compression length"),
1, &GNUNET_GETOPT_set_uint, &max_path_compression},
{'p', "max-path-compression", "MAX_PATH_COMPRESSION",
gettext_noop ("maximum path compression length"),
1, &GNUNET_GETOPT_set_uint, &max_path_compression},
- {'i', "no-distributed-search", "",
- gettext_noop ("if this option is set, only one peer is responsible for searching all strings"),
- 0, &GNUNET_GETOPT_set_one, &no_distributed_search},
GNUNET_GETOPT_OPTION_END
};
int ret;
GNUNET_GETOPT_OPTION_END
};
int ret;