};
+
+
/**
* The IO Handle
*/
ack_packet = -1;
/* Find the last acknowledged packet */
for (packet=0; packet < 64; packet++)
- {
+ {
if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
packet))
- {
- ack_packet = packet;
- }
+ ack_packet = packet;
+ else if (NULL == io_handle->messages[packet])
+ break;
}
/* Resend packets which weren't ack'ed */
for (packet=0; packet < ack_packet; packet++)
}
packet = ack_packet + 1;
/* Now send new packets if there is enough buffer space */
- while (io_handle->receive_window_available -=
- io_handle->messages[packet]->header.header.size > 0)
+ while ( (NULL != io_handle->messages[packet]) &&
+ (io_handle->receive_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) )
{
+ io_handle->receive_window_available -= ntohs (io_handle->messages[packet]->header.header.size);
queue_message (socket,
&io_handle->messages[packet]->header,
&write_data_finish_cb,
io_handle);
+ packet++;
}
}
const void *sweep;
/* There is already a write request pending */
- if (NULL != socket->write_handle) return NULL;
- if (!(STATE_ESTABLISHED == socket->state
- || STATE_RECEIVE_CLOSE_WAIT == socket->state
- || STATE_RECEIVE_CLOSED == socket->state))
+ if (NULL != socket->write_handle)
+ {
+ GNUNET_break (0);
+ return NULL;
+ }
+ if (!((STATE_ESTABLISHED == socket->state)
+ || (STATE_RECEIVE_CLOSE_WAIT == socket->state)
+ || (STATE_RECEIVE_CLOSED == socket->state)))
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Attempting to write on a closed (OR) not-yet-established"
"stream\n");
return NULL;
- }
-
- num_needed_packets = ceil (size / max_payload_size);
- if (64 < num_needed_packets)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Given buffer cannot be accommodated in 64 packets\n");
- num_needed_packets = 64;
- }
-
+ }
+ if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
+ size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size;
+ num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOHandle));
io_handle->receive_window_available = socket->receive_window_available;
sweep = data;