2 This file is part of GNUnet.
3 (C) 2012 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
23 * Checks for matching the sender and socket->other_peer in server
26 * Decrement PEER intern count during socket close and listen close to free the
27 * memory used for PEER interning
29 * Add code for write io timeout
31 * Include retransmission for control messages
35 * @file stream/stream_api.c
36 * @brief Implementation of the stream library
37 * @author Sree Harsha Totakura
42 #include "gnunet_common.h"
43 #include "gnunet_crypto_lib.h"
44 #include "gnunet_stream_lib.h"
45 #include "gnunet_testing_lib.h"
46 #include "stream_protocol.h"
48 #define LOG(kind,...) \
49 GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
52 * The maximum packet size of a stream packet
54 #define MAX_PACKET_SIZE 64000
59 #define RECEIVE_BUFFER_SIZE 4096000
62 * The maximum payload a data message packet can carry
64 static size_t max_payload_size =
65 MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
68 * states in the Protocol
73 * Client initialization state
78 * Listener initialization state
83 * Pre-connection establishment state
88 * State where a connection has been established
93 * State where the socket is closed on our side and waiting to be ACK'ed
95 STATE_RECEIVE_CLOSE_WAIT,
98 * State where the socket is closed for reading
100 STATE_RECEIVE_CLOSED,
103 * State where the socket is closed on our side and waiting to be ACK'ed
105 STATE_TRANSMIT_CLOSE_WAIT,
108 * State where the socket is closed for writing
110 STATE_TRANSMIT_CLOSED,
113 * State where the socket is closed on our side and waiting to be ACK'ed
118 * State where the socket is closed
125 * Functions of this type are called when a message is written
127 * @param cls the closure from queue_message
128 * @param socket the socket the written message was bound to
130 typedef void (*SendFinishCallback) (void *cls,
131 struct GNUNET_STREAM_Socket *socket);
135 * The send message queue
142 struct GNUNET_STREAM_MessageHeader *message;
145 * Callback to be called when the message is sent
147 SendFinishCallback finish_cb;
150 * The closure for finish_cb
155 * The next message in queue. Should be NULL in the last message
157 struct MessageQueue *next;
160 * The next message in queue. Should be NULL in the first message
162 struct MessageQueue *prev;
167 * The STREAM Socket Handler
169 struct GNUNET_STREAM_Socket
172 * Retransmission timeout
174 struct GNUNET_TIME_Relative retransmit_timeout;
177 * The Acknowledgement Bitmap
179 GNUNET_STREAM_AckBitmap ack_bitmap;
182 * Time when the Acknowledgement was queued
184 struct GNUNET_TIME_Absolute ack_time_registered;
187 * Queued Acknowledgement deadline
189 struct GNUNET_TIME_Relative ack_time_deadline;
194 struct GNUNET_MESH_Handle *mesh;
197 * The mesh tunnel handle
199 struct GNUNET_MESH_Tunnel *tunnel;
202 * Stream open closure
207 * Stream open callback
209 GNUNET_STREAM_OpenCallback open_cb;
212 * The current transmit handle (if a pending transmit request exists)
214 struct GNUNET_MESH_TransmitHandle *transmit_handle;
217 * The current act transmit handle (if a pending ack transmit request exists)
219 struct GNUNET_MESH_TransmitHandle *ack_transmit_handle;
222 * Pointer to the current ack message using in ack_task
224 struct GNUNET_STREAM_AckMessage *ack_msg;
227 * The current message associated with the transmit handle
229 struct MessageQueue *queue_head;
232 * The queue tail, should always point to the last message in queue
234 struct MessageQueue *queue_tail;
237 * The write IO_handle associated with this socket
239 struct GNUNET_STREAM_IOWriteHandle *write_handle;
242 * The read IO_handle associated with this socket
244 struct GNUNET_STREAM_IOReadHandle *read_handle;
247 * The shutdown handle associated with this socket
249 struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
252 * Buffer for storing received messages
254 void *receive_buffer;
257 * The listen socket from which this socket is derived. Should be NULL if it
258 * is not a derived socket
260 struct GNUNET_STREAM_ListenSocket *lsocket;
263 * Task identifier for the read io timeout task
265 GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
268 * Task identifier for retransmission task after timeout
270 GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
273 * The task for sending timely Acks
275 GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
278 * Task scheduled to continue a read operation.
280 GNUNET_SCHEDULER_TaskIdentifier read_task_id;
283 * The state of the protocol associated with this socket
288 * The status of the socket
290 enum GNUNET_STREAM_Status status;
293 * The number of previous timeouts; FIXME: currently not used
295 unsigned int retries;
298 * The peer identity of the peer at the other end of the stream
300 GNUNET_PEER_Id other_peer;
303 * The application port number (type: uint32_t)
305 GNUNET_MESH_ApplicationType app_port;
308 * The session id associated with this stream connection
309 * FIXME: Not used currently, may be removed
314 * Write sequence number. Set to random when sending HELLO(client) and
317 uint32_t write_sequence_number;
320 * Read sequence number. This number's value is determined during handshake
322 uint32_t read_sequence_number;
325 * The receiver buffer size
327 uint32_t receive_buffer_size;
330 * The receiver buffer boundaries
332 uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
335 * receiver's available buffer after the last acknowledged packet
337 uint32_t receiver_window_available;
340 * The offset pointer used during write operation
342 uint32_t write_offset;
345 * The offset after which we are expecting data
347 uint32_t read_offset;
350 * The offset upto which user has read from the received buffer
352 uint32_t copy_offset;
357 * A socket for listening
359 struct GNUNET_STREAM_ListenSocket
364 struct GNUNET_MESH_Handle *mesh;
367 * The callback function which is called after successful opening socket
369 GNUNET_STREAM_ListenCallback listen_cb;
372 * The call back closure
378 * FIXME: Remove if not required!
380 GNUNET_MESH_ApplicationType port;
385 * The IO Write Handle
387 struct GNUNET_STREAM_IOWriteHandle
390 * The socket to which this write handle is associated
392 struct GNUNET_STREAM_Socket *socket;
395 * The packet_buffers associated with this Handle
397 struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
400 * The write continuation callback
402 GNUNET_STREAM_CompletionContinuation write_cont;
405 * Write continuation closure
407 void *write_cont_cls;
410 * The bitmap of this IOHandle; Corresponding bit for a message is set when
411 * it has been acknowledged by the receiver
413 GNUNET_STREAM_AckBitmap ack_bitmap;
416 * Number of bytes in this write handle
425 struct GNUNET_STREAM_IOReadHandle
428 * Callback for the read processor
430 GNUNET_STREAM_DataProcessor proc;
433 * The closure pointer for the read processor callback
440 * Handle for Shutdown
442 struct GNUNET_STREAM_ShutdownHandle
445 * The socket associated with this shutdown handle
447 struct GNUNET_STREAM_Socket *socket;
450 * Shutdown completion callback
452 GNUNET_STREAM_ShutdownCompletion completion_cb;
455 * Closure for completion callback
457 void *completion_cls;
460 * Close message retransmission task id
462 GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
465 * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
472 * Default value in seconds for various timeouts
474 static unsigned int default_timeout = 10;
478 * Callback function for sending queued message
480 * @param cls closure the socket
481 * @param size number of bytes available in buf
482 * @param buf where the callee should write the message
483 * @return number of bytes written to buf
486 send_message_notify (void *cls, size_t size, void *buf)
488 struct GNUNET_STREAM_Socket *socket = cls;
489 struct GNUNET_PeerIdentity target;
490 struct MessageQueue *head;
493 socket->transmit_handle = NULL; /* Remove the transmit handle */
494 head = socket->queue_head;
496 return 0; /* just to be safe */
497 GNUNET_PEER_resolve (socket->other_peer, &target);
498 if (0 == size) /* request timed out */
501 LOG (GNUNET_ERROR_TYPE_DEBUG,
502 "Message sending timed out. Retry %d \n",
504 socket->transmit_handle =
505 GNUNET_MESH_notify_transmit_ready (socket->tunnel,
508 /* FIXME: exponential backoff */
509 socket->retransmit_timeout,
511 ntohs (head->message->header.size),
512 &send_message_notify,
517 ret = ntohs (head->message->header.size);
518 GNUNET_assert (size >= ret);
519 memcpy (buf, head->message, ret);
520 if (NULL != head->finish_cb)
522 head->finish_cb (head->finish_cb_cls, socket);
524 GNUNET_CONTAINER_DLL_remove (socket->queue_head,
527 GNUNET_free (head->message);
529 head = socket->queue_head;
530 if (NULL != head) /* more pending messages to send */
533 socket->transmit_handle =
534 GNUNET_MESH_notify_transmit_ready (socket->tunnel,
537 /* FIXME: exponential backoff */
538 socket->retransmit_timeout,
540 ntohs (head->message->header.size),
541 &send_message_notify,
549 * Queues a message for sending using the mesh connection of a socket
551 * @param socket the socket whose mesh connection is used
552 * @param message the message to be sent
553 * @param finish_cb the callback to be called when the message is sent
554 * @param finish_cb_cls the closure for the callback
557 queue_message (struct GNUNET_STREAM_Socket *socket,
558 struct GNUNET_STREAM_MessageHeader *message,
559 SendFinishCallback finish_cb,
562 struct MessageQueue *queue_entity;
563 struct GNUNET_PeerIdentity target;
566 ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
567 && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
569 LOG (GNUNET_ERROR_TYPE_DEBUG,
570 "Queueing message of type %d and size %d\n",
571 ntohs (message->header.type),
572 ntohs (message->header.size));
573 GNUNET_assert (NULL != message);
574 queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
575 queue_entity->message = message;
576 queue_entity->finish_cb = finish_cb;
577 queue_entity->finish_cb_cls = finish_cb_cls;
578 GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
581 if (NULL == socket->transmit_handle)
584 GNUNET_PEER_resolve (socket->other_peer, &target);
585 socket->transmit_handle =
586 GNUNET_MESH_notify_transmit_ready (socket->tunnel,
589 socket->retransmit_timeout,
591 ntohs (message->header.size),
592 &send_message_notify,
599 * Copies a message and queues it for sending using the mesh connection of
602 * @param socket the socket whose mesh connection is used
603 * @param message the message to be sent
604 * @param finish_cb the callback to be called when the message is sent
605 * @param finish_cb_cls the closure for the callback
608 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
609 const struct GNUNET_STREAM_MessageHeader *message,
610 SendFinishCallback finish_cb,
613 struct GNUNET_STREAM_MessageHeader *msg_copy;
616 size = ntohs (message->header.size);
617 msg_copy = GNUNET_malloc (size);
618 memcpy (msg_copy, message, size);
619 queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
624 * Callback function for sending ack message
626 * @param cls closure the ACK message created in ack_task
627 * @param size number of bytes available in buffer
628 * @param buf where the callee should write the message
629 * @return number of bytes written to buf
632 send_ack_notify (void *cls, size_t size, void *buf)
634 struct GNUNET_STREAM_Socket *socket = cls;
638 LOG (GNUNET_ERROR_TYPE_DEBUG,
639 "%s called with size 0\n", __func__);
642 GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size);
644 size = ntohs (socket->ack_msg->header.header.size);
645 memcpy (buf, socket->ack_msg, size);
647 GNUNET_free (socket->ack_msg);
648 socket->ack_msg = NULL;
649 socket->ack_transmit_handle = NULL;
654 * Writes data using the given socket. The amount of data written is limited by
655 * the receiver_window_size
657 * @param socket the socket to use
660 write_data (struct GNUNET_STREAM_Socket *socket);
663 * Task for retransmitting data messages if they aren't ACK before their ack
666 * @param cls the socket
667 * @param tc the Task context
670 retransmission_timeout_task (void *cls,
671 const struct GNUNET_SCHEDULER_TaskContext *tc)
673 struct GNUNET_STREAM_Socket *socket = cls;
675 if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
678 LOG (GNUNET_ERROR_TYPE_DEBUG,
679 "Retransmitting DATA...\n");
680 socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
686 * Task for sending ACK message
688 * @param cls the socket
689 * @param tc the Task context
693 const struct GNUNET_SCHEDULER_TaskContext *tc)
695 struct GNUNET_STREAM_Socket *socket = cls;
696 struct GNUNET_STREAM_AckMessage *ack_msg;
697 struct GNUNET_PeerIdentity target;
699 if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
704 socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
706 /* Create the ACK Message */
707 ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
708 ack_msg->header.header.size = htons (sizeof (struct
709 GNUNET_STREAM_AckMessage));
710 ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
711 ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
712 ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
713 ack_msg->receive_window_remaining =
714 htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
716 socket->ack_msg = ack_msg;
717 GNUNET_PEER_resolve (socket->other_peer, &target);
718 /* Request MESH for sending ACK */
719 socket->ack_transmit_handle =
720 GNUNET_MESH_notify_transmit_ready (socket->tunnel,
723 socket->retransmit_timeout,
725 ntohs (ack_msg->header.header.size),
732 * Retransmission task for shutdown messages
734 * @param cls the shutdown handle
735 * @param tc the Task Context
738 close_msg_retransmission_task (void *cls,
739 const struct GNUNET_SCHEDULER_TaskContext *tc)
741 struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
742 struct GNUNET_STREAM_MessageHeader *msg;
743 struct GNUNET_STREAM_Socket *socket;
745 GNUNET_assert (NULL != shutdown_handle);
746 socket = shutdown_handle->socket;
748 msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
749 msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
750 switch (shutdown_handle->operation)
753 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
756 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
759 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
763 shutdown_handle->close_msg_retransmission_task_id =
764 GNUNET_SCHEDULER_NO_TASK;
767 queue_message (socket, msg, NULL, NULL);
768 shutdown_handle->close_msg_retransmission_task_id =
769 GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
770 &close_msg_retransmission_task,
776 * Function to modify a bit in GNUNET_STREAM_AckBitmap
778 * @param bitmap the bitmap to modify
779 * @param bit the bit number to modify
780 * @param value GNUNET_YES to on, GNUNET_NO to off
783 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
787 GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
788 if (GNUNET_YES == value)
789 *bitmap |= (1LL << bit);
791 *bitmap &= ~(1LL << bit);
796 * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
798 * @param bitmap address of the bitmap that has to be checked
799 * @param bit the bit number to check
800 * @return GNUNET_YES if the bit is set; GNUNET_NO if not
803 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
806 GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
807 return 0 != (*bitmap & (1LL << bit));
812 * Writes data using the given socket. The amount of data written is limited by
813 * the receiver_window_size
815 * @param socket the socket to use
818 write_data (struct GNUNET_STREAM_Socket *socket)
820 struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
821 int packet; /* Although an int, should never be negative */
825 /* Find the last acknowledged packet */
826 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
828 if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
831 else if (NULL == io_handle->messages[packet])
834 /* Resend packets which weren't ack'ed */
835 for (packet=0; packet < ack_packet; packet++)
837 if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
840 LOG (GNUNET_ERROR_TYPE_DEBUG,
841 "Placing DATA message with sequence %u in send queue\n",
842 ntohl (io_handle->messages[packet]->sequence_number));
844 copy_and_queue_message (socket,
845 &io_handle->messages[packet]->header,
850 packet = ack_packet + 1;
851 /* Now send new packets if there is enough buffer space */
852 while ( (NULL != io_handle->messages[packet]) &&
853 (socket->receiver_window_available
854 >= ntohs (io_handle->messages[packet]->header.header.size)) )
856 socket->receiver_window_available -=
857 ntohs (io_handle->messages[packet]->header.header.size);
858 LOG (GNUNET_ERROR_TYPE_DEBUG,
859 "Placing DATA message with sequence %u in send queue\n",
860 ntohl (io_handle->messages[packet]->sequence_number));
861 copy_and_queue_message (socket,
862 &io_handle->messages[packet]->header,
868 if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
869 socket->retransmission_timeout_task_id =
870 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
871 (GNUNET_TIME_UNIT_SECONDS, 8),
872 &retransmission_timeout_task,
878 * Task for calling the read processor
880 * @param cls the socket
881 * @param tc the task context
884 call_read_processor (void *cls,
885 const struct GNUNET_SCHEDULER_TaskContext *tc)
887 struct GNUNET_STREAM_Socket *socket = cls;
889 size_t valid_read_size;
891 uint32_t sequence_increase;
892 uint32_t offset_increase;
894 socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
895 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
898 if (NULL == socket->receive_buffer)
901 GNUNET_assert (NULL != socket->read_handle);
902 GNUNET_assert (NULL != socket->read_handle->proc);
904 /* Check the bitmap for any holes */
905 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
907 if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
911 /* We only call read processor if we have the first packet */
912 GNUNET_assert (0 < packet);
915 socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
917 GNUNET_assert (0 != valid_read_size);
919 /* Cancel the read_io_timeout_task */
920 GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
921 socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
923 /* Call the data processor */
924 LOG (GNUNET_ERROR_TYPE_DEBUG,
925 "Calling read processor\n");
927 socket->read_handle->proc (socket->read_handle->proc_cls,
929 socket->receive_buffer + socket->copy_offset,
931 LOG (GNUNET_ERROR_TYPE_DEBUG,
932 "Read processor read %d bytes\n",
934 LOG (GNUNET_ERROR_TYPE_DEBUG,
935 "Read processor completed successfully\n");
937 /* Free the read handle */
938 GNUNET_free (socket->read_handle);
939 socket->read_handle = NULL;
941 GNUNET_assert (read_size <= valid_read_size);
942 socket->copy_offset += read_size;
944 /* Determine upto which packet we can remove from the buffer */
945 for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
947 if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
949 if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
953 /* If no packets can be removed we can't move the buffer */
954 if (0 == packet) return;
956 sequence_increase = packet;
957 LOG (GNUNET_ERROR_TYPE_DEBUG,
958 "Sequence increase after read processor completion: %u\n",
961 /* Shift the data in the receive buffer */
962 memmove (socket->receive_buffer,
963 socket->receive_buffer
964 + socket->receive_buffer_boundaries[sequence_increase-1],
965 socket->receive_buffer_size
966 - socket->receive_buffer_boundaries[sequence_increase-1]);
968 /* Shift the bitmap */
969 socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
971 /* Set read_sequence_number */
972 socket->read_sequence_number += sequence_increase;
974 /* Set read_offset */
975 offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
976 socket->read_offset += offset_increase;
978 /* Fix copy_offset */
979 GNUNET_assert (offset_increase <= socket->copy_offset);
980 socket->copy_offset -= offset_increase;
982 /* Fix relative boundaries */
983 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
985 if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
987 socket->receive_buffer_boundaries[packet] =
988 socket->receive_buffer_boundaries[packet + sequence_increase]
992 socket->receive_buffer_boundaries[packet] = 0;
998 * Cancels the existing read io handle
1000 * @param cls the closure from the SCHEDULER call
1001 * @param tc the task context
1004 read_io_timeout (void *cls,
1005 const struct GNUNET_SCHEDULER_TaskContext *tc)
1007 struct GNUNET_STREAM_Socket *socket = cls;
1008 GNUNET_STREAM_DataProcessor proc;
1011 socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1012 if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1014 LOG (GNUNET_ERROR_TYPE_DEBUG,
1015 "Read task timedout - Cancelling it\n");
1016 GNUNET_SCHEDULER_cancel (socket->read_task_id);
1017 socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1019 GNUNET_assert (NULL != socket->read_handle);
1020 proc = socket->read_handle->proc;
1021 proc_cls = socket->read_handle->proc_cls;
1023 GNUNET_free (socket->read_handle);
1024 socket->read_handle = NULL;
1025 /* Call the read processor to signal timeout */
1027 GNUNET_STREAM_TIMEOUT,
1034 * Handler for DATA messages; Same for both client and server
1036 * @param socket the socket through which the ack was received
1037 * @param tunnel connection to the other end
1038 * @param sender who sent the message
1039 * @param msg the data message
1040 * @param atsi performance data for the connection
1041 * @return GNUNET_OK to keep the connection open,
1042 * GNUNET_SYSERR to close it (signal serious error)
1045 handle_data (struct GNUNET_STREAM_Socket *socket,
1046 struct GNUNET_MESH_Tunnel *tunnel,
1047 const struct GNUNET_PeerIdentity *sender,
1048 const struct GNUNET_STREAM_DataMessage *msg,
1049 const struct GNUNET_ATS_Information*atsi)
1051 const void *payload;
1052 uint32_t bytes_needed;
1053 uint32_t relative_offset;
1054 uint32_t relative_sequence_number;
1057 size = htons (msg->header.header.size);
1058 if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1060 GNUNET_break_op (0);
1061 return GNUNET_SYSERR;
1064 if (GNUNET_PEER_search (sender) != socket->other_peer)
1066 LOG (GNUNET_ERROR_TYPE_DEBUG,
1067 "Received DATA from non-confirming peer\n");
1071 switch (socket->state)
1073 case STATE_ESTABLISHED:
1074 case STATE_TRANSMIT_CLOSED:
1075 case STATE_TRANSMIT_CLOSE_WAIT:
1077 /* check if the message's sequence number is in the range we are
1079 relative_sequence_number =
1080 ntohl (msg->sequence_number) - socket->read_sequence_number;
1081 if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1083 LOG (GNUNET_ERROR_TYPE_DEBUG,
1084 "Ignoring received message with sequence number %u\n",
1085 ntohl (msg->sequence_number));
1086 /* Start ACK sending task if one is not already present */
1087 if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1089 socket->ack_task_id =
1090 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1091 (msg->ack_deadline),
1098 /* Check if we have already seen this message */
1099 if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1100 relative_sequence_number))
1102 LOG (GNUNET_ERROR_TYPE_DEBUG,
1103 "Ignoring already received message with sequence "
1105 ntohl (msg->sequence_number));
1106 /* Start ACK sending task if one is not already present */
1107 if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1109 socket->ack_task_id =
1110 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1111 (msg->ack_deadline),
1118 LOG (GNUNET_ERROR_TYPE_DEBUG,
1119 "Receiving DATA with sequence number: %u and size: %d from %x\n",
1120 ntohl (msg->sequence_number),
1121 ntohs (msg->header.header.size),
1122 socket->other_peer);
1124 /* Check if we have to allocate the buffer */
1125 size -= sizeof (struct GNUNET_STREAM_DataMessage);
1126 relative_offset = ntohl (msg->offset) - socket->read_offset;
1127 bytes_needed = relative_offset + size;
1128 if (bytes_needed > socket->receive_buffer_size)
1130 if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1132 socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1134 socket->receive_buffer_size = bytes_needed;
1138 LOG (GNUNET_ERROR_TYPE_DEBUG,
1139 "Cannot accommodate packet %d as buffer is full\n",
1140 ntohl (msg->sequence_number));
1145 /* Copy Data to buffer */
1147 GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1148 memcpy (socket->receive_buffer + relative_offset,
1151 socket->receive_buffer_boundaries[relative_sequence_number] =
1152 relative_offset + size;
1154 /* Modify the ACK bitmap */
1155 ackbitmap_modify_bit (&socket->ack_bitmap,
1156 relative_sequence_number,
1159 /* Start ACK sending task if one is not already present */
1160 if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1162 socket->ack_task_id =
1163 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1164 (msg->ack_deadline),
1169 if ((NULL != socket->read_handle) /* A read handle is waiting */
1170 /* There is no current read task */
1171 && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1172 /* We have the first packet */
1173 && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1176 LOG (GNUNET_ERROR_TYPE_DEBUG,
1177 "Scheduling read processor\n");
1179 socket->read_task_id =
1180 GNUNET_SCHEDULER_add_now (&call_read_processor,
1187 LOG (GNUNET_ERROR_TYPE_DEBUG,
1188 "Received data message when it cannot be handled\n");
1196 * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1198 * @param cls the socket (set from GNUNET_MESH_connect)
1199 * @param tunnel connection to the other end
1200 * @param tunnel_ctx place to store local state associated with the tunnel
1201 * @param sender who sent the message
1202 * @param message the actual message
1203 * @param atsi performance data for the connection
1204 * @return GNUNET_OK to keep the connection open,
1205 * GNUNET_SYSERR to close it (signal serious error)
1208 client_handle_data (void *cls,
1209 struct GNUNET_MESH_Tunnel *tunnel,
1211 const struct GNUNET_PeerIdentity *sender,
1212 const struct GNUNET_MessageHeader *message,
1213 const struct GNUNET_ATS_Information*atsi)
1215 struct GNUNET_STREAM_Socket *socket = cls;
1217 return handle_data (socket,
1220 (const struct GNUNET_STREAM_DataMessage *) message,
1226 * Callback to set state to ESTABLISHED
1228 * @param cls the closure from queue_message FIXME: document
1229 * @param socket the socket to requiring state change
1232 set_state_established (void *cls,
1233 struct GNUNET_STREAM_Socket *socket)
1235 struct GNUNET_PeerIdentity initiator_pid;
1237 LOG (GNUNET_ERROR_TYPE_DEBUG,
1238 "Attaining ESTABLISHED state\n");
1239 socket->write_offset = 0;
1240 socket->read_offset = 0;
1241 socket->state = STATE_ESTABLISHED;
1242 /* FIXME: What if listen_cb is NULL */
1243 if (NULL != socket->lsocket)
1245 GNUNET_PEER_resolve (socket->other_peer, &initiator_pid);
1246 LOG (GNUNET_ERROR_TYPE_DEBUG,
1247 "Calling listen callback\n");
1248 if (GNUNET_SYSERR ==
1249 socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1253 socket->state = STATE_CLOSED;
1254 /* FIXME: We should close in a decent way */
1255 GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1256 GNUNET_free (socket);
1259 else if (socket->open_cb)
1260 socket->open_cb (socket->open_cls, socket);
1265 * Callback to set state to HELLO_WAIT
1267 * @param cls the closure from queue_message
1268 * @param socket the socket to requiring state change
1271 set_state_hello_wait (void *cls,
1272 struct GNUNET_STREAM_Socket *socket)
1274 GNUNET_assert (STATE_INIT == socket->state);
1275 LOG (GNUNET_ERROR_TYPE_DEBUG,
1276 "Attaining HELLO_WAIT state\n");
1277 socket->state = STATE_HELLO_WAIT;
1282 * Callback to set state to CLOSE_WAIT
1284 * @param cls the closure from queue_message
1285 * @param socket the socket requiring state change
1288 set_state_close_wait (void *cls,
1289 struct GNUNET_STREAM_Socket *socket)
1291 LOG (GNUNET_ERROR_TYPE_DEBUG,
1292 "Attaing CLOSE_WAIT state\n");
1293 socket->state = STATE_CLOSE_WAIT;
1294 GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1295 socket->receive_buffer = NULL;
1296 socket->receive_buffer_size = 0;
1301 * Callback to set state to RECEIVE_CLOSE_WAIT
1303 * @param cls the closure from queue_message
1304 * @param socket the socket requiring state change
1307 set_state_receive_close_wait (void *cls,
1308 struct GNUNET_STREAM_Socket *socket)
1310 LOG (GNUNET_ERROR_TYPE_DEBUG,
1311 "Attaing RECEIVE_CLOSE_WAIT state\n");
1312 socket->state = STATE_RECEIVE_CLOSE_WAIT;
1313 GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1314 socket->receive_buffer = NULL;
1315 socket->receive_buffer_size = 0;
1320 * Callback to set state to TRANSMIT_CLOSE_WAIT
1322 * @param cls the closure from queue_message
1323 * @param socket the socket requiring state change
1326 set_state_transmit_close_wait (void *cls,
1327 struct GNUNET_STREAM_Socket *socket)
1329 LOG (GNUNET_ERROR_TYPE_DEBUG,
1330 "Attaining TRANSMIT_CLOSE_WAIT state\n");
1331 socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1336 * Callback to set state to CLOSED
1338 * @param cls the closure from queue_message
1339 * @param socket the socket requiring state change
1342 set_state_closed (void *cls,
1343 struct GNUNET_STREAM_Socket *socket)
1345 socket->state = STATE_CLOSED;
1349 * Returns a new HelloAckMessage. Also sets the write sequence number for the
1352 * @param socket the socket for which this HelloAckMessage has to be generated
1353 * @return the HelloAckMessage
1355 static struct GNUNET_STREAM_HelloAckMessage *
1356 generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket)
1358 struct GNUNET_STREAM_HelloAckMessage *msg;
1360 /* Get the random sequence number */
1361 socket->write_sequence_number =
1362 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1363 LOG (GNUNET_ERROR_TYPE_DEBUG,
1364 "Generated write sequence number %u\n",
1365 (unsigned int) socket->write_sequence_number);
1367 msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1368 msg->header.header.size =
1369 htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1370 msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1371 msg->sequence_number = htonl (socket->write_sequence_number);
1372 msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1379 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1381 * @param cls the socket (set from GNUNET_MESH_connect)
1382 * @param tunnel connection to the other end
1383 * @param tunnel_ctx this is NULL
1384 * @param sender who sent the message
1385 * @param message the actual message
1386 * @param atsi performance data for the connection
1387 * @return GNUNET_OK to keep the connection open,
1388 * GNUNET_SYSERR to close it (signal serious error)
1391 client_handle_hello_ack (void *cls,
1392 struct GNUNET_MESH_Tunnel *tunnel,
1394 const struct GNUNET_PeerIdentity *sender,
1395 const struct GNUNET_MessageHeader *message,
1396 const struct GNUNET_ATS_Information*atsi)
1398 struct GNUNET_STREAM_Socket *socket = cls;
1399 const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1400 struct GNUNET_STREAM_HelloAckMessage *reply;
1402 if (GNUNET_PEER_search (sender) != socket->other_peer)
1404 LOG (GNUNET_ERROR_TYPE_DEBUG,
1405 "Received HELLO_ACK from non-confirming peer\n");
1408 ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1409 LOG (GNUNET_ERROR_TYPE_DEBUG,
1410 "Received HELLO_ACK from %x\n",
1411 socket->other_peer);
1413 GNUNET_assert (socket->tunnel == tunnel);
1414 switch (socket->state)
1416 case STATE_HELLO_WAIT:
1417 socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1418 LOG (GNUNET_ERROR_TYPE_DEBUG,
1419 "Read sequence number %u\n",
1420 (unsigned int) socket->read_sequence_number);
1421 socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1422 reply = generate_hello_ack_msg (socket);
1423 queue_message (socket,
1425 &set_state_established,
1428 case STATE_ESTABLISHED:
1429 case STATE_RECEIVE_CLOSE_WAIT:
1430 // call statistics (# ACKs ignored++)
1434 LOG (GNUNET_ERROR_TYPE_DEBUG,
1435 "Server %x sent HELLO_ACK when in state %d\n",
1438 socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1439 return GNUNET_SYSERR;
1446 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1448 * @param cls the socket (set from GNUNET_MESH_connect)
1449 * @param tunnel connection to the other end
1450 * @param tunnel_ctx this is NULL
1451 * @param sender who sent the message
1452 * @param message the actual message
1453 * @param atsi performance data for the connection
1454 * @return GNUNET_OK to keep the connection open,
1455 * GNUNET_SYSERR to close it (signal serious error)
1458 client_handle_reset (void *cls,
1459 struct GNUNET_MESH_Tunnel *tunnel,
1461 const struct GNUNET_PeerIdentity *sender,
1462 const struct GNUNET_MessageHeader *message,
1463 const struct GNUNET_ATS_Information*atsi)
1465 // struct GNUNET_STREAM_Socket *socket = cls;
1472 * Common message handler for handling TRANSMIT_CLOSE messages
1474 * @param socket the socket through which the ack was received
1475 * @param tunnel connection to the other end
1476 * @param sender who sent the message
1477 * @param msg the transmit close message
1478 * @param atsi performance data for the connection
1479 * @return GNUNET_OK to keep the connection open,
1480 * GNUNET_SYSERR to close it (signal serious error)
1483 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1484 struct GNUNET_MESH_Tunnel *tunnel,
1485 const struct GNUNET_PeerIdentity *sender,
1486 const struct GNUNET_STREAM_MessageHeader *msg,
1487 const struct GNUNET_ATS_Information*atsi)
1489 struct GNUNET_STREAM_MessageHeader *reply;
1491 switch (socket->state)
1493 case STATE_ESTABLISHED:
1494 socket->state = STATE_RECEIVE_CLOSED;
1496 /* Send TRANSMIT_CLOSE_ACK */
1497 reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1498 reply->header.type =
1499 htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1500 reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1501 queue_message (socket, reply, NULL, NULL);
1505 /* FIXME: Call statistics? */
1513 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1515 * @param cls the socket (set from GNUNET_MESH_connect)
1516 * @param tunnel connection to the other end
1517 * @param tunnel_ctx this is NULL
1518 * @param sender who sent the message
1519 * @param message the actual message
1520 * @param atsi performance data for the connection
1521 * @return GNUNET_OK to keep the connection open,
1522 * GNUNET_SYSERR to close it (signal serious error)
1525 client_handle_transmit_close (void *cls,
1526 struct GNUNET_MESH_Tunnel *tunnel,
1528 const struct GNUNET_PeerIdentity *sender,
1529 const struct GNUNET_MessageHeader *message,
1530 const struct GNUNET_ATS_Information*atsi)
1532 struct GNUNET_STREAM_Socket *socket = cls;
1534 return handle_transmit_close (socket,
1537 (struct GNUNET_STREAM_MessageHeader *)message,
1543 * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1545 * @param socket the socket
1546 * @param tunnel connection to the other end
1547 * @param sender who sent the message
1548 * @param message the actual message
1549 * @param atsi performance data for the connection
1550 * @param operation the close operation which is being ACK'ed
1551 * @return GNUNET_OK to keep the connection open,
1552 * GNUNET_SYSERR to close it (signal serious error)
1555 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1556 struct GNUNET_MESH_Tunnel *tunnel,
1557 const struct GNUNET_PeerIdentity *sender,
1558 const struct GNUNET_STREAM_MessageHeader *message,
1559 const struct GNUNET_ATS_Information *atsi,
1562 struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1564 shutdown_handle = socket->shutdown_handle;
1565 if (NULL == shutdown_handle)
1567 LOG (GNUNET_ERROR_TYPE_DEBUG,
1568 "Received *CLOSE_ACK when shutdown handle is NULL\n");
1575 switch (socket->state)
1577 case STATE_CLOSE_WAIT:
1578 if (SHUT_RDWR != shutdown_handle->operation)
1580 LOG (GNUNET_ERROR_TYPE_DEBUG,
1581 "Received CLOSE_ACK when shutdown handle is not for SHUT_RDWR\n");
1585 LOG (GNUNET_ERROR_TYPE_DEBUG,
1586 "Received CLOSE_ACK from %x\n",
1587 socket->other_peer);
1588 socket->state = STATE_CLOSED;
1591 LOG (GNUNET_ERROR_TYPE_DEBUG,
1592 "Received CLOSE_ACK when in it not expected\n");
1598 switch (socket->state)
1600 case STATE_RECEIVE_CLOSE_WAIT:
1601 if (SHUT_RD != shutdown_handle->operation)
1603 LOG (GNUNET_ERROR_TYPE_DEBUG,
1604 "Received RECEIVE_CLOSE_ACK when shutdown handle is not for SHUT_RD\n");
1608 LOG (GNUNET_ERROR_TYPE_DEBUG,
1609 "Received RECEIVE_CLOSE_ACK from %x\n",
1610 socket->other_peer);
1611 socket->state = STATE_RECEIVE_CLOSED;
1614 LOG (GNUNET_ERROR_TYPE_DEBUG,
1615 "Received RECEIVE_CLOSE_ACK when in it not expected\n");
1621 switch (socket->state)
1623 case STATE_TRANSMIT_CLOSE_WAIT:
1624 if (SHUT_WR != shutdown_handle->operation)
1626 LOG (GNUNET_ERROR_TYPE_DEBUG,
1627 "Received TRANSMIT_CLOSE_ACK when shutdown handle is not for SHUT_WR\n");
1631 LOG (GNUNET_ERROR_TYPE_DEBUG,
1632 "Received TRANSMIT_CLOSE_ACK from %x\n",
1633 socket->other_peer);
1634 socket->state = STATE_TRANSMIT_CLOSED;
1637 LOG (GNUNET_ERROR_TYPE_DEBUG,
1638 "Received TRANSMIT_CLOSE_ACK when in it not expected\n");
1647 if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
1648 shutdown_handle->completion_cb(shutdown_handle->completion_cls,
1650 GNUNET_free (shutdown_handle); /* Free shutdown handle */
1651 socket->shutdown_handle = NULL;
1652 if (GNUNET_SCHEDULER_NO_TASK
1653 != shutdown_handle->close_msg_retransmission_task_id)
1655 GNUNET_SCHEDULER_cancel
1656 (shutdown_handle->close_msg_retransmission_task_id);
1657 shutdown_handle->close_msg_retransmission_task_id =
1658 GNUNET_SCHEDULER_NO_TASK;
1665 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1667 * @param cls the socket (set from GNUNET_MESH_connect)
1668 * @param tunnel connection to the other end
1669 * @param tunnel_ctx this is NULL
1670 * @param sender who sent the message
1671 * @param message the actual message
1672 * @param atsi performance data for the connection
1673 * @return GNUNET_OK to keep the connection open,
1674 * GNUNET_SYSERR to close it (signal serious error)
1677 client_handle_transmit_close_ack (void *cls,
1678 struct GNUNET_MESH_Tunnel *tunnel,
1680 const struct GNUNET_PeerIdentity *sender,
1681 const struct GNUNET_MessageHeader *message,
1682 const struct GNUNET_ATS_Information*atsi)
1684 struct GNUNET_STREAM_Socket *socket = cls;
1686 return handle_generic_close_ack (socket,
1689 (const struct GNUNET_STREAM_MessageHeader *)
1697 * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1699 * @param socket the socket
1700 * @param tunnel connection to the other end
1701 * @param sender who sent the message
1702 * @param message the actual message
1703 * @param atsi performance data for the connection
1704 * @return GNUNET_OK to keep the connection open,
1705 * GNUNET_SYSERR to close it (signal serious error)
1708 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1709 struct GNUNET_MESH_Tunnel *tunnel,
1710 const struct GNUNET_PeerIdentity *sender,
1711 const struct GNUNET_STREAM_MessageHeader *message,
1712 const struct GNUNET_ATS_Information *atsi)
1714 struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1716 switch (socket->state)
1720 case STATE_HELLO_WAIT:
1721 LOG (GNUNET_ERROR_TYPE_DEBUG,
1722 "Ignoring RECEIVE_CLOSE as it cannot be handled now\n");
1728 LOG (GNUNET_ERROR_TYPE_DEBUG,
1729 "Received RECEIVE_CLOSE from %x\n",
1730 socket->other_peer);
1732 GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1733 receive_close_ack->header.size =
1734 htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1735 receive_close_ack->header.type =
1736 htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1737 queue_message (socket,
1742 /* FIXME: Handle the case where write handle is present; the write operation
1743 should be deemed as finised and the write continuation callback
1744 has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
1750 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1752 * @param cls the socket (set from GNUNET_MESH_connect)
1753 * @param tunnel connection to the other end
1754 * @param tunnel_ctx this is NULL
1755 * @param sender who sent the message
1756 * @param message the actual message
1757 * @param atsi performance data for the connection
1758 * @return GNUNET_OK to keep the connection open,
1759 * GNUNET_SYSERR to close it (signal serious error)
1762 client_handle_receive_close (void *cls,
1763 struct GNUNET_MESH_Tunnel *tunnel,
1765 const struct GNUNET_PeerIdentity *sender,
1766 const struct GNUNET_MessageHeader *message,
1767 const struct GNUNET_ATS_Information*atsi)
1769 struct GNUNET_STREAM_Socket *socket = cls;
1772 handle_receive_close (socket,
1775 (const struct GNUNET_STREAM_MessageHeader *) message,
1781 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1783 * @param cls the socket (set from GNUNET_MESH_connect)
1784 * @param tunnel connection to the other end
1785 * @param tunnel_ctx this is NULL
1786 * @param sender who sent the message
1787 * @param message the actual message
1788 * @param atsi performance data for the connection
1789 * @return GNUNET_OK to keep the connection open,
1790 * GNUNET_SYSERR to close it (signal serious error)
1793 client_handle_receive_close_ack (void *cls,
1794 struct GNUNET_MESH_Tunnel *tunnel,
1796 const struct GNUNET_PeerIdentity *sender,
1797 const struct GNUNET_MessageHeader *message,
1798 const struct GNUNET_ATS_Information*atsi)
1800 struct GNUNET_STREAM_Socket *socket = cls;
1802 return handle_generic_close_ack (socket,
1805 (const struct GNUNET_STREAM_MessageHeader *)
1813 * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1815 * @param socket the socket
1816 * @param tunnel connection to the other end
1817 * @param sender who sent the message
1818 * @param message the actual message
1819 * @param atsi performance data for the connection
1820 * @return GNUNET_OK to keep the connection open,
1821 * GNUNET_SYSERR to close it (signal serious error)
1824 handle_close (struct GNUNET_STREAM_Socket *socket,
1825 struct GNUNET_MESH_Tunnel *tunnel,
1826 const struct GNUNET_PeerIdentity *sender,
1827 const struct GNUNET_STREAM_MessageHeader *message,
1828 const struct GNUNET_ATS_Information*atsi)
1830 struct GNUNET_STREAM_MessageHeader *close_ack;
1832 switch (socket->state)
1836 case STATE_HELLO_WAIT:
1837 LOG (GNUNET_ERROR_TYPE_DEBUG,
1838 "Ignoring RECEIVE_CLOSE as it cannot be handled now\n");
1844 LOG (GNUNET_ERROR_TYPE_DEBUG,
1845 "Received CLOSE from %x\n",
1846 socket->other_peer);
1847 close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1848 close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1849 close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
1850 queue_message (socket,
1854 if (socket->state == STATE_CLOSED)
1857 GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1858 socket->receive_buffer = NULL;
1859 socket->receive_buffer_size = 0;
1865 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1867 * @param cls the socket (set from GNUNET_MESH_connect)
1868 * @param tunnel connection to the other end
1869 * @param tunnel_ctx this is NULL
1870 * @param sender who sent the message
1871 * @param message the actual message
1872 * @param atsi performance data for the connection
1873 * @return GNUNET_OK to keep the connection open,
1874 * GNUNET_SYSERR to close it (signal serious error)
1877 client_handle_close (void *cls,
1878 struct GNUNET_MESH_Tunnel *tunnel,
1880 const struct GNUNET_PeerIdentity *sender,
1881 const struct GNUNET_MessageHeader *message,
1882 const struct GNUNET_ATS_Information*atsi)
1884 struct GNUNET_STREAM_Socket *socket = cls;
1886 return handle_close (socket,
1889 (const struct GNUNET_STREAM_MessageHeader *) message,
1895 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1897 * @param cls the socket (set from GNUNET_MESH_connect)
1898 * @param tunnel connection to the other end
1899 * @param tunnel_ctx this is NULL
1900 * @param sender who sent the message
1901 * @param message the actual message
1902 * @param atsi performance data for the connection
1903 * @return GNUNET_OK to keep the connection open,
1904 * GNUNET_SYSERR to close it (signal serious error)
1907 client_handle_close_ack (void *cls,
1908 struct GNUNET_MESH_Tunnel *tunnel,
1910 const struct GNUNET_PeerIdentity *sender,
1911 const struct GNUNET_MessageHeader *message,
1912 const struct GNUNET_ATS_Information *atsi)
1914 struct GNUNET_STREAM_Socket *socket = cls;
1916 return handle_generic_close_ack (socket,
1919 (const struct GNUNET_STREAM_MessageHeader *)
1925 /*****************************/
1926 /* Server's Message Handlers */
1927 /*****************************/
1930 * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1932 * @param cls the closure
1933 * @param tunnel connection to the other end
1934 * @param tunnel_ctx the socket
1935 * @param sender who sent the message
1936 * @param message the actual message
1937 * @param atsi performance data for the connection
1938 * @return GNUNET_OK to keep the connection open,
1939 * GNUNET_SYSERR to close it (signal serious error)
1942 server_handle_data (void *cls,
1943 struct GNUNET_MESH_Tunnel *tunnel,
1945 const struct GNUNET_PeerIdentity *sender,
1946 const struct GNUNET_MessageHeader *message,
1947 const struct GNUNET_ATS_Information*atsi)
1949 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1951 return handle_data (socket,
1954 (const struct GNUNET_STREAM_DataMessage *)message,
1960 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
1962 * @param cls the closure
1963 * @param tunnel connection to the other end
1964 * @param tunnel_ctx the socket
1965 * @param sender who sent the message
1966 * @param message the actual message
1967 * @param atsi performance data for the connection
1968 * @return GNUNET_OK to keep the connection open,
1969 * GNUNET_SYSERR to close it (signal serious error)
1972 server_handle_hello (void *cls,
1973 struct GNUNET_MESH_Tunnel *tunnel,
1975 const struct GNUNET_PeerIdentity *sender,
1976 const struct GNUNET_MessageHeader *message,
1977 const struct GNUNET_ATS_Information*atsi)
1979 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1980 struct GNUNET_STREAM_HelloAckMessage *reply;
1982 if (GNUNET_PEER_search (sender) != socket->other_peer)
1984 LOG (GNUNET_ERROR_TYPE_DEBUG,
1985 "Received HELLO from non-confirming peer\n");
1989 GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO ==
1990 ntohs (message->type));
1991 GNUNET_assert (socket->tunnel == tunnel);
1992 LOG (GNUNET_ERROR_TYPE_DEBUG,
1993 "Received HELLO from %x\n",
1994 socket->other_peer);
1996 if (STATE_INIT == socket->state)
1998 reply = generate_hello_ack_msg (socket);
1999 queue_message (socket,
2001 &set_state_hello_wait,
2006 LOG (GNUNET_ERROR_TYPE_DEBUG,
2007 "Client sent HELLO when in state %d\n", socket->state);
2008 /* FIXME: Send RESET? */
2016 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2018 * @param cls the closure
2019 * @param tunnel connection to the other end
2020 * @param tunnel_ctx the socket
2021 * @param sender who sent the message
2022 * @param message the actual message
2023 * @param atsi performance data for the connection
2024 * @return GNUNET_OK to keep the connection open,
2025 * GNUNET_SYSERR to close it (signal serious error)
2028 server_handle_hello_ack (void *cls,
2029 struct GNUNET_MESH_Tunnel *tunnel,
2031 const struct GNUNET_PeerIdentity *sender,
2032 const struct GNUNET_MessageHeader *message,
2033 const struct GNUNET_ATS_Information*atsi)
2035 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2036 const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2038 GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2039 ntohs (message->type));
2040 GNUNET_assert (socket->tunnel == tunnel);
2041 ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2042 if (STATE_HELLO_WAIT == socket->state)
2044 LOG (GNUNET_ERROR_TYPE_DEBUG,
2045 "Received HELLO_ACK from %x\n",
2046 socket->other_peer);
2047 socket->read_sequence_number = ntohl (ack_message->sequence_number);
2048 LOG (GNUNET_ERROR_TYPE_DEBUG,
2049 "Read sequence number %u\n",
2050 (unsigned int) socket->read_sequence_number);
2051 socket->receiver_window_available =
2052 ntohl (ack_message->receiver_window_size);
2053 /* Attain ESTABLISHED state */
2054 set_state_established (NULL, socket);
2058 LOG (GNUNET_ERROR_TYPE_DEBUG,
2059 "Client sent HELLO_ACK when in state %d\n", socket->state);
2060 /* FIXME: Send RESET? */
2068 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2070 * @param cls the closure
2071 * @param tunnel connection to the other end
2072 * @param tunnel_ctx the socket
2073 * @param sender who sent the message
2074 * @param message the actual message
2075 * @param atsi performance data for the connection
2076 * @return GNUNET_OK to keep the connection open,
2077 * GNUNET_SYSERR to close it (signal serious error)
2080 server_handle_reset (void *cls,
2081 struct GNUNET_MESH_Tunnel *tunnel,
2083 const struct GNUNET_PeerIdentity *sender,
2084 const struct GNUNET_MessageHeader *message,
2085 const struct GNUNET_ATS_Information*atsi)
2087 // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2094 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2096 * @param cls the closure
2097 * @param tunnel connection to the other end
2098 * @param tunnel_ctx the socket
2099 * @param sender who sent the message
2100 * @param message the actual message
2101 * @param atsi performance data for the connection
2102 * @return GNUNET_OK to keep the connection open,
2103 * GNUNET_SYSERR to close it (signal serious error)
2106 server_handle_transmit_close (void *cls,
2107 struct GNUNET_MESH_Tunnel *tunnel,
2109 const struct GNUNET_PeerIdentity *sender,
2110 const struct GNUNET_MessageHeader *message,
2111 const struct GNUNET_ATS_Information*atsi)
2113 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2115 return handle_transmit_close (socket,
2118 (struct GNUNET_STREAM_MessageHeader *)message,
2124 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2126 * @param cls the closure
2127 * @param tunnel connection to the other end
2128 * @param tunnel_ctx the socket
2129 * @param sender who sent the message
2130 * @param message the actual message
2131 * @param atsi performance data for the connection
2132 * @return GNUNET_OK to keep the connection open,
2133 * GNUNET_SYSERR to close it (signal serious error)
2136 server_handle_transmit_close_ack (void *cls,
2137 struct GNUNET_MESH_Tunnel *tunnel,
2139 const struct GNUNET_PeerIdentity *sender,
2140 const struct GNUNET_MessageHeader *message,
2141 const struct GNUNET_ATS_Information*atsi)
2143 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2145 return handle_generic_close_ack (socket,
2148 (const struct GNUNET_STREAM_MessageHeader *)
2156 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2158 * @param cls the closure
2159 * @param tunnel connection to the other end
2160 * @param tunnel_ctx the socket
2161 * @param sender who sent the message
2162 * @param message the actual message
2163 * @param atsi performance data for the connection
2164 * @return GNUNET_OK to keep the connection open,
2165 * GNUNET_SYSERR to close it (signal serious error)
2168 server_handle_receive_close (void *cls,
2169 struct GNUNET_MESH_Tunnel *tunnel,
2171 const struct GNUNET_PeerIdentity *sender,
2172 const struct GNUNET_MessageHeader *message,
2173 const struct GNUNET_ATS_Information*atsi)
2175 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2178 handle_receive_close (socket,
2181 (const struct GNUNET_STREAM_MessageHeader *) message,
2187 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2189 * @param cls the closure
2190 * @param tunnel connection to the other end
2191 * @param tunnel_ctx the socket
2192 * @param sender who sent the message
2193 * @param message the actual message
2194 * @param atsi performance data for the connection
2195 * @return GNUNET_OK to keep the connection open,
2196 * GNUNET_SYSERR to close it (signal serious error)
2199 server_handle_receive_close_ack (void *cls,
2200 struct GNUNET_MESH_Tunnel *tunnel,
2202 const struct GNUNET_PeerIdentity *sender,
2203 const struct GNUNET_MessageHeader *message,
2204 const struct GNUNET_ATS_Information*atsi)
2206 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2208 return handle_generic_close_ack (socket,
2211 (const struct GNUNET_STREAM_MessageHeader *)
2219 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2221 * @param cls the listen socket (from GNUNET_MESH_connect in
2222 * GNUNET_STREAM_listen)
2223 * @param tunnel connection to the other end
2224 * @param tunnel_ctx the socket
2225 * @param sender who sent the message
2226 * @param message the actual message
2227 * @param atsi performance data for the connection
2228 * @return GNUNET_OK to keep the connection open,
2229 * GNUNET_SYSERR to close it (signal serious error)
2232 server_handle_close (void *cls,
2233 struct GNUNET_MESH_Tunnel *tunnel,
2235 const struct GNUNET_PeerIdentity *sender,
2236 const struct GNUNET_MessageHeader *message,
2237 const struct GNUNET_ATS_Information*atsi)
2239 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2241 return handle_close (socket,
2244 (const struct GNUNET_STREAM_MessageHeader *) message,
2250 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2252 * @param cls the closure
2253 * @param tunnel connection to the other end
2254 * @param tunnel_ctx the socket
2255 * @param sender who sent the message
2256 * @param message the actual message
2257 * @param atsi performance data for the connection
2258 * @return GNUNET_OK to keep the connection open,
2259 * GNUNET_SYSERR to close it (signal serious error)
2262 server_handle_close_ack (void *cls,
2263 struct GNUNET_MESH_Tunnel *tunnel,
2265 const struct GNUNET_PeerIdentity *sender,
2266 const struct GNUNET_MessageHeader *message,
2267 const struct GNUNET_ATS_Information*atsi)
2269 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2271 return handle_generic_close_ack (socket,
2274 (const struct GNUNET_STREAM_MessageHeader *)
2282 * Handler for DATA_ACK messages
2284 * @param socket the socket through which the ack was received
2285 * @param tunnel connection to the other end
2286 * @param sender who sent the message
2287 * @param ack the acknowledgment message
2288 * @param atsi performance data for the connection
2289 * @return GNUNET_OK to keep the connection open,
2290 * GNUNET_SYSERR to close it (signal serious error)
2293 handle_ack (struct GNUNET_STREAM_Socket *socket,
2294 struct GNUNET_MESH_Tunnel *tunnel,
2295 const struct GNUNET_PeerIdentity *sender,
2296 const struct GNUNET_STREAM_AckMessage *ack,
2297 const struct GNUNET_ATS_Information*atsi)
2299 unsigned int packet;
2300 int need_retransmission;
2303 if (GNUNET_PEER_search (sender) != socket->other_peer)
2305 LOG (GNUNET_ERROR_TYPE_DEBUG,
2306 "Received ACK from non-confirming peer\n");
2310 switch (socket->state)
2312 case (STATE_ESTABLISHED):
2313 case (STATE_RECEIVE_CLOSED):
2314 case (STATE_RECEIVE_CLOSE_WAIT):
2315 if (NULL == socket->write_handle)
2317 LOG (GNUNET_ERROR_TYPE_DEBUG,
2318 "Received DATA_ACK when write_handle is NULL\n");
2321 /* FIXME: increment in the base sequence number is breaking current flow
2323 if (!((socket->write_sequence_number
2324 - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2326 LOG (GNUNET_ERROR_TYPE_DEBUG,
2327 "Received DATA_ACK with unexpected base sequence number\n");
2328 LOG (GNUNET_ERROR_TYPE_DEBUG,
2329 "Current write sequence: %u; Ack's base sequence: %u\n",
2330 socket->write_sequence_number,
2331 ntohl (ack->base_sequence_number));
2334 /* FIXME: include the case when write_handle is cancelled - ignore the
2337 LOG (GNUNET_ERROR_TYPE_DEBUG,
2338 "Received DATA_ACK from %x\n",
2339 socket->other_peer);
2341 /* Cancel the retransmission task */
2342 if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2344 GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2345 socket->retransmission_timeout_task_id =
2346 GNUNET_SCHEDULER_NO_TASK;
2349 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2351 if (NULL == socket->write_handle->messages[packet]) break;
2352 if (ntohl (ack->base_sequence_number)
2353 >= ntohl (socket->write_handle->messages[packet]->sequence_number))
2354 ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2359 ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
2360 ntohl (socket->write_handle->messages[packet]->sequence_number)
2361 - ntohl (ack->base_sequence_number)))
2362 ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2367 /* Update the receive window remaining
2368 FIXME : Should update with the value from a data ack with greater
2370 socket->receiver_window_available =
2371 ntohl (ack->receive_window_remaining);
2373 /* Check if we have received all acknowledgements */
2374 need_retransmission = GNUNET_NO;
2375 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2377 if (NULL == socket->write_handle->messages[packet]) break;
2378 if (GNUNET_YES != ackbitmap_is_bit_set
2379 (&socket->write_handle->ack_bitmap,packet))
2381 need_retransmission = GNUNET_YES;
2385 if (GNUNET_YES == need_retransmission)
2387 write_data (socket);
2389 else /* We have to call the write continuation callback now */
2391 /* Free the packets */
2392 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2394 GNUNET_free_non_null (socket->write_handle->messages[packet]);
2396 if (NULL != socket->write_handle->write_cont)
2397 socket->write_handle->write_cont
2398 (socket->write_handle->write_cont_cls,
2400 socket->write_handle->size);
2401 LOG (GNUNET_ERROR_TYPE_DEBUG,
2402 "Write completion callback completed\n");
2403 /* We are done with the write handle - Freeing it */
2404 GNUNET_free (socket->write_handle);
2405 socket->write_handle = NULL;
2416 * Handler for DATA_ACK messages
2418 * @param cls the 'struct GNUNET_STREAM_Socket'
2419 * @param tunnel connection to the other end
2420 * @param tunnel_ctx unused
2421 * @param sender who sent the message
2422 * @param message the actual message
2423 * @param atsi performance data for the connection
2424 * @return GNUNET_OK to keep the connection open,
2425 * GNUNET_SYSERR to close it (signal serious error)
2428 client_handle_ack (void *cls,
2429 struct GNUNET_MESH_Tunnel *tunnel,
2431 const struct GNUNET_PeerIdentity *sender,
2432 const struct GNUNET_MessageHeader *message,
2433 const struct GNUNET_ATS_Information*atsi)
2435 struct GNUNET_STREAM_Socket *socket = cls;
2436 const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2438 return handle_ack (socket, tunnel, sender, ack, atsi);
2443 * Handler for DATA_ACK messages
2445 * @param cls the server's listen socket
2446 * @param tunnel connection to the other end
2447 * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2448 * @param sender who sent the message
2449 * @param message the actual message
2450 * @param atsi performance data for the connection
2451 * @return GNUNET_OK to keep the connection open,
2452 * GNUNET_SYSERR to close it (signal serious error)
2455 server_handle_ack (void *cls,
2456 struct GNUNET_MESH_Tunnel *tunnel,
2458 const struct GNUNET_PeerIdentity *sender,
2459 const struct GNUNET_MessageHeader *message,
2460 const struct GNUNET_ATS_Information*atsi)
2462 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2463 const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2465 return handle_ack (socket, tunnel, sender, ack, atsi);
2470 * For client message handlers, the stream socket is in the
2473 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2474 {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2475 {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK,
2476 sizeof (struct GNUNET_STREAM_AckMessage) },
2477 {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2478 sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2479 {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2480 sizeof (struct GNUNET_STREAM_MessageHeader)},
2481 {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2482 sizeof (struct GNUNET_STREAM_MessageHeader)},
2483 {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2484 sizeof (struct GNUNET_STREAM_MessageHeader)},
2485 {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2486 sizeof (struct GNUNET_STREAM_MessageHeader)},
2487 {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2488 sizeof (struct GNUNET_STREAM_MessageHeader)},
2489 {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2490 sizeof (struct GNUNET_STREAM_MessageHeader)},
2491 {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2492 sizeof (struct GNUNET_STREAM_MessageHeader)},
2498 * For server message handlers, the stream socket is in the
2499 * tunnel context, and the listen socket in the closure argument.
2501 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2502 {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2503 {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK,
2504 sizeof (struct GNUNET_STREAM_AckMessage) },
2505 {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO,
2506 sizeof (struct GNUNET_STREAM_MessageHeader)},
2507 {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2508 sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2509 {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2510 sizeof (struct GNUNET_STREAM_MessageHeader)},
2511 {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2512 sizeof (struct GNUNET_STREAM_MessageHeader)},
2513 {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2514 sizeof (struct GNUNET_STREAM_MessageHeader)},
2515 {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2516 sizeof (struct GNUNET_STREAM_MessageHeader)},
2517 {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2518 sizeof (struct GNUNET_STREAM_MessageHeader)},
2519 {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2520 sizeof (struct GNUNET_STREAM_MessageHeader)},
2521 {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2522 sizeof (struct GNUNET_STREAM_MessageHeader)},
2528 * Function called when our target peer is connected to our tunnel
2530 * @param cls the socket for which this tunnel is created
2531 * @param peer the peer identity of the target
2532 * @param atsi performance data for the connection
2535 mesh_peer_connect_callback (void *cls,
2536 const struct GNUNET_PeerIdentity *peer,
2537 const struct GNUNET_ATS_Information * atsi)
2539 struct GNUNET_STREAM_Socket *socket = cls;
2540 struct GNUNET_STREAM_MessageHeader *message;
2541 GNUNET_PEER_Id connected_peer;
2543 connected_peer = GNUNET_PEER_search (peer);
2545 if (connected_peer != socket->other_peer)
2547 LOG (GNUNET_ERROR_TYPE_DEBUG,
2548 "A peer which is not our target has connected to our tunnel\n");
2552 LOG (GNUNET_ERROR_TYPE_DEBUG,
2553 "Target peer %x connected\n",
2556 /* Set state to INIT */
2557 socket->state = STATE_INIT;
2559 /* Send HELLO message */
2560 message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2561 message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
2562 message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2563 queue_message (socket,
2565 &set_state_hello_wait,
2568 /* Call open callback */
2569 if (NULL == socket->open_cb)
2571 LOG (GNUNET_ERROR_TYPE_DEBUG,
2572 "STREAM_open callback is NULL\n");
2578 * Function called when our target peer is disconnected from our tunnel
2580 * @param cls the socket associated which this tunnel
2581 * @param peer the peer identity of the target
2584 mesh_peer_disconnect_callback (void *cls,
2585 const struct GNUNET_PeerIdentity *peer)
2587 struct GNUNET_STREAM_Socket *socket=cls;
2589 /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2590 LOG (GNUNET_ERROR_TYPE_DEBUG,
2591 "Other peer %x disconnected\n",
2592 socket->other_peer);
2597 * Method called whenever a peer creates a tunnel to us
2599 * @param cls closure
2600 * @param tunnel new handle to the tunnel
2601 * @param initiator peer that started the tunnel
2602 * @param atsi performance information for the tunnel
2603 * @return initial tunnel context for the tunnel
2604 * (can be NULL -- that's not an error)
2607 new_tunnel_notify (void *cls,
2608 struct GNUNET_MESH_Tunnel *tunnel,
2609 const struct GNUNET_PeerIdentity *initiator,
2610 const struct GNUNET_ATS_Information *atsi)
2612 struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2613 struct GNUNET_STREAM_Socket *socket;
2615 /* FIXME: If a tunnel is already created, we should not accept new tunnels
2616 from the same peer again until the socket is closed */
2618 socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2619 socket->other_peer = GNUNET_PEER_intern (initiator);
2620 socket->tunnel = tunnel;
2621 socket->session_id = 0; /* FIXME */
2622 socket->state = STATE_INIT;
2623 socket->lsocket = lsocket;
2624 LOG (GNUNET_ERROR_TYPE_DEBUG,
2625 "Peer %x initiated tunnel to us\n",
2626 socket->other_peer);
2628 /* FIXME: Copy MESH handle from lsocket to socket */
2635 * Function called whenever an inbound tunnel is destroyed. Should clean up
2636 * any associated state. This function is NOT called if the client has
2637 * explicitly asked for the tunnel to be destroyed using
2638 * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2641 * @param cls closure (set from GNUNET_MESH_connect)
2642 * @param tunnel connection to the other end (henceforth invalid)
2643 * @param tunnel_ctx place where local state associated
2644 * with the tunnel is stored
2647 tunnel_cleaner (void *cls,
2648 const struct GNUNET_MESH_Tunnel *tunnel,
2651 struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2653 if (tunnel != socket->tunnel)
2657 LOG (GNUNET_ERROR_TYPE_DEBUG,
2658 "Peer %x has terminated connection abruptly\n",
2659 socket->other_peer);
2661 socket->status = GNUNET_STREAM_SHUTDOWN;
2663 /* Clear Transmit handles */
2664 if (NULL != socket->transmit_handle)
2666 GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2667 socket->transmit_handle = NULL;
2669 if (NULL != socket->ack_transmit_handle)
2671 GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
2672 GNUNET_free (socket->ack_msg);
2673 socket->ack_msg = NULL;
2674 socket->ack_transmit_handle = NULL;
2676 /* Stop Tasks using socket->tunnel */
2677 if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2679 GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2680 socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2682 if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2684 GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2685 socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
2687 /* FIXME: Cancel all other tasks using socket->tunnel */
2688 socket->tunnel = NULL;
2698 * Tries to open a stream to the target peer
2700 * @param cfg configuration to use
2701 * @param target the target peer to which the stream has to be opened
2702 * @param app_port the application port number which uniquely identifies this
2704 * @param open_cb this function will be called after stream has be established
2705 * @param open_cb_cls the closure for open_cb
2706 * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2707 * @return if successful it returns the stream socket; NULL if stream cannot be
2710 struct GNUNET_STREAM_Socket *
2711 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2712 const struct GNUNET_PeerIdentity *target,
2713 GNUNET_MESH_ApplicationType app_port,
2714 GNUNET_STREAM_OpenCallback open_cb,
2718 struct GNUNET_STREAM_Socket *socket;
2719 enum GNUNET_STREAM_Option option;
2720 GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
2721 va_list vargs; /* Variable arguments */
2723 LOG (GNUNET_ERROR_TYPE_DEBUG,
2726 socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2727 socket->other_peer = GNUNET_PEER_intern (target);
2728 socket->open_cb = open_cb;
2729 socket->open_cls = open_cb_cls;
2731 socket->retransmit_timeout =
2732 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
2734 va_start (vargs, open_cb_cls); /* Parse variable args */
2736 option = va_arg (vargs, enum GNUNET_STREAM_Option);
2739 case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2740 /* Expect struct GNUNET_TIME_Relative */
2741 socket->retransmit_timeout = va_arg (vargs,
2742 struct GNUNET_TIME_Relative);
2744 case GNUNET_STREAM_OPTION_END:
2747 } while (GNUNET_STREAM_OPTION_END != option);
2748 va_end (vargs); /* End of variable args parsing */
2749 socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2750 10, /* QUEUE size as parameter? */
2752 NULL, /* No inbound tunnel handler */
2753 NULL, /* No in-tunnel cleaner */
2754 client_message_handlers,
2755 ports); /* We don't get inbound tunnels */
2756 if (NULL == socket->mesh) /* Fail if we cannot connect to mesh */
2758 GNUNET_free (socket);
2762 /* Now create the mesh tunnel to target */
2763 LOG (GNUNET_ERROR_TYPE_DEBUG,
2764 "Creating MESH Tunnel\n");
2765 socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2766 NULL, /* Tunnel context */
2767 &mesh_peer_connect_callback,
2768 &mesh_peer_disconnect_callback,
2770 GNUNET_assert (NULL != socket->tunnel);
2771 GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2774 LOG (GNUNET_ERROR_TYPE_DEBUG,
2775 "%s() END\n", __func__);
2781 * Shutdown the stream for reading or writing (similar to man 2 shutdown).
2783 * @param socket the stream socket
2784 * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
2785 * @param completion_cb the callback that will be called upon successful
2786 * shutdown of given operation
2787 * @param completion_cls the closure for the completion callback
2788 * @return the shutdown handle
2790 struct GNUNET_STREAM_ShutdownHandle *
2791 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2793 GNUNET_STREAM_ShutdownCompletion completion_cb,
2794 void *completion_cls)
2796 struct GNUNET_STREAM_ShutdownHandle *handle;
2797 struct GNUNET_STREAM_MessageHeader *msg;
2799 GNUNET_assert (NULL == socket->shutdown_handle);
2801 handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
2802 handle->socket = socket;
2803 handle->completion_cb = completion_cb;
2804 handle->completion_cls = completion_cls;
2805 socket->shutdown_handle = handle;
2807 msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2808 msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2812 handle->operation = SHUT_RD;
2813 if (NULL != socket->read_handle)
2814 LOG (GNUNET_ERROR_TYPE_WARNING,
2815 "Existing read handle should be cancelled before shutting"
2817 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
2818 queue_message (socket,
2820 &set_state_receive_close_wait,
2824 handle->operation = SHUT_WR;
2825 if (NULL != socket->write_handle)
2826 LOG (GNUNET_ERROR_TYPE_WARNING,
2827 "Existing write handle should be cancelled before shutting"
2829 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
2830 queue_message (socket,
2832 &set_state_transmit_close_wait,
2836 handle->operation = SHUT_RDWR;
2837 if (NULL != socket->write_handle)
2838 LOG (GNUNET_ERROR_TYPE_WARNING,
2839 "Existing write handle should be cancelled before shutting"
2841 if (NULL != socket->read_handle)
2842 LOG (GNUNET_ERROR_TYPE_WARNING,
2843 "Existing read handle should be cancelled before shutting"
2845 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
2846 queue_message (socket,
2848 &set_state_close_wait,
2852 LOG (GNUNET_ERROR_TYPE_WARNING,
2853 "GNUNET_STREAM_shutdown called with invalid value for "
2854 "parameter operation -- Ignoring\n");
2856 GNUNET_free (handle);
2859 handle->close_msg_retransmission_task_id =
2860 GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2861 &close_msg_retransmission_task,
2868 * Cancels a pending shutdown
2870 * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
2873 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
2875 if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
2876 GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
2877 GNUNET_free (handle);
2885 * @param socket the stream socket
2888 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
2890 struct MessageQueue *head;
2892 GNUNET_break (NULL == socket->read_handle);
2893 GNUNET_break (NULL == socket->write_handle);
2895 if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
2897 /* socket closed with read task pending!? */
2899 GNUNET_SCHEDULER_cancel (socket->read_task_id);
2900 socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
2903 /* Terminate the ack'ing tasks if they are still present */
2904 if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
2906 GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2907 socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2910 /* Clear Transmit handles */
2911 if (NULL != socket->transmit_handle)
2913 GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2914 socket->transmit_handle = NULL;
2916 if (NULL != socket->ack_transmit_handle)
2918 GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
2919 GNUNET_free (socket->ack_msg);
2920 socket->ack_msg = NULL;
2921 socket->ack_transmit_handle = NULL;
2924 /* Clear existing message queue */
2925 while (NULL != (head = socket->queue_head)) {
2926 GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2929 GNUNET_free (head->message);
2933 /* Close associated tunnel */
2934 if (NULL != socket->tunnel)
2936 GNUNET_MESH_tunnel_destroy (socket->tunnel);
2937 socket->tunnel = NULL;
2940 /* Close mesh connection */
2941 if (NULL != socket->mesh && NULL == socket->lsocket)
2943 GNUNET_MESH_disconnect (socket->mesh);
2944 socket->mesh = NULL;
2947 /* Release receive buffer */
2948 if (NULL != socket->receive_buffer)
2950 GNUNET_free (socket->receive_buffer);
2953 GNUNET_free (socket);
2958 * Listens for stream connections for a specific application ports
2960 * @param cfg the configuration to use
2961 * @param app_port the application port for which new streams will be accepted
2962 * @param listen_cb this function will be called when a peer tries to establish
2964 * @param listen_cb_cls closure for listen_cb
2965 * @return listen socket, NULL for any error
2967 struct GNUNET_STREAM_ListenSocket *
2968 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
2969 GNUNET_MESH_ApplicationType app_port,
2970 GNUNET_STREAM_ListenCallback listen_cb,
2971 void *listen_cb_cls)
2973 /* FIXME: Add variable args for passing configration options? */
2974 struct GNUNET_STREAM_ListenSocket *lsocket;
2975 GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
2977 lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
2978 lsocket->port = app_port;
2979 lsocket->listen_cb = listen_cb;
2980 lsocket->listen_cb_cls = listen_cb_cls;
2981 lsocket->mesh = GNUNET_MESH_connect (cfg,
2982 10, /* FIXME: QUEUE size as parameter? */
2983 lsocket, /* Closure */
2986 server_message_handlers,
2988 GNUNET_assert (NULL != lsocket->mesh);
2994 * Closes the listen socket
2996 * @param lsocket the listen socket
2999 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3001 /* Close MESH connection */
3002 GNUNET_assert (NULL != lsocket->mesh);
3003 GNUNET_MESH_disconnect (lsocket->mesh);
3005 GNUNET_free (lsocket);
3010 * Tries to write the given data to the stream. The maximum size of data that
3011 * can be written as part of a write operation is (64 * (64000 - sizeof (struct
3012 * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3013 * violation, however only the said number of maximum bytes will be written.
3015 * @param socket the socket representing a stream
3016 * @param data the data buffer from where the data is written into the stream
3017 * @param size the number of bytes to be written from the data buffer
3018 * @param timeout the timeout period
3019 * @param write_cont the function to call upon writing some bytes into the
3021 * @param write_cont_cls the closure
3023 * @return handle to cancel the operation; if a previous write is pending or
3024 * the stream has been shutdown for this operation then write_cont is
3025 * immediately called and NULL is returned.
3027 struct GNUNET_STREAM_IOWriteHandle *
3028 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3031 struct GNUNET_TIME_Relative timeout,
3032 GNUNET_STREAM_CompletionContinuation write_cont,
3033 void *write_cont_cls)
3035 unsigned int num_needed_packets;
3036 unsigned int packet;
3037 struct GNUNET_STREAM_IOWriteHandle *io_handle;
3038 uint32_t packet_size;
3039 uint32_t payload_size;
3040 struct GNUNET_STREAM_DataMessage *data_msg;
3042 struct GNUNET_TIME_Relative ack_deadline;
3044 LOG (GNUNET_ERROR_TYPE_DEBUG,
3047 /* Return NULL if there is already a write request pending */
3048 if (NULL != socket->write_handle)
3054 switch (socket->state)
3056 case STATE_TRANSMIT_CLOSED:
3057 case STATE_TRANSMIT_CLOSE_WAIT:
3059 case STATE_CLOSE_WAIT:
3060 if (NULL != write_cont)
3061 write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3062 LOG (GNUNET_ERROR_TYPE_DEBUG,
3063 "%s() END\n", __func__);
3067 case STATE_HELLO_WAIT:
3068 if (NULL != write_cont)
3069 /* FIXME: GNUNET_STREAM_SYSERR?? */
3070 write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3071 LOG (GNUNET_ERROR_TYPE_DEBUG,
3072 "%s() END\n", __func__);
3074 case STATE_ESTABLISHED:
3075 case STATE_RECEIVE_CLOSED:
3076 case STATE_RECEIVE_CLOSE_WAIT:
3080 if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
3081 size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size;
3082 num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
3083 io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
3084 io_handle->socket = socket;
3085 io_handle->write_cont = write_cont;
3086 io_handle->write_cont_cls = write_cont_cls;
3087 io_handle->size = size;
3089 /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3090 determined from RTT */
3091 ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3092 /* Divide the given buffer into packets for sending */
3093 for (packet=0; packet < num_needed_packets; packet++)
3095 if ((packet + 1) * max_payload_size < size)
3097 payload_size = max_payload_size;
3098 packet_size = MAX_PACKET_SIZE;
3102 payload_size = size - packet * max_payload_size;
3103 packet_size = payload_size + sizeof (struct
3104 GNUNET_STREAM_DataMessage);
3106 io_handle->messages[packet] = GNUNET_malloc (packet_size);
3107 io_handle->messages[packet]->header.header.size = htons (packet_size);
3108 io_handle->messages[packet]->header.header.type =
3109 htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3110 io_handle->messages[packet]->sequence_number =
3111 htonl (socket->write_sequence_number++);
3112 io_handle->messages[packet]->offset = htonl (socket->write_offset);
3114 /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3115 determined from RTT */
3116 io_handle->messages[packet]->ack_deadline =
3117 GNUNET_TIME_relative_hton (ack_deadline);
3118 data_msg = io_handle->messages[packet];
3119 /* Copy data from given buffer to the packet */
3120 memcpy (&data_msg[1],
3123 sweep += payload_size;
3124 socket->write_offset += payload_size;
3126 socket->write_handle = io_handle;
3127 write_data (socket);
3129 LOG (GNUNET_ERROR_TYPE_DEBUG,
3130 "%s() END\n", __func__);
3138 * Tries to read data from the stream.
3140 * @param socket the socket representing a stream
3141 * @param timeout the timeout period
3142 * @param proc function to call with data (once only)
3143 * @param proc_cls the closure for proc
3145 * @return handle to cancel the operation; if the stream has been shutdown for
3146 * this type of opeartion then the DataProcessor is immediately
3147 * called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned
3149 struct GNUNET_STREAM_IOReadHandle *
3150 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3151 struct GNUNET_TIME_Relative timeout,
3152 GNUNET_STREAM_DataProcessor proc,
3155 struct GNUNET_STREAM_IOReadHandle *read_handle;
3157 LOG (GNUNET_ERROR_TYPE_DEBUG,
3161 /* Return NULL if there is already a read handle; the user has to cancel that
3162 first before continuing or has to wait until it is completed */
3163 if (NULL != socket->read_handle) return NULL;
3165 GNUNET_assert (NULL != proc);
3167 switch (socket->state)
3169 case STATE_RECEIVE_CLOSED:
3170 case STATE_RECEIVE_CLOSE_WAIT:
3172 case STATE_CLOSE_WAIT:
3173 proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3174 LOG (GNUNET_ERROR_TYPE_DEBUG,
3182 read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
3183 read_handle->proc = proc;
3184 read_handle->proc_cls = proc_cls;
3185 socket->read_handle = read_handle;
3187 /* Check if we have a packet at bitmap 0 */
3188 if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3191 socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3196 /* Setup the read timeout task */
3197 socket->read_io_timeout_task_id =
3198 GNUNET_SCHEDULER_add_delayed (timeout,
3201 LOG (GNUNET_ERROR_TYPE_DEBUG,
3209 * Cancel pending write operation.
3211 * @param ioh handle to operation to cancel
3214 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3216 struct GNUNET_STREAM_Socket *socket = ioh->socket;
3217 unsigned int packet;
3219 GNUNET_assert (NULL != socket->write_handle);
3220 GNUNET_assert (socket->write_handle == ioh);
3222 if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
3224 GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
3225 socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
3228 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3230 if (NULL == ioh->messages[packet]) break;
3231 GNUNET_free (ioh->messages[packet]);
3234 GNUNET_free (socket->write_handle);
3235 socket->write_handle = NULL;
3241 * Cancel pending read operation.
3243 * @param ioh handle to operation to cancel
3246 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)