activating client_new implementation, seems to mostly work fine, or better than the...
authorChristian Grothoff <christian@grothoff.org>
Fri, 21 Oct 2016 16:04:46 +0000 (16:04 +0000)
committerChristian Grothoff <christian@grothoff.org>
Fri, 21 Oct 2016 16:04:46 +0000 (16:04 +0000)
src/transport/transport_api_core.c
src/util/client.c
src/util/client_new.c
src/util/mq.c

index f6ea43db901813d58ed2bd76d89ff4450abb7a35..de18a140cd7e19fcde3e3c1567d947c30beb2064 100644 (file)
@@ -345,6 +345,25 @@ handle_hello (void *cls,
 }
 
 
+/**
+ * A message from the handler's message queue to a neighbour was
+ * transmitted.  Now trigger (possibly delayed) notification of the
+ * neighbour's message queue that we are done and thus ready for
+ * the next message.
+ *
+ * @param cls the `struct Neighbour` where the message was sent
+ */
+static void
+notify_send_done_fin (void *cls)
+{
+  struct Neighbour *n = cls;
+
+  n->timeout_task = NULL;
+  n->is_ready = GNUNET_YES;
+  GNUNET_MQ_impl_send_continue (n->mq);
+}
+
+
 /**
  * A message from the handler's message queue to a neighbour was
  * transmitted.  Now trigger (possibly delayed) notification of the
@@ -364,8 +383,8 @@ notify_send_done (void *cls)
   {
     GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker,
                                       n->env_size + n->traffic_overhead);
-    n->traffic_overhead = 0;
     n->env = NULL;
+    n->traffic_overhead = 0;
   }
   delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
                                               128);
@@ -375,10 +394,11 @@ notify_send_done (void *cls)
     GNUNET_MQ_impl_send_continue (n->mq);
     return;
   }
+  GNUNET_MQ_impl_send_in_flight (n->mq);
   /* cannot send even a small message without violating
-     quota, wait a before notifying MQ */
+     quota, wait a before allowing MQ to send next message */
   n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay,
-                                                  &notify_send_done,
+                                                  &notify_send_done_fin,
                                                   n);
 }
 
@@ -411,6 +431,7 @@ mq_send_impl (struct GNUNET_MQ_Handle *mq,
     GNUNET_MQ_impl_send_continue (mq);
     return;
   }
+  GNUNET_assert (NULL == n->env);
   n->env = GNUNET_MQ_msg_nested_mh (obm,
                                     GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
                                     msg);
index f40d5e6ebc907b5ee64fe98293ae3f9a8777cd6a..47db91c8e796e86b836894b16adbe09c9645e691 100644 (file)
@@ -375,7 +375,7 @@ do_connect (const char *service_name,
  * @return the message queue, NULL on error
  */
 struct GNUNET_MQ_Handle *
-GNUNET_CLIENT_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg,
+GNUNET_CLIENT_connecTX (const struct GNUNET_CONFIGURATION_Handle *cfg,
                        const char *service_name,
                        const struct GNUNET_MQ_MessageHandler *handlers,
                        GNUNET_MQ_ErrorHandler error_handler,
index 1e90470fb2896f442b0cdb94e64ace54e65567a2..593d3a268612d0a60527bc62cad29b0135337e9a 100644 (file)
@@ -213,10 +213,9 @@ start_connect (void *cls);
 static void
 connect_fail_continuation (struct ClientState *cstate)
 {
-  LOG (GNUNET_ERROR_TYPE_INFO,
-       "Failed to establish TCP connection to `%s:%u', no further addresses to try.\n",
-       cstate->hostname,
-       cstate->port);
+  LOG (GNUNET_ERROR_TYPE_WARNING,
+       "Failed to establish connection to `%s', no further addresses to try.\n",
+       cstate->service_name);
   GNUNET_break (NULL == cstate->ap_head);
   GNUNET_break (NULL == cstate->ap_tail);
   GNUNET_break (NULL == cstate->dns_active);
@@ -245,6 +244,7 @@ transmit_ready (void *cls)
   ssize_t ret;
   size_t len;
   const char *pos;
+  int notify_in_flight;
 
   cstate->send_task = NULL;
   pos = (const char *) cstate->msg;
@@ -262,10 +262,7 @@ transmit_ready (void *cls)
                             GNUNET_MQ_ERROR_WRITE);
     return;
   }
-  if (0 == cstate->msg_off)
-  {
-    GNUNET_MQ_impl_send_in_flight (cstate->mq);
-  }
+  notify_in_flight = (0 == cstate->msg_off);
   cstate->msg_off += ret;
   if (cstate->msg_off < len)
   {
@@ -274,6 +271,8 @@ transmit_ready (void *cls)
                                         cstate->sock,
                                         &transmit_ready,
                                         cstate);
+    if (notify_in_flight) 
+      GNUNET_MQ_impl_send_in_flight (cstate->mq);
     return;
   }
   cstate->msg = NULL;
@@ -345,6 +344,7 @@ connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
   {
     /* defer destruction */
     cstate->in_destroy = GNUNET_YES;
+    cstate->mq = NULL;
     return;
   }
   if (NULL != cstate->dns_active)
@@ -384,8 +384,12 @@ receive_ready (void *cls)
                          GNUNET_NO);
   if (GNUNET_SYSERR == ret)
   {
-    GNUNET_MQ_inject_error (cstate->mq,
-                            GNUNET_MQ_ERROR_READ);
+    if (NULL != cstate->mq)
+      GNUNET_MQ_inject_error (cstate->mq,
+                             GNUNET_MQ_ERROR_READ);
+    if (GNUNET_YES == cstate->in_destroy)
+      connection_client_destroy_impl (cstate->mq,
+                                     cstate);
     return;
   }
   if (GNUNET_YES == cstate->in_destroy)
@@ -723,16 +727,25 @@ start_connect (void *cls)
 #endif
 
   if ( (0 == (cstate->attempts++ % 2)) ||
-       (0 == cstate->port) )
+       (0 == cstate->port) ||
+       (NULL == cstate->hostname) )
   {
-    /* on even rounds, try UNIX first */
+    /* on even rounds, try UNIX first, or always
+       if we do not have a DNS name and TCP port. */
     cstate->sock = try_unixpath (cstate->service_name,
                                  cstate->cfg);
     if (NULL != cstate->sock)
     {
       connect_success_continuation (cstate);
       return;
-    }
+    }    
+  }
+  if ( (NULL == cstate->hostname) ||
+       (0 == cstate->port) )
+  {
+    /* All options failed. Boo! */
+    connect_fail_continuation (cstate);
+    return;
   }
   cstate->dns_active
     = GNUNET_RESOLVER_ip_get (cstate->hostname,
@@ -807,11 +820,11 @@ connection_client_cancel_impl (struct GNUNET_MQ_Handle *mq,
  * @return the message queue, NULL on error
  */
 struct GNUNET_MQ_Handle *
-GNUNET_CLIENT_connecT2 (const struct GNUNET_CONFIGURATION_Handle *cfg,
-                        const char *service_name,
-                        const struct GNUNET_MQ_MessageHandler *handlers,
-                        GNUNET_MQ_ErrorHandler error_handler,
-                        void *error_handler_cls)
+GNUNET_CLIENT_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg,
+                      const char *service_name,
+                      const struct GNUNET_MQ_MessageHandler *handlers,
+                      GNUNET_MQ_ErrorHandler error_handler,
+                      void *error_handler_cls)
 {
   struct ClientState *cstate;
 
index 4ba6c5ff8c255ab4e2e089263bc6de34d6badef6..ba947d5b85d53100e74dd79bab13866b0e556614 100644 (file)
@@ -127,6 +127,11 @@ struct GNUNET_MQ_Handle
    */
   void *error_handler_cls;
 
+  /**
+   * Task to asynchronously run #impl_send_continue(). 
+   */
+  struct GNUNET_SCHEDULER_Task *send_task;
+  
   /**
    * Linked list of messages pending to be sent
    */
@@ -144,23 +149,11 @@ struct GNUNET_MQ_Handle
    */
   struct GNUNET_MQ_Envelope *current_envelope;
 
-  /**
-   * GNUNET_YES if the sent notification was called 
-   * for the current envelope.
-   */
-  int send_notification_called;
-
   /**
    * Map of associations, lazily allocated
    */
   struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
 
-  /**
-   * Task scheduled during #GNUNET_MQ_impl_send_continue
-   * or #GNUNET_MQ_impl_send_in_flight
-   */
-  struct GNUNET_SCHEDULER_Task *send_task;
-
   /**
    * Functions to call on queue destruction; kept in a DLL.
    */
@@ -196,9 +189,15 @@ struct GNUNET_MQ_Handle
   unsigned int queue_length;
 
   /**
-   * GNUNET_YES if GNUNET_MQ_impl_evacuate was called.
+   * #GNUNET_YES if GNUNET_MQ_impl_evacuate was called.
+   * FIXME: is this dead?
    */
   int evacuate_called;
+
+  /**
+   * #GNUNET_YES if GNUNET_MQ_impl_send_in_flight() was called.
+   */
+  int in_flight;
 };
 
 
@@ -364,7 +363,7 @@ GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *ev)
 unsigned int
 GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq)
 {
-  return mq->queue_length;
+  return mq->queue_length - (GNUNET_YES == mq->in_flight) ? 1 : 0;
 }
 
 
@@ -385,7 +384,8 @@ GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq,
   mq->queue_length++;
   ev->parent_queue = mq;
   /* is the implementation busy? queue it! */
-  if (NULL != mq->current_envelope)
+  if ( (NULL != mq->current_envelope) ||
+       (NULL != mq->send_task) )
   {
     GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head,
                                       mq->envelope_tail,
@@ -427,35 +427,6 @@ GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq,
 }
 
 
-/**
- * Task run to call the send notification for the next queued
- * message, if any.  Only useful for implementing message queues,
- * results in undefined behavior if not used carefully.
- *
- * @param cls message queue to send the next message with
- */
-static void
-impl_send_in_flight (void *cls)
-{
-  struct GNUNET_MQ_Handle *mq = cls;
-  struct GNUNET_MQ_Envelope *current_envelope;
-
-  mq->send_task = NULL;
-  /* call is only valid if we're actually currently sending
-   * a message */
-  current_envelope = mq->current_envelope;
-  GNUNET_assert (NULL != current_envelope);
-  /* can't call cancel from now on anymore */
-  current_envelope->parent_queue = NULL;
-  if ( (GNUNET_NO == mq->send_notification_called) &&
-       (NULL != current_envelope->sent_cb) )
-  {
-    current_envelope->sent_cb (current_envelope->sent_cls);
-  }
-  mq->send_notification_called = GNUNET_YES;
-}
-
-
 /**
  * Task run to call the send implementation for the next queued
  * message, if any.  Only useful for implementing message queues,
@@ -467,32 +438,19 @@ static void
 impl_send_continue (void *cls)
 {
   struct GNUNET_MQ_Handle *mq = cls;
-  struct GNUNET_MQ_Envelope *current_envelope;
-
+  
   mq->send_task = NULL;
   /* call is only valid if we're actually currently sending
    * a message */
-  current_envelope = mq->current_envelope;
-  GNUNET_assert (NULL != current_envelope);
-  impl_send_in_flight (mq);
-  GNUNET_assert (0 < mq->queue_length);
-  mq->queue_length--;
   if (NULL == mq->envelope_head)
-  {
-    mq->current_envelope = NULL;
-  }
-  else
-  {
-    mq->current_envelope = mq->envelope_head;
-    GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
-                                 mq->envelope_tail,
-                                 mq->current_envelope);
-    mq->send_notification_called = GNUNET_NO;
-    mq->send_impl (mq,
-                  mq->current_envelope->mh,
-                  mq->impl_state);
-  }
-  GNUNET_free (current_envelope);
+    return;
+  mq->current_envelope = mq->envelope_head;
+  GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
+                              mq->envelope_tail,
+                              mq->current_envelope);
+  mq->send_impl (mq,
+                mq->current_envelope->mh,
+                mq->impl_state);
 }
 
 
@@ -506,22 +464,32 @@ impl_send_continue (void *cls)
 void
 GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
 {
-  /* maybe #GNUNET_MQ_impl_send_in_flight was called? */
-  if (NULL != mq->send_task)
-  {
-    GNUNET_SCHEDULER_cancel (mq->send_task);
-  }
+  struct GNUNET_MQ_Envelope *current_envelope;
+  GNUNET_MQ_NotifyCallback cb;
+  
+  GNUNET_assert (0 < mq->queue_length);
+  mq->queue_length--;
+  current_envelope = mq->current_envelope;
+  current_envelope->parent_queue = NULL;
+  mq->current_envelope = NULL;
+  GNUNET_assert (NULL == mq->send_task);
   mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue,
-                                            mq);
+                                           mq);
+  if (NULL != (cb = current_envelope->sent_cb))
+  {
+    current_envelope->sent_cb = NULL;
+    cb (current_envelope->sent_cls);
+  }  
+  GNUNET_free (current_envelope);
 }
 
 
 /**
  * Call the send notification for the current message, but do not
- * try to send the next message until #gnunet_mq_impl_send_continue
+ * try to send the next message until #GNUNET_MQ_impl_send_continue
  * is called.
  *
- * only useful for implementing message queues, results in undefined
+ * Only useful for implementing message queues, results in undefined
  * behavior if not used carefully.
  *
  * @param mq message queue to send the next message with
@@ -529,9 +497,21 @@ GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
 void
 GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq)
 {
-  GNUNET_assert (NULL == mq->send_task);
-  mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_in_flight,
-                                            mq);
+  struct GNUNET_MQ_Envelope *current_envelope;
+  GNUNET_MQ_NotifyCallback cb;
+  
+  mq->in_flight = GNUNET_YES;
+  /* call is only valid if we're actually currently sending
+   * a message */
+  current_envelope = mq->current_envelope;
+  GNUNET_assert (NULL != current_envelope);
+  /* can't call cancel from now on anymore */
+  current_envelope->parent_queue = NULL;
+  if (NULL != (cb = current_envelope->sent_cb))
+  {
+    current_envelope->sent_cb = NULL;
+    cb (current_envelope->sent_cls);
+  }
 }
 
 
@@ -1187,7 +1167,6 @@ GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
       GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
                                    mq->envelope_tail,
                                    mq->current_envelope);
-      mq->send_notification_called = GNUNET_NO;
       mq->send_impl (mq,
                     mq->current_envelope->mh,
                     mq->impl_state);