*/
int is_ready;
+ /**
+ * Sending consumed more bytes on wire than payload was announced
+ * This overhead is added to the delay of next sending operation
+ */
+ size_t traffic_overhead;
};
n->id = *pid;
n->h = h;
n->is_ready = GNUNET_YES;
+ n->traffic_overhead = 0;
GNUNET_BANDWIDTH_tracker_init (&n->out_tracker,
GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
MAX_BANDWIDTH_CARRY_S);
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multihashmap_put (h->neighbours,
- &pid->hashPubKey, n,
+ &n->id.hashPubKey, n,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
return n;
}
struct GNUNET_PeerIdentity me;
uint16_t size;
uint32_t ats_count;
+ uint32_t bytes_msg;
+ uint32_t bytes_physical;
GNUNET_assert (h->client != NULL);
if (msg == NULL)
break;
}
okm = (const struct SendOkMessage *) msg;
+ bytes_msg = ntohl (okm->bytes_msg);
+ bytes_physical = ntohl (okm->bytes_physical);
LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving `%s' message, transmission %s.\n",
"SEND_OK", ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed");
+
n = neighbour_find (h, &okm->peer);
if (n == NULL)
break;
+
+ if (bytes_physical >= bytes_msg)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Overhead for %u byte message: %u \n",
+ bytes_msg, bytes_physical - bytes_msg);
+ n->traffic_overhead += bytes_physical - bytes_msg;
+ }
GNUNET_break (GNUNET_NO == n->is_ready);
n->is_ready = GNUNET_YES;
if ((n->th != NULL) && (n->hn == NULL))
if (NULL != h->control_head)
delay = GNUNET_TIME_UNIT_ZERO;
else if (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap)))
+ {
delay =
GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
- n->th->notify_size);
+ n->th->notify_size + n->traffic_overhead);
+ n->traffic_overhead = 0;
+ }
else
return; /* no work to be done */
LOG (GNUNET_ERROR_TYPE_DEBUG,
const struct GNUNET_PeerIdentity *target)
{
struct GNUNET_PeerIdentity *pid;
-
if (NULL == handle->client)
- return;
+ {
+ /* FIXME: handle->client can be NULL when transport api is reconnecting */
+ GNUNET_break (0);
+ return;
+ }
+
pid = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity));
*pid = *target;
schedule_control_transmit (handle,
}
+struct SendHelloContext
+{
+ GNUNET_SCHEDULER_Task cont;
+
+ void *cls;
+
+ struct GNUNET_MessageHeader *msg;
+};
+
/**
* Send HELLO message to the service.
*
static size_t
send_hello (void *cls, size_t size, void *buf)
{
- struct GNUNET_MessageHeader *msg = cls;
+ struct SendHelloContext *shc = cls;
+ struct GNUNET_MessageHeader *msg = shc->msg;
uint16_t ssize;
+ struct GNUNET_SCHEDULER_TaskContext tc;
+ tc.read_ready = NULL;
+ tc.write_ready = NULL;
+ tc.reason = GNUNET_SCHEDULER_REASON_TIMEOUT;
if (buf == NULL)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Timeout while trying to transmit `%s' request.\n", "HELLO");
+ if (NULL != shc->cont)
+ shc->cont (shc->cls, &tc);
GNUNET_free (msg);
+ GNUNET_free (shc);
return 0;
}
LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting `%s' request.\n", "HELLO");
GNUNET_assert (size >= ssize);
memcpy (buf, msg, ssize);
GNUNET_free (msg);
+ tc.reason = GNUNET_SCHEDULER_REASON_READ_READY;
+ if (NULL != shc->cont)
+ shc->cont (shc->cls, &tc);
+ GNUNET_free (shc);
return ssize;
}
*
* @param handle connection to transport service
* @param hello the hello message
- * @param cont continuation to call when HELLO has been sent
+ * @param cont continuation to call when HELLO has been sent,
+ * tc reason GNUNET_SCHEDULER_REASON_TIMEOUT for fail
+ * tc reasong GNUNET_SCHEDULER_REASON_READY for success
* @param cls closure for continuation
*
*/
uint16_t size;
struct GNUNET_PeerIdentity peer;
struct GNUNET_MessageHeader *msg;
+ struct SendHelloContext * shc;
+ struct GNUNET_SCHEDULER_TaskContext tc;
+
+ tc.read_ready = NULL;
+ tc.write_ready = NULL;
+ tc.reason = GNUNET_SCHEDULER_REASON_TIMEOUT;
if (NULL == handle->client)
+ {
+ if (NULL != cont)
+ cont (cls, &tc);
return;
+ }
GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO);
size = ntohs (hello->size);
GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader));
GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) hello, &peer))
{
GNUNET_break (0);
+ if (NULL != cont)
+ if (NULL != cont)
+ cont (cls, &tc);
return;
}
msg = GNUNET_malloc (size);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Offering `%s' message of `%4s' to transport for validation.\n", "HELLO",
GNUNET_i2s (&peer));
- schedule_control_transmit (handle, size, &send_hello, msg);
+ shc = GNUNET_malloc (sizeof (struct SendHelloContext));
+ shc->msg = msg;
+ shc->cont = cont;
+ shc->cls = cls;
+ schedule_control_transmit (handle, size, &send_hello, shc);
}
ret->nd_cb = nd;
ret->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
ret->neighbours =
- GNUNET_CONTAINER_multihashmap_create (STARTING_NEIGHBOURS_SIZE);
+ GNUNET_CONTAINER_multihashmap_create (STARTING_NEIGHBOURS_SIZE, GNUNET_YES);
ret->ready_heap =
GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
- ret->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, ret);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n");
+ ret->client = GNUNET_CLIENT_connect ("transport", cfg);
+ if (ret->client == NULL)
+ {
+ GNUNET_free (ret);
+ return NULL;
+ }
+ schedule_control_transmit (ret, sizeof (struct StartMessage), &send_start, ret);
return ret;
}
th->priority = priority;
n->th = th;
/* calculate when our transmission should be ready */
- delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, size);
+ delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, size + n->traffic_overhead);
+ n->traffic_overhead = 0;
if (delay.rel_value > timeout.rel_value)
delay.rel_value = 0; /* notify immediately (with failure) */
LOG (GNUNET_ERROR_TYPE_DEBUG,