#define DEBUG_FS GNUNET_NO
+/**
+ * Should we introduce random latency in processing? Required for proper
+ * implementation of GAP, but can be disabled for performance evaluation of
+ * the basic routing algorithm.
+ */
+#define SUPPORT_DELAYS GNUNET_NO
+
/**
* Maximum number of outgoing messages we queue per peer.
*/
*/
void *cont_cls;
+ /**
+ * Do not transmit this pending message until this deadline.
+ */
+ struct GNUNET_TIME_Absolute delay_until;
+
/**
* Size of the reply; actual reply message follows
* at the end of this struct.
*/
struct GNUNET_TIME_Absolute last_transmission_request_start;
+ /**
+ * ID of delay task for scheduling transmission.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task;
+
/**
* Average priority of successful replies. Calculated
* as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
}
/* consider scheduling transmission to cp for content migration */
- if (cp->cth != NULL)
+ if (cp->cth != NULL)
return GNUNET_YES;
msize = 0;
pos = mig_head;
msize,
GNUNET_h2s (key));
#endif
+ if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
+ cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
+ }
cp->cth
= GNUNET_CORE_notify_transmit_ready (core,
0, GNUNET_TIME_UNIT_FOREVER_REL,
TransmissionContinuation cont;
void *cont_cls;
+ cont = pm->cont;
+ cont_cls = pm->cont_cls;
if (pml != NULL)
{
GNUNET_assert (pml->pm == pm);
GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
- cont = pm->cont;
- cont_cls = pm->cont_cls;
destroy_pending_message_list_entry (pml);
}
else
GNUNET_PEER_change_rc (cp->pid, -1);
GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
if (NULL != cp->cth)
- GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
+ {
+ GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
+ cp->cth = NULL;
+ }
+ if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
+ cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
+ }
while (NULL != (pm = cp->pending_messages_head))
destroy_pending_message (pm, 0 /* delivery failed */);
GNUNET_LOAD_value_free (cp->transmission_delay);
/* ******************* Utility functions ******************** */
+/**
+ * We've had to delay a request for transmission to core, but now
+ * we should be ready. Run it.
+ *
+ * @param cls the 'struct ConnectedPeer' for which a request was delayed
+ * @param tc task context (unused)
+ */
+static void
+delayed_transmission_request (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct ConnectedPeer *cp = cls;
+ struct GNUNET_PeerIdentity pid;
+ struct PendingMessage *pm;
+
+ pm = cp->pending_messages_head;
+ cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_assert (cp->cth == NULL);
+ if (pm == NULL)
+ return;
+ GNUNET_PEER_resolve (cp->pid,
+ &pid);
+ cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
+ cp->cth = GNUNET_CORE_notify_transmit_ready (core,
+ pm->priority,
+ GNUNET_CONSTANTS_SERVICE_TIMEOUT,
+ &pid,
+ pm->msize,
+ &transmit_to_peer,
+ cp);
+}
+
+
/**
* Transmit messages by copying it to the target buffer
* "buf". "buf" will be NULL and "size" zero if the socket was closed
{
struct ConnectedPeer *cp = cls;
char *cbuf = buf;
- struct GNUNET_PeerIdentity pid;
struct PendingMessage *pm;
+ struct PendingMessage *next_pm;
+ struct GNUNET_TIME_Absolute now;
+ struct GNUNET_TIME_Relative min_delay;
struct MigrationReadyBlock *mb;
struct MigrationReadyBlock *next;
struct PutMessage migm;
size_t msize;
unsigned int i;
+ struct GNUNET_PeerIdentity pid;
cp->cth = NULL;
if (NULL == buf)
GNUNET_LOAD_update (cp->transmission_delay,
UINT64_MAX);
return 0;
- }
+ }
GNUNET_LOAD_update (cp->transmission_delay,
GNUNET_TIME_absolute_get_duration (cp->last_transmission_request_start).value);
+ now = GNUNET_TIME_absolute_get ();
msize = 0;
- while ( (NULL != (pm = cp->pending_messages_head) ) &&
+ min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
+ next_pm = cp->pending_messages_head;
+ while ( (NULL != (pm = next_pm) ) &&
(pm->msize <= size) )
{
+ next_pm = pm->next;
+ if (pm->delay_until.value > now.value)
+ {
+ min_delay = GNUNET_TIME_relative_min (min_delay,
+ GNUNET_TIME_absolute_get_remaining (pm->delay_until));
+ continue;
+ }
memcpy (&cbuf[msize], &pm[1], pm->msize);
msize += pm->msize;
size -= pm->msize;
+ GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
+ cp->pending_messages_tail,
+ pm);
+ if (NULL == pm->pml)
+ cp->pending_requests--;
destroy_pending_message (pm, cp->pid);
}
- if (NULL != pm)
- {
- GNUNET_PEER_resolve (cp->pid,
- &pid);
- cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
- cp->cth = GNUNET_CORE_notify_transmit_ready (core,
- pm->priority,
- GNUNET_CONSTANTS_SERVICE_TIMEOUT,
- &pid,
- pm->msize,
- &transmit_to_peer,
- cp);
+ if (pm != NULL)
+ min_delay = GNUNET_TIME_UNIT_ZERO;
+ if (NULL != cp->pending_messages_head)
+ {
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
+ cp->delayed_transmission_request_task
+ = GNUNET_SCHEDULER_add_delayed (sched,
+ min_delay,
+ &delayed_transmission_request,
+ cp);
}
- else
+ if (pm == NULL)
{
+ GNUNET_PEER_resolve (cp->pid,
+ &pid);
next = mig_head;
while (NULL != (mb = next))
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Pushing migration block `%s' (%u bytes) to `%s'\n",
GNUNET_h2s (&mb->query),
- mb->size,
+ (unsigned int) mb->size,
GNUNET_i2s (&pid));
#endif
break;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
GNUNET_h2s (&mb->query),
- mb->size,
+ (unsigned int) mb->size,
GNUNET_i2s (&pid));
#endif
}
}
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmitting %u bytes to peer %u\n",
- msize,
- cp->pid);
+ "Transmitting %u bytes to peer with PID %u\n",
+ (unsigned int) msize,
+ (unsigned int) cp->pid);
#endif
return msize;
}
destroy_pending_message (cp->pending_messages_tail, 0);
GNUNET_PEER_resolve (cp->pid, &pid);
if (NULL != cp->cth)
- GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
+ {
+ GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
+ cp->cth = NULL;
+ }
+ if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
+ cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
+ }
/* need to schedule transmission */
cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
cp->cth = GNUNET_CORE_notify_transmit_ready (core,
reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
reply->cont = &transmit_reply_continuation;
reply->cont_cls = pr;
+#if SUPPORT_DELAYS
+ reply->delay_until
+ = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
+ GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ TTL_DECREMENT)));
+#endif
reply->msize = msize;
reply->priority = UINT32_MAX; /* send replies first! */
pm = (struct PutMessage*) &reply[1];
prq.priority = priority;
prq.finished = GNUNET_NO;
prq.request_found = GNUNET_NO;
- process_reply (&prq, key, pr);
if ( (old_rf == 0) &&
- (pr->results_found == 1) )
+ (pr->results_found == 0) )
update_datastore_delays (pr->start_time);
+ process_reply (&prq, key, pr);
if (prq.finished == GNUNET_YES)
return;
if (pr->qe == NULL)