2 This file is part of GNUnet.
3 (C) 2012 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
23 * Checks for matching the sender and socket->other_peer in server
26 * Add code for write io timeout
28 * Include retransmission for control messages
32 * @file stream/stream_api.c
33 * @brief Implementation of the stream library
34 * @author Sree Harsha Totakura
39 #include "gnunet_common.h"
40 #include "gnunet_crypto_lib.h"
41 #include "gnunet_lockmanager_service.h"
42 #include "gnunet_stream_lib.h"
43 #include "stream_protocol.h"
45 #define LOG(kind,...) \
46 GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
48 #define TIME_REL_SECS(sec) \
49 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
52 * The maximum packet size of a stream packet
54 #define MAX_PACKET_SIZE 512//64000
59 #define RECEIVE_BUFFER_SIZE 4096000
62 * The maximum payload a data message packet can carry
64 static const size_t max_payload_size =
65 MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
68 * states in the Protocol
73 * Client initialization state
78 * Listener initialization state
83 * Pre-connection establishment state
88 * State where a connection has been established
93 * State where the socket is closed on our side and waiting to be ACK'ed
95 STATE_RECEIVE_CLOSE_WAIT,
98 * State where the socket is closed for reading
100 STATE_RECEIVE_CLOSED,
103 * State where the socket is closed on our side and waiting to be ACK'ed
105 STATE_TRANSMIT_CLOSE_WAIT,
108 * State where the socket is closed for writing
110 STATE_TRANSMIT_CLOSED,
113 * State where the socket is closed on our side and waiting to be ACK'ed
118 * State where the socket is closed
125 * Functions of this type are called when a message is written
127 * @param cls the closure from queue_message
128 * @param socket the socket the written message was bound to
130 typedef void (*SendFinishCallback) (void *cls,
131 struct GNUNET_STREAM_Socket *socket);
135 * The send message queue
142 struct GNUNET_STREAM_MessageHeader *message;
145 * Callback to be called when the message is sent
147 SendFinishCallback finish_cb;
150 * The closure for finish_cb
155 * The next message in queue. Should be NULL in the last message
157 struct MessageQueue *next;
160 * The next message in queue. Should be NULL in the first message
162 struct MessageQueue *prev;
167 * The STREAM Socket Handler
169 struct GNUNET_STREAM_Socket
172 * Retransmission timeout
174 struct GNUNET_TIME_Relative retransmit_timeout;
177 * The Acknowledgement Bitmap
179 GNUNET_STREAM_AckBitmap ack_bitmap;
182 * Time when the Acknowledgement was queued
184 struct GNUNET_TIME_Absolute ack_time_registered;
187 * Queued Acknowledgement deadline
189 struct GNUNET_TIME_Relative ack_time_deadline;
194 struct GNUNET_MESH_Handle *mesh;
197 * The mesh tunnel handle
199 struct GNUNET_MESH_Tunnel *tunnel;
202 * Stream open closure
207 * Stream open callback
209 GNUNET_STREAM_OpenCallback open_cb;
212 * The current transmit handle (if a pending transmit request exists)
214 struct GNUNET_MESH_TransmitHandle *transmit_handle;
217 * The current act transmit handle (if a pending ack transmit request exists)
219 struct GNUNET_MESH_TransmitHandle *ack_transmit_handle;
222 * Pointer to the current ack message using in ack_task
224 struct GNUNET_STREAM_AckMessage *ack_msg;
227 * The current message associated with the transmit handle
229 struct MessageQueue *queue_head;
232 * The queue tail, should always point to the last message in queue
234 struct MessageQueue *queue_tail;
237 * The write IO_handle associated with this socket
239 struct GNUNET_STREAM_IOWriteHandle *write_handle;
242 * The read IO_handle associated with this socket
244 struct GNUNET_STREAM_IOReadHandle *read_handle;
247 * The shutdown handle associated with this socket
249 struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
252 * Buffer for storing received messages
254 void *receive_buffer;
257 * The listen socket from which this socket is derived. Should be NULL if it
258 * is not a derived socket
260 struct GNUNET_STREAM_ListenSocket *lsocket;
263 * The peer identity of the peer at the other end of the stream
265 struct GNUNET_PeerIdentity other_peer;
268 * Task identifier for the read io timeout task
270 GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
273 * Task identifier for retransmission task after timeout
275 GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
278 * The task for sending timely Acks
280 GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
283 * Task scheduled to continue a read operation.
285 GNUNET_SCHEDULER_TaskIdentifier read_task_id;
288 * The state of the protocol associated with this socket
293 * The status of the socket
295 enum GNUNET_STREAM_Status status;
298 * The number of previous timeouts; FIXME: currently not used
300 unsigned int retries;
303 * The application port number (type: uint32_t)
305 GNUNET_MESH_ApplicationType app_port;
308 * Whether testing mode is active or not
313 * The write sequence number to be set incase of testing
315 uint32_t testing_set_write_sequence_number_value;
318 * The session id associated with this stream connection
319 * FIXME: Not used currently, may be removed
324 * Write sequence number. Set to random when sending HELLO(client) and
327 uint32_t write_sequence_number;
330 * Read sequence number. This number's value is determined during handshake
332 uint32_t read_sequence_number;
335 * The receiver buffer size
337 uint32_t receive_buffer_size;
340 * The receiver buffer boundaries
342 uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
345 * receiver's available buffer after the last acknowledged packet
347 uint32_t receiver_window_available;
350 * The offset pointer used during write operation
352 uint32_t write_offset;
355 * The offset after which we are expecting data
357 uint32_t read_offset;
360 * The offset upto which user has read from the received buffer
362 uint32_t copy_offset;
367 * A socket for listening
369 struct GNUNET_STREAM_ListenSocket
374 struct GNUNET_MESH_Handle *mesh;
379 struct GNUNET_CONFIGURATION_Handle *cfg;
382 * Handle to the lock manager service
384 struct GNUNET_LOCKMANAGER_Handle *lockmanager;
387 * The active LockingRequest from lockmanager
389 struct GNUNET_LOCKMANAGER_LockingRequest *locking_request;
392 * Callback to call after acquring a lock and listening
394 GNUNET_STREAM_ListenSuccessCallback listen_ok_cb;
397 * The callback function which is called after successful opening socket
399 GNUNET_STREAM_ListenCallback listen_cb;
402 * The call back closure
409 GNUNET_MESH_ApplicationType port;
412 * The id of the lockmanager timeout task
414 GNUNET_SCHEDULER_TaskIdentifier lockmanager_acquire_timeout_task;
417 * The retransmit timeout
419 struct GNUNET_TIME_Relative retransmit_timeout;
427 * Whether testing mode is active or not
432 * The write sequence number to be set incase of testing
434 uint32_t testing_set_write_sequence_number_value;
439 * The IO Write Handle
441 struct GNUNET_STREAM_IOWriteHandle
444 * The socket to which this write handle is associated
446 struct GNUNET_STREAM_Socket *socket;
449 * The packet_buffers associated with this Handle
451 struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
454 * The write continuation callback
456 GNUNET_STREAM_CompletionContinuation write_cont;
459 * Write continuation closure
461 void *write_cont_cls;
464 * The bitmap of this IOHandle; Corresponding bit for a message is set when
465 * it has been acknowledged by the receiver
467 GNUNET_STREAM_AckBitmap ack_bitmap;
470 * Number of bytes in this write handle
479 struct GNUNET_STREAM_IOReadHandle
482 * The socket to which this read handle is associated
484 struct GNUNET_STREAM_Socket *socket;
487 * Callback for the read processor
489 GNUNET_STREAM_DataProcessor proc;
492 * The closure pointer for the read processor callback
499 * Handle for Shutdown
501 struct GNUNET_STREAM_ShutdownHandle
504 * The socket associated with this shutdown handle
506 struct GNUNET_STREAM_Socket *socket;
509 * Shutdown completion callback
511 GNUNET_STREAM_ShutdownCompletion completion_cb;
514 * Closure for completion callback
516 void *completion_cls;
519 * Close message retransmission task id
521 GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
524 * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
531 * Default value in seconds for various timeouts
533 static const unsigned int default_timeout = 10;
536 * The domain name for locks we use here
538 static const char *locking_domain = "GNUNET_STREAM_APPLOCK";
542 * Callback function for sending queued message
544 * @param cls closure the socket
545 * @param size number of bytes available in buf
546 * @param buf where the callee should write the message
547 * @return number of bytes written to buf
550 send_message_notify (void *cls, size_t size, void *buf)
552 struct GNUNET_STREAM_Socket *socket = cls;
553 struct MessageQueue *head;
556 socket->transmit_handle = NULL; /* Remove the transmit handle */
557 head = socket->queue_head;
559 return 0; /* just to be safe */
560 if (0 == size) /* request timed out */
563 LOG (GNUNET_ERROR_TYPE_DEBUG,
564 "%s: Message sending timed out. Retry %d \n",
565 GNUNET_i2s (&socket->other_peer),
567 socket->transmit_handle =
568 GNUNET_MESH_notify_transmit_ready (socket->tunnel,
571 /* FIXME: exponential backoff */
572 socket->retransmit_timeout,
574 ntohs (head->message->header.size),
575 &send_message_notify,
580 ret = ntohs (head->message->header.size);
581 GNUNET_assert (size >= ret);
582 memcpy (buf, head->message, ret);
583 if (NULL != head->finish_cb)
585 head->finish_cb (head->finish_cb_cls, socket);
587 GNUNET_CONTAINER_DLL_remove (socket->queue_head,
590 GNUNET_free (head->message);
592 head = socket->queue_head;
593 if (NULL != head) /* more pending messages to send */
596 socket->transmit_handle =
597 GNUNET_MESH_notify_transmit_ready (socket->tunnel,
600 /* FIXME: exponential backoff */
601 socket->retransmit_timeout,
603 ntohs (head->message->header.size),
604 &send_message_notify,
612 * Queues a message for sending using the mesh connection of a socket
614 * @param socket the socket whose mesh connection is used
615 * @param message the message to be sent
616 * @param finish_cb the callback to be called when the message is sent
617 * @param finish_cb_cls the closure for the callback
620 queue_message (struct GNUNET_STREAM_Socket *socket,
621 struct GNUNET_STREAM_MessageHeader *message,
622 SendFinishCallback finish_cb,
625 struct MessageQueue *queue_entity;
628 ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
629 && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
631 LOG (GNUNET_ERROR_TYPE_DEBUG,
632 "%s: Queueing message of type %d and size %d\n",
633 GNUNET_i2s (&socket->other_peer),
634 ntohs (message->header.type),
635 ntohs (message->header.size));
636 GNUNET_assert (NULL != message);
637 queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
638 queue_entity->message = message;
639 queue_entity->finish_cb = finish_cb;
640 queue_entity->finish_cb_cls = finish_cb_cls;
641 GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
644 if (NULL == socket->transmit_handle)
647 socket->transmit_handle =
648 GNUNET_MESH_notify_transmit_ready (socket->tunnel,
651 socket->retransmit_timeout,
653 ntohs (message->header.size),
654 &send_message_notify,
661 * Copies a message and queues it for sending using the mesh connection of
664 * @param socket the socket whose mesh connection is used
665 * @param message the message to be sent
666 * @param finish_cb the callback to be called when the message is sent
667 * @param finish_cb_cls the closure for the callback
670 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
671 const struct GNUNET_STREAM_MessageHeader *message,
672 SendFinishCallback finish_cb,
675 struct GNUNET_STREAM_MessageHeader *msg_copy;
678 size = ntohs (message->header.size);
679 msg_copy = GNUNET_malloc (size);
680 memcpy (msg_copy, message, size);
681 queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
686 * Callback function for sending ack message
688 * @param cls closure the ACK message created in ack_task
689 * @param size number of bytes available in buffer
690 * @param buf where the callee should write the message
691 * @return number of bytes written to buf
694 send_ack_notify (void *cls, size_t size, void *buf)
696 struct GNUNET_STREAM_Socket *socket = cls;
700 LOG (GNUNET_ERROR_TYPE_DEBUG,
701 "%s called with size 0\n", __func__);
704 GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size);
706 size = ntohs (socket->ack_msg->header.header.size);
707 memcpy (buf, socket->ack_msg, size);
709 GNUNET_free (socket->ack_msg);
710 socket->ack_msg = NULL;
711 socket->ack_transmit_handle = NULL;
717 * Writes data using the given socket. The amount of data written is limited by
718 * the receiver_window_size
720 * @param socket the socket to use
723 write_data (struct GNUNET_STREAM_Socket *socket);
727 * Task for retransmitting data messages if they aren't ACK before their ack
730 * @param cls the socket
731 * @param tc the Task context
734 retransmission_timeout_task (void *cls,
735 const struct GNUNET_SCHEDULER_TaskContext *tc)
737 struct GNUNET_STREAM_Socket *socket = cls;
739 if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
742 LOG (GNUNET_ERROR_TYPE_DEBUG,
743 "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
744 socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
750 * Task for sending ACK message
752 * @param cls the socket
753 * @param tc the Task context
757 const struct GNUNET_SCHEDULER_TaskContext *tc)
759 struct GNUNET_STREAM_Socket *socket = cls;
760 struct GNUNET_STREAM_AckMessage *ack_msg;
762 if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
766 socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
767 /* Create the ACK Message */
768 ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
769 ack_msg->header.header.size = htons (sizeof (struct
770 GNUNET_STREAM_AckMessage));
771 ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
772 ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
773 ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
774 ack_msg->receive_window_remaining =
775 htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
776 socket->ack_msg = ack_msg;
777 /* Request MESH for sending ACK */
778 socket->ack_transmit_handle =
779 GNUNET_MESH_notify_transmit_ready (socket->tunnel,
782 socket->retransmit_timeout,
784 ntohs (ack_msg->header.header.size),
791 * Retransmission task for shutdown messages
793 * @param cls the shutdown handle
794 * @param tc the Task Context
797 close_msg_retransmission_task (void *cls,
798 const struct GNUNET_SCHEDULER_TaskContext *tc)
800 struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
801 struct GNUNET_STREAM_MessageHeader *msg;
802 struct GNUNET_STREAM_Socket *socket;
804 GNUNET_assert (NULL != shutdown_handle);
805 socket = shutdown_handle->socket;
807 msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
808 msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
809 switch (shutdown_handle->operation)
812 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
815 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
818 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
822 shutdown_handle->close_msg_retransmission_task_id =
823 GNUNET_SCHEDULER_NO_TASK;
826 queue_message (socket, msg, NULL, NULL);
827 shutdown_handle->close_msg_retransmission_task_id =
828 GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
829 &close_msg_retransmission_task,
835 * Function to modify a bit in GNUNET_STREAM_AckBitmap
837 * @param bitmap the bitmap to modify
838 * @param bit the bit number to modify
839 * @param value GNUNET_YES to on, GNUNET_NO to off
842 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
846 GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
847 if (GNUNET_YES == value)
848 *bitmap |= (1LL << bit);
850 *bitmap &= ~(1LL << bit);
855 * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
857 * @param bitmap address of the bitmap that has to be checked
858 * @param bit the bit number to check
859 * @return GNUNET_YES if the bit is set; GNUNET_NO if not
862 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
865 GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
866 return 0 != (*bitmap & (1LL << bit));
871 * Writes data using the given socket. The amount of data written is limited by
872 * the receiver_window_size
874 * @param socket the socket to use
877 write_data (struct GNUNET_STREAM_Socket *socket)
879 struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
880 int packet; /* Although an int, should never be negative */
884 /* Find the last acknowledged packet */
885 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
887 if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
890 else if (NULL == io_handle->messages[packet])
893 /* Resend packets which weren't ack'ed */
894 for (packet=0; packet < ack_packet; packet++)
896 if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
899 LOG (GNUNET_ERROR_TYPE_DEBUG,
900 "%s: Placing DATA message with sequence %u in send queue\n",
901 GNUNET_i2s (&socket->other_peer),
902 ntohl (io_handle->messages[packet]->sequence_number));
903 copy_and_queue_message (socket,
904 &io_handle->messages[packet]->header,
909 packet = ack_packet + 1;
910 /* Now send new packets if there is enough buffer space */
911 while ( (NULL != io_handle->messages[packet]) &&
912 (socket->receiver_window_available
913 >= ntohs (io_handle->messages[packet]->header.header.size)) &&
914 (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
916 socket->receiver_window_available -=
917 ntohs (io_handle->messages[packet]->header.header.size);
918 LOG (GNUNET_ERROR_TYPE_DEBUG,
919 "%s: Placing DATA message with sequence %u in send queue\n",
920 GNUNET_i2s (&socket->other_peer),
921 ntohl (io_handle->messages[packet]->sequence_number));
922 copy_and_queue_message (socket,
923 &io_handle->messages[packet]->header,
928 if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
929 socket->retransmission_timeout_task_id =
930 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
931 (GNUNET_TIME_UNIT_SECONDS, 8),
932 &retransmission_timeout_task,
938 * Task for calling the read processor
940 * @param cls the socket
941 * @param tc the task context
944 call_read_processor (void *cls,
945 const struct GNUNET_SCHEDULER_TaskContext *tc)
947 struct GNUNET_STREAM_Socket *socket = cls;
949 size_t valid_read_size;
951 uint32_t sequence_increase;
952 uint32_t offset_increase;
954 socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
955 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
958 if (NULL == socket->receive_buffer)
961 GNUNET_assert (NULL != socket->read_handle);
962 GNUNET_assert (NULL != socket->read_handle->proc);
964 /* Check the bitmap for any holes */
965 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
967 if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
971 /* We only call read processor if we have the first packet */
972 GNUNET_assert (0 < packet);
974 socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
975 GNUNET_assert (0 != valid_read_size);
976 /* Cancel the read_io_timeout_task */
977 GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
978 socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
979 /* Call the data processor */
980 LOG (GNUNET_ERROR_TYPE_DEBUG,
981 "%s: Calling read processor\n",
982 GNUNET_i2s (&socket->other_peer));
984 socket->read_handle->proc (socket->read_handle->proc_cls,
986 socket->receive_buffer + socket->copy_offset,
988 LOG (GNUNET_ERROR_TYPE_DEBUG,
989 "%s: Read processor read %d bytes\n",
990 GNUNET_i2s (&socket->other_peer), read_size);
991 LOG (GNUNET_ERROR_TYPE_DEBUG,
992 "%s: Read processor completed successfully\n",
993 GNUNET_i2s (&socket->other_peer));
994 /* Free the read handle */
995 GNUNET_free (socket->read_handle);
996 socket->read_handle = NULL;
997 GNUNET_assert (read_size <= valid_read_size);
998 socket->copy_offset += read_size;
999 /* Determine upto which packet we can remove from the buffer */
1000 for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1002 if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
1003 { packet++; break; }
1004 if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
1008 /* If no packets can be removed we can't move the buffer */
1009 if (0 == packet) return;
1010 sequence_increase = packet;
1011 LOG (GNUNET_ERROR_TYPE_DEBUG,
1012 "%s: Sequence increase after read processor completion: %u\n",
1013 GNUNET_i2s (&socket->other_peer), sequence_increase);
1015 /* Shift the data in the receive buffer */
1016 socket->receive_buffer =
1017 memmove (socket->receive_buffer,
1018 socket->receive_buffer
1019 + socket->receive_buffer_boundaries[sequence_increase-1],
1020 socket->receive_buffer_size
1021 - socket->receive_buffer_boundaries[sequence_increase-1]);
1022 /* Shift the bitmap */
1023 socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
1024 /* Set read_sequence_number */
1025 socket->read_sequence_number += sequence_increase;
1026 /* Set read_offset */
1027 offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
1028 socket->read_offset += offset_increase;
1029 /* Fix copy_offset */
1030 GNUNET_assert (offset_increase <= socket->copy_offset);
1031 socket->copy_offset -= offset_increase;
1032 /* Fix relative boundaries */
1033 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1035 if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
1037 uint32_t ahead_buffer_boundary;
1039 ahead_buffer_boundary =
1040 socket->receive_buffer_boundaries[packet + sequence_increase];
1041 if (0 == ahead_buffer_boundary)
1042 socket->receive_buffer_boundaries[packet] = 0;
1045 GNUNET_assert (offset_increase < ahead_buffer_boundary);
1046 socket->receive_buffer_boundaries[packet] =
1047 ahead_buffer_boundary - offset_increase;
1051 socket->receive_buffer_boundaries[packet] = 0;
1057 * Cancels the existing read io handle
1059 * @param cls the closure from the SCHEDULER call
1060 * @param tc the task context
1063 read_io_timeout (void *cls,
1064 const struct GNUNET_SCHEDULER_TaskContext *tc)
1066 struct GNUNET_STREAM_Socket *socket = cls;
1067 GNUNET_STREAM_DataProcessor proc;
1070 socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1071 if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1073 LOG (GNUNET_ERROR_TYPE_DEBUG,
1074 "%s: Read task timedout - Cancelling it\n",
1075 GNUNET_i2s (&socket->other_peer));
1076 GNUNET_SCHEDULER_cancel (socket->read_task_id);
1077 socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1079 GNUNET_assert (NULL != socket->read_handle);
1080 proc = socket->read_handle->proc;
1081 proc_cls = socket->read_handle->proc_cls;
1082 GNUNET_free (socket->read_handle);
1083 socket->read_handle = NULL;
1084 /* Call the read processor to signal timeout */
1086 GNUNET_STREAM_TIMEOUT,
1093 * Handler for DATA messages; Same for both client and server
1095 * @param socket the socket through which the ack was received
1096 * @param tunnel connection to the other end
1097 * @param sender who sent the message
1098 * @param msg the data message
1099 * @param atsi performance data for the connection
1100 * @return GNUNET_OK to keep the connection open,
1101 * GNUNET_SYSERR to close it (signal serious error)
1104 handle_data (struct GNUNET_STREAM_Socket *socket,
1105 struct GNUNET_MESH_Tunnel *tunnel,
1106 const struct GNUNET_PeerIdentity *sender,
1107 const struct GNUNET_STREAM_DataMessage *msg,
1108 const struct GNUNET_ATS_Information*atsi)
1110 const void *payload;
1111 uint32_t bytes_needed;
1112 uint32_t relative_offset;
1113 uint32_t relative_sequence_number;
1116 size = htons (msg->header.header.size);
1117 if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1119 GNUNET_break_op (0);
1120 return GNUNET_SYSERR;
1123 if (0 != memcmp (sender,
1124 &socket->other_peer,
1125 sizeof (struct GNUNET_PeerIdentity)))
1127 LOG (GNUNET_ERROR_TYPE_DEBUG,
1128 "%s: Received DATA from non-confirming peer\n",
1129 GNUNET_i2s (&socket->other_peer));
1133 switch (socket->state)
1135 case STATE_ESTABLISHED:
1136 case STATE_TRANSMIT_CLOSED:
1137 case STATE_TRANSMIT_CLOSE_WAIT:
1139 /* check if the message's sequence number is in the range we are
1141 relative_sequence_number =
1142 ntohl (msg->sequence_number) - socket->read_sequence_number;
1143 if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1145 LOG (GNUNET_ERROR_TYPE_DEBUG,
1146 "%s: Ignoring received message with sequence number %u\n",
1147 GNUNET_i2s (&socket->other_peer),
1148 ntohl (msg->sequence_number));
1149 /* Start ACK sending task if one is not already present */
1150 if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1152 socket->ack_task_id =
1153 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1154 (msg->ack_deadline),
1161 /* Check if we have already seen this message */
1162 if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1163 relative_sequence_number))
1165 LOG (GNUNET_ERROR_TYPE_DEBUG,
1166 "%s: Ignoring already received message with sequence number %u\n",
1167 GNUNET_i2s (&socket->other_peer),
1168 ntohl (msg->sequence_number));
1169 /* Start ACK sending task if one is not already present */
1170 if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1172 socket->ack_task_id =
1173 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1174 (msg->ack_deadline),
1181 LOG (GNUNET_ERROR_TYPE_DEBUG,
1182 "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
1183 GNUNET_i2s (&socket->other_peer),
1184 ntohl (msg->sequence_number),
1185 ntohs (msg->header.header.size),
1186 GNUNET_i2s (&socket->other_peer));
1188 /* Check if we have to allocate the buffer */
1189 size -= sizeof (struct GNUNET_STREAM_DataMessage);
1190 relative_offset = ntohl (msg->offset) - socket->read_offset;
1191 bytes_needed = relative_offset + size;
1192 if (bytes_needed > socket->receive_buffer_size)
1194 if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1196 socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1198 socket->receive_buffer_size = bytes_needed;
1202 LOG (GNUNET_ERROR_TYPE_DEBUG,
1203 "%s: Cannot accommodate packet %d as buffer is full\n",
1204 GNUNET_i2s (&socket->other_peer),
1205 ntohl (msg->sequence_number));
1210 /* Copy Data to buffer */
1212 GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1213 memcpy (socket->receive_buffer + relative_offset,
1216 socket->receive_buffer_boundaries[relative_sequence_number] =
1217 relative_offset + size;
1219 /* Modify the ACK bitmap */
1220 ackbitmap_modify_bit (&socket->ack_bitmap,
1221 relative_sequence_number,
1224 /* Start ACK sending task if one is not already present */
1225 if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1227 socket->ack_task_id =
1228 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1229 (msg->ack_deadline),
1234 if ((NULL != socket->read_handle) /* A read handle is waiting */
1235 /* There is no current read task */
1236 && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1237 /* We have the first packet */
1238 && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1241 LOG (GNUNET_ERROR_TYPE_DEBUG,
1242 "%s: Scheduling read processor\n",
1243 GNUNET_i2s (&socket->other_peer));
1245 socket->read_task_id =
1246 GNUNET_SCHEDULER_add_now (&call_read_processor,
1253 LOG (GNUNET_ERROR_TYPE_DEBUG,
1254 "%s: Received data message when it cannot be handled\n",
1255 GNUNET_i2s (&socket->other_peer));
1263 * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1265 * @param cls the socket (set from GNUNET_MESH_connect)
1266 * @param tunnel connection to the other end
1267 * @param tunnel_ctx place to store local state associated with the tunnel
1268 * @param sender who sent the message
1269 * @param message the actual message
1270 * @param atsi performance data for the connection
1271 * @return GNUNET_OK to keep the connection open,
1272 * GNUNET_SYSERR to close it (signal serious error)
1275 client_handle_data (void *cls,
1276 struct GNUNET_MESH_Tunnel *tunnel,
1278 const struct GNUNET_PeerIdentity *sender,
1279 const struct GNUNET_MessageHeader *message,
1280 const struct GNUNET_ATS_Information*atsi)
1282 struct GNUNET_STREAM_Socket *socket = cls;
1284 return handle_data (socket,
1287 (const struct GNUNET_STREAM_DataMessage *) message,
1293 * Callback to set state to ESTABLISHED
1295 * @param cls the closure from queue_message FIXME: document
1296 * @param socket the socket to requiring state change
1299 set_state_established (void *cls,
1300 struct GNUNET_STREAM_Socket *socket)
1302 LOG (GNUNET_ERROR_TYPE_DEBUG,
1303 "%s: Attaining ESTABLISHED state\n",
1304 GNUNET_i2s (&socket->other_peer));
1305 socket->write_offset = 0;
1306 socket->read_offset = 0;
1307 socket->state = STATE_ESTABLISHED;
1308 if (NULL != socket->lsocket)
1310 LOG (GNUNET_ERROR_TYPE_DEBUG,
1311 "%s: Calling listen callback\n",
1312 GNUNET_i2s (&socket->other_peer));
1313 if (GNUNET_SYSERR ==
1314 socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1316 &socket->other_peer))
1318 socket->state = STATE_CLOSED;
1319 /* FIXME: We should close in a decent way (send RST) */
1320 GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1321 GNUNET_free (socket);
1324 else if (NULL != socket->open_cb)
1325 socket->open_cb (socket->open_cls, socket);
1330 * Callback to set state to HELLO_WAIT
1332 * @param cls the closure from queue_message
1333 * @param socket the socket to requiring state change
1336 set_state_hello_wait (void *cls,
1337 struct GNUNET_STREAM_Socket *socket)
1339 GNUNET_assert (STATE_INIT == socket->state);
1340 LOG (GNUNET_ERROR_TYPE_DEBUG,
1341 "%s: Attaining HELLO_WAIT state\n",
1342 GNUNET_i2s (&socket->other_peer));
1343 socket->state = STATE_HELLO_WAIT;
1348 * Callback to set state to CLOSE_WAIT
1350 * @param cls the closure from queue_message
1351 * @param socket the socket requiring state change
1354 set_state_close_wait (void *cls,
1355 struct GNUNET_STREAM_Socket *socket)
1357 LOG (GNUNET_ERROR_TYPE_DEBUG,
1358 "%s: Attaing CLOSE_WAIT state\n",
1359 GNUNET_i2s (&socket->other_peer));
1360 socket->state = STATE_CLOSE_WAIT;
1361 GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1362 socket->receive_buffer = NULL;
1363 socket->receive_buffer_size = 0;
1368 * Callback to set state to RECEIVE_CLOSE_WAIT
1370 * @param cls the closure from queue_message
1371 * @param socket the socket requiring state change
1374 set_state_receive_close_wait (void *cls,
1375 struct GNUNET_STREAM_Socket *socket)
1377 LOG (GNUNET_ERROR_TYPE_DEBUG,
1378 "%s: Attaing RECEIVE_CLOSE_WAIT state\n",
1379 GNUNET_i2s (&socket->other_peer));
1380 socket->state = STATE_RECEIVE_CLOSE_WAIT;
1381 GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1382 socket->receive_buffer = NULL;
1383 socket->receive_buffer_size = 0;
1388 * Callback to set state to TRANSMIT_CLOSE_WAIT
1390 * @param cls the closure from queue_message
1391 * @param socket the socket requiring state change
1394 set_state_transmit_close_wait (void *cls,
1395 struct GNUNET_STREAM_Socket *socket)
1397 LOG (GNUNET_ERROR_TYPE_DEBUG,
1398 "%s: Attaing TRANSMIT_CLOSE_WAIT state\n",
1399 GNUNET_i2s (&socket->other_peer));
1400 socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1405 * Callback to set state to CLOSED
1407 * @param cls the closure from queue_message
1408 * @param socket the socket requiring state change
1411 set_state_closed (void *cls,
1412 struct GNUNET_STREAM_Socket *socket)
1414 socket->state = STATE_CLOSED;
1419 * Returns a new HelloAckMessage. Also sets the write sequence number for the
1422 * @param socket the socket for which this HelloAckMessage has to be generated
1423 * @return the HelloAckMessage
1425 static struct GNUNET_STREAM_HelloAckMessage *
1426 generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket)
1428 struct GNUNET_STREAM_HelloAckMessage *msg;
1430 /* Get the random sequence number */
1431 if (GNUNET_YES == socket->testing_active)
1432 socket->write_sequence_number =
1433 socket->testing_set_write_sequence_number_value;
1435 socket->write_sequence_number =
1436 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1437 LOG (GNUNET_ERROR_TYPE_DEBUG,
1438 "%s: write sequence number %u\n",
1439 GNUNET_i2s (&socket->other_peer),
1440 (unsigned int) socket->write_sequence_number);
1442 msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1443 msg->header.header.size =
1444 htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1445 msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1446 msg->sequence_number = htonl (socket->write_sequence_number);
1447 msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1454 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1456 * @param cls the socket (set from GNUNET_MESH_connect)
1457 * @param tunnel connection to the other end
1458 * @param tunnel_ctx this is NULL
1459 * @param sender who sent the message
1460 * @param message the actual message
1461 * @param atsi performance data for the connection
1462 * @return GNUNET_OK to keep the connection open,
1463 * GNUNET_SYSERR to close it (signal serious error)
1466 client_handle_hello_ack (void *cls,
1467 struct GNUNET_MESH_Tunnel *tunnel,
1469 const struct GNUNET_PeerIdentity *sender,
1470 const struct GNUNET_MessageHeader *message,
1471 const struct GNUNET_ATS_Information*atsi)
1473 struct GNUNET_STREAM_Socket *socket = cls;
1474 const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1475 struct GNUNET_STREAM_HelloAckMessage *reply;
1477 if (0 != memcmp (sender,
1478 &socket->other_peer,
1479 sizeof (struct GNUNET_PeerIdentity)))
1481 LOG (GNUNET_ERROR_TYPE_DEBUG,
1482 "%s: Received HELLO_ACK from non-confirming peer\n",
1483 GNUNET_i2s (&socket->other_peer));
1486 ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1487 LOG (GNUNET_ERROR_TYPE_DEBUG,
1488 "%s: Received HELLO_ACK from %s\n",
1489 GNUNET_i2s (&socket->other_peer),
1490 GNUNET_i2s (&socket->other_peer));
1492 GNUNET_assert (socket->tunnel == tunnel);
1493 switch (socket->state)
1495 case STATE_HELLO_WAIT:
1496 socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1497 LOG (GNUNET_ERROR_TYPE_DEBUG,
1498 "%s: Read sequence number %u\n",
1499 GNUNET_i2s (&socket->other_peer),
1500 (unsigned int) socket->read_sequence_number);
1501 socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1502 reply = generate_hello_ack_msg (socket);
1503 queue_message (socket,
1505 &set_state_established,
1508 case STATE_ESTABLISHED:
1509 case STATE_RECEIVE_CLOSE_WAIT:
1510 // call statistics (# ACKs ignored++)
1514 LOG (GNUNET_ERROR_TYPE_DEBUG,
1515 "%s: Server %s sent HELLO_ACK when in state %d\n",
1516 GNUNET_i2s (&socket->other_peer),
1517 GNUNET_i2s (&socket->other_peer),
1519 socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1520 return GNUNET_SYSERR;
1527 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1529 * @param cls the socket (set from GNUNET_MESH_connect)
1530 * @param tunnel connection to the other end
1531 * @param tunnel_ctx this is NULL
1532 * @param sender who sent the message
1533 * @param message the actual message
1534 * @param atsi performance data for the connection
1535 * @return GNUNET_OK to keep the connection open,
1536 * GNUNET_SYSERR to close it (signal serious error)
1539 client_handle_reset (void *cls,
1540 struct GNUNET_MESH_Tunnel *tunnel,
1542 const struct GNUNET_PeerIdentity *sender,
1543 const struct GNUNET_MessageHeader *message,
1544 const struct GNUNET_ATS_Information*atsi)
1546 // struct GNUNET_STREAM_Socket *socket = cls;
1553 * Common message handler for handling TRANSMIT_CLOSE messages
1555 * @param socket the socket through which the ack was received
1556 * @param tunnel connection to the other end
1557 * @param sender who sent the message
1558 * @param msg the transmit close message
1559 * @param atsi performance data for the connection
1560 * @return GNUNET_OK to keep the connection open,
1561 * GNUNET_SYSERR to close it (signal serious error)
1564 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1565 struct GNUNET_MESH_Tunnel *tunnel,
1566 const struct GNUNET_PeerIdentity *sender,
1567 const struct GNUNET_STREAM_MessageHeader *msg,
1568 const struct GNUNET_ATS_Information*atsi)
1570 struct GNUNET_STREAM_MessageHeader *reply;
1572 switch (socket->state)
1574 case STATE_ESTABLISHED:
1575 socket->state = STATE_RECEIVE_CLOSED;
1577 /* Send TRANSMIT_CLOSE_ACK */
1578 reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1579 reply->header.type =
1580 htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1581 reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1582 queue_message (socket, reply, NULL, NULL);
1586 /* FIXME: Call statistics? */
1594 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1596 * @param cls the socket (set from GNUNET_MESH_connect)
1597 * @param tunnel connection to the other end
1598 * @param tunnel_ctx this is NULL
1599 * @param sender who sent the message
1600 * @param message the actual message
1601 * @param atsi performance data for the connection
1602 * @return GNUNET_OK to keep the connection open,
1603 * GNUNET_SYSERR to close it (signal serious error)
1606 client_handle_transmit_close (void *cls,
1607 struct GNUNET_MESH_Tunnel *tunnel,
1609 const struct GNUNET_PeerIdentity *sender,
1610 const struct GNUNET_MessageHeader *message,
1611 const struct GNUNET_ATS_Information*atsi)
1613 struct GNUNET_STREAM_Socket *socket = cls;
1615 return handle_transmit_close (socket,
1618 (struct GNUNET_STREAM_MessageHeader *)message,
1624 * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1626 * @param socket the socket
1627 * @param tunnel connection to the other end
1628 * @param sender who sent the message
1629 * @param message the actual message
1630 * @param atsi performance data for the connection
1631 * @param operation the close operation which is being ACK'ed
1632 * @return GNUNET_OK to keep the connection open,
1633 * GNUNET_SYSERR to close it (signal serious error)
1636 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1637 struct GNUNET_MESH_Tunnel *tunnel,
1638 const struct GNUNET_PeerIdentity *sender,
1639 const struct GNUNET_STREAM_MessageHeader *message,
1640 const struct GNUNET_ATS_Information *atsi,
1643 struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1645 shutdown_handle = socket->shutdown_handle;
1646 if (NULL == shutdown_handle)
1648 LOG (GNUNET_ERROR_TYPE_DEBUG,
1649 "%s: Received CLOSE_ACK when shutdown handle is NULL\n",
1650 GNUNET_i2s (&socket->other_peer));
1657 switch (socket->state)
1659 case STATE_CLOSE_WAIT:
1660 if (SHUT_RDWR != shutdown_handle->operation)
1662 LOG (GNUNET_ERROR_TYPE_DEBUG,
1663 "%s: Received CLOSE_ACK when shutdown handle is not for "
1665 GNUNET_i2s (&socket->other_peer));
1669 LOG (GNUNET_ERROR_TYPE_DEBUG,
1670 "%s: Received CLOSE_ACK from %s\n",
1671 GNUNET_i2s (&socket->other_peer),
1672 GNUNET_i2s (&socket->other_peer));
1673 socket->state = STATE_CLOSED;
1676 LOG (GNUNET_ERROR_TYPE_DEBUG,
1677 "%s: Received CLOSE_ACK when in it not expected\n",
1678 GNUNET_i2s (&socket->other_peer));
1684 switch (socket->state)
1686 case STATE_RECEIVE_CLOSE_WAIT:
1687 if (SHUT_RD != shutdown_handle->operation)
1689 LOG (GNUNET_ERROR_TYPE_DEBUG,
1690 "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
1691 "is not for SHUT_RD\n",
1692 GNUNET_i2s (&socket->other_peer));
1696 LOG (GNUNET_ERROR_TYPE_DEBUG,
1697 "%s: Received RECEIVE_CLOSE_ACK from %s\n",
1698 GNUNET_i2s (&socket->other_peer),
1699 GNUNET_i2s (&socket->other_peer));
1700 socket->state = STATE_RECEIVE_CLOSED;
1703 LOG (GNUNET_ERROR_TYPE_DEBUG,
1704 "%s: Received RECEIVE_CLOSE_ACK when in it not expected\n",
1705 GNUNET_i2s (&socket->other_peer));
1711 switch (socket->state)
1713 case STATE_TRANSMIT_CLOSE_WAIT:
1714 if (SHUT_WR != shutdown_handle->operation)
1716 LOG (GNUNET_ERROR_TYPE_DEBUG,
1717 "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle "
1718 "is not for SHUT_WR\n",
1719 GNUNET_i2s (&socket->other_peer));
1723 LOG (GNUNET_ERROR_TYPE_DEBUG,
1724 "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
1725 GNUNET_i2s (&socket->other_peer),
1726 GNUNET_i2s (&socket->other_peer));
1727 socket->state = STATE_TRANSMIT_CLOSED;
1730 LOG (GNUNET_ERROR_TYPE_DEBUG,
1731 "%s: Received TRANSMIT_CLOSE_ACK when in it not expected\n",
1732 GNUNET_i2s (&socket->other_peer));
1741 if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
1742 shutdown_handle->completion_cb(shutdown_handle->completion_cls,
1744 GNUNET_free (shutdown_handle); /* Free shutdown handle */
1745 socket->shutdown_handle = NULL;
1746 if (GNUNET_SCHEDULER_NO_TASK
1747 != shutdown_handle->close_msg_retransmission_task_id)
1749 GNUNET_SCHEDULER_cancel
1750 (shutdown_handle->close_msg_retransmission_task_id);
1751 shutdown_handle->close_msg_retransmission_task_id =
1752 GNUNET_SCHEDULER_NO_TASK;
1759 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1761 * @param cls the socket (set from GNUNET_MESH_connect)
1762 * @param tunnel connection to the other end
1763 * @param tunnel_ctx this is NULL
1764 * @param sender who sent the message
1765 * @param message the actual message
1766 * @param atsi performance data for the connection
1767 * @return GNUNET_OK to keep the connection open,
1768 * GNUNET_SYSERR to close it (signal serious error)
1771 client_handle_transmit_close_ack (void *cls,
1772 struct GNUNET_MESH_Tunnel *tunnel,
1774 const struct GNUNET_PeerIdentity *sender,
1775 const struct GNUNET_MessageHeader *message,
1776 const struct GNUNET_ATS_Information*atsi)
1778 struct GNUNET_STREAM_Socket *socket = cls;
1780 return handle_generic_close_ack (socket,
1783 (const struct GNUNET_STREAM_MessageHeader *)
1791 * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1793 * @param socket the socket
1794 * @param tunnel connection to the other end
1795 * @param sender who sent the message
1796 * @param message the actual message
1797 * @param atsi performance data for the connection
1798 * @return GNUNET_OK to keep the connection open,
1799 * GNUNET_SYSERR to close it (signal serious error)
1802 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1803 struct GNUNET_MESH_Tunnel *tunnel,
1804 const struct GNUNET_PeerIdentity *sender,
1805 const struct GNUNET_STREAM_MessageHeader *message,
1806 const struct GNUNET_ATS_Information *atsi)
1808 struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1810 switch (socket->state)
1814 case STATE_HELLO_WAIT:
1815 LOG (GNUNET_ERROR_TYPE_DEBUG,
1816 "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1817 GNUNET_i2s (&socket->other_peer));
1823 LOG (GNUNET_ERROR_TYPE_DEBUG,
1824 "%s: Received RECEIVE_CLOSE from %s\n",
1825 GNUNET_i2s (&socket->other_peer),
1826 GNUNET_i2s (&socket->other_peer));
1828 GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1829 receive_close_ack->header.size =
1830 htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1831 receive_close_ack->header.type =
1832 htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1833 queue_message (socket,
1838 /* FIXME: Handle the case where write handle is present; the write operation
1839 should be deemed as finised and the write continuation callback
1840 has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
1846 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1848 * @param cls the socket (set from GNUNET_MESH_connect)
1849 * @param tunnel connection to the other end
1850 * @param tunnel_ctx this is NULL
1851 * @param sender who sent the message
1852 * @param message the actual message
1853 * @param atsi performance data for the connection
1854 * @return GNUNET_OK to keep the connection open,
1855 * GNUNET_SYSERR to close it (signal serious error)
1858 client_handle_receive_close (void *cls,
1859 struct GNUNET_MESH_Tunnel *tunnel,
1861 const struct GNUNET_PeerIdentity *sender,
1862 const struct GNUNET_MessageHeader *message,
1863 const struct GNUNET_ATS_Information*atsi)
1865 struct GNUNET_STREAM_Socket *socket = cls;
1868 handle_receive_close (socket,
1871 (const struct GNUNET_STREAM_MessageHeader *) message,
1877 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1879 * @param cls the socket (set from GNUNET_MESH_connect)
1880 * @param tunnel connection to the other end
1881 * @param tunnel_ctx this is NULL
1882 * @param sender who sent the message
1883 * @param message the actual message
1884 * @param atsi performance data for the connection
1885 * @return GNUNET_OK to keep the connection open,
1886 * GNUNET_SYSERR to close it (signal serious error)
1889 client_handle_receive_close_ack (void *cls,
1890 struct GNUNET_MESH_Tunnel *tunnel,
1892 const struct GNUNET_PeerIdentity *sender,
1893 const struct GNUNET_MessageHeader *message,
1894 const struct GNUNET_ATS_Information*atsi)
1896 struct GNUNET_STREAM_Socket *socket = cls;
1898 return handle_generic_close_ack (socket,
1901 (const struct GNUNET_STREAM_MessageHeader *)
1909 * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1911 * @param socket the socket
1912 * @param tunnel connection to the other end
1913 * @param sender who sent the message
1914 * @param message the actual message
1915 * @param atsi performance data for the connection
1916 * @return GNUNET_OK to keep the connection open,
1917 * GNUNET_SYSERR to close it (signal serious error)
1920 handle_close (struct GNUNET_STREAM_Socket *socket,
1921 struct GNUNET_MESH_Tunnel *tunnel,
1922 const struct GNUNET_PeerIdentity *sender,
1923 const struct GNUNET_STREAM_MessageHeader *message,
1924 const struct GNUNET_ATS_Information*atsi)
1926 struct GNUNET_STREAM_MessageHeader *close_ack;
1928 switch (socket->state)
1932 case STATE_HELLO_WAIT:
1933 LOG (GNUNET_ERROR_TYPE_DEBUG,
1934 "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1935 GNUNET_i2s (&socket->other_peer));
1941 LOG (GNUNET_ERROR_TYPE_DEBUG,
1942 "%s: Received CLOSE from %s\n",
1943 GNUNET_i2s (&socket->other_peer),
1944 GNUNET_i2s (&socket->other_peer));
1945 close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1946 close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1947 close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
1948 queue_message (socket,
1952 if (socket->state == STATE_CLOSED)
1955 GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1956 socket->receive_buffer = NULL;
1957 socket->receive_buffer_size = 0;
1963 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1965 * @param cls the socket (set from GNUNET_MESH_connect)
1966 * @param tunnel connection to the other end
1967 * @param tunnel_ctx this is NULL
1968 * @param sender who sent the message
1969 * @param message the actual message
1970 * @param atsi performance data for the connection
1971 * @return GNUNET_OK to keep the connection open,
1972 * GNUNET_SYSERR to close it (signal serious error)
1975 client_handle_close (void *cls,
1976 struct GNUNET_MESH_Tunnel *tunnel,
1978 const struct GNUNET_PeerIdentity *sender,
1979 const struct GNUNET_MessageHeader *message,
1980 const struct GNUNET_ATS_Information*atsi)
1982 struct GNUNET_STREAM_Socket *socket = cls;
1984 return handle_close (socket,
1987 (const struct GNUNET_STREAM_MessageHeader *) message,
1993 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1995 * @param cls the socket (set from GNUNET_MESH_connect)
1996 * @param tunnel connection to the other end
1997 * @param tunnel_ctx this is NULL
1998 * @param sender who sent the message
1999 * @param message the actual message
2000 * @param atsi performance data for the connection
2001 * @return GNUNET_OK to keep the connection open,
2002 * GNUNET_SYSERR to close it (signal serious error)
2005 client_handle_close_ack (void *cls,
2006 struct GNUNET_MESH_Tunnel *tunnel,
2008 const struct GNUNET_PeerIdentity *sender,
2009 const struct GNUNET_MessageHeader *message,
2010 const struct GNUNET_ATS_Information *atsi)
2012 struct GNUNET_STREAM_Socket *socket = cls;
2014 return handle_generic_close_ack (socket,
2017 (const struct GNUNET_STREAM_MessageHeader *)
2023 /*****************************/
2024 /* Server's Message Handlers */
2025 /*****************************/
2028 * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
2030 * @param cls the closure
2031 * @param tunnel connection to the other end
2032 * @param tunnel_ctx the socket
2033 * @param sender who sent the message
2034 * @param message the actual message
2035 * @param atsi performance data for the connection
2036 * @return GNUNET_OK to keep the connection open,
2037 * GNUNET_SYSERR to close it (signal serious error)
2040 server_handle_data (void *cls,
2041 struct GNUNET_MESH_Tunnel *tunnel,
2043 const struct GNUNET_PeerIdentity *sender,
2044 const struct GNUNET_MessageHeader *message,
2045 const struct GNUNET_ATS_Information*atsi)
2047 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2049 return handle_data (socket,
2052 (const struct GNUNET_STREAM_DataMessage *)message,
2058 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
2060 * @param cls the closure
2061 * @param tunnel connection to the other end
2062 * @param tunnel_ctx the socket
2063 * @param sender who sent the message
2064 * @param message the actual message
2065 * @param atsi performance data for the connection
2066 * @return GNUNET_OK to keep the connection open,
2067 * GNUNET_SYSERR to close it (signal serious error)
2070 server_handle_hello (void *cls,
2071 struct GNUNET_MESH_Tunnel *tunnel,
2073 const struct GNUNET_PeerIdentity *sender,
2074 const struct GNUNET_MessageHeader *message,
2075 const struct GNUNET_ATS_Information*atsi)
2077 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2078 struct GNUNET_STREAM_HelloAckMessage *reply;
2080 if (0 != memcmp (sender,
2081 &socket->other_peer,
2082 sizeof (struct GNUNET_PeerIdentity)))
2084 LOG (GNUNET_ERROR_TYPE_DEBUG,
2085 "%s: Received HELLO from non-confirming peer\n",
2086 GNUNET_i2s (&socket->other_peer));
2090 GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO ==
2091 ntohs (message->type));
2092 GNUNET_assert (socket->tunnel == tunnel);
2093 LOG (GNUNET_ERROR_TYPE_DEBUG,
2094 "%s: Received HELLO from %s\n",
2095 GNUNET_i2s (&socket->other_peer),
2096 GNUNET_i2s (&socket->other_peer));
2098 if (STATE_INIT == socket->state)
2100 reply = generate_hello_ack_msg (socket);
2101 queue_message (socket,
2103 &set_state_hello_wait,
2108 LOG (GNUNET_ERROR_TYPE_DEBUG,
2109 "%s: Client sent HELLO when in state %d\n",
2110 GNUNET_i2s (&socket->other_peer),
2112 /* FIXME: Send RESET? */
2120 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2122 * @param cls the closure
2123 * @param tunnel connection to the other end
2124 * @param tunnel_ctx the socket
2125 * @param sender who sent the message
2126 * @param message the actual message
2127 * @param atsi performance data for the connection
2128 * @return GNUNET_OK to keep the connection open,
2129 * GNUNET_SYSERR to close it (signal serious error)
2132 server_handle_hello_ack (void *cls,
2133 struct GNUNET_MESH_Tunnel *tunnel,
2135 const struct GNUNET_PeerIdentity *sender,
2136 const struct GNUNET_MessageHeader *message,
2137 const struct GNUNET_ATS_Information*atsi)
2139 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2140 const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2142 GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2143 ntohs (message->type));
2144 GNUNET_assert (socket->tunnel == tunnel);
2145 ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2146 if (STATE_HELLO_WAIT == socket->state)
2148 LOG (GNUNET_ERROR_TYPE_DEBUG,
2149 "%s: Received HELLO_ACK from %s\n",
2150 GNUNET_i2s (&socket->other_peer),
2151 GNUNET_i2s (&socket->other_peer));
2152 socket->read_sequence_number = ntohl (ack_message->sequence_number);
2153 LOG (GNUNET_ERROR_TYPE_DEBUG,
2154 "%s: Read sequence number %u\n",
2155 GNUNET_i2s (&socket->other_peer),
2156 (unsigned int) socket->read_sequence_number);
2157 socket->receiver_window_available =
2158 ntohl (ack_message->receiver_window_size);
2159 /* Attain ESTABLISHED state */
2160 set_state_established (NULL, socket);
2164 LOG (GNUNET_ERROR_TYPE_DEBUG,
2165 "Client sent HELLO_ACK when in state %d\n", socket->state);
2166 /* FIXME: Send RESET? */
2174 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2176 * @param cls the closure
2177 * @param tunnel connection to the other end
2178 * @param tunnel_ctx the socket
2179 * @param sender who sent the message
2180 * @param message the actual message
2181 * @param atsi performance data for the connection
2182 * @return GNUNET_OK to keep the connection open,
2183 * GNUNET_SYSERR to close it (signal serious error)
2186 server_handle_reset (void *cls,
2187 struct GNUNET_MESH_Tunnel *tunnel,
2189 const struct GNUNET_PeerIdentity *sender,
2190 const struct GNUNET_MessageHeader *message,
2191 const struct GNUNET_ATS_Information*atsi)
2193 // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2200 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2202 * @param cls the closure
2203 * @param tunnel connection to the other end
2204 * @param tunnel_ctx the socket
2205 * @param sender who sent the message
2206 * @param message the actual message
2207 * @param atsi performance data for the connection
2208 * @return GNUNET_OK to keep the connection open,
2209 * GNUNET_SYSERR to close it (signal serious error)
2212 server_handle_transmit_close (void *cls,
2213 struct GNUNET_MESH_Tunnel *tunnel,
2215 const struct GNUNET_PeerIdentity *sender,
2216 const struct GNUNET_MessageHeader *message,
2217 const struct GNUNET_ATS_Information*atsi)
2219 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2221 return handle_transmit_close (socket,
2224 (struct GNUNET_STREAM_MessageHeader *)message,
2230 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2232 * @param cls the closure
2233 * @param tunnel connection to the other end
2234 * @param tunnel_ctx the socket
2235 * @param sender who sent the message
2236 * @param message the actual message
2237 * @param atsi performance data for the connection
2238 * @return GNUNET_OK to keep the connection open,
2239 * GNUNET_SYSERR to close it (signal serious error)
2242 server_handle_transmit_close_ack (void *cls,
2243 struct GNUNET_MESH_Tunnel *tunnel,
2245 const struct GNUNET_PeerIdentity *sender,
2246 const struct GNUNET_MessageHeader *message,
2247 const struct GNUNET_ATS_Information*atsi)
2249 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2251 return handle_generic_close_ack (socket,
2254 (const struct GNUNET_STREAM_MessageHeader *)
2262 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2264 * @param cls the closure
2265 * @param tunnel connection to the other end
2266 * @param tunnel_ctx the socket
2267 * @param sender who sent the message
2268 * @param message the actual message
2269 * @param atsi performance data for the connection
2270 * @return GNUNET_OK to keep the connection open,
2271 * GNUNET_SYSERR to close it (signal serious error)
2274 server_handle_receive_close (void *cls,
2275 struct GNUNET_MESH_Tunnel *tunnel,
2277 const struct GNUNET_PeerIdentity *sender,
2278 const struct GNUNET_MessageHeader *message,
2279 const struct GNUNET_ATS_Information*atsi)
2281 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2284 handle_receive_close (socket,
2287 (const struct GNUNET_STREAM_MessageHeader *) message,
2293 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2295 * @param cls the closure
2296 * @param tunnel connection to the other end
2297 * @param tunnel_ctx the socket
2298 * @param sender who sent the message
2299 * @param message the actual message
2300 * @param atsi performance data for the connection
2301 * @return GNUNET_OK to keep the connection open,
2302 * GNUNET_SYSERR to close it (signal serious error)
2305 server_handle_receive_close_ack (void *cls,
2306 struct GNUNET_MESH_Tunnel *tunnel,
2308 const struct GNUNET_PeerIdentity *sender,
2309 const struct GNUNET_MessageHeader *message,
2310 const struct GNUNET_ATS_Information*atsi)
2312 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2314 return handle_generic_close_ack (socket,
2317 (const struct GNUNET_STREAM_MessageHeader *)
2325 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2327 * @param cls the listen socket (from GNUNET_MESH_connect in
2328 * GNUNET_STREAM_listen)
2329 * @param tunnel connection to the other end
2330 * @param tunnel_ctx the socket
2331 * @param sender who sent the message
2332 * @param message the actual message
2333 * @param atsi performance data for the connection
2334 * @return GNUNET_OK to keep the connection open,
2335 * GNUNET_SYSERR to close it (signal serious error)
2338 server_handle_close (void *cls,
2339 struct GNUNET_MESH_Tunnel *tunnel,
2341 const struct GNUNET_PeerIdentity *sender,
2342 const struct GNUNET_MessageHeader *message,
2343 const struct GNUNET_ATS_Information*atsi)
2345 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2347 return handle_close (socket,
2350 (const struct GNUNET_STREAM_MessageHeader *) message,
2356 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2358 * @param cls the closure
2359 * @param tunnel connection to the other end
2360 * @param tunnel_ctx the socket
2361 * @param sender who sent the message
2362 * @param message the actual message
2363 * @param atsi performance data for the connection
2364 * @return GNUNET_OK to keep the connection open,
2365 * GNUNET_SYSERR to close it (signal serious error)
2368 server_handle_close_ack (void *cls,
2369 struct GNUNET_MESH_Tunnel *tunnel,
2371 const struct GNUNET_PeerIdentity *sender,
2372 const struct GNUNET_MessageHeader *message,
2373 const struct GNUNET_ATS_Information*atsi)
2375 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2377 return handle_generic_close_ack (socket,
2380 (const struct GNUNET_STREAM_MessageHeader *)
2388 * Handler for DATA_ACK messages
2390 * @param socket the socket through which the ack was received
2391 * @param tunnel connection to the other end
2392 * @param sender who sent the message
2393 * @param ack the acknowledgment message
2394 * @param atsi performance data for the connection
2395 * @return GNUNET_OK to keep the connection open,
2396 * GNUNET_SYSERR to close it (signal serious error)
2399 handle_ack (struct GNUNET_STREAM_Socket *socket,
2400 struct GNUNET_MESH_Tunnel *tunnel,
2401 const struct GNUNET_PeerIdentity *sender,
2402 const struct GNUNET_STREAM_AckMessage *ack,
2403 const struct GNUNET_ATS_Information*atsi)
2405 unsigned int packet;
2406 int need_retransmission;
2407 uint32_t sequence_difference;
2409 if (0 != memcmp (sender,
2410 &socket->other_peer,
2411 sizeof (struct GNUNET_PeerIdentity)))
2413 LOG (GNUNET_ERROR_TYPE_DEBUG,
2414 "%s: Received ACK from non-confirming peer\n",
2415 GNUNET_i2s (&socket->other_peer));
2418 switch (socket->state)
2420 case (STATE_ESTABLISHED):
2421 case (STATE_RECEIVE_CLOSED):
2422 case (STATE_RECEIVE_CLOSE_WAIT):
2423 if (NULL == socket->write_handle)
2425 LOG (GNUNET_ERROR_TYPE_DEBUG,
2426 "%s: Received DATA_ACK when write_handle is NULL\n",
2427 GNUNET_i2s (&socket->other_peer));
2430 /* FIXME: increment in the base sequence number is breaking current flow
2432 if (!((socket->write_sequence_number
2433 - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2435 LOG (GNUNET_ERROR_TYPE_DEBUG,
2436 "%s: Received DATA_ACK with unexpected base sequence number\n",
2437 GNUNET_i2s (&socket->other_peer));
2438 LOG (GNUNET_ERROR_TYPE_DEBUG,
2439 "%s: Current write sequence: %u; Ack's base sequence: %u\n",
2440 GNUNET_i2s (&socket->other_peer),
2441 socket->write_sequence_number,
2442 ntohl (ack->base_sequence_number));
2445 /* FIXME: include the case when write_handle is cancelled - ignore the
2447 LOG (GNUNET_ERROR_TYPE_DEBUG,
2448 "%s: Received DATA_ACK from %s\n",
2449 GNUNET_i2s (&socket->other_peer),
2450 GNUNET_i2s (&socket->other_peer));
2452 /* Cancel the retransmission task */
2453 if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2455 GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2456 socket->retransmission_timeout_task_id =
2457 GNUNET_SCHEDULER_NO_TASK;
2459 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2461 if (NULL == socket->write_handle->messages[packet]) break;
2462 /* BS: Base sequence from ack; PS: sequence num of current packet */
2463 sequence_difference = ntohl (ack->base_sequence_number)
2464 - ntohl (socket->write_handle->messages[packet]->sequence_number);
2465 /* case where BS = PS + GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2466 if ((sequence_difference == GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
2467 || ((sequence_difference < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
2468 && (0 != sequence_difference))) /* case: BS > PS and BS != PS*/
2470 ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, packet,
2475 ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
2476 -sequence_difference))/*inversion as PS >= BS */
2478 ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, packet,
2482 /* Update the receive window remaining
2483 FIXME : Should update with the value from a data ack with greater
2485 socket->receiver_window_available =
2486 ntohl (ack->receive_window_remaining);
2487 /* Check if we have received all acknowledgements */
2488 need_retransmission = GNUNET_NO;
2489 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2491 if (NULL == socket->write_handle->messages[packet]) break;
2492 if (GNUNET_YES != ackbitmap_is_bit_set
2493 (&socket->write_handle->ack_bitmap,packet))
2495 need_retransmission = GNUNET_YES;
2499 if (GNUNET_YES == need_retransmission)
2501 write_data (socket);
2503 else /* We have to call the write continuation callback now */
2505 struct GNUNET_STREAM_IOWriteHandle *write_handle;
2507 /* Free the packets */
2508 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2510 GNUNET_free_non_null (socket->write_handle->messages[packet]);
2512 write_handle = socket->write_handle;
2513 socket->write_handle = NULL;
2514 if (NULL != write_handle->write_cont)
2515 write_handle->write_cont (write_handle->write_cont_cls,
2517 write_handle->size);
2518 /* We are done with the write handle - Freeing it */
2519 GNUNET_free (write_handle);
2520 LOG (GNUNET_ERROR_TYPE_DEBUG,
2521 "%s: Write completion callback completed\n",
2522 GNUNET_i2s (&socket->other_peer));
2533 * Handler for DATA_ACK messages
2535 * @param cls the 'struct GNUNET_STREAM_Socket'
2536 * @param tunnel connection to the other end
2537 * @param tunnel_ctx unused
2538 * @param sender who sent the message
2539 * @param message the actual message
2540 * @param atsi performance data for the connection
2541 * @return GNUNET_OK to keep the connection open,
2542 * GNUNET_SYSERR to close it (signal serious error)
2545 client_handle_ack (void *cls,
2546 struct GNUNET_MESH_Tunnel *tunnel,
2548 const struct GNUNET_PeerIdentity *sender,
2549 const struct GNUNET_MessageHeader *message,
2550 const struct GNUNET_ATS_Information*atsi)
2552 struct GNUNET_STREAM_Socket *socket = cls;
2553 const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2555 return handle_ack (socket, tunnel, sender, ack, atsi);
2560 * Handler for DATA_ACK messages
2562 * @param cls the server's listen socket
2563 * @param tunnel connection to the other end
2564 * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2565 * @param sender who sent the message
2566 * @param message the actual message
2567 * @param atsi performance data for the connection
2568 * @return GNUNET_OK to keep the connection open,
2569 * GNUNET_SYSERR to close it (signal serious error)
2572 server_handle_ack (void *cls,
2573 struct GNUNET_MESH_Tunnel *tunnel,
2575 const struct GNUNET_PeerIdentity *sender,
2576 const struct GNUNET_MessageHeader *message,
2577 const struct GNUNET_ATS_Information*atsi)
2579 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2580 const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2582 return handle_ack (socket, tunnel, sender, ack, atsi);
2587 * For client message handlers, the stream socket is in the
2590 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2591 {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2592 {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK,
2593 sizeof (struct GNUNET_STREAM_AckMessage) },
2594 {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2595 sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2596 {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2597 sizeof (struct GNUNET_STREAM_MessageHeader)},
2598 {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2599 sizeof (struct GNUNET_STREAM_MessageHeader)},
2600 {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2601 sizeof (struct GNUNET_STREAM_MessageHeader)},
2602 {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2603 sizeof (struct GNUNET_STREAM_MessageHeader)},
2604 {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2605 sizeof (struct GNUNET_STREAM_MessageHeader)},
2606 {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2607 sizeof (struct GNUNET_STREAM_MessageHeader)},
2608 {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2609 sizeof (struct GNUNET_STREAM_MessageHeader)},
2615 * For server message handlers, the stream socket is in the
2616 * tunnel context, and the listen socket in the closure argument.
2618 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2619 {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2620 {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK,
2621 sizeof (struct GNUNET_STREAM_AckMessage) },
2622 {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO,
2623 sizeof (struct GNUNET_STREAM_MessageHeader)},
2624 {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2625 sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2626 {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2627 sizeof (struct GNUNET_STREAM_MessageHeader)},
2628 {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2629 sizeof (struct GNUNET_STREAM_MessageHeader)},
2630 {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2631 sizeof (struct GNUNET_STREAM_MessageHeader)},
2632 {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2633 sizeof (struct GNUNET_STREAM_MessageHeader)},
2634 {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2635 sizeof (struct GNUNET_STREAM_MessageHeader)},
2636 {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2637 sizeof (struct GNUNET_STREAM_MessageHeader)},
2638 {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2639 sizeof (struct GNUNET_STREAM_MessageHeader)},
2645 * Function called when our target peer is connected to our tunnel
2647 * @param cls the socket for which this tunnel is created
2648 * @param peer the peer identity of the target
2649 * @param atsi performance data for the connection
2652 mesh_peer_connect_callback (void *cls,
2653 const struct GNUNET_PeerIdentity *peer,
2654 const struct GNUNET_ATS_Information * atsi)
2656 struct GNUNET_STREAM_Socket *socket = cls;
2657 struct GNUNET_STREAM_MessageHeader *message;
2659 if (0 != memcmp (peer,
2660 &socket->other_peer,
2661 sizeof (struct GNUNET_PeerIdentity)))
2663 LOG (GNUNET_ERROR_TYPE_DEBUG,
2664 "%s: A peer which is not our target has connected to our tunnel\n",
2669 LOG (GNUNET_ERROR_TYPE_DEBUG,
2670 "%s: Target peer %s connected\n",
2671 GNUNET_i2s (&socket->other_peer),
2672 GNUNET_i2s (&socket->other_peer));
2674 /* Set state to INIT */
2675 socket->state = STATE_INIT;
2677 /* Send HELLO message */
2678 message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2679 message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
2680 message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2681 queue_message (socket,
2683 &set_state_hello_wait,
2686 /* Call open callback */
2687 if (NULL == socket->open_cb)
2689 LOG (GNUNET_ERROR_TYPE_DEBUG,
2690 "STREAM_open callback is NULL\n");
2696 * Function called when our target peer is disconnected from our tunnel
2698 * @param cls the socket associated which this tunnel
2699 * @param peer the peer identity of the target
2702 mesh_peer_disconnect_callback (void *cls,
2703 const struct GNUNET_PeerIdentity *peer)
2705 struct GNUNET_STREAM_Socket *socket=cls;
2707 /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2708 LOG (GNUNET_ERROR_TYPE_DEBUG,
2709 "%s: Other peer %s disconnected \n",
2710 GNUNET_i2s (&socket->other_peer),
2711 GNUNET_i2s (&socket->other_peer));
2716 * Method called whenever a peer creates a tunnel to us
2718 * @param cls closure
2719 * @param tunnel new handle to the tunnel
2720 * @param initiator peer that started the tunnel
2721 * @param atsi performance information for the tunnel
2722 * @return initial tunnel context for the tunnel
2723 * (can be NULL -- that's not an error)
2726 new_tunnel_notify (void *cls,
2727 struct GNUNET_MESH_Tunnel *tunnel,
2728 const struct GNUNET_PeerIdentity *initiator,
2729 const struct GNUNET_ATS_Information *atsi)
2731 struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2732 struct GNUNET_STREAM_Socket *socket;
2734 /* FIXME: If a tunnel is already created, we should not accept new tunnels
2735 from the same peer again until the socket is closed */
2737 if (GNUNET_NO == lsocket->listening)
2739 LOG (GNUNET_ERROR_TYPE_DEBUG,
2740 "%s: Destroying tunnel from peer %s as we don't have the lock\n",
2741 GNUNET_i2s (&socket->other_peer),
2742 GNUNET_i2s (&socket->other_peer));
2743 GNUNET_MESH_tunnel_destroy (tunnel);
2746 socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2747 socket->other_peer = *initiator;
2748 socket->tunnel = tunnel;
2749 socket->session_id = 0; /* FIXME */
2750 socket->state = STATE_INIT;
2751 socket->lsocket = lsocket;
2752 socket->retransmit_timeout = lsocket->retransmit_timeout;
2753 socket->testing_active = lsocket->testing_active;
2754 socket->testing_set_write_sequence_number_value =
2755 lsocket->testing_set_write_sequence_number_value;
2757 LOG (GNUNET_ERROR_TYPE_DEBUG,
2758 "%s: Peer %s initiated tunnel to us\n",
2759 GNUNET_i2s (&socket->other_peer),
2760 GNUNET_i2s (&socket->other_peer));
2762 /* FIXME: Copy MESH handle from lsocket to socket */
2769 * Function called whenever an inbound tunnel is destroyed. Should clean up
2770 * any associated state. This function is NOT called if the client has
2771 * explicitly asked for the tunnel to be destroyed using
2772 * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2775 * @param cls closure (set from GNUNET_MESH_connect)
2776 * @param tunnel connection to the other end (henceforth invalid)
2777 * @param tunnel_ctx place where local state associated
2778 * with the tunnel is stored
2781 tunnel_cleaner (void *cls,
2782 const struct GNUNET_MESH_Tunnel *tunnel,
2785 struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2787 if (tunnel != socket->tunnel)
2791 LOG (GNUNET_ERROR_TYPE_DEBUG,
2792 "%s: Peer %s has terminated connection abruptly\n",
2793 GNUNET_i2s (&socket->other_peer),
2794 GNUNET_i2s (&socket->other_peer));
2796 socket->status = GNUNET_STREAM_SHUTDOWN;
2798 /* Clear Transmit handles */
2799 if (NULL != socket->transmit_handle)
2801 GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2802 socket->transmit_handle = NULL;
2804 if (NULL != socket->ack_transmit_handle)
2806 GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
2807 GNUNET_free (socket->ack_msg);
2808 socket->ack_msg = NULL;
2809 socket->ack_transmit_handle = NULL;
2811 /* Stop Tasks using socket->tunnel */
2812 if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2814 GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2815 socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2817 if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2819 GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2820 socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
2822 /* FIXME: Cancel all other tasks using socket->tunnel */
2823 socket->tunnel = NULL;
2828 * Callback to signal timeout on lockmanager lock acquire
2830 * @param cls the ListenSocket
2831 * @param tc the scheduler task context
2834 lockmanager_acquire_timeout (void *cls,
2835 const struct GNUNET_SCHEDULER_TaskContext *tc)
2837 struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2838 GNUNET_STREAM_ListenCallback listen_cb;
2839 void *listen_cb_cls;
2841 lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2842 listen_cb = lsocket->listen_cb;
2843 listen_cb_cls = lsocket->listen_cb_cls;
2844 GNUNET_STREAM_listen_close (lsocket);
2845 if (NULL != listen_cb)
2846 listen_cb (listen_cb_cls, NULL, NULL);
2851 * Callback to notify us on the status changes on app_port lock
2853 * @param cls the ListenSocket
2854 * @param domain the domain name of the lock
2855 * @param lock the app_port
2856 * @param status the current status of the lock
2859 lock_status_change_cb (void *cls, const char *domain, uint32_t lock,
2860 enum GNUNET_LOCKMANAGER_Status status)
2862 struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2864 GNUNET_assert (lock == (uint32_t) lsocket->port);
2865 if (GNUNET_LOCKMANAGER_SUCCESS == status)
2867 lsocket->listening = GNUNET_YES;
2868 if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
2870 GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
2871 lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2873 if (NULL == lsocket->mesh)
2875 GNUNET_MESH_ApplicationType ports[] = {lsocket->port, 0};
2877 lsocket->mesh = GNUNET_MESH_connect (lsocket->cfg,
2878 RECEIVE_BUFFER_SIZE, /* FIXME: QUEUE size as parameter? */
2879 lsocket, /* Closure */
2882 server_message_handlers,
2884 GNUNET_assert (NULL != lsocket->mesh);
2885 if (NULL != lsocket->listen_ok_cb)
2887 (void) lsocket->listen_ok_cb ();
2891 if (GNUNET_LOCKMANAGER_RELEASE == status)
2892 lsocket->listening = GNUNET_NO;
2902 * Tries to open a stream to the target peer
2904 * @param cfg configuration to use
2905 * @param target the target peer to which the stream has to be opened
2906 * @param app_port the application port number which uniquely identifies this
2908 * @param open_cb this function will be called after stream has be established
2909 * @param open_cb_cls the closure for open_cb
2910 * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2911 * @return if successful it returns the stream socket; NULL if stream cannot be
2914 struct GNUNET_STREAM_Socket *
2915 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2916 const struct GNUNET_PeerIdentity *target,
2917 GNUNET_MESH_ApplicationType app_port,
2918 GNUNET_STREAM_OpenCallback open_cb,
2922 struct GNUNET_STREAM_Socket *socket;
2923 enum GNUNET_STREAM_Option option;
2924 GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
2925 va_list vargs; /* Variable arguments */
2927 LOG (GNUNET_ERROR_TYPE_DEBUG,
2929 socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2930 socket->other_peer = *target;
2931 socket->open_cb = open_cb;
2932 socket->open_cls = open_cb_cls;
2934 socket->retransmit_timeout =
2935 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
2936 socket->testing_active = GNUNET_NO;
2937 va_start (vargs, open_cb_cls); /* Parse variable args */
2939 option = va_arg (vargs, enum GNUNET_STREAM_Option);
2942 case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2943 /* Expect struct GNUNET_TIME_Relative */
2944 socket->retransmit_timeout = va_arg (vargs,
2945 struct GNUNET_TIME_Relative);
2947 case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
2948 socket->testing_active = GNUNET_YES;
2949 socket->testing_set_write_sequence_number_value = va_arg (vargs,
2952 case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
2953 GNUNET_break (0); /* Option irrelevant in STREAM_open */
2955 case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
2956 GNUNET_break (0); /* Option irrelevant in STREAM_open */
2958 case GNUNET_STREAM_OPTION_END:
2961 } while (GNUNET_STREAM_OPTION_END != option);
2962 va_end (vargs); /* End of variable args parsing */
2963 socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2964 RECEIVE_BUFFER_SIZE, /* QUEUE size as parameter? */
2966 NULL, /* No inbound tunnel handler */
2967 NULL, /* No in-tunnel cleaner */
2968 client_message_handlers,
2969 ports); /* We don't get inbound tunnels */
2970 if (NULL == socket->mesh) /* Fail if we cannot connect to mesh */
2972 GNUNET_free (socket);
2976 /* Now create the mesh tunnel to target */
2977 LOG (GNUNET_ERROR_TYPE_DEBUG,
2978 "Creating MESH Tunnel\n");
2979 socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2980 NULL, /* Tunnel context */
2981 &mesh_peer_connect_callback,
2982 &mesh_peer_disconnect_callback,
2984 GNUNET_assert (NULL != socket->tunnel);
2985 GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2986 &socket->other_peer);
2988 LOG (GNUNET_ERROR_TYPE_DEBUG,
2989 "%s() END\n", __func__);
2995 * Shutdown the stream for reading or writing (similar to man 2 shutdown).
2997 * @param socket the stream socket
2998 * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
2999 * @param completion_cb the callback that will be called upon successful
3000 * shutdown of given operation
3001 * @param completion_cls the closure for the completion callback
3002 * @return the shutdown handle
3004 struct GNUNET_STREAM_ShutdownHandle *
3005 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
3007 GNUNET_STREAM_ShutdownCompletion completion_cb,
3008 void *completion_cls)
3010 struct GNUNET_STREAM_ShutdownHandle *handle;
3011 struct GNUNET_STREAM_MessageHeader *msg;
3013 GNUNET_assert (NULL == socket->shutdown_handle);
3015 handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
3016 handle->socket = socket;
3017 handle->completion_cb = completion_cb;
3018 handle->completion_cls = completion_cls;
3019 socket->shutdown_handle = handle;
3021 msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
3022 msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
3026 handle->operation = SHUT_RD;
3027 if (NULL != socket->read_handle)
3028 LOG (GNUNET_ERROR_TYPE_WARNING,
3029 "Existing read handle should be cancelled before shutting"
3031 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
3032 queue_message (socket,
3034 &set_state_receive_close_wait,
3038 handle->operation = SHUT_WR;
3039 if (NULL != socket->write_handle)
3040 LOG (GNUNET_ERROR_TYPE_WARNING,
3041 "Existing write handle should be cancelled before shutting"
3043 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
3044 queue_message (socket,
3046 &set_state_transmit_close_wait,
3050 handle->operation = SHUT_RDWR;
3051 if (NULL != socket->write_handle)
3052 LOG (GNUNET_ERROR_TYPE_WARNING,
3053 "Existing write handle should be cancelled before shutting"
3055 if (NULL != socket->read_handle)
3056 LOG (GNUNET_ERROR_TYPE_WARNING,
3057 "Existing read handle should be cancelled before shutting"
3059 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
3060 queue_message (socket,
3062 &set_state_close_wait,
3066 LOG (GNUNET_ERROR_TYPE_WARNING,
3067 "GNUNET_STREAM_shutdown called with invalid value for "
3068 "parameter operation -- Ignoring\n");
3070 GNUNET_free (handle);
3073 handle->close_msg_retransmission_task_id =
3074 GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
3075 &close_msg_retransmission_task,
3082 * Cancels a pending shutdown
3084 * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
3087 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
3089 if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
3090 GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
3091 GNUNET_free (handle);
3098 * @param socket the stream socket
3101 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
3103 struct MessageQueue *head;
3105 if (NULL != socket->read_handle)
3107 LOG (GNUNET_ERROR_TYPE_WARNING,
3108 "Closing STREAM socket when a read handle is pending\n");
3110 if (NULL != socket->write_handle)
3112 LOG (GNUNET_ERROR_TYPE_WARNING,
3113 "Closing STREAM socket when a write handle is pending\n");
3114 GNUNET_STREAM_io_write_cancel (socket->write_handle);
3115 //socket->write_handle = NULL;
3118 if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
3120 /* socket closed with read task pending!? */
3122 GNUNET_SCHEDULER_cancel (socket->read_task_id);
3123 socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
3126 /* Terminate the ack'ing tasks if they are still present */
3127 if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
3129 GNUNET_SCHEDULER_cancel (socket->ack_task_id);
3130 socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
3133 /* Clear Transmit handles */
3134 if (NULL != socket->transmit_handle)
3136 GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
3137 socket->transmit_handle = NULL;
3139 if (NULL != socket->ack_transmit_handle)
3141 GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
3142 GNUNET_free (socket->ack_msg);
3143 socket->ack_msg = NULL;
3144 socket->ack_transmit_handle = NULL;
3147 /* Clear existing message queue */
3148 while (NULL != (head = socket->queue_head)) {
3149 GNUNET_CONTAINER_DLL_remove (socket->queue_head,
3152 GNUNET_free (head->message);
3156 /* Close associated tunnel */
3157 if (NULL != socket->tunnel)
3159 GNUNET_MESH_tunnel_destroy (socket->tunnel);
3160 socket->tunnel = NULL;
3163 /* Close mesh connection */
3164 if (NULL != socket->mesh && NULL == socket->lsocket)
3166 GNUNET_MESH_disconnect (socket->mesh);
3167 socket->mesh = NULL;
3170 /* Release receive buffer */
3171 if (NULL != socket->receive_buffer)
3173 GNUNET_free (socket->receive_buffer);
3176 GNUNET_free (socket);
3181 * Listens for stream connections for a specific application ports
3183 * @param cfg the configuration to use
3184 * @param app_port the application port for which new streams will be accepted
3185 * @param listen_cb this function will be called when a peer tries to establish
3187 * @param listen_cb_cls closure for listen_cb
3188 * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3189 * @return listen socket, NULL for any error
3191 struct GNUNET_STREAM_ListenSocket *
3192 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
3193 GNUNET_MESH_ApplicationType app_port,
3194 GNUNET_STREAM_ListenCallback listen_cb,
3195 void *listen_cb_cls,
3198 /* FIXME: Add variable args for passing configration options? */
3199 struct GNUNET_STREAM_ListenSocket *lsocket;
3200 struct GNUNET_TIME_Relative listen_timeout;
3201 enum GNUNET_STREAM_Option option;
3204 GNUNET_assert (NULL != listen_cb);
3205 lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
3206 lsocket->cfg = GNUNET_CONFIGURATION_dup (cfg);
3207 lsocket->lockmanager = GNUNET_LOCKMANAGER_connect (lsocket->cfg);
3208 if (NULL == lsocket->lockmanager)
3210 GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3211 GNUNET_free (lsocket);
3214 lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */
3216 lsocket->retransmit_timeout =
3217 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
3218 lsocket->testing_active = GNUNET_NO;
3219 lsocket->listen_ok_cb = NULL;
3220 listen_timeout = TIME_REL_SECS (60); /* A minute for listen timeout */
3221 va_start (vargs, listen_cb_cls);
3223 option = va_arg (vargs, enum GNUNET_STREAM_Option);
3226 case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3227 lsocket->retransmit_timeout = va_arg (vargs,
3228 struct GNUNET_TIME_Relative);
3230 case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3231 lsocket->testing_active = GNUNET_YES;
3232 lsocket->testing_set_write_sequence_number_value = va_arg (vargs,
3235 case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3236 listen_timeout = GNUNET_TIME_relative_multiply
3237 (GNUNET_TIME_UNIT_MILLISECONDS, va_arg (vargs, uint32_t));
3239 case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3240 lsocket->listen_ok_cb = va_arg (vargs,
3241 GNUNET_STREAM_ListenSuccessCallback);
3243 case GNUNET_STREAM_OPTION_END:
3246 } while (GNUNET_STREAM_OPTION_END != option);
3248 lsocket->port = app_port;
3249 lsocket->listen_cb = listen_cb;
3250 lsocket->listen_cb_cls = listen_cb_cls;
3251 lsocket->locking_request =
3252 GNUNET_LOCKMANAGER_acquire_lock (lsocket->lockmanager, locking_domain,
3253 (uint32_t) lsocket->port,
3254 &lock_status_change_cb, lsocket);
3255 lsocket->lockmanager_acquire_timeout_task =
3256 GNUNET_SCHEDULER_add_delayed (listen_timeout,
3257 &lockmanager_acquire_timeout, lsocket);
3263 * Closes the listen socket
3265 * @param lsocket the listen socket
3268 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3270 /* Close MESH connection */
3271 if (NULL != lsocket->mesh)
3272 GNUNET_MESH_disconnect (lsocket->mesh);
3273 GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3274 if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
3275 GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
3276 if (NULL != lsocket->locking_request)
3277 GNUNET_LOCKMANAGER_cancel_request (lsocket->locking_request);
3278 if (NULL != lsocket->lockmanager)
3279 GNUNET_LOCKMANAGER_disconnect (lsocket->lockmanager);
3280 GNUNET_free (lsocket);
3285 * Tries to write the given data to the stream. The maximum size of data that
3286 * can be written as part of a write operation is (64 * (64000 - sizeof (struct
3287 * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3288 * violation, however only the said number of maximum bytes will be written.
3290 * @param socket the socket representing a stream
3291 * @param data the data buffer from where the data is written into the stream
3292 * @param size the number of bytes to be written from the data buffer
3293 * @param timeout the timeout period
3294 * @param write_cont the function to call upon writing some bytes into the
3296 * @param write_cont_cls the closure
3298 * @return handle to cancel the operation; if a previous write is pending or
3299 * the stream has been shutdown for this operation then write_cont is
3300 * immediately called and NULL is returned.
3302 struct GNUNET_STREAM_IOWriteHandle *
3303 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3306 struct GNUNET_TIME_Relative timeout,
3307 GNUNET_STREAM_CompletionContinuation write_cont,
3308 void *write_cont_cls)
3310 unsigned int num_needed_packets;
3311 unsigned int packet;
3312 struct GNUNET_STREAM_IOWriteHandle *io_handle;
3313 uint32_t packet_size;
3314 uint32_t payload_size;
3315 struct GNUNET_STREAM_DataMessage *data_msg;
3317 struct GNUNET_TIME_Relative ack_deadline;
3319 LOG (GNUNET_ERROR_TYPE_DEBUG,
3322 /* Return NULL if there is already a write request pending */
3323 if (NULL != socket->write_handle)
3329 switch (socket->state)
3331 case STATE_TRANSMIT_CLOSED:
3332 case STATE_TRANSMIT_CLOSE_WAIT:
3334 case STATE_CLOSE_WAIT:
3335 if (NULL != write_cont)
3336 write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3337 LOG (GNUNET_ERROR_TYPE_DEBUG,
3338 "%s() END\n", __func__);
3342 case STATE_HELLO_WAIT:
3343 if (NULL != write_cont)
3344 /* FIXME: GNUNET_STREAM_SYSERR?? */
3345 write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3346 LOG (GNUNET_ERROR_TYPE_DEBUG,
3347 "%s() END\n", __func__);
3349 case STATE_ESTABLISHED:
3350 case STATE_RECEIVE_CLOSED:
3351 case STATE_RECEIVE_CLOSE_WAIT:
3355 if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
3356 size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size;
3357 num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
3358 io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
3359 io_handle->socket = socket;
3360 io_handle->write_cont = write_cont;
3361 io_handle->write_cont_cls = write_cont_cls;
3362 io_handle->size = size;
3364 /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3365 determined from RTT */
3366 ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3367 /* Divide the given buffer into packets for sending */
3368 for (packet=0; packet < num_needed_packets; packet++)
3370 if ((packet + 1) * max_payload_size < size)
3372 payload_size = max_payload_size;
3373 packet_size = MAX_PACKET_SIZE;
3377 payload_size = size - packet * max_payload_size;
3378 packet_size = payload_size + sizeof (struct
3379 GNUNET_STREAM_DataMessage);
3381 io_handle->messages[packet] = GNUNET_malloc (packet_size);
3382 io_handle->messages[packet]->header.header.size = htons (packet_size);
3383 io_handle->messages[packet]->header.header.type =
3384 htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3385 io_handle->messages[packet]->sequence_number =
3386 htonl (socket->write_sequence_number++);
3387 io_handle->messages[packet]->offset = htonl (socket->write_offset);
3389 /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3390 determined from RTT */
3391 io_handle->messages[packet]->ack_deadline =
3392 GNUNET_TIME_relative_hton (ack_deadline);
3393 data_msg = io_handle->messages[packet];
3394 /* Copy data from given buffer to the packet */
3395 memcpy (&data_msg[1],
3398 sweep += payload_size;
3399 socket->write_offset += payload_size;
3401 socket->write_handle = io_handle;
3402 write_data (socket);
3404 LOG (GNUNET_ERROR_TYPE_DEBUG,
3405 "%s() END\n", __func__);
3412 * Tries to read data from the stream.
3414 * @param socket the socket representing a stream
3415 * @param timeout the timeout period
3416 * @param proc function to call with data (once only)
3417 * @param proc_cls the closure for proc
3419 * @return handle to cancel the operation; if the stream has been shutdown for
3420 * this type of opeartion then the DataProcessor is immediately
3421 * called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned
3423 struct GNUNET_STREAM_IOReadHandle *
3424 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3425 struct GNUNET_TIME_Relative timeout,
3426 GNUNET_STREAM_DataProcessor proc,
3429 struct GNUNET_STREAM_IOReadHandle *read_handle;
3431 LOG (GNUNET_ERROR_TYPE_DEBUG,
3433 GNUNET_i2s (&socket->other_peer),
3435 /* Return NULL if there is already a read handle; the user has to cancel that
3436 first before continuing or has to wait until it is completed */
3437 if (NULL != socket->read_handle)
3439 GNUNET_assert (NULL != proc);
3440 switch (socket->state)
3442 case STATE_RECEIVE_CLOSED:
3443 case STATE_RECEIVE_CLOSE_WAIT:
3445 case STATE_CLOSE_WAIT:
3446 proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3447 LOG (GNUNET_ERROR_TYPE_DEBUG,
3449 GNUNET_i2s (&socket->other_peer),
3455 read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
3456 read_handle->proc = proc;
3457 read_handle->proc_cls = proc_cls;
3458 read_handle->socket = socket;
3459 socket->read_handle = read_handle;
3460 /* Check if we have a packet at bitmap 0 */
3461 if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3464 socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3467 /* Setup the read timeout task */
3468 socket->read_io_timeout_task_id =
3469 GNUNET_SCHEDULER_add_delayed (timeout,
3472 LOG (GNUNET_ERROR_TYPE_DEBUG,
3474 GNUNET_i2s (&socket->other_peer),
3481 * Cancel pending write operation.
3483 * @param ioh handle to operation to cancel
3486 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3488 struct GNUNET_STREAM_Socket *socket = ioh->socket;
3489 unsigned int packet;
3491 GNUNET_assert (NULL != socket->write_handle);
3492 GNUNET_assert (socket->write_handle == ioh);
3494 if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
3496 GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
3497 socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
3500 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3502 if (NULL == ioh->messages[packet]) break;
3503 GNUNET_free (ioh->messages[packet]);
3506 GNUNET_free (socket->write_handle);
3507 socket->write_handle = NULL;
3512 * Cancel pending read operation.
3514 * @param ioh handle to operation to cancel
3517 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
3519 struct GNUNET_STREAM_Socket *socket;
3521 socket = ioh->socket;
3522 GNUNET_assert (NULL != socket->read_handle);
3523 GNUNET_assert (ioh == socket->read_handle);
3524 /* Read io time task should be there; if it is already executed then this
3525 read handle is not valid */
3526 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != socket->read_io_timeout_task_id);
3527 GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
3528 socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
3529 /* reading task may be present; if so we have to stop it */
3530 if (GNUNET_SCHEDULER_NO_TASK != socket->read_task_id)
3532 GNUNET_SCHEDULER_cancel (socket->read_task_id);
3533 socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
3536 socket->read_handle = NULL;
3539 /* end of stream_api.c */