/**
* Are we allowed to send to the service?
*
- * @deprecated
+ * @deprecated?
*/
unsigned int allow_send;
- /****************************************************************************/
/***************************** MQ ************************************/
- /****************************************************************************/
-
/**
* Message Queue for the channel.
*/
* Closure for @a handler.
*/
void *cls;
+
+ /***************************** MQ ************************************/
+
+ /**
+ * Port "number"
+ */
+ struct GNUNET_HashCode id;
+
+ /**
+ * Handler for incoming channels on this port
+ */
+ GNUNET_CADET_ConnectEventHandler connects;
+
+ /**
+ * Closure for @ref connects
+ */
+ void * connects_cls;
+
+ /**
+ * Window size change handler.
+ */
+ GNUNET_CADET_WindowSizeEventHandler window_changes;
+
+ /**
+ * Handler called when an incoming channel is destroyed..
+ */
+ GNUNET_CADET_DisconnectEventHandler disconnects;
+
+ /**
+ * Payload handlers for incoming channels.
+ */
+ const struct GNUNET_MQ_MessageHandler *handlers;
};
}
+/******************************************************************************/
+/*********************** MQ API CALLBACKS ****************************/
+/******************************************************************************/
+
+
+/**
+ * Implement sending functionality of a message queue for
+ * us sending messages to a peer.
+ *
+ * Encapsulates the payload message in a #GNUNET_CADET_LocalData message
+ * in order to label the message with the channel ID and send the
+ * encapsulated message to the service.
+ *
+ * @param mq the message queue
+ * @param msg the message to send
+ * @param impl_state state of the implementation
+ */
+static void
+cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
+ const struct GNUNET_MessageHeader *msg,
+ void *impl_state)
+{
+ struct GNUNET_CADET_Channel *ch = impl_state;
+ struct GNUNET_CADET_Handle *h = ch->cadet;
+ uint16_t msize;
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_CADET_LocalData *cadet_msg;
+
+
+ if (NULL == h->mq)
+ {
+ /* We're currently reconnecting, pretend this worked */
+ GNUNET_MQ_impl_send_continue (mq);
+ return;
+ }
+
+ /* check message size for sanity */
+ msize = ntohs (msg->size);
+ if (msize > GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE)
+ {
+ GNUNET_break (0);
+ GNUNET_MQ_impl_send_continue (mq);
+ return;
+ }
+
+ env = GNUNET_MQ_msg_nested_mh (cadet_msg,
+ GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA,
+ msg);
+ cadet_msg->ccn = ch->ccn;
+ GNUNET_MQ_send (h->mq, env);
+ GNUNET_MQ_impl_send_continue (mq);
+}
+
+
+/**
+ * Handle destruction of a message queue. Implementations must not
+ * free @a mq, but should take care of @a impl_state.
+ *
+ * @param mq the message queue to destroy
+ * @param impl_state state of the implementation
+ */
+static void
+cadet_mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
+ void *impl_state)
+{
+ struct GNUNET_CADET_Channel *ch = impl_state;
+
+ GNUNET_assert (mq == ch->mq);
+ ch->mq = NULL;
+}
+
+
+/**
+ * We had an error processing a message we forwarded from a peer to
+ * the CADET service. We should just complain about it but otherwise
+ * continue processing.
+ *
+ * @param cls closure
+ * @param error error code
+ */
+static void
+cadet_mq_error_handler (void *cls,
+ enum GNUNET_MQ_Error error)
+{
+ GNUNET_break_op (0);
+}
+
+
+/**
+ * Implementation function that cancels the currently sent message.
+ * Should basically undo whatever #mq_send_impl() did.
+ *
+ * @param mq message queue
+ * @param impl_state state specific to the implementation
+ */
+static void
+cadet_mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
+ void *impl_state)
+{
+ struct GNUNET_CADET_Channel *ch = impl_state;
+
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Cannot cancel mq message on channel %X of %p\n",
+ ch->ccn.channel_of_client, ch->cadet);
+
+ GNUNET_break (0);
+}
+
/******************************************************************************/
/*********************** RECEIVE HANDLERS ****************************/
port = find_port (h, port_number);
if (NULL == port)
{
+ /* We could have closed the port but the service didn't know about it yet
+ * This is not an error.
+ */
struct GNUNET_CADET_LocalChannelDestroyMessage *d_msg;
struct GNUNET_MQ_Envelope *env;
GNUNET_break (0);
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "No handler for incoming channel %X [%s]\n",
+ "No handler for incoming channel %X (on port %s, recently closed?)\n",
ntohl (ccn.channel_of_client),
GNUNET_h2s (port_number));
- /* FIXME: should disconnect instead, this is a serious error! */
env = GNUNET_MQ_msg (d_msg,
GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY);
d_msg->ccn = msg->ccn;
ch->ccn = ccn;
ch->incoming_port = port;
ch->options = ntohl (msg->opt);
-
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Creating incoming channel %X [%s] %p\n",
ntohl (ccn.channel_of_client),
GNUNET_h2s (port_number),
ch);
- ch->ctx = port->handler (port->cls,
- ch,
- &msg->peer,
- port->hash,
- ch->options);
+
+ if (NULL != port->handler)
+ {
+ /** @deprecated */
+ /* Old style API */
+ ch->ctx = port->handler (port->cls,
+ ch,
+ &msg->peer,
+ port->hash,
+ ch->options);
+ } else {
+ /* MQ API */
+ GNUNET_assert (NULL != port->connects);
+ ch->window_changes = port->window_changes;
+ ch->disconnects = port->disconnects;
+ ch->mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl,
+ &cadet_mq_destroy_impl,
+ &cadet_mq_cancel_impl,
+ ch,
+ port->handlers,
+ &cadet_mq_error_handler,
+ ch);
+ ch->ctx = port->connects (port->cadet->cls,
+ ch,
+ &msg->peer);
+ GNUNET_MQ_set_handlers_closure (ch->mq, ch->ctx);
+ }
}
GNUNET_CADET_DisconnectEventHandler disconnects,
const struct GNUNET_MQ_MessageHandler *handlers)
{
- return NULL;
-}
-
-
-/**
- * Implement sending functionality of a message queue for
- * us sending messages to a peer.
- *
- * Encapsulates the payload message in a #GNUNET_CADET_LocalData message
- * in order to label the message with the channel ID and send the
- * encapsulated message to the service.
- *
- * @param mq the message queue
- * @param msg the message to send
- * @param impl_state state of the implementation
- */
-static void
-cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
- const struct GNUNET_MessageHeader *msg,
- void *impl_state)
-{
- struct GNUNET_CADET_Channel *ch = impl_state;
- struct GNUNET_CADET_Handle *h = ch->cadet;
- uint16_t msize;
+ struct GNUNET_CADET_PortMessage *msg;
struct GNUNET_MQ_Envelope *env;
- struct GNUNET_CADET_LocalData *cadet_msg;
+ struct GNUNET_CADET_Port *p;
+ GNUNET_assert (NULL != connects);
- if (NULL == h->mq)
- {
- /* We're currently reconnecting, pretend this worked */
- GNUNET_MQ_impl_send_continue (mq);
- return;
- }
+ p = GNUNET_new (struct GNUNET_CADET_Port);
+ p->cadet = h;
+ p->id = *port;
+ p->connects = connects;
+ p->cls = connects_cls;
+ p->window_changes = window_changes;
+ p->disconnects = disconnects;
+ p->handlers = handlers;
- /* check message size for sanity */
- msize = ntohs (msg->size);
- if (msize > GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE)
- {
- GNUNET_break (0);
- GNUNET_MQ_impl_send_continue (mq);
- return;
- }
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_put (h->ports,
+ p->hash,
+ p,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
- env = GNUNET_MQ_msg_nested_mh (cadet_msg,
- GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA,
- msg);
- cadet_msg->ccn = ch->ccn;
+ env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN);
+ msg->port = p->id;
GNUNET_MQ_send (h->mq, env);
- GNUNET_MQ_impl_send_continue (mq);
-}
-
-/**
- * Handle destruction of a message queue. Implementations must not
- * free @a mq, but should take care of @a impl_state.
- *
- * @param mq the message queue to destroy
- * @param impl_state state of the implementation
- */
-static void
-cadet_mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
- void *impl_state)
-{
- struct GNUNET_CADET_Channel *ch = impl_state;
-
- GNUNET_assert (mq == ch->mq);
- ch->mq = NULL;
-}
-
-
-/**
- * We had an error processing a message we forwarded from a peer to
- * the CADET service. We should just complain about it but otherwise
- * continue processing.
- *
- * @param cls closure
- * @param error error code
- */
-static void
-cadet_mq_error_handler (void *cls,
- enum GNUNET_MQ_Error error)
-{
- GNUNET_break_op (0);
-}
-
-
-/**
- * Implementation function that cancels the currently sent message.
- * Should basically undo whatever #mq_send_impl() did.
- *
- * @param mq message queue
- * @param impl_state state specific to the implementation
- */
-static void
-cadet_mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
- void *impl_state)
-{
- struct GNUNET_CADET_Channel *ch = impl_state;
-
- LOG (GNUNET_ERROR_TYPE_WARNING,
- "Cannot cancel mq message on channel %X of %p\n",
- ch->ccn.channel_of_client, ch->cadet);
-
- GNUNET_break (0);
+ return p;
}