-fix double-sending in stream if finish_cb behaves in a certain way
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Add code for write io timeout
27  *
28  * Include retransmission for control messages
29  **/
30
31 /**
32  * @file stream/stream_api.c
33  * @brief Implementation of the stream library
34  * @author Sree Harsha Totakura
35  */
36
37 #include "platform.h"
38 #include "gnunet_common.h"
39 #include "gnunet_crypto_lib.h"
40 #include "gnunet_lockmanager_service.h"
41 #include "gnunet_statistics_service.h"
42 #include "gnunet_stream_lib.h"
43 #include "stream_protocol.h"
44
45 /**
46  * Generic logging shorthand
47  */
48 #define LOG(kind,...)                                   \
49   GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
50
51 /**
52  * Debug logging shorthand
53  */
54 #define LOG_DEBUG(...)                          \
55   LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
56
57 /**
58  * Time in relative seconds shorthand
59  */
60 #define TIME_REL_SECS(sec) \
61   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
62
63 /**
64  * The maximum packet size of a stream packet
65  */
66 #define DEFAULT_MAX_PAYLOAD_SIZE 64000
67
68 /**
69  * Receive buffer
70  */
71 #define RECEIVE_BUFFER_SIZE 4096000
72
73 /**
74  * states in the Protocol
75  */
76 enum State
77   {
78     /**
79      * Client initialization state
80      */
81     STATE_INIT,
82
83     /**
84      * Listener initialization state 
85      */
86     STATE_LISTEN,
87
88     /**
89      * Pre-connection establishment state
90      */
91     STATE_HELLO_WAIT,
92
93     /**
94      * State where a connection has been established
95      */
96     STATE_ESTABLISHED,
97
98     /**
99      * State where the socket is closed on our side and waiting to be ACK'ed
100      */
101     STATE_RECEIVE_CLOSE_WAIT,
102
103     /**
104      * State where the socket is closed for reading
105      */
106     STATE_RECEIVE_CLOSED,
107
108     /**
109      * State where the socket is closed on our side and waiting to be ACK'ed
110      */
111     STATE_TRANSMIT_CLOSE_WAIT,
112
113     /**
114      * State where the socket is closed for writing
115      */
116     STATE_TRANSMIT_CLOSED,
117
118     /**
119      * State where the socket is closed on our side and waiting to be ACK'ed
120      */
121     STATE_CLOSE_WAIT,
122
123     /**
124      * State where the socket is closed
125      */
126     STATE_CLOSED 
127   };
128
129
130 /**
131  * Functions of this type are called when a message is written
132  *
133  * @param cls the closure from queue_message
134  * @param socket the socket the written message was bound to
135  */
136 typedef void (*SendFinishCallback) (void *cls,
137                                     struct GNUNET_STREAM_Socket *socket);
138
139
140 /**
141  * The send message queue
142  */
143 struct MessageQueue
144 {
145   /**
146    * The message
147    */
148   struct GNUNET_STREAM_MessageHeader *message;
149
150   /**
151    * Callback to be called when the message is sent
152    */
153   SendFinishCallback finish_cb;
154
155   /**
156    * The closure for finish_cb
157    */
158   void *finish_cb_cls;
159
160   /**
161    * The next message in queue. Should be NULL in the last message
162    */
163   struct MessageQueue *next;
164
165   /**
166    * The next message in queue. Should be NULL in the first message
167    */
168   struct MessageQueue *prev;
169 };
170
171
172 /**
173  * The STREAM Socket Handler
174  */
175 struct GNUNET_STREAM_Socket
176 {
177   /**
178    * The mesh handle
179    */
180   struct GNUNET_MESH_Handle *mesh;
181
182   /**
183    * Handle to statistics
184    */
185   struct GNUNET_STATISTICS_Handle *stat_handle;
186
187   /**
188    * The mesh tunnel handle
189    */
190   struct GNUNET_MESH_Tunnel *tunnel;
191
192   /**
193    * Stream open closure
194    */
195   void *open_cls;
196
197   /**
198    * Stream open callback
199    */
200   GNUNET_STREAM_OpenCallback open_cb;
201
202   /**
203    * The current transmit handle (if a pending transmit request exists)
204    */
205   struct GNUNET_MESH_TransmitHandle *transmit_handle;
206
207   /**
208    * The current message associated with the transmit handle
209    */
210   struct MessageQueue *queue_head;
211
212   /**
213    * The queue tail, should always point to the last message in queue
214    */
215   struct MessageQueue *queue_tail;
216
217   /**
218    * The write IO_handle associated with this socket
219    */
220   struct GNUNET_STREAM_IOWriteHandle *write_handle;
221
222   /**
223    * The read IO_handle associated with this socket
224    */
225   struct GNUNET_STREAM_IOReadHandle *read_handle;
226
227   /**
228    * The shutdown handle associated with this socket
229    */
230   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
231
232   /**
233    * Buffer for storing received messages
234    */
235   void *receive_buffer;
236
237   /**
238    * The listen socket from which this socket is derived. Should be NULL if it
239    * is not a derived socket
240    */
241   struct GNUNET_STREAM_ListenSocket *lsocket;
242
243   /**
244    * The peer identity of the peer at the other end of the stream
245    */
246   struct GNUNET_PeerIdentity other_peer;
247
248   /**
249    * The Acknowledgement Bitmap
250    */
251   GNUNET_STREAM_AckBitmap ack_bitmap;
252
253   /**
254    * Task identifier for retransmission task after timeout
255    */
256   GNUNET_SCHEDULER_TaskIdentifier data_retransmission_task_id;
257
258   /**
259    * Task identifier for retransmission of control messages
260    */
261   GNUNET_SCHEDULER_TaskIdentifier control_retransmission_task_id;
262
263   /**
264    * The task for sending timely Acks
265    */
266   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
267
268   /**
269    * Retransmission timeout
270    */
271   struct GNUNET_TIME_Relative retransmit_timeout;
272
273   /**
274    * Time when the Acknowledgement was queued
275    */
276   struct GNUNET_TIME_Absolute ack_time_registered;
277
278   /**
279    * Queued Acknowledgement deadline
280    */
281   struct GNUNET_TIME_Relative ack_time_deadline;
282
283   /**
284    * The state of the protocol associated with this socket
285    */
286   enum State state;
287
288   /**
289    * The status of the socket
290    */
291   enum GNUNET_STREAM_Status status;
292
293   /**
294    * The number of previous timeouts; FIXME: currently not used
295    */
296   unsigned int retries;
297
298   /**
299    * Whether testing mode is active or not
300    */
301   int testing_active;
302
303   /**
304    * Is receive closed
305    */
306   int receive_closed;
307
308   /**
309    * Is transmission closed
310    */
311   int transmit_closed;
312
313   /**
314    * The application port number (type: uint32_t)
315    */
316   GNUNET_MESH_ApplicationType app_port;
317
318   /**
319    * The write sequence number to be set incase of testing
320    */
321   uint32_t testing_set_write_sequence_number_value;
322
323   /**
324    * Write sequence number. Set to random when sending HELLO(client) and
325    * HELLO_ACK(server) 
326    */
327   uint32_t write_sequence_number;
328
329   /**
330    * Read sequence number. This number's value is determined during handshake
331    */
332   uint32_t read_sequence_number;
333
334   /**
335    * The receiver buffer size
336    */
337   uint32_t receive_buffer_size;
338
339   /**
340    * The receiver buffer boundaries
341    */
342   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
343
344   /**
345    * receiver's available buffer after the last acknowledged packet
346    */
347   uint32_t receiver_window_available;
348
349   /**
350    * The offset pointer used during write operation
351    */
352   uint32_t write_offset;
353
354   /**
355    * The offset after which we are expecting data
356    */
357   uint32_t read_offset;
358
359   /**
360    * The offset upto which user has read from the received buffer
361    */
362   uint32_t copy_offset;
363
364   /**
365    * The maximum size of the data message payload this stream handle can send
366    */
367   uint16_t max_payload_size;
368 };
369
370
371 /**
372  * A socket for listening
373  */
374 struct GNUNET_STREAM_ListenSocket
375 {
376   /**
377    * The mesh handle
378    */
379   struct GNUNET_MESH_Handle *mesh;
380
381   /**
382    * Handle to statistics
383    */
384   struct GNUNET_STATISTICS_Handle *stat_handle;
385
386   /**
387    * Our configuration
388    */
389   struct GNUNET_CONFIGURATION_Handle *cfg;
390
391   /**
392    * Handle to the lock manager service
393    */
394   struct GNUNET_LOCKMANAGER_Handle *lockmanager;
395
396   /**
397    * The active LockingRequest from lockmanager
398    */
399   struct GNUNET_LOCKMANAGER_LockingRequest *locking_request;
400
401   /**
402    * Callback to call after acquring a lock and listening
403    */
404   GNUNET_STREAM_ListenSuccessCallback listen_ok_cb;
405
406   /**
407    * The callback function which is called after successful opening socket
408    */
409   GNUNET_STREAM_ListenCallback listen_cb;
410
411   /**
412    * The call back closure
413    */
414   void *listen_cb_cls;
415
416   /**
417    * The service port
418    */
419   GNUNET_MESH_ApplicationType port;
420   
421   /**
422    * The id of the lockmanager timeout task
423    */
424   GNUNET_SCHEDULER_TaskIdentifier lockmanager_acquire_timeout_task;
425
426   /**
427    * The retransmit timeout
428    */
429   struct GNUNET_TIME_Relative retransmit_timeout;
430   
431   /**
432    * Listen enabled?
433    */
434   int listening;
435
436   /**
437    * Whether testing mode is active or not
438    */
439   int testing_active;
440
441   /**
442    * The write sequence number to be set incase of testing
443    */
444   uint32_t testing_set_write_sequence_number_value;
445
446   /**
447    * The maximum size of the data message payload this stream handle can send
448    */
449   uint16_t max_payload_size;
450
451 };
452
453
454 /**
455  * The IO Write Handle
456  */
457 struct GNUNET_STREAM_IOWriteHandle
458 {
459   /**
460    * The socket to which this write handle is associated
461    */
462   struct GNUNET_STREAM_Socket *socket;
463
464   /**
465    * The packet_buffers associated with this Handle
466    */
467   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
468
469   /**
470    * The write continuation callback
471    */
472   GNUNET_STREAM_CompletionContinuation write_cont;
473
474   /**
475    * Write continuation closure
476    */
477   void *write_cont_cls;
478
479   /**
480    * The bitmap of this IOHandle; Corresponding bit for a message is set when
481    * it has been acknowledged by the receiver
482    */
483   GNUNET_STREAM_AckBitmap ack_bitmap;
484
485   /**
486    * Number of bytes in this write handle
487    */
488   size_t size;
489
490   /**
491    * Number of packets already transmitted from this IO handle. Retransmitted
492    * packets are not taken into account here. This is used to determine which
493    * packets account for retransmission and which packets occupy buffer space at
494    * the receiver.
495    */
496   unsigned int packets_sent;
497 };
498
499
500 /**
501  * The IO Read Handle
502  */
503 struct GNUNET_STREAM_IOReadHandle
504 {
505   /**
506    * The socket to which this read handle is associated
507    */
508   struct GNUNET_STREAM_Socket *socket;
509   
510   /**
511    * Callback for the read processor
512    */
513   GNUNET_STREAM_DataProcessor proc;
514
515   /**
516    * The closure pointer for the read processor callback
517    */
518   void *proc_cls;
519
520   /**
521    * Task identifier for the read io timeout task
522    */
523   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
524
525   /**
526    * Task scheduled to continue a read operation.
527    */
528   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
529 };
530
531
532 /**
533  * Handle for Shutdown
534  */
535 struct GNUNET_STREAM_ShutdownHandle
536 {
537   /**
538    * The socket associated with this shutdown handle
539    */
540   struct GNUNET_STREAM_Socket *socket;
541
542   /**
543    * Shutdown completion callback
544    */
545   GNUNET_STREAM_ShutdownCompletion completion_cb;
546
547   /**
548    * Closure for completion callback
549    */
550   void *completion_cls;
551
552   /**
553    * Close message retransmission task id
554    */
555   GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
556
557   /**
558    * Task scheduled to call the shutdown continuation callback
559    */
560   GNUNET_SCHEDULER_TaskIdentifier call_cont_task_id;
561
562   /**
563    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
564    */
565   int operation;  
566 };
567
568
569 /**
570  * Default value in seconds for various timeouts
571  */
572 static const unsigned int default_timeout = 10;
573
574 /**
575  * The domain name for locks we use here
576  */
577 static const char *locking_domain = "GNUNET_STREAM_APPLOCK";
578
579
580 /**
581  * Callback function for sending queued message
582  *
583  * @param cls closure the socket
584  * @param size number of bytes available in buf
585  * @param buf where the callee should write the message
586  * @return number of bytes written to buf
587  */
588 static size_t
589 send_message_notify (void *cls, size_t size, void *buf)
590 {
591   struct GNUNET_STREAM_Socket *socket = cls;
592   struct MessageQueue *head;
593   size_t ret;
594
595   socket->transmit_handle = NULL; /* Remove the transmit handle */
596   head = socket->queue_head;
597   if (NULL == head)
598     return 0; /* just to be safe */
599   if (0 == size)                /* request timed out */
600   {
601     socket->retries++;
602     LOG (GNUNET_ERROR_TYPE_DEBUG,
603          "%s: Message sending timed out. Retry %d \n",
604          GNUNET_i2s (&socket->other_peer),
605          socket->retries);
606     socket->transmit_handle = 
607       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
608                                          GNUNET_NO, /* Corking */
609                                          /* FIXME: exponential backoff */
610                                          socket->retransmit_timeout,
611                                          &socket->other_peer,
612                                          ntohs (head->message->header.size),
613                                          &send_message_notify,
614                                          socket);
615     return 0;
616   }
617   ret = ntohs (head->message->header.size);
618   GNUNET_assert (size >= ret);
619   memcpy (buf, head->message, ret);
620   if (NULL != head->finish_cb)
621   {
622     head->finish_cb (head->finish_cb_cls, socket);
623   }
624   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
625                                socket->queue_tail,
626                                head);
627   GNUNET_free (head->message);
628   GNUNET_free (head);
629   if (NULL != socket->transmit_handle)
630     return ret; /* 'finish_cb' might have triggered message already! */
631   head = socket->queue_head;
632   if (NULL != head)    /* more pending messages to send */
633   {
634     socket->retries = 0;
635     socket->transmit_handle = 
636       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
637                                          GNUNET_NO, /* Corking */
638                                          /* FIXME: exponential backoff */
639                                          socket->retransmit_timeout,
640                                          &socket->other_peer,
641                                          ntohs (head->message->header.size),
642                                          &send_message_notify,
643                                          socket);
644   }
645   return ret;
646 }
647
648
649 /**
650  * Queues a message for sending using the mesh connection of a socket
651  *
652  * @param socket the socket whose mesh connection is used
653  * @param message the message to be sent
654  * @param finish_cb the callback to be called when the message is sent
655  * @param finish_cb_cls the closure for the callback
656  * @param urgent set to GNUNET_YES to add the message to the beginning of the
657  *          queue; GNUNET_NO to add at the tail
658  */
659 static void
660 queue_message (struct GNUNET_STREAM_Socket *socket,
661                struct GNUNET_STREAM_MessageHeader *message,
662                SendFinishCallback finish_cb,
663                void *finish_cb_cls,
664                int urgent)
665 {
666   struct MessageQueue *queue_entity;
667
668   GNUNET_assert 
669     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
670      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
671   LOG (GNUNET_ERROR_TYPE_DEBUG,
672        "%s: Queueing message of type %d and size %d\n",
673        GNUNET_i2s (&socket->other_peer),
674        ntohs (message->header.type),
675        ntohs (message->header.size));
676   GNUNET_assert (NULL != message);
677   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
678   queue_entity->message = message;
679   queue_entity->finish_cb = finish_cb;
680   queue_entity->finish_cb_cls = finish_cb_cls;
681   if (GNUNET_YES == urgent)
682   {
683     GNUNET_CONTAINER_DLL_insert (socket->queue_head, socket->queue_tail,
684                                  queue_entity);
685     if (NULL != socket->transmit_handle)
686     {
687       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
688       socket->transmit_handle = NULL;
689     }
690   }
691   else
692     GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
693                                       socket->queue_tail,
694                                       queue_entity);
695   if (NULL == socket->transmit_handle)
696   {
697     socket->retries = 0;
698     socket->transmit_handle = 
699         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
700                                            GNUNET_NO, /* Corking */
701                                            socket->retransmit_timeout,
702                                            &socket->other_peer,
703                                            ntohs (message->header.size),
704                                            &send_message_notify,
705                                            socket);
706   }
707 }
708
709
710 /**
711  * Copies a message and queues it for sending using the mesh connection of
712  * given socket 
713  *
714  * @param socket the socket whose mesh connection is used
715  * @param message the message to be sent
716  * @param finish_cb the callback to be called when the message is sent
717  * @param finish_cb_cls the closure for the callback
718  */
719 static void
720 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
721                         const struct GNUNET_STREAM_MessageHeader *message,
722                         SendFinishCallback finish_cb,
723                         void *finish_cb_cls)
724 {
725   struct GNUNET_STREAM_MessageHeader *msg_copy;
726   uint16_t size;
727   
728   size = ntohs (message->header.size);
729   msg_copy = GNUNET_malloc (size);
730   memcpy (msg_copy, message, size);
731   queue_message (socket, msg_copy, finish_cb, finish_cb_cls, GNUNET_NO);
732 }
733
734
735 /**
736  * Writes data using the given socket. The amount of data written is limited by
737  * the receiver_window_size
738  *
739  * @param socket the socket to use
740  */
741 static void 
742 write_data (struct GNUNET_STREAM_Socket *socket);
743
744
745 /**
746  * Task for retransmitting data messages if they aren't ACK before their ack
747  * deadline 
748  *
749  * @param cls the socket
750  * @param tc the Task context
751  */
752 static void
753 data_retransmission_task (void *cls,
754                           const struct GNUNET_SCHEDULER_TaskContext *tc)
755 {
756   struct GNUNET_STREAM_Socket *socket = cls;
757   
758   socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
759   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
760     return;
761   LOG (GNUNET_ERROR_TYPE_DEBUG,
762        "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
763   write_data (socket);
764 }
765
766
767 /**
768  * Task for sending ACK message
769  *
770  * @param cls the socket
771  * @param tc the Task context
772  */
773 static void
774 ack_task (void *cls,
775           const struct GNUNET_SCHEDULER_TaskContext *tc)
776 {
777   struct GNUNET_STREAM_Socket *socket = cls;
778   struct GNUNET_STREAM_AckMessage *ack_msg;
779
780   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
781   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
782     return;
783   /* Create the ACK Message */
784   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
785   ack_msg->header.header.size = htons (sizeof (struct 
786                                                GNUNET_STREAM_AckMessage));
787   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
788   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
789   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
790   ack_msg->receive_window_remaining = 
791     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
792   /* Queue up ACK for immediate sending */
793   queue_message (socket, &ack_msg->header, NULL, NULL, GNUNET_YES);
794 }
795
796
797 /**
798  * Retransmission task for shutdown messages
799  *
800  * @param cls the shutdown handle
801  * @param tc the Task Context
802  */
803 static void
804 close_msg_retransmission_task (void *cls,
805                                const struct GNUNET_SCHEDULER_TaskContext *tc)
806 {
807   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
808   struct GNUNET_STREAM_MessageHeader *msg;
809   struct GNUNET_STREAM_Socket *socket;
810
811   shutdown_handle->close_msg_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
812   GNUNET_assert (NULL != shutdown_handle);
813   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
814     return;
815   socket = shutdown_handle->socket;
816   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
817   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
818   switch (shutdown_handle->operation)
819   {
820   case SHUT_RDWR:
821     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
822     break;
823   case SHUT_RD:
824     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
825     break;
826   case SHUT_WR:
827     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
828     break;
829   default:
830     GNUNET_free (msg);
831     shutdown_handle->close_msg_retransmission_task_id = 
832       GNUNET_SCHEDULER_NO_TASK;
833     return;
834   }
835   queue_message (socket, msg, NULL, NULL, GNUNET_NO);
836   shutdown_handle->close_msg_retransmission_task_id =
837     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
838                                   &close_msg_retransmission_task,
839                                   shutdown_handle);
840 }
841
842
843 /**
844  * Function to modify a bit in GNUNET_STREAM_AckBitmap
845  *
846  * @param bitmap the bitmap to modify
847  * @param bit the bit number to modify
848  * @param value GNUNET_YES to on, GNUNET_NO to off
849  */
850 static void
851 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
852                       unsigned int bit, 
853                       int value)
854 {
855   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
856   if (GNUNET_YES == value)
857     *bitmap |= (1LL << bit);
858   else
859     *bitmap &= ~(1LL << bit);
860 }
861
862
863 /**
864  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
865  *
866  * @param bitmap address of the bitmap that has to be checked
867  * @param bit the bit number to check
868  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
869  */
870 static uint8_t
871 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
872                       unsigned int bit)
873 {
874   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
875   return 0 != (*bitmap & (1LL << bit));
876 }
877
878
879 /**
880  * Writes data using the given socket. The amount of data written is limited by
881  * the receiver_window_size
882  *
883  * @param socket the socket to use
884  */
885 static void 
886 write_data (struct GNUNET_STREAM_Socket *socket)
887 {
888   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
889   unsigned int packet;
890   
891   for (packet=0; packet < io_handle->packets_sent; packet++)
892   {
893     if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
894                                            packet))
895     {
896       LOG (GNUNET_ERROR_TYPE_DEBUG,
897            "%s: Retransmitting DATA message with sequence %u\n",
898            GNUNET_i2s (&socket->other_peer),
899            ntohl (io_handle->messages[packet]->sequence_number));
900       copy_and_queue_message (socket,
901                               &io_handle->messages[packet]->header,
902                               NULL,
903                               NULL);
904     }
905   }
906   /* Now send new packets if there is enough buffer space */
907   while ((packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH) &&
908          (NULL != io_handle->messages[packet]) &&
909          (socket->receiver_window_available 
910           >= ntohs (io_handle->messages[packet]->header.header.size)))
911   {
912     socket->receiver_window_available -= 
913       ntohs (io_handle->messages[packet]->header.header.size);
914     LOG (GNUNET_ERROR_TYPE_DEBUG,
915          "%s: Placing DATA message with sequence %u in send queue\n",
916          GNUNET_i2s (&socket->other_peer),
917          ntohl (io_handle->messages[packet]->sequence_number));
918     copy_and_queue_message (socket,
919                             &io_handle->messages[packet]->header,
920                             NULL,
921                             NULL);
922     packet++;
923   }
924   io_handle->packets_sent = packet;
925   // FIXME: 8s is not good, should use GNUNET_TIME_STD_BACKOFF...
926   if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id)
927     socket->data_retransmission_task_id = 
928       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
929                                     (GNUNET_TIME_UNIT_SECONDS, 8),
930                                     &data_retransmission_task,
931                                     socket);
932 }
933
934
935 /**
936  * Task for calling the read processor
937  *
938  * @param cls the socket
939  * @param tc the task context
940  */
941 static void
942 call_read_processor (void *cls,
943                      const struct GNUNET_SCHEDULER_TaskContext *tc)
944 {
945   struct GNUNET_STREAM_Socket *socket = cls;
946   struct GNUNET_STREAM_IOReadHandle *read_handle;
947   size_t read_size;
948   size_t valid_read_size;
949   unsigned int packet;
950   uint32_t sequence_increase;
951   uint32_t offset_increase;
952
953   read_handle = socket->read_handle;
954   GNUNET_assert (NULL != read_handle);
955   read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
956   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
957     return;
958   if (NULL == socket->receive_buffer) 
959     return;
960   GNUNET_assert (NULL != socket->read_handle);
961   GNUNET_assert (NULL != socket->read_handle->proc);
962   /* Check the bitmap for any holes */
963   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
964   {
965     if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
966                                            packet))
967       break;
968   }
969   /* We only call read processor if we have the first packet */
970   GNUNET_assert (0 < packet);
971   valid_read_size = 
972     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
973   GNUNET_assert (0 != valid_read_size);
974   /* Cancel the read_io_timeout_task */
975   GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id);
976   read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
977   /* Call the data processor */
978   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n",
979        GNUNET_i2s (&socket->other_peer));
980   read_size = 
981       socket->read_handle->proc (socket->read_handle->proc_cls,
982                                  socket->status,
983                                  socket->receive_buffer + socket->copy_offset,
984                                  valid_read_size);
985   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n",
986        GNUNET_i2s (&socket->other_peer), read_size);
987   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor completed successfully\n",
988        GNUNET_i2s (&socket->other_peer));
989   /* Free the read handle */
990   GNUNET_free (socket->read_handle);
991   socket->read_handle = NULL;
992   GNUNET_assert (read_size <= valid_read_size);
993   socket->copy_offset += read_size;
994   /* Determine upto which packet we can remove from the buffer */
995   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
996   {
997     if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
998     { 
999       packet++; 
1000       break;
1001     }
1002     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
1003       break;
1004   }
1005   /* If no packets can be removed we can't move the buffer */
1006   if (0 == packet) 
1007     return;
1008   sequence_increase = packet;
1009   LOG (GNUNET_ERROR_TYPE_DEBUG,
1010        "%s: Sequence increase after read processor completion: %u\n",
1011        GNUNET_i2s (&socket->other_peer), sequence_increase);
1012   /* Shift the data in the receive buffer */
1013   socket->receive_buffer = 
1014     memmove (socket->receive_buffer,
1015              socket->receive_buffer 
1016              + socket->receive_buffer_boundaries[sequence_increase-1],
1017              socket->receive_buffer_size
1018              - socket->receive_buffer_boundaries[sequence_increase-1]);
1019   /* Shift the bitmap */
1020   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
1021   /* Set read_sequence_number */
1022   socket->read_sequence_number += sequence_increase;
1023   /* Set read_offset */
1024   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
1025   socket->read_offset += offset_increase;
1026   /* Fix copy_offset */
1027   GNUNET_assert (offset_increase <= socket->copy_offset);
1028   socket->copy_offset -= offset_increase;
1029   /* Fix relative boundaries */
1030   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1031   {
1032     if (packet < (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase))
1033     {
1034       uint32_t ahead_buffer_boundary;
1035
1036       ahead_buffer_boundary = 
1037         socket->receive_buffer_boundaries[packet + sequence_increase];
1038       if (0 == ahead_buffer_boundary)
1039         socket->receive_buffer_boundaries[packet] = 0;
1040       else
1041       {
1042         GNUNET_assert (offset_increase < ahead_buffer_boundary);
1043         socket->receive_buffer_boundaries[packet] = 
1044           ahead_buffer_boundary - offset_increase;
1045       }
1046     }
1047     else
1048       socket->receive_buffer_boundaries[packet] = 0;
1049   }
1050 }
1051
1052
1053 /**
1054  * Cancels the existing read io handle
1055  *
1056  * @param cls the closure from the SCHEDULER call
1057  * @param tc the task context
1058  */
1059 static void
1060 read_io_timeout (void *cls,
1061                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1062 {
1063   struct GNUNET_STREAM_Socket *socket = cls;
1064   struct GNUNET_STREAM_IOReadHandle *read_handle;
1065   GNUNET_STREAM_DataProcessor proc;
1066   void *proc_cls;
1067
1068   read_handle = socket->read_handle;
1069   GNUNET_assert (NULL != read_handle);
1070   read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1071   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
1072     return;
1073   if (read_handle->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1074   {
1075     LOG (GNUNET_ERROR_TYPE_DEBUG,
1076          "%s: Read task timedout - Cancelling it\n",
1077          GNUNET_i2s (&socket->other_peer));
1078     GNUNET_SCHEDULER_cancel (read_handle->read_task_id);
1079     read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1080   }
1081   proc = read_handle->proc;
1082   proc_cls = read_handle->proc_cls;
1083   GNUNET_free (read_handle);
1084   socket->read_handle = NULL;
1085   /* Call the read processor to signal timeout */
1086   proc (proc_cls,
1087         GNUNET_STREAM_TIMEOUT,
1088         NULL,
1089         0);
1090 }
1091
1092
1093 /**
1094  * Handler for DATA messages; Same for both client and server
1095  *
1096  * @param socket the socket through which the ack was received
1097  * @param tunnel connection to the other end
1098  * @param sender who sent the message
1099  * @param msg the data message
1100  * @param atsi performance data for the connection
1101  * @return GNUNET_OK to keep the connection open,
1102  *         GNUNET_SYSERR to close it (signal serious error)
1103  */
1104 static int
1105 handle_data (struct GNUNET_STREAM_Socket *socket,
1106              struct GNUNET_MESH_Tunnel *tunnel,
1107              const struct GNUNET_PeerIdentity *sender,
1108              const struct GNUNET_STREAM_DataMessage *msg,
1109              const struct GNUNET_ATS_Information*atsi)
1110 {
1111   const void *payload;
1112   struct GNUNET_TIME_Relative ack_deadline_rel;
1113   uint32_t bytes_needed;
1114   uint32_t relative_offset;
1115   uint32_t relative_sequence_number;
1116   uint16_t size;
1117
1118   size = htons (msg->header.header.size);
1119   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1120   {
1121     GNUNET_break_op (0);
1122     return GNUNET_SYSERR;
1123   }
1124   if (0 != memcmp (sender, &socket->other_peer,
1125                    sizeof (struct GNUNET_PeerIdentity)))
1126   {
1127     LOG (GNUNET_ERROR_TYPE_DEBUG,
1128          "%s: Received DATA from non-confirming peer\n",
1129          GNUNET_i2s (&socket->other_peer));
1130     return GNUNET_YES;
1131   }
1132   switch (socket->state)
1133   {
1134   case STATE_ESTABLISHED:
1135   case STATE_TRANSMIT_CLOSED:
1136   case STATE_TRANSMIT_CLOSE_WAIT:      
1137     /* check if the message's sequence number is in the range we are
1138        expecting */
1139     relative_sequence_number = 
1140       ntohl (msg->sequence_number) - socket->read_sequence_number;
1141     if ( relative_sequence_number >= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1142     {
1143       LOG (GNUNET_ERROR_TYPE_DEBUG,
1144            "%s: Ignoring received message with sequence number %u\n",
1145            GNUNET_i2s (&socket->other_peer),
1146            ntohl (msg->sequence_number));
1147       /* Start ACK sending task if one is not already present */
1148       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1149       {
1150         socket->ack_task_id = 
1151           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1152                                         (msg->ack_deadline),
1153                                         &ack_task,
1154                                         socket);
1155       }
1156       return GNUNET_YES;
1157     }      
1158     /* Check if we have already seen this message */
1159     if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1160                                             relative_sequence_number))
1161     {
1162       LOG (GNUNET_ERROR_TYPE_DEBUG,
1163            "%s: Ignoring already received message with sequence number %u\n",
1164            GNUNET_i2s (&socket->other_peer),
1165            ntohl (msg->sequence_number));
1166       /* Start ACK sending task if one is not already present */
1167       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1168       {
1169         socket->ack_task_id = 
1170           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1171                                         (msg->ack_deadline), &ack_task, socket);
1172       }
1173       return GNUNET_YES;
1174     }
1175     LOG (GNUNET_ERROR_TYPE_DEBUG,
1176          "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
1177          GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number),
1178          ntohs (msg->header.header.size), GNUNET_i2s (&socket->other_peer));
1179     /* Check if we have to allocate the buffer */
1180     size -= sizeof (struct GNUNET_STREAM_DataMessage);
1181     relative_offset = ntohl (msg->offset) - socket->read_offset;
1182     bytes_needed = relative_offset + size;
1183     if (bytes_needed > socket->receive_buffer_size)
1184     {
1185       if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1186       {
1187         socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1188                                                  bytes_needed);
1189         socket->receive_buffer_size = bytes_needed;
1190       }
1191       else
1192       {
1193         LOG (GNUNET_ERROR_TYPE_DEBUG,
1194              "%s: Cannot accommodate packet %d as buffer is full\n",
1195              GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number));
1196         return GNUNET_YES;
1197       }
1198     }
1199     /* Copy Data to buffer */
1200     payload = &msg[1];
1201     GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1202     memcpy (socket->receive_buffer + relative_offset, payload, size);
1203     socket->receive_buffer_boundaries[relative_sequence_number] = 
1204         relative_offset + size;
1205     /* Modify the ACK bitmap */
1206     ackbitmap_modify_bit (&socket->ack_bitmap, relative_sequence_number,
1207                           GNUNET_YES);
1208     /* Start ACK sending task if one is not already present */
1209     ack_deadline_rel = GNUNET_TIME_relative_ntoh (msg->ack_deadline);
1210     if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1211     {
1212       ack_deadline_rel = 
1213           GNUNET_TIME_relative_min (ack_deadline_rel,
1214                                     GNUNET_TIME_relative_multiply
1215                                     (GNUNET_TIME_UNIT_SECONDS, 300));
1216       socket->ack_task_id = 
1217           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh 
1218                                         (msg->ack_deadline), &ack_task, socket);
1219       socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1220       socket->ack_time_deadline = ack_deadline_rel;
1221     }
1222     else
1223     {
1224       struct GNUNET_TIME_Relative ack_time_past;
1225       struct GNUNET_TIME_Relative ack_time_remaining;
1226       struct GNUNET_TIME_Relative ack_time_min;
1227       ack_time_past = 
1228           GNUNET_TIME_absolute_get_duration (socket->ack_time_registered);
1229       ack_time_remaining = GNUNET_TIME_relative_subtract
1230           (socket->ack_time_deadline, ack_time_past);
1231       ack_time_min = GNUNET_TIME_relative_min (ack_time_remaining,
1232                                                ack_deadline_rel);
1233       if (0 == memcmp(&ack_deadline_rel, &ack_time_min,
1234                       sizeof (struct GNUNET_TIME_Relative)))
1235       {
1236         ack_deadline_rel = ack_time_min;
1237         GNUNET_SCHEDULER_cancel (socket->ack_task_id);
1238         socket->ack_task_id = GNUNET_SCHEDULER_add_delayed (ack_deadline_rel,
1239                                                             &ack_task, socket);
1240         socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1241         socket->ack_time_deadline = ack_deadline_rel;
1242       }
1243     }
1244     if ((NULL != socket->read_handle) /* A read handle is waiting */
1245         /* There is no current read task */
1246         && (GNUNET_SCHEDULER_NO_TASK == socket->read_handle->read_task_id)
1247         /* We have the first packet */
1248         && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0)))
1249     {
1250       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n",
1251            GNUNET_i2s (&socket->other_peer));
1252       socket->read_handle->read_task_id =
1253           GNUNET_SCHEDULER_add_now (&call_read_processor, socket);
1254     }
1255     break;
1256   default:
1257     LOG (GNUNET_ERROR_TYPE_DEBUG,
1258          "%s: Received data message when it cannot be handled\n",
1259          GNUNET_i2s (&socket->other_peer));
1260     break;
1261   }
1262   return GNUNET_YES;
1263 }
1264
1265
1266 /**
1267  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1268  *
1269  * @param cls the socket (set from GNUNET_MESH_connect)
1270  * @param tunnel connection to the other end
1271  * @param tunnel_ctx place to store local state associated with the tunnel
1272  * @param sender who sent the message
1273  * @param message the actual message
1274  * @param atsi performance data for the connection
1275  * @return GNUNET_OK to keep the connection open,
1276  *         GNUNET_SYSERR to close it (signal serious error)
1277  */
1278 static int
1279 client_handle_data (void *cls,
1280                     struct GNUNET_MESH_Tunnel *tunnel,
1281                     void **tunnel_ctx,
1282                     const struct GNUNET_PeerIdentity *sender,
1283                     const struct GNUNET_MessageHeader *message,
1284                     const struct GNUNET_ATS_Information*atsi)
1285 {
1286   struct GNUNET_STREAM_Socket *socket = cls;
1287
1288   return handle_data (socket, tunnel, sender, 
1289                       (const struct GNUNET_STREAM_DataMessage *) message, atsi);
1290 }
1291
1292
1293 /**
1294  * Callback to set state to ESTABLISHED
1295  *
1296  * @param cls the closure NULL;
1297  * @param socket the socket to requiring state change
1298  */
1299 static void
1300 set_state_established (void *cls,
1301                        struct GNUNET_STREAM_Socket *socket)
1302 {
1303   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1304        "%s: Attaining ESTABLISHED state\n",
1305        GNUNET_i2s (&socket->other_peer));
1306   socket->write_offset = 0;
1307   socket->read_offset = 0;
1308   socket->state = STATE_ESTABLISHED;
1309   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK !=
1310                  socket->control_retransmission_task_id);
1311   GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
1312   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1313   if (NULL != socket->lsocket)
1314   {
1315     LOG (GNUNET_ERROR_TYPE_DEBUG,
1316          "%s: Calling listen callback\n",
1317          GNUNET_i2s (&socket->other_peer));
1318     if (GNUNET_SYSERR == 
1319         socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1320                                     socket,
1321                                     &socket->other_peer))
1322     {
1323       socket->state = STATE_CLOSED;
1324       /* FIXME: We should close in a decent way (send RST) */
1325       GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1326       GNUNET_free (socket);
1327     }
1328   }
1329   else
1330     socket->open_cb (socket->open_cls, socket);
1331 }
1332
1333
1334 /**
1335  * Callback to set state to HELLO_WAIT
1336  *
1337  * @param cls the closure from queue_message
1338  * @param socket the socket to requiring state change
1339  */
1340 static void
1341 set_state_hello_wait (void *cls,
1342                       struct GNUNET_STREAM_Socket *socket)
1343 {
1344   GNUNET_assert (STATE_INIT == socket->state);
1345   LOG (GNUNET_ERROR_TYPE_DEBUG,
1346        "%s: Attaining HELLO_WAIT state\n",
1347        GNUNET_i2s (&socket->other_peer));
1348   socket->state = STATE_HELLO_WAIT;
1349 }
1350
1351
1352 /**
1353  * Callback to set state to CLOSE_WAIT
1354  *
1355  * @param cls the closure from queue_message
1356  * @param socket the socket requiring state change
1357  */
1358 static void
1359 set_state_close_wait (void *cls,
1360                       struct GNUNET_STREAM_Socket *socket)
1361 {
1362   LOG (GNUNET_ERROR_TYPE_DEBUG,
1363        "%s: Attaing CLOSE_WAIT state\n",
1364        GNUNET_i2s (&socket->other_peer));
1365   socket->state = STATE_CLOSE_WAIT;
1366   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1367   socket->receive_buffer = NULL;
1368   socket->receive_buffer_size = 0;
1369 }
1370
1371
1372 /**
1373  * Callback to set state to RECEIVE_CLOSE_WAIT
1374  *
1375  * @param cls the closure from queue_message
1376  * @param socket the socket requiring state change
1377  */
1378 static void
1379 set_state_receive_close_wait (void *cls,
1380                               struct GNUNET_STREAM_Socket *socket)
1381 {
1382   LOG (GNUNET_ERROR_TYPE_DEBUG,
1383        "%s: Attaing RECEIVE_CLOSE_WAIT state\n",
1384        GNUNET_i2s (&socket->other_peer));
1385   socket->state = STATE_RECEIVE_CLOSE_WAIT;
1386   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1387   socket->receive_buffer = NULL;
1388   socket->receive_buffer_size = 0;
1389 }
1390
1391
1392 /**
1393  * Callback to set state to TRANSMIT_CLOSE_WAIT
1394  *
1395  * @param cls the closure from queue_message
1396  * @param socket the socket requiring state change
1397  */
1398 static void
1399 set_state_transmit_close_wait (void *cls,
1400                                struct GNUNET_STREAM_Socket *socket)
1401 {
1402   LOG (GNUNET_ERROR_TYPE_DEBUG,
1403        "%s: Attaing TRANSMIT_CLOSE_WAIT state\n",
1404        GNUNET_i2s (&socket->other_peer));
1405   socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1406 }
1407
1408
1409 /**
1410  * Callback to set state to CLOSED
1411  *
1412  * @param cls the closure from queue_message
1413  * @param socket the socket requiring state change
1414  */
1415 static void
1416 set_state_closed (void *cls,
1417                   struct GNUNET_STREAM_Socket *socket)
1418 {
1419   socket->state = STATE_CLOSED;
1420 }
1421
1422
1423 /**
1424  * Returns GNUNET_MESSAGE_TYPE_STREAM_HELLO
1425  *
1426  * @return the generate hello message
1427  */
1428 static struct GNUNET_STREAM_MessageHeader *
1429 generate_hello (void)
1430 {
1431   struct GNUNET_STREAM_MessageHeader *msg;
1432
1433   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1434   msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1435   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1436   return msg;
1437 }
1438
1439
1440 /**
1441  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1442  * socket
1443  *
1444  * @param socket the socket for which this HelloAckMessage has to be generated
1445  * @param generate_seq GNUNET_YES to generate the write sequence number,
1446  *          GNUNET_NO to use the existing sequence number
1447  * @return the HelloAckMessage
1448  */
1449 static struct GNUNET_STREAM_HelloAckMessage *
1450 generate_hello_ack (struct GNUNET_STREAM_Socket *socket,
1451                     int generate_seq)
1452 {
1453   struct GNUNET_STREAM_HelloAckMessage *msg;
1454
1455   if (GNUNET_YES == generate_seq)
1456   {
1457     if (GNUNET_YES == socket->testing_active)
1458       socket->write_sequence_number =
1459         socket->testing_set_write_sequence_number_value;
1460     else
1461       socket->write_sequence_number = 
1462         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1463     LOG_DEBUG ("%s: write sequence number %u\n",
1464                GNUNET_i2s (&socket->other_peer),
1465                (unsigned int) socket->write_sequence_number);
1466   }
1467   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1468   msg->header.header.size = 
1469     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1470   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1471   msg->sequence_number = htonl (socket->write_sequence_number);
1472   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1473   return msg;
1474 }
1475
1476
1477 /**
1478  * Task for retransmitting control messages if they aren't ACK'ed before a
1479  * deadline
1480  *
1481  * @param cls the socket
1482  * @param tc the Task context
1483  */
1484 static void
1485 control_retransmission_task (void *cls,
1486                              const struct GNUNET_SCHEDULER_TaskContext *tc)
1487 {
1488   struct GNUNET_STREAM_Socket *socket = cls;
1489     
1490   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1491   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
1492     return;
1493   LOG_DEBUG ("%s: Retransmitting a control message\n",
1494                  GNUNET_i2s (&socket->other_peer));
1495   switch (socket->state)
1496   {
1497   case STATE_INIT:    
1498     GNUNET_break (0);
1499     break;
1500   case STATE_LISTEN:
1501     GNUNET_break (0);
1502     break;
1503   case STATE_HELLO_WAIT:
1504     if (NULL == socket->lsocket) /* We are client */
1505       queue_message (socket, generate_hello (), NULL, NULL, GNUNET_NO);
1506     else
1507       queue_message (socket,
1508                      (struct GNUNET_STREAM_MessageHeader *)
1509                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1510                      GNUNET_NO);
1511     socket->control_retransmission_task_id =
1512     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
1513                                   &control_retransmission_task, socket);
1514     break;
1515   case STATE_ESTABLISHED:
1516     if (NULL == socket->lsocket)
1517       queue_message (socket,
1518                      (struct GNUNET_STREAM_MessageHeader *)
1519                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1520                      GNUNET_NO);
1521     else
1522       GNUNET_break (0);
1523     break;
1524   default:
1525     GNUNET_break (0);
1526   }  
1527 }
1528
1529
1530 /**
1531  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1532  *
1533  * @param cls the socket (set from GNUNET_MESH_connect)
1534  * @param tunnel connection to the other end
1535  * @param tunnel_ctx this is NULL
1536  * @param sender who sent the message
1537  * @param message the actual message
1538  * @param atsi performance data for the connection
1539  * @return GNUNET_OK to keep the connection open,
1540  *         GNUNET_SYSERR to close it (signal serious error)
1541  */
1542 static int
1543 client_handle_hello_ack (void *cls,
1544                          struct GNUNET_MESH_Tunnel *tunnel,
1545                          void **tunnel_ctx,
1546                          const struct GNUNET_PeerIdentity *sender,
1547                          const struct GNUNET_MessageHeader *message,
1548                          const struct GNUNET_ATS_Information*atsi)
1549 {
1550   struct GNUNET_STREAM_Socket *socket = cls;
1551   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1552   struct GNUNET_STREAM_HelloAckMessage *reply;
1553
1554   if (0 != memcmp (sender, &socket->other_peer,
1555                    sizeof (struct GNUNET_PeerIdentity)))
1556   {
1557     LOG (GNUNET_ERROR_TYPE_DEBUG,
1558          "%s: Received HELLO_ACK from non-confirming peer\n",
1559          GNUNET_i2s (&socket->other_peer));
1560     return GNUNET_YES;
1561   }
1562   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1563   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received HELLO_ACK from %s\n",
1564        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1565   GNUNET_assert (socket->tunnel == tunnel);
1566   switch (socket->state)
1567   {
1568   case STATE_HELLO_WAIT:
1569     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1570     LOG (GNUNET_ERROR_TYPE_DEBUG,
1571          "%s: Read sequence number %u\n",
1572          GNUNET_i2s (&socket->other_peer),
1573          (unsigned int) socket->read_sequence_number);
1574     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1575     reply = generate_hello_ack (socket, GNUNET_YES);
1576     queue_message (socket, &reply->header, &set_state_established,
1577                    NULL, GNUNET_NO);    
1578     return GNUNET_OK;
1579   case STATE_ESTABLISHED:
1580     // call statistics (# ACKs ignored++)
1581     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
1582                    socket->control_retransmission_task_id);
1583     socket->control_retransmission_task_id =
1584       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
1585     return GNUNET_OK;
1586   default:
1587     LOG_DEBUG ("%s: Server %s sent HELLO_ACK when in state %d\n", 
1588                GNUNET_i2s (&socket->other_peer),
1589                GNUNET_i2s (&socket->other_peer), socket->state);
1590     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1591     return GNUNET_SYSERR;
1592   }
1593 }
1594
1595
1596 /**
1597  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1598  *
1599  * @param cls the socket (set from GNUNET_MESH_connect)
1600  * @param tunnel connection to the other end
1601  * @param tunnel_ctx this is NULL
1602  * @param sender who sent the message
1603  * @param message the actual message
1604  * @param atsi performance data for the connection
1605  * @return GNUNET_OK to keep the connection open,
1606  *         GNUNET_SYSERR to close it (signal serious error)
1607  */
1608 static int
1609 client_handle_reset (void *cls,
1610                      struct GNUNET_MESH_Tunnel *tunnel,
1611                      void **tunnel_ctx,
1612                      const struct GNUNET_PeerIdentity *sender,
1613                      const struct GNUNET_MessageHeader *message,
1614                      const struct GNUNET_ATS_Information*atsi)
1615 {
1616   // struct GNUNET_STREAM_Socket *socket = cls;
1617
1618   return GNUNET_OK;
1619 }
1620
1621
1622 /**
1623  * Common message handler for handling TRANSMIT_CLOSE messages
1624  *
1625  * @param socket the socket through which the ack was received
1626  * @param tunnel connection to the other end
1627  * @param sender who sent the message
1628  * @param msg the transmit close message
1629  * @param atsi performance data for the connection
1630  * @return GNUNET_OK to keep the connection open,
1631  *         GNUNET_SYSERR to close it (signal serious error)
1632  */
1633 static int
1634 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1635                        struct GNUNET_MESH_Tunnel *tunnel,
1636                        const struct GNUNET_PeerIdentity *sender,
1637                        const struct GNUNET_STREAM_MessageHeader *msg,
1638                        const struct GNUNET_ATS_Information*atsi)
1639 {
1640   struct GNUNET_STREAM_MessageHeader *reply;
1641
1642   switch (socket->state)
1643   {
1644   case STATE_INIT:
1645   case STATE_LISTEN:
1646   case STATE_HELLO_WAIT:
1647     LOG (GNUNET_ERROR_TYPE_DEBUG,
1648          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1649          GNUNET_i2s (&socket->other_peer));
1650     return GNUNET_OK;
1651   default:
1652     break;
1653   }
1654   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE from %s\n",
1655        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1656   socket->receive_closed = GNUNET_YES;
1657   if (GNUNET_YES == socket->transmit_closed)
1658     socket->state = STATE_CLOSED;
1659   else
1660     socket->state = STATE_RECEIVE_CLOSED;
1661   /* Send TRANSMIT_CLOSE_ACK */
1662   reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1663   reply->header.type = 
1664       htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1665   reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1666   queue_message (socket, reply, NULL, NULL, GNUNET_NO);
1667   return GNUNET_OK;
1668 }
1669
1670
1671 /**
1672  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1673  *
1674  * @param cls the socket (set from GNUNET_MESH_connect)
1675  * @param tunnel connection to the other end
1676  * @param tunnel_ctx this is NULL
1677  * @param sender who sent the message
1678  * @param message the actual message
1679  * @param atsi performance data for the connection
1680  * @return GNUNET_OK to keep the connection open,
1681  *         GNUNET_SYSERR to close it (signal serious error)
1682  */
1683 static int
1684 client_handle_transmit_close (void *cls,
1685                               struct GNUNET_MESH_Tunnel *tunnel,
1686                               void **tunnel_ctx,
1687                               const struct GNUNET_PeerIdentity *sender,
1688                               const struct GNUNET_MessageHeader *message,
1689                               const struct GNUNET_ATS_Information*atsi)
1690 {
1691   struct GNUNET_STREAM_Socket *socket = cls;
1692   
1693   return handle_transmit_close (socket,
1694                                 tunnel,
1695                                 sender,
1696                                 (struct GNUNET_STREAM_MessageHeader *)message,
1697                                 atsi);
1698 }
1699
1700
1701 /**
1702  * Task for calling the shutdown continuation callback
1703  *
1704  * @param cls the socket
1705  * @param tc the scheduler task context
1706  */
1707 static void
1708 call_cont_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1709 {
1710   struct GNUNET_STREAM_Socket *socket = cls;
1711   
1712   GNUNET_assert (NULL != socket->shutdown_handle);
1713   socket->shutdown_handle->call_cont_task_id = GNUNET_SCHEDULER_NO_TASK;
1714   if (NULL != socket->shutdown_handle->completion_cb)
1715     socket->shutdown_handle->completion_cb
1716         (socket->shutdown_handle->completion_cls,
1717          socket->shutdown_handle->operation);
1718   GNUNET_free (socket->shutdown_handle);
1719   socket->shutdown_handle = NULL;
1720 }
1721
1722
1723 /**
1724  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1725  *
1726  * @param socket the socket
1727  * @param tunnel connection to the other end
1728  * @param sender who sent the message
1729  * @param message the actual message
1730  * @param atsi performance data for the connection
1731  * @param operation the close operation which is being ACK'ed
1732  * @return GNUNET_OK to keep the connection open,
1733  *         GNUNET_SYSERR to close it (signal serious error)
1734  */
1735 static int
1736 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1737                           struct GNUNET_MESH_Tunnel *tunnel,
1738                           const struct GNUNET_PeerIdentity *sender,
1739                           const struct GNUNET_STREAM_MessageHeader *message,
1740                           const struct GNUNET_ATS_Information *atsi,
1741                           int operation)
1742 {
1743   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1744
1745   shutdown_handle = socket->shutdown_handle;
1746   if (NULL == shutdown_handle)
1747   {
1748     /* This may happen when the shudown handle is cancelled */
1749     LOG (GNUNET_ERROR_TYPE_DEBUG,
1750          "%s: Received CLOSE_ACK when shutdown handle is NULL\n",
1751          GNUNET_i2s (&socket->other_peer));
1752     return GNUNET_OK;
1753   }
1754   switch (operation)
1755   {
1756   case SHUT_RDWR:
1757     switch (socket->state)
1758     {
1759     case STATE_CLOSE_WAIT:
1760       if (SHUT_RDWR != shutdown_handle->operation)
1761       {
1762         LOG (GNUNET_ERROR_TYPE_DEBUG,
1763              "%s: Received CLOSE_ACK when shutdown handle is not for "
1764              "SHUT_RDWR\n", GNUNET_i2s (&socket->other_peer));
1765         return GNUNET_OK;
1766       }
1767       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE_ACK from %s\n",
1768            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1769       socket->state = STATE_CLOSED;
1770       break;
1771     default:
1772       LOG (GNUNET_ERROR_TYPE_DEBUG,
1773            "%s: Received CLOSE_ACK when it is not expected\n",
1774            GNUNET_i2s (&socket->other_peer));
1775       return GNUNET_OK;
1776     }
1777     break;
1778   case SHUT_RD:
1779     switch (socket->state)
1780     {
1781     case STATE_RECEIVE_CLOSE_WAIT:
1782       if (SHUT_RD != shutdown_handle->operation)
1783       {
1784         LOG (GNUNET_ERROR_TYPE_DEBUG,
1785              "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
1786              "is not for SHUT_RD\n", GNUNET_i2s (&socket->other_peer));
1787         return GNUNET_OK;
1788       }
1789       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE_ACK from %s\n",
1790            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1791       socket->state = STATE_RECEIVE_CLOSED;
1792       break;
1793     default:
1794       LOG (GNUNET_ERROR_TYPE_DEBUG,
1795            "%s: Received RECEIVE_CLOSE_ACK when it is not expected\n",
1796            GNUNET_i2s (&socket->other_peer));
1797       return GNUNET_OK;
1798     }
1799     break;
1800   case SHUT_WR:
1801     switch (socket->state)
1802     {
1803     case STATE_TRANSMIT_CLOSE_WAIT:
1804       if (SHUT_WR != shutdown_handle->operation)
1805       {
1806         LOG (GNUNET_ERROR_TYPE_DEBUG,
1807              "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle "
1808              "is not for SHUT_WR\n",
1809              GNUNET_i2s (&socket->other_peer));
1810         return GNUNET_OK;
1811       }
1812       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
1813            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1814       socket->state = STATE_TRANSMIT_CLOSED;
1815       break;
1816     default:
1817       LOG (GNUNET_ERROR_TYPE_DEBUG,
1818            "%s: Received TRANSMIT_CLOSE_ACK when it is not expected\n",
1819            GNUNET_i2s (&socket->other_peer));          
1820       return GNUNET_OK;
1821     }
1822     break;
1823   default:
1824     GNUNET_assert (0);
1825   }
1826   shutdown_handle->call_cont_task_id = GNUNET_SCHEDULER_add_now
1827       (&call_cont_task, socket);
1828   if (GNUNET_SCHEDULER_NO_TASK
1829       != shutdown_handle->close_msg_retransmission_task_id)
1830   {
1831     GNUNET_SCHEDULER_cancel
1832       (shutdown_handle->close_msg_retransmission_task_id);
1833     shutdown_handle->close_msg_retransmission_task_id =
1834         GNUNET_SCHEDULER_NO_TASK;
1835   }
1836   return GNUNET_OK;
1837 }
1838
1839
1840 /**
1841  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1842  *
1843  * @param cls the socket (set from GNUNET_MESH_connect)
1844  * @param tunnel connection to the other end
1845  * @param tunnel_ctx this is NULL
1846  * @param sender who sent the message
1847  * @param message the actual message
1848  * @param atsi performance data for the connection
1849  * @return GNUNET_OK to keep the connection open,
1850  *         GNUNET_SYSERR to close it (signal serious error)
1851  */
1852 static int
1853 client_handle_transmit_close_ack (void *cls,
1854                                   struct GNUNET_MESH_Tunnel *tunnel,
1855                                   void **tunnel_ctx,
1856                                   const struct GNUNET_PeerIdentity *sender,
1857                                   const struct GNUNET_MessageHeader *message,
1858                                   const struct GNUNET_ATS_Information*atsi)
1859 {
1860   struct GNUNET_STREAM_Socket *socket = cls;
1861
1862   return handle_generic_close_ack (socket,
1863                                    tunnel,
1864                                    sender,
1865                                    (const struct GNUNET_STREAM_MessageHeader *)
1866                                    message,
1867                                    atsi,
1868                                    SHUT_WR);
1869 }
1870
1871
1872 /**
1873  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1874  *
1875  * @param socket the socket
1876  * @param tunnel connection to the other end
1877  * @param sender who sent the message
1878  * @param message the actual message
1879  * @param atsi performance data for the connection
1880  * @return GNUNET_OK to keep the connection open,
1881  *         GNUNET_SYSERR to close it (signal serious error)
1882  */
1883 static int
1884 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1885                       struct GNUNET_MESH_Tunnel *tunnel,
1886                       const struct GNUNET_PeerIdentity *sender,
1887                       const struct GNUNET_STREAM_MessageHeader *message,
1888                       const struct GNUNET_ATS_Information *atsi)
1889 {
1890   struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1891
1892   switch (socket->state)
1893   {
1894   case STATE_INIT:
1895   case STATE_LISTEN:
1896   case STATE_HELLO_WAIT:
1897     LOG (GNUNET_ERROR_TYPE_DEBUG,
1898          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1899          GNUNET_i2s (&socket->other_peer));
1900     return GNUNET_OK;
1901   default:
1902     break;
1903   }  
1904   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE from %s\n",
1905        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1906   socket->transmit_closed = GNUNET_YES;
1907   if (GNUNET_YES == socket->receive_closed)
1908     socket->state = STATE_CLOSED;
1909   else
1910     socket->state = STATE_TRANSMIT_CLOSED;
1911   receive_close_ack =
1912     GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1913   receive_close_ack->header.size =
1914     htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1915   receive_close_ack->header.type =
1916     htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1917   queue_message (socket, receive_close_ack, NULL, NULL, GNUNET_NO);
1918   /* If write handle is present call it with GNUNET_STREAM_SHUTDOWN to signal
1919      that that stream has been shutdown */
1920   if (NULL != socket->write_handle)
1921   {
1922     GNUNET_STREAM_CompletionContinuation wc;
1923     void *wc_cls;
1924
1925     wc = socket->write_handle->write_cont;
1926     wc_cls = socket->write_handle->write_cont_cls;
1927     GNUNET_STREAM_io_write_cancel (socket->write_handle);
1928     socket->write_handle = NULL;
1929     if (NULL != wc)
1930       wc (wc_cls,
1931           GNUNET_STREAM_SHUTDOWN, 0);
1932   }
1933   return GNUNET_OK;
1934 }
1935
1936
1937 /**
1938  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1939  *
1940  * @param cls the socket (set from GNUNET_MESH_connect)
1941  * @param tunnel connection to the other end
1942  * @param tunnel_ctx this is NULL
1943  * @param sender who sent the message
1944  * @param message the actual message
1945  * @param atsi performance data for the connection
1946  * @return GNUNET_OK to keep the connection open,
1947  *         GNUNET_SYSERR to close it (signal serious error)
1948  */
1949 static int
1950 client_handle_receive_close (void *cls,
1951                              struct GNUNET_MESH_Tunnel *tunnel,
1952                              void **tunnel_ctx,
1953                              const struct GNUNET_PeerIdentity *sender,
1954                              const struct GNUNET_MessageHeader *message,
1955                              const struct GNUNET_ATS_Information*atsi)
1956 {
1957   struct GNUNET_STREAM_Socket *socket = cls;
1958
1959   return
1960     handle_receive_close (socket,
1961                           tunnel,
1962                           sender,
1963                           (const struct GNUNET_STREAM_MessageHeader *) message,
1964                           atsi);
1965 }
1966
1967
1968 /**
1969  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1970  *
1971  * @param cls the socket (set from GNUNET_MESH_connect)
1972  * @param tunnel connection to the other end
1973  * @param tunnel_ctx this is NULL
1974  * @param sender who sent the message
1975  * @param message the actual message
1976  * @param atsi performance data for the connection
1977  * @return GNUNET_OK to keep the connection open,
1978  *         GNUNET_SYSERR to close it (signal serious error)
1979  */
1980 static int
1981 client_handle_receive_close_ack (void *cls,
1982                                  struct GNUNET_MESH_Tunnel *tunnel,
1983                                  void **tunnel_ctx,
1984                                  const struct GNUNET_PeerIdentity *sender,
1985                                  const struct GNUNET_MessageHeader *message,
1986                                  const struct GNUNET_ATS_Information*atsi)
1987 {
1988   struct GNUNET_STREAM_Socket *socket = cls;
1989
1990   return handle_generic_close_ack (socket,
1991                                    tunnel,
1992                                    sender,
1993                                    (const struct GNUNET_STREAM_MessageHeader *)
1994                                    message,
1995                                    atsi,
1996                                    SHUT_RD);
1997 }
1998
1999
2000 /**
2001  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2002  *
2003  * @param socket the socket
2004  * @param tunnel connection to the other end
2005  * @param sender who sent the message
2006  * @param message the actual message
2007  * @param atsi performance data for the connection
2008  * @return GNUNET_OK to keep the connection open,
2009  *         GNUNET_SYSERR to close it (signal serious error)
2010  */
2011 static int
2012 handle_close (struct GNUNET_STREAM_Socket *socket,
2013               struct GNUNET_MESH_Tunnel *tunnel,
2014               const struct GNUNET_PeerIdentity *sender,
2015               const struct GNUNET_STREAM_MessageHeader *message,
2016               const struct GNUNET_ATS_Information*atsi)
2017 {
2018   struct GNUNET_STREAM_MessageHeader *close_ack;
2019
2020   switch (socket->state)
2021   {
2022   case STATE_INIT:
2023   case STATE_LISTEN:
2024   case STATE_HELLO_WAIT:
2025     LOG (GNUNET_ERROR_TYPE_DEBUG,
2026          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
2027          GNUNET_i2s (&socket->other_peer));
2028     return GNUNET_OK;
2029   default:
2030     break;
2031   }
2032   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE from %s\n",
2033        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
2034   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2035   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2036   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
2037   queue_message (socket, close_ack, &set_state_closed, NULL, GNUNET_NO);
2038   if (STATE_CLOSED == socket->state)
2039     return GNUNET_OK;
2040   socket->receive_closed = GNUNET_YES;
2041   socket->transmit_closed = GNUNET_YES;
2042   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
2043   socket->receive_buffer = NULL;
2044   socket->receive_buffer_size = 0;  
2045   /* If write handle is present call it with GNUNET_STREAM_SHUTDOWN to signal
2046      that that stream has been shutdown */
2047   if (NULL != socket->write_handle)
2048   {
2049     GNUNET_STREAM_CompletionContinuation wc;
2050     void *wc_cls;
2051
2052     wc = socket->write_handle->write_cont;
2053     wc_cls = socket->write_handle->write_cont_cls;
2054     GNUNET_STREAM_io_write_cancel (socket->write_handle);
2055     socket->write_handle = NULL;
2056     if (NULL != wc)
2057       wc (wc_cls,
2058           GNUNET_STREAM_SHUTDOWN, 0);
2059   }
2060   return GNUNET_OK;
2061 }
2062
2063
2064 /**
2065  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2066  *
2067  * @param cls the socket (set from GNUNET_MESH_connect)
2068  * @param tunnel connection to the other end
2069  * @param tunnel_ctx this is NULL
2070  * @param sender who sent the message
2071  * @param message the actual message
2072  * @param atsi performance data for the connection
2073  * @return GNUNET_OK to keep the connection open,
2074  *         GNUNET_SYSERR to close it (signal serious error)
2075  */
2076 static int
2077 client_handle_close (void *cls,
2078                      struct GNUNET_MESH_Tunnel *tunnel,
2079                      void **tunnel_ctx,
2080                      const struct GNUNET_PeerIdentity *sender,
2081                      const struct GNUNET_MessageHeader *message,
2082                      const struct GNUNET_ATS_Information*atsi)
2083 {
2084   struct GNUNET_STREAM_Socket *socket = cls;
2085
2086   return handle_close (socket,
2087                        tunnel,
2088                        sender,
2089                        (const struct GNUNET_STREAM_MessageHeader *) message,
2090                        atsi);
2091 }
2092
2093
2094 /**
2095  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2096  *
2097  * @param cls the socket (set from GNUNET_MESH_connect)
2098  * @param tunnel connection to the other end
2099  * @param tunnel_ctx this is NULL
2100  * @param sender who sent the message
2101  * @param message the actual message
2102  * @param atsi performance data for the connection
2103  * @return GNUNET_OK to keep the connection open,
2104  *         GNUNET_SYSERR to close it (signal serious error)
2105  */
2106 static int
2107 client_handle_close_ack (void *cls,
2108                          struct GNUNET_MESH_Tunnel *tunnel,
2109                          void **tunnel_ctx,
2110                          const struct GNUNET_PeerIdentity *sender,
2111                          const struct GNUNET_MessageHeader *message,
2112                          const struct GNUNET_ATS_Information *atsi)
2113 {
2114   struct GNUNET_STREAM_Socket *socket = cls;
2115
2116   return handle_generic_close_ack (socket,
2117                                    tunnel,
2118                                    sender,
2119                                    (const struct GNUNET_STREAM_MessageHeader *) 
2120                                    message,
2121                                    atsi,
2122                                    SHUT_RDWR);
2123 }
2124
2125 /*****************************/
2126 /* Server's Message Handlers */
2127 /*****************************/
2128
2129 /**
2130  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
2131  *
2132  * @param cls the closure
2133  * @param tunnel connection to the other end
2134  * @param tunnel_ctx the socket
2135  * @param sender who sent the message
2136  * @param message the actual message
2137  * @param atsi performance data for the connection
2138  * @return GNUNET_OK to keep the connection open,
2139  *         GNUNET_SYSERR to close it (signal serious error)
2140  */
2141 static int
2142 server_handle_data (void *cls,
2143                     struct GNUNET_MESH_Tunnel *tunnel,
2144                     void **tunnel_ctx,
2145                     const struct GNUNET_PeerIdentity *sender,
2146                     const struct GNUNET_MessageHeader *message,
2147                     const struct GNUNET_ATS_Information*atsi)
2148 {
2149   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2150
2151   return handle_data (socket,
2152                       tunnel,
2153                       sender,
2154                       (const struct GNUNET_STREAM_DataMessage *)message,
2155                       atsi);
2156 }
2157
2158
2159 /**
2160  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
2161  *
2162  * @param cls the closure
2163  * @param tunnel connection to the other end
2164  * @param tunnel_ctx the socket
2165  * @param sender who sent the message
2166  * @param message the actual message
2167  * @param atsi performance data for the connection
2168  * @return GNUNET_OK to keep the connection open,
2169  *         GNUNET_SYSERR to close it (signal serious error)
2170  */
2171 static int
2172 server_handle_hello (void *cls,
2173                      struct GNUNET_MESH_Tunnel *tunnel,
2174                      void **tunnel_ctx,
2175                      const struct GNUNET_PeerIdentity *sender,
2176                      const struct GNUNET_MessageHeader *message,
2177                      const struct GNUNET_ATS_Information*atsi)
2178 {
2179   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2180   struct GNUNET_STREAM_HelloAckMessage *reply;
2181
2182   if (0 != memcmp (sender,
2183                    &socket->other_peer,
2184                    sizeof (struct GNUNET_PeerIdentity)))
2185   {
2186     LOG_DEBUG ("%s: Received HELLO from non-confirming peer\n",
2187                GNUNET_i2s (&socket->other_peer));
2188     return GNUNET_YES;
2189   }
2190   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == ntohs (message->type));
2191   GNUNET_assert (socket->tunnel == tunnel);
2192   LOG_DEBUG ("%s: Received HELLO from %s\n", GNUNET_i2s (&socket->other_peer),
2193              GNUNET_i2s (&socket->other_peer));
2194   switch (socket->state)
2195   {
2196   case STATE_INIT:
2197     reply = generate_hello_ack (socket, GNUNET_YES);
2198     queue_message (socket, &reply->header, &set_state_hello_wait, NULL,
2199                    GNUNET_NO);
2200     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2201                    socket->control_retransmission_task_id);
2202     socket->control_retransmission_task_id =
2203       GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2204                                     &control_retransmission_task, socket);
2205     break;
2206   case STATE_HELLO_WAIT:
2207     /* Perhaps our HELLO_ACK was lost */
2208     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != 
2209                    socket->control_retransmission_task_id);
2210     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2211     socket->control_retransmission_task_id =
2212       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
2213     break;
2214   default:
2215     LOG_DEBUG( "%s: Client sent HELLO when in state %d\n",
2216                GNUNET_i2s (&socket->other_peer), socket->state);
2217     /* FIXME: Send RESET? */
2218   }
2219   return GNUNET_OK;
2220 }
2221
2222
2223 /**
2224  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2225  *
2226  * @param cls the closure
2227  * @param tunnel connection to the other end
2228  * @param tunnel_ctx the socket
2229  * @param sender who sent the message
2230  * @param message the actual message
2231  * @param atsi performance data for the connection
2232  * @return GNUNET_OK to keep the connection open,
2233  *         GNUNET_SYSERR to close it (signal serious error)
2234  */
2235 static int
2236 server_handle_hello_ack (void *cls,
2237                          struct GNUNET_MESH_Tunnel *tunnel,
2238                          void **tunnel_ctx,
2239                          const struct GNUNET_PeerIdentity *sender,
2240                          const struct GNUNET_MessageHeader *message,
2241                          const struct GNUNET_ATS_Information*atsi)
2242 {
2243   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2244   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2245
2246   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2247                  ntohs (message->type));
2248   GNUNET_assert (socket->tunnel == tunnel);
2249   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2250   switch (socket->state)  
2251   {
2252   case STATE_HELLO_WAIT:
2253     LOG (GNUNET_ERROR_TYPE_DEBUG,
2254          "%s: Received HELLO_ACK from %s\n",
2255          GNUNET_i2s (&socket->other_peer),
2256          GNUNET_i2s (&socket->other_peer));
2257     socket->read_sequence_number = ntohl (ack_message->sequence_number);
2258     LOG (GNUNET_ERROR_TYPE_DEBUG,
2259          "%s: Read sequence number %u\n",
2260          GNUNET_i2s (&socket->other_peer),
2261          (unsigned int) socket->read_sequence_number);
2262     socket->receiver_window_available = 
2263       ntohl (ack_message->receiver_window_size);
2264     set_state_established (NULL, socket);
2265     break;
2266   default:
2267     LOG (GNUNET_ERROR_TYPE_DEBUG,
2268          "Client sent HELLO_ACK when in state %d\n", socket->state);    
2269   }
2270   return GNUNET_OK;
2271 }
2272
2273
2274 /**
2275  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2276  *
2277  * @param cls the closure
2278  * @param tunnel connection to the other end
2279  * @param tunnel_ctx the socket
2280  * @param sender who sent the message
2281  * @param message the actual message
2282  * @param atsi performance data for the connection
2283  * @return GNUNET_OK to keep the connection open,
2284  *         GNUNET_SYSERR to close it (signal serious error)
2285  */
2286 static int
2287 server_handle_reset (void *cls,
2288                      struct GNUNET_MESH_Tunnel *tunnel,
2289                      void **tunnel_ctx,
2290                      const struct GNUNET_PeerIdentity *sender,
2291                      const struct GNUNET_MessageHeader *message,
2292                      const struct GNUNET_ATS_Information*atsi)
2293 {
2294   // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2295
2296   return GNUNET_OK;
2297 }
2298
2299
2300 /**
2301  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2302  *
2303  * @param cls the closure
2304  * @param tunnel connection to the other end
2305  * @param tunnel_ctx the socket
2306  * @param sender who sent the message
2307  * @param message the actual message
2308  * @param atsi performance data for the connection
2309  * @return GNUNET_OK to keep the connection open,
2310  *         GNUNET_SYSERR to close it (signal serious error)
2311  */
2312 static int
2313 server_handle_transmit_close (void *cls,
2314                               struct GNUNET_MESH_Tunnel *tunnel,
2315                               void **tunnel_ctx,
2316                               const struct GNUNET_PeerIdentity *sender,
2317                               const struct GNUNET_MessageHeader *message,
2318                               const struct GNUNET_ATS_Information*atsi)
2319 {
2320   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2321
2322   return handle_transmit_close (socket,
2323                                 tunnel,
2324                                 sender,
2325                                 (struct GNUNET_STREAM_MessageHeader *)message,
2326                                 atsi);
2327 }
2328
2329
2330 /**
2331  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2332  *
2333  * @param cls the closure
2334  * @param tunnel connection to the other end
2335  * @param tunnel_ctx the socket
2336  * @param sender who sent the message
2337  * @param message the actual message
2338  * @param atsi performance data for the connection
2339  * @return GNUNET_OK to keep the connection open,
2340  *         GNUNET_SYSERR to close it (signal serious error)
2341  */
2342 static int
2343 server_handle_transmit_close_ack (void *cls,
2344                                   struct GNUNET_MESH_Tunnel *tunnel,
2345                                   void **tunnel_ctx,
2346                                   const struct GNUNET_PeerIdentity *sender,
2347                                   const struct GNUNET_MessageHeader *message,
2348                                   const struct GNUNET_ATS_Information*atsi)
2349 {
2350   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2351
2352   return handle_generic_close_ack (socket,
2353                                    tunnel,
2354                                    sender,
2355                                    (const struct GNUNET_STREAM_MessageHeader *)
2356                                    message,
2357                                    atsi,
2358                                    SHUT_WR);
2359 }
2360
2361
2362 /**
2363  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2364  *
2365  * @param cls the closure
2366  * @param tunnel connection to the other end
2367  * @param tunnel_ctx the socket
2368  * @param sender who sent the message
2369  * @param message the actual message
2370  * @param atsi performance data for the connection
2371  * @return GNUNET_OK to keep the connection open,
2372  *         GNUNET_SYSERR to close it (signal serious error)
2373  */
2374 static int
2375 server_handle_receive_close (void *cls,
2376                              struct GNUNET_MESH_Tunnel *tunnel,
2377                              void **tunnel_ctx,
2378                              const struct GNUNET_PeerIdentity *sender,
2379                              const struct GNUNET_MessageHeader *message,
2380                              const struct GNUNET_ATS_Information*atsi)
2381 {
2382   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2383
2384   return
2385     handle_receive_close (socket,
2386                           tunnel,
2387                           sender,
2388                           (const struct GNUNET_STREAM_MessageHeader *) message,
2389                           atsi);
2390 }
2391
2392
2393 /**
2394  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2395  *
2396  * @param cls the closure
2397  * @param tunnel connection to the other end
2398  * @param tunnel_ctx the socket
2399  * @param sender who sent the message
2400  * @param message the actual message
2401  * @param atsi performance data for the connection
2402  * @return GNUNET_OK to keep the connection open,
2403  *         GNUNET_SYSERR to close it (signal serious error)
2404  */
2405 static int
2406 server_handle_receive_close_ack (void *cls,
2407                                  struct GNUNET_MESH_Tunnel *tunnel,
2408                                  void **tunnel_ctx,
2409                                  const struct GNUNET_PeerIdentity *sender,
2410                                  const struct GNUNET_MessageHeader *message,
2411                                  const struct GNUNET_ATS_Information*atsi)
2412 {
2413   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2414
2415   return handle_generic_close_ack (socket,
2416                                    tunnel,
2417                                    sender,
2418                                    (const struct GNUNET_STREAM_MessageHeader *)
2419                                    message,
2420                                    atsi,
2421                                    SHUT_RD);
2422 }
2423
2424
2425 /**
2426  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2427  *
2428  * @param cls the listen socket (from GNUNET_MESH_connect in
2429  *          GNUNET_STREAM_listen) 
2430  * @param tunnel connection to the other end
2431  * @param tunnel_ctx the socket
2432  * @param sender who sent the message
2433  * @param message the actual message
2434  * @param atsi performance data for the connection
2435  * @return GNUNET_OK to keep the connection open,
2436  *         GNUNET_SYSERR to close it (signal serious error)
2437  */
2438 static int
2439 server_handle_close (void *cls,
2440                      struct GNUNET_MESH_Tunnel *tunnel,
2441                      void **tunnel_ctx,
2442                      const struct GNUNET_PeerIdentity *sender,
2443                      const struct GNUNET_MessageHeader *message,
2444                      const struct GNUNET_ATS_Information*atsi)
2445 {
2446   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2447   
2448   return handle_close (socket,
2449                        tunnel,
2450                        sender,
2451                        (const struct GNUNET_STREAM_MessageHeader *) message,
2452                        atsi);
2453 }
2454
2455
2456 /**
2457  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2458  *
2459  * @param cls the closure
2460  * @param tunnel connection to the other end
2461  * @param tunnel_ctx the socket
2462  * @param sender who sent the message
2463  * @param message the actual message
2464  * @param atsi performance data for the connection
2465  * @return GNUNET_OK to keep the connection open,
2466  *         GNUNET_SYSERR to close it (signal serious error)
2467  */
2468 static int
2469 server_handle_close_ack (void *cls,
2470                          struct GNUNET_MESH_Tunnel *tunnel,
2471                          void **tunnel_ctx,
2472                          const struct GNUNET_PeerIdentity *sender,
2473                          const struct GNUNET_MessageHeader *message,
2474                          const struct GNUNET_ATS_Information*atsi)
2475 {
2476   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2477
2478   return handle_generic_close_ack (socket,
2479                                    tunnel,
2480                                    sender,
2481                                    (const struct GNUNET_STREAM_MessageHeader *) 
2482                                    message,
2483                                    atsi,
2484                                    SHUT_RDWR);
2485 }
2486
2487
2488 /**
2489  * Handler for DATA_ACK messages
2490  *
2491  * @param socket the socket through which the ack was received
2492  * @param tunnel connection to the other end
2493  * @param sender who sent the message
2494  * @param ack the acknowledgment message
2495  * @param atsi performance data for the connection
2496  * @return GNUNET_OK to keep the connection open,
2497  *         GNUNET_SYSERR to close it (signal serious error)
2498  */
2499 static int
2500 handle_ack (struct GNUNET_STREAM_Socket *socket,
2501             struct GNUNET_MESH_Tunnel *tunnel,
2502             const struct GNUNET_PeerIdentity *sender,
2503             const struct GNUNET_STREAM_AckMessage *ack,
2504             const struct GNUNET_ATS_Information*atsi)
2505 {
2506   unsigned int packet;
2507   int need_retransmission;
2508   uint32_t sequence_difference;
2509   
2510   if (0 != memcmp (sender,
2511                    &socket->other_peer,
2512                    sizeof (struct GNUNET_PeerIdentity)))
2513   {
2514     LOG (GNUNET_ERROR_TYPE_DEBUG,
2515          "%s: Received ACK from non-confirming peer\n",
2516          GNUNET_i2s (&socket->other_peer));
2517     return GNUNET_YES;
2518   }
2519   switch (socket->state)
2520   {
2521   case (STATE_ESTABLISHED):
2522   case (STATE_RECEIVE_CLOSED):
2523   case (STATE_RECEIVE_CLOSE_WAIT):
2524     if (NULL == socket->write_handle)
2525     {
2526       LOG (GNUNET_ERROR_TYPE_DEBUG,
2527            "%s: Received DATA_ACK when write_handle is NULL\n",
2528            GNUNET_i2s (&socket->other_peer));
2529       return GNUNET_OK;
2530     }
2531     sequence_difference = 
2532         socket->write_sequence_number - ntohl (ack->base_sequence_number);
2533     if (!(sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2534     {
2535       LOG (GNUNET_ERROR_TYPE_DEBUG,
2536            "%s: Received DATA_ACK with unexpected base sequence number\n",
2537            GNUNET_i2s (&socket->other_peer));
2538       LOG (GNUNET_ERROR_TYPE_DEBUG,
2539            "%s: Current write sequence: %u; Ack's base sequence: %u\n",
2540            GNUNET_i2s (&socket->other_peer),
2541            socket->write_sequence_number,
2542            ntohl (ack->base_sequence_number));
2543       return GNUNET_OK;
2544     }
2545     /* FIXME: include the case when write_handle is cancelled - ignore the 
2546        acks */
2547     LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received DATA_ACK from %s\n",
2548          GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
2549     /* Cancel the retransmission task */
2550     if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2551     {
2552       GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2553       socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2554     }
2555     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2556     {
2557       if (NULL == socket->write_handle->messages[packet]) break;
2558       /* BS: Base sequence from ack; PS: sequence num of current packet */
2559       sequence_difference = ntohl (ack->base_sequence_number)
2560         - ntohl (socket->write_handle->messages[packet]->sequence_number);
2561       if ((0 == sequence_difference) ||
2562           (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH < sequence_difference))
2563         continue; /* The message in our handle is not yet received */
2564       /* case where BS = PS + GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2565       /* sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2566       ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2567                             packet, GNUNET_YES);
2568     }
2569     /* Update the receive window remaining
2570        FIXME : Should update with the value from a data ack with greater
2571        sequence number */
2572     socket->receiver_window_available = 
2573       ntohl (ack->receive_window_remaining);
2574     /* Check if we have received all acknowledgements */
2575     need_retransmission = GNUNET_NO;
2576     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2577     {
2578       if (NULL == socket->write_handle->messages[packet]) break;
2579       if (GNUNET_YES != ackbitmap_is_bit_set 
2580           (&socket->write_handle->ack_bitmap,packet))
2581       {
2582         need_retransmission = GNUNET_YES;
2583         break;
2584       }
2585     }
2586     if (GNUNET_YES == need_retransmission)
2587     {
2588       write_data (socket);
2589     }
2590     else      /* We have to call the write continuation callback now */
2591     {
2592       struct GNUNET_STREAM_IOWriteHandle *write_handle;
2593       
2594       /* Free the packets */
2595       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2596       {
2597         GNUNET_free_non_null (socket->write_handle->messages[packet]);
2598       }
2599       write_handle = socket->write_handle;
2600       socket->write_handle = NULL;
2601       if (NULL != write_handle->write_cont)
2602         write_handle->write_cont (write_handle->write_cont_cls,
2603                                   socket->status,
2604                                   write_handle->size);
2605       /* We are done with the write handle - Freeing it */
2606       GNUNET_free (write_handle);
2607       LOG (GNUNET_ERROR_TYPE_DEBUG,
2608            "%s: Write completion callback completed\n",
2609            GNUNET_i2s (&socket->other_peer));      
2610     }
2611     break;
2612   default:
2613     break;
2614   }
2615   return GNUNET_OK;
2616 }
2617
2618
2619 /**
2620  * Handler for DATA_ACK messages
2621  *
2622  * @param cls the 'struct GNUNET_STREAM_Socket'
2623  * @param tunnel connection to the other end
2624  * @param tunnel_ctx unused
2625  * @param sender who sent the message
2626  * @param message the actual message
2627  * @param atsi performance data for the connection
2628  * @return GNUNET_OK to keep the connection open,
2629  *         GNUNET_SYSERR to close it (signal serious error)
2630  */
2631 static int
2632 client_handle_ack (void *cls,
2633                    struct GNUNET_MESH_Tunnel *tunnel,
2634                    void **tunnel_ctx,
2635                    const struct GNUNET_PeerIdentity *sender,
2636                    const struct GNUNET_MessageHeader *message,
2637                    const struct GNUNET_ATS_Information*atsi)
2638 {
2639   struct GNUNET_STREAM_Socket *socket = cls;
2640   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2641  
2642   return handle_ack (socket, tunnel, sender, ack, atsi);
2643 }
2644
2645
2646 /**
2647  * Handler for DATA_ACK messages
2648  *
2649  * @param cls the server's listen socket
2650  * @param tunnel connection to the other end
2651  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2652  * @param sender who sent the message
2653  * @param message the actual message
2654  * @param atsi performance data for the connection
2655  * @return GNUNET_OK to keep the connection open,
2656  *         GNUNET_SYSERR to close it (signal serious error)
2657  */
2658 static int
2659 server_handle_ack (void *cls,
2660                    struct GNUNET_MESH_Tunnel *tunnel,
2661                    void **tunnel_ctx,
2662                    const struct GNUNET_PeerIdentity *sender,
2663                    const struct GNUNET_MessageHeader *message,
2664                    const struct GNUNET_ATS_Information*atsi)
2665 {
2666   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2667   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2668  
2669   return handle_ack (socket, tunnel, sender, ack, atsi);
2670 }
2671
2672
2673 /**
2674  * For client message handlers, the stream socket is in the
2675  * closure argument.
2676  */
2677 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2678   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2679   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2680    sizeof (struct GNUNET_STREAM_AckMessage) },
2681   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2682    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2683   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2684    sizeof (struct GNUNET_STREAM_MessageHeader)},
2685   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2686    sizeof (struct GNUNET_STREAM_MessageHeader)},
2687   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2688    sizeof (struct GNUNET_STREAM_MessageHeader)},
2689   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2690    sizeof (struct GNUNET_STREAM_MessageHeader)},
2691   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2692    sizeof (struct GNUNET_STREAM_MessageHeader)},
2693   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2694    sizeof (struct GNUNET_STREAM_MessageHeader)},
2695   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2696    sizeof (struct GNUNET_STREAM_MessageHeader)},
2697   {NULL, 0, 0}
2698 };
2699
2700
2701 /**
2702  * For server message handlers, the stream socket is in the
2703  * tunnel context, and the listen socket in the closure argument.
2704  */
2705 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2706   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2707   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2708    sizeof (struct GNUNET_STREAM_AckMessage) },
2709   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2710    sizeof (struct GNUNET_STREAM_MessageHeader)},
2711   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2712    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2713   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2714    sizeof (struct GNUNET_STREAM_MessageHeader)},
2715   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2716    sizeof (struct GNUNET_STREAM_MessageHeader)},
2717   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2718    sizeof (struct GNUNET_STREAM_MessageHeader)},
2719   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2720    sizeof (struct GNUNET_STREAM_MessageHeader)},
2721   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2722    sizeof (struct GNUNET_STREAM_MessageHeader)},
2723   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2724    sizeof (struct GNUNET_STREAM_MessageHeader)},
2725   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2726    sizeof (struct GNUNET_STREAM_MessageHeader)},
2727   {NULL, 0, 0}
2728 };
2729
2730
2731 /**
2732  * Function called when our target peer is connected to our tunnel
2733  *
2734  * @param cls the socket for which this tunnel is created
2735  * @param peer the peer identity of the target
2736  * @param atsi performance data for the connection
2737  */
2738 static void
2739 mesh_peer_connect_callback (void *cls,
2740                             const struct GNUNET_PeerIdentity *peer,
2741                             const struct GNUNET_ATS_Information * atsi)
2742 {
2743   struct GNUNET_STREAM_Socket *socket = cls;
2744   struct GNUNET_STREAM_MessageHeader *message;
2745   
2746   if (0 != memcmp (peer,
2747                    &socket->other_peer,
2748                    sizeof (struct GNUNET_PeerIdentity)))
2749   {
2750     LOG (GNUNET_ERROR_TYPE_DEBUG,
2751          "%s: A peer which is not our target has connected to our tunnel\n",
2752          GNUNET_i2s(peer));
2753     return;
2754   }
2755   LOG (GNUNET_ERROR_TYPE_DEBUG,
2756        "%s: Target peer %s connected\n",
2757        GNUNET_i2s (&socket->other_peer),
2758        GNUNET_i2s (&socket->other_peer));
2759   /* Set state to INIT */
2760   socket->state = STATE_INIT;
2761   /* Send HELLO message */
2762   message = generate_hello ();
2763   queue_message (socket, message, &set_state_hello_wait, NULL, GNUNET_NO);
2764   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2765                  socket->control_retransmission_task_id);
2766   socket->control_retransmission_task_id =
2767     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2768                                   &control_retransmission_task, socket);
2769 }
2770
2771
2772 /**
2773  * Function called when our target peer is disconnected from our tunnel
2774  *
2775  * @param cls the socket associated which this tunnel
2776  * @param peer the peer identity of the target
2777  */
2778 static void
2779 mesh_peer_disconnect_callback (void *cls,
2780                                const struct GNUNET_PeerIdentity *peer)
2781 {
2782   struct GNUNET_STREAM_Socket *socket=cls;
2783   
2784   /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2785   LOG (GNUNET_ERROR_TYPE_DEBUG,
2786        "%s: Other peer %s disconnected \n",
2787        GNUNET_i2s (&socket->other_peer),
2788        GNUNET_i2s (&socket->other_peer));
2789 }
2790
2791
2792 /**
2793  * Method called whenever a peer creates a tunnel to us
2794  *
2795  * @param cls closure
2796  * @param tunnel new handle to the tunnel
2797  * @param initiator peer that started the tunnel
2798  * @param atsi performance information for the tunnel
2799  * @return initial tunnel context for the tunnel
2800  *         (can be NULL -- that's not an error)
2801  */
2802 static void *
2803 new_tunnel_notify (void *cls,
2804                    struct GNUNET_MESH_Tunnel *tunnel,
2805                    const struct GNUNET_PeerIdentity *initiator,
2806                    const struct GNUNET_ATS_Information *atsi)
2807 {
2808   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2809   struct GNUNET_STREAM_Socket *socket;
2810
2811   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2812      from the same peer again until the socket is closed */
2813
2814   if (GNUNET_NO == lsocket->listening)
2815   {
2816     GNUNET_MESH_tunnel_destroy (tunnel);
2817     return NULL;
2818   }
2819   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2820   socket->other_peer = *initiator;
2821   socket->tunnel = tunnel;
2822   socket->state = STATE_INIT;
2823   socket->lsocket = lsocket;
2824   socket->stat_handle = lsocket->stat_handle;
2825   socket->retransmit_timeout = lsocket->retransmit_timeout;
2826   socket->testing_active = lsocket->testing_active;
2827   socket->testing_set_write_sequence_number_value =
2828       lsocket->testing_set_write_sequence_number_value;
2829   socket->max_payload_size = lsocket->max_payload_size;
2830   LOG (GNUNET_ERROR_TYPE_DEBUG,
2831        "%s: Peer %s initiated tunnel to us\n", 
2832        GNUNET_i2s (&socket->other_peer),
2833        GNUNET_i2s (&socket->other_peer));
2834   if (NULL != socket->stat_handle)
2835   {
2836     GNUNET_STATISTICS_update (socket->stat_handle,
2837                               "total inbound connections received",
2838                               1, GNUNET_NO);
2839     GNUNET_STATISTICS_update (socket->stat_handle,
2840                               "inbound connections", 1, GNUNET_NO);
2841   }
2842   
2843   return socket;
2844 }
2845
2846
2847 /**
2848  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2849  * any associated state.  This function is NOT called if the client has
2850  * explicitly asked for the tunnel to be destroyed using
2851  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2852  * the tunnel.
2853  *
2854  * @param cls closure (set from GNUNET_MESH_connect)
2855  * @param tunnel connection to the other end (henceforth invalid)
2856  * @param tunnel_ctx place where local state associated
2857  *                   with the tunnel is stored
2858  */
2859 static void 
2860 tunnel_cleaner (void *cls,
2861                 const struct GNUNET_MESH_Tunnel *tunnel,
2862                 void *tunnel_ctx)
2863 {
2864   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2865   struct MessageQueue *head;
2866
2867   GNUNET_assert (tunnel == socket->tunnel);
2868   GNUNET_break_op(0);
2869   LOG (GNUNET_ERROR_TYPE_DEBUG,
2870        "%s: Peer %s has terminated connection abruptly\n",
2871        GNUNET_i2s (&socket->other_peer),
2872        GNUNET_i2s (&socket->other_peer));
2873   if (NULL != socket->stat_handle)
2874   {
2875     GNUNET_STATISTICS_update (socket->stat_handle,
2876                               "connections terminated abruptly", 1, GNUNET_NO);
2877     GNUNET_STATISTICS_update (socket->stat_handle,
2878                               "inbound connections", -1, GNUNET_NO);
2879   }
2880   socket->status = GNUNET_STREAM_SHUTDOWN;
2881   /* Clear Transmit handles */
2882   if (NULL != socket->transmit_handle)
2883   {
2884     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2885     socket->transmit_handle = NULL;
2886   }
2887   /* Stop Tasks using socket->tunnel */
2888   if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2889   {
2890     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2891     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2892   }
2893   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2894   {
2895     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2896     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2897   }  
2898   /* Terminate the control retransmission tasks */
2899   if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
2900   {
2901     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2902     socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2903   }
2904   /* Clear Transmit handles */
2905   if (NULL != socket->transmit_handle)
2906   {
2907     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2908     socket->transmit_handle = NULL;
2909   }
2910   /* Clear existing message queue */
2911   while (NULL != (head = socket->queue_head)) {
2912     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2913                                  socket->queue_tail,
2914                                  head);
2915     GNUNET_free (head->message);
2916     GNUNET_free (head);
2917   }
2918   socket->tunnel = NULL;
2919 }
2920
2921
2922 /**
2923  * Callback to signal timeout on lockmanager lock acquire
2924  *
2925  * @param cls the ListenSocket
2926  * @param tc the scheduler task context
2927  */
2928 static void
2929 lockmanager_acquire_timeout (void *cls, 
2930                              const struct GNUNET_SCHEDULER_TaskContext *tc)
2931 {
2932   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2933   GNUNET_STREAM_ListenCallback listen_cb;
2934   void *listen_cb_cls;
2935
2936   lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2937   listen_cb = lsocket->listen_cb;
2938   listen_cb_cls = lsocket->listen_cb_cls;
2939   if (NULL != listen_cb)
2940     listen_cb (listen_cb_cls, NULL, NULL);
2941 }
2942
2943
2944 /**
2945  * Callback to notify us on the status changes on app_port lock
2946  *
2947  * @param cls the ListenSocket
2948  * @param domain the domain name of the lock
2949  * @param lock the app_port
2950  * @param status the current status of the lock
2951  */
2952 static void
2953 lock_status_change_cb (void *cls, const char *domain, uint32_t lock,
2954                        enum GNUNET_LOCKMANAGER_Status status)
2955 {
2956   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2957
2958   GNUNET_assert (lock == (uint32_t) lsocket->port);
2959   if (GNUNET_LOCKMANAGER_SUCCESS == status)
2960   {
2961     lsocket->listening = GNUNET_YES;
2962     if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
2963     {
2964       GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
2965       lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2966     }
2967     if (NULL == lsocket->mesh)
2968     {
2969       GNUNET_MESH_ApplicationType ports[] = {lsocket->port, 0};
2970
2971       lsocket->mesh = GNUNET_MESH_connect (lsocket->cfg,
2972                                            lsocket, /* Closure */
2973                                            &new_tunnel_notify,
2974                                            &tunnel_cleaner,
2975                                            server_message_handlers,
2976                                            ports);
2977       GNUNET_assert (NULL != lsocket->mesh);
2978       if (NULL != lsocket->listen_ok_cb)
2979       {
2980         (void) lsocket->listen_ok_cb ();
2981       }
2982     }
2983   }
2984   if (GNUNET_LOCKMANAGER_RELEASE == status)
2985     lsocket->listening = GNUNET_NO;
2986 }
2987
2988
2989 /*****************/
2990 /* API functions */
2991 /*****************/
2992
2993
2994 /**
2995  * Tries to open a stream to the target peer
2996  *
2997  * @param cfg configuration to use
2998  * @param target the target peer to which the stream has to be opened
2999  * @param app_port the application port number which uniquely identifies this
3000  *            stream
3001  * @param open_cb this function will be called after stream has be established;
3002  *          cannot be NULL
3003  * @param open_cb_cls the closure for open_cb
3004  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3005  * @return if successful it returns the stream socket; NULL if stream cannot be
3006  *         opened 
3007  */
3008 struct GNUNET_STREAM_Socket *
3009 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
3010                     const struct GNUNET_PeerIdentity *target,
3011                     GNUNET_MESH_ApplicationType app_port,
3012                     GNUNET_STREAM_OpenCallback open_cb,
3013                     void *open_cb_cls,
3014                     ...)
3015 {
3016   struct GNUNET_STREAM_Socket *socket;
3017   enum GNUNET_STREAM_Option option;
3018   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
3019   va_list vargs;
3020   uint16_t payload_size;
3021
3022   LOG (GNUNET_ERROR_TYPE_DEBUG,
3023        "%s\n", __func__);
3024   GNUNET_assert (NULL != open_cb);
3025   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
3026   socket->other_peer = *target;
3027   socket->open_cb = open_cb;
3028   socket->open_cls = open_cb_cls;
3029   /* Set defaults */
3030   socket->retransmit_timeout = TIME_REL_SECS (default_timeout);
3031   socket->testing_active = GNUNET_NO;
3032   socket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
3033   va_start (vargs, open_cb_cls); /* Parse variable args */
3034   do {
3035     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3036     switch (option)
3037     {
3038     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3039       /* Expect struct GNUNET_TIME_Relative */
3040       socket->retransmit_timeout = va_arg (vargs,
3041                                            struct GNUNET_TIME_Relative);
3042       break;
3043     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3044       socket->testing_active = GNUNET_YES;
3045       socket->testing_set_write_sequence_number_value = va_arg (vargs,
3046                                                                 uint32_t);
3047       break;
3048     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3049       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
3050       break;
3051     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3052       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
3053       break;
3054     case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
3055       payload_size = (uint16_t) va_arg (vargs, unsigned int);
3056       GNUNET_assert (0 != payload_size);
3057       if (payload_size < socket->max_payload_size)
3058         socket->max_payload_size = payload_size;
3059       break;
3060     case GNUNET_STREAM_OPTION_END:
3061       break;
3062     }
3063   } while (GNUNET_STREAM_OPTION_END != option);
3064   va_end (vargs);               /* End of variable args parsing */
3065   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
3066                                       socket, /* cls */
3067                                       NULL, /* No inbound tunnel handler */
3068                                       NULL, /* No in-tunnel cleaner */
3069                                       client_message_handlers,
3070                                       ports); /* We don't get inbound tunnels */
3071   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
3072   {
3073     GNUNET_free (socket);
3074     return NULL;
3075   }
3076   /* Now create the mesh tunnel to target */
3077   LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating MESH Tunnel\n");
3078   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
3079                                               NULL, /* Tunnel context */
3080                                               &mesh_peer_connect_callback,
3081                                               &mesh_peer_disconnect_callback,
3082                                               socket);
3083   GNUNET_assert (NULL != socket->tunnel);
3084   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
3085                                         &socket->other_peer);
3086   socket->stat_handle = GNUNET_STATISTICS_create ("stream", cfg);
3087   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__);
3088   return socket;
3089 }
3090
3091
3092 /**
3093  * Shutdown the stream for reading or writing (similar to man 2 shutdown).
3094  *
3095  * @param socket the stream socket
3096  * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
3097  * @param completion_cb the callback that will be called upon successful
3098  *          shutdown of given operation
3099  * @param completion_cls the closure for the completion callback
3100  * @return the shutdown handle
3101  */
3102 struct GNUNET_STREAM_ShutdownHandle *
3103 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
3104                         int operation,
3105                         GNUNET_STREAM_ShutdownCompletion completion_cb,
3106                         void *completion_cls)
3107 {
3108   struct GNUNET_STREAM_ShutdownHandle *handle;
3109   struct GNUNET_STREAM_MessageHeader *msg;
3110   
3111   GNUNET_assert (NULL == socket->shutdown_handle);
3112   handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
3113   handle->socket = socket;
3114   handle->completion_cb = completion_cb;
3115   handle->completion_cls = completion_cls;
3116   socket->shutdown_handle = handle;
3117   if ( ((GNUNET_YES == socket->receive_closed) && (SHUT_RD == operation))
3118        || ((GNUNET_YES == socket->transmit_closed) && (SHUT_WR == operation))
3119        || ((GNUNET_YES == socket->transmit_closed) 
3120            && (GNUNET_YES == socket->receive_closed)
3121            && (SHUT_RDWR == operation)) )
3122   {
3123     handle->operation = operation;
3124     handle->call_cont_task_id = GNUNET_SCHEDULER_add_now (&call_cont_task,
3125                                                           socket);
3126     return handle;
3127   }
3128   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
3129   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
3130   switch (operation)
3131   {
3132   case SHUT_RD:
3133     handle->operation = SHUT_RD;
3134     if (NULL != socket->read_handle)
3135       LOG (GNUNET_ERROR_TYPE_WARNING,
3136            "Existing read handle should be cancelled before shutting"
3137            " down reading\n");
3138     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
3139     queue_message (socket, msg, &set_state_receive_close_wait, NULL,
3140                    GNUNET_NO);
3141     socket->receive_closed = GNUNET_YES;
3142     break;
3143   case SHUT_WR:
3144     handle->operation = SHUT_WR;
3145     if (NULL != socket->write_handle)
3146       LOG (GNUNET_ERROR_TYPE_WARNING,
3147            "Existing write handle should be cancelled before shutting"
3148            " down writing\n");
3149     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
3150     queue_message (socket, msg, &set_state_transmit_close_wait, NULL,
3151                    GNUNET_NO);
3152     socket->transmit_closed = GNUNET_YES;
3153     break;
3154   case SHUT_RDWR:
3155     handle->operation = SHUT_RDWR;
3156     if (NULL != socket->write_handle)
3157       LOG (GNUNET_ERROR_TYPE_WARNING,
3158            "Existing write handle should be cancelled before shutting"
3159            " down writing\n");
3160     if (NULL != socket->read_handle)
3161       LOG (GNUNET_ERROR_TYPE_WARNING,
3162            "Existing read handle should be cancelled before shutting"
3163            " down reading\n");
3164     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
3165     queue_message (socket, msg, &set_state_close_wait, NULL, GNUNET_NO);
3166     socket->transmit_closed = GNUNET_YES;
3167     socket->receive_closed = GNUNET_YES;
3168     break;
3169   default:
3170     LOG (GNUNET_ERROR_TYPE_WARNING,
3171          "GNUNET_STREAM_shutdown called with invalid value for "
3172          "parameter operation -- Ignoring\n");
3173     GNUNET_free (msg);
3174     GNUNET_free (handle);
3175     return NULL;
3176   }
3177   handle->close_msg_retransmission_task_id =
3178     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
3179                                   &close_msg_retransmission_task,
3180                                   handle);
3181   return handle;
3182 }
3183
3184
3185 /**
3186  * Cancels a pending shutdown. Note that the shutdown messages may already be
3187  * sent and the stream is shutdown already for the operation given to
3188  * GNUNET_STREAM_shutdown(). This function only clears up any retranmissions of
3189  * shutdown messages and frees the shutdown handle.
3190  *
3191  * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
3192  */
3193 void
3194 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
3195 {
3196   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
3197     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
3198   if (GNUNET_SCHEDULER_NO_TASK != handle->call_cont_task_id)
3199     GNUNET_SCHEDULER_cancel (handle->call_cont_task_id);
3200   handle->socket->shutdown_handle = NULL;
3201   GNUNET_free (handle);
3202 }
3203
3204
3205 /**
3206  * Closes the stream
3207  *
3208  * @param socket the stream socket
3209  */
3210 void
3211 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
3212 {
3213   struct MessageQueue *head;
3214
3215   if (NULL != socket->read_handle)
3216   {
3217     LOG (GNUNET_ERROR_TYPE_WARNING,
3218          "Closing STREAM socket when a read handle is pending\n");
3219     GNUNET_STREAM_io_read_cancel (socket->read_handle);
3220   }
3221   if (NULL != socket->write_handle)
3222   {
3223     LOG (GNUNET_ERROR_TYPE_WARNING,
3224          "Closing STREAM socket when a write handle is pending\n");
3225     GNUNET_STREAM_io_write_cancel (socket->write_handle);
3226     //socket->write_handle = NULL;
3227   }
3228   /* Terminate the ack'ing task if they are still present */
3229   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
3230   {
3231     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
3232     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
3233   }
3234   /* Terminate the control retransmission tasks */
3235   if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
3236   {
3237     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
3238   }
3239   /* Clear Transmit handles */
3240   if (NULL != socket->transmit_handle)
3241   {
3242     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
3243     socket->transmit_handle = NULL;
3244   }
3245   /* Clear existing message queue */
3246   while (NULL != (head = socket->queue_head)) {
3247     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
3248                                  socket->queue_tail,
3249                                  head);
3250     GNUNET_free (head->message);
3251     GNUNET_free (head);
3252   }
3253   /* Close associated tunnel */
3254   if (NULL != socket->tunnel)
3255   {
3256     GNUNET_MESH_tunnel_destroy (socket->tunnel);
3257     socket->tunnel = NULL;
3258   }
3259   /* Close mesh connection */
3260   if ((NULL != socket->mesh) && (NULL == socket->lsocket))
3261   {
3262     GNUNET_MESH_disconnect (socket->mesh);
3263     socket->mesh = NULL;
3264   }
3265   /* Close statistics connection */
3266   if ( (NULL != socket->stat_handle) && (NULL == socket->lsocket) )
3267     GNUNET_STATISTICS_destroy (socket->stat_handle, GNUNET_YES);
3268   /* Release receive buffer */
3269   if (NULL != socket->receive_buffer)
3270   {
3271     GNUNET_free (socket->receive_buffer);
3272   }
3273   GNUNET_free (socket);
3274 }
3275
3276
3277 /**
3278  * Listens for stream connections for a specific application ports
3279  *
3280  * @param cfg the configuration to use
3281  * @param app_port the application port for which new streams will be accepted
3282  * @param listen_cb this function will be called when a peer tries to establish
3283  *            a stream with us
3284  * @param listen_cb_cls closure for listen_cb
3285  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3286  * @return listen socket, NULL for any error
3287  */
3288 struct GNUNET_STREAM_ListenSocket *
3289 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
3290                       GNUNET_MESH_ApplicationType app_port,
3291                       GNUNET_STREAM_ListenCallback listen_cb,
3292                       void *listen_cb_cls,
3293                       ...)
3294 {
3295   struct GNUNET_STREAM_ListenSocket *lsocket;
3296   struct GNUNET_TIME_Relative listen_timeout;
3297   enum GNUNET_STREAM_Option option;
3298   va_list vargs;
3299   uint16_t payload_size;
3300
3301   GNUNET_assert (NULL != listen_cb);
3302   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
3303   lsocket->cfg = GNUNET_CONFIGURATION_dup (cfg);
3304   lsocket->lockmanager = GNUNET_LOCKMANAGER_connect (lsocket->cfg);
3305   if (NULL == lsocket->lockmanager)
3306   {
3307     GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3308     GNUNET_free (lsocket);
3309     return NULL;
3310   }
3311   lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */  
3312   /* Set defaults */
3313   lsocket->retransmit_timeout = TIME_REL_SECS (default_timeout);
3314   lsocket->testing_active = GNUNET_NO;
3315   lsocket->listen_ok_cb = NULL;
3316   lsocket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
3317   listen_timeout = TIME_REL_SECS (60); /* A minute for listen timeout */  
3318   va_start (vargs, listen_cb_cls);
3319   do {
3320     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3321     switch (option)
3322     {
3323     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3324       lsocket->retransmit_timeout = va_arg (vargs,
3325                                             struct GNUNET_TIME_Relative);
3326       break;
3327     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3328       lsocket->testing_active = GNUNET_YES;
3329       lsocket->testing_set_write_sequence_number_value = va_arg (vargs,
3330                                                                  uint32_t);
3331       break;
3332     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3333       listen_timeout = GNUNET_TIME_relative_multiply
3334         (GNUNET_TIME_UNIT_MILLISECONDS, va_arg (vargs, uint32_t));
3335       break;
3336     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3337       lsocket->listen_ok_cb = va_arg (vargs,
3338                                       GNUNET_STREAM_ListenSuccessCallback);
3339       break;
3340     case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
3341       payload_size = (uint16_t) va_arg (vargs, unsigned int);
3342       GNUNET_assert (0 != payload_size);
3343       if (payload_size < lsocket->max_payload_size)
3344         lsocket->max_payload_size = payload_size;
3345       break;
3346     case GNUNET_STREAM_OPTION_END:
3347       break;
3348     }
3349   } while (GNUNET_STREAM_OPTION_END != option);
3350   va_end (vargs);
3351   lsocket->port = app_port;
3352   lsocket->listen_cb = listen_cb;
3353   lsocket->listen_cb_cls = listen_cb_cls;
3354   lsocket->locking_request = 
3355     GNUNET_LOCKMANAGER_acquire_lock (lsocket->lockmanager, locking_domain,
3356                                      (uint32_t) lsocket->port,
3357                                      &lock_status_change_cb, lsocket);
3358   lsocket->lockmanager_acquire_timeout_task =
3359     GNUNET_SCHEDULER_add_delayed (listen_timeout,
3360                                   &lockmanager_acquire_timeout, lsocket);
3361   lsocket->stat_handle = GNUNET_STATISTICS_create ("stream",
3362                                                    lsocket->cfg);
3363   return lsocket;
3364 }
3365
3366
3367 /**
3368  * Closes the listen socket
3369  *
3370  * @param lsocket the listen socket
3371  */
3372 void
3373 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3374 {
3375   /* Close MESH connection */
3376   if (NULL != lsocket->mesh)
3377     GNUNET_MESH_disconnect (lsocket->mesh);
3378   if (NULL != lsocket->stat_handle)
3379     GNUNET_STATISTICS_destroy (lsocket->stat_handle, GNUNET_YES);
3380   GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3381   if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
3382     GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
3383   if (NULL != lsocket->locking_request)
3384     GNUNET_LOCKMANAGER_cancel_request (lsocket->locking_request);
3385   if (NULL != lsocket->lockmanager)
3386     GNUNET_LOCKMANAGER_disconnect (lsocket->lockmanager);
3387   GNUNET_free (lsocket);
3388 }
3389
3390
3391 /**
3392  * Tries to write the given data to the stream. The maximum size of data that
3393  * can be written as part of a write operation is (64 * (64000 - sizeof (struct
3394  * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3395  * violation, however only the said number of maximum bytes will be written.
3396  *
3397  * @param socket the socket representing a stream
3398  * @param data the data buffer from where the data is written into the stream
3399  * @param size the number of bytes to be written from the data buffer
3400  * @param timeout the timeout period
3401  * @param write_cont the function to call upon writing some bytes into the
3402  *          stream 
3403  * @param write_cont_cls the closure
3404  *
3405  * @return handle to cancel the operation; if a previous write is pending or
3406  *           the stream has been shutdown for this operation then write_cont is
3407  *           immediately called and NULL is returned.
3408  */
3409 struct GNUNET_STREAM_IOWriteHandle *
3410 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3411                      const void *data,
3412                      size_t size,
3413                      struct GNUNET_TIME_Relative timeout,
3414                      GNUNET_STREAM_CompletionContinuation write_cont,
3415                      void *write_cont_cls)
3416 {
3417   struct GNUNET_STREAM_IOWriteHandle *io_handle;
3418   struct GNUNET_STREAM_DataMessage *data_msg;
3419   const void *sweep;
3420   struct GNUNET_TIME_Relative ack_deadline;
3421   unsigned int num_needed_packets;
3422   unsigned int packet;
3423   uint32_t packet_size;
3424   uint32_t payload_size;
3425   uint16_t max_data_packet_size;
3426
3427   LOG (GNUNET_ERROR_TYPE_DEBUG,
3428        "%s\n", __func__);
3429   if (NULL != socket->write_handle)
3430   {
3431     GNUNET_break (0);
3432     return NULL;
3433   }
3434   switch (socket->state)
3435   {
3436   case STATE_TRANSMIT_CLOSED:
3437   case STATE_TRANSMIT_CLOSE_WAIT:
3438   case STATE_CLOSED:
3439   case STATE_CLOSE_WAIT:
3440     if (NULL != write_cont)
3441       write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3442     LOG (GNUNET_ERROR_TYPE_DEBUG,
3443          "%s() END\n", __func__);
3444     return NULL;
3445   case STATE_INIT:
3446   case STATE_LISTEN:
3447   case STATE_HELLO_WAIT:
3448     if (NULL != write_cont)
3449       /* FIXME: GNUNET_STREAM_SYSERR?? */
3450       write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3451     LOG (GNUNET_ERROR_TYPE_DEBUG,
3452          "%s() END\n", __func__);
3453     return NULL;
3454   case STATE_ESTABLISHED:
3455   case STATE_RECEIVE_CLOSED:
3456   case STATE_RECEIVE_CLOSE_WAIT:
3457     break;
3458   }
3459   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * socket->max_payload_size < size)
3460     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * socket->max_payload_size;
3461   num_needed_packets =
3462       (size + (socket->max_payload_size - 1)) / socket->max_payload_size;
3463   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
3464   io_handle->socket = socket;
3465   io_handle->write_cont = write_cont;
3466   io_handle->write_cont_cls = write_cont_cls;
3467   io_handle->size = size;
3468   io_handle->packets_sent = 0;
3469   sweep = data;
3470   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3471      determined from RTT */
3472   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3473   /* Divide the given buffer into packets for sending */
3474   max_data_packet_size =
3475       socket->max_payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3476   for (packet=0; packet < num_needed_packets; packet++)
3477   {
3478     if ((packet + 1) * socket->max_payload_size < size) 
3479     {
3480       payload_size = socket->max_payload_size;
3481       packet_size = max_data_packet_size;
3482     }
3483     else 
3484     {
3485       payload_size = size - packet * socket->max_payload_size;
3486       packet_size = 
3487           payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3488     }
3489     io_handle->messages[packet] = GNUNET_malloc (packet_size);
3490     io_handle->messages[packet]->header.header.size = htons (packet_size);
3491     io_handle->messages[packet]->header.header.type =
3492       htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3493     io_handle->messages[packet]->sequence_number =
3494       htonl (socket->write_sequence_number++);
3495     io_handle->messages[packet]->offset = htonl (socket->write_offset);
3496     /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3497        determined from RTT */
3498     io_handle->messages[packet]->ack_deadline =
3499       GNUNET_TIME_relative_hton (ack_deadline);
3500     data_msg = io_handle->messages[packet];
3501     /* Copy data from given buffer to the packet */
3502     memcpy (&data_msg[1], sweep, payload_size);
3503     sweep += payload_size;
3504     socket->write_offset += payload_size;
3505   }
3506   /* ack the last data message. FIXME: remove when we figure out how to do this
3507      using RTT */
3508   io_handle->messages[num_needed_packets - 1]->ack_deadline = 
3509       GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
3510   socket->write_handle = io_handle;
3511   write_data (socket);
3512   LOG (GNUNET_ERROR_TYPE_DEBUG,
3513        "%s() END\n", __func__);
3514   return io_handle;
3515 }
3516
3517
3518 /**
3519  * Tries to read data from the stream.
3520  *
3521  * @param socket the socket representing a stream
3522  * @param timeout the timeout period
3523  * @param proc function to call with data (once only)
3524  * @param proc_cls the closure for proc
3525  *
3526  * @return handle to cancel the operation; NULL is returned if: the stream has
3527  *           been shutdown for this type of opeartion (the DataProcessor is
3528  *           immediately called with GNUNET_STREAM_SHUTDOWN as status) OR another
3529  *           read handle is present (only one read handle per socket is present
3530  *           at any time)
3531  */
3532 struct GNUNET_STREAM_IOReadHandle *
3533 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3534                     struct GNUNET_TIME_Relative timeout,
3535                     GNUNET_STREAM_DataProcessor proc,
3536                     void *proc_cls)
3537 {
3538   struct GNUNET_STREAM_IOReadHandle *read_handle;
3539   
3540   LOG (GNUNET_ERROR_TYPE_DEBUG,
3541        "%s: %s()\n", 
3542        GNUNET_i2s (&socket->other_peer),
3543        __func__);
3544   /* Return NULL if there is already a read handle; the user has to cancel that
3545      first before continuing or has to wait until it is completed */
3546   if (NULL != socket->read_handle) 
3547     return NULL;
3548   GNUNET_assert (NULL != proc);
3549   switch (socket->state)
3550   {
3551   case STATE_RECEIVE_CLOSED:
3552   case STATE_RECEIVE_CLOSE_WAIT:
3553   case STATE_CLOSED:
3554   case STATE_CLOSE_WAIT:
3555     LOG (GNUNET_ERROR_TYPE_DEBUG,
3556          "%s: %s() END\n",
3557          GNUNET_i2s (&socket->other_peer),
3558          __func__);
3559     proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3560     return NULL;
3561   default:
3562     break;
3563   }
3564   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
3565   read_handle->proc = proc;
3566   read_handle->proc_cls = proc_cls;
3567   read_handle->socket = socket;
3568   socket->read_handle = read_handle;
3569   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3570                                           0))
3571     read_handle->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3572                                                           socket);   
3573   read_handle->read_io_timeout_task_id =
3574       GNUNET_SCHEDULER_add_delayed (timeout, &read_io_timeout, socket);
3575   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: %s() END\n",
3576        GNUNET_i2s (&socket->other_peer), __func__);
3577   return read_handle;
3578 }
3579
3580
3581 /**
3582  * Cancel pending write operation.
3583  *
3584  * @param ioh handle to operation to cancel
3585  */
3586 void
3587 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3588 {
3589   struct GNUNET_STREAM_Socket *socket = ioh->socket;
3590   unsigned int packet;
3591
3592   GNUNET_assert (NULL != socket->write_handle);
3593   GNUNET_assert (socket->write_handle == ioh);
3594
3595   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
3596   {
3597     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
3598     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
3599   }
3600
3601   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3602   {
3603     if (NULL == ioh->messages[packet]) break;
3604     GNUNET_free (ioh->messages[packet]);
3605   }
3606       
3607   GNUNET_free (socket->write_handle);
3608   socket->write_handle = NULL;
3609 }
3610
3611
3612 /**
3613  * Cancel pending read operation.
3614  *
3615  * @param ioh handle to operation to cancel
3616  */
3617 void
3618 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
3619 {
3620   struct GNUNET_STREAM_Socket *socket;
3621   
3622   socket = ioh->socket;
3623   GNUNET_assert (NULL != socket->read_handle);
3624   GNUNET_assert (ioh == socket->read_handle);
3625   /* Read io time task should be there; if it is already executed then this
3626   read handle is not valid; However upon scheduler shutdown the read io task
3627   may be executed before */
3628   if (GNUNET_SCHEDULER_NO_TASK != ioh->read_io_timeout_task_id)
3629     GNUNET_SCHEDULER_cancel (ioh->read_io_timeout_task_id);
3630   /* reading task may be present; if so we have to stop it */
3631   if (GNUNET_SCHEDULER_NO_TASK != ioh->read_task_id)
3632     GNUNET_SCHEDULER_cancel (ioh->read_task_id);
3633   GNUNET_free (ioh);
3634   socket->read_handle = NULL;
3635 }
3636
3637 /* end of stream_api.c */