towards fixing #3363: replacing old iteration API with new monitoring API for core...
[oweals/gnunet.git] / src / core / core_api.c
index 7aa3e0519ebc377490527b84499a9e0d4471d49d..7818a60a3bdebf197641db202aac2500e3e7aa1a 100644 (file)
@@ -25,6 +25,7 @@
  * @author Christian Grothoff
  */
 #include "platform.h"
+#include "gnunet_util_lib.h"
 #include "gnunet_constants.h"
 #include "gnunet_core_service.h"
 #include "core.h"
@@ -70,7 +71,7 @@ struct GNUNET_CORE_TransmitHandle
   /**
    * How important is this message?
    */
-  uint32_t priority;
+  enum GNUNET_CORE_Priority priority;
 
   /**
    * Size of this request.
@@ -144,6 +145,13 @@ struct PeerRecord
 
 };
 
+struct CoreMQState
+{
+  struct GNUNET_PeerIdentity target;
+  struct GNUNET_CORE_Handle *core;
+  struct GNUNET_CORE_TransmitHandle *th;
+};
+
 
 /**
  * Type of function called upon completion.
@@ -333,7 +341,7 @@ reconnect (struct GNUNET_CORE_Handle *h);
 /**
  * Task schedule to try to re-connect to core.
  *
- * @param cls the 'struct GNUNET_CORE_Handle'
+ * @param cls the `struct GNUNET_CORE_Handle`
  * @param tc task context
  */
 static void
@@ -342,7 +350,8 @@ reconnect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   struct GNUNET_CORE_Handle *h = cls;
 
   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to CORE service after delay\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Connecting to CORE service after delay\n");
   reconnect (h);
 }
 
@@ -351,9 +360,9 @@ reconnect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
  * Notify clients about disconnect and free
  * the entry for connected peer.
  *
- * @param cls the 'struct GNUNET_CORE_Handle*'
+ * @param cls the `struct GNUNET_CORE_Handle *`
  * @param key the peer identity (not used)
- * @param value the 'struct PeerRecord' to free.
+ * @param value the `struct PeerRecord` to free.
  * @return #GNUNET_YES (continue)
  */
 static int
@@ -425,7 +434,8 @@ reconnect_later (struct GNUNET_CORE_Handle *h)
   h->currently_down = GNUNET_YES;
   GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
   h->reconnect_task =
-      GNUNET_SCHEDULER_add_delayed (h->retry_backoff, &reconnect_task, h);
+      GNUNET_SCHEDULER_add_delayed (h->retry_backoff,
+                                    &reconnect_task, h);
   while (NULL != (cm = h->control_pending_head))
   {
     GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
@@ -464,7 +474,8 @@ trigger_next_request (struct GNUNET_CORE_Handle *h, int ignore_currently_down);
  * @param tc context, can be NULL (!)
  */
 static void
-transmission_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+transmission_timeout (void *cls,
+                      const struct GNUNET_SCHEDULER_TaskContext *tc);
 
 
 /**
@@ -506,7 +517,7 @@ request_next_transmission (struct PeerRecord *pr)
   smr = (struct SendMessageRequest *) &cm[1];
   smr->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST);
   smr->header.size = htons (sizeof (struct SendMessageRequest));
-  smr->priority = htonl (th->priority);
+  smr->priority = htonl ((uint32_t) th->priority);
   smr->deadline = GNUNET_TIME_absolute_hton (th->timeout);
   smr->peer = pr->peer;
   smr->reserved = htonl (0);
@@ -529,7 +540,8 @@ request_next_transmission (struct PeerRecord *pr)
  * @param tc context, can be NULL (!)
  */
 static void
-transmission_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+transmission_timeout (void *cls,
+                      const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct PeerRecord *pr = cls;
   struct GNUNET_CORE_Handle *h = pr->ch;
@@ -569,10 +581,10 @@ transmission_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 /**
  * Transmit the next message to the core service.
  *
- * @param cls closure with the 'struct GNUNET_CORE_Handle'
+ * @param cls closure with the `struct GNUNET_CORE_Handle`
  * @param size number of bytes available in @a buf
  * @param buf where the callee should write the message
- * @return number of bytes written to buf
+ * @return number of bytes written to @a buf
  */
 static size_t
 transmit_message (void *cls, size_t size, void *buf)
@@ -641,7 +653,7 @@ transmit_message (void *cls, size_t size, void *buf)
        GNUNET_i2s (&pr->peer), (unsigned int) th->msize);
   sm = (struct SendMessage *) buf;
   sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND);
-  sm->priority = htonl (th->priority);
+  sm->priority = htonl ((uint32_t) th->priority);
   sm->deadline = GNUNET_TIME_absolute_hton (th->timeout);
   sm->peer = pr->peer;
   sm->cork = htonl ((uint32_t) th->cork);
@@ -688,7 +700,8 @@ transmit_message (void *cls, size_t size, void *buf)
  * @param ignore_currently_down transmit message even if not initialized?
  */
 static void
-trigger_next_request (struct GNUNET_CORE_Handle *h, int ignore_currently_down)
+trigger_next_request (struct GNUNET_CORE_Handle *h,
+                      int ignore_currently_down)
 {
   uint16_t msize;
 
@@ -726,11 +739,12 @@ trigger_next_request (struct GNUNET_CORE_Handle *h, int ignore_currently_down)
 /**
  * Handler for notification messages received from the core.
  *
- * @param cls our "struct GNUNET_CORE_Handle"
+ * @param cls our `struct GNUNET_CORE_Handle`
  * @param msg the message received from the core service
  */
 static void
-main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
+main_notify_handler (void *cls,
+                     const struct GNUNET_MessageHeader *msg)
 {
   struct GNUNET_CORE_Handle *h = cls;
   const struct InitReplyMessage *m;
@@ -1064,7 +1078,8 @@ reconnect (struct GNUNET_CORE_Handle *h)
   unsigned int hpos;
 
   GNUNET_assert (NULL == h->client);
-  GNUNET_assert (h->currently_down == GNUNET_YES);
+  GNUNET_assert (GNUNET_YES == h->currently_down);
+  GNUNET_assert (NULL != h->cfg);
   h->client = GNUNET_CLIENT_connect ("core", h->cfg);
   if (NULL == h->client)
   {
@@ -1165,7 +1180,8 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
   GNUNET_assert (h->hcnt <
                  (GNUNET_SERVER_MAX_MESSAGE_SIZE -
                   sizeof (struct InitMessage)) / sizeof (uint16_t));
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to CORE service\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Connecting to CORE service\n");
   reconnect (h);
   return h;
 }
@@ -1173,7 +1189,7 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
 
 /**
  * Disconnect from the core service.  This function can only
- * be called *after* all pending 'GNUNET_CORE_notify_transmit_ready'
+ * be called *after* all pending #GNUNET_CORE_notify_transmit_ready()
  * requests have been explicitly canceled.
  *
  * @param handle connection to core to disconnect
@@ -1251,7 +1267,7 @@ run_request_next_transmission (void *cls,
  * @param handle connection to core service
  * @param cork is corking allowed for this transmission?
  * @param priority how important is the message?
- * @param maxdelay how long can the message wait?
+ * @param maxdelay how long can the message wait? Only effective if @a cork is #GNUNET_YES
  * @param target who should receive the message, never NULL (can be this peer's identity for loopback)
  * @param notify_size how many bytes of buffer space does @a notify want?
  * @param notify function to call when buffer space is available;
@@ -1264,8 +1280,9 @@ run_request_next_transmission (void *cls,
  *         if NULL is returned, @a notify will NOT be called.
  */
 struct GNUNET_CORE_TransmitHandle *
-GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, int cork,
-                                   uint32_t priority,
+GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle,
+                                   int cork,
+                                   enum GNUNET_CORE_Priority priority,
                                    struct GNUNET_TIME_Relative maxdelay,
                                    const struct GNUNET_PeerIdentity *target,
                                    size_t notify_size,
@@ -1275,9 +1292,6 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, int cork,
   struct PeerRecord *pr;
   struct GNUNET_CORE_TransmitHandle *th;
 
-  GNUNET_assert (NULL != handle);
-  GNUNET_assert (NULL != target);
-
   if (notify_size > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
   {
      GNUNET_break (0);
@@ -1315,7 +1329,8 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, int cork,
   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == pr->ntr_task);
   pr->ntr_task =
     GNUNET_SCHEDULER_add_now (&run_request_next_transmission, pr);
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmission request added to queue\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Transmission request added to queue\n");
   return th;
 }
 
@@ -1387,4 +1402,124 @@ GNUNET_CORE_is_peer_connected_sync (const struct GNUNET_CORE_Handle *h,
 }
 
 
+/**
+ * Function called to notify a client about the connection
+ * begin ready to queue more data.  "buf" will be
+ * NULL and "size" zero if the connection was closed for
+ * writing in the meantime.
+ *
+ * @param cls closure
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+core_mq_ntr (void *cls, size_t size,
+             void *buf)
+{
+  struct GNUNET_MQ_Handle *mq = cls;
+  struct CoreMQState *mqs = GNUNET_MQ_impl_state (mq);
+  const struct GNUNET_MessageHeader *mh = GNUNET_MQ_impl_current (mq);
+  size_t msg_size = ntohs (mh->size);
+  GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "core-mq", "ntr called (size %u, type %u)\n",
+                   msg_size, ntohs (mh->type));
+  mqs->th = NULL;
+  if (NULL == buf)
+  {
+    GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "core-mq", "send error\n");
+    GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE);
+    return 0;
+  }
+  memcpy (buf, mh, msg_size);
+  GNUNET_MQ_impl_send_continue (mq);
+  return msg_size;
+}
+
+
+/**
+ * Signature of functions implementing the
+ * sending functionality of a message queue.
+ *
+ * @param mq the message queue
+ * @param msg the message to send
+ * @param impl_state state of the implementation
+ */
+static void
+core_mq_send (struct GNUNET_MQ_Handle *mq,
+              const struct GNUNET_MessageHeader *msg,
+              void *impl_state)
+{
+  struct CoreMQState *mqs = impl_state;
+  GNUNET_assert (NULL == mqs->th);
+  GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "core-mq", "Sending queued message (size %u)\n",
+             ntohs (msg->size));
+  mqs->th = GNUNET_CORE_notify_transmit_ready (mqs->core, GNUNET_YES, 0,
+                                               GNUNET_TIME_UNIT_FOREVER_REL,
+                                               &mqs->target,
+                                               ntohs (msg->size), core_mq_ntr, mq);
+}
+
+
+/**
+ * Signature of functions implementing the
+ * destruction of a message queue.
+ * Implementations must not free @a mq, but should
+ * take care of @a impl_state.
+ *
+ * @param mq the message queue to destroy
+ * @param impl_state state of the implementation
+ */
+static void
+core_mq_destroy (struct GNUNET_MQ_Handle *mq, void *impl_state)
+{
+  struct CoreMQState *mqs = impl_state;
+  if (NULL != mqs->th)
+  {
+    GNUNET_CORE_notify_transmit_ready_cancel (mqs->th);
+    mqs->th = NULL;
+  }
+  GNUNET_free (mqs);
+}
+
+
+/**
+ * Implementation function that cancels the currently sent message.
+ *
+ * @param mq message queue
+ * @param impl_state state specific to the implementation
+ */
+static void
+core_mq_cancel (struct GNUNET_MQ_Handle *mq, void *impl_state)
+{
+  struct CoreMQState *mqs = impl_state;
+  GNUNET_assert (NULL != mqs->th);
+  GNUNET_CORE_notify_transmit_ready_cancel (mqs->th);
+}
+
+
+/**
+ * Create a message queue for sending messages to a peer with CORE.
+ * Messages may only be queued with #GNUNET_MQ_send once the init callback has
+ * been called for the given handle.
+ * There must only be one queue per peer for each core handle.
+ * The message queue can only be used to transmit messages,
+ * not to receive them.
+ *
+ * @param h the core handle
+ * @param target the target peer for this queue, may not be NULL
+ * @return a message queue for sending messages over the core handle
+ *         to the target peer
+ */
+struct GNUNET_MQ_Handle *
+GNUNET_CORE_mq_create (struct GNUNET_CORE_Handle *h,
+                       const struct GNUNET_PeerIdentity *target)
+{
+  struct CoreMQState *mqs = GNUNET_new (struct CoreMQState);
+  mqs->core = h;
+  mqs->target = *target;
+  return GNUNET_MQ_queue_for_callbacks (core_mq_send, core_mq_destroy,
+                                        core_mq_cancel, mqs,
+                                        NULL, NULL, NULL);
+}
+
 /* end of core_api.c */