moved read_io task and its corresponding timeout task to read handle
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Add code for write io timeout
27  *
28  * Include retransmission for control messages
29  **/
30
31 /**
32  * @file stream/stream_api.c
33  * @brief Implementation of the stream library
34  * @author Sree Harsha Totakura
35  */
36
37
38 #include "platform.h"
39 #include "gnunet_common.h"
40 #include "gnunet_crypto_lib.h"
41 #include "gnunet_lockmanager_service.h"
42 #include "gnunet_stream_lib.h"
43 #include "stream_protocol.h"
44
45 /**
46  * Generic logging shorthand
47  */
48 #define LOG(kind,...)                                   \
49   GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
50
51 /**
52  * Debug logging shorthand
53  */
54 #define LOG_DEBUG(...)                          \
55   LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
56
57 /**
58  * Time in relative seconds shorthand
59  */
60 #define TIME_REL_SECS(sec) \
61   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
62
63 /**
64  * The maximum packet size of a stream packet
65  */
66 #define DEFAULT_MAX_PAYLOAD_SIZE 64000
67
68 /**
69  * Receive buffer
70  */
71 #define RECEIVE_BUFFER_SIZE 4096000
72
73 /**
74  * states in the Protocol
75  */
76 enum State
77   {
78     /**
79      * Client initialization state
80      */
81     STATE_INIT,
82
83     /**
84      * Listener initialization state 
85      */
86     STATE_LISTEN,
87
88     /**
89      * Pre-connection establishment state
90      */
91     STATE_HELLO_WAIT,
92
93     /**
94      * State where a connection has been established
95      */
96     STATE_ESTABLISHED,
97
98     /**
99      * State where the socket is closed on our side and waiting to be ACK'ed
100      */
101     STATE_RECEIVE_CLOSE_WAIT,
102
103     /**
104      * State where the socket is closed for reading
105      */
106     STATE_RECEIVE_CLOSED,
107
108     /**
109      * State where the socket is closed on our side and waiting to be ACK'ed
110      */
111     STATE_TRANSMIT_CLOSE_WAIT,
112
113     /**
114      * State where the socket is closed for writing
115      */
116     STATE_TRANSMIT_CLOSED,
117
118     /**
119      * State where the socket is closed on our side and waiting to be ACK'ed
120      */
121     STATE_CLOSE_WAIT,
122
123     /**
124      * State where the socket is closed
125      */
126     STATE_CLOSED 
127   };
128
129
130 /**
131  * Functions of this type are called when a message is written
132  *
133  * @param cls the closure from queue_message
134  * @param socket the socket the written message was bound to
135  */
136 typedef void (*SendFinishCallback) (void *cls,
137                                     struct GNUNET_STREAM_Socket *socket);
138
139
140 /**
141  * The send message queue
142  */
143 struct MessageQueue
144 {
145   /**
146    * The message
147    */
148   struct GNUNET_STREAM_MessageHeader *message;
149
150   /**
151    * Callback to be called when the message is sent
152    */
153   SendFinishCallback finish_cb;
154
155   /**
156    * The closure for finish_cb
157    */
158   void *finish_cb_cls;
159
160   /**
161    * The next message in queue. Should be NULL in the last message
162    */
163   struct MessageQueue *next;
164
165   /**
166    * The next message in queue. Should be NULL in the first message
167    */
168   struct MessageQueue *prev;
169 };
170
171
172 /**
173  * The STREAM Socket Handler
174  */
175 struct GNUNET_STREAM_Socket
176 {
177   /**
178    * Retransmission timeout
179    */
180   struct GNUNET_TIME_Relative retransmit_timeout;
181
182   /**
183    * The Acknowledgement Bitmap
184    */
185   GNUNET_STREAM_AckBitmap ack_bitmap;
186
187   /**
188    * Time when the Acknowledgement was queued
189    */
190   struct GNUNET_TIME_Absolute ack_time_registered;
191
192   /**
193    * Queued Acknowledgement deadline
194    */
195   struct GNUNET_TIME_Relative ack_time_deadline;
196
197   /**
198    * The mesh handle
199    */
200   struct GNUNET_MESH_Handle *mesh;
201
202   /**
203    * The mesh tunnel handle
204    */
205   struct GNUNET_MESH_Tunnel *tunnel;
206
207   /**
208    * Stream open closure
209    */
210   void *open_cls;
211
212   /**
213    * Stream open callback
214    */
215   GNUNET_STREAM_OpenCallback open_cb;
216
217   /**
218    * The current transmit handle (if a pending transmit request exists)
219    */
220   struct GNUNET_MESH_TransmitHandle *transmit_handle;
221
222   /**
223    * The current message associated with the transmit handle
224    */
225   struct MessageQueue *queue_head;
226
227   /**
228    * The queue tail, should always point to the last message in queue
229    */
230   struct MessageQueue *queue_tail;
231
232   /**
233    * The write IO_handle associated with this socket
234    */
235   struct GNUNET_STREAM_IOWriteHandle *write_handle;
236
237   /**
238    * The read IO_handle associated with this socket
239    */
240   struct GNUNET_STREAM_IOReadHandle *read_handle;
241
242   /**
243    * The shutdown handle associated with this socket
244    */
245   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
246
247   /**
248    * Buffer for storing received messages
249    */
250   void *receive_buffer;
251
252   /**
253    * The listen socket from which this socket is derived. Should be NULL if it
254    * is not a derived socket
255    */
256   struct GNUNET_STREAM_ListenSocket *lsocket;
257
258   /**
259    * The peer identity of the peer at the other end of the stream
260    */
261   struct GNUNET_PeerIdentity other_peer;
262
263   /**
264    * Task identifier for retransmission task after timeout
265    */
266   GNUNET_SCHEDULER_TaskIdentifier data_retransmission_task_id;
267
268   /**
269    * Task identifier for retransmission of control messages
270    */
271   GNUNET_SCHEDULER_TaskIdentifier control_retransmission_task_id;
272
273   /**
274    * The task for sending timely Acks
275    */
276   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
277
278   /**
279    * The state of the protocol associated with this socket
280    */
281   enum State state;
282
283   /**
284    * The status of the socket
285    */
286   enum GNUNET_STREAM_Status status;
287
288   /**
289    * The number of previous timeouts; FIXME: currently not used
290    */
291   unsigned int retries;
292
293   /**
294    * The application port number (type: uint32_t)
295    */
296   GNUNET_MESH_ApplicationType app_port;
297
298   /**
299    * Whether testing mode is active or not
300    */
301   int testing_active;
302
303   /**
304    * The write sequence number to be set incase of testing
305    */
306   uint32_t testing_set_write_sequence_number_value;
307
308   /**
309    * The session id associated with this stream connection
310    * FIXME: Not used currently, may be removed
311    */
312   uint32_t session_id;
313
314   /**
315    * Write sequence number. Set to random when sending HELLO(client) and
316    * HELLO_ACK(server) 
317    */
318   uint32_t write_sequence_number;
319
320   /**
321    * Read sequence number. This number's value is determined during handshake
322    */
323   uint32_t read_sequence_number;
324
325   /**
326    * The receiver buffer size
327    */
328   uint32_t receive_buffer_size;
329
330   /**
331    * The receiver buffer boundaries
332    */
333   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
334
335   /**
336    * receiver's available buffer after the last acknowledged packet
337    */
338   uint32_t receiver_window_available;
339
340   /**
341    * The offset pointer used during write operation
342    */
343   uint32_t write_offset;
344
345   /**
346    * The offset after which we are expecting data
347    */
348   uint32_t read_offset;
349
350   /**
351    * The offset upto which user has read from the received buffer
352    */
353   uint32_t copy_offset;
354
355   /**
356    * The maximum size of the data message payload this stream handle can send
357    */
358   uint16_t max_payload_size;
359 };
360
361
362 /**
363  * A socket for listening
364  */
365 struct GNUNET_STREAM_ListenSocket
366 {
367   /**
368    * The mesh handle
369    */
370   struct GNUNET_MESH_Handle *mesh;
371
372   /**
373    * Our configuration
374    */
375   struct GNUNET_CONFIGURATION_Handle *cfg;
376
377   /**
378    * Handle to the lock manager service
379    */
380   struct GNUNET_LOCKMANAGER_Handle *lockmanager;
381
382   /**
383    * The active LockingRequest from lockmanager
384    */
385   struct GNUNET_LOCKMANAGER_LockingRequest *locking_request;
386
387   /**
388    * Callback to call after acquring a lock and listening
389    */
390   GNUNET_STREAM_ListenSuccessCallback listen_ok_cb;
391
392   /**
393    * The callback function which is called after successful opening socket
394    */
395   GNUNET_STREAM_ListenCallback listen_cb;
396
397   /**
398    * The call back closure
399    */
400   void *listen_cb_cls;
401
402   /**
403    * The service port
404    */
405   GNUNET_MESH_ApplicationType port;
406   
407   /**
408    * The id of the lockmanager timeout task
409    */
410   GNUNET_SCHEDULER_TaskIdentifier lockmanager_acquire_timeout_task;
411
412   /**
413    * The retransmit timeout
414    */
415   struct GNUNET_TIME_Relative retransmit_timeout;
416   
417   /**
418    * Listen enabled?
419    */
420   int listening;
421
422   /**
423    * Whether testing mode is active or not
424    */
425   int testing_active;
426
427   /**
428    * The write sequence number to be set incase of testing
429    */
430   uint32_t testing_set_write_sequence_number_value;
431
432   /**
433    * The maximum size of the data message payload this stream handle can send
434    */
435   uint16_t max_payload_size;
436
437 };
438
439
440 /**
441  * The IO Write Handle
442  */
443 struct GNUNET_STREAM_IOWriteHandle
444 {
445   /**
446    * The socket to which this write handle is associated
447    */
448   struct GNUNET_STREAM_Socket *socket;
449
450   /**
451    * The packet_buffers associated with this Handle
452    */
453   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
454
455   /**
456    * The write continuation callback
457    */
458   GNUNET_STREAM_CompletionContinuation write_cont;
459
460   /**
461    * Write continuation closure
462    */
463   void *write_cont_cls;
464
465   /**
466    * The bitmap of this IOHandle; Corresponding bit for a message is set when
467    * it has been acknowledged by the receiver
468    */
469   GNUNET_STREAM_AckBitmap ack_bitmap;
470
471   /**
472    * Number of bytes in this write handle
473    */
474   size_t size;
475
476   /**
477    * Number of packets already transmitted from this IO handle. Retransmitted
478    * packets are not taken into account here. This is used to determine which
479    * packets account for retransmission and which packets occupy buffer space at
480    * the receiver.
481    */
482   unsigned int packets_sent;
483 };
484
485
486 /**
487  * The IO Read Handle
488  */
489 struct GNUNET_STREAM_IOReadHandle
490 {
491   /**
492    * The socket to which this read handle is associated
493    */
494   struct GNUNET_STREAM_Socket *socket;
495   
496   /**
497    * Callback for the read processor
498    */
499   GNUNET_STREAM_DataProcessor proc;
500
501   /**
502    * The closure pointer for the read processor callback
503    */
504   void *proc_cls;
505
506   /**
507    * Task identifier for the read io timeout task
508    */
509   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
510
511   /**
512    * Task scheduled to continue a read operation.
513    */
514   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
515 };
516
517
518 /**
519  * Handle for Shutdown
520  */
521 struct GNUNET_STREAM_ShutdownHandle
522 {
523   /**
524    * The socket associated with this shutdown handle
525    */
526   struct GNUNET_STREAM_Socket *socket;
527
528   /**
529    * Shutdown completion callback
530    */
531   GNUNET_STREAM_ShutdownCompletion completion_cb;
532
533   /**
534    * Closure for completion callback
535    */
536   void *completion_cls;
537
538   /**
539    * Close message retransmission task id
540    */
541   GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
542
543   /**
544    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
545    */
546   int operation;  
547 };
548
549
550 /**
551  * Default value in seconds for various timeouts
552  */
553 static const unsigned int default_timeout = 10;
554
555 /**
556  * The domain name for locks we use here
557  */
558 static const char *locking_domain = "GNUNET_STREAM_APPLOCK";
559
560
561 /**
562  * Callback function for sending queued message
563  *
564  * @param cls closure the socket
565  * @param size number of bytes available in buf
566  * @param buf where the callee should write the message
567  * @return number of bytes written to buf
568  */
569 static size_t
570 send_message_notify (void *cls, size_t size, void *buf)
571 {
572   struct GNUNET_STREAM_Socket *socket = cls;
573   struct MessageQueue *head;
574   size_t ret;
575
576   socket->transmit_handle = NULL; /* Remove the transmit handle */
577   head = socket->queue_head;
578   if (NULL == head)
579     return 0; /* just to be safe */
580   if (0 == size)                /* request timed out */
581   {
582     socket->retries++;
583     LOG (GNUNET_ERROR_TYPE_DEBUG,
584          "%s: Message sending timed out. Retry %d \n",
585          GNUNET_i2s (&socket->other_peer),
586          socket->retries);
587     socket->transmit_handle = 
588       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
589                                          GNUNET_NO, /* Corking */
590                                          /* FIXME: exponential backoff */
591                                          socket->retransmit_timeout,
592                                          &socket->other_peer,
593                                          ntohs (head->message->header.size),
594                                          &send_message_notify,
595                                          socket);
596     return 0;
597   }
598   ret = ntohs (head->message->header.size);
599   GNUNET_assert (size >= ret);
600   memcpy (buf, head->message, ret);
601   if (NULL != head->finish_cb)
602   {
603     head->finish_cb (head->finish_cb_cls, socket);
604   }
605   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
606                                socket->queue_tail,
607                                head);
608   GNUNET_free (head->message);
609   GNUNET_free (head);
610   head = socket->queue_head;
611   if (NULL != head)    /* more pending messages to send */
612   {
613     socket->retries = 0;
614     socket->transmit_handle = 
615       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
616                                          GNUNET_NO, /* Corking */
617                                          /* FIXME: exponential backoff */
618                                          socket->retransmit_timeout,
619                                          &socket->other_peer,
620                                          ntohs (head->message->header.size),
621                                          &send_message_notify,
622                                          socket);
623   }
624   return ret;
625 }
626
627
628 /**
629  * Queues a message for sending using the mesh connection of a socket
630  *
631  * @param socket the socket whose mesh connection is used
632  * @param message the message to be sent
633  * @param finish_cb the callback to be called when the message is sent
634  * @param finish_cb_cls the closure for the callback
635  * @param urgent set to GNUNET_YES to add the message to the beginning of the
636  *          queue; GNUNET_NO to add at the tail
637  */
638 static void
639 queue_message (struct GNUNET_STREAM_Socket *socket,
640                struct GNUNET_STREAM_MessageHeader *message,
641                SendFinishCallback finish_cb,
642                void *finish_cb_cls,
643                int urgent)
644 {
645   struct MessageQueue *queue_entity;
646
647   GNUNET_assert 
648     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
649      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
650   LOG (GNUNET_ERROR_TYPE_DEBUG,
651        "%s: Queueing message of type %d and size %d\n",
652        GNUNET_i2s (&socket->other_peer),
653        ntohs (message->header.type),
654        ntohs (message->header.size));
655   GNUNET_assert (NULL != message);
656   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
657   queue_entity->message = message;
658   queue_entity->finish_cb = finish_cb;
659   queue_entity->finish_cb_cls = finish_cb_cls;
660   if (GNUNET_YES == urgent)
661   {
662     GNUNET_CONTAINER_DLL_insert (socket->queue_head, socket->queue_tail,
663                                  queue_entity);
664     if (NULL != socket->transmit_handle)
665     {
666       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
667       socket->transmit_handle = NULL;
668     }
669   }
670   else
671     GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
672                                       socket->queue_tail,
673                                       queue_entity);
674   if (NULL == socket->transmit_handle)
675   {
676     socket->retries = 0;
677     socket->transmit_handle = 
678         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
679                                            GNUNET_NO, /* Corking */
680                                            socket->retransmit_timeout,
681                                            &socket->other_peer,
682                                            ntohs (message->header.size),
683                                            &send_message_notify,
684                                            socket);
685   }
686 }
687
688
689 /**
690  * Copies a message and queues it for sending using the mesh connection of
691  * given socket 
692  *
693  * @param socket the socket whose mesh connection is used
694  * @param message the message to be sent
695  * @param finish_cb the callback to be called when the message is sent
696  * @param finish_cb_cls the closure for the callback
697  */
698 static void
699 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
700                         const struct GNUNET_STREAM_MessageHeader *message,
701                         SendFinishCallback finish_cb,
702                         void *finish_cb_cls)
703 {
704   struct GNUNET_STREAM_MessageHeader *msg_copy;
705   uint16_t size;
706   
707   size = ntohs (message->header.size);
708   msg_copy = GNUNET_malloc (size);
709   memcpy (msg_copy, message, size);
710   queue_message (socket, msg_copy, finish_cb, finish_cb_cls, GNUNET_NO);
711 }
712
713
714 /**
715  * Writes data using the given socket. The amount of data written is limited by
716  * the receiver_window_size
717  *
718  * @param socket the socket to use
719  */
720 static void 
721 write_data (struct GNUNET_STREAM_Socket *socket);
722
723
724 /**
725  * Task for retransmitting data messages if they aren't ACK before their ack
726  * deadline 
727  *
728  * @param cls the socket
729  * @param tc the Task context
730  */
731 static void
732 data_retransmission_task (void *cls,
733                           const struct GNUNET_SCHEDULER_TaskContext *tc)
734 {
735   struct GNUNET_STREAM_Socket *socket = cls;
736   
737   socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
738   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
739     return;
740   LOG (GNUNET_ERROR_TYPE_DEBUG,
741        "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
742   write_data (socket);
743 }
744
745
746 /**
747  * Task for sending ACK message
748  *
749  * @param cls the socket
750  * @param tc the Task context
751  */
752 static void
753 ack_task (void *cls,
754           const struct GNUNET_SCHEDULER_TaskContext *tc)
755 {
756   struct GNUNET_STREAM_Socket *socket = cls;
757   struct GNUNET_STREAM_AckMessage *ack_msg;
758
759   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
760   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
761     return;
762   /* Create the ACK Message */
763   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
764   ack_msg->header.header.size = htons (sizeof (struct 
765                                                GNUNET_STREAM_AckMessage));
766   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
767   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
768   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
769   ack_msg->receive_window_remaining = 
770     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
771   /* Queue up ACK for immediate sending */
772   queue_message (socket, &ack_msg->header, NULL, NULL, GNUNET_YES);
773 }
774
775
776 /**
777  * Retransmission task for shutdown messages
778  *
779  * @param cls the shutdown handle
780  * @param tc the Task Context
781  */
782 static void
783 close_msg_retransmission_task (void *cls,
784                                const struct GNUNET_SCHEDULER_TaskContext *tc)
785 {
786   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
787   struct GNUNET_STREAM_MessageHeader *msg;
788   struct GNUNET_STREAM_Socket *socket;
789
790   shutdown_handle->close_msg_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
791   GNUNET_assert (NULL != shutdown_handle);
792   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
793     return;
794   socket = shutdown_handle->socket;
795   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
796   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
797   switch (shutdown_handle->operation)
798   {
799   case SHUT_RDWR:
800     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
801     break;
802   case SHUT_RD:
803     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
804     break;
805   case SHUT_WR:
806     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
807     break;
808   default:
809     GNUNET_free (msg);
810     shutdown_handle->close_msg_retransmission_task_id = 
811       GNUNET_SCHEDULER_NO_TASK;
812     return;
813   }
814   queue_message (socket, msg, NULL, NULL, GNUNET_NO);
815   shutdown_handle->close_msg_retransmission_task_id =
816     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
817                                   &close_msg_retransmission_task,
818                                   shutdown_handle);
819 }
820
821
822 /**
823  * Function to modify a bit in GNUNET_STREAM_AckBitmap
824  *
825  * @param bitmap the bitmap to modify
826  * @param bit the bit number to modify
827  * @param value GNUNET_YES to on, GNUNET_NO to off
828  */
829 static void
830 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
831                       unsigned int bit, 
832                       int value)
833 {
834   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
835   if (GNUNET_YES == value)
836     *bitmap |= (1LL << bit);
837   else
838     *bitmap &= ~(1LL << bit);
839 }
840
841
842 /**
843  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
844  *
845  * @param bitmap address of the bitmap that has to be checked
846  * @param bit the bit number to check
847  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
848  */
849 static uint8_t
850 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
851                       unsigned int bit)
852 {
853   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
854   return 0 != (*bitmap & (1LL << bit));
855 }
856
857
858 /**
859  * Writes data using the given socket. The amount of data written is limited by
860  * the receiver_window_size
861  *
862  * @param socket the socket to use
863  */
864 static void 
865 write_data (struct GNUNET_STREAM_Socket *socket)
866 {
867   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
868   unsigned int packet;
869   
870   for (packet=0; packet < io_handle->packets_sent; packet++)
871   {
872     if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
873                                            packet))
874     {
875       LOG (GNUNET_ERROR_TYPE_DEBUG,
876            "%s: Retransmitting DATA message with sequence %u\n",
877            GNUNET_i2s (&socket->other_peer),
878            ntohl (io_handle->messages[packet]->sequence_number));
879       copy_and_queue_message (socket,
880                               &io_handle->messages[packet]->header,
881                               NULL,
882                               NULL);
883     }
884   }
885   /* Now send new packets if there is enough buffer space */
886   while ( (NULL != io_handle->messages[packet]) &&
887           (socket->receiver_window_available 
888            >= ntohs (io_handle->messages[packet]->header.header.size)) &&
889           (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
890   {
891     socket->receiver_window_available -= 
892       ntohs (io_handle->messages[packet]->header.header.size);
893     LOG (GNUNET_ERROR_TYPE_DEBUG,
894          "%s: Placing DATA message with sequence %u in send queue\n",
895          GNUNET_i2s (&socket->other_peer),
896          ntohl (io_handle->messages[packet]->sequence_number));
897     copy_and_queue_message (socket,
898                             &io_handle->messages[packet]->header,
899                             NULL,
900                             NULL);
901     packet++;
902   }
903   io_handle->packets_sent = packet;
904   if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id)
905     socket->data_retransmission_task_id = 
906       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
907                                     (GNUNET_TIME_UNIT_SECONDS, 8),
908                                     &data_retransmission_task,
909                                     socket);
910 }
911
912
913 /**
914  * Task for calling the read processor
915  *
916  * @param cls the socket
917  * @param tc the task context
918  */
919 static void
920 call_read_processor (void *cls,
921                      const struct GNUNET_SCHEDULER_TaskContext *tc)
922 {
923   struct GNUNET_STREAM_Socket *socket = cls;
924   struct GNUNET_STREAM_IOReadHandle *read_handle;
925   size_t read_size;
926   size_t valid_read_size;
927   unsigned int packet;
928   uint32_t sequence_increase;
929   uint32_t offset_increase;
930
931   read_handle = socket->read_handle;
932   GNUNET_assert (NULL != read_handle);
933   read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
934   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
935     return;
936   if (NULL == socket->receive_buffer) 
937     return;
938   GNUNET_assert (NULL != socket->read_handle);
939   GNUNET_assert (NULL != socket->read_handle->proc);
940   /* Check the bitmap for any holes */
941   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
942   {
943     if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
944                                            packet))
945       break;
946   }
947   /* We only call read processor if we have the first packet */
948   GNUNET_assert (0 < packet);
949   valid_read_size = 
950     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
951   GNUNET_assert (0 != valid_read_size);
952   /* Cancel the read_io_timeout_task */
953   GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id);
954   read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
955   /* Call the data processor */
956   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n",
957        GNUNET_i2s (&socket->other_peer));
958   read_size = 
959       socket->read_handle->proc (socket->read_handle->proc_cls,
960                                  socket->status,
961                                  socket->receive_buffer + socket->copy_offset,
962                                  valid_read_size);
963   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n",
964        GNUNET_i2s (&socket->other_peer), read_size);
965   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor completed successfully\n",
966        GNUNET_i2s (&socket->other_peer));
967   /* Free the read handle */
968   GNUNET_free (socket->read_handle);
969   socket->read_handle = NULL;
970   GNUNET_assert (read_size <= valid_read_size);
971   socket->copy_offset += read_size;
972   /* Determine upto which packet we can remove from the buffer */
973   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
974   {
975     if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
976     { packet++; break; }
977     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
978       break;
979   }
980   /* If no packets can be removed we can't move the buffer */
981   if (0 == packet) 
982     return;
983   sequence_increase = packet;
984   LOG (GNUNET_ERROR_TYPE_DEBUG,
985        "%s: Sequence increase after read processor completion: %u\n",
986        GNUNET_i2s (&socket->other_peer), sequence_increase);
987   /* Shift the data in the receive buffer */
988   socket->receive_buffer = 
989     memmove (socket->receive_buffer,
990              socket->receive_buffer 
991              + socket->receive_buffer_boundaries[sequence_increase-1],
992              socket->receive_buffer_size
993              - socket->receive_buffer_boundaries[sequence_increase-1]);
994   /* Shift the bitmap */
995   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
996   /* Set read_sequence_number */
997   socket->read_sequence_number += sequence_increase;
998   /* Set read_offset */
999   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
1000   socket->read_offset += offset_increase;
1001   /* Fix copy_offset */
1002   GNUNET_assert (offset_increase <= socket->copy_offset);
1003   socket->copy_offset -= offset_increase;
1004   /* Fix relative boundaries */
1005   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1006   {
1007     if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
1008     {
1009       uint32_t ahead_buffer_boundary;
1010
1011       ahead_buffer_boundary = 
1012         socket->receive_buffer_boundaries[packet + sequence_increase];
1013       if (0 == ahead_buffer_boundary)
1014         socket->receive_buffer_boundaries[packet] = 0;
1015       else
1016       {
1017         GNUNET_assert (offset_increase < ahead_buffer_boundary);
1018         socket->receive_buffer_boundaries[packet] = 
1019           ahead_buffer_boundary - offset_increase;
1020       }
1021     }
1022     else
1023       socket->receive_buffer_boundaries[packet] = 0;
1024   }
1025 }
1026
1027
1028 /**
1029  * Cancels the existing read io handle
1030  *
1031  * @param cls the closure from the SCHEDULER call
1032  * @param tc the task context
1033  */
1034 static void
1035 read_io_timeout (void *cls,
1036                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1037 {
1038   struct GNUNET_STREAM_Socket *socket = cls;
1039   struct GNUNET_STREAM_IOReadHandle *read_handle;
1040   GNUNET_STREAM_DataProcessor proc;
1041   void *proc_cls;
1042
1043   read_handle = socket->read_handle;
1044   GNUNET_assert (NULL != read_handle);
1045   read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1046   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
1047     return;
1048   if (read_handle->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1049   {
1050     LOG (GNUNET_ERROR_TYPE_DEBUG,
1051          "%s: Read task timedout - Cancelling it\n",
1052          GNUNET_i2s (&socket->other_peer));
1053     GNUNET_SCHEDULER_cancel (read_handle->read_task_id);
1054     read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1055   }
1056   proc = read_handle->proc;
1057   proc_cls = read_handle->proc_cls;
1058   GNUNET_free (read_handle);
1059   socket->read_handle = NULL;
1060   /* Call the read processor to signal timeout */
1061   proc (proc_cls,
1062         GNUNET_STREAM_TIMEOUT,
1063         NULL,
1064         0);
1065 }
1066
1067
1068 /**
1069  * Handler for DATA messages; Same for both client and server
1070  *
1071  * @param socket the socket through which the ack was received
1072  * @param tunnel connection to the other end
1073  * @param sender who sent the message
1074  * @param msg the data message
1075  * @param atsi performance data for the connection
1076  * @return GNUNET_OK to keep the connection open,
1077  *         GNUNET_SYSERR to close it (signal serious error)
1078  */
1079 static int
1080 handle_data (struct GNUNET_STREAM_Socket *socket,
1081              struct GNUNET_MESH_Tunnel *tunnel,
1082              const struct GNUNET_PeerIdentity *sender,
1083              const struct GNUNET_STREAM_DataMessage *msg,
1084              const struct GNUNET_ATS_Information*atsi)
1085 {
1086   const void *payload;
1087   struct GNUNET_TIME_Relative ack_deadline_rel;
1088   uint32_t bytes_needed;
1089   uint32_t relative_offset;
1090   uint32_t relative_sequence_number;
1091   uint16_t size;
1092
1093   size = htons (msg->header.header.size);
1094   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1095   {
1096     GNUNET_break_op (0);
1097     return GNUNET_SYSERR;
1098   }
1099   if (0 != memcmp (sender, &socket->other_peer,
1100                    sizeof (struct GNUNET_PeerIdentity)))
1101   {
1102     LOG (GNUNET_ERROR_TYPE_DEBUG,
1103          "%s: Received DATA from non-confirming peer\n",
1104          GNUNET_i2s (&socket->other_peer));
1105     return GNUNET_YES;
1106   }
1107   switch (socket->state)
1108   {
1109   case STATE_ESTABLISHED:
1110   case STATE_TRANSMIT_CLOSED:
1111   case STATE_TRANSMIT_CLOSE_WAIT:      
1112     /* check if the message's sequence number is in the range we are
1113        expecting */
1114     relative_sequence_number = 
1115       ntohl (msg->sequence_number) - socket->read_sequence_number;
1116     if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1117     {
1118       LOG (GNUNET_ERROR_TYPE_DEBUG,
1119            "%s: Ignoring received message with sequence number %u\n",
1120            GNUNET_i2s (&socket->other_peer),
1121            ntohl (msg->sequence_number));
1122       /* Start ACK sending task if one is not already present */
1123       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1124       {
1125         socket->ack_task_id = 
1126           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1127                                         (msg->ack_deadline),
1128                                         &ack_task,
1129                                         socket);
1130       }
1131       return GNUNET_YES;
1132     }      
1133     /* Check if we have already seen this message */
1134     if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1135                                             relative_sequence_number))
1136     {
1137       LOG (GNUNET_ERROR_TYPE_DEBUG,
1138            "%s: Ignoring already received message with sequence number %u\n",
1139            GNUNET_i2s (&socket->other_peer),
1140            ntohl (msg->sequence_number));
1141       /* Start ACK sending task if one is not already present */
1142       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1143       {
1144         socket->ack_task_id = 
1145           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1146                                         (msg->ack_deadline), &ack_task, socket);
1147       }
1148       return GNUNET_YES;
1149     }
1150     LOG (GNUNET_ERROR_TYPE_DEBUG,
1151          "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
1152          GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number),
1153          ntohs (msg->header.header.size), GNUNET_i2s (&socket->other_peer));
1154     /* Check if we have to allocate the buffer */
1155     size -= sizeof (struct GNUNET_STREAM_DataMessage);
1156     relative_offset = ntohl (msg->offset) - socket->read_offset;
1157     bytes_needed = relative_offset + size;
1158     if (bytes_needed > socket->receive_buffer_size)
1159     {
1160       if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1161       {
1162         socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1163                                                  bytes_needed);
1164         socket->receive_buffer_size = bytes_needed;
1165       }
1166       else
1167       {
1168         LOG (GNUNET_ERROR_TYPE_DEBUG,
1169              "%s: Cannot accommodate packet %d as buffer is full\n",
1170              GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number));
1171         return GNUNET_YES;
1172       }
1173     }
1174     /* Copy Data to buffer */
1175     payload = &msg[1];
1176     GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1177     memcpy (socket->receive_buffer + relative_offset, payload, size);
1178     socket->receive_buffer_boundaries[relative_sequence_number] = 
1179         relative_offset + size;
1180     /* Modify the ACK bitmap */
1181     ackbitmap_modify_bit (&socket->ack_bitmap, relative_sequence_number,
1182                           GNUNET_YES);
1183     /* Start ACK sending task if one is not already present */
1184     ack_deadline_rel = GNUNET_TIME_relative_ntoh (msg->ack_deadline);
1185     if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1186     {
1187       ack_deadline_rel = 
1188           GNUNET_TIME_relative_min (ack_deadline_rel,
1189                                     GNUNET_TIME_relative_multiply
1190                                     (GNUNET_TIME_UNIT_SECONDS, 300));
1191       socket->ack_task_id = 
1192           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh 
1193                                         (msg->ack_deadline), &ack_task, socket);
1194       socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1195       socket->ack_time_deadline = ack_deadline_rel;
1196     }
1197     else
1198     {
1199       struct GNUNET_TIME_Relative ack_time_past;
1200       struct GNUNET_TIME_Relative ack_time_remaining;
1201       struct GNUNET_TIME_Relative ack_time_min;
1202       ack_time_past = 
1203           GNUNET_TIME_absolute_get_duration (socket->ack_time_registered);
1204       ack_time_remaining = GNUNET_TIME_relative_subtract
1205           (socket->ack_time_deadline, ack_time_past);
1206       ack_time_min = GNUNET_TIME_relative_min (ack_time_remaining,
1207                                                ack_deadline_rel);
1208       if (0 == memcmp(&ack_deadline_rel, &ack_time_min,
1209                       sizeof (struct GNUNET_TIME_Relative)))
1210       {
1211         ack_deadline_rel = ack_time_min;
1212         GNUNET_SCHEDULER_cancel (socket->ack_task_id);
1213         socket->ack_task_id = GNUNET_SCHEDULER_add_delayed (ack_deadline_rel,
1214                                                             &ack_task, socket);
1215         socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1216         socket->ack_time_deadline = ack_deadline_rel;
1217       }
1218     }
1219     if ((NULL != socket->read_handle) /* A read handle is waiting */
1220         /* There is no current read task */
1221         && (GNUNET_SCHEDULER_NO_TASK == socket->read_handle->read_task_id)
1222         /* We have the first packet */
1223         && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0)))
1224     {
1225       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n",
1226            GNUNET_i2s (&socket->other_peer));
1227       socket->read_handle->read_task_id =
1228           GNUNET_SCHEDULER_add_now (&call_read_processor, socket);
1229     }
1230     break;
1231   default:
1232     LOG (GNUNET_ERROR_TYPE_DEBUG,
1233          "%s: Received data message when it cannot be handled\n",
1234          GNUNET_i2s (&socket->other_peer));
1235     break;
1236   }
1237   return GNUNET_YES;
1238 }
1239
1240
1241 /**
1242  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1243  *
1244  * @param cls the socket (set from GNUNET_MESH_connect)
1245  * @param tunnel connection to the other end
1246  * @param tunnel_ctx place to store local state associated with the tunnel
1247  * @param sender who sent the message
1248  * @param message the actual message
1249  * @param atsi performance data for the connection
1250  * @return GNUNET_OK to keep the connection open,
1251  *         GNUNET_SYSERR to close it (signal serious error)
1252  */
1253 static int
1254 client_handle_data (void *cls,
1255                     struct GNUNET_MESH_Tunnel *tunnel,
1256                     void **tunnel_ctx,
1257                     const struct GNUNET_PeerIdentity *sender,
1258                     const struct GNUNET_MessageHeader *message,
1259                     const struct GNUNET_ATS_Information*atsi)
1260 {
1261   struct GNUNET_STREAM_Socket *socket = cls;
1262
1263   return handle_data (socket, tunnel, sender, 
1264                       (const struct GNUNET_STREAM_DataMessage *) message, atsi);
1265 }
1266
1267
1268 /**
1269  * Callback to set state to ESTABLISHED
1270  *
1271  * @param cls the closure NULL;
1272  * @param socket the socket to requiring state change
1273  */
1274 static void
1275 set_state_established (void *cls,
1276                        struct GNUNET_STREAM_Socket *socket)
1277 {
1278   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1279        "%s: Attaining ESTABLISHED state\n",
1280        GNUNET_i2s (&socket->other_peer));
1281   socket->write_offset = 0;
1282   socket->read_offset = 0;
1283   socket->state = STATE_ESTABLISHED;
1284   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK !=
1285                  socket->control_retransmission_task_id);
1286   GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
1287   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1288   if (NULL != socket->lsocket)
1289   {
1290     LOG (GNUNET_ERROR_TYPE_DEBUG,
1291          "%s: Calling listen callback\n",
1292          GNUNET_i2s (&socket->other_peer));
1293     if (GNUNET_SYSERR == 
1294         socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1295                                     socket,
1296                                     &socket->other_peer))
1297     {
1298       socket->state = STATE_CLOSED;
1299       /* FIXME: We should close in a decent way (send RST) */
1300       GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1301       GNUNET_free (socket);
1302     }
1303   }
1304   else
1305     socket->open_cb (socket->open_cls, socket);
1306 }
1307
1308
1309 /**
1310  * Callback to set state to HELLO_WAIT
1311  *
1312  * @param cls the closure from queue_message
1313  * @param socket the socket to requiring state change
1314  */
1315 static void
1316 set_state_hello_wait (void *cls,
1317                       struct GNUNET_STREAM_Socket *socket)
1318 {
1319   GNUNET_assert (STATE_INIT == socket->state);
1320   LOG (GNUNET_ERROR_TYPE_DEBUG,
1321        "%s: Attaining HELLO_WAIT state\n",
1322        GNUNET_i2s (&socket->other_peer));
1323   socket->state = STATE_HELLO_WAIT;
1324 }
1325
1326
1327 /**
1328  * Callback to set state to CLOSE_WAIT
1329  *
1330  * @param cls the closure from queue_message
1331  * @param socket the socket requiring state change
1332  */
1333 static void
1334 set_state_close_wait (void *cls,
1335                       struct GNUNET_STREAM_Socket *socket)
1336 {
1337   LOG (GNUNET_ERROR_TYPE_DEBUG,
1338        "%s: Attaing CLOSE_WAIT state\n",
1339        GNUNET_i2s (&socket->other_peer));
1340   socket->state = STATE_CLOSE_WAIT;
1341   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1342   socket->receive_buffer = NULL;
1343   socket->receive_buffer_size = 0;
1344 }
1345
1346
1347 /**
1348  * Callback to set state to RECEIVE_CLOSE_WAIT
1349  *
1350  * @param cls the closure from queue_message
1351  * @param socket the socket requiring state change
1352  */
1353 static void
1354 set_state_receive_close_wait (void *cls,
1355                               struct GNUNET_STREAM_Socket *socket)
1356 {
1357   LOG (GNUNET_ERROR_TYPE_DEBUG,
1358        "%s: Attaing RECEIVE_CLOSE_WAIT state\n",
1359        GNUNET_i2s (&socket->other_peer));
1360   socket->state = STATE_RECEIVE_CLOSE_WAIT;
1361   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1362   socket->receive_buffer = NULL;
1363   socket->receive_buffer_size = 0;
1364 }
1365
1366
1367 /**
1368  * Callback to set state to TRANSMIT_CLOSE_WAIT
1369  *
1370  * @param cls the closure from queue_message
1371  * @param socket the socket requiring state change
1372  */
1373 static void
1374 set_state_transmit_close_wait (void *cls,
1375                                struct GNUNET_STREAM_Socket *socket)
1376 {
1377   LOG (GNUNET_ERROR_TYPE_DEBUG,
1378        "%s: Attaing TRANSMIT_CLOSE_WAIT state\n",
1379        GNUNET_i2s (&socket->other_peer));
1380   socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1381 }
1382
1383
1384 /**
1385  * Callback to set state to CLOSED
1386  *
1387  * @param cls the closure from queue_message
1388  * @param socket the socket requiring state change
1389  */
1390 static void
1391 set_state_closed (void *cls,
1392                   struct GNUNET_STREAM_Socket *socket)
1393 {
1394   socket->state = STATE_CLOSED;
1395 }
1396
1397
1398 /**
1399  * Returns GNUNET_MESSAGE_TYPE_STREAM_HELLO
1400  *
1401  * @return the generate hello message
1402  */
1403 static struct GNUNET_STREAM_MessageHeader *
1404 generate_hello (void)
1405 {
1406   struct GNUNET_STREAM_MessageHeader *msg;
1407
1408   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1409   msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1410   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1411   return msg;
1412 }
1413
1414
1415 /**
1416  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1417  * socket
1418  *
1419  * @param socket the socket for which this HelloAckMessage has to be generated
1420  * @param generate_seq GNUNET_YES to generate the write sequence number,
1421  *          GNUNET_NO to use the existing sequence number
1422  * @return the HelloAckMessage
1423  */
1424 static struct GNUNET_STREAM_HelloAckMessage *
1425 generate_hello_ack (struct GNUNET_STREAM_Socket *socket,
1426                     int generate_seq)
1427 {
1428   struct GNUNET_STREAM_HelloAckMessage *msg;
1429
1430   if (GNUNET_YES == generate_seq)
1431   {
1432     if (GNUNET_YES == socket->testing_active)
1433       socket->write_sequence_number =
1434         socket->testing_set_write_sequence_number_value;
1435     else
1436       socket->write_sequence_number = 
1437         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1438     LOG_DEBUG ("%s: write sequence number %u\n",
1439                GNUNET_i2s (&socket->other_peer),
1440                (unsigned int) socket->write_sequence_number);
1441   }
1442   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1443   msg->header.header.size = 
1444     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1445   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1446   msg->sequence_number = htonl (socket->write_sequence_number);
1447   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1448   return msg;
1449 }
1450
1451
1452 /**
1453  * Task for retransmitting control messages if they aren't ACK'ed before a
1454  * deadline
1455  *
1456  * @param cls the socket
1457  * @param tc the Task context
1458  */
1459 static void
1460 control_retransmission_task (void *cls,
1461                              const struct GNUNET_SCHEDULER_TaskContext *tc)
1462 {
1463   struct GNUNET_STREAM_Socket *socket = cls;
1464     
1465   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1466   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
1467     return;
1468   LOG_DEBUG ("%s: Retransmitting a control message\n",
1469                  GNUNET_i2s (&socket->other_peer));
1470   switch (socket->state)
1471   {
1472   case STATE_INIT:    
1473     GNUNET_break (0);
1474     break;
1475   case STATE_LISTEN:
1476     GNUNET_break (0);
1477     break;
1478   case STATE_HELLO_WAIT:
1479     if (NULL == socket->lsocket) /* We are client */
1480       queue_message (socket, generate_hello (), NULL, NULL, GNUNET_NO);
1481     else
1482       queue_message (socket,
1483                      (struct GNUNET_STREAM_MessageHeader *)
1484                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1485                      GNUNET_NO);
1486     socket->control_retransmission_task_id =
1487     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
1488                                   &control_retransmission_task, socket);
1489     break;
1490   case STATE_ESTABLISHED:
1491     if (NULL == socket->lsocket)
1492       queue_message (socket,
1493                      (struct GNUNET_STREAM_MessageHeader *)
1494                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1495                      GNUNET_NO);
1496     else
1497       GNUNET_break (0);
1498   default:
1499     GNUNET_break (0);
1500   }  
1501 }
1502
1503
1504 /**
1505  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1506  *
1507  * @param cls the socket (set from GNUNET_MESH_connect)
1508  * @param tunnel connection to the other end
1509  * @param tunnel_ctx this is NULL
1510  * @param sender who sent the message
1511  * @param message the actual message
1512  * @param atsi performance data for the connection
1513  * @return GNUNET_OK to keep the connection open,
1514  *         GNUNET_SYSERR to close it (signal serious error)
1515  */
1516 static int
1517 client_handle_hello_ack (void *cls,
1518                          struct GNUNET_MESH_Tunnel *tunnel,
1519                          void **tunnel_ctx,
1520                          const struct GNUNET_PeerIdentity *sender,
1521                          const struct GNUNET_MessageHeader *message,
1522                          const struct GNUNET_ATS_Information*atsi)
1523 {
1524   struct GNUNET_STREAM_Socket *socket = cls;
1525   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1526   struct GNUNET_STREAM_HelloAckMessage *reply;
1527
1528   if (0 != memcmp (sender, &socket->other_peer,
1529                    sizeof (struct GNUNET_PeerIdentity)))
1530   {
1531     LOG (GNUNET_ERROR_TYPE_DEBUG,
1532          "%s: Received HELLO_ACK from non-confirming peer\n",
1533          GNUNET_i2s (&socket->other_peer));
1534     return GNUNET_YES;
1535   }
1536   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1537   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received HELLO_ACK from %s\n",
1538        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1539   GNUNET_assert (socket->tunnel == tunnel);
1540   switch (socket->state)
1541   {
1542   case STATE_HELLO_WAIT:
1543     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1544     LOG (GNUNET_ERROR_TYPE_DEBUG,
1545          "%s: Read sequence number %u\n",
1546          GNUNET_i2s (&socket->other_peer),
1547          (unsigned int) socket->read_sequence_number);
1548     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1549     reply = generate_hello_ack (socket, GNUNET_YES);
1550     queue_message (socket, &reply->header, &set_state_established,
1551                    NULL, GNUNET_NO);    
1552     return GNUNET_OK;
1553   case STATE_ESTABLISHED:
1554     // call statistics (# ACKs ignored++)
1555     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
1556                    socket->control_retransmission_task_id);
1557     socket->control_retransmission_task_id =
1558       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
1559     return GNUNET_OK;
1560   default:
1561     LOG_DEBUG ("%s: Server %s sent HELLO_ACK when in state %d\n", 
1562                GNUNET_i2s (&socket->other_peer),
1563                GNUNET_i2s (&socket->other_peer), socket->state);
1564     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1565     return GNUNET_SYSERR;
1566   }
1567 }
1568
1569
1570 /**
1571  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1572  *
1573  * @param cls the socket (set from GNUNET_MESH_connect)
1574  * @param tunnel connection to the other end
1575  * @param tunnel_ctx this is NULL
1576  * @param sender who sent the message
1577  * @param message the actual message
1578  * @param atsi performance data for the connection
1579  * @return GNUNET_OK to keep the connection open,
1580  *         GNUNET_SYSERR to close it (signal serious error)
1581  */
1582 static int
1583 client_handle_reset (void *cls,
1584                      struct GNUNET_MESH_Tunnel *tunnel,
1585                      void **tunnel_ctx,
1586                      const struct GNUNET_PeerIdentity *sender,
1587                      const struct GNUNET_MessageHeader *message,
1588                      const struct GNUNET_ATS_Information*atsi)
1589 {
1590   // struct GNUNET_STREAM_Socket *socket = cls;
1591
1592   return GNUNET_OK;
1593 }
1594
1595
1596 /**
1597  * Common message handler for handling TRANSMIT_CLOSE messages
1598  *
1599  * @param socket the socket through which the ack was received
1600  * @param tunnel connection to the other end
1601  * @param sender who sent the message
1602  * @param msg the transmit close message
1603  * @param atsi performance data for the connection
1604  * @return GNUNET_OK to keep the connection open,
1605  *         GNUNET_SYSERR to close it (signal serious error)
1606  */
1607 static int
1608 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1609                        struct GNUNET_MESH_Tunnel *tunnel,
1610                        const struct GNUNET_PeerIdentity *sender,
1611                        const struct GNUNET_STREAM_MessageHeader *msg,
1612                        const struct GNUNET_ATS_Information*atsi)
1613 {
1614   struct GNUNET_STREAM_MessageHeader *reply;
1615
1616   switch (socket->state)
1617   {
1618   case STATE_ESTABLISHED:
1619     socket->state = STATE_RECEIVE_CLOSED;
1620     /* Send TRANSMIT_CLOSE_ACK */
1621     reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1622     reply->header.type = 
1623       htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1624     reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1625     queue_message (socket, reply, NULL, NULL, GNUNET_NO);
1626     break;
1627   default:
1628     /* FIXME: Call statistics? */
1629     break;
1630   }
1631   return GNUNET_YES;
1632 }
1633
1634
1635 /**
1636  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1637  *
1638  * @param cls the socket (set from GNUNET_MESH_connect)
1639  * @param tunnel connection to the other end
1640  * @param tunnel_ctx this is NULL
1641  * @param sender who sent the message
1642  * @param message the actual message
1643  * @param atsi performance data for the connection
1644  * @return GNUNET_OK to keep the connection open,
1645  *         GNUNET_SYSERR to close it (signal serious error)
1646  */
1647 static int
1648 client_handle_transmit_close (void *cls,
1649                               struct GNUNET_MESH_Tunnel *tunnel,
1650                               void **tunnel_ctx,
1651                               const struct GNUNET_PeerIdentity *sender,
1652                               const struct GNUNET_MessageHeader *message,
1653                               const struct GNUNET_ATS_Information*atsi)
1654 {
1655   struct GNUNET_STREAM_Socket *socket = cls;
1656   
1657   return handle_transmit_close (socket,
1658                                 tunnel,
1659                                 sender,
1660                                 (struct GNUNET_STREAM_MessageHeader *)message,
1661                                 atsi);
1662 }
1663
1664
1665 /**
1666  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1667  *
1668  * @param socket the socket
1669  * @param tunnel connection to the other end
1670  * @param sender who sent the message
1671  * @param message the actual message
1672  * @param atsi performance data for the connection
1673  * @param operation the close operation which is being ACK'ed
1674  * @return GNUNET_OK to keep the connection open,
1675  *         GNUNET_SYSERR to close it (signal serious error)
1676  */
1677 static int
1678 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1679                           struct GNUNET_MESH_Tunnel *tunnel,
1680                           const struct GNUNET_PeerIdentity *sender,
1681                           const struct GNUNET_STREAM_MessageHeader *message,
1682                           const struct GNUNET_ATS_Information *atsi,
1683                           int operation)
1684 {
1685   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1686
1687   shutdown_handle = socket->shutdown_handle;
1688   if (NULL == shutdown_handle)
1689   {
1690     LOG (GNUNET_ERROR_TYPE_DEBUG,
1691          "%s: Received CLOSE_ACK when shutdown handle is NULL\n",
1692          GNUNET_i2s (&socket->other_peer));
1693     return GNUNET_OK;
1694   }
1695   switch (operation)
1696   {
1697   case SHUT_RDWR:
1698     switch (socket->state)
1699     {
1700     case STATE_CLOSE_WAIT:
1701       if (SHUT_RDWR != shutdown_handle->operation)
1702       {
1703         LOG (GNUNET_ERROR_TYPE_DEBUG,
1704              "%s: Received CLOSE_ACK when shutdown handle is not for "
1705              "SHUT_RDWR\n", GNUNET_i2s (&socket->other_peer));
1706         return GNUNET_OK;
1707       }
1708       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE_ACK from %s\n",
1709            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1710       socket->state = STATE_CLOSED;
1711       break;
1712     default:
1713       LOG (GNUNET_ERROR_TYPE_DEBUG,
1714            "%s: Received CLOSE_ACK when in it not expected\n",
1715            GNUNET_i2s (&socket->other_peer));
1716       return GNUNET_OK;
1717     }
1718     break;
1719   case SHUT_RD:
1720     switch (socket->state)
1721     {
1722     case STATE_RECEIVE_CLOSE_WAIT:
1723       if (SHUT_RD != shutdown_handle->operation)
1724       {
1725         LOG (GNUNET_ERROR_TYPE_DEBUG,
1726              "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
1727              "is not for SHUT_RD\n", GNUNET_i2s (&socket->other_peer));
1728         return GNUNET_OK;
1729       }
1730       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE_ACK from %s\n",
1731            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1732       socket->state = STATE_RECEIVE_CLOSED;
1733       break;
1734     default:
1735       LOG (GNUNET_ERROR_TYPE_DEBUG,
1736            "%s: Received RECEIVE_CLOSE_ACK when in it not expected\n",
1737            GNUNET_i2s (&socket->other_peer));
1738       return GNUNET_OK;
1739     }
1740     break;
1741   case SHUT_WR:
1742     switch (socket->state)
1743     {
1744     case STATE_TRANSMIT_CLOSE_WAIT:
1745       if (SHUT_WR != shutdown_handle->operation)
1746       {
1747         LOG (GNUNET_ERROR_TYPE_DEBUG,
1748              "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle "
1749              "is not for SHUT_WR\n",
1750              GNUNET_i2s (&socket->other_peer));
1751         return GNUNET_OK;
1752       }
1753       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
1754            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1755       socket->state = STATE_TRANSMIT_CLOSED;
1756       break;
1757     default:
1758       LOG (GNUNET_ERROR_TYPE_DEBUG,
1759            "%s: Received TRANSMIT_CLOSE_ACK when in it not expected\n",
1760            GNUNET_i2s (&socket->other_peer));          
1761       return GNUNET_OK;
1762     }
1763     break;
1764   default:
1765     GNUNET_assert (0);
1766   }
1767   if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
1768     shutdown_handle->completion_cb(shutdown_handle->completion_cls,
1769                                    operation);
1770   if (GNUNET_SCHEDULER_NO_TASK
1771       != shutdown_handle->close_msg_retransmission_task_id)
1772   {
1773     GNUNET_SCHEDULER_cancel
1774       (shutdown_handle->close_msg_retransmission_task_id);
1775     shutdown_handle->close_msg_retransmission_task_id =
1776         GNUNET_SCHEDULER_NO_TASK;
1777   }
1778   GNUNET_free (shutdown_handle); /* Free shutdown handle */
1779   socket->shutdown_handle = NULL;
1780   return GNUNET_OK;
1781 }
1782
1783
1784 /**
1785  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1786  *
1787  * @param cls the socket (set from GNUNET_MESH_connect)
1788  * @param tunnel connection to the other end
1789  * @param tunnel_ctx this is NULL
1790  * @param sender who sent the message
1791  * @param message the actual message
1792  * @param atsi performance data for the connection
1793  * @return GNUNET_OK to keep the connection open,
1794  *         GNUNET_SYSERR to close it (signal serious error)
1795  */
1796 static int
1797 client_handle_transmit_close_ack (void *cls,
1798                                   struct GNUNET_MESH_Tunnel *tunnel,
1799                                   void **tunnel_ctx,
1800                                   const struct GNUNET_PeerIdentity *sender,
1801                                   const struct GNUNET_MessageHeader *message,
1802                                   const struct GNUNET_ATS_Information*atsi)
1803 {
1804   struct GNUNET_STREAM_Socket *socket = cls;
1805
1806   return handle_generic_close_ack (socket,
1807                                    tunnel,
1808                                    sender,
1809                                    (const struct GNUNET_STREAM_MessageHeader *)
1810                                    message,
1811                                    atsi,
1812                                    SHUT_WR);
1813 }
1814
1815
1816 /**
1817  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1818  *
1819  * @param socket the socket
1820  * @param tunnel connection to the other end
1821  * @param sender who sent the message
1822  * @param message the actual message
1823  * @param atsi performance data for the connection
1824  * @return GNUNET_OK to keep the connection open,
1825  *         GNUNET_SYSERR to close it (signal serious error)
1826  */
1827 static int
1828 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1829                       struct GNUNET_MESH_Tunnel *tunnel,
1830                       const struct GNUNET_PeerIdentity *sender,
1831                       const struct GNUNET_STREAM_MessageHeader *message,
1832                       const struct GNUNET_ATS_Information *atsi)
1833 {
1834   struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1835
1836   switch (socket->state)
1837   {
1838   case STATE_INIT:
1839   case STATE_LISTEN:
1840   case STATE_HELLO_WAIT:
1841     LOG (GNUNET_ERROR_TYPE_DEBUG,
1842          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1843          GNUNET_i2s (&socket->other_peer));
1844     return GNUNET_OK;
1845   default:
1846     break;
1847   }
1848   
1849   LOG (GNUNET_ERROR_TYPE_DEBUG,
1850        "%s: Received RECEIVE_CLOSE from %s\n",
1851        GNUNET_i2s (&socket->other_peer),
1852        GNUNET_i2s (&socket->other_peer));
1853   receive_close_ack =
1854     GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1855   receive_close_ack->header.size =
1856     htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1857   receive_close_ack->header.type =
1858     htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1859   queue_message (socket, receive_close_ack, &set_state_closed,
1860                  NULL, GNUNET_NO);  
1861   /* FIXME: Handle the case where write handle is present; the write operation
1862      should be deemed as finised and the write continuation callback
1863      has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
1864   return GNUNET_OK;
1865 }
1866
1867
1868 /**
1869  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1870  *
1871  * @param cls the socket (set from GNUNET_MESH_connect)
1872  * @param tunnel connection to the other end
1873  * @param tunnel_ctx this is NULL
1874  * @param sender who sent the message
1875  * @param message the actual message
1876  * @param atsi performance data for the connection
1877  * @return GNUNET_OK to keep the connection open,
1878  *         GNUNET_SYSERR to close it (signal serious error)
1879  */
1880 static int
1881 client_handle_receive_close (void *cls,
1882                              struct GNUNET_MESH_Tunnel *tunnel,
1883                              void **tunnel_ctx,
1884                              const struct GNUNET_PeerIdentity *sender,
1885                              const struct GNUNET_MessageHeader *message,
1886                              const struct GNUNET_ATS_Information*atsi)
1887 {
1888   struct GNUNET_STREAM_Socket *socket = cls;
1889
1890   return
1891     handle_receive_close (socket,
1892                           tunnel,
1893                           sender,
1894                           (const struct GNUNET_STREAM_MessageHeader *) message,
1895                           atsi);
1896 }
1897
1898
1899 /**
1900  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1901  *
1902  * @param cls the socket (set from GNUNET_MESH_connect)
1903  * @param tunnel connection to the other end
1904  * @param tunnel_ctx this is NULL
1905  * @param sender who sent the message
1906  * @param message the actual message
1907  * @param atsi performance data for the connection
1908  * @return GNUNET_OK to keep the connection open,
1909  *         GNUNET_SYSERR to close it (signal serious error)
1910  */
1911 static int
1912 client_handle_receive_close_ack (void *cls,
1913                                  struct GNUNET_MESH_Tunnel *tunnel,
1914                                  void **tunnel_ctx,
1915                                  const struct GNUNET_PeerIdentity *sender,
1916                                  const struct GNUNET_MessageHeader *message,
1917                                  const struct GNUNET_ATS_Information*atsi)
1918 {
1919   struct GNUNET_STREAM_Socket *socket = cls;
1920
1921   return handle_generic_close_ack (socket,
1922                                    tunnel,
1923                                    sender,
1924                                    (const struct GNUNET_STREAM_MessageHeader *)
1925                                    message,
1926                                    atsi,
1927                                    SHUT_RD);
1928 }
1929
1930
1931 /**
1932  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1933  *
1934  * @param socket the socket
1935  * @param tunnel connection to the other end
1936  * @param sender who sent the message
1937  * @param message the actual message
1938  * @param atsi performance data for the connection
1939  * @return GNUNET_OK to keep the connection open,
1940  *         GNUNET_SYSERR to close it (signal serious error)
1941  */
1942 static int
1943 handle_close (struct GNUNET_STREAM_Socket *socket,
1944               struct GNUNET_MESH_Tunnel *tunnel,
1945               const struct GNUNET_PeerIdentity *sender,
1946               const struct GNUNET_STREAM_MessageHeader *message,
1947               const struct GNUNET_ATS_Information*atsi)
1948 {
1949   struct GNUNET_STREAM_MessageHeader *close_ack;
1950
1951   switch (socket->state)
1952   {
1953   case STATE_INIT:
1954   case STATE_LISTEN:
1955   case STATE_HELLO_WAIT:
1956     LOG (GNUNET_ERROR_TYPE_DEBUG,
1957          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1958          GNUNET_i2s (&socket->other_peer));
1959     return GNUNET_OK;
1960   default:
1961     break;
1962   }
1963
1964   LOG (GNUNET_ERROR_TYPE_DEBUG,
1965        "%s: Received CLOSE from %s\n",
1966        GNUNET_i2s (&socket->other_peer),
1967        GNUNET_i2s (&socket->other_peer));
1968   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1969   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1970   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
1971   queue_message (socket, close_ack, &set_state_closed, NULL, GNUNET_NO);
1972   if (socket->state == STATE_CLOSED)
1973     return GNUNET_OK;
1974
1975   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1976   socket->receive_buffer = NULL;
1977   socket->receive_buffer_size = 0;
1978   return GNUNET_OK;
1979 }
1980
1981
1982 /**
1983  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1984  *
1985  * @param cls the socket (set from GNUNET_MESH_connect)
1986  * @param tunnel connection to the other end
1987  * @param tunnel_ctx this is NULL
1988  * @param sender who sent the message
1989  * @param message the actual message
1990  * @param atsi performance data for the connection
1991  * @return GNUNET_OK to keep the connection open,
1992  *         GNUNET_SYSERR to close it (signal serious error)
1993  */
1994 static int
1995 client_handle_close (void *cls,
1996                      struct GNUNET_MESH_Tunnel *tunnel,
1997                      void **tunnel_ctx,
1998                      const struct GNUNET_PeerIdentity *sender,
1999                      const struct GNUNET_MessageHeader *message,
2000                      const struct GNUNET_ATS_Information*atsi)
2001 {
2002   struct GNUNET_STREAM_Socket *socket = cls;
2003
2004   return handle_close (socket,
2005                        tunnel,
2006                        sender,
2007                        (const struct GNUNET_STREAM_MessageHeader *) message,
2008                        atsi);
2009 }
2010
2011
2012 /**
2013  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2014  *
2015  * @param cls the socket (set from GNUNET_MESH_connect)
2016  * @param tunnel connection to the other end
2017  * @param tunnel_ctx this is NULL
2018  * @param sender who sent the message
2019  * @param message the actual message
2020  * @param atsi performance data for the connection
2021  * @return GNUNET_OK to keep the connection open,
2022  *         GNUNET_SYSERR to close it (signal serious error)
2023  */
2024 static int
2025 client_handle_close_ack (void *cls,
2026                          struct GNUNET_MESH_Tunnel *tunnel,
2027                          void **tunnel_ctx,
2028                          const struct GNUNET_PeerIdentity *sender,
2029                          const struct GNUNET_MessageHeader *message,
2030                          const struct GNUNET_ATS_Information *atsi)
2031 {
2032   struct GNUNET_STREAM_Socket *socket = cls;
2033
2034   return handle_generic_close_ack (socket,
2035                                    tunnel,
2036                                    sender,
2037                                    (const struct GNUNET_STREAM_MessageHeader *) 
2038                                    message,
2039                                    atsi,
2040                                    SHUT_RDWR);
2041 }
2042
2043 /*****************************/
2044 /* Server's Message Handlers */
2045 /*****************************/
2046
2047 /**
2048  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
2049  *
2050  * @param cls the closure
2051  * @param tunnel connection to the other end
2052  * @param tunnel_ctx the socket
2053  * @param sender who sent the message
2054  * @param message the actual message
2055  * @param atsi performance data for the connection
2056  * @return GNUNET_OK to keep the connection open,
2057  *         GNUNET_SYSERR to close it (signal serious error)
2058  */
2059 static int
2060 server_handle_data (void *cls,
2061                     struct GNUNET_MESH_Tunnel *tunnel,
2062                     void **tunnel_ctx,
2063                     const struct GNUNET_PeerIdentity *sender,
2064                     const struct GNUNET_MessageHeader *message,
2065                     const struct GNUNET_ATS_Information*atsi)
2066 {
2067   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2068
2069   return handle_data (socket,
2070                       tunnel,
2071                       sender,
2072                       (const struct GNUNET_STREAM_DataMessage *)message,
2073                       atsi);
2074 }
2075
2076
2077 /**
2078  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
2079  *
2080  * @param cls the closure
2081  * @param tunnel connection to the other end
2082  * @param tunnel_ctx the socket
2083  * @param sender who sent the message
2084  * @param message the actual message
2085  * @param atsi performance data for the connection
2086  * @return GNUNET_OK to keep the connection open,
2087  *         GNUNET_SYSERR to close it (signal serious error)
2088  */
2089 static int
2090 server_handle_hello (void *cls,
2091                      struct GNUNET_MESH_Tunnel *tunnel,
2092                      void **tunnel_ctx,
2093                      const struct GNUNET_PeerIdentity *sender,
2094                      const struct GNUNET_MessageHeader *message,
2095                      const struct GNUNET_ATS_Information*atsi)
2096 {
2097   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2098   struct GNUNET_STREAM_HelloAckMessage *reply;
2099
2100   if (0 != memcmp (sender,
2101                    &socket->other_peer,
2102                    sizeof (struct GNUNET_PeerIdentity)))
2103   {
2104     LOG_DEBUG ("%s: Received HELLO from non-confirming peer\n",
2105                GNUNET_i2s (&socket->other_peer));
2106     return GNUNET_YES;
2107   }
2108   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == ntohs (message->type));
2109   GNUNET_assert (socket->tunnel == tunnel);
2110   LOG_DEBUG ("%s: Received HELLO from %s\n", GNUNET_i2s (&socket->other_peer),
2111              GNUNET_i2s (&socket->other_peer));
2112   switch (socket->status)
2113   {
2114   case STATE_INIT:
2115     reply = generate_hello_ack (socket, GNUNET_YES);
2116     queue_message (socket, &reply->header, &set_state_hello_wait, NULL,
2117                    GNUNET_NO);
2118     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2119                    socket->control_retransmission_task_id);
2120     socket->control_retransmission_task_id =
2121       GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2122                                     &control_retransmission_task, socket);
2123     break;
2124   case STATE_HELLO_WAIT:
2125     /* Perhaps our HELLO_ACK was lost */
2126     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != 
2127                    socket->control_retransmission_task_id);
2128     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2129     socket->control_retransmission_task_id =
2130       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
2131     break;
2132   default:
2133     LOG_DEBUG( "%s: Client sent HELLO when in state %d\n",
2134                GNUNET_i2s (&socket->other_peer), socket->state);
2135     /* FIXME: Send RESET? */
2136   }
2137   return GNUNET_OK;
2138 }
2139
2140
2141 /**
2142  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2143  *
2144  * @param cls the closure
2145  * @param tunnel connection to the other end
2146  * @param tunnel_ctx the socket
2147  * @param sender who sent the message
2148  * @param message the actual message
2149  * @param atsi performance data for the connection
2150  * @return GNUNET_OK to keep the connection open,
2151  *         GNUNET_SYSERR to close it (signal serious error)
2152  */
2153 static int
2154 server_handle_hello_ack (void *cls,
2155                          struct GNUNET_MESH_Tunnel *tunnel,
2156                          void **tunnel_ctx,
2157                          const struct GNUNET_PeerIdentity *sender,
2158                          const struct GNUNET_MessageHeader *message,
2159                          const struct GNUNET_ATS_Information*atsi)
2160 {
2161   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2162   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2163
2164   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2165                  ntohs (message->type));
2166   GNUNET_assert (socket->tunnel == tunnel);
2167   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2168   switch (socket->state)  
2169   {
2170   case STATE_HELLO_WAIT:
2171     LOG (GNUNET_ERROR_TYPE_DEBUG,
2172          "%s: Received HELLO_ACK from %s\n",
2173          GNUNET_i2s (&socket->other_peer),
2174          GNUNET_i2s (&socket->other_peer));
2175     socket->read_sequence_number = ntohl (ack_message->sequence_number);
2176     LOG (GNUNET_ERROR_TYPE_DEBUG,
2177          "%s: Read sequence number %u\n",
2178          GNUNET_i2s (&socket->other_peer),
2179          (unsigned int) socket->read_sequence_number);
2180     socket->receiver_window_available = 
2181       ntohl (ack_message->receiver_window_size);
2182     set_state_established (NULL, socket);
2183     break;
2184   default:
2185     LOG (GNUNET_ERROR_TYPE_DEBUG,
2186          "Client sent HELLO_ACK when in state %d\n", socket->state);    
2187   }
2188   return GNUNET_OK;
2189 }
2190
2191
2192 /**
2193  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2194  *
2195  * @param cls the closure
2196  * @param tunnel connection to the other end
2197  * @param tunnel_ctx the socket
2198  * @param sender who sent the message
2199  * @param message the actual message
2200  * @param atsi performance data for the connection
2201  * @return GNUNET_OK to keep the connection open,
2202  *         GNUNET_SYSERR to close it (signal serious error)
2203  */
2204 static int
2205 server_handle_reset (void *cls,
2206                      struct GNUNET_MESH_Tunnel *tunnel,
2207                      void **tunnel_ctx,
2208                      const struct GNUNET_PeerIdentity *sender,
2209                      const struct GNUNET_MessageHeader *message,
2210                      const struct GNUNET_ATS_Information*atsi)
2211 {
2212   // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2213
2214   return GNUNET_OK;
2215 }
2216
2217
2218 /**
2219  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2220  *
2221  * @param cls the closure
2222  * @param tunnel connection to the other end
2223  * @param tunnel_ctx the socket
2224  * @param sender who sent the message
2225  * @param message the actual message
2226  * @param atsi performance data for the connection
2227  * @return GNUNET_OK to keep the connection open,
2228  *         GNUNET_SYSERR to close it (signal serious error)
2229  */
2230 static int
2231 server_handle_transmit_close (void *cls,
2232                               struct GNUNET_MESH_Tunnel *tunnel,
2233                               void **tunnel_ctx,
2234                               const struct GNUNET_PeerIdentity *sender,
2235                               const struct GNUNET_MessageHeader *message,
2236                               const struct GNUNET_ATS_Information*atsi)
2237 {
2238   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2239
2240   return handle_transmit_close (socket,
2241                                 tunnel,
2242                                 sender,
2243                                 (struct GNUNET_STREAM_MessageHeader *)message,
2244                                 atsi);
2245 }
2246
2247
2248 /**
2249  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2250  *
2251  * @param cls the closure
2252  * @param tunnel connection to the other end
2253  * @param tunnel_ctx the socket
2254  * @param sender who sent the message
2255  * @param message the actual message
2256  * @param atsi performance data for the connection
2257  * @return GNUNET_OK to keep the connection open,
2258  *         GNUNET_SYSERR to close it (signal serious error)
2259  */
2260 static int
2261 server_handle_transmit_close_ack (void *cls,
2262                                   struct GNUNET_MESH_Tunnel *tunnel,
2263                                   void **tunnel_ctx,
2264                                   const struct GNUNET_PeerIdentity *sender,
2265                                   const struct GNUNET_MessageHeader *message,
2266                                   const struct GNUNET_ATS_Information*atsi)
2267 {
2268   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2269
2270   return handle_generic_close_ack (socket,
2271                                    tunnel,
2272                                    sender,
2273                                    (const struct GNUNET_STREAM_MessageHeader *)
2274                                    message,
2275                                    atsi,
2276                                    SHUT_WR);
2277 }
2278
2279
2280 /**
2281  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2282  *
2283  * @param cls the closure
2284  * @param tunnel connection to the other end
2285  * @param tunnel_ctx the socket
2286  * @param sender who sent the message
2287  * @param message the actual message
2288  * @param atsi performance data for the connection
2289  * @return GNUNET_OK to keep the connection open,
2290  *         GNUNET_SYSERR to close it (signal serious error)
2291  */
2292 static int
2293 server_handle_receive_close (void *cls,
2294                              struct GNUNET_MESH_Tunnel *tunnel,
2295                              void **tunnel_ctx,
2296                              const struct GNUNET_PeerIdentity *sender,
2297                              const struct GNUNET_MessageHeader *message,
2298                              const struct GNUNET_ATS_Information*atsi)
2299 {
2300   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2301
2302   return
2303     handle_receive_close (socket,
2304                           tunnel,
2305                           sender,
2306                           (const struct GNUNET_STREAM_MessageHeader *) message,
2307                           atsi);
2308 }
2309
2310
2311 /**
2312  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2313  *
2314  * @param cls the closure
2315  * @param tunnel connection to the other end
2316  * @param tunnel_ctx the socket
2317  * @param sender who sent the message
2318  * @param message the actual message
2319  * @param atsi performance data for the connection
2320  * @return GNUNET_OK to keep the connection open,
2321  *         GNUNET_SYSERR to close it (signal serious error)
2322  */
2323 static int
2324 server_handle_receive_close_ack (void *cls,
2325                                  struct GNUNET_MESH_Tunnel *tunnel,
2326                                  void **tunnel_ctx,
2327                                  const struct GNUNET_PeerIdentity *sender,
2328                                  const struct GNUNET_MessageHeader *message,
2329                                  const struct GNUNET_ATS_Information*atsi)
2330 {
2331   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2332
2333   return handle_generic_close_ack (socket,
2334                                    tunnel,
2335                                    sender,
2336                                    (const struct GNUNET_STREAM_MessageHeader *)
2337                                    message,
2338                                    atsi,
2339                                    SHUT_RD);
2340 }
2341
2342
2343 /**
2344  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2345  *
2346  * @param cls the listen socket (from GNUNET_MESH_connect in
2347  *          GNUNET_STREAM_listen) 
2348  * @param tunnel connection to the other end
2349  * @param tunnel_ctx the socket
2350  * @param sender who sent the message
2351  * @param message the actual message
2352  * @param atsi performance data for the connection
2353  * @return GNUNET_OK to keep the connection open,
2354  *         GNUNET_SYSERR to close it (signal serious error)
2355  */
2356 static int
2357 server_handle_close (void *cls,
2358                      struct GNUNET_MESH_Tunnel *tunnel,
2359                      void **tunnel_ctx,
2360                      const struct GNUNET_PeerIdentity *sender,
2361                      const struct GNUNET_MessageHeader *message,
2362                      const struct GNUNET_ATS_Information*atsi)
2363 {
2364   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2365   
2366   return handle_close (socket,
2367                        tunnel,
2368                        sender,
2369                        (const struct GNUNET_STREAM_MessageHeader *) message,
2370                        atsi);
2371 }
2372
2373
2374 /**
2375  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2376  *
2377  * @param cls the closure
2378  * @param tunnel connection to the other end
2379  * @param tunnel_ctx the socket
2380  * @param sender who sent the message
2381  * @param message the actual message
2382  * @param atsi performance data for the connection
2383  * @return GNUNET_OK to keep the connection open,
2384  *         GNUNET_SYSERR to close it (signal serious error)
2385  */
2386 static int
2387 server_handle_close_ack (void *cls,
2388                          struct GNUNET_MESH_Tunnel *tunnel,
2389                          void **tunnel_ctx,
2390                          const struct GNUNET_PeerIdentity *sender,
2391                          const struct GNUNET_MessageHeader *message,
2392                          const struct GNUNET_ATS_Information*atsi)
2393 {
2394   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2395
2396   return handle_generic_close_ack (socket,
2397                                    tunnel,
2398                                    sender,
2399                                    (const struct GNUNET_STREAM_MessageHeader *) 
2400                                    message,
2401                                    atsi,
2402                                    SHUT_RDWR);
2403 }
2404
2405
2406 /**
2407  * Handler for DATA_ACK messages
2408  *
2409  * @param socket the socket through which the ack was received
2410  * @param tunnel connection to the other end
2411  * @param sender who sent the message
2412  * @param ack the acknowledgment message
2413  * @param atsi performance data for the connection
2414  * @return GNUNET_OK to keep the connection open,
2415  *         GNUNET_SYSERR to close it (signal serious error)
2416  */
2417 static int
2418 handle_ack (struct GNUNET_STREAM_Socket *socket,
2419             struct GNUNET_MESH_Tunnel *tunnel,
2420             const struct GNUNET_PeerIdentity *sender,
2421             const struct GNUNET_STREAM_AckMessage *ack,
2422             const struct GNUNET_ATS_Information*atsi)
2423 {
2424   unsigned int packet;
2425   int need_retransmission;
2426   uint32_t sequence_difference;
2427   
2428   if (0 != memcmp (sender,
2429                    &socket->other_peer,
2430                    sizeof (struct GNUNET_PeerIdentity)))
2431   {
2432     LOG (GNUNET_ERROR_TYPE_DEBUG,
2433          "%s: Received ACK from non-confirming peer\n",
2434          GNUNET_i2s (&socket->other_peer));
2435     return GNUNET_YES;
2436   }
2437   switch (socket->state)
2438   {
2439   case (STATE_ESTABLISHED):
2440   case (STATE_RECEIVE_CLOSED):
2441   case (STATE_RECEIVE_CLOSE_WAIT):
2442     if (NULL == socket->write_handle)
2443     {
2444       LOG (GNUNET_ERROR_TYPE_DEBUG,
2445            "%s: Received DATA_ACK when write_handle is NULL\n",
2446            GNUNET_i2s (&socket->other_peer));
2447       return GNUNET_OK;
2448     }
2449     sequence_difference = 
2450         socket->write_sequence_number - ntohl (ack->base_sequence_number);
2451     if (!(sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2452     {
2453       LOG (GNUNET_ERROR_TYPE_DEBUG,
2454            "%s: Received DATA_ACK with unexpected base sequence number\n",
2455            GNUNET_i2s (&socket->other_peer));
2456       LOG (GNUNET_ERROR_TYPE_DEBUG,
2457            "%s: Current write sequence: %u; Ack's base sequence: %u\n",
2458            GNUNET_i2s (&socket->other_peer),
2459            socket->write_sequence_number,
2460            ntohl (ack->base_sequence_number));
2461       return GNUNET_OK;
2462     }
2463     /* FIXME: include the case when write_handle is cancelled - ignore the 
2464        acks */
2465     LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received DATA_ACK from %s\n",
2466          GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
2467     /* Cancel the retransmission task */
2468     if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2469     {
2470       GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2471       socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2472     }
2473     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2474     {
2475       if (NULL == socket->write_handle->messages[packet]) break;
2476       /* BS: Base sequence from ack; PS: sequence num of current packet */
2477       sequence_difference = ntohl (ack->base_sequence_number)
2478         - ntohl (socket->write_handle->messages[packet]->sequence_number);
2479       if ((0 == sequence_difference) ||
2480           (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH < sequence_difference))
2481         continue; /* The message in our handle is not yet received */
2482       /* case where BS = PS + GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2483       /* sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2484       ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2485                             packet, GNUNET_YES);
2486     }
2487     /* Update the receive window remaining
2488        FIXME : Should update with the value from a data ack with greater
2489        sequence number */
2490     socket->receiver_window_available = 
2491       ntohl (ack->receive_window_remaining);
2492     /* Check if we have received all acknowledgements */
2493     need_retransmission = GNUNET_NO;
2494     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2495     {
2496       if (NULL == socket->write_handle->messages[packet]) break;
2497       if (GNUNET_YES != ackbitmap_is_bit_set 
2498           (&socket->write_handle->ack_bitmap,packet))
2499       {
2500         need_retransmission = GNUNET_YES;
2501         break;
2502       }
2503     }
2504     if (GNUNET_YES == need_retransmission)
2505     {
2506       write_data (socket);
2507     }
2508     else      /* We have to call the write continuation callback now */
2509     {
2510       struct GNUNET_STREAM_IOWriteHandle *write_handle;
2511       
2512       /* Free the packets */
2513       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2514       {
2515         GNUNET_free_non_null (socket->write_handle->messages[packet]);
2516       }
2517       write_handle = socket->write_handle;
2518       socket->write_handle = NULL;
2519       if (NULL != write_handle->write_cont)
2520         write_handle->write_cont (write_handle->write_cont_cls,
2521                                   socket->status,
2522                                   write_handle->size);
2523       /* We are done with the write handle - Freeing it */
2524       GNUNET_free (write_handle);
2525       LOG (GNUNET_ERROR_TYPE_DEBUG,
2526            "%s: Write completion callback completed\n",
2527            GNUNET_i2s (&socket->other_peer));      
2528     }
2529     break;
2530   default:
2531     break;
2532   }
2533   return GNUNET_OK;
2534 }
2535
2536
2537 /**
2538  * Handler for DATA_ACK messages
2539  *
2540  * @param cls the 'struct GNUNET_STREAM_Socket'
2541  * @param tunnel connection to the other end
2542  * @param tunnel_ctx unused
2543  * @param sender who sent the message
2544  * @param message the actual message
2545  * @param atsi performance data for the connection
2546  * @return GNUNET_OK to keep the connection open,
2547  *         GNUNET_SYSERR to close it (signal serious error)
2548  */
2549 static int
2550 client_handle_ack (void *cls,
2551                    struct GNUNET_MESH_Tunnel *tunnel,
2552                    void **tunnel_ctx,
2553                    const struct GNUNET_PeerIdentity *sender,
2554                    const struct GNUNET_MessageHeader *message,
2555                    const struct GNUNET_ATS_Information*atsi)
2556 {
2557   struct GNUNET_STREAM_Socket *socket = cls;
2558   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2559  
2560   return handle_ack (socket, tunnel, sender, ack, atsi);
2561 }
2562
2563
2564 /**
2565  * Handler for DATA_ACK messages
2566  *
2567  * @param cls the server's listen socket
2568  * @param tunnel connection to the other end
2569  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2570  * @param sender who sent the message
2571  * @param message the actual message
2572  * @param atsi performance data for the connection
2573  * @return GNUNET_OK to keep the connection open,
2574  *         GNUNET_SYSERR to close it (signal serious error)
2575  */
2576 static int
2577 server_handle_ack (void *cls,
2578                    struct GNUNET_MESH_Tunnel *tunnel,
2579                    void **tunnel_ctx,
2580                    const struct GNUNET_PeerIdentity *sender,
2581                    const struct GNUNET_MessageHeader *message,
2582                    const struct GNUNET_ATS_Information*atsi)
2583 {
2584   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2585   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2586  
2587   return handle_ack (socket, tunnel, sender, ack, atsi);
2588 }
2589
2590
2591 /**
2592  * For client message handlers, the stream socket is in the
2593  * closure argument.
2594  */
2595 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2596   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2597   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2598    sizeof (struct GNUNET_STREAM_AckMessage) },
2599   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2600    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2601   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2602    sizeof (struct GNUNET_STREAM_MessageHeader)},
2603   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2604    sizeof (struct GNUNET_STREAM_MessageHeader)},
2605   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2606    sizeof (struct GNUNET_STREAM_MessageHeader)},
2607   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2608    sizeof (struct GNUNET_STREAM_MessageHeader)},
2609   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2610    sizeof (struct GNUNET_STREAM_MessageHeader)},
2611   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2612    sizeof (struct GNUNET_STREAM_MessageHeader)},
2613   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2614    sizeof (struct GNUNET_STREAM_MessageHeader)},
2615   {NULL, 0, 0}
2616 };
2617
2618
2619 /**
2620  * For server message handlers, the stream socket is in the
2621  * tunnel context, and the listen socket in the closure argument.
2622  */
2623 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2624   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2625   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2626    sizeof (struct GNUNET_STREAM_AckMessage) },
2627   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2628    sizeof (struct GNUNET_STREAM_MessageHeader)},
2629   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2630    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2631   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2632    sizeof (struct GNUNET_STREAM_MessageHeader)},
2633   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2634    sizeof (struct GNUNET_STREAM_MessageHeader)},
2635   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2636    sizeof (struct GNUNET_STREAM_MessageHeader)},
2637   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2638    sizeof (struct GNUNET_STREAM_MessageHeader)},
2639   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2640    sizeof (struct GNUNET_STREAM_MessageHeader)},
2641   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2642    sizeof (struct GNUNET_STREAM_MessageHeader)},
2643   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2644    sizeof (struct GNUNET_STREAM_MessageHeader)},
2645   {NULL, 0, 0}
2646 };
2647
2648
2649 /**
2650  * Function called when our target peer is connected to our tunnel
2651  *
2652  * @param cls the socket for which this tunnel is created
2653  * @param peer the peer identity of the target
2654  * @param atsi performance data for the connection
2655  */
2656 static void
2657 mesh_peer_connect_callback (void *cls,
2658                             const struct GNUNET_PeerIdentity *peer,
2659                             const struct GNUNET_ATS_Information * atsi)
2660 {
2661   struct GNUNET_STREAM_Socket *socket = cls;
2662   struct GNUNET_STREAM_MessageHeader *message;
2663   
2664   if (0 != memcmp (peer,
2665                    &socket->other_peer,
2666                    sizeof (struct GNUNET_PeerIdentity)))
2667   {
2668     LOG (GNUNET_ERROR_TYPE_DEBUG,
2669          "%s: A peer which is not our target has connected to our tunnel\n",
2670          GNUNET_i2s(peer));
2671     return;
2672   }
2673   LOG (GNUNET_ERROR_TYPE_DEBUG,
2674        "%s: Target peer %s connected\n",
2675        GNUNET_i2s (&socket->other_peer),
2676        GNUNET_i2s (&socket->other_peer));
2677   /* Set state to INIT */
2678   socket->state = STATE_INIT;
2679   /* Send HELLO message */
2680   message = generate_hello ();
2681   queue_message (socket, message, &set_state_hello_wait, NULL, GNUNET_NO);
2682   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2683                  socket->control_retransmission_task_id);
2684   socket->control_retransmission_task_id =
2685     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2686                                   &control_retransmission_task, socket);
2687 }
2688
2689
2690 /**
2691  * Function called when our target peer is disconnected from our tunnel
2692  *
2693  * @param cls the socket associated which this tunnel
2694  * @param peer the peer identity of the target
2695  */
2696 static void
2697 mesh_peer_disconnect_callback (void *cls,
2698                                const struct GNUNET_PeerIdentity *peer)
2699 {
2700   struct GNUNET_STREAM_Socket *socket=cls;
2701   
2702   /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2703   LOG (GNUNET_ERROR_TYPE_DEBUG,
2704        "%s: Other peer %s disconnected \n",
2705        GNUNET_i2s (&socket->other_peer),
2706        GNUNET_i2s (&socket->other_peer));
2707 }
2708
2709
2710 /**
2711  * Method called whenever a peer creates a tunnel to us
2712  *
2713  * @param cls closure
2714  * @param tunnel new handle to the tunnel
2715  * @param initiator peer that started the tunnel
2716  * @param atsi performance information for the tunnel
2717  * @return initial tunnel context for the tunnel
2718  *         (can be NULL -- that's not an error)
2719  */
2720 static void *
2721 new_tunnel_notify (void *cls,
2722                    struct GNUNET_MESH_Tunnel *tunnel,
2723                    const struct GNUNET_PeerIdentity *initiator,
2724                    const struct GNUNET_ATS_Information *atsi)
2725 {
2726   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2727   struct GNUNET_STREAM_Socket *socket;
2728
2729   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2730      from the same peer again until the socket is closed */
2731
2732   if (GNUNET_NO == lsocket->listening)
2733   {
2734     GNUNET_MESH_tunnel_destroy (tunnel);
2735     return NULL;
2736   }
2737   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2738   socket->other_peer = *initiator;
2739   socket->tunnel = tunnel;
2740   socket->session_id = 0;       /* FIXME */
2741   socket->state = STATE_INIT;
2742   socket->lsocket = lsocket;
2743   socket->retransmit_timeout = lsocket->retransmit_timeout;
2744   socket->testing_active = lsocket->testing_active;
2745   socket->testing_set_write_sequence_number_value =
2746       lsocket->testing_set_write_sequence_number_value;
2747   socket->max_payload_size = lsocket->max_payload_size;
2748   LOG (GNUNET_ERROR_TYPE_DEBUG,
2749        "%s: Peer %s initiated tunnel to us\n", 
2750        GNUNET_i2s (&socket->other_peer),
2751        GNUNET_i2s (&socket->other_peer));
2752   /* FIXME: Copy MESH handle from lsocket to socket */
2753   return socket;
2754 }
2755
2756
2757 /**
2758  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2759  * any associated state.  This function is NOT called if the client has
2760  * explicitly asked for the tunnel to be destroyed using
2761  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2762  * the tunnel.
2763  *
2764  * @param cls closure (set from GNUNET_MESH_connect)
2765  * @param tunnel connection to the other end (henceforth invalid)
2766  * @param tunnel_ctx place where local state associated
2767  *                   with the tunnel is stored
2768  */
2769 static void 
2770 tunnel_cleaner (void *cls,
2771                 const struct GNUNET_MESH_Tunnel *tunnel,
2772                 void *tunnel_ctx)
2773 {
2774   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2775
2776   if (tunnel != socket->tunnel)
2777     return;
2778
2779   GNUNET_break_op(0);
2780   LOG (GNUNET_ERROR_TYPE_DEBUG,
2781        "%s: Peer %s has terminated connection abruptly\n",
2782        GNUNET_i2s (&socket->other_peer),
2783        GNUNET_i2s (&socket->other_peer));
2784   socket->status = GNUNET_STREAM_SHUTDOWN;
2785   /* Clear Transmit handles */
2786   if (NULL != socket->transmit_handle)
2787   {
2788     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2789     socket->transmit_handle = NULL;
2790   }
2791   /* Stop Tasks using socket->tunnel */
2792   if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2793   {
2794     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2795     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2796   }
2797   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2798   {
2799     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2800     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2801   }
2802   /* FIXME: Cancel all other tasks using socket->tunnel */
2803   socket->tunnel = NULL;
2804 }
2805
2806
2807 /**
2808  * Callback to signal timeout on lockmanager lock acquire
2809  *
2810  * @param cls the ListenSocket
2811  * @param tc the scheduler task context
2812  */
2813 static void
2814 lockmanager_acquire_timeout (void *cls, 
2815                              const struct GNUNET_SCHEDULER_TaskContext *tc)
2816 {
2817   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2818   GNUNET_STREAM_ListenCallback listen_cb;
2819   void *listen_cb_cls;
2820
2821   lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2822   listen_cb = lsocket->listen_cb;
2823   listen_cb_cls = lsocket->listen_cb_cls;
2824   if (NULL != listen_cb)
2825     listen_cb (listen_cb_cls, NULL, NULL);
2826 }
2827
2828
2829 /**
2830  * Callback to notify us on the status changes on app_port lock
2831  *
2832  * @param cls the ListenSocket
2833  * @param domain the domain name of the lock
2834  * @param lock the app_port
2835  * @param status the current status of the lock
2836  */
2837 static void
2838 lock_status_change_cb (void *cls, const char *domain, uint32_t lock,
2839                        enum GNUNET_LOCKMANAGER_Status status)
2840 {
2841   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2842
2843   GNUNET_assert (lock == (uint32_t) lsocket->port);
2844   if (GNUNET_LOCKMANAGER_SUCCESS == status)
2845   {
2846     lsocket->listening = GNUNET_YES;
2847     if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
2848     {
2849       GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
2850       lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2851     }
2852     if (NULL == lsocket->mesh)
2853     {
2854       GNUNET_MESH_ApplicationType ports[] = {lsocket->port, 0};
2855
2856       lsocket->mesh = GNUNET_MESH_connect (lsocket->cfg,
2857                                            lsocket, /* Closure */
2858                                            &new_tunnel_notify,
2859                                            &tunnel_cleaner,
2860                                            server_message_handlers,
2861                                            ports);
2862       GNUNET_assert (NULL != lsocket->mesh);
2863       if (NULL != lsocket->listen_ok_cb)
2864       {
2865         (void) lsocket->listen_ok_cb ();
2866       }
2867     }
2868   }
2869   if (GNUNET_LOCKMANAGER_RELEASE == status)
2870     lsocket->listening = GNUNET_NO;
2871 }
2872
2873
2874 /*****************/
2875 /* API functions */
2876 /*****************/
2877
2878
2879 /**
2880  * Tries to open a stream to the target peer
2881  *
2882  * @param cfg configuration to use
2883  * @param target the target peer to which the stream has to be opened
2884  * @param app_port the application port number which uniquely identifies this
2885  *            stream
2886  * @param open_cb this function will be called after stream has be established;
2887  *          cannot be NULL
2888  * @param open_cb_cls the closure for open_cb
2889  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2890  * @return if successful it returns the stream socket; NULL if stream cannot be
2891  *         opened 
2892  */
2893 struct GNUNET_STREAM_Socket *
2894 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2895                     const struct GNUNET_PeerIdentity *target,
2896                     GNUNET_MESH_ApplicationType app_port,
2897                     GNUNET_STREAM_OpenCallback open_cb,
2898                     void *open_cb_cls,
2899                     ...)
2900 {
2901   struct GNUNET_STREAM_Socket *socket;
2902   enum GNUNET_STREAM_Option option;
2903   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
2904   va_list vargs;
2905   uint16_t payload_size;
2906
2907   LOG (GNUNET_ERROR_TYPE_DEBUG,
2908        "%s\n", __func__);
2909   GNUNET_assert (NULL != open_cb);
2910   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2911   socket->other_peer = *target;
2912   socket->open_cb = open_cb;
2913   socket->open_cls = open_cb_cls;
2914   /* Set defaults */
2915   socket->retransmit_timeout = TIME_REL_SECS (default_timeout);
2916   socket->testing_active = GNUNET_NO;
2917   socket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
2918   va_start (vargs, open_cb_cls); /* Parse variable args */
2919   do {
2920     option = va_arg (vargs, enum GNUNET_STREAM_Option);
2921     switch (option)
2922     {
2923     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2924       /* Expect struct GNUNET_TIME_Relative */
2925       socket->retransmit_timeout = va_arg (vargs,
2926                                            struct GNUNET_TIME_Relative);
2927       break;
2928     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
2929       socket->testing_active = GNUNET_YES;
2930       socket->testing_set_write_sequence_number_value = va_arg (vargs,
2931                                                                 uint32_t);
2932       break;
2933     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
2934       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
2935       break;
2936     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
2937       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
2938       break;
2939     case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
2940       payload_size = (uint16_t) va_arg (vargs, unsigned int);
2941       GNUNET_assert (0 != payload_size);
2942       if (payload_size < socket->max_payload_size)
2943         socket->max_payload_size = payload_size;
2944       break;
2945     case GNUNET_STREAM_OPTION_END:
2946       break;
2947     }
2948   } while (GNUNET_STREAM_OPTION_END != option);
2949   va_end (vargs);               /* End of variable args parsing */
2950   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2951                                       socket, /* cls */
2952                                       NULL, /* No inbound tunnel handler */
2953                                       NULL, /* No in-tunnel cleaner */
2954                                       client_message_handlers,
2955                                       ports); /* We don't get inbound tunnels */
2956   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
2957   {
2958     GNUNET_free (socket);
2959     return NULL;
2960   }
2961   /* Now create the mesh tunnel to target */
2962   LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating MESH Tunnel\n");
2963   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2964                                               NULL, /* Tunnel context */
2965                                               &mesh_peer_connect_callback,
2966                                               &mesh_peer_disconnect_callback,
2967                                               socket);
2968   GNUNET_assert (NULL != socket->tunnel);
2969   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2970                                         &socket->other_peer);
2971   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__);
2972   return socket;
2973 }
2974
2975
2976 /**
2977  * Shutdown the stream for reading or writing (similar to man 2 shutdown).
2978  *
2979  * @param socket the stream socket
2980  * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
2981  * @param completion_cb the callback that will be called upon successful
2982  *          shutdown of given operation
2983  * @param completion_cls the closure for the completion callback
2984  * @return the shutdown handle
2985  */
2986 struct GNUNET_STREAM_ShutdownHandle *
2987 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2988                         int operation,
2989                         GNUNET_STREAM_ShutdownCompletion completion_cb,
2990                         void *completion_cls)
2991 {
2992   struct GNUNET_STREAM_ShutdownHandle *handle;
2993   struct GNUNET_STREAM_MessageHeader *msg;
2994   
2995   GNUNET_assert (NULL == socket->shutdown_handle);
2996
2997   handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
2998   handle->socket = socket;
2999   handle->completion_cb = completion_cb;
3000   handle->completion_cls = completion_cls;
3001   socket->shutdown_handle = handle;
3002
3003   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
3004   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
3005   switch (operation)
3006   {
3007   case SHUT_RD:
3008     handle->operation = SHUT_RD;
3009     if (NULL != socket->read_handle)
3010       LOG (GNUNET_ERROR_TYPE_WARNING,
3011            "Existing read handle should be cancelled before shutting"
3012            " down reading\n");
3013     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
3014     queue_message (socket, msg, &set_state_receive_close_wait, NULL,
3015                    GNUNET_NO);
3016     break;
3017   case SHUT_WR:
3018     handle->operation = SHUT_WR;
3019     if (NULL != socket->write_handle)
3020       LOG (GNUNET_ERROR_TYPE_WARNING,
3021            "Existing write handle should be cancelled before shutting"
3022            " down writing\n");
3023     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
3024     queue_message (socket, msg, &set_state_transmit_close_wait, NULL,
3025                    GNUNET_NO);
3026     break;
3027   case SHUT_RDWR:
3028     handle->operation = SHUT_RDWR;
3029     if (NULL != socket->write_handle)
3030       LOG (GNUNET_ERROR_TYPE_WARNING,
3031            "Existing write handle should be cancelled before shutting"
3032            " down writing\n");
3033     if (NULL != socket->read_handle)
3034       LOG (GNUNET_ERROR_TYPE_WARNING,
3035            "Existing read handle should be cancelled before shutting"
3036            " down reading\n");
3037     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
3038     queue_message (socket, msg, &set_state_close_wait, NULL, GNUNET_NO);
3039     break;
3040   default:
3041     LOG (GNUNET_ERROR_TYPE_WARNING,
3042          "GNUNET_STREAM_shutdown called with invalid value for "
3043          "parameter operation -- Ignoring\n");
3044     GNUNET_free (msg);
3045     GNUNET_free (handle);
3046     return NULL;
3047   }
3048   handle->close_msg_retransmission_task_id =
3049     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
3050                                   &close_msg_retransmission_task,
3051                                   handle);
3052   return handle;
3053 }
3054
3055
3056 /**
3057  * Cancels a pending shutdown
3058  *
3059  * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
3060  */
3061 void
3062 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
3063 {
3064   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
3065     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
3066   handle->socket->shutdown_handle = NULL;
3067   GNUNET_free (handle);
3068 }
3069
3070
3071 /**
3072  * Closes the stream
3073  *
3074  * @param socket the stream socket
3075  */
3076 void
3077 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
3078 {
3079   struct MessageQueue *head;
3080
3081   if (NULL != socket->read_handle)
3082   {
3083     LOG (GNUNET_ERROR_TYPE_WARNING,
3084          "Closing STREAM socket when a read handle is pending\n");
3085     GNUNET_STREAM_io_read_cancel (socket->read_handle);
3086   }
3087   if (NULL != socket->write_handle)
3088   {
3089     LOG (GNUNET_ERROR_TYPE_WARNING,
3090          "Closing STREAM socket when a write handle is pending\n");
3091     GNUNET_STREAM_io_write_cancel (socket->write_handle);
3092     //socket->write_handle = NULL;
3093   }
3094   /* Terminate the ack'ing task if they are still present */
3095   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
3096   {
3097     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
3098     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
3099   }
3100   /* Terminate the control retransmission tasks */
3101   if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
3102   {
3103     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
3104   }
3105   /* Clear Transmit handles */
3106   if (NULL != socket->transmit_handle)
3107   {
3108     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
3109     socket->transmit_handle = NULL;
3110   }
3111   /* Clear existing message queue */
3112   while (NULL != (head = socket->queue_head)) {
3113     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
3114                                  socket->queue_tail,
3115                                  head);
3116     GNUNET_free (head->message);
3117     GNUNET_free (head);
3118   }
3119   /* Close associated tunnel */
3120   if (NULL != socket->tunnel)
3121   {
3122     GNUNET_MESH_tunnel_destroy (socket->tunnel);
3123     socket->tunnel = NULL;
3124   }
3125   /* Close mesh connection */
3126   if (NULL != socket->mesh && NULL == socket->lsocket)
3127   {
3128     GNUNET_MESH_disconnect (socket->mesh);
3129     socket->mesh = NULL;
3130   }  
3131   /* Release receive buffer */
3132   if (NULL != socket->receive_buffer)
3133   {
3134     GNUNET_free (socket->receive_buffer);
3135   }
3136   GNUNET_free (socket);
3137 }
3138
3139
3140 /**
3141  * Listens for stream connections for a specific application ports
3142  *
3143  * @param cfg the configuration to use
3144  * @param app_port the application port for which new streams will be accepted
3145  * @param listen_cb this function will be called when a peer tries to establish
3146  *            a stream with us
3147  * @param listen_cb_cls closure for listen_cb
3148  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3149  * @return listen socket, NULL for any error
3150  */
3151 struct GNUNET_STREAM_ListenSocket *
3152 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
3153                       GNUNET_MESH_ApplicationType app_port,
3154                       GNUNET_STREAM_ListenCallback listen_cb,
3155                       void *listen_cb_cls,
3156                       ...)
3157 {
3158   /* FIXME: Add variable args for passing configration options? */
3159   struct GNUNET_STREAM_ListenSocket *lsocket;
3160   struct GNUNET_TIME_Relative listen_timeout;
3161   enum GNUNET_STREAM_Option option;
3162   va_list vargs;
3163   uint16_t payload_size;
3164
3165   GNUNET_assert (NULL != listen_cb);
3166   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
3167   lsocket->cfg = GNUNET_CONFIGURATION_dup (cfg);
3168   lsocket->lockmanager = GNUNET_LOCKMANAGER_connect (lsocket->cfg);
3169   if (NULL == lsocket->lockmanager)
3170   {
3171     GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3172     GNUNET_free (lsocket);
3173     return NULL;
3174   }
3175   lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */  
3176   /* Set defaults */
3177   lsocket->retransmit_timeout = TIME_REL_SECS (default_timeout);
3178   lsocket->testing_active = GNUNET_NO;
3179   lsocket->listen_ok_cb = NULL;
3180   lsocket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
3181   listen_timeout = TIME_REL_SECS (60); /* A minute for listen timeout */  
3182   va_start (vargs, listen_cb_cls);
3183   do {
3184     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3185     switch (option)
3186     {
3187     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3188       lsocket->retransmit_timeout = va_arg (vargs,
3189                                             struct GNUNET_TIME_Relative);
3190       break;
3191     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3192       lsocket->testing_active = GNUNET_YES;
3193       lsocket->testing_set_write_sequence_number_value = va_arg (vargs,
3194                                                                  uint32_t);
3195       break;
3196     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3197       listen_timeout = GNUNET_TIME_relative_multiply
3198         (GNUNET_TIME_UNIT_MILLISECONDS, va_arg (vargs, uint32_t));
3199       break;
3200     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3201       lsocket->listen_ok_cb = va_arg (vargs,
3202                                       GNUNET_STREAM_ListenSuccessCallback);
3203       break;
3204     case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
3205       payload_size = (uint16_t) va_arg (vargs, unsigned int);
3206       GNUNET_assert (0 != payload_size);
3207       if (payload_size < lsocket->max_payload_size)
3208         lsocket->max_payload_size = payload_size;
3209       break;
3210     case GNUNET_STREAM_OPTION_END:
3211       break;
3212     }
3213   } while (GNUNET_STREAM_OPTION_END != option);
3214   va_end (vargs);
3215   lsocket->port = app_port;
3216   lsocket->listen_cb = listen_cb;
3217   lsocket->listen_cb_cls = listen_cb_cls;
3218   lsocket->locking_request = 
3219     GNUNET_LOCKMANAGER_acquire_lock (lsocket->lockmanager, locking_domain,
3220                                      (uint32_t) lsocket->port,
3221                                      &lock_status_change_cb, lsocket);
3222   lsocket->lockmanager_acquire_timeout_task =
3223     GNUNET_SCHEDULER_add_delayed (listen_timeout,
3224                                   &lockmanager_acquire_timeout, lsocket);
3225   return lsocket;
3226 }
3227
3228
3229 /**
3230  * Closes the listen socket
3231  *
3232  * @param lsocket the listen socket
3233  */
3234 void
3235 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3236 {
3237   /* Close MESH connection */
3238   if (NULL != lsocket->mesh)
3239     GNUNET_MESH_disconnect (lsocket->mesh);
3240   GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3241   if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
3242     GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
3243   if (NULL != lsocket->locking_request)
3244     GNUNET_LOCKMANAGER_cancel_request (lsocket->locking_request);
3245   if (NULL != lsocket->lockmanager)
3246     GNUNET_LOCKMANAGER_disconnect (lsocket->lockmanager);
3247   GNUNET_free (lsocket);
3248 }
3249
3250
3251 /**
3252  * Tries to write the given data to the stream. The maximum size of data that
3253  * can be written as part of a write operation is (64 * (64000 - sizeof (struct
3254  * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3255  * violation, however only the said number of maximum bytes will be written.
3256  *
3257  * @param socket the socket representing a stream
3258  * @param data the data buffer from where the data is written into the stream
3259  * @param size the number of bytes to be written from the data buffer
3260  * @param timeout the timeout period
3261  * @param write_cont the function to call upon writing some bytes into the
3262  *          stream 
3263  * @param write_cont_cls the closure
3264  *
3265  * @return handle to cancel the operation; if a previous write is pending or
3266  *           the stream has been shutdown for this operation then write_cont is
3267  *           immediately called and NULL is returned.
3268  */
3269 struct GNUNET_STREAM_IOWriteHandle *
3270 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3271                      const void *data,
3272                      size_t size,
3273                      struct GNUNET_TIME_Relative timeout,
3274                      GNUNET_STREAM_CompletionContinuation write_cont,
3275                      void *write_cont_cls)
3276 {
3277   struct GNUNET_STREAM_IOWriteHandle *io_handle;
3278   struct GNUNET_STREAM_DataMessage *data_msg;
3279   const void *sweep;
3280   struct GNUNET_TIME_Relative ack_deadline;
3281   unsigned int num_needed_packets;
3282   unsigned int packet;
3283   uint32_t packet_size;
3284   uint32_t payload_size;
3285   uint16_t max_data_packet_size;
3286
3287   LOG (GNUNET_ERROR_TYPE_DEBUG,
3288        "%s\n", __func__);
3289   if (NULL != socket->write_handle)
3290   {
3291     GNUNET_break (0);
3292     return NULL;
3293   }
3294   switch (socket->state)
3295   {
3296   case STATE_TRANSMIT_CLOSED:
3297   case STATE_TRANSMIT_CLOSE_WAIT:
3298   case STATE_CLOSED:
3299   case STATE_CLOSE_WAIT:
3300     if (NULL != write_cont)
3301       write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3302     LOG (GNUNET_ERROR_TYPE_DEBUG,
3303          "%s() END\n", __func__);
3304     return NULL;
3305   case STATE_INIT:
3306   case STATE_LISTEN:
3307   case STATE_HELLO_WAIT:
3308     if (NULL != write_cont)
3309       /* FIXME: GNUNET_STREAM_SYSERR?? */
3310       write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3311     LOG (GNUNET_ERROR_TYPE_DEBUG,
3312          "%s() END\n", __func__);
3313     return NULL;
3314   case STATE_ESTABLISHED:
3315   case STATE_RECEIVE_CLOSED:
3316   case STATE_RECEIVE_CLOSE_WAIT:
3317     break;
3318   }
3319   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * socket->max_payload_size < size)
3320     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * socket->max_payload_size;
3321   num_needed_packets =
3322       (size + (socket->max_payload_size - 1)) / socket->max_payload_size;
3323   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
3324   io_handle->socket = socket;
3325   io_handle->write_cont = write_cont;
3326   io_handle->write_cont_cls = write_cont_cls;
3327   io_handle->size = size;
3328   io_handle->packets_sent = 0;
3329   sweep = data;
3330   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3331      determined from RTT */
3332   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3333   /* Divide the given buffer into packets for sending */
3334   max_data_packet_size =
3335       socket->max_payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3336   for (packet=0; packet < num_needed_packets; packet++)
3337   {
3338     if ((packet + 1) * socket->max_payload_size < size) 
3339     {
3340       payload_size = socket->max_payload_size;
3341       packet_size = max_data_packet_size;
3342     }
3343     else 
3344     {
3345       payload_size = size - packet * socket->max_payload_size;
3346       packet_size = 
3347           payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3348     }
3349     io_handle->messages[packet] = GNUNET_malloc (packet_size);
3350     io_handle->messages[packet]->header.header.size = htons (packet_size);
3351     io_handle->messages[packet]->header.header.type =
3352       htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3353     io_handle->messages[packet]->sequence_number =
3354       htonl (socket->write_sequence_number++);
3355     io_handle->messages[packet]->offset = htonl (socket->write_offset);
3356     /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3357        determined from RTT */
3358     io_handle->messages[packet]->ack_deadline =
3359       GNUNET_TIME_relative_hton (ack_deadline);
3360     data_msg = io_handle->messages[packet];
3361     /* Copy data from given buffer to the packet */
3362     memcpy (&data_msg[1], sweep, payload_size);
3363     sweep += payload_size;
3364     socket->write_offset += payload_size;
3365   }
3366   /* ack the last data message. FIXME: remove when we figure out how to do this
3367      using RTT */
3368   io_handle->messages[num_needed_packets - 1]->ack_deadline = 
3369       GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
3370   socket->write_handle = io_handle;
3371   write_data (socket);
3372   LOG (GNUNET_ERROR_TYPE_DEBUG,
3373        "%s() END\n", __func__);
3374   return io_handle;
3375 }
3376
3377
3378 /**
3379  * Tries to read data from the stream.
3380  *
3381  * @param socket the socket representing a stream
3382  * @param timeout the timeout period
3383  * @param proc function to call with data (once only)
3384  * @param proc_cls the closure for proc
3385  *
3386  * @return handle to cancel the operation; if the stream has been shutdown for
3387  *           this type of opeartion then the DataProcessor is immediately
3388  *           called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned
3389  */
3390 struct GNUNET_STREAM_IOReadHandle *
3391 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3392                     struct GNUNET_TIME_Relative timeout,
3393                     GNUNET_STREAM_DataProcessor proc,
3394                     void *proc_cls)
3395 {
3396   struct GNUNET_STREAM_IOReadHandle *read_handle;
3397   
3398   LOG (GNUNET_ERROR_TYPE_DEBUG,
3399        "%s: %s()\n", 
3400        GNUNET_i2s (&socket->other_peer),
3401        __func__);
3402   /* Return NULL if there is already a read handle; the user has to cancel that
3403      first before continuing or has to wait until it is completed */
3404   if (NULL != socket->read_handle) 
3405     return NULL;
3406   GNUNET_assert (NULL != proc);
3407   switch (socket->state)
3408   {
3409   case STATE_RECEIVE_CLOSED:
3410   case STATE_RECEIVE_CLOSE_WAIT:
3411   case STATE_CLOSED:
3412   case STATE_CLOSE_WAIT:
3413     proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3414     LOG (GNUNET_ERROR_TYPE_DEBUG,
3415          "%s: %s() END\n",
3416          GNUNET_i2s (&socket->other_peer),
3417          __func__);
3418     return NULL;
3419   default:
3420     break;
3421   }
3422   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
3423   read_handle->proc = proc;
3424   read_handle->proc_cls = proc_cls;
3425   read_handle->socket = socket;
3426   socket->read_handle = read_handle;
3427   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3428                                           0))
3429     read_handle->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3430                                                           socket);   
3431   read_handle->read_io_timeout_task_id =
3432       GNUNET_SCHEDULER_add_delayed (timeout, &read_io_timeout, socket);
3433   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: %s() END\n",
3434        GNUNET_i2s (&socket->other_peer), __func__);
3435   return read_handle;
3436 }
3437
3438
3439 /**
3440  * Cancel pending write operation.
3441  *
3442  * @param ioh handle to operation to cancel
3443  */
3444 void
3445 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3446 {
3447   struct GNUNET_STREAM_Socket *socket = ioh->socket;
3448   unsigned int packet;
3449
3450   GNUNET_assert (NULL != socket->write_handle);
3451   GNUNET_assert (socket->write_handle == ioh);
3452
3453   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
3454   {
3455     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
3456     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
3457   }
3458
3459   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3460   {
3461     if (NULL == ioh->messages[packet]) break;
3462     GNUNET_free (ioh->messages[packet]);
3463   }
3464       
3465   GNUNET_free (socket->write_handle);
3466   socket->write_handle = NULL;
3467 }
3468
3469
3470 /**
3471  * Cancel pending read operation.
3472  *
3473  * @param ioh handle to operation to cancel
3474  */
3475 void
3476 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
3477 {
3478   struct GNUNET_STREAM_Socket *socket;
3479   
3480   socket = ioh->socket;
3481   GNUNET_assert (NULL != socket->read_handle);
3482   GNUNET_assert (ioh == socket->read_handle);
3483   /* Read io time task should be there; if it is already executed then this
3484   read handle is not valid; However upon scheduler shutdown the read io task
3485   may be executed before */
3486   if (GNUNET_SCHEDULER_NO_TASK != ioh->read_io_timeout_task_id)
3487     GNUNET_SCHEDULER_cancel (ioh->read_io_timeout_task_id);
3488   /* reading task may be present; if so we have to stop it */
3489   if (GNUNET_SCHEDULER_NO_TASK != ioh->read_task_id)
3490     GNUNET_SCHEDULER_cancel (ioh->read_task_id);
3491   GNUNET_free (ioh);
3492   socket->read_handle = NULL;
3493 }
3494
3495 /* end of stream_api.c */