2 This file is part of GNUnet.
3 (C) 2012 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
23 * Checks for matching the sender and socket->other_peer in server
26 * Add code for write io timeout
28 * Include retransmission for control messages
32 * @file stream/stream_api.c
33 * @brief Implementation of the stream library
34 * @author Sree Harsha Totakura
39 #include "gnunet_common.h"
40 #include "gnunet_crypto_lib.h"
41 #include "gnunet_lockmanager_service.h"
42 #include "gnunet_stream_lib.h"
43 #include "stream_protocol.h"
46 * Generic logging shorthand
48 #define LOG(kind,...) \
49 GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
52 * Debug logging shorthand
54 #define LOG_DEBUG(...) \
55 LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
58 * Time in relative seconds shorthand
60 #define TIME_REL_SECS(sec) \
61 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
64 * The maximum packet size of a stream packet
66 #define DEFAULT_MAX_PAYLOAD_SIZE 64000
71 #define RECEIVE_BUFFER_SIZE 4096000
74 * states in the Protocol
79 * Client initialization state
84 * Listener initialization state
89 * Pre-connection establishment state
94 * State where a connection has been established
99 * State where the socket is closed on our side and waiting to be ACK'ed
101 STATE_RECEIVE_CLOSE_WAIT,
104 * State where the socket is closed for reading
106 STATE_RECEIVE_CLOSED,
109 * State where the socket is closed on our side and waiting to be ACK'ed
111 STATE_TRANSMIT_CLOSE_WAIT,
114 * State where the socket is closed for writing
116 STATE_TRANSMIT_CLOSED,
119 * State where the socket is closed on our side and waiting to be ACK'ed
124 * State where the socket is closed
131 * Functions of this type are called when a message is written
133 * @param cls the closure from queue_message
134 * @param socket the socket the written message was bound to
136 typedef void (*SendFinishCallback) (void *cls,
137 struct GNUNET_STREAM_Socket *socket);
141 * The send message queue
148 struct GNUNET_STREAM_MessageHeader *message;
151 * Callback to be called when the message is sent
153 SendFinishCallback finish_cb;
156 * The closure for finish_cb
161 * The next message in queue. Should be NULL in the last message
163 struct MessageQueue *next;
166 * The next message in queue. Should be NULL in the first message
168 struct MessageQueue *prev;
173 * The STREAM Socket Handler
175 struct GNUNET_STREAM_Socket
178 * Retransmission timeout
180 struct GNUNET_TIME_Relative retransmit_timeout;
183 * The Acknowledgement Bitmap
185 GNUNET_STREAM_AckBitmap ack_bitmap;
188 * Time when the Acknowledgement was queued
190 struct GNUNET_TIME_Absolute ack_time_registered;
193 * Queued Acknowledgement deadline
195 struct GNUNET_TIME_Relative ack_time_deadline;
200 struct GNUNET_MESH_Handle *mesh;
203 * The mesh tunnel handle
205 struct GNUNET_MESH_Tunnel *tunnel;
208 * Stream open closure
213 * Stream open callback
215 GNUNET_STREAM_OpenCallback open_cb;
218 * The current transmit handle (if a pending transmit request exists)
220 struct GNUNET_MESH_TransmitHandle *transmit_handle;
223 * The current message associated with the transmit handle
225 struct MessageQueue *queue_head;
228 * The queue tail, should always point to the last message in queue
230 struct MessageQueue *queue_tail;
233 * The write IO_handle associated with this socket
235 struct GNUNET_STREAM_IOWriteHandle *write_handle;
238 * The read IO_handle associated with this socket
240 struct GNUNET_STREAM_IOReadHandle *read_handle;
243 * The shutdown handle associated with this socket
245 struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
248 * Buffer for storing received messages
250 void *receive_buffer;
253 * The listen socket from which this socket is derived. Should be NULL if it
254 * is not a derived socket
256 struct GNUNET_STREAM_ListenSocket *lsocket;
259 * The peer identity of the peer at the other end of the stream
261 struct GNUNET_PeerIdentity other_peer;
264 * Task identifier for retransmission task after timeout
266 GNUNET_SCHEDULER_TaskIdentifier data_retransmission_task_id;
269 * Task identifier for retransmission of control messages
271 GNUNET_SCHEDULER_TaskIdentifier control_retransmission_task_id;
274 * The task for sending timely Acks
276 GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
279 * The state of the protocol associated with this socket
284 * The status of the socket
286 enum GNUNET_STREAM_Status status;
289 * The number of previous timeouts; FIXME: currently not used
291 unsigned int retries;
294 * The application port number (type: uint32_t)
296 GNUNET_MESH_ApplicationType app_port;
299 * Whether testing mode is active or not
304 * The write sequence number to be set incase of testing
306 uint32_t testing_set_write_sequence_number_value;
309 * Write sequence number. Set to random when sending HELLO(client) and
312 uint32_t write_sequence_number;
315 * Read sequence number. This number's value is determined during handshake
317 uint32_t read_sequence_number;
320 * The receiver buffer size
322 uint32_t receive_buffer_size;
325 * The receiver buffer boundaries
327 uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
330 * receiver's available buffer after the last acknowledged packet
332 uint32_t receiver_window_available;
335 * The offset pointer used during write operation
337 uint32_t write_offset;
340 * The offset after which we are expecting data
342 uint32_t read_offset;
345 * The offset upto which user has read from the received buffer
347 uint32_t copy_offset;
350 * The maximum size of the data message payload this stream handle can send
352 uint16_t max_payload_size;
357 * A socket for listening
359 struct GNUNET_STREAM_ListenSocket
364 struct GNUNET_MESH_Handle *mesh;
369 struct GNUNET_CONFIGURATION_Handle *cfg;
372 * Handle to the lock manager service
374 struct GNUNET_LOCKMANAGER_Handle *lockmanager;
377 * The active LockingRequest from lockmanager
379 struct GNUNET_LOCKMANAGER_LockingRequest *locking_request;
382 * Callback to call after acquring a lock and listening
384 GNUNET_STREAM_ListenSuccessCallback listen_ok_cb;
387 * The callback function which is called after successful opening socket
389 GNUNET_STREAM_ListenCallback listen_cb;
392 * The call back closure
399 GNUNET_MESH_ApplicationType port;
402 * The id of the lockmanager timeout task
404 GNUNET_SCHEDULER_TaskIdentifier lockmanager_acquire_timeout_task;
407 * The retransmit timeout
409 struct GNUNET_TIME_Relative retransmit_timeout;
417 * Whether testing mode is active or not
422 * The write sequence number to be set incase of testing
424 uint32_t testing_set_write_sequence_number_value;
427 * The maximum size of the data message payload this stream handle can send
429 uint16_t max_payload_size;
435 * The IO Write Handle
437 struct GNUNET_STREAM_IOWriteHandle
440 * The socket to which this write handle is associated
442 struct GNUNET_STREAM_Socket *socket;
445 * The packet_buffers associated with this Handle
447 struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
450 * The write continuation callback
452 GNUNET_STREAM_CompletionContinuation write_cont;
455 * Write continuation closure
457 void *write_cont_cls;
460 * The bitmap of this IOHandle; Corresponding bit for a message is set when
461 * it has been acknowledged by the receiver
463 GNUNET_STREAM_AckBitmap ack_bitmap;
466 * Number of bytes in this write handle
471 * Number of packets already transmitted from this IO handle. Retransmitted
472 * packets are not taken into account here. This is used to determine which
473 * packets account for retransmission and which packets occupy buffer space at
476 unsigned int packets_sent;
483 struct GNUNET_STREAM_IOReadHandle
486 * The socket to which this read handle is associated
488 struct GNUNET_STREAM_Socket *socket;
491 * Callback for the read processor
493 GNUNET_STREAM_DataProcessor proc;
496 * The closure pointer for the read processor callback
501 * Task identifier for the read io timeout task
503 GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
506 * Task scheduled to continue a read operation.
508 GNUNET_SCHEDULER_TaskIdentifier read_task_id;
513 * Handle for Shutdown
515 struct GNUNET_STREAM_ShutdownHandle
518 * The socket associated with this shutdown handle
520 struct GNUNET_STREAM_Socket *socket;
523 * Shutdown completion callback
525 GNUNET_STREAM_ShutdownCompletion completion_cb;
528 * Closure for completion callback
530 void *completion_cls;
533 * Close message retransmission task id
535 GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
538 * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
545 * Default value in seconds for various timeouts
547 static const unsigned int default_timeout = 10;
550 * The domain name for locks we use here
552 static const char *locking_domain = "GNUNET_STREAM_APPLOCK";
556 * Callback function for sending queued message
558 * @param cls closure the socket
559 * @param size number of bytes available in buf
560 * @param buf where the callee should write the message
561 * @return number of bytes written to buf
564 send_message_notify (void *cls, size_t size, void *buf)
566 struct GNUNET_STREAM_Socket *socket = cls;
567 struct MessageQueue *head;
570 socket->transmit_handle = NULL; /* Remove the transmit handle */
571 head = socket->queue_head;
573 return 0; /* just to be safe */
574 if (0 == size) /* request timed out */
577 LOG (GNUNET_ERROR_TYPE_DEBUG,
578 "%s: Message sending timed out. Retry %d \n",
579 GNUNET_i2s (&socket->other_peer),
581 socket->transmit_handle =
582 GNUNET_MESH_notify_transmit_ready (socket->tunnel,
583 GNUNET_NO, /* Corking */
584 /* FIXME: exponential backoff */
585 socket->retransmit_timeout,
587 ntohs (head->message->header.size),
588 &send_message_notify,
592 ret = ntohs (head->message->header.size);
593 GNUNET_assert (size >= ret);
594 memcpy (buf, head->message, ret);
595 if (NULL != head->finish_cb)
597 head->finish_cb (head->finish_cb_cls, socket);
599 GNUNET_CONTAINER_DLL_remove (socket->queue_head,
602 GNUNET_free (head->message);
604 head = socket->queue_head;
605 if (NULL != head) /* more pending messages to send */
608 socket->transmit_handle =
609 GNUNET_MESH_notify_transmit_ready (socket->tunnel,
610 GNUNET_NO, /* Corking */
611 /* FIXME: exponential backoff */
612 socket->retransmit_timeout,
614 ntohs (head->message->header.size),
615 &send_message_notify,
623 * Queues a message for sending using the mesh connection of a socket
625 * @param socket the socket whose mesh connection is used
626 * @param message the message to be sent
627 * @param finish_cb the callback to be called when the message is sent
628 * @param finish_cb_cls the closure for the callback
629 * @param urgent set to GNUNET_YES to add the message to the beginning of the
630 * queue; GNUNET_NO to add at the tail
633 queue_message (struct GNUNET_STREAM_Socket *socket,
634 struct GNUNET_STREAM_MessageHeader *message,
635 SendFinishCallback finish_cb,
639 struct MessageQueue *queue_entity;
642 ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
643 && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
644 LOG (GNUNET_ERROR_TYPE_DEBUG,
645 "%s: Queueing message of type %d and size %d\n",
646 GNUNET_i2s (&socket->other_peer),
647 ntohs (message->header.type),
648 ntohs (message->header.size));
649 GNUNET_assert (NULL != message);
650 queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
651 queue_entity->message = message;
652 queue_entity->finish_cb = finish_cb;
653 queue_entity->finish_cb_cls = finish_cb_cls;
654 if (GNUNET_YES == urgent)
656 GNUNET_CONTAINER_DLL_insert (socket->queue_head, socket->queue_tail,
658 if (NULL != socket->transmit_handle)
660 GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
661 socket->transmit_handle = NULL;
665 GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
668 if (NULL == socket->transmit_handle)
671 socket->transmit_handle =
672 GNUNET_MESH_notify_transmit_ready (socket->tunnel,
673 GNUNET_NO, /* Corking */
674 socket->retransmit_timeout,
676 ntohs (message->header.size),
677 &send_message_notify,
684 * Copies a message and queues it for sending using the mesh connection of
687 * @param socket the socket whose mesh connection is used
688 * @param message the message to be sent
689 * @param finish_cb the callback to be called when the message is sent
690 * @param finish_cb_cls the closure for the callback
693 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
694 const struct GNUNET_STREAM_MessageHeader *message,
695 SendFinishCallback finish_cb,
698 struct GNUNET_STREAM_MessageHeader *msg_copy;
701 size = ntohs (message->header.size);
702 msg_copy = GNUNET_malloc (size);
703 memcpy (msg_copy, message, size);
704 queue_message (socket, msg_copy, finish_cb, finish_cb_cls, GNUNET_NO);
709 * Writes data using the given socket. The amount of data written is limited by
710 * the receiver_window_size
712 * @param socket the socket to use
715 write_data (struct GNUNET_STREAM_Socket *socket);
719 * Task for retransmitting data messages if they aren't ACK before their ack
722 * @param cls the socket
723 * @param tc the Task context
726 data_retransmission_task (void *cls,
727 const struct GNUNET_SCHEDULER_TaskContext *tc)
729 struct GNUNET_STREAM_Socket *socket = cls;
731 socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
732 if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
734 LOG (GNUNET_ERROR_TYPE_DEBUG,
735 "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
741 * Task for sending ACK message
743 * @param cls the socket
744 * @param tc the Task context
748 const struct GNUNET_SCHEDULER_TaskContext *tc)
750 struct GNUNET_STREAM_Socket *socket = cls;
751 struct GNUNET_STREAM_AckMessage *ack_msg;
753 socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
754 if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
756 /* Create the ACK Message */
757 ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
758 ack_msg->header.header.size = htons (sizeof (struct
759 GNUNET_STREAM_AckMessage));
760 ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
761 ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
762 ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
763 ack_msg->receive_window_remaining =
764 htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
765 /* Queue up ACK for immediate sending */
766 queue_message (socket, &ack_msg->header, NULL, NULL, GNUNET_YES);
771 * Retransmission task for shutdown messages
773 * @param cls the shutdown handle
774 * @param tc the Task Context
777 close_msg_retransmission_task (void *cls,
778 const struct GNUNET_SCHEDULER_TaskContext *tc)
780 struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
781 struct GNUNET_STREAM_MessageHeader *msg;
782 struct GNUNET_STREAM_Socket *socket;
784 shutdown_handle->close_msg_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
785 GNUNET_assert (NULL != shutdown_handle);
786 if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
788 socket = shutdown_handle->socket;
789 msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
790 msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
791 switch (shutdown_handle->operation)
794 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
797 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
800 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
804 shutdown_handle->close_msg_retransmission_task_id =
805 GNUNET_SCHEDULER_NO_TASK;
808 queue_message (socket, msg, NULL, NULL, GNUNET_NO);
809 shutdown_handle->close_msg_retransmission_task_id =
810 GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
811 &close_msg_retransmission_task,
817 * Function to modify a bit in GNUNET_STREAM_AckBitmap
819 * @param bitmap the bitmap to modify
820 * @param bit the bit number to modify
821 * @param value GNUNET_YES to on, GNUNET_NO to off
824 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
828 GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
829 if (GNUNET_YES == value)
830 *bitmap |= (1LL << bit);
832 *bitmap &= ~(1LL << bit);
837 * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
839 * @param bitmap address of the bitmap that has to be checked
840 * @param bit the bit number to check
841 * @return GNUNET_YES if the bit is set; GNUNET_NO if not
844 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
847 GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
848 return 0 != (*bitmap & (1LL << bit));
853 * Writes data using the given socket. The amount of data written is limited by
854 * the receiver_window_size
856 * @param socket the socket to use
859 write_data (struct GNUNET_STREAM_Socket *socket)
861 struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
864 for (packet=0; packet < io_handle->packets_sent; packet++)
866 if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
869 LOG (GNUNET_ERROR_TYPE_DEBUG,
870 "%s: Retransmitting DATA message with sequence %u\n",
871 GNUNET_i2s (&socket->other_peer),
872 ntohl (io_handle->messages[packet]->sequence_number));
873 copy_and_queue_message (socket,
874 &io_handle->messages[packet]->header,
879 /* Now send new packets if there is enough buffer space */
880 while ( (NULL != io_handle->messages[packet]) &&
881 (socket->receiver_window_available
882 >= ntohs (io_handle->messages[packet]->header.header.size)) &&
883 (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
885 socket->receiver_window_available -=
886 ntohs (io_handle->messages[packet]->header.header.size);
887 LOG (GNUNET_ERROR_TYPE_DEBUG,
888 "%s: Placing DATA message with sequence %u in send queue\n",
889 GNUNET_i2s (&socket->other_peer),
890 ntohl (io_handle->messages[packet]->sequence_number));
891 copy_and_queue_message (socket,
892 &io_handle->messages[packet]->header,
897 io_handle->packets_sent = packet;
898 if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id)
899 socket->data_retransmission_task_id =
900 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
901 (GNUNET_TIME_UNIT_SECONDS, 8),
902 &data_retransmission_task,
908 * Task for calling the read processor
910 * @param cls the socket
911 * @param tc the task context
914 call_read_processor (void *cls,
915 const struct GNUNET_SCHEDULER_TaskContext *tc)
917 struct GNUNET_STREAM_Socket *socket = cls;
918 struct GNUNET_STREAM_IOReadHandle *read_handle;
920 size_t valid_read_size;
922 uint32_t sequence_increase;
923 uint32_t offset_increase;
925 read_handle = socket->read_handle;
926 GNUNET_assert (NULL != read_handle);
927 read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
928 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
930 if (NULL == socket->receive_buffer)
932 GNUNET_assert (NULL != socket->read_handle);
933 GNUNET_assert (NULL != socket->read_handle->proc);
934 /* Check the bitmap for any holes */
935 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
937 if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
941 /* We only call read processor if we have the first packet */
942 GNUNET_assert (0 < packet);
944 socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
945 GNUNET_assert (0 != valid_read_size);
946 /* Cancel the read_io_timeout_task */
947 GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id);
948 read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
949 /* Call the data processor */
950 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n",
951 GNUNET_i2s (&socket->other_peer));
953 socket->read_handle->proc (socket->read_handle->proc_cls,
955 socket->receive_buffer + socket->copy_offset,
957 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n",
958 GNUNET_i2s (&socket->other_peer), read_size);
959 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor completed successfully\n",
960 GNUNET_i2s (&socket->other_peer));
961 /* Free the read handle */
962 GNUNET_free (socket->read_handle);
963 socket->read_handle = NULL;
964 GNUNET_assert (read_size <= valid_read_size);
965 socket->copy_offset += read_size;
966 /* Determine upto which packet we can remove from the buffer */
967 for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
969 if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
971 if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
974 /* If no packets can be removed we can't move the buffer */
977 sequence_increase = packet;
978 LOG (GNUNET_ERROR_TYPE_DEBUG,
979 "%s: Sequence increase after read processor completion: %u\n",
980 GNUNET_i2s (&socket->other_peer), sequence_increase);
981 /* Shift the data in the receive buffer */
982 socket->receive_buffer =
983 memmove (socket->receive_buffer,
984 socket->receive_buffer
985 + socket->receive_buffer_boundaries[sequence_increase-1],
986 socket->receive_buffer_size
987 - socket->receive_buffer_boundaries[sequence_increase-1]);
988 /* Shift the bitmap */
989 socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
990 /* Set read_sequence_number */
991 socket->read_sequence_number += sequence_increase;
992 /* Set read_offset */
993 offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
994 socket->read_offset += offset_increase;
995 /* Fix copy_offset */
996 GNUNET_assert (offset_increase <= socket->copy_offset);
997 socket->copy_offset -= offset_increase;
998 /* Fix relative boundaries */
999 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1001 if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
1003 uint32_t ahead_buffer_boundary;
1005 ahead_buffer_boundary =
1006 socket->receive_buffer_boundaries[packet + sequence_increase];
1007 if (0 == ahead_buffer_boundary)
1008 socket->receive_buffer_boundaries[packet] = 0;
1011 GNUNET_assert (offset_increase < ahead_buffer_boundary);
1012 socket->receive_buffer_boundaries[packet] =
1013 ahead_buffer_boundary - offset_increase;
1017 socket->receive_buffer_boundaries[packet] = 0;
1023 * Cancels the existing read io handle
1025 * @param cls the closure from the SCHEDULER call
1026 * @param tc the task context
1029 read_io_timeout (void *cls,
1030 const struct GNUNET_SCHEDULER_TaskContext *tc)
1032 struct GNUNET_STREAM_Socket *socket = cls;
1033 struct GNUNET_STREAM_IOReadHandle *read_handle;
1034 GNUNET_STREAM_DataProcessor proc;
1037 read_handle = socket->read_handle;
1038 GNUNET_assert (NULL != read_handle);
1039 read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1040 if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
1042 if (read_handle->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1044 LOG (GNUNET_ERROR_TYPE_DEBUG,
1045 "%s: Read task timedout - Cancelling it\n",
1046 GNUNET_i2s (&socket->other_peer));
1047 GNUNET_SCHEDULER_cancel (read_handle->read_task_id);
1048 read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1050 proc = read_handle->proc;
1051 proc_cls = read_handle->proc_cls;
1052 GNUNET_free (read_handle);
1053 socket->read_handle = NULL;
1054 /* Call the read processor to signal timeout */
1056 GNUNET_STREAM_TIMEOUT,
1063 * Handler for DATA messages; Same for both client and server
1065 * @param socket the socket through which the ack was received
1066 * @param tunnel connection to the other end
1067 * @param sender who sent the message
1068 * @param msg the data message
1069 * @param atsi performance data for the connection
1070 * @return GNUNET_OK to keep the connection open,
1071 * GNUNET_SYSERR to close it (signal serious error)
1074 handle_data (struct GNUNET_STREAM_Socket *socket,
1075 struct GNUNET_MESH_Tunnel *tunnel,
1076 const struct GNUNET_PeerIdentity *sender,
1077 const struct GNUNET_STREAM_DataMessage *msg,
1078 const struct GNUNET_ATS_Information*atsi)
1080 const void *payload;
1081 struct GNUNET_TIME_Relative ack_deadline_rel;
1082 uint32_t bytes_needed;
1083 uint32_t relative_offset;
1084 uint32_t relative_sequence_number;
1087 size = htons (msg->header.header.size);
1088 if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1090 GNUNET_break_op (0);
1091 return GNUNET_SYSERR;
1093 if (0 != memcmp (sender, &socket->other_peer,
1094 sizeof (struct GNUNET_PeerIdentity)))
1096 LOG (GNUNET_ERROR_TYPE_DEBUG,
1097 "%s: Received DATA from non-confirming peer\n",
1098 GNUNET_i2s (&socket->other_peer));
1101 switch (socket->state)
1103 case STATE_ESTABLISHED:
1104 case STATE_TRANSMIT_CLOSED:
1105 case STATE_TRANSMIT_CLOSE_WAIT:
1106 /* check if the message's sequence number is in the range we are
1108 relative_sequence_number =
1109 ntohl (msg->sequence_number) - socket->read_sequence_number;
1110 if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1112 LOG (GNUNET_ERROR_TYPE_DEBUG,
1113 "%s: Ignoring received message with sequence number %u\n",
1114 GNUNET_i2s (&socket->other_peer),
1115 ntohl (msg->sequence_number));
1116 /* Start ACK sending task if one is not already present */
1117 if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1119 socket->ack_task_id =
1120 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1121 (msg->ack_deadline),
1127 /* Check if we have already seen this message */
1128 if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1129 relative_sequence_number))
1131 LOG (GNUNET_ERROR_TYPE_DEBUG,
1132 "%s: Ignoring already received message with sequence number %u\n",
1133 GNUNET_i2s (&socket->other_peer),
1134 ntohl (msg->sequence_number));
1135 /* Start ACK sending task if one is not already present */
1136 if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1138 socket->ack_task_id =
1139 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1140 (msg->ack_deadline), &ack_task, socket);
1144 LOG (GNUNET_ERROR_TYPE_DEBUG,
1145 "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
1146 GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number),
1147 ntohs (msg->header.header.size), GNUNET_i2s (&socket->other_peer));
1148 /* Check if we have to allocate the buffer */
1149 size -= sizeof (struct GNUNET_STREAM_DataMessage);
1150 relative_offset = ntohl (msg->offset) - socket->read_offset;
1151 bytes_needed = relative_offset + size;
1152 if (bytes_needed > socket->receive_buffer_size)
1154 if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1156 socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1158 socket->receive_buffer_size = bytes_needed;
1162 LOG (GNUNET_ERROR_TYPE_DEBUG,
1163 "%s: Cannot accommodate packet %d as buffer is full\n",
1164 GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number));
1168 /* Copy Data to buffer */
1170 GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1171 memcpy (socket->receive_buffer + relative_offset, payload, size);
1172 socket->receive_buffer_boundaries[relative_sequence_number] =
1173 relative_offset + size;
1174 /* Modify the ACK bitmap */
1175 ackbitmap_modify_bit (&socket->ack_bitmap, relative_sequence_number,
1177 /* Start ACK sending task if one is not already present */
1178 ack_deadline_rel = GNUNET_TIME_relative_ntoh (msg->ack_deadline);
1179 if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1182 GNUNET_TIME_relative_min (ack_deadline_rel,
1183 GNUNET_TIME_relative_multiply
1184 (GNUNET_TIME_UNIT_SECONDS, 300));
1185 socket->ack_task_id =
1186 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1187 (msg->ack_deadline), &ack_task, socket);
1188 socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1189 socket->ack_time_deadline = ack_deadline_rel;
1193 struct GNUNET_TIME_Relative ack_time_past;
1194 struct GNUNET_TIME_Relative ack_time_remaining;
1195 struct GNUNET_TIME_Relative ack_time_min;
1197 GNUNET_TIME_absolute_get_duration (socket->ack_time_registered);
1198 ack_time_remaining = GNUNET_TIME_relative_subtract
1199 (socket->ack_time_deadline, ack_time_past);
1200 ack_time_min = GNUNET_TIME_relative_min (ack_time_remaining,
1202 if (0 == memcmp(&ack_deadline_rel, &ack_time_min,
1203 sizeof (struct GNUNET_TIME_Relative)))
1205 ack_deadline_rel = ack_time_min;
1206 GNUNET_SCHEDULER_cancel (socket->ack_task_id);
1207 socket->ack_task_id = GNUNET_SCHEDULER_add_delayed (ack_deadline_rel,
1209 socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1210 socket->ack_time_deadline = ack_deadline_rel;
1213 if ((NULL != socket->read_handle) /* A read handle is waiting */
1214 /* There is no current read task */
1215 && (GNUNET_SCHEDULER_NO_TASK == socket->read_handle->read_task_id)
1216 /* We have the first packet */
1217 && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0)))
1219 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n",
1220 GNUNET_i2s (&socket->other_peer));
1221 socket->read_handle->read_task_id =
1222 GNUNET_SCHEDULER_add_now (&call_read_processor, socket);
1226 LOG (GNUNET_ERROR_TYPE_DEBUG,
1227 "%s: Received data message when it cannot be handled\n",
1228 GNUNET_i2s (&socket->other_peer));
1236 * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1238 * @param cls the socket (set from GNUNET_MESH_connect)
1239 * @param tunnel connection to the other end
1240 * @param tunnel_ctx place to store local state associated with the tunnel
1241 * @param sender who sent the message
1242 * @param message the actual message
1243 * @param atsi performance data for the connection
1244 * @return GNUNET_OK to keep the connection open,
1245 * GNUNET_SYSERR to close it (signal serious error)
1248 client_handle_data (void *cls,
1249 struct GNUNET_MESH_Tunnel *tunnel,
1251 const struct GNUNET_PeerIdentity *sender,
1252 const struct GNUNET_MessageHeader *message,
1253 const struct GNUNET_ATS_Information*atsi)
1255 struct GNUNET_STREAM_Socket *socket = cls;
1257 return handle_data (socket, tunnel, sender,
1258 (const struct GNUNET_STREAM_DataMessage *) message, atsi);
1263 * Callback to set state to ESTABLISHED
1265 * @param cls the closure NULL;
1266 * @param socket the socket to requiring state change
1269 set_state_established (void *cls,
1270 struct GNUNET_STREAM_Socket *socket)
1272 LOG (GNUNET_ERROR_TYPE_DEBUG,
1273 "%s: Attaining ESTABLISHED state\n",
1274 GNUNET_i2s (&socket->other_peer));
1275 socket->write_offset = 0;
1276 socket->read_offset = 0;
1277 socket->state = STATE_ESTABLISHED;
1278 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK !=
1279 socket->control_retransmission_task_id);
1280 GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
1281 socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1282 if (NULL != socket->lsocket)
1284 LOG (GNUNET_ERROR_TYPE_DEBUG,
1285 "%s: Calling listen callback\n",
1286 GNUNET_i2s (&socket->other_peer));
1287 if (GNUNET_SYSERR ==
1288 socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1290 &socket->other_peer))
1292 socket->state = STATE_CLOSED;
1293 /* FIXME: We should close in a decent way (send RST) */
1294 GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1295 GNUNET_free (socket);
1299 socket->open_cb (socket->open_cls, socket);
1304 * Callback to set state to HELLO_WAIT
1306 * @param cls the closure from queue_message
1307 * @param socket the socket to requiring state change
1310 set_state_hello_wait (void *cls,
1311 struct GNUNET_STREAM_Socket *socket)
1313 GNUNET_assert (STATE_INIT == socket->state);
1314 LOG (GNUNET_ERROR_TYPE_DEBUG,
1315 "%s: Attaining HELLO_WAIT state\n",
1316 GNUNET_i2s (&socket->other_peer));
1317 socket->state = STATE_HELLO_WAIT;
1322 * Callback to set state to CLOSE_WAIT
1324 * @param cls the closure from queue_message
1325 * @param socket the socket requiring state change
1328 set_state_close_wait (void *cls,
1329 struct GNUNET_STREAM_Socket *socket)
1331 LOG (GNUNET_ERROR_TYPE_DEBUG,
1332 "%s: Attaing CLOSE_WAIT state\n",
1333 GNUNET_i2s (&socket->other_peer));
1334 socket->state = STATE_CLOSE_WAIT;
1335 GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1336 socket->receive_buffer = NULL;
1337 socket->receive_buffer_size = 0;
1342 * Callback to set state to RECEIVE_CLOSE_WAIT
1344 * @param cls the closure from queue_message
1345 * @param socket the socket requiring state change
1348 set_state_receive_close_wait (void *cls,
1349 struct GNUNET_STREAM_Socket *socket)
1351 LOG (GNUNET_ERROR_TYPE_DEBUG,
1352 "%s: Attaing RECEIVE_CLOSE_WAIT state\n",
1353 GNUNET_i2s (&socket->other_peer));
1354 socket->state = STATE_RECEIVE_CLOSE_WAIT;
1355 GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1356 socket->receive_buffer = NULL;
1357 socket->receive_buffer_size = 0;
1362 * Callback to set state to TRANSMIT_CLOSE_WAIT
1364 * @param cls the closure from queue_message
1365 * @param socket the socket requiring state change
1368 set_state_transmit_close_wait (void *cls,
1369 struct GNUNET_STREAM_Socket *socket)
1371 LOG (GNUNET_ERROR_TYPE_DEBUG,
1372 "%s: Attaing TRANSMIT_CLOSE_WAIT state\n",
1373 GNUNET_i2s (&socket->other_peer));
1374 socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1379 * Callback to set state to CLOSED
1381 * @param cls the closure from queue_message
1382 * @param socket the socket requiring state change
1385 set_state_closed (void *cls,
1386 struct GNUNET_STREAM_Socket *socket)
1388 socket->state = STATE_CLOSED;
1393 * Returns GNUNET_MESSAGE_TYPE_STREAM_HELLO
1395 * @return the generate hello message
1397 static struct GNUNET_STREAM_MessageHeader *
1398 generate_hello (void)
1400 struct GNUNET_STREAM_MessageHeader *msg;
1402 msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1403 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1404 msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1410 * Returns a new HelloAckMessage. Also sets the write sequence number for the
1413 * @param socket the socket for which this HelloAckMessage has to be generated
1414 * @param generate_seq GNUNET_YES to generate the write sequence number,
1415 * GNUNET_NO to use the existing sequence number
1416 * @return the HelloAckMessage
1418 static struct GNUNET_STREAM_HelloAckMessage *
1419 generate_hello_ack (struct GNUNET_STREAM_Socket *socket,
1422 struct GNUNET_STREAM_HelloAckMessage *msg;
1424 if (GNUNET_YES == generate_seq)
1426 if (GNUNET_YES == socket->testing_active)
1427 socket->write_sequence_number =
1428 socket->testing_set_write_sequence_number_value;
1430 socket->write_sequence_number =
1431 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1432 LOG_DEBUG ("%s: write sequence number %u\n",
1433 GNUNET_i2s (&socket->other_peer),
1434 (unsigned int) socket->write_sequence_number);
1436 msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1437 msg->header.header.size =
1438 htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1439 msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1440 msg->sequence_number = htonl (socket->write_sequence_number);
1441 msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1447 * Task for retransmitting control messages if they aren't ACK'ed before a
1450 * @param cls the socket
1451 * @param tc the Task context
1454 control_retransmission_task (void *cls,
1455 const struct GNUNET_SCHEDULER_TaskContext *tc)
1457 struct GNUNET_STREAM_Socket *socket = cls;
1459 socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1460 if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
1462 LOG_DEBUG ("%s: Retransmitting a control message\n",
1463 GNUNET_i2s (&socket->other_peer));
1464 switch (socket->state)
1472 case STATE_HELLO_WAIT:
1473 if (NULL == socket->lsocket) /* We are client */
1474 queue_message (socket, generate_hello (), NULL, NULL, GNUNET_NO);
1476 queue_message (socket,
1477 (struct GNUNET_STREAM_MessageHeader *)
1478 generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1480 socket->control_retransmission_task_id =
1481 GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
1482 &control_retransmission_task, socket);
1484 case STATE_ESTABLISHED:
1485 if (NULL == socket->lsocket)
1486 queue_message (socket,
1487 (struct GNUNET_STREAM_MessageHeader *)
1488 generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1499 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1501 * @param cls the socket (set from GNUNET_MESH_connect)
1502 * @param tunnel connection to the other end
1503 * @param tunnel_ctx this is NULL
1504 * @param sender who sent the message
1505 * @param message the actual message
1506 * @param atsi performance data for the connection
1507 * @return GNUNET_OK to keep the connection open,
1508 * GNUNET_SYSERR to close it (signal serious error)
1511 client_handle_hello_ack (void *cls,
1512 struct GNUNET_MESH_Tunnel *tunnel,
1514 const struct GNUNET_PeerIdentity *sender,
1515 const struct GNUNET_MessageHeader *message,
1516 const struct GNUNET_ATS_Information*atsi)
1518 struct GNUNET_STREAM_Socket *socket = cls;
1519 const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1520 struct GNUNET_STREAM_HelloAckMessage *reply;
1522 if (0 != memcmp (sender, &socket->other_peer,
1523 sizeof (struct GNUNET_PeerIdentity)))
1525 LOG (GNUNET_ERROR_TYPE_DEBUG,
1526 "%s: Received HELLO_ACK from non-confirming peer\n",
1527 GNUNET_i2s (&socket->other_peer));
1530 ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1531 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received HELLO_ACK from %s\n",
1532 GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1533 GNUNET_assert (socket->tunnel == tunnel);
1534 switch (socket->state)
1536 case STATE_HELLO_WAIT:
1537 socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1538 LOG (GNUNET_ERROR_TYPE_DEBUG,
1539 "%s: Read sequence number %u\n",
1540 GNUNET_i2s (&socket->other_peer),
1541 (unsigned int) socket->read_sequence_number);
1542 socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1543 reply = generate_hello_ack (socket, GNUNET_YES);
1544 queue_message (socket, &reply->header, &set_state_established,
1547 case STATE_ESTABLISHED:
1548 // call statistics (# ACKs ignored++)
1549 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
1550 socket->control_retransmission_task_id);
1551 socket->control_retransmission_task_id =
1552 GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
1555 LOG_DEBUG ("%s: Server %s sent HELLO_ACK when in state %d\n",
1556 GNUNET_i2s (&socket->other_peer),
1557 GNUNET_i2s (&socket->other_peer), socket->state);
1558 socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1559 return GNUNET_SYSERR;
1565 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1567 * @param cls the socket (set from GNUNET_MESH_connect)
1568 * @param tunnel connection to the other end
1569 * @param tunnel_ctx this is NULL
1570 * @param sender who sent the message
1571 * @param message the actual message
1572 * @param atsi performance data for the connection
1573 * @return GNUNET_OK to keep the connection open,
1574 * GNUNET_SYSERR to close it (signal serious error)
1577 client_handle_reset (void *cls,
1578 struct GNUNET_MESH_Tunnel *tunnel,
1580 const struct GNUNET_PeerIdentity *sender,
1581 const struct GNUNET_MessageHeader *message,
1582 const struct GNUNET_ATS_Information*atsi)
1584 // struct GNUNET_STREAM_Socket *socket = cls;
1591 * Common message handler for handling TRANSMIT_CLOSE messages
1593 * @param socket the socket through which the ack was received
1594 * @param tunnel connection to the other end
1595 * @param sender who sent the message
1596 * @param msg the transmit close message
1597 * @param atsi performance data for the connection
1598 * @return GNUNET_OK to keep the connection open,
1599 * GNUNET_SYSERR to close it (signal serious error)
1602 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1603 struct GNUNET_MESH_Tunnel *tunnel,
1604 const struct GNUNET_PeerIdentity *sender,
1605 const struct GNUNET_STREAM_MessageHeader *msg,
1606 const struct GNUNET_ATS_Information*atsi)
1608 struct GNUNET_STREAM_MessageHeader *reply;
1610 switch (socket->state)
1612 case STATE_ESTABLISHED:
1613 socket->state = STATE_RECEIVE_CLOSED;
1614 /* Send TRANSMIT_CLOSE_ACK */
1615 reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1616 reply->header.type =
1617 htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1618 reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1619 queue_message (socket, reply, NULL, NULL, GNUNET_NO);
1622 /* FIXME: Call statistics? */
1630 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1632 * @param cls the socket (set from GNUNET_MESH_connect)
1633 * @param tunnel connection to the other end
1634 * @param tunnel_ctx this is NULL
1635 * @param sender who sent the message
1636 * @param message the actual message
1637 * @param atsi performance data for the connection
1638 * @return GNUNET_OK to keep the connection open,
1639 * GNUNET_SYSERR to close it (signal serious error)
1642 client_handle_transmit_close (void *cls,
1643 struct GNUNET_MESH_Tunnel *tunnel,
1645 const struct GNUNET_PeerIdentity *sender,
1646 const struct GNUNET_MessageHeader *message,
1647 const struct GNUNET_ATS_Information*atsi)
1649 struct GNUNET_STREAM_Socket *socket = cls;
1651 return handle_transmit_close (socket,
1654 (struct GNUNET_STREAM_MessageHeader *)message,
1660 * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1662 * @param socket the socket
1663 * @param tunnel connection to the other end
1664 * @param sender who sent the message
1665 * @param message the actual message
1666 * @param atsi performance data for the connection
1667 * @param operation the close operation which is being ACK'ed
1668 * @return GNUNET_OK to keep the connection open,
1669 * GNUNET_SYSERR to close it (signal serious error)
1672 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1673 struct GNUNET_MESH_Tunnel *tunnel,
1674 const struct GNUNET_PeerIdentity *sender,
1675 const struct GNUNET_STREAM_MessageHeader *message,
1676 const struct GNUNET_ATS_Information *atsi,
1679 struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1681 shutdown_handle = socket->shutdown_handle;
1682 if (NULL == shutdown_handle)
1684 LOG (GNUNET_ERROR_TYPE_DEBUG,
1685 "%s: Received CLOSE_ACK when shutdown handle is NULL\n",
1686 GNUNET_i2s (&socket->other_peer));
1692 switch (socket->state)
1694 case STATE_CLOSE_WAIT:
1695 if (SHUT_RDWR != shutdown_handle->operation)
1697 LOG (GNUNET_ERROR_TYPE_DEBUG,
1698 "%s: Received CLOSE_ACK when shutdown handle is not for "
1699 "SHUT_RDWR\n", GNUNET_i2s (&socket->other_peer));
1702 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE_ACK from %s\n",
1703 GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1704 socket->state = STATE_CLOSED;
1707 LOG (GNUNET_ERROR_TYPE_DEBUG,
1708 "%s: Received CLOSE_ACK when in it not expected\n",
1709 GNUNET_i2s (&socket->other_peer));
1714 switch (socket->state)
1716 case STATE_RECEIVE_CLOSE_WAIT:
1717 if (SHUT_RD != shutdown_handle->operation)
1719 LOG (GNUNET_ERROR_TYPE_DEBUG,
1720 "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
1721 "is not for SHUT_RD\n", GNUNET_i2s (&socket->other_peer));
1724 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE_ACK from %s\n",
1725 GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1726 socket->state = STATE_RECEIVE_CLOSED;
1729 LOG (GNUNET_ERROR_TYPE_DEBUG,
1730 "%s: Received RECEIVE_CLOSE_ACK when in it not expected\n",
1731 GNUNET_i2s (&socket->other_peer));
1736 switch (socket->state)
1738 case STATE_TRANSMIT_CLOSE_WAIT:
1739 if (SHUT_WR != shutdown_handle->operation)
1741 LOG (GNUNET_ERROR_TYPE_DEBUG,
1742 "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle "
1743 "is not for SHUT_WR\n",
1744 GNUNET_i2s (&socket->other_peer));
1747 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
1748 GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1749 socket->state = STATE_TRANSMIT_CLOSED;
1752 LOG (GNUNET_ERROR_TYPE_DEBUG,
1753 "%s: Received TRANSMIT_CLOSE_ACK when in it not expected\n",
1754 GNUNET_i2s (&socket->other_peer));
1761 if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
1762 shutdown_handle->completion_cb(shutdown_handle->completion_cls,
1764 if (GNUNET_SCHEDULER_NO_TASK
1765 != shutdown_handle->close_msg_retransmission_task_id)
1767 GNUNET_SCHEDULER_cancel
1768 (shutdown_handle->close_msg_retransmission_task_id);
1769 shutdown_handle->close_msg_retransmission_task_id =
1770 GNUNET_SCHEDULER_NO_TASK;
1772 GNUNET_free (shutdown_handle); /* Free shutdown handle */
1773 socket->shutdown_handle = NULL;
1779 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1781 * @param cls the socket (set from GNUNET_MESH_connect)
1782 * @param tunnel connection to the other end
1783 * @param tunnel_ctx this is NULL
1784 * @param sender who sent the message
1785 * @param message the actual message
1786 * @param atsi performance data for the connection
1787 * @return GNUNET_OK to keep the connection open,
1788 * GNUNET_SYSERR to close it (signal serious error)
1791 client_handle_transmit_close_ack (void *cls,
1792 struct GNUNET_MESH_Tunnel *tunnel,
1794 const struct GNUNET_PeerIdentity *sender,
1795 const struct GNUNET_MessageHeader *message,
1796 const struct GNUNET_ATS_Information*atsi)
1798 struct GNUNET_STREAM_Socket *socket = cls;
1800 return handle_generic_close_ack (socket,
1803 (const struct GNUNET_STREAM_MessageHeader *)
1811 * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1813 * @param socket the socket
1814 * @param tunnel connection to the other end
1815 * @param sender who sent the message
1816 * @param message the actual message
1817 * @param atsi performance data for the connection
1818 * @return GNUNET_OK to keep the connection open,
1819 * GNUNET_SYSERR to close it (signal serious error)
1822 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1823 struct GNUNET_MESH_Tunnel *tunnel,
1824 const struct GNUNET_PeerIdentity *sender,
1825 const struct GNUNET_STREAM_MessageHeader *message,
1826 const struct GNUNET_ATS_Information *atsi)
1828 struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1830 switch (socket->state)
1834 case STATE_HELLO_WAIT:
1835 LOG (GNUNET_ERROR_TYPE_DEBUG,
1836 "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1837 GNUNET_i2s (&socket->other_peer));
1843 LOG (GNUNET_ERROR_TYPE_DEBUG,
1844 "%s: Received RECEIVE_CLOSE from %s\n",
1845 GNUNET_i2s (&socket->other_peer),
1846 GNUNET_i2s (&socket->other_peer));
1848 GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1849 receive_close_ack->header.size =
1850 htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1851 receive_close_ack->header.type =
1852 htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1853 queue_message (socket, receive_close_ack, &set_state_closed,
1855 /* FIXME: Handle the case where write handle is present; the write operation
1856 should be deemed as finised and the write continuation callback
1857 has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
1863 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1865 * @param cls the socket (set from GNUNET_MESH_connect)
1866 * @param tunnel connection to the other end
1867 * @param tunnel_ctx this is NULL
1868 * @param sender who sent the message
1869 * @param message the actual message
1870 * @param atsi performance data for the connection
1871 * @return GNUNET_OK to keep the connection open,
1872 * GNUNET_SYSERR to close it (signal serious error)
1875 client_handle_receive_close (void *cls,
1876 struct GNUNET_MESH_Tunnel *tunnel,
1878 const struct GNUNET_PeerIdentity *sender,
1879 const struct GNUNET_MessageHeader *message,
1880 const struct GNUNET_ATS_Information*atsi)
1882 struct GNUNET_STREAM_Socket *socket = cls;
1885 handle_receive_close (socket,
1888 (const struct GNUNET_STREAM_MessageHeader *) message,
1894 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1896 * @param cls the socket (set from GNUNET_MESH_connect)
1897 * @param tunnel connection to the other end
1898 * @param tunnel_ctx this is NULL
1899 * @param sender who sent the message
1900 * @param message the actual message
1901 * @param atsi performance data for the connection
1902 * @return GNUNET_OK to keep the connection open,
1903 * GNUNET_SYSERR to close it (signal serious error)
1906 client_handle_receive_close_ack (void *cls,
1907 struct GNUNET_MESH_Tunnel *tunnel,
1909 const struct GNUNET_PeerIdentity *sender,
1910 const struct GNUNET_MessageHeader *message,
1911 const struct GNUNET_ATS_Information*atsi)
1913 struct GNUNET_STREAM_Socket *socket = cls;
1915 return handle_generic_close_ack (socket,
1918 (const struct GNUNET_STREAM_MessageHeader *)
1926 * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1928 * @param socket the socket
1929 * @param tunnel connection to the other end
1930 * @param sender who sent the message
1931 * @param message the actual message
1932 * @param atsi performance data for the connection
1933 * @return GNUNET_OK to keep the connection open,
1934 * GNUNET_SYSERR to close it (signal serious error)
1937 handle_close (struct GNUNET_STREAM_Socket *socket,
1938 struct GNUNET_MESH_Tunnel *tunnel,
1939 const struct GNUNET_PeerIdentity *sender,
1940 const struct GNUNET_STREAM_MessageHeader *message,
1941 const struct GNUNET_ATS_Information*atsi)
1943 struct GNUNET_STREAM_MessageHeader *close_ack;
1945 switch (socket->state)
1949 case STATE_HELLO_WAIT:
1950 LOG (GNUNET_ERROR_TYPE_DEBUG,
1951 "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1952 GNUNET_i2s (&socket->other_peer));
1958 LOG (GNUNET_ERROR_TYPE_DEBUG,
1959 "%s: Received CLOSE from %s\n",
1960 GNUNET_i2s (&socket->other_peer),
1961 GNUNET_i2s (&socket->other_peer));
1962 close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1963 close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1964 close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
1965 queue_message (socket, close_ack, &set_state_closed, NULL, GNUNET_NO);
1966 if (socket->state == STATE_CLOSED)
1969 GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1970 socket->receive_buffer = NULL;
1971 socket->receive_buffer_size = 0;
1977 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1979 * @param cls the socket (set from GNUNET_MESH_connect)
1980 * @param tunnel connection to the other end
1981 * @param tunnel_ctx this is NULL
1982 * @param sender who sent the message
1983 * @param message the actual message
1984 * @param atsi performance data for the connection
1985 * @return GNUNET_OK to keep the connection open,
1986 * GNUNET_SYSERR to close it (signal serious error)
1989 client_handle_close (void *cls,
1990 struct GNUNET_MESH_Tunnel *tunnel,
1992 const struct GNUNET_PeerIdentity *sender,
1993 const struct GNUNET_MessageHeader *message,
1994 const struct GNUNET_ATS_Information*atsi)
1996 struct GNUNET_STREAM_Socket *socket = cls;
1998 return handle_close (socket,
2001 (const struct GNUNET_STREAM_MessageHeader *) message,
2007 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2009 * @param cls the socket (set from GNUNET_MESH_connect)
2010 * @param tunnel connection to the other end
2011 * @param tunnel_ctx this is NULL
2012 * @param sender who sent the message
2013 * @param message the actual message
2014 * @param atsi performance data for the connection
2015 * @return GNUNET_OK to keep the connection open,
2016 * GNUNET_SYSERR to close it (signal serious error)
2019 client_handle_close_ack (void *cls,
2020 struct GNUNET_MESH_Tunnel *tunnel,
2022 const struct GNUNET_PeerIdentity *sender,
2023 const struct GNUNET_MessageHeader *message,
2024 const struct GNUNET_ATS_Information *atsi)
2026 struct GNUNET_STREAM_Socket *socket = cls;
2028 return handle_generic_close_ack (socket,
2031 (const struct GNUNET_STREAM_MessageHeader *)
2037 /*****************************/
2038 /* Server's Message Handlers */
2039 /*****************************/
2042 * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
2044 * @param cls the closure
2045 * @param tunnel connection to the other end
2046 * @param tunnel_ctx the socket
2047 * @param sender who sent the message
2048 * @param message the actual message
2049 * @param atsi performance data for the connection
2050 * @return GNUNET_OK to keep the connection open,
2051 * GNUNET_SYSERR to close it (signal serious error)
2054 server_handle_data (void *cls,
2055 struct GNUNET_MESH_Tunnel *tunnel,
2057 const struct GNUNET_PeerIdentity *sender,
2058 const struct GNUNET_MessageHeader *message,
2059 const struct GNUNET_ATS_Information*atsi)
2061 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2063 return handle_data (socket,
2066 (const struct GNUNET_STREAM_DataMessage *)message,
2072 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
2074 * @param cls the closure
2075 * @param tunnel connection to the other end
2076 * @param tunnel_ctx the socket
2077 * @param sender who sent the message
2078 * @param message the actual message
2079 * @param atsi performance data for the connection
2080 * @return GNUNET_OK to keep the connection open,
2081 * GNUNET_SYSERR to close it (signal serious error)
2084 server_handle_hello (void *cls,
2085 struct GNUNET_MESH_Tunnel *tunnel,
2087 const struct GNUNET_PeerIdentity *sender,
2088 const struct GNUNET_MessageHeader *message,
2089 const struct GNUNET_ATS_Information*atsi)
2091 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2092 struct GNUNET_STREAM_HelloAckMessage *reply;
2094 if (0 != memcmp (sender,
2095 &socket->other_peer,
2096 sizeof (struct GNUNET_PeerIdentity)))
2098 LOG_DEBUG ("%s: Received HELLO from non-confirming peer\n",
2099 GNUNET_i2s (&socket->other_peer));
2102 GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == ntohs (message->type));
2103 GNUNET_assert (socket->tunnel == tunnel);
2104 LOG_DEBUG ("%s: Received HELLO from %s\n", GNUNET_i2s (&socket->other_peer),
2105 GNUNET_i2s (&socket->other_peer));
2106 switch (socket->status)
2109 reply = generate_hello_ack (socket, GNUNET_YES);
2110 queue_message (socket, &reply->header, &set_state_hello_wait, NULL,
2112 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2113 socket->control_retransmission_task_id);
2114 socket->control_retransmission_task_id =
2115 GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2116 &control_retransmission_task, socket);
2118 case STATE_HELLO_WAIT:
2119 /* Perhaps our HELLO_ACK was lost */
2120 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK !=
2121 socket->control_retransmission_task_id);
2122 GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2123 socket->control_retransmission_task_id =
2124 GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
2127 LOG_DEBUG( "%s: Client sent HELLO when in state %d\n",
2128 GNUNET_i2s (&socket->other_peer), socket->state);
2129 /* FIXME: Send RESET? */
2136 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2138 * @param cls the closure
2139 * @param tunnel connection to the other end
2140 * @param tunnel_ctx the socket
2141 * @param sender who sent the message
2142 * @param message the actual message
2143 * @param atsi performance data for the connection
2144 * @return GNUNET_OK to keep the connection open,
2145 * GNUNET_SYSERR to close it (signal serious error)
2148 server_handle_hello_ack (void *cls,
2149 struct GNUNET_MESH_Tunnel *tunnel,
2151 const struct GNUNET_PeerIdentity *sender,
2152 const struct GNUNET_MessageHeader *message,
2153 const struct GNUNET_ATS_Information*atsi)
2155 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2156 const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2158 GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2159 ntohs (message->type));
2160 GNUNET_assert (socket->tunnel == tunnel);
2161 ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2162 switch (socket->state)
2164 case STATE_HELLO_WAIT:
2165 LOG (GNUNET_ERROR_TYPE_DEBUG,
2166 "%s: Received HELLO_ACK from %s\n",
2167 GNUNET_i2s (&socket->other_peer),
2168 GNUNET_i2s (&socket->other_peer));
2169 socket->read_sequence_number = ntohl (ack_message->sequence_number);
2170 LOG (GNUNET_ERROR_TYPE_DEBUG,
2171 "%s: Read sequence number %u\n",
2172 GNUNET_i2s (&socket->other_peer),
2173 (unsigned int) socket->read_sequence_number);
2174 socket->receiver_window_available =
2175 ntohl (ack_message->receiver_window_size);
2176 set_state_established (NULL, socket);
2179 LOG (GNUNET_ERROR_TYPE_DEBUG,
2180 "Client sent HELLO_ACK when in state %d\n", socket->state);
2187 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2189 * @param cls the closure
2190 * @param tunnel connection to the other end
2191 * @param tunnel_ctx the socket
2192 * @param sender who sent the message
2193 * @param message the actual message
2194 * @param atsi performance data for the connection
2195 * @return GNUNET_OK to keep the connection open,
2196 * GNUNET_SYSERR to close it (signal serious error)
2199 server_handle_reset (void *cls,
2200 struct GNUNET_MESH_Tunnel *tunnel,
2202 const struct GNUNET_PeerIdentity *sender,
2203 const struct GNUNET_MessageHeader *message,
2204 const struct GNUNET_ATS_Information*atsi)
2206 // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2213 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2215 * @param cls the closure
2216 * @param tunnel connection to the other end
2217 * @param tunnel_ctx the socket
2218 * @param sender who sent the message
2219 * @param message the actual message
2220 * @param atsi performance data for the connection
2221 * @return GNUNET_OK to keep the connection open,
2222 * GNUNET_SYSERR to close it (signal serious error)
2225 server_handle_transmit_close (void *cls,
2226 struct GNUNET_MESH_Tunnel *tunnel,
2228 const struct GNUNET_PeerIdentity *sender,
2229 const struct GNUNET_MessageHeader *message,
2230 const struct GNUNET_ATS_Information*atsi)
2232 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2234 return handle_transmit_close (socket,
2237 (struct GNUNET_STREAM_MessageHeader *)message,
2243 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2245 * @param cls the closure
2246 * @param tunnel connection to the other end
2247 * @param tunnel_ctx the socket
2248 * @param sender who sent the message
2249 * @param message the actual message
2250 * @param atsi performance data for the connection
2251 * @return GNUNET_OK to keep the connection open,
2252 * GNUNET_SYSERR to close it (signal serious error)
2255 server_handle_transmit_close_ack (void *cls,
2256 struct GNUNET_MESH_Tunnel *tunnel,
2258 const struct GNUNET_PeerIdentity *sender,
2259 const struct GNUNET_MessageHeader *message,
2260 const struct GNUNET_ATS_Information*atsi)
2262 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2264 return handle_generic_close_ack (socket,
2267 (const struct GNUNET_STREAM_MessageHeader *)
2275 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2277 * @param cls the closure
2278 * @param tunnel connection to the other end
2279 * @param tunnel_ctx the socket
2280 * @param sender who sent the message
2281 * @param message the actual message
2282 * @param atsi performance data for the connection
2283 * @return GNUNET_OK to keep the connection open,
2284 * GNUNET_SYSERR to close it (signal serious error)
2287 server_handle_receive_close (void *cls,
2288 struct GNUNET_MESH_Tunnel *tunnel,
2290 const struct GNUNET_PeerIdentity *sender,
2291 const struct GNUNET_MessageHeader *message,
2292 const struct GNUNET_ATS_Information*atsi)
2294 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2297 handle_receive_close (socket,
2300 (const struct GNUNET_STREAM_MessageHeader *) message,
2306 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2308 * @param cls the closure
2309 * @param tunnel connection to the other end
2310 * @param tunnel_ctx the socket
2311 * @param sender who sent the message
2312 * @param message the actual message
2313 * @param atsi performance data for the connection
2314 * @return GNUNET_OK to keep the connection open,
2315 * GNUNET_SYSERR to close it (signal serious error)
2318 server_handle_receive_close_ack (void *cls,
2319 struct GNUNET_MESH_Tunnel *tunnel,
2321 const struct GNUNET_PeerIdentity *sender,
2322 const struct GNUNET_MessageHeader *message,
2323 const struct GNUNET_ATS_Information*atsi)
2325 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2327 return handle_generic_close_ack (socket,
2330 (const struct GNUNET_STREAM_MessageHeader *)
2338 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2340 * @param cls the listen socket (from GNUNET_MESH_connect in
2341 * GNUNET_STREAM_listen)
2342 * @param tunnel connection to the other end
2343 * @param tunnel_ctx the socket
2344 * @param sender who sent the message
2345 * @param message the actual message
2346 * @param atsi performance data for the connection
2347 * @return GNUNET_OK to keep the connection open,
2348 * GNUNET_SYSERR to close it (signal serious error)
2351 server_handle_close (void *cls,
2352 struct GNUNET_MESH_Tunnel *tunnel,
2354 const struct GNUNET_PeerIdentity *sender,
2355 const struct GNUNET_MessageHeader *message,
2356 const struct GNUNET_ATS_Information*atsi)
2358 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2360 return handle_close (socket,
2363 (const struct GNUNET_STREAM_MessageHeader *) message,
2369 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2371 * @param cls the closure
2372 * @param tunnel connection to the other end
2373 * @param tunnel_ctx the socket
2374 * @param sender who sent the message
2375 * @param message the actual message
2376 * @param atsi performance data for the connection
2377 * @return GNUNET_OK to keep the connection open,
2378 * GNUNET_SYSERR to close it (signal serious error)
2381 server_handle_close_ack (void *cls,
2382 struct GNUNET_MESH_Tunnel *tunnel,
2384 const struct GNUNET_PeerIdentity *sender,
2385 const struct GNUNET_MessageHeader *message,
2386 const struct GNUNET_ATS_Information*atsi)
2388 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2390 return handle_generic_close_ack (socket,
2393 (const struct GNUNET_STREAM_MessageHeader *)
2401 * Handler for DATA_ACK messages
2403 * @param socket the socket through which the ack was received
2404 * @param tunnel connection to the other end
2405 * @param sender who sent the message
2406 * @param ack the acknowledgment message
2407 * @param atsi performance data for the connection
2408 * @return GNUNET_OK to keep the connection open,
2409 * GNUNET_SYSERR to close it (signal serious error)
2412 handle_ack (struct GNUNET_STREAM_Socket *socket,
2413 struct GNUNET_MESH_Tunnel *tunnel,
2414 const struct GNUNET_PeerIdentity *sender,
2415 const struct GNUNET_STREAM_AckMessage *ack,
2416 const struct GNUNET_ATS_Information*atsi)
2418 unsigned int packet;
2419 int need_retransmission;
2420 uint32_t sequence_difference;
2422 if (0 != memcmp (sender,
2423 &socket->other_peer,
2424 sizeof (struct GNUNET_PeerIdentity)))
2426 LOG (GNUNET_ERROR_TYPE_DEBUG,
2427 "%s: Received ACK from non-confirming peer\n",
2428 GNUNET_i2s (&socket->other_peer));
2431 switch (socket->state)
2433 case (STATE_ESTABLISHED):
2434 case (STATE_RECEIVE_CLOSED):
2435 case (STATE_RECEIVE_CLOSE_WAIT):
2436 if (NULL == socket->write_handle)
2438 LOG (GNUNET_ERROR_TYPE_DEBUG,
2439 "%s: Received DATA_ACK when write_handle is NULL\n",
2440 GNUNET_i2s (&socket->other_peer));
2443 sequence_difference =
2444 socket->write_sequence_number - ntohl (ack->base_sequence_number);
2445 if (!(sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2447 LOG (GNUNET_ERROR_TYPE_DEBUG,
2448 "%s: Received DATA_ACK with unexpected base sequence number\n",
2449 GNUNET_i2s (&socket->other_peer));
2450 LOG (GNUNET_ERROR_TYPE_DEBUG,
2451 "%s: Current write sequence: %u; Ack's base sequence: %u\n",
2452 GNUNET_i2s (&socket->other_peer),
2453 socket->write_sequence_number,
2454 ntohl (ack->base_sequence_number));
2457 /* FIXME: include the case when write_handle is cancelled - ignore the
2459 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received DATA_ACK from %s\n",
2460 GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
2461 /* Cancel the retransmission task */
2462 if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2464 GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2465 socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2467 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2469 if (NULL == socket->write_handle->messages[packet]) break;
2470 /* BS: Base sequence from ack; PS: sequence num of current packet */
2471 sequence_difference = ntohl (ack->base_sequence_number)
2472 - ntohl (socket->write_handle->messages[packet]->sequence_number);
2473 if ((0 == sequence_difference) ||
2474 (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH < sequence_difference))
2475 continue; /* The message in our handle is not yet received */
2476 /* case where BS = PS + GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2477 /* sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2478 ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2479 packet, GNUNET_YES);
2481 /* Update the receive window remaining
2482 FIXME : Should update with the value from a data ack with greater
2484 socket->receiver_window_available =
2485 ntohl (ack->receive_window_remaining);
2486 /* Check if we have received all acknowledgements */
2487 need_retransmission = GNUNET_NO;
2488 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2490 if (NULL == socket->write_handle->messages[packet]) break;
2491 if (GNUNET_YES != ackbitmap_is_bit_set
2492 (&socket->write_handle->ack_bitmap,packet))
2494 need_retransmission = GNUNET_YES;
2498 if (GNUNET_YES == need_retransmission)
2500 write_data (socket);
2502 else /* We have to call the write continuation callback now */
2504 struct GNUNET_STREAM_IOWriteHandle *write_handle;
2506 /* Free the packets */
2507 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2509 GNUNET_free_non_null (socket->write_handle->messages[packet]);
2511 write_handle = socket->write_handle;
2512 socket->write_handle = NULL;
2513 if (NULL != write_handle->write_cont)
2514 write_handle->write_cont (write_handle->write_cont_cls,
2516 write_handle->size);
2517 /* We are done with the write handle - Freeing it */
2518 GNUNET_free (write_handle);
2519 LOG (GNUNET_ERROR_TYPE_DEBUG,
2520 "%s: Write completion callback completed\n",
2521 GNUNET_i2s (&socket->other_peer));
2532 * Handler for DATA_ACK messages
2534 * @param cls the 'struct GNUNET_STREAM_Socket'
2535 * @param tunnel connection to the other end
2536 * @param tunnel_ctx unused
2537 * @param sender who sent the message
2538 * @param message the actual message
2539 * @param atsi performance data for the connection
2540 * @return GNUNET_OK to keep the connection open,
2541 * GNUNET_SYSERR to close it (signal serious error)
2544 client_handle_ack (void *cls,
2545 struct GNUNET_MESH_Tunnel *tunnel,
2547 const struct GNUNET_PeerIdentity *sender,
2548 const struct GNUNET_MessageHeader *message,
2549 const struct GNUNET_ATS_Information*atsi)
2551 struct GNUNET_STREAM_Socket *socket = cls;
2552 const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2554 return handle_ack (socket, tunnel, sender, ack, atsi);
2559 * Handler for DATA_ACK messages
2561 * @param cls the server's listen socket
2562 * @param tunnel connection to the other end
2563 * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2564 * @param sender who sent the message
2565 * @param message the actual message
2566 * @param atsi performance data for the connection
2567 * @return GNUNET_OK to keep the connection open,
2568 * GNUNET_SYSERR to close it (signal serious error)
2571 server_handle_ack (void *cls,
2572 struct GNUNET_MESH_Tunnel *tunnel,
2574 const struct GNUNET_PeerIdentity *sender,
2575 const struct GNUNET_MessageHeader *message,
2576 const struct GNUNET_ATS_Information*atsi)
2578 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2579 const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2581 return handle_ack (socket, tunnel, sender, ack, atsi);
2586 * For client message handlers, the stream socket is in the
2589 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2590 {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2591 {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK,
2592 sizeof (struct GNUNET_STREAM_AckMessage) },
2593 {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2594 sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2595 {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2596 sizeof (struct GNUNET_STREAM_MessageHeader)},
2597 {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2598 sizeof (struct GNUNET_STREAM_MessageHeader)},
2599 {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2600 sizeof (struct GNUNET_STREAM_MessageHeader)},
2601 {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2602 sizeof (struct GNUNET_STREAM_MessageHeader)},
2603 {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2604 sizeof (struct GNUNET_STREAM_MessageHeader)},
2605 {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2606 sizeof (struct GNUNET_STREAM_MessageHeader)},
2607 {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2608 sizeof (struct GNUNET_STREAM_MessageHeader)},
2614 * For server message handlers, the stream socket is in the
2615 * tunnel context, and the listen socket in the closure argument.
2617 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2618 {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2619 {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK,
2620 sizeof (struct GNUNET_STREAM_AckMessage) },
2621 {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO,
2622 sizeof (struct GNUNET_STREAM_MessageHeader)},
2623 {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2624 sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2625 {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2626 sizeof (struct GNUNET_STREAM_MessageHeader)},
2627 {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2628 sizeof (struct GNUNET_STREAM_MessageHeader)},
2629 {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2630 sizeof (struct GNUNET_STREAM_MessageHeader)},
2631 {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2632 sizeof (struct GNUNET_STREAM_MessageHeader)},
2633 {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2634 sizeof (struct GNUNET_STREAM_MessageHeader)},
2635 {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2636 sizeof (struct GNUNET_STREAM_MessageHeader)},
2637 {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2638 sizeof (struct GNUNET_STREAM_MessageHeader)},
2644 * Function called when our target peer is connected to our tunnel
2646 * @param cls the socket for which this tunnel is created
2647 * @param peer the peer identity of the target
2648 * @param atsi performance data for the connection
2651 mesh_peer_connect_callback (void *cls,
2652 const struct GNUNET_PeerIdentity *peer,
2653 const struct GNUNET_ATS_Information * atsi)
2655 struct GNUNET_STREAM_Socket *socket = cls;
2656 struct GNUNET_STREAM_MessageHeader *message;
2658 if (0 != memcmp (peer,
2659 &socket->other_peer,
2660 sizeof (struct GNUNET_PeerIdentity)))
2662 LOG (GNUNET_ERROR_TYPE_DEBUG,
2663 "%s: A peer which is not our target has connected to our tunnel\n",
2667 LOG (GNUNET_ERROR_TYPE_DEBUG,
2668 "%s: Target peer %s connected\n",
2669 GNUNET_i2s (&socket->other_peer),
2670 GNUNET_i2s (&socket->other_peer));
2671 /* Set state to INIT */
2672 socket->state = STATE_INIT;
2673 /* Send HELLO message */
2674 message = generate_hello ();
2675 queue_message (socket, message, &set_state_hello_wait, NULL, GNUNET_NO);
2676 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2677 socket->control_retransmission_task_id);
2678 socket->control_retransmission_task_id =
2679 GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2680 &control_retransmission_task, socket);
2685 * Function called when our target peer is disconnected from our tunnel
2687 * @param cls the socket associated which this tunnel
2688 * @param peer the peer identity of the target
2691 mesh_peer_disconnect_callback (void *cls,
2692 const struct GNUNET_PeerIdentity *peer)
2694 struct GNUNET_STREAM_Socket *socket=cls;
2696 /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2697 LOG (GNUNET_ERROR_TYPE_DEBUG,
2698 "%s: Other peer %s disconnected \n",
2699 GNUNET_i2s (&socket->other_peer),
2700 GNUNET_i2s (&socket->other_peer));
2705 * Method called whenever a peer creates a tunnel to us
2707 * @param cls closure
2708 * @param tunnel new handle to the tunnel
2709 * @param initiator peer that started the tunnel
2710 * @param atsi performance information for the tunnel
2711 * @return initial tunnel context for the tunnel
2712 * (can be NULL -- that's not an error)
2715 new_tunnel_notify (void *cls,
2716 struct GNUNET_MESH_Tunnel *tunnel,
2717 const struct GNUNET_PeerIdentity *initiator,
2718 const struct GNUNET_ATS_Information *atsi)
2720 struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2721 struct GNUNET_STREAM_Socket *socket;
2723 /* FIXME: If a tunnel is already created, we should not accept new tunnels
2724 from the same peer again until the socket is closed */
2726 if (GNUNET_NO == lsocket->listening)
2728 GNUNET_MESH_tunnel_destroy (tunnel);
2731 socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2732 socket->other_peer = *initiator;
2733 socket->tunnel = tunnel;
2734 socket->state = STATE_INIT;
2735 socket->lsocket = lsocket;
2736 socket->retransmit_timeout = lsocket->retransmit_timeout;
2737 socket->testing_active = lsocket->testing_active;
2738 socket->testing_set_write_sequence_number_value =
2739 lsocket->testing_set_write_sequence_number_value;
2740 socket->max_payload_size = lsocket->max_payload_size;
2741 LOG (GNUNET_ERROR_TYPE_DEBUG,
2742 "%s: Peer %s initiated tunnel to us\n",
2743 GNUNET_i2s (&socket->other_peer),
2744 GNUNET_i2s (&socket->other_peer));
2750 * Function called whenever an inbound tunnel is destroyed. Should clean up
2751 * any associated state. This function is NOT called if the client has
2752 * explicitly asked for the tunnel to be destroyed using
2753 * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2756 * @param cls closure (set from GNUNET_MESH_connect)
2757 * @param tunnel connection to the other end (henceforth invalid)
2758 * @param tunnel_ctx place where local state associated
2759 * with the tunnel is stored
2762 tunnel_cleaner (void *cls,
2763 const struct GNUNET_MESH_Tunnel *tunnel,
2766 struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2768 GNUNET_assert (tunnel == socket->tunnel);
2770 LOG (GNUNET_ERROR_TYPE_DEBUG,
2771 "%s: Peer %s has terminated connection abruptly\n",
2772 GNUNET_i2s (&socket->other_peer),
2773 GNUNET_i2s (&socket->other_peer));
2774 socket->status = GNUNET_STREAM_SHUTDOWN;
2775 /* Clear Transmit handles */
2776 if (NULL != socket->transmit_handle)
2778 GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2779 socket->transmit_handle = NULL;
2781 /* Stop Tasks using socket->tunnel */
2782 if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2784 GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2785 socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2787 if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2789 GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2790 socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2792 /* FIXME: Cancel all other tasks using socket->tunnel */
2793 socket->tunnel = NULL;
2798 * Callback to signal timeout on lockmanager lock acquire
2800 * @param cls the ListenSocket
2801 * @param tc the scheduler task context
2804 lockmanager_acquire_timeout (void *cls,
2805 const struct GNUNET_SCHEDULER_TaskContext *tc)
2807 struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2808 GNUNET_STREAM_ListenCallback listen_cb;
2809 void *listen_cb_cls;
2811 lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2812 listen_cb = lsocket->listen_cb;
2813 listen_cb_cls = lsocket->listen_cb_cls;
2814 if (NULL != listen_cb)
2815 listen_cb (listen_cb_cls, NULL, NULL);
2820 * Callback to notify us on the status changes on app_port lock
2822 * @param cls the ListenSocket
2823 * @param domain the domain name of the lock
2824 * @param lock the app_port
2825 * @param status the current status of the lock
2828 lock_status_change_cb (void *cls, const char *domain, uint32_t lock,
2829 enum GNUNET_LOCKMANAGER_Status status)
2831 struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2833 GNUNET_assert (lock == (uint32_t) lsocket->port);
2834 if (GNUNET_LOCKMANAGER_SUCCESS == status)
2836 lsocket->listening = GNUNET_YES;
2837 if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
2839 GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
2840 lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2842 if (NULL == lsocket->mesh)
2844 GNUNET_MESH_ApplicationType ports[] = {lsocket->port, 0};
2846 lsocket->mesh = GNUNET_MESH_connect (lsocket->cfg,
2847 lsocket, /* Closure */
2850 server_message_handlers,
2852 GNUNET_assert (NULL != lsocket->mesh);
2853 if (NULL != lsocket->listen_ok_cb)
2855 (void) lsocket->listen_ok_cb ();
2859 if (GNUNET_LOCKMANAGER_RELEASE == status)
2860 lsocket->listening = GNUNET_NO;
2870 * Tries to open a stream to the target peer
2872 * @param cfg configuration to use
2873 * @param target the target peer to which the stream has to be opened
2874 * @param app_port the application port number which uniquely identifies this
2876 * @param open_cb this function will be called after stream has be established;
2878 * @param open_cb_cls the closure for open_cb
2879 * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2880 * @return if successful it returns the stream socket; NULL if stream cannot be
2883 struct GNUNET_STREAM_Socket *
2884 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2885 const struct GNUNET_PeerIdentity *target,
2886 GNUNET_MESH_ApplicationType app_port,
2887 GNUNET_STREAM_OpenCallback open_cb,
2891 struct GNUNET_STREAM_Socket *socket;
2892 enum GNUNET_STREAM_Option option;
2893 GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
2895 uint16_t payload_size;
2897 LOG (GNUNET_ERROR_TYPE_DEBUG,
2899 GNUNET_assert (NULL != open_cb);
2900 socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2901 socket->other_peer = *target;
2902 socket->open_cb = open_cb;
2903 socket->open_cls = open_cb_cls;
2905 socket->retransmit_timeout = TIME_REL_SECS (default_timeout);
2906 socket->testing_active = GNUNET_NO;
2907 socket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
2908 va_start (vargs, open_cb_cls); /* Parse variable args */
2910 option = va_arg (vargs, enum GNUNET_STREAM_Option);
2913 case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2914 /* Expect struct GNUNET_TIME_Relative */
2915 socket->retransmit_timeout = va_arg (vargs,
2916 struct GNUNET_TIME_Relative);
2918 case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
2919 socket->testing_active = GNUNET_YES;
2920 socket->testing_set_write_sequence_number_value = va_arg (vargs,
2923 case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
2924 GNUNET_break (0); /* Option irrelevant in STREAM_open */
2926 case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
2927 GNUNET_break (0); /* Option irrelevant in STREAM_open */
2929 case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
2930 payload_size = (uint16_t) va_arg (vargs, unsigned int);
2931 GNUNET_assert (0 != payload_size);
2932 if (payload_size < socket->max_payload_size)
2933 socket->max_payload_size = payload_size;
2935 case GNUNET_STREAM_OPTION_END:
2938 } while (GNUNET_STREAM_OPTION_END != option);
2939 va_end (vargs); /* End of variable args parsing */
2940 socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2942 NULL, /* No inbound tunnel handler */
2943 NULL, /* No in-tunnel cleaner */
2944 client_message_handlers,
2945 ports); /* We don't get inbound tunnels */
2946 if (NULL == socket->mesh) /* Fail if we cannot connect to mesh */
2948 GNUNET_free (socket);
2951 /* Now create the mesh tunnel to target */
2952 LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating MESH Tunnel\n");
2953 socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2954 NULL, /* Tunnel context */
2955 &mesh_peer_connect_callback,
2956 &mesh_peer_disconnect_callback,
2958 GNUNET_assert (NULL != socket->tunnel);
2959 GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2960 &socket->other_peer);
2961 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__);
2967 * Shutdown the stream for reading or writing (similar to man 2 shutdown).
2969 * @param socket the stream socket
2970 * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
2971 * @param completion_cb the callback that will be called upon successful
2972 * shutdown of given operation
2973 * @param completion_cls the closure for the completion callback
2974 * @return the shutdown handle
2976 struct GNUNET_STREAM_ShutdownHandle *
2977 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2979 GNUNET_STREAM_ShutdownCompletion completion_cb,
2980 void *completion_cls)
2982 struct GNUNET_STREAM_ShutdownHandle *handle;
2983 struct GNUNET_STREAM_MessageHeader *msg;
2985 GNUNET_assert (NULL == socket->shutdown_handle);
2987 handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
2988 handle->socket = socket;
2989 handle->completion_cb = completion_cb;
2990 handle->completion_cls = completion_cls;
2991 socket->shutdown_handle = handle;
2993 msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2994 msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2998 handle->operation = SHUT_RD;
2999 if (NULL != socket->read_handle)
3000 LOG (GNUNET_ERROR_TYPE_WARNING,
3001 "Existing read handle should be cancelled before shutting"
3003 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
3004 queue_message (socket, msg, &set_state_receive_close_wait, NULL,
3008 handle->operation = SHUT_WR;
3009 if (NULL != socket->write_handle)
3010 LOG (GNUNET_ERROR_TYPE_WARNING,
3011 "Existing write handle should be cancelled before shutting"
3013 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
3014 queue_message (socket, msg, &set_state_transmit_close_wait, NULL,
3018 handle->operation = SHUT_RDWR;
3019 if (NULL != socket->write_handle)
3020 LOG (GNUNET_ERROR_TYPE_WARNING,
3021 "Existing write handle should be cancelled before shutting"
3023 if (NULL != socket->read_handle)
3024 LOG (GNUNET_ERROR_TYPE_WARNING,
3025 "Existing read handle should be cancelled before shutting"
3027 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
3028 queue_message (socket, msg, &set_state_close_wait, NULL, GNUNET_NO);
3031 LOG (GNUNET_ERROR_TYPE_WARNING,
3032 "GNUNET_STREAM_shutdown called with invalid value for "
3033 "parameter operation -- Ignoring\n");
3035 GNUNET_free (handle);
3038 handle->close_msg_retransmission_task_id =
3039 GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
3040 &close_msg_retransmission_task,
3047 * Cancels a pending shutdown
3049 * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
3052 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
3054 if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
3055 GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
3056 handle->socket->shutdown_handle = NULL;
3057 GNUNET_free (handle);
3064 * @param socket the stream socket
3067 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
3069 struct MessageQueue *head;
3071 if (NULL != socket->read_handle)
3073 LOG (GNUNET_ERROR_TYPE_WARNING,
3074 "Closing STREAM socket when a read handle is pending\n");
3075 GNUNET_STREAM_io_read_cancel (socket->read_handle);
3077 if (NULL != socket->write_handle)
3079 LOG (GNUNET_ERROR_TYPE_WARNING,
3080 "Closing STREAM socket when a write handle is pending\n");
3081 GNUNET_STREAM_io_write_cancel (socket->write_handle);
3082 //socket->write_handle = NULL;
3084 /* Terminate the ack'ing task if they are still present */
3085 if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
3087 GNUNET_SCHEDULER_cancel (socket->ack_task_id);
3088 socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
3090 /* Terminate the control retransmission tasks */
3091 if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
3093 GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
3095 /* Clear Transmit handles */
3096 if (NULL != socket->transmit_handle)
3098 GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
3099 socket->transmit_handle = NULL;
3101 /* Clear existing message queue */
3102 while (NULL != (head = socket->queue_head)) {
3103 GNUNET_CONTAINER_DLL_remove (socket->queue_head,
3106 GNUNET_free (head->message);
3109 /* Close associated tunnel */
3110 if (NULL != socket->tunnel)
3112 GNUNET_MESH_tunnel_destroy (socket->tunnel);
3113 socket->tunnel = NULL;
3115 /* Close mesh connection */
3116 if (NULL != socket->mesh && NULL == socket->lsocket)
3118 GNUNET_MESH_disconnect (socket->mesh);
3119 socket->mesh = NULL;
3121 /* Release receive buffer */
3122 if (NULL != socket->receive_buffer)
3124 GNUNET_free (socket->receive_buffer);
3126 GNUNET_free (socket);
3131 * Listens for stream connections for a specific application ports
3133 * @param cfg the configuration to use
3134 * @param app_port the application port for which new streams will be accepted
3135 * @param listen_cb this function will be called when a peer tries to establish
3137 * @param listen_cb_cls closure for listen_cb
3138 * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3139 * @return listen socket, NULL for any error
3141 struct GNUNET_STREAM_ListenSocket *
3142 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
3143 GNUNET_MESH_ApplicationType app_port,
3144 GNUNET_STREAM_ListenCallback listen_cb,
3145 void *listen_cb_cls,
3148 /* FIXME: Add variable args for passing configration options? */
3149 struct GNUNET_STREAM_ListenSocket *lsocket;
3150 struct GNUNET_TIME_Relative listen_timeout;
3151 enum GNUNET_STREAM_Option option;
3153 uint16_t payload_size;
3155 GNUNET_assert (NULL != listen_cb);
3156 lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
3157 lsocket->cfg = GNUNET_CONFIGURATION_dup (cfg);
3158 lsocket->lockmanager = GNUNET_LOCKMANAGER_connect (lsocket->cfg);
3159 if (NULL == lsocket->lockmanager)
3161 GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3162 GNUNET_free (lsocket);
3165 lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */
3167 lsocket->retransmit_timeout = TIME_REL_SECS (default_timeout);
3168 lsocket->testing_active = GNUNET_NO;
3169 lsocket->listen_ok_cb = NULL;
3170 lsocket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
3171 listen_timeout = TIME_REL_SECS (60); /* A minute for listen timeout */
3172 va_start (vargs, listen_cb_cls);
3174 option = va_arg (vargs, enum GNUNET_STREAM_Option);
3177 case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3178 lsocket->retransmit_timeout = va_arg (vargs,
3179 struct GNUNET_TIME_Relative);
3181 case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3182 lsocket->testing_active = GNUNET_YES;
3183 lsocket->testing_set_write_sequence_number_value = va_arg (vargs,
3186 case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3187 listen_timeout = GNUNET_TIME_relative_multiply
3188 (GNUNET_TIME_UNIT_MILLISECONDS, va_arg (vargs, uint32_t));
3190 case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3191 lsocket->listen_ok_cb = va_arg (vargs,
3192 GNUNET_STREAM_ListenSuccessCallback);
3194 case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
3195 payload_size = (uint16_t) va_arg (vargs, unsigned int);
3196 GNUNET_assert (0 != payload_size);
3197 if (payload_size < lsocket->max_payload_size)
3198 lsocket->max_payload_size = payload_size;
3200 case GNUNET_STREAM_OPTION_END:
3203 } while (GNUNET_STREAM_OPTION_END != option);
3205 lsocket->port = app_port;
3206 lsocket->listen_cb = listen_cb;
3207 lsocket->listen_cb_cls = listen_cb_cls;
3208 lsocket->locking_request =
3209 GNUNET_LOCKMANAGER_acquire_lock (lsocket->lockmanager, locking_domain,
3210 (uint32_t) lsocket->port,
3211 &lock_status_change_cb, lsocket);
3212 lsocket->lockmanager_acquire_timeout_task =
3213 GNUNET_SCHEDULER_add_delayed (listen_timeout,
3214 &lockmanager_acquire_timeout, lsocket);
3220 * Closes the listen socket
3222 * @param lsocket the listen socket
3225 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3227 /* Close MESH connection */
3228 if (NULL != lsocket->mesh)
3229 GNUNET_MESH_disconnect (lsocket->mesh);
3230 GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3231 if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
3232 GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
3233 if (NULL != lsocket->locking_request)
3234 GNUNET_LOCKMANAGER_cancel_request (lsocket->locking_request);
3235 if (NULL != lsocket->lockmanager)
3236 GNUNET_LOCKMANAGER_disconnect (lsocket->lockmanager);
3237 GNUNET_free (lsocket);
3242 * Tries to write the given data to the stream. The maximum size of data that
3243 * can be written as part of a write operation is (64 * (64000 - sizeof (struct
3244 * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3245 * violation, however only the said number of maximum bytes will be written.
3247 * @param socket the socket representing a stream
3248 * @param data the data buffer from where the data is written into the stream
3249 * @param size the number of bytes to be written from the data buffer
3250 * @param timeout the timeout period
3251 * @param write_cont the function to call upon writing some bytes into the
3253 * @param write_cont_cls the closure
3255 * @return handle to cancel the operation; if a previous write is pending or
3256 * the stream has been shutdown for this operation then write_cont is
3257 * immediately called and NULL is returned.
3259 struct GNUNET_STREAM_IOWriteHandle *
3260 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3263 struct GNUNET_TIME_Relative timeout,
3264 GNUNET_STREAM_CompletionContinuation write_cont,
3265 void *write_cont_cls)
3267 struct GNUNET_STREAM_IOWriteHandle *io_handle;
3268 struct GNUNET_STREAM_DataMessage *data_msg;
3270 struct GNUNET_TIME_Relative ack_deadline;
3271 unsigned int num_needed_packets;
3272 unsigned int packet;
3273 uint32_t packet_size;
3274 uint32_t payload_size;
3275 uint16_t max_data_packet_size;
3277 LOG (GNUNET_ERROR_TYPE_DEBUG,
3279 if (NULL != socket->write_handle)
3284 switch (socket->state)
3286 case STATE_TRANSMIT_CLOSED:
3287 case STATE_TRANSMIT_CLOSE_WAIT:
3289 case STATE_CLOSE_WAIT:
3290 if (NULL != write_cont)
3291 write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3292 LOG (GNUNET_ERROR_TYPE_DEBUG,
3293 "%s() END\n", __func__);
3297 case STATE_HELLO_WAIT:
3298 if (NULL != write_cont)
3299 /* FIXME: GNUNET_STREAM_SYSERR?? */
3300 write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3301 LOG (GNUNET_ERROR_TYPE_DEBUG,
3302 "%s() END\n", __func__);
3304 case STATE_ESTABLISHED:
3305 case STATE_RECEIVE_CLOSED:
3306 case STATE_RECEIVE_CLOSE_WAIT:
3309 if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * socket->max_payload_size < size)
3310 size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * socket->max_payload_size;
3311 num_needed_packets =
3312 (size + (socket->max_payload_size - 1)) / socket->max_payload_size;
3313 io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
3314 io_handle->socket = socket;
3315 io_handle->write_cont = write_cont;
3316 io_handle->write_cont_cls = write_cont_cls;
3317 io_handle->size = size;
3318 io_handle->packets_sent = 0;
3320 /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3321 determined from RTT */
3322 ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3323 /* Divide the given buffer into packets for sending */
3324 max_data_packet_size =
3325 socket->max_payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3326 for (packet=0; packet < num_needed_packets; packet++)
3328 if ((packet + 1) * socket->max_payload_size < size)
3330 payload_size = socket->max_payload_size;
3331 packet_size = max_data_packet_size;
3335 payload_size = size - packet * socket->max_payload_size;
3337 payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3339 io_handle->messages[packet] = GNUNET_malloc (packet_size);
3340 io_handle->messages[packet]->header.header.size = htons (packet_size);
3341 io_handle->messages[packet]->header.header.type =
3342 htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3343 io_handle->messages[packet]->sequence_number =
3344 htonl (socket->write_sequence_number++);
3345 io_handle->messages[packet]->offset = htonl (socket->write_offset);
3346 /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3347 determined from RTT */
3348 io_handle->messages[packet]->ack_deadline =
3349 GNUNET_TIME_relative_hton (ack_deadline);
3350 data_msg = io_handle->messages[packet];
3351 /* Copy data from given buffer to the packet */
3352 memcpy (&data_msg[1], sweep, payload_size);
3353 sweep += payload_size;
3354 socket->write_offset += payload_size;
3356 /* ack the last data message. FIXME: remove when we figure out how to do this
3358 io_handle->messages[num_needed_packets - 1]->ack_deadline =
3359 GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
3360 socket->write_handle = io_handle;
3361 write_data (socket);
3362 LOG (GNUNET_ERROR_TYPE_DEBUG,
3363 "%s() END\n", __func__);
3369 * Tries to read data from the stream.
3371 * @param socket the socket representing a stream
3372 * @param timeout the timeout period
3373 * @param proc function to call with data (once only)
3374 * @param proc_cls the closure for proc
3376 * @return handle to cancel the operation; NULL is returned if: the stream has
3377 * been shutdown for this type of opeartion (the DataProcessor is
3378 * immediately called with GNUNET_STREAM_SHUTDOWN as status) OR another
3379 * read handle is present (only one read handle per socket is present
3382 struct GNUNET_STREAM_IOReadHandle *
3383 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3384 struct GNUNET_TIME_Relative timeout,
3385 GNUNET_STREAM_DataProcessor proc,
3388 struct GNUNET_STREAM_IOReadHandle *read_handle;
3390 LOG (GNUNET_ERROR_TYPE_DEBUG,
3392 GNUNET_i2s (&socket->other_peer),
3394 /* Return NULL if there is already a read handle; the user has to cancel that
3395 first before continuing or has to wait until it is completed */
3396 if (NULL != socket->read_handle)
3398 GNUNET_assert (NULL != proc);
3399 switch (socket->state)
3401 case STATE_RECEIVE_CLOSED:
3402 case STATE_RECEIVE_CLOSE_WAIT:
3404 case STATE_CLOSE_WAIT:
3405 proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3406 LOG (GNUNET_ERROR_TYPE_DEBUG,
3408 GNUNET_i2s (&socket->other_peer),
3414 read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
3415 read_handle->proc = proc;
3416 read_handle->proc_cls = proc_cls;
3417 read_handle->socket = socket;
3418 socket->read_handle = read_handle;
3419 if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3421 read_handle->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3423 read_handle->read_io_timeout_task_id =
3424 GNUNET_SCHEDULER_add_delayed (timeout, &read_io_timeout, socket);
3425 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: %s() END\n",
3426 GNUNET_i2s (&socket->other_peer), __func__);
3432 * Cancel pending write operation.
3434 * @param ioh handle to operation to cancel
3437 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3439 struct GNUNET_STREAM_Socket *socket = ioh->socket;
3440 unsigned int packet;
3442 GNUNET_assert (NULL != socket->write_handle);
3443 GNUNET_assert (socket->write_handle == ioh);
3445 if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
3447 GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
3448 socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
3451 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3453 if (NULL == ioh->messages[packet]) break;
3454 GNUNET_free (ioh->messages[packet]);
3457 GNUNET_free (socket->write_handle);
3458 socket->write_handle = NULL;
3463 * Cancel pending read operation.
3465 * @param ioh handle to operation to cancel
3468 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
3470 struct GNUNET_STREAM_Socket *socket;
3472 socket = ioh->socket;
3473 GNUNET_assert (NULL != socket->read_handle);
3474 GNUNET_assert (ioh == socket->read_handle);
3475 /* Read io time task should be there; if it is already executed then this
3476 read handle is not valid; However upon scheduler shutdown the read io task
3477 may be executed before */
3478 if (GNUNET_SCHEDULER_NO_TASK != ioh->read_io_timeout_task_id)
3479 GNUNET_SCHEDULER_cancel (ioh->read_io_timeout_task_id);
3480 /* reading task may be present; if so we have to stop it */
3481 if (GNUNET_SCHEDULER_NO_TASK != ioh->read_task_id)
3482 GNUNET_SCHEDULER_cancel (ioh->read_task_id);
3484 socket->read_handle = NULL;
3487 /* end of stream_api.c */