WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Affero General Public License for more details.
-
+
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
void *ready_cb_cls;
+ /**
+ * @brief Scheduler task for scheduled callback
+ */
+ struct GNUNET_SCHEDULER_Task *callback_task;
+
/**
* @brief Next element of the DLL
*/
};
+/**
+ * @brief Peers received from the biased stream to be passed to all
+ * srh_handlers
+ */
+static struct GNUNET_PeerIdentity *srh_callback_peers;
+
+/**
+ * @brief Number of peers in the biased stream that are to be passed to all
+ * srh_handlers
+ */
+static uint64_t srh_callback_num_peers;
+
+
/**
* @brief Create a new handle for a stream request
*
struct GNUNET_RPS_StreamRequestHandle *srh_head,
struct GNUNET_RPS_StreamRequestHandle *srh_tail)
{
+ GNUNET_assert (NULL != srh);
+ if (NULL != srh->callback_task)
+ {
+ GNUNET_SCHEDULER_cancel (srh->callback_task);
+ srh->callback_task = NULL;
+ }
GNUNET_CONTAINER_DLL_remove (srh_head,
srh_tail,
srh);
{
struct GNUNET_RPS_Handle *rps_handle;
- GNUNET_assert (NULL != srh);
rps_handle = srh->rps_handle;
- GNUNET_CONTAINER_DLL_remove (rps_handle->stream_requests_head,
- rps_handle->stream_requests_tail,
- srh);
- GNUNET_free (srh);
+ remove_stream_request (srh,
+ rps_handle->stream_requests_head,
+ rps_handle->stream_requests_tail);
if (NULL == rps_handle->stream_requests_head) cancel_stream (rps_handle);
}
return GNUNET_OK;
}
+
+/**
+ * @brief Called by the scheduler to call the callbacks of the srh handlers
+ *
+ * @param cls Stream request handle
+ */
+static void
+srh_callback_scheduled (void *cls)
+{
+ struct GNUNET_RPS_StreamRequestHandle *srh = cls;
+
+ srh->callback_task = NULL;
+ srh->ready_cb (srh->ready_cb_cls,
+ srh_callback_num_peers,
+ srh_callback_peers);
+}
+
+
/**
* This function is called, when the service sends another peer from the biased
* stream.
const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
{
struct GNUNET_RPS_Handle *h = cls;
- const struct GNUNET_PeerIdentity *peers;
+ //const struct GNUNET_PeerIdentity *peers;
uint64_t num_peers;
struct GNUNET_RPS_StreamRequestHandle *srh_iter;
struct GNUNET_RPS_StreamRequestHandle *srh_next;
- peers = (struct GNUNET_PeerIdentity *) &msg[1];
+ //peers = (struct GNUNET_PeerIdentity *) &msg[1];
num_peers = ntohl (msg->num_peers);
+ srh_callback_num_peers = num_peers;
+ if (NULL != srh_callback_peers) GNUNET_free (srh_callback_peers);
+ srh_callback_peers =
+ GNUNET_malloc (num_peers * sizeof (struct GNUNET_PeerIdentity));
+ GNUNET_memcpy (srh_callback_peers,
+ &msg[1],
+ num_peers * sizeof (struct GNUNET_PeerIdentity));
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Received %" PRIu64 " peer(s) from stream input.\n",
num_peers);
LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling srh \n");
/* Store next pointer - srh might be removed/freed in callback */
srh_next = srh_iter->next;
- srh_iter->ready_cb (srh_iter->ready_cb_cls,
- num_peers,
- peers);
+ if (NULL != srh_iter->callback_task)
+ {
+ GNUNET_SCHEDULER_cancel (srh_iter->callback_task);
+ }
+ srh_iter->callback_task =
+ GNUNET_SCHEDULER_add_now (srh_callback_scheduled, srh_iter);
srh_iter = srh_next;
}
h = rh->rps_handle;
GNUNET_assert (NULL != rh);
- GNUNET_assert (NULL != rh->srh);
- remove_stream_request (rh->srh,
- h->stream_requests_head,
- h->stream_requests_tail);
+ GNUNET_assert (h == rh->srh->rps_handle);
+ GNUNET_RPS_stream_cancel (rh->srh);
+ rh->srh = NULL;
if (NULL == h->stream_requests_head) cancel_stream(h);
if (NULL != rh->sampler_rh)
{
GNUNET_RPS_stream_cancel (srh_tmp);
}
}
+ if (NULL != srh_callback_peers)
+ {
+ GNUNET_free (srh_callback_peers);
+ srh_callback_peers = NULL;
+ }
if (NULL != h->view_update_cb)
{
LOG (GNUNET_ERROR_TYPE_WARNING,