#include "platform.h"
#include "gnunet-service-fs_cp.h"
#include "gnunet-service-fs_pe.h"
+#include "gnunet-service-fs_pr.h"
/**
- * Hash map from peer identities to GNUNET_CONTAINER_Heap's with
- * pending requests as entries.
+ * Transmission plan for a peer.
+ */
+struct PeerPlan
+{
+ /**
+ * Heap with pending queries, smaller weights mean higher priority.
+ */
+ struct GNUNET_CONTAINER_Heap *heap;
+
+ /**
+ * Current transmission request handle.
+ */
+ struct GSF_PeerTransmitHandle *pth;
+
+ /**
+ * Peer for which this is the plan.
+ */
+ struct GSF_ConnectedPeer *cp;
+
+ /**
+ * Current task for executing the plan.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier task;
+};
+
+
+/**
+ * Hash map from peer identities to PeerPlans.
*/
static struct GNUNET_CONTAINER_MultiHashMap *plans;
/**
- * Get the size of the request queue for the given peer.
+ * Figure out when and how to transmit to the given peer.
+ *
+ * @param cls the 'struct GSF_ConnectedPeer' for transmission
+ * @param tc scheduler context
+ */
+static void
+schedule_peer_transmission (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * Function called to get a message for transmission.
*
- * @param cp connected peer to query
- * @return number of entries in this peer's request queue
+ * @param cls closure
+ * @param buf_size number of bytes available in buf
+ * @param buf where to copy the message, NULL on error (peer disconnect)
+ * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
*/
-static struct GNUNET_CONTAINER_Heap *
-get_heap (const struct GSF_ConnectedPeer *cp)
+static size_t
+transmit_message_callback (void *cls,
+ size_t buf_size,
+ void *buf)
{
- struct GNUNET_PeerIdentity id;
+ struct PeerPlan *pp = cls;
+ struct GSF_PendingRequest *pr;
+ size_t msize;
- GSF_connected_peer_get_identity_ (cp, &id);
- return GNUNET_CONTAINER_multihashmap_get (plans,
- &id.hashPubKey);
+ if (NULL == buf)
+ {
+ /* failed, try again... */
+ pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
+ return 0;
+ }
+ pr = GNUNET_CONTAINER_heap_peek (pp->heap);
+ msize = GSF_pending_request_get_message_ (pr, buf_size, buf);
+ if (msize > buf_size)
+ {
+ /* buffer to small (message changed), try again */
+ pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
+ return 0;
+ }
+ /* remove from root, add again elsewhere... */
+ GNUNET_assert (pr == GNUNET_CONTAINER_heap_remove_root (pp->heap));
+ GSF_plan_add_ (pp->cp, pr);
+ return msize;
+}
+
+
+/**
+ * Figure out when and how to transmit to the given peer.
+ *
+ * @param cls the 'struct PeerPlan'
+ * @param tc scheduler context
+ */
+static void
+schedule_peer_transmission (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct PeerPlan *pp = cls;
+ struct GSF_PendingRequest *pr;
+ size_t msize;
+ struct GNUNET_TIME_Relative delay;
+
+ pp->task = GNUNET_SCHEDULER_NO_TASK;
+ if (NULL == pp->heap)
+ return;
+ if (0 == GNUNET_CONTAINER_heap_get_size (pp->heap))
+ return;
+ GNUNET_assert (NULL == pp->pth);
+ pr = GNUNET_CONTAINER_heap_peek (pp->heap);
+ if (0) // FIXME: if (re)transmission should wait, wait...
+ {
+ delay = GNUNET_TIME_UNIT_SECONDS;
+ // FIXME
+ pp->task = GNUNET_SCHEDULER_add_delayed (delay,
+ &schedule_peer_transmission,
+ pp);
+ return;
+ }
+ msize = GSF_pending_request_get_message_ (pr, 0, NULL);
+ pp->pth = GSF_peer_transmit_ (pp->cp,
+ GNUNET_YES,
+ 0 /* FIXME: pr->priority? */,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ msize,
+ &transmit_message_callback,
+ pp);
+ GNUNET_assert (NULL != pp->pth);
}
*
* @param cp peer with the entry
* @param pr request with the entry
- * @param weight determines position of the entry in the cp queue,
- * lower weights are earlier in the queue
*/
void
GSF_plan_add_ (const struct GSF_ConnectedPeer *cp,
- struct GSF_PendingRequest *pr,
- GNUNET_CONTAINER_HeapCostType weight)
+ struct GSF_PendingRequest *pr)
{
struct GNUNET_PeerIdentity id;
- struct GNUNET_CONTAINER_Heap *h;
-
+ struct PeerPlan *pp;
+ GNUNET_CONTAINER_HeapCostType weight;
+
GSF_connected_peer_get_identity_ (cp, &id);
- h = GNUNET_CONTAINER_multihashmap_get (plans,
- &id.hashPubKey);
- if (NULL == h)
+ pp = GNUNET_CONTAINER_multihashmap_get (plans,
+ &id.hashPubKey);
+ if (NULL == pp)
{
- h = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
+ pp = GNUNET_malloc (sizeof (struct PeerPlan));
+ pp->heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
GNUNET_CONTAINER_multihashmap_put (plans,
&id.hashPubKey,
- h,
+ pp,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
}
- GNUNET_CONTAINER_heap_insert (h,
+ weight = 0; // FIXME: calculate real weight!
+ GNUNET_CONTAINER_heap_insert (pp->heap,
pr,
weight);
+ if (pp->pth != NULL)
+ {
+ if (pr != GNUNET_CONTAINER_heap_peek (pp->heap))
+ return;
+ GSF_peer_transmit_cancel_ (pp->pth);
+ pp->pth = NULL;
+ }
+ if (GNUNET_SCHEDULER_NO_TASK != pp->task)
+ GNUNET_SCHEDULER_cancel (pp->task);
+ pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission,
+ pp);
}
GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
{
struct GNUNET_PeerIdentity id;
- struct GNUNET_CONTAINER_Heap *h;
+ struct PeerPlan *pp;
GSF_connected_peer_get_identity_ (cp, &id);
- h = GNUNET_CONTAINER_multihashmap_get (plans,
- &id.hashPubKey);
+ pp = GNUNET_CONTAINER_multihashmap_get (plans,
+ &id.hashPubKey);
GNUNET_CONTAINER_multihashmap_remove (plans,
&id.hashPubKey,
- h);
- GNUNET_CONTAINER_heap_destroy (h);
+ pp);
+ if (NULL != pp->pth)
+ GSF_peer_transmit_cancel_ (pp->pth);
+ if (GNUNET_SCHEDULER_NO_TASK != pp->task)
+ GNUNET_SCHEDULER_cancel (pp->task);
+ GNUNET_CONTAINER_heap_destroy (pp->heap);
+ GNUNET_free (pp);
}
/**
- * Remove the given request from all heaps. *
+ * Remove the given request from all heaps. * FIXME: O(n) -- inefficient!
*
* @param cls 'struct GSF_PendingRequest' to purge
* @param key identity of the peer we're currently looking at (unused)
- * @param value request heap for the given peer to search for the 'cls'
+ * @param value PeerPlan for the given peer to search for the 'cls'
* @return GNUNET_OK (continue iteration)
*/
static int
void *value)
{
const struct GSF_PendingRequest *pr = cls;
- struct GNUNET_CONTAINER_Heap *h = value;
+ struct PeerPlan *pp = value;
+ struct GNUNET_CONTAINER_Heap *h = pp->heap;
struct FindRequestClosure frc;
frc.pr = pr;
}
-/**
- * Get the lowest-weight entry for the respective peer
- * from the plan. Removes the entry from the plan's queue.
- *
- * @param cp connected peer to query for the next request
- * @return NULL if the queue for this peer is empty
- */
-struct GSF_PendingRequest *
-GSF_plan_get_ (const struct GSF_ConnectedPeer *cp)
-{
- struct GNUNET_CONTAINER_Heap *h;
-
- h = get_heap (cp);
- if (NULL == h)
- return NULL;
- return GNUNET_CONTAINER_heap_remove_root (h);
-}
-
-
-/**
- * Get the size of the request queue for the given peer.
- *
- * @param cp connected peer to query
- * @return number of entries in this peer's request queue
- */
-unsigned int
-GSF_plan_size_ (const struct GSF_ConnectedPeer *cp)
-{
- struct GNUNET_CONTAINER_Heap *h;
-
- h = get_heap (cp);
- if (NULL == h)
- return 0;
- return GNUNET_CONTAINER_heap_get_size (h);
-}
-
-
-
/**
* Initialize plan subsystem.
*/