use #5553 function in TCP/UDP communicators
[oweals/gnunet.git] / src / transport / gnunet-communicator-udp.c
index 23fb289bc6d791d862bfa96ef509f446047db7e4..fa8eb6acb913e602e45cf58158795abfccc29ff6 100644 (file)
  * @author Christian Grothoff
  *
  * TODO:
+ * - consider imposing transmission limits in the absence
+ *   of ACKs; or: maybe this should be done at TNG service level?
+ *   (at least the receiver might want to enforce limits on
+ *    KX/DH operations per sender in here) (#5552)
+ * - overall, we should look more into flow control support
+ *   (either in backchannel, or general solution in TNG service)
+ * - handle addresses discovered from broadcasts (#5551)
+ *   (think: what was the story again on address validation?
+ *    where is the API for that!?!)
  * - support DNS names in BINDTO option (#5528)
  * - support NAT connection reversal method (#5529)
- * - support other UDP-specific NAT traversal methods 
+ * - support other UDP-specific NAT traversal methods (#) 
  */
 #include "platform.h"
 #include "gnunet_util_lib.h"
 #include "gnunet_statistics_service.h"
 #include "gnunet_transport_communication_service.h"
 
-/**
- * How many messages do we keep at most in the queue to the
- * transport service before we start to drop (default,
- * can be changed via the configuration file).
- * Should be _below_ the level of the communicator API, as
- * otherwise we may read messages just to have them dropped
- * by the communicator API.
- */
-#define DEFAULT_MAX_QUEUE_LENGTH 8
-
 /**
  * How often do we rekey based on time (at least)
  */ 
  */ 
 #define PROTO_QUEUE_TIMEOUT GNUNET_TIME_UNIT_MINUTES
 
+/**
+ * How often do we broadcast our presence on the LAN?
+ */ 
+#define BROADCAST_FREQUENCY GNUNET_TIME_UNIT_MINUTES
+
+/**
+ * How often do we scan for changes to our network interfaces?
+ */ 
+#define INTERFACE_SCAN_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
+
+/**
+ * AES key size.
+ */
+#define AES_KEY_SIZE (256/8)
+
+/**
+ * AES (GCM) IV size.
+ */
+#define AES_IV_SIZE (96/8)
+
+/**
+ * Size of the GCM tag.
+ */
+#define GCM_TAG_SIZE (128/8)
+
+/**
+ * If we fall below this number of available KCNs,
+ * we generate additional ACKs until we reach
+ * #KCN_TARGET.
+ * Should be large enough that we don't generate ACKs all
+ * the time and still have enough time for the ACK to
+ * arrive before the sender runs out. So really this 
+ * should ideally be based on the RTT.
+ */
+#define KCN_THRESHOLD 92
+
+/**
+ * How many KCNs do we keep around *after* we hit
+ * the #KCN_THRESHOLD? Should be larger than
+ * #KCN_THRESHOLD so we do not generate just one
+ * ACK at the time.
+ */
+#define KCN_TARGET 128
+
+/**
+ * What is the maximum delta between KCN sequence numbers
+ * that we allow. Used to expire 'ancient' KCNs that likely
+ * were dropped by the network.  Must be larger than
+ * KCN_TARGET (otherwise we generate new KCNs all the time),
+ * but not too large (otherwise packet loss may cause
+ * sender to fall back to KX needlessly when sender runs
+ * out of ACK'ed KCNs due to losses).
+ */
+#define MAX_SQN_DELTA 160
+
+/**
+ * How many shared master secrets do we keep around 
+ * at most per sender?  Should be large enough so 
+ * that we generally have a chance of sending an ACK
+ * before the sender already rotated out the master
+ * secret.  Generally values around #KCN_TARGET make
+ * sense. Might make sense to adapt to RTT if we had
+ * a good measurement...
+ */
+#define MAX_SECRETS 128
+
 /**
  * How often do we rekey based on number of bytes transmitted?
  * (additionally randomized).
 /**
  * Address prefix used by the communicator.
  */
+
 #define COMMUNICATOR_ADDRESS_PREFIX "udp"
 
 /**
@@ -122,13 +188,13 @@ struct InitialKX
   /**
    * Ephemeral key for KX.
    */ 
-  struct GNUNET_CRYPT_EddsaPublicKey ephemeral;
+  struct GNUNET_CRYPTO_EcdhePublicKey ephemeral;
 
   /**
    * HMAC for the following encrypted message, using GCM.  HMAC uses
    * key derived from the handshake with sequence number zero.
    */ 
-  uint8_t gcm_tag[128/8];
+  char gcm_tag[GCM_TAG_SIZE];
 
 };
 
@@ -187,6 +253,56 @@ struct UDPAck
 };
 
 
+/**
+ * Signature we use to verify that the broadcast was really made by
+ * the peer that claims to have made it.  Basically, affirms that the
+ * peer is really using this IP address (albeit possibly not in _our_
+ * LAN).  Makes it difficult for peers in the LAN to claim to 
+ * be just any global peer -- an attacker must have at least
+ * shared a LAN with the peer they're pretending to be here.
+ */
+struct UdpBroadcastSignature
+{
+  /**
+   * Purpose must be #GNUNET_SIGNATURE_COMMUNICATOR_UDP_BROADCAST
+   */
+  struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
+
+  /**
+   * Identity of the inititor of the UDP broadcast.
+   */ 
+  struct GNUNET_PeerIdentity sender;
+
+  /**
+   * Hash of the sender's UDP address.
+   */ 
+  struct GNUNET_HashCode h_address;
+};
+
+
+/**
+ * Broadcast by peer in LAN announcing its presence.  Unusual in that
+ * we don't pad these to full MTU, as we cannot prevent being
+ * recognized in LAN as GNUnet peers if this feature is enabled
+ * anyway.  Also, the entire message is in cleartext.
+ */
+struct UDPBroadcast
+{
+
+  /**
+   * Sender's peer identity.
+   */
+  struct GNUNET_PeerIdentity sender;
+
+  /**
+   * Sender's signature of type
+   * #GNUNET_SIGNATURE_COMMUNICATOR_UDP_BROADCAST
+   */
+  struct GNUNET_CRYPTO_EddsaSignature sender_sig;  
+  
+};
+
+
 /**
  * UDP message box.  Always sent encrypted, only allowed after
  * the receiver sent a `struct UDPAck` for the base key!
@@ -210,8 +326,7 @@ struct UDPBox
    * wrong, the receiver should check if the message might be a
    * `struct UdpHandshakeSignature`.
    */ 
-  uint8_t gcm_tag[128/8];
-
+  char gcm_tag[GCM_TAG_SIZE];
   
 };
 
@@ -321,7 +436,7 @@ struct SharedSecret
 
   /**
    * Up to which sequence number did we use this @e master already?
-   * (for sending or receiving)
+   * (for encrypting only)
    */ 
   uint32_t sequence_used;
 
@@ -331,6 +446,11 @@ struct SharedSecret
    * use this key?
    */
   uint32_t sequence_allowed;
+
+  /**
+   * Number of active KCN entries.
+   */ 
+  unsigned int active_kce_count;
 };
 
 
@@ -411,6 +531,12 @@ struct ReceiverAddress
    */
   struct SharedSecret *ss_tail;
 
+  /**
+   * Address of the receiver in the human-readable format
+   * with the #COMMUNICATOR_ADDRESS_PREFIX.
+   */ 
+  char *foreign_addr;
+
   /**
    * Address of the other peer.
    */
@@ -421,6 +547,11 @@ struct ReceiverAddress
    */
   socklen_t address_len;
 
+  /**
+   * Entry in sender expiration heap.
+   */ 
+  struct GNUNET_CONTAINER_HeapNode *hn;
+
   /**
    * Message queue we are providing for the #ch.
    */
@@ -436,10 +567,21 @@ struct ReceiverAddress
    */
   struct GNUNET_TIME_Absolute timeout;
 
+  /**
+   * MTU we allowed transport for this receiver right now.
+   */
+  size_t mtu;
+  
   /**
    * Length of the DLL at @a ss_head.
    */ 
   unsigned int num_secrets;
+
+  /**
+   * Number of BOX keys from ACKs we have currently 
+   * available for this receiver.
+   */ 
+  unsigned int acks_available;
   
   /**
    * Which network type does this queue use?
@@ -449,10 +591,64 @@ struct ReceiverAddress
 };
 
 
+/**
+ * Interface we broadcast our presence on.
+ */
+struct BroadcastInterface
+{
+
+  /**
+   * Kept in a DLL.
+   */
+  struct BroadcastInterface *next;
+
+  /**
+   * Kept in a DLL.
+   */
+  struct BroadcastInterface *prev;
+
+  /**
+   * Task for this broadcast interface.
+   */
+  struct GNUNET_SCHEDULER_Task *broadcast_task;
+  
+  /**
+   * Sender's address of the interface.
+   */
+  struct sockaddr *sa;
+  
+  /**
+   * Broadcast address to use on the interface.
+   */
+  struct sockaddr *ba;
+
+  /**
+   * Message we broadcast on this interface.
+   */ 
+  struct UDPBroadcast bcm;
+  
+  /**
+   * If this is an IPv6 interface, this is the request
+   * we use to join/leave the group.
+   */
+  struct ipv6_mreq mcreq;
+  
+  /**
+   * Number of bytes in @e sa.
+   */ 
+  socklen_t salen;
+
+  /**
+   * Was this interface found in the last #iface_proc() scan?
+   */
+  int found;
+};
+
+
 /**
  * Cache of pre-generated key IDs.
  */
-static struct GNUNET_CONTINER_MultiShortMap *key_cache;
+static struct GNUNET_CONTAINER_MultiShortmap *key_cache;
 
 /**
  * ID of read task
@@ -464,6 +660,11 @@ static struct GNUNET_SCHEDULER_Task *read_task;
  */
 static struct GNUNET_SCHEDULER_Task *timeout_task;
 
+/**
+ * ID of master broadcast task
+ */
+static struct GNUNET_SCHEDULER_Task *broadcast_task;
+
 /**
  * For logging statistics.
  */
@@ -494,11 +695,26 @@ static struct GNUNET_CONTAINER_Heap *senders_heap;
  */
 static struct GNUNET_CONTAINER_Heap *receivers_heap;
 
+/**
+ * Broadcast interface tasks. Kept in a DLL.
+ */
+static struct BroadcastInterface *bi_head;
+
+/**
+ * Broadcast interface tasks. Kept in a DLL.
+ */
+static struct BroadcastInterface *bi_tail;
+
 /**
  * Our socket.
  */
 static struct GNUNET_NETWORK_Handle *udp_sock;
 
+/** 
+ * #GNUNET_YES if #udp_sock supports IPv6.
+ */ 
+static int have_v6_socket;
+
 /**
  * Our public key.
  */
@@ -524,10 +740,47 @@ static struct GNUNET_NT_InterfaceScanner *is;
  */
 static struct GNUNET_NAT_Handle *nat;
 
+/**
+ * Port number to which we are actually bound.
+ */ 
+static uint16_t my_port;
+
 
 /**
- * Functions with this signature are called whenever we need
- * to close a receiving state due to timeout.
+ * An interface went away, stop broadcasting on it.
+ *
+ * @param bi entity to close down
+ */
+static void
+bi_destroy (struct BroadcastInterface *bi)
+{
+  if (AF_INET6 == bi->sa->sa_family)
+  {
+    /* Leave the multicast group */
+    if (GNUNET_OK !=
+        GNUNET_NETWORK_socket_setsockopt
+        (udp_sock,
+        IPPROTO_IPV6,
+        IPV6_LEAVE_GROUP,
+         &bi->mcreq,
+        sizeof (bi->mcreq)))
+    {
+      GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
+                          "setsockopt");
+    }
+  }
+  GNUNET_CONTAINER_DLL_remove (bi_head,
+                              bi_tail,
+                              bi);
+  GNUNET_SCHEDULER_cancel (bi->broadcast_task);
+  GNUNET_free (bi->sa);
+  GNUNET_free_non_null (bi->ba);
+  GNUNET_free (bi);
+}
+
+
+/**
+ * Destroys a receiving state due to timeout or shutdown.
  *
  * @param receiver entity to close down
  */
@@ -544,16 +797,23 @@ receiver_destroy (struct ReceiverAddress *receiver)
     receiver->mq = NULL;
     GNUNET_MQ_destroy (mq);
   }
+  if (NULL != receiver->qh)
+  {
+    GNUNET_TRANSPORT_communicator_mq_del (receiver->qh);
+    receiver->qh = NULL;
+  }
   GNUNET_assert (GNUNET_YES ==
                  GNUNET_CONTAINER_multipeermap_remove (receivers,
                                                       &receiver->target,
                                                       receiver));
-  // FIXME: remove from receiver_heap
+  GNUNET_assert (receiver ==
+                GNUNET_CONTAINER_heap_remove_node (receiver->hn));
   GNUNET_STATISTICS_set (stats,
                         "# receivers active",
                         GNUNET_CONTAINER_multipeermap_size (receivers),
                         GNUNET_NO);
   GNUNET_free (receiver->address);
+  GNUNET_free (receiver->foreign_addr);
   GNUNET_free (receiver);
 }
 
@@ -568,6 +828,7 @@ kce_destroy (struct KeyCacheEntry *kce)
 {
   struct SharedSecret *ss = kce->ss;
 
+  ss->active_kce_count--;
   GNUNET_CONTAINER_DLL_remove (ss->kce_head,
                               ss->kce_tail,
                               kce);
@@ -629,13 +890,14 @@ kce_generate (struct SharedSecret *ss,
   GNUNET_CONTAINER_DLL_insert (ss->kce_head,
                               ss->kce_tail,
                               kce);
+  ss->active_kce_count++;
   (void) GNUNET_CONTAINER_multishortmap_put (key_cache,
                                             &kce->kid,
                                             kce,
                                             GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
   GNUNET_STATISTICS_set (stats,
                         "# KIDs active",
-                        GNUNET_CONTAINER_multipeermap_size (key_cache),
+                        GNUNET_CONTAINER_multishortmap_size (key_cache),
                         GNUNET_NO);
 }
 
@@ -665,6 +927,8 @@ secret_destroy (struct SharedSecret *ss)
                                 receiver->ss_tail,
                                 ss);
     receiver->num_secrets--;
+    receiver->acks_available
+      -= (ss->sequence_allowed - ss->sequence_used);
   }
   while (NULL != (kce = ss->kce_head))
     kce_destroy (kce);
@@ -674,7 +938,7 @@ secret_destroy (struct SharedSecret *ss)
                            GNUNET_NO);
   GNUNET_STATISTICS_set (stats,
                         "# KIDs active",
-                        GNUNET_CONTAINER_multipeermap_size (key_cache),
+                        GNUNET_CONTAINER_multishortmap_size (key_cache),
                         GNUNET_NO);
   GNUNET_free (ss);
 }
@@ -693,7 +957,8 @@ sender_destroy (struct SenderAddress *sender)
                  GNUNET_CONTAINER_multipeermap_remove (senders,
                                                       &sender->target,
                                                       sender));
-  // FIXME: remove from sender_heap
+  GNUNET_assert (sender ==
+                GNUNET_CONTAINER_heap_remove_node (sender->hn));
   GNUNET_STATISTICS_set (stats,
                         "# senders active",
                         GNUNET_CONTAINER_multipeermap_size (senders),
@@ -703,38 +968,6 @@ sender_destroy (struct SenderAddress *sender)
 }
 
 
-/**
- * Compute @a smac over @a buf.
- *
- * @param msec master secret for HMAC calculation
- * @param serial number for the @a smac calculation
- * @param buf buffer to MAC
- * @param buf_size number of bytes in @a buf
- * @param smac[out] where to write the HMAC
- */
-static void
-get_hmac (const struct GNUNET_HashCode *msec,
-         uint32_t serial,
-         const void *buf,
-         size_t buf_size,
-         struct GNUNET_ShortHashCode *smac)
-{
-  uint32_t sid = htonl (serial);
-
-  GNUNET_CRYPTO_hkdf (smac,
-                     sizeof (*smac),
-                     GCRY_MD_SHA512,
-                     GCRY_MD_SHA256,
-                     &sid,
-                     sizeof (sid),
-                     msec,
-                     sizeof (*msec),
-                     "UDP-HMAC",
-                     strlen ("UDP-HMAC"),
-                     NULL, 0);
-}
-
-
 /**
  * Compute @a key and @a iv.
  *
@@ -746,11 +979,11 @@ get_hmac (const struct GNUNET_HashCode *msec,
 static void
 get_iv_key (const struct GNUNET_HashCode *msec,
            uint32_t serial,
-           char key[256/8],
-           char iv[96/8])
+           char key[AES_KEY_SIZE],
+           char iv[AES_IV_SIZE])
 {
   uint32_t sid = htonl (serial);
-  char res[sizeof(key) + sizeof (iv)];
+  char res[AES_KEY_SIZE + AES_IV_SIZE];
 
   GNUNET_CRYPTO_hkdf (res,
                      sizeof (res),
@@ -764,11 +997,11 @@ get_iv_key (const struct GNUNET_HashCode *msec,
                      strlen ("UDP-IV-KEY"),
                      NULL, 0);
   memcpy (key,
-         sid,
-         sizeof (key));
+         res,
+         AES_KEY_SIZE);
   memcpy (iv,
-         &sid[sizeof(key)],
-         sizeof (iv));
+         &res[AES_KEY_SIZE],
+         AES_IV_SIZE);
 }
 
 
@@ -782,7 +1015,8 @@ reschedule_sender_timeout (struct SenderAddress *sender)
 {
   sender->timeout
     = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
-  // FIXME: update heap!
+  GNUNET_CONTAINER_heap_update_cost (sender->hn,
+                                    sender->timeout.abs_value_us);
 }
 
 
@@ -796,10 +1030,52 @@ reschedule_receiver_timeout (struct ReceiverAddress *receiver)
 {
   receiver->timeout
     = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
-  // FIXME: update heap!
+  GNUNET_CONTAINER_heap_update_cost (receiver->hn,
+                                    receiver->timeout.abs_value_us);
 }
 
 
+/**
+ * Task run to check #receiver_heap and #sender_heap for timeouts.
+ *
+ * @param cls unused, NULL
+ */
+static void
+check_timeouts (void *cls)
+{
+  struct GNUNET_TIME_Relative st;
+  struct GNUNET_TIME_Relative rt;
+  struct GNUNET_TIME_Relative delay;
+  struct ReceiverAddress *receiver;
+  struct SenderAddress *sender;
+  
+  (void) cls;
+  timeout_task = NULL;
+  rt = GNUNET_TIME_UNIT_FOREVER_REL;
+  while (NULL != (receiver = GNUNET_CONTAINER_heap_peek (receivers_heap)))
+  {
+    rt = GNUNET_TIME_absolute_get_remaining (receiver->timeout);
+    if (0 != rt.rel_value_us)
+      break;
+    receiver_destroy (receiver);
+  }
+  st = GNUNET_TIME_UNIT_FOREVER_REL;
+  while (NULL != (sender = GNUNET_CONTAINER_heap_peek (senders_heap)))
+  {
+    st = GNUNET_TIME_absolute_get_remaining (receiver->timeout);
+    if (0 != st.rel_value_us)
+      break;
+    sender_destroy (sender);
+  }
+  delay = GNUNET_TIME_relative_min (rt,
+                                   st);
+  if (delay.rel_value_us < GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us)
+    timeout_task = GNUNET_SCHEDULER_add_delayed (delay,
+                                                &check_timeouts,
+                                                NULL);  
+}
+                         
+
 /**
  * Calcualte cmac from master in @a ss.
  *
@@ -844,7 +1120,7 @@ pass_plaintext_to_core (struct SenderAddress *sender,
                              ntohs (hdr->size),
                              GNUNET_NO);
     (void) GNUNET_TRANSPORT_communicator_receive (ch,
-                                                 &queue->target,
+                                                 &sender->target,
                                                  hdr,
                                                  NULL /* no flow control possible */,
                                                  NULL);
@@ -874,8 +1150,8 @@ setup_cipher (const struct GNUNET_HashCode *msec,
              uint32_t serial,
              gcry_cipher_hd_t *cipher)
 {
-  char key[256/8];
-  char iv[96/8];
+  char key[AES_KEY_SIZE];
+  char iv[AES_IV_SIZE];
 
   gcry_cipher_open (cipher,
                    GCRY_CIPHER_AES256 /* low level: go for speed */,
@@ -908,7 +1184,7 @@ setup_cipher (const struct GNUNET_HashCode *msec,
  */
 static int
 try_decrypt (const struct SharedSecret *ss,
-            char tag[128/8],
+            const char tag[GCM_TAG_SIZE],
             uint32_t serial,
             const char *in_buf,
             size_t in_buf_size,
@@ -921,14 +1197,14 @@ try_decrypt (const struct SharedSecret *ss,
                &cipher);
   GNUNET_assert (0 ==
                 gcry_cipher_decrypt (cipher,
-                                     in_buf,
-                                     in_buf_size,
                                      out_buf,
+                                     in_buf_size,
+                                     in_buf,
                                      in_buf_size));
   if (0 !=
       gcry_cipher_checktag (cipher,
                            tag,
-                           sizeof (tag)))
+                           GCM_TAG_SIZE))
   {
     gcry_cipher_close (cipher);
     GNUNET_STATISTICS_update (stats,
@@ -978,7 +1254,7 @@ setup_shared_secret_enc (const struct GNUNET_CRYPTO_EcdhePrivateKey *ephemeral,
   GNUNET_CRYPTO_ecdh_eddsa (ephemeral,
                            &receiver->target.public_key,
                            &ss->master);
-  calculcate_cmac (ss);
+  calculate_cmac (ss);
   ss->receiver = receiver;
   GNUNET_CONTAINER_DLL_insert (receiver->ss_head,
                               receiver->ss_tail,
@@ -990,7 +1266,74 @@ setup_shared_secret_enc (const struct GNUNET_CRYPTO_EcdhePrivateKey *ephemeral,
                            GNUNET_NO);
   return ss;
 }
-               
+
+
+/**
+ * Setup the MQ for the @a receiver.  If a queue exists,
+ * the existing one is destroyed.  Then the MTU is
+ * recalculated and a fresh queue is initialized.
+ *
+ * @param receiver receiver to setup MQ for
+ */ 
+static void
+setup_receiver_mq (struct ReceiverAddress *receiver);
+
+
+/**
+ * We received an ACK for @a pid. Check if it is for
+ * the receiver in @a value and if so, handle it and
+ * return #GNUNET_NO. Otherwise, return #GNUNET_YES.
+ *
+ * @param cls a `const struct UDPAck`
+ * @param pid peer the ACK is from
+ * @param value a `struct ReceiverAddress`
+ * @return #GNUNET_YES to continue to iterate
+ */
+static int
+handle_ack (void *cls,
+           const struct GNUNET_PeerIdentity *pid,
+           void *value)
+{
+  const struct UDPAck *ack = cls;
+  struct ReceiverAddress *receiver = value;
+
+  (void) pid;
+  for (struct SharedSecret *ss = receiver->ss_head;
+       NULL != ss;
+       ss = ss->next)
+  {
+    if (0 == memcmp (&ack->cmac,
+                    &ss->cmac,
+                    sizeof (struct GNUNET_HashCode)))
+    {
+      uint32_t allowed;
+      
+      allowed = ntohl (ack->sequence_max);
+                           
+      if (allowed > ss->sequence_allowed)
+      {
+       receiver->acks_available += (allowed - ss->sequence_allowed);
+       if ((allowed - ss->sequence_allowed)
+           == receiver->acks_available)
+       {
+         /* we just incremented from zero => MTU change! */
+         setup_receiver_mq (receiver);
+       }
+       ss->sequence_allowed = allowed;
+       /* move ss to head to avoid discarding it anytime soon! */
+       GNUNET_CONTAINER_DLL_remove (receiver->ss_head,
+                                    receiver->ss_tail,
+                                    ss);
+       GNUNET_CONTAINER_DLL_insert (receiver->ss_head,
+                                    receiver->ss_tail,
+                                    ss);
+      }
+      return GNUNET_NO;
+    }
+  }
+  return GNUNET_YES;
+}
+
 
 /**
  * Test if we have received a valid message in plaintext.
@@ -1006,40 +1349,24 @@ try_handle_plaintext (struct SenderAddress *sender,
                      size_t buf_size)
 {
   const struct GNUNET_MessageHeader *hdr
-    = (const struct GNUNET_MessageHeader *) queue->pread_buf;
+    = (const struct GNUNET_MessageHeader *) buf;
   const struct UDPAck *ack
-    = (const struct UDPAck *) queue->pread_buf;
+    = (const struct UDPAck *) buf;
   uint16_t type;
 
   if (sizeof (*hdr) > buf_size)
     return; /* not even a header */
   if (ntohs (hdr->size) > buf_size)
-    return 0; /* not even a header */
+    return; /* not even a header */
   type = ntohs (hdr->type);
   switch (type)
   {
   case GNUNET_MESSAGE_TYPE_COMMUNICATOR_UDP_ACK:
     /* lookup master secret by 'cmac', then update sequence_max */
-    for (struct SharedSecret *ss = sender->ss_head;
-        NULL != ss;
-        ss = ss->next)
-    {
-      if (0 == memcmp (&ack->cmac,
-                      &ss->cmac,
-                      sizeof (ss->cmac)))
-      {
-       ss->sequence_allowed = GNUNET_MAX (ss->sequence_allowed,
-                                          ntohl (ack->sequence_max));
-       /* move ss to head to avoid discarding it anytime soon! */
-       GNUNET_CONTAINER_DLL_remove (sender->ss_head,
-                                    sender->ss_tail,
-                                    ss);
-       GNUNET_CONTAINER_DLL_insert (sender->ss_head,
-                                    sender->ss_tail,
-                                    ss);
-       break;
-      }
-    }
+    GNUNET_CONTAINER_multipeermap_get_multiple (receivers,
+                                               &sender->target,
+                                               &handle_ack,
+                                               (void *) ack);
     /* There could be more messages after the ACK, handle those as well */
     buf += ntohs (hdr->size);
     buf_size -= ntohs (hdr->size);
@@ -1047,6 +1374,9 @@ try_handle_plaintext (struct SenderAddress *sender,
                            buf,
                            buf_size);
     break;
+  case GNUNET_MESSAGE_TYPE_COMMUNICATOR_UDP_PAD:
+    /* skip padding */
+    break;
   default:
     pass_plaintext_to_core (sender,
                            buf,
@@ -1060,15 +1390,33 @@ try_handle_plaintext (struct SenderAddress *sender,
  * the sender an `struct UDPAck` at the next opportunity to allow the
  * sender to use @a ss longer (assuming we did not yet already
  * recently).
+ *
+ * @param ss shared secret to generate ACKs for
  */
 static void
 consider_ss_ack (struct SharedSecret *ss)
 {
   GNUNET_assert (NULL != ss->sender);
-  for (uint32_t i=1;i<0 /* FIXME: ack-based! */;i++)
-    kce_generate (ss,
-                 i);
-  // FIXME: consider generating ACK and more KCEs for ss!
+  /* drop ancient KeyCacheEntries */
+  while ( (NULL != ss->kce_head) &&
+         (MAX_SQN_DELTA < ss->kce_head->sequence_number - ss->kce_tail->sequence_number) )
+    kce_destroy (ss->kce_tail);
+  if (ss->active_kce_count < KCN_THRESHOLD)
+  {
+    struct UDPAck ack;
+
+    while (ss->active_kce_count < KCN_TARGET)
+      kce_generate (ss,
+                   ++ss->sequence_allowed);
+    ack.header.type = htons (GNUNET_MESSAGE_TYPE_COMMUNICATOR_UDP_ACK);
+    ack.header.size = htons (sizeof (ack));
+    ack.sequence_max = htonl (ss->sequence_allowed);
+    ack.cmac = ss->cmac;
+    GNUNET_TRANSPORT_communicator_notify (ch,
+                                         &ss->sender->target,
+                                         COMMUNICATOR_ADDRESS_PREFIX,
+                                         &ack.header);
+  }
 }
 
 
@@ -1085,7 +1433,6 @@ decrypt_box (const struct UDPBox *box,
             struct KeyCacheEntry *kce)
 {
   struct SharedSecret *ss = kce->ss;
-  gcry_cipher_hd_t cipher;
   char out_buf[box_len - sizeof (*box)];
 
   GNUNET_assert (NULL != ss->sender);
@@ -1093,9 +1440,9 @@ decrypt_box (const struct UDPBox *box,
       try_decrypt (ss,
                   box->gcm_tag,
                   kce->sequence_number,
-                  box_len - sizeof (*box),
-                  out_buf,
-                  sizeof (out_buf)))
+                  (const char *) &box[1],
+                  sizeof (out_buf),
+                  out_buf))
   {
     GNUNET_STATISTICS_update (stats,
                              "# Decryption failures with valid KCE",
@@ -1116,6 +1463,142 @@ decrypt_box (const struct UDPBox *box,
 }
 
 
+/**
+ * Closure for #find_sender_by_address()
+ */
+struct SearchContext
+{
+  /**
+   * Address we are looking for.
+   */
+  const struct sockaddr *address;
+
+  /**
+   * Number of bytes in @e address.
+   */
+  socklen_t address_len;
+
+  /**
+   * Return value to set if we found a match.
+   */
+  struct SenderAddress *sender;
+};
+
+
+/**
+ * Find existing `struct SenderAddress` by matching addresses.
+ *
+ * @param cls a `struct SearchContext`
+ * @param key ignored, must match already
+ * @param value a `struct SenderAddress`
+ * @return #GNUNET_YES if not found (continue to search), #GNUNET_NO if found
+ */
+static int
+find_sender_by_address (void *cls,
+                       const struct GNUNET_PeerIdentity *key,
+                       void *value)
+{
+  struct SearchContext *sc = cls;
+  struct SenderAddress *sender = value;
+
+  if ( (sender->address_len == sc->address_len) &&
+       (0 == memcmp (sender->address,
+                    sc->address,
+                    sender->address_len)) )
+  {
+    sc->sender = sender;
+    return GNUNET_NO; /* stop iterating! */
+  }
+  return GNUNET_YES;
+}
+
+
+/**
+ * Create sender address for @a target.  Note that we
+ * might already have one, so a fresh one is only allocated
+ * if one does not yet exist for @a address.
+ *
+ * @param target peer to generate address for
+ * @param address target address 
+ * @param address_len number of bytes in @a address
+ * @return data structure to keep track of key material for
+ *         decrypting data from @a target
+ */
+static struct SenderAddress *
+setup_sender (const struct GNUNET_PeerIdentity *target,
+             const struct sockaddr *address,
+             socklen_t address_len)
+{
+  struct SenderAddress *sender;
+  struct SearchContext sc = {
+    .address = address,
+    .address_len = address_len,
+    .sender = NULL
+  };
+
+  GNUNET_CONTAINER_multipeermap_get_multiple (senders,
+                                             target,
+                                             &find_sender_by_address,
+                                             &sc);
+  if (NULL != sc.sender)
+  {
+    reschedule_sender_timeout (sc.sender);
+    return sc.sender;
+  }
+  sender = GNUNET_new (struct SenderAddress);
+  sender->target = *target;
+  sender->address = GNUNET_memdup (address,
+                                  address_len);
+  sender->address_len = address_len;
+  (void) GNUNET_CONTAINER_multipeermap_put (senders,
+                                           &sender->target,
+                                           sender,
+                                           GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+  GNUNET_STATISTICS_set (stats,
+                        "# senders active",
+                        GNUNET_CONTAINER_multipeermap_size (receivers),
+                        GNUNET_NO);
+  sender->timeout
+    = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+  sender->hn = GNUNET_CONTAINER_heap_insert (senders_heap,
+                                            sender,
+                                            sender->timeout.abs_value_us);
+  sender->nt = GNUNET_NT_scanner_get_type (is,
+                                          address,
+                                          address_len);
+  if (NULL == timeout_task)
+    timeout_task = GNUNET_SCHEDULER_add_now (&check_timeouts,
+                                            NULL);
+  return sender;
+}
+
+
+/**
+ * Check signature from @a uc against @a ephemeral.
+ *
+ * @param ephermal key that is signed
+ * @param uc signature of claimant
+ * @return #GNUNET_OK if signature is valid
+ */
+static int
+verify_confirmation (const struct GNUNET_CRYPTO_EcdhePublicKey *ephemeral,
+                    const struct UDPConfirmation *uc)
+{
+  struct UdpHandshakeSignature uhs;
+                       
+  uhs.purpose.purpose = htonl (GNUNET_SIGNATURE_COMMUNICATOR_UDP_HANDSHAKE);
+  uhs.purpose.size = htonl (sizeof (uhs));
+  uhs.sender = uc->sender;
+  uhs.receiver = my_identity;
+  uhs.ephemeral = *ephemeral;
+  uhs.monotonic_time = uc->monotonic_time;
+  return GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_COMMUNICATOR_UDP_HANDSHAKE,
+                                    &uhs.purpose,
+                                    &uc->sender_sig,
+                                    &uc->sender.public_key);
+}
+
+
 /**
  * Socket read task. 
  *
@@ -1146,6 +1629,7 @@ sock_read (void *cls)
                         "recv");
     return;
   }
+
   /* first, see if it is a UDPBox */
   if (rcvd > sizeof (struct UDPBox))
   {
@@ -1153,8 +1637,8 @@ sock_read (void *cls)
     struct KeyCacheEntry *kce;
 
     box = (const struct UDPBox *) buf;
-    kce = GNUNET_CONTAINER_multihashmap_get (key_cache,
-                                            &box->kid);
+    kce = GNUNET_CONTAINER_multishortmap_get (key_cache,
+                                             &box->kid);
     if (NULL != kce)
     {
       decrypt_box (box,
@@ -1163,7 +1647,39 @@ sock_read (void *cls)
       return;
     }
   }
-  /* next, test if it is a KX */
+
+  /* next, check if it is a broadcast */
+  if (sizeof (struct UDPBroadcast) == rcvd)
+  {
+    const struct UDPBroadcast *ub;
+    struct UdpBroadcastSignature uhs;
+
+    ub = (const struct UDPBroadcast *) buf;
+    uhs.purpose.purpose = htonl (GNUNET_SIGNATURE_COMMUNICATOR_UDP_BROADCAST);
+    uhs.purpose.size = htonl (sizeof (uhs));
+    uhs.sender = ub->sender;
+    GNUNET_CRYPTO_hash (&sa,
+                       salen,
+                       &uhs.h_address);
+    if (GNUNET_OK ==
+       GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_COMMUNICATOR_UDP_BROADCAST,
+                                   &uhs.purpose,
+                                   &ub->sender_sig,
+                                   &ub->sender.public_key))
+    {
+      GNUNET_STATISTICS_update (stats,
+                               "# broadcasts received",
+                               1,
+                               GNUNET_NO);
+      // FIXME #5551: we effectively just got a HELLO!
+      // trigger verification NOW!
+      return;
+    }
+    /* continue with KX, mostly for statistics... */
+  }
+  
+
+  /* finally, test if it is a KX */
   if (rcvd < sizeof (struct UDPConfirmation) + sizeof (struct InitialKX))
   {
     GNUNET_STATISTICS_update (stats,
@@ -1181,13 +1697,13 @@ sock_read (void *cls)
     struct SenderAddress *sender;
 
     kx = (const struct InitialKX *) buf;
-    ss = setup_shared_secret_dec (&kx->ephemral);
+    ss = setup_shared_secret_dec (&kx->ephemeral);
     if (GNUNET_OK !=
        try_decrypt (ss,
-                    0,
                     kx->gcm_tag,
+                    0,
                     &buf[sizeof (*kx)],
-                    (const struct GNUNET_CRYPTO_EcdhePublicKey *) buf,
+                    sizeof (pbuf),
                     pbuf))
     {
       GNUNET_free (ss);
@@ -1210,7 +1726,7 @@ sock_read (void *cls)
                                GNUNET_NO);
       return;
     }
-    calculcate_cmac (ss);
+    calculate_cmac (ss);
     sender = setup_sender (&uc->sender,
                           (const struct sockaddr *) &sa,
                           salen);
@@ -1224,8 +1740,8 @@ sock_read (void *cls)
                              1,
                              GNUNET_NO);
     GNUNET_STATISTICS_update (stats,
-                             "# bytes decrypted without BOX",
-                             sizeof (pbuf) - sizeof (*uc),
+                             "# messages decrypted without BOX",
+                             1,
                              GNUNET_NO);
     try_handle_plaintext (sender,
                          &uc[1],
@@ -1267,10 +1783,12 @@ udp_address_to_sockaddr (const char *bindto,
                  bindto);
       return NULL;
     }
-    if (GNUNET_YES ==
-       GNUNET_CONFIGURATION_get_value_yesno (cfg,
-                                             COMMUNICATOR_CONFIG_SECTION,
-                                             "DISABLE_V6"))
+    if ( (GNUNET_NO ==
+         GNUNET_NETWORK_test_pf (PF_INET6)) ||
+        (GNUNET_YES ==
+         GNUNET_CONFIGURATION_get_value_yesno (cfg,
+                                               COMMUNICATOR_CONFIG_SECTION,
+                                               "DISABLE_V6")) )
     {
       struct sockaddr_in *i4;
       
@@ -1374,26 +1892,41 @@ udp_address_to_sockaddr (const char *bindto,
 }
 
 
-#if 0
 /**
+ * Pad @a dgram by @a pad_size using @a out_cipher.
  *
- * 
+ * @param out_cipher cipher to use
+ * @param dgram datagram to pad
+ * @param pad_size number of bytes of padding to append
  */
 static void
-XXX_write (void *cls)
+do_pad (gcry_cipher_hd_t out_cipher,
+       char *dgram,
+       size_t pad_size)
 {
-  ssize_t sent;
+  char pad[pad_size];
 
-  sent = GNUNET_NETWORK_socket_sendto (udp_sock,
-                                      );
-  if (-1 == sent)
+  GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK,
+                             pad,
+                             sizeof (pad));
+  if (sizeof (pad) > sizeof (struct GNUNET_MessageHeader))
   {
-    GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
-                        "send");
-    return;                     
+    struct GNUNET_MessageHeader hdr = {
+      .size = htons (sizeof (pad)),
+      .type = htons (GNUNET_MESSAGE_TYPE_COMMUNICATOR_UDP_PAD)
+    };
+    
+    memcpy (pad,
+           &hdr,
+           sizeof (hdr));
   }
+  GNUNET_assert (0 ==
+                gcry_cipher_encrypt (out_cipher,
+                                     dgram,
+                                     sizeof (pad),
+                                     pad,
+                                     sizeof (pad)));
 }
-#endif
 
 
 /**
@@ -1413,31 +1946,150 @@ mq_send (struct GNUNET_MQ_Handle *mq,
   uint16_t msize = ntohs (msg->size);
 
   GNUNET_assert (mq == receiver->mq);
-  // FIXME: pick encryption method, encrypt and transmit and call MQ-send-contiue!!
-
-#if 0
-  /* compute 'tc' and append in encrypted format to cwrite_buf */
-  tc.sender = my_identity;
-  tc.monotonic_time = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg));
-  ths.purpose.purpose = htonl (GNUNET_SIGNATURE_COMMUNICATOR_UDP_HANDSHAKE);
-  ths.purpose.size = htonl (sizeof (ths));
-  ths.sender = my_identity;
-  ths.receiver = queue->target;
-  ths.ephemeral = *epub;
-  ths.monotonic_time = tc.monotonic_time;
-  GNUNET_assert (GNUNET_OK ==
-                GNUNET_CRYPTO_eddsa_sign (my_private_key,
-                                          &ths.purpose,
-                                          &tc.sender_sig));
-  GNUNET_assert (0 ==
-                gcry_cipher_encrypt (queue->out_cipher,
-                                     &queue->cwrite_buf[queue->cwrite_off],
-                                     sizeof (tc),
-                                     &tc,
-                                     sizeof (tc)));
-#endif
+  if (msize > receiver->mtu)
+  {
+    GNUNET_break (0);
+    receiver_destroy (receiver);
+    return;
+  }
+  reschedule_receiver_timeout (receiver);
+  
+  if (0 == receiver->acks_available)
+  {
+    /* use KX encryption method */
+    struct UdpHandshakeSignature uhs;
+    struct UDPConfirmation uc;
+    struct InitialKX kx;
+    struct GNUNET_CRYPTO_EcdhePrivateKey epriv;
+    char dgram[receiver->mtu +
+              sizeof (uc) +
+              sizeof (kx)];
+    size_t dpos;
+    gcry_cipher_hd_t out_cipher;
+    struct SharedSecret *ss;
 
+    /* setup key material */
+    GNUNET_assert (GNUNET_OK ==
+                  GNUNET_CRYPTO_ecdhe_key_create2 (&epriv));
+
+    ss = setup_shared_secret_enc (&epriv,
+                                 receiver);
+    setup_cipher (&ss->master,
+                 0,
+                 &out_cipher);
+    /* compute 'uc' */
+    uc.sender = my_identity;
+    uc.monotonic_time = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg));
+    uhs.purpose.purpose = htonl (GNUNET_SIGNATURE_COMMUNICATOR_UDP_HANDSHAKE);
+    uhs.purpose.size = htonl (sizeof (uhs));
+    uhs.sender = my_identity;
+    uhs.receiver = receiver->target;
+    GNUNET_CRYPTO_ecdhe_key_get_public (&epriv,
+                                       &uhs.ephemeral);
+    uhs.monotonic_time = uc.monotonic_time;
+    GNUNET_assert (GNUNET_OK ==
+                  GNUNET_CRYPTO_eddsa_sign (my_private_key,
+                                            &uhs.purpose,
+                                            &uc.sender_sig));
+    /* Leave space for kx */
+    dpos = sizeof (struct GNUNET_CRYPTO_EcdhePublicKey);
+    /* Append encrypted uc to dgram */
+    GNUNET_assert (0 ==
+                  gcry_cipher_encrypt (out_cipher,
+                                       &dgram[dpos],
+                                       sizeof (uc),
+                                       &uc,
+                                       sizeof (uc)));
+    dpos += sizeof (uc);
+    /* Append encrypted payload to dgram */
+    GNUNET_assert (0 ==
+                  gcry_cipher_encrypt (out_cipher,
+                                       &dgram[dpos],
+                                       msize,
+                                       msg,
+                                       msize));
+    dpos += msize;
+    do_pad (out_cipher,
+           &dgram[dpos],
+           sizeof (dgram) - dpos);
+    /* Datagram starts with kx */
+    kx.ephemeral = uhs.ephemeral;
+    GNUNET_assert (0 ==
+                  gcry_cipher_gettag (out_cipher,
+                                      kx.gcm_tag,
+                                      sizeof (kx.gcm_tag)));
+    gcry_cipher_close (out_cipher);
+    memcpy (dgram,
+           &kx,
+           sizeof (kx));
+    if (-1 ==
+       GNUNET_NETWORK_socket_sendto (udp_sock,
+                                     dgram,
+                                     sizeof (dgram),
+                                     receiver->address,
+                                     receiver->address_len))
+      GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
+                          "send");
+    GNUNET_MQ_impl_send_continue (mq);
+    return;
+  } /* End of KX encryption method */
 
+  /* begin "BOX" encryption method, scan for ACKs from tail! */
+  for (struct SharedSecret *ss = receiver->ss_tail;
+       NULL != ss;
+       ss = ss->prev)
+  {
+    if (ss->sequence_used < ss->sequence_allowed)
+    {
+      char dgram[sizeof (struct UDPBox) + receiver->mtu];
+      struct UDPBox *box;
+      gcry_cipher_hd_t out_cipher;
+      size_t dpos;
+
+      box = (struct UDPBox *) dgram;
+      ss->sequence_used++;
+      get_kid (&ss->master,
+              ss->sequence_used,
+              &box->kid);
+      setup_cipher (&ss->master,
+                   ss->sequence_used,
+                   &out_cipher);
+      /* Append encrypted payload to dgram */
+      dpos = sizeof (struct UDPBox);
+      GNUNET_assert (0 ==
+                    gcry_cipher_encrypt (out_cipher,
+                                         &dgram[dpos],
+                                         msize,
+                                         msg,
+                                         msize));
+      dpos += msize;
+      do_pad (out_cipher,
+             &dgram[dpos],
+             sizeof (dgram) - dpos);
+      GNUNET_assert (0 ==
+                    gcry_cipher_gettag (out_cipher,
+                                        box->gcm_tag,
+                                        sizeof (box->gcm_tag)));
+      gcry_cipher_close (out_cipher);
+      if (-1 ==
+         GNUNET_NETWORK_socket_sendto (udp_sock,
+                                       dgram,
+                                       sizeof (dgram),
+                                       receiver->address,
+                                       receiver->address_len))
+       GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
+                            "send");
+      GNUNET_MQ_impl_send_continue (mq);
+      receiver->acks_available--;
+      if (0 == receiver->acks_available)
+      {
+       /* We have no more ACKs => MTU change! */
+       setup_receiver_mq (receiver);
+      }
+      return;
+    }
+  }
+  GNUNET_assert (0);
 }
 
 
@@ -1501,6 +2153,77 @@ mq_error (void *cls,
 }
 
 
+/**
+ * Setup the MQ for the @a receiver.  If a queue exists,
+ * the existing one is destroyed.  Then the MTU is
+ * recalculated and a fresh queue is initialized.
+ *
+ * @param receiver receiver to setup MQ for
+ */ 
+static void
+setup_receiver_mq (struct ReceiverAddress *receiver)
+{
+  size_t base_mtu;
+  
+  if (NULL != receiver->qh)
+  {
+    GNUNET_TRANSPORT_communicator_mq_del (receiver->qh);
+    receiver->qh = NULL;
+  }
+  GNUNET_assert (NULL == receiver->mq);
+  switch (receiver->address->sa_family)
+  {
+  case AF_INET:
+    base_mtu
+      = 1480 /* Ethernet MTU, 1500 - Ethernet header - VLAN tag */
+      - sizeof (struct GNUNET_TUN_IPv4Header) /* 20 */
+      - sizeof (struct GNUNET_TUN_UdpHeader) /* 8 */;
+    break;
+  case AF_INET6:
+    base_mtu
+      = 1280 /* Minimum MTU required by IPv6 */
+      - sizeof (struct GNUNET_TUN_IPv6Header) /* 40 */
+      - sizeof (struct GNUNET_TUN_UdpHeader) /* 8 */;
+    break;
+  default:
+    GNUNET_assert (0);
+    break;
+  }
+  if (0 == receiver->acks_available)
+  {
+    /* MTU based on full KX messages */
+    receiver->mtu
+      = base_mtu
+      - sizeof (struct InitialKX) /* 48 */
+      - sizeof (struct UDPConfirmation); /* 104 */
+  }
+  else
+  {
+    /* MTU based on BOXed messages */
+    receiver->mtu
+      = base_mtu - sizeof (struct UDPBox);
+  }
+  /* => Effective MTU for CORE will range from 1080 (IPv6 + KX) to
+     1404 (IPv4 + Box) bytes, depending on circumstances... */
+  receiver->mq
+    = GNUNET_MQ_queue_for_callbacks (&mq_send,
+                                    &mq_destroy,
+                                    &mq_cancel,
+                                    receiver,
+                                    NULL,
+                                    &mq_error,
+                                    receiver);
+  receiver->qh
+    = GNUNET_TRANSPORT_communicator_mq_add (ch,
+                                           &receiver->target,
+                                           receiver->foreign_addr,
+                                           receiver->mtu,
+                                           receiver->nt,
+                                           GNUNET_TRANSPORT_CS_OUTBOUND,
+                                           receiver->mq);
+}
+
+
 /**
  * Setup a receiver for transmission.  Setup the MQ processing and
  * inform transport that the queue is ready. 
@@ -1509,7 +2232,7 @@ mq_error (void *cls,
  */ 
 static struct ReceiverAddress *
 receiver_setup (const struct GNUNET_PeerIdentity *target,
-               const struct sockddr *address,
+               const struct sockaddr *address,
                socklen_t address_len)
 {
   struct ReceiverAddress *receiver;
@@ -1526,53 +2249,41 @@ receiver_setup (const struct GNUNET_PeerIdentity *target,
                                            &receiver->target,
                                            receiver,
                                            GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-  // FIXME: add to receiver heap!
+  receiver->timeout
+    = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+  receiver->hn = GNUNET_CONTAINER_heap_insert (receivers_heap,
+                                              receiver,
+                                              receiver->timeout.abs_value_us);
   GNUNET_STATISTICS_set (stats,
                         "# receivers active",
                         GNUNET_CONTAINER_multipeermap_size (receivers),
                         GNUNET_NO);
-  receiver->timeout
-    = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
-  receiver->mq
-    = GNUNET_MQ_queue_for_callbacks (&mq_send,
-                                    &mq_destroy,
-                                    &mq_cancel,
-                                    receiver,
-                                    NULL,
-                                    &mq_error,
-                                    receiver);
+  switch (address->sa_family)
   {
-    char *foreign_addr;
-
-    switch (address->sa_family)
-    {
-    case AF_INET:
-      GNUNET_asprintf (&foreign_addr,
-                      "%s-%s",
-                      COMMUNICATOR_ADDRESS_PREFIX,
-                      GNUNET_a2s(queue->address,
-                                 queue->address_len));
-      break;
-    case AF_INET6:
-      GNUNET_asprintf (&foreign_addr,
-                      "%s-%s",
-                      COMMUNICATOR_ADDRESS_PREFIX,
-                      GNUNET_a2s(queue->address,
-                                 queue->address_len));
-      break;
-    default:
-      GNUNET_assert (0);
-    }
-    queue->qh
-      = GNUNET_TRANSPORT_communicator_mq_add (ch,
-                                             &receiver->target,
-                                             foreign_addr,
-                                             1200 /* FIXME: MTU OK? */,
-                                             queue->nt,
-                                             GNUNET_TRANSPORT_CS_OUTBOUND,
-                                             queue->mq);
-    GNUNET_free (foreign_addr);
+  case AF_INET:
+    GNUNET_asprintf (&receiver->foreign_addr,
+                    "%s-%s",
+                    COMMUNICATOR_ADDRESS_PREFIX,
+                    GNUNET_a2s (receiver->address,
+                                receiver->address_len));
+    break;
+  case AF_INET6:
+    GNUNET_asprintf (&receiver->foreign_addr,
+                    "%s-%s",
+                    COMMUNICATOR_ADDRESS_PREFIX,
+                    GNUNET_a2s (receiver->address,
+                                receiver->address_len));
+    break;
+  default:
+    GNUNET_assert (0);
   }
+
+  setup_receiver_mq (receiver);
+
+  if (NULL == timeout_task)
+    timeout_task = GNUNET_SCHEDULER_add_now (&check_timeouts,
+                                            NULL);
+  return receiver;
 }
 
 
@@ -1616,6 +2327,7 @@ mq_init (void *cls,
   receiver = receiver_setup (peer,
                             in,
                             in_len);
+  (void) receiver;
   return GNUNET_OK;  
 }
 
@@ -1651,9 +2363,9 @@ get_receiver_delete_it (void *cls,
  * @return #GNUNET_OK to continue to iterate
  */
 static int
-get_receiver_delete_it (void *cls,
-                       const struct GNUNET_PeerIdentity *target,
-                       void *value)
+get_sender_delete_it (void *cls,
+                     const struct GNUNET_PeerIdentity *target,
+                     void *value)
 {
   struct SenderAddress *sender = value;
 
@@ -1677,6 +2389,13 @@ do_shutdown (void *cls)
      GNUNET_NAT_unregister (nat);
      nat = NULL;
   }
+  while (NULL != bi_head)
+    bi_destroy (bi_head);
+  if (NULL != broadcast_task)
+  {
+    GNUNET_SCHEDULER_cancel (broadcast_task);
+    broadcast_task = NULL;
+  }
   if (NULL != read_task)
   {
     GNUNET_SCHEDULER_cancel (read_task);
@@ -1724,13 +2443,11 @@ do_shutdown (void *cls)
 
 
 /**
- * Function called when the transport service has received an
- * acknowledgement for this communicator (!) via a different return
- * path.
- *
- * Not applicable for UDP.
+ * Function called when the transport service has received a
+ * backchannel message for this communicator (!) via a different return
+ * path. Should be an acknowledgement.
  *
- * @param cls closure
+ * @param cls closure, NULL
  * @param sender which peer sent the notification
  * @param msg payload
  */
@@ -1739,10 +2456,20 @@ enc_notify_cb (void *cls,
                const struct GNUNET_PeerIdentity *sender,
                const struct GNUNET_MessageHeader *msg)
 {
+  const struct UDPAck *ack;
+  
   (void) cls;
-  (void) sender;
-  (void) msg;
-  GNUNET_break_op (0);
+  if ( (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_COMMUNICATOR_UDP_ACK) ||
+       (ntohs (msg->size) != sizeof (struct UDPAck)) )
+  {
+    GNUNET_break_op (0);
+    return;
+  }
+  ack = (const struct UDPAck *) msg;
+  GNUNET_CONTAINER_multipeermap_get_multiple (receivers,
+                                             sender,
+                                             &handle_ack,
+                                             (void *) ack);
 }
 
 
@@ -1798,6 +2525,240 @@ nat_address_cb (void *cls,
 }
 
 
+/**
+ * Broadcast our presence on one of our interfaces.
+ *
+ * @param cls a `struct BroadcastInterface`
+ */
+static void
+ifc_broadcast (void *cls)
+{
+  struct BroadcastInterface *bi = cls;
+  struct GNUNET_TIME_Relative delay;
+
+  delay = BROADCAST_FREQUENCY;
+  delay.rel_value_us = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                                delay.rel_value_us);
+  bi->broadcast_task
+    = GNUNET_SCHEDULER_add_delayed (INTERFACE_SCAN_FREQUENCY,
+                                   &ifc_broadcast,
+                                   bi);
+  
+  switch (bi->sa->sa_family) {
+  case AF_INET:
+    {
+      static int yes = 1;
+      static int no = 0;
+      ssize_t sent;
+    
+      if (GNUNET_OK !=
+         GNUNET_NETWORK_socket_setsockopt (udp_sock,
+                                           SOL_SOCKET,
+                                           SO_BROADCAST,
+                                           &yes,
+                                           sizeof (int)))
+       GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
+                            "setsockopt");
+      sent = GNUNET_NETWORK_socket_sendto (udp_sock,
+                                          &bi->bcm,
+                                          sizeof (bi->bcm),
+                                          bi->ba,
+                                          bi->salen);
+      if (-1 == sent)
+       GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
+                            "sendto");
+      if (GNUNET_OK !=
+         GNUNET_NETWORK_socket_setsockopt (udp_sock,
+                                           SOL_SOCKET,
+                                           SO_BROADCAST,
+                                           &no,
+                                           sizeof (int)))
+       GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
+                            "setsockopt");
+      break;
+    }
+  case AF_INET6:
+    {
+      ssize_t sent;
+      struct sockaddr_in6 dst;
+
+      dst.sin6_family = AF_INET6;
+      dst.sin6_port = htons (my_port);
+      dst.sin6_addr = bi->mcreq.ipv6mr_multiaddr;
+      dst.sin6_scope_id = ((struct sockaddr_in6*) bi->ba)->sin6_scope_id;
+
+      sent = GNUNET_NETWORK_socket_sendto (udp_sock,
+                                          &bi->bcm,
+                                          sizeof (bi->bcm),
+                                          (const struct sockaddr *)
+                                          &dst,
+                                          sizeof (dst));
+      if (-1 == sent)
+       GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
+                            "sendto");
+      break;
+    }
+  default:
+    GNUNET_break (0);
+    break;
+  }
+}
+
+
+/**
+ * Callback function invoked for each interface found.
+ * Activates/deactivates broadcast interfaces.
+ *
+ * @param cls NULL
+ * @param name name of the interface (can be NULL for unknown)
+ * @param isDefault is this presumably the default interface
+ * @param addr address of this interface (can be NULL for unknown or unassigned)
+ * @param broadcast_addr the broadcast address (can be NULL for unknown or unassigned)
+ * @param netmask the network mask (can be NULL for unknown or unassigned)
+ * @param addrlen length of the address
+ * @return #GNUNET_OK to continue iteration, #GNUNET_SYSERR to abort
+ */
+static int
+iface_proc (void *cls,
+            const char *name,
+            int isDefault,
+            const struct sockaddr *addr,
+            const struct sockaddr *broadcast_addr,
+            const struct sockaddr *netmask, socklen_t addrlen)
+{
+  struct BroadcastInterface *bi;
+  enum GNUNET_NetworkType network;
+  struct UdpBroadcastSignature ubs;
+
+  (void) cls;
+  (void) netmask;
+  network = GNUNET_NT_scanner_get_type (is,
+                                       addr,
+                                       addrlen);
+  if (GNUNET_NT_LOOPBACK == network)
+  {
+    /* Broadcasting on loopback does not make sense */
+    return GNUNET_YES;
+  }
+  if (NULL == addr)
+    return GNUNET_YES; /* need to know our address! */
+  for (bi = bi_head; NULL != bi; bi = bi->next)
+  {
+    if ( (bi->salen == addrlen) &&
+        (0 == memcmp (addr,
+                      bi->sa,
+                      addrlen)) )
+    {
+      bi->found = GNUNET_YES;
+      return GNUNET_OK;
+    }
+  }
+
+  if ( (AF_INET6 == addr->sa_family) &&
+       (NULL == broadcast_addr) )
+    return GNUNET_OK; /* broadcast_addr is required for IPv6! */
+  if ( (AF_INET6 == addr->sa_family) &&
+       (GNUNET_YES != have_v6_socket) )
+    return GNUNET_OK; /* not using IPv6 */
+  
+  bi = GNUNET_new (struct BroadcastInterface);
+  bi->sa = GNUNET_memdup (addr,
+                         addrlen);
+  if (NULL != broadcast_addr)
+    bi->ba = GNUNET_memdup (broadcast_addr,
+                           addrlen);
+  bi->salen = addrlen;
+  bi->found = GNUNET_YES;
+  bi->bcm.sender = my_identity;
+  ubs.purpose.purpose = htonl (GNUNET_SIGNATURE_COMMUNICATOR_UDP_BROADCAST);
+  ubs.purpose.size = htonl (sizeof (ubs));
+  ubs.sender = my_identity;
+  GNUNET_CRYPTO_hash (addr,
+                     addrlen,
+                     &ubs.h_address);
+  GNUNET_assert (GNUNET_OK ==
+                GNUNET_CRYPTO_eddsa_sign (my_private_key,
+                                          &ubs.purpose,
+                                          &bi->bcm.sender_sig));
+  bi->broadcast_task = GNUNET_SCHEDULER_add_now (&ifc_broadcast,
+                                                bi);
+  GNUNET_CONTAINER_DLL_insert (bi_head,
+                              bi_tail,
+                              bi);
+  if ( (AF_INET6 == addr->sa_family) &&
+       (NULL != broadcast_addr) )
+  {
+    /* Create IPv6 multicast request */
+    const struct sockaddr_in6 *s6
+      = (const struct sockaddr_in6 *) broadcast_addr;
+
+    GNUNET_assert (1 ==
+                   inet_pton (AF_INET6,
+                             "FF05::13B",
+                              &bi->mcreq.ipv6mr_multiaddr));
+    
+    /* http://tools.ietf.org/html/rfc2553#section-5.2:
+     *
+     * IPV6_JOIN_GROUP
+     *
+     * Join a multicast group on a specified local interface.  If the
+     * interface index is specified as 0, the kernel chooses the local
+     * interface.  For example, some kernels look up the multicast
+     * group in the normal IPv6 routing table and using the resulting
+     * interface; we do this for each interface, so no need to use
+     * zero (anymore...).
+     */
+    bi->mcreq.ipv6mr_interface = s6->sin6_scope_id;
+
+    /* Join the multicast group */
+    if (GNUNET_OK !=
+        GNUNET_NETWORK_socket_setsockopt
+        (udp_sock,
+        IPPROTO_IPV6,
+        IPV6_JOIN_GROUP,
+         &bi->mcreq,
+        sizeof (bi->mcreq)))
+    {
+      GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
+                          "setsockopt");
+    }
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Scan interfaces to broadcast our presence on the LAN.
+ *
+ * @param cls NULL, unused
+ */
+static void
+do_broadcast (void *cls)
+{
+  struct BroadcastInterface *bin;
+  
+  (void) cls;
+  for (struct BroadcastInterface *bi = bi_head;
+       NULL != bi;
+       bi = bi->next)
+    bi->found = GNUNET_NO;
+  GNUNET_OS_network_interfaces_list (&iface_proc,
+                                    NULL);
+  for (struct BroadcastInterface *bi = bi_head;
+       NULL != bi;
+       bi = bin)
+  {
+    bin = bi->next;
+    if (GNUNET_NO == bi->found)
+      bi_destroy (bi);
+  }
+  broadcast_task
+    = GNUNET_SCHEDULER_add_delayed (INTERFACE_SCAN_FREQUENCY,
+                                   &do_broadcast,
+                                   NULL);
+}
+
+
 /**
  * Setup communicator and launch network interactions.
  *
@@ -1831,12 +2792,6 @@ run (void *cls,
                                "BINDTO");
     return;
   }
-  if (GNUNET_OK !=
-      GNUNET_CONFIGURATION_get_value_number (cfg,
-                                            COMMUNICATOR_CONFIG_SECTION,
-                                            "MAX_QUEUE_LENGTH",
-                                            &max_queue_length))
-    max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
 
   in = udp_address_to_sockaddr (bindto,
                                &in_len);
@@ -1859,6 +2814,8 @@ run (void *cls,
     GNUNET_free (bindto);
     return;
   }
+  if (AF_INET6 == in->sa_family)
+    have_v6_socket = GNUNET_YES;
   if (GNUNET_OK !=
       GNUNET_NETWORK_socket_bind (udp_sock,
                                   in,
@@ -1868,7 +2825,7 @@ run (void *cls,
                              "bind",
                              bindto);
     GNUNET_NETWORK_socket_close (udp_sock);
-    listen_sock = NULL;
+    udp_sock = NULL;
     GNUNET_free (in);
     GNUNET_free (bindto);
     return;
@@ -1893,8 +2850,28 @@ run (void *cls,
              "Bound to `%s'\n",
              GNUNET_a2s ((const struct sockaddr *) &in_sto,
                          sto_len));
+  switch (in->sa_family)
+  {
+  case AF_INET:
+    my_port = ntohs (((struct sockaddr_in *) in)->sin_port);
+    break;
+  case AF_INET6:
+    my_port = ntohs (((struct sockaddr_in6 *) in)->sin6_port);
+    break;
+  default:
+    GNUNET_break (0);
+    my_port = 0;
+  }
   stats = GNUNET_STATISTICS_create ("C-UDP",
                                    cfg);
+  senders = GNUNET_CONTAINER_multipeermap_create (32,
+                                                 GNUNET_YES);
+  receivers = GNUNET_CONTAINER_multipeermap_create (32,
+                                                   GNUNET_YES);
+  senders_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
+  receivers_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
+  key_cache = GNUNET_CONTAINER_multishortmap_create (1024,
+                                                    GNUNET_YES);
   GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
                                 NULL);
   is = GNUNET_NT_scanner_init ();
@@ -1908,17 +2885,11 @@ run (void *cls,
   }
   GNUNET_CRYPTO_eddsa_key_get_public (my_private_key,
                                       &my_identity.public_key);
-  /* start listening */
+  /* start reading */
   read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
                                             udp_sock,
                                             &sock_read,
                                             NULL);
-  senders = GNUNET_CONTAINER_multipeermap_create (32,
-                                                 GNUNET_YES);
-  receivers = GNUNET_CONTAINER_multipeermap_create (32,
-                                                   GNUNET_YES);
-  key_cache = GNUNET_CONTAINER_multishortmap_create (1024,
-                                                    GNUNET_YES);
   ch = GNUNET_TRANSPORT_communicator_connect (cfg,
                                              COMMUNICATOR_CONFIG_SECTION,
                                              COMMUNICATOR_ADDRESS_PREFIX,
@@ -1933,6 +2904,15 @@ run (void *cls,
     GNUNET_SCHEDULER_shutdown ();
     return;
   }
+  /* start broadcasting */
+  if (GNUNET_YES !=
+      GNUNET_CONFIGURATION_get_value_yesno (cfg,
+                                           COMMUNICATOR_CONFIG_SECTION,
+                                           "DISABLE_BROADCAST"))
+  {
+    broadcast_task = GNUNET_SCHEDULER_add_now (&do_broadcast,
+                                              NULL);
+  }  
   nat = GNUNET_NAT_register (cfg,
                             COMMUNICATOR_CONFIG_SECTION,
                             IPPROTO_UDP,