-doxygen
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Add code for write io timeout
27  *
28  * Include retransmission for control messages
29  **/
30
31 /**
32  * @file stream/stream_api.c
33  * @brief Implementation of the stream library
34  * @author Sree Harsha Totakura
35  */
36
37
38 #include "platform.h"
39 #include "gnunet_common.h"
40 #include "gnunet_crypto_lib.h"
41 #include "gnunet_lockmanager_service.h"
42 #include "gnunet_stream_lib.h"
43 #include "stream_protocol.h"
44
45 #define LOG(kind,...)                                   \
46   GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
47
48 #define TIME_REL_SECS(sec) \
49   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
50
51 /**
52  * The maximum packet size of a stream packet
53  */
54 #define MAX_PACKET_SIZE 512//64000
55
56 /**
57  * Receive buffer
58  */
59 #define RECEIVE_BUFFER_SIZE 4096000
60
61 /**
62  * The maximum payload a data message packet can carry
63  */
64 static const size_t max_payload_size = 
65   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
66
67 /**
68  * states in the Protocol
69  */
70 enum State
71   {
72     /**
73      * Client initialization state
74      */
75     STATE_INIT,
76
77     /**
78      * Listener initialization state 
79      */
80     STATE_LISTEN,
81
82     /**
83      * Pre-connection establishment state
84      */
85     STATE_HELLO_WAIT,
86
87     /**
88      * State where a connection has been established
89      */
90     STATE_ESTABLISHED,
91
92     /**
93      * State where the socket is closed on our side and waiting to be ACK'ed
94      */
95     STATE_RECEIVE_CLOSE_WAIT,
96
97     /**
98      * State where the socket is closed for reading
99      */
100     STATE_RECEIVE_CLOSED,
101
102     /**
103      * State where the socket is closed on our side and waiting to be ACK'ed
104      */
105     STATE_TRANSMIT_CLOSE_WAIT,
106
107     /**
108      * State where the socket is closed for writing
109      */
110     STATE_TRANSMIT_CLOSED,
111
112     /**
113      * State where the socket is closed on our side and waiting to be ACK'ed
114      */
115     STATE_CLOSE_WAIT,
116
117     /**
118      * State where the socket is closed
119      */
120     STATE_CLOSED 
121   };
122
123
124 /**
125  * Functions of this type are called when a message is written
126  *
127  * @param cls the closure from queue_message
128  * @param socket the socket the written message was bound to
129  */
130 typedef void (*SendFinishCallback) (void *cls,
131                                     struct GNUNET_STREAM_Socket *socket);
132
133
134 /**
135  * The send message queue
136  */
137 struct MessageQueue
138 {
139   /**
140    * The message
141    */
142   struct GNUNET_STREAM_MessageHeader *message;
143
144   /**
145    * Callback to be called when the message is sent
146    */
147   SendFinishCallback finish_cb;
148
149   /**
150    * The closure for finish_cb
151    */
152   void *finish_cb_cls;
153
154   /**
155    * The next message in queue. Should be NULL in the last message
156    */
157   struct MessageQueue *next;
158
159   /**
160    * The next message in queue. Should be NULL in the first message
161    */
162   struct MessageQueue *prev;
163 };
164
165
166 /**
167  * The STREAM Socket Handler
168  */
169 struct GNUNET_STREAM_Socket
170 {
171   /**
172    * Retransmission timeout
173    */
174   struct GNUNET_TIME_Relative retransmit_timeout;
175
176   /**
177    * The Acknowledgement Bitmap
178    */
179   GNUNET_STREAM_AckBitmap ack_bitmap;
180
181   /**
182    * Time when the Acknowledgement was queued
183    */
184   struct GNUNET_TIME_Absolute ack_time_registered;
185
186   /**
187    * Queued Acknowledgement deadline
188    */
189   struct GNUNET_TIME_Relative ack_time_deadline;
190
191   /**
192    * The mesh handle
193    */
194   struct GNUNET_MESH_Handle *mesh;
195
196   /**
197    * The mesh tunnel handle
198    */
199   struct GNUNET_MESH_Tunnel *tunnel;
200
201   /**
202    * Stream open closure
203    */
204   void *open_cls;
205
206   /**
207    * Stream open callback
208    */
209   GNUNET_STREAM_OpenCallback open_cb;
210
211   /**
212    * The current transmit handle (if a pending transmit request exists)
213    */
214   struct GNUNET_MESH_TransmitHandle *transmit_handle;
215
216   /**
217    * The current act transmit handle (if a pending ack transmit request exists)
218    */
219   struct GNUNET_MESH_TransmitHandle *ack_transmit_handle;
220
221   /**
222    * Pointer to the current ack message using in ack_task
223    */
224   struct GNUNET_STREAM_AckMessage *ack_msg;
225
226   /**
227    * The current message associated with the transmit handle
228    */
229   struct MessageQueue *queue_head;
230
231   /**
232    * The queue tail, should always point to the last message in queue
233    */
234   struct MessageQueue *queue_tail;
235
236   /**
237    * The write IO_handle associated with this socket
238    */
239   struct GNUNET_STREAM_IOWriteHandle *write_handle;
240
241   /**
242    * The read IO_handle associated with this socket
243    */
244   struct GNUNET_STREAM_IOReadHandle *read_handle;
245
246   /**
247    * The shutdown handle associated with this socket
248    */
249   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
250
251   /**
252    * Buffer for storing received messages
253    */
254   void *receive_buffer;
255
256   /**
257    * The listen socket from which this socket is derived. Should be NULL if it
258    * is not a derived socket
259    */
260   struct GNUNET_STREAM_ListenSocket *lsocket;
261
262   /**
263    * The peer identity of the peer at the other end of the stream
264    */
265   struct GNUNET_PeerIdentity other_peer;
266
267   /**
268    * Task identifier for the read io timeout task
269    */
270   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
271
272   /**
273    * Task identifier for retransmission task after timeout
274    */
275   GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
276
277   /**
278    * The task for sending timely Acks
279    */
280   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
281
282   /**
283    * Task scheduled to continue a read operation.
284    */
285   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
286
287   /**
288    * The state of the protocol associated with this socket
289    */
290   enum State state;
291
292   /**
293    * The status of the socket
294    */
295   enum GNUNET_STREAM_Status status;
296
297   /**
298    * The number of previous timeouts; FIXME: currently not used
299    */
300   unsigned int retries;
301
302   /**
303    * The application port number (type: uint32_t)
304    */
305   GNUNET_MESH_ApplicationType app_port;
306
307   /**
308    * Whether testing mode is active or not
309    */
310   int testing_active;
311
312   /**
313    * The write sequence number to be set incase of testing
314    */
315   uint32_t testing_set_write_sequence_number_value;
316
317   /**
318    * The session id associated with this stream connection
319    * FIXME: Not used currently, may be removed
320    */
321   uint32_t session_id;
322
323   /**
324    * Write sequence number. Set to random when sending HELLO(client) and
325    * HELLO_ACK(server) 
326    */
327   uint32_t write_sequence_number;
328
329   /**
330    * Read sequence number. This number's value is determined during handshake
331    */
332   uint32_t read_sequence_number;
333
334   /**
335    * The receiver buffer size
336    */
337   uint32_t receive_buffer_size;
338
339   /**
340    * The receiver buffer boundaries
341    */
342   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
343
344   /**
345    * receiver's available buffer after the last acknowledged packet
346    */
347   uint32_t receiver_window_available;
348
349   /**
350    * The offset pointer used during write operation
351    */
352   uint32_t write_offset;
353
354   /**
355    * The offset after which we are expecting data
356    */
357   uint32_t read_offset;
358
359   /**
360    * The offset upto which user has read from the received buffer
361    */
362   uint32_t copy_offset;
363 };
364
365
366 /**
367  * A socket for listening
368  */
369 struct GNUNET_STREAM_ListenSocket
370 {
371   /**
372    * The mesh handle
373    */
374   struct GNUNET_MESH_Handle *mesh;
375
376   /**
377    * Our configuration
378    */
379   struct GNUNET_CONFIGURATION_Handle *cfg;
380
381   /**
382    * Handle to the lock manager service
383    */
384   struct GNUNET_LOCKMANAGER_Handle *lockmanager;
385
386   /**
387    * The active LockingRequest from lockmanager
388    */
389   struct GNUNET_LOCKMANAGER_LockingRequest *locking_request;
390
391   /**
392    * The callback function which is called after successful opening socket
393    */
394   GNUNET_STREAM_ListenCallback listen_cb;
395
396   /**
397    * The call back closure
398    */
399   void *listen_cb_cls;
400
401   /**
402    * The service port
403    */
404   GNUNET_MESH_ApplicationType port;
405   
406   /**
407    * The id of the lockmanager timeout task
408    */
409   GNUNET_SCHEDULER_TaskIdentifier lockmanager_acquire_timeout_task;
410
411   /**
412    * The retransmit timeout
413    */
414   struct GNUNET_TIME_Relative retransmit_timeout;
415   
416   /**
417    * Listen enabled?
418    */
419   int listening;
420
421   /**
422    * Whether testing mode is active or not
423    */
424   int testing_active;
425
426   /**
427    * The write sequence number to be set incase of testing
428    */
429   uint32_t testing_set_write_sequence_number_value;
430 };
431
432
433 /**
434  * The IO Write Handle
435  */
436 struct GNUNET_STREAM_IOWriteHandle
437 {
438   /**
439    * The socket to which this write handle is associated
440    */
441   struct GNUNET_STREAM_Socket *socket;
442
443   /**
444    * The packet_buffers associated with this Handle
445    */
446   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
447
448   /**
449    * The write continuation callback
450    */
451   GNUNET_STREAM_CompletionContinuation write_cont;
452
453   /**
454    * Write continuation closure
455    */
456   void *write_cont_cls;
457
458   /**
459    * The bitmap of this IOHandle; Corresponding bit for a message is set when
460    * it has been acknowledged by the receiver
461    */
462   GNUNET_STREAM_AckBitmap ack_bitmap;
463
464   /**
465    * Number of bytes in this write handle
466    */
467   size_t size;
468 };
469
470
471 /**
472  * The IO Read Handle
473  */
474 struct GNUNET_STREAM_IOReadHandle
475 {
476   /**
477    * Callback for the read processor
478    */
479   GNUNET_STREAM_DataProcessor proc;
480
481   /**
482    * The closure pointer for the read processor callback
483    */
484   void *proc_cls;
485 };
486
487
488 /**
489  * Handle for Shutdown
490  */
491 struct GNUNET_STREAM_ShutdownHandle
492 {
493   /**
494    * The socket associated with this shutdown handle
495    */
496   struct GNUNET_STREAM_Socket *socket;
497
498   /**
499    * Shutdown completion callback
500    */
501   GNUNET_STREAM_ShutdownCompletion completion_cb;
502
503   /**
504    * Closure for completion callback
505    */
506   void *completion_cls;
507
508   /**
509    * Close message retransmission task id
510    */
511   GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
512
513   /**
514    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
515    */
516   int operation;  
517 };
518
519
520 /**
521  * Default value in seconds for various timeouts
522  */
523 static const unsigned int default_timeout = 10;
524
525 /**
526  * The domain name for locks we use here
527  */
528 static const char *locking_domain = "GNUNET_STREAM_APPLOCK";
529
530
531 /**
532  * Callback function for sending queued message
533  *
534  * @param cls closure the socket
535  * @param size number of bytes available in buf
536  * @param buf where the callee should write the message
537  * @return number of bytes written to buf
538  */
539 static size_t
540 send_message_notify (void *cls, size_t size, void *buf)
541 {
542   struct GNUNET_STREAM_Socket *socket = cls;
543   struct MessageQueue *head;
544   size_t ret;
545
546   socket->transmit_handle = NULL; /* Remove the transmit handle */
547   head = socket->queue_head;
548   if (NULL == head)
549     return 0; /* just to be safe */
550   if (0 == size)                /* request timed out */
551   {
552     socket->retries++;
553     LOG (GNUNET_ERROR_TYPE_DEBUG,
554          "%s: Message sending timed out. Retry %d \n",
555          GNUNET_i2s (&socket->other_peer),
556          socket->retries);
557     socket->transmit_handle = 
558       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
559                                          0, /* Corking */
560                                          1, /* Priority */
561                                          /* FIXME: exponential backoff */
562                                          socket->retransmit_timeout,
563                                          &socket->other_peer,
564                                          ntohs (head->message->header.size),
565                                          &send_message_notify,
566                                          socket);
567     return 0;
568   }
569
570   ret = ntohs (head->message->header.size);
571   GNUNET_assert (size >= ret);
572   memcpy (buf, head->message, ret);
573   if (NULL != head->finish_cb)
574   {
575     head->finish_cb (head->finish_cb_cls, socket);
576   }
577   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
578                                socket->queue_tail,
579                                head);
580   GNUNET_free (head->message);
581   GNUNET_free (head);
582   head = socket->queue_head;
583   if (NULL != head)    /* more pending messages to send */
584   {
585     socket->retries = 0;
586     socket->transmit_handle = 
587       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
588                                          0, /* Corking */
589                                          1, /* Priority */
590                                          /* FIXME: exponential backoff */
591                                          socket->retransmit_timeout,
592                                          &socket->other_peer,
593                                          ntohs (head->message->header.size),
594                                          &send_message_notify,
595                                          socket);
596   }
597   return ret;
598 }
599
600
601 /**
602  * Queues a message for sending using the mesh connection of a socket
603  *
604  * @param socket the socket whose mesh connection is used
605  * @param message the message to be sent
606  * @param finish_cb the callback to be called when the message is sent
607  * @param finish_cb_cls the closure for the callback
608  */
609 static void
610 queue_message (struct GNUNET_STREAM_Socket *socket,
611                struct GNUNET_STREAM_MessageHeader *message,
612                SendFinishCallback finish_cb,
613                void *finish_cb_cls)
614 {
615   struct MessageQueue *queue_entity;
616
617   GNUNET_assert 
618     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
619      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
620
621   LOG (GNUNET_ERROR_TYPE_DEBUG,
622        "%s: Queueing message of type %d and size %d\n",
623        GNUNET_i2s (&socket->other_peer),
624        ntohs (message->header.type),
625        ntohs (message->header.size));
626   GNUNET_assert (NULL != message);
627   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
628   queue_entity->message = message;
629   queue_entity->finish_cb = finish_cb;
630   queue_entity->finish_cb_cls = finish_cb_cls;
631   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
632                                     socket->queue_tail,
633                                     queue_entity);
634   if (NULL == socket->transmit_handle)
635   {
636     socket->retries = 0;
637     socket->transmit_handle = 
638       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
639                                          0, /* Corking */
640                                          1, /* Priority */
641                                          socket->retransmit_timeout,
642                                          &socket->other_peer,
643                                          ntohs (message->header.size),
644                                          &send_message_notify,
645                                          socket);
646   }
647 }
648
649
650 /**
651  * Copies a message and queues it for sending using the mesh connection of
652  * given socket 
653  *
654  * @param socket the socket whose mesh connection is used
655  * @param message the message to be sent
656  * @param finish_cb the callback to be called when the message is sent
657  * @param finish_cb_cls the closure for the callback
658  */
659 static void
660 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
661                         const struct GNUNET_STREAM_MessageHeader *message,
662                         SendFinishCallback finish_cb,
663                         void *finish_cb_cls)
664 {
665   struct GNUNET_STREAM_MessageHeader *msg_copy;
666   uint16_t size;
667   
668   size = ntohs (message->header.size);
669   msg_copy = GNUNET_malloc (size);
670   memcpy (msg_copy, message, size);
671   queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
672 }
673
674
675 /**
676  * Callback function for sending ack message
677  *
678  * @param cls closure the ACK message created in ack_task
679  * @param size number of bytes available in buffer
680  * @param buf where the callee should write the message
681  * @return number of bytes written to buf
682  */
683 static size_t
684 send_ack_notify (void *cls, size_t size, void *buf)
685 {
686   struct GNUNET_STREAM_Socket *socket = cls;
687
688   if (0 == size)
689   {
690     LOG (GNUNET_ERROR_TYPE_DEBUG,
691          "%s called with size 0\n", __func__);
692     return 0;
693   }
694   GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size);
695   
696   size = ntohs (socket->ack_msg->header.header.size);
697   memcpy (buf, socket->ack_msg, size);
698   
699   GNUNET_free (socket->ack_msg);
700   socket->ack_msg = NULL;
701   socket->ack_transmit_handle = NULL;
702   return size;
703 }
704
705 /**
706  * Writes data using the given socket. The amount of data written is limited by
707  * the receiver_window_size
708  *
709  * @param socket the socket to use
710  */
711 static void 
712 write_data (struct GNUNET_STREAM_Socket *socket);
713
714 /**
715  * Task for retransmitting data messages if they aren't ACK before their ack
716  * deadline 
717  *
718  * @param cls the socket
719  * @param tc the Task context
720  */
721 static void
722 retransmission_timeout_task (void *cls,
723                              const struct GNUNET_SCHEDULER_TaskContext *tc)
724 {
725   struct GNUNET_STREAM_Socket *socket = cls;
726   
727   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
728     return;
729
730   LOG (GNUNET_ERROR_TYPE_DEBUG,
731        "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
732   socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
733   write_data (socket);
734 }
735
736
737 /**
738  * Task for sending ACK message
739  *
740  * @param cls the socket
741  * @param tc the Task context
742  */
743 static void
744 ack_task (void *cls,
745           const struct GNUNET_SCHEDULER_TaskContext *tc)
746 {
747   struct GNUNET_STREAM_Socket *socket = cls;
748   struct GNUNET_STREAM_AckMessage *ack_msg;
749
750   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
751   {
752     return;
753   }
754   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
755   /* Create the ACK Message */
756   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
757   ack_msg->header.header.size = htons (sizeof (struct 
758                                                GNUNET_STREAM_AckMessage));
759   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
760   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
761   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
762   ack_msg->receive_window_remaining = 
763     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
764   socket->ack_msg = ack_msg;
765   /* Request MESH for sending ACK */
766   socket->ack_transmit_handle = 
767     GNUNET_MESH_notify_transmit_ready (socket->tunnel,
768                                        0, /* Corking */
769                                        1, /* Priority */
770                                        socket->retransmit_timeout,
771                                        &socket->other_peer,
772                                        ntohs (ack_msg->header.header.size),
773                                        &send_ack_notify,
774                                        socket);
775 }
776
777
778 /**
779  * Retransmission task for shutdown messages
780  *
781  * @param cls the shutdown handle
782  * @param tc the Task Context
783  */
784 static void
785 close_msg_retransmission_task (void *cls,
786                                const struct GNUNET_SCHEDULER_TaskContext *tc)
787 {
788   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
789   struct GNUNET_STREAM_MessageHeader *msg;
790   struct GNUNET_STREAM_Socket *socket;
791
792   GNUNET_assert (NULL != shutdown_handle);
793   socket = shutdown_handle->socket;
794
795   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
796   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
797   switch (shutdown_handle->operation)
798   {
799   case SHUT_RDWR:
800     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
801     break;
802   case SHUT_RD:
803     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
804     break;
805   case SHUT_WR:
806     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
807     break;
808   default:
809     GNUNET_free (msg);
810     shutdown_handle->close_msg_retransmission_task_id = 
811       GNUNET_SCHEDULER_NO_TASK;
812     return;
813   }
814   queue_message (socket, msg, NULL, NULL);
815   shutdown_handle->close_msg_retransmission_task_id =
816     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
817                                   &close_msg_retransmission_task,
818                                   shutdown_handle);
819 }
820
821
822 /**
823  * Function to modify a bit in GNUNET_STREAM_AckBitmap
824  *
825  * @param bitmap the bitmap to modify
826  * @param bit the bit number to modify
827  * @param value GNUNET_YES to on, GNUNET_NO to off
828  */
829 static void
830 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
831                       unsigned int bit, 
832                       int value)
833 {
834   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
835   if (GNUNET_YES == value)
836     *bitmap |= (1LL << bit);
837   else
838     *bitmap &= ~(1LL << bit);
839 }
840
841
842 /**
843  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
844  *
845  * @param bitmap address of the bitmap that has to be checked
846  * @param bit the bit number to check
847  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
848  */
849 static uint8_t
850 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
851                       unsigned int bit)
852 {
853   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
854   return 0 != (*bitmap & (1LL << bit));
855 }
856
857
858 /**
859  * Writes data using the given socket. The amount of data written is limited by
860  * the receiver_window_size
861  *
862  * @param socket the socket to use
863  */
864 static void 
865 write_data (struct GNUNET_STREAM_Socket *socket)
866 {
867   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
868   int packet;                   /* Although an int, should never be negative */
869   int ack_packet;
870
871   ack_packet = -1;
872   /* Find the last acknowledged packet */
873   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
874   {      
875     if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
876                                             packet))
877       ack_packet = packet;        
878     else if (NULL == io_handle->messages[packet])
879       break;
880   }
881   /* Resend packets which weren't ack'ed */
882   for (packet=0; packet < ack_packet; packet++)
883   {
884     if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
885                                            packet))
886     {
887       LOG (GNUNET_ERROR_TYPE_DEBUG,
888            "%s: Placing DATA message with sequence %u in send queue\n",
889            GNUNET_i2s (&socket->other_peer),
890            ntohl (io_handle->messages[packet]->sequence_number));
891       copy_and_queue_message (socket,
892                               &io_handle->messages[packet]->header,
893                               NULL,
894                               NULL);
895     }
896   }
897   packet = ack_packet + 1;
898   /* Now send new packets if there is enough buffer space */
899   while ( (NULL != io_handle->messages[packet]) &&
900           (socket->receiver_window_available 
901            >= ntohs (io_handle->messages[packet]->header.header.size)) &&
902           (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
903   {
904     socket->receiver_window_available -= 
905       ntohs (io_handle->messages[packet]->header.header.size);
906     LOG (GNUNET_ERROR_TYPE_DEBUG,
907          "%s: Placing DATA message with sequence %u in send queue\n",
908          GNUNET_i2s (&socket->other_peer),
909          ntohl (io_handle->messages[packet]->sequence_number));
910     copy_and_queue_message (socket,
911                             &io_handle->messages[packet]->header,
912                             NULL,
913                             NULL);
914     packet++;
915   }
916   if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
917     socket->retransmission_timeout_task_id = 
918       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
919                                     (GNUNET_TIME_UNIT_SECONDS, 8),
920                                     &retransmission_timeout_task,
921                                     socket);
922 }
923
924
925 /**
926  * Task for calling the read processor
927  *
928  * @param cls the socket
929  * @param tc the task context
930  */
931 static void
932 call_read_processor (void *cls,
933                      const struct GNUNET_SCHEDULER_TaskContext *tc)
934 {
935   struct GNUNET_STREAM_Socket *socket = cls;
936   size_t read_size;
937   size_t valid_read_size;
938   unsigned int packet;
939   uint32_t sequence_increase;
940   uint32_t offset_increase;
941
942   socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
943   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
944     return;
945
946   if (NULL == socket->receive_buffer) 
947     return;
948
949   GNUNET_assert (NULL != socket->read_handle);
950   GNUNET_assert (NULL != socket->read_handle->proc);
951
952   /* Check the bitmap for any holes */
953   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
954   {
955     if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
956                                            packet))
957       break;
958   }
959   /* We only call read processor if we have the first packet */
960   GNUNET_assert (0 < packet);
961   valid_read_size = 
962     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
963   GNUNET_assert (0 != valid_read_size);
964   /* Cancel the read_io_timeout_task */
965   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
966   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
967   /* Call the data processor */
968   LOG (GNUNET_ERROR_TYPE_DEBUG,
969        "%s: Calling read processor\n",
970        GNUNET_i2s (&socket->other_peer));
971   read_size = 
972     socket->read_handle->proc (socket->read_handle->proc_cls,
973                                socket->status,
974                                socket->receive_buffer + socket->copy_offset,
975                                valid_read_size);
976   LOG (GNUNET_ERROR_TYPE_DEBUG,
977        "%s: Read processor read %d bytes\n",
978        GNUNET_i2s (&socket->other_peer), read_size);
979   LOG (GNUNET_ERROR_TYPE_DEBUG,
980        "%s: Read processor completed successfully\n",
981        GNUNET_i2s (&socket->other_peer));
982   /* Free the read handle */
983   GNUNET_free (socket->read_handle);
984   socket->read_handle = NULL;
985   GNUNET_assert (read_size <= valid_read_size);
986   socket->copy_offset += read_size;
987   /* Determine upto which packet we can remove from the buffer */
988   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
989   {
990     if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
991     { packet++; break; }
992     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
993       break;
994   }
995
996   /* If no packets can be removed we can't move the buffer */
997   if (0 == packet) return;
998   sequence_increase = packet;
999   LOG (GNUNET_ERROR_TYPE_DEBUG,
1000        "%s: Sequence increase after read processor completion: %u\n",
1001        GNUNET_i2s (&socket->other_peer), sequence_increase);
1002
1003   /* Shift the data in the receive buffer */
1004   socket->receive_buffer = 
1005     memmove (socket->receive_buffer,
1006              socket->receive_buffer 
1007              + socket->receive_buffer_boundaries[sequence_increase-1],
1008              socket->receive_buffer_size
1009              - socket->receive_buffer_boundaries[sequence_increase-1]);
1010   /* Shift the bitmap */
1011   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
1012   /* Set read_sequence_number */
1013   socket->read_sequence_number += sequence_increase;
1014   /* Set read_offset */
1015   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
1016   socket->read_offset += offset_increase;
1017   /* Fix copy_offset */
1018   GNUNET_assert (offset_increase <= socket->copy_offset);
1019   socket->copy_offset -= offset_increase;
1020   /* Fix relative boundaries */
1021   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1022   {
1023     if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
1024     {
1025       uint32_t ahead_buffer_boundary;
1026
1027       ahead_buffer_boundary = 
1028         socket->receive_buffer_boundaries[packet + sequence_increase];
1029       if (0 == ahead_buffer_boundary)
1030         socket->receive_buffer_boundaries[packet] = 0;
1031       else
1032       {
1033         GNUNET_assert (offset_increase < ahead_buffer_boundary);
1034         socket->receive_buffer_boundaries[packet] = 
1035           ahead_buffer_boundary - offset_increase;
1036       }
1037     }
1038     else
1039       socket->receive_buffer_boundaries[packet] = 0;
1040   }
1041 }
1042
1043
1044 /**
1045  * Cancels the existing read io handle
1046  *
1047  * @param cls the closure from the SCHEDULER call
1048  * @param tc the task context
1049  */
1050 static void
1051 read_io_timeout (void *cls, 
1052                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1053 {
1054   struct GNUNET_STREAM_Socket *socket = cls;
1055   GNUNET_STREAM_DataProcessor proc;
1056   void *proc_cls;
1057
1058   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1059   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1060   {
1061     LOG (GNUNET_ERROR_TYPE_DEBUG,
1062          "%s: Read task timedout - Cancelling it\n",
1063          GNUNET_i2s (&socket->other_peer));
1064     GNUNET_SCHEDULER_cancel (socket->read_task_id);
1065     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1066   }
1067   GNUNET_assert (NULL != socket->read_handle);
1068   proc = socket->read_handle->proc;
1069   proc_cls = socket->read_handle->proc_cls;
1070
1071   GNUNET_free (socket->read_handle);
1072   socket->read_handle = NULL;
1073   /* Call the read processor to signal timeout */
1074   proc (proc_cls,
1075         GNUNET_STREAM_TIMEOUT,
1076         NULL,
1077         0);
1078 }
1079
1080
1081 /**
1082  * Handler for DATA messages; Same for both client and server
1083  *
1084  * @param socket the socket through which the ack was received
1085  * @param tunnel connection to the other end
1086  * @param sender who sent the message
1087  * @param msg the data message
1088  * @param atsi performance data for the connection
1089  * @return GNUNET_OK to keep the connection open,
1090  *         GNUNET_SYSERR to close it (signal serious error)
1091  */
1092 static int
1093 handle_data (struct GNUNET_STREAM_Socket *socket,
1094              struct GNUNET_MESH_Tunnel *tunnel,
1095              const struct GNUNET_PeerIdentity *sender,
1096              const struct GNUNET_STREAM_DataMessage *msg,
1097              const struct GNUNET_ATS_Information*atsi)
1098 {
1099   const void *payload;
1100   uint32_t bytes_needed;
1101   uint32_t relative_offset;
1102   uint32_t relative_sequence_number;
1103   uint16_t size;
1104
1105   size = htons (msg->header.header.size);
1106   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1107   {
1108     GNUNET_break_op (0);
1109     return GNUNET_SYSERR;
1110   }
1111
1112   if (0 != memcmp (sender,
1113                    &socket->other_peer,
1114                    sizeof (struct GNUNET_PeerIdentity)))
1115   {
1116     LOG (GNUNET_ERROR_TYPE_DEBUG,
1117          "%s: Received DATA from non-confirming peer\n",
1118          GNUNET_i2s (&socket->other_peer));
1119     return GNUNET_YES;
1120   }
1121
1122   switch (socket->state)
1123   {
1124   case STATE_ESTABLISHED:
1125   case STATE_TRANSMIT_CLOSED:
1126   case STATE_TRANSMIT_CLOSE_WAIT:
1127       
1128     /* check if the message's sequence number is in the range we are
1129        expecting */
1130     relative_sequence_number = 
1131       ntohl (msg->sequence_number) - socket->read_sequence_number;
1132     if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1133     {
1134       LOG (GNUNET_ERROR_TYPE_DEBUG,
1135            "%s: Ignoring received message with sequence number %u\n",
1136            GNUNET_i2s (&socket->other_peer),
1137            ntohl (msg->sequence_number));
1138       /* Start ACK sending task if one is not already present */
1139       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1140       {
1141         socket->ack_task_id = 
1142           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1143                                         (msg->ack_deadline),
1144                                         &ack_task,
1145                                         socket);
1146       }
1147       return GNUNET_YES;
1148     }
1149       
1150     /* Check if we have already seen this message */
1151     if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1152                                             relative_sequence_number))
1153     {
1154       LOG (GNUNET_ERROR_TYPE_DEBUG,
1155            "%s: Ignoring already received message with sequence number %u\n",
1156            GNUNET_i2s (&socket->other_peer),
1157            ntohl (msg->sequence_number));
1158       /* Start ACK sending task if one is not already present */
1159       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1160       {
1161         socket->ack_task_id = 
1162           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1163                                         (msg->ack_deadline),
1164                                         &ack_task,
1165                                         socket);
1166       }
1167       return GNUNET_YES;
1168     }
1169
1170     LOG (GNUNET_ERROR_TYPE_DEBUG,
1171          "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
1172          GNUNET_i2s (&socket->other_peer),
1173          ntohl (msg->sequence_number),
1174          ntohs (msg->header.header.size),
1175          GNUNET_i2s (&socket->other_peer));
1176       
1177     /* Check if we have to allocate the buffer */
1178     size -= sizeof (struct GNUNET_STREAM_DataMessage);
1179     relative_offset = ntohl (msg->offset) - socket->read_offset;
1180     bytes_needed = relative_offset + size;
1181     if (bytes_needed > socket->receive_buffer_size)
1182     {
1183       if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1184       {
1185         socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1186                                                  bytes_needed);
1187         socket->receive_buffer_size = bytes_needed;
1188       }
1189       else
1190       {
1191         LOG (GNUNET_ERROR_TYPE_DEBUG,
1192              "%s: Cannot accommodate packet %d as buffer is full\n",
1193              GNUNET_i2s (&socket->other_peer),
1194              ntohl (msg->sequence_number));
1195         return GNUNET_YES;
1196       }
1197     }
1198       
1199     /* Copy Data to buffer */
1200     payload = &msg[1];
1201     GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1202     memcpy (socket->receive_buffer + relative_offset,
1203             payload,
1204             size);
1205     socket->receive_buffer_boundaries[relative_sequence_number] = 
1206       relative_offset + size;
1207       
1208     /* Modify the ACK bitmap */
1209     ackbitmap_modify_bit (&socket->ack_bitmap,
1210                           relative_sequence_number,
1211                           GNUNET_YES);
1212
1213     /* Start ACK sending task if one is not already present */
1214     if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1215     {
1216       socket->ack_task_id = 
1217         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1218                                       (msg->ack_deadline),
1219                                       &ack_task,
1220                                       socket);
1221     }
1222
1223     if ((NULL != socket->read_handle) /* A read handle is waiting */
1224         /* There is no current read task */
1225         && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1226         /* We have the first packet */
1227         && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1228                                                0)))
1229     {
1230       LOG (GNUNET_ERROR_TYPE_DEBUG,
1231            "%s: Scheduling read processor\n",
1232            GNUNET_i2s (&socket->other_peer));
1233           
1234       socket->read_task_id = 
1235         GNUNET_SCHEDULER_add_now (&call_read_processor,
1236                                   socket);
1237     }
1238       
1239     break;
1240
1241   default:
1242     LOG (GNUNET_ERROR_TYPE_DEBUG,
1243          "%s: Received data message when it cannot be handled\n",
1244          GNUNET_i2s (&socket->other_peer));
1245     break;
1246   }
1247   return GNUNET_YES;
1248 }
1249
1250
1251 /**
1252  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1253  *
1254  * @param cls the socket (set from GNUNET_MESH_connect)
1255  * @param tunnel connection to the other end
1256  * @param tunnel_ctx place to store local state associated with the tunnel
1257  * @param sender who sent the message
1258  * @param message the actual message
1259  * @param atsi performance data for the connection
1260  * @return GNUNET_OK to keep the connection open,
1261  *         GNUNET_SYSERR to close it (signal serious error)
1262  */
1263 static int
1264 client_handle_data (void *cls,
1265                     struct GNUNET_MESH_Tunnel *tunnel,
1266                     void **tunnel_ctx,
1267                     const struct GNUNET_PeerIdentity *sender,
1268                     const struct GNUNET_MessageHeader *message,
1269                     const struct GNUNET_ATS_Information*atsi)
1270 {
1271   struct GNUNET_STREAM_Socket *socket = cls;
1272
1273   return handle_data (socket, 
1274                       tunnel, 
1275                       sender, 
1276                       (const struct GNUNET_STREAM_DataMessage *) message, 
1277                       atsi);
1278 }
1279
1280
1281 /**
1282  * Callback to set state to ESTABLISHED
1283  *
1284  * @param cls the closure from queue_message FIXME: document
1285  * @param socket the socket to requiring state change
1286  */
1287 static void
1288 set_state_established (void *cls,
1289                        struct GNUNET_STREAM_Socket *socket)
1290 {
1291   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1292        "%s: Attaining ESTABLISHED state\n",
1293        GNUNET_i2s (&socket->other_peer));
1294   socket->write_offset = 0;
1295   socket->read_offset = 0;
1296   socket->state = STATE_ESTABLISHED;
1297   /* FIXME: What if listen_cb is NULL */
1298   if (NULL != socket->lsocket)
1299   {
1300     LOG (GNUNET_ERROR_TYPE_DEBUG,
1301          "%s: Calling listen callback\n",
1302          GNUNET_i2s (&socket->other_peer));
1303     if (GNUNET_SYSERR == 
1304         socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1305                                     socket,
1306                                     &socket->other_peer))
1307     {
1308       socket->state = STATE_CLOSED;
1309       /* FIXME: We should close in a decent way */
1310       GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1311       GNUNET_free (socket);
1312     }
1313   }
1314   else if (socket->open_cb)
1315     socket->open_cb (socket->open_cls, socket);
1316 }
1317
1318
1319 /**
1320  * Callback to set state to HELLO_WAIT
1321  *
1322  * @param cls the closure from queue_message
1323  * @param socket the socket to requiring state change
1324  */
1325 static void
1326 set_state_hello_wait (void *cls,
1327                       struct GNUNET_STREAM_Socket *socket)
1328 {
1329   GNUNET_assert (STATE_INIT == socket->state);
1330   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1331        "%s: Attaining HELLO_WAIT state\n",
1332        GNUNET_i2s (&socket->other_peer));
1333   socket->state = STATE_HELLO_WAIT;
1334 }
1335
1336
1337 /**
1338  * Callback to set state to CLOSE_WAIT
1339  *
1340  * @param cls the closure from queue_message
1341  * @param socket the socket requiring state change
1342  */
1343 static void
1344 set_state_close_wait (void *cls,
1345                       struct GNUNET_STREAM_Socket *socket)
1346 {
1347   LOG (GNUNET_ERROR_TYPE_DEBUG,
1348        "%s: Attaing CLOSE_WAIT state\n",
1349        GNUNET_i2s (&socket->other_peer));
1350   socket->state = STATE_CLOSE_WAIT;
1351   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1352   socket->receive_buffer = NULL;
1353   socket->receive_buffer_size = 0;
1354 }
1355
1356
1357 /**
1358  * Callback to set state to RECEIVE_CLOSE_WAIT
1359  *
1360  * @param cls the closure from queue_message
1361  * @param socket the socket requiring state change
1362  */
1363 static void
1364 set_state_receive_close_wait (void *cls,
1365                               struct GNUNET_STREAM_Socket *socket)
1366 {
1367   LOG (GNUNET_ERROR_TYPE_DEBUG,
1368        "%s: Attaing RECEIVE_CLOSE_WAIT state\n",
1369        GNUNET_i2s (&socket->other_peer));
1370   socket->state = STATE_RECEIVE_CLOSE_WAIT;
1371   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1372   socket->receive_buffer = NULL;
1373   socket->receive_buffer_size = 0;
1374 }
1375
1376
1377 /**
1378  * Callback to set state to TRANSMIT_CLOSE_WAIT
1379  *
1380  * @param cls the closure from queue_message
1381  * @param socket the socket requiring state change
1382  */
1383 static void
1384 set_state_transmit_close_wait (void *cls,
1385                                struct GNUNET_STREAM_Socket *socket)
1386 {
1387   LOG (GNUNET_ERROR_TYPE_DEBUG,
1388        "%s: Attaing TRANSMIT_CLOSE_WAIT state\n",
1389        GNUNET_i2s (&socket->other_peer));
1390   socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1391 }
1392
1393
1394 /**
1395  * Callback to set state to CLOSED
1396  *
1397  * @param cls the closure from queue_message
1398  * @param socket the socket requiring state change
1399  */
1400 static void
1401 set_state_closed (void *cls,
1402                   struct GNUNET_STREAM_Socket *socket)
1403 {
1404   socket->state = STATE_CLOSED;
1405 }
1406
1407 /**
1408  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1409  * socket
1410  *
1411  * @param socket the socket for which this HelloAckMessage has to be generated
1412  * @return the HelloAckMessage
1413  */
1414 static struct GNUNET_STREAM_HelloAckMessage *
1415 generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket)
1416 {
1417   struct GNUNET_STREAM_HelloAckMessage *msg;
1418
1419   /* Get the random sequence number */
1420   if (GNUNET_YES == socket->testing_active)
1421     socket->write_sequence_number =
1422       socket->testing_set_write_sequence_number_value;
1423   else
1424     socket->write_sequence_number = 
1425       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1426   LOG (GNUNET_ERROR_TYPE_DEBUG,
1427        "%s: write sequence number %u\n",
1428        GNUNET_i2s (&socket->other_peer),
1429        (unsigned int) socket->write_sequence_number);
1430   
1431   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1432   msg->header.header.size = 
1433     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1434   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1435   msg->sequence_number = htonl (socket->write_sequence_number);
1436   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1437
1438   return msg;
1439 }
1440
1441
1442 /**
1443  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1444  *
1445  * @param cls the socket (set from GNUNET_MESH_connect)
1446  * @param tunnel connection to the other end
1447  * @param tunnel_ctx this is NULL
1448  * @param sender who sent the message
1449  * @param message the actual message
1450  * @param atsi performance data for the connection
1451  * @return GNUNET_OK to keep the connection open,
1452  *         GNUNET_SYSERR to close it (signal serious error)
1453  */
1454 static int
1455 client_handle_hello_ack (void *cls,
1456                          struct GNUNET_MESH_Tunnel *tunnel,
1457                          void **tunnel_ctx,
1458                          const struct GNUNET_PeerIdentity *sender,
1459                          const struct GNUNET_MessageHeader *message,
1460                          const struct GNUNET_ATS_Information*atsi)
1461 {
1462   struct GNUNET_STREAM_Socket *socket = cls;
1463   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1464   struct GNUNET_STREAM_HelloAckMessage *reply;
1465
1466   if (0 != memcmp (sender,
1467                    &socket->other_peer,
1468                    sizeof (struct GNUNET_PeerIdentity)))
1469   {
1470     LOG (GNUNET_ERROR_TYPE_DEBUG,
1471          "%s: Received HELLO_ACK from non-confirming peer\n",
1472          GNUNET_i2s (&socket->other_peer));
1473     return GNUNET_YES;
1474   }
1475   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1476   LOG (GNUNET_ERROR_TYPE_DEBUG,
1477        "%s: Received HELLO_ACK from %s\n",
1478        GNUNET_i2s (&socket->other_peer),
1479        GNUNET_i2s (&socket->other_peer));
1480
1481   GNUNET_assert (socket->tunnel == tunnel);
1482   switch (socket->state)
1483   {
1484   case STATE_HELLO_WAIT:
1485     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1486     LOG (GNUNET_ERROR_TYPE_DEBUG,
1487          "%s: Read sequence number %u\n",
1488          GNUNET_i2s (&socket->other_peer),
1489          (unsigned int) socket->read_sequence_number);
1490     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1491     reply = generate_hello_ack_msg (socket);
1492     queue_message (socket,
1493                    &reply->header, 
1494                    &set_state_established, 
1495                    NULL);      
1496     return GNUNET_OK;
1497   case STATE_ESTABLISHED:
1498   case STATE_RECEIVE_CLOSE_WAIT:
1499     // call statistics (# ACKs ignored++)
1500     return GNUNET_OK;
1501   case STATE_INIT:
1502   default:
1503     LOG (GNUNET_ERROR_TYPE_DEBUG,
1504          "%s: Server %s sent HELLO_ACK when in state %d\n", 
1505          GNUNET_i2s (&socket->other_peer),
1506          GNUNET_i2s (&socket->other_peer),
1507          socket->state);
1508     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1509     return GNUNET_SYSERR;
1510   }
1511
1512 }
1513
1514
1515 /**
1516  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1517  *
1518  * @param cls the socket (set from GNUNET_MESH_connect)
1519  * @param tunnel connection to the other end
1520  * @param tunnel_ctx this is NULL
1521  * @param sender who sent the message
1522  * @param message the actual message
1523  * @param atsi performance data for the connection
1524  * @return GNUNET_OK to keep the connection open,
1525  *         GNUNET_SYSERR to close it (signal serious error)
1526  */
1527 static int
1528 client_handle_reset (void *cls,
1529                      struct GNUNET_MESH_Tunnel *tunnel,
1530                      void **tunnel_ctx,
1531                      const struct GNUNET_PeerIdentity *sender,
1532                      const struct GNUNET_MessageHeader *message,
1533                      const struct GNUNET_ATS_Information*atsi)
1534 {
1535   // struct GNUNET_STREAM_Socket *socket = cls;
1536
1537   return GNUNET_OK;
1538 }
1539
1540
1541 /**
1542  * Common message handler for handling TRANSMIT_CLOSE messages
1543  *
1544  * @param socket the socket through which the ack was received
1545  * @param tunnel connection to the other end
1546  * @param sender who sent the message
1547  * @param msg the transmit close message
1548  * @param atsi performance data for the connection
1549  * @return GNUNET_OK to keep the connection open,
1550  *         GNUNET_SYSERR to close it (signal serious error)
1551  */
1552 static int
1553 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1554                        struct GNUNET_MESH_Tunnel *tunnel,
1555                        const struct GNUNET_PeerIdentity *sender,
1556                        const struct GNUNET_STREAM_MessageHeader *msg,
1557                        const struct GNUNET_ATS_Information*atsi)
1558 {
1559   struct GNUNET_STREAM_MessageHeader *reply;
1560
1561   switch (socket->state)
1562   {
1563   case STATE_ESTABLISHED:
1564     socket->state = STATE_RECEIVE_CLOSED;
1565
1566     /* Send TRANSMIT_CLOSE_ACK */
1567     reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1568     reply->header.type = 
1569       htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1570     reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1571     queue_message (socket, reply, NULL, NULL);
1572     break;
1573
1574   default:
1575     /* FIXME: Call statistics? */
1576     break;
1577   }
1578   return GNUNET_YES;
1579 }
1580
1581
1582 /**
1583  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1584  *
1585  * @param cls the socket (set from GNUNET_MESH_connect)
1586  * @param tunnel connection to the other end
1587  * @param tunnel_ctx this is NULL
1588  * @param sender who sent the message
1589  * @param message the actual message
1590  * @param atsi performance data for the connection
1591  * @return GNUNET_OK to keep the connection open,
1592  *         GNUNET_SYSERR to close it (signal serious error)
1593  */
1594 static int
1595 client_handle_transmit_close (void *cls,
1596                               struct GNUNET_MESH_Tunnel *tunnel,
1597                               void **tunnel_ctx,
1598                               const struct GNUNET_PeerIdentity *sender,
1599                               const struct GNUNET_MessageHeader *message,
1600                               const struct GNUNET_ATS_Information*atsi)
1601 {
1602   struct GNUNET_STREAM_Socket *socket = cls;
1603   
1604   return handle_transmit_close (socket,
1605                                 tunnel,
1606                                 sender,
1607                                 (struct GNUNET_STREAM_MessageHeader *)message,
1608                                 atsi);
1609 }
1610
1611
1612 /**
1613  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1614  *
1615  * @param socket the socket
1616  * @param tunnel connection to the other end
1617  * @param sender who sent the message
1618  * @param message the actual message
1619  * @param atsi performance data for the connection
1620  * @param operation the close operation which is being ACK'ed
1621  * @return GNUNET_OK to keep the connection open,
1622  *         GNUNET_SYSERR to close it (signal serious error)
1623  */
1624 static int
1625 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1626                           struct GNUNET_MESH_Tunnel *tunnel,
1627                           const struct GNUNET_PeerIdentity *sender,
1628                           const struct GNUNET_STREAM_MessageHeader *message,
1629                           const struct GNUNET_ATS_Information *atsi,
1630                           int operation)
1631 {
1632   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1633
1634   shutdown_handle = socket->shutdown_handle;
1635   if (NULL == shutdown_handle)
1636   {
1637     LOG (GNUNET_ERROR_TYPE_DEBUG,
1638          "%s: Received CLOSE_ACK when shutdown handle is NULL\n",
1639          GNUNET_i2s (&socket->other_peer));
1640     return GNUNET_OK;
1641   }
1642
1643   switch (operation)
1644   {
1645   case SHUT_RDWR:
1646     switch (socket->state)
1647     {
1648     case STATE_CLOSE_WAIT:
1649       if (SHUT_RDWR != shutdown_handle->operation)
1650       {
1651         LOG (GNUNET_ERROR_TYPE_DEBUG,
1652              "%s: Received CLOSE_ACK when shutdown handle is not for "
1653              "SHUT_RDWR\n",
1654              GNUNET_i2s (&socket->other_peer));
1655         return GNUNET_OK;
1656       }
1657
1658       LOG (GNUNET_ERROR_TYPE_DEBUG,
1659            "%s: Received CLOSE_ACK from %s\n",
1660            GNUNET_i2s (&socket->other_peer),
1661            GNUNET_i2s (&socket->other_peer));
1662       socket->state = STATE_CLOSED;
1663       break;
1664     default:
1665       LOG (GNUNET_ERROR_TYPE_DEBUG,
1666            "%s: Received CLOSE_ACK when in it not expected\n",
1667            GNUNET_i2s (&socket->other_peer));
1668       return GNUNET_OK;
1669     }
1670     break;
1671
1672   case SHUT_RD:
1673     switch (socket->state)
1674     {
1675     case STATE_RECEIVE_CLOSE_WAIT:
1676       if (SHUT_RD != shutdown_handle->operation)
1677       {
1678         LOG (GNUNET_ERROR_TYPE_DEBUG,
1679              "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
1680              "is not for SHUT_RD\n",
1681              GNUNET_i2s (&socket->other_peer));
1682         return GNUNET_OK;
1683       }
1684
1685       LOG (GNUNET_ERROR_TYPE_DEBUG,
1686            "%s: Received RECEIVE_CLOSE_ACK from %s\n",
1687            GNUNET_i2s (&socket->other_peer),
1688            GNUNET_i2s (&socket->other_peer));
1689       socket->state = STATE_RECEIVE_CLOSED;
1690       break;
1691     default:
1692       LOG (GNUNET_ERROR_TYPE_DEBUG,
1693            "%s: Received RECEIVE_CLOSE_ACK when in it not expected\n",
1694            GNUNET_i2s (&socket->other_peer));
1695       return GNUNET_OK;
1696     }
1697
1698     break;
1699   case SHUT_WR:
1700     switch (socket->state)
1701     {
1702     case STATE_TRANSMIT_CLOSE_WAIT:
1703       if (SHUT_WR != shutdown_handle->operation)
1704       {
1705         LOG (GNUNET_ERROR_TYPE_DEBUG,
1706              "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle "
1707              "is not for SHUT_WR\n",
1708              GNUNET_i2s (&socket->other_peer));
1709         return GNUNET_OK;
1710       }
1711
1712       LOG (GNUNET_ERROR_TYPE_DEBUG,
1713            "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
1714            GNUNET_i2s (&socket->other_peer),
1715            GNUNET_i2s (&socket->other_peer));
1716       socket->state = STATE_TRANSMIT_CLOSED;
1717       break;
1718     default:
1719       LOG (GNUNET_ERROR_TYPE_DEBUG,
1720            "%s: Received TRANSMIT_CLOSE_ACK when in it not expected\n",
1721            GNUNET_i2s (&socket->other_peer));
1722           
1723       return GNUNET_OK;
1724     }
1725     break;
1726   default:
1727     GNUNET_assert (0);
1728   }
1729
1730   if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
1731     shutdown_handle->completion_cb(shutdown_handle->completion_cls,
1732                                    operation);
1733   GNUNET_free (shutdown_handle); /* Free shutdown handle */
1734   socket->shutdown_handle = NULL;
1735   if (GNUNET_SCHEDULER_NO_TASK
1736       != shutdown_handle->close_msg_retransmission_task_id)
1737   {
1738     GNUNET_SCHEDULER_cancel
1739       (shutdown_handle->close_msg_retransmission_task_id);
1740     shutdown_handle->close_msg_retransmission_task_id =
1741       GNUNET_SCHEDULER_NO_TASK;
1742   }
1743   return GNUNET_OK;
1744 }
1745
1746
1747 /**
1748  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1749  *
1750  * @param cls the socket (set from GNUNET_MESH_connect)
1751  * @param tunnel connection to the other end
1752  * @param tunnel_ctx this is NULL
1753  * @param sender who sent the message
1754  * @param message the actual message
1755  * @param atsi performance data for the connection
1756  * @return GNUNET_OK to keep the connection open,
1757  *         GNUNET_SYSERR to close it (signal serious error)
1758  */
1759 static int
1760 client_handle_transmit_close_ack (void *cls,
1761                                   struct GNUNET_MESH_Tunnel *tunnel,
1762                                   void **tunnel_ctx,
1763                                   const struct GNUNET_PeerIdentity *sender,
1764                                   const struct GNUNET_MessageHeader *message,
1765                                   const struct GNUNET_ATS_Information*atsi)
1766 {
1767   struct GNUNET_STREAM_Socket *socket = cls;
1768
1769   return handle_generic_close_ack (socket,
1770                                    tunnel,
1771                                    sender,
1772                                    (const struct GNUNET_STREAM_MessageHeader *)
1773                                    message,
1774                                    atsi,
1775                                    SHUT_WR);
1776 }
1777
1778
1779 /**
1780  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1781  *
1782  * @param socket the socket
1783  * @param tunnel connection to the other end
1784  * @param sender who sent the message
1785  * @param message the actual message
1786  * @param atsi performance data for the connection
1787  * @return GNUNET_OK to keep the connection open,
1788  *         GNUNET_SYSERR to close it (signal serious error)
1789  */
1790 static int
1791 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1792                       struct GNUNET_MESH_Tunnel *tunnel,
1793                       const struct GNUNET_PeerIdentity *sender,
1794                       const struct GNUNET_STREAM_MessageHeader *message,
1795                       const struct GNUNET_ATS_Information *atsi)
1796 {
1797   struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1798
1799   switch (socket->state)
1800   {
1801   case STATE_INIT:
1802   case STATE_LISTEN:
1803   case STATE_HELLO_WAIT:
1804     LOG (GNUNET_ERROR_TYPE_DEBUG,
1805          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1806          GNUNET_i2s (&socket->other_peer));
1807     return GNUNET_OK;
1808   default:
1809     break;
1810   }
1811   
1812   LOG (GNUNET_ERROR_TYPE_DEBUG,
1813        "%s: Received RECEIVE_CLOSE from %s\n",
1814        GNUNET_i2s (&socket->other_peer),
1815        GNUNET_i2s (&socket->other_peer));
1816   receive_close_ack =
1817     GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1818   receive_close_ack->header.size =
1819     htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1820   receive_close_ack->header.type =
1821     htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1822   queue_message (socket,
1823                  receive_close_ack,
1824                  &set_state_closed,
1825                  NULL);
1826   
1827   /* FIXME: Handle the case where write handle is present; the write operation
1828      should be deemed as finised and the write continuation callback
1829      has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
1830   return GNUNET_OK;
1831 }
1832
1833
1834 /**
1835  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1836  *
1837  * @param cls the socket (set from GNUNET_MESH_connect)
1838  * @param tunnel connection to the other end
1839  * @param tunnel_ctx this is NULL
1840  * @param sender who sent the message
1841  * @param message the actual message
1842  * @param atsi performance data for the connection
1843  * @return GNUNET_OK to keep the connection open,
1844  *         GNUNET_SYSERR to close it (signal serious error)
1845  */
1846 static int
1847 client_handle_receive_close (void *cls,
1848                              struct GNUNET_MESH_Tunnel *tunnel,
1849                              void **tunnel_ctx,
1850                              const struct GNUNET_PeerIdentity *sender,
1851                              const struct GNUNET_MessageHeader *message,
1852                              const struct GNUNET_ATS_Information*atsi)
1853 {
1854   struct GNUNET_STREAM_Socket *socket = cls;
1855
1856   return
1857     handle_receive_close (socket,
1858                           tunnel,
1859                           sender,
1860                           (const struct GNUNET_STREAM_MessageHeader *) message,
1861                           atsi);
1862 }
1863
1864
1865 /**
1866  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1867  *
1868  * @param cls the socket (set from GNUNET_MESH_connect)
1869  * @param tunnel connection to the other end
1870  * @param tunnel_ctx this is NULL
1871  * @param sender who sent the message
1872  * @param message the actual message
1873  * @param atsi performance data for the connection
1874  * @return GNUNET_OK to keep the connection open,
1875  *         GNUNET_SYSERR to close it (signal serious error)
1876  */
1877 static int
1878 client_handle_receive_close_ack (void *cls,
1879                                  struct GNUNET_MESH_Tunnel *tunnel,
1880                                  void **tunnel_ctx,
1881                                  const struct GNUNET_PeerIdentity *sender,
1882                                  const struct GNUNET_MessageHeader *message,
1883                                  const struct GNUNET_ATS_Information*atsi)
1884 {
1885   struct GNUNET_STREAM_Socket *socket = cls;
1886
1887   return handle_generic_close_ack (socket,
1888                                    tunnel,
1889                                    sender,
1890                                    (const struct GNUNET_STREAM_MessageHeader *)
1891                                    message,
1892                                    atsi,
1893                                    SHUT_RD);
1894 }
1895
1896
1897 /**
1898  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1899  *
1900  * @param socket the socket
1901  * @param tunnel connection to the other end
1902  * @param sender who sent the message
1903  * @param message the actual message
1904  * @param atsi performance data for the connection
1905  * @return GNUNET_OK to keep the connection open,
1906  *         GNUNET_SYSERR to close it (signal serious error)
1907  */
1908 static int
1909 handle_close (struct GNUNET_STREAM_Socket *socket,
1910               struct GNUNET_MESH_Tunnel *tunnel,
1911               const struct GNUNET_PeerIdentity *sender,
1912               const struct GNUNET_STREAM_MessageHeader *message,
1913               const struct GNUNET_ATS_Information*atsi)
1914 {
1915   struct GNUNET_STREAM_MessageHeader *close_ack;
1916
1917   switch (socket->state)
1918   {
1919   case STATE_INIT:
1920   case STATE_LISTEN:
1921   case STATE_HELLO_WAIT:
1922     LOG (GNUNET_ERROR_TYPE_DEBUG,
1923          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1924          GNUNET_i2s (&socket->other_peer));
1925     return GNUNET_OK;
1926   default:
1927     break;
1928   }
1929
1930   LOG (GNUNET_ERROR_TYPE_DEBUG,
1931        "%s: Received CLOSE from %s\n",
1932        GNUNET_i2s (&socket->other_peer),
1933        GNUNET_i2s (&socket->other_peer));
1934   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1935   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1936   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
1937   queue_message (socket,
1938                  close_ack,
1939                  &set_state_closed,
1940                  NULL);
1941   if (socket->state == STATE_CLOSED)
1942     return GNUNET_OK;
1943
1944   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1945   socket->receive_buffer = NULL;
1946   socket->receive_buffer_size = 0;
1947   return GNUNET_OK;
1948 }
1949
1950
1951 /**
1952  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1953  *
1954  * @param cls the socket (set from GNUNET_MESH_connect)
1955  * @param tunnel connection to the other end
1956  * @param tunnel_ctx this is NULL
1957  * @param sender who sent the message
1958  * @param message the actual message
1959  * @param atsi performance data for the connection
1960  * @return GNUNET_OK to keep the connection open,
1961  *         GNUNET_SYSERR to close it (signal serious error)
1962  */
1963 static int
1964 client_handle_close (void *cls,
1965                      struct GNUNET_MESH_Tunnel *tunnel,
1966                      void **tunnel_ctx,
1967                      const struct GNUNET_PeerIdentity *sender,
1968                      const struct GNUNET_MessageHeader *message,
1969                      const struct GNUNET_ATS_Information*atsi)
1970 {
1971   struct GNUNET_STREAM_Socket *socket = cls;
1972
1973   return handle_close (socket,
1974                        tunnel,
1975                        sender,
1976                        (const struct GNUNET_STREAM_MessageHeader *) message,
1977                        atsi);
1978 }
1979
1980
1981 /**
1982  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1983  *
1984  * @param cls the socket (set from GNUNET_MESH_connect)
1985  * @param tunnel connection to the other end
1986  * @param tunnel_ctx this is NULL
1987  * @param sender who sent the message
1988  * @param message the actual message
1989  * @param atsi performance data for the connection
1990  * @return GNUNET_OK to keep the connection open,
1991  *         GNUNET_SYSERR to close it (signal serious error)
1992  */
1993 static int
1994 client_handle_close_ack (void *cls,
1995                          struct GNUNET_MESH_Tunnel *tunnel,
1996                          void **tunnel_ctx,
1997                          const struct GNUNET_PeerIdentity *sender,
1998                          const struct GNUNET_MessageHeader *message,
1999                          const struct GNUNET_ATS_Information *atsi)
2000 {
2001   struct GNUNET_STREAM_Socket *socket = cls;
2002
2003   return handle_generic_close_ack (socket,
2004                                    tunnel,
2005                                    sender,
2006                                    (const struct GNUNET_STREAM_MessageHeader *) 
2007                                    message,
2008                                    atsi,
2009                                    SHUT_RDWR);
2010 }
2011
2012 /*****************************/
2013 /* Server's Message Handlers */
2014 /*****************************/
2015
2016 /**
2017  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
2018  *
2019  * @param cls the closure
2020  * @param tunnel connection to the other end
2021  * @param tunnel_ctx the socket
2022  * @param sender who sent the message
2023  * @param message the actual message
2024  * @param atsi performance data for the connection
2025  * @return GNUNET_OK to keep the connection open,
2026  *         GNUNET_SYSERR to close it (signal serious error)
2027  */
2028 static int
2029 server_handle_data (void *cls,
2030                     struct GNUNET_MESH_Tunnel *tunnel,
2031                     void **tunnel_ctx,
2032                     const struct GNUNET_PeerIdentity *sender,
2033                     const struct GNUNET_MessageHeader *message,
2034                     const struct GNUNET_ATS_Information*atsi)
2035 {
2036   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2037
2038   return handle_data (socket,
2039                       tunnel,
2040                       sender,
2041                       (const struct GNUNET_STREAM_DataMessage *)message,
2042                       atsi);
2043 }
2044
2045
2046 /**
2047  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
2048  *
2049  * @param cls the closure
2050  * @param tunnel connection to the other end
2051  * @param tunnel_ctx the socket
2052  * @param sender who sent the message
2053  * @param message the actual message
2054  * @param atsi performance data for the connection
2055  * @return GNUNET_OK to keep the connection open,
2056  *         GNUNET_SYSERR to close it (signal serious error)
2057  */
2058 static int
2059 server_handle_hello (void *cls,
2060                      struct GNUNET_MESH_Tunnel *tunnel,
2061                      void **tunnel_ctx,
2062                      const struct GNUNET_PeerIdentity *sender,
2063                      const struct GNUNET_MessageHeader *message,
2064                      const struct GNUNET_ATS_Information*atsi)
2065 {
2066   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2067   struct GNUNET_STREAM_HelloAckMessage *reply;
2068
2069   if (0 != memcmp (sender,
2070                    &socket->other_peer,
2071                    sizeof (struct GNUNET_PeerIdentity)))
2072   {
2073     LOG (GNUNET_ERROR_TYPE_DEBUG,
2074          "%s: Received HELLO from non-confirming peer\n",
2075          GNUNET_i2s (&socket->other_peer));
2076     return GNUNET_YES;
2077   }
2078
2079   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == 
2080                  ntohs (message->type));
2081   GNUNET_assert (socket->tunnel == tunnel);
2082   LOG (GNUNET_ERROR_TYPE_DEBUG,
2083        "%s: Received HELLO from %s\n", 
2084        GNUNET_i2s (&socket->other_peer),
2085        GNUNET_i2s (&socket->other_peer));
2086
2087   if (STATE_INIT == socket->state)
2088   {
2089     reply = generate_hello_ack_msg (socket);
2090     queue_message (socket, 
2091                    &reply->header,
2092                    &set_state_hello_wait, 
2093                    NULL);
2094   }
2095   else
2096   {
2097     LOG (GNUNET_ERROR_TYPE_DEBUG,
2098          "%s: Client sent HELLO when in state %d\n", 
2099          GNUNET_i2s (&socket->other_peer),
2100          socket->state);
2101     /* FIXME: Send RESET? */
2102       
2103   }
2104   return GNUNET_OK;
2105 }
2106
2107
2108 /**
2109  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2110  *
2111  * @param cls the closure
2112  * @param tunnel connection to the other end
2113  * @param tunnel_ctx the socket
2114  * @param sender who sent the message
2115  * @param message the actual message
2116  * @param atsi performance data for the connection
2117  * @return GNUNET_OK to keep the connection open,
2118  *         GNUNET_SYSERR to close it (signal serious error)
2119  */
2120 static int
2121 server_handle_hello_ack (void *cls,
2122                          struct GNUNET_MESH_Tunnel *tunnel,
2123                          void **tunnel_ctx,
2124                          const struct GNUNET_PeerIdentity *sender,
2125                          const struct GNUNET_MessageHeader *message,
2126                          const struct GNUNET_ATS_Information*atsi)
2127 {
2128   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2129   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2130
2131   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2132                  ntohs (message->type));
2133   GNUNET_assert (socket->tunnel == tunnel);
2134   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2135   if (STATE_HELLO_WAIT == socket->state)
2136   {
2137     LOG (GNUNET_ERROR_TYPE_DEBUG,
2138          "%s: Received HELLO_ACK from %s\n",
2139          GNUNET_i2s (&socket->other_peer),
2140          GNUNET_i2s (&socket->other_peer));
2141     socket->read_sequence_number = ntohl (ack_message->sequence_number);
2142     LOG (GNUNET_ERROR_TYPE_DEBUG,
2143          "%s: Read sequence number %u\n",
2144          GNUNET_i2s (&socket->other_peer),
2145          (unsigned int) socket->read_sequence_number);
2146     socket->receiver_window_available = 
2147       ntohl (ack_message->receiver_window_size);
2148     /* Attain ESTABLISHED state */
2149     set_state_established (NULL, socket);
2150   }
2151   else
2152   {
2153     LOG (GNUNET_ERROR_TYPE_DEBUG,
2154          "Client sent HELLO_ACK when in state %d\n", socket->state);
2155     /* FIXME: Send RESET? */
2156       
2157   }
2158   return GNUNET_OK;
2159 }
2160
2161
2162 /**
2163  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2164  *
2165  * @param cls the closure
2166  * @param tunnel connection to the other end
2167  * @param tunnel_ctx the socket
2168  * @param sender who sent the message
2169  * @param message the actual message
2170  * @param atsi performance data for the connection
2171  * @return GNUNET_OK to keep the connection open,
2172  *         GNUNET_SYSERR to close it (signal serious error)
2173  */
2174 static int
2175 server_handle_reset (void *cls,
2176                      struct GNUNET_MESH_Tunnel *tunnel,
2177                      void **tunnel_ctx,
2178                      const struct GNUNET_PeerIdentity *sender,
2179                      const struct GNUNET_MessageHeader *message,
2180                      const struct GNUNET_ATS_Information*atsi)
2181 {
2182   // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2183
2184   return GNUNET_OK;
2185 }
2186
2187
2188 /**
2189  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2190  *
2191  * @param cls the closure
2192  * @param tunnel connection to the other end
2193  * @param tunnel_ctx the socket
2194  * @param sender who sent the message
2195  * @param message the actual message
2196  * @param atsi performance data for the connection
2197  * @return GNUNET_OK to keep the connection open,
2198  *         GNUNET_SYSERR to close it (signal serious error)
2199  */
2200 static int
2201 server_handle_transmit_close (void *cls,
2202                               struct GNUNET_MESH_Tunnel *tunnel,
2203                               void **tunnel_ctx,
2204                               const struct GNUNET_PeerIdentity *sender,
2205                               const struct GNUNET_MessageHeader *message,
2206                               const struct GNUNET_ATS_Information*atsi)
2207 {
2208   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2209
2210   return handle_transmit_close (socket,
2211                                 tunnel,
2212                                 sender,
2213                                 (struct GNUNET_STREAM_MessageHeader *)message,
2214                                 atsi);
2215 }
2216
2217
2218 /**
2219  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2220  *
2221  * @param cls the closure
2222  * @param tunnel connection to the other end
2223  * @param tunnel_ctx the socket
2224  * @param sender who sent the message
2225  * @param message the actual message
2226  * @param atsi performance data for the connection
2227  * @return GNUNET_OK to keep the connection open,
2228  *         GNUNET_SYSERR to close it (signal serious error)
2229  */
2230 static int
2231 server_handle_transmit_close_ack (void *cls,
2232                                   struct GNUNET_MESH_Tunnel *tunnel,
2233                                   void **tunnel_ctx,
2234                                   const struct GNUNET_PeerIdentity *sender,
2235                                   const struct GNUNET_MessageHeader *message,
2236                                   const struct GNUNET_ATS_Information*atsi)
2237 {
2238   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2239
2240   return handle_generic_close_ack (socket,
2241                                    tunnel,
2242                                    sender,
2243                                    (const struct GNUNET_STREAM_MessageHeader *)
2244                                    message,
2245                                    atsi,
2246                                    SHUT_WR);
2247 }
2248
2249
2250 /**
2251  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2252  *
2253  * @param cls the closure
2254  * @param tunnel connection to the other end
2255  * @param tunnel_ctx the socket
2256  * @param sender who sent the message
2257  * @param message the actual message
2258  * @param atsi performance data for the connection
2259  * @return GNUNET_OK to keep the connection open,
2260  *         GNUNET_SYSERR to close it (signal serious error)
2261  */
2262 static int
2263 server_handle_receive_close (void *cls,
2264                              struct GNUNET_MESH_Tunnel *tunnel,
2265                              void **tunnel_ctx,
2266                              const struct GNUNET_PeerIdentity *sender,
2267                              const struct GNUNET_MessageHeader *message,
2268                              const struct GNUNET_ATS_Information*atsi)
2269 {
2270   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2271
2272   return
2273     handle_receive_close (socket,
2274                           tunnel,
2275                           sender,
2276                           (const struct GNUNET_STREAM_MessageHeader *) message,
2277                           atsi);
2278 }
2279
2280
2281 /**
2282  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2283  *
2284  * @param cls the closure
2285  * @param tunnel connection to the other end
2286  * @param tunnel_ctx the socket
2287  * @param sender who sent the message
2288  * @param message the actual message
2289  * @param atsi performance data for the connection
2290  * @return GNUNET_OK to keep the connection open,
2291  *         GNUNET_SYSERR to close it (signal serious error)
2292  */
2293 static int
2294 server_handle_receive_close_ack (void *cls,
2295                                  struct GNUNET_MESH_Tunnel *tunnel,
2296                                  void **tunnel_ctx,
2297                                  const struct GNUNET_PeerIdentity *sender,
2298                                  const struct GNUNET_MessageHeader *message,
2299                                  const struct GNUNET_ATS_Information*atsi)
2300 {
2301   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2302
2303   return handle_generic_close_ack (socket,
2304                                    tunnel,
2305                                    sender,
2306                                    (const struct GNUNET_STREAM_MessageHeader *)
2307                                    message,
2308                                    atsi,
2309                                    SHUT_RD);
2310 }
2311
2312
2313 /**
2314  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2315  *
2316  * @param cls the listen socket (from GNUNET_MESH_connect in
2317  *          GNUNET_STREAM_listen) 
2318  * @param tunnel connection to the other end
2319  * @param tunnel_ctx the socket
2320  * @param sender who sent the message
2321  * @param message the actual message
2322  * @param atsi performance data for the connection
2323  * @return GNUNET_OK to keep the connection open,
2324  *         GNUNET_SYSERR to close it (signal serious error)
2325  */
2326 static int
2327 server_handle_close (void *cls,
2328                      struct GNUNET_MESH_Tunnel *tunnel,
2329                      void **tunnel_ctx,
2330                      const struct GNUNET_PeerIdentity *sender,
2331                      const struct GNUNET_MessageHeader *message,
2332                      const struct GNUNET_ATS_Information*atsi)
2333 {
2334   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2335   
2336   return handle_close (socket,
2337                        tunnel,
2338                        sender,
2339                        (const struct GNUNET_STREAM_MessageHeader *) message,
2340                        atsi);
2341 }
2342
2343
2344 /**
2345  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2346  *
2347  * @param cls the closure
2348  * @param tunnel connection to the other end
2349  * @param tunnel_ctx the socket
2350  * @param sender who sent the message
2351  * @param message the actual message
2352  * @param atsi performance data for the connection
2353  * @return GNUNET_OK to keep the connection open,
2354  *         GNUNET_SYSERR to close it (signal serious error)
2355  */
2356 static int
2357 server_handle_close_ack (void *cls,
2358                          struct GNUNET_MESH_Tunnel *tunnel,
2359                          void **tunnel_ctx,
2360                          const struct GNUNET_PeerIdentity *sender,
2361                          const struct GNUNET_MessageHeader *message,
2362                          const struct GNUNET_ATS_Information*atsi)
2363 {
2364   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2365
2366   return handle_generic_close_ack (socket,
2367                                    tunnel,
2368                                    sender,
2369                                    (const struct GNUNET_STREAM_MessageHeader *) 
2370                                    message,
2371                                    atsi,
2372                                    SHUT_RDWR);
2373 }
2374
2375
2376 /**
2377  * Handler for DATA_ACK messages
2378  *
2379  * @param socket the socket through which the ack was received
2380  * @param tunnel connection to the other end
2381  * @param sender who sent the message
2382  * @param ack the acknowledgment message
2383  * @param atsi performance data for the connection
2384  * @return GNUNET_OK to keep the connection open,
2385  *         GNUNET_SYSERR to close it (signal serious error)
2386  */
2387 static int
2388 handle_ack (struct GNUNET_STREAM_Socket *socket,
2389             struct GNUNET_MESH_Tunnel *tunnel,
2390             const struct GNUNET_PeerIdentity *sender,
2391             const struct GNUNET_STREAM_AckMessage *ack,
2392             const struct GNUNET_ATS_Information*atsi)
2393 {
2394   unsigned int packet;
2395   int need_retransmission;
2396   uint32_t sequence_difference;
2397   
2398   if (0 != memcmp (sender,
2399                    &socket->other_peer,
2400                    sizeof (struct GNUNET_PeerIdentity)))
2401   {
2402     LOG (GNUNET_ERROR_TYPE_DEBUG,
2403          "%s: Received ACK from non-confirming peer\n",
2404          GNUNET_i2s (&socket->other_peer));
2405     return GNUNET_YES;
2406   }
2407   switch (socket->state)
2408   {
2409   case (STATE_ESTABLISHED):
2410   case (STATE_RECEIVE_CLOSED):
2411   case (STATE_RECEIVE_CLOSE_WAIT):
2412     if (NULL == socket->write_handle)
2413     {
2414       LOG (GNUNET_ERROR_TYPE_DEBUG,
2415            "%s: Received DATA_ACK when write_handle is NULL\n",
2416            GNUNET_i2s (&socket->other_peer));
2417       return GNUNET_OK;
2418     }
2419     /* FIXME: increment in the base sequence number is breaking current flow
2420      */
2421     if (!((socket->write_sequence_number 
2422            - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2423     {
2424       LOG (GNUNET_ERROR_TYPE_DEBUG,
2425            "%s: Received DATA_ACK with unexpected base sequence number\n",
2426            GNUNET_i2s (&socket->other_peer));
2427       LOG (GNUNET_ERROR_TYPE_DEBUG,
2428            "%s: Current write sequence: %u; Ack's base sequence: %u\n",
2429            GNUNET_i2s (&socket->other_peer),
2430            socket->write_sequence_number,
2431            ntohl (ack->base_sequence_number));
2432       return GNUNET_OK;
2433     }
2434     /* FIXME: include the case when write_handle is cancelled - ignore the 
2435        acks */
2436     LOG (GNUNET_ERROR_TYPE_DEBUG,
2437          "%s: Received DATA_ACK from %s\n",
2438          GNUNET_i2s (&socket->other_peer),
2439          GNUNET_i2s (&socket->other_peer));
2440       
2441     /* Cancel the retransmission task */
2442     if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2443     {
2444       GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2445       socket->retransmission_timeout_task_id = 
2446         GNUNET_SCHEDULER_NO_TASK;
2447     }
2448     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2449     {
2450       if (NULL == socket->write_handle->messages[packet]) break;
2451       /* BS: Base sequence from ack; PS: sequence num of current packet */
2452       sequence_difference = ntohl (ack->base_sequence_number)
2453         - ntohl (socket->write_handle->messages[packet]->sequence_number);
2454       /* case where BS = PS + GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2455       if ((sequence_difference == GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
2456           || ((sequence_difference < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
2457               && (0 != sequence_difference))) /* case: BS > PS and BS != PS*/
2458       {
2459         ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, packet,
2460                               GNUNET_YES);
2461         continue;
2462       }
2463       if (GNUNET_YES == 
2464           ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
2465                                 -sequence_difference))/*inversion as PS >= BS */
2466       {
2467         ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, packet,
2468                               GNUNET_YES);
2469       }
2470     }
2471     /* Update the receive window remaining
2472        FIXME : Should update with the value from a data ack with greater
2473        sequence number */
2474     socket->receiver_window_available = 
2475       ntohl (ack->receive_window_remaining);
2476     /* Check if we have received all acknowledgements */
2477     need_retransmission = GNUNET_NO;
2478     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2479     {
2480       if (NULL == socket->write_handle->messages[packet]) break;
2481       if (GNUNET_YES != ackbitmap_is_bit_set 
2482           (&socket->write_handle->ack_bitmap,packet))
2483       {
2484         need_retransmission = GNUNET_YES;
2485         break;
2486       }
2487     }
2488     if (GNUNET_YES == need_retransmission)
2489     {
2490       write_data (socket);
2491     }
2492     else      /* We have to call the write continuation callback now */
2493     {
2494       struct GNUNET_STREAM_IOWriteHandle *write_handle;
2495       
2496       /* Free the packets */
2497       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2498       {
2499         GNUNET_free_non_null (socket->write_handle->messages[packet]);
2500       }
2501       write_handle = socket->write_handle;
2502       socket->write_handle = NULL;
2503       if (NULL != write_handle->write_cont)
2504         write_handle->write_cont (write_handle->write_cont_cls,
2505                                   socket->status,
2506                                   write_handle->size);
2507       /* We are done with the write handle - Freeing it */
2508       GNUNET_free (write_handle);
2509       LOG (GNUNET_ERROR_TYPE_DEBUG,
2510            "%s: Write completion callback completed\n",
2511            GNUNET_i2s (&socket->other_peer));      
2512     }
2513     break;
2514   default:
2515     break;
2516   }
2517   return GNUNET_OK;
2518 }
2519
2520
2521 /**
2522  * Handler for DATA_ACK messages
2523  *
2524  * @param cls the 'struct GNUNET_STREAM_Socket'
2525  * @param tunnel connection to the other end
2526  * @param tunnel_ctx unused
2527  * @param sender who sent the message
2528  * @param message the actual message
2529  * @param atsi performance data for the connection
2530  * @return GNUNET_OK to keep the connection open,
2531  *         GNUNET_SYSERR to close it (signal serious error)
2532  */
2533 static int
2534 client_handle_ack (void *cls,
2535                    struct GNUNET_MESH_Tunnel *tunnel,
2536                    void **tunnel_ctx,
2537                    const struct GNUNET_PeerIdentity *sender,
2538                    const struct GNUNET_MessageHeader *message,
2539                    const struct GNUNET_ATS_Information*atsi)
2540 {
2541   struct GNUNET_STREAM_Socket *socket = cls;
2542   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2543  
2544   return handle_ack (socket, tunnel, sender, ack, atsi);
2545 }
2546
2547
2548 /**
2549  * Handler for DATA_ACK messages
2550  *
2551  * @param cls the server's listen socket
2552  * @param tunnel connection to the other end
2553  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2554  * @param sender who sent the message
2555  * @param message the actual message
2556  * @param atsi performance data for the connection
2557  * @return GNUNET_OK to keep the connection open,
2558  *         GNUNET_SYSERR to close it (signal serious error)
2559  */
2560 static int
2561 server_handle_ack (void *cls,
2562                    struct GNUNET_MESH_Tunnel *tunnel,
2563                    void **tunnel_ctx,
2564                    const struct GNUNET_PeerIdentity *sender,
2565                    const struct GNUNET_MessageHeader *message,
2566                    const struct GNUNET_ATS_Information*atsi)
2567 {
2568   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2569   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2570  
2571   return handle_ack (socket, tunnel, sender, ack, atsi);
2572 }
2573
2574
2575 /**
2576  * For client message handlers, the stream socket is in the
2577  * closure argument.
2578  */
2579 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2580   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2581   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2582    sizeof (struct GNUNET_STREAM_AckMessage) },
2583   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2584    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2585   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2586    sizeof (struct GNUNET_STREAM_MessageHeader)},
2587   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2588    sizeof (struct GNUNET_STREAM_MessageHeader)},
2589   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2590    sizeof (struct GNUNET_STREAM_MessageHeader)},
2591   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2592    sizeof (struct GNUNET_STREAM_MessageHeader)},
2593   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2594    sizeof (struct GNUNET_STREAM_MessageHeader)},
2595   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2596    sizeof (struct GNUNET_STREAM_MessageHeader)},
2597   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2598    sizeof (struct GNUNET_STREAM_MessageHeader)},
2599   {NULL, 0, 0}
2600 };
2601
2602
2603 /**
2604  * For server message handlers, the stream socket is in the
2605  * tunnel context, and the listen socket in the closure argument.
2606  */
2607 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2608   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2609   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2610    sizeof (struct GNUNET_STREAM_AckMessage) },
2611   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2612    sizeof (struct GNUNET_STREAM_MessageHeader)},
2613   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2614    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2615   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2616    sizeof (struct GNUNET_STREAM_MessageHeader)},
2617   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2618    sizeof (struct GNUNET_STREAM_MessageHeader)},
2619   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2620    sizeof (struct GNUNET_STREAM_MessageHeader)},
2621   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2622    sizeof (struct GNUNET_STREAM_MessageHeader)},
2623   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2624    sizeof (struct GNUNET_STREAM_MessageHeader)},
2625   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2626    sizeof (struct GNUNET_STREAM_MessageHeader)},
2627   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2628    sizeof (struct GNUNET_STREAM_MessageHeader)},
2629   {NULL, 0, 0}
2630 };
2631
2632
2633 /**
2634  * Function called when our target peer is connected to our tunnel
2635  *
2636  * @param cls the socket for which this tunnel is created
2637  * @param peer the peer identity of the target
2638  * @param atsi performance data for the connection
2639  */
2640 static void
2641 mesh_peer_connect_callback (void *cls,
2642                             const struct GNUNET_PeerIdentity *peer,
2643                             const struct GNUNET_ATS_Information * atsi)
2644 {
2645   struct GNUNET_STREAM_Socket *socket = cls;
2646   struct GNUNET_STREAM_MessageHeader *message;
2647   
2648   if (0 != memcmp (peer,
2649                    &socket->other_peer,
2650                    sizeof (struct GNUNET_PeerIdentity)))
2651   {
2652     LOG (GNUNET_ERROR_TYPE_DEBUG,
2653          "%s: A peer which is not our target has connected to our tunnel\n",
2654          GNUNET_i2s(peer));
2655     return;
2656   }
2657   
2658   LOG (GNUNET_ERROR_TYPE_DEBUG,
2659        "%s: Target peer %s connected\n",
2660        GNUNET_i2s (&socket->other_peer),
2661        GNUNET_i2s (&socket->other_peer));
2662   
2663   /* Set state to INIT */
2664   socket->state = STATE_INIT;
2665
2666   /* Send HELLO message */
2667   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2668   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
2669   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2670   queue_message (socket,
2671                  message,
2672                  &set_state_hello_wait,
2673                  NULL);
2674
2675   /* Call open callback */
2676   if (NULL == socket->open_cb)
2677   {
2678     LOG (GNUNET_ERROR_TYPE_DEBUG,
2679          "STREAM_open callback is NULL\n");
2680   }
2681 }
2682
2683
2684 /**
2685  * Function called when our target peer is disconnected from our tunnel
2686  *
2687  * @param cls the socket associated which this tunnel
2688  * @param peer the peer identity of the target
2689  */
2690 static void
2691 mesh_peer_disconnect_callback (void *cls,
2692                                const struct GNUNET_PeerIdentity *peer)
2693 {
2694   struct GNUNET_STREAM_Socket *socket=cls;
2695   
2696   /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2697   LOG (GNUNET_ERROR_TYPE_DEBUG,
2698        "%s: Other peer %s disconnected \n",
2699        GNUNET_i2s (&socket->other_peer),
2700        GNUNET_i2s (&socket->other_peer));
2701 }
2702
2703
2704 /**
2705  * Method called whenever a peer creates a tunnel to us
2706  *
2707  * @param cls closure
2708  * @param tunnel new handle to the tunnel
2709  * @param initiator peer that started the tunnel
2710  * @param atsi performance information for the tunnel
2711  * @return initial tunnel context for the tunnel
2712  *         (can be NULL -- that's not an error)
2713  */
2714 static void *
2715 new_tunnel_notify (void *cls,
2716                    struct GNUNET_MESH_Tunnel *tunnel,
2717                    const struct GNUNET_PeerIdentity *initiator,
2718                    const struct GNUNET_ATS_Information *atsi)
2719 {
2720   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2721   struct GNUNET_STREAM_Socket *socket;
2722
2723   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2724      from the same peer again until the socket is closed */
2725
2726   if (GNUNET_NO == lsocket->listening)
2727   {
2728     LOG (GNUNET_ERROR_TYPE_DEBUG,
2729          "%s: Destroying tunnel from peer %s as we don't have the lock\n",
2730          GNUNET_i2s (&socket->other_peer),
2731          GNUNET_i2s (&socket->other_peer));
2732     GNUNET_MESH_tunnel_destroy (tunnel);
2733     return NULL;
2734   }
2735   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2736   socket->other_peer = *initiator;
2737   socket->tunnel = tunnel;
2738   socket->session_id = 0;       /* FIXME */
2739   socket->state = STATE_INIT;
2740   socket->lsocket = lsocket;
2741   socket->retransmit_timeout = lsocket->retransmit_timeout;
2742   socket->testing_active = lsocket->testing_active;
2743   socket->testing_set_write_sequence_number_value =
2744     lsocket->testing_set_write_sequence_number_value;
2745     
2746   LOG (GNUNET_ERROR_TYPE_DEBUG,
2747        "%s: Peer %s initiated tunnel to us\n", 
2748        GNUNET_i2s (&socket->other_peer),
2749        GNUNET_i2s (&socket->other_peer));
2750   
2751   /* FIXME: Copy MESH handle from lsocket to socket */
2752   
2753   return socket;
2754 }
2755
2756
2757 /**
2758  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2759  * any associated state.  This function is NOT called if the client has
2760  * explicitly asked for the tunnel to be destroyed using
2761  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2762  * the tunnel.
2763  *
2764  * @param cls closure (set from GNUNET_MESH_connect)
2765  * @param tunnel connection to the other end (henceforth invalid)
2766  * @param tunnel_ctx place where local state associated
2767  *                   with the tunnel is stored
2768  */
2769 static void 
2770 tunnel_cleaner (void *cls,
2771                 const struct GNUNET_MESH_Tunnel *tunnel,
2772                 void *tunnel_ctx)
2773 {
2774   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2775
2776   if (tunnel != socket->tunnel)
2777     return;
2778
2779   GNUNET_break_op(0);
2780   LOG (GNUNET_ERROR_TYPE_DEBUG,
2781        "%s: Peer %s has terminated connection abruptly\n",
2782        GNUNET_i2s (&socket->other_peer),
2783        GNUNET_i2s (&socket->other_peer));
2784
2785   socket->status = GNUNET_STREAM_SHUTDOWN;
2786
2787   /* Clear Transmit handles */
2788   if (NULL != socket->transmit_handle)
2789   {
2790     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2791     socket->transmit_handle = NULL;
2792   }
2793   if (NULL != socket->ack_transmit_handle)
2794   {
2795     GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
2796     GNUNET_free (socket->ack_msg);
2797     socket->ack_msg = NULL;
2798     socket->ack_transmit_handle = NULL;
2799   }
2800   /* Stop Tasks using socket->tunnel */
2801   if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2802   {
2803     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2804     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2805   }
2806   if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2807   {
2808     GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2809     socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
2810   }
2811   /* FIXME: Cancel all other tasks using socket->tunnel */
2812   socket->tunnel = NULL;
2813 }
2814
2815
2816 /**
2817  * Callback to signal timeout on lockmanager lock acquire
2818  *
2819  * @param cls the ListenSocket
2820  * @param tc the scheduler task context
2821  */
2822 static void
2823 lockmanager_acquire_timeout (void *cls, 
2824                              const struct GNUNET_SCHEDULER_TaskContext *tc)
2825 {
2826   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2827   GNUNET_STREAM_ListenCallback listen_cb;
2828   void *listen_cb_cls;
2829
2830   lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2831   listen_cb = lsocket->listen_cb;
2832   listen_cb_cls = lsocket->listen_cb_cls;
2833   GNUNET_STREAM_listen_close (lsocket);
2834   if (NULL != listen_cb)
2835     listen_cb (listen_cb_cls, NULL, NULL);  
2836 }
2837
2838
2839 /**
2840  * Callback to notify us on the status changes on app_port lock
2841  *
2842  * @param cls the ListenSocket
2843  * @param domain the domain name of the lock
2844  * @param lock the app_port
2845  * @param status the current status of the lock
2846  */
2847 static void
2848 lock_status_change_cb (void *cls, const char *domain, uint32_t lock,
2849                        enum GNUNET_LOCKMANAGER_Status status)
2850 {
2851   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2852
2853   GNUNET_assert (lock == (uint32_t) lsocket->port);
2854   if (GNUNET_LOCKMANAGER_SUCCESS == status)
2855   {
2856     lsocket->listening = GNUNET_YES;
2857     if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
2858     {
2859       GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
2860       lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2861     }
2862     if (NULL == lsocket->mesh)
2863     {
2864       GNUNET_MESH_ApplicationType ports[] = {lsocket->port, 0};
2865
2866       lsocket->mesh = GNUNET_MESH_connect (lsocket->cfg,
2867                                            RECEIVE_BUFFER_SIZE, /* FIXME: QUEUE size as parameter? */
2868                                            lsocket, /* Closure */
2869                                            &new_tunnel_notify,
2870                                            &tunnel_cleaner,
2871                                            server_message_handlers,
2872                                            ports);
2873       GNUNET_assert (NULL != lsocket->mesh);
2874     }
2875   }
2876   if (GNUNET_LOCKMANAGER_RELEASE == status)
2877     lsocket->listening = GNUNET_NO;
2878 }
2879
2880
2881 /*****************/
2882 /* API functions */
2883 /*****************/
2884
2885
2886 /**
2887  * Tries to open a stream to the target peer
2888  *
2889  * @param cfg configuration to use
2890  * @param target the target peer to which the stream has to be opened
2891  * @param app_port the application port number which uniquely identifies this
2892  *            stream
2893  * @param open_cb this function will be called after stream has be established 
2894  * @param open_cb_cls the closure for open_cb
2895  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2896  * @return if successful it returns the stream socket; NULL if stream cannot be
2897  *         opened 
2898  */
2899 struct GNUNET_STREAM_Socket *
2900 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2901                     const struct GNUNET_PeerIdentity *target,
2902                     GNUNET_MESH_ApplicationType app_port,
2903                     GNUNET_STREAM_OpenCallback open_cb,
2904                     void *open_cb_cls,
2905                     ...)
2906 {
2907   struct GNUNET_STREAM_Socket *socket;
2908   enum GNUNET_STREAM_Option option;
2909   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
2910   va_list vargs;                /* Variable arguments */
2911
2912   LOG (GNUNET_ERROR_TYPE_DEBUG,
2913        "%s\n", __func__);
2914   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2915   socket->other_peer = *target;
2916   socket->open_cb = open_cb;
2917   socket->open_cls = open_cb_cls;
2918   /* Set defaults */
2919   socket->retransmit_timeout = 
2920     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
2921   socket->testing_active = GNUNET_NO;
2922   va_start (vargs, open_cb_cls); /* Parse variable args */
2923   do {
2924     option = va_arg (vargs, enum GNUNET_STREAM_Option);
2925     switch (option)
2926     {
2927     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2928       /* Expect struct GNUNET_TIME_Relative */
2929       socket->retransmit_timeout = va_arg (vargs,
2930                                            struct GNUNET_TIME_Relative);
2931       break;
2932     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
2933       socket->testing_active = GNUNET_YES;
2934       socket->testing_set_write_sequence_number_value = va_arg (vargs,
2935                                                                 uint32_t);
2936       break;
2937     case GNUNET_STREAM_OPTION_END:
2938       break;
2939     }
2940   } while (GNUNET_STREAM_OPTION_END != option);
2941   va_end (vargs);               /* End of variable args parsing */
2942   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2943                                       RECEIVE_BUFFER_SIZE,  /* QUEUE size as parameter? */
2944                                       socket, /* cls */
2945                                       NULL, /* No inbound tunnel handler */
2946                                       NULL, /* No in-tunnel cleaner */
2947                                       client_message_handlers,
2948                                       ports); /* We don't get inbound tunnels */
2949   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
2950   {
2951     GNUNET_free (socket);
2952     return NULL;
2953   }
2954
2955   /* Now create the mesh tunnel to target */
2956   LOG (GNUNET_ERROR_TYPE_DEBUG,
2957        "Creating MESH Tunnel\n");
2958   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2959                                               NULL, /* Tunnel context */
2960                                               &mesh_peer_connect_callback,
2961                                               &mesh_peer_disconnect_callback,
2962                                               socket);
2963   GNUNET_assert (NULL != socket->tunnel);
2964   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2965                                         &socket->other_peer);
2966   
2967   LOG (GNUNET_ERROR_TYPE_DEBUG,
2968        "%s() END\n", __func__);
2969   return socket;
2970 }
2971
2972
2973 /**
2974  * Shutdown the stream for reading or writing (similar to man 2 shutdown).
2975  *
2976  * @param socket the stream socket
2977  * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
2978  * @param completion_cb the callback that will be called upon successful
2979  *          shutdown of given operation
2980  * @param completion_cls the closure for the completion callback
2981  * @return the shutdown handle
2982  */
2983 struct GNUNET_STREAM_ShutdownHandle *
2984 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2985                         int operation,
2986                         GNUNET_STREAM_ShutdownCompletion completion_cb,
2987                         void *completion_cls)
2988 {
2989   struct GNUNET_STREAM_ShutdownHandle *handle;
2990   struct GNUNET_STREAM_MessageHeader *msg;
2991   
2992   GNUNET_assert (NULL == socket->shutdown_handle);
2993
2994   handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
2995   handle->socket = socket;
2996   handle->completion_cb = completion_cb;
2997   handle->completion_cls = completion_cls;
2998   socket->shutdown_handle = handle;
2999
3000   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
3001   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
3002   switch (operation)
3003   {
3004   case SHUT_RD:
3005     handle->operation = SHUT_RD;
3006     if (NULL != socket->read_handle)
3007       LOG (GNUNET_ERROR_TYPE_WARNING,
3008            "Existing read handle should be cancelled before shutting"
3009            " down reading\n");
3010     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
3011     queue_message (socket,
3012                    msg,
3013                    &set_state_receive_close_wait,
3014                    NULL);
3015     break;
3016   case SHUT_WR:
3017     handle->operation = SHUT_WR;
3018     if (NULL != socket->write_handle)
3019       LOG (GNUNET_ERROR_TYPE_WARNING,
3020            "Existing write handle should be cancelled before shutting"
3021            " down writing\n");
3022     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
3023     queue_message (socket,
3024                    msg,
3025                    &set_state_transmit_close_wait,
3026                    NULL);
3027     break;
3028   case SHUT_RDWR:
3029     handle->operation = SHUT_RDWR;
3030     if (NULL != socket->write_handle)
3031       LOG (GNUNET_ERROR_TYPE_WARNING,
3032            "Existing write handle should be cancelled before shutting"
3033            " down writing\n");
3034     if (NULL != socket->read_handle)
3035       LOG (GNUNET_ERROR_TYPE_WARNING,
3036            "Existing read handle should be cancelled before shutting"
3037            " down reading\n");
3038     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
3039     queue_message (socket,
3040                    msg,
3041                    &set_state_close_wait,
3042                    NULL);
3043     break;
3044   default:
3045     LOG (GNUNET_ERROR_TYPE_WARNING,
3046          "GNUNET_STREAM_shutdown called with invalid value for "
3047          "parameter operation -- Ignoring\n");
3048     GNUNET_free (msg);
3049     GNUNET_free (handle);
3050     return NULL;
3051   }
3052   handle->close_msg_retransmission_task_id =
3053     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
3054                                   &close_msg_retransmission_task,
3055                                   handle);
3056   return handle;
3057 }
3058
3059
3060 /**
3061  * Cancels a pending shutdown
3062  *
3063  * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
3064  */
3065 void
3066 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
3067 {
3068   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
3069     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
3070   GNUNET_free (handle);
3071   return;
3072 }
3073
3074
3075 /**
3076  * Closes the stream
3077  *
3078  * @param socket the stream socket
3079  */
3080 void
3081 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
3082 {
3083   struct MessageQueue *head;
3084
3085   if (NULL != socket->read_handle)
3086   {
3087     LOG (GNUNET_ERROR_TYPE_WARNING,
3088          "Closing STREAM socket when a read handle is pending\n");
3089   }
3090   if (NULL != socket->write_handle)
3091   {
3092     LOG (GNUNET_ERROR_TYPE_WARNING,
3093          "Closing STREAM socket when a write handle is pending\n");
3094     GNUNET_STREAM_io_write_cancel (socket->write_handle);
3095     //socket->write_handle = NULL;
3096   }
3097
3098   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
3099   {
3100     /* socket closed with read task pending!? */
3101     GNUNET_break (0);
3102     GNUNET_SCHEDULER_cancel (socket->read_task_id);
3103     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
3104   }
3105   
3106   /* Terminate the ack'ing tasks if they are still present */
3107   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
3108   {
3109     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
3110     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
3111   }
3112
3113   /* Clear Transmit handles */
3114   if (NULL != socket->transmit_handle)
3115   {
3116     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
3117     socket->transmit_handle = NULL;
3118   }
3119   if (NULL != socket->ack_transmit_handle)
3120   {
3121     GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
3122     GNUNET_free (socket->ack_msg);
3123     socket->ack_msg = NULL;
3124     socket->ack_transmit_handle = NULL;
3125   }
3126
3127   /* Clear existing message queue */
3128   while (NULL != (head = socket->queue_head)) {
3129     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
3130                                  socket->queue_tail,
3131                                  head);
3132     GNUNET_free (head->message);
3133     GNUNET_free (head);
3134   }
3135
3136   /* Close associated tunnel */
3137   if (NULL != socket->tunnel)
3138   {
3139     GNUNET_MESH_tunnel_destroy (socket->tunnel);
3140     socket->tunnel = NULL;
3141   }
3142
3143   /* Close mesh connection */
3144   if (NULL != socket->mesh && NULL == socket->lsocket)
3145   {
3146     GNUNET_MESH_disconnect (socket->mesh);
3147     socket->mesh = NULL;
3148   }
3149   
3150   /* Release receive buffer */
3151   if (NULL != socket->receive_buffer)
3152   {
3153     GNUNET_free (socket->receive_buffer);
3154   }
3155
3156   GNUNET_free (socket);
3157 }
3158
3159
3160 /**
3161  * Listens for stream connections for a specific application ports
3162  *
3163  * @param cfg the configuration to use
3164  * @param app_port the application port for which new streams will be accepted
3165  * @param listen_cb this function will be called when a peer tries to establish
3166  *            a stream with us
3167  * @param listen_cb_cls closure for listen_cb
3168  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3169  * @return listen socket, NULL for any error
3170  */
3171 struct GNUNET_STREAM_ListenSocket *
3172 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
3173                       GNUNET_MESH_ApplicationType app_port,
3174                       GNUNET_STREAM_ListenCallback listen_cb,
3175                       void *listen_cb_cls,
3176                       ...)
3177 {
3178   /* FIXME: Add variable args for passing configration options? */
3179   struct GNUNET_STREAM_ListenSocket *lsocket;
3180   enum GNUNET_STREAM_Option option;
3181   va_list vargs;
3182
3183   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
3184   lsocket->cfg = GNUNET_CONFIGURATION_dup (cfg);
3185   lsocket->lockmanager = GNUNET_LOCKMANAGER_connect (lsocket->cfg);
3186   if (NULL == lsocket->lockmanager)
3187   {
3188     GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3189     GNUNET_free (lsocket);
3190     return NULL;
3191   }
3192   lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */  
3193   /* Set defaults */
3194   lsocket->retransmit_timeout = 
3195     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
3196   lsocket->testing_active = GNUNET_NO;
3197   va_start (vargs, listen_cb_cls);
3198   do {
3199     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3200     switch (option)
3201     {
3202     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3203       lsocket->retransmit_timeout = va_arg (vargs,
3204                                             struct GNUNET_TIME_Relative);
3205       break;
3206     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3207       lsocket->testing_active = GNUNET_YES;
3208       lsocket->testing_set_write_sequence_number_value = va_arg (vargs,
3209                                                                  uint32_t);
3210       break;
3211     case GNUNET_STREAM_OPTION_END:
3212       break;
3213     }
3214   } while (GNUNET_STREAM_OPTION_END != option);
3215   va_end (vargs);
3216   lsocket->port = app_port;
3217   lsocket->listen_cb = listen_cb;
3218   lsocket->listen_cb_cls = listen_cb_cls;
3219   lsocket->locking_request = 
3220     GNUNET_LOCKMANAGER_acquire_lock (lsocket->lockmanager, locking_domain,
3221                                      (uint32_t) lsocket->port,
3222                                      &lock_status_change_cb, lsocket);
3223   lsocket->lockmanager_acquire_timeout_task =
3224     GNUNET_SCHEDULER_add_delayed (TIME_REL_SECS (20),
3225                                   &lockmanager_acquire_timeout, lsocket);
3226   return lsocket;
3227 }
3228
3229
3230 /**
3231  * Closes the listen socket
3232  *
3233  * @param lsocket the listen socket
3234  */
3235 void
3236 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3237 {
3238   /* Close MESH connection */
3239   if (NULL != lsocket->mesh)
3240     GNUNET_MESH_disconnect (lsocket->mesh);
3241   GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3242   if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
3243     GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
3244   if (NULL != lsocket->locking_request)
3245     GNUNET_LOCKMANAGER_cancel_request (lsocket->locking_request);
3246   if (NULL != lsocket->lockmanager)
3247     GNUNET_LOCKMANAGER_disconnect (lsocket->lockmanager);
3248   GNUNET_free (lsocket);
3249 }
3250
3251
3252 /**
3253  * Tries to write the given data to the stream. The maximum size of data that
3254  * can be written as part of a write operation is (64 * (64000 - sizeof (struct
3255  * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3256  * violation, however only the said number of maximum bytes will be written.
3257  *
3258  * @param socket the socket representing a stream
3259  * @param data the data buffer from where the data is written into the stream
3260  * @param size the number of bytes to be written from the data buffer
3261  * @param timeout the timeout period
3262  * @param write_cont the function to call upon writing some bytes into the
3263  *          stream 
3264  * @param write_cont_cls the closure
3265  *
3266  * @return handle to cancel the operation; if a previous write is pending or
3267  *           the stream has been shutdown for this operation then write_cont is
3268  *           immediately called and NULL is returned.
3269  */
3270 struct GNUNET_STREAM_IOWriteHandle *
3271 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3272                      const void *data,
3273                      size_t size,
3274                      struct GNUNET_TIME_Relative timeout,
3275                      GNUNET_STREAM_CompletionContinuation write_cont,
3276                      void *write_cont_cls)
3277 {
3278   unsigned int num_needed_packets;
3279   unsigned int packet;
3280   struct GNUNET_STREAM_IOWriteHandle *io_handle;
3281   uint32_t packet_size;
3282   uint32_t payload_size;
3283   struct GNUNET_STREAM_DataMessage *data_msg;
3284   const void *sweep;
3285   struct GNUNET_TIME_Relative ack_deadline;
3286
3287   LOG (GNUNET_ERROR_TYPE_DEBUG,
3288        "%s\n", __func__);
3289
3290   /* Return NULL if there is already a write request pending */
3291   if (NULL != socket->write_handle)
3292   {
3293     GNUNET_break (0);
3294     return NULL;
3295   }
3296
3297   switch (socket->state)
3298   {
3299   case STATE_TRANSMIT_CLOSED:
3300   case STATE_TRANSMIT_CLOSE_WAIT:
3301   case STATE_CLOSED:
3302   case STATE_CLOSE_WAIT:
3303     if (NULL != write_cont)
3304       write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3305     LOG (GNUNET_ERROR_TYPE_DEBUG,
3306          "%s() END\n", __func__);
3307     return NULL;
3308   case STATE_INIT:
3309   case STATE_LISTEN:
3310   case STATE_HELLO_WAIT:
3311     if (NULL != write_cont)
3312       /* FIXME: GNUNET_STREAM_SYSERR?? */
3313       write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3314     LOG (GNUNET_ERROR_TYPE_DEBUG,
3315          "%s() END\n", __func__);
3316     return NULL;
3317   case STATE_ESTABLISHED:
3318   case STATE_RECEIVE_CLOSED:
3319   case STATE_RECEIVE_CLOSE_WAIT:
3320     break;
3321   }
3322
3323   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
3324     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
3325   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
3326   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
3327   io_handle->socket = socket;
3328   io_handle->write_cont = write_cont;
3329   io_handle->write_cont_cls = write_cont_cls;
3330   io_handle->size = size;
3331   sweep = data;
3332   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3333      determined from RTT */
3334   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3335   /* Divide the given buffer into packets for sending */
3336   for (packet=0; packet < num_needed_packets; packet++)
3337   {
3338     if ((packet + 1) * max_payload_size < size) 
3339     {
3340       payload_size = max_payload_size;
3341       packet_size = MAX_PACKET_SIZE;
3342     }
3343     else 
3344     {
3345       payload_size = size - packet * max_payload_size;
3346       packet_size =  payload_size + sizeof (struct
3347                                             GNUNET_STREAM_DataMessage); 
3348     }
3349     io_handle->messages[packet] = GNUNET_malloc (packet_size);
3350     io_handle->messages[packet]->header.header.size = htons (packet_size);
3351     io_handle->messages[packet]->header.header.type =
3352       htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3353     io_handle->messages[packet]->sequence_number =
3354       htonl (socket->write_sequence_number++);
3355     io_handle->messages[packet]->offset = htonl (socket->write_offset);
3356
3357     /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3358        determined from RTT */
3359     io_handle->messages[packet]->ack_deadline =
3360       GNUNET_TIME_relative_hton (ack_deadline);
3361     data_msg = io_handle->messages[packet];
3362     /* Copy data from given buffer to the packet */
3363     memcpy (&data_msg[1],
3364             sweep,
3365             payload_size);
3366     sweep += payload_size;
3367     socket->write_offset += payload_size;
3368   }
3369   socket->write_handle = io_handle;
3370   write_data (socket);
3371
3372   LOG (GNUNET_ERROR_TYPE_DEBUG,
3373        "%s() END\n", __func__);
3374
3375   return io_handle;
3376 }
3377
3378
3379
3380 /**
3381  * Tries to read data from the stream.
3382  *
3383  * @param socket the socket representing a stream
3384  * @param timeout the timeout period
3385  * @param proc function to call with data (once only)
3386  * @param proc_cls the closure for proc
3387  *
3388  * @return handle to cancel the operation; if the stream has been shutdown for
3389  *           this type of opeartion then the DataProcessor is immediately
3390  *           called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned
3391  */
3392 struct GNUNET_STREAM_IOReadHandle *
3393 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3394                     struct GNUNET_TIME_Relative timeout,
3395                     GNUNET_STREAM_DataProcessor proc,
3396                     void *proc_cls)
3397 {
3398   struct GNUNET_STREAM_IOReadHandle *read_handle;
3399   
3400   LOG (GNUNET_ERROR_TYPE_DEBUG,
3401        "%s: %s()\n", 
3402        GNUNET_i2s (&socket->other_peer),
3403        __func__);
3404
3405   /* Return NULL if there is already a read handle; the user has to cancel that
3406      first before continuing or has to wait until it is completed */
3407   if (NULL != socket->read_handle) return NULL;
3408
3409   GNUNET_assert (NULL != proc);
3410
3411   switch (socket->state)
3412   {
3413   case STATE_RECEIVE_CLOSED:
3414   case STATE_RECEIVE_CLOSE_WAIT:
3415   case STATE_CLOSED:
3416   case STATE_CLOSE_WAIT:
3417     proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3418     LOG (GNUNET_ERROR_TYPE_DEBUG,
3419          "%s: %s() END\n",
3420          GNUNET_i2s (&socket->other_peer),
3421          __func__);
3422     return NULL;
3423   default:
3424     break;
3425   }
3426
3427   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
3428   read_handle->proc = proc;
3429   read_handle->proc_cls = proc_cls;
3430   socket->read_handle = read_handle;
3431
3432   /* Check if we have a packet at bitmap 0 */
3433   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3434                                           0))
3435   {
3436     socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3437                                                      socket);
3438    
3439   }
3440   
3441   /* Setup the read timeout task */
3442   socket->read_io_timeout_task_id =
3443     GNUNET_SCHEDULER_add_delayed (timeout,
3444                                   &read_io_timeout,
3445                                   socket);
3446   LOG (GNUNET_ERROR_TYPE_DEBUG,
3447        "%s: %s() END\n",
3448        GNUNET_i2s (&socket->other_peer),
3449        __func__);
3450   return read_handle;
3451 }
3452
3453
3454 /**
3455  * Cancel pending write operation.
3456  *
3457  * @param ioh handle to operation to cancel
3458  */
3459 void
3460 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3461 {
3462   struct GNUNET_STREAM_Socket *socket = ioh->socket;
3463   unsigned int packet;
3464
3465   GNUNET_assert (NULL != socket->write_handle);
3466   GNUNET_assert (socket->write_handle == ioh);
3467
3468   if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
3469   {
3470     GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
3471     socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
3472   }
3473
3474   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3475   {
3476     if (NULL == ioh->messages[packet]) break;
3477     GNUNET_free (ioh->messages[packet]);
3478   }
3479       
3480   GNUNET_free (socket->write_handle);
3481   socket->write_handle = NULL;
3482   return;
3483 }
3484
3485
3486 /**
3487  * Cancel pending read operation.
3488  *
3489  * @param ioh handle to operation to cancel
3490  */
3491 void
3492 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
3493 {
3494   return;
3495 }