Implement the connect and create_channel call for mq api
authorBart Polot <bart.polot+voyager@gmail.com>
Tue, 31 Jan 2017 01:58:54 +0000 (02:58 +0100)
committerBart Polot <bart.polot+voyager@gmail.com>
Tue, 31 Jan 2017 01:58:54 +0000 (02:58 +0100)
src/cadet/cadet_api.c

index 2b50f781c466c5152b9d80a9bfdfb3a21ad9d945..3491bd75f3e3eead6524cb87395fbdd8860c2c55 100644 (file)
@@ -38,6 +38,8 @@
 
 /**
  * Transmission queue to the service
+ *
+ * @deprecated
  */
 struct GNUNET_CADET_TransmitHandle
 {
@@ -116,6 +118,11 @@ union CadetInfoCB
  */
 struct GNUNET_CADET_Handle
 {
+  /**
+   * Flag to indicate old or MQ API.
+   */
+  int mq_api;
+
   /**
    * Message queue (if available).
    */
@@ -123,11 +130,15 @@ struct GNUNET_CADET_Handle
 
   /**
    * Set of handlers used for processing incoming messages in the channels
+   *
+   * @deprecated
    */
   const struct GNUNET_CADET_MessageHandler *message_handlers;
 
   /**
    * Number of handlers in the handlers array.
+   *
+   * @deprecated
    */
   unsigned int n_handlers;
 
@@ -153,16 +164,22 @@ struct GNUNET_CADET_Handle
 
   /**
    * Closure for all the handlers given by the client
+   *
+   * @deprecated
    */
   void *cls;
 
   /**
    * Messages to send to the service, head.
+   *
+   * @deprecated
    */
   struct GNUNET_CADET_TransmitHandle *th_head;
 
   /**
    * Messages to send to the service, tail.
+   *
+   * @deprecated
    */
   struct GNUNET_CADET_TransmitHandle *th_tail;
 
@@ -241,9 +258,9 @@ struct GNUNET_CADET_Channel
   struct GNUNET_CADET_ClientChannelNumber ccn;
 
   /**
-   * Channel's port, if any.
+   * Channel's port, if incoming.
    */
-  struct GNUNET_CADET_Port *port;
+  struct GNUNET_CADET_Port *incoming_port;
 
   /**
    * Other end of the channel.
@@ -262,9 +279,30 @@ struct GNUNET_CADET_Channel
 
   /**
    * Are we allowed to send to the service?
+   *
+   * @deprecated
    */
   unsigned int allow_send;
 
+  /****************************************************************************/
+  /*****************************    MQ     ************************************/
+  /****************************************************************************/
+
+  /**
+   * Message Queue for the channel.
+   */
+  struct GNUNET_MQ_Handle *mq;
+
+  /**
+   * Window change handler.
+   */
+  GNUNET_CADET_WindowSizeEventHandler window_changes;
+
+  /**
+   * Disconnect handler.
+   */
+  GNUNET_CADET_DisconnectEventHandler disconnects;
+
 };
 
 
@@ -611,7 +649,7 @@ handle_channel_created (void *cls,
   ch->peer = GNUNET_PEER_intern (&msg->peer);
   ch->cadet = h;
   ch->ccn = ccn;
-  ch->port = port;
+  ch->incoming_port = port;
   ch->options = ntohl (msg->opt);
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -2047,9 +2085,9 @@ cadet_mq_ntr (void *cls, size_t size,
  * @param impl_state state of the implementation
  */
 static void
-cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
-                    const struct GNUNET_MessageHeader *msg,
-                    void *impl_state)
+cadet_mq_send_impl_old (struct GNUNET_MQ_Handle *mq,
+                        const struct GNUNET_MessageHeader *msg,
+                        void *impl_state)
 {
   struct CadetMQState *state = impl_state;
 
@@ -2075,8 +2113,8 @@ cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
  * @param impl_state state of the implementation
  */
 static void
-cadet_mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
-                       void *impl_state)
+cadet_mq_destroy_impl_old (struct GNUNET_MQ_Handle *mq,
+                           void *impl_state)
 {
   struct CadetMQState *state = impl_state;
 
@@ -2104,8 +2142,8 @@ GNUNET_CADET_mq_create (struct GNUNET_CADET_Channel *channel)
   state = GNUNET_new (struct CadetMQState);
   state->channel = channel;
 
-  mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl,
-                                      &cadet_mq_destroy_impl,
+  mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl_old,
+                                      &cadet_mq_destroy_impl_old,
                                       NULL, /* FIXME: cancel impl. */
                                       state,
                                       NULL, /* no msg handlers */
@@ -2136,3 +2174,253 @@ GC_u2h (uint32_t port)
 
   return &hash;
 }
+
+
+
+/******************************************************************************/
+/******************************* MQ-BASED API *********************************/
+/******************************************************************************/
+
+/**
+ * Connect to the MQ-based cadet service.
+ *
+ * @param cfg Configuration to use.
+ *
+ * @return Handle to the cadet service NULL on error.
+ */
+struct GNUNET_CADET_Handle *
+GNUNET_CADET_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg)
+{
+  struct GNUNET_CADET_Handle *h;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "GNUNET_CADET_connecT()\n");
+  h = GNUNET_new (struct GNUNET_CADET_Handle);
+  h->cfg = cfg;
+  h->mq_api = GNUNET_YES;
+  h->ports = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_YES);
+  do_reconnect (h);
+  if (h->mq == NULL)
+  {
+    GNUNET_break (0);
+    GNUNET_CADET_disconnect (h);
+    return NULL;
+  }
+  h->next_ccn.channel_of_client = htonl (GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
+  h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
+  h->reconnect_task = NULL;
+
+  return h;
+}
+
+
+/**
+ * Open a port to receive incomming MQ-based channels.
+ *
+ * @param h CADET handle.
+ * @param port Hash identifying the port.
+ * @param connects Function called when an incoming channel is connected.
+ * @param connects_cls Closure for the @a connects handler.
+ * @param window_changes Function called when the transmit window size changes.
+ * @param disconnects Function called when a channel is disconnected.
+ * @param handlers Callbacks for messages we care about, NULL-terminated.
+ *
+ * @return Port handle.
+ */
+struct GNUNET_CADET_Port *
+GNUNET_CADET_open_porT (struct GNUNET_CADET_Handle *h,
+                        const struct GNUNET_HashCode *port,
+                        GNUNET_CADET_ConnectEventHandler connects,
+                        void * connects_cls,
+                        GNUNET_CADET_WindowSizeEventHandler window_changes,
+                        GNUNET_CADET_DisconnectEventHandler disconnects,
+                        const struct GNUNET_MQ_MessageHandler *handlers)
+{
+  return NULL;
+}
+
+
+/**
+ * Implement sending functionality of a message queue for
+ * us sending messages to a peer.
+ *
+ * Encapsulates the payload message in a #GNUNET_CADET_LocalData message
+ * in order to label the message with the channel ID and send the
+ * encapsulated message to the service.
+ *
+ * @param mq the message queue
+ * @param msg the message to send
+ * @param impl_state state of the implementation
+ */
+static void
+cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
+                    const struct GNUNET_MessageHeader *msg,
+                    void *impl_state)
+{
+  struct GNUNET_CADET_Channel *ch = impl_state;
+  struct GNUNET_CADET_Handle *h = ch->cadet;
+  uint16_t msize;
+  struct GNUNET_MQ_Envelope *env;
+  struct GNUNET_CADET_LocalData *cadet_msg;
+
+
+  if (NULL == h->mq)
+  {
+    /* We're currently reconnecting, pretend this worked */
+    GNUNET_MQ_impl_send_continue (mq);
+    return;
+  }
+
+  /* check message size for sanity */
+  msize = ntohs (msg->size);
+  if (msize > GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE)
+  {
+    GNUNET_break (0);
+    GNUNET_MQ_impl_send_continue (mq);
+    return;
+  }
+
+  env = GNUNET_MQ_msg_nested_mh (cadet_msg,
+                                 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA,
+                                 msg);
+  cadet_msg->ccn = ch->ccn;
+  GNUNET_MQ_send (h->mq, env);
+  GNUNET_MQ_impl_send_continue (mq);
+}
+
+
+/**
+ * Handle destruction of a message queue.  Implementations must not
+ * free @a mq, but should take care of @a impl_state.
+ *
+ * @param mq the message queue to destroy
+ * @param impl_state state of the implementation
+ */
+static void
+cadet_mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
+                       void *impl_state)
+{
+  struct GNUNET_CADET_Channel *ch = impl_state;
+
+  GNUNET_assert (mq == ch->mq);
+  ch->mq = NULL;
+}
+
+
+/**
+ * We had an error processing a message we forwarded from a peer to
+ * the CADET service.  We should just complain about it but otherwise
+ * continue processing.
+ *
+ * @param cls closure
+ * @param error error code
+ */
+static void
+cadet_mq_error_handler (void *cls,
+                        enum GNUNET_MQ_Error error)
+{
+  GNUNET_break_op (0);
+}
+
+
+/**
+ * Implementation function that cancels the currently sent message.
+ * Should basically undo whatever #mq_send_impl() did.
+ *
+ * @param mq message queue
+ * @param impl_state state specific to the implementation
+ */
+static void
+cadet_mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
+                     void *impl_state)
+{
+  struct GNUNET_CADET_Channel *ch = impl_state;
+
+  LOG (GNUNET_ERROR_TYPE_WARNING,
+       "Cannot cancel mq message on channel %X of %p\n",
+       ch->ccn.channel_of_client, ch->cadet);
+
+  GNUNET_break (0);
+}
+
+
+/**
+ * Create a new channel towards a remote peer.
+ *
+ * If the destination port is not open by any peer or the destination peer
+ * does not accept the channel, #GNUNET_CADET_ChannelEndHandler will be called
+ * for this channel.
+ *
+ * @param h CADET handle.
+ * @param channel_cls Closure for the channel. It's given to:
+ *                    - The disconnect handler @a disconnects
+ *                    - Each message type callback in @a handlers
+ * @param destination Peer identity the channel should go to.
+ * @param port Identification of the destination port.
+ * @param options CadetOption flag field, with all desired option bits set to 1.
+ * @param window_changes Function called when the transmit window size changes.
+ * @param disconnects Function called when the channel is disconnected.
+ * @param handlers Callbacks for messages we care about, NULL-terminated.
+ *
+ * @return Handle to the channel.
+ */
+struct GNUNET_CADET_Channel *
+GNUNET_CADET_channel_creatE (struct GNUNET_CADET_Handle *h,
+                             void *channel_cls,
+                             const struct GNUNET_PeerIdentity *destination,
+                             const struct GNUNET_HashCode *port,
+                             enum GNUNET_CADET_ChannelOption options,
+                             GNUNET_CADET_WindowSizeEventHandler window_changes,
+                             GNUNET_CADET_DisconnectEventHandler disconnects,
+                             const struct GNUNET_MQ_MessageHandler *handlers)
+{
+  struct GNUNET_CADET_Channel *ch;
+  struct GNUNET_CADET_ClientChannelNumber ccn;
+  struct GNUNET_CADET_LocalChannelCreateMessage *msg;
+  struct GNUNET_MQ_Envelope *env;
+
+  /* Save parameters */
+  ccn.channel_of_client = htonl (0);
+  ch = create_channel (h, ccn);
+  ch->ctx = channel_cls;
+  ch->peer = GNUNET_PEER_intern (destination);
+  ch->options = options;
+  ch->window_changes = window_changes;
+  ch->disconnects = disconnects;
+
+  /* Create MQ for channel */
+  ch->mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl,
+                                          &cadet_mq_destroy_impl,
+                                          &cadet_mq_cancel_impl,
+                                          ch,
+                                          handlers,
+                                          &cadet_mq_error_handler,
+                                          ch);
+  GNUNET_MQ_set_handlers_closure (ch->mq, channel_cls);
+
+  /* Request channel creation to service */
+  env = GNUNET_MQ_msg (msg,
+                       GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE);
+  msg->ccn = ch->ccn;
+  msg->port = *port;
+  msg->peer = *destination;
+  msg->opt = htonl (options);
+  GNUNET_MQ_send (h->mq,
+                  env);
+
+  return ch;
+}
+
+
+/**
+ * Obtain the message queue for a connected peer.
+ *
+ * @param channel The channel handle from which to get the MQ.
+ *
+ * @return NULL if @a channel is not yet connected.
+ */
+struct GNUNET_MQ_Handle *
+GNUNET_CADET_get_mq (const struct GNUNET_CADET_Channel *channel)
+{
+  return channel->mq;
+}