*/
struct GNUNET_TIME_Relative retransmit_timeout;
+ /**
+ * The Acknowledgement Bitmap
+ */
+ GNUNET_STREAM_AckBitmap ack_bitmap;
+
+ /**
+ * Time when the Acknowledgement was queued
+ */
+ struct GNUNET_TIME_Absolute ack_time_registered;
+
+ /**
+ * Queued Acknowledgement deadline
+ */
+ struct GNUNET_TIME_Relative ack_time_deadline;
+
+ /**
+ * The task for sending timely Acks
+ */
+ GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
+
/**
* The mesh handle
*/
uint32_t read_sequence_number;
/**
- * receiver's available buffer
+ * receiver's available buffer after the last acknowledged packet
*/
uint32_t receive_window_available;
};
}
+/**
+ * Callback function for sending ack message
+ *
+ * @param cls closure the ACK message created in ack_task
+ * @param size number of bytes available in buffer
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+send_ack_notify (void *cls, size_t size, void *buf)
+{
+ struct GNUNET_STREAM_AckMessage *ack_msg = cls;
+
+ if (0 == size)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%s called with size 0\n", __func__);
+ return 0;
+ }
+ GNUNET_assert (ack_msg->header.header.size <= size);
+
+ size = ack_msg->header.header.size;
+ memcpy (buf, ack_msg, size);
+ return size;
+}
+
+
+/**
+ * Task for sending ACK message
+ *
+ * @param cls the socket
+ * @param tc the Task context
+ */
+static void
+ack_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+ struct GNUNET_STREAM_AckMessage *ack_msg;
+
+ if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
+ {
+ return;
+ }
+
+ socket->ack_task_id = 0;
+
+ /* Create the ACK Message */
+ ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
+ ack_msg->header.header.size = htons (sizeof (struct
+ GNUNET_STREAM_AckMessage));
+ ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
+ ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
+ ack_msg->base_sequence_number = htonl (socket->write_sequence_number);
+ ack_msg->receive_window_remaining = htonl (socket->receive_window_available);
+
+ /* Request MESH for sending ACK */
+ GNUNET_MESH_notify_transmit_ready (socket->tunnel,
+ 0, /* Corking */
+ 1, /* Priority */
+ socket->retransmit_timeout,
+ &socket->other_peer,
+ ntohs (ack_msg->header.header.size),
+ &send_ack_notify,
+ ack_msg);
+
+
+}
+
+
/**
* Function to modify a bit in GNUNET_STREAM_AckBitmap
*
* MAX_PACKET_SIZE,
payload,
size);
- /* FIXME: We have to send GNUNET_STREAM_AckMessage */
+
+ /* Modify the ACK bitmap */
+ ackbitmap_modify_bit (&socket->bitmap,
+ ntohl (msg->sequence_number) -
+ socket->read_sequence_number,
+ GNUNET_YES);
+
+ /* Start ACK sending task if one is not already present */
+ if (0 == socket->ack_task_id)
+ {
+ socket->ack_task_id =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
+ (msg->ack_deadline),
+ &ack_task,
+ socket);
+ }
+
break;
default:
unsigned int num_needed_packets;
unsigned int packet;
struct GNUNET_STREAM_IOHandle *io_handle;
- struct GNUNET_STREAM_DataMessage *data_msg;
size_t packet_size;
+ struct GNUNET_STREAM_DataMessage *data_msg;
const void *sweep;
/* There is already a write request pending */