/**
* Task called on timeout, or 0 for none.
*/
- struct GNUNET_SCHEDULER_Task * timeout_task;
+ struct GNUNET_SCHEDULER_Task *timeout_task;
/**
* Function to call to get the actual message.
/**
* Task for the delay.
*/
- struct GNUNET_SCHEDULER_Task * delay_task;
+ struct GNUNET_SCHEDULER_Task *delay_task;
/**
* Size of the message.
/**
* Task for asynchronous stopping of this request.
*/
- struct GNUNET_SCHEDULER_Task * kill_task;
+ struct GNUNET_SCHEDULER_Task *kill_task;
};
/**
* Task scheduled to revive migration to this peer.
*/
- struct GNUNET_SCHEDULER_Task * mig_revive_task;
+ struct GNUNET_SCHEDULER_Task *mig_revive_task;
/**
* Messages (replies, queries, content migration) we would like to
/**
* Task scheduled if we need to retry bandwidth reservation later.
*/
- struct GNUNET_SCHEDULER_Task * rc_delay_task;
+ struct GNUNET_SCHEDULER_Task *rc_delay_task;
/**
* Active requests from this neighbour, map of query to 'struct PeerRequest'.
*/
unsigned int cth_in_progress;
+ /**
+ * Number of entries in @e delayed_head DLL.
+ */
+ unsigned int delay_queue_size;
+
/**
* Respect rating for this peer on disk.
*/
unsigned int last_request_times_off;
/**
- * GNUNET_YES if we did successfully reserve 32k bandwidth,
- * GNUNET_NO if not.
+ * #GNUNET_YES if we did successfully reserve 32k bandwidth,
+ * #GNUNET_NO if not.
*/
int did_reserve;
GNUNET_assert (NULL == cp->cth);
cp->cth_in_progress++;
cp->cth =
- GNUNET_CORE_notify_transmit_ready (GSF_core, GNUNET_YES,
+ GNUNET_CORE_notify_transmit_ready (GSF_core,
+ GNUNET_YES,
GNUNET_CORE_PRIO_BACKGROUND,
GNUNET_TIME_absolute_get_remaining
- (pth->timeout), &target, pth->size,
+ (pth->timeout),
+ &target,
+ pth->size,
&peer_transmit_ready_cb, cp);
GNUNET_assert (NULL != cp->cth);
GNUNET_assert (0 < cp->cth_in_progress--);
* @return number of bytes copied to @a buf
*/
static size_t
-peer_transmit_ready_cb (void *cls, size_t size, void *buf)
+peer_transmit_ready_cb (void *cls,
+ size_t size,
+ void *buf)
{
struct GSF_ConnectedPeer *cp = cls;
struct GSF_PeerTransmitHandle *pth = cp->pth_head;
GNUNET_SCHEDULER_cancel (pth->timeout_task);
pth->timeout_task = NULL;
}
- GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
+ GNUNET_CONTAINER_DLL_remove (cp->pth_head,
+ cp->pth_tail,
+ pth);
if (GNUNET_YES == pth->is_query)
{
cp->ppd.last_request_times[(cp->last_request_times_off++) %
* @param tc scheduler context
*/
static void
-retry_reservation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+retry_reservation (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct GSF_ConnectedPeer *cp = cls;
struct GNUNET_PeerIdentity target;
GNUNET_PEER_resolve (cp->ppd.pid, &target);
cp->rc_delay_task = NULL;
cp->rc =
- GNUNET_ATS_reserve_bandwidth (GSF_ats, &target, DBLOCK_SIZE,
+ GNUNET_ATS_reserve_bandwidth (GSF_ats,
+ &target,
+ DBLOCK_SIZE,
&ats_reserve_callback, cp);
}
* @return number of bytes copied to @a buf, can be 0 (without indicating an error)
*/
static size_t
-copy_reply (void *cls, size_t buf_size, void *buf)
+copy_reply (void *cls,
+ size_t buf_size,
+ void *buf)
{
struct PutMessage *pm = cls;
size_t size;
struct GSF_DelayedHandle *dh = cls;
struct GSF_ConnectedPeer *cp = dh->cp;
- GNUNET_CONTAINER_DLL_remove (cp->delayed_head, cp->delayed_tail, dh);
+ GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
+ cp->delayed_tail,
+ dh);
+ cp->delay_queue_size--;
if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
{
GNUNET_free (dh->pm);
GNUNET_free (dh);
return;
}
- (void) GSF_peer_transmit_ (cp, GNUNET_NO, UINT32_MAX, REPLY_TIMEOUT,
- dh->msize, ©_reply, dh->pm);
+ (void) GSF_peer_transmit_ (cp,
+ GNUNET_NO,
+ UINT32_MAX,
+ REPLY_TIMEOUT,
+ dh->msize,
+ ©_reply,
+ dh->pm);
GNUNET_free (dh);
}
pm->type = htonl (type);
pm->expiration = GNUNET_TIME_absolute_hton (expiration);
memcpy (&pm[1], data, data_len);
- if ((UINT32_MAX != reply_anonymity_level) && (0 != reply_anonymity_level) &&
- (GNUNET_YES == GSF_enable_randomized_delays))
+ if ( (UINT32_MAX != reply_anonymity_level) &&
+ (0 != reply_anonymity_level) &&
+ (GNUNET_YES == GSF_enable_randomized_delays) )
{
struct GSF_DelayedHandle *dh;
dh->cp = cp;
dh->pm = pm;
dh->msize = msize;
- GNUNET_CONTAINER_DLL_insert (cp->delayed_head, cp->delayed_tail, dh);
+ GNUNET_CONTAINER_DLL_insert (cp->delayed_head,
+ cp->delayed_tail,
+ dh);
+ cp->delay_queue_size++;
dh->delay_task =
GNUNET_SCHEDULER_add_delayed (get_randomized_delay (),
- &transmit_delayed_now, dh);
+ &transmit_delayed_now,
+ dh);
}
else
{
- (void) GSF_peer_transmit_ (cp, GNUNET_NO, UINT32_MAX, REPLY_TIMEOUT, msize,
- ©_reply, pm);
+ (void) GSF_peer_transmit_ (cp,
+ GNUNET_NO,
+ UINT32_MAX,
+ REPLY_TIMEOUT,
+ msize,
+ ©_reply,
+ pm);
}
if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval)
return;
enum GNUNET_BLOCK_Type type;
GNUNET_PEER_Id spid;
- GNUNET_assert (other != NULL);
msize = ntohs (message->size);
if (msize < sizeof (struct GetMessage))
{
}
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop
- ("# GET requests received (from other peers)"), 1,
+ ("# GET requests received (from other peers)"),
+ 1,
GNUNET_NO);
gm = (const struct GetMessage *) message;
type = ntohl (gm->type);
{
if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
+ "Failed to find RETURN-TO peer `%s' in connection set. Dropping query.\n",
GNUNET_i2s (&opt[bits - 1]));
else
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Failed to find peer `%4s' in connection set. Dropping query.\n",
+ "Failed to find peer `%s' in connection set. Dropping query.\n",
GNUNET_i2s (other));
-#if INSANE_STATISTICS
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop
("# requests dropped due to missing reverse route"),
- 1, GNUNET_NO);
-#endif
+ 1,
+ GNUNET_NO);
+ return NULL;
+ }
+ if (cp->ppd.pending_replies + cp->delay_queue_size > 128)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Peer `%s' has too many replies queued already. Dropping query.\n",
+ GNUNET_i2s (other));
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# requests dropped due to full reply queue"),
+ 1,
+ GNUNET_NO);
return NULL;
}
/* note that we can really only check load here since otherwise
* peers could find out that we are overloaded by not being
* disconnected after sending us a malformed query... */
- priority = bound_priority (ntohl (gm->priority), cps);
+ priority = bound_priority (ntohl (gm->priority),
+ cps);
if (priority < 0)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
return NULL;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
+ "Received request for `%s' of type %u from peer `%s' with flags %u\n",
GNUNET_h2s (&gm->query),
(unsigned int) type,
GNUNET_i2s (other),
"Timeout trying to transmit to other peer\n");
pth->timeout_task = NULL;
cp = pth->cp;
- GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
+ GNUNET_CONTAINER_DLL_remove (cp->pth_head,
+ cp->pth_tail,
+ pth);
if (GNUNET_YES == pth->is_query)
GNUNET_assert (0 < cp->ppd.pending_queries--);
else if (GNUNET_NO == pth->is_query)
prev = pos;
pos = pos->next;
}
- GNUNET_CONTAINER_DLL_insert_after (cp->pth_head, cp->pth_tail, prev, pth);
+ GNUNET_CONTAINER_DLL_insert_after (cp->pth_head,
+ cp->pth_tail,
+ prev,
+ pth);
if (GNUNET_YES == is_query)
cp->ppd.pending_queries++;
else if (GNUNET_NO == is_query)
cp->ppd.pending_replies++;
- pth->timeout_task =
- GNUNET_SCHEDULER_add_delayed (timeout, &peer_transmit_timeout, pth);
+ pth->timeout_task
+ = GNUNET_SCHEDULER_add_delayed (timeout,
+ &peer_transmit_timeout,
+ pth);
schedule_transmission (pth);
return pth;
}
pth->timeout_task = NULL;
}
cp = pth->cp;
- GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
+ GNUNET_CONTAINER_DLL_remove (cp->pth_head,
+ cp->pth_tail,
+ pth);
if (GNUNET_YES == pth->is_query)
GNUNET_assert (0 < cp->ppd.pending_queries--);
else if (GNUNET_NO == pth->is_query)
GNUNET_SCHEDULER_cancel (pth->timeout_task);
pth->timeout_task = NULL;
}
- GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
+ GNUNET_CONTAINER_DLL_remove (cp->pth_head,
+ cp->pth_tail,
+ pth);
+ if (GNUNET_YES == pth->is_query)
+ GNUNET_assert (0 < cp->ppd.pending_queries--);
+ else if (GNUNET_NO == pth->is_query)
+ GNUNET_assert (0 < cp->ppd.pending_replies--);
pth->gmc (pth->gmc_cls, 0, NULL);
GNUNET_free (pth);
}
while (NULL != (dh = cp->delayed_head))
{
- GNUNET_CONTAINER_DLL_remove (cp->delayed_head, cp->delayed_tail, dh);
+ GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
+ cp->delayed_tail,
+ dh);
+ cp->delay_queue_size--;
GNUNET_SCHEDULER_cancel (dh->delay_task);
GNUNET_free (dh->pm);
GNUNET_free (dh);
GNUNET_SCHEDULER_cancel (cp->mig_revive_task);
cp->mig_revive_task = NULL;
}
+ GNUNET_break (0 == cp->ppd.pending_queries);
+ GNUNET_break (0 == cp->ppd.pending_replies);
GNUNET_free (cp);
}