* @brief statistics values
*/
uint64_t stats[STAT_TYPE_MAX];
+ /**
+ * @brief Handle for the statistics get request
+ */
+ struct GNUNET_STATISTICS_GetHandle *h_stat_get[STAT_TYPE_MAX];
};
/**
return GNUNET_YES;
}
+
+static void
+cancel_pending_req (struct PendingRequest *pending_req)
+{
+ struct RPSPeer *rps_peer;
+
+ rps_peer = pending_req->rps_peer;
+ GNUNET_CONTAINER_DLL_remove (rps_peer->pending_req_head,
+ rps_peer->pending_req_tail,
+ pending_req);
+ rps_peer->num_pending_reqs--;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Cancelling pending rps get request\n");
+ GNUNET_SCHEDULER_cancel (pending_req->request_task);
+ GNUNET_free (pending_req);
+}
+
+static void
+cancel_request (struct PendingReply *pending_rep)
+{
+ struct RPSPeer *rps_peer;
+
+ rps_peer = pending_rep->rps_peer;
+ GNUNET_CONTAINER_DLL_remove (rps_peer->pending_rep_head,
+ rps_peer->pending_rep_tail,
+ pending_rep);
+ rps_peer->num_pending_reps--;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Cancelling rps get reply\n");
+ GNUNET_RPS_request_cancel (pending_rep->req_handle);
+ GNUNET_free (pending_rep);
+}
+
+void
+clean_peer (unsigned peer_index)
+{
+ struct PendingReply *pending_rep;
+ struct PendingRequest *pending_req;
+
+ pending_rep = rps_peers[peer_index].pending_rep_head;
+ while (NULL != (pending_rep = rps_peers[peer_index].pending_rep_head))
+ {
+ cancel_request (pending_rep);
+ }
+ pending_req = rps_peers[peer_index].pending_req_head;
+ while (NULL != (pending_req = rps_peers[peer_index].pending_req_head))
+ {
+ cancel_pending_req (pending_req);
+ }
+ if (NULL != rps_peers[peer_index].rps_handle)
+ {
+ GNUNET_RPS_disconnect (rps_peers[peer_index].rps_handle);
+ rps_peers[peer_index].rps_handle = NULL;
+ }
+ for (unsigned stat_type = STAT_TYPE_ROUNDS;
+ stat_type < STAT_TYPE_MAX;
+ stat_type++)
+ {
+ if (NULL != rps_peers[peer_index].h_stat_get[stat_type])
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "(%u) did not yet receive stat value for `%s'\n",
+ rps_peers[peer_index].index,
+ stat_type_2_str (stat_type));
+ GNUNET_STATISTICS_get_cancel (
+ rps_peers[peer_index].h_stat_get[stat_type]);
+ }
+ }
+ if (NULL != rps_peers[peer_index].op)
+ {
+ GNUNET_TESTBED_operation_done (rps_peers[peer_index].op);
+ rps_peers[peer_index].op = NULL;
+ }
+}
+
/**
* Task run on timeout to shut everything down.
*/
shutdown_op (void *cls)
{
unsigned int i;
+ struct OpListEntry *entry;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Shutdown task scheduled, going down.\n");
in_shutdown = GNUNET_YES;
+ shutdown_task = NULL;
if (NULL != post_test_task)
{
GNUNET_SCHEDULER_cancel (post_test_task);
GNUNET_SCHEDULER_cancel (churn_task);
churn_task = NULL;
}
+ entry = oplist_head;
+ while (NULL != (entry = oplist_head))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Operation still pending on shutdown (%u)\n",
+ entry->index);
+ GNUNET_TESTBED_operation_done (entry->op);
+ GNUNET_CONTAINER_DLL_remove (oplist_head, oplist_tail, entry);
+ GNUNET_free (entry);
+ }
for (i = 0; i < num_peers; i++)
{
- if (NULL != rps_peers[i].rps_handle)
- {
- GNUNET_RPS_disconnect (rps_peers[i].rps_handle);
- rps_peers[i].rps_handle = NULL;
- }
- if (NULL != rps_peers[i].op)
- {
- GNUNET_TESTBED_operation_done (rps_peers[i].op);
- rps_peers[i].op = NULL;
- }
+ clean_peer (i);
}
+ GNUNET_SCHEDULER_shutdown ();
}
{
GNUNET_SCHEDULER_cancel (shutdown_task);
shutdown_task = GNUNET_SCHEDULER_add_now (&shutdown_op, NULL);
- GNUNET_SCHEDULER_shutdown ();
}
}
"Failed to connect to RPS service: %s\n",
emsg);
ok = 1;
- GNUNET_SCHEDULER_shutdown ();
+ shutdown_op (NULL);
return;
}
rps_peer->num_pending_reqs--;
}
-static void
-cancel_pending_req (struct PendingRequest *pending_req)
-{
- struct RPSPeer *rps_peer;
-
- rps_peer = pending_req->rps_peer;
- GNUNET_CONTAINER_DLL_remove (rps_peer->pending_req_head,
- rps_peer->pending_req_tail,
- pending_req);
- rps_peer->num_pending_reqs--;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Cancelling pending request\n");
- GNUNET_SCHEDULER_cancel (pending_req->request_task);
- GNUNET_free (pending_req);
-}
-
-static void
-cancel_request (struct PendingReply *pending_rep)
-{
- struct RPSPeer *rps_peer;
-
- rps_peer = pending_rep->rps_peer;
- GNUNET_CONTAINER_DLL_remove (rps_peer->pending_rep_head,
- rps_peer->pending_rep_tail,
- pending_rep);
- rps_peer->num_pending_reps--;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Cancelling request\n");
- GNUNET_RPS_request_cancel (pending_rep->req_handle);
- GNUNET_free (pending_rep);
-}
-
/**
* Schedule requests for peer @a rps_peer that have neither been scheduled, nor
if (NULL != emsg)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start/stop RPS at a peer\n");
- GNUNET_SCHEDULER_shutdown ();
+ shutdown_op (NULL);
return;
}
GNUNET_assert (0 != entry->delta);
GNUNET_free (stat_cls);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Shutting down\n");
- GNUNET_SCHEDULER_shutdown ();
+ shutdown_op (NULL);
} else {
GNUNET_free (stat_cls);
}
{
const struct STATcls *stat_cls = (const struct STATcls *) cls;
struct RPSPeer *rps_peer = (struct RPSPeer *) stat_cls->rps_peer;
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got stat value: %s - %" PRIu64 "\n",
//stat_type_2_str (stat_cls->stat_type),
name,
value);
+ rps_peer->h_stat_get[stat_str_2_type (name)] = NULL;
to_file (rps_peer->file_name_stats,
"%s: %" PRIu64 "\n",
name,
stat_cls->stat_type = stat_type;
rps_peer->file_name_stats =
store_prefix_file_name (rps_peer->peer_id, "stats");
- GNUNET_STATISTICS_get (rps_peer->stats_h,
- "rps",
- stat_type_2_str (stat_type),
- post_test_shutdown_ready_cb,
- stat_iterator,
- (struct STATcls *) stat_cls);
+ rps_peer->h_stat_get[stat_type] = GNUNET_STATISTICS_get (
+ rps_peer->stats_h,
+ "rps",
+ stat_type_2_str (stat_type),
+ post_test_shutdown_ready_cb,
+ stat_iterator,
+ (struct STATcls *) stat_cls);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Requested statistics for %s (peer %" PRIu32 ")\n",
stat_type_2_str (stat_type),