enable setting per-envelope or per-queue transmission options with MQ API
authorChristian Grothoff <christian@grothoff.org>
Sat, 30 Jul 2016 22:27:18 +0000 (22:27 +0000)
committerChristian Grothoff <christian@grothoff.org>
Sat, 30 Jul 2016 22:27:18 +0000 (22:27 +0000)
src/core/core_api_2.c
src/include/gnunet_core_service.h
src/include/gnunet_mq_lib.h
src/util/mq.c

index d45c98e936a553abf9e22ce59a8003c2f993752e..70dd1e0f0c0f4794d155dd4a661e855233910a5c 100644 (file)
@@ -256,6 +256,26 @@ handle_mq_error (void *cls,
 }
 
 
+/**
+ * Inquire with CORE what options should be set for a message
+ * so that it is transmitted with the given @a priority and
+ * the given @a cork value.
+ *
+ * @param cork desired corking 
+ * @param priority desired message priority
+ * @param[out] flags set to `flags` value for #GNUNET_MQ_set_options()
+ * @return `extra` argument to give to #GNUNET_MQ_set_options()
+ */
+const void *
+GNUNET_CORE_get_mq_options (int cork,
+                           enum GNUNET_CORE_Priority priority,
+                           uint64_t *flags)
+{
+  *flags = ((uint64_t) priority) + (((uint64_t) cork) << 32);
+  return NULL;
+}
+
+
 /**
  * Implement sending functionality of a message queue for
  * us sending messages to a peer.
@@ -275,12 +295,20 @@ core_mq_send_impl (struct GNUNET_MQ_Handle *mq,
   struct SendMessage *sm;
   struct GNUNET_MQ_Envelope *env;
   uint16_t msize;
-  int cork
-    = GNUNET_NO; // FIXME
-  enum GNUNET_CORE_Priority priority
-    = GNUNET_CORE_PRIO_BEST_EFFORT; // FIXME
+  uint64_t flags;
+  int cork;
+  enum GNUNET_CORE_Priority priority;
 
   GNUNET_assert (NULL == pr->env);
+  /* extract options from envelope */
+  env = GNUNET_MQ_get_current_envelope (mq);
+  GNUNET_break (NULL ==
+               GNUNET_MQ_env_get_options (env,
+                                          &flags));
+  cork = (int) (flags >> 32);
+  priority = (uint32_t) flags;
+
+  /* check message size for sanity */
   msize = ntohs (msg->size);
   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct SendMessage))
   {
@@ -288,6 +316,8 @@ core_mq_send_impl (struct GNUNET_MQ_Handle *mq,
     GNUNET_MQ_impl_send_continue (mq);
     return;
   }
+
+  /* ask core for transmission */
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Asking core for transmission of %u bytes to `%s'\n",
        (unsigned int) msize,
@@ -302,6 +332,8 @@ core_mq_send_impl (struct GNUNET_MQ_Handle *mq,
   smr->smr_id = htons (++pr->smr_id_gen);
   GNUNET_MQ_send (h->mq,
                   env);
+
+  /* prepare message with actual transmission data */
   pr->env = GNUNET_MQ_msg_nested_mh (sm,
                                     GNUNET_MESSAGE_TYPE_CORE_SEND,
                                     msg);
@@ -385,6 +417,8 @@ connect_peer (struct GNUNET_CORE_Handle *h,
              const struct GNUNET_PeerIdentity *peer)
 {
   struct PeerRecord *pr;
+  uint64_t flags;
+  const void *extra;
   
   pr = GNUNET_new (struct PeerRecord);
   pr->peer = *peer;
@@ -401,6 +435,13 @@ connect_peer (struct GNUNET_CORE_Handle *h,
                                          h->handlers,
                                          &core_mq_error_handler,
                                          pr);
+  /* get our default options */
+  extra = GNUNET_CORE_get_mq_options (GNUNET_NO,
+                                     GNUNET_CORE_PRIO_BEST_EFFORT,
+                                     &flags);
+  GNUNET_MQ_set_options (pr->mq,
+                        flags,
+                        extra);
   if (NULL != h->connects)
   {
     pr->client_cls = h->connects (h->cls,
index fa74415d620b18baeee9c5876ccc844514cedb4a..6ec486b26c0cbcebe3e9cb0be39e503f987ef39b 100644 (file)
@@ -247,6 +247,16 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
                      int outbound_hdr_only,
                      const struct GNUNET_CORE_MessageHandler *handlers);
 
+/**
+ * Disconnect from the core service.    This function can only
+ * be called *after* all pending #GNUNET_CORE_notify_transmit_ready
+ * requests have been explicitly cancelled.
+ *
+ * @param handle connection to core to disconnect
+ */
+void
+GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle);
+
 
 /**
  * Connect to the core service.  Note that the connection may complete
@@ -282,14 +292,40 @@ GNUNET_CORE_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg,
 
 
 /**
- * Disconnect from the core service.    This function can only
- * be called *after* all pending #GNUNET_CORE_notify_transmit_ready
- * requests have been explicitly cancelled.
+ * Disconnect from the core service.
  *
  * @param handle connection to core to disconnect
  */
 void
-GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle);
+GNUNET_CORE_disconnecT (struct GNUNET_CORE_Handle *handle);
+
+
+/**
+ * Inquire with CORE what options should be set for a message
+ * so that it is transmitted with the given @a priority and
+ * the given @a cork value.
+ *
+ * @param cork desired corking 
+ * @param priority desired message priority
+ * @param[out] flags set to `flags` value for #GNUNET_MQ_set_options()
+ * @return `extra` argument to give to #GNUNET_MQ_set_options()
+ */
+const void *
+GNUNET_CORE_get_mq_options (int cork,
+                           enum GNUNET_CORE_Priority priority,
+                           uint64_t *flags);
+
+
+/**
+ * Obtain the message queue for a connected peer.
+ *
+ * @param h the core handle
+ * @param pid the identity of the peer 
+ * @return NULL if @a pid is not connected
+ */
+struct GNUNET_MQ_Handle *
+GNUNET_CORE_get_mq (const struct GNUNET_CORE_Handle *h,
+                   const struct GNUNET_PeerIdentity *pid);
 
 
 /**
index 43cefca9f1a17ef7b4232a2384ffd1d4b48197b3..35313263ddb56560ee2ceb281cf80e31cd01a482 100644 (file)
 
 
 /**
- * Implementation of the GNUNET_MQ_extract_nexted_mh macro.
+ * Implementation of the #GNUNET_MQ_extract_nexted_mh macro.
  *
  * @param mh message header to extract nested message header from
  * @param base_size size of the message before the nested message's header appears
@@ -305,7 +305,7 @@ struct GNUNET_MQ_MessageHandler
 /**
  * End-marker for the handlers array
  */
-#define GNUNET_MQ_handler_end() {NULL, NULL, NULL, 0, 0}
+#define GNUNET_MQ_handler_end() { NULL, NULL, NULL, 0, 0 }
 
 
 /**
@@ -433,6 +433,57 @@ void
 GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm);
 
 
+/**
+ * Function to obtain the current envelope from
+ * within #GNUNET_MQ_SendImpl implementations.
+ *
+ * @param mq message queue to interrogate
+ * @return the current envelope
+ */
+struct GNUNET_MQ_Envelope *
+GNUNET_MQ_get_current_envelope (struct GNUNET_MQ_Handle *mq);
+
+
+/**
+ * Set application-specific options for this envelope.
+ * Overrides the options set for the queue with
+ * #GNUNET_MQ_set_options() for this message only.
+ *
+ * @param env message to set options for
+ * @param flags flags to use (meaning is queue-specific)
+ * @param extra additional buffer for further data (also queue-specific)
+ */
+void
+GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env,
+                          uint64_t flags,
+                          const void *extra);
+
+
+/**
+ * Get application-specific options for this envelope.
+ *
+ * @param env message to set options for
+ * @param[out] flags set to flags to use (meaning is queue-specific)
+ * @return extra additional buffer for further data (also queue-specific)
+ */
+const void *
+GNUNET_MQ_env_get_options (struct GNUNET_MQ_Envelope *env,
+                          uint64_t *flags);
+
+
+/**
+ * Set application-specific options for this queue.
+ *
+ * @param mq message queue to set options for
+ * @param flags flags to use (meaning is queue-specific)
+ * @param extra additional buffer for further data (also queue-specific)
+ */
+void
+GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq,
+                      uint64_t flags,
+                      const void *extra);
+
+
 /**
  * Obtain the current length of the message queue.
  *
index 1638d7e0c9db497a4f8846eed54de423bf24109f..a4ea5e39d91bb2bda188b6656ebe3ce81f9f4a0d 100644 (file)
@@ -63,6 +63,26 @@ struct GNUNET_MQ_Envelope
    * Closure for @e send_cb
    */
   void *sent_cls;
+
+  /**
+   * Flags that were set for this envelope by
+   * #GNUNET_MQ_env_set_options().   Only valid if
+   * @e have_custom_options is set.
+   */
+  uint64_t flags;
+
+  /**
+   * Additional options buffer set for this envelope by
+   * #GNUNET_MQ_env_set_options().  Only valid if
+   * @e have_custom_options is set.
+   */
+  const void *extra;
+
+  /**
+   * Did the application call #GNUNET_MQ_env_set_options()?
+   */
+  int have_custom_options;
+  
 };
 
 
@@ -134,6 +154,18 @@ struct GNUNET_MQ_Handle
    */
   struct GNUNET_SCHEDULER_Task *continue_task;
 
+  /**
+   * Additional options buffer set for this queue by
+   * #GNUNET_MQ_set_options().  Default is 0.
+   */
+  const void *default_extra;
+  
+  /**
+   * Flags that were set for this queue by
+   * #GNUNET_MQ_set_options().   Default is 0.
+   */
+  uint64_t default_flags;
+
   /**
    * Next id that should be used for the @e assoc_map,
    * initialized lazily to a random value together with
@@ -1040,4 +1072,84 @@ GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
   GNUNET_free (ev);
 }
 
+
+/**
+ * Function to obtain the current envelope from
+ * within #GNUNET_MQ_SendImpl implementations.
+ *
+ * @param mq message queue to interrogate
+ * @return the current envelope
+ */
+struct GNUNET_MQ_Envelope *
+GNUNET_MQ_get_current_envelope (struct GNUNET_MQ_Handle *mq)
+{
+  return mq->current_envelope;
+}
+
+
+/**
+ * Set application-specific options for this envelope.
+ * Overrides the options set for the queue with
+ * #GNUNET_MQ_set_options() for this message only.
+ *
+ * @param env message to set options for
+ * @param flags flags to use (meaning is queue-specific)
+ * @param extra additional buffer for further data (also queue-specific)
+ */
+void
+GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env,
+                          uint64_t flags,
+                          const void *extra)
+{
+  env->flags = flags;
+  env->extra = extra;
+  env->have_custom_options = GNUNET_YES;
+}
+
+
+/**
+ * Get application-specific options for this envelope.
+ *
+ * @param env message to set options for
+ * @param[out] flags set to flags to use (meaning is queue-specific)
+ * @return extra additional buffer for further data (also queue-specific)
+ */
+const void *
+GNUNET_MQ_env_get_options (struct GNUNET_MQ_Envelope *env,
+                          uint64_t *flags)
+{
+  struct GNUNET_MQ_Handle *mq = env->parent_queue;
+  
+  if (GNUNET_YES == env->have_custom_options)
+  {
+    *flags = env->flags;
+    return env->extra;
+  }
+  if (NULL == mq)
+  {
+    *flags = 0;
+    return NULL;
+  }
+  *flags = mq->default_flags;
+  return mq->default_extra;
+}
+
+
+/**
+ * Set application-specific options for this queue.
+ *
+ * @param mq message queue to set options for
+ * @param flags flags to use (meaning is queue-specific)
+ * @param extra additional buffer for further data (also queue-specific)
+ */
+void
+GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq,
+                      uint64_t flags,
+                      const void *extra)
+{
+  mq->default_flags = flags;
+  mq->default_extra = extra;
+}
+
+
 /* end of mq.c */