Implementation of port opening and handling in MQ
authorBart Polot <bart.polot+voyager@gmail.com>
Tue, 31 Jan 2017 03:17:58 +0000 (04:17 +0100)
committerBart Polot <bart.polot+voyager@gmail.com>
Tue, 31 Jan 2017 03:17:58 +0000 (04:17 +0100)
src/cadet/cadet_api.c

index 3491bd75f3e3eead6524cb87395fbdd8860c2c55..3eaa78af86ee00f07897938bd71cd82263d00127 100644 (file)
@@ -280,14 +280,11 @@ struct GNUNET_CADET_Channel
   /**
    * Are we allowed to send to the service?
    *
-   * @deprecated
+   * @deprecated?
    */
   unsigned int allow_send;
 
-  /****************************************************************************/
   /*****************************    MQ     ************************************/
-  /****************************************************************************/
-
   /**
    * Message Queue for the channel.
    */
@@ -330,6 +327,38 @@ struct GNUNET_CADET_Port
    * Closure for @a handler.
    */
   void *cls;
+
+  /*****************************    MQ     ************************************/
+
+  /**
+   * Port "number"
+   */
+  struct GNUNET_HashCode id;
+
+  /**
+   * Handler for incoming channels on this port
+   */
+  GNUNET_CADET_ConnectEventHandler connects;
+
+  /**
+   * Closure for @ref connects
+   */
+  void * connects_cls;
+
+  /**
+   * Window size change handler.
+   */
+  GNUNET_CADET_WindowSizeEventHandler window_changes;
+
+  /**
+   * Handler called when an incoming channel is destroyed..
+   */
+  GNUNET_CADET_DisconnectEventHandler disconnects;
+
+  /**
+   * Payload handlers for incoming channels.
+   */
+  const struct GNUNET_MQ_MessageHandler *handlers;
 };
 
 
@@ -553,6 +582,114 @@ remove_from_queue (struct GNUNET_CADET_TransmitHandle *th)
 }
 
 
+/******************************************************************************/
+/***********************      MQ API CALLBACKS     ****************************/
+/******************************************************************************/
+
+
+/**
+ * Implement sending functionality of a message queue for
+ * us sending messages to a peer.
+ *
+ * Encapsulates the payload message in a #GNUNET_CADET_LocalData message
+ * in order to label the message with the channel ID and send the
+ * encapsulated message to the service.
+ *
+ * @param mq the message queue
+ * @param msg the message to send
+ * @param impl_state state of the implementation
+ */
+static void
+cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
+                    const struct GNUNET_MessageHeader *msg,
+                    void *impl_state)
+{
+  struct GNUNET_CADET_Channel *ch = impl_state;
+  struct GNUNET_CADET_Handle *h = ch->cadet;
+  uint16_t msize;
+  struct GNUNET_MQ_Envelope *env;
+  struct GNUNET_CADET_LocalData *cadet_msg;
+
+
+  if (NULL == h->mq)
+  {
+    /* We're currently reconnecting, pretend this worked */
+    GNUNET_MQ_impl_send_continue (mq);
+    return;
+  }
+
+  /* check message size for sanity */
+  msize = ntohs (msg->size);
+  if (msize > GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE)
+  {
+    GNUNET_break (0);
+    GNUNET_MQ_impl_send_continue (mq);
+    return;
+  }
+
+  env = GNUNET_MQ_msg_nested_mh (cadet_msg,
+                                 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA,
+                                 msg);
+  cadet_msg->ccn = ch->ccn;
+  GNUNET_MQ_send (h->mq, env);
+  GNUNET_MQ_impl_send_continue (mq);
+}
+
+
+/**
+ * Handle destruction of a message queue.  Implementations must not
+ * free @a mq, but should take care of @a impl_state.
+ *
+ * @param mq the message queue to destroy
+ * @param impl_state state of the implementation
+ */
+static void
+cadet_mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
+                       void *impl_state)
+{
+  struct GNUNET_CADET_Channel *ch = impl_state;
+
+  GNUNET_assert (mq == ch->mq);
+  ch->mq = NULL;
+}
+
+
+/**
+ * We had an error processing a message we forwarded from a peer to
+ * the CADET service.  We should just complain about it but otherwise
+ * continue processing.
+ *
+ * @param cls closure
+ * @param error error code
+ */
+static void
+cadet_mq_error_handler (void *cls,
+                        enum GNUNET_MQ_Error error)
+{
+  GNUNET_break_op (0);
+}
+
+
+/**
+ * Implementation function that cancels the currently sent message.
+ * Should basically undo whatever #mq_send_impl() did.
+ *
+ * @param mq message queue
+ * @param impl_state state specific to the implementation
+ */
+static void
+cadet_mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
+                     void *impl_state)
+{
+  struct GNUNET_CADET_Channel *ch = impl_state;
+
+  LOG (GNUNET_ERROR_TYPE_WARNING,
+       "Cannot cancel mq message on channel %X of %p\n",
+       ch->ccn.channel_of_client, ch->cadet);
+
+  GNUNET_break (0);
+}
+
 
 /******************************************************************************/
 /***********************      RECEIVE HANDLERS     ****************************/
@@ -627,15 +764,17 @@ handle_channel_created (void *cls,
   port = find_port (h, port_number);
   if (NULL == port)
   {
+    /* We could have closed the port but the service didn't know about it yet
+     * This is not an error.
+     */
     struct GNUNET_CADET_LocalChannelDestroyMessage *d_msg;
     struct GNUNET_MQ_Envelope *env;
 
     GNUNET_break (0);
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "No handler for incoming channel %X [%s]\n",
+         "No handler for incoming channel %X (on port %s, recently closed?)\n",
          ntohl (ccn.channel_of_client),
          GNUNET_h2s (port_number));
-    /* FIXME: should disconnect instead, this is a serious error! */
     env = GNUNET_MQ_msg (d_msg,
                          GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY);
     d_msg->ccn = msg->ccn;
@@ -651,17 +790,38 @@ handle_channel_created (void *cls,
   ch->ccn = ccn;
   ch->incoming_port = port;
   ch->options = ntohl (msg->opt);
-
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Creating incoming channel %X [%s] %p\n",
        ntohl (ccn.channel_of_client),
        GNUNET_h2s (port_number),
        ch);
-  ch->ctx = port->handler (port->cls,
-                           ch,
-                           &msg->peer,
-                           port->hash,
-                           ch->options);
+
+  if (NULL != port->handler)
+  {
+    /** @deprecated */
+    /* Old style API */
+    ch->ctx = port->handler (port->cls,
+                            ch,
+                            &msg->peer,
+                            port->hash,
+                            ch->options);
+  } else {
+    /* MQ API */
+    GNUNET_assert (NULL != port->connects);
+    ch->window_changes = port->window_changes;
+    ch->disconnects = port->disconnects;
+    ch->mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl,
+                                            &cadet_mq_destroy_impl,
+                                            &cadet_mq_cancel_impl,
+                                            ch,
+                                            port->handlers,
+                                            &cadet_mq_error_handler,
+                                            ch);
+    ch->ctx = port->connects (port->cadet->cls,
+                              ch,
+                              &msg->peer);
+    GNUNET_MQ_set_handlers_closure (ch->mq, ch->ctx);
+  }
 }
 
 
@@ -2236,111 +2396,32 @@ GNUNET_CADET_open_porT (struct GNUNET_CADET_Handle *h,
                         GNUNET_CADET_DisconnectEventHandler disconnects,
                         const struct GNUNET_MQ_MessageHandler *handlers)
 {
-  return NULL;
-}
-
-
-/**
- * Implement sending functionality of a message queue for
- * us sending messages to a peer.
- *
- * Encapsulates the payload message in a #GNUNET_CADET_LocalData message
- * in order to label the message with the channel ID and send the
- * encapsulated message to the service.
- *
- * @param mq the message queue
- * @param msg the message to send
- * @param impl_state state of the implementation
- */
-static void
-cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
-                    const struct GNUNET_MessageHeader *msg,
-                    void *impl_state)
-{
-  struct GNUNET_CADET_Channel *ch = impl_state;
-  struct GNUNET_CADET_Handle *h = ch->cadet;
-  uint16_t msize;
+  struct GNUNET_CADET_PortMessage *msg;
   struct GNUNET_MQ_Envelope *env;
-  struct GNUNET_CADET_LocalData *cadet_msg;
+  struct GNUNET_CADET_Port *p;
 
+  GNUNET_assert (NULL != connects);
 
-  if (NULL == h->mq)
-  {
-    /* We're currently reconnecting, pretend this worked */
-    GNUNET_MQ_impl_send_continue (mq);
-    return;
-  }
+  p = GNUNET_new (struct GNUNET_CADET_Port);
+  p->cadet = h;
+  p->id = *port;
+  p->connects = connects;
+  p->cls = connects_cls;
+  p->window_changes = window_changes;
+  p->disconnects = disconnects;
+  p->handlers = handlers;
 
-  /* check message size for sanity */
-  msize = ntohs (msg->size);
-  if (msize > GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE)
-  {
-    GNUNET_break (0);
-    GNUNET_MQ_impl_send_continue (mq);
-    return;
-  }
+  GNUNET_assert (GNUNET_OK ==
+                GNUNET_CONTAINER_multihashmap_put (h->ports,
+                                                   p->hash,
+                                                   p,
+                                                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
 
-  env = GNUNET_MQ_msg_nested_mh (cadet_msg,
-                                 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA,
-                                 msg);
-  cadet_msg->ccn = ch->ccn;
+  env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN);
+  msg->port = p->id;
   GNUNET_MQ_send (h->mq, env);
-  GNUNET_MQ_impl_send_continue (mq);
-}
-
 
-/**
- * Handle destruction of a message queue.  Implementations must not
- * free @a mq, but should take care of @a impl_state.
- *
- * @param mq the message queue to destroy
- * @param impl_state state of the implementation
- */
-static void
-cadet_mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
-                       void *impl_state)
-{
-  struct GNUNET_CADET_Channel *ch = impl_state;
-
-  GNUNET_assert (mq == ch->mq);
-  ch->mq = NULL;
-}
-
-
-/**
- * We had an error processing a message we forwarded from a peer to
- * the CADET service.  We should just complain about it but otherwise
- * continue processing.
- *
- * @param cls closure
- * @param error error code
- */
-static void
-cadet_mq_error_handler (void *cls,
-                        enum GNUNET_MQ_Error error)
-{
-  GNUNET_break_op (0);
-}
-
-
-/**
- * Implementation function that cancels the currently sent message.
- * Should basically undo whatever #mq_send_impl() did.
- *
- * @param mq message queue
- * @param impl_state state specific to the implementation
- */
-static void
-cadet_mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
-                     void *impl_state)
-{
-  struct GNUNET_CADET_Channel *ch = impl_state;
-
-  LOG (GNUNET_ERROR_TYPE_WARNING,
-       "Cannot cancel mq message on channel %X of %p\n",
-       ch->ccn.channel_of_client, ch->cadet);
-
-  GNUNET_break (0);
+  return p;
 }