implement GNUNET_TRANSPORT_core_receive_continue
authorChristian Grothoff <christian@grothoff.org>
Tue, 21 May 2019 14:56:26 +0000 (16:56 +0200)
committerChristian Grothoff <christian@grothoff.org>
Tue, 21 May 2019 14:56:26 +0000 (16:56 +0200)
src/transport/transport_api_core.c

index 54dc7f4c3f92340be1489844a2e81a5e1e6aa352..224af5de22928cb3d20b14bb04866877bcf9d5bf 100644 (file)
@@ -179,6 +179,15 @@ struct GNUNET_TRANSPORT_CoreHandle
    */
   struct GNUNET_TIME_Relative reconnect_delay;
 
+  /**
+   * Internal counter to check how many more receive OK messages this
+   * CORE service is allowed to send in total. Just to detect easy
+   * cases of protocol violations by the CORE implementation.
+   * NOTE: we may want to make this stronger by counting per peer
+   * instead of globally.
+   */
+  unsigned int rom_pending;
+
   /**
    * Should we check that @e self matches what the service thinks?
    * (if #GNUNET_NO, then @e self is all zeros!).
@@ -695,6 +704,7 @@ handle_recv (void *cls, const struct InboundMessage *im)
     disconnect_and_schedule_reconnect (h);
     return;
   }
+  h->rom_pending++;
   GNUNET_MQ_inject_message (n->mq, imm);
 }
 
@@ -919,4 +929,40 @@ GNUNET_TRANSPORT_core_disconnect (struct GNUNET_TRANSPORT_CoreHandle *handle)
 }
 
 
+/**
+ * Notification from the CORE service to the TRANSPORT service
+ * that the CORE service has finished processing a message from
+ * TRANSPORT (via the @code{handlers} of #GNUNET_TRANSPORT_core_connect())
+ * and that it is thus now OK for TRANSPORT to send more messages
+ * for @a pid.
+ *
+ * Used to provide flow control, this is our equivalent to
+ * #GNUNET_SERVICE_client_continue() of an ordinary service.
+ *
+ * Note that due to the use of a window, TRANSPORT may send multiple
+ * messages destined for the same peer even without an intermediate
+ * call to this function. However, CORE must still call this function
+ * once per message received, as otherwise eventually the window will
+ * be full and TRANSPORT will stop providing messages to CORE for @a
+ * pid.
+ *
+ * @param ch core handle
+ * @param pid which peer was the message from that was fully processed by CORE
+ */
+void
+GNUNET_TRANSPORT_core_receive_continue (struct GNUNET_TRANSPORT_CoreHandle *ch,
+                                        const struct GNUNET_PeerIdentity *pid)
+{
+  struct RecvOkMessage *rom;
+  struct GNUNET_MQ_Envelope *env;
+
+  GNUNET_assert (ch->rom_pending > 0);
+  ch->rom_pending--;
+  env = GNUNET_MQ_msg (rom, GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK);
+  rom->increase_window_delta = htonl (1);
+  rom->peer = *pid;
+  GNUNET_MQ_send (ch->mq, env);
+}
+
+
 /* end of transport_api_core.c */