*/
int is_query;
+ /**
+ * Did we get a reservation already?
+ */
+ int was_reserved;
+
/**
* Priority of this request.
*/
};
+/**
+ * Information per peer and request.
+ */
+struct PeerRequest
+{
+
+ /**
+ * Handle to generic request.
+ */
+ struct GSF_PendingRequest *pr;
+
+ /**
+ * Handle to specific peer.
+ */
+ struct GSF_ConnectedPeer *cp;
+
+ /**
+ * Task for asynchronous stopping of this request.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier kill_task;
+
+};
+
+
/**
* A connected peer.
*/
GNUNET_SCHEDULER_TaskIdentifier irc_delay_task;
/**
- * Active requests from this neighbour.
+ * Active requests from this neighbour, map of query to 'struct PeerRequest'.
*/
struct GNUNET_CONTAINER_MultiHashMap *request_map;
}
+/**
+ * Core is ready to transmit to a peer, get the message.
+ *
+ * @param cls the 'struct GSF_PeerTransmitHandle' of the message
+ * @param size number of bytes core is willing to take
+ * @param buf where to copy the message
+ * @return number of bytes copied to buf
+ */
+static size_t
+peer_transmit_ready_cb (void *cls,
+ size_t size,
+ void *buf);
+
+
+
+
+/**
+ * Function called by core upon success or failure of our bandwidth reservation request.
+ *
+ * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
+ * @param peer identifies the peer
+ * @param bandwidth_out available amount of outbound bandwidth
+ * @param amount set to the amount that was actually reserved or unreserved;
+ * either the full requested amount or zero (no partial reservations)
+ * @param res_delay if the reservation could not be satisfied (amount was 0), how
+ * long should the client wait until re-trying?
+ * @param preference current traffic preference for the given peer
+ */
+static void
+core_reserve_callback (void *cls,
+ const struct GNUNET_PeerIdentity *peer,
+ struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
+ int32_t amount,
+ struct GNUNET_TIME_Relative res_delay,
+ uint64_t preference);
+
+
+/**
+ * If ready (bandwidth reserved), try to schedule transmission via
+ * core for the given handle.
+ *
+ * @param pth transmission handle to schedule
+ */
+static void
+schedule_transmission (struct GSF_PeerTransmitHandle *pth)
+{
+ struct GSF_ConnectedPeer *cp;
+ struct GNUNET_PeerIdentity target;
+ uint64_t ip;
+
+ if (NULL != pth->cth)
+ return; /* already done */
+ cp = pth->cp;
+ GNUNET_PEER_resolve (cp->ppd.pid,
+ &target);
+ if ( (GNUNET_YES == pth->is_query) &&
+ (GNUNET_YES != pth->was_reserved) )
+ {
+ /* query, need reservation */
+ if (GNUNET_YES != cp->did_reserve)
+ return; /* not ready */
+ cp->did_reserve = GNUNET_NO;
+ /* reservation already done! */
+ pth->was_reserved = GNUNET_YES;
+ ip = cp->inc_preference;
+ cp->inc_preference = 0;
+ cp->irc = GNUNET_CORE_peer_change_preference (GSF_core,
+ &target,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ GNUNET_BANDWIDTH_VALUE_MAX,
+ DBLOCK_SIZE,
+ ip,
+ &core_reserve_callback,
+ cp);
+ }
+ pth->cth = GNUNET_CORE_notify_transmit_ready (GSF_core,
+ GNUNET_YES,
+ pth->priority,
+ GNUNET_TIME_absolute_get_remaining (pth->timeout),
+ &target,
+ pth->size,
+ &peer_transmit_ready_cb,
+ pth);
+}
+
+
/**
* Core is ready to transmit to a peer, get the message.
*
struct GSF_ConnectedPeer *cp;
size_t ret;
+ pth->cth = NULL;
if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
{
GNUNET_SCHEDULER_cancel (pth->timeout_task);
GNUNET_TIME_absolute_get_duration (pth->transmission_request_start_time).rel_value);
ret = pth->gmc (pth->gmc_cls,
size, buf);
- GNUNET_free (pth);
+ GNUNET_free (pth);
+ for (pth = cp->pth_head; pth != NULL; pth = pth->next)
+ schedule_transmission (pth);
return ret;
}
-/**
- * Function called by core upon success or failure of our bandwidth reservation request.
- *
- * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
- * @param peer identifies the peer
- * @param bandwidth_out available amount of outbound bandwidth
- * @param amount set to the amount that was actually reserved or unreserved;
- * either the full requested amount or zero (no partial reservations)
- * @param res_delay if the reservation could not be satisfied (amount was 0), how
- * long should the client wait until re-trying?
- * @param preference current traffic preference for the given peer
- */
-static void
-core_reserve_callback (void *cls,
- const struct GNUNET_PeerIdentity *peer,
- struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
- int32_t amount,
- struct GNUNET_TIME_Relative res_delay,
- uint64_t preference);
-
-
/**
* (re)try to reserve bandwidth from the given peer.
*
}
+/**
+ * Free the given client request.
+ *
+ * @param cls the client request to free
+ * @param tc task context
+ */
+static void
+peer_request_destroy (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct PeerRequest *peerreq = cls;
+ struct GSF_PendingRequest *pr = peerreq->pr;
+ struct GSF_ConnectedPeer *cp = peerreq->cp;
+ struct GSF_PendingRequestData *prd;
+
+ peerreq->kill_task = GNUNET_SCHEDULER_NO_TASK;
+ prd = GSF_pending_request_get_data_ (pr);
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# P2P searches active"),
+ -1,
+ GNUNET_NO);
+ GNUNET_break (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
+ &prd->query,
+ peerreq));
+ GSF_pending_request_cancel_ (pr);
+ GNUNET_free (peerreq);
+}
+
+
/**
* Handle a reply to a pending request. Also called if a request
* expires (then with data == NULL). The handler may be called
* and will also not be called anymore after a call signalling
* expiration.
*
- * @param cls 'struct GSF_ConnectedPeer' of the peer that would
- * have liked an answer to the request
+ * @param cls 'struct PeerRequest' this is an answer for
* @param eval evaluation of the result
* @param pr handle to the original pending request
* @param expiration when does 'data' expire?
const void *data,
size_t data_len)
{
- struct GSF_ConnectedPeer *cp = cls;
+ struct PeerRequest *peerreq = cls;
+ struct GSF_ConnectedPeer *cp = peerreq->cp;
struct GSF_PendingRequestData *prd;
struct PutMessage *pm;
size_t msize;
GNUNET_assert (data_len < GNUNET_SERVER_MAX_MESSAGE_SIZE);
+ GNUNET_assert (peerreq->pr == pr);
prd = GSF_pending_request_get_data_ (pr);
if (NULL == data)
{
GNUNET_break (GNUNET_OK ==
GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
&prd->query,
- pr));
+ peerreq));
+ GNUNET_free (peerreq);
return;
}
GNUNET_break (type != GNUNET_BLOCK_TYPE_ANY);
pm);
if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
return;
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# P2P searches active"),
- -1,
- GNUNET_NO);
- GNUNET_break (GNUNET_OK ==
- GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
- &prd->query,
- pr));
- GSF_pending_request_cancel_ (pr);
+ peerreq->kill_task = GNUNET_SCHEDULER_add_now (&peer_request_destroy,
+ peerreq);
}
GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
const struct GNUNET_MessageHeader *message)
{
+ struct PeerRequest *peerreq;
struct GSF_PendingRequest *pr;
struct GSF_PendingRequestData *prd;
struct GSF_ConnectedPeer *cp;
ttl -= ttl_decrement;
/* test if the request already exists */
- pr = GNUNET_CONTAINER_multihashmap_get (cp->request_map,
- &gm->query);
- if (pr != NULL)
+ peerreq = GNUNET_CONTAINER_multihashmap_get (cp->request_map,
+ &gm->query);
+ if (peerreq != NULL)
{
+ pr = peerreq->pr;
prd = GSF_pending_request_get_data_ (pr);
if ( (prd->type == type) &&
( (type != GNUNET_BLOCK_TYPE_FS_SBLOCK) ||
GNUNET_assert (GNUNET_YES ==
GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
&gm->query,
- pr));
+ peerreq));
+ if (peerreq->kill_task != GNUNET_SCHEDULER_NO_TASK)
+ GNUNET_SCHEDULER_cancel (peerreq->kill_task);
+ GNUNET_free (peerreq);
}
}
+ peerreq = GNUNET_malloc (sizeof (struct PeerRequest));
+ peerreq->cp = cp;
pr = GSF_pending_request_create_ (options,
type,
&gm->query,
spid,
NULL, 0, /* replies_seen */
&handle_p2p_reply,
- cp);
+ peerreq);
+ peerreq->pr = pr;
GNUNET_break (GNUNET_OK ==
GNUNET_CONTAINER_multihashmap_put (cp->request_map,
&gm->query,
- pr,
+ peerreq,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# P2P searches received"),
struct GSF_PeerTransmitHandle *pth;
struct GSF_PeerTransmitHandle *pos;
struct GSF_PeerTransmitHandle *prev;
- struct GNUNET_PeerIdentity target;
- uint64_t ip;
- int is_ready;
pth = GNUNET_malloc (sizeof (struct GSF_PeerTransmitHandle));
pth->transmission_request_start_time = GNUNET_TIME_absolute_get ();
cp->pth_tail,
prev,
pth);
- GNUNET_PEER_resolve (cp->ppd.pid,
- &target);
if (GNUNET_YES == is_query)
- {
- /* query, need reservation */
- cp->ppd.pending_queries++;
- if (GNUNET_YES == cp->did_reserve)
- {
- cp->did_reserve = GNUNET_NO;
- /* reservation already done! */
- is_ready = GNUNET_YES;
- ip = cp->inc_preference;
- cp->inc_preference = 0;
- cp->irc = GNUNET_CORE_peer_change_preference (GSF_core,
- &target,
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_BANDWIDTH_VALUE_MAX,
- DBLOCK_SIZE,
- ip,
- &core_reserve_callback,
- cp);
- }
- else
- {
- /* still waiting for reservation */
- is_ready = GNUNET_NO;
- }
- }
+ cp->ppd.pending_queries++;
else if (GNUNET_NO == is_query)
- {
- /* no reservation needed for content */
- cp->ppd.pending_replies++;
- is_ready = GNUNET_YES;
- }
- else
- {
- /* not a query or content, no reservation needed */
- is_ready = GNUNET_YES;
- }
- if (is_ready)
- {
- pth->cth = GNUNET_CORE_notify_transmit_ready (GSF_core,
- GNUNET_YES,
- priority,
- timeout,
- &target,
- size,
- &peer_transmit_ready_cb,
+ cp->ppd.pending_replies++;
+
+ pth->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
+ &peer_transmit_timeout,
pth);
- /* pth->cth could be NULL here, that's OK, we'll try again
- later... */
- }
- else
- {
-#if DEBUG_FS
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Not ready to ask for transmission to `%s'\n",
- GNUNET_i2s (&target));
-#endif
- }
- if (pth->cth == NULL)
- {
-#if DEBUG_FS
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "No transmission task scheduled, creating timeout task (%llu ms)\n",
- (unsigned long long) timeout.rel_value);
-#endif
- /* if we're waiting for reservation OR if we could not do notify_transmit_ready,
- install a timeout task to be on the safe side */
- pth->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
- &peer_transmit_timeout,
- pth);
- }
+ schedule_transmission (pth);
return pth;
}
const GNUNET_HashCode *query,
void *value)
{
- struct GSF_PendingRequest *pr = value;
+ struct PeerRequest *peerreq = cls;
+ struct GSF_PendingRequest *pr = peerreq->pr;
GSF_pending_request_cancel_ (pr);
+ if (peerreq->kill_task != GNUNET_SCHEDULER_NO_TASK)
+ GNUNET_SCHEDULER_cancel (peerreq->kill_task);
+ GNUNET_free (peerreq);
return GNUNET_OK;
}
static struct GNUNET_CONTAINER_MultiHashMap *plans;
+/**
+ * Figure out when and how to transmit to the given peer.
+ *
+ * @param cls the 'struct GSF_ConnectedPeer' for transmission
+ * @param tc scheduler context
+ */
+static void
+schedule_peer_transmission (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
/**
* Insert the given request plan into the heap with the appropriate weight.
*
rp->hn = GNUNET_CONTAINER_heap_insert (pp->delay_heap,
rp,
rp->earliest_transmission.abs_value);
+ if (GNUNET_SCHEDULER_NO_TASK != pp->task)
+ GNUNET_SCHEDULER_cancel (pp->task);
+ pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
}
-/**
- * Figure out when and how to transmit to the given peer.
- *
- * @param cls the 'struct GSF_ConnectedPeer' for transmission
- * @param tc scheduler context
- */
-static void
-schedule_peer_transmission (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc);
-
-
/**
* Function called to get a message for transmission.
*
size_t msize;
pp->task = GNUNET_SCHEDULER_NO_TASK;
- GNUNET_assert (NULL == pp->pth);
+ if (pp->pth != NULL)
+ {
+ GSF_peer_transmit_cancel_ (pp->pth);
+ pp->pth = NULL;
+ }
/* move ready requests to priority queue */
while ( (NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
(GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0) )
/* priority heap (still) empty, check for delay... */
rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap);
if (NULL == rp)
- return; /* both queues empty */
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "No active requests for plan %p.\n",
+ pp);
+#endif
+ return; /* both queues empty */
+ }
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sleeping for %llu ms before retrying requests on plan %p.\n",
+ (unsigned long long) GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value,
+ pp);
+#endif
pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission),
&schedule_peer_transmission,
pp);
rp);
#endif
GNUNET_assert (NULL != rp);
- msize = GSF_pending_request_get_message_ (rp->pr, 0, NULL);
+ msize = GSF_pending_request_get_message_ (rp->pr, 0, NULL);
pp->pth = GSF_peer_transmit_ (pp->cp,
GNUNET_YES,
rp->priority,
prd->rp_tail,
rp);
plan (pp, rp);
- if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
- {
- /* no request that should be done immediately, figure out delay */
- if (rp != GNUNET_CONTAINER_heap_peek (pp->delay_heap))
- return; /* did not change delay heap top, no need to do anything */
- GNUNET_assert (NULL == pp->pth);
- if (GNUNET_SCHEDULER_NO_TASK != pp->task)
- GNUNET_SCHEDULER_cancel (pp->task);
- pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission),
- &schedule_peer_transmission,
- pp);
- return;
- }
-
- if (pp->pth != NULL)
- {
- if (rp != GNUNET_CONTAINER_heap_peek (pp->priority_heap))
- return; /* did not change priority heap top, no need to do anyhing */
- GSF_peer_transmit_cancel_ (pp->pth);
- pp->pth = NULL;
- }
- if (GNUNET_SCHEDULER_NO_TASK != pp->task)
- GNUNET_SCHEDULER_cancel (pp->task);
- pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission,
- pp);
}