commenting out dead test
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Add code for write io timeout
27  *
28  * Include retransmission for control messages
29  **/
30
31 /**
32  * @file stream/stream_api.c
33  * @brief Implementation of the stream library
34  * @author Sree Harsha Totakura
35  */
36
37
38 #include "platform.h"
39 #include "gnunet_common.h"
40 #include "gnunet_crypto_lib.h"
41 #include "gnunet_lockmanager_service.h"
42 #include "gnunet_stream_lib.h"
43 #include "stream_protocol.h"
44
45 /**
46  * Generic logging shorthand
47  */
48 #define LOG(kind,...)                                   \
49   GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
50
51 /**
52  * Debug logging shorthand
53  */
54 #define LOG_DEBUG(...)                          \
55   LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
56
57 /**
58  * Time in relative seconds shorthand
59  */
60 #define TIME_REL_SECS(sec) \
61   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
62
63 /**
64  * The maximum packet size of a stream packet
65  */
66 #define MAX_PACKET_SIZE 512//64000
67
68 /**
69  * Receive buffer
70  */
71 #define RECEIVE_BUFFER_SIZE 4096000
72
73 /**
74  * The maximum payload a data message packet can carry
75  */
76 static const size_t max_payload_size = 
77   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
78
79 /**
80  * states in the Protocol
81  */
82 enum State
83   {
84     /**
85      * Client initialization state
86      */
87     STATE_INIT,
88
89     /**
90      * Listener initialization state 
91      */
92     STATE_LISTEN,
93
94     /**
95      * Pre-connection establishment state
96      */
97     STATE_HELLO_WAIT,
98
99     /**
100      * State where a connection has been established
101      */
102     STATE_ESTABLISHED,
103
104     /**
105      * State where the socket is closed on our side and waiting to be ACK'ed
106      */
107     STATE_RECEIVE_CLOSE_WAIT,
108
109     /**
110      * State where the socket is closed for reading
111      */
112     STATE_RECEIVE_CLOSED,
113
114     /**
115      * State where the socket is closed on our side and waiting to be ACK'ed
116      */
117     STATE_TRANSMIT_CLOSE_WAIT,
118
119     /**
120      * State where the socket is closed for writing
121      */
122     STATE_TRANSMIT_CLOSED,
123
124     /**
125      * State where the socket is closed on our side and waiting to be ACK'ed
126      */
127     STATE_CLOSE_WAIT,
128
129     /**
130      * State where the socket is closed
131      */
132     STATE_CLOSED 
133   };
134
135
136 /**
137  * Functions of this type are called when a message is written
138  *
139  * @param cls the closure from queue_message
140  * @param socket the socket the written message was bound to
141  */
142 typedef void (*SendFinishCallback) (void *cls,
143                                     struct GNUNET_STREAM_Socket *socket);
144
145
146 /**
147  * The send message queue
148  */
149 struct MessageQueue
150 {
151   /**
152    * The message
153    */
154   struct GNUNET_STREAM_MessageHeader *message;
155
156   /**
157    * Callback to be called when the message is sent
158    */
159   SendFinishCallback finish_cb;
160
161   /**
162    * The closure for finish_cb
163    */
164   void *finish_cb_cls;
165
166   /**
167    * The next message in queue. Should be NULL in the last message
168    */
169   struct MessageQueue *next;
170
171   /**
172    * The next message in queue. Should be NULL in the first message
173    */
174   struct MessageQueue *prev;
175 };
176
177
178 /**
179  * The STREAM Socket Handler
180  */
181 struct GNUNET_STREAM_Socket
182 {
183   /**
184    * Retransmission timeout
185    */
186   struct GNUNET_TIME_Relative retransmit_timeout;
187
188   /**
189    * The Acknowledgement Bitmap
190    */
191   GNUNET_STREAM_AckBitmap ack_bitmap;
192
193   /**
194    * Time when the Acknowledgement was queued
195    */
196   struct GNUNET_TIME_Absolute ack_time_registered;
197
198   /**
199    * Queued Acknowledgement deadline
200    */
201   struct GNUNET_TIME_Relative ack_time_deadline;
202
203   /**
204    * The mesh handle
205    */
206   struct GNUNET_MESH_Handle *mesh;
207
208   /**
209    * The mesh tunnel handle
210    */
211   struct GNUNET_MESH_Tunnel *tunnel;
212
213   /**
214    * Stream open closure
215    */
216   void *open_cls;
217
218   /**
219    * Stream open callback
220    */
221   GNUNET_STREAM_OpenCallback open_cb;
222
223   /**
224    * The current transmit handle (if a pending transmit request exists)
225    */
226   struct GNUNET_MESH_TransmitHandle *transmit_handle;
227
228   /**
229    * The current message associated with the transmit handle
230    */
231   struct MessageQueue *queue_head;
232
233   /**
234    * The queue tail, should always point to the last message in queue
235    */
236   struct MessageQueue *queue_tail;
237
238   /**
239    * The write IO_handle associated with this socket
240    */
241   struct GNUNET_STREAM_IOWriteHandle *write_handle;
242
243   /**
244    * The read IO_handle associated with this socket
245    */
246   struct GNUNET_STREAM_IOReadHandle *read_handle;
247
248   /**
249    * The shutdown handle associated with this socket
250    */
251   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
252
253   /**
254    * Buffer for storing received messages
255    */
256   void *receive_buffer;
257
258   /**
259    * The listen socket from which this socket is derived. Should be NULL if it
260    * is not a derived socket
261    */
262   struct GNUNET_STREAM_ListenSocket *lsocket;
263
264   /**
265    * The peer identity of the peer at the other end of the stream
266    */
267   struct GNUNET_PeerIdentity other_peer;
268
269   /**
270    * Task identifier for the read io timeout task
271    */
272   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
273
274   /**
275    * Task identifier for retransmission task after timeout
276    */
277   GNUNET_SCHEDULER_TaskIdentifier data_retransmission_task_id;
278
279   /**
280    * Task identifier for retransmission of control messages
281    */
282   GNUNET_SCHEDULER_TaskIdentifier control_retransmission_task_id;
283
284   /**
285    * The task for sending timely Acks
286    */
287   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
288
289   /**
290    * Task scheduled to continue a read operation.
291    */
292   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
293
294   /**
295    * The state of the protocol associated with this socket
296    */
297   enum State state;
298
299   /**
300    * The status of the socket
301    */
302   enum GNUNET_STREAM_Status status;
303
304   /**
305    * The number of previous timeouts; FIXME: currently not used
306    */
307   unsigned int retries;
308
309   /**
310    * The application port number (type: uint32_t)
311    */
312   GNUNET_MESH_ApplicationType app_port;
313
314   /**
315    * Whether testing mode is active or not
316    */
317   int testing_active;
318
319   /**
320    * The write sequence number to be set incase of testing
321    */
322   uint32_t testing_set_write_sequence_number_value;
323
324   /**
325    * The session id associated with this stream connection
326    * FIXME: Not used currently, may be removed
327    */
328   uint32_t session_id;
329
330   /**
331    * Write sequence number. Set to random when sending HELLO(client) and
332    * HELLO_ACK(server) 
333    */
334   uint32_t write_sequence_number;
335
336   /**
337    * Read sequence number. This number's value is determined during handshake
338    */
339   uint32_t read_sequence_number;
340
341   /**
342    * The receiver buffer size
343    */
344   uint32_t receive_buffer_size;
345
346   /**
347    * The receiver buffer boundaries
348    */
349   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
350
351   /**
352    * receiver's available buffer after the last acknowledged packet
353    */
354   uint32_t receiver_window_available;
355
356   /**
357    * The offset pointer used during write operation
358    */
359   uint32_t write_offset;
360
361   /**
362    * The offset after which we are expecting data
363    */
364   uint32_t read_offset;
365
366   /**
367    * The offset upto which user has read from the received buffer
368    */
369   uint32_t copy_offset;
370 };
371
372
373 /**
374  * A socket for listening
375  */
376 struct GNUNET_STREAM_ListenSocket
377 {
378   /**
379    * The mesh handle
380    */
381   struct GNUNET_MESH_Handle *mesh;
382
383   /**
384    * Our configuration
385    */
386   struct GNUNET_CONFIGURATION_Handle *cfg;
387
388   /**
389    * Handle to the lock manager service
390    */
391   struct GNUNET_LOCKMANAGER_Handle *lockmanager;
392
393   /**
394    * The active LockingRequest from lockmanager
395    */
396   struct GNUNET_LOCKMANAGER_LockingRequest *locking_request;
397
398   /**
399    * Callback to call after acquring a lock and listening
400    */
401   GNUNET_STREAM_ListenSuccessCallback listen_ok_cb;
402
403   /**
404    * The callback function which is called after successful opening socket
405    */
406   GNUNET_STREAM_ListenCallback listen_cb;
407
408   /**
409    * The call back closure
410    */
411   void *listen_cb_cls;
412
413   /**
414    * The service port
415    */
416   GNUNET_MESH_ApplicationType port;
417   
418   /**
419    * The id of the lockmanager timeout task
420    */
421   GNUNET_SCHEDULER_TaskIdentifier lockmanager_acquire_timeout_task;
422
423   /**
424    * The retransmit timeout
425    */
426   struct GNUNET_TIME_Relative retransmit_timeout;
427   
428   /**
429    * Listen enabled?
430    */
431   int listening;
432
433   /**
434    * Whether testing mode is active or not
435    */
436   int testing_active;
437
438   /**
439    * The write sequence number to be set incase of testing
440    */
441   uint32_t testing_set_write_sequence_number_value;
442 };
443
444
445 /**
446  * The IO Write Handle
447  */
448 struct GNUNET_STREAM_IOWriteHandle
449 {
450   /**
451    * The socket to which this write handle is associated
452    */
453   struct GNUNET_STREAM_Socket *socket;
454
455   /**
456    * The packet_buffers associated with this Handle
457    */
458   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
459
460   /**
461    * The write continuation callback
462    */
463   GNUNET_STREAM_CompletionContinuation write_cont;
464
465   /**
466    * Write continuation closure
467    */
468   void *write_cont_cls;
469
470   /**
471    * The bitmap of this IOHandle; Corresponding bit for a message is set when
472    * it has been acknowledged by the receiver
473    */
474   GNUNET_STREAM_AckBitmap ack_bitmap;
475
476   /**
477    * Number of bytes in this write handle
478    */
479   size_t size;
480 };
481
482
483 /**
484  * The IO Read Handle
485  */
486 struct GNUNET_STREAM_IOReadHandle
487 {
488   /**
489    * The socket to which this read handle is associated
490    */
491   struct GNUNET_STREAM_Socket *socket;
492   
493   /**
494    * Callback for the read processor
495    */
496   GNUNET_STREAM_DataProcessor proc;
497
498   /**
499    * The closure pointer for the read processor callback
500    */
501   void *proc_cls;
502 };
503
504
505 /**
506  * Handle for Shutdown
507  */
508 struct GNUNET_STREAM_ShutdownHandle
509 {
510   /**
511    * The socket associated with this shutdown handle
512    */
513   struct GNUNET_STREAM_Socket *socket;
514
515   /**
516    * Shutdown completion callback
517    */
518   GNUNET_STREAM_ShutdownCompletion completion_cb;
519
520   /**
521    * Closure for completion callback
522    */
523   void *completion_cls;
524
525   /**
526    * Close message retransmission task id
527    */
528   GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
529
530   /**
531    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
532    */
533   int operation;  
534 };
535
536
537 /**
538  * Default value in seconds for various timeouts
539  */
540 static const unsigned int default_timeout = 10;
541
542 /**
543  * The domain name for locks we use here
544  */
545 static const char *locking_domain = "GNUNET_STREAM_APPLOCK";
546
547
548 /**
549  * Callback function for sending queued message
550  *
551  * @param cls closure the socket
552  * @param size number of bytes available in buf
553  * @param buf where the callee should write the message
554  * @return number of bytes written to buf
555  */
556 static size_t
557 send_message_notify (void *cls, size_t size, void *buf)
558 {
559   struct GNUNET_STREAM_Socket *socket = cls;
560   struct MessageQueue *head;
561   size_t ret;
562
563   socket->transmit_handle = NULL; /* Remove the transmit handle */
564   head = socket->queue_head;
565   if (NULL == head)
566     return 0; /* just to be safe */
567   if (0 == size)                /* request timed out */
568   {
569     socket->retries++;
570     LOG (GNUNET_ERROR_TYPE_DEBUG,
571          "%s: Message sending timed out. Retry %d \n",
572          GNUNET_i2s (&socket->other_peer),
573          socket->retries);
574     socket->transmit_handle = 
575       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
576                                          GNUNET_NO, /* Corking */
577                                          /* FIXME: exponential backoff */
578                                          socket->retransmit_timeout,
579                                          &socket->other_peer,
580                                          ntohs (head->message->header.size),
581                                          &send_message_notify,
582                                          socket);
583     return 0;
584   }
585   ret = ntohs (head->message->header.size);
586   GNUNET_assert (size >= ret);
587   memcpy (buf, head->message, ret);
588   if (NULL != head->finish_cb)
589   {
590     head->finish_cb (head->finish_cb_cls, socket);
591   }
592   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
593                                socket->queue_tail,
594                                head);
595   GNUNET_free (head->message);
596   GNUNET_free (head);
597   head = socket->queue_head;
598   if (NULL != head)    /* more pending messages to send */
599   {
600     socket->retries = 0;
601     socket->transmit_handle = 
602       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
603                                          GNUNET_NO, /* Corking */
604                                          /* FIXME: exponential backoff */
605                                          socket->retransmit_timeout,
606                                          &socket->other_peer,
607                                          ntohs (head->message->header.size),
608                                          &send_message_notify,
609                                          socket);
610   }
611   return ret;
612 }
613
614
615 /**
616  * Queues a message for sending using the mesh connection of a socket
617  *
618  * @param socket the socket whose mesh connection is used
619  * @param message the message to be sent
620  * @param finish_cb the callback to be called when the message is sent
621  * @param finish_cb_cls the closure for the callback
622  * @param urgent set to GNUNET_YES to add the message to the beginning of the
623  *          queue; GNUNET_NO to add at the tail
624  */
625 static void
626 queue_message (struct GNUNET_STREAM_Socket *socket,
627                struct GNUNET_STREAM_MessageHeader *message,
628                SendFinishCallback finish_cb,
629                void *finish_cb_cls,
630                int urgent)
631 {
632   struct MessageQueue *queue_entity;
633
634   GNUNET_assert 
635     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
636      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
637   LOG (GNUNET_ERROR_TYPE_DEBUG,
638        "%s: Queueing message of type %d and size %d\n",
639        GNUNET_i2s (&socket->other_peer),
640        ntohs (message->header.type),
641        ntohs (message->header.size));
642   GNUNET_assert (NULL != message);
643   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
644   queue_entity->message = message;
645   queue_entity->finish_cb = finish_cb;
646   queue_entity->finish_cb_cls = finish_cb_cls;
647   if (GNUNET_YES == urgent)
648   {
649     GNUNET_CONTAINER_DLL_insert (socket->queue_head, socket->queue_tail,
650                                  queue_entity);
651     if (NULL != socket->transmit_handle)
652     {
653       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
654       socket->transmit_handle = NULL;
655     }
656   }
657   else
658     GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
659                                       socket->queue_tail,
660                                       queue_entity);
661   if (NULL == socket->transmit_handle)
662   {
663     socket->retries = 0;
664     socket->transmit_handle = 
665       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
666                                          GNUNET_NO, /* Corking */
667                                          socket->retransmit_timeout,
668                                          &socket->other_peer,
669                                          ntohs (message->header.size),
670                                          &send_message_notify,
671                                          socket);
672   }
673 }
674
675
676 /**
677  * Copies a message and queues it for sending using the mesh connection of
678  * given socket 
679  *
680  * @param socket the socket whose mesh connection is used
681  * @param message the message to be sent
682  * @param finish_cb the callback to be called when the message is sent
683  * @param finish_cb_cls the closure for the callback
684  */
685 static void
686 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
687                         const struct GNUNET_STREAM_MessageHeader *message,
688                         SendFinishCallback finish_cb,
689                         void *finish_cb_cls)
690 {
691   struct GNUNET_STREAM_MessageHeader *msg_copy;
692   uint16_t size;
693   
694   size = ntohs (message->header.size);
695   msg_copy = GNUNET_malloc (size);
696   memcpy (msg_copy, message, size);
697   queue_message (socket, msg_copy, finish_cb, finish_cb_cls, GNUNET_NO);
698 }
699
700
701 /**
702  * Writes data using the given socket. The amount of data written is limited by
703  * the receiver_window_size
704  *
705  * @param socket the socket to use
706  */
707 static void 
708 write_data (struct GNUNET_STREAM_Socket *socket);
709
710
711 /**
712  * Task for retransmitting data messages if they aren't ACK before their ack
713  * deadline 
714  *
715  * @param cls the socket
716  * @param tc the Task context
717  */
718 static void
719 data_retransmission_task (void *cls,
720                           const struct GNUNET_SCHEDULER_TaskContext *tc)
721 {
722   struct GNUNET_STREAM_Socket *socket = cls;
723   
724   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
725     return;
726   LOG (GNUNET_ERROR_TYPE_DEBUG,
727        "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
728   socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
729   write_data (socket);
730 }
731
732
733 /**
734  * Task for sending ACK message
735  *
736  * @param cls the socket
737  * @param tc the Task context
738  */
739 static void
740 ack_task (void *cls,
741           const struct GNUNET_SCHEDULER_TaskContext *tc)
742 {
743   struct GNUNET_STREAM_Socket *socket = cls;
744   struct GNUNET_STREAM_AckMessage *ack_msg;
745
746   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
747   {
748     return;
749   }
750   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
751   /* Create the ACK Message */
752   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
753   ack_msg->header.header.size = htons (sizeof (struct 
754                                                GNUNET_STREAM_AckMessage));
755   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
756   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
757   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
758   ack_msg->receive_window_remaining = 
759     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
760   /* Queue up ACK for immediate sending */
761   queue_message (socket, &ack_msg->header, NULL, NULL, GNUNET_YES);
762 }
763
764
765 /**
766  * Retransmission task for shutdown messages
767  *
768  * @param cls the shutdown handle
769  * @param tc the Task Context
770  */
771 static void
772 close_msg_retransmission_task (void *cls,
773                                const struct GNUNET_SCHEDULER_TaskContext *tc)
774 {
775   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
776   struct GNUNET_STREAM_MessageHeader *msg;
777   struct GNUNET_STREAM_Socket *socket;
778
779   GNUNET_assert (NULL != shutdown_handle);
780   socket = shutdown_handle->socket;
781
782   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
783   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
784   switch (shutdown_handle->operation)
785   {
786   case SHUT_RDWR:
787     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
788     break;
789   case SHUT_RD:
790     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
791     break;
792   case SHUT_WR:
793     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
794     break;
795   default:
796     GNUNET_free (msg);
797     shutdown_handle->close_msg_retransmission_task_id = 
798       GNUNET_SCHEDULER_NO_TASK;
799     return;
800   }
801   queue_message (socket, msg, NULL, NULL, GNUNET_NO);
802   shutdown_handle->close_msg_retransmission_task_id =
803     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
804                                   &close_msg_retransmission_task,
805                                   shutdown_handle);
806 }
807
808
809 /**
810  * Function to modify a bit in GNUNET_STREAM_AckBitmap
811  *
812  * @param bitmap the bitmap to modify
813  * @param bit the bit number to modify
814  * @param value GNUNET_YES to on, GNUNET_NO to off
815  */
816 static void
817 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
818                       unsigned int bit, 
819                       int value)
820 {
821   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
822   if (GNUNET_YES == value)
823     *bitmap |= (1LL << bit);
824   else
825     *bitmap &= ~(1LL << bit);
826 }
827
828
829 /**
830  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
831  *
832  * @param bitmap address of the bitmap that has to be checked
833  * @param bit the bit number to check
834  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
835  */
836 static uint8_t
837 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
838                       unsigned int bit)
839 {
840   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
841   return 0 != (*bitmap & (1LL << bit));
842 }
843
844
845 /**
846  * Writes data using the given socket. The amount of data written is limited by
847  * the receiver_window_size
848  *
849  * @param socket the socket to use
850  */
851 static void 
852 write_data (struct GNUNET_STREAM_Socket *socket)
853 {
854   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
855   int packet;                   /* Although an int, should never be negative */
856   int ack_packet;
857
858   ack_packet = -1;
859   /* Find the last acknowledged packet */
860   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
861   {      
862     if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
863                                             packet))
864       ack_packet = packet;        
865     else if (NULL == io_handle->messages[packet])
866       break;
867   }
868   /* Resend packets which weren't ack'ed */
869   for (packet=0; packet < ack_packet; packet++)
870   {
871     if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
872                                            packet))
873     {
874       LOG (GNUNET_ERROR_TYPE_DEBUG,
875            "%s: Placing DATA message with sequence %u in send queue\n",
876            GNUNET_i2s (&socket->other_peer),
877            ntohl (io_handle->messages[packet]->sequence_number));
878       copy_and_queue_message (socket,
879                               &io_handle->messages[packet]->header,
880                               NULL,
881                               NULL);
882     }
883   }
884   packet = ack_packet + 1;
885   /* Now send new packets if there is enough buffer space */
886   while ( (NULL != io_handle->messages[packet]) &&
887           (socket->receiver_window_available 
888            >= ntohs (io_handle->messages[packet]->header.header.size)) &&
889           (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
890   {
891     socket->receiver_window_available -= 
892       ntohs (io_handle->messages[packet]->header.header.size);
893     LOG (GNUNET_ERROR_TYPE_DEBUG,
894          "%s: Placing DATA message with sequence %u in send queue\n",
895          GNUNET_i2s (&socket->other_peer),
896          ntohl (io_handle->messages[packet]->sequence_number));
897     copy_and_queue_message (socket,
898                             &io_handle->messages[packet]->header,
899                             NULL,
900                             NULL);
901     packet++;
902   }
903   if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id)
904     socket->data_retransmission_task_id = 
905       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
906                                     (GNUNET_TIME_UNIT_SECONDS, 8),
907                                     &data_retransmission_task,
908                                     socket);
909 }
910
911
912 /**
913  * Task for calling the read processor
914  *
915  * @param cls the socket
916  * @param tc the task context
917  */
918 static void
919 call_read_processor (void *cls,
920                      const struct GNUNET_SCHEDULER_TaskContext *tc)
921 {
922   struct GNUNET_STREAM_Socket *socket = cls;
923   size_t read_size;
924   size_t valid_read_size;
925   unsigned int packet;
926   uint32_t sequence_increase;
927   uint32_t offset_increase;
928
929   socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
930   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
931     return;
932
933   if (NULL == socket->receive_buffer) 
934     return;
935
936   GNUNET_assert (NULL != socket->read_handle);
937   GNUNET_assert (NULL != socket->read_handle->proc);
938
939   /* Check the bitmap for any holes */
940   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
941   {
942     if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
943                                            packet))
944       break;
945   }
946   /* We only call read processor if we have the first packet */
947   GNUNET_assert (0 < packet);
948   valid_read_size = 
949     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
950   GNUNET_assert (0 != valid_read_size);
951   /* Cancel the read_io_timeout_task */
952   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
953   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
954   /* Call the data processor */
955   LOG (GNUNET_ERROR_TYPE_DEBUG,
956        "%s: Calling read processor\n",
957        GNUNET_i2s (&socket->other_peer));
958   read_size = 
959     socket->read_handle->proc (socket->read_handle->proc_cls,
960                                socket->status,
961                                socket->receive_buffer + socket->copy_offset,
962                                valid_read_size);
963   LOG (GNUNET_ERROR_TYPE_DEBUG,
964        "%s: Read processor read %d bytes\n",
965        GNUNET_i2s (&socket->other_peer), read_size);
966   LOG (GNUNET_ERROR_TYPE_DEBUG,
967        "%s: Read processor completed successfully\n",
968        GNUNET_i2s (&socket->other_peer));
969   /* Free the read handle */
970   GNUNET_free (socket->read_handle);
971   socket->read_handle = NULL;
972   GNUNET_assert (read_size <= valid_read_size);
973   socket->copy_offset += read_size;
974   /* Determine upto which packet we can remove from the buffer */
975   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
976   {
977     if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
978     { packet++; break; }
979     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
980       break;
981   }
982
983   /* If no packets can be removed we can't move the buffer */
984   if (0 == packet) return;
985   sequence_increase = packet;
986   LOG (GNUNET_ERROR_TYPE_DEBUG,
987        "%s: Sequence increase after read processor completion: %u\n",
988        GNUNET_i2s (&socket->other_peer), sequence_increase);
989
990   /* Shift the data in the receive buffer */
991   socket->receive_buffer = 
992     memmove (socket->receive_buffer,
993              socket->receive_buffer 
994              + socket->receive_buffer_boundaries[sequence_increase-1],
995              socket->receive_buffer_size
996              - socket->receive_buffer_boundaries[sequence_increase-1]);
997   /* Shift the bitmap */
998   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
999   /* Set read_sequence_number */
1000   socket->read_sequence_number += sequence_increase;
1001   /* Set read_offset */
1002   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
1003   socket->read_offset += offset_increase;
1004   /* Fix copy_offset */
1005   GNUNET_assert (offset_increase <= socket->copy_offset);
1006   socket->copy_offset -= offset_increase;
1007   /* Fix relative boundaries */
1008   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1009   {
1010     if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
1011     {
1012       uint32_t ahead_buffer_boundary;
1013
1014       ahead_buffer_boundary = 
1015         socket->receive_buffer_boundaries[packet + sequence_increase];
1016       if (0 == ahead_buffer_boundary)
1017         socket->receive_buffer_boundaries[packet] = 0;
1018       else
1019       {
1020         GNUNET_assert (offset_increase < ahead_buffer_boundary);
1021         socket->receive_buffer_boundaries[packet] = 
1022           ahead_buffer_boundary - offset_increase;
1023       }
1024     }
1025     else
1026       socket->receive_buffer_boundaries[packet] = 0;
1027   }
1028 }
1029
1030
1031 /**
1032  * Cancels the existing read io handle
1033  *
1034  * @param cls the closure from the SCHEDULER call
1035  * @param tc the task context
1036  */
1037 static void
1038 read_io_timeout (void *cls,
1039                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1040 {
1041   struct GNUNET_STREAM_Socket *socket = cls;
1042   GNUNET_STREAM_DataProcessor proc;
1043   void *proc_cls;
1044
1045   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1046   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1047   {
1048     LOG (GNUNET_ERROR_TYPE_DEBUG,
1049          "%s: Read task timedout - Cancelling it\n",
1050          GNUNET_i2s (&socket->other_peer));
1051     GNUNET_SCHEDULER_cancel (socket->read_task_id);
1052     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1053   }
1054   GNUNET_assert (NULL != socket->read_handle);
1055   proc = socket->read_handle->proc;
1056   proc_cls = socket->read_handle->proc_cls;
1057   GNUNET_free (socket->read_handle);
1058   socket->read_handle = NULL;
1059   /* Call the read processor to signal timeout */
1060   proc (proc_cls,
1061         GNUNET_STREAM_TIMEOUT,
1062         NULL,
1063         0);
1064 }
1065
1066
1067 /**
1068  * Handler for DATA messages; Same for both client and server
1069  *
1070  * @param socket the socket through which the ack was received
1071  * @param tunnel connection to the other end
1072  * @param sender who sent the message
1073  * @param msg the data message
1074  * @param atsi performance data for the connection
1075  * @return GNUNET_OK to keep the connection open,
1076  *         GNUNET_SYSERR to close it (signal serious error)
1077  */
1078 static int
1079 handle_data (struct GNUNET_STREAM_Socket *socket,
1080              struct GNUNET_MESH_Tunnel *tunnel,
1081              const struct GNUNET_PeerIdentity *sender,
1082              const struct GNUNET_STREAM_DataMessage *msg,
1083              const struct GNUNET_ATS_Information*atsi)
1084 {
1085   const void *payload;
1086   uint32_t bytes_needed;
1087   uint32_t relative_offset;
1088   uint32_t relative_sequence_number;
1089   uint16_t size;
1090
1091   size = htons (msg->header.header.size);
1092   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1093   {
1094     GNUNET_break_op (0);
1095     return GNUNET_SYSERR;
1096   }
1097
1098   if (0 != memcmp (sender,
1099                    &socket->other_peer,
1100                    sizeof (struct GNUNET_PeerIdentity)))
1101   {
1102     LOG (GNUNET_ERROR_TYPE_DEBUG,
1103          "%s: Received DATA from non-confirming peer\n",
1104          GNUNET_i2s (&socket->other_peer));
1105     return GNUNET_YES;
1106   }
1107
1108   switch (socket->state)
1109   {
1110   case STATE_ESTABLISHED:
1111   case STATE_TRANSMIT_CLOSED:
1112   case STATE_TRANSMIT_CLOSE_WAIT:
1113       
1114     /* check if the message's sequence number is in the range we are
1115        expecting */
1116     relative_sequence_number = 
1117       ntohl (msg->sequence_number) - socket->read_sequence_number;
1118     if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1119     {
1120       LOG (GNUNET_ERROR_TYPE_DEBUG,
1121            "%s: Ignoring received message with sequence number %u\n",
1122            GNUNET_i2s (&socket->other_peer),
1123            ntohl (msg->sequence_number));
1124       /* Start ACK sending task if one is not already present */
1125       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1126       {
1127         socket->ack_task_id = 
1128           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1129                                         (msg->ack_deadline),
1130                                         &ack_task,
1131                                         socket);
1132       }
1133       return GNUNET_YES;
1134     }
1135       
1136     /* Check if we have already seen this message */
1137     if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1138                                             relative_sequence_number))
1139     {
1140       LOG (GNUNET_ERROR_TYPE_DEBUG,
1141            "%s: Ignoring already received message with sequence number %u\n",
1142            GNUNET_i2s (&socket->other_peer),
1143            ntohl (msg->sequence_number));
1144       /* Start ACK sending task if one is not already present */
1145       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1146       {
1147         socket->ack_task_id = 
1148           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1149                                         (msg->ack_deadline),
1150                                         &ack_task,
1151                                         socket);
1152       }
1153       return GNUNET_YES;
1154     }
1155
1156     LOG (GNUNET_ERROR_TYPE_DEBUG,
1157          "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
1158          GNUNET_i2s (&socket->other_peer),
1159          ntohl (msg->sequence_number),
1160          ntohs (msg->header.header.size),
1161          GNUNET_i2s (&socket->other_peer));
1162       
1163     /* Check if we have to allocate the buffer */
1164     size -= sizeof (struct GNUNET_STREAM_DataMessage);
1165     relative_offset = ntohl (msg->offset) - socket->read_offset;
1166     bytes_needed = relative_offset + size;
1167     if (bytes_needed > socket->receive_buffer_size)
1168     {
1169       if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1170       {
1171         socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1172                                                  bytes_needed);
1173         socket->receive_buffer_size = bytes_needed;
1174       }
1175       else
1176       {
1177         LOG (GNUNET_ERROR_TYPE_DEBUG,
1178              "%s: Cannot accommodate packet %d as buffer is full\n",
1179              GNUNET_i2s (&socket->other_peer),
1180              ntohl (msg->sequence_number));
1181         return GNUNET_YES;
1182       }
1183     }
1184       
1185     /* Copy Data to buffer */
1186     payload = &msg[1];
1187     GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1188     memcpy (socket->receive_buffer + relative_offset,
1189             payload,
1190             size);
1191     socket->receive_buffer_boundaries[relative_sequence_number] = 
1192       relative_offset + size;
1193       
1194     /* Modify the ACK bitmap */
1195     ackbitmap_modify_bit (&socket->ack_bitmap,
1196                           relative_sequence_number,
1197                           GNUNET_YES);
1198
1199     /* Start ACK sending task if one is not already present */
1200     if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1201     {
1202       socket->ack_task_id = 
1203         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1204                                       (msg->ack_deadline),
1205                                       &ack_task,
1206                                       socket);
1207     }
1208
1209     if ((NULL != socket->read_handle) /* A read handle is waiting */
1210         /* There is no current read task */
1211         && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1212         /* We have the first packet */
1213         && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1214                                                0)))
1215     {
1216       LOG (GNUNET_ERROR_TYPE_DEBUG,
1217            "%s: Scheduling read processor\n",
1218            GNUNET_i2s (&socket->other_peer));
1219           
1220       socket->read_task_id = 
1221         GNUNET_SCHEDULER_add_now (&call_read_processor,
1222                                   socket);
1223     }
1224       
1225     break;
1226
1227   default:
1228     LOG (GNUNET_ERROR_TYPE_DEBUG,
1229          "%s: Received data message when it cannot be handled\n",
1230          GNUNET_i2s (&socket->other_peer));
1231     break;
1232   }
1233   return GNUNET_YES;
1234 }
1235
1236
1237 /**
1238  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1239  *
1240  * @param cls the socket (set from GNUNET_MESH_connect)
1241  * @param tunnel connection to the other end
1242  * @param tunnel_ctx place to store local state associated with the tunnel
1243  * @param sender who sent the message
1244  * @param message the actual message
1245  * @param atsi performance data for the connection
1246  * @return GNUNET_OK to keep the connection open,
1247  *         GNUNET_SYSERR to close it (signal serious error)
1248  */
1249 static int
1250 client_handle_data (void *cls,
1251                     struct GNUNET_MESH_Tunnel *tunnel,
1252                     void **tunnel_ctx,
1253                     const struct GNUNET_PeerIdentity *sender,
1254                     const struct GNUNET_MessageHeader *message,
1255                     const struct GNUNET_ATS_Information*atsi)
1256 {
1257   struct GNUNET_STREAM_Socket *socket = cls;
1258
1259   return handle_data (socket, 
1260                       tunnel, 
1261                       sender, 
1262                       (const struct GNUNET_STREAM_DataMessage *) message, 
1263                       atsi);
1264 }
1265
1266
1267 /**
1268  * Callback to set state to ESTABLISHED
1269  *
1270  * @param cls the closure NULL;
1271  * @param socket the socket to requiring state change
1272  */
1273 static void
1274 set_state_established (void *cls,
1275                        struct GNUNET_STREAM_Socket *socket)
1276 {
1277   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1278        "%s: Attaining ESTABLISHED state\n",
1279        GNUNET_i2s (&socket->other_peer));
1280   socket->write_offset = 0;
1281   socket->read_offset = 0;
1282   socket->state = STATE_ESTABLISHED;
1283   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK !=
1284                  socket->control_retransmission_task_id);
1285   GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
1286   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1287   if (NULL != socket->lsocket)
1288   {
1289     LOG (GNUNET_ERROR_TYPE_DEBUG,
1290          "%s: Calling listen callback\n",
1291          GNUNET_i2s (&socket->other_peer));
1292     if (GNUNET_SYSERR == 
1293         socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1294                                     socket,
1295                                     &socket->other_peer))
1296     {
1297       socket->state = STATE_CLOSED;
1298       /* FIXME: We should close in a decent way (send RST) */
1299       GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1300       GNUNET_free (socket);
1301     }
1302   }
1303   else
1304     socket->open_cb (socket->open_cls, socket);
1305 }
1306
1307
1308 /**
1309  * Callback to set state to HELLO_WAIT
1310  *
1311  * @param cls the closure from queue_message
1312  * @param socket the socket to requiring state change
1313  */
1314 static void
1315 set_state_hello_wait (void *cls,
1316                       struct GNUNET_STREAM_Socket *socket)
1317 {
1318   GNUNET_assert (STATE_INIT == socket->state);
1319   LOG (GNUNET_ERROR_TYPE_DEBUG,
1320        "%s: Attaining HELLO_WAIT state\n",
1321        GNUNET_i2s (&socket->other_peer));
1322   socket->state = STATE_HELLO_WAIT;
1323 }
1324
1325
1326 /**
1327  * Callback to set state to CLOSE_WAIT
1328  *
1329  * @param cls the closure from queue_message
1330  * @param socket the socket requiring state change
1331  */
1332 static void
1333 set_state_close_wait (void *cls,
1334                       struct GNUNET_STREAM_Socket *socket)
1335 {
1336   LOG (GNUNET_ERROR_TYPE_DEBUG,
1337        "%s: Attaing CLOSE_WAIT state\n",
1338        GNUNET_i2s (&socket->other_peer));
1339   socket->state = STATE_CLOSE_WAIT;
1340   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1341   socket->receive_buffer = NULL;
1342   socket->receive_buffer_size = 0;
1343 }
1344
1345
1346 /**
1347  * Callback to set state to RECEIVE_CLOSE_WAIT
1348  *
1349  * @param cls the closure from queue_message
1350  * @param socket the socket requiring state change
1351  */
1352 static void
1353 set_state_receive_close_wait (void *cls,
1354                               struct GNUNET_STREAM_Socket *socket)
1355 {
1356   LOG (GNUNET_ERROR_TYPE_DEBUG,
1357        "%s: Attaing RECEIVE_CLOSE_WAIT state\n",
1358        GNUNET_i2s (&socket->other_peer));
1359   socket->state = STATE_RECEIVE_CLOSE_WAIT;
1360   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1361   socket->receive_buffer = NULL;
1362   socket->receive_buffer_size = 0;
1363 }
1364
1365
1366 /**
1367  * Callback to set state to TRANSMIT_CLOSE_WAIT
1368  *
1369  * @param cls the closure from queue_message
1370  * @param socket the socket requiring state change
1371  */
1372 static void
1373 set_state_transmit_close_wait (void *cls,
1374                                struct GNUNET_STREAM_Socket *socket)
1375 {
1376   LOG (GNUNET_ERROR_TYPE_DEBUG,
1377        "%s: Attaing TRANSMIT_CLOSE_WAIT state\n",
1378        GNUNET_i2s (&socket->other_peer));
1379   socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1380 }
1381
1382
1383 /**
1384  * Callback to set state to CLOSED
1385  *
1386  * @param cls the closure from queue_message
1387  * @param socket the socket requiring state change
1388  */
1389 static void
1390 set_state_closed (void *cls,
1391                   struct GNUNET_STREAM_Socket *socket)
1392 {
1393   socket->state = STATE_CLOSED;
1394 }
1395
1396
1397 /**
1398  * Returns GNUNET_MESSAGE_TYPE_STREAM_HELLO
1399  *
1400  * @return the generate hello message
1401  */
1402 static struct GNUNET_STREAM_MessageHeader *
1403 generate_hello (void)
1404 {
1405   struct GNUNET_STREAM_MessageHeader *msg;
1406
1407   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1408   msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1409   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1410   return msg;
1411 }
1412
1413
1414 /**
1415  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1416  * socket
1417  *
1418  * @param socket the socket for which this HelloAckMessage has to be generated
1419  * @param generate_seq GNUNET_YES to generate the write sequence number,
1420  *          GNUNET_NO to use the existing sequence number
1421  * @return the HelloAckMessage
1422  */
1423 static struct GNUNET_STREAM_HelloAckMessage *
1424 generate_hello_ack (struct GNUNET_STREAM_Socket *socket,
1425                     int generate_seq)
1426 {
1427   struct GNUNET_STREAM_HelloAckMessage *msg;
1428
1429   if (GNUNET_YES == generate_seq)
1430   {
1431     if (GNUNET_YES == socket->testing_active)
1432       socket->write_sequence_number =
1433         socket->testing_set_write_sequence_number_value;
1434     else
1435       socket->write_sequence_number = 
1436         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1437     LOG_DEBUG ("%s: write sequence number %u\n",
1438                GNUNET_i2s (&socket->other_peer),
1439                (unsigned int) socket->write_sequence_number);
1440   }
1441   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1442   msg->header.header.size = 
1443     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1444   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1445   msg->sequence_number = htonl (socket->write_sequence_number);
1446   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1447   return msg;
1448 }
1449
1450
1451 /**
1452  * Task for retransmitting control messages if they aren't ACK'ed before a
1453  * deadline
1454  *
1455  * @param cls the socket
1456  * @param tc the Task context
1457  */
1458 static void
1459 control_retransmission_task (void *cls,
1460                              const struct GNUNET_SCHEDULER_TaskContext *tc)
1461 {
1462   struct GNUNET_STREAM_Socket *socket = cls;
1463     
1464   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
1465     return;
1466   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1467   LOG_DEBUG ("%s: Retransmitting a control message\n",
1468                  GNUNET_i2s (&socket->other_peer));
1469   switch (socket->state)
1470   {
1471   case STATE_INIT:    
1472     GNUNET_break (0);
1473     break;
1474   case STATE_LISTEN:
1475     GNUNET_break (0);
1476     break;
1477   case STATE_HELLO_WAIT:
1478     if (NULL == socket->lsocket) /* We are client */
1479       queue_message (socket, generate_hello (), NULL, NULL, GNUNET_NO);
1480     else
1481       queue_message (socket,
1482                      (struct GNUNET_STREAM_MessageHeader *)
1483                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1484                      GNUNET_NO);
1485     socket->control_retransmission_task_id =
1486     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
1487                                   &control_retransmission_task, socket);
1488     break;
1489   case STATE_ESTABLISHED:
1490     if (NULL == socket->lsocket)
1491       queue_message (socket,
1492                      (struct GNUNET_STREAM_MessageHeader *)
1493                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1494                      GNUNET_NO);
1495     else
1496       GNUNET_break (0);
1497   default:
1498     GNUNET_break (0);
1499   }  
1500 }
1501
1502
1503 /**
1504  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1505  *
1506  * @param cls the socket (set from GNUNET_MESH_connect)
1507  * @param tunnel connection to the other end
1508  * @param tunnel_ctx this is NULL
1509  * @param sender who sent the message
1510  * @param message the actual message
1511  * @param atsi performance data for the connection
1512  * @return GNUNET_OK to keep the connection open,
1513  *         GNUNET_SYSERR to close it (signal serious error)
1514  */
1515 static int
1516 client_handle_hello_ack (void *cls,
1517                          struct GNUNET_MESH_Tunnel *tunnel,
1518                          void **tunnel_ctx,
1519                          const struct GNUNET_PeerIdentity *sender,
1520                          const struct GNUNET_MessageHeader *message,
1521                          const struct GNUNET_ATS_Information*atsi)
1522 {
1523   struct GNUNET_STREAM_Socket *socket = cls;
1524   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1525   struct GNUNET_STREAM_HelloAckMessage *reply;
1526
1527   if (0 != memcmp (sender,
1528                    &socket->other_peer,
1529                    sizeof (struct GNUNET_PeerIdentity)))
1530   {
1531     LOG (GNUNET_ERROR_TYPE_DEBUG,
1532          "%s: Received HELLO_ACK from non-confirming peer\n",
1533          GNUNET_i2s (&socket->other_peer));
1534     return GNUNET_YES;
1535   }
1536   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1537   LOG (GNUNET_ERROR_TYPE_DEBUG,
1538        "%s: Received HELLO_ACK from %s\n",
1539        GNUNET_i2s (&socket->other_peer),
1540        GNUNET_i2s (&socket->other_peer));
1541
1542   GNUNET_assert (socket->tunnel == tunnel);
1543   switch (socket->state)
1544   {
1545   case STATE_HELLO_WAIT:
1546     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1547     LOG (GNUNET_ERROR_TYPE_DEBUG,
1548          "%s: Read sequence number %u\n",
1549          GNUNET_i2s (&socket->other_peer),
1550          (unsigned int) socket->read_sequence_number);
1551     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1552     reply = generate_hello_ack (socket, GNUNET_YES);
1553     queue_message (socket, &reply->header, &set_state_established, 
1554                    NULL, GNUNET_NO);    
1555     return GNUNET_OK;
1556   case STATE_ESTABLISHED:
1557     // call statistics (# ACKs ignored++)
1558     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
1559                    socket->control_retransmission_task_id);
1560     socket->control_retransmission_task_id =
1561       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
1562     return GNUNET_OK;
1563   default:
1564     LOG_DEBUG ("%s: Server %s sent HELLO_ACK when in state %d\n", 
1565                GNUNET_i2s (&socket->other_peer),
1566                GNUNET_i2s (&socket->other_peer),
1567                socket->state);
1568     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1569     return GNUNET_SYSERR;
1570   }
1571 }
1572
1573
1574 /**
1575  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1576  *
1577  * @param cls the socket (set from GNUNET_MESH_connect)
1578  * @param tunnel connection to the other end
1579  * @param tunnel_ctx this is NULL
1580  * @param sender who sent the message
1581  * @param message the actual message
1582  * @param atsi performance data for the connection
1583  * @return GNUNET_OK to keep the connection open,
1584  *         GNUNET_SYSERR to close it (signal serious error)
1585  */
1586 static int
1587 client_handle_reset (void *cls,
1588                      struct GNUNET_MESH_Tunnel *tunnel,
1589                      void **tunnel_ctx,
1590                      const struct GNUNET_PeerIdentity *sender,
1591                      const struct GNUNET_MessageHeader *message,
1592                      const struct GNUNET_ATS_Information*atsi)
1593 {
1594   // struct GNUNET_STREAM_Socket *socket = cls;
1595
1596   return GNUNET_OK;
1597 }
1598
1599
1600 /**
1601  * Common message handler for handling TRANSMIT_CLOSE messages
1602  *
1603  * @param socket the socket through which the ack was received
1604  * @param tunnel connection to the other end
1605  * @param sender who sent the message
1606  * @param msg the transmit close message
1607  * @param atsi performance data for the connection
1608  * @return GNUNET_OK to keep the connection open,
1609  *         GNUNET_SYSERR to close it (signal serious error)
1610  */
1611 static int
1612 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1613                        struct GNUNET_MESH_Tunnel *tunnel,
1614                        const struct GNUNET_PeerIdentity *sender,
1615                        const struct GNUNET_STREAM_MessageHeader *msg,
1616                        const struct GNUNET_ATS_Information*atsi)
1617 {
1618   struct GNUNET_STREAM_MessageHeader *reply;
1619
1620   switch (socket->state)
1621   {
1622   case STATE_ESTABLISHED:
1623     socket->state = STATE_RECEIVE_CLOSED;
1624
1625     /* Send TRANSMIT_CLOSE_ACK */
1626     reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1627     reply->header.type = 
1628       htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1629     reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1630     queue_message (socket, reply, NULL, NULL, GNUNET_NO);
1631     break;
1632
1633   default:
1634     /* FIXME: Call statistics? */
1635     break;
1636   }
1637   return GNUNET_YES;
1638 }
1639
1640
1641 /**
1642  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1643  *
1644  * @param cls the socket (set from GNUNET_MESH_connect)
1645  * @param tunnel connection to the other end
1646  * @param tunnel_ctx this is NULL
1647  * @param sender who sent the message
1648  * @param message the actual message
1649  * @param atsi performance data for the connection
1650  * @return GNUNET_OK to keep the connection open,
1651  *         GNUNET_SYSERR to close it (signal serious error)
1652  */
1653 static int
1654 client_handle_transmit_close (void *cls,
1655                               struct GNUNET_MESH_Tunnel *tunnel,
1656                               void **tunnel_ctx,
1657                               const struct GNUNET_PeerIdentity *sender,
1658                               const struct GNUNET_MessageHeader *message,
1659                               const struct GNUNET_ATS_Information*atsi)
1660 {
1661   struct GNUNET_STREAM_Socket *socket = cls;
1662   
1663   return handle_transmit_close (socket,
1664                                 tunnel,
1665                                 sender,
1666                                 (struct GNUNET_STREAM_MessageHeader *)message,
1667                                 atsi);
1668 }
1669
1670
1671 /**
1672  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1673  *
1674  * @param socket the socket
1675  * @param tunnel connection to the other end
1676  * @param sender who sent the message
1677  * @param message the actual message
1678  * @param atsi performance data for the connection
1679  * @param operation the close operation which is being ACK'ed
1680  * @return GNUNET_OK to keep the connection open,
1681  *         GNUNET_SYSERR to close it (signal serious error)
1682  */
1683 static int
1684 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1685                           struct GNUNET_MESH_Tunnel *tunnel,
1686                           const struct GNUNET_PeerIdentity *sender,
1687                           const struct GNUNET_STREAM_MessageHeader *message,
1688                           const struct GNUNET_ATS_Information *atsi,
1689                           int operation)
1690 {
1691   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1692
1693   shutdown_handle = socket->shutdown_handle;
1694   if (NULL == shutdown_handle)
1695   {
1696     LOG (GNUNET_ERROR_TYPE_DEBUG,
1697          "%s: Received CLOSE_ACK when shutdown handle is NULL\n",
1698          GNUNET_i2s (&socket->other_peer));
1699     return GNUNET_OK;
1700   }
1701
1702   switch (operation)
1703   {
1704   case SHUT_RDWR:
1705     switch (socket->state)
1706     {
1707     case STATE_CLOSE_WAIT:
1708       if (SHUT_RDWR != shutdown_handle->operation)
1709       {
1710         LOG (GNUNET_ERROR_TYPE_DEBUG,
1711              "%s: Received CLOSE_ACK when shutdown handle is not for "
1712              "SHUT_RDWR\n",
1713              GNUNET_i2s (&socket->other_peer));
1714         return GNUNET_OK;
1715       }
1716
1717       LOG (GNUNET_ERROR_TYPE_DEBUG,
1718            "%s: Received CLOSE_ACK from %s\n",
1719            GNUNET_i2s (&socket->other_peer),
1720            GNUNET_i2s (&socket->other_peer));
1721       socket->state = STATE_CLOSED;
1722       break;
1723     default:
1724       LOG (GNUNET_ERROR_TYPE_DEBUG,
1725            "%s: Received CLOSE_ACK when in it not expected\n",
1726            GNUNET_i2s (&socket->other_peer));
1727       return GNUNET_OK;
1728     }
1729     break;
1730
1731   case SHUT_RD:
1732     switch (socket->state)
1733     {
1734     case STATE_RECEIVE_CLOSE_WAIT:
1735       if (SHUT_RD != shutdown_handle->operation)
1736       {
1737         LOG (GNUNET_ERROR_TYPE_DEBUG,
1738              "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
1739              "is not for SHUT_RD\n",
1740              GNUNET_i2s (&socket->other_peer));
1741         return GNUNET_OK;
1742       }
1743
1744       LOG (GNUNET_ERROR_TYPE_DEBUG,
1745            "%s: Received RECEIVE_CLOSE_ACK from %s\n",
1746            GNUNET_i2s (&socket->other_peer),
1747            GNUNET_i2s (&socket->other_peer));
1748       socket->state = STATE_RECEIVE_CLOSED;
1749       break;
1750     default:
1751       LOG (GNUNET_ERROR_TYPE_DEBUG,
1752            "%s: Received RECEIVE_CLOSE_ACK when in it not expected\n",
1753            GNUNET_i2s (&socket->other_peer));
1754       return GNUNET_OK;
1755     }
1756
1757     break;
1758   case SHUT_WR:
1759     switch (socket->state)
1760     {
1761     case STATE_TRANSMIT_CLOSE_WAIT:
1762       if (SHUT_WR != shutdown_handle->operation)
1763       {
1764         LOG (GNUNET_ERROR_TYPE_DEBUG,
1765              "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle "
1766              "is not for SHUT_WR\n",
1767              GNUNET_i2s (&socket->other_peer));
1768         return GNUNET_OK;
1769       }
1770
1771       LOG (GNUNET_ERROR_TYPE_DEBUG,
1772            "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
1773            GNUNET_i2s (&socket->other_peer),
1774            GNUNET_i2s (&socket->other_peer));
1775       socket->state = STATE_TRANSMIT_CLOSED;
1776       break;
1777     default:
1778       LOG (GNUNET_ERROR_TYPE_DEBUG,
1779            "%s: Received TRANSMIT_CLOSE_ACK when in it not expected\n",
1780            GNUNET_i2s (&socket->other_peer));
1781           
1782       return GNUNET_OK;
1783     }
1784     break;
1785   default:
1786     GNUNET_assert (0);
1787   }
1788
1789   if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
1790     shutdown_handle->completion_cb(shutdown_handle->completion_cls,
1791                                    operation);
1792   if (GNUNET_SCHEDULER_NO_TASK
1793       != shutdown_handle->close_msg_retransmission_task_id)
1794   {
1795     GNUNET_SCHEDULER_cancel
1796       (shutdown_handle->close_msg_retransmission_task_id);
1797     shutdown_handle->close_msg_retransmission_task_id =
1798       GNUNET_SCHEDULER_NO_TASK;
1799   }
1800   GNUNET_free (shutdown_handle); /* Free shutdown handle */
1801   socket->shutdown_handle = NULL;
1802   return GNUNET_OK;
1803 }
1804
1805
1806 /**
1807  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1808  *
1809  * @param cls the socket (set from GNUNET_MESH_connect)
1810  * @param tunnel connection to the other end
1811  * @param tunnel_ctx this is NULL
1812  * @param sender who sent the message
1813  * @param message the actual message
1814  * @param atsi performance data for the connection
1815  * @return GNUNET_OK to keep the connection open,
1816  *         GNUNET_SYSERR to close it (signal serious error)
1817  */
1818 static int
1819 client_handle_transmit_close_ack (void *cls,
1820                                   struct GNUNET_MESH_Tunnel *tunnel,
1821                                   void **tunnel_ctx,
1822                                   const struct GNUNET_PeerIdentity *sender,
1823                                   const struct GNUNET_MessageHeader *message,
1824                                   const struct GNUNET_ATS_Information*atsi)
1825 {
1826   struct GNUNET_STREAM_Socket *socket = cls;
1827
1828   return handle_generic_close_ack (socket,
1829                                    tunnel,
1830                                    sender,
1831                                    (const struct GNUNET_STREAM_MessageHeader *)
1832                                    message,
1833                                    atsi,
1834                                    SHUT_WR);
1835 }
1836
1837
1838 /**
1839  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1840  *
1841  * @param socket the socket
1842  * @param tunnel connection to the other end
1843  * @param sender who sent the message
1844  * @param message the actual message
1845  * @param atsi performance data for the connection
1846  * @return GNUNET_OK to keep the connection open,
1847  *         GNUNET_SYSERR to close it (signal serious error)
1848  */
1849 static int
1850 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1851                       struct GNUNET_MESH_Tunnel *tunnel,
1852                       const struct GNUNET_PeerIdentity *sender,
1853                       const struct GNUNET_STREAM_MessageHeader *message,
1854                       const struct GNUNET_ATS_Information *atsi)
1855 {
1856   struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1857
1858   switch (socket->state)
1859   {
1860   case STATE_INIT:
1861   case STATE_LISTEN:
1862   case STATE_HELLO_WAIT:
1863     LOG (GNUNET_ERROR_TYPE_DEBUG,
1864          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1865          GNUNET_i2s (&socket->other_peer));
1866     return GNUNET_OK;
1867   default:
1868     break;
1869   }
1870   
1871   LOG (GNUNET_ERROR_TYPE_DEBUG,
1872        "%s: Received RECEIVE_CLOSE from %s\n",
1873        GNUNET_i2s (&socket->other_peer),
1874        GNUNET_i2s (&socket->other_peer));
1875   receive_close_ack =
1876     GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1877   receive_close_ack->header.size =
1878     htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1879   receive_close_ack->header.type =
1880     htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1881   queue_message (socket, receive_close_ack, &set_state_closed,
1882                  NULL, GNUNET_NO);  
1883   /* FIXME: Handle the case where write handle is present; the write operation
1884      should be deemed as finised and the write continuation callback
1885      has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
1886   return GNUNET_OK;
1887 }
1888
1889
1890 /**
1891  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1892  *
1893  * @param cls the socket (set from GNUNET_MESH_connect)
1894  * @param tunnel connection to the other end
1895  * @param tunnel_ctx this is NULL
1896  * @param sender who sent the message
1897  * @param message the actual message
1898  * @param atsi performance data for the connection
1899  * @return GNUNET_OK to keep the connection open,
1900  *         GNUNET_SYSERR to close it (signal serious error)
1901  */
1902 static int
1903 client_handle_receive_close (void *cls,
1904                              struct GNUNET_MESH_Tunnel *tunnel,
1905                              void **tunnel_ctx,
1906                              const struct GNUNET_PeerIdentity *sender,
1907                              const struct GNUNET_MessageHeader *message,
1908                              const struct GNUNET_ATS_Information*atsi)
1909 {
1910   struct GNUNET_STREAM_Socket *socket = cls;
1911
1912   return
1913     handle_receive_close (socket,
1914                           tunnel,
1915                           sender,
1916                           (const struct GNUNET_STREAM_MessageHeader *) message,
1917                           atsi);
1918 }
1919
1920
1921 /**
1922  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1923  *
1924  * @param cls the socket (set from GNUNET_MESH_connect)
1925  * @param tunnel connection to the other end
1926  * @param tunnel_ctx this is NULL
1927  * @param sender who sent the message
1928  * @param message the actual message
1929  * @param atsi performance data for the connection
1930  * @return GNUNET_OK to keep the connection open,
1931  *         GNUNET_SYSERR to close it (signal serious error)
1932  */
1933 static int
1934 client_handle_receive_close_ack (void *cls,
1935                                  struct GNUNET_MESH_Tunnel *tunnel,
1936                                  void **tunnel_ctx,
1937                                  const struct GNUNET_PeerIdentity *sender,
1938                                  const struct GNUNET_MessageHeader *message,
1939                                  const struct GNUNET_ATS_Information*atsi)
1940 {
1941   struct GNUNET_STREAM_Socket *socket = cls;
1942
1943   return handle_generic_close_ack (socket,
1944                                    tunnel,
1945                                    sender,
1946                                    (const struct GNUNET_STREAM_MessageHeader *)
1947                                    message,
1948                                    atsi,
1949                                    SHUT_RD);
1950 }
1951
1952
1953 /**
1954  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1955  *
1956  * @param socket the socket
1957  * @param tunnel connection to the other end
1958  * @param sender who sent the message
1959  * @param message the actual message
1960  * @param atsi performance data for the connection
1961  * @return GNUNET_OK to keep the connection open,
1962  *         GNUNET_SYSERR to close it (signal serious error)
1963  */
1964 static int
1965 handle_close (struct GNUNET_STREAM_Socket *socket,
1966               struct GNUNET_MESH_Tunnel *tunnel,
1967               const struct GNUNET_PeerIdentity *sender,
1968               const struct GNUNET_STREAM_MessageHeader *message,
1969               const struct GNUNET_ATS_Information*atsi)
1970 {
1971   struct GNUNET_STREAM_MessageHeader *close_ack;
1972
1973   switch (socket->state)
1974   {
1975   case STATE_INIT:
1976   case STATE_LISTEN:
1977   case STATE_HELLO_WAIT:
1978     LOG (GNUNET_ERROR_TYPE_DEBUG,
1979          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1980          GNUNET_i2s (&socket->other_peer));
1981     return GNUNET_OK;
1982   default:
1983     break;
1984   }
1985
1986   LOG (GNUNET_ERROR_TYPE_DEBUG,
1987        "%s: Received CLOSE from %s\n",
1988        GNUNET_i2s (&socket->other_peer),
1989        GNUNET_i2s (&socket->other_peer));
1990   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1991   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1992   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
1993   queue_message (socket, close_ack, &set_state_closed, NULL, GNUNET_NO);
1994   if (socket->state == STATE_CLOSED)
1995     return GNUNET_OK;
1996
1997   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1998   socket->receive_buffer = NULL;
1999   socket->receive_buffer_size = 0;
2000   return GNUNET_OK;
2001 }
2002
2003
2004 /**
2005  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2006  *
2007  * @param cls the socket (set from GNUNET_MESH_connect)
2008  * @param tunnel connection to the other end
2009  * @param tunnel_ctx this is NULL
2010  * @param sender who sent the message
2011  * @param message the actual message
2012  * @param atsi performance data for the connection
2013  * @return GNUNET_OK to keep the connection open,
2014  *         GNUNET_SYSERR to close it (signal serious error)
2015  */
2016 static int
2017 client_handle_close (void *cls,
2018                      struct GNUNET_MESH_Tunnel *tunnel,
2019                      void **tunnel_ctx,
2020                      const struct GNUNET_PeerIdentity *sender,
2021                      const struct GNUNET_MessageHeader *message,
2022                      const struct GNUNET_ATS_Information*atsi)
2023 {
2024   struct GNUNET_STREAM_Socket *socket = cls;
2025
2026   return handle_close (socket,
2027                        tunnel,
2028                        sender,
2029                        (const struct GNUNET_STREAM_MessageHeader *) message,
2030                        atsi);
2031 }
2032
2033
2034 /**
2035  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2036  *
2037  * @param cls the socket (set from GNUNET_MESH_connect)
2038  * @param tunnel connection to the other end
2039  * @param tunnel_ctx this is NULL
2040  * @param sender who sent the message
2041  * @param message the actual message
2042  * @param atsi performance data for the connection
2043  * @return GNUNET_OK to keep the connection open,
2044  *         GNUNET_SYSERR to close it (signal serious error)
2045  */
2046 static int
2047 client_handle_close_ack (void *cls,
2048                          struct GNUNET_MESH_Tunnel *tunnel,
2049                          void **tunnel_ctx,
2050                          const struct GNUNET_PeerIdentity *sender,
2051                          const struct GNUNET_MessageHeader *message,
2052                          const struct GNUNET_ATS_Information *atsi)
2053 {
2054   struct GNUNET_STREAM_Socket *socket = cls;
2055
2056   return handle_generic_close_ack (socket,
2057                                    tunnel,
2058                                    sender,
2059                                    (const struct GNUNET_STREAM_MessageHeader *) 
2060                                    message,
2061                                    atsi,
2062                                    SHUT_RDWR);
2063 }
2064
2065 /*****************************/
2066 /* Server's Message Handlers */
2067 /*****************************/
2068
2069 /**
2070  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
2071  *
2072  * @param cls the closure
2073  * @param tunnel connection to the other end
2074  * @param tunnel_ctx the socket
2075  * @param sender who sent the message
2076  * @param message the actual message
2077  * @param atsi performance data for the connection
2078  * @return GNUNET_OK to keep the connection open,
2079  *         GNUNET_SYSERR to close it (signal serious error)
2080  */
2081 static int
2082 server_handle_data (void *cls,
2083                     struct GNUNET_MESH_Tunnel *tunnel,
2084                     void **tunnel_ctx,
2085                     const struct GNUNET_PeerIdentity *sender,
2086                     const struct GNUNET_MessageHeader *message,
2087                     const struct GNUNET_ATS_Information*atsi)
2088 {
2089   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2090
2091   return handle_data (socket,
2092                       tunnel,
2093                       sender,
2094                       (const struct GNUNET_STREAM_DataMessage *)message,
2095                       atsi);
2096 }
2097
2098
2099 /**
2100  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
2101  *
2102  * @param cls the closure
2103  * @param tunnel connection to the other end
2104  * @param tunnel_ctx the socket
2105  * @param sender who sent the message
2106  * @param message the actual message
2107  * @param atsi performance data for the connection
2108  * @return GNUNET_OK to keep the connection open,
2109  *         GNUNET_SYSERR to close it (signal serious error)
2110  */
2111 static int
2112 server_handle_hello (void *cls,
2113                      struct GNUNET_MESH_Tunnel *tunnel,
2114                      void **tunnel_ctx,
2115                      const struct GNUNET_PeerIdentity *sender,
2116                      const struct GNUNET_MessageHeader *message,
2117                      const struct GNUNET_ATS_Information*atsi)
2118 {
2119   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2120   struct GNUNET_STREAM_HelloAckMessage *reply;
2121
2122   if (0 != memcmp (sender,
2123                    &socket->other_peer,
2124                    sizeof (struct GNUNET_PeerIdentity)))
2125   {
2126     LOG_DEBUG ("%s: Received HELLO from non-confirming peer\n",
2127                GNUNET_i2s (&socket->other_peer));
2128     return GNUNET_YES;
2129   }
2130   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == ntohs (message->type));
2131   GNUNET_assert (socket->tunnel == tunnel);
2132   LOG_DEBUG ("%s: Received HELLO from %s\n", GNUNET_i2s (&socket->other_peer),
2133              GNUNET_i2s (&socket->other_peer));
2134   switch (socket->status)
2135   {
2136   case STATE_INIT:
2137     reply = generate_hello_ack (socket, GNUNET_YES);
2138     queue_message (socket, &reply->header, &set_state_hello_wait, NULL,
2139                    GNUNET_NO);
2140     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2141                    socket->control_retransmission_task_id);
2142     socket->control_retransmission_task_id =
2143       GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2144                                     &control_retransmission_task, socket);
2145     break;
2146   case STATE_HELLO_WAIT:
2147     /* Perhaps our HELLO_ACK was lost */
2148     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != 
2149                    socket->control_retransmission_task_id);
2150     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2151     socket->control_retransmission_task_id =
2152       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
2153     break;
2154   default:
2155     LOG_DEBUG( "%s: Client sent HELLO when in state %d\n",
2156                GNUNET_i2s (&socket->other_peer), socket->state);
2157     /* FIXME: Send RESET? */
2158   }
2159   return GNUNET_OK;
2160 }
2161
2162
2163 /**
2164  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2165  *
2166  * @param cls the closure
2167  * @param tunnel connection to the other end
2168  * @param tunnel_ctx the socket
2169  * @param sender who sent the message
2170  * @param message the actual message
2171  * @param atsi performance data for the connection
2172  * @return GNUNET_OK to keep the connection open,
2173  *         GNUNET_SYSERR to close it (signal serious error)
2174  */
2175 static int
2176 server_handle_hello_ack (void *cls,
2177                          struct GNUNET_MESH_Tunnel *tunnel,
2178                          void **tunnel_ctx,
2179                          const struct GNUNET_PeerIdentity *sender,
2180                          const struct GNUNET_MessageHeader *message,
2181                          const struct GNUNET_ATS_Information*atsi)
2182 {
2183   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2184   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2185
2186   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2187                  ntohs (message->type));
2188   GNUNET_assert (socket->tunnel == tunnel);
2189   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2190   switch (socket->state)  
2191   {
2192   case STATE_HELLO_WAIT:
2193     LOG (GNUNET_ERROR_TYPE_DEBUG,
2194          "%s: Received HELLO_ACK from %s\n",
2195          GNUNET_i2s (&socket->other_peer),
2196          GNUNET_i2s (&socket->other_peer));
2197     socket->read_sequence_number = ntohl (ack_message->sequence_number);
2198     LOG (GNUNET_ERROR_TYPE_DEBUG,
2199          "%s: Read sequence number %u\n",
2200          GNUNET_i2s (&socket->other_peer),
2201          (unsigned int) socket->read_sequence_number);
2202     socket->receiver_window_available = 
2203       ntohl (ack_message->receiver_window_size);
2204     set_state_established (NULL, socket);
2205     break;
2206   default:
2207     LOG (GNUNET_ERROR_TYPE_DEBUG,
2208          "Client sent HELLO_ACK when in state %d\n", socket->state);    
2209   }
2210   return GNUNET_OK;
2211 }
2212
2213
2214 /**
2215  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2216  *
2217  * @param cls the closure
2218  * @param tunnel connection to the other end
2219  * @param tunnel_ctx the socket
2220  * @param sender who sent the message
2221  * @param message the actual message
2222  * @param atsi performance data for the connection
2223  * @return GNUNET_OK to keep the connection open,
2224  *         GNUNET_SYSERR to close it (signal serious error)
2225  */
2226 static int
2227 server_handle_reset (void *cls,
2228                      struct GNUNET_MESH_Tunnel *tunnel,
2229                      void **tunnel_ctx,
2230                      const struct GNUNET_PeerIdentity *sender,
2231                      const struct GNUNET_MessageHeader *message,
2232                      const struct GNUNET_ATS_Information*atsi)
2233 {
2234   // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2235
2236   return GNUNET_OK;
2237 }
2238
2239
2240 /**
2241  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2242  *
2243  * @param cls the closure
2244  * @param tunnel connection to the other end
2245  * @param tunnel_ctx the socket
2246  * @param sender who sent the message
2247  * @param message the actual message
2248  * @param atsi performance data for the connection
2249  * @return GNUNET_OK to keep the connection open,
2250  *         GNUNET_SYSERR to close it (signal serious error)
2251  */
2252 static int
2253 server_handle_transmit_close (void *cls,
2254                               struct GNUNET_MESH_Tunnel *tunnel,
2255                               void **tunnel_ctx,
2256                               const struct GNUNET_PeerIdentity *sender,
2257                               const struct GNUNET_MessageHeader *message,
2258                               const struct GNUNET_ATS_Information*atsi)
2259 {
2260   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2261
2262   return handle_transmit_close (socket,
2263                                 tunnel,
2264                                 sender,
2265                                 (struct GNUNET_STREAM_MessageHeader *)message,
2266                                 atsi);
2267 }
2268
2269
2270 /**
2271  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2272  *
2273  * @param cls the closure
2274  * @param tunnel connection to the other end
2275  * @param tunnel_ctx the socket
2276  * @param sender who sent the message
2277  * @param message the actual message
2278  * @param atsi performance data for the connection
2279  * @return GNUNET_OK to keep the connection open,
2280  *         GNUNET_SYSERR to close it (signal serious error)
2281  */
2282 static int
2283 server_handle_transmit_close_ack (void *cls,
2284                                   struct GNUNET_MESH_Tunnel *tunnel,
2285                                   void **tunnel_ctx,
2286                                   const struct GNUNET_PeerIdentity *sender,
2287                                   const struct GNUNET_MessageHeader *message,
2288                                   const struct GNUNET_ATS_Information*atsi)
2289 {
2290   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2291
2292   return handle_generic_close_ack (socket,
2293                                    tunnel,
2294                                    sender,
2295                                    (const struct GNUNET_STREAM_MessageHeader *)
2296                                    message,
2297                                    atsi,
2298                                    SHUT_WR);
2299 }
2300
2301
2302 /**
2303  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2304  *
2305  * @param cls the closure
2306  * @param tunnel connection to the other end
2307  * @param tunnel_ctx the socket
2308  * @param sender who sent the message
2309  * @param message the actual message
2310  * @param atsi performance data for the connection
2311  * @return GNUNET_OK to keep the connection open,
2312  *         GNUNET_SYSERR to close it (signal serious error)
2313  */
2314 static int
2315 server_handle_receive_close (void *cls,
2316                              struct GNUNET_MESH_Tunnel *tunnel,
2317                              void **tunnel_ctx,
2318                              const struct GNUNET_PeerIdentity *sender,
2319                              const struct GNUNET_MessageHeader *message,
2320                              const struct GNUNET_ATS_Information*atsi)
2321 {
2322   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2323
2324   return
2325     handle_receive_close (socket,
2326                           tunnel,
2327                           sender,
2328                           (const struct GNUNET_STREAM_MessageHeader *) message,
2329                           atsi);
2330 }
2331
2332
2333 /**
2334  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2335  *
2336  * @param cls the closure
2337  * @param tunnel connection to the other end
2338  * @param tunnel_ctx the socket
2339  * @param sender who sent the message
2340  * @param message the actual message
2341  * @param atsi performance data for the connection
2342  * @return GNUNET_OK to keep the connection open,
2343  *         GNUNET_SYSERR to close it (signal serious error)
2344  */
2345 static int
2346 server_handle_receive_close_ack (void *cls,
2347                                  struct GNUNET_MESH_Tunnel *tunnel,
2348                                  void **tunnel_ctx,
2349                                  const struct GNUNET_PeerIdentity *sender,
2350                                  const struct GNUNET_MessageHeader *message,
2351                                  const struct GNUNET_ATS_Information*atsi)
2352 {
2353   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2354
2355   return handle_generic_close_ack (socket,
2356                                    tunnel,
2357                                    sender,
2358                                    (const struct GNUNET_STREAM_MessageHeader *)
2359                                    message,
2360                                    atsi,
2361                                    SHUT_RD);
2362 }
2363
2364
2365 /**
2366  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2367  *
2368  * @param cls the listen socket (from GNUNET_MESH_connect in
2369  *          GNUNET_STREAM_listen) 
2370  * @param tunnel connection to the other end
2371  * @param tunnel_ctx the socket
2372  * @param sender who sent the message
2373  * @param message the actual message
2374  * @param atsi performance data for the connection
2375  * @return GNUNET_OK to keep the connection open,
2376  *         GNUNET_SYSERR to close it (signal serious error)
2377  */
2378 static int
2379 server_handle_close (void *cls,
2380                      struct GNUNET_MESH_Tunnel *tunnel,
2381                      void **tunnel_ctx,
2382                      const struct GNUNET_PeerIdentity *sender,
2383                      const struct GNUNET_MessageHeader *message,
2384                      const struct GNUNET_ATS_Information*atsi)
2385 {
2386   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2387   
2388   return handle_close (socket,
2389                        tunnel,
2390                        sender,
2391                        (const struct GNUNET_STREAM_MessageHeader *) message,
2392                        atsi);
2393 }
2394
2395
2396 /**
2397  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2398  *
2399  * @param cls the closure
2400  * @param tunnel connection to the other end
2401  * @param tunnel_ctx the socket
2402  * @param sender who sent the message
2403  * @param message the actual message
2404  * @param atsi performance data for the connection
2405  * @return GNUNET_OK to keep the connection open,
2406  *         GNUNET_SYSERR to close it (signal serious error)
2407  */
2408 static int
2409 server_handle_close_ack (void *cls,
2410                          struct GNUNET_MESH_Tunnel *tunnel,
2411                          void **tunnel_ctx,
2412                          const struct GNUNET_PeerIdentity *sender,
2413                          const struct GNUNET_MessageHeader *message,
2414                          const struct GNUNET_ATS_Information*atsi)
2415 {
2416   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2417
2418   return handle_generic_close_ack (socket,
2419                                    tunnel,
2420                                    sender,
2421                                    (const struct GNUNET_STREAM_MessageHeader *) 
2422                                    message,
2423                                    atsi,
2424                                    SHUT_RDWR);
2425 }
2426
2427
2428 /**
2429  * Handler for DATA_ACK messages
2430  *
2431  * @param socket the socket through which the ack was received
2432  * @param tunnel connection to the other end
2433  * @param sender who sent the message
2434  * @param ack the acknowledgment message
2435  * @param atsi performance data for the connection
2436  * @return GNUNET_OK to keep the connection open,
2437  *         GNUNET_SYSERR to close it (signal serious error)
2438  */
2439 static int
2440 handle_ack (struct GNUNET_STREAM_Socket *socket,
2441             struct GNUNET_MESH_Tunnel *tunnel,
2442             const struct GNUNET_PeerIdentity *sender,
2443             const struct GNUNET_STREAM_AckMessage *ack,
2444             const struct GNUNET_ATS_Information*atsi)
2445 {
2446   unsigned int packet;
2447   int need_retransmission;
2448   uint32_t sequence_difference;
2449   
2450   if (0 != memcmp (sender,
2451                    &socket->other_peer,
2452                    sizeof (struct GNUNET_PeerIdentity)))
2453   {
2454     LOG (GNUNET_ERROR_TYPE_DEBUG,
2455          "%s: Received ACK from non-confirming peer\n",
2456          GNUNET_i2s (&socket->other_peer));
2457     return GNUNET_YES;
2458   }
2459   switch (socket->state)
2460   {
2461   case (STATE_ESTABLISHED):
2462   case (STATE_RECEIVE_CLOSED):
2463   case (STATE_RECEIVE_CLOSE_WAIT):
2464     if (NULL == socket->write_handle)
2465     {
2466       LOG (GNUNET_ERROR_TYPE_DEBUG,
2467            "%s: Received DATA_ACK when write_handle is NULL\n",
2468            GNUNET_i2s (&socket->other_peer));
2469       return GNUNET_OK;
2470     }
2471     if (!((socket->write_sequence_number 
2472            - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2473     {
2474       LOG (GNUNET_ERROR_TYPE_DEBUG,
2475            "%s: Received DATA_ACK with unexpected base sequence number\n",
2476            GNUNET_i2s (&socket->other_peer));
2477       LOG (GNUNET_ERROR_TYPE_DEBUG,
2478            "%s: Current write sequence: %u; Ack's base sequence: %u\n",
2479            GNUNET_i2s (&socket->other_peer),
2480            socket->write_sequence_number,
2481            ntohl (ack->base_sequence_number));
2482       return GNUNET_OK;
2483     }
2484     /* FIXME: include the case when write_handle is cancelled - ignore the 
2485        acks */
2486     LOG (GNUNET_ERROR_TYPE_DEBUG,
2487          "%s: Received DATA_ACK from %s\n",
2488          GNUNET_i2s (&socket->other_peer),
2489          GNUNET_i2s (&socket->other_peer));
2490       
2491     /* Cancel the retransmission task */
2492     if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2493     {
2494       GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2495       socket->data_retransmission_task_id = 
2496         GNUNET_SCHEDULER_NO_TASK;
2497     }
2498     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2499     {
2500       if (NULL == socket->write_handle->messages[packet]) break;
2501       /* BS: Base sequence from ack; PS: sequence num of current packet */
2502       sequence_difference = ntohl (ack->base_sequence_number)
2503         - ntohl (socket->write_handle->messages[packet]->sequence_number);
2504       /* case where BS = PS + GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2505       if ((sequence_difference == GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
2506           || ((sequence_difference < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
2507               && (0 != sequence_difference))) /* case: BS > PS and BS != PS*/
2508       {
2509         ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, packet,
2510                               GNUNET_YES);
2511         continue;
2512       }
2513       if (GNUNET_YES == 
2514           ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
2515                                 -sequence_difference))/*inversion as PS >= BS */
2516       {
2517         ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, packet,
2518                               GNUNET_YES);
2519       }
2520     }
2521     /* Update the receive window remaining
2522        FIXME : Should update with the value from a data ack with greater
2523        sequence number */
2524     socket->receiver_window_available = 
2525       ntohl (ack->receive_window_remaining);
2526     /* Check if we have received all acknowledgements */
2527     need_retransmission = GNUNET_NO;
2528     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2529     {
2530       if (NULL == socket->write_handle->messages[packet]) break;
2531       if (GNUNET_YES != ackbitmap_is_bit_set 
2532           (&socket->write_handle->ack_bitmap,packet))
2533       {
2534         need_retransmission = GNUNET_YES;
2535         break;
2536       }
2537     }
2538     if (GNUNET_YES == need_retransmission)
2539     {
2540       write_data (socket);
2541     }
2542     else      /* We have to call the write continuation callback now */
2543     {
2544       struct GNUNET_STREAM_IOWriteHandle *write_handle;
2545       
2546       /* Free the packets */
2547       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2548       {
2549         GNUNET_free_non_null (socket->write_handle->messages[packet]);
2550       }
2551       write_handle = socket->write_handle;
2552       socket->write_handle = NULL;
2553       if (NULL != write_handle->write_cont)
2554         write_handle->write_cont (write_handle->write_cont_cls,
2555                                   socket->status,
2556                                   write_handle->size);
2557       /* We are done with the write handle - Freeing it */
2558       GNUNET_free (write_handle);
2559       LOG (GNUNET_ERROR_TYPE_DEBUG,
2560            "%s: Write completion callback completed\n",
2561            GNUNET_i2s (&socket->other_peer));      
2562     }
2563     break;
2564   default:
2565     break;
2566   }
2567   return GNUNET_OK;
2568 }
2569
2570
2571 /**
2572  * Handler for DATA_ACK messages
2573  *
2574  * @param cls the 'struct GNUNET_STREAM_Socket'
2575  * @param tunnel connection to the other end
2576  * @param tunnel_ctx unused
2577  * @param sender who sent the message
2578  * @param message the actual message
2579  * @param atsi performance data for the connection
2580  * @return GNUNET_OK to keep the connection open,
2581  *         GNUNET_SYSERR to close it (signal serious error)
2582  */
2583 static int
2584 client_handle_ack (void *cls,
2585                    struct GNUNET_MESH_Tunnel *tunnel,
2586                    void **tunnel_ctx,
2587                    const struct GNUNET_PeerIdentity *sender,
2588                    const struct GNUNET_MessageHeader *message,
2589                    const struct GNUNET_ATS_Information*atsi)
2590 {
2591   struct GNUNET_STREAM_Socket *socket = cls;
2592   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2593  
2594   return handle_ack (socket, tunnel, sender, ack, atsi);
2595 }
2596
2597
2598 /**
2599  * Handler for DATA_ACK messages
2600  *
2601  * @param cls the server's listen socket
2602  * @param tunnel connection to the other end
2603  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2604  * @param sender who sent the message
2605  * @param message the actual message
2606  * @param atsi performance data for the connection
2607  * @return GNUNET_OK to keep the connection open,
2608  *         GNUNET_SYSERR to close it (signal serious error)
2609  */
2610 static int
2611 server_handle_ack (void *cls,
2612                    struct GNUNET_MESH_Tunnel *tunnel,
2613                    void **tunnel_ctx,
2614                    const struct GNUNET_PeerIdentity *sender,
2615                    const struct GNUNET_MessageHeader *message,
2616                    const struct GNUNET_ATS_Information*atsi)
2617 {
2618   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2619   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2620  
2621   return handle_ack (socket, tunnel, sender, ack, atsi);
2622 }
2623
2624
2625 /**
2626  * For client message handlers, the stream socket is in the
2627  * closure argument.
2628  */
2629 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2630   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2631   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2632    sizeof (struct GNUNET_STREAM_AckMessage) },
2633   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2634    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2635   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2636    sizeof (struct GNUNET_STREAM_MessageHeader)},
2637   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2638    sizeof (struct GNUNET_STREAM_MessageHeader)},
2639   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2640    sizeof (struct GNUNET_STREAM_MessageHeader)},
2641   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2642    sizeof (struct GNUNET_STREAM_MessageHeader)},
2643   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2644    sizeof (struct GNUNET_STREAM_MessageHeader)},
2645   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2646    sizeof (struct GNUNET_STREAM_MessageHeader)},
2647   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2648    sizeof (struct GNUNET_STREAM_MessageHeader)},
2649   {NULL, 0, 0}
2650 };
2651
2652
2653 /**
2654  * For server message handlers, the stream socket is in the
2655  * tunnel context, and the listen socket in the closure argument.
2656  */
2657 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2658   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2659   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2660    sizeof (struct GNUNET_STREAM_AckMessage) },
2661   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2662    sizeof (struct GNUNET_STREAM_MessageHeader)},
2663   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2664    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2665   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2666    sizeof (struct GNUNET_STREAM_MessageHeader)},
2667   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2668    sizeof (struct GNUNET_STREAM_MessageHeader)},
2669   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2670    sizeof (struct GNUNET_STREAM_MessageHeader)},
2671   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2672    sizeof (struct GNUNET_STREAM_MessageHeader)},
2673   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2674    sizeof (struct GNUNET_STREAM_MessageHeader)},
2675   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2676    sizeof (struct GNUNET_STREAM_MessageHeader)},
2677   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2678    sizeof (struct GNUNET_STREAM_MessageHeader)},
2679   {NULL, 0, 0}
2680 };
2681
2682
2683 /**
2684  * Function called when our target peer is connected to our tunnel
2685  *
2686  * @param cls the socket for which this tunnel is created
2687  * @param peer the peer identity of the target
2688  * @param atsi performance data for the connection
2689  */
2690 static void
2691 mesh_peer_connect_callback (void *cls,
2692                             const struct GNUNET_PeerIdentity *peer,
2693                             const struct GNUNET_ATS_Information * atsi)
2694 {
2695   struct GNUNET_STREAM_Socket *socket = cls;
2696   struct GNUNET_STREAM_MessageHeader *message;
2697   
2698   if (0 != memcmp (peer,
2699                    &socket->other_peer,
2700                    sizeof (struct GNUNET_PeerIdentity)))
2701   {
2702     LOG (GNUNET_ERROR_TYPE_DEBUG,
2703          "%s: A peer which is not our target has connected to our tunnel\n",
2704          GNUNET_i2s(peer));
2705     return;
2706   }
2707   LOG (GNUNET_ERROR_TYPE_DEBUG,
2708        "%s: Target peer %s connected\n",
2709        GNUNET_i2s (&socket->other_peer),
2710        GNUNET_i2s (&socket->other_peer));
2711   /* Set state to INIT */
2712   socket->state = STATE_INIT;
2713   /* Send HELLO message */
2714   message = generate_hello ();
2715   queue_message (socket, message, &set_state_hello_wait, NULL, GNUNET_NO);
2716   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2717                  socket->control_retransmission_task_id);
2718   socket->control_retransmission_task_id =
2719     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2720                                   &control_retransmission_task, socket);
2721 }
2722
2723
2724 /**
2725  * Function called when our target peer is disconnected from our tunnel
2726  *
2727  * @param cls the socket associated which this tunnel
2728  * @param peer the peer identity of the target
2729  */
2730 static void
2731 mesh_peer_disconnect_callback (void *cls,
2732                                const struct GNUNET_PeerIdentity *peer)
2733 {
2734   struct GNUNET_STREAM_Socket *socket=cls;
2735   
2736   /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2737   LOG (GNUNET_ERROR_TYPE_DEBUG,
2738        "%s: Other peer %s disconnected \n",
2739        GNUNET_i2s (&socket->other_peer),
2740        GNUNET_i2s (&socket->other_peer));
2741 }
2742
2743
2744 /**
2745  * Method called whenever a peer creates a tunnel to us
2746  *
2747  * @param cls closure
2748  * @param tunnel new handle to the tunnel
2749  * @param initiator peer that started the tunnel
2750  * @param atsi performance information for the tunnel
2751  * @return initial tunnel context for the tunnel
2752  *         (can be NULL -- that's not an error)
2753  */
2754 static void *
2755 new_tunnel_notify (void *cls,
2756                    struct GNUNET_MESH_Tunnel *tunnel,
2757                    const struct GNUNET_PeerIdentity *initiator,
2758                    const struct GNUNET_ATS_Information *atsi)
2759 {
2760   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2761   struct GNUNET_STREAM_Socket *socket;
2762
2763   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2764      from the same peer again until the socket is closed */
2765
2766   if (GNUNET_NO == lsocket->listening)
2767   {
2768 //     FIXME: socket uninitalized
2769 //     FIXME: cannot use GNUNET_i2s twice in same call (static buffer)
2770 //     LOG (GNUNET_ERROR_TYPE_DEBUG,
2771 //          "%s: Destroying tunnel from peer %s as we don't have the lock\n",
2772 //          GNUNET_i2s (&socket->other_peer),
2773 //          GNUNET_i2s (&socket->other_peer));
2774     GNUNET_MESH_tunnel_destroy (tunnel);
2775     return NULL;
2776   }
2777   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2778   socket->other_peer = *initiator;
2779   socket->tunnel = tunnel;
2780   socket->session_id = 0;       /* FIXME */
2781   socket->state = STATE_INIT;
2782   socket->lsocket = lsocket;
2783   socket->retransmit_timeout = lsocket->retransmit_timeout;
2784   socket->testing_active = lsocket->testing_active;
2785   socket->testing_set_write_sequence_number_value =
2786     lsocket->testing_set_write_sequence_number_value;    
2787   LOG (GNUNET_ERROR_TYPE_DEBUG,
2788        "%s: Peer %s initiated tunnel to us\n", 
2789        GNUNET_i2s (&socket->other_peer),
2790        GNUNET_i2s (&socket->other_peer));
2791   /* FIXME: Copy MESH handle from lsocket to socket */
2792   return socket;
2793 }
2794
2795
2796 /**
2797  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2798  * any associated state.  This function is NOT called if the client has
2799  * explicitly asked for the tunnel to be destroyed using
2800  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2801  * the tunnel.
2802  *
2803  * @param cls closure (set from GNUNET_MESH_connect)
2804  * @param tunnel connection to the other end (henceforth invalid)
2805  * @param tunnel_ctx place where local state associated
2806  *                   with the tunnel is stored
2807  */
2808 static void 
2809 tunnel_cleaner (void *cls,
2810                 const struct GNUNET_MESH_Tunnel *tunnel,
2811                 void *tunnel_ctx)
2812 {
2813   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2814
2815   if (tunnel != socket->tunnel)
2816     return;
2817
2818   GNUNET_break_op(0);
2819   LOG (GNUNET_ERROR_TYPE_DEBUG,
2820        "%s: Peer %s has terminated connection abruptly\n",
2821        GNUNET_i2s (&socket->other_peer),
2822        GNUNET_i2s (&socket->other_peer));
2823
2824   socket->status = GNUNET_STREAM_SHUTDOWN;
2825
2826   /* Clear Transmit handles */
2827   if (NULL != socket->transmit_handle)
2828   {
2829     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2830     socket->transmit_handle = NULL;
2831   }
2832   /* Stop Tasks using socket->tunnel */
2833   if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2834   {
2835     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2836     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2837   }
2838   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2839   {
2840     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2841     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2842   }
2843   /* FIXME: Cancel all other tasks using socket->tunnel */
2844   socket->tunnel = NULL;
2845 }
2846
2847
2848 /**
2849  * Callback to signal timeout on lockmanager lock acquire
2850  *
2851  * @param cls the ListenSocket
2852  * @param tc the scheduler task context
2853  */
2854 static void
2855 lockmanager_acquire_timeout (void *cls, 
2856                              const struct GNUNET_SCHEDULER_TaskContext *tc)
2857 {
2858   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2859   GNUNET_STREAM_ListenCallback listen_cb;
2860   void *listen_cb_cls;
2861
2862   lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2863   listen_cb = lsocket->listen_cb;
2864   listen_cb_cls = lsocket->listen_cb_cls;
2865   GNUNET_STREAM_listen_close (lsocket);
2866   if (NULL != listen_cb)
2867     listen_cb (listen_cb_cls, NULL, NULL);  
2868 }
2869
2870
2871 /**
2872  * Callback to notify us on the status changes on app_port lock
2873  *
2874  * @param cls the ListenSocket
2875  * @param domain the domain name of the lock
2876  * @param lock the app_port
2877  * @param status the current status of the lock
2878  */
2879 static void
2880 lock_status_change_cb (void *cls, const char *domain, uint32_t lock,
2881                        enum GNUNET_LOCKMANAGER_Status status)
2882 {
2883   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2884
2885   GNUNET_assert (lock == (uint32_t) lsocket->port);
2886   if (GNUNET_LOCKMANAGER_SUCCESS == status)
2887   {
2888     lsocket->listening = GNUNET_YES;
2889     if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
2890     {
2891       GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
2892       lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2893     }
2894     if (NULL == lsocket->mesh)
2895     {
2896       GNUNET_MESH_ApplicationType ports[] = {lsocket->port, 0};
2897
2898       lsocket->mesh = GNUNET_MESH_connect (lsocket->cfg,
2899                                            lsocket, /* Closure */
2900                                            &new_tunnel_notify,
2901                                            &tunnel_cleaner,
2902                                            server_message_handlers,
2903                                            ports);
2904       GNUNET_assert (NULL != lsocket->mesh);
2905       if (NULL != lsocket->listen_ok_cb)
2906       {
2907         (void) lsocket->listen_ok_cb ();
2908       }
2909     }
2910   }
2911   if (GNUNET_LOCKMANAGER_RELEASE == status)
2912     lsocket->listening = GNUNET_NO;
2913 }
2914
2915
2916 /*****************/
2917 /* API functions */
2918 /*****************/
2919
2920
2921 /**
2922  * Tries to open a stream to the target peer
2923  *
2924  * @param cfg configuration to use
2925  * @param target the target peer to which the stream has to be opened
2926  * @param app_port the application port number which uniquely identifies this
2927  *            stream
2928  * @param open_cb this function will be called after stream has be established;
2929  *          cannot be NULL
2930  * @param open_cb_cls the closure for open_cb
2931  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2932  * @return if successful it returns the stream socket; NULL if stream cannot be
2933  *         opened 
2934  */
2935 struct GNUNET_STREAM_Socket *
2936 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2937                     const struct GNUNET_PeerIdentity *target,
2938                     GNUNET_MESH_ApplicationType app_port,
2939                     GNUNET_STREAM_OpenCallback open_cb,
2940                     void *open_cb_cls,
2941                     ...)
2942 {
2943   struct GNUNET_STREAM_Socket *socket;
2944   enum GNUNET_STREAM_Option option;
2945   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
2946   va_list vargs;
2947
2948   LOG (GNUNET_ERROR_TYPE_DEBUG,
2949        "%s\n", __func__);
2950   GNUNET_assert (NULL != open_cb);
2951   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2952   socket->other_peer = *target;
2953   socket->open_cb = open_cb;
2954   socket->open_cls = open_cb_cls;
2955   /* Set defaults */
2956   socket->retransmit_timeout = TIME_REL_SECS (default_timeout);
2957   socket->testing_active = GNUNET_NO;
2958   va_start (vargs, open_cb_cls); /* Parse variable args */
2959   do {
2960     option = va_arg (vargs, enum GNUNET_STREAM_Option);
2961     switch (option)
2962     {
2963     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2964       /* Expect struct GNUNET_TIME_Relative */
2965       socket->retransmit_timeout = va_arg (vargs,
2966                                            struct GNUNET_TIME_Relative);
2967       break;
2968     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
2969       socket->testing_active = GNUNET_YES;
2970       socket->testing_set_write_sequence_number_value = va_arg (vargs,
2971                                                                 uint32_t);
2972       break;
2973     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
2974       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
2975       break;
2976     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
2977       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
2978       break;
2979     case GNUNET_STREAM_OPTION_END:
2980       break;
2981     }
2982   } while (GNUNET_STREAM_OPTION_END != option);
2983   va_end (vargs);               /* End of variable args parsing */
2984   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2985                                       socket, /* cls */
2986                                       NULL, /* No inbound tunnel handler */
2987                                       NULL, /* No in-tunnel cleaner */
2988                                       client_message_handlers,
2989                                       ports); /* We don't get inbound tunnels */
2990   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
2991   {
2992     GNUNET_free (socket);
2993     return NULL;
2994   }
2995   /* Now create the mesh tunnel to target */
2996   LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating MESH Tunnel\n");
2997   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2998                                               NULL, /* Tunnel context */
2999                                               &mesh_peer_connect_callback,
3000                                               &mesh_peer_disconnect_callback,
3001                                               socket);
3002   GNUNET_assert (NULL != socket->tunnel);
3003   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
3004                                         &socket->other_peer);
3005   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__);
3006   return socket;
3007 }
3008
3009
3010 /**
3011  * Shutdown the stream for reading or writing (similar to man 2 shutdown).
3012  *
3013  * @param socket the stream socket
3014  * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
3015  * @param completion_cb the callback that will be called upon successful
3016  *          shutdown of given operation
3017  * @param completion_cls the closure for the completion callback
3018  * @return the shutdown handle
3019  */
3020 struct GNUNET_STREAM_ShutdownHandle *
3021 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
3022                         int operation,
3023                         GNUNET_STREAM_ShutdownCompletion completion_cb,
3024                         void *completion_cls)
3025 {
3026   struct GNUNET_STREAM_ShutdownHandle *handle;
3027   struct GNUNET_STREAM_MessageHeader *msg;
3028   
3029   GNUNET_assert (NULL == socket->shutdown_handle);
3030
3031   handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
3032   handle->socket = socket;
3033   handle->completion_cb = completion_cb;
3034   handle->completion_cls = completion_cls;
3035   socket->shutdown_handle = handle;
3036
3037   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
3038   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
3039   switch (operation)
3040   {
3041   case SHUT_RD:
3042     handle->operation = SHUT_RD;
3043     if (NULL != socket->read_handle)
3044       LOG (GNUNET_ERROR_TYPE_WARNING,
3045            "Existing read handle should be cancelled before shutting"
3046            " down reading\n");
3047     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
3048     queue_message (socket, msg, &set_state_receive_close_wait, NULL,
3049                    GNUNET_NO);
3050     break;
3051   case SHUT_WR:
3052     handle->operation = SHUT_WR;
3053     if (NULL != socket->write_handle)
3054       LOG (GNUNET_ERROR_TYPE_WARNING,
3055            "Existing write handle should be cancelled before shutting"
3056            " down writing\n");
3057     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
3058     queue_message (socket, msg, &set_state_transmit_close_wait, NULL,
3059                    GNUNET_NO);
3060     break;
3061   case SHUT_RDWR:
3062     handle->operation = SHUT_RDWR;
3063     if (NULL != socket->write_handle)
3064       LOG (GNUNET_ERROR_TYPE_WARNING,
3065            "Existing write handle should be cancelled before shutting"
3066            " down writing\n");
3067     if (NULL != socket->read_handle)
3068       LOG (GNUNET_ERROR_TYPE_WARNING,
3069            "Existing read handle should be cancelled before shutting"
3070            " down reading\n");
3071     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
3072     queue_message (socket, msg, &set_state_close_wait, NULL, GNUNET_NO);
3073     break;
3074   default:
3075     LOG (GNUNET_ERROR_TYPE_WARNING,
3076          "GNUNET_STREAM_shutdown called with invalid value for "
3077          "parameter operation -- Ignoring\n");
3078     GNUNET_free (msg);
3079     GNUNET_free (handle);
3080     return NULL;
3081   }
3082   handle->close_msg_retransmission_task_id =
3083     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
3084                                   &close_msg_retransmission_task,
3085                                   handle);
3086   return handle;
3087 }
3088
3089
3090 /**
3091  * Cancels a pending shutdown
3092  *
3093  * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
3094  */
3095 void
3096 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
3097 {
3098   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
3099     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
3100   handle->socket->shutdown_handle = NULL;
3101   GNUNET_free (handle);
3102 }
3103
3104
3105 /**
3106  * Closes the stream
3107  *
3108  * @param socket the stream socket
3109  */
3110 void
3111 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
3112 {
3113   struct MessageQueue *head;
3114
3115   if (NULL != socket->read_handle)
3116   {
3117     LOG (GNUNET_ERROR_TYPE_WARNING,
3118          "Closing STREAM socket when a read handle is pending\n");
3119   }
3120   if (NULL != socket->write_handle)
3121   {
3122     LOG (GNUNET_ERROR_TYPE_WARNING,
3123          "Closing STREAM socket when a write handle is pending\n");
3124     GNUNET_STREAM_io_write_cancel (socket->write_handle);
3125     //socket->write_handle = NULL;
3126   }
3127   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
3128   {
3129     /* socket closed with read task pending!? */
3130     GNUNET_break (0);
3131     GNUNET_SCHEDULER_cancel (socket->read_task_id);
3132     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
3133   }  
3134   /* Terminate the ack'ing tasks if they are still present */
3135   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
3136   {
3137     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
3138     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
3139   }
3140   /* Terminate the control retransmission tasks */
3141   if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
3142   {
3143     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
3144   }
3145   /* Clear Transmit handles */
3146   if (NULL != socket->transmit_handle)
3147   {
3148     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
3149     socket->transmit_handle = NULL;
3150   }
3151   /* Clear existing message queue */
3152   while (NULL != (head = socket->queue_head)) {
3153     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
3154                                  socket->queue_tail,
3155                                  head);
3156     GNUNET_free (head->message);
3157     GNUNET_free (head);
3158   }
3159
3160   /* Close associated tunnel */
3161   if (NULL != socket->tunnel)
3162   {
3163     GNUNET_MESH_tunnel_destroy (socket->tunnel);
3164     socket->tunnel = NULL;
3165   }
3166
3167   /* Close mesh connection */
3168   if (NULL != socket->mesh && NULL == socket->lsocket)
3169   {
3170     GNUNET_MESH_disconnect (socket->mesh);
3171     socket->mesh = NULL;
3172   }
3173   
3174   /* Release receive buffer */
3175   if (NULL != socket->receive_buffer)
3176   {
3177     GNUNET_free (socket->receive_buffer);
3178   }
3179
3180   GNUNET_free (socket);
3181 }
3182
3183
3184 /**
3185  * Listens for stream connections for a specific application ports
3186  *
3187  * @param cfg the configuration to use
3188  * @param app_port the application port for which new streams will be accepted
3189  * @param listen_cb this function will be called when a peer tries to establish
3190  *            a stream with us
3191  * @param listen_cb_cls closure for listen_cb
3192  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3193  * @return listen socket, NULL for any error
3194  */
3195 struct GNUNET_STREAM_ListenSocket *
3196 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
3197                       GNUNET_MESH_ApplicationType app_port,
3198                       GNUNET_STREAM_ListenCallback listen_cb,
3199                       void *listen_cb_cls,
3200                       ...)
3201 {
3202   /* FIXME: Add variable args for passing configration options? */
3203   struct GNUNET_STREAM_ListenSocket *lsocket;
3204   struct GNUNET_TIME_Relative listen_timeout;
3205   enum GNUNET_STREAM_Option option;
3206   va_list vargs;
3207
3208   GNUNET_assert (NULL != listen_cb);
3209   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
3210   lsocket->cfg = GNUNET_CONFIGURATION_dup (cfg);
3211   lsocket->lockmanager = GNUNET_LOCKMANAGER_connect (lsocket->cfg);
3212   if (NULL == lsocket->lockmanager)
3213   {
3214     GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3215     GNUNET_free (lsocket);
3216     return NULL;
3217   }
3218   lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */  
3219   /* Set defaults */
3220   lsocket->retransmit_timeout = TIME_REL_SECS (default_timeout);
3221   lsocket->testing_active = GNUNET_NO;
3222   lsocket->listen_ok_cb = NULL;
3223   listen_timeout = TIME_REL_SECS (60); /* A minute for listen timeout */  
3224   va_start (vargs, listen_cb_cls);
3225   do {
3226     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3227     switch (option)
3228     {
3229     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3230       lsocket->retransmit_timeout = va_arg (vargs,
3231                                             struct GNUNET_TIME_Relative);
3232       break;
3233     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3234       lsocket->testing_active = GNUNET_YES;
3235       lsocket->testing_set_write_sequence_number_value = va_arg (vargs,
3236                                                                  uint32_t);
3237       break;
3238     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3239       listen_timeout = GNUNET_TIME_relative_multiply
3240         (GNUNET_TIME_UNIT_MILLISECONDS, va_arg (vargs, uint32_t));
3241       break;
3242     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3243       lsocket->listen_ok_cb = va_arg (vargs,
3244                                       GNUNET_STREAM_ListenSuccessCallback);
3245       break;
3246     case GNUNET_STREAM_OPTION_END:
3247       break;
3248     }
3249   } while (GNUNET_STREAM_OPTION_END != option);
3250   va_end (vargs);
3251   lsocket->port = app_port;
3252   lsocket->listen_cb = listen_cb;
3253   lsocket->listen_cb_cls = listen_cb_cls;
3254   lsocket->locking_request = 
3255     GNUNET_LOCKMANAGER_acquire_lock (lsocket->lockmanager, locking_domain,
3256                                      (uint32_t) lsocket->port,
3257                                      &lock_status_change_cb, lsocket);
3258   lsocket->lockmanager_acquire_timeout_task =
3259     GNUNET_SCHEDULER_add_delayed (listen_timeout,
3260                                   &lockmanager_acquire_timeout, lsocket);
3261   return lsocket;
3262 }
3263
3264
3265 /**
3266  * Closes the listen socket
3267  *
3268  * @param lsocket the listen socket
3269  */
3270 void
3271 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3272 {
3273   /* Close MESH connection */
3274   if (NULL != lsocket->mesh)
3275     GNUNET_MESH_disconnect (lsocket->mesh);
3276   GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3277   if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
3278     GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
3279   if (NULL != lsocket->locking_request)
3280     GNUNET_LOCKMANAGER_cancel_request (lsocket->locking_request);
3281   if (NULL != lsocket->lockmanager)
3282     GNUNET_LOCKMANAGER_disconnect (lsocket->lockmanager);
3283   GNUNET_free (lsocket);
3284 }
3285
3286
3287 /**
3288  * Tries to write the given data to the stream. The maximum size of data that
3289  * can be written as part of a write operation is (64 * (64000 - sizeof (struct
3290  * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3291  * violation, however only the said number of maximum bytes will be written.
3292  *
3293  * @param socket the socket representing a stream
3294  * @param data the data buffer from where the data is written into the stream
3295  * @param size the number of bytes to be written from the data buffer
3296  * @param timeout the timeout period
3297  * @param write_cont the function to call upon writing some bytes into the
3298  *          stream 
3299  * @param write_cont_cls the closure
3300  *
3301  * @return handle to cancel the operation; if a previous write is pending or
3302  *           the stream has been shutdown for this operation then write_cont is
3303  *           immediately called and NULL is returned.
3304  */
3305 struct GNUNET_STREAM_IOWriteHandle *
3306 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3307                      const void *data,
3308                      size_t size,
3309                      struct GNUNET_TIME_Relative timeout,
3310                      GNUNET_STREAM_CompletionContinuation write_cont,
3311                      void *write_cont_cls)
3312 {
3313   unsigned int num_needed_packets;
3314   unsigned int packet;
3315   struct GNUNET_STREAM_IOWriteHandle *io_handle;
3316   uint32_t packet_size;
3317   uint32_t payload_size;
3318   struct GNUNET_STREAM_DataMessage *data_msg;
3319   const void *sweep;
3320   struct GNUNET_TIME_Relative ack_deadline;
3321
3322   LOG (GNUNET_ERROR_TYPE_DEBUG,
3323        "%s\n", __func__);
3324
3325   /* Return NULL if there is already a write request pending */
3326   if (NULL != socket->write_handle)
3327   {
3328     GNUNET_break (0);
3329     return NULL;
3330   }
3331
3332   switch (socket->state)
3333   {
3334   case STATE_TRANSMIT_CLOSED:
3335   case STATE_TRANSMIT_CLOSE_WAIT:
3336   case STATE_CLOSED:
3337   case STATE_CLOSE_WAIT:
3338     if (NULL != write_cont)
3339       write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3340     LOG (GNUNET_ERROR_TYPE_DEBUG,
3341          "%s() END\n", __func__);
3342     return NULL;
3343   case STATE_INIT:
3344   case STATE_LISTEN:
3345   case STATE_HELLO_WAIT:
3346     if (NULL != write_cont)
3347       /* FIXME: GNUNET_STREAM_SYSERR?? */
3348       write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3349     LOG (GNUNET_ERROR_TYPE_DEBUG,
3350          "%s() END\n", __func__);
3351     return NULL;
3352   case STATE_ESTABLISHED:
3353   case STATE_RECEIVE_CLOSED:
3354   case STATE_RECEIVE_CLOSE_WAIT:
3355     break;
3356   }
3357
3358   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
3359     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
3360   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
3361   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
3362   io_handle->socket = socket;
3363   io_handle->write_cont = write_cont;
3364   io_handle->write_cont_cls = write_cont_cls;
3365   io_handle->size = size;
3366   sweep = data;
3367   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3368      determined from RTT */
3369   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3370   /* Divide the given buffer into packets for sending */
3371   for (packet=0; packet < num_needed_packets; packet++)
3372   {
3373     if ((packet + 1) * max_payload_size < size) 
3374     {
3375       payload_size = max_payload_size;
3376       packet_size = MAX_PACKET_SIZE;
3377     }
3378     else 
3379     {
3380       payload_size = size - packet * max_payload_size;
3381       packet_size =  payload_size + sizeof (struct
3382                                             GNUNET_STREAM_DataMessage); 
3383     }
3384     io_handle->messages[packet] = GNUNET_malloc (packet_size);
3385     io_handle->messages[packet]->header.header.size = htons (packet_size);
3386     io_handle->messages[packet]->header.header.type =
3387       htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3388     io_handle->messages[packet]->sequence_number =
3389       htonl (socket->write_sequence_number++);
3390     io_handle->messages[packet]->offset = htonl (socket->write_offset);
3391
3392     /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3393        determined from RTT */
3394     io_handle->messages[packet]->ack_deadline =
3395       GNUNET_TIME_relative_hton (ack_deadline);
3396     data_msg = io_handle->messages[packet];
3397     /* Copy data from given buffer to the packet */
3398     memcpy (&data_msg[1],
3399             sweep,
3400             payload_size);
3401     sweep += payload_size;
3402     socket->write_offset += payload_size;
3403   }
3404   socket->write_handle = io_handle;
3405   write_data (socket);
3406
3407   LOG (GNUNET_ERROR_TYPE_DEBUG,
3408        "%s() END\n", __func__);
3409
3410   return io_handle;
3411 }
3412
3413
3414 /**
3415  * Tries to read data from the stream.
3416  *
3417  * @param socket the socket representing a stream
3418  * @param timeout the timeout period
3419  * @param proc function to call with data (once only)
3420  * @param proc_cls the closure for proc
3421  *
3422  * @return handle to cancel the operation; if the stream has been shutdown for
3423  *           this type of opeartion then the DataProcessor is immediately
3424  *           called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned
3425  */
3426 struct GNUNET_STREAM_IOReadHandle *
3427 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3428                     struct GNUNET_TIME_Relative timeout,
3429                     GNUNET_STREAM_DataProcessor proc,
3430                     void *proc_cls)
3431 {
3432   struct GNUNET_STREAM_IOReadHandle *read_handle;
3433   
3434   LOG (GNUNET_ERROR_TYPE_DEBUG,
3435        "%s: %s()\n", 
3436        GNUNET_i2s (&socket->other_peer),
3437        __func__);
3438   /* Return NULL if there is already a read handle; the user has to cancel that
3439      first before continuing or has to wait until it is completed */
3440   if (NULL != socket->read_handle) 
3441     return NULL;
3442   GNUNET_assert (NULL != proc);
3443   switch (socket->state)
3444   {
3445   case STATE_RECEIVE_CLOSED:
3446   case STATE_RECEIVE_CLOSE_WAIT:
3447   case STATE_CLOSED:
3448   case STATE_CLOSE_WAIT:
3449     proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3450     LOG (GNUNET_ERROR_TYPE_DEBUG,
3451          "%s: %s() END\n",
3452          GNUNET_i2s (&socket->other_peer),
3453          __func__);
3454     return NULL;
3455   default:
3456     break;
3457   }
3458   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
3459   read_handle->proc = proc;
3460   read_handle->proc_cls = proc_cls;
3461   read_handle->socket = socket;
3462   socket->read_handle = read_handle;
3463   /* Check if we have a packet at bitmap 0 */
3464   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3465                                           0))
3466   {
3467     socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3468                                                      socket);   
3469   }
3470   /* Setup the read timeout task */
3471   socket->read_io_timeout_task_id =
3472     GNUNET_SCHEDULER_add_delayed (timeout,
3473                                   &read_io_timeout,
3474                                   socket);
3475   LOG (GNUNET_ERROR_TYPE_DEBUG,
3476        "%s: %s() END\n",
3477        GNUNET_i2s (&socket->other_peer),
3478        __func__);
3479   return read_handle;
3480 }
3481
3482
3483 /**
3484  * Cancel pending write operation.
3485  *
3486  * @param ioh handle to operation to cancel
3487  */
3488 void
3489 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3490 {
3491   struct GNUNET_STREAM_Socket *socket = ioh->socket;
3492   unsigned int packet;
3493
3494   GNUNET_assert (NULL != socket->write_handle);
3495   GNUNET_assert (socket->write_handle == ioh);
3496
3497   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
3498   {
3499     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
3500     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
3501   }
3502
3503   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3504   {
3505     if (NULL == ioh->messages[packet]) break;
3506     GNUNET_free (ioh->messages[packet]);
3507   }
3508       
3509   GNUNET_free (socket->write_handle);
3510   socket->write_handle = NULL;
3511 }
3512
3513
3514 /**
3515  * Cancel pending read operation.
3516  *
3517  * @param ioh handle to operation to cancel
3518  */
3519 void
3520 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
3521 {
3522   struct GNUNET_STREAM_Socket *socket;
3523   
3524   socket = ioh->socket;
3525   GNUNET_assert (NULL != socket->read_handle);
3526   GNUNET_assert (ioh == socket->read_handle);
3527   /* Read io time task should be there; if it is already executed then this
3528   read handle is not valid */
3529   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != socket->read_io_timeout_task_id);
3530   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
3531   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
3532   /* reading task may be present; if so we have to stop it */
3533   if (GNUNET_SCHEDULER_NO_TASK != socket->read_task_id)
3534   {
3535     GNUNET_SCHEDULER_cancel (socket->read_task_id);
3536     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
3537   }
3538   GNUNET_free (ioh);
3539   socket->read_handle = NULL;
3540 }
3541
3542 /* end of stream_api.c */