seperate timeouts for MESH retries
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Add code for write io timeout
27  *
28  * Include retransmission for control messages
29  **/
30
31 /**
32  * @file stream/stream_api.c
33  * @brief Implementation of the stream library
34  * @author Sree Harsha Totakura
35  */
36
37 #include "platform.h"
38 #include "gnunet_common.h"
39 #include "gnunet_util_lib.h"
40 #include "gnunet_lockmanager_service.h"
41 #include "gnunet_statistics_service.h"
42 #include "gnunet_stream_lib.h"
43 #include "stream_protocol.h"
44
45 /**
46  * Generic logging shorthand
47  */
48 #define LOG(kind,...)                                   \
49   GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
50
51 /**
52  * Debug logging shorthand
53  */
54 #define LOG_DEBUG(...)                          \
55   LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
56
57 /**
58  * Time in relative seconds shorthand
59  */
60 #define TIME_REL_SECS(sec) \
61   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
62
63 /**
64  * The maximum packet size of a stream packet
65  */
66 #define DEFAULT_MAX_PAYLOAD_SIZE 64000
67
68 /**
69  * Receive buffer
70  */
71 #define RECEIVE_BUFFER_SIZE 4096000
72
73 /**
74  * states in the Protocol
75  */
76 enum State
77   {
78     /**
79      * Client initialization state
80      */
81     STATE_INIT,
82
83     /**
84      * Listener initialization state 
85      */
86     STATE_LISTEN,
87
88     /**
89      * Pre-connection establishment state
90      */
91     STATE_HELLO_WAIT,
92
93     /**
94      * State where a connection has been established
95      */
96     STATE_ESTABLISHED,
97
98     /**
99      * State where the socket is closed on our side and waiting to be ACK'ed
100      */
101     STATE_RECEIVE_CLOSE_WAIT,
102
103     /**
104      * State where the socket is closed for reading
105      */
106     STATE_RECEIVE_CLOSED,
107
108     /**
109      * State where the socket is closed on our side and waiting to be ACK'ed
110      */
111     STATE_TRANSMIT_CLOSE_WAIT,
112
113     /**
114      * State where the socket is closed for writing
115      */
116     STATE_TRANSMIT_CLOSED,
117
118     /**
119      * State where the socket is closed on our side and waiting to be ACK'ed
120      */
121     STATE_CLOSE_WAIT,
122
123     /**
124      * State where the socket is closed
125      */
126     STATE_CLOSED 
127   };
128
129
130 /**
131  * Functions of this type are called when a message is written
132  *
133  * @param cls the closure from queue_message
134  * @param socket the socket the written message was bound to
135  */
136 typedef void (*SendFinishCallback) (void *cls,
137                                     struct GNUNET_STREAM_Socket *socket);
138
139
140 /**
141  * The send message queue
142  */
143 struct MessageQueue
144 {
145   /**
146    * The message
147    */
148   struct GNUNET_STREAM_MessageHeader *message;
149
150   /**
151    * Callback to be called when the message is sent
152    */
153   SendFinishCallback finish_cb;
154
155   /**
156    * The closure for finish_cb
157    */
158   void *finish_cb_cls;
159
160   /**
161    * The next message in queue. Should be NULL in the last message
162    */
163   struct MessageQueue *next;
164
165   /**
166    * The next message in queue. Should be NULL in the first message
167    */
168   struct MessageQueue *prev;
169 };
170
171
172 /**
173  * The STREAM Socket Handler
174  */
175 struct GNUNET_STREAM_Socket
176 {
177   /**
178    * The mesh handle
179    */
180   struct GNUNET_MESH_Handle *mesh;
181
182   /**
183    * Handle to statistics
184    */
185   struct GNUNET_STATISTICS_Handle *stat_handle;
186
187   /**
188    * The mesh tunnel handle
189    */
190   struct GNUNET_MESH_Tunnel *tunnel;
191
192   /**
193    * Stream open closure
194    */
195   void *open_cls;
196
197   /**
198    * Stream open callback
199    */
200   GNUNET_STREAM_OpenCallback open_cb;
201
202   /**
203    * The current transmit handle (if a pending transmit request exists)
204    */
205   struct GNUNET_MESH_TransmitHandle *transmit_handle;
206
207   /**
208    * The current message associated with the transmit handle
209    */
210   struct MessageQueue *queue_head;
211
212   /**
213    * The queue tail, should always point to the last message in queue
214    */
215   struct MessageQueue *queue_tail;
216
217   /**
218    * The write IO_handle associated with this socket
219    */
220   struct GNUNET_STREAM_IOWriteHandle *write_handle;
221
222   /**
223    * The read IO_handle associated with this socket
224    */
225   struct GNUNET_STREAM_IOReadHandle *read_handle;
226
227   /**
228    * The shutdown handle associated with this socket
229    */
230   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
231
232   /**
233    * Buffer for storing received messages
234    */
235   void *receive_buffer;
236
237   /**
238    * The listen socket from which this socket is derived. Should be NULL if it
239    * is not a derived socket
240    */
241   struct GNUNET_STREAM_ListenSocket *lsocket;
242
243   /**
244    * The peer identity of the peer at the other end of the stream
245    */
246   struct GNUNET_PeerIdentity other_peer;
247
248   /**
249    * The Acknowledgement Bitmap
250    */
251   GNUNET_STREAM_AckBitmap ack_bitmap;
252
253   /**
254    * Task identifier for retransmission task after timeout
255    */
256   GNUNET_SCHEDULER_TaskIdentifier data_retransmission_task_id;
257
258   /**
259    * Task identifier for retransmission of control messages
260    */
261   GNUNET_SCHEDULER_TaskIdentifier control_retransmission_task_id;
262
263   /**
264    * The task for sending timely Acks
265    */
266   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
267
268   /**
269    * Retransmission timeout
270    */
271   struct GNUNET_TIME_Relative retransmit_timeout;
272
273   /**
274    * Time when the Acknowledgement was queued
275    */
276   struct GNUNET_TIME_Absolute ack_time_registered;
277
278   /**
279    * Queued Acknowledgement deadline
280    */
281   struct GNUNET_TIME_Relative ack_time_deadline;
282
283   /**
284    * Mesh transmit timeout
285    */
286   struct GNUNET_TIME_Relative mesh_retry_timeout;  
287
288   /**
289    * The state of the protocol associated with this socket
290    */
291   enum State state;
292
293   /**
294    * The status of the socket
295    */
296   enum GNUNET_STREAM_Status status;
297
298   /**
299    * The number of previous timeouts; FIXME: currently not used
300    */
301   unsigned int retries;
302
303   /**
304    * Whether testing mode is active or not
305    */
306   int testing_active;
307
308   /**
309    * Is receive closed
310    */
311   int receive_closed;
312
313   /**
314    * Is transmission closed
315    */
316   int transmit_closed;
317
318   /**
319    * The application port number (type: uint32_t)
320    */
321   GNUNET_MESH_ApplicationType app_port;
322
323   /**
324    * The write sequence number to be set incase of testing
325    */
326   uint32_t testing_set_write_sequence_number_value;
327
328   /**
329    * Write sequence number. Set to random when sending HELLO(client) and
330    * HELLO_ACK(server) 
331    */
332   uint32_t write_sequence_number;
333
334   /**
335    * Read sequence number. This number's value is determined during handshake
336    */
337   uint32_t read_sequence_number;
338
339   /**
340    * The receiver buffer size
341    */
342   uint32_t receive_buffer_size;
343
344   /**
345    * The receiver buffer boundaries
346    */
347   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
348
349   /**
350    * receiver's available buffer after the last acknowledged packet
351    */
352   uint32_t receiver_window_available;
353
354   /**
355    * The offset pointer used during write operation
356    */
357   uint32_t write_offset;
358
359   /**
360    * The offset after which we are expecting data
361    */
362   uint32_t read_offset;
363
364   /**
365    * The offset upto which user has read from the received buffer
366    */
367   uint32_t copy_offset;
368
369   /**
370    * The maximum size of the data message payload this stream handle can send
371    */
372   uint16_t max_payload_size;
373 };
374
375
376 /**
377  * A socket for listening
378  */
379 struct GNUNET_STREAM_ListenSocket
380 {
381   /**
382    * The mesh handle
383    */
384   struct GNUNET_MESH_Handle *mesh;
385
386   /**
387    * Handle to statistics
388    */
389   struct GNUNET_STATISTICS_Handle *stat_handle;
390
391   /**
392    * Our configuration
393    */
394   struct GNUNET_CONFIGURATION_Handle *cfg;
395
396   /**
397    * Handle to the lock manager service
398    */
399   struct GNUNET_LOCKMANAGER_Handle *lockmanager;
400
401   /**
402    * The active LockingRequest from lockmanager
403    */
404   struct GNUNET_LOCKMANAGER_LockingRequest *locking_request;
405
406   /**
407    * Callback to call after acquring a lock and listening
408    */
409   GNUNET_STREAM_ListenSuccessCallback listen_ok_cb;
410
411   /**
412    * The callback function which is called after successful opening socket
413    */
414   GNUNET_STREAM_ListenCallback listen_cb;
415
416   /**
417    * The call back closure
418    */
419   void *listen_cb_cls;
420
421   /**
422    * The service port
423    */
424   GNUNET_MESH_ApplicationType port;
425   
426   /**
427    * The id of the lockmanager timeout task
428    */
429   GNUNET_SCHEDULER_TaskIdentifier lockmanager_acquire_timeout_task;
430
431   /**
432    * The retransmit timeout
433    */
434   struct GNUNET_TIME_Relative retransmit_timeout;
435   
436   /**
437    * Listen enabled?
438    */
439   int listening;
440
441   /**
442    * Whether testing mode is active or not
443    */
444   int testing_active;
445
446   /**
447    * The write sequence number to be set incase of testing
448    */
449   uint32_t testing_set_write_sequence_number_value;
450
451   /**
452    * The maximum size of the data message payload this stream handle can send
453    */
454   uint16_t max_payload_size;
455
456 };
457
458
459 /**
460  * The IO Write Handle
461  */
462 struct GNUNET_STREAM_IOWriteHandle
463 {
464   /**
465    * The socket to which this write handle is associated
466    */
467   struct GNUNET_STREAM_Socket *socket;
468
469   /**
470    * The packet_buffers associated with this Handle
471    */
472   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
473
474   /**
475    * The write continuation callback
476    */
477   GNUNET_STREAM_CompletionContinuation write_cont;
478
479   /**
480    * Write continuation closure
481    */
482   void *write_cont_cls;
483
484   /**
485    * The bitmap of this IOHandle; Corresponding bit for a message is set when
486    * it has been acknowledged by the receiver
487    */
488   GNUNET_STREAM_AckBitmap ack_bitmap;
489
490   /**
491    * Number of bytes in this write handle
492    */
493   size_t size;
494
495   /**
496    * Number of packets already transmitted from this IO handle. Retransmitted
497    * packets are not taken into account here. This is used to determine which
498    * packets account for retransmission and which packets occupy buffer space at
499    * the receiver.
500    */
501   unsigned int packets_sent;
502 };
503
504
505 /**
506  * The IO Read Handle
507  */
508 struct GNUNET_STREAM_IOReadHandle
509 {
510   /**
511    * The socket to which this read handle is associated
512    */
513   struct GNUNET_STREAM_Socket *socket;
514   
515   /**
516    * Callback for the read processor
517    */
518   GNUNET_STREAM_DataProcessor proc;
519
520   /**
521    * The closure pointer for the read processor callback
522    */
523   void *proc_cls;
524
525   /**
526    * Task identifier for the read io timeout task
527    */
528   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
529
530   /**
531    * Task scheduled to continue a read operation.
532    */
533   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
534 };
535
536
537 /**
538  * Handle for Shutdown
539  */
540 struct GNUNET_STREAM_ShutdownHandle
541 {
542   /**
543    * The socket associated with this shutdown handle
544    */
545   struct GNUNET_STREAM_Socket *socket;
546
547   /**
548    * Shutdown completion callback
549    */
550   GNUNET_STREAM_ShutdownCompletion completion_cb;
551
552   /**
553    * Closure for completion callback
554    */
555   void *completion_cls;
556
557   /**
558    * Close message retransmission task id
559    */
560   GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
561
562   /**
563    * Task scheduled to call the shutdown continuation callback
564    */
565   GNUNET_SCHEDULER_TaskIdentifier call_cont_task_id;
566
567   /**
568    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
569    */
570   int operation;  
571 };
572
573
574 /**
575  * Default value in seconds for various timeouts
576  */
577 static const unsigned int default_timeout = 10;
578
579 /**
580  * The domain name for locks we use here
581  */
582 static const char *locking_domain = "GNUNET_STREAM_APPLOCK";
583
584 /**
585  * Callback function for sending queued message
586  *
587  * @param cls closure the socket
588  * @param size number of bytes available in buf
589  * @param buf where the callee should write the message
590  * @return number of bytes written to buf
591  */
592 static size_t
593 send_message_notify (void *cls, size_t size, void *buf)
594 {
595   struct GNUNET_STREAM_Socket *socket = cls;
596   struct MessageQueue *head;
597   size_t ret;
598
599   socket->transmit_handle = NULL; /* Remove the transmit handle */
600   head = socket->queue_head;
601   if (NULL == head)
602     return 0; /* just to be safe */
603   if (0 == size)                /* request timed out */
604   {
605     socket->mesh_retry_timeout = GNUNET_TIME_STD_BACKOFF
606         (socket->mesh_retry_timeout);    
607     LOG (GNUNET_ERROR_TYPE_DEBUG,
608          "%s: Message sending to MESH timed out. Retrying in %s \n",
609          GNUNET_i2s (&socket->other_peer),
610          GNUNET_STRINGS_relative_time_to_string (socket->mesh_retry_timeout,
611                                                  GNUNET_YES));
612     socket->transmit_handle = 
613         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
614                                            GNUNET_NO, /* Corking */
615                                            socket->mesh_retry_timeout,
616                                            &socket->other_peer,
617                                            ntohs (head->message->header.size),
618                                            &send_message_notify,
619                                            socket);
620     return 0;
621   }
622   ret = ntohs (head->message->header.size);
623   GNUNET_assert (size >= ret);
624   memcpy (buf, head->message, ret);
625   if (NULL != head->finish_cb)
626   {
627     head->finish_cb (head->finish_cb_cls, socket);
628   }
629   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
630                                socket->queue_tail,
631                                head);
632   GNUNET_free (head->message);
633   GNUNET_free (head);
634   if (NULL != socket->transmit_handle)
635     return ret; /* 'finish_cb' might have triggered message already! */
636   head = socket->queue_head;
637   if (NULL != head)    /* more pending messages to send */
638   {
639     socket->mesh_retry_timeout = GNUNET_TIME_relative_get_zero_ ();
640     socket->transmit_handle = 
641         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
642                                            GNUNET_NO, /* Corking */
643                                            socket->mesh_retry_timeout,
644                                            &socket->other_peer,
645                                            ntohs (head->message->header.size),
646                                            &send_message_notify,
647                                            socket);
648   }
649   return ret;
650 }
651
652
653 /**
654  * Queues a message for sending using the mesh connection of a socket
655  *
656  * @param socket the socket whose mesh connection is used
657  * @param message the message to be sent
658  * @param finish_cb the callback to be called when the message is sent
659  * @param finish_cb_cls the closure for the callback
660  * @param urgent set to GNUNET_YES to add the message to the beginning of the
661  *          queue; GNUNET_NO to add at the tail
662  */
663 static void
664 queue_message (struct GNUNET_STREAM_Socket *socket,
665                struct GNUNET_STREAM_MessageHeader *message,
666                SendFinishCallback finish_cb,
667                void *finish_cb_cls,
668                int urgent)
669 {
670   struct MessageQueue *queue_entity;
671
672   GNUNET_assert 
673     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
674      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
675   LOG (GNUNET_ERROR_TYPE_DEBUG,
676        "%s: Queueing message of type %d and size %d\n",
677        GNUNET_i2s (&socket->other_peer),
678        ntohs (message->header.type),
679        ntohs (message->header.size));
680   GNUNET_assert (NULL != message);
681   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
682   queue_entity->message = message;
683   queue_entity->finish_cb = finish_cb;
684   queue_entity->finish_cb_cls = finish_cb_cls;
685   if (GNUNET_YES == urgent)
686   {
687     GNUNET_CONTAINER_DLL_insert (socket->queue_head, socket->queue_tail,
688                                  queue_entity);
689     if (NULL != socket->transmit_handle)
690     {
691       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
692       socket->transmit_handle = NULL;
693     }
694   }
695   else
696     GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
697                                       socket->queue_tail,
698                                       queue_entity);
699   if (NULL == socket->transmit_handle)
700   {
701     socket->mesh_retry_timeout = GNUNET_TIME_relative_get_zero_ ();
702     socket->transmit_handle = 
703         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
704                                            GNUNET_NO, /* Corking */
705                                            socket->mesh_retry_timeout,
706                                            &socket->other_peer,
707                                            ntohs (message->header.size),
708                                            &send_message_notify,
709                                            socket);
710   }
711 }
712
713
714 /**
715  * Copies a message and queues it for sending using the mesh connection of
716  * given socket 
717  *
718  * @param socket the socket whose mesh connection is used
719  * @param message the message to be sent
720  * @param finish_cb the callback to be called when the message is sent
721  * @param finish_cb_cls the closure for the callback
722  */
723 static void
724 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
725                         const struct GNUNET_STREAM_MessageHeader *message,
726                         SendFinishCallback finish_cb,
727                         void *finish_cb_cls)
728 {
729   struct GNUNET_STREAM_MessageHeader *msg_copy;
730   uint16_t size;
731   
732   size = ntohs (message->header.size);
733   msg_copy = GNUNET_malloc (size);
734   memcpy (msg_copy, message, size);
735   queue_message (socket, msg_copy, finish_cb, finish_cb_cls, GNUNET_NO);
736 }
737
738
739 /**
740  * Writes data using the given socket. The amount of data written is limited by
741  * the receiver_window_size
742  *
743  * @param socket the socket to use
744  */
745 static void 
746 write_data (struct GNUNET_STREAM_Socket *socket);
747
748
749 /**
750  * Task for retransmitting data messages if they aren't ACK before their ack
751  * deadline 
752  *
753  * @param cls the socket
754  * @param tc the Task context
755  */
756 static void
757 data_retransmission_task (void *cls,
758                           const struct GNUNET_SCHEDULER_TaskContext *tc)
759 {
760   struct GNUNET_STREAM_Socket *socket = cls;
761   
762   socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
763   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
764     return;
765   LOG (GNUNET_ERROR_TYPE_DEBUG,
766        "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
767   write_data (socket);
768 }
769
770
771 /**
772  * Task for sending ACK message
773  *
774  * @param cls the socket
775  * @param tc the Task context
776  */
777 static void
778 ack_task (void *cls,
779           const struct GNUNET_SCHEDULER_TaskContext *tc)
780 {
781   struct GNUNET_STREAM_Socket *socket = cls;
782   struct GNUNET_STREAM_AckMessage *ack_msg;
783
784   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
785   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
786     return;
787   /* Create the ACK Message */
788   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
789   ack_msg->header.header.size = htons (sizeof (struct 
790                                                GNUNET_STREAM_AckMessage));
791   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
792   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
793   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
794   ack_msg->receive_window_remaining = 
795     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
796   /* Queue up ACK for immediate sending */
797   queue_message (socket, &ack_msg->header, NULL, NULL, GNUNET_YES);
798 }
799
800
801 /**
802  * Retransmission task for shutdown messages
803  *
804  * @param cls the shutdown handle
805  * @param tc the Task Context
806  */
807 static void
808 close_msg_retransmission_task (void *cls,
809                                const struct GNUNET_SCHEDULER_TaskContext *tc)
810 {
811   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
812   struct GNUNET_STREAM_MessageHeader *msg;
813   struct GNUNET_STREAM_Socket *socket;
814
815   shutdown_handle->close_msg_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
816   GNUNET_assert (NULL != shutdown_handle);
817   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
818     return;
819   socket = shutdown_handle->socket;
820   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
821   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
822   switch (shutdown_handle->operation)
823   {
824   case SHUT_RDWR:
825     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
826     break;
827   case SHUT_RD:
828     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
829     break;
830   case SHUT_WR:
831     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
832     break;
833   default:
834     GNUNET_free (msg);
835     shutdown_handle->close_msg_retransmission_task_id = 
836       GNUNET_SCHEDULER_NO_TASK;
837     return;
838   }
839   queue_message (socket, msg, NULL, NULL, GNUNET_NO);
840   shutdown_handle->close_msg_retransmission_task_id =
841     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
842                                   &close_msg_retransmission_task,
843                                   shutdown_handle);
844 }
845
846
847 /**
848  * Function to modify a bit in GNUNET_STREAM_AckBitmap
849  *
850  * @param bitmap the bitmap to modify
851  * @param bit the bit number to modify
852  * @param value GNUNET_YES to on, GNUNET_NO to off
853  */
854 static void
855 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
856                       unsigned int bit, 
857                       int value)
858 {
859   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
860   if (GNUNET_YES == value)
861     *bitmap |= (1LL << bit);
862   else
863     *bitmap &= ~(1LL << bit);
864 }
865
866
867 /**
868  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
869  *
870  * @param bitmap address of the bitmap that has to be checked
871  * @param bit the bit number to check
872  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
873  */
874 static uint8_t
875 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
876                       unsigned int bit)
877 {
878   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
879   return 0 != (*bitmap & (1LL << bit));
880 }
881
882
883 /**
884  * Writes data using the given socket. The amount of data written is limited by
885  * the receiver_window_size
886  *
887  * @param socket the socket to use
888  */
889 static void 
890 write_data (struct GNUNET_STREAM_Socket *socket)
891 {
892   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
893   unsigned int packet;
894   
895   for (packet=0; packet < io_handle->packets_sent; packet++)
896   {
897     if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
898                                            packet))
899     {
900       LOG (GNUNET_ERROR_TYPE_DEBUG,
901            "%s: Retransmitting DATA message with sequence %u\n",
902            GNUNET_i2s (&socket->other_peer),
903            ntohl (io_handle->messages[packet]->sequence_number));
904       copy_and_queue_message (socket,
905                               &io_handle->messages[packet]->header,
906                               NULL,
907                               NULL);
908     }
909   }
910   /* Now send new packets if there is enough buffer space */
911   while ((packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH) &&
912          (NULL != io_handle->messages[packet]) &&
913          (socket->receiver_window_available 
914           >= ntohs (io_handle->messages[packet]->header.header.size)))
915   {
916     socket->receiver_window_available -= 
917       ntohs (io_handle->messages[packet]->header.header.size);
918     LOG (GNUNET_ERROR_TYPE_DEBUG,
919          "%s: Placing DATA message with sequence %u in send queue\n",
920          GNUNET_i2s (&socket->other_peer),
921          ntohl (io_handle->messages[packet]->sequence_number));
922     copy_and_queue_message (socket,
923                             &io_handle->messages[packet]->header,
924                             NULL,
925                             NULL);
926     packet++;
927   }
928   io_handle->packets_sent = packet;
929   // FIXME: 8s is not good, should use GNUNET_TIME_STD_BACKOFF...
930   if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id)
931     socket->data_retransmission_task_id = 
932       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
933                                     (GNUNET_TIME_UNIT_SECONDS, 8),
934                                     &data_retransmission_task,
935                                     socket);
936 }
937
938
939 /**
940  * Task for calling the read processor
941  *
942  * @param cls the socket
943  * @param tc the task context
944  */
945 static void
946 call_read_processor (void *cls,
947                      const struct GNUNET_SCHEDULER_TaskContext *tc)
948 {
949   struct GNUNET_STREAM_Socket *socket = cls;
950   struct GNUNET_STREAM_IOReadHandle *read_handle;
951   size_t read_size;
952   size_t valid_read_size;
953   unsigned int packet;
954   uint32_t sequence_increase;
955   uint32_t offset_increase;
956
957   read_handle = socket->read_handle;
958   GNUNET_assert (NULL != read_handle);
959   read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
960   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
961     return;
962   if (NULL == socket->receive_buffer) 
963     return;
964   GNUNET_assert (NULL != socket->read_handle);
965   GNUNET_assert (NULL != socket->read_handle->proc);
966   /* Check the bitmap for any holes */
967   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
968   {
969     if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
970                                            packet))
971       break;
972   }
973   /* We only call read processor if we have the first packet */
974   GNUNET_assert (0 < packet);
975   valid_read_size = 
976     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
977   GNUNET_assert (0 != valid_read_size);
978   /* Cancel the read_io_timeout_task */
979   GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id);
980   read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
981   /* Call the data processor */
982   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n",
983        GNUNET_i2s (&socket->other_peer));
984   read_size = 
985       socket->read_handle->proc (socket->read_handle->proc_cls,
986                                  socket->status,
987                                  socket->receive_buffer + socket->copy_offset,
988                                  valid_read_size);
989   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n",
990        GNUNET_i2s (&socket->other_peer), read_size);
991   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor completed successfully\n",
992        GNUNET_i2s (&socket->other_peer));
993   /* Free the read handle */
994   GNUNET_free (socket->read_handle);
995   socket->read_handle = NULL;
996   GNUNET_assert (read_size <= valid_read_size);
997   socket->copy_offset += read_size;
998   /* Determine upto which packet we can remove from the buffer */
999   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1000   {
1001     if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
1002     { 
1003       packet++; 
1004       break;
1005     }
1006     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
1007       break;
1008   }
1009   /* If no packets can be removed we can't move the buffer */
1010   if (0 == packet) 
1011     return;
1012   sequence_increase = packet;
1013   LOG (GNUNET_ERROR_TYPE_DEBUG,
1014        "%s: Sequence increase after read processor completion: %u\n",
1015        GNUNET_i2s (&socket->other_peer), sequence_increase);
1016   /* Shift the data in the receive buffer */
1017   socket->receive_buffer = 
1018     memmove (socket->receive_buffer,
1019              socket->receive_buffer 
1020              + socket->receive_buffer_boundaries[sequence_increase-1],
1021              socket->receive_buffer_size
1022              - socket->receive_buffer_boundaries[sequence_increase-1]);
1023   /* Shift the bitmap */
1024   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
1025   /* Set read_sequence_number */
1026   socket->read_sequence_number += sequence_increase;
1027   /* Set read_offset */
1028   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
1029   socket->read_offset += offset_increase;
1030   /* Fix copy_offset */
1031   GNUNET_assert (offset_increase <= socket->copy_offset);
1032   socket->copy_offset -= offset_increase;
1033   /* Fix relative boundaries */
1034   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1035   {
1036     if (packet < (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase))
1037     {
1038       uint32_t ahead_buffer_boundary;
1039
1040       ahead_buffer_boundary = 
1041         socket->receive_buffer_boundaries[packet + sequence_increase];
1042       if (0 == ahead_buffer_boundary)
1043         socket->receive_buffer_boundaries[packet] = 0;
1044       else
1045       {
1046         GNUNET_assert (offset_increase < ahead_buffer_boundary);
1047         socket->receive_buffer_boundaries[packet] = 
1048           ahead_buffer_boundary - offset_increase;
1049       }
1050     }
1051     else
1052       socket->receive_buffer_boundaries[packet] = 0;
1053   }
1054 }
1055
1056
1057 /**
1058  * Cancels the existing read io handle
1059  *
1060  * @param cls the closure from the SCHEDULER call
1061  * @param tc the task context
1062  */
1063 static void
1064 read_io_timeout (void *cls,
1065                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1066 {
1067   struct GNUNET_STREAM_Socket *socket = cls;
1068   struct GNUNET_STREAM_IOReadHandle *read_handle;
1069   GNUNET_STREAM_DataProcessor proc;
1070   void *proc_cls;
1071
1072   read_handle = socket->read_handle;
1073   GNUNET_assert (NULL != read_handle);
1074   read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1075   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
1076     return;
1077   if (read_handle->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1078   {
1079     LOG (GNUNET_ERROR_TYPE_DEBUG,
1080          "%s: Read task timedout - Cancelling it\n",
1081          GNUNET_i2s (&socket->other_peer));
1082     GNUNET_SCHEDULER_cancel (read_handle->read_task_id);
1083     read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1084   }
1085   proc = read_handle->proc;
1086   proc_cls = read_handle->proc_cls;
1087   GNUNET_free (read_handle);
1088   socket->read_handle = NULL;
1089   /* Call the read processor to signal timeout */
1090   proc (proc_cls,
1091         GNUNET_STREAM_TIMEOUT,
1092         NULL,
1093         0);
1094 }
1095
1096
1097 /**
1098  * Handler for DATA messages; Same for both client and server
1099  *
1100  * @param socket the socket through which the ack was received
1101  * @param tunnel connection to the other end
1102  * @param sender who sent the message
1103  * @param msg the data message
1104  * @param atsi performance data for the connection
1105  * @return GNUNET_OK to keep the connection open,
1106  *         GNUNET_SYSERR to close it (signal serious error)
1107  */
1108 static int
1109 handle_data (struct GNUNET_STREAM_Socket *socket,
1110              struct GNUNET_MESH_Tunnel *tunnel,
1111              const struct GNUNET_PeerIdentity *sender,
1112              const struct GNUNET_STREAM_DataMessage *msg,
1113              const struct GNUNET_ATS_Information*atsi)
1114 {
1115   const void *payload;
1116   struct GNUNET_TIME_Relative ack_deadline_rel;
1117   uint32_t bytes_needed;
1118   uint32_t relative_offset;
1119   uint32_t relative_sequence_number;
1120   uint16_t size;
1121
1122   size = htons (msg->header.header.size);
1123   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1124   {
1125     GNUNET_break_op (0);
1126     return GNUNET_SYSERR;
1127   }
1128   if (0 != memcmp (sender, &socket->other_peer,
1129                    sizeof (struct GNUNET_PeerIdentity)))
1130   {
1131     LOG (GNUNET_ERROR_TYPE_DEBUG,
1132          "%s: Received DATA from non-confirming peer\n",
1133          GNUNET_i2s (&socket->other_peer));
1134     return GNUNET_YES;
1135   }
1136   switch (socket->state)
1137   {
1138   case STATE_ESTABLISHED:
1139   case STATE_TRANSMIT_CLOSED:
1140   case STATE_TRANSMIT_CLOSE_WAIT:      
1141     /* check if the message's sequence number is in the range we are
1142        expecting */
1143     relative_sequence_number = 
1144       ntohl (msg->sequence_number) - socket->read_sequence_number;
1145     if ( relative_sequence_number >= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1146     {
1147       LOG (GNUNET_ERROR_TYPE_DEBUG,
1148            "%s: Ignoring received message with sequence number %u\n",
1149            GNUNET_i2s (&socket->other_peer),
1150            ntohl (msg->sequence_number));
1151       /* Start ACK sending task if one is not already present */
1152       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1153       {
1154         socket->ack_task_id = 
1155           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1156                                         (msg->ack_deadline),
1157                                         &ack_task,
1158                                         socket);
1159       }
1160       return GNUNET_YES;
1161     }      
1162     /* Check if we have already seen this message */
1163     if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1164                                             relative_sequence_number))
1165     {
1166       LOG (GNUNET_ERROR_TYPE_DEBUG,
1167            "%s: Ignoring already received message with sequence number %u\n",
1168            GNUNET_i2s (&socket->other_peer),
1169            ntohl (msg->sequence_number));
1170       /* Start ACK sending task if one is not already present */
1171       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1172       {
1173         socket->ack_task_id = 
1174           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1175                                         (msg->ack_deadline), &ack_task, socket);
1176       }
1177       return GNUNET_YES;
1178     }
1179     LOG (GNUNET_ERROR_TYPE_DEBUG,
1180          "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
1181          GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number),
1182          ntohs (msg->header.header.size), GNUNET_i2s (&socket->other_peer));
1183     /* Check if we have to allocate the buffer */
1184     size -= sizeof (struct GNUNET_STREAM_DataMessage);
1185     relative_offset = ntohl (msg->offset) - socket->read_offset;
1186     bytes_needed = relative_offset + size;
1187     if (bytes_needed > socket->receive_buffer_size)
1188     {
1189       if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1190       {
1191         socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1192                                                  bytes_needed);
1193         socket->receive_buffer_size = bytes_needed;
1194       }
1195       else
1196       {
1197         LOG (GNUNET_ERROR_TYPE_DEBUG,
1198              "%s: Cannot accommodate packet %d as buffer is full\n",
1199              GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number));
1200         return GNUNET_YES;
1201       }
1202     }
1203     /* Copy Data to buffer */
1204     payload = &msg[1];
1205     GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1206     memcpy (socket->receive_buffer + relative_offset, payload, size);
1207     socket->receive_buffer_boundaries[relative_sequence_number] = 
1208         relative_offset + size;
1209     /* Modify the ACK bitmap */
1210     ackbitmap_modify_bit (&socket->ack_bitmap, relative_sequence_number,
1211                           GNUNET_YES);
1212     /* Start ACK sending task if one is not already present */
1213     ack_deadline_rel = GNUNET_TIME_relative_ntoh (msg->ack_deadline);
1214     if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1215     {
1216       ack_deadline_rel = 
1217           GNUNET_TIME_relative_min (ack_deadline_rel,
1218                                     GNUNET_TIME_relative_multiply
1219                                     (GNUNET_TIME_UNIT_SECONDS, 300));
1220       socket->ack_task_id = 
1221           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh 
1222                                         (msg->ack_deadline), &ack_task, socket);
1223       socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1224       socket->ack_time_deadline = ack_deadline_rel;
1225     }
1226     else
1227     {
1228       struct GNUNET_TIME_Relative ack_time_past;
1229       struct GNUNET_TIME_Relative ack_time_remaining;
1230       struct GNUNET_TIME_Relative ack_time_min;
1231       ack_time_past = 
1232           GNUNET_TIME_absolute_get_duration (socket->ack_time_registered);
1233       ack_time_remaining = GNUNET_TIME_relative_subtract
1234           (socket->ack_time_deadline, ack_time_past);
1235       ack_time_min = GNUNET_TIME_relative_min (ack_time_remaining,
1236                                                ack_deadline_rel);
1237       if (0 == memcmp(&ack_deadline_rel, &ack_time_min,
1238                       sizeof (struct GNUNET_TIME_Relative)))
1239       {
1240         ack_deadline_rel = ack_time_min;
1241         GNUNET_SCHEDULER_cancel (socket->ack_task_id);
1242         socket->ack_task_id = GNUNET_SCHEDULER_add_delayed (ack_deadline_rel,
1243                                                             &ack_task, socket);
1244         socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1245         socket->ack_time_deadline = ack_deadline_rel;
1246       }
1247     }
1248     if ((NULL != socket->read_handle) /* A read handle is waiting */
1249         /* There is no current read task */
1250         && (GNUNET_SCHEDULER_NO_TASK == socket->read_handle->read_task_id)
1251         /* We have the first packet */
1252         && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0)))
1253     {
1254       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n",
1255            GNUNET_i2s (&socket->other_peer));
1256       socket->read_handle->read_task_id =
1257           GNUNET_SCHEDULER_add_now (&call_read_processor, socket);
1258     }
1259     break;
1260   default:
1261     LOG (GNUNET_ERROR_TYPE_DEBUG,
1262          "%s: Received data message when it cannot be handled\n",
1263          GNUNET_i2s (&socket->other_peer));
1264     break;
1265   }
1266   return GNUNET_YES;
1267 }
1268
1269
1270 /**
1271  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1272  *
1273  * @param cls the socket (set from GNUNET_MESH_connect)
1274  * @param tunnel connection to the other end
1275  * @param tunnel_ctx place to store local state associated with the tunnel
1276  * @param sender who sent the message
1277  * @param message the actual message
1278  * @param atsi performance data for the connection
1279  * @return GNUNET_OK to keep the connection open,
1280  *         GNUNET_SYSERR to close it (signal serious error)
1281  */
1282 static int
1283 client_handle_data (void *cls,
1284                     struct GNUNET_MESH_Tunnel *tunnel,
1285                     void **tunnel_ctx,
1286                     const struct GNUNET_PeerIdentity *sender,
1287                     const struct GNUNET_MessageHeader *message,
1288                     const struct GNUNET_ATS_Information*atsi)
1289 {
1290   struct GNUNET_STREAM_Socket *socket = cls;
1291
1292   return handle_data (socket, tunnel, sender, 
1293                       (const struct GNUNET_STREAM_DataMessage *) message, atsi);
1294 }
1295
1296
1297 /**
1298  * Callback to set state to ESTABLISHED
1299  *
1300  * @param cls the closure NULL;
1301  * @param socket the socket to requiring state change
1302  */
1303 static void
1304 set_state_established (void *cls,
1305                        struct GNUNET_STREAM_Socket *socket)
1306 {
1307   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1308        "%s: Attaining ESTABLISHED state\n",
1309        GNUNET_i2s (&socket->other_peer));
1310   socket->write_offset = 0;
1311   socket->read_offset = 0;
1312   socket->state = STATE_ESTABLISHED;
1313   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK !=
1314                  socket->control_retransmission_task_id);
1315   GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
1316   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1317   if (NULL != socket->lsocket)
1318   {
1319     LOG (GNUNET_ERROR_TYPE_DEBUG,
1320          "%s: Calling listen callback\n",
1321          GNUNET_i2s (&socket->other_peer));
1322     if (GNUNET_SYSERR == 
1323         socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1324                                     socket,
1325                                     &socket->other_peer))
1326     {
1327       socket->state = STATE_CLOSED;
1328       /* FIXME: We should close in a decent way (send RST) */
1329       GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1330       GNUNET_free (socket);
1331     }
1332   }
1333   else
1334     socket->open_cb (socket->open_cls, socket);
1335 }
1336
1337
1338 /**
1339  * Callback to set state to HELLO_WAIT
1340  *
1341  * @param cls the closure from queue_message
1342  * @param socket the socket to requiring state change
1343  */
1344 static void
1345 set_state_hello_wait (void *cls,
1346                       struct GNUNET_STREAM_Socket *socket)
1347 {
1348   GNUNET_assert (STATE_INIT == socket->state);
1349   LOG (GNUNET_ERROR_TYPE_DEBUG,
1350        "%s: Attaining HELLO_WAIT state\n",
1351        GNUNET_i2s (&socket->other_peer));
1352   socket->state = STATE_HELLO_WAIT;
1353 }
1354
1355
1356 /**
1357  * Callback to set state to CLOSE_WAIT
1358  *
1359  * @param cls the closure from queue_message
1360  * @param socket the socket requiring state change
1361  */
1362 static void
1363 set_state_close_wait (void *cls,
1364                       struct GNUNET_STREAM_Socket *socket)
1365 {
1366   LOG (GNUNET_ERROR_TYPE_DEBUG,
1367        "%s: Attaing CLOSE_WAIT state\n",
1368        GNUNET_i2s (&socket->other_peer));
1369   socket->state = STATE_CLOSE_WAIT;
1370   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1371   socket->receive_buffer = NULL;
1372   socket->receive_buffer_size = 0;
1373 }
1374
1375
1376 /**
1377  * Callback to set state to RECEIVE_CLOSE_WAIT
1378  *
1379  * @param cls the closure from queue_message
1380  * @param socket the socket requiring state change
1381  */
1382 static void
1383 set_state_receive_close_wait (void *cls,
1384                               struct GNUNET_STREAM_Socket *socket)
1385 {
1386   LOG (GNUNET_ERROR_TYPE_DEBUG,
1387        "%s: Attaing RECEIVE_CLOSE_WAIT state\n",
1388        GNUNET_i2s (&socket->other_peer));
1389   socket->state = STATE_RECEIVE_CLOSE_WAIT;
1390   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1391   socket->receive_buffer = NULL;
1392   socket->receive_buffer_size = 0;
1393 }
1394
1395
1396 /**
1397  * Callback to set state to TRANSMIT_CLOSE_WAIT
1398  *
1399  * @param cls the closure from queue_message
1400  * @param socket the socket requiring state change
1401  */
1402 static void
1403 set_state_transmit_close_wait (void *cls,
1404                                struct GNUNET_STREAM_Socket *socket)
1405 {
1406   LOG (GNUNET_ERROR_TYPE_DEBUG,
1407        "%s: Attaing TRANSMIT_CLOSE_WAIT state\n",
1408        GNUNET_i2s (&socket->other_peer));
1409   socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1410 }
1411
1412
1413 /**
1414  * Callback to set state to CLOSED
1415  *
1416  * @param cls the closure from queue_message
1417  * @param socket the socket requiring state change
1418  */
1419 static void
1420 set_state_closed (void *cls,
1421                   struct GNUNET_STREAM_Socket *socket)
1422 {
1423   socket->state = STATE_CLOSED;
1424 }
1425
1426
1427 /**
1428  * Returns GNUNET_MESSAGE_TYPE_STREAM_HELLO
1429  *
1430  * @return the generate hello message
1431  */
1432 static struct GNUNET_STREAM_MessageHeader *
1433 generate_hello (void)
1434 {
1435   struct GNUNET_STREAM_MessageHeader *msg;
1436
1437   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1438   msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1439   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1440   return msg;
1441 }
1442
1443
1444 /**
1445  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1446  * socket
1447  *
1448  * @param socket the socket for which this HelloAckMessage has to be generated
1449  * @param generate_seq GNUNET_YES to generate the write sequence number,
1450  *          GNUNET_NO to use the existing sequence number
1451  * @return the HelloAckMessage
1452  */
1453 static struct GNUNET_STREAM_HelloAckMessage *
1454 generate_hello_ack (struct GNUNET_STREAM_Socket *socket,
1455                     int generate_seq)
1456 {
1457   struct GNUNET_STREAM_HelloAckMessage *msg;
1458
1459   if (GNUNET_YES == generate_seq)
1460   {
1461     if (GNUNET_YES == socket->testing_active)
1462       socket->write_sequence_number =
1463         socket->testing_set_write_sequence_number_value;
1464     else
1465       socket->write_sequence_number = 
1466         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1467     LOG_DEBUG ("%s: write sequence number %u\n",
1468                GNUNET_i2s (&socket->other_peer),
1469                (unsigned int) socket->write_sequence_number);
1470   }
1471   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1472   msg->header.header.size = 
1473     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1474   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1475   msg->sequence_number = htonl (socket->write_sequence_number);
1476   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1477   return msg;
1478 }
1479
1480
1481 /**
1482  * Task for retransmitting control messages if they aren't ACK'ed before a
1483  * deadline
1484  *
1485  * @param cls the socket
1486  * @param tc the Task context
1487  */
1488 static void
1489 control_retransmission_task (void *cls,
1490                              const struct GNUNET_SCHEDULER_TaskContext *tc)
1491 {
1492   struct GNUNET_STREAM_Socket *socket = cls;
1493     
1494   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1495   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
1496     return;
1497   LOG_DEBUG ("%s: Retransmitting a control message\n",
1498                  GNUNET_i2s (&socket->other_peer));
1499   switch (socket->state)
1500   {
1501   case STATE_INIT:    
1502     GNUNET_break (0);
1503     break;
1504   case STATE_LISTEN:
1505     GNUNET_break (0);
1506     break;
1507   case STATE_HELLO_WAIT:
1508     if (NULL == socket->lsocket) /* We are client */
1509       queue_message (socket, generate_hello (), NULL, NULL, GNUNET_NO);
1510     else
1511       queue_message (socket,
1512                      (struct GNUNET_STREAM_MessageHeader *)
1513                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1514                      GNUNET_NO);
1515     socket->control_retransmission_task_id =
1516     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
1517                                   &control_retransmission_task, socket);
1518     break;
1519   case STATE_ESTABLISHED:
1520     if (NULL == socket->lsocket)
1521       queue_message (socket,
1522                      (struct GNUNET_STREAM_MessageHeader *)
1523                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1524                      GNUNET_NO);
1525     else
1526       GNUNET_break (0);
1527     break;
1528   default:
1529     GNUNET_break (0);
1530   }  
1531 }
1532
1533
1534 /**
1535  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1536  *
1537  * @param cls the socket (set from GNUNET_MESH_connect)
1538  * @param tunnel connection to the other end
1539  * @param tunnel_ctx this is NULL
1540  * @param sender who sent the message
1541  * @param message the actual message
1542  * @param atsi performance data for the connection
1543  * @return GNUNET_OK to keep the connection open,
1544  *         GNUNET_SYSERR to close it (signal serious error)
1545  */
1546 static int
1547 client_handle_hello_ack (void *cls,
1548                          struct GNUNET_MESH_Tunnel *tunnel,
1549                          void **tunnel_ctx,
1550                          const struct GNUNET_PeerIdentity *sender,
1551                          const struct GNUNET_MessageHeader *message,
1552                          const struct GNUNET_ATS_Information*atsi)
1553 {
1554   struct GNUNET_STREAM_Socket *socket = cls;
1555   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1556   struct GNUNET_STREAM_HelloAckMessage *reply;
1557
1558   if (0 != memcmp (sender, &socket->other_peer,
1559                    sizeof (struct GNUNET_PeerIdentity)))
1560   {
1561     LOG (GNUNET_ERROR_TYPE_DEBUG,
1562          "%s: Received HELLO_ACK from non-confirming peer\n",
1563          GNUNET_i2s (&socket->other_peer));
1564     return GNUNET_YES;
1565   }
1566   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1567   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received HELLO_ACK from %s\n",
1568        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1569   GNUNET_assert (socket->tunnel == tunnel);
1570   switch (socket->state)
1571   {
1572   case STATE_HELLO_WAIT:
1573     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1574     LOG (GNUNET_ERROR_TYPE_DEBUG,
1575          "%s: Read sequence number %u\n",
1576          GNUNET_i2s (&socket->other_peer),
1577          (unsigned int) socket->read_sequence_number);
1578     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1579     reply = generate_hello_ack (socket, GNUNET_YES);
1580     queue_message (socket, &reply->header, &set_state_established,
1581                    NULL, GNUNET_NO);    
1582     return GNUNET_OK;
1583   case STATE_ESTABLISHED:
1584     // call statistics (# ACKs ignored++)
1585     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
1586                    socket->control_retransmission_task_id);
1587     socket->control_retransmission_task_id =
1588       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
1589     return GNUNET_OK;
1590   default:
1591     LOG_DEBUG ("%s: Server %s sent HELLO_ACK when in state %d\n", 
1592                GNUNET_i2s (&socket->other_peer),
1593                GNUNET_i2s (&socket->other_peer), socket->state);
1594     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1595     return GNUNET_SYSERR;
1596   }
1597 }
1598
1599
1600 /**
1601  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1602  *
1603  * @param cls the socket (set from GNUNET_MESH_connect)
1604  * @param tunnel connection to the other end
1605  * @param tunnel_ctx this is NULL
1606  * @param sender who sent the message
1607  * @param message the actual message
1608  * @param atsi performance data for the connection
1609  * @return GNUNET_OK to keep the connection open,
1610  *         GNUNET_SYSERR to close it (signal serious error)
1611  */
1612 static int
1613 client_handle_reset (void *cls,
1614                      struct GNUNET_MESH_Tunnel *tunnel,
1615                      void **tunnel_ctx,
1616                      const struct GNUNET_PeerIdentity *sender,
1617                      const struct GNUNET_MessageHeader *message,
1618                      const struct GNUNET_ATS_Information*atsi)
1619 {
1620   // struct GNUNET_STREAM_Socket *socket = cls;
1621
1622   return GNUNET_OK;
1623 }
1624
1625
1626 /**
1627  * Common message handler for handling TRANSMIT_CLOSE messages
1628  *
1629  * @param socket the socket through which the ack was received
1630  * @param tunnel connection to the other end
1631  * @param sender who sent the message
1632  * @param msg the transmit close message
1633  * @param atsi performance data for the connection
1634  * @return GNUNET_OK to keep the connection open,
1635  *         GNUNET_SYSERR to close it (signal serious error)
1636  */
1637 static int
1638 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1639                        struct GNUNET_MESH_Tunnel *tunnel,
1640                        const struct GNUNET_PeerIdentity *sender,
1641                        const struct GNUNET_STREAM_MessageHeader *msg,
1642                        const struct GNUNET_ATS_Information*atsi)
1643 {
1644   struct GNUNET_STREAM_MessageHeader *reply;
1645
1646   switch (socket->state)
1647   {
1648   case STATE_INIT:
1649   case STATE_LISTEN:
1650   case STATE_HELLO_WAIT:
1651     LOG (GNUNET_ERROR_TYPE_DEBUG,
1652          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1653          GNUNET_i2s (&socket->other_peer));
1654     return GNUNET_OK;
1655   default:
1656     break;
1657   }
1658   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE from %s\n",
1659        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1660   socket->receive_closed = GNUNET_YES;
1661   if (GNUNET_YES == socket->transmit_closed)
1662     socket->state = STATE_CLOSED;
1663   else
1664     socket->state = STATE_RECEIVE_CLOSED;
1665   /* Send TRANSMIT_CLOSE_ACK */
1666   reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1667   reply->header.type = 
1668       htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1669   reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1670   queue_message (socket, reply, NULL, NULL, GNUNET_NO);
1671   return GNUNET_OK;
1672 }
1673
1674
1675 /**
1676  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1677  *
1678  * @param cls the socket (set from GNUNET_MESH_connect)
1679  * @param tunnel connection to the other end
1680  * @param tunnel_ctx this is NULL
1681  * @param sender who sent the message
1682  * @param message the actual message
1683  * @param atsi performance data for the connection
1684  * @return GNUNET_OK to keep the connection open,
1685  *         GNUNET_SYSERR to close it (signal serious error)
1686  */
1687 static int
1688 client_handle_transmit_close (void *cls,
1689                               struct GNUNET_MESH_Tunnel *tunnel,
1690                               void **tunnel_ctx,
1691                               const struct GNUNET_PeerIdentity *sender,
1692                               const struct GNUNET_MessageHeader *message,
1693                               const struct GNUNET_ATS_Information*atsi)
1694 {
1695   struct GNUNET_STREAM_Socket *socket = cls;
1696   
1697   return handle_transmit_close (socket,
1698                                 tunnel,
1699                                 sender,
1700                                 (struct GNUNET_STREAM_MessageHeader *)message,
1701                                 atsi);
1702 }
1703
1704
1705 /**
1706  * Task for calling the shutdown continuation callback
1707  *
1708  * @param cls the socket
1709  * @param tc the scheduler task context
1710  */
1711 static void
1712 call_cont_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1713 {
1714   struct GNUNET_STREAM_Socket *socket = cls;
1715   
1716   GNUNET_assert (NULL != socket->shutdown_handle);
1717   socket->shutdown_handle->call_cont_task_id = GNUNET_SCHEDULER_NO_TASK;
1718   if (NULL != socket->shutdown_handle->completion_cb)
1719     socket->shutdown_handle->completion_cb
1720         (socket->shutdown_handle->completion_cls,
1721          socket->shutdown_handle->operation);
1722   GNUNET_free (socket->shutdown_handle);
1723   socket->shutdown_handle = NULL;
1724 }
1725
1726
1727 /**
1728  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1729  *
1730  * @param socket the socket
1731  * @param tunnel connection to the other end
1732  * @param sender who sent the message
1733  * @param message the actual message
1734  * @param atsi performance data for the connection
1735  * @param operation the close operation which is being ACK'ed
1736  * @return GNUNET_OK to keep the connection open,
1737  *         GNUNET_SYSERR to close it (signal serious error)
1738  */
1739 static int
1740 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1741                           struct GNUNET_MESH_Tunnel *tunnel,
1742                           const struct GNUNET_PeerIdentity *sender,
1743                           const struct GNUNET_STREAM_MessageHeader *message,
1744                           const struct GNUNET_ATS_Information *atsi,
1745                           int operation)
1746 {
1747   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1748
1749   shutdown_handle = socket->shutdown_handle;
1750   if (NULL == shutdown_handle)
1751   {
1752     /* This may happen when the shudown handle is cancelled */
1753     LOG (GNUNET_ERROR_TYPE_DEBUG,
1754          "%s: Received CLOSE_ACK when shutdown handle is NULL\n",
1755          GNUNET_i2s (&socket->other_peer));
1756     return GNUNET_OK;
1757   }
1758   switch (operation)
1759   {
1760   case SHUT_RDWR:
1761     switch (socket->state)
1762     {
1763     case STATE_CLOSE_WAIT:
1764       if (SHUT_RDWR != shutdown_handle->operation)
1765       {
1766         LOG (GNUNET_ERROR_TYPE_DEBUG,
1767              "%s: Received CLOSE_ACK when shutdown handle is not for "
1768              "SHUT_RDWR\n", GNUNET_i2s (&socket->other_peer));
1769         return GNUNET_OK;
1770       }
1771       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE_ACK from %s\n",
1772            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1773       socket->state = STATE_CLOSED;
1774       break;
1775     default:
1776       LOG (GNUNET_ERROR_TYPE_DEBUG,
1777            "%s: Received CLOSE_ACK when it is not expected\n",
1778            GNUNET_i2s (&socket->other_peer));
1779       return GNUNET_OK;
1780     }
1781     break;
1782   case SHUT_RD:
1783     switch (socket->state)
1784     {
1785     case STATE_RECEIVE_CLOSE_WAIT:
1786       if (SHUT_RD != shutdown_handle->operation)
1787       {
1788         LOG (GNUNET_ERROR_TYPE_DEBUG,
1789              "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
1790              "is not for SHUT_RD\n", GNUNET_i2s (&socket->other_peer));
1791         return GNUNET_OK;
1792       }
1793       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE_ACK from %s\n",
1794            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1795       socket->state = STATE_RECEIVE_CLOSED;
1796       break;
1797     default:
1798       LOG (GNUNET_ERROR_TYPE_DEBUG,
1799            "%s: Received RECEIVE_CLOSE_ACK when it is not expected\n",
1800            GNUNET_i2s (&socket->other_peer));
1801       return GNUNET_OK;
1802     }
1803     break;
1804   case SHUT_WR:
1805     switch (socket->state)
1806     {
1807     case STATE_TRANSMIT_CLOSE_WAIT:
1808       if (SHUT_WR != shutdown_handle->operation)
1809       {
1810         LOG (GNUNET_ERROR_TYPE_DEBUG,
1811              "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle "
1812              "is not for SHUT_WR\n",
1813              GNUNET_i2s (&socket->other_peer));
1814         return GNUNET_OK;
1815       }
1816       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
1817            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1818       socket->state = STATE_TRANSMIT_CLOSED;
1819       break;
1820     default:
1821       LOG (GNUNET_ERROR_TYPE_DEBUG,
1822            "%s: Received TRANSMIT_CLOSE_ACK when it is not expected\n",
1823            GNUNET_i2s (&socket->other_peer));          
1824       return GNUNET_OK;
1825     }
1826     break;
1827   default:
1828     GNUNET_assert (0);
1829   }
1830   shutdown_handle->call_cont_task_id = GNUNET_SCHEDULER_add_now
1831       (&call_cont_task, socket);
1832   if (GNUNET_SCHEDULER_NO_TASK
1833       != shutdown_handle->close_msg_retransmission_task_id)
1834   {
1835     GNUNET_SCHEDULER_cancel
1836       (shutdown_handle->close_msg_retransmission_task_id);
1837     shutdown_handle->close_msg_retransmission_task_id =
1838         GNUNET_SCHEDULER_NO_TASK;
1839   }
1840   return GNUNET_OK;
1841 }
1842
1843
1844 /**
1845  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1846  *
1847  * @param cls the socket (set from GNUNET_MESH_connect)
1848  * @param tunnel connection to the other end
1849  * @param tunnel_ctx this is NULL
1850  * @param sender who sent the message
1851  * @param message the actual message
1852  * @param atsi performance data for the connection
1853  * @return GNUNET_OK to keep the connection open,
1854  *         GNUNET_SYSERR to close it (signal serious error)
1855  */
1856 static int
1857 client_handle_transmit_close_ack (void *cls,
1858                                   struct GNUNET_MESH_Tunnel *tunnel,
1859                                   void **tunnel_ctx,
1860                                   const struct GNUNET_PeerIdentity *sender,
1861                                   const struct GNUNET_MessageHeader *message,
1862                                   const struct GNUNET_ATS_Information*atsi)
1863 {
1864   struct GNUNET_STREAM_Socket *socket = cls;
1865
1866   return handle_generic_close_ack (socket,
1867                                    tunnel,
1868                                    sender,
1869                                    (const struct GNUNET_STREAM_MessageHeader *)
1870                                    message,
1871                                    atsi,
1872                                    SHUT_WR);
1873 }
1874
1875
1876 /**
1877  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1878  *
1879  * @param socket the socket
1880  * @param tunnel connection to the other end
1881  * @param sender who sent the message
1882  * @param message the actual message
1883  * @param atsi performance data for the connection
1884  * @return GNUNET_OK to keep the connection open,
1885  *         GNUNET_SYSERR to close it (signal serious error)
1886  */
1887 static int
1888 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1889                       struct GNUNET_MESH_Tunnel *tunnel,
1890                       const struct GNUNET_PeerIdentity *sender,
1891                       const struct GNUNET_STREAM_MessageHeader *message,
1892                       const struct GNUNET_ATS_Information *atsi)
1893 {
1894   struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1895
1896   switch (socket->state)
1897   {
1898   case STATE_INIT:
1899   case STATE_LISTEN:
1900   case STATE_HELLO_WAIT:
1901     LOG (GNUNET_ERROR_TYPE_DEBUG,
1902          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1903          GNUNET_i2s (&socket->other_peer));
1904     return GNUNET_OK;
1905   default:
1906     break;
1907   }  
1908   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE from %s\n",
1909        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1910   socket->transmit_closed = GNUNET_YES;
1911   if (GNUNET_YES == socket->receive_closed)
1912     socket->state = STATE_CLOSED;
1913   else
1914     socket->state = STATE_TRANSMIT_CLOSED;
1915   receive_close_ack =
1916     GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1917   receive_close_ack->header.size =
1918     htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1919   receive_close_ack->header.type =
1920     htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1921   queue_message (socket, receive_close_ack, NULL, NULL, GNUNET_NO);
1922   /* If write handle is present call it with GNUNET_STREAM_SHUTDOWN to signal
1923      that that stream has been shutdown */
1924   if (NULL != socket->write_handle)
1925   {
1926     GNUNET_STREAM_CompletionContinuation wc;
1927     void *wc_cls;
1928
1929     wc = socket->write_handle->write_cont;
1930     wc_cls = socket->write_handle->write_cont_cls;
1931     GNUNET_STREAM_io_write_cancel (socket->write_handle);
1932     socket->write_handle = NULL;
1933     if (NULL != wc)
1934       wc (wc_cls,
1935           GNUNET_STREAM_SHUTDOWN, 0);
1936   }
1937   return GNUNET_OK;
1938 }
1939
1940
1941 /**
1942  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1943  *
1944  * @param cls the socket (set from GNUNET_MESH_connect)
1945  * @param tunnel connection to the other end
1946  * @param tunnel_ctx this is NULL
1947  * @param sender who sent the message
1948  * @param message the actual message
1949  * @param atsi performance data for the connection
1950  * @return GNUNET_OK to keep the connection open,
1951  *         GNUNET_SYSERR to close it (signal serious error)
1952  */
1953 static int
1954 client_handle_receive_close (void *cls,
1955                              struct GNUNET_MESH_Tunnel *tunnel,
1956                              void **tunnel_ctx,
1957                              const struct GNUNET_PeerIdentity *sender,
1958                              const struct GNUNET_MessageHeader *message,
1959                              const struct GNUNET_ATS_Information*atsi)
1960 {
1961   struct GNUNET_STREAM_Socket *socket = cls;
1962
1963   return
1964     handle_receive_close (socket,
1965                           tunnel,
1966                           sender,
1967                           (const struct GNUNET_STREAM_MessageHeader *) message,
1968                           atsi);
1969 }
1970
1971
1972 /**
1973  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1974  *
1975  * @param cls the socket (set from GNUNET_MESH_connect)
1976  * @param tunnel connection to the other end
1977  * @param tunnel_ctx this is NULL
1978  * @param sender who sent the message
1979  * @param message the actual message
1980  * @param atsi performance data for the connection
1981  * @return GNUNET_OK to keep the connection open,
1982  *         GNUNET_SYSERR to close it (signal serious error)
1983  */
1984 static int
1985 client_handle_receive_close_ack (void *cls,
1986                                  struct GNUNET_MESH_Tunnel *tunnel,
1987                                  void **tunnel_ctx,
1988                                  const struct GNUNET_PeerIdentity *sender,
1989                                  const struct GNUNET_MessageHeader *message,
1990                                  const struct GNUNET_ATS_Information*atsi)
1991 {
1992   struct GNUNET_STREAM_Socket *socket = cls;
1993
1994   return handle_generic_close_ack (socket,
1995                                    tunnel,
1996                                    sender,
1997                                    (const struct GNUNET_STREAM_MessageHeader *)
1998                                    message,
1999                                    atsi,
2000                                    SHUT_RD);
2001 }
2002
2003
2004 /**
2005  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2006  *
2007  * @param socket the socket
2008  * @param tunnel connection to the other end
2009  * @param sender who sent the message
2010  * @param message the actual message
2011  * @param atsi performance data for the connection
2012  * @return GNUNET_OK to keep the connection open,
2013  *         GNUNET_SYSERR to close it (signal serious error)
2014  */
2015 static int
2016 handle_close (struct GNUNET_STREAM_Socket *socket,
2017               struct GNUNET_MESH_Tunnel *tunnel,
2018               const struct GNUNET_PeerIdentity *sender,
2019               const struct GNUNET_STREAM_MessageHeader *message,
2020               const struct GNUNET_ATS_Information*atsi)
2021 {
2022   struct GNUNET_STREAM_MessageHeader *close_ack;
2023
2024   switch (socket->state)
2025   {
2026   case STATE_INIT:
2027   case STATE_LISTEN:
2028   case STATE_HELLO_WAIT:
2029     LOG (GNUNET_ERROR_TYPE_DEBUG,
2030          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
2031          GNUNET_i2s (&socket->other_peer));
2032     return GNUNET_OK;
2033   default:
2034     break;
2035   }
2036   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE from %s\n",
2037        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
2038   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2039   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2040   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
2041   queue_message (socket, close_ack, &set_state_closed, NULL, GNUNET_NO);
2042   if (STATE_CLOSED == socket->state)
2043     return GNUNET_OK;
2044   socket->receive_closed = GNUNET_YES;
2045   socket->transmit_closed = GNUNET_YES;
2046   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
2047   socket->receive_buffer = NULL;
2048   socket->receive_buffer_size = 0;  
2049   /* If write handle is present call it with GNUNET_STREAM_SHUTDOWN to signal
2050      that that stream has been shutdown */
2051   if (NULL != socket->write_handle)
2052   {
2053     GNUNET_STREAM_CompletionContinuation wc;
2054     void *wc_cls;
2055
2056     wc = socket->write_handle->write_cont;
2057     wc_cls = socket->write_handle->write_cont_cls;
2058     GNUNET_STREAM_io_write_cancel (socket->write_handle);
2059     socket->write_handle = NULL;
2060     if (NULL != wc)
2061       wc (wc_cls,
2062           GNUNET_STREAM_SHUTDOWN, 0);
2063   }
2064   return GNUNET_OK;
2065 }
2066
2067
2068 /**
2069  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2070  *
2071  * @param cls the socket (set from GNUNET_MESH_connect)
2072  * @param tunnel connection to the other end
2073  * @param tunnel_ctx this is NULL
2074  * @param sender who sent the message
2075  * @param message the actual message
2076  * @param atsi performance data for the connection
2077  * @return GNUNET_OK to keep the connection open,
2078  *         GNUNET_SYSERR to close it (signal serious error)
2079  */
2080 static int
2081 client_handle_close (void *cls,
2082                      struct GNUNET_MESH_Tunnel *tunnel,
2083                      void **tunnel_ctx,
2084                      const struct GNUNET_PeerIdentity *sender,
2085                      const struct GNUNET_MessageHeader *message,
2086                      const struct GNUNET_ATS_Information*atsi)
2087 {
2088   struct GNUNET_STREAM_Socket *socket = cls;
2089
2090   return handle_close (socket,
2091                        tunnel,
2092                        sender,
2093                        (const struct GNUNET_STREAM_MessageHeader *) message,
2094                        atsi);
2095 }
2096
2097
2098 /**
2099  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2100  *
2101  * @param cls the socket (set from GNUNET_MESH_connect)
2102  * @param tunnel connection to the other end
2103  * @param tunnel_ctx this is NULL
2104  * @param sender who sent the message
2105  * @param message the actual message
2106  * @param atsi performance data for the connection
2107  * @return GNUNET_OK to keep the connection open,
2108  *         GNUNET_SYSERR to close it (signal serious error)
2109  */
2110 static int
2111 client_handle_close_ack (void *cls,
2112                          struct GNUNET_MESH_Tunnel *tunnel,
2113                          void **tunnel_ctx,
2114                          const struct GNUNET_PeerIdentity *sender,
2115                          const struct GNUNET_MessageHeader *message,
2116                          const struct GNUNET_ATS_Information *atsi)
2117 {
2118   struct GNUNET_STREAM_Socket *socket = cls;
2119
2120   return handle_generic_close_ack (socket,
2121                                    tunnel,
2122                                    sender,
2123                                    (const struct GNUNET_STREAM_MessageHeader *) 
2124                                    message,
2125                                    atsi,
2126                                    SHUT_RDWR);
2127 }
2128
2129 /*****************************/
2130 /* Server's Message Handlers */
2131 /*****************************/
2132
2133 /**
2134  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
2135  *
2136  * @param cls the closure
2137  * @param tunnel connection to the other end
2138  * @param tunnel_ctx the socket
2139  * @param sender who sent the message
2140  * @param message the actual message
2141  * @param atsi performance data for the connection
2142  * @return GNUNET_OK to keep the connection open,
2143  *         GNUNET_SYSERR to close it (signal serious error)
2144  */
2145 static int
2146 server_handle_data (void *cls,
2147                     struct GNUNET_MESH_Tunnel *tunnel,
2148                     void **tunnel_ctx,
2149                     const struct GNUNET_PeerIdentity *sender,
2150                     const struct GNUNET_MessageHeader *message,
2151                     const struct GNUNET_ATS_Information*atsi)
2152 {
2153   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2154
2155   return handle_data (socket,
2156                       tunnel,
2157                       sender,
2158                       (const struct GNUNET_STREAM_DataMessage *)message,
2159                       atsi);
2160 }
2161
2162
2163 /**
2164  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
2165  *
2166  * @param cls the closure
2167  * @param tunnel connection to the other end
2168  * @param tunnel_ctx the socket
2169  * @param sender who sent the message
2170  * @param message the actual message
2171  * @param atsi performance data for the connection
2172  * @return GNUNET_OK to keep the connection open,
2173  *         GNUNET_SYSERR to close it (signal serious error)
2174  */
2175 static int
2176 server_handle_hello (void *cls,
2177                      struct GNUNET_MESH_Tunnel *tunnel,
2178                      void **tunnel_ctx,
2179                      const struct GNUNET_PeerIdentity *sender,
2180                      const struct GNUNET_MessageHeader *message,
2181                      const struct GNUNET_ATS_Information*atsi)
2182 {
2183   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2184   struct GNUNET_STREAM_HelloAckMessage *reply;
2185
2186   if (0 != memcmp (sender,
2187                    &socket->other_peer,
2188                    sizeof (struct GNUNET_PeerIdentity)))
2189   {
2190     LOG_DEBUG ("%s: Received HELLO from non-confirming peer\n",
2191                GNUNET_i2s (&socket->other_peer));
2192     return GNUNET_YES;
2193   }
2194   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == ntohs (message->type));
2195   GNUNET_assert (socket->tunnel == tunnel);
2196   LOG_DEBUG ("%s: Received HELLO from %s\n", GNUNET_i2s (&socket->other_peer),
2197              GNUNET_i2s (&socket->other_peer));
2198   switch (socket->state)
2199   {
2200   case STATE_INIT:
2201     reply = generate_hello_ack (socket, GNUNET_YES);
2202     queue_message (socket, &reply->header, &set_state_hello_wait, NULL,
2203                    GNUNET_NO);
2204     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2205                    socket->control_retransmission_task_id);
2206     socket->control_retransmission_task_id =
2207       GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2208                                     &control_retransmission_task, socket);
2209     break;
2210   case STATE_HELLO_WAIT:
2211     /* Perhaps our HELLO_ACK was lost */
2212     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != 
2213                    socket->control_retransmission_task_id);
2214     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2215     socket->control_retransmission_task_id =
2216       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
2217     break;
2218   default:
2219     LOG_DEBUG( "%s: Client sent HELLO when in state %d\n",
2220                GNUNET_i2s (&socket->other_peer), socket->state);
2221     /* FIXME: Send RESET? */
2222   }
2223   return GNUNET_OK;
2224 }
2225
2226
2227 /**
2228  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2229  *
2230  * @param cls the closure
2231  * @param tunnel connection to the other end
2232  * @param tunnel_ctx the socket
2233  * @param sender who sent the message
2234  * @param message the actual message
2235  * @param atsi performance data for the connection
2236  * @return GNUNET_OK to keep the connection open,
2237  *         GNUNET_SYSERR to close it (signal serious error)
2238  */
2239 static int
2240 server_handle_hello_ack (void *cls,
2241                          struct GNUNET_MESH_Tunnel *tunnel,
2242                          void **tunnel_ctx,
2243                          const struct GNUNET_PeerIdentity *sender,
2244                          const struct GNUNET_MessageHeader *message,
2245                          const struct GNUNET_ATS_Information*atsi)
2246 {
2247   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2248   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2249
2250   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2251                  ntohs (message->type));
2252   GNUNET_assert (socket->tunnel == tunnel);
2253   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2254   switch (socket->state)  
2255   {
2256   case STATE_HELLO_WAIT:
2257     LOG (GNUNET_ERROR_TYPE_DEBUG,
2258          "%s: Received HELLO_ACK from %s\n",
2259          GNUNET_i2s (&socket->other_peer),
2260          GNUNET_i2s (&socket->other_peer));
2261     socket->read_sequence_number = ntohl (ack_message->sequence_number);
2262     LOG (GNUNET_ERROR_TYPE_DEBUG,
2263          "%s: Read sequence number %u\n",
2264          GNUNET_i2s (&socket->other_peer),
2265          (unsigned int) socket->read_sequence_number);
2266     socket->receiver_window_available = 
2267       ntohl (ack_message->receiver_window_size);
2268     set_state_established (NULL, socket);
2269     break;
2270   default:
2271     LOG (GNUNET_ERROR_TYPE_DEBUG,
2272          "Client sent HELLO_ACK when in state %d\n", socket->state);    
2273   }
2274   return GNUNET_OK;
2275 }
2276
2277
2278 /**
2279  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2280  *
2281  * @param cls the closure
2282  * @param tunnel connection to the other end
2283  * @param tunnel_ctx the socket
2284  * @param sender who sent the message
2285  * @param message the actual message
2286  * @param atsi performance data for the connection
2287  * @return GNUNET_OK to keep the connection open,
2288  *         GNUNET_SYSERR to close it (signal serious error)
2289  */
2290 static int
2291 server_handle_reset (void *cls,
2292                      struct GNUNET_MESH_Tunnel *tunnel,
2293                      void **tunnel_ctx,
2294                      const struct GNUNET_PeerIdentity *sender,
2295                      const struct GNUNET_MessageHeader *message,
2296                      const struct GNUNET_ATS_Information*atsi)
2297 {
2298   // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2299
2300   return GNUNET_OK;
2301 }
2302
2303
2304 /**
2305  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2306  *
2307  * @param cls the closure
2308  * @param tunnel connection to the other end
2309  * @param tunnel_ctx the socket
2310  * @param sender who sent the message
2311  * @param message the actual message
2312  * @param atsi performance data for the connection
2313  * @return GNUNET_OK to keep the connection open,
2314  *         GNUNET_SYSERR to close it (signal serious error)
2315  */
2316 static int
2317 server_handle_transmit_close (void *cls,
2318                               struct GNUNET_MESH_Tunnel *tunnel,
2319                               void **tunnel_ctx,
2320                               const struct GNUNET_PeerIdentity *sender,
2321                               const struct GNUNET_MessageHeader *message,
2322                               const struct GNUNET_ATS_Information*atsi)
2323 {
2324   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2325
2326   return handle_transmit_close (socket,
2327                                 tunnel,
2328                                 sender,
2329                                 (struct GNUNET_STREAM_MessageHeader *)message,
2330                                 atsi);
2331 }
2332
2333
2334 /**
2335  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2336  *
2337  * @param cls the closure
2338  * @param tunnel connection to the other end
2339  * @param tunnel_ctx the socket
2340  * @param sender who sent the message
2341  * @param message the actual message
2342  * @param atsi performance data for the connection
2343  * @return GNUNET_OK to keep the connection open,
2344  *         GNUNET_SYSERR to close it (signal serious error)
2345  */
2346 static int
2347 server_handle_transmit_close_ack (void *cls,
2348                                   struct GNUNET_MESH_Tunnel *tunnel,
2349                                   void **tunnel_ctx,
2350                                   const struct GNUNET_PeerIdentity *sender,
2351                                   const struct GNUNET_MessageHeader *message,
2352                                   const struct GNUNET_ATS_Information*atsi)
2353 {
2354   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2355
2356   return handle_generic_close_ack (socket,
2357                                    tunnel,
2358                                    sender,
2359                                    (const struct GNUNET_STREAM_MessageHeader *)
2360                                    message,
2361                                    atsi,
2362                                    SHUT_WR);
2363 }
2364
2365
2366 /**
2367  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2368  *
2369  * @param cls the closure
2370  * @param tunnel connection to the other end
2371  * @param tunnel_ctx the socket
2372  * @param sender who sent the message
2373  * @param message the actual message
2374  * @param atsi performance data for the connection
2375  * @return GNUNET_OK to keep the connection open,
2376  *         GNUNET_SYSERR to close it (signal serious error)
2377  */
2378 static int
2379 server_handle_receive_close (void *cls,
2380                              struct GNUNET_MESH_Tunnel *tunnel,
2381                              void **tunnel_ctx,
2382                              const struct GNUNET_PeerIdentity *sender,
2383                              const struct GNUNET_MessageHeader *message,
2384                              const struct GNUNET_ATS_Information*atsi)
2385 {
2386   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2387
2388   return
2389     handle_receive_close (socket,
2390                           tunnel,
2391                           sender,
2392                           (const struct GNUNET_STREAM_MessageHeader *) message,
2393                           atsi);
2394 }
2395
2396
2397 /**
2398  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2399  *
2400  * @param cls the closure
2401  * @param tunnel connection to the other end
2402  * @param tunnel_ctx the socket
2403  * @param sender who sent the message
2404  * @param message the actual message
2405  * @param atsi performance data for the connection
2406  * @return GNUNET_OK to keep the connection open,
2407  *         GNUNET_SYSERR to close it (signal serious error)
2408  */
2409 static int
2410 server_handle_receive_close_ack (void *cls,
2411                                  struct GNUNET_MESH_Tunnel *tunnel,
2412                                  void **tunnel_ctx,
2413                                  const struct GNUNET_PeerIdentity *sender,
2414                                  const struct GNUNET_MessageHeader *message,
2415                                  const struct GNUNET_ATS_Information*atsi)
2416 {
2417   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2418
2419   return handle_generic_close_ack (socket,
2420                                    tunnel,
2421                                    sender,
2422                                    (const struct GNUNET_STREAM_MessageHeader *)
2423                                    message,
2424                                    atsi,
2425                                    SHUT_RD);
2426 }
2427
2428
2429 /**
2430  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2431  *
2432  * @param cls the listen socket (from GNUNET_MESH_connect in
2433  *          GNUNET_STREAM_listen) 
2434  * @param tunnel connection to the other end
2435  * @param tunnel_ctx the socket
2436  * @param sender who sent the message
2437  * @param message the actual message
2438  * @param atsi performance data for the connection
2439  * @return GNUNET_OK to keep the connection open,
2440  *         GNUNET_SYSERR to close it (signal serious error)
2441  */
2442 static int
2443 server_handle_close (void *cls,
2444                      struct GNUNET_MESH_Tunnel *tunnel,
2445                      void **tunnel_ctx,
2446                      const struct GNUNET_PeerIdentity *sender,
2447                      const struct GNUNET_MessageHeader *message,
2448                      const struct GNUNET_ATS_Information*atsi)
2449 {
2450   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2451   
2452   return handle_close (socket,
2453                        tunnel,
2454                        sender,
2455                        (const struct GNUNET_STREAM_MessageHeader *) message,
2456                        atsi);
2457 }
2458
2459
2460 /**
2461  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2462  *
2463  * @param cls the closure
2464  * @param tunnel connection to the other end
2465  * @param tunnel_ctx the socket
2466  * @param sender who sent the message
2467  * @param message the actual message
2468  * @param atsi performance data for the connection
2469  * @return GNUNET_OK to keep the connection open,
2470  *         GNUNET_SYSERR to close it (signal serious error)
2471  */
2472 static int
2473 server_handle_close_ack (void *cls,
2474                          struct GNUNET_MESH_Tunnel *tunnel,
2475                          void **tunnel_ctx,
2476                          const struct GNUNET_PeerIdentity *sender,
2477                          const struct GNUNET_MessageHeader *message,
2478                          const struct GNUNET_ATS_Information*atsi)
2479 {
2480   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2481
2482   return handle_generic_close_ack (socket,
2483                                    tunnel,
2484                                    sender,
2485                                    (const struct GNUNET_STREAM_MessageHeader *) 
2486                                    message,
2487                                    atsi,
2488                                    SHUT_RDWR);
2489 }
2490
2491
2492 /**
2493  * Handler for DATA_ACK messages
2494  *
2495  * @param socket the socket through which the ack was received
2496  * @param tunnel connection to the other end
2497  * @param sender who sent the message
2498  * @param ack the acknowledgment message
2499  * @param atsi performance data for the connection
2500  * @return GNUNET_OK to keep the connection open,
2501  *         GNUNET_SYSERR to close it (signal serious error)
2502  */
2503 static int
2504 handle_ack (struct GNUNET_STREAM_Socket *socket,
2505             struct GNUNET_MESH_Tunnel *tunnel,
2506             const struct GNUNET_PeerIdentity *sender,
2507             const struct GNUNET_STREAM_AckMessage *ack,
2508             const struct GNUNET_ATS_Information*atsi)
2509 {
2510   unsigned int packet;
2511   int need_retransmission;
2512   uint32_t sequence_difference;
2513   
2514   if (0 != memcmp (sender,
2515                    &socket->other_peer,
2516                    sizeof (struct GNUNET_PeerIdentity)))
2517   {
2518     LOG (GNUNET_ERROR_TYPE_DEBUG,
2519          "%s: Received ACK from non-confirming peer\n",
2520          GNUNET_i2s (&socket->other_peer));
2521     return GNUNET_YES;
2522   }
2523   switch (socket->state)
2524   {
2525   case (STATE_ESTABLISHED):
2526   case (STATE_RECEIVE_CLOSED):
2527   case (STATE_RECEIVE_CLOSE_WAIT):
2528     if (NULL == socket->write_handle)
2529     {
2530       LOG (GNUNET_ERROR_TYPE_DEBUG,
2531            "%s: Received DATA_ACK when write_handle is NULL\n",
2532            GNUNET_i2s (&socket->other_peer));
2533       return GNUNET_OK;
2534     }
2535     sequence_difference = 
2536         socket->write_sequence_number - ntohl (ack->base_sequence_number);
2537     if (!(sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2538     {
2539       LOG (GNUNET_ERROR_TYPE_DEBUG,
2540            "%s: Received DATA_ACK with unexpected base sequence number\n",
2541            GNUNET_i2s (&socket->other_peer));
2542       LOG (GNUNET_ERROR_TYPE_DEBUG,
2543            "%s: Current write sequence: %u; Ack's base sequence: %u\n",
2544            GNUNET_i2s (&socket->other_peer),
2545            socket->write_sequence_number,
2546            ntohl (ack->base_sequence_number));
2547       return GNUNET_OK;
2548     }
2549     /* FIXME: include the case when write_handle is cancelled - ignore the 
2550        acks */
2551     LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received DATA_ACK from %s\n",
2552          GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
2553     /* Cancel the retransmission task */
2554     if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2555     {
2556       GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2557       socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2558     }
2559     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2560     {
2561       if (NULL == socket->write_handle->messages[packet]) break;
2562       /* BS: Base sequence from ack; PS: sequence num of current packet */
2563       sequence_difference = ntohl (ack->base_sequence_number)
2564         - ntohl (socket->write_handle->messages[packet]->sequence_number);
2565       if ((0 == sequence_difference) ||
2566           (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH < sequence_difference))
2567         continue; /* The message in our handle is not yet received */
2568       /* case where BS = PS + GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2569       /* sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2570       ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2571                             packet, GNUNET_YES);
2572     }
2573     /* Update the receive window remaining
2574        FIXME : Should update with the value from a data ack with greater
2575        sequence number */
2576     socket->receiver_window_available = 
2577       ntohl (ack->receive_window_remaining);
2578     /* Check if we have received all acknowledgements */
2579     need_retransmission = GNUNET_NO;
2580     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2581     {
2582       if (NULL == socket->write_handle->messages[packet]) break;
2583       if (GNUNET_YES != ackbitmap_is_bit_set 
2584           (&socket->write_handle->ack_bitmap,packet))
2585       {
2586         need_retransmission = GNUNET_YES;
2587         break;
2588       }
2589     }
2590     if (GNUNET_YES == need_retransmission)
2591     {
2592       write_data (socket);
2593     }
2594     else      /* We have to call the write continuation callback now */
2595     {
2596       struct GNUNET_STREAM_IOWriteHandle *write_handle;
2597       
2598       /* Free the packets */
2599       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2600       {
2601         GNUNET_free_non_null (socket->write_handle->messages[packet]);
2602       }
2603       write_handle = socket->write_handle;
2604       socket->write_handle = NULL;
2605       if (NULL != write_handle->write_cont)
2606         write_handle->write_cont (write_handle->write_cont_cls,
2607                                   socket->status,
2608                                   write_handle->size);
2609       /* We are done with the write handle - Freeing it */
2610       GNUNET_free (write_handle);
2611       LOG (GNUNET_ERROR_TYPE_DEBUG,
2612            "%s: Write completion callback completed\n",
2613            GNUNET_i2s (&socket->other_peer));      
2614     }
2615     break;
2616   default:
2617     break;
2618   }
2619   return GNUNET_OK;
2620 }
2621
2622
2623 /**
2624  * Handler for DATA_ACK messages
2625  *
2626  * @param cls the 'struct GNUNET_STREAM_Socket'
2627  * @param tunnel connection to the other end
2628  * @param tunnel_ctx unused
2629  * @param sender who sent the message
2630  * @param message the actual message
2631  * @param atsi performance data for the connection
2632  * @return GNUNET_OK to keep the connection open,
2633  *         GNUNET_SYSERR to close it (signal serious error)
2634  */
2635 static int
2636 client_handle_ack (void *cls,
2637                    struct GNUNET_MESH_Tunnel *tunnel,
2638                    void **tunnel_ctx,
2639                    const struct GNUNET_PeerIdentity *sender,
2640                    const struct GNUNET_MessageHeader *message,
2641                    const struct GNUNET_ATS_Information*atsi)
2642 {
2643   struct GNUNET_STREAM_Socket *socket = cls;
2644   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2645  
2646   return handle_ack (socket, tunnel, sender, ack, atsi);
2647 }
2648
2649
2650 /**
2651  * Handler for DATA_ACK messages
2652  *
2653  * @param cls the server's listen socket
2654  * @param tunnel connection to the other end
2655  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2656  * @param sender who sent the message
2657  * @param message the actual message
2658  * @param atsi performance data for the connection
2659  * @return GNUNET_OK to keep the connection open,
2660  *         GNUNET_SYSERR to close it (signal serious error)
2661  */
2662 static int
2663 server_handle_ack (void *cls,
2664                    struct GNUNET_MESH_Tunnel *tunnel,
2665                    void **tunnel_ctx,
2666                    const struct GNUNET_PeerIdentity *sender,
2667                    const struct GNUNET_MessageHeader *message,
2668                    const struct GNUNET_ATS_Information*atsi)
2669 {
2670   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2671   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2672  
2673   return handle_ack (socket, tunnel, sender, ack, atsi);
2674 }
2675
2676
2677 /**
2678  * For client message handlers, the stream socket is in the
2679  * closure argument.
2680  */
2681 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2682   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2683   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2684    sizeof (struct GNUNET_STREAM_AckMessage) },
2685   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2686    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2687   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2688    sizeof (struct GNUNET_STREAM_MessageHeader)},
2689   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2690    sizeof (struct GNUNET_STREAM_MessageHeader)},
2691   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2692    sizeof (struct GNUNET_STREAM_MessageHeader)},
2693   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2694    sizeof (struct GNUNET_STREAM_MessageHeader)},
2695   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2696    sizeof (struct GNUNET_STREAM_MessageHeader)},
2697   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2698    sizeof (struct GNUNET_STREAM_MessageHeader)},
2699   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2700    sizeof (struct GNUNET_STREAM_MessageHeader)},
2701   {NULL, 0, 0}
2702 };
2703
2704
2705 /**
2706  * For server message handlers, the stream socket is in the
2707  * tunnel context, and the listen socket in the closure argument.
2708  */
2709 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2710   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2711   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2712    sizeof (struct GNUNET_STREAM_AckMessage) },
2713   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2714    sizeof (struct GNUNET_STREAM_MessageHeader)},
2715   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2716    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2717   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2718    sizeof (struct GNUNET_STREAM_MessageHeader)},
2719   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2720    sizeof (struct GNUNET_STREAM_MessageHeader)},
2721   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2722    sizeof (struct GNUNET_STREAM_MessageHeader)},
2723   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2724    sizeof (struct GNUNET_STREAM_MessageHeader)},
2725   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2726    sizeof (struct GNUNET_STREAM_MessageHeader)},
2727   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2728    sizeof (struct GNUNET_STREAM_MessageHeader)},
2729   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2730    sizeof (struct GNUNET_STREAM_MessageHeader)},
2731   {NULL, 0, 0}
2732 };
2733
2734
2735 /**
2736  * Function called when our target peer is connected to our tunnel
2737  *
2738  * @param cls the socket for which this tunnel is created
2739  * @param peer the peer identity of the target
2740  * @param atsi performance data for the connection
2741  */
2742 static void
2743 mesh_peer_connect_callback (void *cls,
2744                             const struct GNUNET_PeerIdentity *peer,
2745                             const struct GNUNET_ATS_Information * atsi)
2746 {
2747   struct GNUNET_STREAM_Socket *socket = cls;
2748   struct GNUNET_STREAM_MessageHeader *message;
2749   
2750   if (0 != memcmp (peer,
2751                    &socket->other_peer,
2752                    sizeof (struct GNUNET_PeerIdentity)))
2753   {
2754     LOG (GNUNET_ERROR_TYPE_DEBUG,
2755          "%s: A peer which is not our target has connected to our tunnel\n",
2756          GNUNET_i2s(peer));
2757     return;
2758   }
2759   LOG (GNUNET_ERROR_TYPE_DEBUG,
2760        "%s: Target peer %s connected\n",
2761        GNUNET_i2s (&socket->other_peer),
2762        GNUNET_i2s (&socket->other_peer));
2763   /* Set state to INIT */
2764   socket->state = STATE_INIT;
2765   /* Send HELLO message */
2766   message = generate_hello ();
2767   queue_message (socket, message, &set_state_hello_wait, NULL, GNUNET_NO);
2768   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2769                  socket->control_retransmission_task_id);
2770   socket->control_retransmission_task_id =
2771     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2772                                   &control_retransmission_task, socket);
2773 }
2774
2775
2776 /**
2777  * Function called when our target peer is disconnected from our tunnel
2778  *
2779  * @param cls the socket associated which this tunnel
2780  * @param peer the peer identity of the target
2781  */
2782 static void
2783 mesh_peer_disconnect_callback (void *cls,
2784                                const struct GNUNET_PeerIdentity *peer)
2785 {
2786   struct GNUNET_STREAM_Socket *socket=cls;
2787   
2788   /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2789   LOG (GNUNET_ERROR_TYPE_DEBUG,
2790        "%s: Other peer %s disconnected \n",
2791        GNUNET_i2s (&socket->other_peer),
2792        GNUNET_i2s (&socket->other_peer));
2793 }
2794
2795
2796 /**
2797  * Method called whenever a peer creates a tunnel to us
2798  *
2799  * @param cls closure
2800  * @param tunnel new handle to the tunnel
2801  * @param initiator peer that started the tunnel
2802  * @param atsi performance information for the tunnel
2803  * @return initial tunnel context for the tunnel
2804  *         (can be NULL -- that's not an error)
2805  */
2806 static void *
2807 new_tunnel_notify (void *cls,
2808                    struct GNUNET_MESH_Tunnel *tunnel,
2809                    const struct GNUNET_PeerIdentity *initiator,
2810                    const struct GNUNET_ATS_Information *atsi)
2811 {
2812   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2813   struct GNUNET_STREAM_Socket *socket;
2814
2815   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2816      from the same peer again until the socket is closed */
2817
2818   if (GNUNET_NO == lsocket->listening)
2819   {
2820     GNUNET_MESH_tunnel_destroy (tunnel);
2821     return NULL;
2822   }
2823   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2824   socket->other_peer = *initiator;
2825   socket->tunnel = tunnel;
2826   socket->state = STATE_INIT;
2827   socket->lsocket = lsocket;
2828   socket->stat_handle = lsocket->stat_handle;
2829   socket->retransmit_timeout = lsocket->retransmit_timeout;
2830   socket->testing_active = lsocket->testing_active;
2831   socket->testing_set_write_sequence_number_value =
2832       lsocket->testing_set_write_sequence_number_value;
2833   socket->max_payload_size = lsocket->max_payload_size;
2834   LOG (GNUNET_ERROR_TYPE_DEBUG,
2835        "%s: Peer %s initiated tunnel to us\n", 
2836        GNUNET_i2s (&socket->other_peer),
2837        GNUNET_i2s (&socket->other_peer));
2838   if (NULL != socket->stat_handle)
2839   {
2840     GNUNET_STATISTICS_update (socket->stat_handle,
2841                               "total inbound connections received",
2842                               1, GNUNET_NO);
2843     GNUNET_STATISTICS_update (socket->stat_handle,
2844                               "inbound connections", 1, GNUNET_NO);
2845   }
2846   
2847   return socket;
2848 }
2849
2850
2851 /**
2852  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2853  * any associated state.  This function is NOT called if the client has
2854  * explicitly asked for the tunnel to be destroyed using
2855  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2856  * the tunnel.
2857  *
2858  * @param cls closure (set from GNUNET_MESH_connect)
2859  * @param tunnel connection to the other end (henceforth invalid)
2860  * @param tunnel_ctx place where local state associated
2861  *                   with the tunnel is stored
2862  */
2863 static void 
2864 tunnel_cleaner (void *cls,
2865                 const struct GNUNET_MESH_Tunnel *tunnel,
2866                 void *tunnel_ctx)
2867 {
2868   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2869   struct MessageQueue *head;
2870
2871   GNUNET_assert (tunnel == socket->tunnel);
2872   GNUNET_break_op(0);
2873   LOG (GNUNET_ERROR_TYPE_DEBUG,
2874        "%s: Peer %s has terminated connection abruptly\n",
2875        GNUNET_i2s (&socket->other_peer),
2876        GNUNET_i2s (&socket->other_peer));
2877   if (NULL != socket->stat_handle)
2878   {
2879     GNUNET_STATISTICS_update (socket->stat_handle,
2880                               "connections terminated abruptly", 1, GNUNET_NO);
2881     GNUNET_STATISTICS_update (socket->stat_handle,
2882                               "inbound connections", -1, GNUNET_NO);
2883   }
2884   socket->status = GNUNET_STREAM_SHUTDOWN;
2885   /* Clear Transmit handles */
2886   if (NULL != socket->transmit_handle)
2887   {
2888     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2889     socket->transmit_handle = NULL;
2890   }
2891   /* Stop Tasks using socket->tunnel */
2892   if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2893   {
2894     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2895     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2896   }
2897   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2898   {
2899     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2900     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2901   }  
2902   /* Terminate the control retransmission tasks */
2903   if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
2904   {
2905     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2906     socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2907   }
2908   /* Clear Transmit handles */
2909   if (NULL != socket->transmit_handle)
2910   {
2911     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2912     socket->transmit_handle = NULL;
2913   }
2914   /* Clear existing message queue */
2915   while (NULL != (head = socket->queue_head)) {
2916     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2917                                  socket->queue_tail,
2918                                  head);
2919     GNUNET_free (head->message);
2920     GNUNET_free (head);
2921   }
2922   socket->tunnel = NULL;
2923 }
2924
2925
2926 /**
2927  * Callback to signal timeout on lockmanager lock acquire
2928  *
2929  * @param cls the ListenSocket
2930  * @param tc the scheduler task context
2931  */
2932 static void
2933 lockmanager_acquire_timeout (void *cls, 
2934                              const struct GNUNET_SCHEDULER_TaskContext *tc)
2935 {
2936   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2937   GNUNET_STREAM_ListenCallback listen_cb;
2938   void *listen_cb_cls;
2939
2940   lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2941   listen_cb = lsocket->listen_cb;
2942   listen_cb_cls = lsocket->listen_cb_cls;
2943   if (NULL != listen_cb)
2944     listen_cb (listen_cb_cls, NULL, NULL);
2945 }
2946
2947
2948 /**
2949  * Callback to notify us on the status changes on app_port lock
2950  *
2951  * @param cls the ListenSocket
2952  * @param domain the domain name of the lock
2953  * @param lock the app_port
2954  * @param status the current status of the lock
2955  */
2956 static void
2957 lock_status_change_cb (void *cls, const char *domain, uint32_t lock,
2958                        enum GNUNET_LOCKMANAGER_Status status)
2959 {
2960   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2961
2962   GNUNET_assert (lock == (uint32_t) lsocket->port);
2963   if (GNUNET_LOCKMANAGER_SUCCESS == status)
2964   {
2965     lsocket->listening = GNUNET_YES;
2966     if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
2967     {
2968       GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
2969       lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2970     }
2971     if (NULL == lsocket->mesh)
2972     {
2973       GNUNET_MESH_ApplicationType ports[] = {lsocket->port, 0};
2974
2975       lsocket->mesh = GNUNET_MESH_connect (lsocket->cfg,
2976                                            lsocket, /* Closure */
2977                                            &new_tunnel_notify,
2978                                            &tunnel_cleaner,
2979                                            server_message_handlers,
2980                                            ports);
2981       GNUNET_assert (NULL != lsocket->mesh);
2982       if (NULL != lsocket->listen_ok_cb)
2983       {
2984         (void) lsocket->listen_ok_cb ();
2985       }
2986     }
2987   }
2988   if (GNUNET_LOCKMANAGER_RELEASE == status)
2989     lsocket->listening = GNUNET_NO;
2990 }
2991
2992
2993 /*****************/
2994 /* API functions */
2995 /*****************/
2996
2997
2998 /**
2999  * Tries to open a stream to the target peer
3000  *
3001  * @param cfg configuration to use
3002  * @param target the target peer to which the stream has to be opened
3003  * @param app_port the application port number which uniquely identifies this
3004  *            stream
3005  * @param open_cb this function will be called after stream has be established;
3006  *          cannot be NULL
3007  * @param open_cb_cls the closure for open_cb
3008  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3009  * @return if successful it returns the stream socket; NULL if stream cannot be
3010  *         opened 
3011  */
3012 struct GNUNET_STREAM_Socket *
3013 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
3014                     const struct GNUNET_PeerIdentity *target,
3015                     GNUNET_MESH_ApplicationType app_port,
3016                     GNUNET_STREAM_OpenCallback open_cb,
3017                     void *open_cb_cls,
3018                     ...)
3019 {
3020   struct GNUNET_STREAM_Socket *socket;
3021   enum GNUNET_STREAM_Option option;
3022   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
3023   va_list vargs;
3024   uint16_t payload_size;
3025
3026   LOG (GNUNET_ERROR_TYPE_DEBUG,
3027        "%s\n", __func__);
3028   GNUNET_assert (NULL != open_cb);
3029   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
3030   socket->other_peer = *target;
3031   socket->open_cb = open_cb;
3032   socket->open_cls = open_cb_cls;
3033   /* Set defaults */
3034   socket->retransmit_timeout = TIME_REL_SECS (default_timeout);
3035   socket->testing_active = GNUNET_NO;
3036   socket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
3037   va_start (vargs, open_cb_cls); /* Parse variable args */
3038   do {
3039     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3040     switch (option)
3041     {
3042     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3043       /* Expect struct GNUNET_TIME_Relative */
3044       socket->retransmit_timeout = va_arg (vargs,
3045                                            struct GNUNET_TIME_Relative);
3046       break;
3047     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3048       socket->testing_active = GNUNET_YES;
3049       socket->testing_set_write_sequence_number_value = va_arg (vargs,
3050                                                                 uint32_t);
3051       break;
3052     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3053       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
3054       break;
3055     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3056       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
3057       break;
3058     case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
3059       payload_size = (uint16_t) va_arg (vargs, unsigned int);
3060       GNUNET_assert (0 != payload_size);
3061       if (payload_size < socket->max_payload_size)
3062         socket->max_payload_size = payload_size;
3063       break;
3064     case GNUNET_STREAM_OPTION_END:
3065       break;
3066     }
3067   } while (GNUNET_STREAM_OPTION_END != option);
3068   va_end (vargs);               /* End of variable args parsing */
3069   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
3070                                       socket, /* cls */
3071                                       NULL, /* No inbound tunnel handler */
3072                                       NULL, /* No in-tunnel cleaner */
3073                                       client_message_handlers,
3074                                       ports); /* We don't get inbound tunnels */
3075   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
3076   {
3077     GNUNET_free (socket);
3078     return NULL;
3079   }
3080   /* Now create the mesh tunnel to target */
3081   LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating MESH Tunnel\n");
3082   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
3083                                               NULL, /* Tunnel context */
3084                                               &mesh_peer_connect_callback,
3085                                               &mesh_peer_disconnect_callback,
3086                                               socket);
3087   GNUNET_assert (NULL != socket->tunnel);
3088   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
3089                                         &socket->other_peer);
3090   socket->stat_handle = GNUNET_STATISTICS_create ("stream", cfg);
3091   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__);
3092   return socket;
3093 }
3094
3095
3096 /**
3097  * Shutdown the stream for reading or writing (similar to man 2 shutdown).
3098  *
3099  * @param socket the stream socket
3100  * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
3101  * @param completion_cb the callback that will be called upon successful
3102  *          shutdown of given operation
3103  * @param completion_cls the closure for the completion callback
3104  * @return the shutdown handle
3105  */
3106 struct GNUNET_STREAM_ShutdownHandle *
3107 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
3108                         int operation,
3109                         GNUNET_STREAM_ShutdownCompletion completion_cb,
3110                         void *completion_cls)
3111 {
3112   struct GNUNET_STREAM_ShutdownHandle *handle;
3113   struct GNUNET_STREAM_MessageHeader *msg;
3114   
3115   GNUNET_assert (NULL == socket->shutdown_handle);
3116   handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
3117   handle->socket = socket;
3118   handle->completion_cb = completion_cb;
3119   handle->completion_cls = completion_cls;
3120   socket->shutdown_handle = handle;
3121   if ( ((GNUNET_YES == socket->receive_closed) && (SHUT_RD == operation))
3122        || ((GNUNET_YES == socket->transmit_closed) && (SHUT_WR == operation))
3123        || ((GNUNET_YES == socket->transmit_closed) 
3124            && (GNUNET_YES == socket->receive_closed)
3125            && (SHUT_RDWR == operation)) )
3126   {
3127     handle->operation = operation;
3128     handle->call_cont_task_id = GNUNET_SCHEDULER_add_now (&call_cont_task,
3129                                                           socket);
3130     return handle;
3131   }
3132   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
3133   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
3134   switch (operation)
3135   {
3136   case SHUT_RD:
3137     handle->operation = SHUT_RD;
3138     if (NULL != socket->read_handle)
3139       LOG (GNUNET_ERROR_TYPE_WARNING,
3140            "Existing read handle should be cancelled before shutting"
3141            " down reading\n");
3142     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
3143     queue_message (socket, msg, &set_state_receive_close_wait, NULL,
3144                    GNUNET_NO);
3145     socket->receive_closed = GNUNET_YES;
3146     break;
3147   case SHUT_WR:
3148     handle->operation = SHUT_WR;
3149     if (NULL != socket->write_handle)
3150       LOG (GNUNET_ERROR_TYPE_WARNING,
3151            "Existing write handle should be cancelled before shutting"
3152            " down writing\n");
3153     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
3154     queue_message (socket, msg, &set_state_transmit_close_wait, NULL,
3155                    GNUNET_NO);
3156     socket->transmit_closed = GNUNET_YES;
3157     break;
3158   case SHUT_RDWR:
3159     handle->operation = SHUT_RDWR;
3160     if (NULL != socket->write_handle)
3161       LOG (GNUNET_ERROR_TYPE_WARNING,
3162            "Existing write handle should be cancelled before shutting"
3163            " down writing\n");
3164     if (NULL != socket->read_handle)
3165       LOG (GNUNET_ERROR_TYPE_WARNING,
3166            "Existing read handle should be cancelled before shutting"
3167            " down reading\n");
3168     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
3169     queue_message (socket, msg, &set_state_close_wait, NULL, GNUNET_NO);
3170     socket->transmit_closed = GNUNET_YES;
3171     socket->receive_closed = GNUNET_YES;
3172     break;
3173   default:
3174     LOG (GNUNET_ERROR_TYPE_WARNING,
3175          "GNUNET_STREAM_shutdown called with invalid value for "
3176          "parameter operation -- Ignoring\n");
3177     GNUNET_free (msg);
3178     GNUNET_free (handle);
3179     return NULL;
3180   }
3181   handle->close_msg_retransmission_task_id =
3182     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
3183                                   &close_msg_retransmission_task,
3184                                   handle);
3185   return handle;
3186 }
3187
3188
3189 /**
3190  * Cancels a pending shutdown. Note that the shutdown messages may already be
3191  * sent and the stream is shutdown already for the operation given to
3192  * GNUNET_STREAM_shutdown(). This function only clears up any retranmissions of
3193  * shutdown messages and frees the shutdown handle.
3194  *
3195  * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
3196  */
3197 void
3198 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
3199 {
3200   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
3201     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
3202   if (GNUNET_SCHEDULER_NO_TASK != handle->call_cont_task_id)
3203     GNUNET_SCHEDULER_cancel (handle->call_cont_task_id);
3204   handle->socket->shutdown_handle = NULL;
3205   GNUNET_free (handle);
3206 }
3207
3208
3209 /**
3210  * Closes the stream
3211  *
3212  * @param socket the stream socket
3213  */
3214 void
3215 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
3216 {
3217   struct MessageQueue *head;
3218
3219   if (NULL != socket->read_handle)
3220   {
3221     LOG (GNUNET_ERROR_TYPE_WARNING,
3222          "Closing STREAM socket when a read handle is pending\n");
3223     GNUNET_STREAM_io_read_cancel (socket->read_handle);
3224   }
3225   if (NULL != socket->write_handle)
3226   {
3227     LOG (GNUNET_ERROR_TYPE_WARNING,
3228          "Closing STREAM socket when a write handle is pending\n");
3229     GNUNET_STREAM_io_write_cancel (socket->write_handle);
3230     //socket->write_handle = NULL;
3231   }
3232   /* Terminate the ack'ing task if they are still present */
3233   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
3234   {
3235     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
3236     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
3237   }
3238   /* Terminate the control retransmission tasks */
3239   if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
3240   {
3241     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
3242   }
3243   /* Clear Transmit handles */
3244   if (NULL != socket->transmit_handle)
3245   {
3246     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
3247     socket->transmit_handle = NULL;
3248   }
3249   /* Clear existing message queue */
3250   while (NULL != (head = socket->queue_head)) {
3251     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
3252                                  socket->queue_tail,
3253                                  head);
3254     GNUNET_free (head->message);
3255     GNUNET_free (head);
3256   }
3257   /* Close associated tunnel */
3258   if (NULL != socket->tunnel)
3259   {
3260     GNUNET_MESH_tunnel_destroy (socket->tunnel);
3261     socket->tunnel = NULL;
3262   }
3263   /* Close mesh connection */
3264   if ((NULL != socket->mesh) && (NULL == socket->lsocket))
3265   {
3266     GNUNET_MESH_disconnect (socket->mesh);
3267     socket->mesh = NULL;
3268   }
3269   /* Close statistics connection */
3270   if ( (NULL != socket->stat_handle) && (NULL == socket->lsocket) )
3271     GNUNET_STATISTICS_destroy (socket->stat_handle, GNUNET_YES);
3272   /* Release receive buffer */
3273   if (NULL != socket->receive_buffer)
3274   {
3275     GNUNET_free (socket->receive_buffer);
3276   }
3277   GNUNET_free (socket);
3278 }
3279
3280
3281 /**
3282  * Listens for stream connections for a specific application ports
3283  *
3284  * @param cfg the configuration to use
3285  * @param app_port the application port for which new streams will be accepted
3286  * @param listen_cb this function will be called when a peer tries to establish
3287  *            a stream with us
3288  * @param listen_cb_cls closure for listen_cb
3289  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3290  * @return listen socket, NULL for any error
3291  */
3292 struct GNUNET_STREAM_ListenSocket *
3293 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
3294                       GNUNET_MESH_ApplicationType app_port,
3295                       GNUNET_STREAM_ListenCallback listen_cb,
3296                       void *listen_cb_cls,
3297                       ...)
3298 {
3299   struct GNUNET_STREAM_ListenSocket *lsocket;
3300   struct GNUNET_TIME_Relative listen_timeout;
3301   enum GNUNET_STREAM_Option option;
3302   va_list vargs;
3303   uint16_t payload_size;
3304
3305   GNUNET_assert (NULL != listen_cb);
3306   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
3307   lsocket->cfg = GNUNET_CONFIGURATION_dup (cfg);
3308   lsocket->lockmanager = GNUNET_LOCKMANAGER_connect (lsocket->cfg);
3309   if (NULL == lsocket->lockmanager)
3310   {
3311     GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3312     GNUNET_free (lsocket);
3313     return NULL;
3314   }
3315   lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */  
3316   /* Set defaults */
3317   lsocket->retransmit_timeout = TIME_REL_SECS (default_timeout);
3318   lsocket->testing_active = GNUNET_NO;
3319   lsocket->listen_ok_cb = NULL;
3320   lsocket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
3321   listen_timeout = TIME_REL_SECS (60); /* A minute for listen timeout */  
3322   va_start (vargs, listen_cb_cls);
3323   do {
3324     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3325     switch (option)
3326     {
3327     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3328       lsocket->retransmit_timeout = va_arg (vargs,
3329                                             struct GNUNET_TIME_Relative);
3330       break;
3331     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3332       lsocket->testing_active = GNUNET_YES;
3333       lsocket->testing_set_write_sequence_number_value = va_arg (vargs,
3334                                                                  uint32_t);
3335       break;
3336     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3337       listen_timeout = GNUNET_TIME_relative_multiply
3338         (GNUNET_TIME_UNIT_MILLISECONDS, va_arg (vargs, uint32_t));
3339       break;
3340     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3341       lsocket->listen_ok_cb = va_arg (vargs,
3342                                       GNUNET_STREAM_ListenSuccessCallback);
3343       break;
3344     case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
3345       payload_size = (uint16_t) va_arg (vargs, unsigned int);
3346       GNUNET_assert (0 != payload_size);
3347       if (payload_size < lsocket->max_payload_size)
3348         lsocket->max_payload_size = payload_size;
3349       break;
3350     case GNUNET_STREAM_OPTION_END:
3351       break;
3352     }
3353   } while (GNUNET_STREAM_OPTION_END != option);
3354   va_end (vargs);
3355   lsocket->port = app_port;
3356   lsocket->listen_cb = listen_cb;
3357   lsocket->listen_cb_cls = listen_cb_cls;
3358   lsocket->locking_request = 
3359     GNUNET_LOCKMANAGER_acquire_lock (lsocket->lockmanager, locking_domain,
3360                                      (uint32_t) lsocket->port,
3361                                      &lock_status_change_cb, lsocket);
3362   lsocket->lockmanager_acquire_timeout_task =
3363     GNUNET_SCHEDULER_add_delayed (listen_timeout,
3364                                   &lockmanager_acquire_timeout, lsocket);
3365   lsocket->stat_handle = GNUNET_STATISTICS_create ("stream",
3366                                                    lsocket->cfg);
3367   return lsocket;
3368 }
3369
3370
3371 /**
3372  * Closes the listen socket
3373  *
3374  * @param lsocket the listen socket
3375  */
3376 void
3377 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3378 {
3379   /* Close MESH connection */
3380   if (NULL != lsocket->mesh)
3381     GNUNET_MESH_disconnect (lsocket->mesh);
3382   if (NULL != lsocket->stat_handle)
3383     GNUNET_STATISTICS_destroy (lsocket->stat_handle, GNUNET_YES);
3384   GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3385   if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
3386     GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
3387   if (NULL != lsocket->locking_request)
3388     GNUNET_LOCKMANAGER_cancel_request (lsocket->locking_request);
3389   if (NULL != lsocket->lockmanager)
3390     GNUNET_LOCKMANAGER_disconnect (lsocket->lockmanager);
3391   GNUNET_free (lsocket);
3392 }
3393
3394
3395 /**
3396  * Tries to write the given data to the stream. The maximum size of data that
3397  * can be written as part of a write operation is (64 * (64000 - sizeof (struct
3398  * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3399  * violation, however only the said number of maximum bytes will be written.
3400  *
3401  * @param socket the socket representing a stream
3402  * @param data the data buffer from where the data is written into the stream
3403  * @param size the number of bytes to be written from the data buffer
3404  * @param timeout the timeout period
3405  * @param write_cont the function to call upon writing some bytes into the
3406  *          stream 
3407  * @param write_cont_cls the closure
3408  *
3409  * @return handle to cancel the operation; if a previous write is pending or
3410  *           the stream has been shutdown for this operation then write_cont is
3411  *           immediately called and NULL is returned.
3412  */
3413 struct GNUNET_STREAM_IOWriteHandle *
3414 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3415                      const void *data,
3416                      size_t size,
3417                      struct GNUNET_TIME_Relative timeout,
3418                      GNUNET_STREAM_CompletionContinuation write_cont,
3419                      void *write_cont_cls)
3420 {
3421   struct GNUNET_STREAM_IOWriteHandle *io_handle;
3422   struct GNUNET_STREAM_DataMessage *data_msg;
3423   const void *sweep;
3424   struct GNUNET_TIME_Relative ack_deadline;
3425   unsigned int num_needed_packets;
3426   unsigned int packet;
3427   uint32_t packet_size;
3428   uint32_t payload_size;
3429   uint16_t max_data_packet_size;
3430
3431   LOG (GNUNET_ERROR_TYPE_DEBUG,
3432        "%s\n", __func__);
3433   if (NULL != socket->write_handle)
3434   {
3435     GNUNET_break (0);
3436     return NULL;
3437   }
3438   switch (socket->state)
3439   {
3440   case STATE_TRANSMIT_CLOSED:
3441   case STATE_TRANSMIT_CLOSE_WAIT:
3442   case STATE_CLOSED:
3443   case STATE_CLOSE_WAIT:
3444     if (NULL != write_cont)
3445       write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3446     LOG (GNUNET_ERROR_TYPE_DEBUG,
3447          "%s() END\n", __func__);
3448     return NULL;
3449   case STATE_INIT:
3450   case STATE_LISTEN:
3451   case STATE_HELLO_WAIT:
3452     if (NULL != write_cont)
3453       /* FIXME: GNUNET_STREAM_SYSERR?? */
3454       write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3455     LOG (GNUNET_ERROR_TYPE_DEBUG,
3456          "%s() END\n", __func__);
3457     return NULL;
3458   case STATE_ESTABLISHED:
3459   case STATE_RECEIVE_CLOSED:
3460   case STATE_RECEIVE_CLOSE_WAIT:
3461     break;
3462   }
3463   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * socket->max_payload_size < size)
3464     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * socket->max_payload_size;
3465   num_needed_packets =
3466       (size + (socket->max_payload_size - 1)) / socket->max_payload_size;
3467   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
3468   io_handle->socket = socket;
3469   io_handle->write_cont = write_cont;
3470   io_handle->write_cont_cls = write_cont_cls;
3471   io_handle->size = size;
3472   io_handle->packets_sent = 0;
3473   sweep = data;
3474   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3475      determined from RTT */
3476   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3477   /* Divide the given buffer into packets for sending */
3478   max_data_packet_size =
3479       socket->max_payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3480   for (packet=0; packet < num_needed_packets; packet++)
3481   {
3482     if ((packet + 1) * socket->max_payload_size < size) 
3483     {
3484       payload_size = socket->max_payload_size;
3485       packet_size = max_data_packet_size;
3486     }
3487     else 
3488     {
3489       payload_size = size - packet * socket->max_payload_size;
3490       packet_size = 
3491           payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3492     }
3493     io_handle->messages[packet] = GNUNET_malloc (packet_size);
3494     io_handle->messages[packet]->header.header.size = htons (packet_size);
3495     io_handle->messages[packet]->header.header.type =
3496       htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3497     io_handle->messages[packet]->sequence_number =
3498       htonl (socket->write_sequence_number++);
3499     io_handle->messages[packet]->offset = htonl (socket->write_offset);
3500     /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3501        determined from RTT */
3502     io_handle->messages[packet]->ack_deadline =
3503       GNUNET_TIME_relative_hton (ack_deadline);
3504     data_msg = io_handle->messages[packet];
3505     /* Copy data from given buffer to the packet */
3506     memcpy (&data_msg[1], sweep, payload_size);
3507     sweep += payload_size;
3508     socket->write_offset += payload_size;
3509   }
3510   /* ack the last data message. FIXME: remove when we figure out how to do this
3511      using RTT */
3512   io_handle->messages[num_needed_packets - 1]->ack_deadline = 
3513       GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
3514   socket->write_handle = io_handle;
3515   write_data (socket);
3516   LOG (GNUNET_ERROR_TYPE_DEBUG,
3517        "%s() END\n", __func__);
3518   return io_handle;
3519 }
3520
3521
3522 /**
3523  * Tries to read data from the stream.
3524  *
3525  * @param socket the socket representing a stream
3526  * @param timeout the timeout period
3527  * @param proc function to call with data (once only)
3528  * @param proc_cls the closure for proc
3529  *
3530  * @return handle to cancel the operation; NULL is returned if: the stream has
3531  *           been shutdown for this type of opeartion (the DataProcessor is
3532  *           immediately called with GNUNET_STREAM_SHUTDOWN as status) OR another
3533  *           read handle is present (only one read handle per socket is present
3534  *           at any time)
3535  */
3536 struct GNUNET_STREAM_IOReadHandle *
3537 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3538                     struct GNUNET_TIME_Relative timeout,
3539                     GNUNET_STREAM_DataProcessor proc,
3540                     void *proc_cls)
3541 {
3542   struct GNUNET_STREAM_IOReadHandle *read_handle;
3543   
3544   LOG (GNUNET_ERROR_TYPE_DEBUG,
3545        "%s: %s()\n", 
3546        GNUNET_i2s (&socket->other_peer),
3547        __func__);
3548   /* Return NULL if there is already a read handle; the user has to cancel that
3549      first before continuing or has to wait until it is completed */
3550   if (NULL != socket->read_handle) 
3551     return NULL;
3552   GNUNET_assert (NULL != proc);
3553   switch (socket->state)
3554   {
3555   case STATE_RECEIVE_CLOSED:
3556   case STATE_RECEIVE_CLOSE_WAIT:
3557   case STATE_CLOSED:
3558   case STATE_CLOSE_WAIT:
3559     LOG (GNUNET_ERROR_TYPE_DEBUG,
3560          "%s: %s() END\n",
3561          GNUNET_i2s (&socket->other_peer),
3562          __func__);
3563     proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3564     return NULL;
3565   default:
3566     break;
3567   }
3568   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
3569   read_handle->proc = proc;
3570   read_handle->proc_cls = proc_cls;
3571   read_handle->socket = socket;
3572   socket->read_handle = read_handle;
3573   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3574                                           0))
3575     read_handle->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3576                                                           socket);   
3577   read_handle->read_io_timeout_task_id =
3578       GNUNET_SCHEDULER_add_delayed (timeout, &read_io_timeout, socket);
3579   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: %s() END\n",
3580        GNUNET_i2s (&socket->other_peer), __func__);
3581   return read_handle;
3582 }
3583
3584
3585 /**
3586  * Cancel pending write operation.
3587  *
3588  * @param ioh handle to operation to cancel
3589  */
3590 void
3591 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3592 {
3593   struct GNUNET_STREAM_Socket *socket = ioh->socket;
3594   unsigned int packet;
3595
3596   GNUNET_assert (NULL != socket->write_handle);
3597   GNUNET_assert (socket->write_handle == ioh);
3598
3599   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
3600   {
3601     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
3602     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
3603   }
3604
3605   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3606   {
3607     if (NULL == ioh->messages[packet]) break;
3608     GNUNET_free (ioh->messages[packet]);
3609   }
3610       
3611   GNUNET_free (socket->write_handle);
3612   socket->write_handle = NULL;
3613 }
3614
3615
3616 /**
3617  * Cancel pending read operation.
3618  *
3619  * @param ioh handle to operation to cancel
3620  */
3621 void
3622 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
3623 {
3624   struct GNUNET_STREAM_Socket *socket;
3625   
3626   socket = ioh->socket;
3627   GNUNET_assert (NULL != socket->read_handle);
3628   GNUNET_assert (ioh == socket->read_handle);
3629   /* Read io time task should be there; if it is already executed then this
3630   read handle is not valid; However upon scheduler shutdown the read io task
3631   may be executed before */
3632   if (GNUNET_SCHEDULER_NO_TASK != ioh->read_io_timeout_task_id)
3633     GNUNET_SCHEDULER_cancel (ioh->read_io_timeout_task_id);
3634   /* reading task may be present; if so we have to stop it */
3635   if (GNUNET_SCHEDULER_NO_TASK != ioh->read_task_id)
3636     GNUNET_SCHEDULER_cancel (ioh->read_task_id);
3637   GNUNET_free (ioh);
3638   socket->read_handle = NULL;
3639 }
3640
3641 /* end of stream_api.c */