2 This file is part of GNUnet.
3 (C) 2012 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
23 * Checks for matching the sender and socket->other_peer in server
26 * Decrement PEER intern count during socket close and listen close to free the
27 * memory used for PEER interning
29 * Add code for write io timeout
31 * Include retransmission for control messages
35 * @file stream/stream_api.c
36 * @brief Implementation of the stream library
37 * @author Sree Harsha Totakura
42 #include "gnunet_common.h"
43 #include "gnunet_crypto_lib.h"
44 #include "gnunet_stream_lib.h"
45 #include "gnunet_testing_lib.h"
46 #include "stream_protocol.h"
50 * The maximum packet size of a stream packet
52 #define MAX_PACKET_SIZE 64000
57 #define RECEIVE_BUFFER_SIZE 4096000
60 * The maximum payload a data message packet can carry
62 static size_t max_payload_size =
63 MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
66 * states in the Protocol
71 * Client initialization state
76 * Listener initialization state
81 * Pre-connection establishment state
86 * State where a connection has been established
91 * State where the socket is closed for reading on our side and waiting to be ACK'ed
93 STATE_RECEIVE_CLOSE_WAIT,
96 * State where the socket is closed for reading
101 * State where the socket is closed for writing on our side and waiting to be ACK'ed
103 STATE_TRANSMIT_CLOSE_WAIT,
106 * State where the socket is closed for writing
108 STATE_TRANSMIT_CLOSED,
111 * State where the socket is closed on our side and waiting to be ACK'ed
116 * State where the socket is closed
123 * Functions of this type are called when a message is written
125 * @param cls the closure from queue_message
126 * @param socket the socket the written message was bound to
128 typedef void (*SendFinishCallback) (void *cls,
129 struct GNUNET_STREAM_Socket *socket);
133 * The send message queue
140 struct GNUNET_STREAM_MessageHeader *message;
143 * Callback to be called when the message is sent
145 SendFinishCallback finish_cb;
148 * The closure for finish_cb
153 * The next message in queue. Should be NULL in the last message
155 struct MessageQueue *next;
158 * The previous message in queue. Should be NULL in the first message
160 struct MessageQueue *prev;
165 * The STREAM Socket Handler
167 struct GNUNET_STREAM_Socket
170 * Retransmission timeout
172 struct GNUNET_TIME_Relative retransmit_timeout;
175 * The Acknowledgement Bitmap
177 GNUNET_STREAM_AckBitmap ack_bitmap;
180 * Time when the Acknowledgement was queued
182 struct GNUNET_TIME_Absolute ack_time_registered;
185 * Queued Acknowledgement deadline
187 struct GNUNET_TIME_Relative ack_time_deadline;
192 struct GNUNET_MESH_Handle *mesh;
195 * The mesh tunnel handle
197 struct GNUNET_MESH_Tunnel *tunnel;
200 * Stream open closure
205 * Stream open callback
207 GNUNET_STREAM_OpenCallback open_cb;
210 * The current transmit handle (if a pending transmit request exists)
212 struct GNUNET_MESH_TransmitHandle *transmit_handle;
215 * The queue head; the message associated with the current transmit handle
217 struct MessageQueue *queue_head;
220 * The queue tail, should always point to the last message in queue
222 struct MessageQueue *queue_tail;
225 * The write IO_handle associated with this socket
227 struct GNUNET_STREAM_IOWriteHandle *write_handle;
230 * The read IO_handle associated with this socket
232 struct GNUNET_STREAM_IOReadHandle *read_handle;
235 * The shutdown handle associated with this socket
237 struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
240 * Buffer for storing received messages
242 void *receive_buffer;
245 * The listen socket from which this socket is derived. Should be NULL if it
246 * is not a derived socket
248 struct GNUNET_STREAM_ListenSocket *lsocket;
251 * Task identifier for the read io timeout task
253 GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
256 * Task identifier for retransmission task after timeout
258 GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
261 * The task for sending timely Acks
263 GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
266 * Task scheduled to continue a read operation.
268 GNUNET_SCHEDULER_TaskIdentifier read_task_id;
271 * The state of the protocol associated with this socket
276 * The status of the socket
278 enum GNUNET_STREAM_Status status;
281 * The number of previous timeouts; FIXME: currently not used
283 unsigned int retries;
286 * The peer identity of the peer at the other end of the stream
288 GNUNET_PEER_Id other_peer;
291 * Our Peer Identity (for debugging)
293 GNUNET_PEER_Id our_id;
296 * The application port number (type: uint32_t)
298 GNUNET_MESH_ApplicationType app_port;
301 * The session id associated with this stream connection
302 * FIXME: Not used currently, may be removed
307 * Write sequence number. Set to random when sending HELLO(client) and
310 uint32_t write_sequence_number;
313 * Read sequence number. This number's value is determined during handshake
315 uint32_t read_sequence_number;
318 * The receiver buffer size
320 uint32_t receive_buffer_size;
323 * The receiver buffer boundaries
325 uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
328 * receiver's available buffer after the last acknowledged packet
330 uint32_t receiver_window_available;
333 * The offset pointer used during write operation
335 uint32_t write_offset;
338 * The offset after which we are expecting data
340 uint32_t read_offset;
343 * The offset up to which the user has read from the receive buffer
345 uint32_t copy_offset;
350 * A socket for listening
352 struct GNUNET_STREAM_ListenSocket
357 struct GNUNET_MESH_Handle *mesh;
360 * The callback function which is called after successful opening socket
362 GNUNET_STREAM_ListenCallback listen_cb;
365 * The call back closure
370 * Our interned Peer's identity
372 GNUNET_PEER_Id our_id;
376 * FIXME: Remove if not required!
378 GNUNET_MESH_ApplicationType port;
383 * The IO Write Handle
385 struct GNUNET_STREAM_IOWriteHandle
388 * The socket to which this write handle is associated
390 struct GNUNET_STREAM_Socket *socket;
393 * The packet_buffers associated with this Handle
395 struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
398 * The write continuation callback
400 GNUNET_STREAM_CompletionContinuation write_cont;
403 * Write continuation closure
405 void *write_cont_cls;
408 * The bitmap of this IOHandle; Corresponding bit for a message is set when
409 * it has been acknowledged by the receiver
411 GNUNET_STREAM_AckBitmap ack_bitmap;
414 * Number of bytes in this write handle
423 struct GNUNET_STREAM_IOReadHandle
426 * Callback for the read processor
428 GNUNET_STREAM_DataProcessor proc;
431 * The closure pointer for the read processor callback
438 * Handle for Shutdown
440 struct GNUNET_STREAM_ShutdownHandle
443 * The socket associated with this shutdown handle
445 struct GNUNET_STREAM_Socket *socket;
448 * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
453 * Shutdown completion callback
455 GNUNET_STREAM_ShutdownCompletion completion_cb;
458 * Closure for completion callback
460 void *completion_cls;
465 * Default value in seconds for various timeouts
467 static unsigned int default_timeout = 10;
471 * Callback function for sending queued message
473 * @param cls closure the socket
474 * @param size number of bytes available in buf
475 * @param buf where the callee should write the message
476 * @return number of bytes written to buf
479 send_message_notify (void *cls, size_t size, void *buf)
481 struct GNUNET_STREAM_Socket *socket = cls;
482 struct GNUNET_PeerIdentity target;
483 struct MessageQueue *head;
486 socket->transmit_handle = NULL; /* This callback consumes the pending transmit request */
487 head = socket->queue_head; /* queue_message appends at the tail, so the head is the oldest entry */
489 return 0; /* just to be safe */
490 GNUNET_PEER_resolve (socket->other_peer, &target); /* presumably expands the interned peer id into `target` -- TODO confirm */
491 if (0 == size) /* request timed out */
494 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
495 "Message sending timed out. Retry %d \n",
497 socket->transmit_handle = /* ask for another transmit slot for the same head message */
498 GNUNET_MESH_notify_transmit_ready (socket->tunnel,
501 /* FIXME: exponential backoff */
502 socket->retransmit_timeout,
504 ntohs (head->message->header.size),
505 &send_message_notify,
510 ret = ntohs (head->message->header.size); /* message length is stored in network byte order */
511 GNUNET_assert (size >= ret); /* the offered buffer must hold the whole message */
512 memcpy (buf, head->message, ret);
513 if (NULL != head->finish_cb)
515 head->finish_cb (head->finish_cb_cls, socket); /* NOTE(review): set_state_established is queued as a finish_cb and can GNUNET_free (socket); socket is dereferenced again just below */
517 GNUNET_CONTAINER_DLL_remove (socket->queue_head,
520 GNUNET_free (head->message); /* the queue owned the message buffer */
522 head = socket->queue_head;
523 if (NULL != head) /* more pending messages to send */
526 socket->transmit_handle = /* re-arm transmission for the next queued message */
527 GNUNET_MESH_notify_transmit_ready (socket->tunnel,
530 /* FIXME: exponential backoff */
531 socket->retransmit_timeout,
533 ntohs (head->message->header.size),
534 &send_message_notify,
542 * Queues a message for sending using the mesh connection of a socket
544 * @param socket the socket whose mesh connection is used
545 * @param message the message to be sent
546 * @param finish_cb the callback to be called when the message is sent
547 * @param finish_cb_cls the closure for the callback
550 queue_message (struct GNUNET_STREAM_Socket *socket,
551 struct GNUNET_STREAM_MessageHeader *message,
552 SendFinishCallback finish_cb,
555 struct MessageQueue *queue_entity;
556 struct GNUNET_PeerIdentity target;
559 ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA) /* only STREAM message types (DATA .. CLOSE_ACK) may be queued */
560 && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
562 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
563 "%x: Queueing message of type %d and size %d\n",
565 ntohs (message->header.type),
566 ntohs (message->header.size));
567 GNUNET_assert (NULL != message); /* NOTE(review): this check runs after `message` has already been dereferenced above */
568 queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
569 queue_entity->message = message; /* pointer is stored, not copied; send_message_notify frees it after sending */
570 queue_entity->finish_cb = finish_cb;
571 queue_entity->finish_cb_cls = finish_cb_cls;
572 GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
575 if (NULL == socket->transmit_handle) /* only request a transmit slot when none is pending */
578 GNUNET_PEER_resolve (socket->other_peer, &target);
579 socket->transmit_handle =
580 GNUNET_MESH_notify_transmit_ready (socket->tunnel,
583 socket->retransmit_timeout,
585 ntohs (message->header.size),
586 &send_message_notify,
593 * Copies a message and queues it for sending using the mesh connection of
596 * @param socket the socket whose mesh connection is used
597 * @param message the message to be sent
598 * @param finish_cb the callback to be called when the message is sent
599 * @param finish_cb_cls the closure for the callback
602 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
603 const struct GNUNET_STREAM_MessageHeader *message,
604 SendFinishCallback finish_cb,
607 struct GNUNET_STREAM_MessageHeader *msg_copy;
610 size = ntohs (message->header.size); /* total message length, taken from its own header */
611 msg_copy = GNUNET_malloc (size);
612 memcpy (msg_copy, message, size); /* the caller keeps its original; only the copy is queued */
613 queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
618 * Callback function for sending ack message
620 * @param cls closure the ACK message created in ack_task
621 * @param size number of bytes available in buffer
622 * @param buf where the callee should write the message
623 * @return number of bytes written to buf
626 send_ack_notify (void *cls, size_t size, void *buf)
628 struct GNUNET_STREAM_AckMessage *ack_msg = cls; /* heap-allocated by ack_task; this callback is its only owner */
632 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
633 "%s called with size 0\n", __func__); GNUNET_free (ack_msg); /* nothing will be sent: release the ACK instead of leaking it */
636 GNUNET_assert (ntohs (ack_msg->header.header.size) <= size); /* the offered buffer must hold the whole ACK */
638 size = ntohs (ack_msg->header.header.size);
639 memcpy (buf, ack_msg, size); GNUNET_free (ack_msg); /* ACK has been copied out; free it (it was leaked before) */
644 * Writes data using the given socket. The amount of data written is limited by
645 * the receiver's available window (receiver_window_available)
647 * @param socket the socket to use
650 write_data (struct GNUNET_STREAM_Socket *socket);
653 * Task for retransmitting data messages if they aren't ACKed before their ack
656 * @param cls the socket
657 * @param tc the Task context
660 retransmission_timeout_task (void *cls,
661 const struct GNUNET_SCHEDULER_TaskContext *tc)
663 struct GNUNET_STREAM_Socket *socket = cls;
665 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) /* test the flag as a bitmask, like call_read_processor does, so a combined reason is still recognised */
668 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
669 "%x: Retransmitting DATA...\n", socket->our_id);
670 socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; /* this task has fired; write_data may schedule a fresh one */
676 * Task for sending ACK message
678 * @param cls the socket
679 * @param tc the Task context
683 const struct GNUNET_SCHEDULER_TaskContext *tc)
685 struct GNUNET_STREAM_Socket *socket = cls;
686 struct GNUNET_STREAM_AckMessage *ack_msg;
687 struct GNUNET_PeerIdentity target;
689 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) /* bitmask test, consistent with call_read_processor */
694 socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK; /* use the named constant like every other task id in this file */
696 /* Create the ACK Message */
697 ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
698 ack_msg->header.header.size = htons (sizeof (struct
699 GNUNET_STREAM_AckMessage));
700 ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
701 ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
702 ack_msg->base_sequence_number = htonl (socket->read_sequence_number); /* bit 0 of the bitmap refers to this sequence number */
703 ack_msg->receive_window_remaining =
704 htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size); /* space still free in our receive buffer */
706 GNUNET_PEER_resolve (socket->other_peer, &target);
707 /* Request MESH for sending ACK */
708 GNUNET_MESH_notify_transmit_ready (socket->tunnel,
711 socket->retransmit_timeout,
713 ntohs (ack_msg->header.header.size),
722 * Function to modify a bit in GNUNET_STREAM_AckBitmap
724 * @param bitmap the bitmap to modify
725 * @param bit the bit number to modify
726 * @param value GNUNET_YES to on, GNUNET_NO to off
729 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
733 GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
734 if (GNUNET_YES == value)
735 *bitmap |= (1LL << bit);
737 *bitmap &= ~(1LL << bit);
742 * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
744 * @param bitmap address of the bitmap that has to be checked
745 * @param bit the bit number to check
746 * @return GNUNET_YES if the bit is set; GNUNET_NO if not
749 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
752 GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
753 return 0 != (*bitmap & (1LL << bit));
758 * Writes data using the given socket. The amount of data written is limited by
759 * the receiver_window_size
761 * @param socket the socket to use
764 write_data (struct GNUNET_STREAM_Socket *socket)
766 struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
767 int packet; /* Although an int, should never be negative */
771 /* Find the last acknowledged packet */
772 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
774 if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
777 else if (NULL == io_handle->messages[packet])
780 /* Resend packets which weren't ack'ed */
781 for (packet=0; packet < ack_packet; packet++)
783 if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
786 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
787 "%x: Placing DATA message with sequence %u in send queue\n",
789 ntohl (io_handle->messages[packet]->sequence_number));
791 copy_and_queue_message (socket,
792 &io_handle->messages[packet]->header,
797 packet = ack_packet + 1;
798 /* Now send new packets if there is enough buffer space */
799 while ( (NULL != io_handle->messages[packet]) &&
800 (socket->receiver_window_available
801 >= ntohs (io_handle->messages[packet]->header.header.size)) )
803 socket->receiver_window_available -=
804 ntohs (io_handle->messages[packet]->header.header.size);
805 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
806 "%x: Placing DATA message with sequence %u in send queue\n",
808 ntohl (io_handle->messages[packet]->sequence_number));
809 copy_and_queue_message (socket,
810 &io_handle->messages[packet]->header,
816 if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
817 socket->retransmission_timeout_task_id =
818 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
819 (GNUNET_TIME_UNIT_SECONDS, 8),
820 &retransmission_timeout_task,
826 * Task for calling the read processor
828 * @param cls the socket
829 * @param tc the task context
832 call_read_processor (void *cls,
833 const struct GNUNET_SCHEDULER_TaskContext *tc)
835 struct GNUNET_STREAM_Socket *socket = cls;
837 size_t valid_read_size;
839 uint32_t sequence_increase;
840 uint32_t offset_increase;
842 socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
843 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
846 if (NULL == socket->receive_buffer)
849 GNUNET_assert (NULL != socket->read_handle);
850 GNUNET_assert (NULL != socket->read_handle->proc);
852 /* Check the bitmap for any holes */
853 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
855 if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
859 /* We only call read processor if we have the first packet */
860 GNUNET_assert (0 < packet);
863 socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
865 GNUNET_assert (0 != valid_read_size);
867 /* Cancel the read_io_timeout_task */
868 GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
869 socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
871 /* Call the data processor */
872 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
873 "%x: Calling read processor\n",
876 socket->read_handle->proc (socket->read_handle->proc_cls,
878 socket->receive_buffer + socket->copy_offset,
880 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
881 "%x: Read processor read %d bytes\n",
884 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
885 "%x: Read processor completed successfully\n",
888 /* Free the read handle */
889 GNUNET_free (socket->read_handle);
890 socket->read_handle = NULL;
892 GNUNET_assert (read_size <= valid_read_size);
893 socket->copy_offset += read_size;
895 /* Determine upto which packet we can remove from the buffer */
896 for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
898 if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
900 if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
904 /* If no packets can be removed we can't move the buffer */
905 if (0 == packet) return;
907 sequence_increase = packet;
908 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
909 "%x: Sequence increase after read processor completion: %u\n",
913 /* Shift the data in the receive buffer */
914 memmove (socket->receive_buffer,
915 socket->receive_buffer
916 + socket->receive_buffer_boundaries[sequence_increase-1],
917 socket->receive_buffer_size
918 - socket->receive_buffer_boundaries[sequence_increase-1]);
920 /* Shift the bitmap */
921 socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
923 /* Set read_sequence_number */
924 socket->read_sequence_number += sequence_increase;
926 /* Set read_offset */
927 offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
928 socket->read_offset += offset_increase;
930 /* Fix copy_offset */
931 GNUNET_assert (offset_increase <= socket->copy_offset);
932 socket->copy_offset -= offset_increase;
934 /* Fix relative boundaries */
935 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
937 if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
939 socket->receive_buffer_boundaries[packet] =
940 socket->receive_buffer_boundaries[packet + sequence_increase]
944 socket->receive_buffer_boundaries[packet] = 0;
950 * Cancels the existing read io handle
952 * @param cls the closure from the SCHEDULER call
953 * @param tc the task context
956 read_io_timeout (void *cls,
957 const struct GNUNET_SCHEDULER_TaskContext *tc)
959 struct GNUNET_STREAM_Socket *socket = cls;
960 GNUNET_STREAM_DataProcessor proc;
963 socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; /* this task is running, so it is no longer pending */
964 if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK) /* a read task is still scheduled: drop it, the read has timed out */
966 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
967 "%x: Read task timedout - Cancelling it\n",
969 GNUNET_SCHEDULER_cancel (socket->read_task_id);
970 socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
972 GNUNET_assert (NULL != socket->read_handle);
973 proc = socket->read_handle->proc; /* save callback and closure: the handle is freed before the callback is invoked */
974 proc_cls = socket->read_handle->proc_cls;
976 GNUNET_free (socket->read_handle);
977 socket->read_handle = NULL;
978 /* Call the read processor to signal timeout */
980 GNUNET_STREAM_TIMEOUT,
987 * Handler for DATA messages; Same for both client and server
989 * @param socket the socket through which the data was received
990 * @param tunnel connection to the other end
991 * @param sender who sent the message
992 * @param msg the data message
993 * @param atsi performance data for the connection
994 * @return GNUNET_OK to keep the connection open,
995 * GNUNET_SYSERR to close it (signal serious error)
998 handle_data (struct GNUNET_STREAM_Socket *socket,
999 struct GNUNET_MESH_Tunnel *tunnel,
1000 const struct GNUNET_PeerIdentity *sender,
1001 const struct GNUNET_STREAM_DataMessage *msg,
1002 const struct GNUNET_ATS_Information*atsi)
1004 const void *payload;
1005 uint32_t bytes_needed;
1006 uint32_t relative_offset;
1007 uint32_t relative_sequence_number;
1010 size = ntohs (msg->header.header.size); /* network-to-host is the conversion meant here (htons gave the same value by accident) */
1011 if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1013 GNUNET_break_op (0);
1014 return GNUNET_SYSERR;
1017 if (GNUNET_PEER_search (sender) != socket->other_peer)
1019 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1020 "%x: Received DATA from non-confirming peer\n",
1025 switch (socket->state)
1027 case STATE_ESTABLISHED:
1028 case STATE_TRANSMIT_CLOSED:
1029 case STATE_TRANSMIT_CLOSE_WAIT:
1031 /* check if the message's sequence number is in the range we are
1033 relative_sequence_number =
1034 ntohl (msg->sequence_number) - socket->read_sequence_number;
1035 if ( relative_sequence_number >= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH) /* valid slots are 0 .. LENGTH-1; LENGTH itself would trip the bitmap assert and overrun receive_buffer_boundaries */
1037 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1038 "%x: Ignoring received message with sequence number %u\n",
1040 ntohl (msg->sequence_number));
1041 /* Start ACK sending task if one is not already present */
1042 if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1044 socket->ack_task_id =
1045 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1046 (msg->ack_deadline),
1053 /* Check if we have already seen this message */
1054 if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1055 relative_sequence_number))
1057 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1058 "%x: Ignoring already received message with sequence "
1061 ntohl (msg->sequence_number));
1062 /* Start ACK sending task if one is not already present */
1063 if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1065 socket->ack_task_id =
1066 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1067 (msg->ack_deadline),
1074 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1075 "%x: Receiving DATA with sequence number: %u and size: %d "
1078 ntohl (msg->sequence_number),
1079 ntohs (msg->header.header.size),
1080 socket->other_peer);
1082 /* Check if we have to allocate the buffer */
1083 size -= sizeof (struct GNUNET_STREAM_DataMessage); /* size is now the payload length */
1084 relative_offset = ntohl (msg->offset) - socket->read_offset; /* peer-controlled value */
1085 bytes_needed = relative_offset + size; /* may wrap around in uint32_t for a hostile offset */
1086 if ( (bytes_needed < relative_offset) || (bytes_needed > socket->receive_buffer_size) ) /* a wrapped sum must take the same path as a too-large one */
1088 if ( (bytes_needed >= relative_offset) && (bytes_needed <= RECEIVE_BUFFER_SIZE) ) /* grow only for a genuine, non-wrapped size within the limit */
1090 socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1092 socket->receive_buffer_size = bytes_needed;
1096 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1097 "%x: Cannot accommodate packet %d as buffer is",
1100 ntohl (msg->sequence_number));
1105 /* Copy Data to buffer */
1107 GNUNET_assert ((relative_offset <= socket->receive_buffer_size) && (size <= socket->receive_buffer_size - relative_offset)); /* same bound as before, written so that it cannot wrap */
1108 memcpy (socket->receive_buffer + relative_offset,
1111 socket->receive_buffer_boundaries[relative_sequence_number] = /* remember where this packet's data ends in the buffer */
1112 relative_offset + size;
1114 /* Modify the ACK bitmap */
1115 ackbitmap_modify_bit (&socket->ack_bitmap,
1116 relative_sequence_number,
1119 /* Start ACK sending task if one is not already present */
1120 if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1122 socket->ack_task_id =
1123 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1124 (msg->ack_deadline),
1129 if ((NULL != socket->read_handle) /* A read handle is waiting */
1130 /* There is no current read task */
1131 && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1132 /* We have the first packet */
1133 && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1136 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1137 "%x: Scheduling read processor\n",
1140 socket->read_task_id =
1141 GNUNET_SCHEDULER_add_now (&call_read_processor,
1148 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1149 "%x: Received data message when it cannot be handled\n",
1158 * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1160 * @param cls the socket (set from GNUNET_MESH_connect)
1161 * @param tunnel connection to the other end
1162 * @param tunnel_ctx place to store local state associated with the tunnel
1163 * @param sender who sent the message
1164 * @param message the actual message
1165 * @param atsi performance data for the connection
1166 * @return GNUNET_OK to keep the connection open,
1167 * GNUNET_SYSERR to close it (signal serious error)
1170 client_handle_data (void *cls,
1171 struct GNUNET_MESH_Tunnel *tunnel,
1173 const struct GNUNET_PeerIdentity *sender,
1174 const struct GNUNET_MessageHeader *message,
1175 const struct GNUNET_ATS_Information*atsi)
1177 struct GNUNET_STREAM_Socket *socket = cls; /* on the client side the closure is the socket itself */
1179 return handle_data (socket, /* all the work is done by the handler shared with the server side */
1182 (const struct GNUNET_STREAM_DataMessage *) message, /* handle_data checks the size field before using the body */
1188 * Callback to set state to ESTABLISHED
1190 * @param cls the closure from queue_message FIXME: document
1191 * @param socket the socket requiring state change
1194 set_state_established (void *cls,
1195 struct GNUNET_STREAM_Socket *socket)
1197 struct GNUNET_PeerIdentity initiator_pid;
1199 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1200 "%x: Attaining ESTABLISHED state\n",
1202 socket->write_offset = 0; /* both stream offsets start from zero once the handshake is done */
1203 socket->read_offset = 0;
1204 socket->state = STATE_ESTABLISHED;
1205 /* FIXME: What if listen_cb is NULL */
1206 if (NULL != socket->lsocket) /* socket was derived from a listen socket: notify the listener */
1208 GNUNET_PEER_resolve (socket->other_peer, &initiator_pid);
1209 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1210 "%x: Calling listen callback\n",
1212 if (GNUNET_SYSERR == /* the listener rejected the connection */
1213 socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1217 socket->state = STATE_CLOSED;
1218 /* FIXME: We should close in a decent way */
1219 GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1220 GNUNET_free (socket); /* NOTE(review): send_message_notify calls this as a finish_cb and keeps using the socket afterwards */
1223 else if (socket->open_cb) /* otherwise report to the open callback, if one is set */
1224 socket->open_cb (socket->open_cls, socket);
1229 * Callback to set state to HELLO_WAIT
1231 * @param cls the closure from queue_message
1232 * @param socket the socket requiring state change
1235 set_state_hello_wait (void *cls,
1236 struct GNUNET_STREAM_Socket *socket)
1238 GNUNET_assert (STATE_INIT == socket->state); /* HELLO_WAIT may only be entered from the initial state */
1239 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1240 "%x: Attaining HELLO_WAIT state\n",
1242 socket->state = STATE_HELLO_WAIT;
1247 * Callback to set state to CLOSE_WAIT
1249 * @param cls the closure from queue_message
1250 * @param socket the socket requiring state change
1253 set_state_close_wait (void *cls,
1254 struct GNUNET_STREAM_Socket *socket)
1256 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1257 "%x: Attaing CLOSE_WAIT state\n",
1259 socket->state = STATE_CLOSE_WAIT;
1260 GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1261 socket->receive_buffer = NULL; /* reset pointer and size together so the freed buffer cannot be reused */
1262 socket->receive_buffer_size = 0;
1267 * Callback to set state to CLOSED
1269 * @param cls the closure from queue_message
1270 * @param socket the socket requiring state change
1273 set_state_closed (void *cls,
1274 struct GNUNET_STREAM_Socket *socket)
1276 socket->state = STATE_CLOSED; /* only the state is changed here; no resource is released in this callback */
1280 * Returns a new HelloAckMessage. Also sets the write sequence number for the
1283 * @param socket the socket for which this HelloAckMessage has to be generated
1284 * @return the HelloAckMessage
1286 static struct GNUNET_STREAM_HelloAckMessage *
1287 generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket)
1289 struct GNUNET_STREAM_HelloAckMessage *msg;
1291 /* Get the random sequence number */
1292 socket->write_sequence_number = /* side effect: the socket's write sequence number is (re)initialised here */
1293 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1294 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1295 "%x: Generated write sequence number %u\n",
1297 (unsigned int) socket->write_sequence_number);
1299 msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage)); /* heap-allocated; client_handle_hello_ack hands the result to queue_message */
1300 msg->header.header.size =
1301 htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1302 msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1303 msg->sequence_number = htonl (socket->write_sequence_number);
1304 msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE); /* advertise our full receive buffer as the window */
1311 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1313 * @param cls the socket (set from GNUNET_MESH_connect)
1314 * @param tunnel connection to the other end
1315 * @param tunnel_ctx this is NULL
1316 * @param sender who sent the message
1317 * @param message the actual message
1318 * @param atsi performance data for the connection
1319 * @return GNUNET_OK to keep the connection open,
1320 * GNUNET_SYSERR to close it (signal serious error)
1323 client_handle_hello_ack (void *cls,
1324 struct GNUNET_MESH_Tunnel *tunnel,
1326 const struct GNUNET_PeerIdentity *sender,
1327 const struct GNUNET_MessageHeader *message,
1328 const struct GNUNET_ATS_Information*atsi)
1330 struct GNUNET_STREAM_Socket *socket = cls;
1331 const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1332 struct GNUNET_STREAM_HelloAckMessage *reply;
1334 if (GNUNET_PEER_search (sender) != socket->other_peer) /* only accept a HELLO_ACK from the peer this socket talks to */
1336 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1337 "%x: Received HELLO_ACK from non-confirming peer\n",
1341 ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message; /* NOTE(review): no length check is visible here; presumably enforced by the handler table -- confirm */
1342 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1343 "%x: Received HELLO_ACK from %x\n",
1345 socket->other_peer);
1347 GNUNET_assert (socket->tunnel == tunnel);
1348 switch (socket->state)
1350 case STATE_HELLO_WAIT:
1351 socket->read_sequence_number = ntohl (ack_msg->sequence_number); /* the peer's initial sequence number is where our reads start */
1352 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1353 "%x: Read sequence number %u\n",
1355 (unsigned int) socket->read_sequence_number);
1356 socket->receiver_window_available = ntohl (ack_msg->receiver_window_size); /* how much we may send before the first ACK */
1357 reply = generate_hello_ack_msg (socket);
1358 queue_message (socket, /* ESTABLISHED is entered only once our reply has actually been sent */
1360 &set_state_established,
1363 case STATE_ESTABLISHED:
1364 case STATE_RECEIVE_CLOSE_WAIT:
1365 // call statistics (# ACKs ignored++)
1369 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1370 "%x: Server %x sent HELLO_ACK when in state %d\n",
1374 socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1375 return GNUNET_SYSERR;
1382 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1384 * @param cls the socket (set from GNUNET_MESH_connect)
1385 * @param tunnel connection to the other end
1386 * @param tunnel_ctx this is NULL
1387 * @param sender who sent the message
1388 * @param message the actual message
1389 * @param atsi performance data for the connection
1390 * @return GNUNET_OK to keep the connection open,
1391 * GNUNET_SYSERR to close it (signal serious error)
1394 client_handle_reset (void *cls,
1395 struct GNUNET_MESH_Tunnel *tunnel,
1397 const struct GNUNET_PeerIdentity *sender,
1398 const struct GNUNET_MessageHeader *message,
1399 const struct GNUNET_ATS_Information*atsi)
1401 struct GNUNET_STREAM_Socket *socket = cls; /* NOTE(review): no RESET handling is visible after this line in this listing -- confirm against the full file */
1408 * Common message handler for handling TRANSMIT_CLOSE messages
1410 * @param socket the socket through which the TRANSMIT_CLOSE was received
1411 * @param tunnel connection to the other end
1412 * @param sender who sent the message
1413 * @param msg the transmit close message
1414 * @param atsi performance data for the connection
1415 * @return GNUNET_OK to keep the connection open,
1416 * GNUNET_SYSERR to close it (signal serious error)
1419 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1420 struct GNUNET_MESH_Tunnel *tunnel,
1421 const struct GNUNET_PeerIdentity *sender,
1422 const struct GNUNET_STREAM_MessageHeader *msg,
1423 const struct GNUNET_ATS_Information*atsi)
1425 struct GNUNET_STREAM_MessageHeader *reply;
1427 switch (socket->state)
1429 case STATE_ESTABLISHED:
1430 socket->state = STATE_RECEIVE_CLOSED;
1432 /* Send TRANSMIT_CLOSE_ACK */
1433 reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1434 reply->header.type =
1435 htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1436 reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1437 queue_message (socket, reply, NULL, NULL);
1441 /* FIXME: Call statistics? */
/**
 * Client-side handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE.
 *
 * Takes the socket from the closure and delegates to handle_transmit_close().
 *
 * @param cls the stream socket (closure set from GNUNET_MESH_connect)
 * @param tunnel connection to the other end
 * @param tunnel_ctx this is NULL
 * @param sender who sent the message
 * @param message the actual message
 * @param atsi performance data for the connection
 * @return GNUNET_OK to keep the connection open,
 *         GNUNET_SYSERR to close it (signal serious error)
 */
static int
client_handle_transmit_close (void *cls,
                              struct GNUNET_MESH_Tunnel *tunnel,
                              void **tunnel_ctx,
                              const struct GNUNET_PeerIdentity *sender,
                              const struct GNUNET_MessageHeader *message,
                              const struct GNUNET_ATS_Information*atsi)
{
  struct GNUNET_STREAM_Socket *socket = cls;
  const struct GNUNET_STREAM_MessageHeader *close_msg;

  close_msg = (const struct GNUNET_STREAM_MessageHeader *) message;
  return handle_transmit_close (socket, tunnel, sender, close_msg, atsi);
}
1479 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1481 * @param cls the socket (set from GNUNET_MESH_connect)
1482 * @param tunnel connection to the other end
1483 * @param tunnel_ctx this is NULL
1484 * @param sender who sent the message
1485 * @param message the actual message
1486 * @param atsi performance data for the connection
1487 * @return GNUNET_OK to keep the connection open,
1488 * GNUNET_SYSERR to close it (signal serious error)
1491 client_handle_transmit_close_ack (void *cls,
1492 struct GNUNET_MESH_Tunnel *tunnel,
1494 const struct GNUNET_PeerIdentity *sender,
1495 const struct GNUNET_MessageHeader *message,
1496 const struct GNUNET_ATS_Information*atsi)
1498 struct GNUNET_STREAM_Socket *socket = cls;
1505 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1507 * @param cls the socket (set from GNUNET_MESH_connect)
1508 * @param tunnel connection to the other end
1509 * @param tunnel_ctx this is NULL
1510 * @param sender who sent the message
1511 * @param message the actual message
1512 * @param atsi performance data for the connection
1513 * @return GNUNET_OK to keep the connection open,
1514 * GNUNET_SYSERR to close it (signal serious error)
1517 client_handle_receive_close (void *cls,
1518 struct GNUNET_MESH_Tunnel *tunnel,
1520 const struct GNUNET_PeerIdentity *sender,
1521 const struct GNUNET_MessageHeader *message,
1522 const struct GNUNET_ATS_Information*atsi)
1524 struct GNUNET_STREAM_Socket *socket = cls;
1531 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1533 * @param cls the socket (set from GNUNET_MESH_connect)
1534 * @param tunnel connection to the other end
1535 * @param tunnel_ctx this is NULL
1536 * @param sender who sent the message
1537 * @param message the actual message
1538 * @param atsi performance data for the connection
1539 * @return GNUNET_OK to keep the connection open,
1540 * GNUNET_SYSERR to close it (signal serious error)
1543 client_handle_receive_close_ack (void *cls,
1544 struct GNUNET_MESH_Tunnel *tunnel,
1546 const struct GNUNET_PeerIdentity *sender,
1547 const struct GNUNET_MessageHeader *message,
1548 const struct GNUNET_ATS_Information*atsi)
1550 struct GNUNET_STREAM_Socket *socket = cls;
1557 * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1559 * @param socket the socket
1560 * @param tunnel connection to the other end
1561 * @param tunnel_ctx this is NULL
1562 * @param sender who sent the message
1563 * @param message the actual message
1564 * @param atsi performance data for the connection
1565 * @return GNUNET_OK to keep the connection open,
1566 * GNUNET_SYSERR to close it (signal serious error)
1569 handle_close (struct GNUNET_STREAM_Socket *socket,
1570 struct GNUNET_MESH_Tunnel *tunnel,
1571 const struct GNUNET_PeerIdentity *sender,
1572 const struct GNUNET_STREAM_MessageHeader *message,
1573 const struct GNUNET_ATS_Information*atsi)
1575 struct GNUNET_STREAM_MessageHeader *close_ack;
1577 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1578 "%x: Received CLOSE from %x\n",
1580 socket->other_peer);
1581 close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1582 close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1583 close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
1584 queue_message (socket,
1588 if (socket->state == STATE_CLOSED)
1591 GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1592 socket->receive_buffer = NULL;
1593 socket->receive_buffer_size = 0;
/**
 * Client-side handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE.
 *
 * Takes the socket from the closure and delegates to handle_close().
 *
 * @param cls the stream socket (closure set from GNUNET_MESH_connect)
 * @param tunnel connection to the other end
 * @param tunnel_ctx this is NULL
 * @param sender who sent the message
 * @param message the actual message
 * @param atsi performance data for the connection
 * @return GNUNET_OK to keep the connection open,
 *         GNUNET_SYSERR to close it (signal serious error)
 */
static int
client_handle_close (void *cls,
                     struct GNUNET_MESH_Tunnel *tunnel,
                     void **tunnel_ctx,
                     const struct GNUNET_PeerIdentity *sender,
                     const struct GNUNET_MessageHeader *message,
                     const struct GNUNET_ATS_Information*atsi)
{
  struct GNUNET_STREAM_Socket *socket = cls;
  const struct GNUNET_STREAM_MessageHeader *close_msg;

  close_msg = (const struct GNUNET_STREAM_MessageHeader *) message;
  return handle_close (socket, tunnel, sender, close_msg, atsi);
}
1629 * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1631 * @param socket the socket
1632 * @param tunnel connection to the other end
1633 * @param tunnel_ctx this is NULL
1634 * @param sender who sent the message
1635 * @param message the actual message
1636 * @param atsi performance data for the connection
1637 * @return GNUNET_OK to keep the connection open,
1638 * GNUNET_SYSERR to close it (signal serious error)
1641 handle_close_ack (struct GNUNET_STREAM_Socket *socket,
1642 struct GNUNET_MESH_Tunnel *tunnel,
1643 const struct GNUNET_PeerIdentity *sender,
1644 const struct GNUNET_STREAM_MessageHeader *message,
1645 const struct GNUNET_ATS_Information*atsi)
1647 struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1649 shutdown_handle = socket->shutdown_handle;
1650 switch (socket->state)
1652 case STATE_CLOSE_WAIT:
1653 socket->state = STATE_CLOSED;
1654 if ( (NULL == shutdown_handle) ||
1655 (SHUT_RDWR != shutdown_handle->operation) )
1657 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1658 "%x: Received CLOSE_ACK when shutdown handle is NULL or "
1659 "not for SHUT_RDWR\n",
1663 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1664 "%x: Received CLOSE_ACK from %x\n",
1666 socket->other_peer);
1667 if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
1668 shutdown_handle->completion_cb(shutdown_handle->completion_cls,
1670 GNUNET_free (shutdown_handle); /* Free shutdown handle */
1673 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1674 "%x: Received CLOSE_ACK when in it not expected\n",
/**
 * Client-side handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK.
 *
 * Takes the socket from the closure and delegates to handle_close_ack().
 *
 * @param cls the stream socket (closure set from GNUNET_MESH_connect)
 * @param tunnel connection to the other end
 * @param tunnel_ctx this is NULL
 * @param sender who sent the message
 * @param message the actual message
 * @param atsi performance data for the connection
 * @return GNUNET_OK to keep the connection open,
 *         GNUNET_SYSERR to close it (signal serious error)
 */
static int
client_handle_close_ack (void *cls,
                         struct GNUNET_MESH_Tunnel *tunnel,
                         void **tunnel_ctx,
                         const struct GNUNET_PeerIdentity *sender,
                         const struct GNUNET_MessageHeader *message,
                         const struct GNUNET_ATS_Information*atsi)
{
  struct GNUNET_STREAM_Socket *socket = cls;
  const struct GNUNET_STREAM_MessageHeader *ack_msg;

  ack_msg = (const struct GNUNET_STREAM_MessageHeader *) message;
  return handle_close_ack (socket, tunnel, sender, ack_msg, atsi);
}
1712 /*****************************/
1713 /* Server's Message Handlers */
1714 /*****************************/
/**
 * Server-side handler for GNUNET_MESSAGE_TYPE_STREAM_DATA.
 *
 * Takes the socket from the tunnel context and delegates to handle_data().
 *
 * @param cls the closure
 * @param tunnel connection to the other end
 * @param tunnel_ctx pointer to the stream socket of this tunnel
 * @param sender who sent the message
 * @param message the actual message
 * @param atsi performance data for the connection
 * @return GNUNET_OK to keep the connection open,
 *         GNUNET_SYSERR to close it (signal serious error)
 */
static int
server_handle_data (void *cls,
                    struct GNUNET_MESH_Tunnel *tunnel,
                    void **tunnel_ctx,
                    const struct GNUNET_PeerIdentity *sender,
                    const struct GNUNET_MessageHeader *message,
                    const struct GNUNET_ATS_Information*atsi)
{
  struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
  const struct GNUNET_STREAM_DataMessage *data_msg;

  data_msg = (const struct GNUNET_STREAM_DataMessage *) message;
  return handle_data (socket, tunnel, sender, data_msg, atsi);
}
1747 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
1749 * @param cls the closure
1750 * @param tunnel connection to the other end
1751 * @param tunnel_ctx the socket
1752 * @param sender who sent the message
1753 * @param message the actual message
1754 * @param atsi performance data for the connection
1755 * @return GNUNET_OK to keep the connection open,
1756 * GNUNET_SYSERR to close it (signal serious error)
1759 server_handle_hello (void *cls,
1760 struct GNUNET_MESH_Tunnel *tunnel,
1762 const struct GNUNET_PeerIdentity *sender,
1763 const struct GNUNET_MessageHeader *message,
1764 const struct GNUNET_ATS_Information*atsi)
1766 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1767 struct GNUNET_STREAM_HelloAckMessage *reply;
1769 if (GNUNET_PEER_search (sender) != socket->other_peer)
1771 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1772 "%x: Received HELLO from non-confirming peer\n",
1777 GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO ==
1778 ntohs (message->type));
1779 GNUNET_assert (socket->tunnel == tunnel);
1780 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1781 "%x: Received HELLO from %x\n",
1783 socket->other_peer);
1785 if (STATE_INIT == socket->state)
1787 reply = generate_hello_ack_msg (socket);
1788 queue_message (socket,
1790 &set_state_hello_wait,
1795 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1796 "Client sent HELLO when in state %d\n", socket->state);
1797 /* FIXME: Send RESET? */
1805 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1807 * @param cls the closure
1808 * @param tunnel connection to the other end
1809 * @param tunnel_ctx the socket
1810 * @param sender who sent the message
1811 * @param message the actual message
1812 * @param atsi performance data for the connection
1813 * @return GNUNET_OK to keep the connection open,
1814 * GNUNET_SYSERR to close it (signal serious error)
1817 server_handle_hello_ack (void *cls,
1818 struct GNUNET_MESH_Tunnel *tunnel,
1820 const struct GNUNET_PeerIdentity *sender,
1821 const struct GNUNET_MessageHeader *message,
1822 const struct GNUNET_ATS_Information*atsi)
1824 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1825 const struct GNUNET_STREAM_HelloAckMessage *ack_message;
1827 GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
1828 ntohs (message->type));
1829 GNUNET_assert (socket->tunnel == tunnel);
1830 ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
1831 if (STATE_HELLO_WAIT == socket->state)
1833 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1834 "%x: Received HELLO_ACK from %x\n",
1836 socket->other_peer);
1837 socket->read_sequence_number = ntohl (ack_message->sequence_number);
1838 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1839 "%x: Read sequence number %u\n",
1841 (unsigned int) socket->read_sequence_number);
1842 socket->receiver_window_available =
1843 ntohl (ack_message->receiver_window_size);
1844 /* Attain ESTABLISHED state */
1845 set_state_established (NULL, socket);
1849 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1850 "Client sent HELLO_ACK when in state %d\n", socket->state);
1851 /* FIXME: Send RESET? */
1859 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1861 * @param cls the closure
1862 * @param tunnel connection to the other end
1863 * @param tunnel_ctx the socket
1864 * @param sender who sent the message
1865 * @param message the actual message
1866 * @param atsi performance data for the connection
1867 * @return GNUNET_OK to keep the connection open,
1868 * GNUNET_SYSERR to close it (signal serious error)
1871 server_handle_reset (void *cls,
1872 struct GNUNET_MESH_Tunnel *tunnel,
1874 const struct GNUNET_PeerIdentity *sender,
1875 const struct GNUNET_MessageHeader *message,
1876 const struct GNUNET_ATS_Information*atsi)
1878 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
/**
 * Server-side handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE.
 *
 * Takes the socket from the tunnel context and delegates to
 * handle_transmit_close().
 *
 * @param cls the closure
 * @param tunnel connection to the other end
 * @param tunnel_ctx pointer to the stream socket of this tunnel
 * @param sender who sent the message
 * @param message the actual message
 * @param atsi performance data for the connection
 * @return GNUNET_OK to keep the connection open,
 *         GNUNET_SYSERR to close it (signal serious error)
 */
static int
server_handle_transmit_close (void *cls,
                              struct GNUNET_MESH_Tunnel *tunnel,
                              void **tunnel_ctx,
                              const struct GNUNET_PeerIdentity *sender,
                              const struct GNUNET_MessageHeader *message,
                              const struct GNUNET_ATS_Information*atsi)
{
  struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
  const struct GNUNET_STREAM_MessageHeader *close_msg;

  close_msg = (const struct GNUNET_STREAM_MessageHeader *) message;
  return handle_transmit_close (socket, tunnel, sender, close_msg, atsi);
}
1915 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1917 * @param cls the closure
1918 * @param tunnel connection to the other end
1919 * @param tunnel_ctx the socket
1920 * @param sender who sent the message
1921 * @param message the actual message
1922 * @param atsi performance data for the connection
1923 * @return GNUNET_OK to keep the connection open,
1924 * GNUNET_SYSERR to close it (signal serious error)
1927 server_handle_transmit_close_ack (void *cls,
1928 struct GNUNET_MESH_Tunnel *tunnel,
1930 const struct GNUNET_PeerIdentity *sender,
1931 const struct GNUNET_MessageHeader *message,
1932 const struct GNUNET_ATS_Information*atsi)
1934 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1941 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1943 * @param cls the closure
1944 * @param tunnel connection to the other end
1945 * @param tunnel_ctx the socket
1946 * @param sender who sent the message
1947 * @param message the actual message
1948 * @param atsi performance data for the connection
1949 * @return GNUNET_OK to keep the connection open,
1950 * GNUNET_SYSERR to close it (signal serious error)
1953 server_handle_receive_close (void *cls,
1954 struct GNUNET_MESH_Tunnel *tunnel,
1956 const struct GNUNET_PeerIdentity *sender,
1957 const struct GNUNET_MessageHeader *message,
1958 const struct GNUNET_ATS_Information*atsi)
1960 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1967 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1969 * @param cls the closure
1970 * @param tunnel connection to the other end
1971 * @param tunnel_ctx the socket
1972 * @param sender who sent the message
1973 * @param message the actual message
1974 * @param atsi performance data for the connection
1975 * @return GNUNET_OK to keep the connection open,
1976 * GNUNET_SYSERR to close it (signal serious error)
1979 server_handle_receive_close_ack (void *cls,
1980 struct GNUNET_MESH_Tunnel *tunnel,
1982 const struct GNUNET_PeerIdentity *sender,
1983 const struct GNUNET_MessageHeader *message,
1984 const struct GNUNET_ATS_Information*atsi)
1986 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
/**
 * Server-side handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE.
 *
 * Takes the socket from the tunnel context and delegates to handle_close().
 *
 * @param cls the listen socket (from GNUNET_MESH_connect in
 *          GNUNET_STREAM_listen)
 * @param tunnel connection to the other end
 * @param tunnel_ctx pointer to the stream socket of this tunnel
 * @param sender who sent the message
 * @param message the actual message
 * @param atsi performance data for the connection
 * @return GNUNET_OK to keep the connection open,
 *         GNUNET_SYSERR to close it (signal serious error)
 */
static int
server_handle_close (void *cls,
                     struct GNUNET_MESH_Tunnel *tunnel,
                     void **tunnel_ctx,
                     const struct GNUNET_PeerIdentity *sender,
                     const struct GNUNET_MessageHeader *message,
                     const struct GNUNET_ATS_Information*atsi)
{
  struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
  const struct GNUNET_STREAM_MessageHeader *close_msg;

  close_msg = (const struct GNUNET_STREAM_MessageHeader *) message;
  return handle_close (socket, tunnel, sender, close_msg, atsi);
}
/**
 * Server-side handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK.
 *
 * Takes the socket from the tunnel context and delegates to
 * handle_close_ack().
 *
 * @param cls the closure
 * @param tunnel connection to the other end
 * @param tunnel_ctx pointer to the stream socket of this tunnel
 * @param sender who sent the message
 * @param message the actual message
 * @param atsi performance data for the connection
 * @return GNUNET_OK to keep the connection open,
 *         GNUNET_SYSERR to close it (signal serious error)
 */
static int
server_handle_close_ack (void *cls,
                         struct GNUNET_MESH_Tunnel *tunnel,
                         void **tunnel_ctx,
                         const struct GNUNET_PeerIdentity *sender,
                         const struct GNUNET_MessageHeader *message,
                         const struct GNUNET_ATS_Information*atsi)
{
  struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
  const struct GNUNET_STREAM_MessageHeader *ack_msg;

  ack_msg = (const struct GNUNET_STREAM_MessageHeader *) message;
  return handle_close_ack (socket, tunnel, sender, ack_msg, atsi);
}
2054 * Message Handler for mesh
2056 * @param socket the socket through which the ack was received
2057 * @param tunnel connection to the other end
2058 * @param sender who sent the message
2059 * @param ack the acknowledgment message
2060 * @param atsi performance data for the connection
2061 * @return GNUNET_OK to keep the connection open,
2062 * GNUNET_SYSERR to close it (signal serious error)
2065 handle_ack (struct GNUNET_STREAM_Socket *socket,
2066 struct GNUNET_MESH_Tunnel *tunnel,
2067 const struct GNUNET_PeerIdentity *sender,
2068 const struct GNUNET_STREAM_AckMessage *ack,
2069 const struct GNUNET_ATS_Information*atsi)
2071 unsigned int packet;
2072 int need_retransmission;
2075 if (GNUNET_PEER_search (sender) != socket->other_peer)
2077 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2078 "%x: Received ACK from non-confirming peer\n",
2083 switch (socket->state)
2085 case (STATE_ESTABLISHED):
2086 if (NULL == socket->write_handle)
2088 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2089 "%x: Received DATA_ACK when write_handle is NULL\n",
2093 /* FIXME: increment in the base sequence number is breaking current flow
2095 if (!((socket->write_sequence_number
2096 - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2098 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2099 "%x: Received DATA_ACK with unexpected base sequence "
2102 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2103 "%x: Current write sequence: %u; Ack's base sequence: %u\n",
2105 socket->write_sequence_number,
2106 ntohl (ack->base_sequence_number));
2109 /* FIXME: include the case when write_handle is cancelled - ignore the
2112 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2113 "%x: Received DATA_ACK from %x\n",
2115 socket->other_peer);
2117 /* Cancel the retransmission task */
2118 if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2120 GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2121 socket->retransmission_timeout_task_id =
2122 GNUNET_SCHEDULER_NO_TASK;
2125 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2127 if (NULL == socket->write_handle->messages[packet]) break;
2128 if (ntohl (ack->base_sequence_number)
2129 >= ntohl (socket->write_handle->messages[packet]->sequence_number))
2130 ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2135 ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
2136 ntohl (socket->write_handle->messages[packet]->sequence_number)
2137 - ntohl (ack->base_sequence_number)))
2138 ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2143 /* Update the receive window remaining
2144 FIXME : Should update with the value from a data ack with greater
2146 socket->receiver_window_available =
2147 ntohl (ack->receive_window_remaining);
2149 /* Check if we have received all acknowledgements */
2150 need_retransmission = GNUNET_NO;
2151 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2153 if (NULL == socket->write_handle->messages[packet]) break;
2154 if (GNUNET_YES != ackbitmap_is_bit_set
2155 (&socket->write_handle->ack_bitmap,packet))
2157 need_retransmission = GNUNET_YES;
2161 if (GNUNET_YES == need_retransmission)
2163 write_data (socket);
2165 else /* We have to call the write continuation callback now */
2167 /* Free the packets */
2168 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2170 GNUNET_free_non_null (socket->write_handle->messages[packet]);
2172 if (NULL != socket->write_handle->write_cont)
2173 socket->write_handle->write_cont
2174 (socket->write_handle->write_cont_cls,
2176 socket->write_handle->size);
2177 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2178 "%x: Write completion callback completed\n",
2180 /* We are done with the write handle - Freeing it */
2181 GNUNET_free (socket->write_handle);
2182 socket->write_handle = NULL;
/**
 * Client-side handler for GNUNET_MESSAGE_TYPE_STREAM_ACK.
 *
 * Takes the socket from the closure and delegates to handle_ack().
 *
 * @param cls the 'struct GNUNET_STREAM_Socket'
 * @param tunnel connection to the other end
 * @param tunnel_ctx unused
 * @param sender who sent the message
 * @param message the actual message
 * @param atsi performance data for the connection
 * @return GNUNET_OK to keep the connection open,
 *         GNUNET_SYSERR to close it (signal serious error)
 */
static int
client_handle_ack (void *cls,
                   struct GNUNET_MESH_Tunnel *tunnel,
                   void **tunnel_ctx,
                   const struct GNUNET_PeerIdentity *sender,
                   const struct GNUNET_MessageHeader *message,
                   const struct GNUNET_ATS_Information*atsi)
{
  struct GNUNET_STREAM_Socket *socket = cls;

  return handle_ack (socket,
                     tunnel,
                     sender,
                     (const struct GNUNET_STREAM_AckMessage *) message,
                     atsi);
}
/**
 * Server-side handler for GNUNET_MESSAGE_TYPE_STREAM_ACK.
 *
 * Takes the socket from the tunnel context and delegates to handle_ack().
 *
 * @param cls the server's listen socket
 * @param tunnel connection to the other end
 * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
 * @param sender who sent the message
 * @param message the actual message
 * @param atsi performance data for the connection
 * @return GNUNET_OK to keep the connection open,
 *         GNUNET_SYSERR to close it (signal serious error)
 */
static int
server_handle_ack (void *cls,
                   struct GNUNET_MESH_Tunnel *tunnel,
                   void **tunnel_ctx,
                   const struct GNUNET_PeerIdentity *sender,
                   const struct GNUNET_MessageHeader *message,
                   const struct GNUNET_ATS_Information*atsi)
{
  struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;

  return handle_ack (socket,
                     tunnel,
                     sender,
                     (const struct GNUNET_STREAM_AckMessage *) message,
                     atsi);
}
2247 * For client message handlers, the stream socket is in the
2250 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2251 {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2252 {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK,
2253 sizeof (struct GNUNET_STREAM_AckMessage) },
2254 {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2255 sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2256 {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2257 sizeof (struct GNUNET_STREAM_MessageHeader)},
2258 {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2259 sizeof (struct GNUNET_STREAM_MessageHeader)},
2260 {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2261 sizeof (struct GNUNET_STREAM_MessageHeader)},
2262 {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2263 sizeof (struct GNUNET_STREAM_MessageHeader)},
2264 {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2265 sizeof (struct GNUNET_STREAM_MessageHeader)},
2266 {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2267 sizeof (struct GNUNET_STREAM_MessageHeader)},
2268 {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2269 sizeof (struct GNUNET_STREAM_MessageHeader)},
2275 * For server message handlers, the stream socket is in the
2276 * tunnel context, and the listen socket in the closure argument.
2278 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2279 {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2280 {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK,
2281 sizeof (struct GNUNET_STREAM_AckMessage) },
2282 {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO,
2283 sizeof (struct GNUNET_STREAM_MessageHeader)},
2284 {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2285 sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2286 {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2287 sizeof (struct GNUNET_STREAM_MessageHeader)},
2288 {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2289 sizeof (struct GNUNET_STREAM_MessageHeader)},
2290 {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2291 sizeof (struct GNUNET_STREAM_MessageHeader)},
2292 {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2293 sizeof (struct GNUNET_STREAM_MessageHeader)},
2294 {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2295 sizeof (struct GNUNET_STREAM_MessageHeader)},
2296 {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2297 sizeof (struct GNUNET_STREAM_MessageHeader)},
2298 {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2299 sizeof (struct GNUNET_STREAM_MessageHeader)},
2305 * Function called when our target peer is connected to our tunnel
2307 * @param cls the socket for which this tunnel is created
2308 * @param peer the peer identity of the target
2309 * @param atsi performance data for the connection
2312 mesh_peer_connect_callback (void *cls,
2313 const struct GNUNET_PeerIdentity *peer,
2314 const struct GNUNET_ATS_Information * atsi)
2316 struct GNUNET_STREAM_Socket *socket = cls;
2317 struct GNUNET_STREAM_MessageHeader *message;
2318 GNUNET_PEER_Id connected_peer;
2320 connected_peer = GNUNET_PEER_search (peer);
2322 if (connected_peer != socket->other_peer)
2324 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2325 "%x: A peer which is not our target has connected",
2331 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2332 "%x: Target peer %x connected\n",
2336 /* Set state to INIT */
2337 socket->state = STATE_INIT;
2339 /* Send HELLO message */
2340 message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2341 message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
2342 message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2343 queue_message (socket,
2345 &set_state_hello_wait,
2348 /* Call open callback */
2349 if (NULL == socket->open_cb)
2351 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2352 "STREAM_open callback is NULL\n");
/**
 * Function called when our target peer is disconnected from our tunnel.
 *
 * Currently does nothing.
 *
 * @param cls the socket associated which this tunnel
 * @param peer the peer identity of the target
 */
static void
mesh_peer_disconnect_callback (void *cls,
                               const struct GNUNET_PeerIdentity *peer)
{
  /* Disconnects are not handled yet */
  (void) cls;
  (void) peer;
}
2372 * Method called whenever a peer creates a tunnel to us
2374 * @param cls closure
2375 * @param tunnel new handle to the tunnel
2376 * @param initiator peer that started the tunnel
2377 * @param atsi performance information for the tunnel
2378 * @return initial tunnel context for the tunnel
2379 * (can be NULL -- that's not an error)
2382 new_tunnel_notify (void *cls,
2383 struct GNUNET_MESH_Tunnel *tunnel,
2384 const struct GNUNET_PeerIdentity *initiator,
2385 const struct GNUNET_ATS_Information *atsi)
2387 struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2388 struct GNUNET_STREAM_Socket *socket;
2390 /* FIXME: If a tunnel is already created, we should not accept new tunnels
2391 from the same peer again until the socket is closed */
2393 socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2394 socket->other_peer = GNUNET_PEER_intern (initiator);
2395 socket->tunnel = tunnel;
2396 socket->session_id = 0; /* FIXME */
2397 socket->state = STATE_INIT;
2398 socket->lsocket = lsocket;
2399 socket->our_id = lsocket->our_id;
2401 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2402 "%x: Peer %x initiated tunnel to us\n",
2404 socket->other_peer);
2406 /* FIXME: Copy MESH handle from lsocket to socket */
2413 * Function called whenever an inbound tunnel is destroyed. Should clean up
2414 * any associated state. This function is NOT called if the client has
2415 * explicitly asked for the tunnel to be destroyed using
2416 * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2419 * @param cls closure (set from GNUNET_MESH_connect)
2420 * @param tunnel connection to the other end (henceforth invalid)
2421 * @param tunnel_ctx place where local state associated
2422 * with the tunnel is stored
2425 tunnel_cleaner (void *cls,
2426 const struct GNUNET_MESH_Tunnel *tunnel,
2429 struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2431 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2432 "%x: Peer %x has terminated connection abruptly\n",
2434 socket->other_peer);
2436 socket->status = GNUNET_STREAM_SHUTDOWN;
2438 /* Clear Transmit handles */
2439 if (NULL != socket->transmit_handle)
2441 GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2442 socket->transmit_handle = NULL;
2444 socket->tunnel = NULL;
2454 * Tries to open a stream to the target peer
2456 * @param cfg configuration to use
2457 * @param target the target peer to which the stream has to be opened
2458 * @param app_port the application port number which uniquely identifies this
2460 * @param open_cb this function will be called after stream has be established
2461 * @param open_cb_cls the closure for open_cb
2462 * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2463 * @return if successful it returns the stream socket; NULL if stream cannot be
2466 struct GNUNET_STREAM_Socket *
2467 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2468 const struct GNUNET_PeerIdentity *target,
2469 GNUNET_MESH_ApplicationType app_port,
2470 GNUNET_STREAM_OpenCallback open_cb,
2474 struct GNUNET_STREAM_Socket *socket;
2475 struct GNUNET_PeerIdentity own_peer_id;
2476 enum GNUNET_STREAM_Option option;
2477 va_list vargs; /* Variable arguments */
2479 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2482 socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2483 socket->other_peer = GNUNET_PEER_intern (target);
2484 socket->open_cb = open_cb;
2485 socket->open_cls = open_cb_cls;
2486 GNUNET_TESTING_get_peer_identity (cfg, &own_peer_id);
2487 socket->our_id = GNUNET_PEER_intern (&own_peer_id);
2490 socket->retransmit_timeout =
2491 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
2493 va_start (vargs, open_cb_cls); /* Parse variable args */
2495 option = va_arg (vargs, enum GNUNET_STREAM_Option);
2498 case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2499 /* Expect struct GNUNET_TIME_Relative */
2500 socket->retransmit_timeout = va_arg (vargs,
2501 struct GNUNET_TIME_Relative);
2503 case GNUNET_STREAM_OPTION_END:
2506 } while (GNUNET_STREAM_OPTION_END != option);
2507 va_end (vargs); /* End of variable args parsing */
2508 socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2509 10, /* QUEUE size as parameter? */
2511 NULL, /* No inbound tunnel handler */
2512 &tunnel_cleaner, /* FIXME: not required? */
2513 client_message_handlers,
2514 &app_port); /* We don't get inbound tunnels */
2515 if (NULL == socket->mesh) /* Fail if we cannot connect to mesh */
2517 GNUNET_free (socket);
2521 /* Now create the mesh tunnel to target */
2522 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2523 "Creating MESH Tunnel\n");
2524 socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2525 NULL, /* Tunnel context */
2526 &mesh_peer_connect_callback,
2527 &mesh_peer_disconnect_callback,
2529 GNUNET_assert (NULL != socket->tunnel);
2530 GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2533 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2534 "%s() END\n", __func__);
2540 * Shutdown the stream for reading or writing (similar to man 2 shutdown).
2542 * @param socket the stream socket
2543 * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
2544 * @param completion_cb the callback that will be called upon successful
2545 * shutdown of given operation
2546 * @param completion_cls the closure for the completion callback
2547 * @return the shutdown handle
2549 struct GNUNET_STREAM_ShutdownHandle *
2550 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2552 GNUNET_STREAM_ShutdownCompletion completion_cb,
2553 void *completion_cls)
2555 struct GNUNET_STREAM_ShutdownHandle *handle;
2556 struct GNUNET_STREAM_MessageHeader *msg;
2558 GNUNET_assert (NULL == socket->shutdown_handle);
2560 handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
2561 handle->socket = socket;
2562 handle->completion_cb = completion_cb;
2563 handle->completion_cls = completion_cls;
2564 socket->shutdown_handle = handle;
2566 msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2567 msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2571 handle->operation = SHUT_RD;
2572 if (NULL != socket->read_handle)
2573 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2574 "Existing read handle should be cancelled before shutting"
2578 handle->operation = SHUT_WR;
2582 handle->operation = SHUT_RDWR;
2583 if (NULL != socket->write_handle)
2584 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2585 "Existing write handle should be cancelled before shutting"
2587 if (NULL != socket->read_handle)
2588 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2589 "Existing read handle should be cancelled before shutting"
2591 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
2592 queue_message (socket,
2594 &set_state_close_wait,
2598 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2599 "GNUNET_STREAM_shutdown called with invalid value for "
2600 "parameter operation -- Ignoring\n");
2601 GNUNET_free (handle);
2609 * Cancels a pending shutdown
2611 * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
/* NOTE(review): only this signature line is visible; the body could not be
   inspected.  GNUNET_STREAM_shutdown stores the handle it returns in
   socket->shutdown_handle, so cancelling presumably has to clear that field
   as well -- confirm against the body. */
2614 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
2623 * @param socket the stream socket
2626 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
2628 struct MessageQueue *head;
2630 GNUNET_break (NULL == socket->read_handle);
2631 GNUNET_break (NULL == socket->write_handle);
2633 if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
2635 /* socket closed with read task pending!? */
2637 GNUNET_SCHEDULER_cancel (socket->read_task_id);
2638 socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
2641 /* Terminate the ack'ing tasks if they are still present */
2642 if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
2644 GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2645 socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2648 /* Clear Transmit handles */
2649 if (NULL != socket->transmit_handle)
2651 GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2652 socket->transmit_handle = NULL;
2655 /* Clear existing message queue */
2656 while (NULL != (head = socket->queue_head)) {
2657 GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2660 GNUNET_free (head->message);
2664 /* Close associated tunnel */
2665 if (NULL != socket->tunnel)
2667 GNUNET_MESH_tunnel_destroy (socket->tunnel);
2668 socket->tunnel = NULL;
2671 /* Close mesh connection */
2672 if (NULL != socket->mesh && NULL == socket->lsocket)
2674 GNUNET_MESH_disconnect (socket->mesh);
2675 socket->mesh = NULL;
2678 /* Release receive buffer */
2679 if (NULL != socket->receive_buffer)
2681 GNUNET_free (socket->receive_buffer);
2684 GNUNET_free (socket);
2689 * Listens for stream connections for a specific application port
2691 * @param cfg the configuration to use
2692 * @param app_port the application port for which new streams will be accepted
2693 * @param listen_cb this function will be called when a peer tries to establish
2695 * @param listen_cb_cls closure for listen_cb
2696 * @return listen socket, NULL for any error
/* Creates a listen socket for app_port: stores the port and the listen
   callback, interns our own peer identity and connects to MESH with
   server_message_handlers, using the listen socket itself as closure.
   NOTE(review): the documentation above promises NULL on error, but a failed
   MESH connect trips the GNUNET_assert at the end instead of returning NULL. */
2698 struct GNUNET_STREAM_ListenSocket *
2699 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
2700 GNUNET_MESH_ApplicationType app_port,
2701 GNUNET_STREAM_ListenCallback listen_cb,
2702 void *listen_cb_cls)
2704 /* FIXME: Add variable args for passing configuration options? */
2705 struct GNUNET_STREAM_ListenSocket *lsocket;
2706 struct GNUNET_PeerIdentity our_peer_id;
2708 lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
2709 lsocket->port = app_port;
2710 lsocket->listen_cb = listen_cb;
2711 lsocket->listen_cb_cls = listen_cb_cls;
/* The interned id is a counted reference held by the listen socket */
2712 GNUNET_TESTING_get_peer_identity (cfg, &our_peer_id);
2713 lsocket->our_id = GNUNET_PEER_intern (&our_peer_id);
2714 lsocket->mesh = GNUNET_MESH_connect (cfg,
2715 10, /* FIXME: QUEUE size as parameter? */
2716 lsocket, /* Closure */
2719 server_message_handlers,
/* NOTE(review): the application-type argument that follows is not visible
   here.  As far as the MESH API documents it, that list is read up to a 0
   entry -- verify that a 0-terminated array (not the address of a single
   port) is passed.  TODO confirm */
/* A failed MESH connect is treated as fatal */
2721 GNUNET_assert (NULL != lsocket->mesh);
2727 * Closes the listen socket
2729 * @param lsocket the listen socket
2732 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
2734 /* Close MESH connection */
2735 GNUNET_assert (NULL != lsocket->mesh);
2736 GNUNET_MESH_disconnect (lsocket->mesh);
2738 GNUNET_free (lsocket);
2743 * Tries to write the given data to the stream
2745 * @param socket the socket representing a stream
2746 * @param data the data buffer from where the data is written into the stream
2747 * @param size the number of bytes to be written from the data buffer
2748 * @param timeout the timeout period
2749 * @param write_cont the function to call upon writing some bytes into the stream
2750 * @param write_cont_cls the closure
2751 * @return handle to cancel the operation
/* Cuts the caller's data into DATA messages (at most
   GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH of them per call), stores them in a
   freshly allocated write handle, attaches that handle to the socket and
   calls write_data().
   NOTE(review): `timeout` is not referenced by any line visible here; the
   write timeout still looks unimplemented (see the TODO list at the top of
   the file) -- confirm. */
2753 struct GNUNET_STREAM_IOWriteHandle *
2754 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
2757 struct GNUNET_TIME_Relative timeout,
2758 GNUNET_STREAM_CompletionContinuation write_cont,
2759 void *write_cont_cls)
2761 unsigned int num_needed_packets;
2762 unsigned int packet;
2763 struct GNUNET_STREAM_IOWriteHandle *io_handle;
2764 uint32_t packet_size;
2765 uint32_t payload_size;
2766 struct GNUNET_STREAM_DataMessage *data_msg;
2768 struct GNUNET_TIME_Relative ack_deadline;
2770 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2773 /* Return NULL if there is already a write request pending */
2774 if (NULL != socket->write_handle)
/* Writing is only attempted in the states ESTABLISHED, RECEIVE_CLOSE_WAIT and
   RECEIVE_CLOSED; any other state produces the warning below */
2779 if (!((STATE_ESTABLISHED == socket->state)
2780 || (STATE_RECEIVE_CLOSE_WAIT == socket->state)
2781 || (STATE_RECEIVE_CLOSED == socket->state)))
2783 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2784 "%x: Attempting to write on a closed (OR) not-yet-established"
/* A single call never takes more than GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH
   full payloads; a larger request is cut down and the reduced length is what
   gets recorded in io_handle->size */
2789 if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
2790 size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size;
/* Round up so that a trailing partial payload gets a packet of its own.
   NOTE(review): for size == 0 this yields zero packets and an empty handle --
   confirm that callers never pass 0 */
2791 num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
2792 io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
2793 io_handle->socket = socket;
2794 io_handle->write_cont = write_cont;
2795 io_handle->write_cont_cls = write_cont_cls;
2796 io_handle->size = size;
2798 /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2799 determined from RTT */
2800 ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
2801 /* Divide the given buffer into packets for sending */
2802 for (packet=0; packet < num_needed_packets; packet++)
/* Packets that are followed by more data are filled to max_payload_size; the
   final packet carries whatever remains */
2804 if ((packet + 1) * max_payload_size < size)
2806 payload_size = max_payload_size;
2807 packet_size = MAX_PACKET_SIZE;
2811 payload_size = size - packet * max_payload_size;
2812 packet_size = payload_size + sizeof (struct
2813 GNUNET_STREAM_DataMessage);
/* Header fields are stored in network byte order */
2815 io_handle->messages[packet] = GNUNET_malloc (packet_size);
2816 io_handle->messages[packet]->header.header.size = htons (packet_size);
2817 io_handle->messages[packet]->header.header.type =
2818 htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
/* Every packet consumes the next sequence number; offset is the value of
   write_offset before this packet's payload is added to it further down */
2819 io_handle->messages[packet]->sequence_number =
2820 htonl (socket->write_sequence_number++);
2821 io_handle->messages[packet]->offset = htonl (socket->write_offset);
2823 /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2824 determined from RTT */
2825 io_handle->messages[packet]->ack_deadline =
2826 GNUNET_TIME_relative_hton (ack_deadline);
2827 data_msg = io_handle->messages[packet];
2828 /* Copy data from given buffer to the packet */
/* &data_msg[1] is the first byte behind the message structure, which is
   where the payload bytes of this packet go */
2829 memcpy (&data_msg[1],
2832 sweep += payload_size;
2833 socket->write_offset += payload_size;
/* The handle is attached to the socket before write_data() is called */
2835 socket->write_handle = io_handle;
2836 write_data (socket);
2838 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2839 "%s() END\n", __func__);
2846 * Tries to read data from the stream
2848 * @param socket the socket representing a stream
2849 * @param timeout the timeout period
2850 * @param proc function to call with data (once only)
2851 * @param proc_cls the closure for proc
2852 * @return handle to cancel the operation
/* Registers a read request on the socket: allocates a read handle that holds
   proc and proc_cls, schedules call_read_processor right away when bit 0 of
   the ack bitmap is already set, and sets up a timeout task that uses the
   caller's timeout. */
2854 struct GNUNET_STREAM_IOReadHandle *
2855 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
2856 struct GNUNET_TIME_Relative timeout,
2857 GNUNET_STREAM_DataProcessor proc,
2860 struct GNUNET_STREAM_IOReadHandle *read_handle;
2862 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2867 /* Return NULL if there is already a read handle; the user has to cancel that
2868 first before continuing or has to wait until it is completed */
2869 if (NULL != socket->read_handle) return NULL;
/* A data processor is mandatory; a NULL proc aborts here */
2871 GNUNET_assert (NULL != proc);
2873 read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
2874 read_handle->proc = proc;
2875 read_handle->proc_cls = proc_cls;
/* From here on the socket counts as having a read outstanding */
2876 socket->read_handle = read_handle;
2878 /* Check if we have a packet at bitmap 0 */
2879 if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
/* If so, call_read_processor is scheduled to run without delay */
2882 socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
2887 /* Setup the read timeout task */
2888 socket->read_io_timeout_task_id =
2889 GNUNET_SCHEDULER_add_delayed (timeout,
2892 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2901 * Cancel pending write operation.
2903 * @param ioh handle to operation to cancel
2906 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
2908 struct GNUNET_STREAM_Socket *socket = ioh->socket;
2909 unsigned int packet;
2911 GNUNET_assert (NULL != socket->write_handle);
2912 GNUNET_assert (socket->write_handle == ioh);
2914 if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2916 GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2917 socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
2920 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2922 if (NULL == ioh->messages[packet]) break;
2923 GNUNET_free (ioh->messages[packet]);
2926 GNUNET_free (socket->write_handle);
2927 socket->write_handle = NULL;
2933 * Cancel pending read operation.
2935 * @param ioh handle to operation to cancel
2938 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)