fixing #2574
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Add code for write io timeout
27  *
28  * Include retransmission for control messages
29  **/
30
31 /**
32  * @file stream/stream_api.c
33  * @brief Implementation of the stream library
34  * @author Sree Harsha Totakura
35  */
36
37
38 #include "platform.h"
39 #include "gnunet_common.h"
40 #include "gnunet_crypto_lib.h"
41 #include "gnunet_lockmanager_service.h"
42 #include "gnunet_stream_lib.h"
43 #include "stream_protocol.h"
44
45 /**
46  * Generic logging shorthand
47  */
48 #define LOG(kind,...)                                   \
49   GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
50
51 /**
52  * Debug logging shorthand
53  */
54 #define LOG_DEBUG(...)                          \
55   LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
56
57 /**
58  * Time in relative seconds shorthand
59  */
60 #define TIME_REL_SECS(sec) \
61   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
62
63 /**
64  * The maximum packet size of a stream packet
65  */
66 #define DEFAULT_MAX_PAYLOAD_SIZE 64000
67
68 /**
69  * Receive buffer
70  */
71 #define RECEIVE_BUFFER_SIZE 4096000
72
73 /**
74  * states in the Protocol
75  */
76 enum State
77   {
78     /**
79      * Client initialization state
80      */
81     STATE_INIT,
82
83     /**
84      * Listener initialization state 
85      */
86     STATE_LISTEN,
87
88     /**
89      * Pre-connection establishment state
90      */
91     STATE_HELLO_WAIT,
92
93     /**
94      * State where a connection has been established
95      */
96     STATE_ESTABLISHED,
97
98     /**
99      * State where the socket is closed on our side and waiting to be ACK'ed
100      */
101     STATE_RECEIVE_CLOSE_WAIT,
102
103     /**
104      * State where the socket is closed for reading
105      */
106     STATE_RECEIVE_CLOSED,
107
108     /**
109      * State where the socket is closed on our side and waiting to be ACK'ed
110      */
111     STATE_TRANSMIT_CLOSE_WAIT,
112
113     /**
114      * State where the socket is closed for writing
115      */
116     STATE_TRANSMIT_CLOSED,
117
118     /**
119      * State where the socket is closed on our side and waiting to be ACK'ed
120      */
121     STATE_CLOSE_WAIT,
122
123     /**
124      * State where the socket is closed
125      */
126     STATE_CLOSED 
127   };
128
129
130 /**
131  * Functions of this type are called when a message is written
132  *
133  * @param cls the closure from queue_message
134  * @param socket the socket the written message was bound to
135  */
136 typedef void (*SendFinishCallback) (void *cls,
137                                     struct GNUNET_STREAM_Socket *socket);
138
139
140 /**
141  * The send message queue
142  */
143 struct MessageQueue
144 {
145   /**
146    * The message
147    */
148   struct GNUNET_STREAM_MessageHeader *message;
149
150   /**
151    * Callback to be called when the message is sent
152    */
153   SendFinishCallback finish_cb;
154
155   /**
156    * The closure for finish_cb
157    */
158   void *finish_cb_cls;
159
160   /**
161    * The next message in queue. Should be NULL in the last message
162    */
163   struct MessageQueue *next;
164
165   /**
166    * The next message in queue. Should be NULL in the first message
167    */
168   struct MessageQueue *prev;
169 };
170
171
172 /**
173  * The STREAM Socket Handler
174  */
175 struct GNUNET_STREAM_Socket
176 {
177   /**
178    * Retransmission timeout
179    */
180   struct GNUNET_TIME_Relative retransmit_timeout;
181
182   /**
183    * The Acknowledgement Bitmap
184    */
185   GNUNET_STREAM_AckBitmap ack_bitmap;
186
187   /**
188    * Time when the Acknowledgement was queued
189    */
190   struct GNUNET_TIME_Absolute ack_time_registered;
191
192   /**
193    * Queued Acknowledgement deadline
194    */
195   struct GNUNET_TIME_Relative ack_time_deadline;
196
197   /**
198    * The mesh handle
199    */
200   struct GNUNET_MESH_Handle *mesh;
201
202   /**
203    * The mesh tunnel handle
204    */
205   struct GNUNET_MESH_Tunnel *tunnel;
206
207   /**
208    * Stream open closure
209    */
210   void *open_cls;
211
212   /**
213    * Stream open callback
214    */
215   GNUNET_STREAM_OpenCallback open_cb;
216
217   /**
218    * The current transmit handle (if a pending transmit request exists)
219    */
220   struct GNUNET_MESH_TransmitHandle *transmit_handle;
221
222   /**
223    * The current message associated with the transmit handle
224    */
225   struct MessageQueue *queue_head;
226
227   /**
228    * The queue tail, should always point to the last message in queue
229    */
230   struct MessageQueue *queue_tail;
231
232   /**
233    * The write IO_handle associated with this socket
234    */
235   struct GNUNET_STREAM_IOWriteHandle *write_handle;
236
237   /**
238    * The read IO_handle associated with this socket
239    */
240   struct GNUNET_STREAM_IOReadHandle *read_handle;
241
242   /**
243    * The shutdown handle associated with this socket
244    */
245   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
246
247   /**
248    * Buffer for storing received messages
249    */
250   void *receive_buffer;
251
252   /**
253    * The listen socket from which this socket is derived. Should be NULL if it
254    * is not a derived socket
255    */
256   struct GNUNET_STREAM_ListenSocket *lsocket;
257
258   /**
259    * The peer identity of the peer at the other end of the stream
260    */
261   struct GNUNET_PeerIdentity other_peer;
262
263   /**
264    * Task identifier for the read io timeout task
265    */
266   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
267
268   /**
269    * Task identifier for retransmission task after timeout
270    */
271   GNUNET_SCHEDULER_TaskIdentifier data_retransmission_task_id;
272
273   /**
274    * Task identifier for retransmission of control messages
275    */
276   GNUNET_SCHEDULER_TaskIdentifier control_retransmission_task_id;
277
278   /**
279    * The task for sending timely Acks
280    */
281   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
282
283   /**
284    * Task scheduled to continue a read operation.
285    */
286   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
287
288   /**
289    * The state of the protocol associated with this socket
290    */
291   enum State state;
292
293   /**
294    * The status of the socket
295    */
296   enum GNUNET_STREAM_Status status;
297
298   /**
299    * The number of previous timeouts; FIXME: currently not used
300    */
301   unsigned int retries;
302
303   /**
304    * The application port number (type: uint32_t)
305    */
306   GNUNET_MESH_ApplicationType app_port;
307
308   /**
309    * Whether testing mode is active or not
310    */
311   int testing_active;
312
313   /**
314    * The write sequence number to be set incase of testing
315    */
316   uint32_t testing_set_write_sequence_number_value;
317
318   /**
319    * The session id associated with this stream connection
320    * FIXME: Not used currently, may be removed
321    */
322   uint32_t session_id;
323
324   /**
325    * Write sequence number. Set to random when sending HELLO(client) and
326    * HELLO_ACK(server) 
327    */
328   uint32_t write_sequence_number;
329
330   /**
331    * Read sequence number. This number's value is determined during handshake
332    */
333   uint32_t read_sequence_number;
334
335   /**
336    * The receiver buffer size
337    */
338   uint32_t receive_buffer_size;
339
340   /**
341    * The receiver buffer boundaries
342    */
343   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
344
345   /**
346    * receiver's available buffer after the last acknowledged packet
347    */
348   uint32_t receiver_window_available;
349
350   /**
351    * The offset pointer used during write operation
352    */
353   uint32_t write_offset;
354
355   /**
356    * The offset after which we are expecting data
357    */
358   uint32_t read_offset;
359
360   /**
361    * The offset upto which user has read from the received buffer
362    */
363   uint32_t copy_offset;
364
365   /**
366    * The maximum size of the data message payload this stream handle can send
367    */
368   uint16_t max_payload_size;
369 };
370
371
372 /**
373  * A socket for listening
374  */
375 struct GNUNET_STREAM_ListenSocket
376 {
377   /**
378    * The mesh handle
379    */
380   struct GNUNET_MESH_Handle *mesh;
381
382   /**
383    * Our configuration
384    */
385   struct GNUNET_CONFIGURATION_Handle *cfg;
386
387   /**
388    * Handle to the lock manager service
389    */
390   struct GNUNET_LOCKMANAGER_Handle *lockmanager;
391
392   /**
393    * The active LockingRequest from lockmanager
394    */
395   struct GNUNET_LOCKMANAGER_LockingRequest *locking_request;
396
397   /**
398    * Callback to call after acquring a lock and listening
399    */
400   GNUNET_STREAM_ListenSuccessCallback listen_ok_cb;
401
402   /**
403    * The callback function which is called after successful opening socket
404    */
405   GNUNET_STREAM_ListenCallback listen_cb;
406
407   /**
408    * The call back closure
409    */
410   void *listen_cb_cls;
411
412   /**
413    * The service port
414    */
415   GNUNET_MESH_ApplicationType port;
416   
417   /**
418    * The id of the lockmanager timeout task
419    */
420   GNUNET_SCHEDULER_TaskIdentifier lockmanager_acquire_timeout_task;
421
422   /**
423    * The retransmit timeout
424    */
425   struct GNUNET_TIME_Relative retransmit_timeout;
426   
427   /**
428    * Listen enabled?
429    */
430   int listening;
431
432   /**
433    * Whether testing mode is active or not
434    */
435   int testing_active;
436
437   /**
438    * The write sequence number to be set incase of testing
439    */
440   uint32_t testing_set_write_sequence_number_value;
441
442   /**
443    * The maximum size of the data message payload this stream handle can send
444    */
445   uint16_t max_payload_size;
446
447 };
448
449
450 /**
451  * The IO Write Handle
452  */
453 struct GNUNET_STREAM_IOWriteHandle
454 {
455   /**
456    * The socket to which this write handle is associated
457    */
458   struct GNUNET_STREAM_Socket *socket;
459
460   /**
461    * The packet_buffers associated with this Handle
462    */
463   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
464
465   /**
466    * The write continuation callback
467    */
468   GNUNET_STREAM_CompletionContinuation write_cont;
469
470   /**
471    * Write continuation closure
472    */
473   void *write_cont_cls;
474
475   /**
476    * The bitmap of this IOHandle; Corresponding bit for a message is set when
477    * it has been acknowledged by the receiver
478    */
479   GNUNET_STREAM_AckBitmap ack_bitmap;
480
481   /**
482    * Number of bytes in this write handle
483    */
484   size_t size;
485
486   /**
487    * Number of packets already transmitted from this IO handle. Retransmitted
488    * packets are not taken into account here. This is used to determine which
489    * packets account for retransmission and which packets occupy buffer space at
490    * the receiver.
491    */
492   unsigned int packets_sent;
493 };
494
495
496 /**
497  * The IO Read Handle
498  */
499 struct GNUNET_STREAM_IOReadHandle
500 {
501   /**
502    * The socket to which this read handle is associated
503    */
504   struct GNUNET_STREAM_Socket *socket;
505   
506   /**
507    * Callback for the read processor
508    */
509   GNUNET_STREAM_DataProcessor proc;
510
511   /**
512    * The closure pointer for the read processor callback
513    */
514   void *proc_cls;
515 };
516
517
518 /**
519  * Handle for Shutdown
520  */
521 struct GNUNET_STREAM_ShutdownHandle
522 {
523   /**
524    * The socket associated with this shutdown handle
525    */
526   struct GNUNET_STREAM_Socket *socket;
527
528   /**
529    * Shutdown completion callback
530    */
531   GNUNET_STREAM_ShutdownCompletion completion_cb;
532
533   /**
534    * Closure for completion callback
535    */
536   void *completion_cls;
537
538   /**
539    * Close message retransmission task id
540    */
541   GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
542
543   /**
544    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
545    */
546   int operation;  
547 };
548
549
550 /**
551  * Default value in seconds for various timeouts
552  */
553 static const unsigned int default_timeout = 10;
554
555 /**
556  * The domain name for locks we use here
557  */
558 static const char *locking_domain = "GNUNET_STREAM_APPLOCK";
559
560
561 /**
562  * Callback function for sending queued message
563  *
564  * @param cls closure the socket
565  * @param size number of bytes available in buf
566  * @param buf where the callee should write the message
567  * @return number of bytes written to buf
568  */
569 static size_t
570 send_message_notify (void *cls, size_t size, void *buf)
571 {
572   struct GNUNET_STREAM_Socket *socket = cls;
573   struct MessageQueue *head;
574   size_t ret;
575
576   socket->transmit_handle = NULL; /* Remove the transmit handle */
577   head = socket->queue_head;
578   if (NULL == head)
579     return 0; /* just to be safe */
580   if (0 == size)                /* request timed out */
581   {
582     socket->retries++;
583     LOG (GNUNET_ERROR_TYPE_DEBUG,
584          "%s: Message sending timed out. Retry %d \n",
585          GNUNET_i2s (&socket->other_peer),
586          socket->retries);
587     socket->transmit_handle = 
588       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
589                                          GNUNET_NO, /* Corking */
590                                          /* FIXME: exponential backoff */
591                                          socket->retransmit_timeout,
592                                          &socket->other_peer,
593                                          ntohs (head->message->header.size),
594                                          &send_message_notify,
595                                          socket);
596     return 0;
597   }
598   ret = ntohs (head->message->header.size);
599   GNUNET_assert (size >= ret);
600   memcpy (buf, head->message, ret);
601   if (NULL != head->finish_cb)
602   {
603     head->finish_cb (head->finish_cb_cls, socket);
604   }
605   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
606                                socket->queue_tail,
607                                head);
608   GNUNET_free (head->message);
609   GNUNET_free (head);
610   head = socket->queue_head;
611   if (NULL != head)    /* more pending messages to send */
612   {
613     socket->retries = 0;
614     socket->transmit_handle = 
615       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
616                                          GNUNET_NO, /* Corking */
617                                          /* FIXME: exponential backoff */
618                                          socket->retransmit_timeout,
619                                          &socket->other_peer,
620                                          ntohs (head->message->header.size),
621                                          &send_message_notify,
622                                          socket);
623   }
624   return ret;
625 }
626
627
628 /**
629  * Queues a message for sending using the mesh connection of a socket
630  *
631  * @param socket the socket whose mesh connection is used
632  * @param message the message to be sent
633  * @param finish_cb the callback to be called when the message is sent
634  * @param finish_cb_cls the closure for the callback
635  * @param urgent set to GNUNET_YES to add the message to the beginning of the
636  *          queue; GNUNET_NO to add at the tail
637  */
638 static void
639 queue_message (struct GNUNET_STREAM_Socket *socket,
640                struct GNUNET_STREAM_MessageHeader *message,
641                SendFinishCallback finish_cb,
642                void *finish_cb_cls,
643                int urgent)
644 {
645   struct MessageQueue *queue_entity;
646
647   GNUNET_assert 
648     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
649      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
650   LOG (GNUNET_ERROR_TYPE_DEBUG,
651        "%s: Queueing message of type %d and size %d\n",
652        GNUNET_i2s (&socket->other_peer),
653        ntohs (message->header.type),
654        ntohs (message->header.size));
655   GNUNET_assert (NULL != message);
656   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
657   queue_entity->message = message;
658   queue_entity->finish_cb = finish_cb;
659   queue_entity->finish_cb_cls = finish_cb_cls;
660   if (GNUNET_YES == urgent)
661   {
662     GNUNET_CONTAINER_DLL_insert (socket->queue_head, socket->queue_tail,
663                                  queue_entity);
664     if (NULL != socket->transmit_handle)
665     {
666       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
667       socket->transmit_handle = NULL;
668     }
669   }
670   else
671     GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
672                                       socket->queue_tail,
673                                       queue_entity);
674   if (NULL == socket->transmit_handle)
675   {
676     socket->retries = 0;
677     socket->transmit_handle = 
678         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
679                                            GNUNET_NO, /* Corking */
680                                            socket->retransmit_timeout,
681                                            &socket->other_peer,
682                                            ntohs (message->header.size),
683                                            &send_message_notify,
684                                            socket);
685   }
686 }
687
688
689 /**
690  * Copies a message and queues it for sending using the mesh connection of
691  * given socket 
692  *
693  * @param socket the socket whose mesh connection is used
694  * @param message the message to be sent
695  * @param finish_cb the callback to be called when the message is sent
696  * @param finish_cb_cls the closure for the callback
697  */
698 static void
699 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
700                         const struct GNUNET_STREAM_MessageHeader *message,
701                         SendFinishCallback finish_cb,
702                         void *finish_cb_cls)
703 {
704   struct GNUNET_STREAM_MessageHeader *msg_copy;
705   uint16_t size;
706   
707   size = ntohs (message->header.size);
708   msg_copy = GNUNET_malloc (size);
709   memcpy (msg_copy, message, size);
710   queue_message (socket, msg_copy, finish_cb, finish_cb_cls, GNUNET_NO);
711 }
712
713
714 /**
715  * Writes data using the given socket. The amount of data written is limited by
716  * the receiver_window_size
717  *
718  * @param socket the socket to use
719  */
720 static void 
721 write_data (struct GNUNET_STREAM_Socket *socket);
722
723
724 /**
725  * Task for retransmitting data messages if they aren't ACK before their ack
726  * deadline 
727  *
728  * @param cls the socket
729  * @param tc the Task context
730  */
731 static void
732 data_retransmission_task (void *cls,
733                           const struct GNUNET_SCHEDULER_TaskContext *tc)
734 {
735   struct GNUNET_STREAM_Socket *socket = cls;
736   
737   socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
738   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
739     return;
740   LOG (GNUNET_ERROR_TYPE_DEBUG,
741        "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
742   write_data (socket);
743 }
744
745
746 /**
747  * Task for sending ACK message
748  *
749  * @param cls the socket
750  * @param tc the Task context
751  */
752 static void
753 ack_task (void *cls,
754           const struct GNUNET_SCHEDULER_TaskContext *tc)
755 {
756   struct GNUNET_STREAM_Socket *socket = cls;
757   struct GNUNET_STREAM_AckMessage *ack_msg;
758
759   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
760   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
761     return;
762   /* Create the ACK Message */
763   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
764   ack_msg->header.header.size = htons (sizeof (struct 
765                                                GNUNET_STREAM_AckMessage));
766   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
767   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
768   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
769   ack_msg->receive_window_remaining = 
770     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
771   /* Queue up ACK for immediate sending */
772   queue_message (socket, &ack_msg->header, NULL, NULL, GNUNET_YES);
773 }
774
775
776 /**
777  * Retransmission task for shutdown messages
778  *
779  * @param cls the shutdown handle
780  * @param tc the Task Context
781  */
782 static void
783 close_msg_retransmission_task (void *cls,
784                                const struct GNUNET_SCHEDULER_TaskContext *tc)
785 {
786   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
787   struct GNUNET_STREAM_MessageHeader *msg;
788   struct GNUNET_STREAM_Socket *socket;
789
790   shutdown_handle->close_msg_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
791   GNUNET_assert (NULL != shutdown_handle);
792   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
793     return;
794   socket = shutdown_handle->socket;
795   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
796   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
797   switch (shutdown_handle->operation)
798   {
799   case SHUT_RDWR:
800     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
801     break;
802   case SHUT_RD:
803     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
804     break;
805   case SHUT_WR:
806     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
807     break;
808   default:
809     GNUNET_free (msg);
810     shutdown_handle->close_msg_retransmission_task_id = 
811       GNUNET_SCHEDULER_NO_TASK;
812     return;
813   }
814   queue_message (socket, msg, NULL, NULL, GNUNET_NO);
815   shutdown_handle->close_msg_retransmission_task_id =
816     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
817                                   &close_msg_retransmission_task,
818                                   shutdown_handle);
819 }
820
821
822 /**
823  * Function to modify a bit in GNUNET_STREAM_AckBitmap
824  *
825  * @param bitmap the bitmap to modify
826  * @param bit the bit number to modify
827  * @param value GNUNET_YES to on, GNUNET_NO to off
828  */
829 static void
830 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
831                       unsigned int bit, 
832                       int value)
833 {
834   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
835   if (GNUNET_YES == value)
836     *bitmap |= (1LL << bit);
837   else
838     *bitmap &= ~(1LL << bit);
839 }
840
841
842 /**
843  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
844  *
845  * @param bitmap address of the bitmap that has to be checked
846  * @param bit the bit number to check
847  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
848  */
849 static uint8_t
850 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
851                       unsigned int bit)
852 {
853   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
854   return 0 != (*bitmap & (1LL << bit));
855 }
856
857
858 /**
859  * Writes data using the given socket. The amount of data written is limited by
860  * the receiver_window_size
861  *
862  * @param socket the socket to use
863  */
864 static void 
865 write_data (struct GNUNET_STREAM_Socket *socket)
866 {
867   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
868   unsigned int packet;
869   
870   for (packet=0; packet < io_handle->packets_sent; packet++)
871   {
872     if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
873                                            packet))
874     {
875       LOG (GNUNET_ERROR_TYPE_DEBUG,
876            "%s: Retransmitting DATA message with sequence %u\n",
877            GNUNET_i2s (&socket->other_peer),
878            ntohl (io_handle->messages[packet]->sequence_number));
879       copy_and_queue_message (socket,
880                               &io_handle->messages[packet]->header,
881                               NULL,
882                               NULL);
883     }
884   }
885   /* Now send new packets if there is enough buffer space */
886   while ( (NULL != io_handle->messages[packet]) &&
887           (socket->receiver_window_available 
888            >= ntohs (io_handle->messages[packet]->header.header.size)) &&
889           (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
890   {
891     socket->receiver_window_available -= 
892       ntohs (io_handle->messages[packet]->header.header.size);
893     LOG (GNUNET_ERROR_TYPE_DEBUG,
894          "%s: Placing DATA message with sequence %u in send queue\n",
895          GNUNET_i2s (&socket->other_peer),
896          ntohl (io_handle->messages[packet]->sequence_number));
897     copy_and_queue_message (socket,
898                             &io_handle->messages[packet]->header,
899                             NULL,
900                             NULL);
901     packet++;
902   }
903   io_handle->packets_sent = packet;
904   if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id)
905     socket->data_retransmission_task_id = 
906       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
907                                     (GNUNET_TIME_UNIT_SECONDS, 8),
908                                     &data_retransmission_task,
909                                     socket);
910 }
911
912
913 /**
914  * Task for calling the read processor
915  *
916  * @param cls the socket
917  * @param tc the task context
918  */
919 static void
920 call_read_processor (void *cls,
921                      const struct GNUNET_SCHEDULER_TaskContext *tc)
922 {
923   struct GNUNET_STREAM_Socket *socket = cls;
924   size_t read_size;
925   size_t valid_read_size;
926   unsigned int packet;
927   uint32_t sequence_increase;
928   uint32_t offset_increase;
929
930   socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
931   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
932     return;
933
934   if (NULL == socket->receive_buffer) 
935     return;
936
937   GNUNET_assert (NULL != socket->read_handle);
938   GNUNET_assert (NULL != socket->read_handle->proc);
939
940   /* Check the bitmap for any holes */
941   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
942   {
943     if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
944                                            packet))
945       break;
946   }
947   /* We only call read processor if we have the first packet */
948   GNUNET_assert (0 < packet);
949   valid_read_size = 
950     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
951   GNUNET_assert (0 != valid_read_size);
952   /* Cancel the read_io_timeout_task */
953   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
954   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
955   /* Call the data processor */
956   LOG (GNUNET_ERROR_TYPE_DEBUG,
957        "%s: Calling read processor\n",
958        GNUNET_i2s (&socket->other_peer));
959   read_size = 
960     socket->read_handle->proc (socket->read_handle->proc_cls,
961                                socket->status,
962                                socket->receive_buffer + socket->copy_offset,
963                                valid_read_size);
964   LOG (GNUNET_ERROR_TYPE_DEBUG,
965        "%s: Read processor read %d bytes\n",
966        GNUNET_i2s (&socket->other_peer), read_size);
967   LOG (GNUNET_ERROR_TYPE_DEBUG,
968        "%s: Read processor completed successfully\n",
969        GNUNET_i2s (&socket->other_peer));
970   /* Free the read handle */
971   GNUNET_free (socket->read_handle);
972   socket->read_handle = NULL;
973   GNUNET_assert (read_size <= valid_read_size);
974   socket->copy_offset += read_size;
975   /* Determine upto which packet we can remove from the buffer */
976   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
977   {
978     if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
979     { packet++; break; }
980     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
981       break;
982   }
983
984   /* If no packets can be removed we can't move the buffer */
985   if (0 == packet) return;
986   sequence_increase = packet;
987   LOG (GNUNET_ERROR_TYPE_DEBUG,
988        "%s: Sequence increase after read processor completion: %u\n",
989        GNUNET_i2s (&socket->other_peer), sequence_increase);
990
991   /* Shift the data in the receive buffer */
992   socket->receive_buffer = 
993     memmove (socket->receive_buffer,
994              socket->receive_buffer 
995              + socket->receive_buffer_boundaries[sequence_increase-1],
996              socket->receive_buffer_size
997              - socket->receive_buffer_boundaries[sequence_increase-1]);
998   /* Shift the bitmap */
999   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
1000   /* Set read_sequence_number */
1001   socket->read_sequence_number += sequence_increase;
1002   /* Set read_offset */
1003   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
1004   socket->read_offset += offset_increase;
1005   /* Fix copy_offset */
1006   GNUNET_assert (offset_increase <= socket->copy_offset);
1007   socket->copy_offset -= offset_increase;
1008   /* Fix relative boundaries */
1009   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1010   {
1011     if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
1012     {
1013       uint32_t ahead_buffer_boundary;
1014
1015       ahead_buffer_boundary = 
1016         socket->receive_buffer_boundaries[packet + sequence_increase];
1017       if (0 == ahead_buffer_boundary)
1018         socket->receive_buffer_boundaries[packet] = 0;
1019       else
1020       {
1021         GNUNET_assert (offset_increase < ahead_buffer_boundary);
1022         socket->receive_buffer_boundaries[packet] = 
1023           ahead_buffer_boundary - offset_increase;
1024       }
1025     }
1026     else
1027       socket->receive_buffer_boundaries[packet] = 0;
1028   }
1029 }
1030
1031
1032 /**
1033  * Cancels the existing read io handle
1034  *
1035  * @param cls the closure from the SCHEDULER call
1036  * @param tc the task context
1037  */
1038 static void
1039 read_io_timeout (void *cls,
1040                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1041 {
1042   struct GNUNET_STREAM_Socket *socket = cls;
1043   GNUNET_STREAM_DataProcessor proc;
1044   void *proc_cls;
1045
1046   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1047   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
1048     return;
1049   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1050   {
1051     LOG (GNUNET_ERROR_TYPE_DEBUG,
1052          "%s: Read task timedout - Cancelling it\n",
1053          GNUNET_i2s (&socket->other_peer));
1054     GNUNET_SCHEDULER_cancel (socket->read_task_id);
1055     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1056   }
1057   GNUNET_assert (NULL != socket->read_handle);
1058   proc = socket->read_handle->proc;
1059   proc_cls = socket->read_handle->proc_cls;
1060   GNUNET_free (socket->read_handle);
1061   socket->read_handle = NULL;
1062   /* Call the read processor to signal timeout */
1063   proc (proc_cls,
1064         GNUNET_STREAM_TIMEOUT,
1065         NULL,
1066         0);
1067 }
1068
1069
1070 /**
1071  * Handler for DATA messages; Same for both client and server
1072  *
1073  * @param socket the socket through which the ack was received
1074  * @param tunnel connection to the other end
1075  * @param sender who sent the message
1076  * @param msg the data message
1077  * @param atsi performance data for the connection
1078  * @return GNUNET_OK to keep the connection open,
1079  *         GNUNET_SYSERR to close it (signal serious error)
1080  */
1081 static int
1082 handle_data (struct GNUNET_STREAM_Socket *socket,
1083              struct GNUNET_MESH_Tunnel *tunnel,
1084              const struct GNUNET_PeerIdentity *sender,
1085              const struct GNUNET_STREAM_DataMessage *msg,
1086              const struct GNUNET_ATS_Information*atsi)
1087 {
1088   const void *payload;
1089   struct GNUNET_TIME_Relative ack_deadline_rel;
1090   uint32_t bytes_needed;
1091   uint32_t relative_offset;
1092   uint32_t relative_sequence_number;
1093   uint16_t size;
1094
1095   size = htons (msg->header.header.size);
1096   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1097   {
1098     GNUNET_break_op (0);
1099     return GNUNET_SYSERR;
1100   }
1101   if (0 != memcmp (sender, &socket->other_peer,
1102                    sizeof (struct GNUNET_PeerIdentity)))
1103   {
1104     LOG (GNUNET_ERROR_TYPE_DEBUG,
1105          "%s: Received DATA from non-confirming peer\n",
1106          GNUNET_i2s (&socket->other_peer));
1107     return GNUNET_YES;
1108   }
1109   switch (socket->state)
1110   {
1111   case STATE_ESTABLISHED:
1112   case STATE_TRANSMIT_CLOSED:
1113   case STATE_TRANSMIT_CLOSE_WAIT:      
1114     /* check if the message's sequence number is in the range we are
1115        expecting */
1116     relative_sequence_number = 
1117       ntohl (msg->sequence_number) - socket->read_sequence_number;
1118     if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1119     {
1120       LOG (GNUNET_ERROR_TYPE_DEBUG,
1121            "%s: Ignoring received message with sequence number %u\n",
1122            GNUNET_i2s (&socket->other_peer),
1123            ntohl (msg->sequence_number));
1124       /* Start ACK sending task if one is not already present */
1125       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1126       {
1127         socket->ack_task_id = 
1128           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1129                                         (msg->ack_deadline),
1130                                         &ack_task,
1131                                         socket);
1132       }
1133       return GNUNET_YES;
1134     }      
1135     /* Check if we have already seen this message */
1136     if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1137                                             relative_sequence_number))
1138     {
1139       LOG (GNUNET_ERROR_TYPE_DEBUG,
1140            "%s: Ignoring already received message with sequence number %u\n",
1141            GNUNET_i2s (&socket->other_peer),
1142            ntohl (msg->sequence_number));
1143       /* Start ACK sending task if one is not already present */
1144       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1145       {
1146         socket->ack_task_id = 
1147           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1148                                         (msg->ack_deadline), &ack_task, socket);
1149       }
1150       return GNUNET_YES;
1151     }
1152     LOG (GNUNET_ERROR_TYPE_DEBUG,
1153          "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
1154          GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number),
1155          ntohs (msg->header.header.size), GNUNET_i2s (&socket->other_peer));
1156     /* Check if we have to allocate the buffer */
1157     size -= sizeof (struct GNUNET_STREAM_DataMessage);
1158     relative_offset = ntohl (msg->offset) - socket->read_offset;
1159     bytes_needed = relative_offset + size;
1160     if (bytes_needed > socket->receive_buffer_size)
1161     {
1162       if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1163       {
1164         socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1165                                                  bytes_needed);
1166         socket->receive_buffer_size = bytes_needed;
1167       }
1168       else
1169       {
1170         LOG (GNUNET_ERROR_TYPE_DEBUG,
1171              "%s: Cannot accommodate packet %d as buffer is full\n",
1172              GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number));
1173         return GNUNET_YES;
1174       }
1175     }
1176     /* Copy Data to buffer */
1177     payload = &msg[1];
1178     GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1179     memcpy (socket->receive_buffer + relative_offset, payload, size);
1180     socket->receive_buffer_boundaries[relative_sequence_number] = 
1181         relative_offset + size;
1182     /* Modify the ACK bitmap */
1183     ackbitmap_modify_bit (&socket->ack_bitmap, relative_sequence_number,
1184                           GNUNET_YES);
1185     /* Start ACK sending task if one is not already present */
1186     ack_deadline_rel = GNUNET_TIME_relative_ntoh (msg->ack_deadline);
1187     if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1188     {
1189       ack_deadline_rel = 
1190           GNUNET_TIME_relative_min (ack_deadline_rel,
1191                                     GNUNET_TIME_relative_multiply
1192                                     (GNUNET_TIME_UNIT_SECONDS, 300));
1193       socket->ack_task_id = 
1194           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh 
1195                                         (msg->ack_deadline), &ack_task, socket);
1196       socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1197       socket->ack_time_deadline = ack_deadline_rel;
1198     }
1199     else
1200     {
1201       struct GNUNET_TIME_Relative ack_time_past;
1202       struct GNUNET_TIME_Relative ack_time_remaining;
1203       struct GNUNET_TIME_Relative ack_time_min;
1204       ack_time_past = 
1205           GNUNET_TIME_absolute_get_duration (socket->ack_time_registered);
1206       ack_time_remaining = GNUNET_TIME_relative_subtract
1207           (socket->ack_time_deadline, ack_time_past);
1208       ack_time_min = GNUNET_TIME_relative_min (ack_time_remaining,
1209                                                ack_deadline_rel);
1210       if (0 == memcmp(&ack_deadline_rel, &ack_time_min,
1211                       sizeof (struct GNUNET_TIME_Relative)))
1212       {
1213         ack_deadline_rel = ack_time_min;
1214         GNUNET_SCHEDULER_cancel (socket->ack_task_id);
1215         socket->ack_task_id = GNUNET_SCHEDULER_add_delayed (ack_deadline_rel,
1216                                                             &ack_task, socket);
1217         socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1218         socket->ack_time_deadline = ack_deadline_rel;
1219       }
1220     }
1221     if ((NULL != socket->read_handle) /* A read handle is waiting */
1222         /* There is no current read task */
1223         && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1224         /* We have the first packet */
1225         && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0)))
1226     {
1227       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n", 
1228            GNUNET_i2s (&socket->other_peer));          
1229       socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
1230                                                        socket);
1231     }      
1232     break;
1233   default:
1234     LOG (GNUNET_ERROR_TYPE_DEBUG,
1235          "%s: Received data message when it cannot be handled\n",
1236          GNUNET_i2s (&socket->other_peer));
1237     break;
1238   }
1239   return GNUNET_YES;
1240 }
1241
1242
1243 /**
1244  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1245  *
1246  * @param cls the socket (set from GNUNET_MESH_connect)
1247  * @param tunnel connection to the other end
1248  * @param tunnel_ctx place to store local state associated with the tunnel
1249  * @param sender who sent the message
1250  * @param message the actual message
1251  * @param atsi performance data for the connection
1252  * @return GNUNET_OK to keep the connection open,
1253  *         GNUNET_SYSERR to close it (signal serious error)
1254  */
1255 static int
1256 client_handle_data (void *cls,
1257                     struct GNUNET_MESH_Tunnel *tunnel,
1258                     void **tunnel_ctx,
1259                     const struct GNUNET_PeerIdentity *sender,
1260                     const struct GNUNET_MessageHeader *message,
1261                     const struct GNUNET_ATS_Information*atsi)
1262 {
1263   struct GNUNET_STREAM_Socket *socket = cls;
1264
1265   return handle_data (socket, tunnel, sender, 
1266                       (const struct GNUNET_STREAM_DataMessage *) message, atsi);
1267 }
1268
1269
1270 /**
1271  * Callback to set state to ESTABLISHED
1272  *
1273  * @param cls the closure NULL;
1274  * @param socket the socket to requiring state change
1275  */
1276 static void
1277 set_state_established (void *cls,
1278                        struct GNUNET_STREAM_Socket *socket)
1279 {
1280   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1281        "%s: Attaining ESTABLISHED state\n",
1282        GNUNET_i2s (&socket->other_peer));
1283   socket->write_offset = 0;
1284   socket->read_offset = 0;
1285   socket->state = STATE_ESTABLISHED;
1286   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK !=
1287                  socket->control_retransmission_task_id);
1288   GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
1289   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1290   if (NULL != socket->lsocket)
1291   {
1292     LOG (GNUNET_ERROR_TYPE_DEBUG,
1293          "%s: Calling listen callback\n",
1294          GNUNET_i2s (&socket->other_peer));
1295     if (GNUNET_SYSERR == 
1296         socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1297                                     socket,
1298                                     &socket->other_peer))
1299     {
1300       socket->state = STATE_CLOSED;
1301       /* FIXME: We should close in a decent way (send RST) */
1302       GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1303       GNUNET_free (socket);
1304     }
1305   }
1306   else
1307     socket->open_cb (socket->open_cls, socket);
1308 }
1309
1310
1311 /**
1312  * Callback to set state to HELLO_WAIT
1313  *
1314  * @param cls the closure from queue_message
1315  * @param socket the socket to requiring state change
1316  */
1317 static void
1318 set_state_hello_wait (void *cls,
1319                       struct GNUNET_STREAM_Socket *socket)
1320 {
1321   GNUNET_assert (STATE_INIT == socket->state);
1322   LOG (GNUNET_ERROR_TYPE_DEBUG,
1323        "%s: Attaining HELLO_WAIT state\n",
1324        GNUNET_i2s (&socket->other_peer));
1325   socket->state = STATE_HELLO_WAIT;
1326 }
1327
1328
1329 /**
1330  * Callback to set state to CLOSE_WAIT
1331  *
1332  * @param cls the closure from queue_message
1333  * @param socket the socket requiring state change
1334  */
1335 static void
1336 set_state_close_wait (void *cls,
1337                       struct GNUNET_STREAM_Socket *socket)
1338 {
1339   LOG (GNUNET_ERROR_TYPE_DEBUG,
1340        "%s: Attaing CLOSE_WAIT state\n",
1341        GNUNET_i2s (&socket->other_peer));
1342   socket->state = STATE_CLOSE_WAIT;
1343   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1344   socket->receive_buffer = NULL;
1345   socket->receive_buffer_size = 0;
1346 }
1347
1348
1349 /**
1350  * Callback to set state to RECEIVE_CLOSE_WAIT
1351  *
1352  * @param cls the closure from queue_message
1353  * @param socket the socket requiring state change
1354  */
1355 static void
1356 set_state_receive_close_wait (void *cls,
1357                               struct GNUNET_STREAM_Socket *socket)
1358 {
1359   LOG (GNUNET_ERROR_TYPE_DEBUG,
1360        "%s: Attaing RECEIVE_CLOSE_WAIT state\n",
1361        GNUNET_i2s (&socket->other_peer));
1362   socket->state = STATE_RECEIVE_CLOSE_WAIT;
1363   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1364   socket->receive_buffer = NULL;
1365   socket->receive_buffer_size = 0;
1366 }
1367
1368
1369 /**
1370  * Callback to set state to TRANSMIT_CLOSE_WAIT
1371  *
1372  * @param cls the closure from queue_message
1373  * @param socket the socket requiring state change
1374  */
1375 static void
1376 set_state_transmit_close_wait (void *cls,
1377                                struct GNUNET_STREAM_Socket *socket)
1378 {
1379   LOG (GNUNET_ERROR_TYPE_DEBUG,
1380        "%s: Attaing TRANSMIT_CLOSE_WAIT state\n",
1381        GNUNET_i2s (&socket->other_peer));
1382   socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1383 }
1384
1385
1386 /**
1387  * Callback to set state to CLOSED
1388  *
1389  * @param cls the closure from queue_message
1390  * @param socket the socket requiring state change
1391  */
1392 static void
1393 set_state_closed (void *cls,
1394                   struct GNUNET_STREAM_Socket *socket)
1395 {
1396   socket->state = STATE_CLOSED;
1397 }
1398
1399
1400 /**
1401  * Returns GNUNET_MESSAGE_TYPE_STREAM_HELLO
1402  *
1403  * @return the generate hello message
1404  */
1405 static struct GNUNET_STREAM_MessageHeader *
1406 generate_hello (void)
1407 {
1408   struct GNUNET_STREAM_MessageHeader *msg;
1409
1410   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1411   msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1412   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1413   return msg;
1414 }
1415
1416
1417 /**
1418  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1419  * socket
1420  *
1421  * @param socket the socket for which this HelloAckMessage has to be generated
1422  * @param generate_seq GNUNET_YES to generate the write sequence number,
1423  *          GNUNET_NO to use the existing sequence number
1424  * @return the HelloAckMessage
1425  */
1426 static struct GNUNET_STREAM_HelloAckMessage *
1427 generate_hello_ack (struct GNUNET_STREAM_Socket *socket,
1428                     int generate_seq)
1429 {
1430   struct GNUNET_STREAM_HelloAckMessage *msg;
1431
1432   if (GNUNET_YES == generate_seq)
1433   {
1434     if (GNUNET_YES == socket->testing_active)
1435       socket->write_sequence_number =
1436         socket->testing_set_write_sequence_number_value;
1437     else
1438       socket->write_sequence_number = 
1439         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1440     LOG_DEBUG ("%s: write sequence number %u\n",
1441                GNUNET_i2s (&socket->other_peer),
1442                (unsigned int) socket->write_sequence_number);
1443   }
1444   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1445   msg->header.header.size = 
1446     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1447   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1448   msg->sequence_number = htonl (socket->write_sequence_number);
1449   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1450   return msg;
1451 }
1452
1453
1454 /**
1455  * Task for retransmitting control messages if they aren't ACK'ed before a
1456  * deadline
1457  *
1458  * @param cls the socket
1459  * @param tc the Task context
1460  */
1461 static void
1462 control_retransmission_task (void *cls,
1463                              const struct GNUNET_SCHEDULER_TaskContext *tc)
1464 {
1465   struct GNUNET_STREAM_Socket *socket = cls;
1466     
1467   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1468   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
1469     return;
1470   LOG_DEBUG ("%s: Retransmitting a control message\n",
1471                  GNUNET_i2s (&socket->other_peer));
1472   switch (socket->state)
1473   {
1474   case STATE_INIT:    
1475     GNUNET_break (0);
1476     break;
1477   case STATE_LISTEN:
1478     GNUNET_break (0);
1479     break;
1480   case STATE_HELLO_WAIT:
1481     if (NULL == socket->lsocket) /* We are client */
1482       queue_message (socket, generate_hello (), NULL, NULL, GNUNET_NO);
1483     else
1484       queue_message (socket,
1485                      (struct GNUNET_STREAM_MessageHeader *)
1486                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1487                      GNUNET_NO);
1488     socket->control_retransmission_task_id =
1489     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
1490                                   &control_retransmission_task, socket);
1491     break;
1492   case STATE_ESTABLISHED:
1493     if (NULL == socket->lsocket)
1494       queue_message (socket,
1495                      (struct GNUNET_STREAM_MessageHeader *)
1496                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1497                      GNUNET_NO);
1498     else
1499       GNUNET_break (0);
1500   default:
1501     GNUNET_break (0);
1502   }  
1503 }
1504
1505
1506 /**
1507  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1508  *
1509  * @param cls the socket (set from GNUNET_MESH_connect)
1510  * @param tunnel connection to the other end
1511  * @param tunnel_ctx this is NULL
1512  * @param sender who sent the message
1513  * @param message the actual message
1514  * @param atsi performance data for the connection
1515  * @return GNUNET_OK to keep the connection open,
1516  *         GNUNET_SYSERR to close it (signal serious error)
1517  */
1518 static int
1519 client_handle_hello_ack (void *cls,
1520                          struct GNUNET_MESH_Tunnel *tunnel,
1521                          void **tunnel_ctx,
1522                          const struct GNUNET_PeerIdentity *sender,
1523                          const struct GNUNET_MessageHeader *message,
1524                          const struct GNUNET_ATS_Information*atsi)
1525 {
1526   struct GNUNET_STREAM_Socket *socket = cls;
1527   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1528   struct GNUNET_STREAM_HelloAckMessage *reply;
1529
1530   if (0 != memcmp (sender, &socket->other_peer,
1531                    sizeof (struct GNUNET_PeerIdentity)))
1532   {
1533     LOG (GNUNET_ERROR_TYPE_DEBUG,
1534          "%s: Received HELLO_ACK from non-confirming peer\n",
1535          GNUNET_i2s (&socket->other_peer));
1536     return GNUNET_YES;
1537   }
1538   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1539   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received HELLO_ACK from %s\n",
1540        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1541   GNUNET_assert (socket->tunnel == tunnel);
1542   switch (socket->state)
1543   {
1544   case STATE_HELLO_WAIT:
1545     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1546     LOG (GNUNET_ERROR_TYPE_DEBUG,
1547          "%s: Read sequence number %u\n",
1548          GNUNET_i2s (&socket->other_peer),
1549          (unsigned int) socket->read_sequence_number);
1550     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1551     reply = generate_hello_ack (socket, GNUNET_YES);
1552     queue_message (socket, &reply->header, &set_state_established,
1553                    NULL, GNUNET_NO);    
1554     return GNUNET_OK;
1555   case STATE_ESTABLISHED:
1556     // call statistics (# ACKs ignored++)
1557     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
1558                    socket->control_retransmission_task_id);
1559     socket->control_retransmission_task_id =
1560       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
1561     return GNUNET_OK;
1562   default:
1563     LOG_DEBUG ("%s: Server %s sent HELLO_ACK when in state %d\n", 
1564                GNUNET_i2s (&socket->other_peer),
1565                GNUNET_i2s (&socket->other_peer), socket->state);
1566     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1567     return GNUNET_SYSERR;
1568   }
1569 }
1570
1571
1572 /**
1573  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1574  *
1575  * @param cls the socket (set from GNUNET_MESH_connect)
1576  * @param tunnel connection to the other end
1577  * @param tunnel_ctx this is NULL
1578  * @param sender who sent the message
1579  * @param message the actual message
1580  * @param atsi performance data for the connection
1581  * @return GNUNET_OK to keep the connection open,
1582  *         GNUNET_SYSERR to close it (signal serious error)
1583  */
1584 static int
1585 client_handle_reset (void *cls,
1586                      struct GNUNET_MESH_Tunnel *tunnel,
1587                      void **tunnel_ctx,
1588                      const struct GNUNET_PeerIdentity *sender,
1589                      const struct GNUNET_MessageHeader *message,
1590                      const struct GNUNET_ATS_Information*atsi)
1591 {
1592   // struct GNUNET_STREAM_Socket *socket = cls;
1593
1594   return GNUNET_OK;
1595 }
1596
1597
1598 /**
1599  * Common message handler for handling TRANSMIT_CLOSE messages
1600  *
1601  * @param socket the socket through which the ack was received
1602  * @param tunnel connection to the other end
1603  * @param sender who sent the message
1604  * @param msg the transmit close message
1605  * @param atsi performance data for the connection
1606  * @return GNUNET_OK to keep the connection open,
1607  *         GNUNET_SYSERR to close it (signal serious error)
1608  */
1609 static int
1610 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1611                        struct GNUNET_MESH_Tunnel *tunnel,
1612                        const struct GNUNET_PeerIdentity *sender,
1613                        const struct GNUNET_STREAM_MessageHeader *msg,
1614                        const struct GNUNET_ATS_Information*atsi)
1615 {
1616   struct GNUNET_STREAM_MessageHeader *reply;
1617
1618   switch (socket->state)
1619   {
1620   case STATE_ESTABLISHED:
1621     socket->state = STATE_RECEIVE_CLOSED;
1622     /* Send TRANSMIT_CLOSE_ACK */
1623     reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1624     reply->header.type = 
1625       htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1626     reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1627     queue_message (socket, reply, NULL, NULL, GNUNET_NO);
1628     break;
1629   default:
1630     /* FIXME: Call statistics? */
1631     break;
1632   }
1633   return GNUNET_YES;
1634 }
1635
1636
1637 /**
1638  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1639  *
1640  * @param cls the socket (set from GNUNET_MESH_connect)
1641  * @param tunnel connection to the other end
1642  * @param tunnel_ctx this is NULL
1643  * @param sender who sent the message
1644  * @param message the actual message
1645  * @param atsi performance data for the connection
1646  * @return GNUNET_OK to keep the connection open,
1647  *         GNUNET_SYSERR to close it (signal serious error)
1648  */
1649 static int
1650 client_handle_transmit_close (void *cls,
1651                               struct GNUNET_MESH_Tunnel *tunnel,
1652                               void **tunnel_ctx,
1653                               const struct GNUNET_PeerIdentity *sender,
1654                               const struct GNUNET_MessageHeader *message,
1655                               const struct GNUNET_ATS_Information*atsi)
1656 {
1657   struct GNUNET_STREAM_Socket *socket = cls;
1658   
1659   return handle_transmit_close (socket,
1660                                 tunnel,
1661                                 sender,
1662                                 (struct GNUNET_STREAM_MessageHeader *)message,
1663                                 atsi);
1664 }
1665
1666
1667 /**
1668  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1669  *
1670  * @param socket the socket
1671  * @param tunnel connection to the other end
1672  * @param sender who sent the message
1673  * @param message the actual message
1674  * @param atsi performance data for the connection
1675  * @param operation the close operation which is being ACK'ed
1676  * @return GNUNET_OK to keep the connection open,
1677  *         GNUNET_SYSERR to close it (signal serious error)
1678  */
1679 static int
1680 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1681                           struct GNUNET_MESH_Tunnel *tunnel,
1682                           const struct GNUNET_PeerIdentity *sender,
1683                           const struct GNUNET_STREAM_MessageHeader *message,
1684                           const struct GNUNET_ATS_Information *atsi,
1685                           int operation)
1686 {
1687   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1688
1689   shutdown_handle = socket->shutdown_handle;
1690   if (NULL == shutdown_handle)
1691   {
1692     LOG (GNUNET_ERROR_TYPE_DEBUG,
1693          "%s: Received CLOSE_ACK when shutdown handle is NULL\n",
1694          GNUNET_i2s (&socket->other_peer));
1695     return GNUNET_OK;
1696   }
1697   switch (operation)
1698   {
1699   case SHUT_RDWR:
1700     switch (socket->state)
1701     {
1702     case STATE_CLOSE_WAIT:
1703       if (SHUT_RDWR != shutdown_handle->operation)
1704       {
1705         LOG (GNUNET_ERROR_TYPE_DEBUG,
1706              "%s: Received CLOSE_ACK when shutdown handle is not for "
1707              "SHUT_RDWR\n", GNUNET_i2s (&socket->other_peer));
1708         return GNUNET_OK;
1709       }
1710       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE_ACK from %s\n",
1711            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1712       socket->state = STATE_CLOSED;
1713       break;
1714     default:
1715       LOG (GNUNET_ERROR_TYPE_DEBUG,
1716            "%s: Received CLOSE_ACK when in it not expected\n",
1717            GNUNET_i2s (&socket->other_peer));
1718       return GNUNET_OK;
1719     }
1720     break;
1721   case SHUT_RD:
1722     switch (socket->state)
1723     {
1724     case STATE_RECEIVE_CLOSE_WAIT:
1725       if (SHUT_RD != shutdown_handle->operation)
1726       {
1727         LOG (GNUNET_ERROR_TYPE_DEBUG,
1728              "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
1729              "is not for SHUT_RD\n", GNUNET_i2s (&socket->other_peer));
1730         return GNUNET_OK;
1731       }
1732       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE_ACK from %s\n",
1733            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1734       socket->state = STATE_RECEIVE_CLOSED;
1735       break;
1736     default:
1737       LOG (GNUNET_ERROR_TYPE_DEBUG,
1738            "%s: Received RECEIVE_CLOSE_ACK when in it not expected\n",
1739            GNUNET_i2s (&socket->other_peer));
1740       return GNUNET_OK;
1741     }
1742     break;
1743   case SHUT_WR:
1744     switch (socket->state)
1745     {
1746     case STATE_TRANSMIT_CLOSE_WAIT:
1747       if (SHUT_WR != shutdown_handle->operation)
1748       {
1749         LOG (GNUNET_ERROR_TYPE_DEBUG,
1750              "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle "
1751              "is not for SHUT_WR\n",
1752              GNUNET_i2s (&socket->other_peer));
1753         return GNUNET_OK;
1754       }
1755       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
1756            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1757       socket->state = STATE_TRANSMIT_CLOSED;
1758       break;
1759     default:
1760       LOG (GNUNET_ERROR_TYPE_DEBUG,
1761            "%s: Received TRANSMIT_CLOSE_ACK when in it not expected\n",
1762            GNUNET_i2s (&socket->other_peer));          
1763       return GNUNET_OK;
1764     }
1765     break;
1766   default:
1767     GNUNET_assert (0);
1768   }
1769   if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
1770     shutdown_handle->completion_cb(shutdown_handle->completion_cls,
1771                                    operation);
1772   if (GNUNET_SCHEDULER_NO_TASK
1773       != shutdown_handle->close_msg_retransmission_task_id)
1774   {
1775     GNUNET_SCHEDULER_cancel
1776       (shutdown_handle->close_msg_retransmission_task_id);
1777     shutdown_handle->close_msg_retransmission_task_id =
1778         GNUNET_SCHEDULER_NO_TASK;
1779   }
1780   GNUNET_free (shutdown_handle); /* Free shutdown handle */
1781   socket->shutdown_handle = NULL;
1782   return GNUNET_OK;
1783 }
1784
1785
1786 /**
1787  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1788  *
1789  * @param cls the socket (set from GNUNET_MESH_connect)
1790  * @param tunnel connection to the other end
1791  * @param tunnel_ctx this is NULL
1792  * @param sender who sent the message
1793  * @param message the actual message
1794  * @param atsi performance data for the connection
1795  * @return GNUNET_OK to keep the connection open,
1796  *         GNUNET_SYSERR to close it (signal serious error)
1797  */
1798 static int
1799 client_handle_transmit_close_ack (void *cls,
1800                                   struct GNUNET_MESH_Tunnel *tunnel,
1801                                   void **tunnel_ctx,
1802                                   const struct GNUNET_PeerIdentity *sender,
1803                                   const struct GNUNET_MessageHeader *message,
1804                                   const struct GNUNET_ATS_Information*atsi)
1805 {
1806   struct GNUNET_STREAM_Socket *socket = cls;
1807
1808   return handle_generic_close_ack (socket,
1809                                    tunnel,
1810                                    sender,
1811                                    (const struct GNUNET_STREAM_MessageHeader *)
1812                                    message,
1813                                    atsi,
1814                                    SHUT_WR);
1815 }
1816
1817
1818 /**
1819  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1820  *
1821  * @param socket the socket
1822  * @param tunnel connection to the other end
1823  * @param sender who sent the message
1824  * @param message the actual message
1825  * @param atsi performance data for the connection
1826  * @return GNUNET_OK to keep the connection open,
1827  *         GNUNET_SYSERR to close it (signal serious error)
1828  */
1829 static int
1830 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1831                       struct GNUNET_MESH_Tunnel *tunnel,
1832                       const struct GNUNET_PeerIdentity *sender,
1833                       const struct GNUNET_STREAM_MessageHeader *message,
1834                       const struct GNUNET_ATS_Information *atsi)
1835 {
1836   struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1837
1838   switch (socket->state)
1839   {
1840   case STATE_INIT:
1841   case STATE_LISTEN:
1842   case STATE_HELLO_WAIT:
1843     LOG (GNUNET_ERROR_TYPE_DEBUG,
1844          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1845          GNUNET_i2s (&socket->other_peer));
1846     return GNUNET_OK;
1847   default:
1848     break;
1849   }
1850   
1851   LOG (GNUNET_ERROR_TYPE_DEBUG,
1852        "%s: Received RECEIVE_CLOSE from %s\n",
1853        GNUNET_i2s (&socket->other_peer),
1854        GNUNET_i2s (&socket->other_peer));
1855   receive_close_ack =
1856     GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1857   receive_close_ack->header.size =
1858     htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1859   receive_close_ack->header.type =
1860     htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1861   queue_message (socket, receive_close_ack, &set_state_closed,
1862                  NULL, GNUNET_NO);  
1863   /* FIXME: Handle the case where write handle is present; the write operation
1864      should be deemed as finised and the write continuation callback
1865      has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
1866   return GNUNET_OK;
1867 }
1868
1869
1870 /**
1871  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1872  *
1873  * @param cls the socket (set from GNUNET_MESH_connect)
1874  * @param tunnel connection to the other end
1875  * @param tunnel_ctx this is NULL
1876  * @param sender who sent the message
1877  * @param message the actual message
1878  * @param atsi performance data for the connection
1879  * @return GNUNET_OK to keep the connection open,
1880  *         GNUNET_SYSERR to close it (signal serious error)
1881  */
1882 static int
1883 client_handle_receive_close (void *cls,
1884                              struct GNUNET_MESH_Tunnel *tunnel,
1885                              void **tunnel_ctx,
1886                              const struct GNUNET_PeerIdentity *sender,
1887                              const struct GNUNET_MessageHeader *message,
1888                              const struct GNUNET_ATS_Information*atsi)
1889 {
1890   struct GNUNET_STREAM_Socket *socket = cls;
1891
1892   return
1893     handle_receive_close (socket,
1894                           tunnel,
1895                           sender,
1896                           (const struct GNUNET_STREAM_MessageHeader *) message,
1897                           atsi);
1898 }
1899
1900
1901 /**
1902  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1903  *
1904  * @param cls the socket (set from GNUNET_MESH_connect)
1905  * @param tunnel connection to the other end
1906  * @param tunnel_ctx this is NULL
1907  * @param sender who sent the message
1908  * @param message the actual message
1909  * @param atsi performance data for the connection
1910  * @return GNUNET_OK to keep the connection open,
1911  *         GNUNET_SYSERR to close it (signal serious error)
1912  */
1913 static int
1914 client_handle_receive_close_ack (void *cls,
1915                                  struct GNUNET_MESH_Tunnel *tunnel,
1916                                  void **tunnel_ctx,
1917                                  const struct GNUNET_PeerIdentity *sender,
1918                                  const struct GNUNET_MessageHeader *message,
1919                                  const struct GNUNET_ATS_Information*atsi)
1920 {
1921   struct GNUNET_STREAM_Socket *socket = cls;
1922
1923   return handle_generic_close_ack (socket,
1924                                    tunnel,
1925                                    sender,
1926                                    (const struct GNUNET_STREAM_MessageHeader *)
1927                                    message,
1928                                    atsi,
1929                                    SHUT_RD);
1930 }
1931
1932
1933 /**
1934  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1935  *
1936  * @param socket the socket
1937  * @param tunnel connection to the other end
1938  * @param sender who sent the message
1939  * @param message the actual message
1940  * @param atsi performance data for the connection
1941  * @return GNUNET_OK to keep the connection open,
1942  *         GNUNET_SYSERR to close it (signal serious error)
1943  */
1944 static int
1945 handle_close (struct GNUNET_STREAM_Socket *socket,
1946               struct GNUNET_MESH_Tunnel *tunnel,
1947               const struct GNUNET_PeerIdentity *sender,
1948               const struct GNUNET_STREAM_MessageHeader *message,
1949               const struct GNUNET_ATS_Information*atsi)
1950 {
1951   struct GNUNET_STREAM_MessageHeader *close_ack;
1952
1953   switch (socket->state)
1954   {
1955   case STATE_INIT:
1956   case STATE_LISTEN:
1957   case STATE_HELLO_WAIT:
1958     LOG (GNUNET_ERROR_TYPE_DEBUG,
1959          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1960          GNUNET_i2s (&socket->other_peer));
1961     return GNUNET_OK;
1962   default:
1963     break;
1964   }
1965
1966   LOG (GNUNET_ERROR_TYPE_DEBUG,
1967        "%s: Received CLOSE from %s\n",
1968        GNUNET_i2s (&socket->other_peer),
1969        GNUNET_i2s (&socket->other_peer));
1970   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1971   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1972   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
1973   queue_message (socket, close_ack, &set_state_closed, NULL, GNUNET_NO);
1974   if (socket->state == STATE_CLOSED)
1975     return GNUNET_OK;
1976
1977   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1978   socket->receive_buffer = NULL;
1979   socket->receive_buffer_size = 0;
1980   return GNUNET_OK;
1981 }
1982
1983
1984 /**
1985  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1986  *
1987  * @param cls the socket (set from GNUNET_MESH_connect)
1988  * @param tunnel connection to the other end
1989  * @param tunnel_ctx this is NULL
1990  * @param sender who sent the message
1991  * @param message the actual message
1992  * @param atsi performance data for the connection
1993  * @return GNUNET_OK to keep the connection open,
1994  *         GNUNET_SYSERR to close it (signal serious error)
1995  */
1996 static int
1997 client_handle_close (void *cls,
1998                      struct GNUNET_MESH_Tunnel *tunnel,
1999                      void **tunnel_ctx,
2000                      const struct GNUNET_PeerIdentity *sender,
2001                      const struct GNUNET_MessageHeader *message,
2002                      const struct GNUNET_ATS_Information*atsi)
2003 {
2004   struct GNUNET_STREAM_Socket *socket = cls;
2005
2006   return handle_close (socket,
2007                        tunnel,
2008                        sender,
2009                        (const struct GNUNET_STREAM_MessageHeader *) message,
2010                        atsi);
2011 }
2012
2013
2014 /**
2015  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2016  *
2017  * @param cls the socket (set from GNUNET_MESH_connect)
2018  * @param tunnel connection to the other end
2019  * @param tunnel_ctx this is NULL
2020  * @param sender who sent the message
2021  * @param message the actual message
2022  * @param atsi performance data for the connection
2023  * @return GNUNET_OK to keep the connection open,
2024  *         GNUNET_SYSERR to close it (signal serious error)
2025  */
2026 static int
2027 client_handle_close_ack (void *cls,
2028                          struct GNUNET_MESH_Tunnel *tunnel,
2029                          void **tunnel_ctx,
2030                          const struct GNUNET_PeerIdentity *sender,
2031                          const struct GNUNET_MessageHeader *message,
2032                          const struct GNUNET_ATS_Information *atsi)
2033 {
2034   struct GNUNET_STREAM_Socket *socket = cls;
2035
2036   return handle_generic_close_ack (socket,
2037                                    tunnel,
2038                                    sender,
2039                                    (const struct GNUNET_STREAM_MessageHeader *) 
2040                                    message,
2041                                    atsi,
2042                                    SHUT_RDWR);
2043 }
2044
2045 /*****************************/
2046 /* Server's Message Handlers */
2047 /*****************************/
2048
2049 /**
2050  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
2051  *
2052  * @param cls the closure
2053  * @param tunnel connection to the other end
2054  * @param tunnel_ctx the socket
2055  * @param sender who sent the message
2056  * @param message the actual message
2057  * @param atsi performance data for the connection
2058  * @return GNUNET_OK to keep the connection open,
2059  *         GNUNET_SYSERR to close it (signal serious error)
2060  */
2061 static int
2062 server_handle_data (void *cls,
2063                     struct GNUNET_MESH_Tunnel *tunnel,
2064                     void **tunnel_ctx,
2065                     const struct GNUNET_PeerIdentity *sender,
2066                     const struct GNUNET_MessageHeader *message,
2067                     const struct GNUNET_ATS_Information*atsi)
2068 {
2069   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2070
2071   return handle_data (socket,
2072                       tunnel,
2073                       sender,
2074                       (const struct GNUNET_STREAM_DataMessage *)message,
2075                       atsi);
2076 }
2077
2078
2079 /**
2080  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
2081  *
2082  * @param cls the closure
2083  * @param tunnel connection to the other end
2084  * @param tunnel_ctx the socket
2085  * @param sender who sent the message
2086  * @param message the actual message
2087  * @param atsi performance data for the connection
2088  * @return GNUNET_OK to keep the connection open,
2089  *         GNUNET_SYSERR to close it (signal serious error)
2090  */
2091 static int
2092 server_handle_hello (void *cls,
2093                      struct GNUNET_MESH_Tunnel *tunnel,
2094                      void **tunnel_ctx,
2095                      const struct GNUNET_PeerIdentity *sender,
2096                      const struct GNUNET_MessageHeader *message,
2097                      const struct GNUNET_ATS_Information*atsi)
2098 {
2099   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2100   struct GNUNET_STREAM_HelloAckMessage *reply;
2101
2102   if (0 != memcmp (sender,
2103                    &socket->other_peer,
2104                    sizeof (struct GNUNET_PeerIdentity)))
2105   {
2106     LOG_DEBUG ("%s: Received HELLO from non-confirming peer\n",
2107                GNUNET_i2s (&socket->other_peer));
2108     return GNUNET_YES;
2109   }
2110   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == ntohs (message->type));
2111   GNUNET_assert (socket->tunnel == tunnel);
2112   LOG_DEBUG ("%s: Received HELLO from %s\n", GNUNET_i2s (&socket->other_peer),
2113              GNUNET_i2s (&socket->other_peer));
2114   switch (socket->status)
2115   {
2116   case STATE_INIT:
2117     reply = generate_hello_ack (socket, GNUNET_YES);
2118     queue_message (socket, &reply->header, &set_state_hello_wait, NULL,
2119                    GNUNET_NO);
2120     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2121                    socket->control_retransmission_task_id);
2122     socket->control_retransmission_task_id =
2123       GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2124                                     &control_retransmission_task, socket);
2125     break;
2126   case STATE_HELLO_WAIT:
2127     /* Perhaps our HELLO_ACK was lost */
2128     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != 
2129                    socket->control_retransmission_task_id);
2130     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2131     socket->control_retransmission_task_id =
2132       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
2133     break;
2134   default:
2135     LOG_DEBUG( "%s: Client sent HELLO when in state %d\n",
2136                GNUNET_i2s (&socket->other_peer), socket->state);
2137     /* FIXME: Send RESET? */
2138   }
2139   return GNUNET_OK;
2140 }
2141
2142
2143 /**
2144  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2145  *
2146  * @param cls the closure
2147  * @param tunnel connection to the other end
2148  * @param tunnel_ctx the socket
2149  * @param sender who sent the message
2150  * @param message the actual message
2151  * @param atsi performance data for the connection
2152  * @return GNUNET_OK to keep the connection open,
2153  *         GNUNET_SYSERR to close it (signal serious error)
2154  */
2155 static int
2156 server_handle_hello_ack (void *cls,
2157                          struct GNUNET_MESH_Tunnel *tunnel,
2158                          void **tunnel_ctx,
2159                          const struct GNUNET_PeerIdentity *sender,
2160                          const struct GNUNET_MessageHeader *message,
2161                          const struct GNUNET_ATS_Information*atsi)
2162 {
2163   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2164   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2165
2166   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2167                  ntohs (message->type));
2168   GNUNET_assert (socket->tunnel == tunnel);
2169   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2170   switch (socket->state)  
2171   {
2172   case STATE_HELLO_WAIT:
2173     LOG (GNUNET_ERROR_TYPE_DEBUG,
2174          "%s: Received HELLO_ACK from %s\n",
2175          GNUNET_i2s (&socket->other_peer),
2176          GNUNET_i2s (&socket->other_peer));
2177     socket->read_sequence_number = ntohl (ack_message->sequence_number);
2178     LOG (GNUNET_ERROR_TYPE_DEBUG,
2179          "%s: Read sequence number %u\n",
2180          GNUNET_i2s (&socket->other_peer),
2181          (unsigned int) socket->read_sequence_number);
2182     socket->receiver_window_available = 
2183       ntohl (ack_message->receiver_window_size);
2184     set_state_established (NULL, socket);
2185     break;
2186   default:
2187     LOG (GNUNET_ERROR_TYPE_DEBUG,
2188          "Client sent HELLO_ACK when in state %d\n", socket->state);    
2189   }
2190   return GNUNET_OK;
2191 }
2192
2193
2194 /**
2195  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2196  *
2197  * @param cls the closure
2198  * @param tunnel connection to the other end
2199  * @param tunnel_ctx the socket
2200  * @param sender who sent the message
2201  * @param message the actual message
2202  * @param atsi performance data for the connection
2203  * @return GNUNET_OK to keep the connection open,
2204  *         GNUNET_SYSERR to close it (signal serious error)
2205  */
2206 static int
2207 server_handle_reset (void *cls,
2208                      struct GNUNET_MESH_Tunnel *tunnel,
2209                      void **tunnel_ctx,
2210                      const struct GNUNET_PeerIdentity *sender,
2211                      const struct GNUNET_MessageHeader *message,
2212                      const struct GNUNET_ATS_Information*atsi)
2213 {
2214   // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2215
2216   return GNUNET_OK;
2217 }
2218
2219
2220 /**
2221  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2222  *
2223  * @param cls the closure
2224  * @param tunnel connection to the other end
2225  * @param tunnel_ctx the socket
2226  * @param sender who sent the message
2227  * @param message the actual message
2228  * @param atsi performance data for the connection
2229  * @return GNUNET_OK to keep the connection open,
2230  *         GNUNET_SYSERR to close it (signal serious error)
2231  */
2232 static int
2233 server_handle_transmit_close (void *cls,
2234                               struct GNUNET_MESH_Tunnel *tunnel,
2235                               void **tunnel_ctx,
2236                               const struct GNUNET_PeerIdentity *sender,
2237                               const struct GNUNET_MessageHeader *message,
2238                               const struct GNUNET_ATS_Information*atsi)
2239 {
2240   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2241
2242   return handle_transmit_close (socket,
2243                                 tunnel,
2244                                 sender,
2245                                 (struct GNUNET_STREAM_MessageHeader *)message,
2246                                 atsi);
2247 }
2248
2249
2250 /**
2251  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2252  *
2253  * @param cls the closure
2254  * @param tunnel connection to the other end
2255  * @param tunnel_ctx the socket
2256  * @param sender who sent the message
2257  * @param message the actual message
2258  * @param atsi performance data for the connection
2259  * @return GNUNET_OK to keep the connection open,
2260  *         GNUNET_SYSERR to close it (signal serious error)
2261  */
2262 static int
2263 server_handle_transmit_close_ack (void *cls,
2264                                   struct GNUNET_MESH_Tunnel *tunnel,
2265                                   void **tunnel_ctx,
2266                                   const struct GNUNET_PeerIdentity *sender,
2267                                   const struct GNUNET_MessageHeader *message,
2268                                   const struct GNUNET_ATS_Information*atsi)
2269 {
2270   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2271
2272   return handle_generic_close_ack (socket,
2273                                    tunnel,
2274                                    sender,
2275                                    (const struct GNUNET_STREAM_MessageHeader *)
2276                                    message,
2277                                    atsi,
2278                                    SHUT_WR);
2279 }
2280
2281
2282 /**
2283  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2284  *
2285  * @param cls the closure
2286  * @param tunnel connection to the other end
2287  * @param tunnel_ctx the socket
2288  * @param sender who sent the message
2289  * @param message the actual message
2290  * @param atsi performance data for the connection
2291  * @return GNUNET_OK to keep the connection open,
2292  *         GNUNET_SYSERR to close it (signal serious error)
2293  */
2294 static int
2295 server_handle_receive_close (void *cls,
2296                              struct GNUNET_MESH_Tunnel *tunnel,
2297                              void **tunnel_ctx,
2298                              const struct GNUNET_PeerIdentity *sender,
2299                              const struct GNUNET_MessageHeader *message,
2300                              const struct GNUNET_ATS_Information*atsi)
2301 {
2302   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2303
2304   return
2305     handle_receive_close (socket,
2306                           tunnel,
2307                           sender,
2308                           (const struct GNUNET_STREAM_MessageHeader *) message,
2309                           atsi);
2310 }
2311
2312
2313 /**
2314  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2315  *
2316  * @param cls the closure
2317  * @param tunnel connection to the other end
2318  * @param tunnel_ctx the socket
2319  * @param sender who sent the message
2320  * @param message the actual message
2321  * @param atsi performance data for the connection
2322  * @return GNUNET_OK to keep the connection open,
2323  *         GNUNET_SYSERR to close it (signal serious error)
2324  */
2325 static int
2326 server_handle_receive_close_ack (void *cls,
2327                                  struct GNUNET_MESH_Tunnel *tunnel,
2328                                  void **tunnel_ctx,
2329                                  const struct GNUNET_PeerIdentity *sender,
2330                                  const struct GNUNET_MessageHeader *message,
2331                                  const struct GNUNET_ATS_Information*atsi)
2332 {
2333   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2334
2335   return handle_generic_close_ack (socket,
2336                                    tunnel,
2337                                    sender,
2338                                    (const struct GNUNET_STREAM_MessageHeader *)
2339                                    message,
2340                                    atsi,
2341                                    SHUT_RD);
2342 }
2343
2344
2345 /**
2346  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2347  *
2348  * @param cls the listen socket (from GNUNET_MESH_connect in
2349  *          GNUNET_STREAM_listen) 
2350  * @param tunnel connection to the other end
2351  * @param tunnel_ctx the socket
2352  * @param sender who sent the message
2353  * @param message the actual message
2354  * @param atsi performance data for the connection
2355  * @return GNUNET_OK to keep the connection open,
2356  *         GNUNET_SYSERR to close it (signal serious error)
2357  */
2358 static int
2359 server_handle_close (void *cls,
2360                      struct GNUNET_MESH_Tunnel *tunnel,
2361                      void **tunnel_ctx,
2362                      const struct GNUNET_PeerIdentity *sender,
2363                      const struct GNUNET_MessageHeader *message,
2364                      const struct GNUNET_ATS_Information*atsi)
2365 {
2366   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2367   
2368   return handle_close (socket,
2369                        tunnel,
2370                        sender,
2371                        (const struct GNUNET_STREAM_MessageHeader *) message,
2372                        atsi);
2373 }
2374
2375
2376 /**
2377  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2378  *
2379  * @param cls the closure
2380  * @param tunnel connection to the other end
2381  * @param tunnel_ctx the socket
2382  * @param sender who sent the message
2383  * @param message the actual message
2384  * @param atsi performance data for the connection
2385  * @return GNUNET_OK to keep the connection open,
2386  *         GNUNET_SYSERR to close it (signal serious error)
2387  */
2388 static int
2389 server_handle_close_ack (void *cls,
2390                          struct GNUNET_MESH_Tunnel *tunnel,
2391                          void **tunnel_ctx,
2392                          const struct GNUNET_PeerIdentity *sender,
2393                          const struct GNUNET_MessageHeader *message,
2394                          const struct GNUNET_ATS_Information*atsi)
2395 {
2396   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2397
2398   return handle_generic_close_ack (socket,
2399                                    tunnel,
2400                                    sender,
2401                                    (const struct GNUNET_STREAM_MessageHeader *) 
2402                                    message,
2403                                    atsi,
2404                                    SHUT_RDWR);
2405 }
2406
2407
2408 /**
2409  * Handler for DATA_ACK messages
2410  *
2411  * @param socket the socket through which the ack was received
2412  * @param tunnel connection to the other end
2413  * @param sender who sent the message
2414  * @param ack the acknowledgment message
2415  * @param atsi performance data for the connection
2416  * @return GNUNET_OK to keep the connection open,
2417  *         GNUNET_SYSERR to close it (signal serious error)
2418  */
2419 static int
2420 handle_ack (struct GNUNET_STREAM_Socket *socket,
2421             struct GNUNET_MESH_Tunnel *tunnel,
2422             const struct GNUNET_PeerIdentity *sender,
2423             const struct GNUNET_STREAM_AckMessage *ack,
2424             const struct GNUNET_ATS_Information*atsi)
2425 {
2426   unsigned int packet;
2427   int need_retransmission;
2428   uint32_t sequence_difference;
2429   
2430   if (0 != memcmp (sender,
2431                    &socket->other_peer,
2432                    sizeof (struct GNUNET_PeerIdentity)))
2433   {
2434     LOG (GNUNET_ERROR_TYPE_DEBUG,
2435          "%s: Received ACK from non-confirming peer\n",
2436          GNUNET_i2s (&socket->other_peer));
2437     return GNUNET_YES;
2438   }
2439   switch (socket->state)
2440   {
2441   case (STATE_ESTABLISHED):
2442   case (STATE_RECEIVE_CLOSED):
2443   case (STATE_RECEIVE_CLOSE_WAIT):
2444     if (NULL == socket->write_handle)
2445     {
2446       LOG (GNUNET_ERROR_TYPE_DEBUG,
2447            "%s: Received DATA_ACK when write_handle is NULL\n",
2448            GNUNET_i2s (&socket->other_peer));
2449       return GNUNET_OK;
2450     }
2451     sequence_difference = 
2452         socket->write_sequence_number - ntohl (ack->base_sequence_number);
2453     if (!(sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2454     {
2455       LOG (GNUNET_ERROR_TYPE_DEBUG,
2456            "%s: Received DATA_ACK with unexpected base sequence number\n",
2457            GNUNET_i2s (&socket->other_peer));
2458       LOG (GNUNET_ERROR_TYPE_DEBUG,
2459            "%s: Current write sequence: %u; Ack's base sequence: %u\n",
2460            GNUNET_i2s (&socket->other_peer),
2461            socket->write_sequence_number,
2462            ntohl (ack->base_sequence_number));
2463       return GNUNET_OK;
2464     }
2465     /* FIXME: include the case when write_handle is cancelled - ignore the 
2466        acks */
2467     LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received DATA_ACK from %s\n",
2468          GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
2469     /* Cancel the retransmission task */
2470     if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2471     {
2472       GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2473       socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2474     }
2475     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2476     {
2477       if (NULL == socket->write_handle->messages[packet]) break;
2478       /* BS: Base sequence from ack; PS: sequence num of current packet */
2479       sequence_difference = ntohl (ack->base_sequence_number)
2480         - ntohl (socket->write_handle->messages[packet]->sequence_number);
2481       if ((0 == sequence_difference) ||
2482           (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH < sequence_difference))
2483         continue; /* The message in our handle is not yet received */
2484       /* case where BS = PS + GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2485       /* sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2486       ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2487                             packet, GNUNET_YES);
2488     }
2489     /* Update the receive window remaining
2490        FIXME : Should update with the value from a data ack with greater
2491        sequence number */
2492     socket->receiver_window_available = 
2493       ntohl (ack->receive_window_remaining);
2494     /* Check if we have received all acknowledgements */
2495     need_retransmission = GNUNET_NO;
2496     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2497     {
2498       if (NULL == socket->write_handle->messages[packet]) break;
2499       if (GNUNET_YES != ackbitmap_is_bit_set 
2500           (&socket->write_handle->ack_bitmap,packet))
2501       {
2502         need_retransmission = GNUNET_YES;
2503         break;
2504       }
2505     }
2506     if (GNUNET_YES == need_retransmission)
2507     {
2508       write_data (socket);
2509     }
2510     else      /* We have to call the write continuation callback now */
2511     {
2512       struct GNUNET_STREAM_IOWriteHandle *write_handle;
2513       
2514       /* Free the packets */
2515       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2516       {
2517         GNUNET_free_non_null (socket->write_handle->messages[packet]);
2518       }
2519       write_handle = socket->write_handle;
2520       socket->write_handle = NULL;
2521       if (NULL != write_handle->write_cont)
2522         write_handle->write_cont (write_handle->write_cont_cls,
2523                                   socket->status,
2524                                   write_handle->size);
2525       /* We are done with the write handle - Freeing it */
2526       GNUNET_free (write_handle);
2527       LOG (GNUNET_ERROR_TYPE_DEBUG,
2528            "%s: Write completion callback completed\n",
2529            GNUNET_i2s (&socket->other_peer));      
2530     }
2531     break;
2532   default:
2533     break;
2534   }
2535   return GNUNET_OK;
2536 }
2537
2538
2539 /**
2540  * Handler for DATA_ACK messages
2541  *
2542  * @param cls the 'struct GNUNET_STREAM_Socket'
2543  * @param tunnel connection to the other end
2544  * @param tunnel_ctx unused
2545  * @param sender who sent the message
2546  * @param message the actual message
2547  * @param atsi performance data for the connection
2548  * @return GNUNET_OK to keep the connection open,
2549  *         GNUNET_SYSERR to close it (signal serious error)
2550  */
2551 static int
2552 client_handle_ack (void *cls,
2553                    struct GNUNET_MESH_Tunnel *tunnel,
2554                    void **tunnel_ctx,
2555                    const struct GNUNET_PeerIdentity *sender,
2556                    const struct GNUNET_MessageHeader *message,
2557                    const struct GNUNET_ATS_Information*atsi)
2558 {
2559   struct GNUNET_STREAM_Socket *socket = cls;
2560   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2561  
2562   return handle_ack (socket, tunnel, sender, ack, atsi);
2563 }
2564
2565
2566 /**
2567  * Handler for DATA_ACK messages
2568  *
2569  * @param cls the server's listen socket
2570  * @param tunnel connection to the other end
2571  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2572  * @param sender who sent the message
2573  * @param message the actual message
2574  * @param atsi performance data for the connection
2575  * @return GNUNET_OK to keep the connection open,
2576  *         GNUNET_SYSERR to close it (signal serious error)
2577  */
2578 static int
2579 server_handle_ack (void *cls,
2580                    struct GNUNET_MESH_Tunnel *tunnel,
2581                    void **tunnel_ctx,
2582                    const struct GNUNET_PeerIdentity *sender,
2583                    const struct GNUNET_MessageHeader *message,
2584                    const struct GNUNET_ATS_Information*atsi)
2585 {
2586   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2587   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2588  
2589   return handle_ack (socket, tunnel, sender, ack, atsi);
2590 }
2591
2592
2593 /**
2594  * For client message handlers, the stream socket is in the
2595  * closure argument.
2596  */
2597 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2598   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2599   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2600    sizeof (struct GNUNET_STREAM_AckMessage) },
2601   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2602    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2603   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2604    sizeof (struct GNUNET_STREAM_MessageHeader)},
2605   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2606    sizeof (struct GNUNET_STREAM_MessageHeader)},
2607   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2608    sizeof (struct GNUNET_STREAM_MessageHeader)},
2609   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2610    sizeof (struct GNUNET_STREAM_MessageHeader)},
2611   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2612    sizeof (struct GNUNET_STREAM_MessageHeader)},
2613   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2614    sizeof (struct GNUNET_STREAM_MessageHeader)},
2615   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2616    sizeof (struct GNUNET_STREAM_MessageHeader)},
2617   {NULL, 0, 0}
2618 };
2619
2620
2621 /**
2622  * For server message handlers, the stream socket is in the
2623  * tunnel context, and the listen socket in the closure argument.
2624  */
2625 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2626   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2627   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2628    sizeof (struct GNUNET_STREAM_AckMessage) },
2629   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2630    sizeof (struct GNUNET_STREAM_MessageHeader)},
2631   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2632    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2633   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2634    sizeof (struct GNUNET_STREAM_MessageHeader)},
2635   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2636    sizeof (struct GNUNET_STREAM_MessageHeader)},
2637   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2638    sizeof (struct GNUNET_STREAM_MessageHeader)},
2639   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2640    sizeof (struct GNUNET_STREAM_MessageHeader)},
2641   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2642    sizeof (struct GNUNET_STREAM_MessageHeader)},
2643   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2644    sizeof (struct GNUNET_STREAM_MessageHeader)},
2645   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2646    sizeof (struct GNUNET_STREAM_MessageHeader)},
2647   {NULL, 0, 0}
2648 };
2649
2650
2651 /**
2652  * Function called when our target peer is connected to our tunnel
2653  *
2654  * @param cls the socket for which this tunnel is created
2655  * @param peer the peer identity of the target
2656  * @param atsi performance data for the connection
2657  */
2658 static void
2659 mesh_peer_connect_callback (void *cls,
2660                             const struct GNUNET_PeerIdentity *peer,
2661                             const struct GNUNET_ATS_Information * atsi)
2662 {
2663   struct GNUNET_STREAM_Socket *socket = cls;
2664   struct GNUNET_STREAM_MessageHeader *message;
2665   
2666   if (0 != memcmp (peer,
2667                    &socket->other_peer,
2668                    sizeof (struct GNUNET_PeerIdentity)))
2669   {
2670     LOG (GNUNET_ERROR_TYPE_DEBUG,
2671          "%s: A peer which is not our target has connected to our tunnel\n",
2672          GNUNET_i2s(peer));
2673     return;
2674   }
2675   LOG (GNUNET_ERROR_TYPE_DEBUG,
2676        "%s: Target peer %s connected\n",
2677        GNUNET_i2s (&socket->other_peer),
2678        GNUNET_i2s (&socket->other_peer));
2679   /* Set state to INIT */
2680   socket->state = STATE_INIT;
2681   /* Send HELLO message */
2682   message = generate_hello ();
2683   queue_message (socket, message, &set_state_hello_wait, NULL, GNUNET_NO);
2684   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2685                  socket->control_retransmission_task_id);
2686   socket->control_retransmission_task_id =
2687     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2688                                   &control_retransmission_task, socket);
2689 }
2690
2691
2692 /**
2693  * Function called when our target peer is disconnected from our tunnel
2694  *
2695  * @param cls the socket associated which this tunnel
2696  * @param peer the peer identity of the target
2697  */
2698 static void
2699 mesh_peer_disconnect_callback (void *cls,
2700                                const struct GNUNET_PeerIdentity *peer)
2701 {
2702   struct GNUNET_STREAM_Socket *socket=cls;
2703   
2704   /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2705   LOG (GNUNET_ERROR_TYPE_DEBUG,
2706        "%s: Other peer %s disconnected \n",
2707        GNUNET_i2s (&socket->other_peer),
2708        GNUNET_i2s (&socket->other_peer));
2709 }
2710
2711
2712 /**
2713  * Method called whenever a peer creates a tunnel to us
2714  *
2715  * @param cls closure
2716  * @param tunnel new handle to the tunnel
2717  * @param initiator peer that started the tunnel
2718  * @param atsi performance information for the tunnel
2719  * @return initial tunnel context for the tunnel
2720  *         (can be NULL -- that's not an error)
2721  */
2722 static void *
2723 new_tunnel_notify (void *cls,
2724                    struct GNUNET_MESH_Tunnel *tunnel,
2725                    const struct GNUNET_PeerIdentity *initiator,
2726                    const struct GNUNET_ATS_Information *atsi)
2727 {
2728   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2729   struct GNUNET_STREAM_Socket *socket;
2730
2731   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2732      from the same peer again until the socket is closed */
2733
2734   if (GNUNET_NO == lsocket->listening)
2735   {
2736     GNUNET_MESH_tunnel_destroy (tunnel);
2737     return NULL;
2738   }
2739   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2740   socket->other_peer = *initiator;
2741   socket->tunnel = tunnel;
2742   socket->session_id = 0;       /* FIXME */
2743   socket->state = STATE_INIT;
2744   socket->lsocket = lsocket;
2745   socket->retransmit_timeout = lsocket->retransmit_timeout;
2746   socket->testing_active = lsocket->testing_active;
2747   socket->testing_set_write_sequence_number_value =
2748       lsocket->testing_set_write_sequence_number_value;
2749   socket->max_payload_size = lsocket->max_payload_size;
2750   LOG (GNUNET_ERROR_TYPE_DEBUG,
2751        "%s: Peer %s initiated tunnel to us\n", 
2752        GNUNET_i2s (&socket->other_peer),
2753        GNUNET_i2s (&socket->other_peer));
2754   /* FIXME: Copy MESH handle from lsocket to socket */
2755   return socket;
2756 }
2757
2758
2759 /**
2760  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2761  * any associated state.  This function is NOT called if the client has
2762  * explicitly asked for the tunnel to be destroyed using
2763  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2764  * the tunnel.
2765  *
2766  * @param cls closure (set from GNUNET_MESH_connect)
2767  * @param tunnel connection to the other end (henceforth invalid)
2768  * @param tunnel_ctx place where local state associated
2769  *                   with the tunnel is stored
2770  */
2771 static void 
2772 tunnel_cleaner (void *cls,
2773                 const struct GNUNET_MESH_Tunnel *tunnel,
2774                 void *tunnel_ctx)
2775 {
2776   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2777
2778   if (tunnel != socket->tunnel)
2779     return;
2780
2781   GNUNET_break_op(0);
2782   LOG (GNUNET_ERROR_TYPE_DEBUG,
2783        "%s: Peer %s has terminated connection abruptly\n",
2784        GNUNET_i2s (&socket->other_peer),
2785        GNUNET_i2s (&socket->other_peer));
2786   socket->status = GNUNET_STREAM_SHUTDOWN;
2787   /* Clear Transmit handles */
2788   if (NULL != socket->transmit_handle)
2789   {
2790     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2791     socket->transmit_handle = NULL;
2792   }
2793   /* Stop Tasks using socket->tunnel */
2794   if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2795   {
2796     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2797     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2798   }
2799   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2800   {
2801     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2802     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2803   }
2804   /* FIXME: Cancel all other tasks using socket->tunnel */
2805   socket->tunnel = NULL;
2806 }
2807
2808
2809 /**
2810  * Callback to signal timeout on lockmanager lock acquire
2811  *
2812  * @param cls the ListenSocket
2813  * @param tc the scheduler task context
2814  */
2815 static void
2816 lockmanager_acquire_timeout (void *cls, 
2817                              const struct GNUNET_SCHEDULER_TaskContext *tc)
2818 {
2819   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2820   GNUNET_STREAM_ListenCallback listen_cb;
2821   void *listen_cb_cls;
2822
2823   lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2824   listen_cb = lsocket->listen_cb;
2825   listen_cb_cls = lsocket->listen_cb_cls;
2826   if (NULL != listen_cb)
2827     listen_cb (listen_cb_cls, NULL, NULL);
2828 }
2829
2830
2831 /**
2832  * Callback to notify us on the status changes on app_port lock
2833  *
2834  * @param cls the ListenSocket
2835  * @param domain the domain name of the lock
2836  * @param lock the app_port
2837  * @param status the current status of the lock
2838  */
2839 static void
2840 lock_status_change_cb (void *cls, const char *domain, uint32_t lock,
2841                        enum GNUNET_LOCKMANAGER_Status status)
2842 {
2843   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2844
2845   GNUNET_assert (lock == (uint32_t) lsocket->port);
2846   if (GNUNET_LOCKMANAGER_SUCCESS == status)
2847   {
2848     lsocket->listening = GNUNET_YES;
2849     if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
2850     {
2851       GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
2852       lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2853     }
2854     if (NULL == lsocket->mesh)
2855     {
2856       GNUNET_MESH_ApplicationType ports[] = {lsocket->port, 0};
2857
2858       lsocket->mesh = GNUNET_MESH_connect (lsocket->cfg,
2859                                            lsocket, /* Closure */
2860                                            &new_tunnel_notify,
2861                                            &tunnel_cleaner,
2862                                            server_message_handlers,
2863                                            ports);
2864       GNUNET_assert (NULL != lsocket->mesh);
2865       if (NULL != lsocket->listen_ok_cb)
2866       {
2867         (void) lsocket->listen_ok_cb ();
2868       }
2869     }
2870   }
2871   if (GNUNET_LOCKMANAGER_RELEASE == status)
2872     lsocket->listening = GNUNET_NO;
2873 }
2874
2875
2876 /*****************/
2877 /* API functions */
2878 /*****************/
2879
2880
2881 /**
2882  * Tries to open a stream to the target peer
2883  *
2884  * @param cfg configuration to use
2885  * @param target the target peer to which the stream has to be opened
2886  * @param app_port the application port number which uniquely identifies this
2887  *            stream
2888  * @param open_cb this function will be called after stream has be established;
2889  *          cannot be NULL
2890  * @param open_cb_cls the closure for open_cb
2891  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2892  * @return if successful it returns the stream socket; NULL if stream cannot be
2893  *         opened 
2894  */
2895 struct GNUNET_STREAM_Socket *
2896 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2897                     const struct GNUNET_PeerIdentity *target,
2898                     GNUNET_MESH_ApplicationType app_port,
2899                     GNUNET_STREAM_OpenCallback open_cb,
2900                     void *open_cb_cls,
2901                     ...)
2902 {
2903   struct GNUNET_STREAM_Socket *socket;
2904   enum GNUNET_STREAM_Option option;
2905   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
2906   va_list vargs;
2907   uint16_t payload_size;
2908
2909   LOG (GNUNET_ERROR_TYPE_DEBUG,
2910        "%s\n", __func__);
2911   GNUNET_assert (NULL != open_cb);
2912   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2913   socket->other_peer = *target;
2914   socket->open_cb = open_cb;
2915   socket->open_cls = open_cb_cls;
2916   /* Set defaults */
2917   socket->retransmit_timeout = TIME_REL_SECS (default_timeout);
2918   socket->testing_active = GNUNET_NO;
2919   socket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
2920   va_start (vargs, open_cb_cls); /* Parse variable args */
2921   do {
2922     option = va_arg (vargs, enum GNUNET_STREAM_Option);
2923     switch (option)
2924     {
2925     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2926       /* Expect struct GNUNET_TIME_Relative */
2927       socket->retransmit_timeout = va_arg (vargs,
2928                                            struct GNUNET_TIME_Relative);
2929       break;
2930     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
2931       socket->testing_active = GNUNET_YES;
2932       socket->testing_set_write_sequence_number_value = va_arg (vargs,
2933                                                                 uint32_t);
2934       break;
2935     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
2936       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
2937       break;
2938     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
2939       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
2940       break;
2941     case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
2942       payload_size = (uint16_t) va_arg (vargs, unsigned int);
2943       GNUNET_assert (0 != payload_size);
2944       if (payload_size < socket->max_payload_size)
2945         socket->max_payload_size = payload_size;
2946       break;
2947     case GNUNET_STREAM_OPTION_END:
2948       break;
2949     }
2950   } while (GNUNET_STREAM_OPTION_END != option);
2951   va_end (vargs);               /* End of variable args parsing */
2952   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2953                                       socket, /* cls */
2954                                       NULL, /* No inbound tunnel handler */
2955                                       NULL, /* No in-tunnel cleaner */
2956                                       client_message_handlers,
2957                                       ports); /* We don't get inbound tunnels */
2958   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
2959   {
2960     GNUNET_free (socket);
2961     return NULL;
2962   }
2963   /* Now create the mesh tunnel to target */
2964   LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating MESH Tunnel\n");
2965   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2966                                               NULL, /* Tunnel context */
2967                                               &mesh_peer_connect_callback,
2968                                               &mesh_peer_disconnect_callback,
2969                                               socket);
2970   GNUNET_assert (NULL != socket->tunnel);
2971   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2972                                         &socket->other_peer);
2973   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__);
2974   return socket;
2975 }
2976
2977
2978 /**
2979  * Shutdown the stream for reading or writing (similar to man 2 shutdown).
2980  *
2981  * @param socket the stream socket
2982  * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
2983  * @param completion_cb the callback that will be called upon successful
2984  *          shutdown of given operation
2985  * @param completion_cls the closure for the completion callback
2986  * @return the shutdown handle
2987  */
2988 struct GNUNET_STREAM_ShutdownHandle *
2989 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2990                         int operation,
2991                         GNUNET_STREAM_ShutdownCompletion completion_cb,
2992                         void *completion_cls)
2993 {
2994   struct GNUNET_STREAM_ShutdownHandle *handle;
2995   struct GNUNET_STREAM_MessageHeader *msg;
2996   
2997   GNUNET_assert (NULL == socket->shutdown_handle);
2998
2999   handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
3000   handle->socket = socket;
3001   handle->completion_cb = completion_cb;
3002   handle->completion_cls = completion_cls;
3003   socket->shutdown_handle = handle;
3004
3005   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
3006   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
3007   switch (operation)
3008   {
3009   case SHUT_RD:
3010     handle->operation = SHUT_RD;
3011     if (NULL != socket->read_handle)
3012       LOG (GNUNET_ERROR_TYPE_WARNING,
3013            "Existing read handle should be cancelled before shutting"
3014            " down reading\n");
3015     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
3016     queue_message (socket, msg, &set_state_receive_close_wait, NULL,
3017                    GNUNET_NO);
3018     break;
3019   case SHUT_WR:
3020     handle->operation = SHUT_WR;
3021     if (NULL != socket->write_handle)
3022       LOG (GNUNET_ERROR_TYPE_WARNING,
3023            "Existing write handle should be cancelled before shutting"
3024            " down writing\n");
3025     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
3026     queue_message (socket, msg, &set_state_transmit_close_wait, NULL,
3027                    GNUNET_NO);
3028     break;
3029   case SHUT_RDWR:
3030     handle->operation = SHUT_RDWR;
3031     if (NULL != socket->write_handle)
3032       LOG (GNUNET_ERROR_TYPE_WARNING,
3033            "Existing write handle should be cancelled before shutting"
3034            " down writing\n");
3035     if (NULL != socket->read_handle)
3036       LOG (GNUNET_ERROR_TYPE_WARNING,
3037            "Existing read handle should be cancelled before shutting"
3038            " down reading\n");
3039     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
3040     queue_message (socket, msg, &set_state_close_wait, NULL, GNUNET_NO);
3041     break;
3042   default:
3043     LOG (GNUNET_ERROR_TYPE_WARNING,
3044          "GNUNET_STREAM_shutdown called with invalid value for "
3045          "parameter operation -- Ignoring\n");
3046     GNUNET_free (msg);
3047     GNUNET_free (handle);
3048     return NULL;
3049   }
3050   handle->close_msg_retransmission_task_id =
3051     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
3052                                   &close_msg_retransmission_task,
3053                                   handle);
3054   return handle;
3055 }
3056
3057
3058 /**
3059  * Cancels a pending shutdown
3060  *
3061  * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
3062  */
3063 void
3064 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
3065 {
3066   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
3067     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
3068   handle->socket->shutdown_handle = NULL;
3069   GNUNET_free (handle);
3070 }
3071
3072
3073 /**
3074  * Closes the stream
3075  *
3076  * @param socket the stream socket
3077  */
3078 void
3079 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
3080 {
3081   struct MessageQueue *head;
3082
3083   if (NULL != socket->read_handle)
3084   {
3085     LOG (GNUNET_ERROR_TYPE_WARNING,
3086          "Closing STREAM socket when a read handle is pending\n");
3087     GNUNET_STREAM_io_read_cancel (socket->read_handle);
3088   }
3089   if (NULL != socket->write_handle)
3090   {
3091     LOG (GNUNET_ERROR_TYPE_WARNING,
3092          "Closing STREAM socket when a write handle is pending\n");
3093     GNUNET_STREAM_io_write_cancel (socket->write_handle);
3094     //socket->write_handle = NULL;
3095   }
3096   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
3097   {
3098     /* socket closed with read task pending!? */
3099     GNUNET_break (0);
3100     GNUNET_SCHEDULER_cancel (socket->read_task_id);
3101     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
3102   }  
3103   /* Terminate the ack'ing tasks if they are still present */
3104   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
3105   {
3106     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
3107     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
3108   }
3109   /* Terminate the control retransmission tasks */
3110   if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
3111   {
3112     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
3113   }
3114   /* Clear Transmit handles */
3115   if (NULL != socket->transmit_handle)
3116   {
3117     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
3118     socket->transmit_handle = NULL;
3119   }
3120   /* Clear existing message queue */
3121   while (NULL != (head = socket->queue_head)) {
3122     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
3123                                  socket->queue_tail,
3124                                  head);
3125     GNUNET_free (head->message);
3126     GNUNET_free (head);
3127   }
3128   /* Close associated tunnel */
3129   if (NULL != socket->tunnel)
3130   {
3131     GNUNET_MESH_tunnel_destroy (socket->tunnel);
3132     socket->tunnel = NULL;
3133   }
3134   /* Close mesh connection */
3135   if (NULL != socket->mesh && NULL == socket->lsocket)
3136   {
3137     GNUNET_MESH_disconnect (socket->mesh);
3138     socket->mesh = NULL;
3139   }  
3140   /* Release receive buffer */
3141   if (NULL != socket->receive_buffer)
3142   {
3143     GNUNET_free (socket->receive_buffer);
3144   }
3145   GNUNET_free (socket);
3146 }
3147
3148
3149 /**
3150  * Listens for stream connections for a specific application ports
3151  *
3152  * @param cfg the configuration to use
3153  * @param app_port the application port for which new streams will be accepted
3154  * @param listen_cb this function will be called when a peer tries to establish
3155  *            a stream with us
3156  * @param listen_cb_cls closure for listen_cb
3157  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3158  * @return listen socket, NULL for any error
3159  */
3160 struct GNUNET_STREAM_ListenSocket *
3161 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
3162                       GNUNET_MESH_ApplicationType app_port,
3163                       GNUNET_STREAM_ListenCallback listen_cb,
3164                       void *listen_cb_cls,
3165                       ...)
3166 {
3167   /* FIXME: Add variable args for passing configration options? */
3168   struct GNUNET_STREAM_ListenSocket *lsocket;
3169   struct GNUNET_TIME_Relative listen_timeout;
3170   enum GNUNET_STREAM_Option option;
3171   va_list vargs;
3172   uint16_t payload_size;
3173
3174   GNUNET_assert (NULL != listen_cb);
3175   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
3176   lsocket->cfg = GNUNET_CONFIGURATION_dup (cfg);
3177   lsocket->lockmanager = GNUNET_LOCKMANAGER_connect (lsocket->cfg);
3178   if (NULL == lsocket->lockmanager)
3179   {
3180     GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3181     GNUNET_free (lsocket);
3182     return NULL;
3183   }
3184   lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */  
3185   /* Set defaults */
3186   lsocket->retransmit_timeout = TIME_REL_SECS (default_timeout);
3187   lsocket->testing_active = GNUNET_NO;
3188   lsocket->listen_ok_cb = NULL;
3189   lsocket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
3190   listen_timeout = TIME_REL_SECS (60); /* A minute for listen timeout */  
3191   va_start (vargs, listen_cb_cls);
3192   do {
3193     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3194     switch (option)
3195     {
3196     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3197       lsocket->retransmit_timeout = va_arg (vargs,
3198                                             struct GNUNET_TIME_Relative);
3199       break;
3200     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3201       lsocket->testing_active = GNUNET_YES;
3202       lsocket->testing_set_write_sequence_number_value = va_arg (vargs,
3203                                                                  uint32_t);
3204       break;
3205     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3206       listen_timeout = GNUNET_TIME_relative_multiply
3207         (GNUNET_TIME_UNIT_MILLISECONDS, va_arg (vargs, uint32_t));
3208       break;
3209     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3210       lsocket->listen_ok_cb = va_arg (vargs,
3211                                       GNUNET_STREAM_ListenSuccessCallback);
3212       break;
3213     case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
3214       payload_size = (uint16_t) va_arg (vargs, unsigned int);
3215       GNUNET_assert (0 != payload_size);
3216       if (payload_size < lsocket->max_payload_size)
3217         lsocket->max_payload_size = payload_size;
3218       break;
3219     case GNUNET_STREAM_OPTION_END:
3220       break;
3221     }
3222   } while (GNUNET_STREAM_OPTION_END != option);
3223   va_end (vargs);
3224   lsocket->port = app_port;
3225   lsocket->listen_cb = listen_cb;
3226   lsocket->listen_cb_cls = listen_cb_cls;
3227   lsocket->locking_request = 
3228     GNUNET_LOCKMANAGER_acquire_lock (lsocket->lockmanager, locking_domain,
3229                                      (uint32_t) lsocket->port,
3230                                      &lock_status_change_cb, lsocket);
3231   lsocket->lockmanager_acquire_timeout_task =
3232     GNUNET_SCHEDULER_add_delayed (listen_timeout,
3233                                   &lockmanager_acquire_timeout, lsocket);
3234   return lsocket;
3235 }
3236
3237
3238 /**
3239  * Closes the listen socket
3240  *
3241  * @param lsocket the listen socket
3242  */
3243 void
3244 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3245 {
3246   /* Close MESH connection */
3247   if (NULL != lsocket->mesh)
3248     GNUNET_MESH_disconnect (lsocket->mesh);
3249   GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3250   if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
3251     GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
3252   if (NULL != lsocket->locking_request)
3253     GNUNET_LOCKMANAGER_cancel_request (lsocket->locking_request);
3254   if (NULL != lsocket->lockmanager)
3255     GNUNET_LOCKMANAGER_disconnect (lsocket->lockmanager);
3256   GNUNET_free (lsocket);
3257 }
3258
3259
3260 /**
3261  * Tries to write the given data to the stream. The maximum size of data that
3262  * can be written as part of a write operation is (64 * (64000 - sizeof (struct
3263  * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3264  * violation, however only the said number of maximum bytes will be written.
3265  *
3266  * @param socket the socket representing a stream
3267  * @param data the data buffer from where the data is written into the stream
3268  * @param size the number of bytes to be written from the data buffer
3269  * @param timeout the timeout period
3270  * @param write_cont the function to call upon writing some bytes into the
3271  *          stream 
3272  * @param write_cont_cls the closure
3273  *
3274  * @return handle to cancel the operation; if a previous write is pending or
3275  *           the stream has been shutdown for this operation then write_cont is
3276  *           immediately called and NULL is returned.
3277  */
3278 struct GNUNET_STREAM_IOWriteHandle *
3279 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3280                      const void *data,
3281                      size_t size,
3282                      struct GNUNET_TIME_Relative timeout,
3283                      GNUNET_STREAM_CompletionContinuation write_cont,
3284                      void *write_cont_cls)
3285 {
3286   struct GNUNET_STREAM_IOWriteHandle *io_handle;
3287   struct GNUNET_STREAM_DataMessage *data_msg;
3288   const void *sweep;
3289   struct GNUNET_TIME_Relative ack_deadline;
3290   unsigned int num_needed_packets;
3291   unsigned int packet;
3292   uint32_t packet_size;
3293   uint32_t payload_size;
3294   uint16_t max_data_packet_size;
3295
3296   LOG (GNUNET_ERROR_TYPE_DEBUG,
3297        "%s\n", __func__);
3298   if (NULL != socket->write_handle)
3299   {
3300     GNUNET_break (0);
3301     return NULL;
3302   }
3303   switch (socket->state)
3304   {
3305   case STATE_TRANSMIT_CLOSED:
3306   case STATE_TRANSMIT_CLOSE_WAIT:
3307   case STATE_CLOSED:
3308   case STATE_CLOSE_WAIT:
3309     if (NULL != write_cont)
3310       write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3311     LOG (GNUNET_ERROR_TYPE_DEBUG,
3312          "%s() END\n", __func__);
3313     return NULL;
3314   case STATE_INIT:
3315   case STATE_LISTEN:
3316   case STATE_HELLO_WAIT:
3317     if (NULL != write_cont)
3318       /* FIXME: GNUNET_STREAM_SYSERR?? */
3319       write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3320     LOG (GNUNET_ERROR_TYPE_DEBUG,
3321          "%s() END\n", __func__);
3322     return NULL;
3323   case STATE_ESTABLISHED:
3324   case STATE_RECEIVE_CLOSED:
3325   case STATE_RECEIVE_CLOSE_WAIT:
3326     break;
3327   }
3328   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * socket->max_payload_size < size)
3329     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * socket->max_payload_size;
3330   num_needed_packets =
3331       (size + (socket->max_payload_size - 1)) / socket->max_payload_size;
3332   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
3333   io_handle->socket = socket;
3334   io_handle->write_cont = write_cont;
3335   io_handle->write_cont_cls = write_cont_cls;
3336   io_handle->size = size;
3337   io_handle->packets_sent = 0;
3338   sweep = data;
3339   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3340      determined from RTT */
3341   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3342   /* Divide the given buffer into packets for sending */
3343   max_data_packet_size =
3344       socket->max_payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3345   for (packet=0; packet < num_needed_packets; packet++)
3346   {
3347     if ((packet + 1) * socket->max_payload_size < size) 
3348     {
3349       payload_size = socket->max_payload_size;
3350       packet_size = max_data_packet_size;
3351     }
3352     else 
3353     {
3354       payload_size = size - packet * socket->max_payload_size;
3355       packet_size = 
3356           payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3357     }
3358     io_handle->messages[packet] = GNUNET_malloc (packet_size);
3359     io_handle->messages[packet]->header.header.size = htons (packet_size);
3360     io_handle->messages[packet]->header.header.type =
3361       htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3362     io_handle->messages[packet]->sequence_number =
3363       htonl (socket->write_sequence_number++);
3364     io_handle->messages[packet]->offset = htonl (socket->write_offset);
3365     /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3366        determined from RTT */
3367     io_handle->messages[packet]->ack_deadline =
3368       GNUNET_TIME_relative_hton (ack_deadline);
3369     data_msg = io_handle->messages[packet];
3370     /* Copy data from given buffer to the packet */
3371     memcpy (&data_msg[1], sweep, payload_size);
3372     sweep += payload_size;
3373     socket->write_offset += payload_size;
3374   }
3375   /* ack the last data message. FIXME: remove when we figure out how to do this
3376      using RTT */
3377   io_handle->messages[num_needed_packets - 1]->ack_deadline = 
3378       GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
3379   socket->write_handle = io_handle;
3380   write_data (socket);
3381   LOG (GNUNET_ERROR_TYPE_DEBUG,
3382        "%s() END\n", __func__);
3383   return io_handle;
3384 }
3385
3386
3387 /**
3388  * Tries to read data from the stream.
3389  *
3390  * @param socket the socket representing a stream
3391  * @param timeout the timeout period
3392  * @param proc function to call with data (once only)
3393  * @param proc_cls the closure for proc
3394  *
3395  * @return handle to cancel the operation; if the stream has been shutdown for
3396  *           this type of opeartion then the DataProcessor is immediately
3397  *           called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned
3398  */
3399 struct GNUNET_STREAM_IOReadHandle *
3400 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3401                     struct GNUNET_TIME_Relative timeout,
3402                     GNUNET_STREAM_DataProcessor proc,
3403                     void *proc_cls)
3404 {
3405   struct GNUNET_STREAM_IOReadHandle *read_handle;
3406   
3407   LOG (GNUNET_ERROR_TYPE_DEBUG,
3408        "%s: %s()\n", 
3409        GNUNET_i2s (&socket->other_peer),
3410        __func__);
3411   /* Return NULL if there is already a read handle; the user has to cancel that
3412      first before continuing or has to wait until it is completed */
3413   if (NULL != socket->read_handle) 
3414     return NULL;
3415   GNUNET_assert (NULL != proc);
3416   switch (socket->state)
3417   {
3418   case STATE_RECEIVE_CLOSED:
3419   case STATE_RECEIVE_CLOSE_WAIT:
3420   case STATE_CLOSED:
3421   case STATE_CLOSE_WAIT:
3422     proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3423     LOG (GNUNET_ERROR_TYPE_DEBUG,
3424          "%s: %s() END\n",
3425          GNUNET_i2s (&socket->other_peer),
3426          __func__);
3427     return NULL;
3428   default:
3429     break;
3430   }
3431   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
3432   read_handle->proc = proc;
3433   read_handle->proc_cls = proc_cls;
3434   read_handle->socket = socket;
3435   socket->read_handle = read_handle;
3436   /* Check if we have a packet at bitmap 0 */
3437   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3438                                           0))
3439   {
3440     socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3441                                                      socket);   
3442   }
3443   /* Setup the read timeout task */
3444   socket->read_io_timeout_task_id =
3445     GNUNET_SCHEDULER_add_delayed (timeout,
3446                                   &read_io_timeout,
3447                                   socket);
3448   LOG (GNUNET_ERROR_TYPE_DEBUG,
3449        "%s: %s() END\n",
3450        GNUNET_i2s (&socket->other_peer),
3451        __func__);
3452   return read_handle;
3453 }
3454
3455
3456 /**
3457  * Cancel pending write operation.
3458  *
3459  * @param ioh handle to operation to cancel
3460  */
3461 void
3462 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3463 {
3464   struct GNUNET_STREAM_Socket *socket = ioh->socket;
3465   unsigned int packet;
3466
3467   GNUNET_assert (NULL != socket->write_handle);
3468   GNUNET_assert (socket->write_handle == ioh);
3469
3470   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
3471   {
3472     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
3473     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
3474   }
3475
3476   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3477   {
3478     if (NULL == ioh->messages[packet]) break;
3479     GNUNET_free (ioh->messages[packet]);
3480   }
3481       
3482   GNUNET_free (socket->write_handle);
3483   socket->write_handle = NULL;
3484 }
3485
3486
3487 /**
3488  * Cancel pending read operation.
3489  *
3490  * @param ioh handle to operation to cancel
3491  */
3492 void
3493 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
3494 {
3495   struct GNUNET_STREAM_Socket *socket;
3496   
3497   socket = ioh->socket;
3498   GNUNET_assert (NULL != socket->read_handle);
3499   GNUNET_assert (ioh == socket->read_handle);
3500   /* Read io time task should be there; if it is already executed then this
3501   read handle is not valid; However upon scheduler shutdown the read io task
3502   may be executed before */
3503   if (GNUNET_SCHEDULER_NO_TASK != socket->read_io_timeout_task_id)
3504   {
3505     GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
3506     socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
3507   }
3508   /* reading task may be present; if so we have to stop it */
3509   if (GNUNET_SCHEDULER_NO_TASK != socket->read_task_id)
3510   {
3511     GNUNET_SCHEDULER_cancel (socket->read_task_id);
3512     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
3513   }
3514   GNUNET_free (ioh);
3515   socket->read_handle = NULL;
3516 }
3517
3518 /* end of stream_api.c */