-fixing misc issues in stream, including compilation errors on bots, but more
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Decrement PEER intern count during socket close and listen close to free the
27  * memory used for PEER interning
28  *
29  * Add code for write io timeout
30  *
31  * Include retransmission for control messages
32  **/
33
34 /**
35  * @file stream/stream_api.c
36  * @brief Implementation of the stream library
37  * @author Sree Harsha Totakura
38  */
39
40
41 #include "platform.h"
42 #include "gnunet_common.h"
43 #include "gnunet_crypto_lib.h"
44 #include "gnunet_stream_lib.h"
45 #include "gnunet_testing_lib.h"
46 #include "stream_protocol.h"
47
48
49 /**
50  * The maximum packet size of a stream packet
51  */
52 #define MAX_PACKET_SIZE 64000
53
54 /**
55  * Receive buffer
56  */
57 #define RECEIVE_BUFFER_SIZE 4096000
58
59 /**
60  * The maximum payload a data message packet can carry
61  */
62 static size_t max_payload_size = 
63   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
64
65 /**
66  * states in the Protocol
67  */
68 enum State
69   {
70     /**
71      * Client initialization state
72      */
73     STATE_INIT,
74
75     /**
76      * Listener initialization state 
77      */
78     STATE_LISTEN,
79
80     /**
81      * Pre-connection establishment state
82      */
83     STATE_HELLO_WAIT,
84
85     /**
86      * State where a connection has been established
87      */
88     STATE_ESTABLISHED,
89
90     /**
91      * State where the socket is closed on our side and waiting to be ACK'ed
92      */
93     STATE_RECEIVE_CLOSE_WAIT,
94
95     /**
96      * State where the socket is closed for reading
97      */
98     STATE_RECEIVE_CLOSED,
99
100     /**
101      * State where the socket is closed on our side and waiting to be ACK'ed
102      */
103     STATE_TRANSMIT_CLOSE_WAIT,
104
105     /**
106      * State where the socket is closed for writing
107      */
108     STATE_TRANSMIT_CLOSED,
109
110     /**
111      * State where the socket is closed on our side and waiting to be ACK'ed
112      */
113     STATE_CLOSE_WAIT,
114
115     /**
116      * State where the socket is closed
117      */
118     STATE_CLOSED 
119   };
120
121
122 /**
123  * Functions of this type are called when a message is written
124  *
125  * @param cls the closure from queue_message
126  * @param socket the socket the written message was bound to
127  */
128 typedef void (*SendFinishCallback) (void *cls,
129                                     struct GNUNET_STREAM_Socket *socket);
130
131
132 /**
133  * The send message queue
134  */
135 struct MessageQueue
136 {
137   /**
138    * The message
139    */
140   struct GNUNET_STREAM_MessageHeader *message;
141
142   /**
143    * Callback to be called when the message is sent
144    */
145   SendFinishCallback finish_cb;
146
147   /**
148    * The closure for finish_cb
149    */
150   void *finish_cb_cls;
151
152   /**
153    * The next message in queue. Should be NULL in the last message
154    */
155   struct MessageQueue *next;
156
157   /**
158    * The next message in queue. Should be NULL in the first message
159    */
160   struct MessageQueue *prev;
161 };
162
163
164 /**
165  * The STREAM Socket Handler
166  */
167 struct GNUNET_STREAM_Socket
168 {
169   /**
170    * Retransmission timeout
171    */
172   struct GNUNET_TIME_Relative retransmit_timeout;
173
174   /**
175    * The Acknowledgement Bitmap
176    */
177   GNUNET_STREAM_AckBitmap ack_bitmap;
178
179   /**
180    * Time when the Acknowledgement was queued
181    */
182   struct GNUNET_TIME_Absolute ack_time_registered;
183
184   /**
185    * Queued Acknowledgement deadline
186    */
187   struct GNUNET_TIME_Relative ack_time_deadline;
188
189   /**
190    * The mesh handle
191    */
192   struct GNUNET_MESH_Handle *mesh;
193
194   /**
195    * The mesh tunnel handle
196    */
197   struct GNUNET_MESH_Tunnel *tunnel;
198
199   /**
200    * Stream open closure
201    */
202   void *open_cls;
203
204   /**
205    * Stream open callback
206    */
207   GNUNET_STREAM_OpenCallback open_cb;
208
209   /**
210    * The current transmit handle (if a pending transmit request exists)
211    */
212   struct GNUNET_MESH_TransmitHandle *transmit_handle;
213
214   /**
215    * The current act transmit handle (if a pending ack transmit request exists)
216    */
217   struct GNUNET_MESH_TransmitHandle *ack_transmit_handle;
218
219   /**
220    * Pointer to the current ack message using in ack_task
221    */
222   struct GNUNET_STREAM_AckMessage *ack_msg;
223
224   /**
225    * The current message associated with the transmit handle
226    */
227   struct MessageQueue *queue_head;
228
229   /**
230    * The queue tail, should always point to the last message in queue
231    */
232   struct MessageQueue *queue_tail;
233
234   /**
235    * The write IO_handle associated with this socket
236    */
237   struct GNUNET_STREAM_IOWriteHandle *write_handle;
238
239   /**
240    * The read IO_handle associated with this socket
241    */
242   struct GNUNET_STREAM_IOReadHandle *read_handle;
243
244   /**
245    * The shutdown handle associated with this socket
246    */
247   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
248
249   /**
250    * Buffer for storing received messages
251    */
252   void *receive_buffer;
253
254   /**
255    * The listen socket from which this socket is derived. Should be NULL if it
256    * is not a derived socket
257    */
258   struct GNUNET_STREAM_ListenSocket *lsocket;
259
260   /**
261    * Task identifier for the read io timeout task
262    */
263   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
264
265   /**
266    * Task identifier for retransmission task after timeout
267    */
268   GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
269
270   /**
271    * The task for sending timely Acks
272    */
273   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
274
275   /**
276    * Task scheduled to continue a read operation.
277    */
278   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
279
280   /**
281    * The state of the protocol associated with this socket
282    */
283   enum State state;
284
285   /**
286    * The status of the socket
287    */
288   enum GNUNET_STREAM_Status status;
289
290   /**
291    * The number of previous timeouts; FIXME: currently not used
292    */
293   unsigned int retries;
294
295   /**
296    * The peer identity of the peer at the other end of the stream
297    */
298   GNUNET_PEER_Id other_peer;
299
300   /**
301    * The application port number (type: uint32_t)
302    */
303   GNUNET_MESH_ApplicationType app_port;
304
305   /**
306    * The session id associated with this stream connection
307    * FIXME: Not used currently, may be removed
308    */
309   uint32_t session_id;
310
311   /**
312    * Write sequence number. Set to random when sending HELLO(client) and
313    * HELLO_ACK(server) 
314    */
315   uint32_t write_sequence_number;
316
317   /**
318    * Read sequence number. This number's value is determined during handshake
319    */
320   uint32_t read_sequence_number;
321
322   /**
323    * The receiver buffer size
324    */
325   uint32_t receive_buffer_size;
326
327   /**
328    * The receiver buffer boundaries
329    */
330   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
331
332   /**
333    * receiver's available buffer after the last acknowledged packet
334    */
335   uint32_t receiver_window_available;
336
337   /**
338    * The offset pointer used during write operation
339    */
340   uint32_t write_offset;
341
342   /**
343    * The offset after which we are expecting data
344    */
345   uint32_t read_offset;
346
347   /**
348    * The offset upto which user has read from the received buffer
349    */
350   uint32_t copy_offset;
351 };
352
353
354 /**
355  * A socket for listening
356  */
357 struct GNUNET_STREAM_ListenSocket
358 {
359   /**
360    * The mesh handle
361    */
362   struct GNUNET_MESH_Handle *mesh;
363
364   /**
365    * The callback function which is called after successful opening socket
366    */
367   GNUNET_STREAM_ListenCallback listen_cb;
368
369   /**
370    * The call back closure
371    */
372   void *listen_cb_cls;
373
374   /**
375    * The service port
376    * FIXME: Remove if not required!
377    */
378   GNUNET_MESH_ApplicationType port;
379 };
380
381
382 /**
383  * The IO Write Handle
384  */
385 struct GNUNET_STREAM_IOWriteHandle
386 {
387   /**
388    * The socket to which this write handle is associated
389    */
390   struct GNUNET_STREAM_Socket *socket;
391
392   /**
393    * The packet_buffers associated with this Handle
394    */
395   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
396
397   /**
398    * The write continuation callback
399    */
400   GNUNET_STREAM_CompletionContinuation write_cont;
401
402   /**
403    * Write continuation closure
404    */
405   void *write_cont_cls;
406
407   /**
408    * The bitmap of this IOHandle; Corresponding bit for a message is set when
409    * it has been acknowledged by the receiver
410    */
411   GNUNET_STREAM_AckBitmap ack_bitmap;
412
413   /**
414    * Number of bytes in this write handle
415    */
416   size_t size;
417 };
418
419
420 /**
421  * The IO Read Handle
422  */
423 struct GNUNET_STREAM_IOReadHandle
424 {
425   /**
426    * Callback for the read processor
427    */
428   GNUNET_STREAM_DataProcessor proc;
429
430   /**
431    * The closure pointer for the read processor callback
432    */
433   void *proc_cls;
434 };
435
436
437 /**
438  * Handle for Shutdown
439  */
440 struct GNUNET_STREAM_ShutdownHandle
441 {
442   /**
443    * The socket associated with this shutdown handle
444    */
445   struct GNUNET_STREAM_Socket *socket;
446
447   /**
448    * Shutdown completion callback
449    */
450   GNUNET_STREAM_ShutdownCompletion completion_cb;
451
452   /**
453    * Closure for completion callback
454    */
455   void *completion_cls;
456
457   /**
458    * Close message retransmission task id
459    */
460   GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
461
462   /**
463    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
464    */
465   int operation;  
466 };
467
468
469 /**
470  * Default value in seconds for various timeouts
471  */
472 static unsigned int default_timeout = 10;
473
474
475 /**
476  * Callback function for sending queued message
477  *
478  * @param cls closure the socket
479  * @param size number of bytes available in buf
480  * @param buf where the callee should write the message
481  * @return number of bytes written to buf
482  */
483 static size_t
484 send_message_notify (void *cls, size_t size, void *buf)
485 {
486   struct GNUNET_STREAM_Socket *socket = cls;
487   struct GNUNET_PeerIdentity target;
488   struct MessageQueue *head;
489   size_t ret;
490
491   socket->transmit_handle = NULL; /* Remove the transmit handle */
492   head = socket->queue_head;
493   if (NULL == head)
494     return 0; /* just to be safe */
495   GNUNET_PEER_resolve (socket->other_peer, &target);
496   if (0 == size)                /* request timed out */
497     {
498       socket->retries++;
499       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
500                   "Message sending timed out. Retry %d \n",
501                   socket->retries);
502       socket->transmit_handle = 
503         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
504                                            0, /* Corking */
505                                            1, /* Priority */
506                                            /* FIXME: exponential backoff */
507                                            socket->retransmit_timeout,
508                                            &target,
509                                            ntohs (head->message->header.size),
510                                            &send_message_notify,
511                                            socket);
512       return 0;
513     }
514
515   ret = ntohs (head->message->header.size);
516   GNUNET_assert (size >= ret);
517   memcpy (buf, head->message, ret);
518   if (NULL != head->finish_cb)
519     {
520       head->finish_cb (head->finish_cb_cls, socket);
521     }
522   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
523                                socket->queue_tail,
524                                head);
525   GNUNET_free (head->message);
526   GNUNET_free (head);
527   head = socket->queue_head;
528   if (NULL != head)    /* more pending messages to send */
529     {
530       socket->retries = 0;
531       socket->transmit_handle = 
532         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
533                                            0, /* Corking */
534                                            1, /* Priority */
535                                            /* FIXME: exponential backoff */
536                                            socket->retransmit_timeout,
537                                            &target,
538                                            ntohs (head->message->header.size),
539                                            &send_message_notify,
540                                            socket);
541     }
542   return ret;
543 }
544
545
546 /**
547  * Queues a message for sending using the mesh connection of a socket
548  *
549  * @param socket the socket whose mesh connection is used
550  * @param message the message to be sent
551  * @param finish_cb the callback to be called when the message is sent
552  * @param finish_cb_cls the closure for the callback
553  */
554 static void
555 queue_message (struct GNUNET_STREAM_Socket *socket,
556                struct GNUNET_STREAM_MessageHeader *message,
557                SendFinishCallback finish_cb,
558                void *finish_cb_cls)
559 {
560   struct MessageQueue *queue_entity;
561   struct GNUNET_PeerIdentity target;
562
563   GNUNET_assert 
564     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
565      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
566
567   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
568               "Queueing message of type %d and size %d\n",
569               ntohs (message->header.type),
570               ntohs (message->header.size));
571   GNUNET_assert (NULL != message);
572   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
573   queue_entity->message = message;
574   queue_entity->finish_cb = finish_cb;
575   queue_entity->finish_cb_cls = finish_cb_cls;
576   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
577                                     socket->queue_tail,
578                                     queue_entity);
579   if (NULL == socket->transmit_handle)
580   {
581     socket->retries = 0;
582     GNUNET_PEER_resolve (socket->other_peer, &target);
583     socket->transmit_handle = 
584       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
585                                          0, /* Corking */
586                                          1, /* Priority */
587                                          socket->retransmit_timeout,
588                                          &target,
589                                          ntohs (message->header.size),
590                                          &send_message_notify,
591                                          socket);
592   }
593 }
594
595
596 /**
597  * Copies a message and queues it for sending using the mesh connection of
598  * given socket 
599  *
600  * @param socket the socket whose mesh connection is used
601  * @param message the message to be sent
602  * @param finish_cb the callback to be called when the message is sent
603  * @param finish_cb_cls the closure for the callback
604  */
605 static void
606 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
607                         const struct GNUNET_STREAM_MessageHeader *message,
608                         SendFinishCallback finish_cb,
609                         void *finish_cb_cls)
610 {
611   struct GNUNET_STREAM_MessageHeader *msg_copy;
612   uint16_t size;
613   
614   size = ntohs (message->header.size);
615   msg_copy = GNUNET_malloc (size);
616   memcpy (msg_copy, message, size);
617   queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
618 }
619
620
621 /**
622  * Callback function for sending ack message
623  *
624  * @param cls closure the ACK message created in ack_task
625  * @param size number of bytes available in buffer
626  * @param buf where the callee should write the message
627  * @return number of bytes written to buf
628  */
629 static size_t
630 send_ack_notify (void *cls, size_t size, void *buf)
631 {
632   struct GNUNET_STREAM_Socket *socket = cls;
633
634   if (0 == size)
635     {
636       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
637                   "%s called with size 0\n", __func__);
638       return 0;
639     }
640   GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size);
641   
642   size = ntohs (socket->ack_msg->header.header.size);
643   memcpy (buf, socket->ack_msg, size);
644   
645   GNUNET_free (socket->ack_msg);
646   socket->ack_msg = NULL;
647   socket->ack_transmit_handle = NULL;
648   return size;
649 }
650
651 /**
652  * Writes data using the given socket. The amount of data written is limited by
653  * the receiver_window_size
654  *
655  * @param socket the socket to use
656  */
657 static void 
658 write_data (struct GNUNET_STREAM_Socket *socket);
659
660 /**
661  * Task for retransmitting data messages if they aren't ACK before their ack
662  * deadline 
663  *
664  * @param cls the socket
665  * @param tc the Task context
666  */
667 static void
668 retransmission_timeout_task (void *cls,
669                              const struct GNUNET_SCHEDULER_TaskContext *tc)
670 {
671   struct GNUNET_STREAM_Socket *socket = cls;
672   
673   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
674     return;
675
676   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
677               "Retransmitting DATA...\n");
678   socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
679   write_data (socket);
680 }
681
682
683 /**
684  * Task for sending ACK message
685  *
686  * @param cls the socket
687  * @param tc the Task context
688  */
689 static void
690 ack_task (void *cls,
691           const struct GNUNET_SCHEDULER_TaskContext *tc)
692 {
693   struct GNUNET_STREAM_Socket *socket = cls;
694   struct GNUNET_STREAM_AckMessage *ack_msg;
695   struct GNUNET_PeerIdentity target;
696
697   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
698     {
699       return;
700     }
701
702   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
703
704   /* Create the ACK Message */
705   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
706   ack_msg->header.header.size = htons (sizeof (struct 
707                                                GNUNET_STREAM_AckMessage));
708   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
709   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
710   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
711   ack_msg->receive_window_remaining = 
712     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
713
714   socket->ack_msg = ack_msg;
715   GNUNET_PEER_resolve (socket->other_peer, &target);
716   /* Request MESH for sending ACK */
717   socket->ack_transmit_handle = 
718     GNUNET_MESH_notify_transmit_ready (socket->tunnel,
719                                        0, /* Corking */
720                                        1, /* Priority */
721                                        socket->retransmit_timeout,
722                                        &target,
723                                        ntohs (ack_msg->header.header.size),
724                                        &send_ack_notify,
725                                        socket);
726 }
727
728
729 /**
730  * Retransmission task for shutdown messages
731  *
732  * @param cls the shutdown handle
733  * @param tc the Task Context
734  */
735 static void
736 close_msg_retransmission_task (void *cls,
737                                const struct GNUNET_SCHEDULER_TaskContext *tc)
738 {
739   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
740   struct GNUNET_STREAM_MessageHeader *msg;
741   struct GNUNET_STREAM_Socket *socket;
742
743   GNUNET_assert (NULL != shutdown_handle);
744   socket = shutdown_handle->socket;
745
746   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
747   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
748   switch (shutdown_handle->operation)
749     {
750     case SHUT_RDWR:
751       msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
752       break;
753     case SHUT_RD:
754       msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
755       break;
756     case SHUT_WR:
757       msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
758       break;
759     default:
760       GNUNET_free (msg);
761       shutdown_handle->close_msg_retransmission_task_id = 
762         GNUNET_SCHEDULER_NO_TASK;
763       return;
764     }
765   queue_message (socket, msg, NULL, NULL);
766   shutdown_handle->close_msg_retransmission_task_id =
767     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
768                                   &close_msg_retransmission_task,
769                                   shutdown_handle);
770 }
771
772
773 /**
774  * Function to modify a bit in GNUNET_STREAM_AckBitmap
775  *
776  * @param bitmap the bitmap to modify
777  * @param bit the bit number to modify
778  * @param value GNUNET_YES to on, GNUNET_NO to off
779  */
780 static void
781 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
782                       unsigned int bit, 
783                       int value)
784 {
785   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
786   if (GNUNET_YES == value)
787     *bitmap |= (1LL << bit);
788   else
789     *bitmap &= ~(1LL << bit);
790 }
791
792
793 /**
794  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
795  *
796  * @param bitmap address of the bitmap that has to be checked
797  * @param bit the bit number to check
798  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
799  */
800 static uint8_t
801 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
802                       unsigned int bit)
803 {
804   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
805   return 0 != (*bitmap & (1LL << bit));
806 }
807
808
809 /**
810  * Writes data using the given socket. The amount of data written is limited by
811  * the receiver_window_size
812  *
813  * @param socket the socket to use
814  */
815 static void 
816 write_data (struct GNUNET_STREAM_Socket *socket)
817 {
818   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
819   int packet;                   /* Although an int, should never be negative */
820   int ack_packet;
821
822   ack_packet = -1;
823   /* Find the last acknowledged packet */
824   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
825     {      
826       if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
827                                               packet))
828         ack_packet = packet;        
829       else if (NULL == io_handle->messages[packet])
830         break;
831     }
832   /* Resend packets which weren't ack'ed */
833   for (packet=0; packet < ack_packet; packet++)
834     {
835       if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
836                                              packet))
837         {
838           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
839                       "Placing DATA message with sequence %u in send queue\n",
840                       ntohl (io_handle->messages[packet]->sequence_number));
841
842           copy_and_queue_message (socket,
843                                   &io_handle->messages[packet]->header,
844                                   NULL,
845                                   NULL);
846         }
847     }
848   packet = ack_packet + 1;
849   /* Now send new packets if there is enough buffer space */
850   while ( (NULL != io_handle->messages[packet]) &&
851           (socket->receiver_window_available 
852            >= ntohs (io_handle->messages[packet]->header.header.size)) )
853     {
854       socket->receiver_window_available -= 
855         ntohs (io_handle->messages[packet]->header.header.size);
856       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
857                   "Placing DATA message with sequence %u in send queue\n",
858                   ntohl (io_handle->messages[packet]->sequence_number));
859       copy_and_queue_message (socket,
860                               &io_handle->messages[packet]->header,
861                               NULL,
862                               NULL);
863       packet++;
864     }
865
866   if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
867     socket->retransmission_timeout_task_id = 
868       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
869                                     (GNUNET_TIME_UNIT_SECONDS, 8),
870                                     &retransmission_timeout_task,
871                                     socket);
872 }
873
874
875 /**
876  * Task for calling the read processor
877  *
878  * @param cls the socket
879  * @param tc the task context
880  */
881 static void
882 call_read_processor (void *cls,
883                      const struct GNUNET_SCHEDULER_TaskContext *tc)
884 {
885   struct GNUNET_STREAM_Socket *socket = cls;
886   size_t read_size;
887   size_t valid_read_size;
888   unsigned int packet;
889   uint32_t sequence_increase;
890   uint32_t offset_increase;
891
892   socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
893   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
894     return;
895
896   if (NULL == socket->receive_buffer) 
897     return;
898
899   GNUNET_assert (NULL != socket->read_handle);
900   GNUNET_assert (NULL != socket->read_handle->proc);
901
902   /* Check the bitmap for any holes */
903   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
904     {
905       if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
906                                              packet))
907         break;
908     }
909   /* We only call read processor if we have the first packet */
910   GNUNET_assert (0 < packet);
911
912   valid_read_size = 
913     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
914
915   GNUNET_assert (0 != valid_read_size);
916
917   /* Cancel the read_io_timeout_task */
918   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
919   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
920
921   /* Call the data processor */
922   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
923               "Calling read processor\n");
924   read_size = 
925     socket->read_handle->proc (socket->read_handle->proc_cls,
926                                socket->status,
927                                socket->receive_buffer + socket->copy_offset,
928                                valid_read_size);
929   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
930               "Read processor read %d bytes\n",
931               read_size);
932   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
933               "Read processor completed successfully\n");
934
935   /* Free the read handle */
936   GNUNET_free (socket->read_handle);
937   socket->read_handle = NULL;
938
939   GNUNET_assert (read_size <= valid_read_size);
940   socket->copy_offset += read_size;
941
942   /* Determine upto which packet we can remove from the buffer */
943   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
944     {
945       if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
946         { packet++; break; }
947       if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
948         break;
949     }
950
951   /* If no packets can be removed we can't move the buffer */
952   if (0 == packet) return;
953
954   sequence_increase = packet;
955   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
956               "Sequence increase after read processor completion: %u\n",
957               sequence_increase);
958
959   /* Shift the data in the receive buffer */
960   memmove (socket->receive_buffer,
961            socket->receive_buffer 
962            + socket->receive_buffer_boundaries[sequence_increase-1],
963            socket->receive_buffer_size
964            - socket->receive_buffer_boundaries[sequence_increase-1]);
965   
966   /* Shift the bitmap */
967   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
968   
969   /* Set read_sequence_number */
970   socket->read_sequence_number += sequence_increase;
971   
972   /* Set read_offset */
973   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
974   socket->read_offset += offset_increase;
975
976   /* Fix copy_offset */
977   GNUNET_assert (offset_increase <= socket->copy_offset);
978   socket->copy_offset -= offset_increase;
979   
980   /* Fix relative boundaries */
981   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
982     {
983       if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
984         {
985           socket->receive_buffer_boundaries[packet] = 
986             socket->receive_buffer_boundaries[packet + sequence_increase] 
987             - offset_increase;
988         }
989       else
990         socket->receive_buffer_boundaries[packet] = 0;
991     }
992 }
993
994
995 /**
996  * Cancels the existing read io handle
997  *
998  * @param cls the closure from the SCHEDULER call
999  * @param tc the task context
1000  */
1001 static void
1002 read_io_timeout (void *cls, 
1003                 const struct GNUNET_SCHEDULER_TaskContext *tc)
1004 {
1005   struct GNUNET_STREAM_Socket *socket = cls;
1006   GNUNET_STREAM_DataProcessor proc;
1007   void *proc_cls;
1008
1009   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1010   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1011   {
1012     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1013                 "Read task timedout - Cancelling it\n");
1014     GNUNET_SCHEDULER_cancel (socket->read_task_id);
1015     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1016   }
1017   GNUNET_assert (NULL != socket->read_handle);
1018   proc = socket->read_handle->proc;
1019   proc_cls = socket->read_handle->proc_cls;
1020
1021   GNUNET_free (socket->read_handle);
1022   socket->read_handle = NULL;
1023   /* Call the read processor to signal timeout */
1024   proc (proc_cls,
1025         GNUNET_STREAM_TIMEOUT,
1026         NULL,
1027         0);
1028 }
1029
1030
1031 /**
1032  * Handler for DATA messages; Same for both client and server
1033  *
1034  * @param socket the socket through which the ack was received
1035  * @param tunnel connection to the other end
1036  * @param sender who sent the message
1037  * @param msg the data message
1038  * @param atsi performance data for the connection
1039  * @return GNUNET_OK to keep the connection open,
1040  *         GNUNET_SYSERR to close it (signal serious error)
1041  */
1042 static int
1043 handle_data (struct GNUNET_STREAM_Socket *socket,
1044              struct GNUNET_MESH_Tunnel *tunnel,
1045              const struct GNUNET_PeerIdentity *sender,
1046              const struct GNUNET_STREAM_DataMessage *msg,
1047              const struct GNUNET_ATS_Information*atsi)
1048 {
1049   const void *payload;
1050   uint32_t bytes_needed;
1051   uint32_t relative_offset;
1052   uint32_t relative_sequence_number;
1053   uint16_t size;
1054
1055   size = htons (msg->header.header.size);
1056   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1057     {
1058       GNUNET_break_op (0);
1059       return GNUNET_SYSERR;
1060     }
1061
1062   if (GNUNET_PEER_search (sender) != socket->other_peer)
1063     {
1064       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1065                   "Received DATA from non-confirming peer\n");
1066       return GNUNET_YES;
1067     }
1068
1069   switch (socket->state)
1070     {
1071     case STATE_ESTABLISHED:
1072     case STATE_TRANSMIT_CLOSED:
1073     case STATE_TRANSMIT_CLOSE_WAIT:
1074       
1075       /* check if the message's sequence number is in the range we are
1076          expecting */
1077       relative_sequence_number = 
1078         ntohl (msg->sequence_number) - socket->read_sequence_number;
1079       if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1080         {
1081           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1082                       "Ignoring received message with sequence number %u\n",
1083                       ntohl (msg->sequence_number));
1084           /* Start ACK sending task if one is not already present */
1085           if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1086             {
1087               socket->ack_task_id = 
1088                 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1089                                               (msg->ack_deadline),
1090                                               &ack_task,
1091                                               socket);
1092             }
1093           return GNUNET_YES;
1094         }
1095       
1096       /* Check if we have already seen this message */
1097       if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1098                                               relative_sequence_number))
1099         {
1100           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1101                       "Ignoring already received message with sequence "
1102                       "number %u\n",
1103                       ntohl (msg->sequence_number));
1104           /* Start ACK sending task if one is not already present */
1105           if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1106             {
1107               socket->ack_task_id = 
1108                 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1109                                               (msg->ack_deadline),
1110                                               &ack_task,
1111                                               socket);
1112             }
1113           return GNUNET_YES;
1114         }
1115
1116       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1117                   "Receiving DATA with sequence number: %u and size: %d from %x\n",
1118                   ntohl (msg->sequence_number),
1119                   ntohs (msg->header.header.size),
1120                   socket->other_peer);
1121
1122       /* Check if we have to allocate the buffer */
1123       size -= sizeof (struct GNUNET_STREAM_DataMessage);
1124       relative_offset = ntohl (msg->offset) - socket->read_offset;
1125       bytes_needed = relative_offset + size;
1126       if (bytes_needed > socket->receive_buffer_size)
1127         {
1128           if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1129             {
1130               socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1131                                                        bytes_needed);
1132               socket->receive_buffer_size = bytes_needed;
1133             }
1134           else
1135             {
1136               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1137                           "Cannot accommodate packet %d as buffer is full\n",
1138                           ntohl (msg->sequence_number));
1139               return GNUNET_YES;
1140             }
1141         }
1142       
1143       /* Copy Data to buffer */
1144       payload = &msg[1];
1145       GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1146       memcpy (socket->receive_buffer + relative_offset,
1147               payload,
1148               size);
1149       socket->receive_buffer_boundaries[relative_sequence_number] = 
1150         relative_offset + size;
1151       
1152       /* Modify the ACK bitmap */
1153       ackbitmap_modify_bit (&socket->ack_bitmap,
1154                             relative_sequence_number,
1155                             GNUNET_YES);
1156
1157       /* Start ACK sending task if one is not already present */
1158       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1159        {
1160          socket->ack_task_id = 
1161            GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1162                                          (msg->ack_deadline),
1163                                          &ack_task,
1164                                          socket);
1165        }
1166
1167       if ((NULL != socket->read_handle) /* A read handle is waiting */
1168           /* There is no current read task */
1169           && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1170           /* We have the first packet */
1171           && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1172                                                  0)))
1173         {
1174           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1175                       "Scheduling read processor\n");
1176
1177           socket->read_task_id = 
1178             GNUNET_SCHEDULER_add_now (&call_read_processor,
1179                                       socket);
1180         }
1181       
1182       break;
1183
1184     default:
1185       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1186                   "Received data message when it cannot be handled\n");
1187       break;
1188     }
1189   return GNUNET_YES;
1190 }
1191
1192
1193 /**
1194  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1195  *
1196  * @param cls the socket (set from GNUNET_MESH_connect)
1197  * @param tunnel connection to the other end
1198  * @param tunnel_ctx place to store local state associated with the tunnel
1199  * @param sender who sent the message
1200  * @param message the actual message
1201  * @param atsi performance data for the connection
1202  * @return GNUNET_OK to keep the connection open,
1203  *         GNUNET_SYSERR to close it (signal serious error)
1204  */
1205 static int
1206 client_handle_data (void *cls,
1207              struct GNUNET_MESH_Tunnel *tunnel,
1208              void **tunnel_ctx,
1209              const struct GNUNET_PeerIdentity *sender,
1210              const struct GNUNET_MessageHeader *message,
1211              const struct GNUNET_ATS_Information*atsi)
1212 {
1213   struct GNUNET_STREAM_Socket *socket = cls;
1214
1215   return handle_data (socket, 
1216                       tunnel, 
1217                       sender, 
1218                       (const struct GNUNET_STREAM_DataMessage *) message, 
1219                       atsi);
1220 }
1221
1222
1223 /**
1224  * Callback to set state to ESTABLISHED
1225  *
1226  * @param cls the closure from queue_message FIXME: document
1227  * @param socket the socket to requiring state change
1228  */
1229 static void
1230 set_state_established (void *cls,
1231                        struct GNUNET_STREAM_Socket *socket)
1232 {
1233   struct GNUNET_PeerIdentity initiator_pid;
1234
1235   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1236               "Attaining ESTABLISHED state\n");
1237   socket->write_offset = 0;
1238   socket->read_offset = 0;
1239   socket->state = STATE_ESTABLISHED;
1240   /* FIXME: What if listen_cb is NULL */
1241   if (NULL != socket->lsocket)
1242     {
1243       GNUNET_PEER_resolve (socket->other_peer, &initiator_pid);
1244       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1245                   "Calling listen callback\n");
1246       if (GNUNET_SYSERR == 
1247           socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1248                                       socket,
1249                                       &initiator_pid))
1250         {
1251           socket->state = STATE_CLOSED;
1252           /* FIXME: We should close in a decent way */
1253           GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1254           GNUNET_free (socket);
1255         }
1256     }
1257   else if (socket->open_cb)
1258     socket->open_cb (socket->open_cls, socket);
1259 }
1260
1261
1262 /**
1263  * Callback to set state to HELLO_WAIT
1264  *
1265  * @param cls the closure from queue_message
1266  * @param socket the socket to requiring state change
1267  */
1268 static void
1269 set_state_hello_wait (void *cls,
1270                       struct GNUNET_STREAM_Socket *socket)
1271 {
1272   GNUNET_assert (STATE_INIT == socket->state);
1273   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1274               "Attaining HELLO_WAIT state\n");
1275   socket->state = STATE_HELLO_WAIT;
1276 }
1277
1278
1279 /**
1280  * Callback to set state to CLOSE_WAIT
1281  *
1282  * @param cls the closure from queue_message
1283  * @param socket the socket requiring state change
1284  */
1285 static void
1286 set_state_close_wait (void *cls,
1287                       struct GNUNET_STREAM_Socket *socket)
1288 {
1289   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1290               "Attaing CLOSE_WAIT state\n");
1291   socket->state = STATE_CLOSE_WAIT;
1292   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1293   socket->receive_buffer = NULL;
1294   socket->receive_buffer_size = 0;
1295 }
1296
1297
1298 /**
1299  * Callback to set state to RECEIVE_CLOSE_WAIT
1300  *
1301  * @param cls the closure from queue_message
1302  * @param socket the socket requiring state change
1303  */
1304 static void
1305 set_state_receive_close_wait (void *cls,
1306                               struct GNUNET_STREAM_Socket *socket)
1307 {
1308   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1309               "Attaing RECEIVE_CLOSE_WAIT state\n");
1310   socket->state = STATE_RECEIVE_CLOSE_WAIT;
1311   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1312   socket->receive_buffer = NULL;
1313   socket->receive_buffer_size = 0;
1314 }
1315
1316
1317 /**
1318  * Callback to set state to TRANSMIT_CLOSE_WAIT
1319  *
1320  * @param cls the closure from queue_message
1321  * @param socket the socket requiring state change
1322  */
1323 static void
1324 set_state_transmit_close_wait (void *cls,
1325                                struct GNUNET_STREAM_Socket *socket)
1326 {
1327   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1328               "Attaining TRANSMIT_CLOSE_WAIT state\n");
1329   socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1330 }
1331
1332
1333 /**
1334  * Callback to set state to CLOSED
1335  *
1336  * @param cls the closure from queue_message
1337  * @param socket the socket requiring state change
1338  */
1339 static void
1340 set_state_closed (void *cls,
1341                   struct GNUNET_STREAM_Socket *socket)
1342 {
1343   socket->state = STATE_CLOSED;
1344 }
1345
1346 /**
1347  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1348  * socket
1349  *
1350  * @param socket the socket for which this HelloAckMessage has to be generated
1351  * @return the HelloAckMessage
1352  */
1353 static struct GNUNET_STREAM_HelloAckMessage *
1354 generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket)
1355 {
1356   struct GNUNET_STREAM_HelloAckMessage *msg;
1357
1358   /* Get the random sequence number */
1359   socket->write_sequence_number = 
1360     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1361   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1362               "Generated write sequence number %u\n",
1363               (unsigned int) socket->write_sequence_number);
1364   
1365   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1366   msg->header.header.size = 
1367     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1368   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1369   msg->sequence_number = htonl (socket->write_sequence_number);
1370   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1371
1372   return msg;
1373 }
1374
1375
1376 /**
1377  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1378  *
1379  * @param cls the socket (set from GNUNET_MESH_connect)
1380  * @param tunnel connection to the other end
1381  * @param tunnel_ctx this is NULL
1382  * @param sender who sent the message
1383  * @param message the actual message
1384  * @param atsi performance data for the connection
1385  * @return GNUNET_OK to keep the connection open,
1386  *         GNUNET_SYSERR to close it (signal serious error)
1387  */
1388 static int
1389 client_handle_hello_ack (void *cls,
1390                          struct GNUNET_MESH_Tunnel *tunnel,
1391                          void **tunnel_ctx,
1392                          const struct GNUNET_PeerIdentity *sender,
1393                          const struct GNUNET_MessageHeader *message,
1394                          const struct GNUNET_ATS_Information*atsi)
1395 {
1396   struct GNUNET_STREAM_Socket *socket = cls;
1397   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1398   struct GNUNET_STREAM_HelloAckMessage *reply;
1399
1400   if (GNUNET_PEER_search (sender) != socket->other_peer)
1401     {
1402       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1403                   "Received HELLO_ACK from non-confirming peer\n");
1404       return GNUNET_YES;
1405     }
1406   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1407   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1408               "Received HELLO_ACK from %x\n",
1409               socket->other_peer);
1410
1411   GNUNET_assert (socket->tunnel == tunnel);
1412   switch (socket->state)
1413   {
1414   case STATE_HELLO_WAIT:
1415     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1416     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1417                 "Read sequence number %u\n",
1418                 (unsigned int) socket->read_sequence_number);
1419     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1420     reply = generate_hello_ack_msg (socket);
1421     queue_message (socket,
1422                    &reply->header, 
1423                    &set_state_established, 
1424                    NULL);      
1425     return GNUNET_OK;
1426   case STATE_ESTABLISHED:
1427   case STATE_RECEIVE_CLOSE_WAIT:
1428     // call statistics (# ACKs ignored++)
1429     return GNUNET_OK;
1430   case STATE_INIT:
1431   default:
1432     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1433                 "Server %x sent HELLO_ACK when in state %d\n", 
1434                 socket->other_peer,
1435                 socket->state);
1436     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1437     return GNUNET_SYSERR;
1438   }
1439
1440 }
1441
1442
1443 /**
1444  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1445  *
1446  * @param cls the socket (set from GNUNET_MESH_connect)
1447  * @param tunnel connection to the other end
1448  * @param tunnel_ctx this is NULL
1449  * @param sender who sent the message
1450  * @param message the actual message
1451  * @param atsi performance data for the connection
1452  * @return GNUNET_OK to keep the connection open,
1453  *         GNUNET_SYSERR to close it (signal serious error)
1454  */
1455 static int
1456 client_handle_reset (void *cls,
1457                      struct GNUNET_MESH_Tunnel *tunnel,
1458                      void **tunnel_ctx,
1459                      const struct GNUNET_PeerIdentity *sender,
1460                      const struct GNUNET_MessageHeader *message,
1461                      const struct GNUNET_ATS_Information*atsi)
1462 {
1463   // struct GNUNET_STREAM_Socket *socket = cls;
1464
1465   return GNUNET_OK;
1466 }
1467
1468
1469 /**
1470  * Common message handler for handling TRANSMIT_CLOSE messages
1471  *
1472  * @param socket the socket through which the ack was received
1473  * @param tunnel connection to the other end
1474  * @param sender who sent the message
1475  * @param msg the transmit close message
1476  * @param atsi performance data for the connection
1477  * @return GNUNET_OK to keep the connection open,
1478  *         GNUNET_SYSERR to close it (signal serious error)
1479  */
1480 static int
1481 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1482                        struct GNUNET_MESH_Tunnel *tunnel,
1483                        const struct GNUNET_PeerIdentity *sender,
1484                        const struct GNUNET_STREAM_MessageHeader *msg,
1485                        const struct GNUNET_ATS_Information*atsi)
1486 {
1487   struct GNUNET_STREAM_MessageHeader *reply;
1488
1489   switch (socket->state)
1490     {
1491     case STATE_ESTABLISHED:
1492       socket->state = STATE_RECEIVE_CLOSED;
1493
1494       /* Send TRANSMIT_CLOSE_ACK */
1495       reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1496       reply->header.type = 
1497         htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1498       reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1499       queue_message (socket, reply, NULL, NULL);
1500       break;
1501
1502     default:
1503       /* FIXME: Call statistics? */
1504       break;
1505     }
1506   return GNUNET_YES;
1507 }
1508
1509
1510 /**
1511  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1512  *
1513  * @param cls the socket (set from GNUNET_MESH_connect)
1514  * @param tunnel connection to the other end
1515  * @param tunnel_ctx this is NULL
1516  * @param sender who sent the message
1517  * @param message the actual message
1518  * @param atsi performance data for the connection
1519  * @return GNUNET_OK to keep the connection open,
1520  *         GNUNET_SYSERR to close it (signal serious error)
1521  */
1522 static int
1523 client_handle_transmit_close (void *cls,
1524                               struct GNUNET_MESH_Tunnel *tunnel,
1525                               void **tunnel_ctx,
1526                               const struct GNUNET_PeerIdentity *sender,
1527                               const struct GNUNET_MessageHeader *message,
1528                               const struct GNUNET_ATS_Information*atsi)
1529 {
1530   struct GNUNET_STREAM_Socket *socket = cls;
1531   
1532   return handle_transmit_close (socket,
1533                                 tunnel,
1534                                 sender,
1535                                 (struct GNUNET_STREAM_MessageHeader *)message,
1536                                 atsi);
1537 }
1538
1539
1540 /**
1541  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1542  *
1543  * @param socket the socket
1544  * @param tunnel connection to the other end
1545  * @param sender who sent the message
1546  * @param message the actual message
1547  * @param atsi performance data for the connection
1548  * @param operation the close operation which is being ACK'ed
1549  * @return GNUNET_OK to keep the connection open,
1550  *         GNUNET_SYSERR to close it (signal serious error)
1551  */
1552 static int
1553 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1554                           struct GNUNET_MESH_Tunnel *tunnel,
1555                           const struct GNUNET_PeerIdentity *sender,
1556                           const struct GNUNET_STREAM_MessageHeader *message,
1557                           const struct GNUNET_ATS_Information *atsi,
1558                           int operation)
1559 {
1560   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1561
1562   shutdown_handle = socket->shutdown_handle;
1563   if (NULL == shutdown_handle)
1564     {
1565       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1566                   "Received *CLOSE_ACK when shutdown handle is NULL\n");
1567       return GNUNET_OK;
1568     }
1569
1570   switch (operation)
1571     {
1572     case SHUT_RDWR:
1573       switch (socket->state)
1574         {
1575         case STATE_CLOSE_WAIT:
1576           if (SHUT_RDWR != shutdown_handle->operation)
1577             {
1578               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1579                           "Received CLOSE_ACK when shutdown handle is not for SHUT_RDWR\n");
1580               return GNUNET_OK;
1581             }
1582
1583           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1584                       "Received CLOSE_ACK from %x\n",
1585                       socket->other_peer);
1586           socket->state = STATE_CLOSED;
1587           break;
1588         default:
1589           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1590                       "Received CLOSE_ACK when in it not expected\n");
1591           return GNUNET_OK;
1592         }
1593       break;
1594
1595     case SHUT_RD:
1596       switch (socket->state)
1597         {
1598         case STATE_RECEIVE_CLOSE_WAIT:
1599           if (SHUT_RD != shutdown_handle->operation)
1600             {
1601               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1602                           "Received RECEIVE_CLOSE_ACK when shutdown handle is not for SHUT_RD\n");
1603               return GNUNET_OK;
1604             }
1605
1606           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1607                       "Received RECEIVE_CLOSE_ACK from %x\n",
1608                       socket->other_peer);
1609           socket->state = STATE_RECEIVE_CLOSED;
1610           break;
1611         default:
1612           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1613                       "Received RECEIVE_CLOSE_ACK when in it not expected\n");
1614           return GNUNET_OK;
1615         }
1616
1617       break;
1618     case SHUT_WR:
1619       switch (socket->state)
1620         {
1621         case STATE_TRANSMIT_CLOSE_WAIT:
1622           if (SHUT_WR != shutdown_handle->operation)
1623             {
1624               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1625                           "Received TRANSMIT_CLOSE_ACK when shutdown handle is not for SHUT_WR\n");
1626               return GNUNET_OK;
1627             }
1628
1629           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1630                       "Received TRANSMIT_CLOSE_ACK from %x\n",
1631                       socket->other_peer);
1632           socket->state = STATE_TRANSMIT_CLOSED;
1633           break;
1634         default:
1635           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1636                       "Received TRANSMIT_CLOSE_ACK when in it not expected\n");
1637           
1638           return GNUNET_OK;
1639         }
1640       break;
1641     default:
1642       GNUNET_assert (0);
1643     }
1644
1645   if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
1646     shutdown_handle->completion_cb(shutdown_handle->completion_cls,
1647                                    operation);
1648   GNUNET_free (shutdown_handle); /* Free shutdown handle */
1649   socket->shutdown_handle = NULL;
1650   if (GNUNET_SCHEDULER_NO_TASK
1651       != shutdown_handle->close_msg_retransmission_task_id)
1652     {
1653       GNUNET_SCHEDULER_cancel
1654         (shutdown_handle->close_msg_retransmission_task_id);
1655       shutdown_handle->close_msg_retransmission_task_id =
1656         GNUNET_SCHEDULER_NO_TASK;
1657     }
1658   return GNUNET_OK;
1659 }
1660
1661
1662 /**
1663  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1664  *
1665  * @param cls the socket (set from GNUNET_MESH_connect)
1666  * @param tunnel connection to the other end
1667  * @param tunnel_ctx this is NULL
1668  * @param sender who sent the message
1669  * @param message the actual message
1670  * @param atsi performance data for the connection
1671  * @return GNUNET_OK to keep the connection open,
1672  *         GNUNET_SYSERR to close it (signal serious error)
1673  */
1674 static int
1675 client_handle_transmit_close_ack (void *cls,
1676                                   struct GNUNET_MESH_Tunnel *tunnel,
1677                                   void **tunnel_ctx,
1678                                   const struct GNUNET_PeerIdentity *sender,
1679                                   const struct GNUNET_MessageHeader *message,
1680                                   const struct GNUNET_ATS_Information*atsi)
1681 {
1682   struct GNUNET_STREAM_Socket *socket = cls;
1683
1684   return handle_generic_close_ack (socket,
1685                                    tunnel,
1686                                    sender,
1687                                    (const struct GNUNET_STREAM_MessageHeader *)
1688                                    message,
1689                                    atsi,
1690                                    SHUT_WR);
1691 }
1692
1693
1694 /**
1695  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1696  *
1697  * @param socket the socket
1698  * @param tunnel connection to the other end
1699  * @param sender who sent the message
1700  * @param message the actual message
1701  * @param atsi performance data for the connection
1702  * @return GNUNET_OK to keep the connection open,
1703  *         GNUNET_SYSERR to close it (signal serious error)
1704  */
1705 static int
1706 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1707                       struct GNUNET_MESH_Tunnel *tunnel,
1708                       const struct GNUNET_PeerIdentity *sender,
1709                       const struct GNUNET_STREAM_MessageHeader *message,
1710                       const struct GNUNET_ATS_Information *atsi)
1711 {
1712   struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1713
1714   switch (socket->state)
1715     {
1716     case STATE_INIT:
1717     case STATE_LISTEN:
1718     case STATE_HELLO_WAIT:
1719       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1720                   "Ignoring RECEIVE_CLOSE as it cannot be handled now\n");
1721       return GNUNET_OK;
1722     default:
1723       break;
1724     }
1725   
1726   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1727               "Received RECEIVE_CLOSE from %x\n",
1728               socket->other_peer);
1729   receive_close_ack =
1730     GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1731   receive_close_ack->header.size =
1732     htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1733   receive_close_ack->header.type =
1734     htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1735   queue_message (socket,
1736                  receive_close_ack,
1737                  &set_state_closed,
1738                  NULL);
1739   
1740   /* FIXME: Handle the case where write handle is present; the write operation
1741               should be deemed as finised and the write continuation callback
1742               has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
1743   return GNUNET_OK;
1744 }
1745
1746
1747 /**
1748  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1749  *
1750  * @param cls the socket (set from GNUNET_MESH_connect)
1751  * @param tunnel connection to the other end
1752  * @param tunnel_ctx this is NULL
1753  * @param sender who sent the message
1754  * @param message the actual message
1755  * @param atsi performance data for the connection
1756  * @return GNUNET_OK to keep the connection open,
1757  *         GNUNET_SYSERR to close it (signal serious error)
1758  */
1759 static int
1760 client_handle_receive_close (void *cls,
1761                              struct GNUNET_MESH_Tunnel *tunnel,
1762                              void **tunnel_ctx,
1763                              const struct GNUNET_PeerIdentity *sender,
1764                              const struct GNUNET_MessageHeader *message,
1765                              const struct GNUNET_ATS_Information*atsi)
1766 {
1767   struct GNUNET_STREAM_Socket *socket = cls;
1768
1769   return
1770     handle_receive_close (socket,
1771                           tunnel,
1772                           sender,
1773                           (const struct GNUNET_STREAM_MessageHeader *) message,
1774                           atsi);
1775 }
1776
1777
1778 /**
1779  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1780  *
1781  * @param cls the socket (set from GNUNET_MESH_connect)
1782  * @param tunnel connection to the other end
1783  * @param tunnel_ctx this is NULL
1784  * @param sender who sent the message
1785  * @param message the actual message
1786  * @param atsi performance data for the connection
1787  * @return GNUNET_OK to keep the connection open,
1788  *         GNUNET_SYSERR to close it (signal serious error)
1789  */
1790 static int
1791 client_handle_receive_close_ack (void *cls,
1792                                  struct GNUNET_MESH_Tunnel *tunnel,
1793                                  void **tunnel_ctx,
1794                                  const struct GNUNET_PeerIdentity *sender,
1795                                  const struct GNUNET_MessageHeader *message,
1796                                  const struct GNUNET_ATS_Information*atsi)
1797 {
1798   struct GNUNET_STREAM_Socket *socket = cls;
1799
1800   return handle_generic_close_ack (socket,
1801                                    tunnel,
1802                                    sender,
1803                                    (const struct GNUNET_STREAM_MessageHeader *)
1804                                    message,
1805                                    atsi,
1806                                    SHUT_RD);
1807 }
1808
1809
1810 /**
1811  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1812  *
1813  * @param socket the socket
1814  * @param tunnel connection to the other end
1815  * @param sender who sent the message
1816  * @param message the actual message
1817  * @param atsi performance data for the connection
1818  * @return GNUNET_OK to keep the connection open,
1819  *         GNUNET_SYSERR to close it (signal serious error)
1820  */
1821 static int
1822 handle_close (struct GNUNET_STREAM_Socket *socket,
1823               struct GNUNET_MESH_Tunnel *tunnel,
1824               const struct GNUNET_PeerIdentity *sender,
1825               const struct GNUNET_STREAM_MessageHeader *message,
1826               const struct GNUNET_ATS_Information*atsi)
1827 {
1828   struct GNUNET_STREAM_MessageHeader *close_ack;
1829
1830   switch (socket->state)
1831     {
1832     case STATE_INIT:
1833     case STATE_LISTEN:
1834     case STATE_HELLO_WAIT:
1835       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1836                   "Ignoring RECEIVE_CLOSE as it cannot be handled now\n");
1837       return GNUNET_OK;
1838     default:
1839       break;
1840     }
1841
1842   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1843               "Received CLOSE from %x\n",
1844               socket->other_peer);
1845   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1846   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1847   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
1848   queue_message (socket,
1849                  close_ack,
1850                  &set_state_closed,
1851                  NULL);
1852   if (socket->state == STATE_CLOSED)
1853     return GNUNET_OK;
1854
1855   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1856   socket->receive_buffer = NULL;
1857   socket->receive_buffer_size = 0;
1858   return GNUNET_OK;
1859 }
1860
1861
1862 /**
1863  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1864  *
1865  * @param cls the socket (set from GNUNET_MESH_connect)
1866  * @param tunnel connection to the other end
1867  * @param tunnel_ctx this is NULL
1868  * @param sender who sent the message
1869  * @param message the actual message
1870  * @param atsi performance data for the connection
1871  * @return GNUNET_OK to keep the connection open,
1872  *         GNUNET_SYSERR to close it (signal serious error)
1873  */
1874 static int
1875 client_handle_close (void *cls,
1876                      struct GNUNET_MESH_Tunnel *tunnel,
1877                      void **tunnel_ctx,
1878                      const struct GNUNET_PeerIdentity *sender,
1879                      const struct GNUNET_MessageHeader *message,
1880                      const struct GNUNET_ATS_Information*atsi)
1881 {
1882   struct GNUNET_STREAM_Socket *socket = cls;
1883
1884   return handle_close (socket,
1885                        tunnel,
1886                        sender,
1887                        (const struct GNUNET_STREAM_MessageHeader *) message,
1888                        atsi);
1889 }
1890
1891
1892 /**
1893  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1894  *
1895  * @param cls the socket (set from GNUNET_MESH_connect)
1896  * @param tunnel connection to the other end
1897  * @param tunnel_ctx this is NULL
1898  * @param sender who sent the message
1899  * @param message the actual message
1900  * @param atsi performance data for the connection
1901  * @return GNUNET_OK to keep the connection open,
1902  *         GNUNET_SYSERR to close it (signal serious error)
1903  */
1904 static int
1905 client_handle_close_ack (void *cls,
1906                          struct GNUNET_MESH_Tunnel *tunnel,
1907                          void **tunnel_ctx,
1908                          const struct GNUNET_PeerIdentity *sender,
1909                          const struct GNUNET_MessageHeader *message,
1910                          const struct GNUNET_ATS_Information *atsi)
1911 {
1912   struct GNUNET_STREAM_Socket *socket = cls;
1913
1914   return handle_generic_close_ack (socket,
1915                                    tunnel,
1916                                    sender,
1917                                    (const struct GNUNET_STREAM_MessageHeader *) 
1918                                    message,
1919                                    atsi,
1920                                    SHUT_RDWR);
1921 }
1922
1923 /*****************************/
1924 /* Server's Message Handlers */
1925 /*****************************/
1926
1927 /**
1928  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1929  *
1930  * @param cls the closure
1931  * @param tunnel connection to the other end
1932  * @param tunnel_ctx the socket
1933  * @param sender who sent the message
1934  * @param message the actual message
1935  * @param atsi performance data for the connection
1936  * @return GNUNET_OK to keep the connection open,
1937  *         GNUNET_SYSERR to close it (signal serious error)
1938  */
1939 static int
1940 server_handle_data (void *cls,
1941                     struct GNUNET_MESH_Tunnel *tunnel,
1942                     void **tunnel_ctx,
1943                     const struct GNUNET_PeerIdentity *sender,
1944                     const struct GNUNET_MessageHeader *message,
1945                     const struct GNUNET_ATS_Information*atsi)
1946 {
1947   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1948
1949   return handle_data (socket,
1950                       tunnel,
1951                       sender,
1952                       (const struct GNUNET_STREAM_DataMessage *)message,
1953                       atsi);
1954 }
1955
1956
1957 /**
1958  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
1959  *
1960  * @param cls the closure
1961  * @param tunnel connection to the other end
1962  * @param tunnel_ctx the socket
1963  * @param sender who sent the message
1964  * @param message the actual message
1965  * @param atsi performance data for the connection
1966  * @return GNUNET_OK to keep the connection open,
1967  *         GNUNET_SYSERR to close it (signal serious error)
1968  */
1969 static int
1970 server_handle_hello (void *cls,
1971                      struct GNUNET_MESH_Tunnel *tunnel,
1972                      void **tunnel_ctx,
1973                      const struct GNUNET_PeerIdentity *sender,
1974                      const struct GNUNET_MessageHeader *message,
1975                      const struct GNUNET_ATS_Information*atsi)
1976 {
1977   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1978   struct GNUNET_STREAM_HelloAckMessage *reply;
1979
1980   if (GNUNET_PEER_search (sender) != socket->other_peer)
1981     {
1982       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1983                   "Received HELLO from non-confirming peer\n");
1984       return GNUNET_YES;
1985     }
1986
1987   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == 
1988                  ntohs (message->type));
1989   GNUNET_assert (socket->tunnel == tunnel);
1990   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1991               "Received HELLO from %x\n", 
1992               socket->other_peer);
1993
1994   if (STATE_INIT == socket->state)
1995     {
1996       reply = generate_hello_ack_msg (socket);
1997       queue_message (socket, 
1998                      &reply->header,
1999                      &set_state_hello_wait, 
2000                      NULL);
2001     }
2002   else
2003     {
2004       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2005                   "Client sent HELLO when in state %d\n", socket->state);
2006       /* FIXME: Send RESET? */
2007       
2008     }
2009   return GNUNET_OK;
2010 }
2011
2012
2013 /**
2014  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2015  *
2016  * @param cls the closure
2017  * @param tunnel connection to the other end
2018  * @param tunnel_ctx the socket
2019  * @param sender who sent the message
2020  * @param message the actual message
2021  * @param atsi performance data for the connection
2022  * @return GNUNET_OK to keep the connection open,
2023  *         GNUNET_SYSERR to close it (signal serious error)
2024  */
2025 static int
2026 server_handle_hello_ack (void *cls,
2027                          struct GNUNET_MESH_Tunnel *tunnel,
2028                          void **tunnel_ctx,
2029                          const struct GNUNET_PeerIdentity *sender,
2030                          const struct GNUNET_MessageHeader *message,
2031                          const struct GNUNET_ATS_Information*atsi)
2032 {
2033   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2034   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2035
2036   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2037                  ntohs (message->type));
2038   GNUNET_assert (socket->tunnel == tunnel);
2039   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2040   if (STATE_HELLO_WAIT == socket->state)
2041     {
2042       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2043                   "Received HELLO_ACK from %x\n",
2044                   socket->other_peer);
2045       socket->read_sequence_number = ntohl (ack_message->sequence_number);
2046       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2047                   "Read sequence number %u\n",
2048                   (unsigned int) socket->read_sequence_number);
2049       socket->receiver_window_available = 
2050         ntohl (ack_message->receiver_window_size);
2051       /* Attain ESTABLISHED state */
2052       set_state_established (NULL, socket);
2053     }
2054   else
2055     {
2056       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2057                   "Client sent HELLO_ACK when in state %d\n", socket->state);
2058       /* FIXME: Send RESET? */
2059       
2060     }
2061   return GNUNET_OK;
2062 }
2063
2064
2065 /**
2066  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2067  *
2068  * @param cls the closure
2069  * @param tunnel connection to the other end
2070  * @param tunnel_ctx the socket
2071  * @param sender who sent the message
2072  * @param message the actual message
2073  * @param atsi performance data for the connection
2074  * @return GNUNET_OK to keep the connection open,
2075  *         GNUNET_SYSERR to close it (signal serious error)
2076  */
2077 static int
2078 server_handle_reset (void *cls,
2079                      struct GNUNET_MESH_Tunnel *tunnel,
2080                      void **tunnel_ctx,
2081                      const struct GNUNET_PeerIdentity *sender,
2082                      const struct GNUNET_MessageHeader *message,
2083                      const struct GNUNET_ATS_Information*atsi)
2084 {
2085   // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2086
2087   return GNUNET_OK;
2088 }
2089
2090
2091 /**
2092  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2093  *
2094  * @param cls the closure
2095  * @param tunnel connection to the other end
2096  * @param tunnel_ctx the socket
2097  * @param sender who sent the message
2098  * @param message the actual message
2099  * @param atsi performance data for the connection
2100  * @return GNUNET_OK to keep the connection open,
2101  *         GNUNET_SYSERR to close it (signal serious error)
2102  */
2103 static int
2104 server_handle_transmit_close (void *cls,
2105                               struct GNUNET_MESH_Tunnel *tunnel,
2106                               void **tunnel_ctx,
2107                               const struct GNUNET_PeerIdentity *sender,
2108                               const struct GNUNET_MessageHeader *message,
2109                               const struct GNUNET_ATS_Information*atsi)
2110 {
2111   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2112
2113   return handle_transmit_close (socket,
2114                                 tunnel,
2115                                 sender,
2116                                 (struct GNUNET_STREAM_MessageHeader *)message,
2117                                 atsi);
2118 }
2119
2120
2121 /**
2122  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2123  *
2124  * @param cls the closure
2125  * @param tunnel connection to the other end
2126  * @param tunnel_ctx the socket
2127  * @param sender who sent the message
2128  * @param message the actual message
2129  * @param atsi performance data for the connection
2130  * @return GNUNET_OK to keep the connection open,
2131  *         GNUNET_SYSERR to close it (signal serious error)
2132  */
2133 static int
2134 server_handle_transmit_close_ack (void *cls,
2135                                   struct GNUNET_MESH_Tunnel *tunnel,
2136                                   void **tunnel_ctx,
2137                                   const struct GNUNET_PeerIdentity *sender,
2138                                   const struct GNUNET_MessageHeader *message,
2139                                   const struct GNUNET_ATS_Information*atsi)
2140 {
2141   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2142
2143   return handle_generic_close_ack (socket,
2144                                    tunnel,
2145                                    sender,
2146                                    (const struct GNUNET_STREAM_MessageHeader *)
2147                                    message,
2148                                    atsi,
2149                                    SHUT_WR);
2150 }
2151
2152
2153 /**
2154  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2155  *
2156  * @param cls the closure
2157  * @param tunnel connection to the other end
2158  * @param tunnel_ctx the socket
2159  * @param sender who sent the message
2160  * @param message the actual message
2161  * @param atsi performance data for the connection
2162  * @return GNUNET_OK to keep the connection open,
2163  *         GNUNET_SYSERR to close it (signal serious error)
2164  */
2165 static int
2166 server_handle_receive_close (void *cls,
2167                              struct GNUNET_MESH_Tunnel *tunnel,
2168                              void **tunnel_ctx,
2169                              const struct GNUNET_PeerIdentity *sender,
2170                              const struct GNUNET_MessageHeader *message,
2171                              const struct GNUNET_ATS_Information*atsi)
2172 {
2173   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2174
2175   return
2176     handle_receive_close (socket,
2177                           tunnel,
2178                           sender,
2179                           (const struct GNUNET_STREAM_MessageHeader *) message,
2180                           atsi);
2181 }
2182
2183
2184 /**
2185  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2186  *
2187  * @param cls the closure
2188  * @param tunnel connection to the other end
2189  * @param tunnel_ctx the socket
2190  * @param sender who sent the message
2191  * @param message the actual message
2192  * @param atsi performance data for the connection
2193  * @return GNUNET_OK to keep the connection open,
2194  *         GNUNET_SYSERR to close it (signal serious error)
2195  */
2196 static int
2197 server_handle_receive_close_ack (void *cls,
2198                                  struct GNUNET_MESH_Tunnel *tunnel,
2199                                  void **tunnel_ctx,
2200                                  const struct GNUNET_PeerIdentity *sender,
2201                                  const struct GNUNET_MessageHeader *message,
2202                                  const struct GNUNET_ATS_Information*atsi)
2203 {
2204   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2205
2206   return handle_generic_close_ack (socket,
2207                                    tunnel,
2208                                    sender,
2209                                    (const struct GNUNET_STREAM_MessageHeader *)
2210                                    message,
2211                                    atsi,
2212                                    SHUT_RD);
2213 }
2214
2215
2216 /**
2217  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2218  *
2219  * @param cls the listen socket (from GNUNET_MESH_connect in
2220  *          GNUNET_STREAM_listen) 
2221  * @param tunnel connection to the other end
2222  * @param tunnel_ctx the socket
2223  * @param sender who sent the message
2224  * @param message the actual message
2225  * @param atsi performance data for the connection
2226  * @return GNUNET_OK to keep the connection open,
2227  *         GNUNET_SYSERR to close it (signal serious error)
2228  */
2229 static int
2230 server_handle_close (void *cls,
2231                      struct GNUNET_MESH_Tunnel *tunnel,
2232                      void **tunnel_ctx,
2233                      const struct GNUNET_PeerIdentity *sender,
2234                      const struct GNUNET_MessageHeader *message,
2235                      const struct GNUNET_ATS_Information*atsi)
2236 {
2237   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2238   
2239   return handle_close (socket,
2240                        tunnel,
2241                        sender,
2242                        (const struct GNUNET_STREAM_MessageHeader *) message,
2243                        atsi);
2244 }
2245
2246
2247 /**
2248  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2249  *
2250  * @param cls the closure
2251  * @param tunnel connection to the other end
2252  * @param tunnel_ctx the socket
2253  * @param sender who sent the message
2254  * @param message the actual message
2255  * @param atsi performance data for the connection
2256  * @return GNUNET_OK to keep the connection open,
2257  *         GNUNET_SYSERR to close it (signal serious error)
2258  */
2259 static int
2260 server_handle_close_ack (void *cls,
2261                          struct GNUNET_MESH_Tunnel *tunnel,
2262                          void **tunnel_ctx,
2263                          const struct GNUNET_PeerIdentity *sender,
2264                          const struct GNUNET_MessageHeader *message,
2265                          const struct GNUNET_ATS_Information*atsi)
2266 {
2267   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2268
2269   return handle_generic_close_ack (socket,
2270                                    tunnel,
2271                                    sender,
2272                                    (const struct GNUNET_STREAM_MessageHeader *) 
2273                                    message,
2274                                    atsi,
2275                                    SHUT_RDWR);
2276 }
2277
2278
2279 /**
2280  * Handler for DATA_ACK messages
2281  *
2282  * @param socket the socket through which the ack was received
2283  * @param tunnel connection to the other end
2284  * @param sender who sent the message
2285  * @param ack the acknowledgment message
2286  * @param atsi performance data for the connection
2287  * @return GNUNET_OK to keep the connection open,
2288  *         GNUNET_SYSERR to close it (signal serious error)
2289  */
2290 static int
2291 handle_ack (struct GNUNET_STREAM_Socket *socket,
2292             struct GNUNET_MESH_Tunnel *tunnel,
2293             const struct GNUNET_PeerIdentity *sender,
2294             const struct GNUNET_STREAM_AckMessage *ack,
2295             const struct GNUNET_ATS_Information*atsi)
2296 {
2297   unsigned int packet;
2298   int need_retransmission;
2299   
2300
2301   if (GNUNET_PEER_search (sender) != socket->other_peer)
2302     {
2303       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2304                   "Received ACK from non-confirming peer\n");
2305       return GNUNET_YES;
2306     }
2307
2308   switch (socket->state)
2309     {
2310     case (STATE_ESTABLISHED):
2311     case (STATE_RECEIVE_CLOSED):
2312     case (STATE_RECEIVE_CLOSE_WAIT):
2313       if (NULL == socket->write_handle)
2314         {
2315           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2316                       "Received DATA_ACK when write_handle is NULL\n");
2317           return GNUNET_OK;
2318         }
2319       /* FIXME: increment in the base sequence number is breaking current flow
2320        */
2321       if (!((socket->write_sequence_number 
2322              - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2323         {
2324           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2325                       "Received DATA_ACK with unexpected base sequence number\n");
2326           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2327                       "Current write sequence: %u; Ack's base sequence: %u\n",
2328                       socket->write_sequence_number,
2329                       ntohl (ack->base_sequence_number));
2330           return GNUNET_OK;
2331         }
2332       /* FIXME: include the case when write_handle is cancelled - ignore the 
2333          acks */
2334
2335       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2336                   "Received DATA_ACK from %x\n",
2337                   socket->other_peer);
2338       
2339       /* Cancel the retransmission task */
2340       if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2341         {
2342           GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2343           socket->retransmission_timeout_task_id = 
2344             GNUNET_SCHEDULER_NO_TASK;
2345         }
2346
2347       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2348         {
2349           if (NULL == socket->write_handle->messages[packet]) break;
2350           if (ntohl (ack->base_sequence_number)
2351               >= ntohl (socket->write_handle->messages[packet]->sequence_number))
2352             ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2353                                   packet,
2354                                   GNUNET_YES);
2355           else
2356             if (GNUNET_YES == 
2357                 ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
2358                                       ntohl (socket->write_handle->messages[packet]->sequence_number)
2359                                       - ntohl (ack->base_sequence_number)))
2360               ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2361                                     packet,
2362                                     GNUNET_YES);
2363         }
2364
2365       /* Update the receive window remaining
2366        FIXME : Should update with the value from a data ack with greater
2367        sequence number */
2368       socket->receiver_window_available = 
2369         ntohl (ack->receive_window_remaining);
2370
2371       /* Check if we have received all acknowledgements */
2372       need_retransmission = GNUNET_NO;
2373       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2374         {
2375           if (NULL == socket->write_handle->messages[packet]) break;
2376           if (GNUNET_YES != ackbitmap_is_bit_set 
2377               (&socket->write_handle->ack_bitmap,packet))
2378             {
2379               need_retransmission = GNUNET_YES;
2380               break;
2381             }
2382         }
2383       if (GNUNET_YES == need_retransmission)
2384         {
2385           write_data (socket);
2386         }
2387       else      /* We have to call the write continuation callback now */
2388         {
2389           /* Free the packets */
2390           for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2391             {
2392               GNUNET_free_non_null (socket->write_handle->messages[packet]);
2393             }
2394           if (NULL != socket->write_handle->write_cont)
2395             socket->write_handle->write_cont
2396               (socket->write_handle->write_cont_cls,
2397                socket->status,
2398                socket->write_handle->size);
2399           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2400                       "Write completion callback completed\n");
2401           /* We are done with the write handle - Freeing it */
2402           GNUNET_free (socket->write_handle);
2403           socket->write_handle = NULL;
2404         }
2405       break;
2406     default:
2407       break;
2408     }
2409   return GNUNET_OK;
2410 }
2411
2412
2413 /**
2414  * Handler for DATA_ACK messages
2415  *
2416  * @param cls the 'struct GNUNET_STREAM_Socket'
2417  * @param tunnel connection to the other end
2418  * @param tunnel_ctx unused
2419  * @param sender who sent the message
2420  * @param message the actual message
2421  * @param atsi performance data for the connection
2422  * @return GNUNET_OK to keep the connection open,
2423  *         GNUNET_SYSERR to close it (signal serious error)
2424  */
2425 static int
2426 client_handle_ack (void *cls,
2427                    struct GNUNET_MESH_Tunnel *tunnel,
2428                    void **tunnel_ctx,
2429                    const struct GNUNET_PeerIdentity *sender,
2430                    const struct GNUNET_MessageHeader *message,
2431                    const struct GNUNET_ATS_Information*atsi)
2432 {
2433   struct GNUNET_STREAM_Socket *socket = cls;
2434   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2435  
2436   return handle_ack (socket, tunnel, sender, ack, atsi);
2437 }
2438
2439
2440 /**
2441  * Handler for DATA_ACK messages
2442  *
2443  * @param cls the server's listen socket
2444  * @param tunnel connection to the other end
2445  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2446  * @param sender who sent the message
2447  * @param message the actual message
2448  * @param atsi performance data for the connection
2449  * @return GNUNET_OK to keep the connection open,
2450  *         GNUNET_SYSERR to close it (signal serious error)
2451  */
2452 static int
2453 server_handle_ack (void *cls,
2454                    struct GNUNET_MESH_Tunnel *tunnel,
2455                    void **tunnel_ctx,
2456                    const struct GNUNET_PeerIdentity *sender,
2457                    const struct GNUNET_MessageHeader *message,
2458                    const struct GNUNET_ATS_Information*atsi)
2459 {
2460   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2461   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2462  
2463   return handle_ack (socket, tunnel, sender, ack, atsi);
2464 }
2465
2466
2467 /**
2468  * For client message handlers, the stream socket is in the
2469  * closure argument.
2470  */
2471 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2472   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2473   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2474    sizeof (struct GNUNET_STREAM_AckMessage) },
2475   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2476    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2477   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2478    sizeof (struct GNUNET_STREAM_MessageHeader)},
2479   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2480    sizeof (struct GNUNET_STREAM_MessageHeader)},
2481   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2482    sizeof (struct GNUNET_STREAM_MessageHeader)},
2483   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2484    sizeof (struct GNUNET_STREAM_MessageHeader)},
2485   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2486    sizeof (struct GNUNET_STREAM_MessageHeader)},
2487   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2488    sizeof (struct GNUNET_STREAM_MessageHeader)},
2489   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2490    sizeof (struct GNUNET_STREAM_MessageHeader)},
2491   {NULL, 0, 0}
2492 };
2493
2494
2495 /**
2496  * For server message handlers, the stream socket is in the
2497  * tunnel context, and the listen socket in the closure argument.
2498  */
2499 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2500   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2501   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2502    sizeof (struct GNUNET_STREAM_AckMessage) },
2503   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2504    sizeof (struct GNUNET_STREAM_MessageHeader)},
2505   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2506    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2507   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2508    sizeof (struct GNUNET_STREAM_MessageHeader)},
2509   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2510    sizeof (struct GNUNET_STREAM_MessageHeader)},
2511   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2512    sizeof (struct GNUNET_STREAM_MessageHeader)},
2513   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2514    sizeof (struct GNUNET_STREAM_MessageHeader)},
2515   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2516    sizeof (struct GNUNET_STREAM_MessageHeader)},
2517   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2518    sizeof (struct GNUNET_STREAM_MessageHeader)},
2519   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2520    sizeof (struct GNUNET_STREAM_MessageHeader)},
2521   {NULL, 0, 0}
2522 };
2523
2524
2525 /**
2526  * Function called when our target peer is connected to our tunnel
2527  *
2528  * @param cls the socket for which this tunnel is created
2529  * @param peer the peer identity of the target
2530  * @param atsi performance data for the connection
2531  */
2532 static void
2533 mesh_peer_connect_callback (void *cls,
2534                             const struct GNUNET_PeerIdentity *peer,
2535                             const struct GNUNET_ATS_Information * atsi)
2536 {
2537   struct GNUNET_STREAM_Socket *socket = cls;
2538   struct GNUNET_STREAM_MessageHeader *message;
2539   GNUNET_PEER_Id connected_peer;
2540
2541   connected_peer = GNUNET_PEER_search (peer);
2542   
2543   if (connected_peer != socket->other_peer)
2544     {
2545       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2546                   "A peer which is not our target has connected to our tunnel\n");
2547       return;
2548     }
2549   
2550   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2551               "Target peer %x connected\n", 
2552               connected_peer);
2553   
2554   /* Set state to INIT */
2555   socket->state = STATE_INIT;
2556
2557   /* Send HELLO message */
2558   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2559   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
2560   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2561   queue_message (socket,
2562                  message,
2563                  &set_state_hello_wait,
2564                  NULL);
2565
2566   /* Call open callback */
2567   if (NULL == socket->open_cb)
2568     {
2569       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2570                   "STREAM_open callback is NULL\n");
2571     }
2572 }
2573
2574
2575 /**
2576  * Function called when our target peer is disconnected from our tunnel
2577  *
2578  * @param cls the socket associated which this tunnel
2579  * @param peer the peer identity of the target
2580  */
2581 static void
2582 mesh_peer_disconnect_callback (void *cls,
2583                                const struct GNUNET_PeerIdentity *peer)
2584 {
2585   struct GNUNET_STREAM_Socket *socket=cls;
2586   
2587   /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2588   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2589               "Other peer %x disconnected\n",
2590               socket->other_peer);
2591 }
2592
2593
2594 /**
2595  * Method called whenever a peer creates a tunnel to us
2596  *
2597  * @param cls closure
2598  * @param tunnel new handle to the tunnel
2599  * @param initiator peer that started the tunnel
2600  * @param atsi performance information for the tunnel
2601  * @return initial tunnel context for the tunnel
2602  *         (can be NULL -- that's not an error)
2603  */
2604 static void *
2605 new_tunnel_notify (void *cls,
2606                    struct GNUNET_MESH_Tunnel *tunnel,
2607                    const struct GNUNET_PeerIdentity *initiator,
2608                    const struct GNUNET_ATS_Information *atsi)
2609 {
2610   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2611   struct GNUNET_STREAM_Socket *socket;
2612
2613   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2614      from the same peer again until the socket is closed */
2615
2616   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2617   socket->other_peer = GNUNET_PEER_intern (initiator);
2618   socket->tunnel = tunnel;
2619   socket->session_id = 0;       /* FIXME */
2620   socket->state = STATE_INIT;
2621   socket->lsocket = lsocket; 
2622   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2623               "Peer %x initiated tunnel to us\n", 
2624               socket->other_peer);
2625   
2626   /* FIXME: Copy MESH handle from lsocket to socket */
2627   
2628   return socket;
2629 }
2630
2631
2632 /**
2633  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2634  * any associated state.  This function is NOT called if the client has
2635  * explicitly asked for the tunnel to be destroyed using
2636  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2637  * the tunnel.
2638  *
2639  * @param cls closure (set from GNUNET_MESH_connect)
2640  * @param tunnel connection to the other end (henceforth invalid)
2641  * @param tunnel_ctx place where local state associated
2642  *                   with the tunnel is stored
2643  */
2644 static void 
2645 tunnel_cleaner (void *cls,
2646                 const struct GNUNET_MESH_Tunnel *tunnel,
2647                 void *tunnel_ctx)
2648 {
2649   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2650
2651   if (tunnel != socket->tunnel)
2652     return;
2653
2654   GNUNET_break_op(0);
2655   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2656               "Peer %x has terminated connection abruptly\n",
2657               socket->other_peer);
2658
2659   socket->status = GNUNET_STREAM_SHUTDOWN;
2660
2661   /* Clear Transmit handles */
2662   if (NULL != socket->transmit_handle)
2663     {
2664       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2665       socket->transmit_handle = NULL;
2666     }
2667   if (NULL != socket->ack_transmit_handle)
2668     {
2669       GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
2670       GNUNET_free (socket->ack_msg);
2671       socket->ack_msg = NULL;
2672       socket->ack_transmit_handle = NULL;
2673     }
2674   /* Stop Tasks using socket->tunnel */
2675   if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2676     {
2677       GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2678       socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2679     }
2680   if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2681     {
2682       GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2683       socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
2684     }
2685   /* FIXME: Cancel all other tasks using socket->tunnel */
2686   socket->tunnel = NULL;
2687 }
2688
2689
2690 /*****************/
2691 /* API functions */
2692 /*****************/
2693
2694
2695 /**
2696  * Tries to open a stream to the target peer
2697  *
2698  * @param cfg configuration to use
2699  * @param target the target peer to which the stream has to be opened
2700  * @param app_port the application port number which uniquely identifies this
2701  *            stream
2702  * @param open_cb this function will be called after stream has be established 
2703  * @param open_cb_cls the closure for open_cb
2704  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2705  * @return if successful it returns the stream socket; NULL if stream cannot be
2706  *         opened 
2707  */
2708 struct GNUNET_STREAM_Socket *
2709 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2710                     const struct GNUNET_PeerIdentity *target,
2711                     GNUNET_MESH_ApplicationType app_port,
2712                     GNUNET_STREAM_OpenCallback open_cb,
2713                     void *open_cb_cls,
2714                     ...)
2715 {
2716   struct GNUNET_STREAM_Socket *socket;
2717   enum GNUNET_STREAM_Option option;
2718   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
2719   va_list vargs;                /* Variable arguments */
2720
2721   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2722               "%s\n", __func__);
2723
2724   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2725   socket->other_peer = GNUNET_PEER_intern (target);
2726   socket->open_cb = open_cb;
2727   socket->open_cls = open_cb_cls;
2728   /* Set defaults */
2729   socket->retransmit_timeout = 
2730     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
2731
2732   va_start (vargs, open_cb_cls); /* Parse variable args */
2733   do {
2734     option = va_arg (vargs, enum GNUNET_STREAM_Option);
2735     switch (option)
2736       {
2737       case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2738         /* Expect struct GNUNET_TIME_Relative */
2739         socket->retransmit_timeout = va_arg (vargs,
2740                                              struct GNUNET_TIME_Relative);
2741         break;
2742       case GNUNET_STREAM_OPTION_END:
2743         break;
2744       }
2745   } while (GNUNET_STREAM_OPTION_END != option);
2746   va_end (vargs);               /* End of variable args parsing */
2747   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2748                                       10,  /* QUEUE size as parameter? */
2749                                       socket, /* cls */
2750                                       NULL, /* No inbound tunnel handler */
2751                                       NULL, /* No in-tunnel cleaner */
2752                                       client_message_handlers,
2753                                       ports); /* We don't get inbound tunnels */
2754   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
2755     {
2756       GNUNET_free (socket);
2757       return NULL;
2758     }
2759
2760   /* Now create the mesh tunnel to target */
2761   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2762               "Creating MESH Tunnel\n");
2763   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2764                                               NULL, /* Tunnel context */
2765                                               &mesh_peer_connect_callback,
2766                                               &mesh_peer_disconnect_callback,
2767                                               socket);
2768   GNUNET_assert (NULL != socket->tunnel);
2769   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2770                                         target);
2771   
2772   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2773               "%s() END\n", __func__);
2774   return socket;
2775 }
2776
2777
2778 /**
2779  * Shutdown the stream for reading or writing (similar to man 2 shutdown).
2780  *
2781  * @param socket the stream socket
2782  * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
2783  * @param completion_cb the callback that will be called upon successful
2784  *          shutdown of given operation
2785  * @param completion_cls the closure for the completion callback
2786  * @return the shutdown handle
2787  */
2788 struct GNUNET_STREAM_ShutdownHandle *
2789 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2790                         int operation,
2791                         GNUNET_STREAM_ShutdownCompletion completion_cb,
2792                         void *completion_cls)
2793 {
2794   struct GNUNET_STREAM_ShutdownHandle *handle;
2795   struct GNUNET_STREAM_MessageHeader *msg;
2796   
2797   GNUNET_assert (NULL == socket->shutdown_handle);
2798
2799   handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
2800   handle->socket = socket;
2801   handle->completion_cb = completion_cb;
2802   handle->completion_cls = completion_cls;
2803   socket->shutdown_handle = handle;
2804
2805   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2806   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2807   switch (operation)
2808     {
2809     case SHUT_RD:
2810       handle->operation = SHUT_RD;
2811       if (NULL != socket->read_handle)
2812         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2813                     "Existing read handle should be cancelled before shutting"
2814                     " down reading\n");
2815       msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
2816       queue_message (socket,
2817                      msg,
2818                      &set_state_receive_close_wait,
2819                      NULL);
2820       break;
2821     case SHUT_WR:
2822       handle->operation = SHUT_WR;
2823       if (NULL != socket->write_handle)
2824         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2825                     "Existing write handle should be cancelled before shutting"
2826                     " down writing\n");
2827       msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
2828       queue_message (socket,
2829                      msg,
2830                      &set_state_transmit_close_wait,
2831                      NULL);
2832       break;
2833     case SHUT_RDWR:
2834       handle->operation = SHUT_RDWR;
2835       if (NULL != socket->write_handle)
2836         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2837                     "Existing write handle should be cancelled before shutting"
2838                     " down writing\n");
2839       if (NULL != socket->read_handle)
2840         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2841                     "Existing read handle should be cancelled before shutting"
2842                     " down reading\n");
2843       msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
2844       queue_message (socket,
2845                      msg,
2846                      &set_state_close_wait,
2847                      NULL);
2848       break;
2849     default:
2850       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2851                   "GNUNET_STREAM_shutdown called with invalid value for "
2852                   "parameter operation -- Ignoring\n");
2853       GNUNET_free (msg);
2854       GNUNET_free (handle);
2855       return NULL;
2856     }
2857   handle->close_msg_retransmission_task_id =
2858     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2859                                   &close_msg_retransmission_task,
2860                                   handle);
2861   return handle;
2862 }
2863
2864
2865 /**
2866  * Cancels a pending shutdown
2867  *
2868  * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
2869  */
2870 void
2871 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
2872 {
2873   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
2874     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
2875   GNUNET_free (handle);
2876   return;
2877 }
2878
2879
2880 /**
2881  * Closes the stream
2882  *
2883  * @param socket the stream socket
2884  */
2885 void
2886 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
2887 {
2888   struct MessageQueue *head;
2889
2890   GNUNET_break (NULL == socket->read_handle);
2891   GNUNET_break (NULL == socket->write_handle);
2892
2893   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
2894     {
2895       /* socket closed with read task pending!? */
2896       GNUNET_break (0);
2897       GNUNET_SCHEDULER_cancel (socket->read_task_id);
2898       socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
2899     }
2900   
2901   /* Terminate the ack'ing tasks if they are still present */
2902   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
2903     {
2904       GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2905       socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2906     }
2907
2908   /* Clear Transmit handles */
2909   if (NULL != socket->transmit_handle)
2910     {
2911       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2912       socket->transmit_handle = NULL;
2913     }
2914   if (NULL != socket->ack_transmit_handle)
2915     {
2916       GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
2917       GNUNET_free (socket->ack_msg);
2918       socket->ack_msg = NULL;
2919       socket->ack_transmit_handle = NULL;
2920     }
2921
2922   /* Clear existing message queue */
2923   while (NULL != (head = socket->queue_head)) {
2924     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2925                                  socket->queue_tail,
2926                                  head);
2927     GNUNET_free (head->message);
2928     GNUNET_free (head);
2929   }
2930
2931   /* Close associated tunnel */
2932   if (NULL != socket->tunnel)
2933     {
2934       GNUNET_MESH_tunnel_destroy (socket->tunnel);
2935       socket->tunnel = NULL;
2936     }
2937
2938   /* Close mesh connection */
2939   if (NULL != socket->mesh && NULL == socket->lsocket)
2940     {
2941       GNUNET_MESH_disconnect (socket->mesh);
2942       socket->mesh = NULL;
2943     }
2944   
2945   /* Release receive buffer */
2946   if (NULL != socket->receive_buffer)
2947     {
2948       GNUNET_free (socket->receive_buffer);
2949     }
2950
2951   GNUNET_free (socket);
2952 }
2953
2954
2955 /**
2956  * Listens for stream connections for a specific application ports
2957  *
2958  * @param cfg the configuration to use
2959  * @param app_port the application port for which new streams will be accepted
2960  * @param listen_cb this function will be called when a peer tries to establish
2961  *            a stream with us
2962  * @param listen_cb_cls closure for listen_cb
2963  * @return listen socket, NULL for any error
2964  */
2965 struct GNUNET_STREAM_ListenSocket *
2966 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
2967                       GNUNET_MESH_ApplicationType app_port,
2968                       GNUNET_STREAM_ListenCallback listen_cb,
2969                       void *listen_cb_cls)
2970 {
2971   /* FIXME: Add variable args for passing configration options? */
2972   struct GNUNET_STREAM_ListenSocket *lsocket;
2973   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
2974
2975   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
2976   lsocket->port = app_port;
2977   lsocket->listen_cb = listen_cb;
2978   lsocket->listen_cb_cls = listen_cb_cls;
2979   lsocket->mesh = GNUNET_MESH_connect (cfg,
2980                                        10, /* FIXME: QUEUE size as parameter? */
2981                                        lsocket, /* Closure */
2982                                        &new_tunnel_notify,
2983                                        &tunnel_cleaner,
2984                                        server_message_handlers,
2985                                        ports);
2986   GNUNET_assert (NULL != lsocket->mesh);
2987   return lsocket;
2988 }
2989
2990
2991 /**
2992  * Closes the listen socket
2993  *
2994  * @param lsocket the listen socket
2995  */
2996 void
2997 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
2998 {
2999   /* Close MESH connection */
3000   GNUNET_assert (NULL != lsocket->mesh);
3001   GNUNET_MESH_disconnect (lsocket->mesh);
3002   
3003   GNUNET_free (lsocket);
3004 }
3005
3006
3007 /**
3008  * Tries to write the given data to the stream. The maximum size of data that
3009  * can be written as part of a write operation is (64 * (64000 - sizeof (struct
3010  * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3011  * violation, however only the said number of maximum bytes will be written.
3012  *
3013  * @param socket the socket representing a stream
3014  * @param data the data buffer from where the data is written into the stream
3015  * @param size the number of bytes to be written from the data buffer
3016  * @param timeout the timeout period
3017  * @param write_cont the function to call upon writing some bytes into the
3018  *          stream 
3019  * @param write_cont_cls the closure
3020  *
3021  * @return handle to cancel the operation; if a previous write is pending or
3022  *           the stream has been shutdown for this operation then write_cont is
3023  *           immediately called and NULL is returned.
3024  */
3025 struct GNUNET_STREAM_IOWriteHandle *
3026 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3027                      const void *data,
3028                      size_t size,
3029                      struct GNUNET_TIME_Relative timeout,
3030                      GNUNET_STREAM_CompletionContinuation write_cont,
3031                      void *write_cont_cls)
3032 {
3033   unsigned int num_needed_packets;
3034   unsigned int packet;
3035   struct GNUNET_STREAM_IOWriteHandle *io_handle;
3036   uint32_t packet_size;
3037   uint32_t payload_size;
3038   struct GNUNET_STREAM_DataMessage *data_msg;
3039   const void *sweep;
3040   struct GNUNET_TIME_Relative ack_deadline;
3041
3042   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3043               "%s\n", __func__);
3044
3045   /* Return NULL if there is already a write request pending */
3046   if (NULL != socket->write_handle)
3047   {
3048     GNUNET_break (0);
3049     return NULL;
3050   }
3051
3052   switch (socket->state)
3053     {
3054     case STATE_TRANSMIT_CLOSED:
3055     case STATE_TRANSMIT_CLOSE_WAIT:
3056     case STATE_CLOSED:
3057     case STATE_CLOSE_WAIT:
3058       if (NULL != write_cont)
3059         write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3060       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3061                   "%s() END\n", __func__);
3062       return NULL;
3063     case STATE_INIT:
3064     case STATE_LISTEN:
3065     case STATE_HELLO_WAIT:
3066       if (NULL != write_cont)
3067         /* FIXME: GNUNET_STREAM_SYSERR?? */
3068         write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3069       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3070                   "%s() END\n", __func__);
3071       return NULL;
3072     case STATE_ESTABLISHED:
3073     case STATE_RECEIVE_CLOSED:
3074     case STATE_RECEIVE_CLOSE_WAIT:
3075       break;
3076     }
3077
3078   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
3079     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
3080   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
3081   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
3082   io_handle->socket = socket;
3083   io_handle->write_cont = write_cont;
3084   io_handle->write_cont_cls = write_cont_cls;
3085   io_handle->size = size;
3086   sweep = data;
3087   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3088      determined from RTT */
3089   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3090   /* Divide the given buffer into packets for sending */
3091   for (packet=0; packet < num_needed_packets; packet++)
3092     {
3093       if ((packet + 1) * max_payload_size < size) 
3094         {
3095           payload_size = max_payload_size;
3096           packet_size = MAX_PACKET_SIZE;
3097         }
3098       else 
3099         {
3100           payload_size = size - packet * max_payload_size;
3101           packet_size =  payload_size + sizeof (struct
3102                                                 GNUNET_STREAM_DataMessage); 
3103         }
3104       io_handle->messages[packet] = GNUNET_malloc (packet_size);
3105       io_handle->messages[packet]->header.header.size = htons (packet_size);
3106       io_handle->messages[packet]->header.header.type =
3107         htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3108       io_handle->messages[packet]->sequence_number =
3109         htonl (socket->write_sequence_number++);
3110       io_handle->messages[packet]->offset = htonl (socket->write_offset);
3111
3112       /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3113          determined from RTT */
3114       io_handle->messages[packet]->ack_deadline =
3115         GNUNET_TIME_relative_hton (ack_deadline);
3116       data_msg = io_handle->messages[packet];
3117       /* Copy data from given buffer to the packet */
3118       memcpy (&data_msg[1],
3119               sweep,
3120               payload_size);
3121       sweep += payload_size;
3122       socket->write_offset += payload_size;
3123     }
3124   socket->write_handle = io_handle;
3125   write_data (socket);
3126
3127   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3128               "%s() END\n", __func__);
3129
3130   return io_handle;
3131 }
3132
3133
3134
3135 /**
3136  * Tries to read data from the stream.
3137  *
3138  * @param socket the socket representing a stream
3139  * @param timeout the timeout period
3140  * @param proc function to call with data (once only)
3141  * @param proc_cls the closure for proc
3142  *
3143  * @return handle to cancel the operation; if the stream has been shutdown for
3144  *           this type of opeartion then the DataProcessor is immediately
3145  *           called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned
3146  */
3147 struct GNUNET_STREAM_IOReadHandle *
3148 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3149                     struct GNUNET_TIME_Relative timeout,
3150                     GNUNET_STREAM_DataProcessor proc,
3151                     void *proc_cls)
3152 {
3153   struct GNUNET_STREAM_IOReadHandle *read_handle;
3154   
3155   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3156               "%s()\n", 
3157               __func__);
3158
3159   /* Return NULL if there is already a read handle; the user has to cancel that
3160   first before continuing or has to wait until it is completed */
3161   if (NULL != socket->read_handle) return NULL;
3162
3163   GNUNET_assert (NULL != proc);
3164
3165   switch (socket->state)
3166     {
3167     case STATE_RECEIVE_CLOSED:
3168     case STATE_RECEIVE_CLOSE_WAIT:
3169     case STATE_CLOSED:
3170     case STATE_CLOSE_WAIT:
3171       proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3172       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3173                   "%s() END\n",
3174                   __func__);
3175       return NULL;
3176     default:
3177       break;
3178     }
3179
3180   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
3181   read_handle->proc = proc;
3182   read_handle->proc_cls = proc_cls;
3183   socket->read_handle = read_handle;
3184
3185   /* Check if we have a packet at bitmap 0 */
3186   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3187                                           0))
3188     {
3189       socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3190                                                        socket);
3191    
3192     }
3193   
3194   /* Setup the read timeout task */
3195   socket->read_io_timeout_task_id =
3196     GNUNET_SCHEDULER_add_delayed (timeout,
3197                                   &read_io_timeout,
3198                                   socket);
3199   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3200               "%s() END\n",
3201               __func__);
3202   return read_handle;
3203 }
3204
3205
3206 /**
3207  * Cancel pending write operation.
3208  *
3209  * @param ioh handle to operation to cancel
3210  */
3211 void
3212 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3213 {
3214   struct GNUNET_STREAM_Socket *socket = ioh->socket;
3215   unsigned int packet;
3216
3217   GNUNET_assert (NULL != socket->write_handle);
3218   GNUNET_assert (socket->write_handle == ioh);
3219
3220   if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
3221     {
3222       GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
3223       socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
3224     }
3225
3226   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3227     {
3228       if (NULL == ioh->messages[packet]) break;
3229       GNUNET_free (ioh->messages[packet]);
3230     }
3231       
3232   GNUNET_free (socket->write_handle);
3233   socket->write_handle = NULL;
3234   return;
3235 }
3236
3237
3238 /**
3239  * Cancel pending read operation.
3240  *
3241  * @param ioh handle to operation to cancel
3242  */
3243 void
3244 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
3245 {
3246   return;
3247 }