towards fixing #3363: replacing old iteration API with new monitoring API for core...
[oweals/gnunet.git] / src / core / core_api.c
index 2b1291d0819735c0efef433cc59c6960e45cfea4..7818a60a3bdebf197641db202aac2500e3e7aa1a 100644 (file)
@@ -25,6 +25,7 @@
  * @author Christian Grothoff
  */
 #include "platform.h"
+#include "gnunet_util_lib.h"
 #include "gnunet_constants.h"
 #include "gnunet_core_service.h"
 #include "core.h"
@@ -70,7 +71,7 @@ struct GNUNET_CORE_TransmitHandle
   /**
    * How important is this message?
    */
-  uint32_t priority;
+  enum GNUNET_CORE_Priority priority;
 
   /**
    * Size of this request.
@@ -144,6 +145,13 @@ struct PeerRecord
 
 };
 
+struct CoreMQState
+{
+  struct GNUNET_PeerIdentity target;
+  struct GNUNET_CORE_Handle *core;
+  struct GNUNET_CORE_TransmitHandle *th;
+};
+
 
 /**
  * Type of function called upon completion.
@@ -277,7 +285,7 @@ struct GNUNET_CORE_Handle
    * Hash map listing all of the peers that we are currently
    * connected to.
    */
-  struct GNUNET_CONTAINER_MultiHashMap *peers;
+  struct GNUNET_CONTAINER_MultiPeerMap *peers;
 
   /**
    * Identity of this peer.
@@ -333,7 +341,7 @@ reconnect (struct GNUNET_CORE_Handle *h);
 /**
  * Task schedule to try to re-connect to core.
  *
- * @param cls the 'struct GNUNET_CORE_Handle'
+ * @param cls the `struct GNUNET_CORE_Handle`
  * @param tc task context
  */
 static void
@@ -342,7 +350,8 @@ reconnect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   struct GNUNET_CORE_Handle *h = cls;
 
   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to CORE service after delay\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Connecting to CORE service after delay\n");
   reconnect (h);
 }
 
@@ -351,13 +360,14 @@ reconnect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
  * Notify clients about disconnect and free
  * the entry for connected peer.
  *
- * @param cls the 'struct GNUNET_CORE_Handle*'
+ * @param cls the `struct GNUNET_CORE_Handle *`
  * @param key the peer identity (not used)
- * @param value the 'struct PeerRecord' to free.
- * @return GNUNET_YES (continue)
+ * @param value the `struct PeerRecord` to free.
+ * @return #GNUNET_YES (continue)
  */
 static int
-disconnect_and_free_peer_entry (void *cls, const struct GNUNET_HashCode * key,
+disconnect_and_free_peer_entry (void *cls,
+                               const struct GNUNET_PeerIdentity *key,
                                 void *value)
 {
   struct GNUNET_CORE_Handle *h = cls;
@@ -389,7 +399,7 @@ disconnect_and_free_peer_entry (void *cls, const struct GNUNET_HashCode * key,
   }
   /* done with 'voluntary' cleanups, now on to normal freeing */
   GNUNET_assert (GNUNET_YES ==
-                 GNUNET_CONTAINER_multihashmap_remove (h->peers, key, pr));
+                 GNUNET_CONTAINER_multipeermap_remove (h->peers, key, pr));
   GNUNET_assert (pr->ch == h);
   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == pr->timeout_task);
   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == pr->ntr_task);
@@ -424,7 +434,8 @@ reconnect_later (struct GNUNET_CORE_Handle *h)
   h->currently_down = GNUNET_YES;
   GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
   h->reconnect_task =
-      GNUNET_SCHEDULER_add_delayed (h->retry_backoff, &reconnect_task, h);
+      GNUNET_SCHEDULER_add_delayed (h->retry_backoff,
+                                    &reconnect_task, h);
   while (NULL != (cm = h->control_pending_head))
   {
     GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
@@ -435,7 +446,7 @@ reconnect_later (struct GNUNET_CORE_Handle *h)
       cm->cont (cm->cont_cls, GNUNET_NO);
     GNUNET_free (cm);
   }
-  GNUNET_CONTAINER_multihashmap_iterate (h->peers,
+  GNUNET_CONTAINER_multipeermap_iterate (h->peers,
                                          &disconnect_and_free_peer_entry, h);
   while (NULL != (pr = h->ready_peer_head))
     GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
@@ -463,7 +474,8 @@ trigger_next_request (struct GNUNET_CORE_Handle *h, int ignore_currently_down);
  * @param tc context, can be NULL (!)
  */
 static void
-transmission_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+transmission_timeout (void *cls,
+                      const struct GNUNET_SCHEDULER_TaskContext *tc);
 
 
 /**
@@ -505,7 +517,7 @@ request_next_transmission (struct PeerRecord *pr)
   smr = (struct SendMessageRequest *) &cm[1];
   smr->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST);
   smr->header.size = htons (sizeof (struct SendMessageRequest));
-  smr->priority = htonl (th->priority);
+  smr->priority = htonl ((uint32_t) th->priority);
   smr->deadline = GNUNET_TIME_absolute_hton (th->timeout);
   smr->peer = pr->peer;
   smr->reserved = htonl (0);
@@ -528,7 +540,8 @@ request_next_transmission (struct PeerRecord *pr)
  * @param tc context, can be NULL (!)
  */
 static void
-transmission_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+transmission_timeout (void *cls,
+                      const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct PeerRecord *pr = cls;
   struct GNUNET_CORE_Handle *h = pr->ch;
@@ -568,10 +581,10 @@ transmission_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 /**
  * Transmit the next message to the core service.
  *
- * @param cls closure with the 'struct GNUNET_CORE_Handle'
- * @param size number of bytes available in buf
+ * @param cls closure with the `struct GNUNET_CORE_Handle`
+ * @param size number of bytes available in @a buf
  * @param buf where the callee should write the message
- * @return number of bytes written to buf 
+ * @return number of bytes written to @a buf
  */
 static size_t
 transmit_message (void *cls, size_t size, void *buf)
@@ -640,7 +653,7 @@ transmit_message (void *cls, size_t size, void *buf)
        GNUNET_i2s (&pr->peer), (unsigned int) th->msize);
   sm = (struct SendMessage *) buf;
   sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND);
-  sm->priority = htonl (th->priority);
+  sm->priority = htonl ((uint32_t) th->priority);
   sm->deadline = GNUNET_TIME_absolute_hton (th->timeout);
   sm->peer = pr->peer;
   sm->cork = htonl ((uint32_t) th->cork);
@@ -648,7 +661,7 @@ transmit_message (void *cls, size_t size, void *buf)
   ret =
     th->get_message (th->get_message_cls,
                     size - sizeof (struct SendMessage), &sm[1]);
-  
+
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Transmitting SEND request to `%s' yielded %u bytes.\n",
        GNUNET_i2s (&pr->peer), ret);
@@ -687,7 +700,8 @@ transmit_message (void *cls, size_t size, void *buf)
  * @param ignore_currently_down transmit message even if not initialized?
  */
 static void
-trigger_next_request (struct GNUNET_CORE_Handle *h, int ignore_currently_down)
+trigger_next_request (struct GNUNET_CORE_Handle *h,
+                      int ignore_currently_down)
 {
   uint16_t msize;
 
@@ -725,11 +739,12 @@ trigger_next_request (struct GNUNET_CORE_Handle *h, int ignore_currently_down)
 /**
  * Handler for notification messages received from the core.
  *
- * @param cls our "struct GNUNET_CORE_Handle"
+ * @param cls our `struct GNUNET_CORE_Handle`
  * @param msg the message received from the core service
  */
 static void
-main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
+main_notify_handler (void *cls,
+                     const struct GNUNET_MessageHeader *msg)
 {
   struct GNUNET_CORE_Handle *h = cls;
   const struct InitReplyMessage *m;
@@ -739,7 +754,6 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
   const struct GNUNET_MessageHeader *em;
   const struct SendMessageReady *smr;
   const struct GNUNET_CORE_MessageHandler *mh;
-  const struct GNUNET_ATS_Information *ats;
   GNUNET_CORE_StartupCallback init;
   struct PeerRecord *pr;
   struct GNUNET_CORE_TransmitHandle *th;
@@ -747,8 +761,6 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
   int trigger;
   uint16_t msize;
   uint16_t et;
-  uint32_t ats_count;
-
   if (NULL == msg)
   {
     LOG (GNUNET_ERROR_TYPE_INFO,
@@ -785,7 +797,7 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
       h->init = NULL;
       LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to core service of peer `%s'.\n",
            GNUNET_i2s (&h->me));
-      init (h->cls, h, &h->me);
+      init (h->cls, &h->me);
     }
     else
     {
@@ -793,17 +805,17 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
            "Successfully reconnected to core service.\n");
     }
     /* fake 'connect to self' */
-    pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &h->me.hashPubKey);
+    pr = GNUNET_CONTAINER_multipeermap_get (h->peers, &h->me);
     GNUNET_assert (NULL == pr);
-    pr = GNUNET_malloc (sizeof (struct PeerRecord));
+    pr = GNUNET_new (struct PeerRecord);
     pr->peer = h->me;
     pr->ch = h;
     GNUNET_assert (GNUNET_YES ==
-                   GNUNET_CONTAINER_multihashmap_put (h->peers,
-                                                      &h->me.hashPubKey, pr,
+                   GNUNET_CONTAINER_multipeermap_put (h->peers,
+                                                      &h->me, pr,
                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
     if (NULL != h->connects)
-      h->connects (h->cls, &h->me, NULL, 0);
+      h->connects (h->cls, &h->me);
     break;
   case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT:
     if (msize < sizeof (struct ConnectNotifyMessage))
@@ -813,10 +825,8 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
       return;
     }
     cnm = (const struct ConnectNotifyMessage *) msg;
-    ats_count = ntohl (cnm->ats_count);
     if (msize !=
-        sizeof (struct ConnectNotifyMessage) +
-        ats_count * sizeof (struct GNUNET_ATS_Information))
+        sizeof (struct ConnectNotifyMessage))
     {
       GNUNET_break (0);
       reconnect_later (h);
@@ -831,23 +841,22 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
       GNUNET_break (0);
       return;
     }
-    pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &cnm->peer.hashPubKey);
+    pr = GNUNET_CONTAINER_multipeermap_get (h->peers, &cnm->peer);
     if (NULL != pr)
     {
       GNUNET_break (0);
       reconnect_later (h);
       return;
     }
-    pr = GNUNET_malloc (sizeof (struct PeerRecord));
+    pr = GNUNET_new (struct PeerRecord);
     pr->peer = cnm->peer;
     pr->ch = h;
     GNUNET_assert (GNUNET_YES ==
-                   GNUNET_CONTAINER_multihashmap_put (h->peers,
-                                                      &cnm->peer.hashPubKey, pr,
+                   GNUNET_CONTAINER_multipeermap_put (h->peers,
+                                                      &cnm->peer, pr,
                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
-    ats = (const struct GNUNET_ATS_Information *) &cnm[1];
     if (NULL != h->connects)
-      h->connects (h->cls, &cnm->peer, ats, ats_count);
+      h->connects (h->cls, &cnm->peer);
     break;
   case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT:
     if (msize != sizeof (struct DisconnectNotifyMessage))
@@ -867,7 +876,7 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Received notification about disconnect from `%s'.\n",
          GNUNET_i2s (&dnm->peer));
-    pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &dnm->peer.hashPubKey);
+    pr = GNUNET_CONTAINER_multipeermap_get (h->peers, &dnm->peer);
     if (NULL == pr)
     {
       GNUNET_break (0);
@@ -876,7 +885,7 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
     }
     trigger = ((pr->prev != NULL) || (pr->next != NULL) ||
                (h->ready_peer_head == pr));
-    disconnect_and_free_peer_entry (h, &dnm->peer.hashPubKey, pr);
+    disconnect_and_free_peer_entry (h, &dnm->peer, pr);
     if (trigger)
       trigger_next_request (h, GNUNET_NO);
     break;
@@ -888,25 +897,21 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
       return;
     }
     ntm = (const struct NotifyTrafficMessage *) msg;
-    ats_count = ntohl (ntm->ats_count);
     if ((msize <
          sizeof (struct NotifyTrafficMessage) +
-         ats_count * sizeof (struct GNUNET_ATS_Information) +
          sizeof (struct GNUNET_MessageHeader)) )
     {
       GNUNET_break (0);
       reconnect_later (h);
       return;
     }
-    ats = (const struct GNUNET_ATS_Information*) &ntm[1];
-    em = (const struct GNUNET_MessageHeader *) &ats[ats_count];
+    em = (const struct GNUNET_MessageHeader *) &ntm[1];
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Received message of type %u and size %u from peer `%4s'\n",
          ntohs (em->type), ntohs (em->size), GNUNET_i2s (&ntm->peer));
     if ((GNUNET_NO == h->inbound_hdr_only) &&
         (msize !=
-         ntohs (em->size) + sizeof (struct NotifyTrafficMessage) +
-         +ats_count * sizeof (struct GNUNET_ATS_Information)))
+         ntohs (em->size) + sizeof (struct NotifyTrafficMessage)))
     {
       GNUNET_break (0);
       reconnect_later (h);
@@ -926,7 +931,7 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
         GNUNET_break_op (0);
         continue;
       }
-      pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &ntm->peer.hashPubKey);
+      pr = GNUNET_CONTAINER_multipeermap_get (h->peers, &ntm->peer);
       if (NULL == pr)
       {
        GNUNET_break (0);
@@ -934,15 +939,14 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
        return;
       }
       if (GNUNET_OK !=
-          h->handlers[hpos].callback (h->cls, &ntm->peer, em, ats,
-                                      ats_count))
+          h->handlers[hpos].callback (h->cls, &ntm->peer, em))
       {
         /* error in processing, do not process other messages! */
         break;
       }
     }
     if (NULL != h->inbound_notify)
-      h->inbound_notify (h->cls, &ntm->peer, em, ats, ats_count);
+      h->inbound_notify (h->cls, &ntm->peer, em);
     break;
   case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND:
     if (msize < sizeof (struct NotifyTrafficMessage))
@@ -952,25 +956,21 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
       return;
     }
     ntm = (const struct NotifyTrafficMessage *) msg;
-    ats_count = ntohl (ntm->ats_count);
     if ((msize <
          sizeof (struct NotifyTrafficMessage) +
-         ats_count * sizeof (struct GNUNET_ATS_Information) +
          sizeof (struct GNUNET_MessageHeader)) )
     {
       GNUNET_break (0);
       reconnect_later (h);
       return;
     }
-    ats = (const struct GNUNET_ATS_Information*) &ntm[1];    
-    em = (const struct GNUNET_MessageHeader *) &ats[ats_count];
+    em = (const struct GNUNET_MessageHeader *) &ntm[1];
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Received notification about transmission to `%s'.\n",
          GNUNET_i2s (&ntm->peer));
     if ((GNUNET_NO == h->outbound_hdr_only) &&
         (msize !=
-         ntohs (em->size) + sizeof (struct NotifyTrafficMessage) +
-         ats_count * sizeof (struct GNUNET_ATS_Information)))
+         ntohs (em->size) + sizeof (struct NotifyTrafficMessage)))
     {
       GNUNET_break (0);
       reconnect_later (h);
@@ -981,7 +981,7 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
       GNUNET_break (0);
       break;
     }
-    h->outbound_notify (h->cls, &ntm->peer, em, ats, ats_count);
+    h->outbound_notify (h->cls, &ntm->peer, em);
     break;
   case GNUNET_MESSAGE_TYPE_CORE_SEND_READY:
     if (msize != sizeof (struct SendMessageReady))
@@ -991,7 +991,7 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
       return;
     }
     smr = (const struct SendMessageReady *) msg;
-    pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &smr->peer.hashPubKey);
+    pr = GNUNET_CONTAINER_multipeermap_get (h->peers, &smr->peer);
     if (NULL == pr)
     {
       GNUNET_break (0);
@@ -1078,7 +1078,8 @@ reconnect (struct GNUNET_CORE_Handle *h)
   unsigned int hpos;
 
   GNUNET_assert (NULL == h->client);
-  GNUNET_assert (h->currently_down == GNUNET_YES);
+  GNUNET_assert (GNUNET_YES == h->currently_down);
+  GNUNET_assert (NULL != h->cfg);
   h->client = GNUNET_CLIENT_connect ("core", h->cfg);
   if (NULL == h->client)
   {
@@ -1107,7 +1108,7 @@ reconnect (struct GNUNET_CORE_Handle *h)
     else
       opt |= GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND;
   }
-  LOG (GNUNET_ERROR_TYPE_INFO, 
+  LOG (GNUNET_ERROR_TYPE_INFO,
        "(Re)connecting to CORE service, monitoring messages of type %u\n",
        opt);
 
@@ -1133,13 +1134,13 @@ reconnect (struct GNUNET_CORE_Handle *h)
  * @param connects function to call on peer connect, can be NULL
  * @param disconnects function to call on peer disconnect / timeout, can be NULL
  * @param inbound_notify function to call for all inbound messages, can be NULL
- * @param inbound_hdr_only set to GNUNET_YES if inbound_notify will only read the
+ * @param inbound_hdr_only set to #GNUNET_YES if inbound_notify will only read the
  *                GNUNET_MessageHeader and hence we do not need to give it the full message;
- *                can be used to improve efficiency, ignored if inbound_notify is NULLL
+ *                can be used to improve efficiency, ignored if @a inbound_notify is NULLL
  * @param outbound_notify function to call for all outbound messages, can be NULL
- * @param outbound_hdr_only set to GNUNET_YES if outbound_notify will only read the
+ * @param outbound_hdr_only set to #GNUNET_YES if outbound_notify will only read the
  *                GNUNET_MessageHeader and hence we do not need to give it the full message
- *                can be used to improve efficiency, ignored if outbound_notify is NULLL
+ *                can be used to improve efficiency, ignored if @a outbound_notify is NULLL
  * @param handlers callbacks for messages we care about, NULL-terminated
  * @return handle to the core service (only useful for disconnect until 'init' is called);
  *                NULL on error (in this case, init is never called)
@@ -1158,7 +1159,8 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
 {
   struct GNUNET_CORE_Handle *h;
 
-  h = GNUNET_malloc (sizeof (struct GNUNET_CORE_Handle));
+  GNUNET_assert (NULL != cfg);
+  h = GNUNET_new (struct GNUNET_CORE_Handle);
   h->cfg = cfg;
   h->cls = cls;
   h->init = init;
@@ -1171,14 +1173,15 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
   h->handlers = handlers;
   h->hcnt = 0;
   h->currently_down = GNUNET_YES;
-  h->peers = GNUNET_CONTAINER_multihashmap_create (128, GNUNET_NO);
+  h->peers = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_NO);
   if (NULL != handlers)
     while (handlers[h->hcnt].callback != NULL)
       h->hcnt++;
   GNUNET_assert (h->hcnt <
                  (GNUNET_SERVER_MAX_MESSAGE_SIZE -
                   sizeof (struct InitMessage)) / sizeof (uint16_t));
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to CORE service\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Connecting to CORE service\n");
   reconnect (h);
   return h;
 }
@@ -1186,7 +1189,7 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
 
 /**
  * Disconnect from the core service.  This function can only
- * be called *after* all pending 'GNUNET_CORE_notify_transmit_ready'
+ * be called *after* all pending #GNUNET_CORE_notify_transmit_ready()
  * requests have been explicitly canceled.
  *
  * @param handle connection to core to disconnect
@@ -1196,6 +1199,8 @@ GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle)
 {
   struct ControlMessage *cm;
 
+  GNUNET_assert (NULL != handle);
+
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting from CORE service\n");
   if (NULL != handle->cth)
   {
@@ -1217,7 +1222,7 @@ GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle)
     GNUNET_CLIENT_disconnect (handle->client);
     handle->client = NULL;
   }
-  GNUNET_CONTAINER_multihashmap_iterate (handle->peers,
+  GNUNET_CONTAINER_multipeermap_iterate (handle->peers,
                                          &disconnect_and_free_peer_entry,
                                          handle);
   if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
@@ -1225,7 +1230,7 @@ GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle)
     GNUNET_SCHEDULER_cancel (handle->reconnect_task);
     handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
   }
-  GNUNET_CONTAINER_multihashmap_destroy (handle->peers);
+  GNUNET_CONTAINER_multipeermap_destroy (handle->peers);
   handle->peers = NULL;
   GNUNET_break (handle->ready_peer_head == NULL);
   GNUNET_free (handle);
@@ -1235,7 +1240,7 @@ GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle)
 /**
  * Task that calls 'request_next_transmission'.
  *
- * @param cls the 'struct PeerRecord*'
+ * @param cls the 'struct PeerRecord *'
  * @param tc scheduler context
  */
 static void
@@ -1250,21 +1255,21 @@ run_request_next_transmission (void *cls,
 
 
 /**
- * Ask the core to call "notify" once it is ready to transmit the
- * given number of bytes to the specified "target".  Must only be
+ * Ask the core to call @a notify once it is ready to transmit the
+ * given number of bytes to the specified @a target.  Must only be
  * called after a connection to the respective peer has been
  * established (and the client has been informed about this).  You may
  * have one request of this type pending for each connected peer at
  * any time.  If a peer disconnects, the application MUST call
- * "GNUNET_CORE_notify_transmit_ready_cancel" on the respective
+ * #GNUNET_CORE_notify_transmit_ready_cancel on the respective
  * transmission request, if one such request is pending.
  *
  * @param handle connection to core service
  * @param cork is corking allowed for this transmission?
  * @param priority how important is the message?
- * @param maxdelay how long can the message wait?
+ * @param maxdelay how long can the message wait? Only effective if @a cork is #GNUNET_YES
  * @param target who should receive the message, never NULL (can be this peer's identity for loopback)
- * @param notify_size how many bytes of buffer space does notify want?
+ * @param notify_size how many bytes of buffer space does @a notify want?
  * @param notify function to call when buffer space is available;
  *        will be called with NULL on timeout; clients MUST cancel
  *        all pending transmission requests DURING the disconnect
@@ -1272,11 +1277,12 @@ run_request_next_transmission (void *cls,
  * @param notify_cls closure for notify
  * @return non-NULL if the notify callback was queued,
  *         NULL if we can not even queue the request (request already pending);
- *         if NULL is returned, "notify" will NOT be called.
+ *         if NULL is returned, @a notify will NOT be called.
  */
 struct GNUNET_CORE_TransmitHandle *
-GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, int cork,
-                                   uint32_t priority,
+GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle,
+                                   int cork,
+                                   enum GNUNET_CORE_Priority priority,
                                    struct GNUNET_TIME_Relative maxdelay,
                                    const struct GNUNET_PeerIdentity *target,
                                    size_t notify_size,
@@ -1296,7 +1302,7 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, int cork,
        "Asking core for transmission of %u bytes to `%s'\n",
        (unsigned int) notify_size,
        GNUNET_i2s (target));
-  pr = GNUNET_CONTAINER_multihashmap_get (handle->peers, &target->hashPubKey);
+  pr = GNUNET_CONTAINER_multipeermap_get (handle->peers, target);
   if (NULL == pr)
   {
     /* attempt to send to peer that is not connected */
@@ -1323,7 +1329,8 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, int cork,
   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == pr->ntr_task);
   pr->ntr_task =
     GNUNET_SCHEDULER_add_now (&run_request_next_transmission, pr);
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmission request added to queue\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Transmission request added to queue\n");
   return th;
 }
 
@@ -1339,6 +1346,7 @@ GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle *th)
   struct PeerRecord *pr = th->peer;
   struct GNUNET_CORE_Handle *h;
 
+  GNUNET_assert (NULL != th);
   GNUNET_assert (NULL != pr);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Aborting transmission request to core for %u bytes to `%s'\n",
@@ -1390,8 +1398,128 @@ GNUNET_CORE_is_peer_connected_sync (const struct GNUNET_CORE_Handle *h,
 {
   GNUNET_assert (NULL != h);
   GNUNET_assert (NULL != pid);
-  return GNUNET_CONTAINER_multihashmap_contains (h->peers, &pid->hashPubKey);
+  return GNUNET_CONTAINER_multipeermap_contains (h->peers, pid);
+}
+
+
+/**
+ * Function called to notify a client about the connection
+ * begin ready to queue more data.  "buf" will be
+ * NULL and "size" zero if the connection was closed for
+ * writing in the meantime.
+ *
+ * @param cls closure
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+core_mq_ntr (void *cls, size_t size,
+             void *buf)
+{
+  struct GNUNET_MQ_Handle *mq = cls;
+  struct CoreMQState *mqs = GNUNET_MQ_impl_state (mq);
+  const struct GNUNET_MessageHeader *mh = GNUNET_MQ_impl_current (mq);
+  size_t msg_size = ntohs (mh->size);
+  GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "core-mq", "ntr called (size %u, type %u)\n",
+                   msg_size, ntohs (mh->type));
+  mqs->th = NULL;
+  if (NULL == buf)
+  {
+    GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "core-mq", "send error\n");
+    GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE);
+    return 0;
+  }
+  memcpy (buf, mh, msg_size);
+  GNUNET_MQ_impl_send_continue (mq);
+  return msg_size;
+}
+
+
+/**
+ * Signature of functions implementing the
+ * sending functionality of a message queue.
+ *
+ * @param mq the message queue
+ * @param msg the message to send
+ * @param impl_state state of the implementation
+ */
+static void
+core_mq_send (struct GNUNET_MQ_Handle *mq,
+              const struct GNUNET_MessageHeader *msg,
+              void *impl_state)
+{
+  struct CoreMQState *mqs = impl_state;
+  GNUNET_assert (NULL == mqs->th);
+  GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "core-mq", "Sending queued message (size %u)\n",
+             ntohs (msg->size));
+  mqs->th = GNUNET_CORE_notify_transmit_ready (mqs->core, GNUNET_YES, 0,
+                                               GNUNET_TIME_UNIT_FOREVER_REL,
+                                               &mqs->target,
+                                               ntohs (msg->size), core_mq_ntr, mq);
+}
+
+
+/**
+ * Signature of functions implementing the
+ * destruction of a message queue.
+ * Implementations must not free @a mq, but should
+ * take care of @a impl_state.
+ *
+ * @param mq the message queue to destroy
+ * @param impl_state state of the implementation
+ */
+static void
+core_mq_destroy (struct GNUNET_MQ_Handle *mq, void *impl_state)
+{
+  struct CoreMQState *mqs = impl_state;
+  if (NULL != mqs->th)
+  {
+    GNUNET_CORE_notify_transmit_ready_cancel (mqs->th);
+    mqs->th = NULL;
+  }
+  GNUNET_free (mqs);
 }
 
 
+/**
+ * Implementation function that cancels the currently sent message.
+ *
+ * @param mq message queue
+ * @param impl_state state specific to the implementation
+ */
+static void
+core_mq_cancel (struct GNUNET_MQ_Handle *mq, void *impl_state)
+{
+  struct CoreMQState *mqs = impl_state;
+  GNUNET_assert (NULL != mqs->th);
+  GNUNET_CORE_notify_transmit_ready_cancel (mqs->th);
+}
+
+
+/**
+ * Create a message queue for sending messages to a peer with CORE.
+ * Messages may only be queued with #GNUNET_MQ_send once the init callback has
+ * been called for the given handle.
+ * There must only be one queue per peer for each core handle.
+ * The message queue can only be used to transmit messages,
+ * not to receive them.
+ *
+ * @param h the core handle
+ * @param target the target peer for this queue, may not be NULL
+ * @return a message queue for sending messages over the core handle
+ *         to the target peer
+ */
+struct GNUNET_MQ_Handle *
+GNUNET_CORE_mq_create (struct GNUNET_CORE_Handle *h,
+                       const struct GNUNET_PeerIdentity *target)
+{
+  struct CoreMQState *mqs = GNUNET_new (struct CoreMQState);
+  mqs->core = h;
+  mqs->target = *target;
+  return GNUNET_MQ_queue_for_callbacks (core_mq_send, core_mq_destroy,
+                                        core_mq_cancel, mqs,
+                                        NULL, NULL, NULL);
+}
+
 /* end of core_api.c */