/* P2P Messages */
+/**
+ * RPS check liveliness message to check liveliness of other peer
+ */
+#define GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE 950
+
/**
* RPS PUSH message to push own ID to another peer
*/
-#define GNUNET_MESSAGE_TYPE_RPS_PP_PUSH 950
+#define GNUNET_MESSAGE_TYPE_RPS_PP_PUSH 951
/**
* RPS PULL REQUEST message to request the local view of another peer
*/
-#define GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST 951
+#define GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST 952
/**
* RPS PULL REPLY message which contains the view of the other peer
*/
-#define GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY 952
+#define GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY 953
/**
* RPS CS REQUEST Message for the Client to request (a) random peer(s)
*/
-#define GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST 953
+#define GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST 954
/**
* RPS CS REPLY Message for the Server to send (a) random peer(s)
*/
-#define GNUNET_MESSAGE_TYPE_RPS_CS_REPLY 954
+#define GNUNET_MESSAGE_TYPE_RPS_CS_REPLY 955
/**
* RPS CS REQUEST CANCEL Message for the Client to cancel a request
*/
-#define GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL 955
+#define GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL 956
/**
* RPS CS SEED Message for the Client to seed peers into rps
*/
-#define GNUNET_MESSAGE_TYPE_RPS_CS_SEED 956
+#define GNUNET_MESSAGE_TYPE_RPS_CS_SEED 957
#ifdef ENABLE_MALICIOUS
/**
* Turn RPS service malicious
*/
-#define GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS 957
+#define GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS 958
#endif /* ENABLE_MALICIOUS */
*
* To be canceled on shutdown.
*/
- struct GNUNET_CADET_TransmitHandle *transmit_handle;
+ struct PendingMessage *liveliness_check_pending;
/**
* Number of pending operations.
struct GNUNET_PeerIdentity *peer;
unsigned int i;
- /* Cancle cadet transmit_handle if still scheduled */
- if (NULL != peer_ctx->transmit_handle)
- {
- GNUNET_CADET_notify_transmit_ready_cancel (peer_ctx->transmit_handle);
- peer_ctx->transmit_handle = NULL;
- }
-
peer = &peer_ctx->peer_id;
- (void) add_valid_peer (peer);
- set_peer_flag (peer_ctx, Peers_ONLINE);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Peer %s is live and valid, calling %i pending operations on it\n",
GNUNET_i2s (peer),
peer_ctx->num_pending_ops);
+ if (NULL != peer_ctx->liveliness_check_pending)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Removing pending liveliness check for peer %s\n",
+ GNUNET_i2s (&peer_ctx->peer_id));
+ // TODO wait until cadet sets mq->cancel_impl
+ //GNUNET_MQ_send_cancel (peer_ctx->liveliness_check_pending->ev);
+ GNUNET_free (peer_ctx->liveliness_check_pending);
+ peer_ctx->liveliness_check_pending = NULL;
+ }
+
+ (void) add_valid_peer (peer);
+ set_peer_flag (peer_ctx, Peers_ONLINE);
+
/* Call pending operations */
for (i = 0; i < peer_ctx->num_pending_ops; i++)
{
struct PeerContext *peer_ctx;
peer_ctx = get_peer_ctx (peer);
- GNUNET_assert (NULL == peer_ctx->transmit_handle);
if (NULL == peer_ctx->mq)
{
return peer_ctx->mq;
}
+
/**
- * @brief Callback that is called when a channel was effectively established.
- *
- * This is an implementation of #GNUNET_CONNECTION_TransmitReadyNotify and
- * given to #GNUNET_CADET_notify_transmit_ready_cancel and called when the
- * channel was successfully established.
+ * @brief This is called in response to the first message we sent as a
+ * liveliness check.
*
- * This function type was originally ment to be called to provide the data to
- * be sent. This is called when the connection is ready to queue more data.
- * However we use it to get notified about the successful establishement of a
- * cadet channel.
- *
- * @a buf will be NULL and @a size zero if the
- * connection was closed for writing in the meantime.
- *
- * @param cls closure
- * @param size number of bytes available in @a buf
- * @param buf where the callee should write the message
- * @return number of bytes written to @a buf
+ * @param cls #PeerContext of peer with pending liveliness check
*/
-//TODO
-static size_t
-cadet_notify_transmit_ready_cb (void *cls, size_t size, void *buf)
+static void
+mq_liveliness_check_successful (void *cls)
{
- struct PeerContext *peer_ctx = (struct PeerContext *) cls;
- // TODO make sure the context is not deleted or the establishing of the
- // channel is cancelled
+ struct PeerContext *peer_ctx = cls;
- peer_ctx->transmit_handle = NULL;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Set ->transmit_handle = NULL for peer %s\n",
- GNUNET_i2s (&peer_ctx->peer_id));
-
- if ( (NULL != buf) &&
- (0 != size) )
+ if (NULL != peer_ctx->liveliness_check_pending)
{
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Liveliness check for peer %s was successfull\n",
+ GNUNET_i2s (&peer_ctx->peer_id));
+ GNUNET_free (peer_ctx->liveliness_check_pending);
+ peer_ctx->liveliness_check_pending = NULL;
set_peer_live (peer_ctx);
}
- else
- {
- LOG (GNUNET_ERROR_TYPE_WARNING,
- "Problems establishing a connection to peer %s in order to check liveliness\n",
- GNUNET_i2s (&peer_ctx->peer_id));
- // TODO reschedule? cleanup?
- }
- return 0;
}
/**
"Get informed about peer %s getting live\n",
GNUNET_i2s (&peer_ctx->peer_id));
- if (NULL != peer_ctx->transmit_handle)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Already waiting for notification\n");
- return;
- }
- if (NULL != peer_ctx->send_channel)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Already have established channel to peer\n");
- return;
- }
- (void) get_channel (&peer_ctx->peer_id);
- peer_ctx->transmit_handle =
- GNUNET_CADET_notify_transmit_ready (peer_ctx->send_channel,
- GNUNET_NO,
- GNUNET_TIME_UNIT_FOREVER_REL,
- sizeof (struct GNUNET_MessageHeader),
- cadet_notify_transmit_ready_cb,
- peer_ctx);
- if (NULL == peer_ctx->transmit_handle)
- {
- LOG (GNUNET_ERROR_TYPE_WARNING,
- "Cadet was not able to queue the request (insufficient memory)\n");
- GNUNET_break (0);
- }
+ struct GNUNET_MQ_Handle *mq;
+ struct GNUNET_MQ_Envelope *ev;
+
+ ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE);
+ peer_ctx->liveliness_check_pending = GNUNET_new (struct PendingMessage);
+ peer_ctx->liveliness_check_pending->ev = ev;
+ peer_ctx->liveliness_check_pending->peer_ctx = peer_ctx;
+ peer_ctx->liveliness_check_pending->type = "Check liveliness";
+ mq = get_mq (&peer_ctx->peer_id);
+ GNUNET_MQ_notify_sent (ev,
+ mq_liveliness_check_successful,
+ peer_ctx);
+ GNUNET_MQ_send (mq, ev);
}
/**
Peers_unset_peer_flag (peer, Peers_ONLINE);
GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
- // TODO delete struct GNUNET_TRANSPORT_TransmitHandle *transmit_handle
- /* Cancle messages that have not been sent yet */
while (NULL != peer_ctx->pending_messages_head)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
}
/* If we are still waiting for notification whether this peer is live
* cancel the according task */
- if (NULL != peer_ctx->transmit_handle)
+ if (NULL != peer_ctx->liveliness_check_pending)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Trying to cancle transmit_handle for peer %s\n",
+ "Removing pending liveliness check for peer %s\n",
GNUNET_i2s (&peer_ctx->peer_id));
- GNUNET_CADET_notify_transmit_ready_cancel (peer_ctx->transmit_handle);
- peer_ctx->transmit_handle = NULL;
+ // TODO wait until cadet sets mq->cancel_impl
+ //GNUNET_MQ_send_cancel (peer_ctx->liveliness_check_pending->ev);
+ GNUNET_free (peer_ctx->liveliness_check_pending);
+ peer_ctx->liveliness_check_pending = NULL;
}
if (NULL != peer_ctx->send_channel)
{