2 This file is part of GNUnet.
3 (C) 2012 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file stream/stream_api.c
23 * @brief Implementation of the stream library
24 * @author Sree Harsha Totakura
27 #include "gnunet_common.h"
28 #include "gnunet_crypto_lib.h"
29 #include "gnunet_stream_lib.h"
30 #include "stream_protocol.h"
34 * The maximum packet size of a stream packet
36 #define MAX_PACKET_SIZE 64000
39 * The maximum payload a data message packet can carry
41 static size_t max_payload_size =
42 MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
47 #define RECEIVE_BUFFER_SIZE 4096000
50 * states in the Protocol
55 * Client initialization state
60 * Listener initialization state
65 * Pre-connection establishment state
70 * State where a connection has been established
75 * State where the socket is closed on our side and waiting to be ACK'ed
77 STATE_RECEIVE_CLOSE_WAIT,
80 * State where the socket is closed for reading
85 * State where the socket is closed on our side and waiting to be ACK'ed
87 STATE_TRANSMIT_CLOSE_WAIT,
90 * State where the socket is closed for writing
92 STATE_TRANSMIT_CLOSED,
95 * State where the socket is closed on our side and waiting to be ACK'ed
100 * State where the socket is closed
107 * Functions of this type are called when a message is written
109 * @param socket the socket the written message was bound to
111 typedef void (*SendFinishCallback) (void *cls,
112 struct GNUNET_STREAM_Socket *socket);
116 * The send message queue
123 struct GNUNET_STREAM_MessageHeader *message;
126 * Callback to be called when the message is sent
128 SendFinishCallback finish_cb;
131 * The closure for finish_cb
136 * The next message in queue. Should be NULL in the last message
138 struct MessageQueue *next;
141 * The next message in queue. Should be NULL in the last message
143 struct MessageQueue *prev;
148 * The STREAM Socket Handler
150 struct GNUNET_STREAM_Socket
154 * The peer identity of the peer at the other end of the stream
156 struct GNUNET_PeerIdentity other_peer;
159 * Retransmission timeout
161 struct GNUNET_TIME_Relative retransmit_timeout;
164 * The Acknowledgement Bitmap
166 GNUNET_STREAM_AckBitmap ack_bitmap;
169 * Time when the Acknowledgement was queued
171 struct GNUNET_TIME_Absolute ack_time_registered;
174 * Queued Acknowledgement deadline
176 struct GNUNET_TIME_Relative ack_time_deadline;
179 * The task for sending timely Acks
181 GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
186 struct GNUNET_MESH_Handle *mesh;
189 * The mesh tunnel handle
191 struct GNUNET_MESH_Tunnel *tunnel;
194 * Stream open closure
199 * Stream open callback
201 GNUNET_STREAM_OpenCallback open_cb;
204 * The current transmit handle (if a pending transmit request exists)
206 struct GNUNET_MESH_TransmitHandle *transmit_handle;
209 * The current message associated with the transmit handle
211 struct MessageQueue *queue_head;
214 * The queue tail, should always point to the last message in queue
216 struct MessageQueue *queue_tail;
219 * The write IO_handle associated with this socket
221 struct GNUNET_STREAM_IOHandle *write_handle;
224 * The read IO_handle associated with this socket
226 struct GNUNET_STREAM_IOHandle *read_handle;
229 * Buffer for storing received messages
231 void *receive_buffer;
234 * The state of the protocol associated with this socket
239 * The status of the socket
241 enum GNUNET_STREAM_Status status;
244 * The number of previous timeouts; FIXME: currently not used
246 unsigned int retries;
249 * The session id associated with this stream connection
250 * FIXME: Not used currently, may be removed
255 * Write sequence number. Set to random when sending HELLO(client) and
258 uint32_t write_sequence_number;
261 * Read sequence number. This number's value is determined during handshake
263 uint32_t read_sequence_number;
266 * receiver's available buffer after the last acknowledged packet
268 uint32_t receive_window_available;
273 * A socket for listening
275 struct GNUNET_STREAM_ListenSocket
281 struct GNUNET_MESH_Handle *mesh;
284 * The callback function which is called after successful opening socket
286 GNUNET_STREAM_ListenCallback listen_cb;
289 * The call back closure
296 GNUNET_MESH_ApplicationType port;
305 struct GNUNET_STREAM_IOHandle
308 * The packet_buffers associated with this Handle
310 struct GNUNET_STREAM_DataMessage *messages[64];
313 * The bitmap of this IOHandle; Corresponding bit for a message is set when
314 * it has been acknowledged by the receiver
316 GNUNET_STREAM_AckBitmap ack_bitmap;
319 * receiver's available buffer
321 uint32_t receive_window_available;
324 * Number of packets sent before waiting for an ack
326 * FIXME: Do we need this?
328 unsigned int sent_packets;
333 * Default value in seconds for various timeouts
335 static unsigned int default_timeout = 300;
339 * Callback function for sending hello message
341 * @param cls closure the socket
342 * @param size number of bytes available in buf
343 * @param buf where the callee should write the message
344 * @return number of bytes written to buf
347 send_message_notify (void *cls, size_t size, void *buf)
349 struct GNUNET_STREAM_Socket *socket = cls;
350 struct MessageQueue *head;
353 socket->transmit_handle = NULL; /* Remove the transmit handle */
354 head = socket->queue_head;
356 return 0; /* just to be safe */
357 if (0 == size) /* request timed out */
360 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
361 "Message sending timed out. Retry %d \n",
363 socket->transmit_handle =
364 GNUNET_MESH_notify_transmit_ready (socket->tunnel,
367 /* FIXME: exponential backoff */
368 socket->retransmit_timeout,
370 ntohs (head->message->header.size),
371 &send_message_notify,
376 ret = ntohs (head->message->header.size);
377 GNUNET_assert (size >= ret);
378 memcpy (buf, head->message, ret);
379 if (NULL != head->finish_cb)
381 head->finish_cb (socket, head->finish_cb_cls);
383 GNUNET_CONTAINER_DLL_remove (socket->queue_head,
386 GNUNET_free (head->message);
388 head = socket->queue_head;
389 if (NULL != head) /* more pending messages to send */
392 socket->transmit_handle =
393 GNUNET_MESH_notify_transmit_ready (socket->tunnel,
396 /* FIXME: exponential backoff */
397 socket->retransmit_timeout,
399 ntohs (head->message->header.size),
400 &send_message_notify,
408 * Queues a message for sending using the mesh connection of a socket
410 * @param socket the socket whose mesh connection is used
411 * @param message the message to be sent
412 * @param finish_cb the callback to be called when the message is sent
413 * @param finish_cb_cls the closure for the callback
416 queue_message (struct GNUNET_STREAM_Socket *socket,
417 struct GNUNET_STREAM_MessageHeader *message,
418 SendFinishCallback finish_cb,
421 struct MessageQueue *queue_entity;
423 queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
424 queue_entity->message = message;
425 queue_entity->finish_cb = finish_cb;
426 queue_entity->finish_cb_cls = finish_cb_cls;
427 GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
430 if (NULL == socket->transmit_handle)
433 socket->transmit_handle =
434 GNUNET_MESH_notify_transmit_ready (socket->tunnel,
437 socket->retransmit_timeout,
439 ntohs (message->header.size),
440 &send_message_notify,
447 * Callback function for sending ack message
449 * @param cls closure the ACK message created in ack_task
450 * @param size number of bytes available in buffer
451 * @param buf where the callee should write the message
452 * @return number of bytes written to buf
455 send_ack_notify (void *cls, size_t size, void *buf)
457 struct GNUNET_STREAM_AckMessage *ack_msg = cls;
461 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
462 "%s called with size 0\n", __func__);
465 GNUNET_assert (ack_msg->header.header.size <= size);
467 size = ack_msg->header.header.size;
468 memcpy (buf, ack_msg, size);
474 * Task for sending ACK message
476 * @param cls the socket
477 * @param tc the Task context
481 const struct GNUNET_SCHEDULER_TaskContext *tc)
483 struct GNUNET_STREAM_Socket *socket = cls;
484 struct GNUNET_STREAM_AckMessage *ack_msg;
486 if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
491 socket->ack_task_id = 0;
493 /* Create the ACK Message */
494 ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
495 ack_msg->header.header.size = htons (sizeof (struct
496 GNUNET_STREAM_AckMessage));
497 ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
498 ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
499 ack_msg->base_sequence_number = htonl (socket->write_sequence_number);
500 ack_msg->receive_window_remaining = htonl (socket->receive_window_available);
502 /* Request MESH for sending ACK */
503 GNUNET_MESH_notify_transmit_ready (socket->tunnel,
506 socket->retransmit_timeout,
508 ntohs (ack_msg->header.header.size),
517 * Function to modify a bit in GNUNET_STREAM_AckBitmap
519 * @param bitmap the bitmap to modify
520 * @param bit the bit number to modify
521 * @param value GNUNET_YES to on, GNUNET_NO to off
524 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
528 GNUNET_assert (bit < 64);
529 if (GNUNET_YES == value)
530 *bitmap |= (1LL << bit);
532 *bitmap &= ~(1LL << bit);
537 * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
539 * @param bitmap address of the bitmap that has to be checked
540 * @param bit the bit number to check
541 * @return GNUNET_YES if the bit is set; GNUNET_NO if not
544 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
547 GNUNET_assert (bit < 64);
548 return 0 != (*bitmap & (1LL << bit));
554 * Function called when Data Message is sent
556 * @param cls the io_handle corresponding to the Data Message
557 * @param socket the socket which was used
560 write_data_finish_cb (void *cls,
561 struct GNUNET_STREAM_Socket *socket)
563 struct GNUNET_STREAM_IOHandle *io_handle = cls;
565 io_handle->sent_packets++;
570 * Writes data using the given socket. The amount of data written is limited by
571 * the receive_window_size
573 * @param socket the socket to use
576 write_data (struct GNUNET_STREAM_Socket *socket)
578 struct GNUNET_STREAM_IOHandle *io_handle = socket->write_handle;
583 /* Find the last acknowledged packet */
584 for (packet=0; packet < 64; packet++)
586 if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
589 else if (NULL == io_handle->messages[packet])
592 /* Resend packets which weren't ack'ed */
593 for (packet=0; packet < ack_packet; packet++)
595 if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
598 queue_message (socket,
599 &io_handle->messages[packet]->header,
604 packet = ack_packet + 1;
605 /* Now send new packets if there is enough buffer space */
606 while ( (NULL != io_handle->messages[packet]) &&
607 (io_handle->receive_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) )
609 io_handle->receive_window_available -= ntohs (io_handle->messages[packet]->header.header.size);
610 queue_message (socket,
611 &io_handle->messages[packet]->header,
612 &write_data_finish_cb,
620 * Handler for DATA messages; Same for both client and server
622 * @param socket the socket through which the ack was received
623 * @param tunnel connection to the other end
624 * @param sender who sent the message
625 * @param ack the acknowledgment message
626 * @param atsi performance data for the connection
627 * @return GNUNET_OK to keep the connection open,
628 * GNUNET_SYSERR to close it (signal serious error)
631 handle_data (struct GNUNET_STREAM_Socket *socket,
632 struct GNUNET_MESH_Tunnel *tunnel,
633 const struct GNUNET_PeerIdentity *sender,
634 const struct GNUNET_STREAM_DataMessage *msg,
635 const struct GNUNET_ATS_Information*atsi)
640 size = msg->header.header.size;
641 if (size < sizeof (struct GNUNET_STREAM_DataMessage))
644 return GNUNET_SYSERR;
647 switch (socket->state)
649 case STATE_ESTABLISHED:
650 case STATE_TRANSMIT_CLOSED:
651 case STATE_TRANSMIT_CLOSE_WAIT:
652 GNUNET_assert (NULL != socket->receive_buffer);
653 /* check if the message's sequence number is greater than the one we are
655 if (ntohl (msg->sequence_number) - socket->read_sequence_number < 64)
659 else /* We are receiving a retransmitted message */
661 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
662 "Message with sequence number %d retransmitted\n",
663 ntohl (socket->read_sequence_number));
666 /* Copy Data to buffer and send acknowledgements */
667 size -= sizeof (struct GNUNET_STREAM_DataMessage);
669 memcpy (socket->receive_buffer
670 + (ntohl (msg->sequence_number) - socket->read_sequence_number)
675 /* Modify the ACK bitmap */
676 ackbitmap_modify_bit (&socket->ack_bitmap,
677 ntohl (msg->sequence_number) -
678 socket->read_sequence_number,
681 /* Start ACK sending task if one is not already present */
682 if (0 == socket->ack_task_id)
684 socket->ack_task_id =
685 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
694 /* FIXME: call statistics */
701 * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
703 * @param cls the socket (set from GNUNET_MESH_connect)
704 * @param tunnel connection to the other end
705 * @param tunnel_ctx place to store local state associated with the tunnel
706 * @param sender who sent the message
707 * @param message the actual message
708 * @param atsi performance data for the connection
709 * @return GNUNET_OK to keep the connection open,
710 * GNUNET_SYSERR to close it (signal serious error)
713 client_handle_data (void *cls,
714 struct GNUNET_MESH_Tunnel *tunnel,
716 const struct GNUNET_PeerIdentity *sender,
717 const struct GNUNET_MessageHeader *message,
718 const struct GNUNET_ATS_Information*atsi)
720 struct GNUNET_STREAM_Socket *socket = cls;
722 return handle_data (socket,
725 (const struct GNUNET_STREAM_DataMessage *) message,
731 * Callback to set state to ESTABLISHED
733 * @param cls the closure from queue_message FIXME: document
734 * @param socket the socket to requiring state change
737 set_state_established (void *cls,
738 struct GNUNET_STREAM_Socket *socket)
740 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining ESTABLISHED state\n");
741 /* Initialize the receive buffer */
742 socket->receive_buffer = GNUNET_malloc (RECEIVE_BUFFER_SIZE);
743 socket->state = STATE_ESTABLISHED;
748 * Callback to set state to HELLO_WAIT
750 * @param cls the closure from queue_message
751 * @param socket the socket to requiring state change
754 set_state_hello_wait (void *cls,
755 struct GNUNET_STREAM_Socket *socket)
757 GNUNET_assert (STATE_INIT == socket->state);
758 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining HELLO_WAIT state\n");
759 socket->state = STATE_HELLO_WAIT;
764 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
766 * @param cls the socket (set from GNUNET_MESH_connect)
767 * @param tunnel connection to the other end
768 * @param tunnel_ctx this is NULL
769 * @param sender who sent the message
770 * @param message the actual message
771 * @param atsi performance data for the connection
772 * @return GNUNET_OK to keep the connection open,
773 * GNUNET_SYSERR to close it (signal serious error)
776 client_handle_hello_ack (void *cls,
777 struct GNUNET_MESH_Tunnel *tunnel,
779 const struct GNUNET_PeerIdentity *sender,
780 const struct GNUNET_MessageHeader *message,
781 const struct GNUNET_ATS_Information*atsi)
783 struct GNUNET_STREAM_Socket *socket = cls;
784 const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
785 struct GNUNET_STREAM_HelloAckMessage *reply;
787 ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
788 GNUNET_assert (socket->tunnel == tunnel);
789 switch (socket->state)
791 case STATE_HELLO_WAIT:
792 socket->read_sequence_number = ntohl (ack_msg->sequence_number);
793 socket->receive_window_available = ntohl (ack_msg->receive_window_size);
794 /* Get the random sequence number */
795 socket->write_sequence_number =
796 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
798 GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
799 reply->header.header.size =
800 htons (sizeof (struct GNUNET_STREAM_MessageHeader));
801 reply->header.header.type =
802 htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
803 reply->sequence_number = htonl (socket->write_sequence_number);
804 reply->receive_window_size = htonl (RECEIVE_BUFFER_SIZE);
805 queue_message (socket,
807 &set_state_established,
810 case STATE_ESTABLISHED:
811 case STATE_RECEIVE_CLOSE_WAIT:
812 // call statistics (# ACKs ignored++)
816 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
817 "Server sent HELLO_ACK when in state %d\n", socket->state);
818 socket->state = STATE_CLOSED; // introduce STATE_ERROR?
819 return GNUNET_SYSERR;
826 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
828 * @param cls the socket (set from GNUNET_MESH_connect)
829 * @param tunnel connection to the other end
830 * @param tunnel_ctx this is NULL
831 * @param sender who sent the message
832 * @param message the actual message
833 * @param atsi performance data for the connection
834 * @return GNUNET_OK to keep the connection open,
835 * GNUNET_SYSERR to close it (signal serious error)
838 client_handle_reset (void *cls,
839 struct GNUNET_MESH_Tunnel *tunnel,
841 const struct GNUNET_PeerIdentity *sender,
842 const struct GNUNET_MessageHeader *message,
843 const struct GNUNET_ATS_Information*atsi)
845 struct GNUNET_STREAM_Socket *socket = cls;
852 * Common message handler for handling TRANSMIT_CLOSE messages
854 * @param socket the socket through which the ack was received
855 * @param tunnel connection to the other end
856 * @param sender who sent the message
857 * @param ack the acknowledgment message
858 * @param atsi performance data for the connection
859 * @return GNUNET_OK to keep the connection open,
860 * GNUNET_SYSERR to close it (signal serious error)
863 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
864 struct GNUNET_MESH_Tunnel *tunnel,
865 const struct GNUNET_PeerIdentity *sender,
866 const struct GNUNET_STREAM_MessageHeader *msg,
867 const struct GNUNET_ATS_Information*atsi)
869 struct GNUNET_STREAM_MessageHeader *reply;
871 switch (socket->state)
873 case STATE_ESTABLISHED:
874 socket->state = STATE_RECEIVE_CLOSED;
876 /* Send TRANSMIT_CLOSE_ACK */
877 reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
879 htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
880 reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
881 queue_message (socket, reply, NULL, NULL);
885 /* FIXME: Call statistics? */
893 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
895 * @param cls the socket (set from GNUNET_MESH_connect)
896 * @param tunnel connection to the other end
897 * @param tunnel_ctx this is NULL
898 * @param sender who sent the message
899 * @param message the actual message
900 * @param atsi performance data for the connection
901 * @return GNUNET_OK to keep the connection open,
902 * GNUNET_SYSERR to close it (signal serious error)
905 client_handle_transmit_close (void *cls,
906 struct GNUNET_MESH_Tunnel *tunnel,
908 const struct GNUNET_PeerIdentity *sender,
909 const struct GNUNET_MessageHeader *message,
910 const struct GNUNET_ATS_Information*atsi)
912 struct GNUNET_STREAM_Socket *socket = cls;
914 return handle_transmit_close (socket,
917 (struct GNUNET_STREAM_MessageHeader *)message,
923 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
925 * @param cls the socket (set from GNUNET_MESH_connect)
926 * @param tunnel connection to the other end
927 * @param tunnel_ctx this is NULL
928 * @param sender who sent the message
929 * @param message the actual message
930 * @param atsi performance data for the connection
931 * @return GNUNET_OK to keep the connection open,
932 * GNUNET_SYSERR to close it (signal serious error)
935 client_handle_transmit_close_ack (void *cls,
936 struct GNUNET_MESH_Tunnel *tunnel,
938 const struct GNUNET_PeerIdentity *sender,
939 const struct GNUNET_MessageHeader *message,
940 const struct GNUNET_ATS_Information*atsi)
942 struct GNUNET_STREAM_Socket *socket = cls;
949 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
951 * @param cls the socket (set from GNUNET_MESH_connect)
952 * @param tunnel connection to the other end
953 * @param tunnel_ctx this is NULL
954 * @param sender who sent the message
955 * @param message the actual message
956 * @param atsi performance data for the connection
957 * @return GNUNET_OK to keep the connection open,
958 * GNUNET_SYSERR to close it (signal serious error)
961 client_handle_receive_close (void *cls,
962 struct GNUNET_MESH_Tunnel *tunnel,
964 const struct GNUNET_PeerIdentity *sender,
965 const struct GNUNET_MessageHeader *message,
966 const struct GNUNET_ATS_Information*atsi)
968 struct GNUNET_STREAM_Socket *socket = cls;
975 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
977 * @param cls the socket (set from GNUNET_MESH_connect)
978 * @param tunnel connection to the other end
979 * @param tunnel_ctx this is NULL
980 * @param sender who sent the message
981 * @param message the actual message
982 * @param atsi performance data for the connection
983 * @return GNUNET_OK to keep the connection open,
984 * GNUNET_SYSERR to close it (signal serious error)
987 client_handle_receive_close_ack (void *cls,
988 struct GNUNET_MESH_Tunnel *tunnel,
990 const struct GNUNET_PeerIdentity *sender,
991 const struct GNUNET_MessageHeader *message,
992 const struct GNUNET_ATS_Information*atsi)
994 struct GNUNET_STREAM_Socket *socket = cls;
1001 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1003 * @param cls the socket (set from GNUNET_MESH_connect)
1004 * @param tunnel connection to the other end
1005 * @param tunnel_ctx this is NULL
1006 * @param sender who sent the message
1007 * @param message the actual message
1008 * @param atsi performance data for the connection
1009 * @return GNUNET_OK to keep the connection open,
1010 * GNUNET_SYSERR to close it (signal serious error)
1013 client_handle_close (void *cls,
1014 struct GNUNET_MESH_Tunnel *tunnel,
1016 const struct GNUNET_PeerIdentity *sender,
1017 const struct GNUNET_MessageHeader *message,
1018 const struct GNUNET_ATS_Information*atsi)
1020 struct GNUNET_STREAM_Socket *socket = cls;
1027 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1029 * @param cls the socket (set from GNUNET_MESH_connect)
1030 * @param tunnel connection to the other end
1031 * @param tunnel_ctx this is NULL
1032 * @param sender who sent the message
1033 * @param message the actual message
1034 * @param atsi performance data for the connection
1035 * @return GNUNET_OK to keep the connection open,
1036 * GNUNET_SYSERR to close it (signal serious error)
1039 client_handle_close_ack (void *cls,
1040 struct GNUNET_MESH_Tunnel *tunnel,
1042 const struct GNUNET_PeerIdentity *sender,
1043 const struct GNUNET_MessageHeader *message,
1044 const struct GNUNET_ATS_Information*atsi)
1046 struct GNUNET_STREAM_Socket *socket = cls;
1051 /*****************************/
1052 /* Server's Message Handlers */
1053 /*****************************/
1056 * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1058 * @param cls the closure
1059 * @param tunnel connection to the other end
1060 * @param tunnel_ctx the socket
1061 * @param sender who sent the message
1062 * @param message the actual message
1063 * @param atsi performance data for the connection
1064 * @return GNUNET_OK to keep the connection open,
1065 * GNUNET_SYSERR to close it (signal serious error)
1068 server_handle_data (void *cls,
1069 struct GNUNET_MESH_Tunnel *tunnel,
1071 const struct GNUNET_PeerIdentity *sender,
1072 const struct GNUNET_MessageHeader *message,
1073 const struct GNUNET_ATS_Information*atsi)
1075 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1082 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
1084 * @param cls the closure
1085 * @param tunnel connection to the other end
1086 * @param tunnel_ctx the socket
1087 * @param sender who sent the message
1088 * @param message the actual message
1089 * @param atsi performance data for the connection
1090 * @return GNUNET_OK to keep the connection open,
1091 * GNUNET_SYSERR to close it (signal serious error)
1094 server_handle_hello (void *cls,
1095 struct GNUNET_MESH_Tunnel *tunnel,
1097 const struct GNUNET_PeerIdentity *sender,
1098 const struct GNUNET_MessageHeader *message,
1099 const struct GNUNET_ATS_Information*atsi)
1101 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1102 struct GNUNET_STREAM_HelloAckMessage *reply;
1104 GNUNET_assert (socket->tunnel == tunnel);
1105 if (STATE_INIT == socket->state)
1107 /* Get the random sequence number */
1108 socket->write_sequence_number =
1109 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1111 GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1112 reply->header.header.size =
1113 htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1114 reply->header.header.type =
1115 htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1116 reply->sequence_number = htonl (socket->write_sequence_number);
1117 queue_message (socket,
1119 &set_state_hello_wait,
1124 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1125 "Client sent HELLO when in state %d\n", socket->state);
1126 /* FIXME: Send RESET? */
1134 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1136 * @param cls the closure
1137 * @param tunnel connection to the other end
1138 * @param tunnel_ctx the socket
1139 * @param sender who sent the message
1140 * @param message the actual message
1141 * @param atsi performance data for the connection
1142 * @return GNUNET_OK to keep the connection open,
1143 * GNUNET_SYSERR to close it (signal serious error)
1146 server_handle_hello_ack (void *cls,
1147 struct GNUNET_MESH_Tunnel *tunnel,
1149 const struct GNUNET_PeerIdentity *sender,
1150 const struct GNUNET_MessageHeader *message,
1151 const struct GNUNET_ATS_Information*atsi)
1153 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1154 const struct GNUNET_STREAM_HelloAckMessage *ack_message;
1156 ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
1157 GNUNET_assert (socket->tunnel == tunnel);
1158 if (STATE_HELLO_WAIT == socket->state)
1160 socket->read_sequence_number = ntohl (ack_message->sequence_number);
1161 socket->receive_window_available =
1162 ntohl (ack_message->receive_window_size);
1163 socket->state = STATE_ESTABLISHED;
1167 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1168 "Client sent HELLO_ACK when in state %d\n", socket->state);
1169 /* FIXME: Send RESET? */
1177 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1179 * @param cls the closure
1180 * @param tunnel connection to the other end
1181 * @param tunnel_ctx the socket
1182 * @param sender who sent the message
1183 * @param message the actual message
1184 * @param atsi performance data for the connection
1185 * @return GNUNET_OK to keep the connection open,
1186 * GNUNET_SYSERR to close it (signal serious error)
1189 server_handle_reset (void *cls,
1190 struct GNUNET_MESH_Tunnel *tunnel,
1192 const struct GNUNET_PeerIdentity *sender,
1193 const struct GNUNET_MessageHeader *message,
1194 const struct GNUNET_ATS_Information*atsi)
1196 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1203 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1205 * @param cls the closure
1206 * @param tunnel connection to the other end
1207 * @param tunnel_ctx the socket
1208 * @param sender who sent the message
1209 * @param message the actual message
1210 * @param atsi performance data for the connection
1211 * @return GNUNET_OK to keep the connection open,
1212 * GNUNET_SYSERR to close it (signal serious error)
1215 server_handle_transmit_close (void *cls,
1216 struct GNUNET_MESH_Tunnel *tunnel,
1218 const struct GNUNET_PeerIdentity *sender,
1219 const struct GNUNET_MessageHeader *message,
1220 const struct GNUNET_ATS_Information*atsi)
1222 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1224 return handle_transmit_close (socket,
1227 (struct GNUNET_STREAM_MessageHeader *)message,
1233 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1235 * @param cls the closure
1236 * @param tunnel connection to the other end
1237 * @param tunnel_ctx the socket
1238 * @param sender who sent the message
1239 * @param message the actual message
1240 * @param atsi performance data for the connection
1241 * @return GNUNET_OK to keep the connection open,
1242 * GNUNET_SYSERR to close it (signal serious error)
1245 server_handle_transmit_close_ack (void *cls,
1246 struct GNUNET_MESH_Tunnel *tunnel,
1248 const struct GNUNET_PeerIdentity *sender,
1249 const struct GNUNET_MessageHeader *message,
1250 const struct GNUNET_ATS_Information*atsi)
1252 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1259 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1261 * @param cls the closure
1262 * @param tunnel connection to the other end
1263 * @param tunnel_ctx the socket
1264 * @param sender who sent the message
1265 * @param message the actual message
1266 * @param atsi performance data for the connection
1267 * @return GNUNET_OK to keep the connection open,
1268 * GNUNET_SYSERR to close it (signal serious error)
1271 server_handle_receive_close (void *cls,
1272 struct GNUNET_MESH_Tunnel *tunnel,
1274 const struct GNUNET_PeerIdentity *sender,
1275 const struct GNUNET_MessageHeader *message,
1276 const struct GNUNET_ATS_Information*atsi)
1278 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1285 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1287 * @param cls the closure
1288 * @param tunnel connection to the other end
1289 * @param tunnel_ctx the socket
1290 * @param sender who sent the message
1291 * @param message the actual message
1292 * @param atsi performance data for the connection
1293 * @return GNUNET_OK to keep the connection open,
1294 * GNUNET_SYSERR to close it (signal serious error)
1297 server_handle_receive_close_ack (void *cls,
1298 struct GNUNET_MESH_Tunnel *tunnel,
1300 const struct GNUNET_PeerIdentity *sender,
1301 const struct GNUNET_MessageHeader *message,
1302 const struct GNUNET_ATS_Information*atsi)
1304 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1311 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1313 * @param cls the closure
1314 * @param tunnel connection to the other end
1315 * @param tunnel_ctx the socket
1316 * @param sender who sent the message
1317 * @param message the actual message
1318 * @param atsi performance data for the connection
1319 * @return GNUNET_OK to keep the connection open,
1320 * GNUNET_SYSERR to close it (signal serious error)
1323 server_handle_close (void *cls,
1324 struct GNUNET_MESH_Tunnel *tunnel,
1326 const struct GNUNET_PeerIdentity *sender,
1327 const struct GNUNET_MessageHeader *message,
1328 const struct GNUNET_ATS_Information*atsi)
1330 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1337 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1339 * @param cls the closure
1340 * @param tunnel connection to the other end
1341 * @param tunnel_ctx the socket
1342 * @param sender who sent the message
1343 * @param message the actual message
1344 * @param atsi performance data for the connection
1345 * @return GNUNET_OK to keep the connection open,
1346 * GNUNET_SYSERR to close it (signal serious error)
1349 server_handle_close_ack (void *cls,
1350 struct GNUNET_MESH_Tunnel *tunnel,
1352 const struct GNUNET_PeerIdentity *sender,
1353 const struct GNUNET_MessageHeader *message,
1354 const struct GNUNET_ATS_Information*atsi)
1356 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1363 * Message Handler for mesh
1365 * @param socket the socket through which the ack was received
1366 * @param tunnel connection to the other end
1367 * @param sender who sent the message
1368 * @param ack the acknowledgment message
1369 * @param atsi performance data for the connection
1370 * @return GNUNET_OK to keep the connection open,
1371 * GNUNET_SYSERR to close it (signal serious error)
1374 handle_ack (struct GNUNET_STREAM_Socket *socket,
1375 struct GNUNET_MESH_Tunnel *tunnel,
1376 const struct GNUNET_PeerIdentity *sender,
1377 const struct GNUNET_STREAM_AckMessage *ack,
1378 const struct GNUNET_ATS_Information*atsi)
1380 switch (socket->state)
1382 case (STATE_ESTABLISHED):
1383 if (NULL == socket->write_handle)
1385 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1386 "Received DATA ACK when write_handle is NULL\n");
1390 socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap);
1391 socket->write_handle->receive_window_available =
1392 ntohl (ack->receive_window_remaining);
1393 write_data (socket);
1403 * Message Handler for mesh
1405 * @param cls the 'struct GNUNET_STREAM_Socket'
1406 * @param tunnel connection to the other end
1407 * @param tunnel_ctx unused
1408 * @param sender who sent the message
1409 * @param message the actual message
1410 * @param atsi performance data for the connection
1411 * @return GNUNET_OK to keep the connection open,
1412 * GNUNET_SYSERR to close it (signal serious error)
1415 client_handle_ack (void *cls,
1416 struct GNUNET_MESH_Tunnel *tunnel,
1418 const struct GNUNET_PeerIdentity *sender,
1419 const struct GNUNET_MessageHeader *message,
1420 const struct GNUNET_ATS_Information*atsi)
1422 struct GNUNET_STREAM_Socket *socket = cls;
1423 const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1425 return handle_ack (socket, tunnel, sender, ack, atsi);
1430 * Message Handler for mesh
1432 * @param cls the server's listen socket
1433 * @param tunnel connection to the other end
1434 * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
1435 * @param sender who sent the message
1436 * @param message the actual message
1437 * @param atsi performance data for the connection
1438 * @return GNUNET_OK to keep the connection open,
1439 * GNUNET_SYSERR to close it (signal serious error)
1442 server_handle_ack (void *cls,
1443 struct GNUNET_MESH_Tunnel *tunnel,
1445 const struct GNUNET_PeerIdentity *sender,
1446 const struct GNUNET_MessageHeader *message,
1447 const struct GNUNET_ATS_Information*atsi)
1449 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1450 const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1452 return handle_ack (socket, tunnel, sender, ack, atsi);
1457 * For client message handlers, the stream socket is in the
1460 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
1461 {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1462 {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK,
1463 sizeof (struct GNUNET_STREAM_AckMessage) },
1464 {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1465 sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1466 {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1467 sizeof (struct GNUNET_STREAM_MessageHeader)},
1468 {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1469 sizeof (struct GNUNET_STREAM_MessageHeader)},
1470 {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1471 sizeof (struct GNUNET_STREAM_MessageHeader)},
1472 {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1473 sizeof (struct GNUNET_STREAM_MessageHeader)},
1474 {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1475 sizeof (struct GNUNET_STREAM_MessageHeader)},
1476 {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1477 sizeof (struct GNUNET_STREAM_MessageHeader)},
1478 {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1479 sizeof (struct GNUNET_STREAM_MessageHeader)},
1485 * For server message handlers, the stream socket is in the
1486 * tunnel context, and the listen socket in the closure argument.
1488 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
1489 {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1490 {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK,
1491 sizeof (struct GNUNET_STREAM_AckMessage) },
1492 {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO,
1493 sizeof (struct GNUNET_STREAM_MessageHeader)},
1494 {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1495 sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1496 {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1497 sizeof (struct GNUNET_STREAM_MessageHeader)},
1498 {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1499 sizeof (struct GNUNET_STREAM_MessageHeader)},
1500 {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1501 sizeof (struct GNUNET_STREAM_MessageHeader)},
1502 {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1503 sizeof (struct GNUNET_STREAM_MessageHeader)},
1504 {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1505 sizeof (struct GNUNET_STREAM_MessageHeader)},
1506 {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1507 sizeof (struct GNUNET_STREAM_MessageHeader)},
1508 {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1509 sizeof (struct GNUNET_STREAM_MessageHeader)},
1515 * Function called when our target peer is connected to our tunnel
1517 * @param cls the socket for which this tunnel is created
1518 * @param peer the peer identity of the target
1519 * @param atsi performance data for the connection
1522 mesh_peer_connect_callback (void *cls,
1523 const struct GNUNET_PeerIdentity *peer,
1524 const struct GNUNET_ATS_Information * atsi)
1526 struct GNUNET_STREAM_Socket *socket = cls;
1527 struct GNUNET_STREAM_MessageHeader *message;
1529 if (0 != memcmp (&socket->other_peer,
1531 sizeof (struct GNUNET_PeerIdentity)))
1533 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1534 "A peer (%s) which is not our target has connected to our tunnel",
1539 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1540 "Target peer %s connected\n", GNUNET_i2s (peer));
1542 /* Set state to INIT */
1543 socket->state = STATE_INIT;
1545 /* Send HELLO message */
1546 message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1547 message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1548 message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1549 queue_message (socket,
1551 &set_state_hello_wait,
1554 /* Call open callback */
1555 if (NULL == socket->open_cb)
1557 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1558 "STREAM_open callback is NULL\n");
1562 socket->open_cb (socket->open_cls, socket);
1568 * Function called when our target peer is disconnected from our tunnel
1570 * @param cls the socket associated which this tunnel
1571 * @param peer the peer identity of the target
1574 mesh_peer_disconnect_callback (void *cls,
1575 const struct GNUNET_PeerIdentity *peer)
1587 * Tries to open a stream to the target peer
1589 * @param cfg configuration to use
1590 * @param target the target peer to which the stream has to be opened
1591 * @param app_port the application port number which uniquely identifies this
1593 * @param open_cb this function will be called after stream has be established
1594 * @param open_cb_cls the closure for open_cb
1595 * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
1596 * @return if successful it returns the stream socket; NULL if stream cannot be
1599 struct GNUNET_STREAM_Socket *
1600 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
1601 const struct GNUNET_PeerIdentity *target,
1602 GNUNET_MESH_ApplicationType app_port,
1603 GNUNET_STREAM_OpenCallback open_cb,
1607 struct GNUNET_STREAM_Socket *socket;
1608 enum GNUNET_STREAM_Option option;
1609 va_list vargs; /* Variable arguments */
1611 socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
1612 socket->other_peer = *target;
1613 socket->open_cb = open_cb;
1614 socket->open_cls = open_cb_cls;
1617 socket->retransmit_timeout =
1618 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
1620 va_start (vargs, open_cb_cls); /* Parse variable args */
1622 option = va_arg (vargs, enum GNUNET_STREAM_Option);
1625 case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
1626 /* Expect struct GNUNET_TIME_Relative */
1627 socket->retransmit_timeout = va_arg (vargs,
1628 struct GNUNET_TIME_Relative);
1630 case GNUNET_STREAM_OPTION_END:
1633 } while (GNUNET_STREAM_OPTION_END != option);
1634 va_end (vargs); /* End of variable args parsing */
1636 socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
1637 1, /* QUEUE size as parameter? */
1639 NULL, /* No inbound tunnel handler */
1640 NULL, /* No inbound tunnel cleaner */
1641 client_message_handlers,
1642 NULL); /* We don't get inbound tunnels */
1643 // FIXME: if (NULL == socket->mesh) ...
1645 /* Now create the mesh tunnel to target */
1646 socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
1647 NULL, /* Tunnel context */
1648 &mesh_peer_connect_callback,
1649 &mesh_peer_disconnect_callback,
1651 // FIXME: if (NULL == socket->tunnel) ...
1660 * @param socket the stream socket
1663 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
1665 struct MessageQueue *head;
1667 /* Clear Transmit handles */
1668 if (NULL != socket->transmit_handle)
1670 GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
1671 socket->transmit_handle = NULL;
1674 /* Clear existing message queue */
1675 while (NULL != (head = socket->queue_head)) {
1676 GNUNET_CONTAINER_DLL_remove (socket->queue_head,
1679 GNUNET_free (head->message);
1683 /* Close associated tunnel */
1684 if (NULL != socket->tunnel)
1686 GNUNET_MESH_tunnel_destroy (socket->tunnel);
1687 socket->tunnel = NULL;
1690 /* Close mesh connection */
1691 if (NULL != socket->mesh)
1693 GNUNET_MESH_disconnect (socket->mesh);
1694 socket->mesh = NULL;
1697 /* Release receive buffer */
1698 if (NULL != socket->receive_buffer)
1700 GNUNET_free (socket->receive_buffer);
1703 GNUNET_free (socket);
1708 * Method called whenever a peer creates a tunnel to us
1710 * @param cls closure
1711 * @param tunnel new handle to the tunnel
1712 * @param initiator peer that started the tunnel
1713 * @param atsi performance information for the tunnel
1714 * @return initial tunnel context for the tunnel
1715 * (can be NULL -- that's not an error)
1718 new_tunnel_notify (void *cls,
1719 struct GNUNET_MESH_Tunnel *tunnel,
1720 const struct GNUNET_PeerIdentity *initiator,
1721 const struct GNUNET_ATS_Information *atsi)
1723 struct GNUNET_STREAM_ListenSocket *lsocket = cls;
1724 struct GNUNET_STREAM_Socket *socket;
1726 socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
1727 socket->tunnel = tunnel;
1728 socket->session_id = 0; /* FIXME */
1729 socket->other_peer = *initiator;
1730 socket->state = STATE_INIT;
1732 if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
1734 &socket->other_peer))
1736 socket->state = STATE_CLOSED;
1737 /* FIXME: Send CLOSE message and then free */
1738 GNUNET_free (socket);
1739 GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
1746 * Function called whenever an inbound tunnel is destroyed. Should clean up
1747 * any associated state. This function is NOT called if the client has
1748 * explicitly asked for the tunnel to be destroyed using
1749 * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
1752 * @param cls closure (set from GNUNET_MESH_connect)
1753 * @param tunnel connection to the other end (henceforth invalid)
1754 * @param tunnel_ctx place where local state associated
1755 * with the tunnel is stored
1758 tunnel_cleaner (void *cls,
1759 const struct GNUNET_MESH_Tunnel *tunnel,
1762 struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
1764 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1765 "Peer %s has terminated connection abruptly\n",
1766 GNUNET_i2s (&socket->other_peer));
1768 socket->status = GNUNET_STREAM_SHUTDOWN;
1769 /* Clear Transmit handles */
1770 if (NULL != socket->transmit_handle)
1772 GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
1773 socket->transmit_handle = NULL;
1775 socket->tunnel = NULL;
1780 * Listens for stream connections for a specific application ports
1782 * @param cfg the configuration to use
1783 * @param app_port the application port for which new streams will be accepted
1784 * @param listen_cb this function will be called when a peer tries to establish
1786 * @param listen_cb_cls closure for listen_cb
1787 * @return listen socket, NULL for any error
1789 struct GNUNET_STREAM_ListenSocket *
1790 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
1791 GNUNET_MESH_ApplicationType app_port,
1792 GNUNET_STREAM_ListenCallback listen_cb,
1793 void *listen_cb_cls)
1795 /* FIXME: Add variable args for passing configration options? */
1796 struct GNUNET_STREAM_ListenSocket *lsocket;
1797 GNUNET_MESH_ApplicationType app_types[2];
1799 app_types[0] = app_port;
1801 lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
1802 lsocket->port = app_port;
1803 lsocket->listen_cb = listen_cb;
1804 lsocket->listen_cb_cls = listen_cb_cls;
1805 lsocket->mesh = GNUNET_MESH_connect (cfg,
1806 10, /* FIXME: QUEUE size as parameter? */
1807 lsocket, /* Closure */
1810 server_message_handlers,
1817 * Closes the listen socket
1819 * @param lsocket the listen socket
1822 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
1824 /* Close MESH connection */
1825 GNUNET_MESH_disconnect (lsocket->mesh);
1827 GNUNET_free (lsocket);
1832 * Tries to write the given data to the stream
1834 * @param socket the socket representing a stream
1835 * @param data the data buffer from where the data is written into the stream
1836 * @param size the number of bytes to be written from the data buffer
1837 * @param timeout the timeout period
1838 * @param write_cont the function to call upon writing some bytes into the stream
1839 * @param write_cont_cls the closure
1840 * @return handle to cancel the operation
1842 struct GNUNET_STREAM_IOHandle *
1843 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
1846 struct GNUNET_TIME_Relative timeout,
1847 GNUNET_STREAM_CompletionContinuation write_cont,
1848 void *write_cont_cls)
1850 unsigned int num_needed_packets;
1851 unsigned int packet;
1852 struct GNUNET_STREAM_IOHandle *io_handle;
1854 struct GNUNET_STREAM_DataMessage *data_msg;
1857 /* There is already a write request pending */
1858 if (NULL != socket->write_handle)
1863 if (!((STATE_ESTABLISHED == socket->state)
1864 || (STATE_RECEIVE_CLOSE_WAIT == socket->state)
1865 || (STATE_RECEIVE_CLOSED == socket->state)))
1867 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1868 "Attempting to write on a closed (OR) not-yet-established"
1872 if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
1873 size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size;
1874 num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
1875 io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOHandle));
1876 io_handle->receive_window_available = socket->receive_window_available;
1878 /* Divide the given buffer into packets for sending */
1879 for (packet=0; packet < num_needed_packets; packet++)
1881 if ((packet + 1) * max_payload_size < size)
1883 packet_size = MAX_PACKET_SIZE;
1887 packet_size = size - packet * max_payload_size
1888 + sizeof (struct GNUNET_STREAM_DataMessage);
1890 io_handle->messages[packet] = GNUNET_malloc (packet_size);
1891 io_handle->messages[packet]->header.header.size = htons (packet_size);
1892 io_handle->messages[packet]->header.header.type =
1893 htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
1894 io_handle->messages[packet]->sequence_number =
1895 htons (socket->write_sequence_number++);
1896 data_msg = io_handle->messages[packet];
1897 memcpy (&data_msg[1],
1899 packet_size - sizeof (struct GNUNET_STREAM_DataMessage));
1900 sweep += packet_size - sizeof (struct GNUNET_STREAM_DataMessage);
1903 write_data (socket);