72056fa62c6b042d8363a7727a10fa47afccf56e
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Add code for write io timeout
27  *
28  * Include retransmission for control messages
29  **/
30
31 /**
32  * @file stream/stream_api.c
33  * @brief Implementation of the stream library
34  * @author Sree Harsha Totakura
35  */
36
37 #include "platform.h"
38 #include "gnunet_common.h"
39 #include "gnunet_util_lib.h"
40 #include "gnunet_lockmanager_service.h"
41 #include "gnunet_statistics_service.h"
42 #include "gnunet_stream_lib.h"
43 #include "stream_protocol.h"
44
45 /**
46  * Generic logging shorthand
47  */
48 #define LOG(kind,...)                                   \
49   GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
50
51 /**
52  * Debug logging shorthand
53  */
54 #define LOG_DEBUG(...)                          \
55   LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
56
57 /**
58  * Time in relative seconds shorthand
59  */
60 #define TIME_REL_SECS(sec) \
61   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
62
63 /**
64  * The maximum packet size of a stream packet
65  */
66 #define DEFAULT_MAX_PAYLOAD_SIZE 64000
67
68 /**
69  * Receive buffer
70  */
71 #define RECEIVE_BUFFER_SIZE 4096000
72
73 /**
74  * states in the Protocol
75  */
76 enum State
77   {
78     /**
79      * Client initialization state
80      */
81     STATE_INIT,
82
83     /**
84      * Listener initialization state 
85      */
86     STATE_LISTEN,
87
88     /**
89      * Pre-connection establishment state
90      */
91     STATE_HELLO_WAIT,
92
93     /**
94      * State where a connection has been established
95      */
96     STATE_ESTABLISHED,
97
98     /**
99      * State where the socket is closed on our side and waiting to be ACK'ed
100      */
101     STATE_RECEIVE_CLOSE_WAIT,
102
103     /**
104      * State where the socket is closed for reading
105      */
106     STATE_RECEIVE_CLOSED,
107
108     /**
109      * State where the socket is closed on our side and waiting to be ACK'ed
110      */
111     STATE_TRANSMIT_CLOSE_WAIT,
112
113     /**
114      * State where the socket is closed for writing
115      */
116     STATE_TRANSMIT_CLOSED,
117
118     /**
119      * State where the socket is closed on our side and waiting to be ACK'ed
120      */
121     STATE_CLOSE_WAIT,
122
123     /**
124      * State where the socket is closed
125      */
126     STATE_CLOSED 
127   };
128
129
130 /**
131  * Functions of this type are called when a message is written
132  *
133  * @param cls the closure from queue_message
134  * @param socket the socket the written message was bound to
135  */
136 typedef void (*SendFinishCallback) (void *cls,
137                                     struct GNUNET_STREAM_Socket *socket);
138
139
140 /**
141  * The send message queue
142  */
143 struct MessageQueue
144 {
145   /**
146    * The message
147    */
148   struct GNUNET_STREAM_MessageHeader *message;
149
150   /**
151    * Callback to be called when the message is sent
152    */
153   SendFinishCallback finish_cb;
154
155   /**
156    * The closure for finish_cb
157    */
158   void *finish_cb_cls;
159
160   /**
161    * The next message in queue. Should be NULL in the last message
162    */
163   struct MessageQueue *next;
164
165   /**
166    * The next message in queue. Should be NULL in the first message
167    */
168   struct MessageQueue *prev;
169 };
170
171
172 /**
173  * The STREAM Socket Handler
174  */
175 struct GNUNET_STREAM_Socket
176 {
177   /**
178    * The mesh handle
179    */
180   struct GNUNET_MESH_Handle *mesh;
181
182   /**
183    * Handle to statistics
184    */
185   struct GNUNET_STATISTICS_Handle *stat_handle;
186
187   /**
188    * The mesh tunnel handle
189    */
190   struct GNUNET_MESH_Tunnel *tunnel;
191
192   /**
193    * Stream open closure
194    */
195   void *open_cls;
196
197   /**
198    * Stream open callback
199    */
200   GNUNET_STREAM_OpenCallback open_cb;
201
202   /**
203    * The current transmit handle (if a pending transmit request exists)
204    */
205   struct GNUNET_MESH_TransmitHandle *transmit_handle;
206
207   /**
208    * The current message associated with the transmit handle
209    */
210   struct MessageQueue *queue_head;
211
212   /**
213    * The queue tail, should always point to the last message in queue
214    */
215   struct MessageQueue *queue_tail;
216
217   /**
218    * The write IO_handle associated with this socket
219    */
220   struct GNUNET_STREAM_IOWriteHandle *write_handle;
221
222   /**
223    * The read IO_handle associated with this socket
224    */
225   struct GNUNET_STREAM_IOReadHandle *read_handle;
226
227   /**
228    * The shutdown handle associated with this socket
229    */
230   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
231
232   /**
233    * Buffer for storing received messages
234    */
235   void *receive_buffer;
236
237   /**
238    * The listen socket from which this socket is derived. Should be NULL if it
239    * is not a derived socket
240    */
241   struct GNUNET_STREAM_ListenSocket *lsocket;
242
243   /**
244    * The peer identity of the peer at the other end of the stream
245    */
246   struct GNUNET_PeerIdentity other_peer;
247
248   /**
249    * The Acknowledgement Bitmap
250    */
251   GNUNET_STREAM_AckBitmap ack_bitmap;
252
253   /**
254    * Task identifier for retransmission task after timeout
255    */
256   GNUNET_SCHEDULER_TaskIdentifier data_retransmission_task_id;
257
258   /**
259    * Task identifier for retransmission of control messages
260    */
261   GNUNET_SCHEDULER_TaskIdentifier control_retransmission_task_id;
262
263   /**
264    * The task for sending timely Acks
265    */
266   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
267
268   /**
269    * Retransmission timeout
270    */
271   struct GNUNET_TIME_Relative retransmit_timeout;
272
273   /**
274    * Time when the Acknowledgement was queued
275    */
276   struct GNUNET_TIME_Absolute ack_time_registered;
277
278   /**
279    * Queued Acknowledgement deadline
280    */
281   struct GNUNET_TIME_Relative ack_time_deadline;
282
283   /**
284    * Mesh transmit timeout
285    */
286   struct GNUNET_TIME_Relative mesh_retry_timeout;
287
288   /**
289    * Data retransmission timeout
290    */
291   struct GNUNET_TIME_Relative data_retransmit_timeout;
292
293   /**
294    * The state of the protocol associated with this socket
295    */
296   enum State state;
297
298   /**
299    * The status of the socket
300    */
301   enum GNUNET_STREAM_Status status;
302
303   /**
304    * Whether testing mode is active or not
305    */
306   int testing_active;
307
308   /**
309    * Is receive closed
310    */
311   int receive_closed;
312
313   /**
314    * Is transmission closed
315    */
316   int transmit_closed;
317
318   /**
319    * The application port number (type: uint32_t)
320    */
321   GNUNET_MESH_ApplicationType app_port;
322
323   /**
324    * The write sequence number to be set incase of testing
325    */
326   uint32_t testing_set_write_sequence_number_value;
327
328   /**
329    * Write sequence number. Set to random when sending HELLO(client) and
330    * HELLO_ACK(server) 
331    */
332   uint32_t write_sequence_number;
333
334   /**
335    * Read sequence number. This number's value is determined during handshake
336    */
337   uint32_t read_sequence_number;
338
339   /**
340    * The receiver buffer size
341    */
342   uint32_t receive_buffer_size;
343
344   /**
345    * The receiver buffer boundaries
346    */
347   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
348
349   /**
350    * receiver's available buffer after the last acknowledged packet
351    */
352   uint32_t receiver_window_available;
353
354   /**
355    * The offset pointer used during write operation
356    */
357   uint32_t write_offset;
358
359   /**
360    * The offset after which we are expecting data
361    */
362   uint32_t read_offset;
363
364   /**
365    * The offset upto which user has read from the received buffer
366    */
367   uint32_t copy_offset;
368
369   /**
370    * The maximum size of the data message payload this stream handle can send
371    */
372   uint16_t max_payload_size;
373 };
374
375
376 /**
377  * A socket for listening
378  */
379 struct GNUNET_STREAM_ListenSocket
380 {
381   /**
382    * The mesh handle
383    */
384   struct GNUNET_MESH_Handle *mesh;
385
386   /**
387    * Handle to statistics
388    */
389   struct GNUNET_STATISTICS_Handle *stat_handle;
390
391   /**
392    * Our configuration
393    */
394   struct GNUNET_CONFIGURATION_Handle *cfg;
395
396   /**
397    * Handle to the lock manager service
398    */
399   struct GNUNET_LOCKMANAGER_Handle *lockmanager;
400
401   /**
402    * The active LockingRequest from lockmanager
403    */
404   struct GNUNET_LOCKMANAGER_LockingRequest *locking_request;
405
406   /**
407    * Callback to call after acquring a lock and listening
408    */
409   GNUNET_STREAM_ListenSuccessCallback listen_ok_cb;
410
411   /**
412    * The callback function which is called after successful opening socket
413    */
414   GNUNET_STREAM_ListenCallback listen_cb;
415
416   /**
417    * The call back closure
418    */
419   void *listen_cb_cls;
420
421   /**
422    * The service port
423    */
424   GNUNET_MESH_ApplicationType port;
425   
426   /**
427    * The id of the lockmanager timeout task
428    */
429   GNUNET_SCHEDULER_TaskIdentifier lockmanager_acquire_timeout_task;
430
431   /**
432    * The retransmit timeout
433    */
434   struct GNUNET_TIME_Relative retransmit_timeout;
435   
436   /**
437    * Listen enabled?
438    */
439   int listening;
440
441   /**
442    * Whether testing mode is active or not
443    */
444   int testing_active;
445
446   /**
447    * The write sequence number to be set incase of testing
448    */
449   uint32_t testing_set_write_sequence_number_value;
450
451   /**
452    * The maximum size of the data message payload this stream handle can send
453    */
454   uint16_t max_payload_size;
455
456 };
457
458
459 /**
460  * The IO Write Handle
461  */
462 struct GNUNET_STREAM_IOWriteHandle
463 {
464   /**
465    * The socket to which this write handle is associated
466    */
467   struct GNUNET_STREAM_Socket *socket;
468
469   /**
470    * The packet_buffers associated with this Handle
471    */
472   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
473
474   /**
475    * The write continuation callback
476    */
477   GNUNET_STREAM_CompletionContinuation write_cont;
478
479   /**
480    * Write continuation closure
481    */
482   void *write_cont_cls;
483
484   /**
485    * The bitmap of this IOHandle; Corresponding bit for a message is set when
486    * it has been acknowledged by the receiver
487    */
488   GNUNET_STREAM_AckBitmap ack_bitmap;
489
490   /**
491    * Number of bytes in this write handle
492    */
493   size_t size;
494
495   /**
496    * Number of packets already transmitted from this IO handle. Retransmitted
497    * packets are not taken into account here. This is used to determine which
498    * packets account for retransmission and which packets occupy buffer space at
499    * the receiver.
500    */
501   unsigned int packets_sent;
502 };
503
504
505 /**
506  * The IO Read Handle
507  */
508 struct GNUNET_STREAM_IOReadHandle
509 {
510   /**
511    * The socket to which this read handle is associated
512    */
513   struct GNUNET_STREAM_Socket *socket;
514   
515   /**
516    * Callback for the read processor
517    */
518   GNUNET_STREAM_DataProcessor proc;
519
520   /**
521    * The closure pointer for the read processor callback
522    */
523   void *proc_cls;
524
525   /**
526    * Task identifier for the read io timeout task
527    */
528   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
529
530   /**
531    * Task scheduled to continue a read operation.
532    */
533   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
534 };
535
536
537 /**
538  * Handle for Shutdown
539  */
540 struct GNUNET_STREAM_ShutdownHandle
541 {
542   /**
543    * The socket associated with this shutdown handle
544    */
545   struct GNUNET_STREAM_Socket *socket;
546
547   /**
548    * Shutdown completion callback
549    */
550   GNUNET_STREAM_ShutdownCompletion completion_cb;
551
552   /**
553    * Closure for completion callback
554    */
555   void *completion_cls;
556
557   /**
558    * Close message retransmission task id
559    */
560   GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
561
562   /**
563    * Task scheduled to call the shutdown continuation callback
564    */
565   GNUNET_SCHEDULER_TaskIdentifier call_cont_task_id;
566
567   /**
568    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
569    */
570   int operation;  
571 };
572
573
574 /**
575  * Default value in seconds for various timeouts
576  */
577 static const unsigned int default_timeout = 10;
578
579 /**
580  * The domain name for locks we use here
581  */
582 static const char *locking_domain = "GNUNET_STREAM_APPLOCK";
583
584 /**
585  * Callback function for sending queued message
586  *
587  * @param cls closure the socket
588  * @param size number of bytes available in buf
589  * @param buf where the callee should write the message
590  * @return number of bytes written to buf
591  */
592 static size_t
593 send_message_notify (void *cls, size_t size, void *buf)
594 {
595   struct GNUNET_STREAM_Socket *socket = cls;
596   struct MessageQueue *head;
597   size_t ret;
598
599   socket->transmit_handle = NULL; /* Remove the transmit handle */
600   head = socket->queue_head;
601   if (NULL == head)
602     return 0; /* just to be safe */
603   if (0 == size)                /* request timed out */
604   {
605     socket->mesh_retry_timeout = GNUNET_TIME_STD_BACKOFF
606         (socket->mesh_retry_timeout);    
607     LOG (GNUNET_ERROR_TYPE_DEBUG,
608          "%s: Message sending to MESH timed out. Retrying in %s \n",
609          GNUNET_i2s (&socket->other_peer),
610          GNUNET_STRINGS_relative_time_to_string (socket->mesh_retry_timeout,
611                                                  GNUNET_YES));
612     socket->transmit_handle = 
613         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
614                                            GNUNET_NO, /* Corking */
615                                            socket->mesh_retry_timeout,
616                                            &socket->other_peer,
617                                            ntohs (head->message->header.size),
618                                            &send_message_notify,
619                                            socket);
620     return 0;
621   }
622   ret = ntohs (head->message->header.size);
623   GNUNET_assert (size >= ret);
624   memcpy (buf, head->message, ret);
625   if (NULL != head->finish_cb)
626   {
627     head->finish_cb (head->finish_cb_cls, socket);
628   }
629   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
630                                socket->queue_tail,
631                                head);
632   GNUNET_free (head->message);
633   GNUNET_free (head);
634   if (NULL != socket->transmit_handle)
635     return ret; /* 'finish_cb' might have triggered message already! */
636   head = socket->queue_head;
637   if (NULL != head)    /* more pending messages to send */
638   {
639     socket->mesh_retry_timeout = GNUNET_TIME_UNIT_ZERO;
640     socket->transmit_handle = 
641         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
642                                            GNUNET_NO, /* Corking */
643                                            socket->mesh_retry_timeout,
644                                            &socket->other_peer,
645                                            ntohs (head->message->header.size),
646                                            &send_message_notify,
647                                            socket);
648   }
649   return ret;
650 }
651
652
653 /**
654  * Queues a message for sending using the mesh connection of a socket
655  *
656  * @param socket the socket whose mesh connection is used
657  * @param message the message to be sent
658  * @param finish_cb the callback to be called when the message is sent
659  * @param finish_cb_cls the closure for the callback
660  * @param urgent set to GNUNET_YES to add the message to the beginning of the
661  *          queue; GNUNET_NO to add at the tail
662  */
663 static void
664 queue_message (struct GNUNET_STREAM_Socket *socket,
665                struct GNUNET_STREAM_MessageHeader *message,
666                SendFinishCallback finish_cb,
667                void *finish_cb_cls,
668                int urgent)
669 {
670   struct MessageQueue *queue_entity;
671
672   GNUNET_assert 
673     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
674      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
675   LOG (GNUNET_ERROR_TYPE_DEBUG,
676        "%s: Queueing message of type %d and size %d\n",
677        GNUNET_i2s (&socket->other_peer),
678        ntohs (message->header.type),
679        ntohs (message->header.size));
680   GNUNET_assert (NULL != message);
681   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
682   queue_entity->message = message;
683   queue_entity->finish_cb = finish_cb;
684   queue_entity->finish_cb_cls = finish_cb_cls;
685   if (GNUNET_YES == urgent)
686   {
687     GNUNET_CONTAINER_DLL_insert (socket->queue_head, socket->queue_tail,
688                                  queue_entity);
689     if (NULL != socket->transmit_handle)
690     {
691       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
692       socket->transmit_handle = NULL;
693     }
694   }
695   else
696     GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
697                                       socket->queue_tail,
698                                       queue_entity);
699   if (NULL == socket->transmit_handle)
700   {
701     socket->mesh_retry_timeout = GNUNET_TIME_UNIT_ZERO;
702     socket->transmit_handle = 
703         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
704                                            GNUNET_NO, /* Corking */
705                                            socket->mesh_retry_timeout,
706                                            &socket->other_peer,
707                                            ntohs (message->header.size),
708                                            &send_message_notify,
709                                            socket);
710   }
711 }
712
713
714 /**
715  * Copies a message and queues it for sending using the mesh connection of
716  * given socket 
717  *
718  * @param socket the socket whose mesh connection is used
719  * @param message the message to be sent
720  * @param finish_cb the callback to be called when the message is sent
721  * @param finish_cb_cls the closure for the callback
722  */
723 static void
724 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
725                         const struct GNUNET_STREAM_MessageHeader *message,
726                         SendFinishCallback finish_cb,
727                         void *finish_cb_cls)
728 {
729   struct GNUNET_STREAM_MessageHeader *msg_copy;
730   uint16_t size;
731   
732   size = ntohs (message->header.size);
733   msg_copy = GNUNET_malloc (size);
734   memcpy (msg_copy, message, size);
735   queue_message (socket, msg_copy, finish_cb, finish_cb_cls, GNUNET_NO);
736 }
737
738
739 /**
740  * Writes data using the given socket. The amount of data written is limited by
741  * the receiver_window_size
742  *
743  * @param socket the socket to use
744  */
745 static void 
746 write_data (struct GNUNET_STREAM_Socket *socket);
747
748
749 /**
750  * Task for retransmitting data messages if they aren't ACK before their ack
751  * deadline 
752  *
753  * @param cls the socket
754  * @param tc the Task context
755  */
756 static void
757 data_retransmission_task (void *cls,
758                           const struct GNUNET_SCHEDULER_TaskContext *tc)
759 {
760   struct GNUNET_STREAM_Socket *socket = cls;
761   
762   socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
763   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
764     return;
765   LOG (GNUNET_ERROR_TYPE_DEBUG,
766        "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
767   write_data (socket);
768 }
769
770
771 /**
772  * Task for sending ACK message
773  *
774  * @param cls the socket
775  * @param tc the Task context
776  */
777 static void
778 ack_task (void *cls,
779           const struct GNUNET_SCHEDULER_TaskContext *tc)
780 {
781   struct GNUNET_STREAM_Socket *socket = cls;
782   struct GNUNET_STREAM_AckMessage *ack_msg;
783
784   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
785   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
786     return;
787   /* Create the ACK Message */
788   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
789   ack_msg->header.header.size = htons (sizeof (struct 
790                                                GNUNET_STREAM_AckMessage));
791   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
792   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
793   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
794   ack_msg->receive_window_remaining = 
795     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
796   /* Queue up ACK for immediate sending */
797   queue_message (socket, &ack_msg->header, NULL, NULL, GNUNET_YES);
798 }
799
800
801 /**
802  * Retransmission task for shutdown messages
803  *
804  * @param cls the shutdown handle
805  * @param tc the Task Context
806  */
807 static void
808 close_msg_retransmission_task (void *cls,
809                                const struct GNUNET_SCHEDULER_TaskContext *tc)
810 {
811   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
812   struct GNUNET_STREAM_MessageHeader *msg;
813   struct GNUNET_STREAM_Socket *socket;
814
815   shutdown_handle->close_msg_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
816   GNUNET_assert (NULL != shutdown_handle);
817   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
818     return;
819   socket = shutdown_handle->socket;
820   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
821   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
822   switch (shutdown_handle->operation)
823   {
824   case SHUT_RDWR:
825     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
826     break;
827   case SHUT_RD:
828     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
829     break;
830   case SHUT_WR:
831     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
832     break;
833   default:
834     GNUNET_free (msg);
835     shutdown_handle->close_msg_retransmission_task_id = 
836       GNUNET_SCHEDULER_NO_TASK;
837     return;
838   }
839   queue_message (socket, msg, NULL, NULL, GNUNET_NO);
840   shutdown_handle->close_msg_retransmission_task_id =
841     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
842                                   &close_msg_retransmission_task,
843                                   shutdown_handle);
844 }
845
846
847 /**
848  * Function to modify a bit in GNUNET_STREAM_AckBitmap
849  *
850  * @param bitmap the bitmap to modify
851  * @param bit the bit number to modify
852  * @param value GNUNET_YES to on, GNUNET_NO to off
853  */
854 static void
855 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
856                       unsigned int bit, 
857                       int value)
858 {
859   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
860   if (GNUNET_YES == value)
861     *bitmap |= (1LL << bit);
862   else
863     *bitmap &= ~(1LL << bit);
864 }
865
866
867 /**
868  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
869  *
870  * @param bitmap address of the bitmap that has to be checked
871  * @param bit the bit number to check
872  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
873  */
874 static uint8_t
875 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
876                       unsigned int bit)
877 {
878   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
879   return 0 != (*bitmap & (1LL << bit));
880 }
881
882
883 /**
884  * Writes data using the given socket. The amount of data written is limited by
885  * the receiver_window_size
886  *
887  * @param socket the socket to use
888  */
889 static void 
890 write_data (struct GNUNET_STREAM_Socket *socket)
891 {
892   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
893   unsigned int packet;
894   
895   for (packet=0; packet < io_handle->packets_sent; packet++)
896   {
897     if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
898                                            packet))
899     {
900       LOG (GNUNET_ERROR_TYPE_DEBUG,
901            "%s: Retransmitting DATA message with sequence %u\n",
902            GNUNET_i2s (&socket->other_peer),
903            ntohl (io_handle->messages[packet]->sequence_number));
904       copy_and_queue_message (socket,
905                               &io_handle->messages[packet]->header,
906                               NULL,
907                               NULL);
908     }
909   }
910   /* Now send new packets if there is enough buffer space */
911   while ((packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH) &&
912          (NULL != io_handle->messages[packet]) &&
913          (socket->receiver_window_available 
914           >= ntohs (io_handle->messages[packet]->header.header.size)))
915   {
916     socket->receiver_window_available -= 
917       ntohs (io_handle->messages[packet]->header.header.size);
918     LOG (GNUNET_ERROR_TYPE_DEBUG,
919          "%s: Placing DATA message with sequence %u in send queue\n",
920          GNUNET_i2s (&socket->other_peer),
921          ntohl (io_handle->messages[packet]->sequence_number));
922     copy_and_queue_message (socket,
923                             &io_handle->messages[packet]->header,
924                             NULL,
925                             NULL);
926     packet++;
927   }
928   io_handle->packets_sent = packet;
929   // FIXME: 8s is not good, should use GNUNET_TIME_STD_BACKOFF...
930   if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id)
931   {
932     socket->data_retransmit_timeout = GNUNET_TIME_STD_BACKOFF
933         (socket->data_retransmit_timeout);
934     socket->data_retransmission_task_id = 
935         GNUNET_SCHEDULER_add_delayed (socket->data_retransmit_timeout,
936                                       &data_retransmission_task,
937                                       socket);
938   }
939 }
940
941
942 /**
943  * Task for calling the read processor
944  *
945  * @param cls the socket
946  * @param tc the task context
947  */
948 static void
949 call_read_processor (void *cls,
950                      const struct GNUNET_SCHEDULER_TaskContext *tc)
951 {
952   struct GNUNET_STREAM_Socket *socket = cls;
953   struct GNUNET_STREAM_IOReadHandle *read_handle;
954   size_t read_size;
955   size_t valid_read_size;
956   unsigned int packet;
957   uint32_t sequence_increase;
958   uint32_t offset_increase;
959
960   read_handle = socket->read_handle;
961   GNUNET_assert (NULL != read_handle);
962   read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
963   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
964     return;
965   if (NULL == socket->receive_buffer) 
966     return;
967   GNUNET_assert (NULL != socket->read_handle);
968   GNUNET_assert (NULL != socket->read_handle->proc);
969   /* Check the bitmap for any holes */
970   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
971   {
972     if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
973                                            packet))
974       break;
975   }
976   /* We only call read processor if we have the first packet */
977   GNUNET_assert (0 < packet);
978   valid_read_size = 
979     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
980   GNUNET_assert (0 != valid_read_size);
981   /* Cancel the read_io_timeout_task */
982   GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id);
983   read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
984   /* Call the data processor */
985   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n",
986        GNUNET_i2s (&socket->other_peer));
987   read_size = 
988       socket->read_handle->proc (socket->read_handle->proc_cls,
989                                  socket->status,
990                                  socket->receive_buffer + socket->copy_offset,
991                                  valid_read_size);
992   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n",
993        GNUNET_i2s (&socket->other_peer), read_size);
994   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor completed successfully\n",
995        GNUNET_i2s (&socket->other_peer));
996   /* Free the read handle */
997   GNUNET_free (socket->read_handle);
998   socket->read_handle = NULL;
999   GNUNET_assert (read_size <= valid_read_size);
1000   socket->copy_offset += read_size;
1001   /* Determine upto which packet we can remove from the buffer */
1002   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1003   {
1004     if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
1005     { 
1006       packet++; 
1007       break;
1008     }
1009     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
1010       break;
1011   }
1012   /* If no packets can be removed we can't move the buffer */
1013   if (0 == packet) 
1014     return;
1015   sequence_increase = packet;
1016   LOG (GNUNET_ERROR_TYPE_DEBUG,
1017        "%s: Sequence increase after read processor completion: %u\n",
1018        GNUNET_i2s (&socket->other_peer), sequence_increase);
1019   /* Shift the data in the receive buffer */
1020   socket->receive_buffer = 
1021     memmove (socket->receive_buffer,
1022              socket->receive_buffer 
1023              + socket->receive_buffer_boundaries[sequence_increase-1],
1024              socket->receive_buffer_size
1025              - socket->receive_buffer_boundaries[sequence_increase-1]);
1026   /* Shift the bitmap */
1027   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
1028   /* Set read_sequence_number */
1029   socket->read_sequence_number += sequence_increase;
1030   /* Set read_offset */
1031   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
1032   socket->read_offset += offset_increase;
1033   /* Fix copy_offset */
1034   GNUNET_assert (offset_increase <= socket->copy_offset);
1035   socket->copy_offset -= offset_increase;
1036   /* Fix relative boundaries */
1037   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1038   {
1039     if (packet < (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase))
1040     {
1041       uint32_t ahead_buffer_boundary;
1042
1043       ahead_buffer_boundary = 
1044         socket->receive_buffer_boundaries[packet + sequence_increase];
1045       if (0 == ahead_buffer_boundary)
1046         socket->receive_buffer_boundaries[packet] = 0;
1047       else
1048       {
1049         GNUNET_assert (offset_increase < ahead_buffer_boundary);
1050         socket->receive_buffer_boundaries[packet] = 
1051           ahead_buffer_boundary - offset_increase;
1052       }
1053     }
1054     else
1055       socket->receive_buffer_boundaries[packet] = 0;
1056   }
1057 }
1058
1059
1060 /**
1061  * Cancels the existing read io handle
1062  *
1063  * @param cls the closure from the SCHEDULER call
1064  * @param tc the task context
1065  */
1066 static void
1067 read_io_timeout (void *cls,
1068                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1069 {
1070   struct GNUNET_STREAM_Socket *socket = cls;
1071   struct GNUNET_STREAM_IOReadHandle *read_handle;
1072   GNUNET_STREAM_DataProcessor proc;
1073   void *proc_cls;
1074
1075   read_handle = socket->read_handle;
1076   GNUNET_assert (NULL != read_handle);
1077   read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1078   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
1079     return;
1080   if (read_handle->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1081   {
1082     LOG (GNUNET_ERROR_TYPE_DEBUG,
1083          "%s: Read task timedout - Cancelling it\n",
1084          GNUNET_i2s (&socket->other_peer));
1085     GNUNET_SCHEDULER_cancel (read_handle->read_task_id);
1086     read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1087   }
1088   proc = read_handle->proc;
1089   proc_cls = read_handle->proc_cls;
1090   GNUNET_free (read_handle);
1091   socket->read_handle = NULL;
1092   /* Call the read processor to signal timeout */
1093   proc (proc_cls,
1094         GNUNET_STREAM_TIMEOUT,
1095         NULL,
1096         0);
1097 }
1098
1099
1100 /**
1101  * Handler for DATA messages; Same for both client and server
1102  *
1103  * @param socket the socket through which the ack was received
1104  * @param tunnel connection to the other end
1105  * @param sender who sent the message
1106  * @param msg the data message
1107  * @param atsi performance data for the connection
1108  * @return GNUNET_OK to keep the connection open,
1109  *         GNUNET_SYSERR to close it (signal serious error)
1110  */
1111 static int
1112 handle_data (struct GNUNET_STREAM_Socket *socket,
1113              struct GNUNET_MESH_Tunnel *tunnel,
1114              const struct GNUNET_PeerIdentity *sender,
1115              const struct GNUNET_STREAM_DataMessage *msg,
1116              const struct GNUNET_ATS_Information*atsi)
1117 {
1118   const void *payload;
1119   struct GNUNET_TIME_Relative ack_deadline_rel;
1120   uint32_t bytes_needed;
1121   uint32_t relative_offset;
1122   uint32_t relative_sequence_number;
1123   uint16_t size;
1124
1125   size = htons (msg->header.header.size);
1126   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1127   {
1128     GNUNET_break_op (0);
1129     return GNUNET_SYSERR;
1130   }
1131   if (0 != memcmp (sender, &socket->other_peer,
1132                    sizeof (struct GNUNET_PeerIdentity)))
1133   {
1134     LOG (GNUNET_ERROR_TYPE_DEBUG,
1135          "%s: Received DATA from non-confirming peer\n",
1136          GNUNET_i2s (&socket->other_peer));
1137     return GNUNET_YES;
1138   }
1139   switch (socket->state)
1140   {
1141   case STATE_ESTABLISHED:
1142   case STATE_TRANSMIT_CLOSED:
1143   case STATE_TRANSMIT_CLOSE_WAIT:      
1144     /* check if the message's sequence number is in the range we are
1145        expecting */
1146     relative_sequence_number = 
1147       ntohl (msg->sequence_number) - socket->read_sequence_number;
1148     if ( relative_sequence_number >= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1149     {
1150       LOG (GNUNET_ERROR_TYPE_DEBUG,
1151            "%s: Ignoring received message with sequence number %u\n",
1152            GNUNET_i2s (&socket->other_peer),
1153            ntohl (msg->sequence_number));
1154       /* Start ACK sending task if one is not already present */
1155       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1156       {
1157         socket->ack_task_id = 
1158           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1159                                         (msg->ack_deadline),
1160                                         &ack_task,
1161                                         socket);
1162       }
1163       return GNUNET_YES;
1164     }      
1165     /* Check if we have already seen this message */
1166     if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1167                                             relative_sequence_number))
1168     {
1169       LOG (GNUNET_ERROR_TYPE_DEBUG,
1170            "%s: Ignoring already received message with sequence number %u\n",
1171            GNUNET_i2s (&socket->other_peer),
1172            ntohl (msg->sequence_number));
1173       /* Start ACK sending task if one is not already present */
1174       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1175       {
1176         socket->ack_task_id = 
1177           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1178                                         (msg->ack_deadline), &ack_task, socket);
1179       }
1180       return GNUNET_YES;
1181     }
1182     LOG (GNUNET_ERROR_TYPE_DEBUG,
1183          "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
1184          GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number),
1185          ntohs (msg->header.header.size), GNUNET_i2s (&socket->other_peer));
1186     /* Check if we have to allocate the buffer */
1187     size -= sizeof (struct GNUNET_STREAM_DataMessage);
1188     relative_offset = ntohl (msg->offset) - socket->read_offset;
1189     bytes_needed = relative_offset + size;
1190     if (bytes_needed > socket->receive_buffer_size)
1191     {
1192       if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1193       {
1194         socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1195                                                  bytes_needed);
1196         socket->receive_buffer_size = bytes_needed;
1197       }
1198       else
1199       {
1200         LOG (GNUNET_ERROR_TYPE_DEBUG,
1201              "%s: Cannot accommodate packet %d as buffer is full\n",
1202              GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number));
1203         return GNUNET_YES;
1204       }
1205     }
1206     /* Copy Data to buffer */
1207     payload = &msg[1];
1208     GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1209     memcpy (socket->receive_buffer + relative_offset, payload, size);
1210     socket->receive_buffer_boundaries[relative_sequence_number] = 
1211         relative_offset + size;
1212     /* Modify the ACK bitmap */
1213     ackbitmap_modify_bit (&socket->ack_bitmap, relative_sequence_number,
1214                           GNUNET_YES);
1215     /* Start ACK sending task if one is not already present */
1216     ack_deadline_rel = GNUNET_TIME_relative_ntoh (msg->ack_deadline);
1217     if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1218     {
1219       ack_deadline_rel = 
1220           GNUNET_TIME_relative_min (ack_deadline_rel,
1221                                     GNUNET_TIME_relative_multiply
1222                                     (GNUNET_TIME_UNIT_SECONDS, 300));
1223       socket->ack_task_id = 
1224           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh 
1225                                         (msg->ack_deadline), &ack_task, socket);
1226       socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1227       socket->ack_time_deadline = ack_deadline_rel;
1228     }
1229     else
1230     {
1231       struct GNUNET_TIME_Relative ack_time_past;
1232       struct GNUNET_TIME_Relative ack_time_remaining;
1233       struct GNUNET_TIME_Relative ack_time_min;
1234       ack_time_past = 
1235           GNUNET_TIME_absolute_get_duration (socket->ack_time_registered);
1236       ack_time_remaining = GNUNET_TIME_relative_subtract
1237           (socket->ack_time_deadline, ack_time_past);
1238       ack_time_min = GNUNET_TIME_relative_min (ack_time_remaining,
1239                                                ack_deadline_rel);
1240       if (0 == memcmp(&ack_deadline_rel, &ack_time_min,
1241                       sizeof (struct GNUNET_TIME_Relative)))
1242       {
1243         ack_deadline_rel = ack_time_min;
1244         GNUNET_SCHEDULER_cancel (socket->ack_task_id);
1245         socket->ack_task_id = GNUNET_SCHEDULER_add_delayed (ack_deadline_rel,
1246                                                             &ack_task, socket);
1247         socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1248         socket->ack_time_deadline = ack_deadline_rel;
1249       }
1250     }
1251     if ((NULL != socket->read_handle) /* A read handle is waiting */
1252         /* There is no current read task */
1253         && (GNUNET_SCHEDULER_NO_TASK == socket->read_handle->read_task_id)
1254         /* We have the first packet */
1255         && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0)))
1256     {
1257       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n",
1258            GNUNET_i2s (&socket->other_peer));
1259       socket->read_handle->read_task_id =
1260           GNUNET_SCHEDULER_add_now (&call_read_processor, socket);
1261     }
1262     break;
1263   default:
1264     LOG (GNUNET_ERROR_TYPE_DEBUG,
1265          "%s: Received data message when it cannot be handled\n",
1266          GNUNET_i2s (&socket->other_peer));
1267     break;
1268   }
1269   return GNUNET_YES;
1270 }
1271
1272
1273 /**
1274  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1275  *
1276  * @param cls the socket (set from GNUNET_MESH_connect)
1277  * @param tunnel connection to the other end
1278  * @param tunnel_ctx place to store local state associated with the tunnel
1279  * @param sender who sent the message
1280  * @param message the actual message
1281  * @param atsi performance data for the connection
1282  * @return GNUNET_OK to keep the connection open,
1283  *         GNUNET_SYSERR to close it (signal serious error)
1284  */
1285 static int
1286 client_handle_data (void *cls,
1287                     struct GNUNET_MESH_Tunnel *tunnel,
1288                     void **tunnel_ctx,
1289                     const struct GNUNET_PeerIdentity *sender,
1290                     const struct GNUNET_MessageHeader *message,
1291                     const struct GNUNET_ATS_Information*atsi)
1292 {
1293   struct GNUNET_STREAM_Socket *socket = cls;
1294
1295   return handle_data (socket, tunnel, sender, 
1296                       (const struct GNUNET_STREAM_DataMessage *) message, atsi);
1297 }
1298
1299
1300 /**
1301  * Callback to set state to ESTABLISHED
1302  *
1303  * @param cls the closure NULL;
1304  * @param socket the socket to requiring state change
1305  */
1306 static void
1307 set_state_established (void *cls,
1308                        struct GNUNET_STREAM_Socket *socket)
1309 {
1310   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1311        "%s: Attaining ESTABLISHED state\n",
1312        GNUNET_i2s (&socket->other_peer));
1313   socket->write_offset = 0;
1314   socket->read_offset = 0;
1315   socket->state = STATE_ESTABLISHED;
1316   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK !=
1317                  socket->control_retransmission_task_id);
1318   GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
1319   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1320   if (NULL != socket->lsocket)
1321   {
1322     LOG (GNUNET_ERROR_TYPE_DEBUG,
1323          "%s: Calling listen callback\n",
1324          GNUNET_i2s (&socket->other_peer));
1325     if (GNUNET_SYSERR == 
1326         socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1327                                     socket,
1328                                     &socket->other_peer))
1329     {
1330       socket->state = STATE_CLOSED;
1331       /* FIXME: We should close in a decent way (send RST) */
1332       GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1333       GNUNET_free (socket);
1334     }
1335   }
1336   else
1337     socket->open_cb (socket->open_cls, socket);
1338 }
1339
1340
1341 /**
1342  * Callback to set state to HELLO_WAIT
1343  *
1344  * @param cls the closure from queue_message
1345  * @param socket the socket to requiring state change
1346  */
1347 static void
1348 set_state_hello_wait (void *cls,
1349                       struct GNUNET_STREAM_Socket *socket)
1350 {
1351   GNUNET_assert (STATE_INIT == socket->state);
1352   LOG (GNUNET_ERROR_TYPE_DEBUG,
1353        "%s: Attaining HELLO_WAIT state\n",
1354        GNUNET_i2s (&socket->other_peer));
1355   socket->state = STATE_HELLO_WAIT;
1356 }
1357
1358
1359 /**
1360  * Callback to set state to CLOSE_WAIT
1361  *
1362  * @param cls the closure from queue_message
1363  * @param socket the socket requiring state change
1364  */
1365 static void
1366 set_state_close_wait (void *cls,
1367                       struct GNUNET_STREAM_Socket *socket)
1368 {
1369   LOG (GNUNET_ERROR_TYPE_DEBUG,
1370        "%s: Attaing CLOSE_WAIT state\n",
1371        GNUNET_i2s (&socket->other_peer));
1372   socket->state = STATE_CLOSE_WAIT;
1373   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1374   socket->receive_buffer = NULL;
1375   socket->receive_buffer_size = 0;
1376 }
1377
1378
1379 /**
1380  * Callback to set state to RECEIVE_CLOSE_WAIT
1381  *
1382  * @param cls the closure from queue_message
1383  * @param socket the socket requiring state change
1384  */
1385 static void
1386 set_state_receive_close_wait (void *cls,
1387                               struct GNUNET_STREAM_Socket *socket)
1388 {
1389   LOG (GNUNET_ERROR_TYPE_DEBUG,
1390        "%s: Attaing RECEIVE_CLOSE_WAIT state\n",
1391        GNUNET_i2s (&socket->other_peer));
1392   socket->state = STATE_RECEIVE_CLOSE_WAIT;
1393   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1394   socket->receive_buffer = NULL;
1395   socket->receive_buffer_size = 0;
1396 }
1397
1398
1399 /**
1400  * Callback to set state to TRANSMIT_CLOSE_WAIT
1401  *
1402  * @param cls the closure from queue_message
1403  * @param socket the socket requiring state change
1404  */
1405 static void
1406 set_state_transmit_close_wait (void *cls,
1407                                struct GNUNET_STREAM_Socket *socket)
1408 {
1409   LOG (GNUNET_ERROR_TYPE_DEBUG,
1410        "%s: Attaing TRANSMIT_CLOSE_WAIT state\n",
1411        GNUNET_i2s (&socket->other_peer));
1412   socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1413 }
1414
1415
1416 /**
1417  * Callback to set state to CLOSED
1418  *
1419  * @param cls the closure from queue_message
1420  * @param socket the socket requiring state change
1421  */
1422 static void
1423 set_state_closed (void *cls,
1424                   struct GNUNET_STREAM_Socket *socket)
1425 {
1426   socket->state = STATE_CLOSED;
1427 }
1428
1429
1430 /**
1431  * Returns GNUNET_MESSAGE_TYPE_STREAM_HELLO
1432  *
1433  * @return the generate hello message
1434  */
1435 static struct GNUNET_STREAM_MessageHeader *
1436 generate_hello (void)
1437 {
1438   struct GNUNET_STREAM_MessageHeader *msg;
1439
1440   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1441   msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1442   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1443   return msg;
1444 }
1445
1446
1447 /**
1448  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1449  * socket
1450  *
1451  * @param socket the socket for which this HelloAckMessage has to be generated
1452  * @param generate_seq GNUNET_YES to generate the write sequence number,
1453  *          GNUNET_NO to use the existing sequence number
1454  * @return the HelloAckMessage
1455  */
1456 static struct GNUNET_STREAM_HelloAckMessage *
1457 generate_hello_ack (struct GNUNET_STREAM_Socket *socket,
1458                     int generate_seq)
1459 {
1460   struct GNUNET_STREAM_HelloAckMessage *msg;
1461
1462   if (GNUNET_YES == generate_seq)
1463   {
1464     if (GNUNET_YES == socket->testing_active)
1465       socket->write_sequence_number =
1466         socket->testing_set_write_sequence_number_value;
1467     else
1468       socket->write_sequence_number = 
1469         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1470     LOG_DEBUG ("%s: write sequence number %u\n",
1471                GNUNET_i2s (&socket->other_peer),
1472                (unsigned int) socket->write_sequence_number);
1473   }
1474   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1475   msg->header.header.size = 
1476     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1477   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1478   msg->sequence_number = htonl (socket->write_sequence_number);
1479   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1480   return msg;
1481 }
1482
1483
1484 /**
1485  * Task for retransmitting control messages if they aren't ACK'ed before a
1486  * deadline
1487  *
1488  * @param cls the socket
1489  * @param tc the Task context
1490  */
1491 static void
1492 control_retransmission_task (void *cls,
1493                              const struct GNUNET_SCHEDULER_TaskContext *tc)
1494 {
1495   struct GNUNET_STREAM_Socket *socket = cls;
1496     
1497   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1498   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
1499     return;
1500   LOG_DEBUG ("%s: Retransmitting a control message\n",
1501                  GNUNET_i2s (&socket->other_peer));
1502   switch (socket->state)
1503   {
1504   case STATE_INIT:    
1505     GNUNET_break (0);
1506     break;
1507   case STATE_LISTEN:
1508     GNUNET_break (0);
1509     break;
1510   case STATE_HELLO_WAIT:
1511     if (NULL == socket->lsocket) /* We are client */
1512       queue_message (socket, generate_hello (), NULL, NULL, GNUNET_NO);
1513     else
1514       queue_message (socket,
1515                      (struct GNUNET_STREAM_MessageHeader *)
1516                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1517                      GNUNET_NO);
1518     socket->control_retransmission_task_id =
1519     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
1520                                   &control_retransmission_task, socket);
1521     break;
1522   case STATE_ESTABLISHED:
1523     if (NULL == socket->lsocket)
1524       queue_message (socket,
1525                      (struct GNUNET_STREAM_MessageHeader *)
1526                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1527                      GNUNET_NO);
1528     else
1529       GNUNET_break (0);
1530     break;
1531   default:
1532     GNUNET_break (0);
1533   }  
1534 }
1535
1536
1537 /**
1538  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1539  *
1540  * @param cls the socket (set from GNUNET_MESH_connect)
1541  * @param tunnel connection to the other end
1542  * @param tunnel_ctx this is NULL
1543  * @param sender who sent the message
1544  * @param message the actual message
1545  * @param atsi performance data for the connection
1546  * @return GNUNET_OK to keep the connection open,
1547  *         GNUNET_SYSERR to close it (signal serious error)
1548  */
1549 static int
1550 client_handle_hello_ack (void *cls,
1551                          struct GNUNET_MESH_Tunnel *tunnel,
1552                          void **tunnel_ctx,
1553                          const struct GNUNET_PeerIdentity *sender,
1554                          const struct GNUNET_MessageHeader *message,
1555                          const struct GNUNET_ATS_Information*atsi)
1556 {
1557   struct GNUNET_STREAM_Socket *socket = cls;
1558   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1559   struct GNUNET_STREAM_HelloAckMessage *reply;
1560
1561   if (0 != memcmp (sender, &socket->other_peer,
1562                    sizeof (struct GNUNET_PeerIdentity)))
1563   {
1564     LOG (GNUNET_ERROR_TYPE_DEBUG,
1565          "%s: Received HELLO_ACK from non-confirming peer\n",
1566          GNUNET_i2s (&socket->other_peer));
1567     return GNUNET_YES;
1568   }
1569   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1570   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received HELLO_ACK from %s\n",
1571        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1572   GNUNET_assert (socket->tunnel == tunnel);
1573   switch (socket->state)
1574   {
1575   case STATE_HELLO_WAIT:
1576     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1577     LOG (GNUNET_ERROR_TYPE_DEBUG,
1578          "%s: Read sequence number %u\n",
1579          GNUNET_i2s (&socket->other_peer),
1580          (unsigned int) socket->read_sequence_number);
1581     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1582     reply = generate_hello_ack (socket, GNUNET_YES);
1583     queue_message (socket, &reply->header, &set_state_established,
1584                    NULL, GNUNET_NO);    
1585     return GNUNET_OK;
1586   case STATE_ESTABLISHED:
1587     // call statistics (# ACKs ignored++)
1588     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
1589                    socket->control_retransmission_task_id);
1590     socket->control_retransmission_task_id =
1591       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
1592     return GNUNET_OK;
1593   default:
1594     LOG_DEBUG ("%s: Server %s sent HELLO_ACK when in state %d\n", 
1595                GNUNET_i2s (&socket->other_peer),
1596                GNUNET_i2s (&socket->other_peer), socket->state);
1597     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1598     return GNUNET_SYSERR;
1599   }
1600 }
1601
1602
1603 /**
1604  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1605  *
1606  * @param cls the socket (set from GNUNET_MESH_connect)
1607  * @param tunnel connection to the other end
1608  * @param tunnel_ctx this is NULL
1609  * @param sender who sent the message
1610  * @param message the actual message
1611  * @param atsi performance data for the connection
1612  * @return GNUNET_OK to keep the connection open,
1613  *         GNUNET_SYSERR to close it (signal serious error)
1614  */
1615 static int
1616 client_handle_reset (void *cls,
1617                      struct GNUNET_MESH_Tunnel *tunnel,
1618                      void **tunnel_ctx,
1619                      const struct GNUNET_PeerIdentity *sender,
1620                      const struct GNUNET_MessageHeader *message,
1621                      const struct GNUNET_ATS_Information*atsi)
1622 {
1623   // struct GNUNET_STREAM_Socket *socket = cls;
1624
1625   return GNUNET_OK;
1626 }
1627
1628
1629 /**
1630  * Common message handler for handling TRANSMIT_CLOSE messages
1631  *
1632  * @param socket the socket through which the ack was received
1633  * @param tunnel connection to the other end
1634  * @param sender who sent the message
1635  * @param msg the transmit close message
1636  * @param atsi performance data for the connection
1637  * @return GNUNET_OK to keep the connection open,
1638  *         GNUNET_SYSERR to close it (signal serious error)
1639  */
1640 static int
1641 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1642                        struct GNUNET_MESH_Tunnel *tunnel,
1643                        const struct GNUNET_PeerIdentity *sender,
1644                        const struct GNUNET_STREAM_MessageHeader *msg,
1645                        const struct GNUNET_ATS_Information*atsi)
1646 {
1647   struct GNUNET_STREAM_MessageHeader *reply;
1648
1649   switch (socket->state)
1650   {
1651   case STATE_INIT:
1652   case STATE_LISTEN:
1653   case STATE_HELLO_WAIT:
1654     LOG (GNUNET_ERROR_TYPE_DEBUG,
1655          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1656          GNUNET_i2s (&socket->other_peer));
1657     return GNUNET_OK;
1658   default:
1659     break;
1660   }
1661   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE from %s\n",
1662        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1663   socket->receive_closed = GNUNET_YES;
1664   if (GNUNET_YES == socket->transmit_closed)
1665     socket->state = STATE_CLOSED;
1666   else
1667     socket->state = STATE_RECEIVE_CLOSED;
1668   /* Send TRANSMIT_CLOSE_ACK */
1669   reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1670   reply->header.type = 
1671       htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1672   reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1673   queue_message (socket, reply, NULL, NULL, GNUNET_NO);
1674   return GNUNET_OK;
1675 }
1676
1677
1678 /**
1679  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1680  *
1681  * @param cls the socket (set from GNUNET_MESH_connect)
1682  * @param tunnel connection to the other end
1683  * @param tunnel_ctx this is NULL
1684  * @param sender who sent the message
1685  * @param message the actual message
1686  * @param atsi performance data for the connection
1687  * @return GNUNET_OK to keep the connection open,
1688  *         GNUNET_SYSERR to close it (signal serious error)
1689  */
1690 static int
1691 client_handle_transmit_close (void *cls,
1692                               struct GNUNET_MESH_Tunnel *tunnel,
1693                               void **tunnel_ctx,
1694                               const struct GNUNET_PeerIdentity *sender,
1695                               const struct GNUNET_MessageHeader *message,
1696                               const struct GNUNET_ATS_Information*atsi)
1697 {
1698   struct GNUNET_STREAM_Socket *socket = cls;
1699   
1700   return handle_transmit_close (socket,
1701                                 tunnel,
1702                                 sender,
1703                                 (struct GNUNET_STREAM_MessageHeader *)message,
1704                                 atsi);
1705 }
1706
1707
1708 /**
1709  * Task for calling the shutdown continuation callback
1710  *
1711  * @param cls the socket
1712  * @param tc the scheduler task context
1713  */
1714 static void
1715 call_cont_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1716 {
1717   struct GNUNET_STREAM_Socket *socket = cls;
1718   
1719   GNUNET_assert (NULL != socket->shutdown_handle);
1720   socket->shutdown_handle->call_cont_task_id = GNUNET_SCHEDULER_NO_TASK;
1721   if (NULL != socket->shutdown_handle->completion_cb)
1722     socket->shutdown_handle->completion_cb
1723         (socket->shutdown_handle->completion_cls,
1724          socket->shutdown_handle->operation);
1725   GNUNET_free (socket->shutdown_handle);
1726   socket->shutdown_handle = NULL;
1727 }
1728
1729
1730 /**
1731  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1732  *
1733  * @param socket the socket
1734  * @param tunnel connection to the other end
1735  * @param sender who sent the message
1736  * @param message the actual message
1737  * @param atsi performance data for the connection
1738  * @param operation the close operation which is being ACK'ed
1739  * @return GNUNET_OK to keep the connection open,
1740  *         GNUNET_SYSERR to close it (signal serious error)
1741  */
1742 static int
1743 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1744                           struct GNUNET_MESH_Tunnel *tunnel,
1745                           const struct GNUNET_PeerIdentity *sender,
1746                           const struct GNUNET_STREAM_MessageHeader *message,
1747                           const struct GNUNET_ATS_Information *atsi,
1748                           int operation)
1749 {
1750   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1751
1752   shutdown_handle = socket->shutdown_handle;
1753   if (NULL == shutdown_handle)
1754   {
1755     /* This may happen when the shudown handle is cancelled */
1756     LOG (GNUNET_ERROR_TYPE_DEBUG,
1757          "%s: Received CLOSE_ACK when shutdown handle is NULL\n",
1758          GNUNET_i2s (&socket->other_peer));
1759     return GNUNET_OK;
1760   }
1761   switch (operation)
1762   {
1763   case SHUT_RDWR:
1764     switch (socket->state)
1765     {
1766     case STATE_CLOSE_WAIT:
1767       if (SHUT_RDWR != shutdown_handle->operation)
1768       {
1769         LOG (GNUNET_ERROR_TYPE_DEBUG,
1770              "%s: Received CLOSE_ACK when shutdown handle is not for "
1771              "SHUT_RDWR\n", GNUNET_i2s (&socket->other_peer));
1772         return GNUNET_OK;
1773       }
1774       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE_ACK from %s\n",
1775            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1776       socket->state = STATE_CLOSED;
1777       break;
1778     default:
1779       LOG (GNUNET_ERROR_TYPE_DEBUG,
1780            "%s: Received CLOSE_ACK when it is not expected\n",
1781            GNUNET_i2s (&socket->other_peer));
1782       return GNUNET_OK;
1783     }
1784     break;
1785   case SHUT_RD:
1786     switch (socket->state)
1787     {
1788     case STATE_RECEIVE_CLOSE_WAIT:
1789       if (SHUT_RD != shutdown_handle->operation)
1790       {
1791         LOG (GNUNET_ERROR_TYPE_DEBUG,
1792              "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
1793              "is not for SHUT_RD\n", GNUNET_i2s (&socket->other_peer));
1794         return GNUNET_OK;
1795       }
1796       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE_ACK from %s\n",
1797            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1798       socket->state = STATE_RECEIVE_CLOSED;
1799       break;
1800     default:
1801       LOG (GNUNET_ERROR_TYPE_DEBUG,
1802            "%s: Received RECEIVE_CLOSE_ACK when it is not expected\n",
1803            GNUNET_i2s (&socket->other_peer));
1804       return GNUNET_OK;
1805     }
1806     break;
1807   case SHUT_WR:
1808     switch (socket->state)
1809     {
1810     case STATE_TRANSMIT_CLOSE_WAIT:
1811       if (SHUT_WR != shutdown_handle->operation)
1812       {
1813         LOG (GNUNET_ERROR_TYPE_DEBUG,
1814              "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle "
1815              "is not for SHUT_WR\n",
1816              GNUNET_i2s (&socket->other_peer));
1817         return GNUNET_OK;
1818       }
1819       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
1820            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1821       socket->state = STATE_TRANSMIT_CLOSED;
1822       break;
1823     default:
1824       LOG (GNUNET_ERROR_TYPE_DEBUG,
1825            "%s: Received TRANSMIT_CLOSE_ACK when it is not expected\n",
1826            GNUNET_i2s (&socket->other_peer));          
1827       return GNUNET_OK;
1828     }
1829     break;
1830   default:
1831     GNUNET_assert (0);
1832   }
1833   shutdown_handle->call_cont_task_id = GNUNET_SCHEDULER_add_now
1834       (&call_cont_task, socket);
1835   if (GNUNET_SCHEDULER_NO_TASK
1836       != shutdown_handle->close_msg_retransmission_task_id)
1837   {
1838     GNUNET_SCHEDULER_cancel
1839       (shutdown_handle->close_msg_retransmission_task_id);
1840     shutdown_handle->close_msg_retransmission_task_id =
1841         GNUNET_SCHEDULER_NO_TASK;
1842   }
1843   return GNUNET_OK;
1844 }
1845
1846
1847 /**
1848  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1849  *
1850  * @param cls the socket (set from GNUNET_MESH_connect)
1851  * @param tunnel connection to the other end
1852  * @param tunnel_ctx this is NULL
1853  * @param sender who sent the message
1854  * @param message the actual message
1855  * @param atsi performance data for the connection
1856  * @return GNUNET_OK to keep the connection open,
1857  *         GNUNET_SYSERR to close it (signal serious error)
1858  */
1859 static int
1860 client_handle_transmit_close_ack (void *cls,
1861                                   struct GNUNET_MESH_Tunnel *tunnel,
1862                                   void **tunnel_ctx,
1863                                   const struct GNUNET_PeerIdentity *sender,
1864                                   const struct GNUNET_MessageHeader *message,
1865                                   const struct GNUNET_ATS_Information*atsi)
1866 {
1867   struct GNUNET_STREAM_Socket *socket = cls;
1868
1869   return handle_generic_close_ack (socket,
1870                                    tunnel,
1871                                    sender,
1872                                    (const struct GNUNET_STREAM_MessageHeader *)
1873                                    message,
1874                                    atsi,
1875                                    SHUT_WR);
1876 }
1877
1878
1879 /**
1880  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1881  *
1882  * @param socket the socket
1883  * @param tunnel connection to the other end
1884  * @param sender who sent the message
1885  * @param message the actual message
1886  * @param atsi performance data for the connection
1887  * @return GNUNET_OK to keep the connection open,
1888  *         GNUNET_SYSERR to close it (signal serious error)
1889  */
1890 static int
1891 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1892                       struct GNUNET_MESH_Tunnel *tunnel,
1893                       const struct GNUNET_PeerIdentity *sender,
1894                       const struct GNUNET_STREAM_MessageHeader *message,
1895                       const struct GNUNET_ATS_Information *atsi)
1896 {
1897   struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1898
1899   switch (socket->state)
1900   {
1901   case STATE_INIT:
1902   case STATE_LISTEN:
1903   case STATE_HELLO_WAIT:
1904     LOG (GNUNET_ERROR_TYPE_DEBUG,
1905          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1906          GNUNET_i2s (&socket->other_peer));
1907     return GNUNET_OK;
1908   default:
1909     break;
1910   }  
1911   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE from %s\n",
1912        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1913   socket->transmit_closed = GNUNET_YES;
1914   if (GNUNET_YES == socket->receive_closed)
1915     socket->state = STATE_CLOSED;
1916   else
1917     socket->state = STATE_TRANSMIT_CLOSED;
1918   receive_close_ack =
1919     GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1920   receive_close_ack->header.size =
1921     htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1922   receive_close_ack->header.type =
1923     htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1924   queue_message (socket, receive_close_ack, NULL, NULL, GNUNET_NO);
1925   /* If write handle is present call it with GNUNET_STREAM_SHUTDOWN to signal
1926      that that stream has been shutdown */
1927   if (NULL != socket->write_handle)
1928   {
1929     GNUNET_STREAM_CompletionContinuation wc;
1930     void *wc_cls;
1931
1932     wc = socket->write_handle->write_cont;
1933     wc_cls = socket->write_handle->write_cont_cls;
1934     GNUNET_STREAM_io_write_cancel (socket->write_handle);
1935     socket->write_handle = NULL;
1936     if (NULL != wc)
1937       wc (wc_cls,
1938           GNUNET_STREAM_SHUTDOWN, 0);
1939   }
1940   return GNUNET_OK;
1941 }
1942
1943
1944 /**
1945  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1946  *
1947  * @param cls the socket (set from GNUNET_MESH_connect)
1948  * @param tunnel connection to the other end
1949  * @param tunnel_ctx this is NULL
1950  * @param sender who sent the message
1951  * @param message the actual message
1952  * @param atsi performance data for the connection
1953  * @return GNUNET_OK to keep the connection open,
1954  *         GNUNET_SYSERR to close it (signal serious error)
1955  */
1956 static int
1957 client_handle_receive_close (void *cls,
1958                              struct GNUNET_MESH_Tunnel *tunnel,
1959                              void **tunnel_ctx,
1960                              const struct GNUNET_PeerIdentity *sender,
1961                              const struct GNUNET_MessageHeader *message,
1962                              const struct GNUNET_ATS_Information*atsi)
1963 {
1964   struct GNUNET_STREAM_Socket *socket = cls;
1965
1966   return
1967     handle_receive_close (socket,
1968                           tunnel,
1969                           sender,
1970                           (const struct GNUNET_STREAM_MessageHeader *) message,
1971                           atsi);
1972 }
1973
1974
1975 /**
1976  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1977  *
1978  * @param cls the socket (set from GNUNET_MESH_connect)
1979  * @param tunnel connection to the other end
1980  * @param tunnel_ctx this is NULL
1981  * @param sender who sent the message
1982  * @param message the actual message
1983  * @param atsi performance data for the connection
1984  * @return GNUNET_OK to keep the connection open,
1985  *         GNUNET_SYSERR to close it (signal serious error)
1986  */
1987 static int
1988 client_handle_receive_close_ack (void *cls,
1989                                  struct GNUNET_MESH_Tunnel *tunnel,
1990                                  void **tunnel_ctx,
1991                                  const struct GNUNET_PeerIdentity *sender,
1992                                  const struct GNUNET_MessageHeader *message,
1993                                  const struct GNUNET_ATS_Information*atsi)
1994 {
1995   struct GNUNET_STREAM_Socket *socket = cls;
1996
1997   return handle_generic_close_ack (socket,
1998                                    tunnel,
1999                                    sender,
2000                                    (const struct GNUNET_STREAM_MessageHeader *)
2001                                    message,
2002                                    atsi,
2003                                    SHUT_RD);
2004 }
2005
2006
2007 /**
2008  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2009  *
2010  * @param socket the socket
2011  * @param tunnel connection to the other end
2012  * @param sender who sent the message
2013  * @param message the actual message
2014  * @param atsi performance data for the connection
2015  * @return GNUNET_OK to keep the connection open,
2016  *         GNUNET_SYSERR to close it (signal serious error)
2017  */
2018 static int
2019 handle_close (struct GNUNET_STREAM_Socket *socket,
2020               struct GNUNET_MESH_Tunnel *tunnel,
2021               const struct GNUNET_PeerIdentity *sender,
2022               const struct GNUNET_STREAM_MessageHeader *message,
2023               const struct GNUNET_ATS_Information*atsi)
2024 {
2025   struct GNUNET_STREAM_MessageHeader *close_ack;
2026
2027   switch (socket->state)
2028   {
2029   case STATE_INIT:
2030   case STATE_LISTEN:
2031   case STATE_HELLO_WAIT:
2032     LOG (GNUNET_ERROR_TYPE_DEBUG,
2033          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
2034          GNUNET_i2s (&socket->other_peer));
2035     return GNUNET_OK;
2036   default:
2037     break;
2038   }
2039   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE from %s\n",
2040        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
2041   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2042   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2043   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
2044   queue_message (socket, close_ack, &set_state_closed, NULL, GNUNET_NO);
2045   if (STATE_CLOSED == socket->state)
2046     return GNUNET_OK;
2047   socket->receive_closed = GNUNET_YES;
2048   socket->transmit_closed = GNUNET_YES;
2049   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
2050   socket->receive_buffer = NULL;
2051   socket->receive_buffer_size = 0;  
2052   /* If write handle is present call it with GNUNET_STREAM_SHUTDOWN to signal
2053      that that stream has been shutdown */
2054   if (NULL != socket->write_handle)
2055   {
2056     GNUNET_STREAM_CompletionContinuation wc;
2057     void *wc_cls;
2058
2059     wc = socket->write_handle->write_cont;
2060     wc_cls = socket->write_handle->write_cont_cls;
2061     GNUNET_STREAM_io_write_cancel (socket->write_handle);
2062     socket->write_handle = NULL;
2063     if (NULL != wc)
2064       wc (wc_cls,
2065           GNUNET_STREAM_SHUTDOWN, 0);
2066   }
2067   return GNUNET_OK;
2068 }
2069
2070
2071 /**
2072  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2073  *
2074  * @param cls the socket (set from GNUNET_MESH_connect)
2075  * @param tunnel connection to the other end
2076  * @param tunnel_ctx this is NULL
2077  * @param sender who sent the message
2078  * @param message the actual message
2079  * @param atsi performance data for the connection
2080  * @return GNUNET_OK to keep the connection open,
2081  *         GNUNET_SYSERR to close it (signal serious error)
2082  */
2083 static int
2084 client_handle_close (void *cls,
2085                      struct GNUNET_MESH_Tunnel *tunnel,
2086                      void **tunnel_ctx,
2087                      const struct GNUNET_PeerIdentity *sender,
2088                      const struct GNUNET_MessageHeader *message,
2089                      const struct GNUNET_ATS_Information*atsi)
2090 {
2091   struct GNUNET_STREAM_Socket *socket = cls;
2092
2093   return handle_close (socket,
2094                        tunnel,
2095                        sender,
2096                        (const struct GNUNET_STREAM_MessageHeader *) message,
2097                        atsi);
2098 }
2099
2100
2101 /**
2102  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2103  *
2104  * @param cls the socket (set from GNUNET_MESH_connect)
2105  * @param tunnel connection to the other end
2106  * @param tunnel_ctx this is NULL
2107  * @param sender who sent the message
2108  * @param message the actual message
2109  * @param atsi performance data for the connection
2110  * @return GNUNET_OK to keep the connection open,
2111  *         GNUNET_SYSERR to close it (signal serious error)
2112  */
2113 static int
2114 client_handle_close_ack (void *cls,
2115                          struct GNUNET_MESH_Tunnel *tunnel,
2116                          void **tunnel_ctx,
2117                          const struct GNUNET_PeerIdentity *sender,
2118                          const struct GNUNET_MessageHeader *message,
2119                          const struct GNUNET_ATS_Information *atsi)
2120 {
2121   struct GNUNET_STREAM_Socket *socket = cls;
2122
2123   return handle_generic_close_ack (socket,
2124                                    tunnel,
2125                                    sender,
2126                                    (const struct GNUNET_STREAM_MessageHeader *) 
2127                                    message,
2128                                    atsi,
2129                                    SHUT_RDWR);
2130 }
2131
2132 /*****************************/
2133 /* Server's Message Handlers */
2134 /*****************************/
2135
2136 /**
2137  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
2138  *
2139  * @param cls the closure
2140  * @param tunnel connection to the other end
2141  * @param tunnel_ctx the socket
2142  * @param sender who sent the message
2143  * @param message the actual message
2144  * @param atsi performance data for the connection
2145  * @return GNUNET_OK to keep the connection open,
2146  *         GNUNET_SYSERR to close it (signal serious error)
2147  */
2148 static int
2149 server_handle_data (void *cls,
2150                     struct GNUNET_MESH_Tunnel *tunnel,
2151                     void **tunnel_ctx,
2152                     const struct GNUNET_PeerIdentity *sender,
2153                     const struct GNUNET_MessageHeader *message,
2154                     const struct GNUNET_ATS_Information*atsi)
2155 {
2156   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2157
2158   return handle_data (socket,
2159                       tunnel,
2160                       sender,
2161                       (const struct GNUNET_STREAM_DataMessage *)message,
2162                       atsi);
2163 }
2164
2165
2166 /**
2167  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
2168  *
2169  * @param cls the closure
2170  * @param tunnel connection to the other end
2171  * @param tunnel_ctx the socket
2172  * @param sender who sent the message
2173  * @param message the actual message
2174  * @param atsi performance data for the connection
2175  * @return GNUNET_OK to keep the connection open,
2176  *         GNUNET_SYSERR to close it (signal serious error)
2177  */
2178 static int
2179 server_handle_hello (void *cls,
2180                      struct GNUNET_MESH_Tunnel *tunnel,
2181                      void **tunnel_ctx,
2182                      const struct GNUNET_PeerIdentity *sender,
2183                      const struct GNUNET_MessageHeader *message,
2184                      const struct GNUNET_ATS_Information*atsi)
2185 {
2186   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2187   struct GNUNET_STREAM_HelloAckMessage *reply;
2188
2189   if (0 != memcmp (sender,
2190                    &socket->other_peer,
2191                    sizeof (struct GNUNET_PeerIdentity)))
2192   {
2193     LOG_DEBUG ("%s: Received HELLO from non-confirming peer\n",
2194                GNUNET_i2s (&socket->other_peer));
2195     return GNUNET_YES;
2196   }
2197   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == ntohs (message->type));
2198   GNUNET_assert (socket->tunnel == tunnel);
2199   LOG_DEBUG ("%s: Received HELLO from %s\n", GNUNET_i2s (&socket->other_peer),
2200              GNUNET_i2s (&socket->other_peer));
2201   switch (socket->state)
2202   {
2203   case STATE_INIT:
2204     reply = generate_hello_ack (socket, GNUNET_YES);
2205     queue_message (socket, &reply->header, &set_state_hello_wait, NULL,
2206                    GNUNET_NO);
2207     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2208                    socket->control_retransmission_task_id);
2209     socket->control_retransmission_task_id =
2210       GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2211                                     &control_retransmission_task, socket);
2212     break;
2213   case STATE_HELLO_WAIT:
2214     /* Perhaps our HELLO_ACK was lost */
2215     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != 
2216                    socket->control_retransmission_task_id);
2217     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2218     socket->control_retransmission_task_id =
2219       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
2220     break;
2221   default:
2222     LOG_DEBUG( "%s: Client sent HELLO when in state %d\n",
2223                GNUNET_i2s (&socket->other_peer), socket->state);
2224     /* FIXME: Send RESET? */
2225   }
2226   return GNUNET_OK;
2227 }
2228
2229
2230 /**
2231  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2232  *
2233  * @param cls the closure
2234  * @param tunnel connection to the other end
2235  * @param tunnel_ctx the socket
2236  * @param sender who sent the message
2237  * @param message the actual message
2238  * @param atsi performance data for the connection
2239  * @return GNUNET_OK to keep the connection open,
2240  *         GNUNET_SYSERR to close it (signal serious error)
2241  */
2242 static int
2243 server_handle_hello_ack (void *cls,
2244                          struct GNUNET_MESH_Tunnel *tunnel,
2245                          void **tunnel_ctx,
2246                          const struct GNUNET_PeerIdentity *sender,
2247                          const struct GNUNET_MessageHeader *message,
2248                          const struct GNUNET_ATS_Information*atsi)
2249 {
2250   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2251   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2252
2253   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2254                  ntohs (message->type));
2255   GNUNET_assert (socket->tunnel == tunnel);
2256   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2257   switch (socket->state)  
2258   {
2259   case STATE_HELLO_WAIT:
2260     LOG (GNUNET_ERROR_TYPE_DEBUG,
2261          "%s: Received HELLO_ACK from %s\n",
2262          GNUNET_i2s (&socket->other_peer),
2263          GNUNET_i2s (&socket->other_peer));
2264     socket->read_sequence_number = ntohl (ack_message->sequence_number);
2265     LOG (GNUNET_ERROR_TYPE_DEBUG,
2266          "%s: Read sequence number %u\n",
2267          GNUNET_i2s (&socket->other_peer),
2268          (unsigned int) socket->read_sequence_number);
2269     socket->receiver_window_available = 
2270       ntohl (ack_message->receiver_window_size);
2271     set_state_established (NULL, socket);
2272     break;
2273   default:
2274     LOG (GNUNET_ERROR_TYPE_DEBUG,
2275          "Client sent HELLO_ACK when in state %d\n", socket->state);    
2276   }
2277   return GNUNET_OK;
2278 }
2279
2280
2281 /**
2282  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2283  *
2284  * @param cls the closure
2285  * @param tunnel connection to the other end
2286  * @param tunnel_ctx the socket
2287  * @param sender who sent the message
2288  * @param message the actual message
2289  * @param atsi performance data for the connection
2290  * @return GNUNET_OK to keep the connection open,
2291  *         GNUNET_SYSERR to close it (signal serious error)
2292  */
2293 static int
2294 server_handle_reset (void *cls,
2295                      struct GNUNET_MESH_Tunnel *tunnel,
2296                      void **tunnel_ctx,
2297                      const struct GNUNET_PeerIdentity *sender,
2298                      const struct GNUNET_MessageHeader *message,
2299                      const struct GNUNET_ATS_Information*atsi)
2300 {
2301   // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2302
2303   return GNUNET_OK;
2304 }
2305
2306
2307 /**
2308  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2309  *
2310  * @param cls the closure
2311  * @param tunnel connection to the other end
2312  * @param tunnel_ctx the socket
2313  * @param sender who sent the message
2314  * @param message the actual message
2315  * @param atsi performance data for the connection
2316  * @return GNUNET_OK to keep the connection open,
2317  *         GNUNET_SYSERR to close it (signal serious error)
2318  */
2319 static int
2320 server_handle_transmit_close (void *cls,
2321                               struct GNUNET_MESH_Tunnel *tunnel,
2322                               void **tunnel_ctx,
2323                               const struct GNUNET_PeerIdentity *sender,
2324                               const struct GNUNET_MessageHeader *message,
2325                               const struct GNUNET_ATS_Information*atsi)
2326 {
2327   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2328
2329   return handle_transmit_close (socket,
2330                                 tunnel,
2331                                 sender,
2332                                 (struct GNUNET_STREAM_MessageHeader *)message,
2333                                 atsi);
2334 }
2335
2336
2337 /**
2338  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2339  *
2340  * @param cls the closure
2341  * @param tunnel connection to the other end
2342  * @param tunnel_ctx the socket
2343  * @param sender who sent the message
2344  * @param message the actual message
2345  * @param atsi performance data for the connection
2346  * @return GNUNET_OK to keep the connection open,
2347  *         GNUNET_SYSERR to close it (signal serious error)
2348  */
2349 static int
2350 server_handle_transmit_close_ack (void *cls,
2351                                   struct GNUNET_MESH_Tunnel *tunnel,
2352                                   void **tunnel_ctx,
2353                                   const struct GNUNET_PeerIdentity *sender,
2354                                   const struct GNUNET_MessageHeader *message,
2355                                   const struct GNUNET_ATS_Information*atsi)
2356 {
2357   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2358
2359   return handle_generic_close_ack (socket,
2360                                    tunnel,
2361                                    sender,
2362                                    (const struct GNUNET_STREAM_MessageHeader *)
2363                                    message,
2364                                    atsi,
2365                                    SHUT_WR);
2366 }
2367
2368
2369 /**
2370  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2371  *
2372  * @param cls the closure
2373  * @param tunnel connection to the other end
2374  * @param tunnel_ctx the socket
2375  * @param sender who sent the message
2376  * @param message the actual message
2377  * @param atsi performance data for the connection
2378  * @return GNUNET_OK to keep the connection open,
2379  *         GNUNET_SYSERR to close it (signal serious error)
2380  */
2381 static int
2382 server_handle_receive_close (void *cls,
2383                              struct GNUNET_MESH_Tunnel *tunnel,
2384                              void **tunnel_ctx,
2385                              const struct GNUNET_PeerIdentity *sender,
2386                              const struct GNUNET_MessageHeader *message,
2387                              const struct GNUNET_ATS_Information*atsi)
2388 {
2389   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2390
2391   return
2392     handle_receive_close (socket,
2393                           tunnel,
2394                           sender,
2395                           (const struct GNUNET_STREAM_MessageHeader *) message,
2396                           atsi);
2397 }
2398
2399
2400 /**
2401  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2402  *
2403  * @param cls the closure
2404  * @param tunnel connection to the other end
2405  * @param tunnel_ctx the socket
2406  * @param sender who sent the message
2407  * @param message the actual message
2408  * @param atsi performance data for the connection
2409  * @return GNUNET_OK to keep the connection open,
2410  *         GNUNET_SYSERR to close it (signal serious error)
2411  */
2412 static int
2413 server_handle_receive_close_ack (void *cls,
2414                                  struct GNUNET_MESH_Tunnel *tunnel,
2415                                  void **tunnel_ctx,
2416                                  const struct GNUNET_PeerIdentity *sender,
2417                                  const struct GNUNET_MessageHeader *message,
2418                                  const struct GNUNET_ATS_Information*atsi)
2419 {
2420   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2421
2422   return handle_generic_close_ack (socket,
2423                                    tunnel,
2424                                    sender,
2425                                    (const struct GNUNET_STREAM_MessageHeader *)
2426                                    message,
2427                                    atsi,
2428                                    SHUT_RD);
2429 }
2430
2431
2432 /**
2433  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2434  *
2435  * @param cls the listen socket (from GNUNET_MESH_connect in
2436  *          GNUNET_STREAM_listen) 
2437  * @param tunnel connection to the other end
2438  * @param tunnel_ctx the socket
2439  * @param sender who sent the message
2440  * @param message the actual message
2441  * @param atsi performance data for the connection
2442  * @return GNUNET_OK to keep the connection open,
2443  *         GNUNET_SYSERR to close it (signal serious error)
2444  */
2445 static int
2446 server_handle_close (void *cls,
2447                      struct GNUNET_MESH_Tunnel *tunnel,
2448                      void **tunnel_ctx,
2449                      const struct GNUNET_PeerIdentity *sender,
2450                      const struct GNUNET_MessageHeader *message,
2451                      const struct GNUNET_ATS_Information*atsi)
2452 {
2453   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2454   
2455   return handle_close (socket,
2456                        tunnel,
2457                        sender,
2458                        (const struct GNUNET_STREAM_MessageHeader *) message,
2459                        atsi);
2460 }
2461
2462
2463 /**
2464  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2465  *
2466  * @param cls the closure
2467  * @param tunnel connection to the other end
2468  * @param tunnel_ctx the socket
2469  * @param sender who sent the message
2470  * @param message the actual message
2471  * @param atsi performance data for the connection
2472  * @return GNUNET_OK to keep the connection open,
2473  *         GNUNET_SYSERR to close it (signal serious error)
2474  */
2475 static int
2476 server_handle_close_ack (void *cls,
2477                          struct GNUNET_MESH_Tunnel *tunnel,
2478                          void **tunnel_ctx,
2479                          const struct GNUNET_PeerIdentity *sender,
2480                          const struct GNUNET_MessageHeader *message,
2481                          const struct GNUNET_ATS_Information*atsi)
2482 {
2483   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2484
2485   return handle_generic_close_ack (socket,
2486                                    tunnel,
2487                                    sender,
2488                                    (const struct GNUNET_STREAM_MessageHeader *) 
2489                                    message,
2490                                    atsi,
2491                                    SHUT_RDWR);
2492 }
2493
2494
2495 /**
2496  * Handler for DATA_ACK messages
2497  *
2498  * @param socket the socket through which the ack was received
2499  * @param tunnel connection to the other end
2500  * @param sender who sent the message
2501  * @param ack the acknowledgment message
2502  * @param atsi performance data for the connection
2503  * @return GNUNET_OK to keep the connection open,
2504  *         GNUNET_SYSERR to close it (signal serious error)
2505  */
2506 static int
2507 handle_ack (struct GNUNET_STREAM_Socket *socket,
2508             struct GNUNET_MESH_Tunnel *tunnel,
2509             const struct GNUNET_PeerIdentity *sender,
2510             const struct GNUNET_STREAM_AckMessage *ack,
2511             const struct GNUNET_ATS_Information*atsi)
2512 {
2513   unsigned int packet;
2514   int need_retransmission;
2515   uint32_t sequence_difference;
2516   
2517   if (0 != memcmp (sender,
2518                    &socket->other_peer,
2519                    sizeof (struct GNUNET_PeerIdentity)))
2520   {
2521     LOG (GNUNET_ERROR_TYPE_DEBUG,
2522          "%s: Received ACK from non-confirming peer\n",
2523          GNUNET_i2s (&socket->other_peer));
2524     return GNUNET_YES;
2525   }
2526   switch (socket->state)
2527   {
2528   case (STATE_ESTABLISHED):
2529   case (STATE_RECEIVE_CLOSED):
2530   case (STATE_RECEIVE_CLOSE_WAIT):
2531     if (NULL == socket->write_handle)
2532     {
2533       LOG (GNUNET_ERROR_TYPE_DEBUG,
2534            "%s: Received DATA_ACK when write_handle is NULL\n",
2535            GNUNET_i2s (&socket->other_peer));
2536       return GNUNET_OK;
2537     }
2538     sequence_difference = 
2539         socket->write_sequence_number - ntohl (ack->base_sequence_number);
2540     if (!(sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2541     {
2542       LOG (GNUNET_ERROR_TYPE_DEBUG,
2543            "%s: Received DATA_ACK with unexpected base sequence number\n",
2544            GNUNET_i2s (&socket->other_peer));
2545       LOG (GNUNET_ERROR_TYPE_DEBUG,
2546            "%s: Current write sequence: %u; Ack's base sequence: %u\n",
2547            GNUNET_i2s (&socket->other_peer),
2548            socket->write_sequence_number,
2549            ntohl (ack->base_sequence_number));
2550       return GNUNET_OK;
2551     }
2552     /* FIXME: include the case when write_handle is cancelled - ignore the 
2553        acks */
2554     LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received DATA_ACK from %s\n",
2555          GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
2556     /* Cancel the retransmission task */
2557     if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2558     {
2559       GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2560       socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2561       socket->data_retransmit_timeout = GNUNET_TIME_UNIT_SECONDS;
2562     }
2563     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2564     {
2565       if (NULL == socket->write_handle->messages[packet]) break;
2566       /* BS: Base sequence from ack; PS: sequence num of current packet */
2567       sequence_difference = ntohl (ack->base_sequence_number)
2568         - ntohl (socket->write_handle->messages[packet]->sequence_number);
2569       if ((0 == sequence_difference) ||
2570           (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH < sequence_difference))
2571         continue; /* The message in our handle is not yet received */
2572       /* case where BS = PS + GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2573       /* sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2574       ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2575                             packet, GNUNET_YES);
2576     }
2577     /* Update the receive window remaining
2578        FIXME : Should update with the value from a data ack with greater
2579        sequence number */
2580     socket->receiver_window_available = 
2581       ntohl (ack->receive_window_remaining);
2582     /* Check if we have received all acknowledgements */
2583     need_retransmission = GNUNET_NO;
2584     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2585     {
2586       if (NULL == socket->write_handle->messages[packet]) break;
2587       if (GNUNET_YES != ackbitmap_is_bit_set 
2588           (&socket->write_handle->ack_bitmap,packet))
2589       {
2590         need_retransmission = GNUNET_YES;
2591         break;
2592       }
2593     }
2594     if (GNUNET_YES == need_retransmission)
2595     {
2596       write_data (socket);
2597     }
2598     else      /* We have to call the write continuation callback now */
2599     {
2600       struct GNUNET_STREAM_IOWriteHandle *write_handle;
2601       
2602       /* Free the packets */
2603       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2604       {
2605         GNUNET_free_non_null (socket->write_handle->messages[packet]);
2606       }
2607       write_handle = socket->write_handle;
2608       socket->write_handle = NULL;
2609       if (NULL != write_handle->write_cont)
2610         write_handle->write_cont (write_handle->write_cont_cls,
2611                                   socket->status,
2612                                   write_handle->size);
2613       /* We are done with the write handle - Freeing it */
2614       GNUNET_free (write_handle);
2615       LOG (GNUNET_ERROR_TYPE_DEBUG,
2616            "%s: Write completion callback completed\n",
2617            GNUNET_i2s (&socket->other_peer));      
2618     }
2619     break;
2620   default:
2621     break;
2622   }
2623   return GNUNET_OK;
2624 }
2625
2626
2627 /**
2628  * Handler for DATA_ACK messages
2629  *
2630  * @param cls the 'struct GNUNET_STREAM_Socket'
2631  * @param tunnel connection to the other end
2632  * @param tunnel_ctx unused
2633  * @param sender who sent the message
2634  * @param message the actual message
2635  * @param atsi performance data for the connection
2636  * @return GNUNET_OK to keep the connection open,
2637  *         GNUNET_SYSERR to close it (signal serious error)
2638  */
2639 static int
2640 client_handle_ack (void *cls,
2641                    struct GNUNET_MESH_Tunnel *tunnel,
2642                    void **tunnel_ctx,
2643                    const struct GNUNET_PeerIdentity *sender,
2644                    const struct GNUNET_MessageHeader *message,
2645                    const struct GNUNET_ATS_Information*atsi)
2646 {
2647   struct GNUNET_STREAM_Socket *socket = cls;
2648   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2649  
2650   return handle_ack (socket, tunnel, sender, ack, atsi);
2651 }
2652
2653
2654 /**
2655  * Handler for DATA_ACK messages
2656  *
2657  * @param cls the server's listen socket
2658  * @param tunnel connection to the other end
2659  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2660  * @param sender who sent the message
2661  * @param message the actual message
2662  * @param atsi performance data for the connection
2663  * @return GNUNET_OK to keep the connection open,
2664  *         GNUNET_SYSERR to close it (signal serious error)
2665  */
2666 static int
2667 server_handle_ack (void *cls,
2668                    struct GNUNET_MESH_Tunnel *tunnel,
2669                    void **tunnel_ctx,
2670                    const struct GNUNET_PeerIdentity *sender,
2671                    const struct GNUNET_MessageHeader *message,
2672                    const struct GNUNET_ATS_Information*atsi)
2673 {
2674   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2675   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2676  
2677   return handle_ack (socket, tunnel, sender, ack, atsi);
2678 }
2679
2680
2681 /**
2682  * For client message handlers, the stream socket is in the
2683  * closure argument.
2684  */
2685 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2686   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2687   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2688    sizeof (struct GNUNET_STREAM_AckMessage) },
2689   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2690    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2691   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2692    sizeof (struct GNUNET_STREAM_MessageHeader)},
2693   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2694    sizeof (struct GNUNET_STREAM_MessageHeader)},
2695   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2696    sizeof (struct GNUNET_STREAM_MessageHeader)},
2697   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2698    sizeof (struct GNUNET_STREAM_MessageHeader)},
2699   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2700    sizeof (struct GNUNET_STREAM_MessageHeader)},
2701   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2702    sizeof (struct GNUNET_STREAM_MessageHeader)},
2703   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2704    sizeof (struct GNUNET_STREAM_MessageHeader)},
2705   {NULL, 0, 0}
2706 };
2707
2708
2709 /**
2710  * For server message handlers, the stream socket is in the
2711  * tunnel context, and the listen socket in the closure argument.
2712  */
2713 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2714   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2715   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2716    sizeof (struct GNUNET_STREAM_AckMessage) },
2717   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2718    sizeof (struct GNUNET_STREAM_MessageHeader)},
2719   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2720    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2721   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2722    sizeof (struct GNUNET_STREAM_MessageHeader)},
2723   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2724    sizeof (struct GNUNET_STREAM_MessageHeader)},
2725   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2726    sizeof (struct GNUNET_STREAM_MessageHeader)},
2727   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2728    sizeof (struct GNUNET_STREAM_MessageHeader)},
2729   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2730    sizeof (struct GNUNET_STREAM_MessageHeader)},
2731   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2732    sizeof (struct GNUNET_STREAM_MessageHeader)},
2733   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2734    sizeof (struct GNUNET_STREAM_MessageHeader)},
2735   {NULL, 0, 0}
2736 };
2737
2738
2739 /**
2740  * Function called when our target peer is connected to our tunnel
2741  *
2742  * @param cls the socket for which this tunnel is created
2743  * @param peer the peer identity of the target
2744  * @param atsi performance data for the connection
2745  */
2746 static void
2747 mesh_peer_connect_callback (void *cls,
2748                             const struct GNUNET_PeerIdentity *peer,
2749                             const struct GNUNET_ATS_Information * atsi)
2750 {
2751   struct GNUNET_STREAM_Socket *socket = cls;
2752   struct GNUNET_STREAM_MessageHeader *message;
2753   
2754   if (0 != memcmp (peer,
2755                    &socket->other_peer,
2756                    sizeof (struct GNUNET_PeerIdentity)))
2757   {
2758     LOG (GNUNET_ERROR_TYPE_DEBUG,
2759          "%s: A peer which is not our target has connected to our tunnel\n",
2760          GNUNET_i2s(peer));
2761     return;
2762   }
2763   LOG (GNUNET_ERROR_TYPE_DEBUG,
2764        "%s: Target peer %s connected\n",
2765        GNUNET_i2s (&socket->other_peer),
2766        GNUNET_i2s (&socket->other_peer));
2767   /* Set state to INIT */
2768   socket->state = STATE_INIT;
2769   /* Send HELLO message */
2770   message = generate_hello ();
2771   queue_message (socket, message, &set_state_hello_wait, NULL, GNUNET_NO);
2772   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2773                  socket->control_retransmission_task_id);
2774   socket->control_retransmission_task_id =
2775     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2776                                   &control_retransmission_task, socket);
2777 }
2778
2779
2780 /**
2781  * Function called when our target peer is disconnected from our tunnel
2782  *
2783  * @param cls the socket associated which this tunnel
2784  * @param peer the peer identity of the target
2785  */
2786 static void
2787 mesh_peer_disconnect_callback (void *cls,
2788                                const struct GNUNET_PeerIdentity *peer)
2789 {
2790   struct GNUNET_STREAM_Socket *socket=cls;
2791   
2792   /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2793   LOG (GNUNET_ERROR_TYPE_DEBUG,
2794        "%s: Other peer %s disconnected \n",
2795        GNUNET_i2s (&socket->other_peer),
2796        GNUNET_i2s (&socket->other_peer));
2797 }
2798
2799
2800 /**
2801  * Method called whenever a peer creates a tunnel to us
2802  *
2803  * @param cls closure
2804  * @param tunnel new handle to the tunnel
2805  * @param initiator peer that started the tunnel
2806  * @param atsi performance information for the tunnel
2807  * @return initial tunnel context for the tunnel
2808  *         (can be NULL -- that's not an error)
2809  */
2810 static void *
2811 new_tunnel_notify (void *cls,
2812                    struct GNUNET_MESH_Tunnel *tunnel,
2813                    const struct GNUNET_PeerIdentity *initiator,
2814                    const struct GNUNET_ATS_Information *atsi)
2815 {
2816   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2817   struct GNUNET_STREAM_Socket *socket;
2818
2819   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2820      from the same peer again until the socket is closed */
2821
2822   if (GNUNET_NO == lsocket->listening)
2823   {
2824     GNUNET_MESH_tunnel_destroy (tunnel);
2825     return NULL;
2826   }
2827   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2828   socket->other_peer = *initiator;
2829   socket->tunnel = tunnel;
2830   socket->state = STATE_INIT;
2831   socket->lsocket = lsocket;
2832   socket->stat_handle = lsocket->stat_handle;
2833   socket->retransmit_timeout = lsocket->retransmit_timeout;
2834   socket->testing_active = lsocket->testing_active;
2835   socket->testing_set_write_sequence_number_value =
2836       lsocket->testing_set_write_sequence_number_value;
2837   socket->max_payload_size = lsocket->max_payload_size;
2838   LOG (GNUNET_ERROR_TYPE_DEBUG,
2839        "%s: Peer %s initiated tunnel to us\n", 
2840        GNUNET_i2s (&socket->other_peer),
2841        GNUNET_i2s (&socket->other_peer));
2842   if (NULL != socket->stat_handle)
2843   {
2844     GNUNET_STATISTICS_update (socket->stat_handle,
2845                               "total inbound connections received",
2846                               1, GNUNET_NO);
2847     GNUNET_STATISTICS_update (socket->stat_handle,
2848                               "inbound connections", 1, GNUNET_NO);
2849   }
2850   
2851   return socket;
2852 }
2853
2854
2855 /**
2856  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2857  * any associated state.  This function is NOT called if the client has
2858  * explicitly asked for the tunnel to be destroyed using
2859  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2860  * the tunnel.
2861  *
2862  * @param cls closure (set from GNUNET_MESH_connect)
2863  * @param tunnel connection to the other end (henceforth invalid)
2864  * @param tunnel_ctx place where local state associated
2865  *                   with the tunnel is stored
2866  */
2867 static void 
2868 tunnel_cleaner (void *cls,
2869                 const struct GNUNET_MESH_Tunnel *tunnel,
2870                 void *tunnel_ctx)
2871 {
2872   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2873   struct MessageQueue *head;
2874
2875   GNUNET_assert (tunnel == socket->tunnel);
2876   GNUNET_break_op(0);
2877   LOG (GNUNET_ERROR_TYPE_DEBUG,
2878        "%s: Peer %s has terminated connection abruptly\n",
2879        GNUNET_i2s (&socket->other_peer),
2880        GNUNET_i2s (&socket->other_peer));
2881   if (NULL != socket->stat_handle)
2882   {
2883     GNUNET_STATISTICS_update (socket->stat_handle,
2884                               "connections terminated abruptly", 1, GNUNET_NO);
2885     GNUNET_STATISTICS_update (socket->stat_handle,
2886                               "inbound connections", -1, GNUNET_NO);
2887   }
2888   socket->status = GNUNET_STREAM_SHUTDOWN;
2889   /* Clear Transmit handles */
2890   if (NULL != socket->transmit_handle)
2891   {
2892     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2893     socket->transmit_handle = NULL;
2894   }
2895   /* Stop Tasks using socket->tunnel */
2896   if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2897   {
2898     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2899     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2900   }
2901   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2902   {
2903     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2904     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2905   }  
2906   /* Terminate the control retransmission tasks */
2907   if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
2908   {
2909     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2910     socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2911   }
2912   /* Clear Transmit handles */
2913   if (NULL != socket->transmit_handle)
2914   {
2915     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2916     socket->transmit_handle = NULL;
2917   }
2918   /* Clear existing message queue */
2919   while (NULL != (head = socket->queue_head)) {
2920     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2921                                  socket->queue_tail,
2922                                  head);
2923     GNUNET_free (head->message);
2924     GNUNET_free (head);
2925   }
2926   socket->tunnel = NULL;
2927 }
2928
2929
2930 /**
2931  * Callback to signal timeout on lockmanager lock acquire
2932  *
2933  * @param cls the ListenSocket
2934  * @param tc the scheduler task context
2935  */
2936 static void
2937 lockmanager_acquire_timeout (void *cls, 
2938                              const struct GNUNET_SCHEDULER_TaskContext *tc)
2939 {
2940   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2941   GNUNET_STREAM_ListenCallback listen_cb;
2942   void *listen_cb_cls;
2943
2944   lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2945   listen_cb = lsocket->listen_cb;
2946   listen_cb_cls = lsocket->listen_cb_cls;
2947   if (NULL != listen_cb)
2948     listen_cb (listen_cb_cls, NULL, NULL);
2949 }
2950
2951
2952 /**
2953  * Callback to notify us on the status changes on app_port lock
2954  *
2955  * @param cls the ListenSocket
2956  * @param domain the domain name of the lock
2957  * @param lock the app_port
2958  * @param status the current status of the lock
2959  */
2960 static void
2961 lock_status_change_cb (void *cls, const char *domain, uint32_t lock,
2962                        enum GNUNET_LOCKMANAGER_Status status)
2963 {
2964   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2965
2966   GNUNET_assert (lock == (uint32_t) lsocket->port);
2967   if (GNUNET_LOCKMANAGER_SUCCESS == status)
2968   {
2969     lsocket->listening = GNUNET_YES;
2970     if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
2971     {
2972       GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
2973       lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2974     }
2975     if (NULL == lsocket->mesh)
2976     {
2977       GNUNET_MESH_ApplicationType ports[] = {lsocket->port, 0};
2978
2979       lsocket->mesh = GNUNET_MESH_connect (lsocket->cfg,
2980                                            lsocket, /* Closure */
2981                                            &new_tunnel_notify,
2982                                            &tunnel_cleaner,
2983                                            server_message_handlers,
2984                                            ports);
2985       GNUNET_assert (NULL != lsocket->mesh);
2986       if (NULL != lsocket->listen_ok_cb)
2987       {
2988         (void) lsocket->listen_ok_cb ();
2989       }
2990     }
2991   }
2992   if (GNUNET_LOCKMANAGER_RELEASE == status)
2993     lsocket->listening = GNUNET_NO;
2994 }
2995
2996
2997 /*****************/
2998 /* API functions */
2999 /*****************/
3000
3001
3002 /**
3003  * Tries to open a stream to the target peer
3004  *
3005  * @param cfg configuration to use
3006  * @param target the target peer to which the stream has to be opened
3007  * @param app_port the application port number which uniquely identifies this
3008  *            stream
3009  * @param open_cb this function will be called after stream has be established;
3010  *          cannot be NULL
3011  * @param open_cb_cls the closure for open_cb
3012  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3013  * @return if successful it returns the stream socket; NULL if stream cannot be
3014  *         opened 
3015  */
3016 struct GNUNET_STREAM_Socket *
3017 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
3018                     const struct GNUNET_PeerIdentity *target,
3019                     GNUNET_MESH_ApplicationType app_port,
3020                     GNUNET_STREAM_OpenCallback open_cb,
3021                     void *open_cb_cls,
3022                     ...)
3023 {
3024   struct GNUNET_STREAM_Socket *socket;
3025   enum GNUNET_STREAM_Option option;
3026   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
3027   va_list vargs;
3028   uint16_t payload_size;
3029
3030   LOG (GNUNET_ERROR_TYPE_DEBUG,
3031        "%s\n", __func__);
3032   GNUNET_assert (NULL != open_cb);
3033   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
3034   socket->other_peer = *target;
3035   socket->open_cb = open_cb;
3036   socket->open_cls = open_cb_cls;
3037   /* Set defaults */
3038   socket->retransmit_timeout = TIME_REL_SECS (default_timeout);
3039   socket->testing_active = GNUNET_NO;
3040   socket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
3041   va_start (vargs, open_cb_cls); /* Parse variable args */
3042   do {
3043     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3044     switch (option)
3045     {
3046     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3047       /* Expect struct GNUNET_TIME_Relative */
3048       socket->retransmit_timeout = va_arg (vargs,
3049                                            struct GNUNET_TIME_Relative);
3050       break;
3051     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3052       socket->testing_active = GNUNET_YES;
3053       socket->testing_set_write_sequence_number_value = va_arg (vargs,
3054                                                                 uint32_t);
3055       break;
3056     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3057       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
3058       break;
3059     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3060       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
3061       break;
3062     case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
3063       payload_size = (uint16_t) va_arg (vargs, unsigned int);
3064       GNUNET_assert (0 != payload_size);
3065       if (payload_size < socket->max_payload_size)
3066         socket->max_payload_size = payload_size;
3067       break;
3068     case GNUNET_STREAM_OPTION_END:
3069       break;
3070     }
3071   } while (GNUNET_STREAM_OPTION_END != option);
3072   va_end (vargs);               /* End of variable args parsing */
3073   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
3074                                       socket, /* cls */
3075                                       NULL, /* No inbound tunnel handler */
3076                                       NULL, /* No in-tunnel cleaner */
3077                                       client_message_handlers,
3078                                       ports); /* We don't get inbound tunnels */
3079   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
3080   {
3081     GNUNET_free (socket);
3082     return NULL;
3083   }
3084   /* Now create the mesh tunnel to target */
3085   LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating MESH Tunnel\n");
3086   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
3087                                               NULL, /* Tunnel context */
3088                                               &mesh_peer_connect_callback,
3089                                               &mesh_peer_disconnect_callback,
3090                                               socket);
3091   GNUNET_assert (NULL != socket->tunnel);
3092   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
3093                                         &socket->other_peer);
3094   socket->stat_handle = GNUNET_STATISTICS_create ("stream", cfg);
3095   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__);
3096   return socket;
3097 }
3098
3099
3100 /**
3101  * Shutdown the stream for reading or writing (similar to man 2 shutdown).
3102  *
3103  * @param socket the stream socket
3104  * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
3105  * @param completion_cb the callback that will be called upon successful
3106  *          shutdown of given operation
3107  * @param completion_cls the closure for the completion callback
3108  * @return the shutdown handle
3109  */
3110 struct GNUNET_STREAM_ShutdownHandle *
3111 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
3112                         int operation,
3113                         GNUNET_STREAM_ShutdownCompletion completion_cb,
3114                         void *completion_cls)
3115 {
3116   struct GNUNET_STREAM_ShutdownHandle *handle;
3117   struct GNUNET_STREAM_MessageHeader *msg;
3118   
3119   GNUNET_assert (NULL == socket->shutdown_handle);
3120   handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
3121   handle->socket = socket;
3122   handle->completion_cb = completion_cb;
3123   handle->completion_cls = completion_cls;
3124   socket->shutdown_handle = handle;
3125   if ( ((GNUNET_YES == socket->receive_closed) && (SHUT_RD == operation))
3126        || ((GNUNET_YES == socket->transmit_closed) && (SHUT_WR == operation))
3127        || ((GNUNET_YES == socket->transmit_closed) 
3128            && (GNUNET_YES == socket->receive_closed)
3129            && (SHUT_RDWR == operation)) )
3130   {
3131     handle->operation = operation;
3132     handle->call_cont_task_id = GNUNET_SCHEDULER_add_now (&call_cont_task,
3133                                                           socket);
3134     return handle;
3135   }
3136   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
3137   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
3138   switch (operation)
3139   {
3140   case SHUT_RD:
3141     handle->operation = SHUT_RD;
3142     if (NULL != socket->read_handle)
3143       LOG (GNUNET_ERROR_TYPE_WARNING,
3144            "Existing read handle should be cancelled before shutting"
3145            " down reading\n");
3146     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
3147     queue_message (socket, msg, &set_state_receive_close_wait, NULL,
3148                    GNUNET_NO);
3149     socket->receive_closed = GNUNET_YES;
3150     break;
3151   case SHUT_WR:
3152     handle->operation = SHUT_WR;
3153     if (NULL != socket->write_handle)
3154       LOG (GNUNET_ERROR_TYPE_WARNING,
3155            "Existing write handle should be cancelled before shutting"
3156            " down writing\n");
3157     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
3158     queue_message (socket, msg, &set_state_transmit_close_wait, NULL,
3159                    GNUNET_NO);
3160     socket->transmit_closed = GNUNET_YES;
3161     break;
3162   case SHUT_RDWR:
3163     handle->operation = SHUT_RDWR;
3164     if (NULL != socket->write_handle)
3165       LOG (GNUNET_ERROR_TYPE_WARNING,
3166            "Existing write handle should be cancelled before shutting"
3167            " down writing\n");
3168     if (NULL != socket->read_handle)
3169       LOG (GNUNET_ERROR_TYPE_WARNING,
3170            "Existing read handle should be cancelled before shutting"
3171            " down reading\n");
3172     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
3173     queue_message (socket, msg, &set_state_close_wait, NULL, GNUNET_NO);
3174     socket->transmit_closed = GNUNET_YES;
3175     socket->receive_closed = GNUNET_YES;
3176     break;
3177   default:
3178     LOG (GNUNET_ERROR_TYPE_WARNING,
3179          "GNUNET_STREAM_shutdown called with invalid value for "
3180          "parameter operation -- Ignoring\n");
3181     GNUNET_free (msg);
3182     GNUNET_free (handle);
3183     return NULL;
3184   }
3185   handle->close_msg_retransmission_task_id =
3186     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
3187                                   &close_msg_retransmission_task,
3188                                   handle);
3189   return handle;
3190 }
3191
3192
3193 /**
3194  * Cancels a pending shutdown. Note that the shutdown messages may already be
3195  * sent and the stream is shutdown already for the operation given to
3196  * GNUNET_STREAM_shutdown(). This function only clears up any retranmissions of
3197  * shutdown messages and frees the shutdown handle.
3198  *
3199  * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
3200  */
3201 void
3202 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
3203 {
3204   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
3205     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
3206   if (GNUNET_SCHEDULER_NO_TASK != handle->call_cont_task_id)
3207     GNUNET_SCHEDULER_cancel (handle->call_cont_task_id);
3208   handle->socket->shutdown_handle = NULL;
3209   GNUNET_free (handle);
3210 }
3211
3212
3213 /**
3214  * Closes the stream
3215  *
3216  * @param socket the stream socket
3217  */
3218 void
3219 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
3220 {
3221   struct MessageQueue *head;
3222
3223   if (NULL != socket->read_handle)
3224   {
3225     LOG (GNUNET_ERROR_TYPE_WARNING,
3226          "Closing STREAM socket when a read handle is pending\n");
3227     GNUNET_STREAM_io_read_cancel (socket->read_handle);
3228   }
3229   if (NULL != socket->write_handle)
3230   {
3231     LOG (GNUNET_ERROR_TYPE_WARNING,
3232          "Closing STREAM socket when a write handle is pending\n");
3233     GNUNET_STREAM_io_write_cancel (socket->write_handle);
3234     //socket->write_handle = NULL;
3235   }
3236   /* Terminate the ack'ing task if they are still present */
3237   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
3238   {
3239     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
3240     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
3241   }
3242   /* Terminate the control retransmission tasks */
3243   if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
3244   {
3245     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
3246   }
3247   /* Clear Transmit handles */
3248   if (NULL != socket->transmit_handle)
3249   {
3250     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
3251     socket->transmit_handle = NULL;
3252   }
3253   /* Clear existing message queue */
3254   while (NULL != (head = socket->queue_head)) {
3255     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
3256                                  socket->queue_tail,
3257                                  head);
3258     GNUNET_free (head->message);
3259     GNUNET_free (head);
3260   }
3261   /* Close associated tunnel */
3262   if (NULL != socket->tunnel)
3263   {
3264     GNUNET_MESH_tunnel_destroy (socket->tunnel);
3265     socket->tunnel = NULL;
3266   }
3267   /* Close mesh connection */
3268   if ((NULL != socket->mesh) && (NULL == socket->lsocket))
3269   {
3270     GNUNET_MESH_disconnect (socket->mesh);
3271     socket->mesh = NULL;
3272   }
3273   /* Close statistics connection */
3274   if ( (NULL != socket->stat_handle) && (NULL == socket->lsocket) )
3275     GNUNET_STATISTICS_destroy (socket->stat_handle, GNUNET_YES);
3276   /* Release receive buffer */
3277   if (NULL != socket->receive_buffer)
3278   {
3279     GNUNET_free (socket->receive_buffer);
3280   }
3281   GNUNET_free (socket);
3282 }
3283
3284
3285 /**
3286  * Listens for stream connections for a specific application ports
3287  *
3288  * @param cfg the configuration to use
3289  * @param app_port the application port for which new streams will be accepted
3290  * @param listen_cb this function will be called when a peer tries to establish
3291  *            a stream with us
3292  * @param listen_cb_cls closure for listen_cb
3293  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3294  * @return listen socket, NULL for any error
3295  */
3296 struct GNUNET_STREAM_ListenSocket *
3297 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
3298                       GNUNET_MESH_ApplicationType app_port,
3299                       GNUNET_STREAM_ListenCallback listen_cb,
3300                       void *listen_cb_cls,
3301                       ...)
3302 {
3303   struct GNUNET_STREAM_ListenSocket *lsocket;
3304   struct GNUNET_TIME_Relative listen_timeout;
3305   enum GNUNET_STREAM_Option option;
3306   va_list vargs;
3307   uint16_t payload_size;
3308
3309   GNUNET_assert (NULL != listen_cb);
3310   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
3311   lsocket->cfg = GNUNET_CONFIGURATION_dup (cfg);
3312   lsocket->lockmanager = GNUNET_LOCKMANAGER_connect (lsocket->cfg);
3313   if (NULL == lsocket->lockmanager)
3314   {
3315     GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3316     GNUNET_free (lsocket);
3317     return NULL;
3318   }
3319   lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */  
3320   /* Set defaults */
3321   lsocket->retransmit_timeout = TIME_REL_SECS (default_timeout);
3322   lsocket->testing_active = GNUNET_NO;
3323   lsocket->listen_ok_cb = NULL;
3324   lsocket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
3325   listen_timeout = TIME_REL_SECS (60); /* A minute for listen timeout */  
3326   va_start (vargs, listen_cb_cls);
3327   do {
3328     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3329     switch (option)
3330     {
3331     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3332       lsocket->retransmit_timeout = va_arg (vargs,
3333                                             struct GNUNET_TIME_Relative);
3334       break;
3335     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3336       lsocket->testing_active = GNUNET_YES;
3337       lsocket->testing_set_write_sequence_number_value = va_arg (vargs,
3338                                                                  uint32_t);
3339       break;
3340     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3341       listen_timeout = GNUNET_TIME_relative_multiply
3342         (GNUNET_TIME_UNIT_MILLISECONDS, va_arg (vargs, uint32_t));
3343       break;
3344     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3345       lsocket->listen_ok_cb = va_arg (vargs,
3346                                       GNUNET_STREAM_ListenSuccessCallback);
3347       break;
3348     case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
3349       payload_size = (uint16_t) va_arg (vargs, unsigned int);
3350       GNUNET_assert (0 != payload_size);
3351       if (payload_size < lsocket->max_payload_size)
3352         lsocket->max_payload_size = payload_size;
3353       break;
3354     case GNUNET_STREAM_OPTION_END:
3355       break;
3356     }
3357   } while (GNUNET_STREAM_OPTION_END != option);
3358   va_end (vargs);
3359   lsocket->port = app_port;
3360   lsocket->listen_cb = listen_cb;
3361   lsocket->listen_cb_cls = listen_cb_cls;
3362   lsocket->locking_request = 
3363     GNUNET_LOCKMANAGER_acquire_lock (lsocket->lockmanager, locking_domain,
3364                                      (uint32_t) lsocket->port,
3365                                      &lock_status_change_cb, lsocket);
3366   lsocket->lockmanager_acquire_timeout_task =
3367     GNUNET_SCHEDULER_add_delayed (listen_timeout,
3368                                   &lockmanager_acquire_timeout, lsocket);
3369   lsocket->stat_handle = GNUNET_STATISTICS_create ("stream",
3370                                                    lsocket->cfg);
3371   return lsocket;
3372 }
3373
3374
3375 /**
3376  * Closes the listen socket
3377  *
3378  * @param lsocket the listen socket
3379  */
3380 void
3381 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3382 {
3383   /* Close MESH connection */
3384   if (NULL != lsocket->mesh)
3385     GNUNET_MESH_disconnect (lsocket->mesh);
3386   if (NULL != lsocket->stat_handle)
3387     GNUNET_STATISTICS_destroy (lsocket->stat_handle, GNUNET_YES);
3388   GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3389   if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
3390     GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
3391   if (NULL != lsocket->locking_request)
3392     GNUNET_LOCKMANAGER_cancel_request (lsocket->locking_request);
3393   if (NULL != lsocket->lockmanager)
3394     GNUNET_LOCKMANAGER_disconnect (lsocket->lockmanager);
3395   GNUNET_free (lsocket);
3396 }
3397
3398
3399 /**
3400  * Tries to write the given data to the stream. The maximum size of data that
3401  * can be written as part of a write operation is (64 * (64000 - sizeof (struct
3402  * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3403  * violation, however only the said number of maximum bytes will be written.
3404  *
3405  * @param socket the socket representing a stream
3406  * @param data the data buffer from where the data is written into the stream
3407  * @param size the number of bytes to be written from the data buffer
3408  * @param timeout the timeout period
3409  * @param write_cont the function to call upon writing some bytes into the
3410  *          stream 
3411  * @param write_cont_cls the closure
3412  *
3413  * @return handle to cancel the operation; if a previous write is pending or
3414  *           the stream has been shutdown for this operation then write_cont is
3415  *           immediately called and NULL is returned.
3416  */
3417 struct GNUNET_STREAM_IOWriteHandle *
3418 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3419                      const void *data,
3420                      size_t size,
3421                      struct GNUNET_TIME_Relative timeout,
3422                      GNUNET_STREAM_CompletionContinuation write_cont,
3423                      void *write_cont_cls)
3424 {
3425   struct GNUNET_STREAM_IOWriteHandle *io_handle;
3426   struct GNUNET_STREAM_DataMessage *data_msg;
3427   const void *sweep;
3428   struct GNUNET_TIME_Relative ack_deadline;
3429   unsigned int num_needed_packets;
3430   unsigned int packet;
3431   uint32_t packet_size;
3432   uint32_t payload_size;
3433   uint16_t max_data_packet_size;
3434
3435   LOG (GNUNET_ERROR_TYPE_DEBUG,
3436        "%s\n", __func__);
3437   if (NULL != socket->write_handle)
3438   {
3439     GNUNET_break (0);
3440     return NULL;
3441   }
3442   switch (socket->state)
3443   {
3444   case STATE_TRANSMIT_CLOSED:
3445   case STATE_TRANSMIT_CLOSE_WAIT:
3446   case STATE_CLOSED:
3447   case STATE_CLOSE_WAIT:
3448     if (NULL != write_cont)
3449       write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3450     LOG (GNUNET_ERROR_TYPE_DEBUG,
3451          "%s() END\n", __func__);
3452     return NULL;
3453   case STATE_INIT:
3454   case STATE_LISTEN:
3455   case STATE_HELLO_WAIT:
3456     if (NULL != write_cont)
3457       /* FIXME: GNUNET_STREAM_SYSERR?? */
3458       write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3459     LOG (GNUNET_ERROR_TYPE_DEBUG,
3460          "%s() END\n", __func__);
3461     return NULL;
3462   case STATE_ESTABLISHED:
3463   case STATE_RECEIVE_CLOSED:
3464   case STATE_RECEIVE_CLOSE_WAIT:
3465     break;
3466   }
3467   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * socket->max_payload_size < size)
3468     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * socket->max_payload_size;
3469   num_needed_packets =
3470       (size + (socket->max_payload_size - 1)) / socket->max_payload_size;
3471   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
3472   io_handle->socket = socket;
3473   io_handle->write_cont = write_cont;
3474   io_handle->write_cont_cls = write_cont_cls;
3475   io_handle->size = size;
3476   io_handle->packets_sent = 0;
3477   sweep = data;
3478   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3479      determined from RTT */
3480   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3481   /* Divide the given buffer into packets for sending */
3482   max_data_packet_size =
3483       socket->max_payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3484   for (packet=0; packet < num_needed_packets; packet++)
3485   {
3486     if ((packet + 1) * socket->max_payload_size < size) 
3487     {
3488       payload_size = socket->max_payload_size;
3489       packet_size = max_data_packet_size;
3490     }
3491     else 
3492     {
3493       payload_size = size - packet * socket->max_payload_size;
3494       packet_size = 
3495           payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3496     }
3497     io_handle->messages[packet] = GNUNET_malloc (packet_size);
3498     io_handle->messages[packet]->header.header.size = htons (packet_size);
3499     io_handle->messages[packet]->header.header.type =
3500       htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3501     io_handle->messages[packet]->sequence_number =
3502       htonl (socket->write_sequence_number++);
3503     io_handle->messages[packet]->offset = htonl (socket->write_offset);
3504     /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3505        determined from RTT */
3506     io_handle->messages[packet]->ack_deadline =
3507       GNUNET_TIME_relative_hton (ack_deadline);
3508     data_msg = io_handle->messages[packet];
3509     /* Copy data from given buffer to the packet */
3510     memcpy (&data_msg[1], sweep, payload_size);
3511     sweep += payload_size;
3512     socket->write_offset += payload_size;
3513   }
3514   /* ack the last data message. FIXME: remove when we figure out how to do this
3515      using RTT */
3516   io_handle->messages[num_needed_packets - 1]->ack_deadline = 
3517       GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
3518   socket->data_retransmit_timeout = GNUNET_TIME_UNIT_SECONDS;
3519   socket->write_handle = io_handle;
3520   write_data (socket);
3521   LOG (GNUNET_ERROR_TYPE_DEBUG,
3522        "%s() END\n", __func__);
3523   return io_handle;
3524 }
3525
3526
3527 /**
3528  * Tries to read data from the stream.
3529  *
3530  * @param socket the socket representing a stream
3531  * @param timeout the timeout period
3532  * @param proc function to call with data (once only)
3533  * @param proc_cls the closure for proc
3534  *
3535  * @return handle to cancel the operation; NULL is returned if: the stream has
3536  *           been shutdown for this type of opeartion (the DataProcessor is
3537  *           immediately called with GNUNET_STREAM_SHUTDOWN as status) OR another
3538  *           read handle is present (only one read handle per socket is present
3539  *           at any time)
3540  */
3541 struct GNUNET_STREAM_IOReadHandle *
3542 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3543                     struct GNUNET_TIME_Relative timeout,
3544                     GNUNET_STREAM_DataProcessor proc,
3545                     void *proc_cls)
3546 {
3547   struct GNUNET_STREAM_IOReadHandle *read_handle;
3548   
3549   LOG (GNUNET_ERROR_TYPE_DEBUG,
3550        "%s: %s()\n", 
3551        GNUNET_i2s (&socket->other_peer),
3552        __func__);
3553   /* Return NULL if there is already a read handle; the user has to cancel that
3554      first before continuing or has to wait until it is completed */
3555   if (NULL != socket->read_handle) 
3556     return NULL;
3557   GNUNET_assert (NULL != proc);
3558   switch (socket->state)
3559   {
3560   case STATE_RECEIVE_CLOSED:
3561   case STATE_RECEIVE_CLOSE_WAIT:
3562   case STATE_CLOSED:
3563   case STATE_CLOSE_WAIT:
3564     LOG (GNUNET_ERROR_TYPE_DEBUG,
3565          "%s: %s() END\n",
3566          GNUNET_i2s (&socket->other_peer),
3567          __func__);
3568     proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3569     return NULL;
3570   default:
3571     break;
3572   }
3573   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
3574   read_handle->proc = proc;
3575   read_handle->proc_cls = proc_cls;
3576   read_handle->socket = socket;
3577   socket->read_handle = read_handle;
3578   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3579                                           0))
3580     read_handle->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3581                                                           socket);   
3582   read_handle->read_io_timeout_task_id =
3583       GNUNET_SCHEDULER_add_delayed (timeout, &read_io_timeout, socket);
3584   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: %s() END\n",
3585        GNUNET_i2s (&socket->other_peer), __func__);
3586   return read_handle;
3587 }
3588
3589
3590 /**
3591  * Cancel pending write operation.
3592  *
3593  * @param ioh handle to operation to cancel
3594  */
3595 void
3596 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3597 {
3598   struct GNUNET_STREAM_Socket *socket = ioh->socket;
3599   unsigned int packet;
3600
3601   GNUNET_assert (NULL != socket->write_handle);
3602   GNUNET_assert (socket->write_handle == ioh);
3603   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
3604   {
3605     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
3606     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
3607   }
3608   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3609   {
3610     if (NULL == ioh->messages[packet]) break;
3611     GNUNET_free (ioh->messages[packet]);
3612   }      
3613   GNUNET_free (socket->write_handle);
3614   socket->write_handle = NULL;
3615 }
3616
3617
3618 /**
3619  * Cancel pending read operation.
3620  *
3621  * @param ioh handle to operation to cancel
3622  */
3623 void
3624 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
3625 {
3626   struct GNUNET_STREAM_Socket *socket;
3627   
3628   socket = ioh->socket;
3629   GNUNET_assert (NULL != socket->read_handle);
3630   GNUNET_assert (ioh == socket->read_handle);
3631   /* Read io time task should be there; if it is already executed then this
3632   read handle is not valid; However upon scheduler shutdown the read io task
3633   may be executed before */
3634   if (GNUNET_SCHEDULER_NO_TASK != ioh->read_io_timeout_task_id)
3635     GNUNET_SCHEDULER_cancel (ioh->read_io_timeout_task_id);
3636   /* reading task may be present; if so we have to stop it */
3637   if (GNUNET_SCHEDULER_NO_TASK != ioh->read_task_id)
3638     GNUNET_SCHEDULER_cancel (ioh->read_task_id);
3639   GNUNET_free (ioh);
3640   socket->read_handle = NULL;
3641 }
3642
3643 /* end of stream_api.c */