communicate operation timeouts
[oweals/gnunet.git] / src / mesh / mesh_api_new.c
index e466bca94ab32801f38130ba99990fab26d0eee5..79536238fc300b0c9ed051664133c20e86c7c4f4 100644 (file)
  * @file mesh/mesh_api_new.c
  * @brief mesh api: client implementation of mesh service
  * @author Bartlomiej Polot
- * 
+ *
  * STRUCTURE:
  * - CONSTANTS
  * - DATA STRUCTURES
- * - SEND CALLBACKS
+ * - AUXILIARY FUNCTIONS
  * - RECEIVE HANDLERS
+ * - SEND FUNCTIONS
  * - API CALL DEFINITIONS
  */
 
@@ -45,33 +46,103 @@ extern "C"
 #include "gnunet_common.h"
 #include "gnunet_client_lib.h"
 #include "gnunet_util_lib.h"
+#include "gnunet_peer_lib.h"
 #include "gnunet_mesh_service_new.h"
 #include "mesh.h"
+#include "mesh_protocol.h"
 
-/******************************************************************************/
-/**************************       CONSTANTS      ******************************/
-/******************************************************************************/
-
-#define GNUNET_MESH_LOCAL_TUNNEL_ID_MARK 0x80000000
+#define MESH_API_MAX_QUEUE 10
 
 /******************************************************************************/
 /************************      DATA STRUCTURES     ****************************/
 /******************************************************************************/
 
+/**
+ * Transmission queue to the service
+ */
+struct GNUNET_MESH_TransmitHandle
+{
+    /**
+     * Double Linked list
+     */
+  struct GNUNET_MESH_TransmitHandle *next;
+
+    /**
+     * Double Linked list
+     */
+  struct GNUNET_MESH_TransmitHandle *prev;
+
+    /**
+     * Data itself, currently points to the end of this struct if 
+     * we have a message already, NULL if the message is to be 
+     * obtained from the callback.
+     */
+  const struct GNUNET_MessageHeader *data;
+
+  /**
+   * Tunnel this message is sent over (may be NULL for control messages).
+   */
+  struct GNUNET_MESH_Tunnel *tunnel;
+
+  /**
+   * Callback to obtain the message to transmit, or NULL if we
+   * got the message in 'data'.  Notice that messages built
+   * by 'notify' need to be encapsulated with information about
+   * the 'target'.
+   */
+  GNUNET_CONNECTION_TransmitReadyNotify notify;
+
+  /**
+   * Closure for 'notify'
+   */
+  void *notify_cls;
+  
+  /**
+   * How long is this message valid.  Once the timeout has been
+   * reached, the message must no longer be sent.  If this 
+   * is a message with a 'notify' callback set, the 'notify'
+   * function should be called with 'buf' NULL and size 0.
+   */
+  struct GNUNET_TIME_Absolute timeout;
+
+  /**
+   * Task triggering a timeout, can be NO_TASK if the timeout is FOREVER.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+
+  /**
+   * Priority of the message.  The queue is sorted by priority,
+   * control messages have the maximum priority (UINT32_MAX).
+   */
+  uint32_t priority;
+  /**
+   * Target of the message, 0 for broadcast.  This field
+   * is only valid if 'notify' is non-NULL.
+   */
+  GNUNET_PEER_Id target;
+                                 
+  /**
+   * Size of 'data' -- or the desired size of 'notify' if 'data' is NULL.
+   */
+  size_t size;
+};
+
+
 /**
  * Opaque handle to the service.
  */
-struct GNUNET_MESH_Handle {
+struct GNUNET_MESH_Handle
+{
     /**
      * Handle to the server connection, to send messages later
      */
-    struct GNUNET_CLIENT_Connection             *client;
+  struct GNUNET_CLIENT_Connection *client;
 
     /**
      * Set of handlers used for processing incoming messages in the tunnels
      */
-    const struct GNUNET_MESH_MessageHandler     *message_handlers;
-    int                                         n_handlers;
+  const struct GNUNET_MESH_MessageHandler *message_handlers;
 
     /**
      * Set of applications that should be claimed to be offered at this node.
@@ -79,87 +150,107 @@ struct GNUNET_MESH_Handle {
      * registered independently and the mapping is up to the developer of the
      * client application.
      */
-    const GNUNET_MESH_ApplicationType           *applications;
-    int                                         n_applications;
+  const GNUNET_MESH_ApplicationType *applications; 
 
     /**
      * Double linked list of the tunnels this client is connected to.
      */
-    struct GNUNET_MESH_Tunnel                   *tunnels_head;
-    struct GNUNET_MESH_Tunnel                   *tunnels_tail;
-
-    /**
-     * tid of the next tunnel to create (to avoid reusing IDs often)
-     */
-    MESH_TunnelID                               next_tid;
+  struct GNUNET_MESH_Tunnel *tunnels_head;
+  struct GNUNET_MESH_Tunnel *tunnels_tail;
 
     /**
      * Callback for tunnel disconnection
      */
-    GNUNET_MESH_TunnelEndHandler                *cleaner;
+  GNUNET_MESH_TunnelEndHandler *cleaner;
 
     /**
      * Handle to cancel pending transmissions in case of disconnection
      */
-    struct GNUNET_CLIENT_TransmitHandle         *th;
+  struct GNUNET_CLIENT_TransmitHandle *th;
 
     /**
      * Closure for all the handlers given by the client
      */
-    void                                        *cls;
+  void *cls;
+
+    /**
+     * Messages to send to the service
+     */
+  struct GNUNET_MESH_TransmitHandle *queue_head;
+  struct GNUNET_MESH_TransmitHandle *queue_tail;
+
+    /**
+     * tid of the next tunnel to create (to avoid reusing IDs often)
+     */
+  MESH_TunnelNumber next_tid;
+
+  unsigned int n_handlers;
+
+  unsigned int n_applications;
+
+  unsigned int max_queue_size;
+
+  /**
+   * Have we started the task to receive messages from the service
+   * yet? We do this after we send the 'MESH_LOCAL_CONNECT' message.
+   */
+  int in_receive;
 };
 
 /**
  * Opaque handle to a tunnel.
  */
-struct GNUNET_MESH_Tunnel {
+struct GNUNET_MESH_Tunnel
+{
 
     /**
      * DLL
      */
-    struct GNUNET_MESH_Tunnel                   *next;
-    struct GNUNET_MESH_Tunnel                   *prev;
+  struct GNUNET_MESH_Tunnel *next;
+  struct GNUNET_MESH_Tunnel *prev;
 
     /**
-     * Owner of the tunnel, either local or remote
+     * Callback to execute when peers connect to the tunnel
      */
-    GNUNET_PEER_Id                              owner;
+  GNUNET_MESH_TunnelConnectHandler connect_handler;
 
     /**
-     * Local ID of the tunnel
+     * Callback to execute when peers disconnect to the tunnel
      */
-    MESH_TunnelID                               tid;
+  GNUNET_MESH_TunnelDisconnectHandler disconnect_handler;
 
     /**
-     * Callback to execute when peers connect to the tunnel
+     * All peers added to the tunnel
      */
-    GNUNET_MESH_TunnelConnectHandler            connect_handler;
+  GNUNET_PEER_Id *peers;
 
     /**
-     * Callback to execute when peers disconnect to the tunnel
+     * Closure for the connect/disconnect handlers
      */
-    GNUNET_MESH_TunnelDisconnectHandler         disconnect_handler;
+  void *cls;
 
     /**
-     * All peers added to the tunnel
+     * Handle to the mesh this tunnel belongs to
      */
-    GNUNET_PEER_Id                              *peers;
+  struct GNUNET_MESH_Handle *mesh;
 
     /**
-     * Closure for the connect/disconnect handlers
+     * Local ID of the tunnel
      */
-    void                                        *cls;
+  MESH_TunnelNumber tid;
 
     /**
-     * Handle to the mesh this tunnel belongs to
+     * Owner of the tunnel
      */
-    struct GNUNET_MESH_Handle                   *mesh;
-};
+  GNUNET_PEER_Id owner;
 
-struct GNUNET_MESH_TransmitHandle {
-    // TODO
+    /**
+     * Number of peer added to the tunnel
+     */
+  unsigned int npeers;
 };
 
+
 /******************************************************************************/
 /***********************     AUXILIARY FUNCTIONS      *************************/
 /******************************************************************************/
@@ -171,221 +262,183 @@ struct GNUNET_MESH_TransmitHandle {
  * @return handle to the required tunnel or NULL if not found
  */
 static struct GNUNET_MESH_Tunnel *
-retrieve_tunnel (struct GNUNET_MESH_Handle *h, MESH_TunnelID tid) 
+retrieve_tunnel (struct GNUNET_MESH_Handle *h, MESH_TunnelNumber tid)
 {
-    struct GNUNET_MESH_Tunnel           *t;
-
-    t = h->tunnels_head;
-    while (t != NULL) {
-        if (t->tid == tid) return t;
-        t = t->next;
-    }
-    return NULL;
+  struct GNUNET_MESH_Tunnel *t;
+
+  t = h->tunnels_head;
+  while (t != NULL)
+  {
+    if (t->tid == tid)
+      return t;
+    t = t->next;
+  }
+  return NULL;
 }
 
-
-/******************************************************************************/
-/************************       SEND CALLBACKS     ****************************/
-/******************************************************************************/
-
-
 /**
- * Function called to send a connect message to the service, specifying the
- * types and applications that the client is interested in.
- * "buf" will be NULL and "size" zero if the socket was closed for writing in
- * the meantime.
- *
- * @param cls closure, the mesh handle
- * @param size number of bytes available in buf
- * @param buf where the callee should write the connect message
- * @return number of bytes written to buf
+ * Get the length of the transmission queue
+ * @param h mesh handle whose queue is to be measured
  */
-static size_t 
-send_connect_packet (void *cls, size_t size, void *buf)
+static unsigned int
+get_queue_length (struct GNUNET_MESH_Handle *h)
 {
-    struct GNUNET_MESH_Handle           *h = cls;
-    struct GNUNET_MESH_ClientConnect    *msg;
-    uint16_t                            *types;
-    uint16_t                            ntypes;
-    GNUNET_MESH_ApplicationType         *apps;
-    uint16_t                            napps;
-
-    h->th = NULL;
-    if (0 == size || buf == NULL) {
-        GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                    "Send connect packet: buffer size 0 or buffer invalid\n");
-       // FIXME: disconnect, reconnect, retry!
-        return 0;
-    }
-    if (sizeof(struct GNUNET_MessageHeader) > size) {
-        GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                    "Send connect packet: buffer size too small\n");
-       // FIXME: disconnect, reconnect, retry!
-        return 0;
-    }
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Send connect packet: %lu bytes buffer\n",
-                size);
-    msg = (struct GNUNET_MESH_ClientConnect *) buf;
-    msg->header.type = htons(GNUNET_MESSAGE_TYPE_MESH_LOCAL_CONNECT);
+  struct GNUNET_MESH_TransmitHandle *q;
+  unsigned int i;
 
-    for (ntypes = 0, types = NULL; h->message_handlers[ntypes].type; ntypes++) {
-        types = GNUNET_realloc(types, sizeof(uint16_t) * (ntypes + 1));
-        types[ntypes] = h->message_handlers[ntypes].type;
-    }
+  /* count */
+  for (q = h->queue_head, i = 0; NULL != q; q = q->next, i++) ;
 
-    for(napps = 0, apps = NULL; h->applications[napps]; napps++) {
-        apps = GNUNET_realloc(apps,
-                              sizeof(GNUNET_MESH_ApplicationType) *
-                                (napps + 1));
-        apps[napps] = h->applications[napps];
-    }
-
-    msg->header.size = htons(sizeof(struct GNUNET_MESH_ClientConnect) +
-                             sizeof(uint16_t) * ntypes +
-                             sizeof(GNUNET_MESH_ApplicationType) * napps);
-
-    memcpy(&msg[1], types, sizeof(uint16_t) * ntypes);
-    memcpy(&msg[1] + sizeof(uint16_t) * ntypes,
-           apps,
-           sizeof(GNUNET_MESH_ApplicationType) * napps);
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Sent %lu bytes long message %d types and %d apps\n",
-                ntohs(msg->header.size),
-                ntypes,
-                napps
-               );
-    msg->applications = htons(napps);
-    msg->types = htons(ntypes);
-
-    return ntohs(msg->header.size);
+  return i;
 }
 
 
+/******************************************************************************/
+/***********************      RECEIVE HANDLERS     ****************************/
+/******************************************************************************/
+
 /**
- * Function called to send a create tunnel message, specifying the tunnel
- * number chosen by the client.
- * "buf" will be NULL and "size" zero if the socket was closed for
- * writing in the meantime.
+ * Process the new tunnel notification and add it to the tunnels in the handle
  *
- * @param cls closure, the tunnel handle
- * @param size number of bytes available in buf
- * @param buf where the callee should write the create tunnel message
- * @return number of bytes written to buf
+ * @param h     The mesh handle
+ * @param msg   A message with the details of the new incoming tunnel
  */
-static size_t 
-send_tunnel_create_packet (void *cls, size_t size, void *buf)
+static void
+process_tunnel_create (struct GNUNET_MESH_Handle *h,
+                       const struct GNUNET_MESH_TunnelMessage *msg)
 {
-    struct GNUNET_MESH_Tunnel           *t = cls;
-    struct GNUNET_MESH_Handle           *h;
-    struct GNUNET_MESH_TunnelMessage    *msg;
-
-    h = t->mesh;
-    h->th = NULL;
-    if (0 == size || buf == NULL) {
-        GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                    "Send connect packet: buffer size 0 or buffer invalid\n");
-        // FIXME: disconnect, reconnect, retry!
-        return 0;
-    }
-    if (sizeof(struct GNUNET_MessageHeader) > size) {
-        GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                    "Send connect packet: buffer size too small\n");
-        // FIXME: disconnect, reconnect, retry!
-        return 0;
-    }
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Send connect packet: %lu bytes buffer\n",
-                size);
-    msg = (struct GNUNET_MESH_TunnelMessage *) buf;
-    msg->header.type = htons(GNUNET_MESSAGE_TYPE_MESH_LOCAL_CONNECT);
-
-    msg->header.size = htons(sizeof(struct GNUNET_MESH_TunnelMessage));
-    msg->tunnel_id = htonl(t->tid);
+  struct GNUNET_MESH_Tunnel *t;
+  MESH_TunnelNumber tid;
 
+  tid = ntohl (msg->tunnel_id);
+  if (tid >= GNUNET_MESH_LOCAL_TUNNEL_ID_MARK)
+  {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Sent %lu bytes long message\n",
-                ntohs(msg->header.size));
-
-    return ntohs(msg->header.size);
+                "MESH: received an incoming tunnel with tid in local range (%X)\n",
+                tid);
+    GNUNET_break_op (0);
+    return;                     //FIXME abort? reconnect?
+  }
+  t = GNUNET_malloc (sizeof (struct GNUNET_MESH_Tunnel));
+  t->cls = h->cls;
+  t->mesh = h;
+  t->tid = tid;
+
+  return;
 }
 
 
-/******************************************************************************/
-/***********************      RECEIVE HANDLERS     ****************************/
-/******************************************************************************/
-
 /**
- * Process the new tunnel notification and add it to the tunnels in the handle
- * 
+ * Process the new peer event and notify the upper level of it
+ *
  * @param h     The mesh handle
- * @param msh   A message with the details of the new incoming tunnel
+ * @param msg   A message with the details of the peer event
  */
 static void
-process_tunnel_create(struct GNUNET_MESH_Handle *h, 
-                      const struct GNUNET_MESH_TunnelMessage *msg)
+process_peer_event (struct GNUNET_MESH_Handle *h,
+                    const struct GNUNET_MESH_PeerControl *msg)
 {
-    struct GNUNET_MESH_Tunnel                   *t;
-    MESH_TunnelID                               tid;
-
-    tid = ntohl(msg->tunnel_id);
-    if (tid >= GNUNET_MESH_LOCAL_TUNNEL_ID_MARK) {
-        GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-            "MESH: received an incoming tunnel with tid in local range (%X)\n",
-            tid);
-        return; //FIXME abort? reconnect?
-    }
-    t = GNUNET_malloc(sizeof(struct GNUNET_MESH_Tunnel));
-    t->cls = h->cls;
-    t->connect_handler = NULL;
-    t->disconnect_handler = NULL;
-    t->mesh = h;
-    t->tid = tid;
+  struct GNUNET_MESH_Tunnel *t;
+  uint16_t size;
+
+  size = ntohs (msg->header.size);
+  if (size != sizeof (struct GNUNET_MESH_PeerControl))
+  {
+    GNUNET_break_op (0);
     return;
+  }
+  t = retrieve_tunnel (h, ntohl (msg->tunnel_id));
+  if (NULL == t)
+  {
+    GNUNET_break_op (0);
+    return;
+  }
+  if (GNUNET_MESSAGE_TYPE_MESH_LOCAL_PEER_CONNECTED == msg->header.type)
+  {
+    if (NULL != t->connect_handler)
+    {
+      t->connect_handler (t->cls, &msg->peer, NULL);    /* FIXME atsi */
+    }
+  }
+  else
+  {
+    if (NULL != t->disconnect_handler)
+    {
+      t->disconnect_handler (t->cls, &msg->peer);
+    }
+  }
 }
 
 
 /**
  * Process the incoming data packets
- * 
+ *
  * @param h     The mesh handle
  * @param msh   A message encapsulating the data
  */
 static void
-process_incoming_data(struct GNUNET_MESH_Handle *h, 
-                      const struct GNUNET_MESH_Data *msg)
+process_incoming_data (struct GNUNET_MESH_Handle *h,
+                       const struct GNUNET_MessageHeader *message)
 {
-    const struct GNUNET_MESH_Data               *payload;
-    const struct GNUNET_MESH_MessageHandler     *handler;
-    struct GNUNET_MESH_Tunnel                   *t;
-    uint16_t                                    type;
-    int                                         i;
-
-    t = retrieve_tunnel(h, ntohl(msg->tunnel_id));
-
-    payload = (struct GNUNET_MESH_Data *) &msg[1];
-    type = ntohs(payload->header.type);
-    for (i = 0; i < h->n_handlers; i++) {
-        handler = &h->message_handlers[i];
-        if (handler->type == type) {
-            /* FIXME */
-            if (GNUNET_OK == handler->callback (h->cls,
-                                                t,
-                                                NULL,
-                                                NULL,
-                                                NULL,
-                                                NULL))
-            {
-                GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
-                            "MESH: callback completed successfully\n");
-            } else {
-                GNUNET_log(GNUNET_ERROR_TYPE_WARNING,
-                            "MESH: callback caused disconnection\n");
-                GNUNET_MESH_disconnect(h);
-            }
-        }
-    }
+  const struct GNUNET_MessageHeader *payload;
+  const struct GNUNET_MESH_MessageHandler *handler;
+  const struct GNUNET_PeerIdentity *peer;
+  struct GNUNET_MESH_Unicast *ucast;
+  struct GNUNET_MESH_Multicast *mcast;
+  struct GNUNET_MESH_ToOrigin *to_orig;
+  struct GNUNET_MESH_Tunnel *t;
+  uint16_t type;
+  int i;
+
+  type = ntohs (message->type);
+  switch (type)
+  {
+  case GNUNET_MESSAGE_TYPE_MESH_UNICAST:
+    ucast = (struct GNUNET_MESH_Unicast *) message;
+    t = retrieve_tunnel (h, ntohl (ucast->tid));
+    payload = (struct GNUNET_MessageHeader *) &ucast[1];
+    peer = &ucast->oid;
+    break;
+  case GNUNET_MESSAGE_TYPE_MESH_MULTICAST:
+    mcast = (struct GNUNET_MESH_Multicast *) message;
+    t = retrieve_tunnel (h, ntohl (mcast->tid));
+    payload = (struct GNUNET_MessageHeader *) &mcast[1];
+    peer = &mcast->oid;
+    break;
+  case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN:
+    to_orig = (struct GNUNET_MESH_ToOrigin *) message;
+    t = retrieve_tunnel (h, ntohl (to_orig->tid));
+    payload = (struct GNUNET_MessageHeader *) &to_orig[1];
+    peer = &to_orig->sender;
+    break;
+  default:
+    GNUNET_break_op (0);
     return;
+  }
+  if (NULL == t)
+  {
+    GNUNET_break_op (0);
+    return;
+  }
+  for (i = 0; i < h->n_handlers; i++)
+  {
+    handler = &h->message_handlers[i];
+    if (handler->type == type)
+    {
+      if (GNUNET_OK == handler->callback (h->cls, t, NULL,      /* FIXME ctx */
+                                          peer, payload, NULL)) /* FIXME atsi */
+      {
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                    "MESH: callback completed successfully\n");
+      }
+      else
+      {
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                    "MESH: callback caused disconnection\n");
+        GNUNET_MESH_disconnect (h);
+      }
+    }
+  }
 }
 
 
@@ -396,45 +449,244 @@ process_incoming_data(struct GNUNET_MESH_Handle *h,
  * @param msg message received, NULL on timeout or fatal error
  */
 static void
-msg_received (void *cls, const struct GNUNET_MessageHeader * msg)
+msg_received (void *cls, const struct GNUNET_MessageHeader *msg)
 {
-    struct GNUNET_MESH_Handle                   *h = cls;
+  struct GNUNET_MESH_Handle *h = cls;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "mesh: received a message from MESH\n");
+  if (msg == NULL)
+  {
+    GNUNET_break (0);
+    h->in_receive = GNUNET_NO;
+    // rather: do_reconnect () -- and set 'in_receive' to NO there...
+    // FIXME: service disconnect, handle!
+    return;
+  }
+
+  switch (ntohs (msg->type))
+  {
+    /* Notify of a new incoming tunnel */
+  case GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_CREATE:
+    process_tunnel_create (h, (struct GNUNET_MESH_TunnelMessage *) msg);
+    break;
+    /* Notify of a new peer or a peer disconnect in the tunnel */
+  case GNUNET_MESSAGE_TYPE_MESH_LOCAL_PEER_CONNECTED:
+  case GNUNET_MESSAGE_TYPE_MESH_LOCAL_PEER_DISCONNECTED:
+    process_peer_event (h, (struct GNUNET_MESH_PeerControl *) msg);
+    break;
+    /* Notify of a new data packet in the tunnel */
+  case GNUNET_MESSAGE_TYPE_MESH_UNICAST:
+  case GNUNET_MESSAGE_TYPE_MESH_MULTICAST:
+  case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN:
+    process_incoming_data (h, msg);
+    break;
+    /* We shouldn't get any other packages, log and ignore */
+  default:
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "MESH: unsolicited message form service (type %d)\n",
+                ntohs (msg->type));
+  }
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "mesh: message processed\n");
+  GNUNET_CLIENT_receive (h->client, &msg_received, h,
+                         GNUNET_TIME_UNIT_FOREVER_REL);
+}
+
+
+/******************************************************************************/
+/************************       SEND FUNCTIONS     ****************************/
+/******************************************************************************/
 
-    if (msg == NULL) {
-        GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
-               "received a NULL message from mesh\n");
-        return;
+/**
+ * Function called to send a message to the service.
+ * "buf" will be NULL and "size" zero if the socket was closed for writing in
+ * the meantime.
+ *
+ * @param cls closure, the mesh handle
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the connect message
+ * @return number of bytes written to buf
+ */
+static size_t
+send_raw (void *cls, size_t size, void *buf)
+{
+  struct GNUNET_MESH_Handle *h = cls;
+  struct GNUNET_MESH_TransmitHandle *q;
+  char *cbuf = buf;
+  size_t ret;
+  size_t psize;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "mesh: Send packet() Buffer %u\n", size);
+  h->th = NULL;
+  if ( (0 == size) || (NULL == buf) )
+  {
+    // FIXME: disconnect, reconnect, retry?
+    // do_reconnect ();
+    return 0;
+  }
+  ret = 0;
+  while ( (NULL != (q = h->queue_head)) &&
+         (size >= q->size) )
+    {
+      GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, 
+                      "mesh-api",
+                      "type: %u\n",
+                      ntohs (q->data->type));
+      if (NULL == q->data)
+       {
+         GNUNET_assert (NULL != q->notify);
+         if (q->target == 0)
+           {
+             /* multicast */
+             struct GNUNET_MESH_Multicast mc; 
+             
+             GNUNET_assert (size >= sizeof (mc) + q->size);
+             psize = q->notify (q->notify_cls,
+                                size - sizeof (mc), 
+                                &cbuf[sizeof(mc)]);
+             if (psize > 0)
+               {
+                 mc.header.size = htons (sizeof (mc) + q->size);
+                 mc.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_MULTICAST);
+                 mc.tid = htonl (q->tunnel->tid);
+                 memset (&mc.oid, 0, sizeof (struct GNUNET_PeerIdentity)); /* myself */
+                 memcpy (cbuf, &mc, sizeof (mc));
+                 psize = q->size + sizeof (mc);
+               }
+           }
+         else
+           {
+             /* unicast */
+             struct GNUNET_MESH_Unicast uc; 
+             
+             GNUNET_assert (size >= sizeof (uc) + q->size);
+             psize = q->notify (q->notify_cls,
+                                size - sizeof (uc), 
+                                &cbuf[sizeof(uc)]);
+             if (psize > 0)
+               {
+                 uc.header.size = htons (sizeof (uc) + q->size);
+                 uc.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_UNICAST);
+                 uc.tid = htonl (q->tunnel->tid);
+                 memset (&uc.oid, 0, sizeof (struct GNUNET_PeerIdentity)); /* myself */
+                 GNUNET_PEER_resolve (q->target, &uc.destination);
+                 memcpy (cbuf, &uc, sizeof (uc));
+                 psize = q->size + sizeof (uc);
+               }         
+           }
+       }
+      else
+       {
+         memcpy (cbuf, q->data, q->size);
+         psize = q->size;
+       }
+      if (q->timeout_task != GNUNET_SCHEDULER_NO_TASK)
+       GNUNET_SCHEDULER_cancel (q->timeout_task);
+      GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, q);
+      GNUNET_free (q);
+      cbuf += psize;
+      size -= psize;
+      ret += psize;
     }
 
-    switch (ntohs(msg->type)) {
-        /* Notify of a new incoming tunnel */
-        case GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_CREATE:
-            process_tunnel_create(h, (struct GNUNET_MESH_TunnelMessage *)msg);
-            break;
-        /* Notify of a new peer in the tunnel */
-        case GNUNET_MESSAGE_TYPE_MESH_LOCAL_PEER_CONNECTED:
-            break;
-        /* Notify of a peer leaving the tunnel */
-        case GNUNET_MESSAGE_TYPE_MESH_LOCAL_PEER_DISCONNECTED:
-            break;
-        /* Notify of a new data packet in the tunnel */
-        case GNUNET_MESSAGE_TYPE_MESH_LOCAL_DATA:
-            process_incoming_data(h, (struct GNUNET_MESH_Data *)msg);
-            break;
-        /* We shouldn't get any other packages, log and ignore */
-        default:
-            GNUNET_log(GNUNET_ERROR_TYPE_WARNING,
-                        "MESH: unsolicited message form service (type %d)\n",
-                        ntohs(msg->type));
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "mesh:   size: %u\n", ret);
+
+  if (NULL != (q = h->queue_head))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "mesh:   next size: %u\n",
+                q->size);
+    h->th =
+      GNUNET_CLIENT_notify_transmit_ready (h->client, q->size,
+                                          GNUNET_TIME_UNIT_FOREVER_REL,
+                                          GNUNET_YES, &send_raw, h);
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "mesh: Send packet() END\n");
+  if (GNUNET_NO == h->in_receive)
+    {
+      h->in_receive = GNUNET_YES;
+      GNUNET_CLIENT_receive (h->client, &msg_received, h,
+                            GNUNET_TIME_UNIT_FOREVER_REL);
     }
+  return ret;
+}
+
+
+static void
+timeout_transmission (void *cls,
+                     const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct GNUNET_MESH_TransmitHandle *q = cls;
+  struct GNUNET_MESH_Handle *mesh;
+  
+  mesh = q->tunnel->mesh;
+  GNUNET_CONTAINER_DLL_remove (mesh->queue_head, 
+                              mesh->queue_tail,
+                               q);
+  if (q->notify != NULL)
+    q->notify (q->notify_cls, 0, NULL); /* signal timeout */
+  GNUNET_free (q);    
+  if ( (NULL == mesh->queue_head) &&
+       (NULL != mesh->th) )
+    {
+      /* queue empty, no point in asking for transmission */
+      GNUNET_CLIENT_notify_transmit_ready_cancel (mesh->th);
+      mesh->th = NULL;
+    }    
+}
+
+
+/**
+ * Add a transmit handle to the transmission queue (by priority).
+ * Also manage timeout.
+ *
+ * @param h mesh handle with the queue head and tail
+ * @param q handle to add
+ */
+static void
+queue_transmit_handle (struct GNUNET_MESH_Handle *h,
+                      struct GNUNET_MESH_TransmitHandle *q)
+{
+  struct GNUNET_MESH_TransmitHandle *p;
+
+  p = h->queue_head;
+  while ( (NULL != p) && (q->priority < p->priority) )
+    p = p->next;
+  GNUNET_CONTAINER_DLL_insert_after (h->queue_head, h->queue_tail, p->prev, q);
+  if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value != q->timeout.abs_value)
+    q->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (q->timeout),
+                                                   &timeout_transmission,
+                                                   q);
+}
 
-    GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
-               "received a message from mesh\n");
-    GNUNET_CLIENT_receive (h->client,
-                        &msg_received,
-                        h, 
-                        GNUNET_TIME_UNIT_FOREVER_REL);
+
+/**
+ * Auxiliary function to send a packet to the service
+ * Takes care of creating a new queue element and calling the tmt_rdy function
+ * if necessary.
+ * @param h mesh handle
+ * @param msg message to transmit
+ */
+static void
+send_packet (struct GNUNET_MESH_Handle *h, 
+            const struct GNUNET_MessageHeader *msg)
+{
+  struct GNUNET_MESH_TransmitHandle *q;
+  size_t msize;
+
+  msize = ntohs (msg->size);
+  q = GNUNET_malloc (sizeof (struct GNUNET_MESH_TransmitHandle) + msize);
+  q->priority = UINT32_MAX;
+  q->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;  
+  q->size = msize;
+  q->data = (void*) &q[1];
+  memcpy (&q[1], msg, msize);
+  queue_transmit_handle (h, q);
+  if (NULL != h->th)
     return;
+  h->th =
+    GNUNET_CLIENT_notify_transmit_ready (h->client, msize,
+                                        GNUNET_TIME_UNIT_FOREVER_REL,
+                                        GNUNET_YES, &send_raw, h);
 }
 
 /******************************************************************************/
@@ -453,52 +705,74 @@ msg_received (void *cls, const struct GNUNET_MessageHeader * msg)
  *                 inbound messages if the client does not process them fast
  *                 enough (for this notification type, a bounded queue is used)
  * @param stypes Application Types the client claims to offer
- * @return handle to the mesh service 
+ * @return handle to the mesh service
  *         NULL on error (in this case, init is never called)
  */
 struct GNUNET_MESH_Handle *
-GNUNET_MESH_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
-                     void *cls,
+GNUNET_MESH_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, void *cls,
                      GNUNET_MESH_TunnelEndHandler cleaner,
                      const struct GNUNET_MESH_MessageHandler *handlers,
-                     const GNUNET_MESH_ApplicationType *stypes) 
+                     const GNUNET_MESH_ApplicationType *stypes)
 {
-    struct GNUNET_MESH_Handle           *h;
-    size_t                              size;
+  struct GNUNET_MESH_Handle *h;
+  struct GNUNET_MESH_ClientConnect *msg;
+  GNUNET_MESH_ApplicationType *apps;
+  uint16_t napps;
+  uint16_t *types;
+  uint16_t ntypes;
+  size_t size;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "mesh: GNUNET_MESH_connect()\n");
+  h = GNUNET_malloc (sizeof (struct GNUNET_MESH_Handle));
+  h->max_queue_size = MESH_API_MAX_QUEUE; /* FIXME: add to arguments to 'GNUNET_MESH_connect' */
+  h->cleaner = cleaner;
+  h->client = GNUNET_CLIENT_connect ("mesh", cfg);
+  if (h->client == NULL)
+  {
+    GNUNET_break (0);
+    GNUNET_free (h);
+    return NULL;
+  }
 
-    h = GNUNET_malloc(sizeof(struct GNUNET_MESH_Handle));
+  h->cls = cls;
+  h->message_handlers = handlers;
+  h->applications = stypes;
+  h->next_tid = 0x80000000;
 
-    h->cleaner = cleaner;
-    h->client = GNUNET_CLIENT_connect("mesh", cfg);
-    GNUNET_CLIENT_receive (h->client,
-                         &msg_received,
-                         h, 
-                         GNUNET_TIME_UNIT_FOREVER_REL);
-    if(h->client == NULL) {
-        GNUNET_free(h);
-        return NULL;
-    }
+  /* count handlers and apps, calculate size */
+  for (h->n_handlers = 0; handlers[h->n_handlers].type; h->n_handlers++) ;
+  for (h->n_applications = 0; stypes[h->n_applications]; h->n_applications++) ;
+
+  size = sizeof (struct GNUNET_MESH_ClientConnect);
+  size += h->n_handlers * sizeof (uint16_t);
+  size += h->n_applications * sizeof (GNUNET_MESH_ApplicationType);
 
-    h->cls = cls;
-    h->message_handlers = handlers;
-    h->applications = stypes;
-    h->next_tid = 0x80000000;
+  {
+    char buf[size];
 
-    for(h->n_handlers = 0; handlers[h->n_handlers].type; h->n_handlers++);
-    for(h->n_applications = 0; stypes[h->n_applications]; h->n_applications++);
+    /* build connection packet */
+    msg = (struct GNUNET_MESH_ClientConnect *) buf;
+    msg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_CONNECT);
+    msg->header.size = htons (size);
+    types = (uint16_t *) & msg[1];
+    for (ntypes = 0; ntypes < h->n_handlers; ntypes++)
+      types[ntypes] = h->message_handlers[ntypes].type;      
+    apps = (GNUNET_MESH_ApplicationType *) &types[ntypes];
+    for (napps = 0; napps < h->n_applications; napps++)
+      apps[napps] = h->applications[napps];      
+    msg->applications = htons (napps);
+    msg->types = htons (ntypes);
 
-    size = sizeof(struct GNUNET_MESH_ClientConnect);
-    size += h->n_handlers * sizeof(uint16_t);
-    size += h->n_applications * sizeof(GNUNET_MESH_ApplicationType);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "mesh: Sending %lu bytes long message %d types and %d apps\n",
+               ntohs (msg->header.size), ntypes, napps);
+    
+    send_packet (h, &msg->header);
+  }
 
-    h->th = GNUNET_CLIENT_notify_transmit_ready(h->client,
-                                                size,
-                                                GNUNET_TIME_UNIT_FOREVER_REL,
-                                                GNUNET_YES,
-                                                &send_connect_packet,
-                                                (void *)h);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "mesh: GNUNET_MESH_connect() END\n");
 
-    return h;
+  return h;
 }
 
 
@@ -507,16 +781,18 @@ GNUNET_MESH_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
  *
  * @param handle connection to mesh to disconnect
  */
-void 
-GNUNET_MESH_disconnect (struct GNUNET_MESH_Handle *handle) 
+void
+GNUNET_MESH_disconnect (struct GNUNET_MESH_Handle *handle)
 {
-    if (NULL != handle->th) {
-        GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
-    }
-    if (NULL != handle->client) {
-        GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
-    }
-    GNUNET_free(handle);
+  if (NULL != handle->th)
+  {
+    GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
+  }
+  if (NULL != handle->client)
+  {
+    GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
+  }
+  GNUNET_free (handle);
 }
 
 
@@ -531,33 +807,53 @@ GNUNET_MESH_disconnect (struct GNUNET_MESH_Handle *handle)
  */
 struct GNUNET_MESH_Tunnel *
 GNUNET_MESH_tunnel_create (struct GNUNET_MESH_Handle *h,
-                           GNUNET_MESH_TunnelConnectHandler
-                           connect_handler,
+                           GNUNET_MESH_TunnelConnectHandler connect_handler,
                            GNUNET_MESH_TunnelDisconnectHandler
-                           disconnect_handler,
-                           void *handler_cls)
+                           disconnect_handler, void *handler_cls)
+{
+  struct GNUNET_MESH_Tunnel *t;
+  struct GNUNET_MESH_TunnelMessage msg;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "mesh: Creating new tunnel\n");
+  t = GNUNET_malloc (sizeof (struct GNUNET_MESH_Tunnel));
+
+  t->connect_handler = connect_handler;
+  t->disconnect_handler = disconnect_handler;
+  t->cls = handler_cls;
+  t->mesh = h;
+  t->tid = h->next_tid++;
+  h->next_tid |= GNUNET_MESH_LOCAL_TUNNEL_ID_MARK;      // keep in range
+
+  msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_CREATE);
+  msg.header.size = htons (sizeof (struct GNUNET_MESH_TunnelMessage));
+  msg.tunnel_id = htonl (t->tid);
+  send_packet (h, &msg.header);
+  return t;
+}
+
+
+/**
+ * Destroy an existing tunnel.
+ *
+ * @param tun tunnel handle
+ */
+void
+GNUNET_MESH_tunnel_destroy (struct GNUNET_MESH_Tunnel *tun)
 {
-    struct GNUNET_MESH_Tunnel           *tunnel;
-
-    GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
-               "MESH: Creating new tunnel\n");
-    tunnel = GNUNET_malloc(sizeof(struct GNUNET_MESH_Tunnel));
-
-    tunnel->connect_handler = connect_handler;
-    tunnel->disconnect_handler = disconnect_handler;
-    tunnel->cls = handler_cls;
-    tunnel->mesh = h;
-    tunnel->tid = h->next_tid++;
-    h->next_tid |= GNUNET_MESH_LOCAL_TUNNEL_ID_MARK; // keep in range
-
-    h->th = GNUNET_CLIENT_notify_transmit_ready(h->client,
-                                    sizeof(struct GNUNET_MESH_TunnelMessage),
-                                    GNUNET_TIME_UNIT_FOREVER_REL,
-                                    GNUNET_YES,
-                                    &send_tunnel_create_packet,
-                                    (void *)tunnel);
-
-    return tunnel;
+  struct GNUNET_MESH_Handle *h;
+  struct GNUNET_MESH_TunnelMessage *msg;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "mesh: Destroying tunnel\n");
+
+  h = tun->mesh;
+  msg = GNUNET_malloc (sizeof (struct GNUNET_MESH_TunnelMessage));
+  msg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_DESTROY);
+  msg->header.size = htons (sizeof (struct GNUNET_MESH_TunnelMessage));
+  msg->tunnel_id = htonl (tun->tid);
+
+  GNUNET_free (tun);
+
+  send_packet (h, &msg->header);
 }
 
 
@@ -574,14 +870,36 @@ GNUNET_MESH_peer_request_connect_add (struct GNUNET_MESH_Tunnel *tunnel,
                                       struct GNUNET_TIME_Relative timeout,
                                       const struct GNUNET_PeerIdentity *peer)
 {
-    static GNUNET_PEER_Id       peer_id;
-
-    peer_id = GNUNET_PEER_intern(peer);
-
-    /* FIXME ACTUALLY DO STUFF */
-    tunnel->peers = &peer_id;
-    tunnel->connect_handler(tunnel->cls, peer, NULL);
-    return;
+  struct GNUNET_MESH_PeerControl *msg;
+  GNUNET_PEER_Id peer_id;
+  unsigned int i;
+  
+  peer_id = GNUNET_PEER_intern (peer);
+  for (i = 0; i < tunnel->npeers; i++)
+  {
+    if (tunnel->peers[i] == peer_id)
+    {
+      GNUNET_PEER_change_rc (peer_id, -1);
+      return;
+    }
+  }
+  tunnel->npeers++;
+  tunnel->peers =
+      GNUNET_realloc (tunnel->peers, tunnel->npeers * sizeof (GNUNET_PEER_Id));
+  tunnel->peers[tunnel->npeers - 1] = peer_id;
+
+  msg = GNUNET_malloc (sizeof (struct GNUNET_MESH_PeerControl));
+  msg->header.size = htons (sizeof (struct GNUNET_MESH_PeerControl));
+  msg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_CONNECT_PEER_ADD);
+  msg->tunnel_id = htonl (tunnel->tid);
+  msg->timeout = GNUNET_TIME_absolute_hton (GNUNET_TIME_relative_to_absolute (timeout));
+  memcpy (&msg->peer, peer, sizeof (struct GNUNET_PeerIdentity));
+
+  send_packet (tunnel->mesh, &msg->header);
+
+//   tunnel->connect_handler (tunnel->cls, peer, NULL); FIXME call this later
+//   TODO: remember timeout
+  return;
 }
 
 
@@ -596,10 +914,35 @@ void
 GNUNET_MESH_peer_request_connect_del (struct GNUNET_MESH_Tunnel *tunnel,
                                       const struct GNUNET_PeerIdentity *peer)
 {
-    /* FIXME ACTUALLY DO STUFF */
-    tunnel->peers = NULL;
-    tunnel->disconnect_handler(tunnel->cls, peer);
-    return;
+  struct GNUNET_MESH_PeerControl msg;
+  GNUNET_PEER_Id peer_id;
+  unsigned int i;
+
+  peer_id = GNUNET_PEER_search (peer);
+  if (0 == peer_id)
+    {
+      GNUNET_break (0);
+      return;
+    }
+  for (i = 0; i < tunnel->npeers; i++)
+    if (tunnel->peers[i] == peer_id)
+      break;
+  if (i == tunnel->npeers)
+    {
+      GNUNET_break (0);
+      return;
+    }
+  GNUNET_PEER_change_rc (peer_id, -1);
+  tunnel->peers[i] = tunnel->peers[tunnel->npeers-1];
+  GNUNET_array_grow (tunnel->peers,
+                    tunnel->npeers,
+                    tunnel->npeers - 1);
+  msg.header.size = htons (sizeof (struct GNUNET_MESH_PeerControl));
+  msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_CONNECT_PEER_DEL);
+  msg.tunnel_id = htonl (tunnel->tid);
+  msg.timeout = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_FOREVER_ABS);
+  memcpy (&msg.peer, peer, sizeof (struct GNUNET_PeerIdentity));
+  send_packet (tunnel->mesh, &msg.header);
 }
 
 
@@ -615,10 +958,16 @@ GNUNET_MESH_peer_request_connect_del (struct GNUNET_MESH_Tunnel *tunnel,
 void
 GNUNET_MESH_peer_request_connect_by_type (struct GNUNET_MESH_Tunnel *tunnel,
                                           struct GNUNET_TIME_Relative timeout,
-                                          GNUNET_MESH_ApplicationType
-                                          app_type)
+                                          GNUNET_MESH_ApplicationType app_type)
 {
-    return;
+  struct GNUNET_MESH_ConnectPeerByType msg;
+
+  msg.header.size = htons (sizeof (struct GNUNET_MESH_ConnectPeerByType));
+  msg.header.type =  htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_CONNECT_PEER_BY_TYPE);
+  msg.tunnel_id = htonl (tunnel->tid);
+  msg.timeout = GNUNET_TIME_absolute_hton (GNUNET_TIME_relative_to_absolute (timeout));
+  msg.type = htonl (app_type);
+  send_packet (tunnel->mesh, &msg.header);
 }
 
 
@@ -633,7 +982,7 @@ GNUNET_MESH_peer_request_connect_by_type (struct GNUNET_MESH_Tunnel *tunnel,
  * @param priority how important is the message?
  * @param maxdelay how long can the message wait?
  * @param target destination for the message,
- *               NULL for multicast to all tunnel targets 
+ *               NULL for multicast to all tunnel targets
  * @param notify_size how many bytes of buffer space does notify want?
  * @param notify function to call when buffer space is available;
  *        will be called with NULL on timeout or if the overall queue
@@ -645,21 +994,58 @@ GNUNET_MESH_peer_request_connect_by_type (struct GNUNET_MESH_Tunnel *tunnel,
  *         memory); if NULL is returned, "notify" will NOT be called.
  */
 struct GNUNET_MESH_TransmitHandle *
-GNUNET_MESH_notify_transmit_ready (struct GNUNET_MESH_Tunnel *tunnel,
-                                   int cork,
+GNUNET_MESH_notify_transmit_ready (struct GNUNET_MESH_Tunnel *tunnel, int cork,
                                    uint32_t priority,
                                    struct GNUNET_TIME_Relative maxdelay,
                                    const struct GNUNET_PeerIdentity *target,
                                    size_t notify_size,
-                                   GNUNET_CONNECTION_TransmitReadyNotify
-                                   notify,
+                                   GNUNET_CONNECTION_TransmitReadyNotify notify,
                                    void *notify_cls)
 {
-    struct GNUNET_MESH_TransmitHandle   *handle;
+  struct GNUNET_MESH_TransmitHandle *q;
+  size_t overhead;
+
+  if (get_queue_length (tunnel->mesh) >= tunnel->mesh->max_queue_size)
+    return NULL; /* queue full */
+
+  q = GNUNET_malloc (sizeof (struct GNUNET_MESH_TransmitHandle));
+  q->tunnel = tunnel;
+  q->priority = priority;
+  q->timeout = GNUNET_TIME_relative_to_absolute (maxdelay);
+  q->target = GNUNET_PEER_intern (target);
+  overhead = (NULL == target) ? sizeof (struct GNUNET_MESH_Multicast) : sizeof (struct GNUNET_MESH_Unicast);
+  q->size = notify_size + overhead;
+  q->notify = notify;
+  q->notify_cls = notify_cls;
+  queue_transmit_handle (tunnel->mesh, q);
+  return q;
+}
 
-    handle = GNUNET_malloc(sizeof(struct GNUNET_MESH_TransmitHandle));
 
-    return handle;
+/**
+ * Cancel the specified transmission-ready notification.
+ *
+ * @param th handle that was returned by "notify_transmit_ready".
+ */
+void
+GNUNET_MESH_notify_transmit_ready_cancel (struct GNUNET_MESH_TransmitHandle *th)
+{
+  struct GNUNET_MESH_Handle *mesh;
+  
+  mesh = th->tunnel->mesh;
+  if (th->timeout_task != GNUNET_SCHEDULER_NO_TASK)
+    GNUNET_SCHEDULER_cancel (th->timeout_task);
+  GNUNET_CONTAINER_DLL_remove (mesh->queue_head, 
+                              mesh->queue_tail,
+                               th);
+  GNUNET_free (th);
+  if ( (NULL == mesh->queue_head) &&
+       (NULL != mesh->th) )
+    {
+      /* queue empty, no point in asking for transmission */
+      GNUNET_CLIENT_notify_transmit_ready_cancel (mesh->th);
+      mesh->th = NULL;
+    }    
 }