2 This file is part of GNUnet.
3 (C) 2012 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
23 * Checks for matching the sender and socket->other_peer in server
26 * Decrement PEER intern count during socket close and listen close to free the
27 * memory used for PEER interning
29 * Add code for write io timeout
31 * Include retransmission for control messages
35 * @file stream/stream_api.c
36 * @brief Implementation of the stream library
37 * @author Sree Harsha Totakura
42 #include "gnunet_common.h"
43 #include "gnunet_crypto_lib.h"
44 #include "gnunet_stream_lib.h"
45 #include "gnunet_testing_lib.h"
46 #include "stream_protocol.h"
50 * The maximum packet size of a stream packet
52 #define MAX_PACKET_SIZE 64000
57 #define RECEIVE_BUFFER_SIZE 4096000
60 * The maximum payload a data message packet can carry
62 static size_t max_payload_size =
63 MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
66 * states in the Protocol
71 * Client initialization state
76 * Listener initialization state
81 * Pre-connection establishment state
86 * State where a connection has been established
91 * State where the socket is closed on our side and waiting to be ACK'ed
93 STATE_RECEIVE_CLOSE_WAIT,
96 * State where the socket is closed for reading
101 * State where the socket is closed on our side and waiting to be ACK'ed
103 STATE_TRANSMIT_CLOSE_WAIT,
106 * State where the socket is closed for writing
108 STATE_TRANSMIT_CLOSED,
111 * State where the socket is closed on our side and waiting to be ACK'ed
116 * State where the socket is closed
123 * Functions of this type are called when a message is written
125 * @param cls the closure from queue_message
126 * @param socket the socket the written message was bound to
128 typedef void (*SendFinishCallback) (void *cls,
129 struct GNUNET_STREAM_Socket *socket);
133 * The send message queue
140 struct GNUNET_STREAM_MessageHeader *message;
143 * Callback to be called when the message is sent
145 SendFinishCallback finish_cb;
148 * The closure for finish_cb
153 * The next message in queue. Should be NULL in the last message
155 struct MessageQueue *next;
158 * The next message in queue. Should be NULL in the first message
160 struct MessageQueue *prev;
165 * The STREAM Socket Handler
167 struct GNUNET_STREAM_Socket
170 * Retransmission timeout
172 struct GNUNET_TIME_Relative retransmit_timeout;
175 * The Acknowledgement Bitmap
177 GNUNET_STREAM_AckBitmap ack_bitmap;
180 * Time when the Acknowledgement was queued
182 struct GNUNET_TIME_Absolute ack_time_registered;
185 * Queued Acknowledgement deadline
187 struct GNUNET_TIME_Relative ack_time_deadline;
192 struct GNUNET_MESH_Handle *mesh;
195 * The mesh tunnel handle
197 struct GNUNET_MESH_Tunnel *tunnel;
200 * Stream open closure
205 * Stream open callback
207 GNUNET_STREAM_OpenCallback open_cb;
210 * The current transmit handle (if a pending transmit request exists)
212 struct GNUNET_MESH_TransmitHandle *transmit_handle;
215 * The current act transmit handle (if a pending ack transmit request exists)
217 struct GNUNET_MESH_TransmitHandle *ack_transmit_handle;
220 * Pointer to the current ack message using in ack_task
222 struct GNUNET_STREAM_AckMessage *ack_msg;
225 * The current message associated with the transmit handle
227 struct MessageQueue *queue_head;
230 * The queue tail, should always point to the last message in queue
232 struct MessageQueue *queue_tail;
235 * The write IO_handle associated with this socket
237 struct GNUNET_STREAM_IOWriteHandle *write_handle;
240 * The read IO_handle associated with this socket
242 struct GNUNET_STREAM_IOReadHandle *read_handle;
245 * The shutdown handle associated with this socket
247 struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
250 * Buffer for storing received messages
252 void *receive_buffer;
255 * The listen socket from which this socket is derived. Should be NULL if it
256 * is not a derived socket
258 struct GNUNET_STREAM_ListenSocket *lsocket;
261 * Task identifier for the read io timeout task
263 GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
266 * Task identifier for retransmission task after timeout
268 GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
271 * The task for sending timely Acks
273 GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
276 * Task scheduled to continue a read operation.
278 GNUNET_SCHEDULER_TaskIdentifier read_task_id;
281 * The state of the protocol associated with this socket
286 * The status of the socket
288 enum GNUNET_STREAM_Status status;
291 * The number of previous timeouts; FIXME: currently not used
293 unsigned int retries;
296 * The peer identity of the peer at the other end of the stream
298 GNUNET_PEER_Id other_peer;
301 * Our Peer Identity (for debugging)
303 GNUNET_PEER_Id our_id;
306 * The application port number (type: uint32_t)
308 GNUNET_MESH_ApplicationType app_port;
311 * The session id associated with this stream connection
312 * FIXME: Not used currently, may be removed
317 * Write sequence number. Set to random when sending HELLO(client) and
320 uint32_t write_sequence_number;
323 * Read sequence number. This number's value is determined during handshake
325 uint32_t read_sequence_number;
328 * The receiver buffer size
330 uint32_t receive_buffer_size;
333 * The receiver buffer boundaries
335 uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
338 * receiver's available buffer after the last acknowledged packet
340 uint32_t receiver_window_available;
343 * The offset pointer used during write operation
345 uint32_t write_offset;
348 * The offset after which we are expecting data
350 uint32_t read_offset;
353 * The offset upto which user has read from the received buffer
355 uint32_t copy_offset;
360 * A socket for listening
362 struct GNUNET_STREAM_ListenSocket
367 struct GNUNET_MESH_Handle *mesh;
370 * The callback function which is called after successful opening socket
372 GNUNET_STREAM_ListenCallback listen_cb;
375 * The call back closure
380 * Our interned Peer's identity
382 GNUNET_PEER_Id our_id;
386 * FIXME: Remove if not required!
388 GNUNET_MESH_ApplicationType port;
393 * The IO Write Handle
395 struct GNUNET_STREAM_IOWriteHandle
398 * The socket to which this write handle is associated
400 struct GNUNET_STREAM_Socket *socket;
403 * The packet_buffers associated with this Handle
405 struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
408 * The write continuation callback
410 GNUNET_STREAM_CompletionContinuation write_cont;
413 * Write continuation closure
415 void *write_cont_cls;
418 * The bitmap of this IOHandle; Corresponding bit for a message is set when
419 * it has been acknowledged by the receiver
421 GNUNET_STREAM_AckBitmap ack_bitmap;
424 * Number of bytes in this write handle
433 struct GNUNET_STREAM_IOReadHandle
436 * Callback for the read processor
438 GNUNET_STREAM_DataProcessor proc;
441 * The closure pointer for the read processor callback
448 * Handle for Shutdown
450 struct GNUNET_STREAM_ShutdownHandle
453 * The socket associated with this shutdown handle
455 struct GNUNET_STREAM_Socket *socket;
458 * Shutdown completion callback
460 GNUNET_STREAM_ShutdownCompletion completion_cb;
463 * Closure for completion callback
465 void *completion_cls;
468 * Close message retransmission task id
470 GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
473 * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
480 * Default value in seconds for various timeouts
482 static unsigned int default_timeout = 10;
486 * Callback function for sending queued message
488 * @param cls closure the socket
489 * @param size number of bytes available in buf
490 * @param buf where the callee should write the message
491 * @return number of bytes written to buf
494 send_message_notify (void *cls, size_t size, void *buf)
496 struct GNUNET_STREAM_Socket *socket = cls;
497 struct GNUNET_PeerIdentity target;
498 struct MessageQueue *head;
501 socket->transmit_handle = NULL; /* Remove the transmit handle */
502 head = socket->queue_head;
504 return 0; /* just to be safe */
505 GNUNET_PEER_resolve (socket->other_peer, &target);
506 if (0 == size) /* request timed out */
509 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
510 "Message sending timed out. Retry %d \n",
512 socket->transmit_handle =
513 GNUNET_MESH_notify_transmit_ready (socket->tunnel,
516 /* FIXME: exponential backoff */
517 socket->retransmit_timeout,
519 ntohs (head->message->header.size),
520 &send_message_notify,
525 ret = ntohs (head->message->header.size);
526 GNUNET_assert (size >= ret);
527 memcpy (buf, head->message, ret);
528 if (NULL != head->finish_cb)
530 head->finish_cb (head->finish_cb_cls, socket);
532 GNUNET_CONTAINER_DLL_remove (socket->queue_head,
535 GNUNET_free (head->message);
537 head = socket->queue_head;
538 if (NULL != head) /* more pending messages to send */
541 socket->transmit_handle =
542 GNUNET_MESH_notify_transmit_ready (socket->tunnel,
545 /* FIXME: exponential backoff */
546 socket->retransmit_timeout,
548 ntohs (head->message->header.size),
549 &send_message_notify,
557 * Queues a message for sending using the mesh connection of a socket
559 * @param socket the socket whose mesh connection is used
560 * @param message the message to be sent
561 * @param finish_cb the callback to be called when the message is sent
562 * @param finish_cb_cls the closure for the callback
565 queue_message (struct GNUNET_STREAM_Socket *socket,
566 struct GNUNET_STREAM_MessageHeader *message,
567 SendFinishCallback finish_cb,
570 struct MessageQueue *queue_entity;
571 struct GNUNET_PeerIdentity target;
574 ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
575 && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
577 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
578 "%x: Queueing message of type %d and size %d\n",
580 ntohs (message->header.type),
581 ntohs (message->header.size));
582 GNUNET_assert (NULL != message);
583 queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
584 queue_entity->message = message;
585 queue_entity->finish_cb = finish_cb;
586 queue_entity->finish_cb_cls = finish_cb_cls;
587 GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
590 if (NULL == socket->transmit_handle)
593 GNUNET_PEER_resolve (socket->other_peer, &target);
594 socket->transmit_handle =
595 GNUNET_MESH_notify_transmit_ready (socket->tunnel,
598 socket->retransmit_timeout,
600 ntohs (message->header.size),
601 &send_message_notify,
608 * Copies a message and queues it for sending using the mesh connection of
611 * @param socket the socket whose mesh connection is used
612 * @param message the message to be sent
613 * @param finish_cb the callback to be called when the message is sent
614 * @param finish_cb_cls the closure for the callback
617 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
618 const struct GNUNET_STREAM_MessageHeader *message,
619 SendFinishCallback finish_cb,
622 struct GNUNET_STREAM_MessageHeader *msg_copy;
625 size = ntohs (message->header.size);
626 msg_copy = GNUNET_malloc (size);
627 memcpy (msg_copy, message, size);
628 queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
633 * Callback function for sending ack message
635 * @param cls closure the ACK message created in ack_task
636 * @param size number of bytes available in buffer
637 * @param buf where the callee should write the message
638 * @return number of bytes written to buf
641 send_ack_notify (void *cls, size_t size, void *buf)
643 struct GNUNET_STREAM_Socket *socket = cls;
647 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
648 "%s called with size 0\n", __func__);
651 GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size);
653 size = ntohs (socket->ack_msg->header.header.size);
654 memcpy (buf, socket->ack_msg, size);
656 GNUNET_free (socket->ack_msg);
657 socket->ack_msg = NULL;
658 socket->ack_transmit_handle = NULL;
663 * Writes data using the given socket. The amount of data written is limited by
664 * the receiver_window_size
666 * @param socket the socket to use
669 write_data (struct GNUNET_STREAM_Socket *socket);
672 * Task for retransmitting data messages if they aren't ACK before their ack
675 * @param cls the socket
676 * @param tc the Task context
679 retransmission_timeout_task (void *cls,
680 const struct GNUNET_SCHEDULER_TaskContext *tc)
682 struct GNUNET_STREAM_Socket *socket = cls;
684 if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
687 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
688 "%x: Retransmitting DATA...\n", socket->our_id);
689 socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
695 * Task for sending ACK message
697 * @param cls the socket
698 * @param tc the Task context
702 const struct GNUNET_SCHEDULER_TaskContext *tc)
704 struct GNUNET_STREAM_Socket *socket = cls;
705 struct GNUNET_STREAM_AckMessage *ack_msg;
706 struct GNUNET_PeerIdentity target;
708 if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
713 socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
715 /* Create the ACK Message */
716 ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
717 ack_msg->header.header.size = htons (sizeof (struct
718 GNUNET_STREAM_AckMessage));
719 ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
720 ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
721 ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
722 ack_msg->receive_window_remaining =
723 htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
725 socket->ack_msg = ack_msg;
726 GNUNET_PEER_resolve (socket->other_peer, &target);
727 /* Request MESH for sending ACK */
728 socket->ack_transmit_handle =
729 GNUNET_MESH_notify_transmit_ready (socket->tunnel,
732 socket->retransmit_timeout,
734 ntohs (ack_msg->header.header.size),
741 * Retransmission task for shutdown messages
743 * @param cls the shutdown handle
744 * @param tc the Task Context
747 close_msg_retransmission_task (void *cls,
748 const struct GNUNET_SCHEDULER_TaskContext *tc)
750 struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
751 struct GNUNET_STREAM_MessageHeader *msg;
752 struct GNUNET_STREAM_Socket *socket;
754 GNUNET_assert (NULL != shutdown_handle);
755 socket = shutdown_handle->socket;
757 msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
758 msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
759 switch (shutdown_handle->operation)
762 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
765 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
768 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
772 shutdown_handle->close_msg_retransmission_task_id =
773 GNUNET_SCHEDULER_NO_TASK;
776 queue_message (socket, msg, NULL, NULL);
777 shutdown_handle->close_msg_retransmission_task_id =
778 GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
779 &close_msg_retransmission_task,
785 * Function to modify a bit in GNUNET_STREAM_AckBitmap
787 * @param bitmap the bitmap to modify
788 * @param bit the bit number to modify
789 * @param value GNUNET_YES to on, GNUNET_NO to off
792 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
796 GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
797 if (GNUNET_YES == value)
798 *bitmap |= (1LL << bit);
800 *bitmap &= ~(1LL << bit);
805 * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
807 * @param bitmap address of the bitmap that has to be checked
808 * @param bit the bit number to check
809 * @return GNUNET_YES if the bit is set; GNUNET_NO if not
812 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
815 GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
816 return 0 != (*bitmap & (1LL << bit));
821 * Writes data using the given socket. The amount of data written is limited by
822 * the receiver_window_size
824 * @param socket the socket to use
827 write_data (struct GNUNET_STREAM_Socket *socket)
829 struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
830 int packet; /* Although an int, should never be negative */
834 /* Find the last acknowledged packet */
835 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
837 if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
840 else if (NULL == io_handle->messages[packet])
843 /* Resend packets which weren't ack'ed */
844 for (packet=0; packet < ack_packet; packet++)
846 if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
849 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
850 "%x: Placing DATA message with sequence %u in send queue\n",
852 ntohl (io_handle->messages[packet]->sequence_number));
854 copy_and_queue_message (socket,
855 &io_handle->messages[packet]->header,
860 packet = ack_packet + 1;
861 /* Now send new packets if there is enough buffer space */
862 while ( (NULL != io_handle->messages[packet]) &&
863 (socket->receiver_window_available
864 >= ntohs (io_handle->messages[packet]->header.header.size)) )
866 socket->receiver_window_available -=
867 ntohs (io_handle->messages[packet]->header.header.size);
868 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
869 "%x: Placing DATA message with sequence %u in send queue\n",
871 ntohl (io_handle->messages[packet]->sequence_number));
872 copy_and_queue_message (socket,
873 &io_handle->messages[packet]->header,
879 if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
880 socket->retransmission_timeout_task_id =
881 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
882 (GNUNET_TIME_UNIT_SECONDS, 8),
883 &retransmission_timeout_task,
889 * Task for calling the read processor
891 * @param cls the socket
892 * @param tc the task context
895 call_read_processor (void *cls,
896 const struct GNUNET_SCHEDULER_TaskContext *tc)
898 struct GNUNET_STREAM_Socket *socket = cls;
900 size_t valid_read_size;
902 uint32_t sequence_increase;
903 uint32_t offset_increase;
905 socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
906 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
909 if (NULL == socket->receive_buffer)
912 GNUNET_assert (NULL != socket->read_handle);
913 GNUNET_assert (NULL != socket->read_handle->proc);
915 /* Check the bitmap for any holes */
916 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
918 if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
922 /* We only call read processor if we have the first packet */
923 GNUNET_assert (0 < packet);
926 socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
928 GNUNET_assert (0 != valid_read_size);
930 /* Cancel the read_io_timeout_task */
931 GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
932 socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
934 /* Call the data processor */
935 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
936 "%x: Calling read processor\n",
939 socket->read_handle->proc (socket->read_handle->proc_cls,
941 socket->receive_buffer + socket->copy_offset,
943 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
944 "%x: Read processor read %d bytes\n",
947 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
948 "%x: Read processor completed successfully\n",
951 /* Free the read handle */
952 GNUNET_free (socket->read_handle);
953 socket->read_handle = NULL;
955 GNUNET_assert (read_size <= valid_read_size);
956 socket->copy_offset += read_size;
958 /* Determine upto which packet we can remove from the buffer */
959 for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
961 if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
963 if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
967 /* If no packets can be removed we can't move the buffer */
968 if (0 == packet) return;
970 sequence_increase = packet;
971 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
972 "%x: Sequence increase after read processor completion: %u\n",
976 /* Shift the data in the receive buffer */
977 memmove (socket->receive_buffer,
978 socket->receive_buffer
979 + socket->receive_buffer_boundaries[sequence_increase-1],
980 socket->receive_buffer_size
981 - socket->receive_buffer_boundaries[sequence_increase-1]);
983 /* Shift the bitmap */
984 socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
986 /* Set read_sequence_number */
987 socket->read_sequence_number += sequence_increase;
989 /* Set read_offset */
990 offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
991 socket->read_offset += offset_increase;
993 /* Fix copy_offset */
994 GNUNET_assert (offset_increase <= socket->copy_offset);
995 socket->copy_offset -= offset_increase;
997 /* Fix relative boundaries */
998 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1000 if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
1002 socket->receive_buffer_boundaries[packet] =
1003 socket->receive_buffer_boundaries[packet + sequence_increase]
1007 socket->receive_buffer_boundaries[packet] = 0;
1013 * Cancels the existing read io handle
1015 * @param cls the closure from the SCHEDULER call
1016 * @param tc the task context
1019 read_io_timeout (void *cls,
1020 const struct GNUNET_SCHEDULER_TaskContext *tc)
1022 struct GNUNET_STREAM_Socket *socket = cls;
1023 GNUNET_STREAM_DataProcessor proc;
1026 socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1027 if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1029 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1030 "%x: Read task timedout - Cancelling it\n",
1032 GNUNET_SCHEDULER_cancel (socket->read_task_id);
1033 socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1035 GNUNET_assert (NULL != socket->read_handle);
1036 proc = socket->read_handle->proc;
1037 proc_cls = socket->read_handle->proc_cls;
1039 GNUNET_free (socket->read_handle);
1040 socket->read_handle = NULL;
1041 /* Call the read processor to signal timeout */
1043 GNUNET_STREAM_TIMEOUT,
1050 * Handler for DATA messages; Same for both client and server
1052 * @param socket the socket through which the ack was received
1053 * @param tunnel connection to the other end
1054 * @param sender who sent the message
1055 * @param msg the data message
1056 * @param atsi performance data for the connection
1057 * @return GNUNET_OK to keep the connection open,
1058 * GNUNET_SYSERR to close it (signal serious error)
1061 handle_data (struct GNUNET_STREAM_Socket *socket,
1062 struct GNUNET_MESH_Tunnel *tunnel,
1063 const struct GNUNET_PeerIdentity *sender,
1064 const struct GNUNET_STREAM_DataMessage *msg,
1065 const struct GNUNET_ATS_Information*atsi)
1067 const void *payload;
1068 uint32_t bytes_needed;
1069 uint32_t relative_offset;
1070 uint32_t relative_sequence_number;
1073 size = htons (msg->header.header.size);
1074 if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1076 GNUNET_break_op (0);
1077 return GNUNET_SYSERR;
1080 if (GNUNET_PEER_search (sender) != socket->other_peer)
1082 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1083 "%x: Received DATA from non-confirming peer\n",
1088 switch (socket->state)
1090 case STATE_ESTABLISHED:
1091 case STATE_TRANSMIT_CLOSED:
1092 case STATE_TRANSMIT_CLOSE_WAIT:
1094 /* check if the message's sequence number is in the range we are
1096 relative_sequence_number =
1097 ntohl (msg->sequence_number) - socket->read_sequence_number;
1098 if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1100 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1101 "%x: Ignoring received message with sequence number %u\n",
1103 ntohl (msg->sequence_number));
1104 /* Start ACK sending task if one is not already present */
1105 if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1107 socket->ack_task_id =
1108 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1109 (msg->ack_deadline),
1116 /* Check if we have already seen this message */
1117 if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1118 relative_sequence_number))
1120 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1121 "%x: Ignoring already received message with sequence "
1124 ntohl (msg->sequence_number));
1125 /* Start ACK sending task if one is not already present */
1126 if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1128 socket->ack_task_id =
1129 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1130 (msg->ack_deadline),
1137 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1138 "%x: Receiving DATA with sequence number: %u and size: %d "
1141 ntohl (msg->sequence_number),
1142 ntohs (msg->header.header.size),
1143 socket->other_peer);
1145 /* Check if we have to allocate the buffer */
1146 size -= sizeof (struct GNUNET_STREAM_DataMessage);
1147 relative_offset = ntohl (msg->offset) - socket->read_offset;
1148 bytes_needed = relative_offset + size;
1149 if (bytes_needed > socket->receive_buffer_size)
1151 if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1153 socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1155 socket->receive_buffer_size = bytes_needed;
1159 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1160 "%x: Cannot accommodate packet %d as buffer is",
1163 ntohl (msg->sequence_number));
1168 /* Copy Data to buffer */
1170 GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1171 memcpy (socket->receive_buffer + relative_offset,
1174 socket->receive_buffer_boundaries[relative_sequence_number] =
1175 relative_offset + size;
1177 /* Modify the ACK bitmap */
1178 ackbitmap_modify_bit (&socket->ack_bitmap,
1179 relative_sequence_number,
1182 /* Start ACK sending task if one is not already present */
1183 if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1185 socket->ack_task_id =
1186 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1187 (msg->ack_deadline),
1192 if ((NULL != socket->read_handle) /* A read handle is waiting */
1193 /* There is no current read task */
1194 && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1195 /* We have the first packet */
1196 && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1199 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1200 "%x: Scheduling read processor\n",
1203 socket->read_task_id =
1204 GNUNET_SCHEDULER_add_now (&call_read_processor,
1211 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1212 "%x: Received data message when it cannot be handled\n",
1221 * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1223 * @param cls the socket (set from GNUNET_MESH_connect)
1224 * @param tunnel connection to the other end
1225 * @param tunnel_ctx place to store local state associated with the tunnel
1226 * @param sender who sent the message
1227 * @param message the actual message
1228 * @param atsi performance data for the connection
1229 * @return GNUNET_OK to keep the connection open,
1230 * GNUNET_SYSERR to close it (signal serious error)
1233 client_handle_data (void *cls,
1234 struct GNUNET_MESH_Tunnel *tunnel,
1236 const struct GNUNET_PeerIdentity *sender,
1237 const struct GNUNET_MessageHeader *message,
1238 const struct GNUNET_ATS_Information*atsi)
1240 struct GNUNET_STREAM_Socket *socket = cls;
1242 return handle_data (socket,
1245 (const struct GNUNET_STREAM_DataMessage *) message,
1251 * Callback to set state to ESTABLISHED
1253 * @param cls the closure from queue_message FIXME: document
1254 * @param socket the socket to requiring state change
1257 set_state_established (void *cls,
1258 struct GNUNET_STREAM_Socket *socket)
1260 struct GNUNET_PeerIdentity initiator_pid;
1262 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1263 "%x: Attaining ESTABLISHED state\n",
1265 socket->write_offset = 0;
1266 socket->read_offset = 0;
1267 socket->state = STATE_ESTABLISHED;
1268 /* FIXME: What if listen_cb is NULL */
1269 if (NULL != socket->lsocket)
1271 GNUNET_PEER_resolve (socket->other_peer, &initiator_pid);
1272 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1273 "%x: Calling listen callback\n",
1275 if (GNUNET_SYSERR ==
1276 socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1280 socket->state = STATE_CLOSED;
1281 /* FIXME: We should close in a decent way */
1282 GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1283 GNUNET_free (socket);
1286 else if (socket->open_cb)
1287 socket->open_cb (socket->open_cls, socket);
1292 * Callback to set state to HELLO_WAIT
1294 * @param cls the closure from queue_message
1295 * @param socket the socket to requiring state change
1298 set_state_hello_wait (void *cls,
1299 struct GNUNET_STREAM_Socket *socket)
1301 GNUNET_assert (STATE_INIT == socket->state);
1302 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1303 "%x: Attaining HELLO_WAIT state\n",
1305 socket->state = STATE_HELLO_WAIT;
1310 * Callback to set state to CLOSE_WAIT
1312 * @param cls the closure from queue_message
1313 * @param socket the socket requiring state change
1316 set_state_close_wait (void *cls,
1317 struct GNUNET_STREAM_Socket *socket)
1319 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1320 "%x: Attaing CLOSE_WAIT state\n",
1322 socket->state = STATE_CLOSE_WAIT;
1323 GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1324 socket->receive_buffer = NULL;
1325 socket->receive_buffer_size = 0;
1330 * Callback to set state to RECEIVE_CLOSE_WAIT
1332 * @param cls the closure from queue_message
1333 * @param socket the socket requiring state change
1336 set_state_receive_close_wait (void *cls,
1337 struct GNUNET_STREAM_Socket *socket)
1339 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1340 "%x: Attaing RECEIVE_CLOSE_WAIT state\n",
1342 socket->state = STATE_RECEIVE_CLOSE_WAIT;
1343 GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1344 socket->receive_buffer = NULL;
1345 socket->receive_buffer_size = 0;
1350 * Callback to set state to TRANSMIT_CLOSE_WAIT
1352 * @param cls the closure from queue_message
1353 * @param socket the socket requiring state change
1356 set_state_transmit_close_wait (void *cls,
1357 struct GNUNET_STREAM_Socket *socket)
1359 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1360 "%x: Attaing TRANSMIT_CLOSE_WAIT state\n",
1362 socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1367 * Callback to set state to CLOSED
1369 * @param cls the closure from queue_message
1370 * @param socket the socket requiring state change
1373 set_state_closed (void *cls,
1374 struct GNUNET_STREAM_Socket *socket)
1376 socket->state = STATE_CLOSED;
1380 * Returns a new HelloAckMessage. Also sets the write sequence number for the
1383 * @param socket the socket for which this HelloAckMessage has to be generated
1384 * @return the HelloAckMessage
1386 static struct GNUNET_STREAM_HelloAckMessage *
1387 generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket)
1389 struct GNUNET_STREAM_HelloAckMessage *msg;
1391 /* Get the random sequence number */
1392 socket->write_sequence_number =
1393 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1394 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1395 "%x: Generated write sequence number %u\n",
1397 (unsigned int) socket->write_sequence_number);
1399 msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1400 msg->header.header.size =
1401 htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1402 msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1403 msg->sequence_number = htonl (socket->write_sequence_number);
1404 msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1411 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1413 * @param cls the socket (set from GNUNET_MESH_connect)
1414 * @param tunnel connection to the other end
1415 * @param tunnel_ctx this is NULL
1416 * @param sender who sent the message
1417 * @param message the actual message
1418 * @param atsi performance data for the connection
1419 * @return GNUNET_OK to keep the connection open,
1420 * GNUNET_SYSERR to close it (signal serious error)
1423 client_handle_hello_ack (void *cls,
1424 struct GNUNET_MESH_Tunnel *tunnel,
1426 const struct GNUNET_PeerIdentity *sender,
1427 const struct GNUNET_MessageHeader *message,
1428 const struct GNUNET_ATS_Information*atsi)
1430 struct GNUNET_STREAM_Socket *socket = cls;
1431 const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1432 struct GNUNET_STREAM_HelloAckMessage *reply;
1434 if (GNUNET_PEER_search (sender) != socket->other_peer)
1436 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1437 "%x: Received HELLO_ACK from non-confirming peer\n",
1441 ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1442 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1443 "%x: Received HELLO_ACK from %x\n",
1445 socket->other_peer);
1447 GNUNET_assert (socket->tunnel == tunnel);
1448 switch (socket->state)
1450 case STATE_HELLO_WAIT:
1451 socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1452 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1453 "%x: Read sequence number %u\n",
1455 (unsigned int) socket->read_sequence_number);
1456 socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1457 reply = generate_hello_ack_msg (socket);
1458 queue_message (socket,
1460 &set_state_established,
1463 case STATE_ESTABLISHED:
1464 case STATE_RECEIVE_CLOSE_WAIT:
1465 // call statistics (# ACKs ignored++)
1469 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1470 "%x: Server %x sent HELLO_ACK when in state %d\n",
1474 socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1475 return GNUNET_SYSERR;
1482 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1484 * @param cls the socket (set from GNUNET_MESH_connect)
1485 * @param tunnel connection to the other end
1486 * @param tunnel_ctx this is NULL
1487 * @param sender who sent the message
1488 * @param message the actual message
1489 * @param atsi performance data for the connection
1490 * @return GNUNET_OK to keep the connection open,
1491 * GNUNET_SYSERR to close it (signal serious error)
1494 client_handle_reset (void *cls,
1495 struct GNUNET_MESH_Tunnel *tunnel,
1497 const struct GNUNET_PeerIdentity *sender,
1498 const struct GNUNET_MessageHeader *message,
1499 const struct GNUNET_ATS_Information*atsi)
1501 // struct GNUNET_STREAM_Socket *socket = cls;
1508 * Common message handler for handling TRANSMIT_CLOSE messages
1510 * @param socket the socket through which the ack was received
1511 * @param tunnel connection to the other end
1512 * @param sender who sent the message
1513 * @param msg the transmit close message
1514 * @param atsi performance data for the connection
1515 * @return GNUNET_OK to keep the connection open,
1516 * GNUNET_SYSERR to close it (signal serious error)
1519 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1520 struct GNUNET_MESH_Tunnel *tunnel,
1521 const struct GNUNET_PeerIdentity *sender,
1522 const struct GNUNET_STREAM_MessageHeader *msg,
1523 const struct GNUNET_ATS_Information*atsi)
1525 struct GNUNET_STREAM_MessageHeader *reply;
1527 switch (socket->state)
1529 case STATE_ESTABLISHED:
1530 socket->state = STATE_RECEIVE_CLOSED;
1532 /* Send TRANSMIT_CLOSE_ACK */
1533 reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1534 reply->header.type =
1535 htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1536 reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1537 queue_message (socket, reply, NULL, NULL);
1541 /* FIXME: Call statistics? */
1549 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1551 * @param cls the socket (set from GNUNET_MESH_connect)
1552 * @param tunnel connection to the other end
1553 * @param tunnel_ctx this is NULL
1554 * @param sender who sent the message
1555 * @param message the actual message
1556 * @param atsi performance data for the connection
1557 * @return GNUNET_OK to keep the connection open,
1558 * GNUNET_SYSERR to close it (signal serious error)
1561 client_handle_transmit_close (void *cls,
1562 struct GNUNET_MESH_Tunnel *tunnel,
1564 const struct GNUNET_PeerIdentity *sender,
1565 const struct GNUNET_MessageHeader *message,
1566 const struct GNUNET_ATS_Information*atsi)
1568 struct GNUNET_STREAM_Socket *socket = cls;
1570 return handle_transmit_close (socket,
1573 (struct GNUNET_STREAM_MessageHeader *)message,
1579 * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1581 * @param socket the socket
1582 * @param tunnel connection to the other end
1583 * @param sender who sent the message
1584 * @param message the actual message
1585 * @param atsi performance data for the connection
1586 * @param operation the close operation which is being ACK'ed
1587 * @return GNUNET_OK to keep the connection open,
1588 * GNUNET_SYSERR to close it (signal serious error)
1591 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1592 struct GNUNET_MESH_Tunnel *tunnel,
1593 const struct GNUNET_PeerIdentity *sender,
1594 const struct GNUNET_STREAM_MessageHeader *message,
1595 const struct GNUNET_ATS_Information *atsi,
1598 struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1600 shutdown_handle = socket->shutdown_handle;
1601 if (NULL == shutdown_handle)
1603 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1604 "%x: Received *CLOSE_ACK when shutdown handle is NULL\n",
1612 switch (socket->state)
1614 case STATE_CLOSE_WAIT:
1615 if (SHUT_RDWR != shutdown_handle->operation)
1617 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1618 "%x: Received CLOSE_ACK when shutdown handle "
1619 "is not for SHUT_RDWR\n",
1624 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1625 "%x: Received CLOSE_ACK from %x\n",
1627 socket->other_peer);
1628 socket->state = STATE_CLOSED;
1631 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1632 "%x: Received CLOSE_ACK when in it not expected\n",
1639 switch (socket->state)
1641 case STATE_RECEIVE_CLOSE_WAIT:
1642 if (SHUT_RD != shutdown_handle->operation)
1644 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1645 "%x: Received RECEIVE_CLOSE_ACK when shutdown handle "
1646 "is not for SHUT_RD\n",
1651 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1652 "%x: Received RECEIVE_CLOSE_ACK from %x\n",
1654 socket->other_peer);
1655 socket->state = STATE_RECEIVE_CLOSED;
1658 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1659 "%x: Received RECEIVE_CLOSE_ACK when in it not expected\n",
1666 switch (socket->state)
1668 case STATE_TRANSMIT_CLOSE_WAIT:
1669 if (SHUT_WR != shutdown_handle->operation)
1671 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1672 "%x: Received TRANSMIT_CLOSE_ACK when shutdown handle "
1673 "is not for SHUT_WR\n",
1678 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1679 "%x: Received TRANSMIT_CLOSE_ACK from %x\n",
1681 socket->other_peer);
1682 socket->state = STATE_TRANSMIT_CLOSED;
1685 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1686 "%x: Received TRANSMIT_CLOSE_ACK when in it not expected\n",
1696 if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
1697 shutdown_handle->completion_cb(shutdown_handle->completion_cls,
1699 GNUNET_free (shutdown_handle); /* Free shutdown handle */
1700 socket->shutdown_handle = NULL;
1701 if (GNUNET_SCHEDULER_NO_TASK
1702 != shutdown_handle->close_msg_retransmission_task_id)
1704 GNUNET_SCHEDULER_cancel
1705 (shutdown_handle->close_msg_retransmission_task_id);
1706 shutdown_handle->close_msg_retransmission_task_id =
1707 GNUNET_SCHEDULER_NO_TASK;
1714 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1716 * @param cls the socket (set from GNUNET_MESH_connect)
1717 * @param tunnel connection to the other end
1718 * @param tunnel_ctx this is NULL
1719 * @param sender who sent the message
1720 * @param message the actual message
1721 * @param atsi performance data for the connection
1722 * @return GNUNET_OK to keep the connection open,
1723 * GNUNET_SYSERR to close it (signal serious error)
1726 client_handle_transmit_close_ack (void *cls,
1727 struct GNUNET_MESH_Tunnel *tunnel,
1729 const struct GNUNET_PeerIdentity *sender,
1730 const struct GNUNET_MessageHeader *message,
1731 const struct GNUNET_ATS_Information*atsi)
1733 struct GNUNET_STREAM_Socket *socket = cls;
1735 return handle_generic_close_ack (socket,
1738 (const struct GNUNET_STREAM_MessageHeader *)
1746 * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1748 * @param socket the socket
1749 * @param tunnel connection to the other end
1750 * @param sender who sent the message
1751 * @param message the actual message
1752 * @param atsi performance data for the connection
1753 * @return GNUNET_OK to keep the connection open,
1754 * GNUNET_SYSERR to close it (signal serious error)
1757 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1758 struct GNUNET_MESH_Tunnel *tunnel,
1759 const struct GNUNET_PeerIdentity *sender,
1760 const struct GNUNET_STREAM_MessageHeader *message,
1761 const struct GNUNET_ATS_Information *atsi)
1763 struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1765 switch (socket->state)
1769 case STATE_HELLO_WAIT:
1770 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1771 "%x: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1778 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1779 "%x: Received RECEIVE_CLOSE from %x\n",
1781 socket->other_peer);
1783 GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1784 receive_close_ack->header.size =
1785 htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1786 receive_close_ack->header.type =
1787 htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1788 queue_message (socket,
1793 /* FIXME: Handle the case where write handle is present; the write operation
1794 should be deemed as finised and the write continuation callback
1795 has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
1801 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1803 * @param cls the socket (set from GNUNET_MESH_connect)
1804 * @param tunnel connection to the other end
1805 * @param tunnel_ctx this is NULL
1806 * @param sender who sent the message
1807 * @param message the actual message
1808 * @param atsi performance data for the connection
1809 * @return GNUNET_OK to keep the connection open,
1810 * GNUNET_SYSERR to close it (signal serious error)
1813 client_handle_receive_close (void *cls,
1814 struct GNUNET_MESH_Tunnel *tunnel,
1816 const struct GNUNET_PeerIdentity *sender,
1817 const struct GNUNET_MessageHeader *message,
1818 const struct GNUNET_ATS_Information*atsi)
1820 struct GNUNET_STREAM_Socket *socket = cls;
1823 handle_receive_close (socket,
1826 (const struct GNUNET_STREAM_MessageHeader *) message,
1832 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1834 * @param cls the socket (set from GNUNET_MESH_connect)
1835 * @param tunnel connection to the other end
1836 * @param tunnel_ctx this is NULL
1837 * @param sender who sent the message
1838 * @param message the actual message
1839 * @param atsi performance data for the connection
1840 * @return GNUNET_OK to keep the connection open,
1841 * GNUNET_SYSERR to close it (signal serious error)
1844 client_handle_receive_close_ack (void *cls,
1845 struct GNUNET_MESH_Tunnel *tunnel,
1847 const struct GNUNET_PeerIdentity *sender,
1848 const struct GNUNET_MessageHeader *message,
1849 const struct GNUNET_ATS_Information*atsi)
1851 struct GNUNET_STREAM_Socket *socket = cls;
1853 return handle_generic_close_ack (socket,
1856 (const struct GNUNET_STREAM_MessageHeader *)
1864 * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1866 * @param socket the socket
1867 * @param tunnel connection to the other end
1868 * @param sender who sent the message
1869 * @param message the actual message
1870 * @param atsi performance data for the connection
1871 * @return GNUNET_OK to keep the connection open,
1872 * GNUNET_SYSERR to close it (signal serious error)
1875 handle_close (struct GNUNET_STREAM_Socket *socket,
1876 struct GNUNET_MESH_Tunnel *tunnel,
1877 const struct GNUNET_PeerIdentity *sender,
1878 const struct GNUNET_STREAM_MessageHeader *message,
1879 const struct GNUNET_ATS_Information*atsi)
1881 struct GNUNET_STREAM_MessageHeader *close_ack;
1883 switch (socket->state)
1887 case STATE_HELLO_WAIT:
1888 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1889 "%x: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1896 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1897 "%x: Received CLOSE from %x\n",
1899 socket->other_peer);
1900 close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1901 close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1902 close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
1903 queue_message (socket,
1907 if (socket->state == STATE_CLOSED)
1910 GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1911 socket->receive_buffer = NULL;
1912 socket->receive_buffer_size = 0;
1918 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1920 * @param cls the socket (set from GNUNET_MESH_connect)
1921 * @param tunnel connection to the other end
1922 * @param tunnel_ctx this is NULL
1923 * @param sender who sent the message
1924 * @param message the actual message
1925 * @param atsi performance data for the connection
1926 * @return GNUNET_OK to keep the connection open,
1927 * GNUNET_SYSERR to close it (signal serious error)
1930 client_handle_close (void *cls,
1931 struct GNUNET_MESH_Tunnel *tunnel,
1933 const struct GNUNET_PeerIdentity *sender,
1934 const struct GNUNET_MessageHeader *message,
1935 const struct GNUNET_ATS_Information*atsi)
1937 struct GNUNET_STREAM_Socket *socket = cls;
1939 return handle_close (socket,
1942 (const struct GNUNET_STREAM_MessageHeader *) message,
1948 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1950 * @param cls the socket (set from GNUNET_MESH_connect)
1951 * @param tunnel connection to the other end
1952 * @param tunnel_ctx this is NULL
1953 * @param sender who sent the message
1954 * @param message the actual message
1955 * @param atsi performance data for the connection
1956 * @return GNUNET_OK to keep the connection open,
1957 * GNUNET_SYSERR to close it (signal serious error)
1960 client_handle_close_ack (void *cls,
1961 struct GNUNET_MESH_Tunnel *tunnel,
1963 const struct GNUNET_PeerIdentity *sender,
1964 const struct GNUNET_MessageHeader *message,
1965 const struct GNUNET_ATS_Information *atsi)
1967 struct GNUNET_STREAM_Socket *socket = cls;
1969 return handle_generic_close_ack (socket,
1972 (const struct GNUNET_STREAM_MessageHeader *)
1978 /*****************************/
1979 /* Server's Message Handlers */
1980 /*****************************/
1983 * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1985 * @param cls the closure
1986 * @param tunnel connection to the other end
1987 * @param tunnel_ctx the socket
1988 * @param sender who sent the message
1989 * @param message the actual message
1990 * @param atsi performance data for the connection
1991 * @return GNUNET_OK to keep the connection open,
1992 * GNUNET_SYSERR to close it (signal serious error)
1995 server_handle_data (void *cls,
1996 struct GNUNET_MESH_Tunnel *tunnel,
1998 const struct GNUNET_PeerIdentity *sender,
1999 const struct GNUNET_MessageHeader *message,
2000 const struct GNUNET_ATS_Information*atsi)
2002 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2004 return handle_data (socket,
2007 (const struct GNUNET_STREAM_DataMessage *)message,
2013 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
2015 * @param cls the closure
2016 * @param tunnel connection to the other end
2017 * @param tunnel_ctx the socket
2018 * @param sender who sent the message
2019 * @param message the actual message
2020 * @param atsi performance data for the connection
2021 * @return GNUNET_OK to keep the connection open,
2022 * GNUNET_SYSERR to close it (signal serious error)
2025 server_handle_hello (void *cls,
2026 struct GNUNET_MESH_Tunnel *tunnel,
2028 const struct GNUNET_PeerIdentity *sender,
2029 const struct GNUNET_MessageHeader *message,
2030 const struct GNUNET_ATS_Information*atsi)
2032 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2033 struct GNUNET_STREAM_HelloAckMessage *reply;
2035 if (GNUNET_PEER_search (sender) != socket->other_peer)
2037 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2038 "%x: Received HELLO from non-confirming peer\n",
2043 GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO ==
2044 ntohs (message->type));
2045 GNUNET_assert (socket->tunnel == tunnel);
2046 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2047 "%x: Received HELLO from %x\n",
2049 socket->other_peer);
2051 if (STATE_INIT == socket->state)
2053 reply = generate_hello_ack_msg (socket);
2054 queue_message (socket,
2056 &set_state_hello_wait,
2061 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2062 "Client sent HELLO when in state %d\n", socket->state);
2063 /* FIXME: Send RESET? */
2071 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2073 * @param cls the closure
2074 * @param tunnel connection to the other end
2075 * @param tunnel_ctx the socket
2076 * @param sender who sent the message
2077 * @param message the actual message
2078 * @param atsi performance data for the connection
2079 * @return GNUNET_OK to keep the connection open,
2080 * GNUNET_SYSERR to close it (signal serious error)
2083 server_handle_hello_ack (void *cls,
2084 struct GNUNET_MESH_Tunnel *tunnel,
2086 const struct GNUNET_PeerIdentity *sender,
2087 const struct GNUNET_MessageHeader *message,
2088 const struct GNUNET_ATS_Information*atsi)
2090 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2091 const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2093 GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2094 ntohs (message->type));
2095 GNUNET_assert (socket->tunnel == tunnel);
2096 ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2097 if (STATE_HELLO_WAIT == socket->state)
2099 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2100 "%x: Received HELLO_ACK from %x\n",
2102 socket->other_peer);
2103 socket->read_sequence_number = ntohl (ack_message->sequence_number);
2104 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2105 "%x: Read sequence number %u\n",
2107 (unsigned int) socket->read_sequence_number);
2108 socket->receiver_window_available =
2109 ntohl (ack_message->receiver_window_size);
2110 /* Attain ESTABLISHED state */
2111 set_state_established (NULL, socket);
2115 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2116 "Client sent HELLO_ACK when in state %d\n", socket->state);
2117 /* FIXME: Send RESET? */
2125 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2127 * @param cls the closure
2128 * @param tunnel connection to the other end
2129 * @param tunnel_ctx the socket
2130 * @param sender who sent the message
2131 * @param message the actual message
2132 * @param atsi performance data for the connection
2133 * @return GNUNET_OK to keep the connection open,
2134 * GNUNET_SYSERR to close it (signal serious error)
2137 server_handle_reset (void *cls,
2138 struct GNUNET_MESH_Tunnel *tunnel,
2140 const struct GNUNET_PeerIdentity *sender,
2141 const struct GNUNET_MessageHeader *message,
2142 const struct GNUNET_ATS_Information*atsi)
2144 // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2151 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2153 * @param cls the closure
2154 * @param tunnel connection to the other end
2155 * @param tunnel_ctx the socket
2156 * @param sender who sent the message
2157 * @param message the actual message
2158 * @param atsi performance data for the connection
2159 * @return GNUNET_OK to keep the connection open,
2160 * GNUNET_SYSERR to close it (signal serious error)
2163 server_handle_transmit_close (void *cls,
2164 struct GNUNET_MESH_Tunnel *tunnel,
2166 const struct GNUNET_PeerIdentity *sender,
2167 const struct GNUNET_MessageHeader *message,
2168 const struct GNUNET_ATS_Information*atsi)
2170 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2172 return handle_transmit_close (socket,
2175 (struct GNUNET_STREAM_MessageHeader *)message,
2181 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2183 * @param cls the closure
2184 * @param tunnel connection to the other end
2185 * @param tunnel_ctx the socket
2186 * @param sender who sent the message
2187 * @param message the actual message
2188 * @param atsi performance data for the connection
2189 * @return GNUNET_OK to keep the connection open,
2190 * GNUNET_SYSERR to close it (signal serious error)
2193 server_handle_transmit_close_ack (void *cls,
2194 struct GNUNET_MESH_Tunnel *tunnel,
2196 const struct GNUNET_PeerIdentity *sender,
2197 const struct GNUNET_MessageHeader *message,
2198 const struct GNUNET_ATS_Information*atsi)
2200 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2202 return handle_generic_close_ack (socket,
2205 (const struct GNUNET_STREAM_MessageHeader *)
2213 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2215 * @param cls the closure
2216 * @param tunnel connection to the other end
2217 * @param tunnel_ctx the socket
2218 * @param sender who sent the message
2219 * @param message the actual message
2220 * @param atsi performance data for the connection
2221 * @return GNUNET_OK to keep the connection open,
2222 * GNUNET_SYSERR to close it (signal serious error)
2225 server_handle_receive_close (void *cls,
2226 struct GNUNET_MESH_Tunnel *tunnel,
2228 const struct GNUNET_PeerIdentity *sender,
2229 const struct GNUNET_MessageHeader *message,
2230 const struct GNUNET_ATS_Information*atsi)
2232 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2235 handle_receive_close (socket,
2238 (const struct GNUNET_STREAM_MessageHeader *) message,
2244 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2246 * @param cls the closure
2247 * @param tunnel connection to the other end
2248 * @param tunnel_ctx the socket
2249 * @param sender who sent the message
2250 * @param message the actual message
2251 * @param atsi performance data for the connection
2252 * @return GNUNET_OK to keep the connection open,
2253 * GNUNET_SYSERR to close it (signal serious error)
2256 server_handle_receive_close_ack (void *cls,
2257 struct GNUNET_MESH_Tunnel *tunnel,
2259 const struct GNUNET_PeerIdentity *sender,
2260 const struct GNUNET_MessageHeader *message,
2261 const struct GNUNET_ATS_Information*atsi)
2263 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2265 return handle_generic_close_ack (socket,
2268 (const struct GNUNET_STREAM_MessageHeader *)
2276 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2278 * @param cls the listen socket (from GNUNET_MESH_connect in
2279 * GNUNET_STREAM_listen)
2280 * @param tunnel connection to the other end
2281 * @param tunnel_ctx the socket
2282 * @param sender who sent the message
2283 * @param message the actual message
2284 * @param atsi performance data for the connection
2285 * @return GNUNET_OK to keep the connection open,
2286 * GNUNET_SYSERR to close it (signal serious error)
2289 server_handle_close (void *cls,
2290 struct GNUNET_MESH_Tunnel *tunnel,
2292 const struct GNUNET_PeerIdentity *sender,
2293 const struct GNUNET_MessageHeader *message,
2294 const struct GNUNET_ATS_Information*atsi)
2296 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2298 return handle_close (socket,
2301 (const struct GNUNET_STREAM_MessageHeader *) message,
2307 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2309 * @param cls the closure
2310 * @param tunnel connection to the other end
2311 * @param tunnel_ctx the socket
2312 * @param sender who sent the message
2313 * @param message the actual message
2314 * @param atsi performance data for the connection
2315 * @return GNUNET_OK to keep the connection open,
2316 * GNUNET_SYSERR to close it (signal serious error)
2319 server_handle_close_ack (void *cls,
2320 struct GNUNET_MESH_Tunnel *tunnel,
2322 const struct GNUNET_PeerIdentity *sender,
2323 const struct GNUNET_MessageHeader *message,
2324 const struct GNUNET_ATS_Information*atsi)
2326 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2328 return handle_generic_close_ack (socket,
2331 (const struct GNUNET_STREAM_MessageHeader *)
2339 * Handler for DATA_ACK messages
2341 * @param socket the socket through which the ack was received
2342 * @param tunnel connection to the other end
2343 * @param sender who sent the message
2344 * @param ack the acknowledgment message
2345 * @param atsi performance data for the connection
2346 * @return GNUNET_OK to keep the connection open,
2347 * GNUNET_SYSERR to close it (signal serious error)
2350 handle_ack (struct GNUNET_STREAM_Socket *socket,
2351 struct GNUNET_MESH_Tunnel *tunnel,
2352 const struct GNUNET_PeerIdentity *sender,
2353 const struct GNUNET_STREAM_AckMessage *ack,
2354 const struct GNUNET_ATS_Information*atsi)
2356 unsigned int packet;
2357 int need_retransmission;
2360 if (GNUNET_PEER_search (sender) != socket->other_peer)
2362 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2363 "%x: Received ACK from non-confirming peer\n",
2368 switch (socket->state)
2370 case (STATE_ESTABLISHED):
2371 case (STATE_RECEIVE_CLOSED):
2372 case (STATE_RECEIVE_CLOSE_WAIT):
2373 if (NULL == socket->write_handle)
2375 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2376 "%x: Received DATA_ACK when write_handle is NULL\n",
2380 /* FIXME: increment in the base sequence number is breaking current flow
2382 if (!((socket->write_sequence_number
2383 - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2385 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2386 "%x: Received DATA_ACK with unexpected base sequence "
2389 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2390 "%x: Current write sequence: %u; Ack's base sequence: %u\n",
2392 socket->write_sequence_number,
2393 ntohl (ack->base_sequence_number));
2396 /* FIXME: include the case when write_handle is cancelled - ignore the
2399 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2400 "%x: Received DATA_ACK from %x\n",
2402 socket->other_peer);
2404 /* Cancel the retransmission task */
2405 if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2407 GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2408 socket->retransmission_timeout_task_id =
2409 GNUNET_SCHEDULER_NO_TASK;
2412 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2414 if (NULL == socket->write_handle->messages[packet]) break;
2415 if (ntohl (ack->base_sequence_number)
2416 >= ntohl (socket->write_handle->messages[packet]->sequence_number))
2417 ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2422 ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
2423 ntohl (socket->write_handle->messages[packet]->sequence_number)
2424 - ntohl (ack->base_sequence_number)))
2425 ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2430 /* Update the receive window remaining
2431 FIXME : Should update with the value from a data ack with greater
2433 socket->receiver_window_available =
2434 ntohl (ack->receive_window_remaining);
2436 /* Check if we have received all acknowledgements */
2437 need_retransmission = GNUNET_NO;
2438 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2440 if (NULL == socket->write_handle->messages[packet]) break;
2441 if (GNUNET_YES != ackbitmap_is_bit_set
2442 (&socket->write_handle->ack_bitmap,packet))
2444 need_retransmission = GNUNET_YES;
2448 if (GNUNET_YES == need_retransmission)
2450 write_data (socket);
2452 else /* We have to call the write continuation callback now */
2454 /* Free the packets */
2455 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2457 GNUNET_free_non_null (socket->write_handle->messages[packet]);
2459 if (NULL != socket->write_handle->write_cont)
2460 socket->write_handle->write_cont
2461 (socket->write_handle->write_cont_cls,
2463 socket->write_handle->size);
2464 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2465 "%x: Write completion callback completed\n",
2467 /* We are done with the write handle - Freeing it */
2468 GNUNET_free (socket->write_handle);
2469 socket->write_handle = NULL;
2480 * Handler for DATA_ACK messages
2482 * @param cls the 'struct GNUNET_STREAM_Socket'
2483 * @param tunnel connection to the other end
2484 * @param tunnel_ctx unused
2485 * @param sender who sent the message
2486 * @param message the actual message
2487 * @param atsi performance data for the connection
2488 * @return GNUNET_OK to keep the connection open,
2489 * GNUNET_SYSERR to close it (signal serious error)
2492 client_handle_ack (void *cls,
2493 struct GNUNET_MESH_Tunnel *tunnel,
2495 const struct GNUNET_PeerIdentity *sender,
2496 const struct GNUNET_MessageHeader *message,
2497 const struct GNUNET_ATS_Information*atsi)
2499 struct GNUNET_STREAM_Socket *socket = cls;
2500 const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2502 return handle_ack (socket, tunnel, sender, ack, atsi);
2507 * Handler for DATA_ACK messages
2509 * @param cls the server's listen socket
2510 * @param tunnel connection to the other end
2511 * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2512 * @param sender who sent the message
2513 * @param message the actual message
2514 * @param atsi performance data for the connection
2515 * @return GNUNET_OK to keep the connection open,
2516 * GNUNET_SYSERR to close it (signal serious error)
2519 server_handle_ack (void *cls,
2520 struct GNUNET_MESH_Tunnel *tunnel,
2522 const struct GNUNET_PeerIdentity *sender,
2523 const struct GNUNET_MessageHeader *message,
2524 const struct GNUNET_ATS_Information*atsi)
2526 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2527 const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2529 return handle_ack (socket, tunnel, sender, ack, atsi);
2534 * For client message handlers, the stream socket is in the
2537 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2538 {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2539 {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK,
2540 sizeof (struct GNUNET_STREAM_AckMessage) },
2541 {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2542 sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2543 {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2544 sizeof (struct GNUNET_STREAM_MessageHeader)},
2545 {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2546 sizeof (struct GNUNET_STREAM_MessageHeader)},
2547 {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2548 sizeof (struct GNUNET_STREAM_MessageHeader)},
2549 {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2550 sizeof (struct GNUNET_STREAM_MessageHeader)},
2551 {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2552 sizeof (struct GNUNET_STREAM_MessageHeader)},
2553 {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2554 sizeof (struct GNUNET_STREAM_MessageHeader)},
2555 {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2556 sizeof (struct GNUNET_STREAM_MessageHeader)},
2562 * For server message handlers, the stream socket is in the
2563 * tunnel context, and the listen socket in the closure argument.
2565 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2566 {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2567 {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK,
2568 sizeof (struct GNUNET_STREAM_AckMessage) },
2569 {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO,
2570 sizeof (struct GNUNET_STREAM_MessageHeader)},
2571 {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2572 sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2573 {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2574 sizeof (struct GNUNET_STREAM_MessageHeader)},
2575 {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2576 sizeof (struct GNUNET_STREAM_MessageHeader)},
2577 {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2578 sizeof (struct GNUNET_STREAM_MessageHeader)},
2579 {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2580 sizeof (struct GNUNET_STREAM_MessageHeader)},
2581 {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2582 sizeof (struct GNUNET_STREAM_MessageHeader)},
2583 {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2584 sizeof (struct GNUNET_STREAM_MessageHeader)},
2585 {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2586 sizeof (struct GNUNET_STREAM_MessageHeader)},
2592 * Function called when our target peer is connected to our tunnel
2594 * @param cls the socket for which this tunnel is created
2595 * @param peer the peer identity of the target
2596 * @param atsi performance data for the connection
2599 mesh_peer_connect_callback (void *cls,
2600 const struct GNUNET_PeerIdentity *peer,
2601 const struct GNUNET_ATS_Information * atsi)
2603 struct GNUNET_STREAM_Socket *socket = cls;
2604 struct GNUNET_STREAM_MessageHeader *message;
2605 GNUNET_PEER_Id connected_peer;
2607 connected_peer = GNUNET_PEER_search (peer);
2609 if (connected_peer != socket->other_peer)
2611 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2612 "%x: A peer which is not our target has connected",
2618 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2619 "%x: Target peer %x connected\n",
2623 /* Set state to INIT */
2624 socket->state = STATE_INIT;
2626 /* Send HELLO message */
2627 message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2628 message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
2629 message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2630 queue_message (socket,
2632 &set_state_hello_wait,
2635 /* Call open callback */
2636 if (NULL == socket->open_cb)
2638 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2639 "STREAM_open callback is NULL\n");
2645 * Function called when our target peer is disconnected from our tunnel
2647 * @param cls the socket associated which this tunnel
2648 * @param peer the peer identity of the target
2651 mesh_peer_disconnect_callback (void *cls,
2652 const struct GNUNET_PeerIdentity *peer)
2654 struct GNUNET_STREAM_Socket *socket=cls;
2656 /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2657 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2658 "%x: Other peer %x disconnected \n",
2660 socket->other_peer);
2665 * Method called whenever a peer creates a tunnel to us
2667 * @param cls closure
2668 * @param tunnel new handle to the tunnel
2669 * @param initiator peer that started the tunnel
2670 * @param atsi performance information for the tunnel
2671 * @return initial tunnel context for the tunnel
2672 * (can be NULL -- that's not an error)
2675 new_tunnel_notify (void *cls,
2676 struct GNUNET_MESH_Tunnel *tunnel,
2677 const struct GNUNET_PeerIdentity *initiator,
2678 const struct GNUNET_ATS_Information *atsi)
2680 struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2681 struct GNUNET_STREAM_Socket *socket;
2683 /* FIXME: If a tunnel is already created, we should not accept new tunnels
2684 from the same peer again until the socket is closed */
2686 socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2687 socket->other_peer = GNUNET_PEER_intern (initiator);
2688 socket->tunnel = tunnel;
2689 socket->session_id = 0; /* FIXME */
2690 socket->state = STATE_INIT;
2691 socket->lsocket = lsocket;
2692 socket->our_id = lsocket->our_id;
2694 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2695 "%x: Peer %x initiated tunnel to us\n",
2697 socket->other_peer);
2699 /* FIXME: Copy MESH handle from lsocket to socket */
2706 * Function called whenever an inbound tunnel is destroyed. Should clean up
2707 * any associated state. This function is NOT called if the client has
2708 * explicitly asked for the tunnel to be destroyed using
2709 * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2712 * @param cls closure (set from GNUNET_MESH_connect)
2713 * @param tunnel connection to the other end (henceforth invalid)
2714 * @param tunnel_ctx place where local state associated
2715 * with the tunnel is stored
2718 tunnel_cleaner (void *cls,
2719 const struct GNUNET_MESH_Tunnel *tunnel,
2722 struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2724 if (tunnel != socket->tunnel)
2728 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2729 "%x: Peer %x has terminated connection abruptly\n",
2731 socket->other_peer);
2733 socket->status = GNUNET_STREAM_SHUTDOWN;
2735 /* Clear Transmit handles */
2736 if (NULL != socket->transmit_handle)
2738 GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2739 socket->transmit_handle = NULL;
2741 if (NULL != socket->ack_transmit_handle)
2743 GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
2744 GNUNET_free (socket->ack_msg);
2745 socket->ack_msg = NULL;
2746 socket->ack_transmit_handle = NULL;
2748 /* Stop Tasks using socket->tunnel */
2749 if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2751 GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2752 socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2754 if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2756 GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2757 socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
2759 /* FIXME: Cancel all other tasks using socket->tunnel */
2760 socket->tunnel = NULL;
2770 * Tries to open a stream to the target peer
2772 * @param cfg configuration to use
2773 * @param target the target peer to which the stream has to be opened
2774 * @param app_port the application port number which uniquely identifies this
2776 * @param open_cb this function will be called after stream has be established
2777 * @param open_cb_cls the closure for open_cb
2778 * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2779 * @return if successful it returns the stream socket; NULL if stream cannot be
2782 struct GNUNET_STREAM_Socket *
2783 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2784 const struct GNUNET_PeerIdentity *target,
2785 GNUNET_MESH_ApplicationType app_port,
2786 GNUNET_STREAM_OpenCallback open_cb,
2790 struct GNUNET_STREAM_Socket *socket;
2791 struct GNUNET_PeerIdentity own_peer_id;
2792 enum GNUNET_STREAM_Option option;
2793 GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
2794 va_list vargs; /* Variable arguments */
2796 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2799 socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2800 socket->other_peer = GNUNET_PEER_intern (target);
2801 socket->open_cb = open_cb;
2802 socket->open_cls = open_cb_cls;
2803 GNUNET_TESTING_get_peer_identity (cfg, &own_peer_id);
2804 socket->our_id = GNUNET_PEER_intern (&own_peer_id);
2807 socket->retransmit_timeout =
2808 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
2810 va_start (vargs, open_cb_cls); /* Parse variable args */
2812 option = va_arg (vargs, enum GNUNET_STREAM_Option);
2815 case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2816 /* Expect struct GNUNET_TIME_Relative */
2817 socket->retransmit_timeout = va_arg (vargs,
2818 struct GNUNET_TIME_Relative);
2820 case GNUNET_STREAM_OPTION_END:
2823 } while (GNUNET_STREAM_OPTION_END != option);
2824 va_end (vargs); /* End of variable args parsing */
2825 socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2826 10, /* QUEUE size as parameter? */
2828 NULL, /* No inbound tunnel handler */
2829 NULL, /* No in-tunnel cleaner */
2830 client_message_handlers,
2831 ports); /* We don't get inbound tunnels */
2832 if (NULL == socket->mesh) /* Fail if we cannot connect to mesh */
2834 GNUNET_free (socket);
2838 /* Now create the mesh tunnel to target */
2839 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2840 "Creating MESH Tunnel\n");
2841 socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2842 NULL, /* Tunnel context */
2843 &mesh_peer_connect_callback,
2844 &mesh_peer_disconnect_callback,
2846 GNUNET_assert (NULL != socket->tunnel);
2847 GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2850 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2851 "%s() END\n", __func__);
2857 * Shutdown the stream for reading or writing (similar to man 2 shutdown).
2859 * @param socket the stream socket
2860 * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
2861 * @param completion_cb the callback that will be called upon successful
2862 * shutdown of given operation
2863 * @param completion_cls the closure for the completion callback
2864 * @return the shutdown handle
2866 struct GNUNET_STREAM_ShutdownHandle *
2867 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2869 GNUNET_STREAM_ShutdownCompletion completion_cb,
2870 void *completion_cls)
2872 struct GNUNET_STREAM_ShutdownHandle *handle;
2873 struct GNUNET_STREAM_MessageHeader *msg;
2875 GNUNET_assert (NULL == socket->shutdown_handle);
2877 handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
2878 handle->socket = socket;
2879 handle->completion_cb = completion_cb;
2880 handle->completion_cls = completion_cls;
2881 socket->shutdown_handle = handle;
2883 msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2884 msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2888 handle->operation = SHUT_RD;
2889 if (NULL != socket->read_handle)
2890 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2891 "Existing read handle should be cancelled before shutting"
2893 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
2894 queue_message (socket,
2896 &set_state_receive_close_wait,
2900 handle->operation = SHUT_WR;
2901 if (NULL != socket->write_handle)
2902 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2903 "Existing write handle should be cancelled before shutting"
2905 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
2906 queue_message (socket,
2908 &set_state_transmit_close_wait,
2912 handle->operation = SHUT_RDWR;
2913 if (NULL != socket->write_handle)
2914 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2915 "Existing write handle should be cancelled before shutting"
2917 if (NULL != socket->read_handle)
2918 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2919 "Existing read handle should be cancelled before shutting"
2921 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
2922 queue_message (socket,
2924 &set_state_close_wait,
2928 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2929 "GNUNET_STREAM_shutdown called with invalid value for "
2930 "parameter operation -- Ignoring\n");
2932 GNUNET_free (handle);
2935 handle->close_msg_retransmission_task_id =
2936 GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2937 &close_msg_retransmission_task,
2944 * Cancels a pending shutdown
2946 * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
2949 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
2951 if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
2952 GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
2953 GNUNET_free (handle);
2961 * @param socket the stream socket
2964 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
2966 struct MessageQueue *head;
2968 GNUNET_break (NULL == socket->read_handle);
2969 GNUNET_break (NULL == socket->write_handle);
2971 if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
2973 /* socket closed with read task pending!? */
2975 GNUNET_SCHEDULER_cancel (socket->read_task_id);
2976 socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
2979 /* Terminate the ack'ing tasks if they are still present */
2980 if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
2982 GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2983 socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2986 /* Clear Transmit handles */
2987 if (NULL != socket->transmit_handle)
2989 GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2990 socket->transmit_handle = NULL;
2992 if (NULL != socket->ack_transmit_handle)
2994 GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
2995 GNUNET_free (socket->ack_msg);
2996 socket->ack_msg = NULL;
2997 socket->ack_transmit_handle = NULL;
3000 /* Clear existing message queue */
3001 while (NULL != (head = socket->queue_head)) {
3002 GNUNET_CONTAINER_DLL_remove (socket->queue_head,
3005 GNUNET_free (head->message);
3009 /* Close associated tunnel */
3010 if (NULL != socket->tunnel)
3012 GNUNET_MESH_tunnel_destroy (socket->tunnel);
3013 socket->tunnel = NULL;
3016 /* Close mesh connection */
3017 if (NULL != socket->mesh && NULL == socket->lsocket)
3019 GNUNET_MESH_disconnect (socket->mesh);
3020 socket->mesh = NULL;
3023 /* Release receive buffer */
3024 if (NULL != socket->receive_buffer)
3026 GNUNET_free (socket->receive_buffer);
3029 GNUNET_free (socket);
3034 * Listens for stream connections for a specific application ports
3036 * @param cfg the configuration to use
3037 * @param app_port the application port for which new streams will be accepted
3038 * @param listen_cb this function will be called when a peer tries to establish
3040 * @param listen_cb_cls closure for listen_cb
3041 * @return listen socket, NULL for any error
3043 struct GNUNET_STREAM_ListenSocket *
3044 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
3045 GNUNET_MESH_ApplicationType app_port,
3046 GNUNET_STREAM_ListenCallback listen_cb,
3047 void *listen_cb_cls)
3049 /* FIXME: Add variable args for passing configration options? */
3050 struct GNUNET_STREAM_ListenSocket *lsocket;
3051 GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
3052 struct GNUNET_PeerIdentity our_peer_id;
3054 lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
3055 lsocket->port = app_port;
3056 lsocket->listen_cb = listen_cb;
3057 lsocket->listen_cb_cls = listen_cb_cls;
3058 GNUNET_TESTING_get_peer_identity (cfg, &our_peer_id);
3059 lsocket->our_id = GNUNET_PEER_intern (&our_peer_id);
3060 lsocket->mesh = GNUNET_MESH_connect (cfg,
3061 10, /* FIXME: QUEUE size as parameter? */
3062 lsocket, /* Closure */
3065 server_message_handlers,
3067 GNUNET_assert (NULL != lsocket->mesh);
3073 * Closes the listen socket
3075 * @param lsocket the listen socket
3078 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3080 /* Close MESH connection */
3081 GNUNET_assert (NULL != lsocket->mesh);
3082 GNUNET_MESH_disconnect (lsocket->mesh);
3084 GNUNET_free (lsocket);
3089 * Tries to write the given data to the stream. The maximum size of data that
3090 * can be written as part of a write operation is (64 * (64000 - sizeof (struct
3091 * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3092 * violation, however only the said number of maximum bytes will be written.
3094 * @param socket the socket representing a stream
3095 * @param data the data buffer from where the data is written into the stream
3096 * @param size the number of bytes to be written from the data buffer
3097 * @param timeout the timeout period
3098 * @param write_cont the function to call upon writing some bytes into the
3100 * @param write_cont_cls the closure
3102 * @return handle to cancel the operation; if a previous write is pending or
3103 * the stream has been shutdown for this operation then write_cont is
3104 * immediately called and NULL is returned.
3106 struct GNUNET_STREAM_IOWriteHandle *
3107 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3110 struct GNUNET_TIME_Relative timeout,
3111 GNUNET_STREAM_CompletionContinuation write_cont,
3112 void *write_cont_cls)
3114 unsigned int num_needed_packets;
3115 unsigned int packet;
3116 struct GNUNET_STREAM_IOWriteHandle *io_handle;
3117 uint32_t packet_size;
3118 uint32_t payload_size;
3119 struct GNUNET_STREAM_DataMessage *data_msg;
3121 struct GNUNET_TIME_Relative ack_deadline;
3123 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3126 /* Return NULL if there is already a write request pending */
3127 if (NULL != socket->write_handle)
3133 switch (socket->state)
3135 case STATE_TRANSMIT_CLOSED:
3136 case STATE_TRANSMIT_CLOSE_WAIT:
3138 case STATE_CLOSE_WAIT:
3139 if (NULL != write_cont)
3140 write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3141 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3142 "%s() END\n", __func__);
3146 case STATE_HELLO_WAIT:
3147 if (NULL != write_cont)
3148 /* FIXME: GNUNET_STREAM_SYSERR?? */
3149 write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3150 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3151 "%s() END\n", __func__);
3153 case STATE_ESTABLISHED:
3154 case STATE_RECEIVE_CLOSED:
3155 case STATE_RECEIVE_CLOSE_WAIT:
3159 if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
3160 size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size;
3161 num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
3162 io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
3163 io_handle->socket = socket;
3164 io_handle->write_cont = write_cont;
3165 io_handle->write_cont_cls = write_cont_cls;
3166 io_handle->size = size;
3168 /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3169 determined from RTT */
3170 ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3171 /* Divide the given buffer into packets for sending */
3172 for (packet=0; packet < num_needed_packets; packet++)
3174 if ((packet + 1) * max_payload_size < size)
3176 payload_size = max_payload_size;
3177 packet_size = MAX_PACKET_SIZE;
3181 payload_size = size - packet * max_payload_size;
3182 packet_size = payload_size + sizeof (struct
3183 GNUNET_STREAM_DataMessage);
3185 io_handle->messages[packet] = GNUNET_malloc (packet_size);
3186 io_handle->messages[packet]->header.header.size = htons (packet_size);
3187 io_handle->messages[packet]->header.header.type =
3188 htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3189 io_handle->messages[packet]->sequence_number =
3190 htonl (socket->write_sequence_number++);
3191 io_handle->messages[packet]->offset = htonl (socket->write_offset);
3193 /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3194 determined from RTT */
3195 io_handle->messages[packet]->ack_deadline =
3196 GNUNET_TIME_relative_hton (ack_deadline);
3197 data_msg = io_handle->messages[packet];
3198 /* Copy data from given buffer to the packet */
3199 memcpy (&data_msg[1],
3202 sweep += payload_size;
3203 socket->write_offset += payload_size;
3205 socket->write_handle = io_handle;
3206 write_data (socket);
3208 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3209 "%s() END\n", __func__);
3217 * Tries to read data from the stream.
3219 * @param socket the socket representing a stream
3220 * @param timeout the timeout period
3221 * @param proc function to call with data (once only)
3222 * @param proc_cls the closure for proc
3224 * @return handle to cancel the operation; if the stream has been shutdown for
3225 * this type of opeartion then the DataProcessor is immediately
3226 * called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned
3228 struct GNUNET_STREAM_IOReadHandle *
3229 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3230 struct GNUNET_TIME_Relative timeout,
3231 GNUNET_STREAM_DataProcessor proc,
3234 struct GNUNET_STREAM_IOReadHandle *read_handle;
3236 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3241 /* Return NULL if there is already a read handle; the user has to cancel that
3242 first before continuing or has to wait until it is completed */
3243 if (NULL != socket->read_handle) return NULL;
3245 GNUNET_assert (NULL != proc);
3247 switch (socket->state)
3249 case STATE_RECEIVE_CLOSED:
3250 case STATE_RECEIVE_CLOSE_WAIT:
3252 case STATE_CLOSE_WAIT:
3253 proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3254 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3263 read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
3264 read_handle->proc = proc;
3265 read_handle->proc_cls = proc_cls;
3266 socket->read_handle = read_handle;
3268 /* Check if we have a packet at bitmap 0 */
3269 if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3272 socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3277 /* Setup the read timeout task */
3278 socket->read_io_timeout_task_id =
3279 GNUNET_SCHEDULER_add_delayed (timeout,
3282 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3291 * Cancel pending write operation.
3293 * @param ioh handle to operation to cancel
3296 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3298 struct GNUNET_STREAM_Socket *socket = ioh->socket;
3299 unsigned int packet;
3301 GNUNET_assert (NULL != socket->write_handle);
3302 GNUNET_assert (socket->write_handle == ioh);
3304 if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
3306 GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
3307 socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
3310 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3312 if (NULL == ioh->messages[packet]) break;
3313 GNUNET_free (ioh->messages[packet]);
3316 GNUNET_free (socket->write_handle);
3317 socket->write_handle = NULL;
3323 * Cancel pending read operation.
3325 * @param ioh handle to operation to cancel
3328 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)