calc update
[oweals/gnunet.git] / src / transport / gnunet-service-transport.c
index f94ca3ef877f9a2ab4b1ef595d73513536de0d36..d654fcc8cfc89c55cd5fe4c60f31f11d682b3d7b 100644 (file)
@@ -573,17 +573,22 @@ struct TransportPongMessage
 
 };
 
+
 /**
- * Linked list of messages to be transmitted to
- * the client.  Each entry is followed by the
- * actual message.
+ * Linked list of messages to be transmitted to the client.  Each
+ * entry is followed by the actual message.
  */
 struct ClientMessageQueueEntry
 {
   /**
-   * This is a linked list.
+   * This is a doubly-linked list.
    */
   struct ClientMessageQueueEntry *next;
+
+  /**
+   * This is a doubly-linked list.
+   */
+  struct ClientMessageQueueEntry *prev;
 };
 
 
@@ -615,6 +620,11 @@ struct TransportClient
    */
   struct ClientMessageQueueEntry *message_queue_tail;
 
+  /**
+   * Current transmit request handle.
+   */ 
+  struct GNUNET_CONNECTION_TransmitHandle *th;
+
   /**
    * Is a call to "transmit_send_continuation" pending?  If so, we
    * must not free this struct (even if the corresponding client
@@ -853,19 +863,26 @@ find_transport (const char *short_name)
 
 /**
  * Update the quota values for the given neighbour now.
+ *
+ * @param n neighbour to update
+ * @param force GNUNET_YES to force recalculation now
  */
 static void
-update_quota (struct NeighbourList *n)
+update_quota (struct NeighbourList *n,
+             int force)
 {
-  struct GNUNET_TIME_Relative delta;
+  struct GNUNET_TIME_Absolute now;
+  unsigned long long delta;
   uint64_t allowed;
   uint64_t remaining;
 
-  delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
-  if (delta.value < MIN_QUOTA_REFRESH_TIME)
-    return;                     /* not enough time passed for doing quota update */
-  allowed = delta.value * n->quota_in;
-
+  now = GNUNET_TIME_absolute_get ();
+  delta = now.value - n->last_quota_update.value;
+  allowed = n->quota_in * delta;
+  if ( (delta < MIN_QUOTA_REFRESH_TIME) &&
+       (!force) &&
+       (allowed < 32 * 1024) )
+    return;                     /* too early, not enough data */
   if (n->last_received < allowed)
     {
       remaining = allowed - n->last_received;
@@ -876,7 +893,7 @@ update_quota (struct NeighbourList *n)
       if (remaining > MAX_BANDWIDTH_CARRY)
         remaining = MAX_BANDWIDTH_CARRY;
       n->last_received = 0;
-      n->last_quota_update = GNUNET_TIME_absolute_get ();
+      n->last_quota_update = now;
       n->last_quota_update.value -= remaining;
       if (n->quota_violation_count > 0)
         n->quota_violation_count--;
@@ -884,10 +901,10 @@ update_quota (struct NeighbourList *n)
   else
     {
       n->last_received -= allowed;
-      n->last_quota_update = GNUNET_TIME_absolute_get ();
+      n->last_quota_update = now;
       if (n->last_received > allowed)
         {
-          /* more than twice the allowed rate! */
+          /* much more than the allowed rate! */
           n->quota_violation_count += 10;
         }
     }
@@ -912,9 +929,9 @@ transmit_to_client_callback (void *cls, size_t size, void *buf)
   uint16_t msize;
   size_t tsize;
   const struct GNUNET_MessageHeader *msg;
-  struct GNUNET_CONNECTION_TransmitHandle *th;
   char *cbuf;
 
+  client->th = NULL;
   if (buf == NULL)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -922,10 +939,11 @@ transmit_to_client_callback (void *cls, size_t size, void *buf)
       /* fatal error with client, free message queue! */
       while (NULL != (q = client->message_queue_head))
         {
-          client->message_queue_head = q->next;
+         GNUNET_CONTAINER_DLL_remove (client->message_queue_head,
+                                      client->message_queue_tail,
+                                      q);
           GNUNET_free (q);
         }
-      client->message_queue_tail = NULL;
       client->message_count = 0;
       return 0;
     }
@@ -942,9 +960,9 @@ transmit_to_client_callback (void *cls, size_t size, void *buf)
                   "Transmitting message of type %u to client.\n",
                   ntohs (msg->type));
 #endif
-      client->message_queue_head = q->next;
-      if (q->next == NULL)
-        client->message_queue_tail = NULL;
+      GNUNET_CONTAINER_DLL_remove (client->message_queue_head,
+                                  client->message_queue_tail,
+                                  q);
       memcpy (&cbuf[tsize], msg, msize);
       tsize += msize;
       GNUNET_free (q);
@@ -953,12 +971,12 @@ transmit_to_client_callback (void *cls, size_t size, void *buf)
   if (NULL != q)
     {
       GNUNET_assert (msize >= sizeof (struct GNUNET_MessageHeader));
-      th = GNUNET_SERVER_notify_transmit_ready (client->client,
-                                                msize,
-                                                GNUNET_TIME_UNIT_FOREVER_REL,
-                                                &transmit_to_client_callback,
-                                                client);
-      GNUNET_assert (th != NULL);
+      client->th = GNUNET_SERVER_notify_transmit_ready (client->client,
+                                                       msize,
+                                                       GNUNET_TIME_UNIT_FOREVER_REL,
+                                                       &transmit_to_client_callback,
+                                                       client);
+      GNUNET_assert (client->th != NULL);
     }
   return tsize;
 }
@@ -980,7 +998,6 @@ transmit_to_client (struct TransportClient *client,
 {
   struct ClientMessageQueueEntry *q;
   uint16_t msize;
-  struct GNUNET_CONNECTION_TransmitHandle *th;
 
   if ((client->message_count >= MAX_PENDING) && (GNUNET_YES == may_drop))
     {
@@ -991,30 +1008,23 @@ transmit_to_client (struct TransportClient *client,
       /* TODO: call to statistics... */
       return;
     }
-  client->message_count++;
   msize = ntohs (msg->size);
   GNUNET_assert (msize >= sizeof (struct GNUNET_MessageHeader));
   q = GNUNET_malloc (sizeof (struct ClientMessageQueueEntry) + msize);
   memcpy (&q[1], msg, msize);
-  /* append to message queue */
-  if (client->message_queue_tail == NULL)
-    {
-      client->message_queue_tail = q;
-    }
-  else
+  GNUNET_CONTAINER_DLL_insert_after (client->message_queue_head,
+                                    client->message_queue_tail,
+                                    client->message_queue_tail,
+                                    q);                                     
+  client->message_count++;
+  if (client->th == NULL)
     {
-      client->message_queue_tail->next = q;
-      client->message_queue_tail = q;
-    }
-  if (client->message_queue_head == NULL)
-    {
-      client->message_queue_head = q;
-      th = GNUNET_SERVER_notify_transmit_ready (client->client,
-                                                msize,
-                                                GNUNET_TIME_UNIT_FOREVER_REL,
-                                                &transmit_to_client_callback,
-                                                client);
-      GNUNET_assert (th != NULL);
+      client->th = GNUNET_SERVER_notify_transmit_ready (client->client,
+                                                       msize,
+                                                       GNUNET_TIME_UNIT_FOREVER_REL,
+                                                       &transmit_to_client_callback,
+                                                       client);
+      GNUNET_assert (client->th != NULL);
     }
 }
 
@@ -1181,22 +1191,17 @@ retry_transmission_task (void *cls,
 static void
 try_transmission_to_peer (struct NeighbourList *neighbour)
 {
-  struct GNUNET_TIME_Relative min_latency;
   struct ReadyList *rl;
   struct MessageQueue *mq;
   struct GNUNET_TIME_Relative timeout;
 
   if (neighbour->messages_head == NULL)
     return;                     /* nothing to do */
-  min_latency = GNUNET_TIME_UNIT_FOREVER_REL;
   rl = NULL;
   mq = neighbour->messages_head;
   /* FIXME: support bi-directional use of TCP */
   if (mq->specific_address == NULL)
     mq->specific_address = find_ready_address(neighbour); 
-  GNUNET_CONTAINER_DLL_remove (neighbour->messages_head,
-                              neighbour->messages_tail,
-                              mq);
   if (mq->specific_address == NULL)
     {
       timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
@@ -1210,6 +1215,9 @@ try_transmission_to_peer (struct NeighbourList *neighbour)
 #endif
          if (mq->client != NULL)
            transmit_send_ok (mq->client, neighbour, GNUNET_NO);
+         GNUNET_CONTAINER_DLL_remove (neighbour->messages_head,
+                                      neighbour->messages_tail,
+                                      mq);
          GNUNET_free (mq);
          return;               /* nobody ready */ 
        }
@@ -1220,8 +1228,18 @@ try_transmission_to_peer (struct NeighbourList *neighbour)
                                                            timeout,
                                                            &retry_transmission_task,
                                                            neighbour);
+#if DEBUG_TRANSPORT
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "No validated destination address available to transmit message of size %u to peer `%4s', will wait %llums to find an address.\n",
+                 mq->message_buf_size,
+                 GNUNET_i2s (&mq->neighbour_id),
+                 timeout.value);
+#endif
       return;    
     }
+  GNUNET_CONTAINER_DLL_remove (neighbour->messages_head,
+                              neighbour->messages_tail,
+                              mq);
   if (mq->specific_address->connected == GNUNET_NO)
     mq->specific_address->connect_attempts++;
   rl = mq->specific_address->ready_list;
@@ -1831,11 +1849,11 @@ check_pending_validation (void *cls,
  * (otherwise we may be seeing a MiM attack).
  *
  * @param cls closure
- * @param name name of the transport that generated the address
+ * @param message the pong message
  * @param peer who responded to our challenge
- * @param challenge the challenge number we presumably used
- * @param sender_addr string describing our sender address (as observed
- *         by the other peer in human-readable format)
+ * @param sender_address string describing our sender address (as observed
+ *         by the other peer in binary format)
+ * @param sender_address_len number of bytes in 'sender_address'
  */
 static void
 handle_pong (void *cls, const struct GNUNET_MessageHeader *message,
@@ -2179,10 +2197,11 @@ add_to_foreign_address_list (void *cls,
     {
 #if DEBUG_TRANSPORT
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Adding address `%s' (%s) for peer `%4s' due to peerinfo data.\n",
+                 "Adding address `%s' (%s) for peer `%4s' due to peerinfo data for %llums.\n",
                  GNUNET_a2s (addr, addrlen),
                  tname,
-                 GNUNET_i2s (&n->id));
+                 GNUNET_i2s (&n->id),
+                 expiration.value);
 #endif
       fal = add_peer_address (n, tname, addr, addrlen);
     }
@@ -2201,7 +2220,7 @@ add_to_foreign_address_list (void *cls,
  *
  * @param cls closure
  * @param peer id of the peer, NULL for last call
- * @param hello hello message for the peer (can be NULL)
+ * @param h hello message for the peer (can be NULL)
  * @param trust amount of trust we have in the peer (not used)
  */
 static void
@@ -2254,10 +2273,13 @@ check_hello_validated (void *cls,
   chvc->hello_known = GNUNET_YES;
   n = find_neighbour (peer);
   if (n != NULL)
-    GNUNET_HELLO_iterate_addresses (h,
-                                   GNUNET_NO,
-                                   &add_to_foreign_address_list,
-                                   n);
+    {
+      GNUNET_HELLO_iterate_addresses (h,
+                                     GNUNET_NO,
+                                     &add_to_foreign_address_list,
+                                     n);
+      try_transmission_to_peer (n);
+    }
   GNUNET_HELLO_iterate_new_addresses (chvc->hello,
                                      h,
                                      GNUNET_TIME_relative_to_absolute (HELLO_REVALIDATION_START_TIME),
@@ -2523,6 +2545,47 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message,
 }
 
 
+/**
+ * Calculate how long we should delay reading from the TCP socket to
+ * ensure that we stay within our bandwidth limits (push back).
+ *
+ * @param n for which neighbour should this be calculated
+ * @return how long to delay receiving more data
+ */
+static struct GNUNET_TIME_Relative
+calculate_throttle_delay (struct NeighbourList *n)
+{
+  struct GNUNET_TIME_Relative ret;
+  struct GNUNET_TIME_Absolute now;
+  uint64_t del;
+  uint64_t avail;
+  uint64_t excess;
+
+  now = GNUNET_TIME_absolute_get ();
+  del = now.value - n->last_quota_update.value;
+  if (del > MAX_BANDWIDTH_CARRY)
+    {
+      update_quota (n, GNUNET_YES);
+      del = now.value - n->last_quota_update.value;
+      GNUNET_assert (del <= MAX_BANDWIDTH_CARRY);
+    }
+  if (n->quota_in == 0)
+    n->quota_in = 1;      /* avoid divison by zero */
+  avail = del * n->quota_in;
+  if (avail > n->last_received)
+    return GNUNET_TIME_UNIT_ZERO;       /* can receive right now */
+  excess = n->last_received - avail;
+  ret.value = excess / n->quota_in;
+  if (ret.value > 0)
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+               "Throttling read (%llu bytes excess at %llu b/ms), waiting %llums before reading more.\n",
+               (unsigned long long) excess,
+               (unsigned long long) n->quota_in,
+               (unsigned long long) ret.value);
+  return ret;
+}
+
+
 /**
  * Function called by the plugin for each received message.
  * Update data volumes, possibly notify plugins about
@@ -2530,18 +2593,16 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message,
  * and generally forward to our receive callback.
  *
  * @param cls the "struct TransportPlugin *" we gave to the plugin
- * @param message the message, NULL if peer was disconnected
- * @param distance the transport cost to this peer (not latency!)
- * @param sender_address the address that the sender reported
- *        (opaque to transport service)
- * @param sender_address_len the length of the sender address
  * @param peer (claimed) identity of the other peer
- * @return the new service_context that the plugin should use
- *         for future receive calls for messages from this
- *         particular peer
- *
- */
-static void
+ * @param message the message, NULL if we only care about
+ *                learning about the delay until we should receive again
+ * @param distance in overlay hops; use 1 unless DV (or 0 if message == NULL)
+ * @param sender_address binary address of the sender (if observed)
+ * @param sender_address_len number of bytes in sender_address
+ * @return how long the plugin should wait until receiving more data
+ *         (plugins that do not support this, can ignore the return value)
+ */
+static struct GNUNET_TIME_Relative
 plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer,
                     const struct GNUNET_MessageHeader *message,
                     unsigned int distance, const char *sender_address,
@@ -2557,110 +2618,87 @@ plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer,
 
   n = find_neighbour (peer);
   if (n == NULL)
-    {
-      if (message == NULL)
-        return;                 /* disconnect of peer already marked down */
-      n = setup_new_neighbour (peer);
-    }
+    n = setup_new_neighbour (peer);    
+  update_quota (n, GNUNET_NO);
   service_context = n->plugins;
   while ((service_context != NULL) && (plugin != service_context->plugin))
     service_context = service_context->next;
   GNUNET_assert ((plugin->api->send == NULL) || (service_context != NULL));
-  if (message == NULL)
+  if (message != NULL)
     {
-#if DEBUG_TRANSPORT
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
-                  "Receive failed from `%4s', triggering disconnect\n",
-                  GNUNET_i2s (&n->id));
-#endif
-      /* TODO: call stats */
-      disconnect_neighbour (n, GNUNET_YES);
-      return;
-    }
-  peer_address = add_peer_address(n, 
-                                 plugin->short_name,
-                                 sender_address, 
-                                 sender_address_len);  
-  if (peer_address != NULL)
-    {
-      peer_address->distance = distance;
-      if (peer_address->connected == GNUNET_NO)
-        {
-         peer_address->connected = GNUNET_YES;
-          peer_address->connect_attempts++;
-        }
-      peer_address->timeout
-        =
-        GNUNET_TIME_relative_to_absolute
-        (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
-    }
-  /* update traffic received amount ... */
-  msize = ntohs (message->size);
-  n->last_received += msize;
-  n->distance = distance;
-  n->peer_timeout =
-    GNUNET_TIME_relative_to_absolute
-    (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
-  GNUNET_SCHEDULER_cancel (sched,
-                          n->timeout_task);
-  n->timeout_task =
-    GNUNET_SCHEDULER_add_delayed (sched,
-                                  GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
-                                  &neighbour_timeout_task, n);
-  update_quota (n);
-  if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
-    {
-      /* dropping message due to frequent inbound volume violations! */
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING |
-                  GNUNET_ERROR_TYPE_BULK,
-                  _
-                  ("Dropping incoming message due to repeated bandwidth quota violations (total of %u).\n"), n->quota_violation_count);
-      /* TODO: call stats */
-      return;
-    }
-  switch (ntohs (message->type))
-    {
-    case GNUNET_MESSAGE_TYPE_HELLO:
-      process_hello (plugin, message);
-      break;
-    case GNUNET_MESSAGE_TYPE_TRANSPORT_PING:
-      handle_ping(plugin, message, peer, sender_address, sender_address_len);
-      break;
-    case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG:
-      handle_pong(plugin, message, peer, sender_address, sender_address_len);
-      break;
-    default:
-      if (! n->received_pong)
+      peer_address = add_peer_address(n, 
+                                     plugin->short_name,
+                                     sender_address, 
+                                     sender_address_len);  
+      if (peer_address != NULL)
        {
-         GNUNET_break_op (0);
-#if DEBUG_TRANSPORT || 1
-         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                     "Received message of type %u from `%4s', prior to key confirmation (dropped).\n",
-                     ntohs (message->type), GNUNET_i2s (peer));
-#endif
-         break;
+         peer_address->distance = distance;
+         if (peer_address->connected == GNUNET_NO)
+           {
+             peer_address->connected = GNUNET_YES;
+             peer_address->connect_attempts++;
+           }
+         peer_address->timeout
+           =
+           GNUNET_TIME_relative_to_absolute
+           (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+       }
+      /* update traffic received amount ... */
+      msize = ntohs (message->size);
+      n->distance = distance;
+      n->peer_timeout =
+       GNUNET_TIME_relative_to_absolute
+       (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+      GNUNET_SCHEDULER_cancel (sched,
+                              n->timeout_task);
+      n->timeout_task =
+       GNUNET_SCHEDULER_add_delayed (sched,
+                                     GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+                                     &neighbour_timeout_task, n);
+      if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
+       {
+         /* dropping message due to frequent inbound volume violations! */
+         GNUNET_log (GNUNET_ERROR_TYPE_WARNING |
+                     GNUNET_ERROR_TYPE_BULK,
+                     _
+                     ("Dropping incoming message due to repeated bandwidth quota violations (total of %u).\n"), 
+                     n->quota_violation_count);
+         return GNUNET_TIME_UNIT_MINUTES; /* minimum penalty, likely ignored (UDP...) */
        }
+      switch (ntohs (message->type))
+       {
+       case GNUNET_MESSAGE_TYPE_HELLO:
+         process_hello (plugin, message);
+         break;
+       case GNUNET_MESSAGE_TYPE_TRANSPORT_PING:
+         handle_ping(plugin, message, peer, sender_address, sender_address_len);
+         break;
+       case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG:
+         handle_pong(plugin, message, peer, sender_address, sender_address_len);
+         break;
+       default:
 #if DEBUG_TRANSPORT
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Received message of type %u from `%4s', sending to all clients.\n",
-                  ntohs (message->type), GNUNET_i2s (peer));
+         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                     "Received message of type %u from `%4s', sending to all clients.\n",
+                     ntohs (message->type), GNUNET_i2s (peer));
 #endif
-      /* transmit message to all clients */
-      im = GNUNET_malloc (sizeof (struct InboundMessage) + msize);
-      im->header.size = htons (sizeof (struct InboundMessage) + msize);
-      im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
-      im->latency = GNUNET_TIME_relative_hton (n->latency);
-      im->peer = *peer;
-      memcpy (&im[1], message, msize);
-
-      cpos = clients;
-      while (cpos != NULL)
-        {
-          transmit_to_client (cpos, &im->header, GNUNET_YES);
-          cpos = cpos->next;
-        }
-      GNUNET_free (im);
-    }
+         /* transmit message to all clients */
+         im = GNUNET_malloc (sizeof (struct InboundMessage) + msize);
+         im->header.size = htons (sizeof (struct InboundMessage) + msize);
+         im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
+         im->latency = GNUNET_TIME_relative_hton (n->latency);
+         im->peer = *peer;
+         memcpy (&im[1], message, msize);
+         cpos = clients;
+         while (cpos != NULL)
+           {
+             transmit_to_client (cpos, &im->header, GNUNET_YES);
+             cpos = cpos->next;
+           }
+         GNUNET_free (im);
+       }
+    }  
+  return calculate_throttle_delay (n);
 }
 
 
@@ -2744,10 +2782,6 @@ handle_hello (void *cls,
 {
   int ret;
 
-#if DEBUG_TRANSPORT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Received `%s' request from client\n", "HELLO");
-#endif
   ret = process_hello (NULL, message);
   GNUNET_SERVER_receive_done (client, ret);
 }
@@ -2830,8 +2864,7 @@ handle_set_quota (void *cls,
   const struct QuotaSetMessage *qsm =
     (const struct QuotaSetMessage *) message;
   struct NeighbourList *n;
-  struct TransportPlugin *p;
-  struct ReadyList *rl;
+  uint32_t qin;
 
   n = find_neighbour (&qsm->peer);
   if (n == NULL)
@@ -2839,25 +2872,16 @@ handle_set_quota (void *cls,
       GNUNET_SERVER_receive_done (client, GNUNET_OK);
       return;
     }
-
+  qin = ntohl (qsm->quota_in);
 #if DEBUG_TRANSPORT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Received `%s' request (new quota %u, old quota %u) from client for peer `%4s'\n",
-              "SET_QUOTA", ntohl(qsm->quota_in), n->quota_in, GNUNET_i2s (&qsm->peer));
+              "SET_QUOTA", qin, n->quota_in, GNUNET_i2s (&qsm->peer));
 #endif
-
-  update_quota (n);
-  if (n->quota_in < ntohl (qsm->quota_in))
+  update_quota (n, GNUNET_YES);
+  if (n->quota_in < qin)
     n->last_quota_update = GNUNET_TIME_absolute_get ();
-  n->quota_in = ntohl (qsm->quota_in);
-  rl = n->plugins;
-  while (rl != NULL)
-    {
-      p = rl->plugin;
-      p->api->set_receive_quota (p->api->cls,
-                                 &qsm->peer, ntohl (qsm->quota_in));
-      rl = rl->next;
-    }
+  n->quota_in = qin;
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
@@ -2974,8 +2998,6 @@ create_environment (struct TransportPlugin *plug)
   plug->env.cls = plug;
   plug->env.receive = &plugin_env_receive;
   plug->env.notify_address = &plugin_env_notify_address;
-  plug->env.default_quota_in =
-    (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000);
   plug->env.max_connections = max_connect_per_transport;
 }
 
@@ -3043,10 +3065,12 @@ client_disconnect_notification (void *cls,
     return;
   while (NULL != (mqe = pos->message_queue_head))
     {
-      pos->message_queue_head = mqe->next;
+      GNUNET_CONTAINER_DLL_remove (pos->message_queue_head,
+                                  pos->message_queue_tail,
+                                  mqe);
+      pos->message_count--;
       GNUNET_free (mqe);
     }
-  pos->message_queue_head = NULL;
   if (prev == NULL)
     clients = pos->next;
   else
@@ -3056,6 +3080,12 @@ client_disconnect_notification (void *cls,
       pos->client = NULL;
       return;
     }
+  if (pos->th != NULL)
+    {
+      GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
+      pos->th = NULL;
+    }
+  GNUNET_break (0 == pos->message_count);
   GNUNET_free (pos);
 }
 
@@ -3096,6 +3126,8 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   struct OwnAddressList *al;
   struct CheckHelloValidatedContext *chvc;
 
+  while (neighbours != NULL)
+    disconnect_neighbour (neighbours, GNUNET_NO);
 #if DEBUG_TRANSPORT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Transport service is unloading plugins...\n");