+/**
+ * Function called to get a message for transmission.
+ *
+ * @param cls closure
+ * @param buf_size number of bytes available in @a buf
+ * @param buf where to copy the message, NULL on error (peer disconnect)
+ * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
+ */
+static size_t
+transmit_message_callback (void *cls, size_t buf_size, void *buf)
+{
+ struct PeerPlan *pp = cls;
+ struct GSF_RequestPlan *rp;
+ size_t msize;
+
+ pp->pth = NULL;
+ if (NULL == buf)
+ {
+ /* failed, try again... */
+ if (NULL != pp->task)
+ GNUNET_SCHEDULER_cancel (pp->task);
+
+ pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop
+ ("# transmission failed (core has no bandwidth)"),
+ 1, GNUNET_NO);
+ return 0;
+ }
+ rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
+ if (NULL == rp)
+ {
+ if (NULL != pp->task)
+ GNUNET_SCHEDULER_cancel (pp->task);
+ pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
+ return 0;
+ }
+ msize = GSF_pending_request_get_message_ (get_latest (rp), buf_size, buf);
+ if (msize > buf_size)
+ {
+ if (NULL != pp->task)
+ GNUNET_SCHEDULER_cancel (pp->task);
+ /* buffer to small (message changed), try again */
+ pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
+ return 0;
+ }
+ /* remove from root, add again elsewhere... */
+ GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->priority_heap));
+ rp->hn = NULL;
+ rp->last_transmission = GNUNET_TIME_absolute_get ();
+ rp->transmission_counter++;
+ total_delay++;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Executing plan %p executed %u times, planning retransmission\n",
+ rp, rp->transmission_counter);
+ plan (pp, rp);
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop
+ ("# query messages sent to other peers"), 1,
+ GNUNET_NO);
+ return msize;
+}
+
+
+/**
+ * Figure out when and how to transmit to the given peer.
+ *
+ * @param cls the `struct PeerPlan`
+ * @param tc scheduler context
+ */
+static void
+schedule_peer_transmission (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct PeerPlan *pp = cls;
+ struct GSF_RequestPlan *rp;
+ size_t msize;
+ struct GNUNET_TIME_Relative delay;
+
+ pp->task = NULL;
+ if (NULL != pp->pth)
+ {
+ GSF_peer_transmit_cancel_ (pp->pth);
+ pp->pth = NULL;
+ }
+ /* move ready requests to priority queue */
+ while ((NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
+ (0 == GNUNET_TIME_absolute_get_remaining
+ (rp->earliest_transmission).rel_value_us))
+ {
+ GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->delay_heap));
+ rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, rp, rp->priority);
+ }
+ if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
+ {
+ /* priority heap (still) empty, check for delay... */
+ rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap);
+ if (NULL == rp)