18435d0a760d2d4130630de30906aa696f2d72c1
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Add code for write io timeout
27  *
28  * Include retransmission for control messages
29  **/
30
31 /**
32  * @file stream/stream_api.c
33  * @brief Implementation of the stream library
34  * @author Sree Harsha Totakura
35  */
36
37
38 #include "platform.h"
39 #include "gnunet_common.h"
40 #include "gnunet_crypto_lib.h"
41 #include "gnunet_lockmanager_service.h"
42 #include "gnunet_stream_lib.h"
43 #include "stream_protocol.h"
44
45 #define LOG(kind,...)                                   \
46   GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
47
48 #define TIME_REL_SECS(sec) \
49   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
50
51 /**
52  * The maximum packet size of a stream packet
53  */
54 #define MAX_PACKET_SIZE 512//64000
55
56 /**
57  * Receive buffer
58  */
59 #define RECEIVE_BUFFER_SIZE 4096000
60
61 /**
62  * The maximum payload a data message packet can carry
63  */
64 static const size_t max_payload_size = 
65   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
66
67 /**
68  * states in the Protocol
69  */
70 enum State
71   {
72     /**
73      * Client initialization state
74      */
75     STATE_INIT,
76
77     /**
78      * Listener initialization state 
79      */
80     STATE_LISTEN,
81
82     /**
83      * Pre-connection establishment state
84      */
85     STATE_HELLO_WAIT,
86
87     /**
88      * State where a connection has been established
89      */
90     STATE_ESTABLISHED,
91
92     /**
93      * State where the socket is closed on our side and waiting to be ACK'ed
94      */
95     STATE_RECEIVE_CLOSE_WAIT,
96
97     /**
98      * State where the socket is closed for reading
99      */
100     STATE_RECEIVE_CLOSED,
101
102     /**
103      * State where the socket is closed on our side and waiting to be ACK'ed
104      */
105     STATE_TRANSMIT_CLOSE_WAIT,
106
107     /**
108      * State where the socket is closed for writing
109      */
110     STATE_TRANSMIT_CLOSED,
111
112     /**
113      * State where the socket is closed on our side and waiting to be ACK'ed
114      */
115     STATE_CLOSE_WAIT,
116
117     /**
118      * State where the socket is closed
119      */
120     STATE_CLOSED 
121   };
122
123
124 /**
125  * Functions of this type are called when a message is written
126  *
127  * @param cls the closure from queue_message
128  * @param socket the socket the written message was bound to
129  */
130 typedef void (*SendFinishCallback) (void *cls,
131                                     struct GNUNET_STREAM_Socket *socket);
132
133
134 /**
135  * The send message queue
136  */
137 struct MessageQueue
138 {
139   /**
140    * The message
141    */
142   struct GNUNET_STREAM_MessageHeader *message;
143
144   /**
145    * Callback to be called when the message is sent
146    */
147   SendFinishCallback finish_cb;
148
149   /**
150    * The closure for finish_cb
151    */
152   void *finish_cb_cls;
153
154   /**
155    * The next message in queue. Should be NULL in the last message
156    */
157   struct MessageQueue *next;
158
159   /**
160    * The next message in queue. Should be NULL in the first message
161    */
162   struct MessageQueue *prev;
163 };
164
165
166 /**
167  * The STREAM Socket Handler
168  */
169 struct GNUNET_STREAM_Socket
170 {
171   /**
172    * Retransmission timeout
173    */
174   struct GNUNET_TIME_Relative retransmit_timeout;
175
176   /**
177    * The Acknowledgement Bitmap
178    */
179   GNUNET_STREAM_AckBitmap ack_bitmap;
180
181   /**
182    * Time when the Acknowledgement was queued
183    */
184   struct GNUNET_TIME_Absolute ack_time_registered;
185
186   /**
187    * Queued Acknowledgement deadline
188    */
189   struct GNUNET_TIME_Relative ack_time_deadline;
190
191   /**
192    * The mesh handle
193    */
194   struct GNUNET_MESH_Handle *mesh;
195
196   /**
197    * The mesh tunnel handle
198    */
199   struct GNUNET_MESH_Tunnel *tunnel;
200
201   /**
202    * Stream open closure
203    */
204   void *open_cls;
205
206   /**
207    * Stream open callback
208    */
209   GNUNET_STREAM_OpenCallback open_cb;
210
211   /**
212    * The current transmit handle (if a pending transmit request exists)
213    */
214   struct GNUNET_MESH_TransmitHandle *transmit_handle;
215
216   /**
217    * The current act transmit handle (if a pending ack transmit request exists)
218    */
219   struct GNUNET_MESH_TransmitHandle *ack_transmit_handle;
220
221   /**
222    * Pointer to the current ack message using in ack_task
223    */
224   struct GNUNET_STREAM_AckMessage *ack_msg;
225
226   /**
227    * The current message associated with the transmit handle
228    */
229   struct MessageQueue *queue_head;
230
231   /**
232    * The queue tail, should always point to the last message in queue
233    */
234   struct MessageQueue *queue_tail;
235
236   /**
237    * The write IO_handle associated with this socket
238    */
239   struct GNUNET_STREAM_IOWriteHandle *write_handle;
240
241   /**
242    * The read IO_handle associated with this socket
243    */
244   struct GNUNET_STREAM_IOReadHandle *read_handle;
245
246   /**
247    * The shutdown handle associated with this socket
248    */
249   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
250
251   /**
252    * Buffer for storing received messages
253    */
254   void *receive_buffer;
255
256   /**
257    * The listen socket from which this socket is derived. Should be NULL if it
258    * is not a derived socket
259    */
260   struct GNUNET_STREAM_ListenSocket *lsocket;
261
262   /**
263    * The peer identity of the peer at the other end of the stream
264    */
265   struct GNUNET_PeerIdentity other_peer;
266
267   /**
268    * Task identifier for the read io timeout task
269    */
270   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
271
272   /**
273    * Task identifier for retransmission task after timeout
274    */
275   GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
276
277   /**
278    * The task for sending timely Acks
279    */
280   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
281
282   /**
283    * Task scheduled to continue a read operation.
284    */
285   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
286
287   /**
288    * The state of the protocol associated with this socket
289    */
290   enum State state;
291
292   /**
293    * The status of the socket
294    */
295   enum GNUNET_STREAM_Status status;
296
297   /**
298    * The number of previous timeouts; FIXME: currently not used
299    */
300   unsigned int retries;
301
302   /**
303    * The application port number (type: uint32_t)
304    */
305   GNUNET_MESH_ApplicationType app_port;
306
307   /**
308    * Whether testing mode is active or not
309    */
310   int testing_active;
311
312   /**
313    * The write sequence number to be set incase of testing
314    */
315   uint32_t testing_set_write_sequence_number_value;
316
317   /**
318    * The session id associated with this stream connection
319    * FIXME: Not used currently, may be removed
320    */
321   uint32_t session_id;
322
323   /**
324    * Write sequence number. Set to random when sending HELLO(client) and
325    * HELLO_ACK(server) 
326    */
327   uint32_t write_sequence_number;
328
329   /**
330    * Read sequence number. This number's value is determined during handshake
331    */
332   uint32_t read_sequence_number;
333
334   /**
335    * The receiver buffer size
336    */
337   uint32_t receive_buffer_size;
338
339   /**
340    * The receiver buffer boundaries
341    */
342   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
343
344   /**
345    * receiver's available buffer after the last acknowledged packet
346    */
347   uint32_t receiver_window_available;
348
349   /**
350    * The offset pointer used during write operation
351    */
352   uint32_t write_offset;
353
354   /**
355    * The offset after which we are expecting data
356    */
357   uint32_t read_offset;
358
359   /**
360    * The offset upto which user has read from the received buffer
361    */
362   uint32_t copy_offset;
363 };
364
365
366 /**
367  * A socket for listening
368  */
369 struct GNUNET_STREAM_ListenSocket
370 {
371   /**
372    * The mesh handle
373    */
374   struct GNUNET_MESH_Handle *mesh;
375
376   /**
377    * Our configuration
378    */
379   struct GNUNET_CONFIGURATION_Handle *cfg;
380
381   /**
382    * Handle to the lock manager service
383    */
384   struct GNUNET_LOCKMANAGER_Handle *lockmanager;
385
386   /**
387    * The active LockingRequest from lockmanager
388    */
389   struct GNUNET_LOCKMANAGER_LockingRequest *locking_request;
390
391   /**
392    * Callback to call after acquring a lock and listening
393    */
394   GNUNET_STREAM_ListenSuccessCallback listen_ok_cb;
395
396   /**
397    * The callback function which is called after successful opening socket
398    */
399   GNUNET_STREAM_ListenCallback listen_cb;
400
401   /**
402    * The call back closure
403    */
404   void *listen_cb_cls;
405
406   /**
407    * The service port
408    */
409   GNUNET_MESH_ApplicationType port;
410   
411   /**
412    * The id of the lockmanager timeout task
413    */
414   GNUNET_SCHEDULER_TaskIdentifier lockmanager_acquire_timeout_task;
415
416   /**
417    * The retransmit timeout
418    */
419   struct GNUNET_TIME_Relative retransmit_timeout;
420   
421   /**
422    * Listen enabled?
423    */
424   int listening;
425
426   /**
427    * Whether testing mode is active or not
428    */
429   int testing_active;
430
431   /**
432    * The write sequence number to be set incase of testing
433    */
434   uint32_t testing_set_write_sequence_number_value;
435 };
436
437
438 /**
439  * The IO Write Handle
440  */
441 struct GNUNET_STREAM_IOWriteHandle
442 {
443   /**
444    * The socket to which this write handle is associated
445    */
446   struct GNUNET_STREAM_Socket *socket;
447
448   /**
449    * The packet_buffers associated with this Handle
450    */
451   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
452
453   /**
454    * The write continuation callback
455    */
456   GNUNET_STREAM_CompletionContinuation write_cont;
457
458   /**
459    * Write continuation closure
460    */
461   void *write_cont_cls;
462
463   /**
464    * The bitmap of this IOHandle; Corresponding bit for a message is set when
465    * it has been acknowledged by the receiver
466    */
467   GNUNET_STREAM_AckBitmap ack_bitmap;
468
469   /**
470    * Number of bytes in this write handle
471    */
472   size_t size;
473 };
474
475
476 /**
477  * The IO Read Handle
478  */
479 struct GNUNET_STREAM_IOReadHandle
480 {
481   /**
482    * The socket to which this read handle is associated
483    */
484   struct GNUNET_STREAM_Socket *socket;
485   
486   /**
487    * Callback for the read processor
488    */
489   GNUNET_STREAM_DataProcessor proc;
490
491   /**
492    * The closure pointer for the read processor callback
493    */
494   void *proc_cls;
495 };
496
497
498 /**
499  * Handle for Shutdown
500  */
501 struct GNUNET_STREAM_ShutdownHandle
502 {
503   /**
504    * The socket associated with this shutdown handle
505    */
506   struct GNUNET_STREAM_Socket *socket;
507
508   /**
509    * Shutdown completion callback
510    */
511   GNUNET_STREAM_ShutdownCompletion completion_cb;
512
513   /**
514    * Closure for completion callback
515    */
516   void *completion_cls;
517
518   /**
519    * Close message retransmission task id
520    */
521   GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
522
523   /**
524    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
525    */
526   int operation;  
527 };
528
529
530 /**
531  * Default value in seconds for various timeouts
532  */
533 static const unsigned int default_timeout = 10;
534
535 /**
536  * The domain name for locks we use here
537  */
538 static const char *locking_domain = "GNUNET_STREAM_APPLOCK";
539
540
541 /**
542  * Callback function for sending queued message
543  *
544  * @param cls closure the socket
545  * @param size number of bytes available in buf
546  * @param buf where the callee should write the message
547  * @return number of bytes written to buf
548  */
549 static size_t
550 send_message_notify (void *cls, size_t size, void *buf)
551 {
552   struct GNUNET_STREAM_Socket *socket = cls;
553   struct MessageQueue *head;
554   size_t ret;
555
556   socket->transmit_handle = NULL; /* Remove the transmit handle */
557   head = socket->queue_head;
558   if (NULL == head)
559     return 0; /* just to be safe */
560   if (0 == size)                /* request timed out */
561   {
562     socket->retries++;
563     LOG (GNUNET_ERROR_TYPE_DEBUG,
564          "%s: Message sending timed out. Retry %d \n",
565          GNUNET_i2s (&socket->other_peer),
566          socket->retries);
567     socket->transmit_handle = 
568       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
569                                          0, /* Corking */
570                                          1, /* Priority */
571                                          /* FIXME: exponential backoff */
572                                          socket->retransmit_timeout,
573                                          &socket->other_peer,
574                                          ntohs (head->message->header.size),
575                                          &send_message_notify,
576                                          socket);
577     return 0;
578   }
579
580   ret = ntohs (head->message->header.size);
581   GNUNET_assert (size >= ret);
582   memcpy (buf, head->message, ret);
583   if (NULL != head->finish_cb)
584   {
585     head->finish_cb (head->finish_cb_cls, socket);
586   }
587   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
588                                socket->queue_tail,
589                                head);
590   GNUNET_free (head->message);
591   GNUNET_free (head);
592   head = socket->queue_head;
593   if (NULL != head)    /* more pending messages to send */
594   {
595     socket->retries = 0;
596     socket->transmit_handle = 
597       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
598                                          0, /* Corking */
599                                          1, /* Priority */
600                                          /* FIXME: exponential backoff */
601                                          socket->retransmit_timeout,
602                                          &socket->other_peer,
603                                          ntohs (head->message->header.size),
604                                          &send_message_notify,
605                                          socket);
606   }
607   return ret;
608 }
609
610
611 /**
612  * Queues a message for sending using the mesh connection of a socket
613  *
614  * @param socket the socket whose mesh connection is used
615  * @param message the message to be sent
616  * @param finish_cb the callback to be called when the message is sent
617  * @param finish_cb_cls the closure for the callback
618  */
619 static void
620 queue_message (struct GNUNET_STREAM_Socket *socket,
621                struct GNUNET_STREAM_MessageHeader *message,
622                SendFinishCallback finish_cb,
623                void *finish_cb_cls)
624 {
625   struct MessageQueue *queue_entity;
626
627   GNUNET_assert 
628     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
629      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
630
631   LOG (GNUNET_ERROR_TYPE_DEBUG,
632        "%s: Queueing message of type %d and size %d\n",
633        GNUNET_i2s (&socket->other_peer),
634        ntohs (message->header.type),
635        ntohs (message->header.size));
636   GNUNET_assert (NULL != message);
637   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
638   queue_entity->message = message;
639   queue_entity->finish_cb = finish_cb;
640   queue_entity->finish_cb_cls = finish_cb_cls;
641   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
642                                     socket->queue_tail,
643                                     queue_entity);
644   if (NULL == socket->transmit_handle)
645   {
646     socket->retries = 0;
647     socket->transmit_handle = 
648       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
649                                          0, /* Corking */
650                                          1, /* Priority */
651                                          socket->retransmit_timeout,
652                                          &socket->other_peer,
653                                          ntohs (message->header.size),
654                                          &send_message_notify,
655                                          socket);
656   }
657 }
658
659
660 /**
661  * Copies a message and queues it for sending using the mesh connection of
662  * given socket 
663  *
664  * @param socket the socket whose mesh connection is used
665  * @param message the message to be sent
666  * @param finish_cb the callback to be called when the message is sent
667  * @param finish_cb_cls the closure for the callback
668  */
669 static void
670 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
671                         const struct GNUNET_STREAM_MessageHeader *message,
672                         SendFinishCallback finish_cb,
673                         void *finish_cb_cls)
674 {
675   struct GNUNET_STREAM_MessageHeader *msg_copy;
676   uint16_t size;
677   
678   size = ntohs (message->header.size);
679   msg_copy = GNUNET_malloc (size);
680   memcpy (msg_copy, message, size);
681   queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
682 }
683
684
685 /**
686  * Callback function for sending ack message
687  *
688  * @param cls closure the ACK message created in ack_task
689  * @param size number of bytes available in buffer
690  * @param buf where the callee should write the message
691  * @return number of bytes written to buf
692  */
693 static size_t
694 send_ack_notify (void *cls, size_t size, void *buf)
695 {
696   struct GNUNET_STREAM_Socket *socket = cls;
697
698   if (0 == size)
699   {
700     LOG (GNUNET_ERROR_TYPE_DEBUG,
701          "%s called with size 0\n", __func__);
702     return 0;
703   }
704   GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size);
705   
706   size = ntohs (socket->ack_msg->header.header.size);
707   memcpy (buf, socket->ack_msg, size);
708   
709   GNUNET_free (socket->ack_msg);
710   socket->ack_msg = NULL;
711   socket->ack_transmit_handle = NULL;
712   return size;
713 }
714
715
716 /**
717  * Writes data using the given socket. The amount of data written is limited by
718  * the receiver_window_size
719  *
720  * @param socket the socket to use
721  */
722 static void 
723 write_data (struct GNUNET_STREAM_Socket *socket);
724
725
726 /**
727  * Task for retransmitting data messages if they aren't ACK before their ack
728  * deadline 
729  *
730  * @param cls the socket
731  * @param tc the Task context
732  */
733 static void
734 retransmission_timeout_task (void *cls,
735                              const struct GNUNET_SCHEDULER_TaskContext *tc)
736 {
737   struct GNUNET_STREAM_Socket *socket = cls;
738   
739   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
740     return;
741
742   LOG (GNUNET_ERROR_TYPE_DEBUG,
743        "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
744   socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
745   write_data (socket);
746 }
747
748
749 /**
750  * Task for sending ACK message
751  *
752  * @param cls the socket
753  * @param tc the Task context
754  */
755 static void
756 ack_task (void *cls,
757           const struct GNUNET_SCHEDULER_TaskContext *tc)
758 {
759   struct GNUNET_STREAM_Socket *socket = cls;
760   struct GNUNET_STREAM_AckMessage *ack_msg;
761
762   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
763   {
764     return;
765   }
766   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
767   /* Create the ACK Message */
768   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
769   ack_msg->header.header.size = htons (sizeof (struct 
770                                                GNUNET_STREAM_AckMessage));
771   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
772   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
773   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
774   ack_msg->receive_window_remaining = 
775     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
776   socket->ack_msg = ack_msg;
777   /* Request MESH for sending ACK */
778   socket->ack_transmit_handle = 
779     GNUNET_MESH_notify_transmit_ready (socket->tunnel,
780                                        0, /* Corking */
781                                        1, /* Priority */
782                                        socket->retransmit_timeout,
783                                        &socket->other_peer,
784                                        ntohs (ack_msg->header.header.size),
785                                        &send_ack_notify,
786                                        socket);
787 }
788
789
790 /**
791  * Retransmission task for shutdown messages
792  *
793  * @param cls the shutdown handle
794  * @param tc the Task Context
795  */
796 static void
797 close_msg_retransmission_task (void *cls,
798                                const struct GNUNET_SCHEDULER_TaskContext *tc)
799 {
800   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
801   struct GNUNET_STREAM_MessageHeader *msg;
802   struct GNUNET_STREAM_Socket *socket;
803
804   GNUNET_assert (NULL != shutdown_handle);
805   socket = shutdown_handle->socket;
806
807   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
808   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
809   switch (shutdown_handle->operation)
810   {
811   case SHUT_RDWR:
812     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
813     break;
814   case SHUT_RD:
815     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
816     break;
817   case SHUT_WR:
818     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
819     break;
820   default:
821     GNUNET_free (msg);
822     shutdown_handle->close_msg_retransmission_task_id = 
823       GNUNET_SCHEDULER_NO_TASK;
824     return;
825   }
826   queue_message (socket, msg, NULL, NULL);
827   shutdown_handle->close_msg_retransmission_task_id =
828     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
829                                   &close_msg_retransmission_task,
830                                   shutdown_handle);
831 }
832
833
834 /**
835  * Function to modify a bit in GNUNET_STREAM_AckBitmap
836  *
837  * @param bitmap the bitmap to modify
838  * @param bit the bit number to modify
839  * @param value GNUNET_YES to on, GNUNET_NO to off
840  */
841 static void
842 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
843                       unsigned int bit, 
844                       int value)
845 {
846   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
847   if (GNUNET_YES == value)
848     *bitmap |= (1LL << bit);
849   else
850     *bitmap &= ~(1LL << bit);
851 }
852
853
854 /**
855  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
856  *
857  * @param bitmap address of the bitmap that has to be checked
858  * @param bit the bit number to check
859  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
860  */
861 static uint8_t
862 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
863                       unsigned int bit)
864 {
865   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
866   return 0 != (*bitmap & (1LL << bit));
867 }
868
869
870 /**
871  * Writes data using the given socket. The amount of data written is limited by
872  * the receiver_window_size
873  *
874  * @param socket the socket to use
875  */
876 static void 
877 write_data (struct GNUNET_STREAM_Socket *socket)
878 {
879   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
880   int packet;                   /* Although an int, should never be negative */
881   int ack_packet;
882
883   ack_packet = -1;
884   /* Find the last acknowledged packet */
885   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
886   {      
887     if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
888                                             packet))
889       ack_packet = packet;        
890     else if (NULL == io_handle->messages[packet])
891       break;
892   }
893   /* Resend packets which weren't ack'ed */
894   for (packet=0; packet < ack_packet; packet++)
895   {
896     if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
897                                            packet))
898     {
899       LOG (GNUNET_ERROR_TYPE_DEBUG,
900            "%s: Placing DATA message with sequence %u in send queue\n",
901            GNUNET_i2s (&socket->other_peer),
902            ntohl (io_handle->messages[packet]->sequence_number));
903       copy_and_queue_message (socket,
904                               &io_handle->messages[packet]->header,
905                               NULL,
906                               NULL);
907     }
908   }
909   packet = ack_packet + 1;
910   /* Now send new packets if there is enough buffer space */
911   while ( (NULL != io_handle->messages[packet]) &&
912           (socket->receiver_window_available 
913            >= ntohs (io_handle->messages[packet]->header.header.size)) &&
914           (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
915   {
916     socket->receiver_window_available -= 
917       ntohs (io_handle->messages[packet]->header.header.size);
918     LOG (GNUNET_ERROR_TYPE_DEBUG,
919          "%s: Placing DATA message with sequence %u in send queue\n",
920          GNUNET_i2s (&socket->other_peer),
921          ntohl (io_handle->messages[packet]->sequence_number));
922     copy_and_queue_message (socket,
923                             &io_handle->messages[packet]->header,
924                             NULL,
925                             NULL);
926     packet++;
927   }
928   if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
929     socket->retransmission_timeout_task_id = 
930       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
931                                     (GNUNET_TIME_UNIT_SECONDS, 8),
932                                     &retransmission_timeout_task,
933                                     socket);
934 }
935
936
937 /**
938  * Task for calling the read processor
939  *
940  * @param cls the socket
941  * @param tc the task context
942  */
943 static void
944 call_read_processor (void *cls,
945                      const struct GNUNET_SCHEDULER_TaskContext *tc)
946 {
947   struct GNUNET_STREAM_Socket *socket = cls;
948   size_t read_size;
949   size_t valid_read_size;
950   unsigned int packet;
951   uint32_t sequence_increase;
952   uint32_t offset_increase;
953
954   socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
955   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
956     return;
957
958   if (NULL == socket->receive_buffer) 
959     return;
960
961   GNUNET_assert (NULL != socket->read_handle);
962   GNUNET_assert (NULL != socket->read_handle->proc);
963
964   /* Check the bitmap for any holes */
965   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
966   {
967     if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
968                                            packet))
969       break;
970   }
971   /* We only call read processor if we have the first packet */
972   GNUNET_assert (0 < packet);
973   valid_read_size = 
974     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
975   GNUNET_assert (0 != valid_read_size);
976   /* Cancel the read_io_timeout_task */
977   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
978   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
979   /* Call the data processor */
980   LOG (GNUNET_ERROR_TYPE_DEBUG,
981        "%s: Calling read processor\n",
982        GNUNET_i2s (&socket->other_peer));
983   read_size = 
984     socket->read_handle->proc (socket->read_handle->proc_cls,
985                                socket->status,
986                                socket->receive_buffer + socket->copy_offset,
987                                valid_read_size);
988   LOG (GNUNET_ERROR_TYPE_DEBUG,
989        "%s: Read processor read %d bytes\n",
990        GNUNET_i2s (&socket->other_peer), read_size);
991   LOG (GNUNET_ERROR_TYPE_DEBUG,
992        "%s: Read processor completed successfully\n",
993        GNUNET_i2s (&socket->other_peer));
994   /* Free the read handle */
995   GNUNET_free (socket->read_handle);
996   socket->read_handle = NULL;
997   GNUNET_assert (read_size <= valid_read_size);
998   socket->copy_offset += read_size;
999   /* Determine upto which packet we can remove from the buffer */
1000   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1001   {
1002     if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
1003     { packet++; break; }
1004     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
1005       break;
1006   }
1007
1008   /* If no packets can be removed we can't move the buffer */
1009   if (0 == packet) return;
1010   sequence_increase = packet;
1011   LOG (GNUNET_ERROR_TYPE_DEBUG,
1012        "%s: Sequence increase after read processor completion: %u\n",
1013        GNUNET_i2s (&socket->other_peer), sequence_increase);
1014
1015   /* Shift the data in the receive buffer */
1016   socket->receive_buffer = 
1017     memmove (socket->receive_buffer,
1018              socket->receive_buffer 
1019              + socket->receive_buffer_boundaries[sequence_increase-1],
1020              socket->receive_buffer_size
1021              - socket->receive_buffer_boundaries[sequence_increase-1]);
1022   /* Shift the bitmap */
1023   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
1024   /* Set read_sequence_number */
1025   socket->read_sequence_number += sequence_increase;
1026   /* Set read_offset */
1027   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
1028   socket->read_offset += offset_increase;
1029   /* Fix copy_offset */
1030   GNUNET_assert (offset_increase <= socket->copy_offset);
1031   socket->copy_offset -= offset_increase;
1032   /* Fix relative boundaries */
1033   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1034   {
1035     if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
1036     {
1037       uint32_t ahead_buffer_boundary;
1038
1039       ahead_buffer_boundary = 
1040         socket->receive_buffer_boundaries[packet + sequence_increase];
1041       if (0 == ahead_buffer_boundary)
1042         socket->receive_buffer_boundaries[packet] = 0;
1043       else
1044       {
1045         GNUNET_assert (offset_increase < ahead_buffer_boundary);
1046         socket->receive_buffer_boundaries[packet] = 
1047           ahead_buffer_boundary - offset_increase;
1048       }
1049     }
1050     else
1051       socket->receive_buffer_boundaries[packet] = 0;
1052   }
1053 }
1054
1055
1056 /**
1057  * Cancels the existing read io handle
1058  *
1059  * @param cls the closure from the SCHEDULER call
1060  * @param tc the task context
1061  */
1062 static void
1063 read_io_timeout (void *cls,
1064                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1065 {
1066   struct GNUNET_STREAM_Socket *socket = cls;
1067   GNUNET_STREAM_DataProcessor proc;
1068   void *proc_cls;
1069
1070   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1071   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1072   {
1073     LOG (GNUNET_ERROR_TYPE_DEBUG,
1074          "%s: Read task timedout - Cancelling it\n",
1075          GNUNET_i2s (&socket->other_peer));
1076     GNUNET_SCHEDULER_cancel (socket->read_task_id);
1077     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1078   }
1079   GNUNET_assert (NULL != socket->read_handle);
1080   proc = socket->read_handle->proc;
1081   proc_cls = socket->read_handle->proc_cls;
1082   GNUNET_free (socket->read_handle);
1083   socket->read_handle = NULL;
1084   /* Call the read processor to signal timeout */
1085   proc (proc_cls,
1086         GNUNET_STREAM_TIMEOUT,
1087         NULL,
1088         0);
1089 }
1090
1091
1092 /**
1093  * Handler for DATA messages; Same for both client and server
1094  *
1095  * @param socket the socket through which the ack was received
1096  * @param tunnel connection to the other end
1097  * @param sender who sent the message
1098  * @param msg the data message
1099  * @param atsi performance data for the connection
1100  * @return GNUNET_OK to keep the connection open,
1101  *         GNUNET_SYSERR to close it (signal serious error)
1102  */
1103 static int
1104 handle_data (struct GNUNET_STREAM_Socket *socket,
1105              struct GNUNET_MESH_Tunnel *tunnel,
1106              const struct GNUNET_PeerIdentity *sender,
1107              const struct GNUNET_STREAM_DataMessage *msg,
1108              const struct GNUNET_ATS_Information*atsi)
1109 {
1110   const void *payload;
1111   uint32_t bytes_needed;
1112   uint32_t relative_offset;
1113   uint32_t relative_sequence_number;
1114   uint16_t size;
1115
1116   size = htons (msg->header.header.size);
1117   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1118   {
1119     GNUNET_break_op (0);
1120     return GNUNET_SYSERR;
1121   }
1122
1123   if (0 != memcmp (sender,
1124                    &socket->other_peer,
1125                    sizeof (struct GNUNET_PeerIdentity)))
1126   {
1127     LOG (GNUNET_ERROR_TYPE_DEBUG,
1128          "%s: Received DATA from non-confirming peer\n",
1129          GNUNET_i2s (&socket->other_peer));
1130     return GNUNET_YES;
1131   }
1132
1133   switch (socket->state)
1134   {
1135   case STATE_ESTABLISHED:
1136   case STATE_TRANSMIT_CLOSED:
1137   case STATE_TRANSMIT_CLOSE_WAIT:
1138       
1139     /* check if the message's sequence number is in the range we are
1140        expecting */
1141     relative_sequence_number = 
1142       ntohl (msg->sequence_number) - socket->read_sequence_number;
1143     if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1144     {
1145       LOG (GNUNET_ERROR_TYPE_DEBUG,
1146            "%s: Ignoring received message with sequence number %u\n",
1147            GNUNET_i2s (&socket->other_peer),
1148            ntohl (msg->sequence_number));
1149       /* Start ACK sending task if one is not already present */
1150       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1151       {
1152         socket->ack_task_id = 
1153           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1154                                         (msg->ack_deadline),
1155                                         &ack_task,
1156                                         socket);
1157       }
1158       return GNUNET_YES;
1159     }
1160       
1161     /* Check if we have already seen this message */
1162     if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1163                                             relative_sequence_number))
1164     {
1165       LOG (GNUNET_ERROR_TYPE_DEBUG,
1166            "%s: Ignoring already received message with sequence number %u\n",
1167            GNUNET_i2s (&socket->other_peer),
1168            ntohl (msg->sequence_number));
1169       /* Start ACK sending task if one is not already present */
1170       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1171       {
1172         socket->ack_task_id = 
1173           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1174                                         (msg->ack_deadline),
1175                                         &ack_task,
1176                                         socket);
1177       }
1178       return GNUNET_YES;
1179     }
1180
1181     LOG (GNUNET_ERROR_TYPE_DEBUG,
1182          "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
1183          GNUNET_i2s (&socket->other_peer),
1184          ntohl (msg->sequence_number),
1185          ntohs (msg->header.header.size),
1186          GNUNET_i2s (&socket->other_peer));
1187       
1188     /* Check if we have to allocate the buffer */
1189     size -= sizeof (struct GNUNET_STREAM_DataMessage);
1190     relative_offset = ntohl (msg->offset) - socket->read_offset;
1191     bytes_needed = relative_offset + size;
1192     if (bytes_needed > socket->receive_buffer_size)
1193     {
1194       if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1195       {
1196         socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1197                                                  bytes_needed);
1198         socket->receive_buffer_size = bytes_needed;
1199       }
1200       else
1201       {
1202         LOG (GNUNET_ERROR_TYPE_DEBUG,
1203              "%s: Cannot accommodate packet %d as buffer is full\n",
1204              GNUNET_i2s (&socket->other_peer),
1205              ntohl (msg->sequence_number));
1206         return GNUNET_YES;
1207       }
1208     }
1209       
1210     /* Copy Data to buffer */
1211     payload = &msg[1];
1212     GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1213     memcpy (socket->receive_buffer + relative_offset,
1214             payload,
1215             size);
1216     socket->receive_buffer_boundaries[relative_sequence_number] = 
1217       relative_offset + size;
1218       
1219     /* Modify the ACK bitmap */
1220     ackbitmap_modify_bit (&socket->ack_bitmap,
1221                           relative_sequence_number,
1222                           GNUNET_YES);
1223
1224     /* Start ACK sending task if one is not already present */
1225     if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1226     {
1227       socket->ack_task_id = 
1228         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1229                                       (msg->ack_deadline),
1230                                       &ack_task,
1231                                       socket);
1232     }
1233
1234     if ((NULL != socket->read_handle) /* A read handle is waiting */
1235         /* There is no current read task */
1236         && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1237         /* We have the first packet */
1238         && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1239                                                0)))
1240     {
1241       LOG (GNUNET_ERROR_TYPE_DEBUG,
1242            "%s: Scheduling read processor\n",
1243            GNUNET_i2s (&socket->other_peer));
1244           
1245       socket->read_task_id = 
1246         GNUNET_SCHEDULER_add_now (&call_read_processor,
1247                                   socket);
1248     }
1249       
1250     break;
1251
1252   default:
1253     LOG (GNUNET_ERROR_TYPE_DEBUG,
1254          "%s: Received data message when it cannot be handled\n",
1255          GNUNET_i2s (&socket->other_peer));
1256     break;
1257   }
1258   return GNUNET_YES;
1259 }
1260
1261
1262 /**
1263  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1264  *
1265  * @param cls the socket (set from GNUNET_MESH_connect)
1266  * @param tunnel connection to the other end
1267  * @param tunnel_ctx place to store local state associated with the tunnel
1268  * @param sender who sent the message
1269  * @param message the actual message
1270  * @param atsi performance data for the connection
1271  * @return GNUNET_OK to keep the connection open,
1272  *         GNUNET_SYSERR to close it (signal serious error)
1273  */
1274 static int
1275 client_handle_data (void *cls,
1276                     struct GNUNET_MESH_Tunnel *tunnel,
1277                     void **tunnel_ctx,
1278                     const struct GNUNET_PeerIdentity *sender,
1279                     const struct GNUNET_MessageHeader *message,
1280                     const struct GNUNET_ATS_Information*atsi)
1281 {
1282   struct GNUNET_STREAM_Socket *socket = cls;
1283
1284   return handle_data (socket, 
1285                       tunnel, 
1286                       sender, 
1287                       (const struct GNUNET_STREAM_DataMessage *) message, 
1288                       atsi);
1289 }
1290
1291
1292 /**
1293  * Callback to set state to ESTABLISHED
1294  *
1295  * @param cls the closure from queue_message FIXME: document
1296  * @param socket the socket to requiring state change
1297  */
1298 static void
1299 set_state_established (void *cls,
1300                        struct GNUNET_STREAM_Socket *socket)
1301 {
1302   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1303        "%s: Attaining ESTABLISHED state\n",
1304        GNUNET_i2s (&socket->other_peer));
1305   socket->write_offset = 0;
1306   socket->read_offset = 0;
1307   socket->state = STATE_ESTABLISHED;
1308   if (NULL != socket->lsocket)
1309   {
1310     LOG (GNUNET_ERROR_TYPE_DEBUG,
1311          "%s: Calling listen callback\n",
1312          GNUNET_i2s (&socket->other_peer));
1313     if (GNUNET_SYSERR == 
1314         socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1315                                     socket,
1316                                     &socket->other_peer))
1317     {
1318       socket->state = STATE_CLOSED;
1319       /* FIXME: We should close in a decent way (send RST) */
1320       GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1321       GNUNET_free (socket);
1322     }
1323   }
1324   else if (NULL != socket->open_cb)
1325     socket->open_cb (socket->open_cls, socket);
1326 }
1327
1328
1329 /**
1330  * Callback to set state to HELLO_WAIT
1331  *
1332  * @param cls the closure from queue_message
1333  * @param socket the socket to requiring state change
1334  */
1335 static void
1336 set_state_hello_wait (void *cls,
1337                       struct GNUNET_STREAM_Socket *socket)
1338 {
1339   GNUNET_assert (STATE_INIT == socket->state);
1340   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1341        "%s: Attaining HELLO_WAIT state\n",
1342        GNUNET_i2s (&socket->other_peer));
1343   socket->state = STATE_HELLO_WAIT;
1344 }
1345
1346
1347 /**
1348  * Callback to set state to CLOSE_WAIT
1349  *
1350  * @param cls the closure from queue_message
1351  * @param socket the socket requiring state change
1352  */
1353 static void
1354 set_state_close_wait (void *cls,
1355                       struct GNUNET_STREAM_Socket *socket)
1356 {
1357   LOG (GNUNET_ERROR_TYPE_DEBUG,
1358        "%s: Attaing CLOSE_WAIT state\n",
1359        GNUNET_i2s (&socket->other_peer));
1360   socket->state = STATE_CLOSE_WAIT;
1361   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1362   socket->receive_buffer = NULL;
1363   socket->receive_buffer_size = 0;
1364 }
1365
1366
1367 /**
1368  * Callback to set state to RECEIVE_CLOSE_WAIT
1369  *
1370  * @param cls the closure from queue_message
1371  * @param socket the socket requiring state change
1372  */
1373 static void
1374 set_state_receive_close_wait (void *cls,
1375                               struct GNUNET_STREAM_Socket *socket)
1376 {
1377   LOG (GNUNET_ERROR_TYPE_DEBUG,
1378        "%s: Attaing RECEIVE_CLOSE_WAIT state\n",
1379        GNUNET_i2s (&socket->other_peer));
1380   socket->state = STATE_RECEIVE_CLOSE_WAIT;
1381   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1382   socket->receive_buffer = NULL;
1383   socket->receive_buffer_size = 0;
1384 }
1385
1386
1387 /**
1388  * Callback to set state to TRANSMIT_CLOSE_WAIT
1389  *
1390  * @param cls the closure from queue_message
1391  * @param socket the socket requiring state change
1392  */
1393 static void
1394 set_state_transmit_close_wait (void *cls,
1395                                struct GNUNET_STREAM_Socket *socket)
1396 {
1397   LOG (GNUNET_ERROR_TYPE_DEBUG,
1398        "%s: Attaing TRANSMIT_CLOSE_WAIT state\n",
1399        GNUNET_i2s (&socket->other_peer));
1400   socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1401 }
1402
1403
1404 /**
1405  * Callback to set state to CLOSED
1406  *
1407  * @param cls the closure from queue_message
1408  * @param socket the socket requiring state change
1409  */
1410 static void
1411 set_state_closed (void *cls,
1412                   struct GNUNET_STREAM_Socket *socket)
1413 {
1414   socket->state = STATE_CLOSED;
1415 }
1416
1417
1418 /**
1419  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1420  * socket
1421  *
1422  * @param socket the socket for which this HelloAckMessage has to be generated
1423  * @return the HelloAckMessage
1424  */
1425 static struct GNUNET_STREAM_HelloAckMessage *
1426 generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket)
1427 {
1428   struct GNUNET_STREAM_HelloAckMessage *msg;
1429
1430   /* Get the random sequence number */
1431   if (GNUNET_YES == socket->testing_active)
1432     socket->write_sequence_number =
1433       socket->testing_set_write_sequence_number_value;
1434   else
1435     socket->write_sequence_number = 
1436       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1437   LOG (GNUNET_ERROR_TYPE_DEBUG,
1438        "%s: write sequence number %u\n",
1439        GNUNET_i2s (&socket->other_peer),
1440        (unsigned int) socket->write_sequence_number);
1441   
1442   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1443   msg->header.header.size = 
1444     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1445   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1446   msg->sequence_number = htonl (socket->write_sequence_number);
1447   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1448
1449   return msg;
1450 }
1451
1452
1453 /**
1454  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1455  *
1456  * @param cls the socket (set from GNUNET_MESH_connect)
1457  * @param tunnel connection to the other end
1458  * @param tunnel_ctx this is NULL
1459  * @param sender who sent the message
1460  * @param message the actual message
1461  * @param atsi performance data for the connection
1462  * @return GNUNET_OK to keep the connection open,
1463  *         GNUNET_SYSERR to close it (signal serious error)
1464  */
1465 static int
1466 client_handle_hello_ack (void *cls,
1467                          struct GNUNET_MESH_Tunnel *tunnel,
1468                          void **tunnel_ctx,
1469                          const struct GNUNET_PeerIdentity *sender,
1470                          const struct GNUNET_MessageHeader *message,
1471                          const struct GNUNET_ATS_Information*atsi)
1472 {
1473   struct GNUNET_STREAM_Socket *socket = cls;
1474   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1475   struct GNUNET_STREAM_HelloAckMessage *reply;
1476
1477   if (0 != memcmp (sender,
1478                    &socket->other_peer,
1479                    sizeof (struct GNUNET_PeerIdentity)))
1480   {
1481     LOG (GNUNET_ERROR_TYPE_DEBUG,
1482          "%s: Received HELLO_ACK from non-confirming peer\n",
1483          GNUNET_i2s (&socket->other_peer));
1484     return GNUNET_YES;
1485   }
1486   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1487   LOG (GNUNET_ERROR_TYPE_DEBUG,
1488        "%s: Received HELLO_ACK from %s\n",
1489        GNUNET_i2s (&socket->other_peer),
1490        GNUNET_i2s (&socket->other_peer));
1491
1492   GNUNET_assert (socket->tunnel == tunnel);
1493   switch (socket->state)
1494   {
1495   case STATE_HELLO_WAIT:
1496     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1497     LOG (GNUNET_ERROR_TYPE_DEBUG,
1498          "%s: Read sequence number %u\n",
1499          GNUNET_i2s (&socket->other_peer),
1500          (unsigned int) socket->read_sequence_number);
1501     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1502     reply = generate_hello_ack_msg (socket);
1503     queue_message (socket,
1504                    &reply->header, 
1505                    &set_state_established, 
1506                    NULL);      
1507     return GNUNET_OK;
1508   case STATE_ESTABLISHED:
1509   case STATE_RECEIVE_CLOSE_WAIT:
1510     // call statistics (# ACKs ignored++)
1511     return GNUNET_OK;
1512   case STATE_INIT:
1513   default:
1514     LOG (GNUNET_ERROR_TYPE_DEBUG,
1515          "%s: Server %s sent HELLO_ACK when in state %d\n", 
1516          GNUNET_i2s (&socket->other_peer),
1517          GNUNET_i2s (&socket->other_peer),
1518          socket->state);
1519     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1520     return GNUNET_SYSERR;
1521   }
1522
1523 }
1524
1525
1526 /**
1527  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1528  *
1529  * @param cls the socket (set from GNUNET_MESH_connect)
1530  * @param tunnel connection to the other end
1531  * @param tunnel_ctx this is NULL
1532  * @param sender who sent the message
1533  * @param message the actual message
1534  * @param atsi performance data for the connection
1535  * @return GNUNET_OK to keep the connection open,
1536  *         GNUNET_SYSERR to close it (signal serious error)
1537  */
1538 static int
1539 client_handle_reset (void *cls,
1540                      struct GNUNET_MESH_Tunnel *tunnel,
1541                      void **tunnel_ctx,
1542                      const struct GNUNET_PeerIdentity *sender,
1543                      const struct GNUNET_MessageHeader *message,
1544                      const struct GNUNET_ATS_Information*atsi)
1545 {
1546   // struct GNUNET_STREAM_Socket *socket = cls;
1547
1548   return GNUNET_OK;
1549 }
1550
1551
1552 /**
1553  * Common message handler for handling TRANSMIT_CLOSE messages
1554  *
1555  * @param socket the socket through which the ack was received
1556  * @param tunnel connection to the other end
1557  * @param sender who sent the message
1558  * @param msg the transmit close message
1559  * @param atsi performance data for the connection
1560  * @return GNUNET_OK to keep the connection open,
1561  *         GNUNET_SYSERR to close it (signal serious error)
1562  */
1563 static int
1564 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1565                        struct GNUNET_MESH_Tunnel *tunnel,
1566                        const struct GNUNET_PeerIdentity *sender,
1567                        const struct GNUNET_STREAM_MessageHeader *msg,
1568                        const struct GNUNET_ATS_Information*atsi)
1569 {
1570   struct GNUNET_STREAM_MessageHeader *reply;
1571
1572   switch (socket->state)
1573   {
1574   case STATE_ESTABLISHED:
1575     socket->state = STATE_RECEIVE_CLOSED;
1576
1577     /* Send TRANSMIT_CLOSE_ACK */
1578     reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1579     reply->header.type = 
1580       htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1581     reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1582     queue_message (socket, reply, NULL, NULL);
1583     break;
1584
1585   default:
1586     /* FIXME: Call statistics? */
1587     break;
1588   }
1589   return GNUNET_YES;
1590 }
1591
1592
1593 /**
1594  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1595  *
1596  * @param cls the socket (set from GNUNET_MESH_connect)
1597  * @param tunnel connection to the other end
1598  * @param tunnel_ctx this is NULL
1599  * @param sender who sent the message
1600  * @param message the actual message
1601  * @param atsi performance data for the connection
1602  * @return GNUNET_OK to keep the connection open,
1603  *         GNUNET_SYSERR to close it (signal serious error)
1604  */
1605 static int
1606 client_handle_transmit_close (void *cls,
1607                               struct GNUNET_MESH_Tunnel *tunnel,
1608                               void **tunnel_ctx,
1609                               const struct GNUNET_PeerIdentity *sender,
1610                               const struct GNUNET_MessageHeader *message,
1611                               const struct GNUNET_ATS_Information*atsi)
1612 {
1613   struct GNUNET_STREAM_Socket *socket = cls;
1614   
1615   return handle_transmit_close (socket,
1616                                 tunnel,
1617                                 sender,
1618                                 (struct GNUNET_STREAM_MessageHeader *)message,
1619                                 atsi);
1620 }
1621
1622
1623 /**
1624  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1625  *
1626  * @param socket the socket
1627  * @param tunnel connection to the other end
1628  * @param sender who sent the message
1629  * @param message the actual message
1630  * @param atsi performance data for the connection
1631  * @param operation the close operation which is being ACK'ed
1632  * @return GNUNET_OK to keep the connection open,
1633  *         GNUNET_SYSERR to close it (signal serious error)
1634  */
1635 static int
1636 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1637                           struct GNUNET_MESH_Tunnel *tunnel,
1638                           const struct GNUNET_PeerIdentity *sender,
1639                           const struct GNUNET_STREAM_MessageHeader *message,
1640                           const struct GNUNET_ATS_Information *atsi,
1641                           int operation)
1642 {
1643   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1644
1645   shutdown_handle = socket->shutdown_handle;
1646   if (NULL == shutdown_handle)
1647   {
1648     LOG (GNUNET_ERROR_TYPE_DEBUG,
1649          "%s: Received CLOSE_ACK when shutdown handle is NULL\n",
1650          GNUNET_i2s (&socket->other_peer));
1651     return GNUNET_OK;
1652   }
1653
1654   switch (operation)
1655   {
1656   case SHUT_RDWR:
1657     switch (socket->state)
1658     {
1659     case STATE_CLOSE_WAIT:
1660       if (SHUT_RDWR != shutdown_handle->operation)
1661       {
1662         LOG (GNUNET_ERROR_TYPE_DEBUG,
1663              "%s: Received CLOSE_ACK when shutdown handle is not for "
1664              "SHUT_RDWR\n",
1665              GNUNET_i2s (&socket->other_peer));
1666         return GNUNET_OK;
1667       }
1668
1669       LOG (GNUNET_ERROR_TYPE_DEBUG,
1670            "%s: Received CLOSE_ACK from %s\n",
1671            GNUNET_i2s (&socket->other_peer),
1672            GNUNET_i2s (&socket->other_peer));
1673       socket->state = STATE_CLOSED;
1674       break;
1675     default:
1676       LOG (GNUNET_ERROR_TYPE_DEBUG,
1677            "%s: Received CLOSE_ACK when in it not expected\n",
1678            GNUNET_i2s (&socket->other_peer));
1679       return GNUNET_OK;
1680     }
1681     break;
1682
1683   case SHUT_RD:
1684     switch (socket->state)
1685     {
1686     case STATE_RECEIVE_CLOSE_WAIT:
1687       if (SHUT_RD != shutdown_handle->operation)
1688       {
1689         LOG (GNUNET_ERROR_TYPE_DEBUG,
1690              "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
1691              "is not for SHUT_RD\n",
1692              GNUNET_i2s (&socket->other_peer));
1693         return GNUNET_OK;
1694       }
1695
1696       LOG (GNUNET_ERROR_TYPE_DEBUG,
1697            "%s: Received RECEIVE_CLOSE_ACK from %s\n",
1698            GNUNET_i2s (&socket->other_peer),
1699            GNUNET_i2s (&socket->other_peer));
1700       socket->state = STATE_RECEIVE_CLOSED;
1701       break;
1702     default:
1703       LOG (GNUNET_ERROR_TYPE_DEBUG,
1704            "%s: Received RECEIVE_CLOSE_ACK when in it not expected\n",
1705            GNUNET_i2s (&socket->other_peer));
1706       return GNUNET_OK;
1707     }
1708
1709     break;
1710   case SHUT_WR:
1711     switch (socket->state)
1712     {
1713     case STATE_TRANSMIT_CLOSE_WAIT:
1714       if (SHUT_WR != shutdown_handle->operation)
1715       {
1716         LOG (GNUNET_ERROR_TYPE_DEBUG,
1717              "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle "
1718              "is not for SHUT_WR\n",
1719              GNUNET_i2s (&socket->other_peer));
1720         return GNUNET_OK;
1721       }
1722
1723       LOG (GNUNET_ERROR_TYPE_DEBUG,
1724            "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
1725            GNUNET_i2s (&socket->other_peer),
1726            GNUNET_i2s (&socket->other_peer));
1727       socket->state = STATE_TRANSMIT_CLOSED;
1728       break;
1729     default:
1730       LOG (GNUNET_ERROR_TYPE_DEBUG,
1731            "%s: Received TRANSMIT_CLOSE_ACK when in it not expected\n",
1732            GNUNET_i2s (&socket->other_peer));
1733           
1734       return GNUNET_OK;
1735     }
1736     break;
1737   default:
1738     GNUNET_assert (0);
1739   }
1740
1741   if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
1742     shutdown_handle->completion_cb(shutdown_handle->completion_cls,
1743                                    operation);
1744   GNUNET_free (shutdown_handle); /* Free shutdown handle */
1745   socket->shutdown_handle = NULL;
1746   if (GNUNET_SCHEDULER_NO_TASK
1747       != shutdown_handle->close_msg_retransmission_task_id)
1748   {
1749     GNUNET_SCHEDULER_cancel
1750       (shutdown_handle->close_msg_retransmission_task_id);
1751     shutdown_handle->close_msg_retransmission_task_id =
1752       GNUNET_SCHEDULER_NO_TASK;
1753   }
1754   return GNUNET_OK;
1755 }
1756
1757
1758 /**
1759  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1760  *
1761  * @param cls the socket (set from GNUNET_MESH_connect)
1762  * @param tunnel connection to the other end
1763  * @param tunnel_ctx this is NULL
1764  * @param sender who sent the message
1765  * @param message the actual message
1766  * @param atsi performance data for the connection
1767  * @return GNUNET_OK to keep the connection open,
1768  *         GNUNET_SYSERR to close it (signal serious error)
1769  */
1770 static int
1771 client_handle_transmit_close_ack (void *cls,
1772                                   struct GNUNET_MESH_Tunnel *tunnel,
1773                                   void **tunnel_ctx,
1774                                   const struct GNUNET_PeerIdentity *sender,
1775                                   const struct GNUNET_MessageHeader *message,
1776                                   const struct GNUNET_ATS_Information*atsi)
1777 {
1778   struct GNUNET_STREAM_Socket *socket = cls;
1779
1780   return handle_generic_close_ack (socket,
1781                                    tunnel,
1782                                    sender,
1783                                    (const struct GNUNET_STREAM_MessageHeader *)
1784                                    message,
1785                                    atsi,
1786                                    SHUT_WR);
1787 }
1788
1789
1790 /**
1791  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1792  *
1793  * @param socket the socket
1794  * @param tunnel connection to the other end
1795  * @param sender who sent the message
1796  * @param message the actual message
1797  * @param atsi performance data for the connection
1798  * @return GNUNET_OK to keep the connection open,
1799  *         GNUNET_SYSERR to close it (signal serious error)
1800  */
1801 static int
1802 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1803                       struct GNUNET_MESH_Tunnel *tunnel,
1804                       const struct GNUNET_PeerIdentity *sender,
1805                       const struct GNUNET_STREAM_MessageHeader *message,
1806                       const struct GNUNET_ATS_Information *atsi)
1807 {
1808   struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1809
1810   switch (socket->state)
1811   {
1812   case STATE_INIT:
1813   case STATE_LISTEN:
1814   case STATE_HELLO_WAIT:
1815     LOG (GNUNET_ERROR_TYPE_DEBUG,
1816          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1817          GNUNET_i2s (&socket->other_peer));
1818     return GNUNET_OK;
1819   default:
1820     break;
1821   }
1822   
1823   LOG (GNUNET_ERROR_TYPE_DEBUG,
1824        "%s: Received RECEIVE_CLOSE from %s\n",
1825        GNUNET_i2s (&socket->other_peer),
1826        GNUNET_i2s (&socket->other_peer));
1827   receive_close_ack =
1828     GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1829   receive_close_ack->header.size =
1830     htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1831   receive_close_ack->header.type =
1832     htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1833   queue_message (socket,
1834                  receive_close_ack,
1835                  &set_state_closed,
1836                  NULL);
1837   
1838   /* FIXME: Handle the case where write handle is present; the write operation
1839      should be deemed as finised and the write continuation callback
1840      has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
1841   return GNUNET_OK;
1842 }
1843
1844
1845 /**
1846  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1847  *
1848  * @param cls the socket (set from GNUNET_MESH_connect)
1849  * @param tunnel connection to the other end
1850  * @param tunnel_ctx this is NULL
1851  * @param sender who sent the message
1852  * @param message the actual message
1853  * @param atsi performance data for the connection
1854  * @return GNUNET_OK to keep the connection open,
1855  *         GNUNET_SYSERR to close it (signal serious error)
1856  */
1857 static int
1858 client_handle_receive_close (void *cls,
1859                              struct GNUNET_MESH_Tunnel *tunnel,
1860                              void **tunnel_ctx,
1861                              const struct GNUNET_PeerIdentity *sender,
1862                              const struct GNUNET_MessageHeader *message,
1863                              const struct GNUNET_ATS_Information*atsi)
1864 {
1865   struct GNUNET_STREAM_Socket *socket = cls;
1866
1867   return
1868     handle_receive_close (socket,
1869                           tunnel,
1870                           sender,
1871                           (const struct GNUNET_STREAM_MessageHeader *) message,
1872                           atsi);
1873 }
1874
1875
1876 /**
1877  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1878  *
1879  * @param cls the socket (set from GNUNET_MESH_connect)
1880  * @param tunnel connection to the other end
1881  * @param tunnel_ctx this is NULL
1882  * @param sender who sent the message
1883  * @param message the actual message
1884  * @param atsi performance data for the connection
1885  * @return GNUNET_OK to keep the connection open,
1886  *         GNUNET_SYSERR to close it (signal serious error)
1887  */
1888 static int
1889 client_handle_receive_close_ack (void *cls,
1890                                  struct GNUNET_MESH_Tunnel *tunnel,
1891                                  void **tunnel_ctx,
1892                                  const struct GNUNET_PeerIdentity *sender,
1893                                  const struct GNUNET_MessageHeader *message,
1894                                  const struct GNUNET_ATS_Information*atsi)
1895 {
1896   struct GNUNET_STREAM_Socket *socket = cls;
1897
1898   return handle_generic_close_ack (socket,
1899                                    tunnel,
1900                                    sender,
1901                                    (const struct GNUNET_STREAM_MessageHeader *)
1902                                    message,
1903                                    atsi,
1904                                    SHUT_RD);
1905 }
1906
1907
1908 /**
1909  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1910  *
1911  * @param socket the socket
1912  * @param tunnel connection to the other end
1913  * @param sender who sent the message
1914  * @param message the actual message
1915  * @param atsi performance data for the connection
1916  * @return GNUNET_OK to keep the connection open,
1917  *         GNUNET_SYSERR to close it (signal serious error)
1918  */
1919 static int
1920 handle_close (struct GNUNET_STREAM_Socket *socket,
1921               struct GNUNET_MESH_Tunnel *tunnel,
1922               const struct GNUNET_PeerIdentity *sender,
1923               const struct GNUNET_STREAM_MessageHeader *message,
1924               const struct GNUNET_ATS_Information*atsi)
1925 {
1926   struct GNUNET_STREAM_MessageHeader *close_ack;
1927
1928   switch (socket->state)
1929   {
1930   case STATE_INIT:
1931   case STATE_LISTEN:
1932   case STATE_HELLO_WAIT:
1933     LOG (GNUNET_ERROR_TYPE_DEBUG,
1934          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1935          GNUNET_i2s (&socket->other_peer));
1936     return GNUNET_OK;
1937   default:
1938     break;
1939   }
1940
1941   LOG (GNUNET_ERROR_TYPE_DEBUG,
1942        "%s: Received CLOSE from %s\n",
1943        GNUNET_i2s (&socket->other_peer),
1944        GNUNET_i2s (&socket->other_peer));
1945   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1946   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1947   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
1948   queue_message (socket,
1949                  close_ack,
1950                  &set_state_closed,
1951                  NULL);
1952   if (socket->state == STATE_CLOSED)
1953     return GNUNET_OK;
1954
1955   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1956   socket->receive_buffer = NULL;
1957   socket->receive_buffer_size = 0;
1958   return GNUNET_OK;
1959 }
1960
1961
1962 /**
1963  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1964  *
1965  * @param cls the socket (set from GNUNET_MESH_connect)
1966  * @param tunnel connection to the other end
1967  * @param tunnel_ctx this is NULL
1968  * @param sender who sent the message
1969  * @param message the actual message
1970  * @param atsi performance data for the connection
1971  * @return GNUNET_OK to keep the connection open,
1972  *         GNUNET_SYSERR to close it (signal serious error)
1973  */
1974 static int
1975 client_handle_close (void *cls,
1976                      struct GNUNET_MESH_Tunnel *tunnel,
1977                      void **tunnel_ctx,
1978                      const struct GNUNET_PeerIdentity *sender,
1979                      const struct GNUNET_MessageHeader *message,
1980                      const struct GNUNET_ATS_Information*atsi)
1981 {
1982   struct GNUNET_STREAM_Socket *socket = cls;
1983
1984   return handle_close (socket,
1985                        tunnel,
1986                        sender,
1987                        (const struct GNUNET_STREAM_MessageHeader *) message,
1988                        atsi);
1989 }
1990
1991
1992 /**
1993  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1994  *
1995  * @param cls the socket (set from GNUNET_MESH_connect)
1996  * @param tunnel connection to the other end
1997  * @param tunnel_ctx this is NULL
1998  * @param sender who sent the message
1999  * @param message the actual message
2000  * @param atsi performance data for the connection
2001  * @return GNUNET_OK to keep the connection open,
2002  *         GNUNET_SYSERR to close it (signal serious error)
2003  */
2004 static int
2005 client_handle_close_ack (void *cls,
2006                          struct GNUNET_MESH_Tunnel *tunnel,
2007                          void **tunnel_ctx,
2008                          const struct GNUNET_PeerIdentity *sender,
2009                          const struct GNUNET_MessageHeader *message,
2010                          const struct GNUNET_ATS_Information *atsi)
2011 {
2012   struct GNUNET_STREAM_Socket *socket = cls;
2013
2014   return handle_generic_close_ack (socket,
2015                                    tunnel,
2016                                    sender,
2017                                    (const struct GNUNET_STREAM_MessageHeader *) 
2018                                    message,
2019                                    atsi,
2020                                    SHUT_RDWR);
2021 }
2022
2023 /*****************************/
2024 /* Server's Message Handlers */
2025 /*****************************/
2026
2027 /**
2028  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
2029  *
2030  * @param cls the closure
2031  * @param tunnel connection to the other end
2032  * @param tunnel_ctx the socket
2033  * @param sender who sent the message
2034  * @param message the actual message
2035  * @param atsi performance data for the connection
2036  * @return GNUNET_OK to keep the connection open,
2037  *         GNUNET_SYSERR to close it (signal serious error)
2038  */
2039 static int
2040 server_handle_data (void *cls,
2041                     struct GNUNET_MESH_Tunnel *tunnel,
2042                     void **tunnel_ctx,
2043                     const struct GNUNET_PeerIdentity *sender,
2044                     const struct GNUNET_MessageHeader *message,
2045                     const struct GNUNET_ATS_Information*atsi)
2046 {
2047   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2048
2049   return handle_data (socket,
2050                       tunnel,
2051                       sender,
2052                       (const struct GNUNET_STREAM_DataMessage *)message,
2053                       atsi);
2054 }
2055
2056
2057 /**
2058  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
2059  *
2060  * @param cls the closure
2061  * @param tunnel connection to the other end
2062  * @param tunnel_ctx the socket
2063  * @param sender who sent the message
2064  * @param message the actual message
2065  * @param atsi performance data for the connection
2066  * @return GNUNET_OK to keep the connection open,
2067  *         GNUNET_SYSERR to close it (signal serious error)
2068  */
2069 static int
2070 server_handle_hello (void *cls,
2071                      struct GNUNET_MESH_Tunnel *tunnel,
2072                      void **tunnel_ctx,
2073                      const struct GNUNET_PeerIdentity *sender,
2074                      const struct GNUNET_MessageHeader *message,
2075                      const struct GNUNET_ATS_Information*atsi)
2076 {
2077   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2078   struct GNUNET_STREAM_HelloAckMessage *reply;
2079
2080   if (0 != memcmp (sender,
2081                    &socket->other_peer,
2082                    sizeof (struct GNUNET_PeerIdentity)))
2083   {
2084     LOG (GNUNET_ERROR_TYPE_DEBUG,
2085          "%s: Received HELLO from non-confirming peer\n",
2086          GNUNET_i2s (&socket->other_peer));
2087     return GNUNET_YES;
2088   }
2089
2090   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == 
2091                  ntohs (message->type));
2092   GNUNET_assert (socket->tunnel == tunnel);
2093   LOG (GNUNET_ERROR_TYPE_DEBUG,
2094        "%s: Received HELLO from %s\n", 
2095        GNUNET_i2s (&socket->other_peer),
2096        GNUNET_i2s (&socket->other_peer));
2097
2098   if (STATE_INIT == socket->state)
2099   {
2100     reply = generate_hello_ack_msg (socket);
2101     queue_message (socket, 
2102                    &reply->header,
2103                    &set_state_hello_wait, 
2104                    NULL);
2105   }
2106   else
2107   {
2108     LOG (GNUNET_ERROR_TYPE_DEBUG,
2109          "%s: Client sent HELLO when in state %d\n", 
2110          GNUNET_i2s (&socket->other_peer),
2111          socket->state);
2112     /* FIXME: Send RESET? */
2113       
2114   }
2115   return GNUNET_OK;
2116 }
2117
2118
2119 /**
2120  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2121  *
2122  * @param cls the closure
2123  * @param tunnel connection to the other end
2124  * @param tunnel_ctx the socket
2125  * @param sender who sent the message
2126  * @param message the actual message
2127  * @param atsi performance data for the connection
2128  * @return GNUNET_OK to keep the connection open,
2129  *         GNUNET_SYSERR to close it (signal serious error)
2130  */
2131 static int
2132 server_handle_hello_ack (void *cls,
2133                          struct GNUNET_MESH_Tunnel *tunnel,
2134                          void **tunnel_ctx,
2135                          const struct GNUNET_PeerIdentity *sender,
2136                          const struct GNUNET_MessageHeader *message,
2137                          const struct GNUNET_ATS_Information*atsi)
2138 {
2139   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2140   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2141
2142   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2143                  ntohs (message->type));
2144   GNUNET_assert (socket->tunnel == tunnel);
2145   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2146   if (STATE_HELLO_WAIT == socket->state)
2147   {
2148     LOG (GNUNET_ERROR_TYPE_DEBUG,
2149          "%s: Received HELLO_ACK from %s\n",
2150          GNUNET_i2s (&socket->other_peer),
2151          GNUNET_i2s (&socket->other_peer));
2152     socket->read_sequence_number = ntohl (ack_message->sequence_number);
2153     LOG (GNUNET_ERROR_TYPE_DEBUG,
2154          "%s: Read sequence number %u\n",
2155          GNUNET_i2s (&socket->other_peer),
2156          (unsigned int) socket->read_sequence_number);
2157     socket->receiver_window_available = 
2158       ntohl (ack_message->receiver_window_size);
2159     /* Attain ESTABLISHED state */
2160     set_state_established (NULL, socket);
2161   }
2162   else
2163   {
2164     LOG (GNUNET_ERROR_TYPE_DEBUG,
2165          "Client sent HELLO_ACK when in state %d\n", socket->state);
2166     /* FIXME: Send RESET? */
2167       
2168   }
2169   return GNUNET_OK;
2170 }
2171
2172
2173 /**
2174  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2175  *
2176  * @param cls the closure
2177  * @param tunnel connection to the other end
2178  * @param tunnel_ctx the socket
2179  * @param sender who sent the message
2180  * @param message the actual message
2181  * @param atsi performance data for the connection
2182  * @return GNUNET_OK to keep the connection open,
2183  *         GNUNET_SYSERR to close it (signal serious error)
2184  */
2185 static int
2186 server_handle_reset (void *cls,
2187                      struct GNUNET_MESH_Tunnel *tunnel,
2188                      void **tunnel_ctx,
2189                      const struct GNUNET_PeerIdentity *sender,
2190                      const struct GNUNET_MessageHeader *message,
2191                      const struct GNUNET_ATS_Information*atsi)
2192 {
2193   // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2194
2195   return GNUNET_OK;
2196 }
2197
2198
2199 /**
2200  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2201  *
2202  * @param cls the closure
2203  * @param tunnel connection to the other end
2204  * @param tunnel_ctx the socket
2205  * @param sender who sent the message
2206  * @param message the actual message
2207  * @param atsi performance data for the connection
2208  * @return GNUNET_OK to keep the connection open,
2209  *         GNUNET_SYSERR to close it (signal serious error)
2210  */
2211 static int
2212 server_handle_transmit_close (void *cls,
2213                               struct GNUNET_MESH_Tunnel *tunnel,
2214                               void **tunnel_ctx,
2215                               const struct GNUNET_PeerIdentity *sender,
2216                               const struct GNUNET_MessageHeader *message,
2217                               const struct GNUNET_ATS_Information*atsi)
2218 {
2219   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2220
2221   return handle_transmit_close (socket,
2222                                 tunnel,
2223                                 sender,
2224                                 (struct GNUNET_STREAM_MessageHeader *)message,
2225                                 atsi);
2226 }
2227
2228
2229 /**
2230  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2231  *
2232  * @param cls the closure
2233  * @param tunnel connection to the other end
2234  * @param tunnel_ctx the socket
2235  * @param sender who sent the message
2236  * @param message the actual message
2237  * @param atsi performance data for the connection
2238  * @return GNUNET_OK to keep the connection open,
2239  *         GNUNET_SYSERR to close it (signal serious error)
2240  */
2241 static int
2242 server_handle_transmit_close_ack (void *cls,
2243                                   struct GNUNET_MESH_Tunnel *tunnel,
2244                                   void **tunnel_ctx,
2245                                   const struct GNUNET_PeerIdentity *sender,
2246                                   const struct GNUNET_MessageHeader *message,
2247                                   const struct GNUNET_ATS_Information*atsi)
2248 {
2249   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2250
2251   return handle_generic_close_ack (socket,
2252                                    tunnel,
2253                                    sender,
2254                                    (const struct GNUNET_STREAM_MessageHeader *)
2255                                    message,
2256                                    atsi,
2257                                    SHUT_WR);
2258 }
2259
2260
2261 /**
2262  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2263  *
2264  * @param cls the closure
2265  * @param tunnel connection to the other end
2266  * @param tunnel_ctx the socket
2267  * @param sender who sent the message
2268  * @param message the actual message
2269  * @param atsi performance data for the connection
2270  * @return GNUNET_OK to keep the connection open,
2271  *         GNUNET_SYSERR to close it (signal serious error)
2272  */
2273 static int
2274 server_handle_receive_close (void *cls,
2275                              struct GNUNET_MESH_Tunnel *tunnel,
2276                              void **tunnel_ctx,
2277                              const struct GNUNET_PeerIdentity *sender,
2278                              const struct GNUNET_MessageHeader *message,
2279                              const struct GNUNET_ATS_Information*atsi)
2280 {
2281   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2282
2283   return
2284     handle_receive_close (socket,
2285                           tunnel,
2286                           sender,
2287                           (const struct GNUNET_STREAM_MessageHeader *) message,
2288                           atsi);
2289 }
2290
2291
2292 /**
2293  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2294  *
2295  * @param cls the closure
2296  * @param tunnel connection to the other end
2297  * @param tunnel_ctx the socket
2298  * @param sender who sent the message
2299  * @param message the actual message
2300  * @param atsi performance data for the connection
2301  * @return GNUNET_OK to keep the connection open,
2302  *         GNUNET_SYSERR to close it (signal serious error)
2303  */
2304 static int
2305 server_handle_receive_close_ack (void *cls,
2306                                  struct GNUNET_MESH_Tunnel *tunnel,
2307                                  void **tunnel_ctx,
2308                                  const struct GNUNET_PeerIdentity *sender,
2309                                  const struct GNUNET_MessageHeader *message,
2310                                  const struct GNUNET_ATS_Information*atsi)
2311 {
2312   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2313
2314   return handle_generic_close_ack (socket,
2315                                    tunnel,
2316                                    sender,
2317                                    (const struct GNUNET_STREAM_MessageHeader *)
2318                                    message,
2319                                    atsi,
2320                                    SHUT_RD);
2321 }
2322
2323
2324 /**
2325  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2326  *
2327  * @param cls the listen socket (from GNUNET_MESH_connect in
2328  *          GNUNET_STREAM_listen) 
2329  * @param tunnel connection to the other end
2330  * @param tunnel_ctx the socket
2331  * @param sender who sent the message
2332  * @param message the actual message
2333  * @param atsi performance data for the connection
2334  * @return GNUNET_OK to keep the connection open,
2335  *         GNUNET_SYSERR to close it (signal serious error)
2336  */
2337 static int
2338 server_handle_close (void *cls,
2339                      struct GNUNET_MESH_Tunnel *tunnel,
2340                      void **tunnel_ctx,
2341                      const struct GNUNET_PeerIdentity *sender,
2342                      const struct GNUNET_MessageHeader *message,
2343                      const struct GNUNET_ATS_Information*atsi)
2344 {
2345   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2346   
2347   return handle_close (socket,
2348                        tunnel,
2349                        sender,
2350                        (const struct GNUNET_STREAM_MessageHeader *) message,
2351                        atsi);
2352 }
2353
2354
2355 /**
2356  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2357  *
2358  * @param cls the closure
2359  * @param tunnel connection to the other end
2360  * @param tunnel_ctx the socket
2361  * @param sender who sent the message
2362  * @param message the actual message
2363  * @param atsi performance data for the connection
2364  * @return GNUNET_OK to keep the connection open,
2365  *         GNUNET_SYSERR to close it (signal serious error)
2366  */
2367 static int
2368 server_handle_close_ack (void *cls,
2369                          struct GNUNET_MESH_Tunnel *tunnel,
2370                          void **tunnel_ctx,
2371                          const struct GNUNET_PeerIdentity *sender,
2372                          const struct GNUNET_MessageHeader *message,
2373                          const struct GNUNET_ATS_Information*atsi)
2374 {
2375   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2376
2377   return handle_generic_close_ack (socket,
2378                                    tunnel,
2379                                    sender,
2380                                    (const struct GNUNET_STREAM_MessageHeader *) 
2381                                    message,
2382                                    atsi,
2383                                    SHUT_RDWR);
2384 }
2385
2386
2387 /**
2388  * Handler for DATA_ACK messages
2389  *
2390  * @param socket the socket through which the ack was received
2391  * @param tunnel connection to the other end
2392  * @param sender who sent the message
2393  * @param ack the acknowledgment message
2394  * @param atsi performance data for the connection
2395  * @return GNUNET_OK to keep the connection open,
2396  *         GNUNET_SYSERR to close it (signal serious error)
2397  */
2398 static int
2399 handle_ack (struct GNUNET_STREAM_Socket *socket,
2400             struct GNUNET_MESH_Tunnel *tunnel,
2401             const struct GNUNET_PeerIdentity *sender,
2402             const struct GNUNET_STREAM_AckMessage *ack,
2403             const struct GNUNET_ATS_Information*atsi)
2404 {
2405   unsigned int packet;
2406   int need_retransmission;
2407   uint32_t sequence_difference;
2408   
2409   if (0 != memcmp (sender,
2410                    &socket->other_peer,
2411                    sizeof (struct GNUNET_PeerIdentity)))
2412   {
2413     LOG (GNUNET_ERROR_TYPE_DEBUG,
2414          "%s: Received ACK from non-confirming peer\n",
2415          GNUNET_i2s (&socket->other_peer));
2416     return GNUNET_YES;
2417   }
2418   switch (socket->state)
2419   {
2420   case (STATE_ESTABLISHED):
2421   case (STATE_RECEIVE_CLOSED):
2422   case (STATE_RECEIVE_CLOSE_WAIT):
2423     if (NULL == socket->write_handle)
2424     {
2425       LOG (GNUNET_ERROR_TYPE_DEBUG,
2426            "%s: Received DATA_ACK when write_handle is NULL\n",
2427            GNUNET_i2s (&socket->other_peer));
2428       return GNUNET_OK;
2429     }
2430     /* FIXME: increment in the base sequence number is breaking current flow
2431      */
2432     if (!((socket->write_sequence_number 
2433            - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2434     {
2435       LOG (GNUNET_ERROR_TYPE_DEBUG,
2436            "%s: Received DATA_ACK with unexpected base sequence number\n",
2437            GNUNET_i2s (&socket->other_peer));
2438       LOG (GNUNET_ERROR_TYPE_DEBUG,
2439            "%s: Current write sequence: %u; Ack's base sequence: %u\n",
2440            GNUNET_i2s (&socket->other_peer),
2441            socket->write_sequence_number,
2442            ntohl (ack->base_sequence_number));
2443       return GNUNET_OK;
2444     }
2445     /* FIXME: include the case when write_handle is cancelled - ignore the 
2446        acks */
2447     LOG (GNUNET_ERROR_TYPE_DEBUG,
2448          "%s: Received DATA_ACK from %s\n",
2449          GNUNET_i2s (&socket->other_peer),
2450          GNUNET_i2s (&socket->other_peer));
2451       
2452     /* Cancel the retransmission task */
2453     if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2454     {
2455       GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2456       socket->retransmission_timeout_task_id = 
2457         GNUNET_SCHEDULER_NO_TASK;
2458     }
2459     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2460     {
2461       if (NULL == socket->write_handle->messages[packet]) break;
2462       /* BS: Base sequence from ack; PS: sequence num of current packet */
2463       sequence_difference = ntohl (ack->base_sequence_number)
2464         - ntohl (socket->write_handle->messages[packet]->sequence_number);
2465       /* case where BS = PS + GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2466       if ((sequence_difference == GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
2467           || ((sequence_difference < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
2468               && (0 != sequence_difference))) /* case: BS > PS and BS != PS*/
2469       {
2470         ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, packet,
2471                               GNUNET_YES);
2472         continue;
2473       }
2474       if (GNUNET_YES == 
2475           ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
2476                                 -sequence_difference))/*inversion as PS >= BS */
2477       {
2478         ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, packet,
2479                               GNUNET_YES);
2480       }
2481     }
2482     /* Update the receive window remaining
2483        FIXME : Should update with the value from a data ack with greater
2484        sequence number */
2485     socket->receiver_window_available = 
2486       ntohl (ack->receive_window_remaining);
2487     /* Check if we have received all acknowledgements */
2488     need_retransmission = GNUNET_NO;
2489     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2490     {
2491       if (NULL == socket->write_handle->messages[packet]) break;
2492       if (GNUNET_YES != ackbitmap_is_bit_set 
2493           (&socket->write_handle->ack_bitmap,packet))
2494       {
2495         need_retransmission = GNUNET_YES;
2496         break;
2497       }
2498     }
2499     if (GNUNET_YES == need_retransmission)
2500     {
2501       write_data (socket);
2502     }
2503     else      /* We have to call the write continuation callback now */
2504     {
2505       struct GNUNET_STREAM_IOWriteHandle *write_handle;
2506       
2507       /* Free the packets */
2508       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2509       {
2510         GNUNET_free_non_null (socket->write_handle->messages[packet]);
2511       }
2512       write_handle = socket->write_handle;
2513       socket->write_handle = NULL;
2514       if (NULL != write_handle->write_cont)
2515         write_handle->write_cont (write_handle->write_cont_cls,
2516                                   socket->status,
2517                                   write_handle->size);
2518       /* We are done with the write handle - Freeing it */
2519       GNUNET_free (write_handle);
2520       LOG (GNUNET_ERROR_TYPE_DEBUG,
2521            "%s: Write completion callback completed\n",
2522            GNUNET_i2s (&socket->other_peer));      
2523     }
2524     break;
2525   default:
2526     break;
2527   }
2528   return GNUNET_OK;
2529 }
2530
2531
2532 /**
2533  * Handler for DATA_ACK messages
2534  *
2535  * @param cls the 'struct GNUNET_STREAM_Socket'
2536  * @param tunnel connection to the other end
2537  * @param tunnel_ctx unused
2538  * @param sender who sent the message
2539  * @param message the actual message
2540  * @param atsi performance data for the connection
2541  * @return GNUNET_OK to keep the connection open,
2542  *         GNUNET_SYSERR to close it (signal serious error)
2543  */
2544 static int
2545 client_handle_ack (void *cls,
2546                    struct GNUNET_MESH_Tunnel *tunnel,
2547                    void **tunnel_ctx,
2548                    const struct GNUNET_PeerIdentity *sender,
2549                    const struct GNUNET_MessageHeader *message,
2550                    const struct GNUNET_ATS_Information*atsi)
2551 {
2552   struct GNUNET_STREAM_Socket *socket = cls;
2553   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2554  
2555   return handle_ack (socket, tunnel, sender, ack, atsi);
2556 }
2557
2558
2559 /**
2560  * Handler for DATA_ACK messages
2561  *
2562  * @param cls the server's listen socket
2563  * @param tunnel connection to the other end
2564  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2565  * @param sender who sent the message
2566  * @param message the actual message
2567  * @param atsi performance data for the connection
2568  * @return GNUNET_OK to keep the connection open,
2569  *         GNUNET_SYSERR to close it (signal serious error)
2570  */
2571 static int
2572 server_handle_ack (void *cls,
2573                    struct GNUNET_MESH_Tunnel *tunnel,
2574                    void **tunnel_ctx,
2575                    const struct GNUNET_PeerIdentity *sender,
2576                    const struct GNUNET_MessageHeader *message,
2577                    const struct GNUNET_ATS_Information*atsi)
2578 {
2579   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2580   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2581  
2582   return handle_ack (socket, tunnel, sender, ack, atsi);
2583 }
2584
2585
2586 /**
2587  * For client message handlers, the stream socket is in the
2588  * closure argument.
2589  */
2590 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2591   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2592   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2593    sizeof (struct GNUNET_STREAM_AckMessage) },
2594   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2595    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2596   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2597    sizeof (struct GNUNET_STREAM_MessageHeader)},
2598   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2599    sizeof (struct GNUNET_STREAM_MessageHeader)},
2600   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2601    sizeof (struct GNUNET_STREAM_MessageHeader)},
2602   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2603    sizeof (struct GNUNET_STREAM_MessageHeader)},
2604   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2605    sizeof (struct GNUNET_STREAM_MessageHeader)},
2606   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2607    sizeof (struct GNUNET_STREAM_MessageHeader)},
2608   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2609    sizeof (struct GNUNET_STREAM_MessageHeader)},
2610   {NULL, 0, 0}
2611 };
2612
2613
2614 /**
2615  * For server message handlers, the stream socket is in the
2616  * tunnel context, and the listen socket in the closure argument.
2617  */
2618 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2619   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2620   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2621    sizeof (struct GNUNET_STREAM_AckMessage) },
2622   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2623    sizeof (struct GNUNET_STREAM_MessageHeader)},
2624   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2625    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2626   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2627    sizeof (struct GNUNET_STREAM_MessageHeader)},
2628   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2629    sizeof (struct GNUNET_STREAM_MessageHeader)},
2630   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2631    sizeof (struct GNUNET_STREAM_MessageHeader)},
2632   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2633    sizeof (struct GNUNET_STREAM_MessageHeader)},
2634   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2635    sizeof (struct GNUNET_STREAM_MessageHeader)},
2636   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2637    sizeof (struct GNUNET_STREAM_MessageHeader)},
2638   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2639    sizeof (struct GNUNET_STREAM_MessageHeader)},
2640   {NULL, 0, 0}
2641 };
2642
2643
2644 /**
2645  * Function called when our target peer is connected to our tunnel
2646  *
2647  * @param cls the socket for which this tunnel is created
2648  * @param peer the peer identity of the target
2649  * @param atsi performance data for the connection
2650  */
2651 static void
2652 mesh_peer_connect_callback (void *cls,
2653                             const struct GNUNET_PeerIdentity *peer,
2654                             const struct GNUNET_ATS_Information * atsi)
2655 {
2656   struct GNUNET_STREAM_Socket *socket = cls;
2657   struct GNUNET_STREAM_MessageHeader *message;
2658   
2659   if (0 != memcmp (peer,
2660                    &socket->other_peer,
2661                    sizeof (struct GNUNET_PeerIdentity)))
2662   {
2663     LOG (GNUNET_ERROR_TYPE_DEBUG,
2664          "%s: A peer which is not our target has connected to our tunnel\n",
2665          GNUNET_i2s(peer));
2666     return;
2667   }
2668   
2669   LOG (GNUNET_ERROR_TYPE_DEBUG,
2670        "%s: Target peer %s connected\n",
2671        GNUNET_i2s (&socket->other_peer),
2672        GNUNET_i2s (&socket->other_peer));
2673   
2674   /* Set state to INIT */
2675   socket->state = STATE_INIT;
2676
2677   /* Send HELLO message */
2678   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2679   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
2680   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2681   queue_message (socket,
2682                  message,
2683                  &set_state_hello_wait,
2684                  NULL);
2685
2686   /* Call open callback */
2687   if (NULL == socket->open_cb)
2688   {
2689     LOG (GNUNET_ERROR_TYPE_DEBUG,
2690          "STREAM_open callback is NULL\n");
2691   }
2692 }
2693
2694
2695 /**
2696  * Function called when our target peer is disconnected from our tunnel
2697  *
2698  * @param cls the socket associated which this tunnel
2699  * @param peer the peer identity of the target
2700  */
2701 static void
2702 mesh_peer_disconnect_callback (void *cls,
2703                                const struct GNUNET_PeerIdentity *peer)
2704 {
2705   struct GNUNET_STREAM_Socket *socket=cls;
2706   
2707   /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2708   LOG (GNUNET_ERROR_TYPE_DEBUG,
2709        "%s: Other peer %s disconnected \n",
2710        GNUNET_i2s (&socket->other_peer),
2711        GNUNET_i2s (&socket->other_peer));
2712 }
2713
2714
2715 /**
2716  * Method called whenever a peer creates a tunnel to us
2717  *
2718  * @param cls closure
2719  * @param tunnel new handle to the tunnel
2720  * @param initiator peer that started the tunnel
2721  * @param atsi performance information for the tunnel
2722  * @return initial tunnel context for the tunnel
2723  *         (can be NULL -- that's not an error)
2724  */
2725 static void *
2726 new_tunnel_notify (void *cls,
2727                    struct GNUNET_MESH_Tunnel *tunnel,
2728                    const struct GNUNET_PeerIdentity *initiator,
2729                    const struct GNUNET_ATS_Information *atsi)
2730 {
2731   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2732   struct GNUNET_STREAM_Socket *socket;
2733
2734   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2735      from the same peer again until the socket is closed */
2736
2737   if (GNUNET_NO == lsocket->listening)
2738   {
2739     LOG (GNUNET_ERROR_TYPE_DEBUG,
2740          "%s: Destroying tunnel from peer %s as we don't have the lock\n",
2741          GNUNET_i2s (&socket->other_peer),
2742          GNUNET_i2s (&socket->other_peer));
2743     GNUNET_MESH_tunnel_destroy (tunnel);
2744     return NULL;
2745   }
2746   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2747   socket->other_peer = *initiator;
2748   socket->tunnel = tunnel;
2749   socket->session_id = 0;       /* FIXME */
2750   socket->state = STATE_INIT;
2751   socket->lsocket = lsocket;
2752   socket->retransmit_timeout = lsocket->retransmit_timeout;
2753   socket->testing_active = lsocket->testing_active;
2754   socket->testing_set_write_sequence_number_value =
2755     lsocket->testing_set_write_sequence_number_value;
2756     
2757   LOG (GNUNET_ERROR_TYPE_DEBUG,
2758        "%s: Peer %s initiated tunnel to us\n", 
2759        GNUNET_i2s (&socket->other_peer),
2760        GNUNET_i2s (&socket->other_peer));
2761   
2762   /* FIXME: Copy MESH handle from lsocket to socket */
2763   
2764   return socket;
2765 }
2766
2767
2768 /**
2769  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2770  * any associated state.  This function is NOT called if the client has
2771  * explicitly asked for the tunnel to be destroyed using
2772  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2773  * the tunnel.
2774  *
2775  * @param cls closure (set from GNUNET_MESH_connect)
2776  * @param tunnel connection to the other end (henceforth invalid)
2777  * @param tunnel_ctx place where local state associated
2778  *                   with the tunnel is stored
2779  */
2780 static void 
2781 tunnel_cleaner (void *cls,
2782                 const struct GNUNET_MESH_Tunnel *tunnel,
2783                 void *tunnel_ctx)
2784 {
2785   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2786
2787   if (tunnel != socket->tunnel)
2788     return;
2789
2790   GNUNET_break_op(0);
2791   LOG (GNUNET_ERROR_TYPE_DEBUG,
2792        "%s: Peer %s has terminated connection abruptly\n",
2793        GNUNET_i2s (&socket->other_peer),
2794        GNUNET_i2s (&socket->other_peer));
2795
2796   socket->status = GNUNET_STREAM_SHUTDOWN;
2797
2798   /* Clear Transmit handles */
2799   if (NULL != socket->transmit_handle)
2800   {
2801     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2802     socket->transmit_handle = NULL;
2803   }
2804   if (NULL != socket->ack_transmit_handle)
2805   {
2806     GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
2807     GNUNET_free (socket->ack_msg);
2808     socket->ack_msg = NULL;
2809     socket->ack_transmit_handle = NULL;
2810   }
2811   /* Stop Tasks using socket->tunnel */
2812   if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2813   {
2814     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2815     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2816   }
2817   if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2818   {
2819     GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2820     socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
2821   }
2822   /* FIXME: Cancel all other tasks using socket->tunnel */
2823   socket->tunnel = NULL;
2824 }
2825
2826
2827 /**
2828  * Callback to signal timeout on lockmanager lock acquire
2829  *
2830  * @param cls the ListenSocket
2831  * @param tc the scheduler task context
2832  */
2833 static void
2834 lockmanager_acquire_timeout (void *cls, 
2835                              const struct GNUNET_SCHEDULER_TaskContext *tc)
2836 {
2837   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2838   GNUNET_STREAM_ListenCallback listen_cb;
2839   void *listen_cb_cls;
2840
2841   lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2842   listen_cb = lsocket->listen_cb;
2843   listen_cb_cls = lsocket->listen_cb_cls;
2844   GNUNET_STREAM_listen_close (lsocket);
2845   if (NULL != listen_cb)
2846     listen_cb (listen_cb_cls, NULL, NULL);  
2847 }
2848
2849
2850 /**
2851  * Callback to notify us on the status changes on app_port lock
2852  *
2853  * @param cls the ListenSocket
2854  * @param domain the domain name of the lock
2855  * @param lock the app_port
2856  * @param status the current status of the lock
2857  */
2858 static void
2859 lock_status_change_cb (void *cls, const char *domain, uint32_t lock,
2860                        enum GNUNET_LOCKMANAGER_Status status)
2861 {
2862   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2863
2864   GNUNET_assert (lock == (uint32_t) lsocket->port);
2865   if (GNUNET_LOCKMANAGER_SUCCESS == status)
2866   {
2867     lsocket->listening = GNUNET_YES;
2868     if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
2869     {
2870       GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
2871       lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2872     }
2873     if (NULL == lsocket->mesh)
2874     {
2875       GNUNET_MESH_ApplicationType ports[] = {lsocket->port, 0};
2876
2877       lsocket->mesh = GNUNET_MESH_connect (lsocket->cfg,
2878                                            RECEIVE_BUFFER_SIZE, /* FIXME: QUEUE size as parameter? */
2879                                            lsocket, /* Closure */
2880                                            &new_tunnel_notify,
2881                                            &tunnel_cleaner,
2882                                            server_message_handlers,
2883                                            ports);
2884       GNUNET_assert (NULL != lsocket->mesh);
2885       if (NULL != lsocket->listen_ok_cb)
2886       {
2887         (void) lsocket->listen_ok_cb ();
2888       }
2889     }
2890   }
2891   if (GNUNET_LOCKMANAGER_RELEASE == status)
2892     lsocket->listening = GNUNET_NO;
2893 }
2894
2895
2896 /*****************/
2897 /* API functions */
2898 /*****************/
2899
2900
2901 /**
2902  * Tries to open a stream to the target peer
2903  *
2904  * @param cfg configuration to use
2905  * @param target the target peer to which the stream has to be opened
2906  * @param app_port the application port number which uniquely identifies this
2907  *            stream
2908  * @param open_cb this function will be called after stream has be established 
2909  * @param open_cb_cls the closure for open_cb
2910  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2911  * @return if successful it returns the stream socket; NULL if stream cannot be
2912  *         opened 
2913  */
2914 struct GNUNET_STREAM_Socket *
2915 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2916                     const struct GNUNET_PeerIdentity *target,
2917                     GNUNET_MESH_ApplicationType app_port,
2918                     GNUNET_STREAM_OpenCallback open_cb,
2919                     void *open_cb_cls,
2920                     ...)
2921 {
2922   struct GNUNET_STREAM_Socket *socket;
2923   enum GNUNET_STREAM_Option option;
2924   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
2925   va_list vargs;                /* Variable arguments */
2926
2927   LOG (GNUNET_ERROR_TYPE_DEBUG,
2928        "%s\n", __func__);
2929   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2930   socket->other_peer = *target;
2931   socket->open_cb = open_cb;
2932   socket->open_cls = open_cb_cls;
2933   /* Set defaults */
2934   socket->retransmit_timeout = 
2935     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
2936   socket->testing_active = GNUNET_NO;
2937   va_start (vargs, open_cb_cls); /* Parse variable args */
2938   do {
2939     option = va_arg (vargs, enum GNUNET_STREAM_Option);
2940     switch (option)
2941     {
2942     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2943       /* Expect struct GNUNET_TIME_Relative */
2944       socket->retransmit_timeout = va_arg (vargs,
2945                                            struct GNUNET_TIME_Relative);
2946       break;
2947     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
2948       socket->testing_active = GNUNET_YES;
2949       socket->testing_set_write_sequence_number_value = va_arg (vargs,
2950                                                                 uint32_t);
2951       break;
2952     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
2953       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
2954       break;
2955     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
2956       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
2957       break;
2958     case GNUNET_STREAM_OPTION_END:
2959       break;
2960     }
2961   } while (GNUNET_STREAM_OPTION_END != option);
2962   va_end (vargs);               /* End of variable args parsing */
2963   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2964                                       RECEIVE_BUFFER_SIZE,  /* QUEUE size as parameter? */
2965                                       socket, /* cls */
2966                                       NULL, /* No inbound tunnel handler */
2967                                       NULL, /* No in-tunnel cleaner */
2968                                       client_message_handlers,
2969                                       ports); /* We don't get inbound tunnels */
2970   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
2971   {
2972     GNUNET_free (socket);
2973     return NULL;
2974   }
2975
2976   /* Now create the mesh tunnel to target */
2977   LOG (GNUNET_ERROR_TYPE_DEBUG,
2978        "Creating MESH Tunnel\n");
2979   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2980                                               NULL, /* Tunnel context */
2981                                               &mesh_peer_connect_callback,
2982                                               &mesh_peer_disconnect_callback,
2983                                               socket);
2984   GNUNET_assert (NULL != socket->tunnel);
2985   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2986                                         &socket->other_peer);
2987   
2988   LOG (GNUNET_ERROR_TYPE_DEBUG,
2989        "%s() END\n", __func__);
2990   return socket;
2991 }
2992
2993
2994 /**
2995  * Shutdown the stream for reading or writing (similar to man 2 shutdown).
2996  *
2997  * @param socket the stream socket
2998  * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
2999  * @param completion_cb the callback that will be called upon successful
3000  *          shutdown of given operation
3001  * @param completion_cls the closure for the completion callback
3002  * @return the shutdown handle
3003  */
3004 struct GNUNET_STREAM_ShutdownHandle *
3005 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
3006                         int operation,
3007                         GNUNET_STREAM_ShutdownCompletion completion_cb,
3008                         void *completion_cls)
3009 {
3010   struct GNUNET_STREAM_ShutdownHandle *handle;
3011   struct GNUNET_STREAM_MessageHeader *msg;
3012   
3013   GNUNET_assert (NULL == socket->shutdown_handle);
3014
3015   handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
3016   handle->socket = socket;
3017   handle->completion_cb = completion_cb;
3018   handle->completion_cls = completion_cls;
3019   socket->shutdown_handle = handle;
3020
3021   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
3022   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
3023   switch (operation)
3024   {
3025   case SHUT_RD:
3026     handle->operation = SHUT_RD;
3027     if (NULL != socket->read_handle)
3028       LOG (GNUNET_ERROR_TYPE_WARNING,
3029            "Existing read handle should be cancelled before shutting"
3030            " down reading\n");
3031     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
3032     queue_message (socket,
3033                    msg,
3034                    &set_state_receive_close_wait,
3035                    NULL);
3036     break;
3037   case SHUT_WR:
3038     handle->operation = SHUT_WR;
3039     if (NULL != socket->write_handle)
3040       LOG (GNUNET_ERROR_TYPE_WARNING,
3041            "Existing write handle should be cancelled before shutting"
3042            " down writing\n");
3043     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
3044     queue_message (socket,
3045                    msg,
3046                    &set_state_transmit_close_wait,
3047                    NULL);
3048     break;
3049   case SHUT_RDWR:
3050     handle->operation = SHUT_RDWR;
3051     if (NULL != socket->write_handle)
3052       LOG (GNUNET_ERROR_TYPE_WARNING,
3053            "Existing write handle should be cancelled before shutting"
3054            " down writing\n");
3055     if (NULL != socket->read_handle)
3056       LOG (GNUNET_ERROR_TYPE_WARNING,
3057            "Existing read handle should be cancelled before shutting"
3058            " down reading\n");
3059     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
3060     queue_message (socket,
3061                    msg,
3062                    &set_state_close_wait,
3063                    NULL);
3064     break;
3065   default:
3066     LOG (GNUNET_ERROR_TYPE_WARNING,
3067          "GNUNET_STREAM_shutdown called with invalid value for "
3068          "parameter operation -- Ignoring\n");
3069     GNUNET_free (msg);
3070     GNUNET_free (handle);
3071     return NULL;
3072   }
3073   handle->close_msg_retransmission_task_id =
3074     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
3075                                   &close_msg_retransmission_task,
3076                                   handle);
3077   return handle;
3078 }
3079
3080
3081 /**
3082  * Cancels a pending shutdown
3083  *
3084  * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
3085  */
3086 void
3087 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
3088 {
3089   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
3090     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
3091   GNUNET_free (handle);
3092 }
3093
3094
3095 /**
3096  * Closes the stream
3097  *
3098  * @param socket the stream socket
3099  */
3100 void
3101 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
3102 {
3103   struct MessageQueue *head;
3104
3105   if (NULL != socket->read_handle)
3106   {
3107     LOG (GNUNET_ERROR_TYPE_WARNING,
3108          "Closing STREAM socket when a read handle is pending\n");
3109   }
3110   if (NULL != socket->write_handle)
3111   {
3112     LOG (GNUNET_ERROR_TYPE_WARNING,
3113          "Closing STREAM socket when a write handle is pending\n");
3114     GNUNET_STREAM_io_write_cancel (socket->write_handle);
3115     //socket->write_handle = NULL;
3116   }
3117
3118   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
3119   {
3120     /* socket closed with read task pending!? */
3121     GNUNET_break (0);
3122     GNUNET_SCHEDULER_cancel (socket->read_task_id);
3123     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
3124   }
3125   
3126   /* Terminate the ack'ing tasks if they are still present */
3127   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
3128   {
3129     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
3130     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
3131   }
3132
3133   /* Clear Transmit handles */
3134   if (NULL != socket->transmit_handle)
3135   {
3136     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
3137     socket->transmit_handle = NULL;
3138   }
3139   if (NULL != socket->ack_transmit_handle)
3140   {
3141     GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
3142     GNUNET_free (socket->ack_msg);
3143     socket->ack_msg = NULL;
3144     socket->ack_transmit_handle = NULL;
3145   }
3146
3147   /* Clear existing message queue */
3148   while (NULL != (head = socket->queue_head)) {
3149     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
3150                                  socket->queue_tail,
3151                                  head);
3152     GNUNET_free (head->message);
3153     GNUNET_free (head);
3154   }
3155
3156   /* Close associated tunnel */
3157   if (NULL != socket->tunnel)
3158   {
3159     GNUNET_MESH_tunnel_destroy (socket->tunnel);
3160     socket->tunnel = NULL;
3161   }
3162
3163   /* Close mesh connection */
3164   if (NULL != socket->mesh && NULL == socket->lsocket)
3165   {
3166     GNUNET_MESH_disconnect (socket->mesh);
3167     socket->mesh = NULL;
3168   }
3169   
3170   /* Release receive buffer */
3171   if (NULL != socket->receive_buffer)
3172   {
3173     GNUNET_free (socket->receive_buffer);
3174   }
3175
3176   GNUNET_free (socket);
3177 }
3178
3179
3180 /**
3181  * Listens for stream connections for a specific application ports
3182  *
3183  * @param cfg the configuration to use
3184  * @param app_port the application port for which new streams will be accepted
3185  * @param listen_cb this function will be called when a peer tries to establish
3186  *            a stream with us
3187  * @param listen_cb_cls closure for listen_cb
3188  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3189  * @return listen socket, NULL for any error
3190  */
3191 struct GNUNET_STREAM_ListenSocket *
3192 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
3193                       GNUNET_MESH_ApplicationType app_port,
3194                       GNUNET_STREAM_ListenCallback listen_cb,
3195                       void *listen_cb_cls,
3196                       ...)
3197 {
3198   /* FIXME: Add variable args for passing configration options? */
3199   struct GNUNET_STREAM_ListenSocket *lsocket;
3200   struct GNUNET_TIME_Relative listen_timeout;
3201   enum GNUNET_STREAM_Option option;
3202   va_list vargs;
3203
3204   GNUNET_assert (NULL != listen_cb);
3205   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
3206   lsocket->cfg = GNUNET_CONFIGURATION_dup (cfg);
3207   lsocket->lockmanager = GNUNET_LOCKMANAGER_connect (lsocket->cfg);
3208   if (NULL == lsocket->lockmanager)
3209   {
3210     GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3211     GNUNET_free (lsocket);
3212     return NULL;
3213   }
3214   lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */  
3215   /* Set defaults */
3216   lsocket->retransmit_timeout = 
3217     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
3218   lsocket->testing_active = GNUNET_NO;
3219   lsocket->listen_ok_cb = NULL;
3220   listen_timeout = TIME_REL_SECS (60); /* A minute for listen timeout */  
3221   va_start (vargs, listen_cb_cls);
3222   do {
3223     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3224     switch (option)
3225     {
3226     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3227       lsocket->retransmit_timeout = va_arg (vargs,
3228                                             struct GNUNET_TIME_Relative);
3229       break;
3230     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3231       lsocket->testing_active = GNUNET_YES;
3232       lsocket->testing_set_write_sequence_number_value = va_arg (vargs,
3233                                                                  uint32_t);
3234       break;
3235     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3236       listen_timeout = GNUNET_TIME_relative_multiply
3237         (GNUNET_TIME_UNIT_MILLISECONDS, va_arg (vargs, uint32_t));
3238       break;
3239     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3240       lsocket->listen_ok_cb = va_arg (vargs,
3241                                       GNUNET_STREAM_ListenSuccessCallback);
3242       break;
3243     case GNUNET_STREAM_OPTION_END:
3244       break;
3245     }
3246   } while (GNUNET_STREAM_OPTION_END != option);
3247   va_end (vargs);
3248   lsocket->port = app_port;
3249   lsocket->listen_cb = listen_cb;
3250   lsocket->listen_cb_cls = listen_cb_cls;
3251   lsocket->locking_request = 
3252     GNUNET_LOCKMANAGER_acquire_lock (lsocket->lockmanager, locking_domain,
3253                                      (uint32_t) lsocket->port,
3254                                      &lock_status_change_cb, lsocket);
3255   lsocket->lockmanager_acquire_timeout_task =
3256     GNUNET_SCHEDULER_add_delayed (listen_timeout,
3257                                   &lockmanager_acquire_timeout, lsocket);
3258   return lsocket;
3259 }
3260
3261
3262 /**
3263  * Closes the listen socket
3264  *
3265  * @param lsocket the listen socket
3266  */
3267 void
3268 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3269 {
3270   /* Close MESH connection */
3271   if (NULL != lsocket->mesh)
3272     GNUNET_MESH_disconnect (lsocket->mesh);
3273   GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3274   if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
3275     GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
3276   if (NULL != lsocket->locking_request)
3277     GNUNET_LOCKMANAGER_cancel_request (lsocket->locking_request);
3278   if (NULL != lsocket->lockmanager)
3279     GNUNET_LOCKMANAGER_disconnect (lsocket->lockmanager);
3280   GNUNET_free (lsocket);
3281 }
3282
3283
3284 /**
3285  * Tries to write the given data to the stream. The maximum size of data that
3286  * can be written as part of a write operation is (64 * (64000 - sizeof (struct
3287  * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3288  * violation, however only the said number of maximum bytes will be written.
3289  *
3290  * @param socket the socket representing a stream
3291  * @param data the data buffer from where the data is written into the stream
3292  * @param size the number of bytes to be written from the data buffer
3293  * @param timeout the timeout period
3294  * @param write_cont the function to call upon writing some bytes into the
3295  *          stream 
3296  * @param write_cont_cls the closure
3297  *
3298  * @return handle to cancel the operation; if a previous write is pending or
3299  *           the stream has been shutdown for this operation then write_cont is
3300  *           immediately called and NULL is returned.
3301  */
3302 struct GNUNET_STREAM_IOWriteHandle *
3303 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3304                      const void *data,
3305                      size_t size,
3306                      struct GNUNET_TIME_Relative timeout,
3307                      GNUNET_STREAM_CompletionContinuation write_cont,
3308                      void *write_cont_cls)
3309 {
3310   unsigned int num_needed_packets;
3311   unsigned int packet;
3312   struct GNUNET_STREAM_IOWriteHandle *io_handle;
3313   uint32_t packet_size;
3314   uint32_t payload_size;
3315   struct GNUNET_STREAM_DataMessage *data_msg;
3316   const void *sweep;
3317   struct GNUNET_TIME_Relative ack_deadline;
3318
3319   LOG (GNUNET_ERROR_TYPE_DEBUG,
3320        "%s\n", __func__);
3321
3322   /* Return NULL if there is already a write request pending */
3323   if (NULL != socket->write_handle)
3324   {
3325     GNUNET_break (0);
3326     return NULL;
3327   }
3328
3329   switch (socket->state)
3330   {
3331   case STATE_TRANSMIT_CLOSED:
3332   case STATE_TRANSMIT_CLOSE_WAIT:
3333   case STATE_CLOSED:
3334   case STATE_CLOSE_WAIT:
3335     if (NULL != write_cont)
3336       write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3337     LOG (GNUNET_ERROR_TYPE_DEBUG,
3338          "%s() END\n", __func__);
3339     return NULL;
3340   case STATE_INIT:
3341   case STATE_LISTEN:
3342   case STATE_HELLO_WAIT:
3343     if (NULL != write_cont)
3344       /* FIXME: GNUNET_STREAM_SYSERR?? */
3345       write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3346     LOG (GNUNET_ERROR_TYPE_DEBUG,
3347          "%s() END\n", __func__);
3348     return NULL;
3349   case STATE_ESTABLISHED:
3350   case STATE_RECEIVE_CLOSED:
3351   case STATE_RECEIVE_CLOSE_WAIT:
3352     break;
3353   }
3354
3355   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
3356     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
3357   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
3358   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
3359   io_handle->socket = socket;
3360   io_handle->write_cont = write_cont;
3361   io_handle->write_cont_cls = write_cont_cls;
3362   io_handle->size = size;
3363   sweep = data;
3364   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3365      determined from RTT */
3366   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3367   /* Divide the given buffer into packets for sending */
3368   for (packet=0; packet < num_needed_packets; packet++)
3369   {
3370     if ((packet + 1) * max_payload_size < size) 
3371     {
3372       payload_size = max_payload_size;
3373       packet_size = MAX_PACKET_SIZE;
3374     }
3375     else 
3376     {
3377       payload_size = size - packet * max_payload_size;
3378       packet_size =  payload_size + sizeof (struct
3379                                             GNUNET_STREAM_DataMessage); 
3380     }
3381     io_handle->messages[packet] = GNUNET_malloc (packet_size);
3382     io_handle->messages[packet]->header.header.size = htons (packet_size);
3383     io_handle->messages[packet]->header.header.type =
3384       htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3385     io_handle->messages[packet]->sequence_number =
3386       htonl (socket->write_sequence_number++);
3387     io_handle->messages[packet]->offset = htonl (socket->write_offset);
3388
3389     /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3390        determined from RTT */
3391     io_handle->messages[packet]->ack_deadline =
3392       GNUNET_TIME_relative_hton (ack_deadline);
3393     data_msg = io_handle->messages[packet];
3394     /* Copy data from given buffer to the packet */
3395     memcpy (&data_msg[1],
3396             sweep,
3397             payload_size);
3398     sweep += payload_size;
3399     socket->write_offset += payload_size;
3400   }
3401   socket->write_handle = io_handle;
3402   write_data (socket);
3403
3404   LOG (GNUNET_ERROR_TYPE_DEBUG,
3405        "%s() END\n", __func__);
3406
3407   return io_handle;
3408 }
3409
3410
3411 /**
3412  * Tries to read data from the stream.
3413  *
3414  * @param socket the socket representing a stream
3415  * @param timeout the timeout period
3416  * @param proc function to call with data (once only)
3417  * @param proc_cls the closure for proc
3418  *
3419  * @return handle to cancel the operation; if the stream has been shutdown for
3420  *           this type of opeartion then the DataProcessor is immediately
3421  *           called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned
3422  */
3423 struct GNUNET_STREAM_IOReadHandle *
3424 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3425                     struct GNUNET_TIME_Relative timeout,
3426                     GNUNET_STREAM_DataProcessor proc,
3427                     void *proc_cls)
3428 {
3429   struct GNUNET_STREAM_IOReadHandle *read_handle;
3430   
3431   LOG (GNUNET_ERROR_TYPE_DEBUG,
3432        "%s: %s()\n", 
3433        GNUNET_i2s (&socket->other_peer),
3434        __func__);
3435   /* Return NULL if there is already a read handle; the user has to cancel that
3436      first before continuing or has to wait until it is completed */
3437   if (NULL != socket->read_handle) 
3438     return NULL;
3439   GNUNET_assert (NULL != proc);
3440   switch (socket->state)
3441   {
3442   case STATE_RECEIVE_CLOSED:
3443   case STATE_RECEIVE_CLOSE_WAIT:
3444   case STATE_CLOSED:
3445   case STATE_CLOSE_WAIT:
3446     proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3447     LOG (GNUNET_ERROR_TYPE_DEBUG,
3448          "%s: %s() END\n",
3449          GNUNET_i2s (&socket->other_peer),
3450          __func__);
3451     return NULL;
3452   default:
3453     break;
3454   }
3455   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
3456   read_handle->proc = proc;
3457   read_handle->proc_cls = proc_cls;
3458   read_handle->socket = socket;
3459   socket->read_handle = read_handle;
3460   /* Check if we have a packet at bitmap 0 */
3461   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3462                                           0))
3463   {
3464     socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3465                                                      socket);   
3466   }
3467   /* Setup the read timeout task */
3468   socket->read_io_timeout_task_id =
3469     GNUNET_SCHEDULER_add_delayed (timeout,
3470                                   &read_io_timeout,
3471                                   socket);
3472   LOG (GNUNET_ERROR_TYPE_DEBUG,
3473        "%s: %s() END\n",
3474        GNUNET_i2s (&socket->other_peer),
3475        __func__);
3476   return read_handle;
3477 }
3478
3479
3480 /**
3481  * Cancel pending write operation.
3482  *
3483  * @param ioh handle to operation to cancel
3484  */
3485 void
3486 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3487 {
3488   struct GNUNET_STREAM_Socket *socket = ioh->socket;
3489   unsigned int packet;
3490
3491   GNUNET_assert (NULL != socket->write_handle);
3492   GNUNET_assert (socket->write_handle == ioh);
3493
3494   if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
3495   {
3496     GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
3497     socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
3498   }
3499
3500   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3501   {
3502     if (NULL == ioh->messages[packet]) break;
3503     GNUNET_free (ioh->messages[packet]);
3504   }
3505       
3506   GNUNET_free (socket->write_handle);
3507   socket->write_handle = NULL;
3508 }
3509
3510
3511 /**
3512  * Cancel pending read operation.
3513  *
3514  * @param ioh handle to operation to cancel
3515  */
3516 void
3517 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
3518 {
3519   struct GNUNET_STREAM_Socket *socket;
3520   
3521   socket = ioh->socket;
3522   GNUNET_assert (NULL != socket->read_handle);
3523   GNUNET_assert (ioh == socket->read_handle);
3524   /* Read io time task should be there; if it is already executed then this
3525   read handle is not valid */
3526   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != socket->read_io_timeout_task_id);
3527   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
3528   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
3529   /* reading task may be present; if so we have to stop it */
3530   if (GNUNET_SCHEDULER_NO_TASK != socket->read_task_id)
3531   {
3532     GNUNET_SCHEDULER_cancel (socket->read_task_id);
3533     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
3534   }
3535   GNUNET_free (ioh);
3536   socket->read_handle = NULL;
3537 }
3538
3539 /* end of stream_api.c */