-retransmission of close messages
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Decrement PEER intern count during socket close and listen close to free the
27  * memory used for PEER interning
28  *
29  * Add code for write io timeout
30  *
31  * Include retransmission for control messages
32  **/
33
34 /**
35  * @file stream/stream_api.c
36  * @brief Implementation of the stream library
37  * @author Sree Harsha Totakura
38  */
39
40
41 #include "platform.h"
42 #include "gnunet_common.h"
43 #include "gnunet_crypto_lib.h"
44 #include "gnunet_stream_lib.h"
45 #include "gnunet_testing_lib.h"
46 #include "stream_protocol.h"
47
48
49 /**
50  * The maximum packet size of a stream packet
51  */
52 #define MAX_PACKET_SIZE 64000
53
54 /**
55  * Receive buffer
56  */
57 #define RECEIVE_BUFFER_SIZE 4096000
58
59 /**
60  * The maximum payload a data message packet can carry
61  */
62 static size_t max_payload_size = 
63   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
64
65 /**
66  * states in the Protocol
67  */
68 enum State
69   {
70     /**
71      * Client initialization state
72      */
73     STATE_INIT,
74
75     /**
76      * Listener initialization state 
77      */
78     STATE_LISTEN,
79
80     /**
81      * Pre-connection establishment state
82      */
83     STATE_HELLO_WAIT,
84
85     /**
86      * State where a connection has been established
87      */
88     STATE_ESTABLISHED,
89
90     /**
91      * State where the socket is closed on our side and waiting to be ACK'ed
92      */
93     STATE_RECEIVE_CLOSE_WAIT,
94
95     /**
96      * State where the socket is closed for reading
97      */
98     STATE_RECEIVE_CLOSED,
99
100     /**
101      * State where the socket is closed on our side and waiting to be ACK'ed
102      */
103     STATE_TRANSMIT_CLOSE_WAIT,
104
105     /**
106      * State where the socket is closed for writing
107      */
108     STATE_TRANSMIT_CLOSED,
109
110     /**
111      * State where the socket is closed on our side and waiting to be ACK'ed
112      */
113     STATE_CLOSE_WAIT,
114
115     /**
116      * State where the socket is closed
117      */
118     STATE_CLOSED 
119   };
120
121
122 /**
123  * Functions of this type are called when a message is written
124  *
125  * @param cls the closure from queue_message
126  * @param socket the socket the written message was bound to
127  */
128 typedef void (*SendFinishCallback) (void *cls,
129                                     struct GNUNET_STREAM_Socket *socket);
130
131
132 /**
133  * The send message queue
134  */
135 struct MessageQueue
136 {
137   /**
138    * The message
139    */
140   struct GNUNET_STREAM_MessageHeader *message;
141
142   /**
143    * Callback to be called when the message is sent
144    */
145   SendFinishCallback finish_cb;
146
147   /**
148    * The closure for finish_cb
149    */
150   void *finish_cb_cls;
151
152   /**
153    * The next message in queue. Should be NULL in the last message
154    */
155   struct MessageQueue *next;
156
157   /**
158    * The next message in queue. Should be NULL in the first message
159    */
160   struct MessageQueue *prev;
161 };
162
163
164 /**
165  * The STREAM Socket Handler
166  */
167 struct GNUNET_STREAM_Socket
168 {
169   /**
170    * Retransmission timeout
171    */
172   struct GNUNET_TIME_Relative retransmit_timeout;
173
174   /**
175    * The Acknowledgement Bitmap
176    */
177   GNUNET_STREAM_AckBitmap ack_bitmap;
178
179   /**
180    * Time when the Acknowledgement was queued
181    */
182   struct GNUNET_TIME_Absolute ack_time_registered;
183
184   /**
185    * Queued Acknowledgement deadline
186    */
187   struct GNUNET_TIME_Relative ack_time_deadline;
188
189   /**
190    * The mesh handle
191    */
192   struct GNUNET_MESH_Handle *mesh;
193
194   /**
195    * The mesh tunnel handle
196    */
197   struct GNUNET_MESH_Tunnel *tunnel;
198
199   /**
200    * Stream open closure
201    */
202   void *open_cls;
203
204   /**
205    * Stream open callback
206    */
207   GNUNET_STREAM_OpenCallback open_cb;
208
209   /**
210    * The current transmit handle (if a pending transmit request exists)
211    */
212   struct GNUNET_MESH_TransmitHandle *transmit_handle;
213
214   /**
215    * The current message associated with the transmit handle
216    */
217   struct MessageQueue *queue_head;
218
219   /**
220    * The queue tail, should always point to the last message in queue
221    */
222   struct MessageQueue *queue_tail;
223
224   /**
225    * The write IO_handle associated with this socket
226    */
227   struct GNUNET_STREAM_IOWriteHandle *write_handle;
228
229   /**
230    * The read IO_handle associated with this socket
231    */
232   struct GNUNET_STREAM_IOReadHandle *read_handle;
233
234   /**
235    * The shutdown handle associated with this socket
236    */
237   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
238
239   /**
240    * Buffer for storing received messages
241    */
242   void *receive_buffer;
243
244   /**
245    * The listen socket from which this socket is derived. Should be NULL if it
246    * is not a derived socket
247    */
248   struct GNUNET_STREAM_ListenSocket *lsocket;
249
250   /**
251    * Task identifier for the read io timeout task
252    */
253   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
254
255   /**
256    * Task identifier for retransmission task after timeout
257    */
258   GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
259
260   /**
261    * The task for sending timely Acks
262    */
263   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
264
265   /**
266    * Task scheduled to continue a read operation.
267    */
268   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
269
270   /**
271    * The state of the protocol associated with this socket
272    */
273   enum State state;
274
275   /**
276    * The status of the socket
277    */
278   enum GNUNET_STREAM_Status status;
279
280   /**
281    * The number of previous timeouts; FIXME: currently not used
282    */
283   unsigned int retries;
284
285   /**
286    * The peer identity of the peer at the other end of the stream
287    */
288   GNUNET_PEER_Id other_peer;
289
290   /**
291    * Our Peer Identity (for debugging)
292    */
293   GNUNET_PEER_Id our_id;
294
295   /**
296    * The application port number (type: uint32_t)
297    */
298   GNUNET_MESH_ApplicationType app_port;
299
300   /**
301    * The session id associated with this stream connection
302    * FIXME: Not used currently, may be removed
303    */
304   uint32_t session_id;
305
306   /**
307    * Write sequence number. Set to random when sending HELLO(client) and
308    * HELLO_ACK(server) 
309    */
310   uint32_t write_sequence_number;
311
312   /**
313    * Read sequence number. This number's value is determined during handshake
314    */
315   uint32_t read_sequence_number;
316
317   /**
318    * The receiver buffer size
319    */
320   uint32_t receive_buffer_size;
321
322   /**
323    * The receiver buffer boundaries
324    */
325   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
326
327   /**
328    * receiver's available buffer after the last acknowledged packet
329    */
330   uint32_t receiver_window_available;
331
332   /**
333    * The offset pointer used during write operation
334    */
335   uint32_t write_offset;
336
337   /**
338    * The offset after which we are expecting data
339    */
340   uint32_t read_offset;
341
342   /**
343    * The offset upto which user has read from the received buffer
344    */
345   uint32_t copy_offset;
346 };
347
348
349 /**
350  * A socket for listening
351  */
352 struct GNUNET_STREAM_ListenSocket
353 {
354   /**
355    * The mesh handle
356    */
357   struct GNUNET_MESH_Handle *mesh;
358
359   /**
360    * The callback function which is called after successful opening socket
361    */
362   GNUNET_STREAM_ListenCallback listen_cb;
363
364   /**
365    * The call back closure
366    */
367   void *listen_cb_cls;
368
369   /**
370    * Our interned Peer's identity
371    */
372   GNUNET_PEER_Id our_id;
373
374   /**
375    * The service port
376    * FIXME: Remove if not required!
377    */
378   GNUNET_MESH_ApplicationType port;
379 };
380
381
382 /**
383  * The IO Write Handle
384  */
385 struct GNUNET_STREAM_IOWriteHandle
386 {
387   /**
388    * The socket to which this write handle is associated
389    */
390   struct GNUNET_STREAM_Socket *socket;
391
392   /**
393    * The packet_buffers associated with this Handle
394    */
395   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
396
397   /**
398    * The write continuation callback
399    */
400   GNUNET_STREAM_CompletionContinuation write_cont;
401
402   /**
403    * Write continuation closure
404    */
405   void *write_cont_cls;
406
407   /**
408    * The bitmap of this IOHandle; Corresponding bit for a message is set when
409    * it has been acknowledged by the receiver
410    */
411   GNUNET_STREAM_AckBitmap ack_bitmap;
412
413   /**
414    * Number of bytes in this write handle
415    */
416   size_t size;
417 };
418
419
420 /**
421  * The IO Read Handle
422  */
423 struct GNUNET_STREAM_IOReadHandle
424 {
425   /**
426    * Callback for the read processor
427    */
428   GNUNET_STREAM_DataProcessor proc;
429
430   /**
431    * The closure pointer for the read processor callback
432    */
433   void *proc_cls;
434 };
435
436
437 /**
438  * Handle for Shutdown
439  */
440 struct GNUNET_STREAM_ShutdownHandle
441 {
442   /**
443    * The socket associated with this shutdown handle
444    */
445   struct GNUNET_STREAM_Socket *socket;
446
447   /**
448    * Shutdown completion callback
449    */
450   GNUNET_STREAM_ShutdownCompletion completion_cb;
451
452   /**
453    * Closure for completion callback
454    */
455   void *completion_cls;
456
457   /**
458    * Close message retransmission task id
459    */
460   GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
461
462   /**
463    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
464    */
465   int operation;  
466 };
467
468
469 /**
470  * Default value in seconds for various timeouts
471  */
472 static unsigned int default_timeout = 10;
473
474
475 /**
476  * Callback function for sending queued message
477  *
478  * @param cls closure the socket
479  * @param size number of bytes available in buf
480  * @param buf where the callee should write the message
481  * @return number of bytes written to buf
482  */
483 static size_t
484 send_message_notify (void *cls, size_t size, void *buf)
485 {
486   struct GNUNET_STREAM_Socket *socket = cls;
487   struct GNUNET_PeerIdentity target;
488   struct MessageQueue *head;
489   size_t ret;
490
491   socket->transmit_handle = NULL; /* Remove the transmit handle */
492   head = socket->queue_head;
493   if (NULL == head)
494     return 0; /* just to be safe */
495   GNUNET_PEER_resolve (socket->other_peer, &target);
496   if (0 == size)                /* request timed out */
497     {
498       socket->retries++;
499       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
500                   "Message sending timed out. Retry %d \n",
501                   socket->retries);
502       socket->transmit_handle = 
503         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
504                                            0, /* Corking */
505                                            1, /* Priority */
506                                            /* FIXME: exponential backoff */
507                                            socket->retransmit_timeout,
508                                            &target,
509                                            ntohs (head->message->header.size),
510                                            &send_message_notify,
511                                            socket);
512       return 0;
513     }
514
515   ret = ntohs (head->message->header.size);
516   GNUNET_assert (size >= ret);
517   memcpy (buf, head->message, ret);
518   if (NULL != head->finish_cb)
519     {
520       head->finish_cb (head->finish_cb_cls, socket);
521     }
522   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
523                                socket->queue_tail,
524                                head);
525   GNUNET_free (head->message);
526   GNUNET_free (head);
527   head = socket->queue_head;
528   if (NULL != head)    /* more pending messages to send */
529     {
530       socket->retries = 0;
531       socket->transmit_handle = 
532         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
533                                            0, /* Corking */
534                                            1, /* Priority */
535                                            /* FIXME: exponential backoff */
536                                            socket->retransmit_timeout,
537                                            &target,
538                                            ntohs (head->message->header.size),
539                                            &send_message_notify,
540                                            socket);
541     }
542   return ret;
543 }
544
545
546 /**
547  * Queues a message for sending using the mesh connection of a socket
548  *
549  * @param socket the socket whose mesh connection is used
550  * @param message the message to be sent
551  * @param finish_cb the callback to be called when the message is sent
552  * @param finish_cb_cls the closure for the callback
553  */
554 static void
555 queue_message (struct GNUNET_STREAM_Socket *socket,
556                struct GNUNET_STREAM_MessageHeader *message,
557                SendFinishCallback finish_cb,
558                void *finish_cb_cls)
559 {
560   struct MessageQueue *queue_entity;
561   struct GNUNET_PeerIdentity target;
562
563   GNUNET_assert 
564     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
565      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
566
567   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
568               "%x: Queueing message of type %d and size %d\n",
569               socket->our_id,
570               ntohs (message->header.type),
571               ntohs (message->header.size));
572   GNUNET_assert (NULL != message);
573   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
574   queue_entity->message = message;
575   queue_entity->finish_cb = finish_cb;
576   queue_entity->finish_cb_cls = finish_cb_cls;
577   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
578                                     socket->queue_tail,
579                                     queue_entity);
580   if (NULL == socket->transmit_handle)
581   {
582     socket->retries = 0;
583     GNUNET_PEER_resolve (socket->other_peer, &target);
584     socket->transmit_handle = 
585       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
586                                          0, /* Corking */
587                                          1, /* Priority */
588                                          socket->retransmit_timeout,
589                                          &target,
590                                          ntohs (message->header.size),
591                                          &send_message_notify,
592                                          socket);
593   }
594 }
595
596
597 /**
598  * Copies a message and queues it for sending using the mesh connection of
599  * given socket 
600  *
601  * @param socket the socket whose mesh connection is used
602  * @param message the message to be sent
603  * @param finish_cb the callback to be called when the message is sent
604  * @param finish_cb_cls the closure for the callback
605  */
606 static void
607 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
608                         const struct GNUNET_STREAM_MessageHeader *message,
609                         SendFinishCallback finish_cb,
610                         void *finish_cb_cls)
611 {
612   struct GNUNET_STREAM_MessageHeader *msg_copy;
613   uint16_t size;
614   
615   size = ntohs (message->header.size);
616   msg_copy = GNUNET_malloc (size);
617   memcpy (msg_copy, message, size);
618   queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
619 }
620
621
622 /**
623  * Callback function for sending ack message
624  *
625  * @param cls closure the ACK message created in ack_task
626  * @param size number of bytes available in buffer
627  * @param buf where the callee should write the message
628  * @return number of bytes written to buf
629  */
630 static size_t
631 send_ack_notify (void *cls, size_t size, void *buf)
632 {
633   struct GNUNET_STREAM_AckMessage *ack_msg = cls;
634
635   if (0 == size)
636     {
637       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
638                   "%s called with size 0\n", __func__);
639       return 0;
640     }
641   GNUNET_assert (ntohs (ack_msg->header.header.size) <= size);
642   
643   size = ntohs (ack_msg->header.header.size);
644   memcpy (buf, ack_msg, size);
645   return size;
646 }
647
648 /**
649  * Writes data using the given socket. The amount of data written is limited by
650  * the receiver_window_size
651  *
652  * @param socket the socket to use
653  */
654 static void 
655 write_data (struct GNUNET_STREAM_Socket *socket);
656
657 /**
658  * Task for retransmitting data messages if they aren't ACK before their ack
659  * deadline 
660  *
661  * @param cls the socket
662  * @param tc the Task context
663  */
664 static void
665 retransmission_timeout_task (void *cls,
666                              const struct GNUNET_SCHEDULER_TaskContext *tc)
667 {
668   struct GNUNET_STREAM_Socket *socket = cls;
669   
670   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
671     return;
672
673   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
674               "%x: Retransmitting DATA...\n", socket->our_id);
675   socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
676   write_data (socket);
677 }
678
679
680 /**
681  * Task for sending ACK message
682  *
683  * @param cls the socket
684  * @param tc the Task context
685  */
686 static void
687 ack_task (void *cls,
688           const struct GNUNET_SCHEDULER_TaskContext *tc)
689 {
690   struct GNUNET_STREAM_Socket *socket = cls;
691   struct GNUNET_STREAM_AckMessage *ack_msg;
692   struct GNUNET_PeerIdentity target;
693
694   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
695     {
696       return;
697     }
698
699   socket->ack_task_id = 0;
700
701   /* Create the ACK Message */
702   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
703   ack_msg->header.header.size = htons (sizeof (struct 
704                                                GNUNET_STREAM_AckMessage));
705   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
706   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
707   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
708   ack_msg->receive_window_remaining = 
709     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
710
711   GNUNET_PEER_resolve (socket->other_peer, &target);
712   /* Request MESH for sending ACK */
713   GNUNET_MESH_notify_transmit_ready (socket->tunnel,
714                                      0, /* Corking */
715                                      1, /* Priority */
716                                      socket->retransmit_timeout,
717                                      &target,
718                                      ntohs (ack_msg->header.header.size),
719                                      &send_ack_notify,
720                                      ack_msg);
721
722   
723 }
724
725
726 /**
727  * Retransmission task for shutdown messages
728  *
729  * @param cls the shutdown handle
730  * @param tc the Task Context
731  */
732 static void
733 close_msg_retransmission_task (void *cls,
734                                const struct GNUNET_SCHEDULER_TaskContext *tc)
735 {
736   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
737   struct GNUNET_STREAM_MessageHeader *msg;
738   struct GNUNET_STREAM_Socket *socket;
739
740   GNUNET_assert (NULL != shutdown_handle);
741   socket = shutdown_handle->socket;
742
743   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
744   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
745   switch (shutdown_handle->operation)
746     {
747     case SHUT_RDWR:
748       msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
749       break;
750     case SHUT_RD:
751       msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
752       break;
753     case SHUT_WR:
754       msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
755       break;
756     default:
757       GNUNET_free (msg);
758       shutdown_handle->close_msg_retransmission_task_id = 
759         GNUNET_SCHEDULER_NO_TASK;
760       return;
761     }
762   queue_message (socket, msg, NULL, NULL);
763   shutdown_handle->close_msg_retransmission_task_id =
764     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
765                                   &close_msg_retransmission_task,
766                                   shutdown_handle);
767 }
768
769
770 /**
771  * Function to modify a bit in GNUNET_STREAM_AckBitmap
772  *
773  * @param bitmap the bitmap to modify
774  * @param bit the bit number to modify
775  * @param value GNUNET_YES to on, GNUNET_NO to off
776  */
777 static void
778 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
779                       unsigned int bit, 
780                       int value)
781 {
782   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
783   if (GNUNET_YES == value)
784     *bitmap |= (1LL << bit);
785   else
786     *bitmap &= ~(1LL << bit);
787 }
788
789
790 /**
791  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
792  *
793  * @param bitmap address of the bitmap that has to be checked
794  * @param bit the bit number to check
795  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
796  */
797 static uint8_t
798 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
799                       unsigned int bit)
800 {
801   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
802   return 0 != (*bitmap & (1LL << bit));
803 }
804
805
806 /**
807  * Writes data using the given socket. The amount of data written is limited by
808  * the receiver_window_size
809  *
810  * @param socket the socket to use
811  */
812 static void 
813 write_data (struct GNUNET_STREAM_Socket *socket)
814 {
815   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
816   int packet;                   /* Although an int, should never be negative */
817   int ack_packet;
818
819   ack_packet = -1;
820   /* Find the last acknowledged packet */
821   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
822     {      
823       if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
824                                               packet))
825         ack_packet = packet;        
826       else if (NULL == io_handle->messages[packet])
827         break;
828     }
829   /* Resend packets which weren't ack'ed */
830   for (packet=0; packet < ack_packet; packet++)
831     {
832       if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
833                                              packet))
834         {
835           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
836                       "%x: Placing DATA message with sequence %u in send queue\n",
837                       socket->our_id,
838                       ntohl (io_handle->messages[packet]->sequence_number));
839
840           copy_and_queue_message (socket,
841                                   &io_handle->messages[packet]->header,
842                                   NULL,
843                                   NULL);
844         }
845     }
846   packet = ack_packet + 1;
847   /* Now send new packets if there is enough buffer space */
848   while ( (NULL != io_handle->messages[packet]) &&
849           (socket->receiver_window_available 
850            >= ntohs (io_handle->messages[packet]->header.header.size)) )
851     {
852       socket->receiver_window_available -= 
853         ntohs (io_handle->messages[packet]->header.header.size);
854       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
855                   "%x: Placing DATA message with sequence %u in send queue\n",
856                   socket->our_id,
857                   ntohl (io_handle->messages[packet]->sequence_number));
858       copy_and_queue_message (socket,
859                               &io_handle->messages[packet]->header,
860                               NULL,
861                               NULL);
862       packet++;
863     }
864
865   if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
866     socket->retransmission_timeout_task_id = 
867       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
868                                     (GNUNET_TIME_UNIT_SECONDS, 8),
869                                     &retransmission_timeout_task,
870                                     socket);
871 }
872
873
874 /**
875  * Task for calling the read processor
876  *
877  * @param cls the socket
878  * @param tc the task context
879  */
880 static void
881 call_read_processor (void *cls,
882                      const struct GNUNET_SCHEDULER_TaskContext *tc)
883 {
884   struct GNUNET_STREAM_Socket *socket = cls;
885   size_t read_size;
886   size_t valid_read_size;
887   unsigned int packet;
888   uint32_t sequence_increase;
889   uint32_t offset_increase;
890
891   socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
892   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
893     return;
894
895   if (NULL == socket->receive_buffer) 
896     return;
897
898   GNUNET_assert (NULL != socket->read_handle);
899   GNUNET_assert (NULL != socket->read_handle->proc);
900
901   /* Check the bitmap for any holes */
902   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
903     {
904       if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
905                                              packet))
906         break;
907     }
908   /* We only call read processor if we have the first packet */
909   GNUNET_assert (0 < packet);
910
911   valid_read_size = 
912     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
913
914   GNUNET_assert (0 != valid_read_size);
915
916   /* Cancel the read_io_timeout_task */
917   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
918   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
919
920   /* Call the data processor */
921   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
922               "%x: Calling read processor\n",
923               socket->our_id);
924   read_size = 
925     socket->read_handle->proc (socket->read_handle->proc_cls,
926                                socket->status,
927                                socket->receive_buffer + socket->copy_offset,
928                                valid_read_size);
929   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
930               "%x: Read processor read %d bytes\n",
931               socket->our_id,
932               read_size);
933   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
934               "%x: Read processor completed successfully\n",
935               socket->our_id);
936
937   /* Free the read handle */
938   GNUNET_free (socket->read_handle);
939   socket->read_handle = NULL;
940
941   GNUNET_assert (read_size <= valid_read_size);
942   socket->copy_offset += read_size;
943
944   /* Determine upto which packet we can remove from the buffer */
945   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
946     {
947       if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
948         { packet++; break; }
949       if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
950         break;
951     }
952
953   /* If no packets can be removed we can't move the buffer */
954   if (0 == packet) return;
955
956   sequence_increase = packet;
957   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
958               "%x: Sequence increase after read processor completion: %u\n",
959               socket->our_id,
960               sequence_increase);
961
962   /* Shift the data in the receive buffer */
963   memmove (socket->receive_buffer,
964            socket->receive_buffer 
965            + socket->receive_buffer_boundaries[sequence_increase-1],
966            socket->receive_buffer_size
967            - socket->receive_buffer_boundaries[sequence_increase-1]);
968   
969   /* Shift the bitmap */
970   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
971   
972   /* Set read_sequence_number */
973   socket->read_sequence_number += sequence_increase;
974   
975   /* Set read_offset */
976   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
977   socket->read_offset += offset_increase;
978
979   /* Fix copy_offset */
980   GNUNET_assert (offset_increase <= socket->copy_offset);
981   socket->copy_offset -= offset_increase;
982   
983   /* Fix relative boundaries */
984   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
985     {
986       if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
987         {
988           socket->receive_buffer_boundaries[packet] = 
989             socket->receive_buffer_boundaries[packet + sequence_increase] 
990             - offset_increase;
991         }
992       else
993         socket->receive_buffer_boundaries[packet] = 0;
994     }
995 }
996
997
998 /**
999  * Cancels the existing read io handle
1000  *
1001  * @param cls the closure from the SCHEDULER call
1002  * @param tc the task context
1003  */
1004 static void
1005 read_io_timeout (void *cls, 
1006                 const struct GNUNET_SCHEDULER_TaskContext *tc)
1007 {
1008   struct GNUNET_STREAM_Socket *socket = cls;
1009   GNUNET_STREAM_DataProcessor proc;
1010   void *proc_cls;
1011
1012   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1013   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1014   {
1015     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1016                 "%x: Read task timedout - Cancelling it\n",
1017                 socket->our_id);
1018     GNUNET_SCHEDULER_cancel (socket->read_task_id);
1019     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1020   }
1021   GNUNET_assert (NULL != socket->read_handle);
1022   proc = socket->read_handle->proc;
1023   proc_cls = socket->read_handle->proc_cls;
1024
1025   GNUNET_free (socket->read_handle);
1026   socket->read_handle = NULL;
1027   /* Call the read processor to signal timeout */
1028   proc (proc_cls,
1029         GNUNET_STREAM_TIMEOUT,
1030         NULL,
1031         0);
1032 }
1033
1034
1035 /**
1036  * Handler for DATA messages; Same for both client and server
1037  *
1038  * @param socket the socket through which the ack was received
1039  * @param tunnel connection to the other end
1040  * @param sender who sent the message
1041  * @param msg the data message
1042  * @param atsi performance data for the connection
1043  * @return GNUNET_OK to keep the connection open,
1044  *         GNUNET_SYSERR to close it (signal serious error)
1045  */
1046 static int
1047 handle_data (struct GNUNET_STREAM_Socket *socket,
1048              struct GNUNET_MESH_Tunnel *tunnel,
1049              const struct GNUNET_PeerIdentity *sender,
1050              const struct GNUNET_STREAM_DataMessage *msg,
1051              const struct GNUNET_ATS_Information*atsi)
1052 {
1053   const void *payload;
1054   uint32_t bytes_needed;
1055   uint32_t relative_offset;
1056   uint32_t relative_sequence_number;
1057   uint16_t size;
1058
1059   size = htons (msg->header.header.size);
1060   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1061     {
1062       GNUNET_break_op (0);
1063       return GNUNET_SYSERR;
1064     }
1065
1066   if (GNUNET_PEER_search (sender) != socket->other_peer)
1067     {
1068       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1069                   "%x: Received DATA from non-confirming peer\n",
1070                   socket->our_id);
1071       return GNUNET_YES;
1072     }
1073
1074   switch (socket->state)
1075     {
1076     case STATE_ESTABLISHED:
1077     case STATE_TRANSMIT_CLOSED:
1078     case STATE_TRANSMIT_CLOSE_WAIT:
1079       
1080       /* check if the message's sequence number is in the range we are
1081          expecting */
1082       relative_sequence_number = 
1083         ntohl (msg->sequence_number) - socket->read_sequence_number;
1084       if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1085         {
1086           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1087                       "%x: Ignoring received message with sequence number %u\n",
1088                       socket->our_id,
1089                       ntohl (msg->sequence_number));
1090           /* Start ACK sending task if one is not already present */
1091           if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1092             {
1093               socket->ack_task_id = 
1094                 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1095                                               (msg->ack_deadline),
1096                                               &ack_task,
1097                                               socket);
1098             }
1099           return GNUNET_YES;
1100         }
1101       
1102       /* Check if we have already seen this message */
1103       if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1104                                               relative_sequence_number))
1105         {
1106           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1107                       "%x: Ignoring already received message with sequence "
1108                       "number %u\n",
1109                       socket->our_id,
1110                       ntohl (msg->sequence_number));
1111           /* Start ACK sending task if one is not already present */
1112           if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1113             {
1114               socket->ack_task_id = 
1115                 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1116                                               (msg->ack_deadline),
1117                                               &ack_task,
1118                                               socket);
1119             }
1120           return GNUNET_YES;
1121         }
1122
1123       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1124                   "%x: Receiving DATA with sequence number: %u and size: %d "
1125                   "from %x\n",
1126                   socket->our_id,
1127                   ntohl (msg->sequence_number),
1128                   ntohs (msg->header.header.size),
1129                   socket->other_peer);
1130
1131       /* Check if we have to allocate the buffer */
1132       size -= sizeof (struct GNUNET_STREAM_DataMessage);
1133       relative_offset = ntohl (msg->offset) - socket->read_offset;
1134       bytes_needed = relative_offset + size;
1135       if (bytes_needed > socket->receive_buffer_size)
1136         {
1137           if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1138             {
1139               socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1140                                                        bytes_needed);
1141               socket->receive_buffer_size = bytes_needed;
1142             }
1143           else
1144             {
1145               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1146                           "%x: Cannot accommodate packet %d as buffer is",
1147                           "full\n",
1148                           socket->our_id,
1149                           ntohl (msg->sequence_number));
1150               return GNUNET_YES;
1151             }
1152         }
1153       
1154       /* Copy Data to buffer */
1155       payload = &msg[1];
1156       GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1157       memcpy (socket->receive_buffer + relative_offset,
1158               payload,
1159               size);
1160       socket->receive_buffer_boundaries[relative_sequence_number] = 
1161         relative_offset + size;
1162       
1163       /* Modify the ACK bitmap */
1164       ackbitmap_modify_bit (&socket->ack_bitmap,
1165                             relative_sequence_number,
1166                             GNUNET_YES);
1167
1168       /* Start ACK sending task if one is not already present */
1169       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1170        {
1171          socket->ack_task_id = 
1172            GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1173                                          (msg->ack_deadline),
1174                                          &ack_task,
1175                                          socket);
1176        }
1177
1178       if ((NULL != socket->read_handle) /* A read handle is waiting */
1179           /* There is no current read task */
1180           && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1181           /* We have the first packet */
1182           && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1183                                                  0)))
1184         {
1185           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1186                       "%x: Scheduling read processor\n",
1187                       socket->our_id);
1188
1189           socket->read_task_id = 
1190             GNUNET_SCHEDULER_add_now (&call_read_processor,
1191                                       socket);
1192         }
1193       
1194       break;
1195
1196     default:
1197       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1198                   "%x: Received data message when it cannot be handled\n",
1199                   socket->our_id);
1200       break;
1201     }
1202   return GNUNET_YES;
1203 }
1204
1205
1206 /**
1207  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1208  *
1209  * @param cls the socket (set from GNUNET_MESH_connect)
1210  * @param tunnel connection to the other end
1211  * @param tunnel_ctx place to store local state associated with the tunnel
1212  * @param sender who sent the message
1213  * @param message the actual message
1214  * @param atsi performance data for the connection
1215  * @return GNUNET_OK to keep the connection open,
1216  *         GNUNET_SYSERR to close it (signal serious error)
1217  */
1218 static int
1219 client_handle_data (void *cls,
1220              struct GNUNET_MESH_Tunnel *tunnel,
1221              void **tunnel_ctx,
1222              const struct GNUNET_PeerIdentity *sender,
1223              const struct GNUNET_MessageHeader *message,
1224              const struct GNUNET_ATS_Information*atsi)
1225 {
1226   struct GNUNET_STREAM_Socket *socket = cls;
1227
1228   return handle_data (socket, 
1229                       tunnel, 
1230                       sender, 
1231                       (const struct GNUNET_STREAM_DataMessage *) message, 
1232                       atsi);
1233 }
1234
1235
1236 /**
1237  * Callback to set state to ESTABLISHED
1238  *
1239  * @param cls the closure from queue_message FIXME: document
1240  * @param socket the socket to requiring state change
1241  */
1242 static void
1243 set_state_established (void *cls,
1244                        struct GNUNET_STREAM_Socket *socket)
1245 {
1246   struct GNUNET_PeerIdentity initiator_pid;
1247
1248   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1249               "%x: Attaining ESTABLISHED state\n",
1250               socket->our_id);
1251   socket->write_offset = 0;
1252   socket->read_offset = 0;
1253   socket->state = STATE_ESTABLISHED;
1254   /* FIXME: What if listen_cb is NULL */
1255   if (NULL != socket->lsocket)
1256     {
1257       GNUNET_PEER_resolve (socket->other_peer, &initiator_pid);
1258       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1259                   "%x: Calling listen callback\n",
1260                   socket->our_id);
1261       if (GNUNET_SYSERR == 
1262           socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1263                                       socket,
1264                                       &initiator_pid))
1265         {
1266           socket->state = STATE_CLOSED;
1267           /* FIXME: We should close in a decent way */
1268           GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1269           GNUNET_free (socket);
1270         }
1271     }
1272   else if (socket->open_cb)
1273     socket->open_cb (socket->open_cls, socket);
1274 }
1275
1276
1277 /**
1278  * Callback to set state to HELLO_WAIT
1279  *
1280  * @param cls the closure from queue_message
1281  * @param socket the socket to requiring state change
1282  */
1283 static void
1284 set_state_hello_wait (void *cls,
1285                       struct GNUNET_STREAM_Socket *socket)
1286 {
1287   GNUNET_assert (STATE_INIT == socket->state);
1288   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1289               "%x: Attaining HELLO_WAIT state\n",
1290               socket->our_id);
1291   socket->state = STATE_HELLO_WAIT;
1292 }
1293
1294
1295 /**
1296  * Callback to set state to CLOSE_WAIT
1297  *
1298  * @param cls the closure from queue_message
1299  * @param socket the socket requiring state change
1300  */
1301 static void
1302 set_state_close_wait (void *cls,
1303                       struct GNUNET_STREAM_Socket *socket)
1304 {
1305   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1306               "%x: Attaing CLOSE_WAIT state\n",
1307               socket->our_id);
1308   socket->state = STATE_CLOSE_WAIT;
1309   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1310   socket->receive_buffer = NULL;
1311   socket->receive_buffer_size = 0;
1312 }
1313
1314
1315 /**
1316  * Callback to set state to RECEIVE_CLOSE_WAIT
1317  *
1318  * @param cls the closure from queue_message
1319  * @param socket the socket requiring state change
1320  */
1321 static void
1322 set_state_receive_close_wait (void *cls,
1323                               struct GNUNET_STREAM_Socket *socket)
1324 {
1325   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1326               "%x: Attaing RECEIVE_CLOSE_WAIT state\n",
1327               socket->our_id);
1328   socket->state = STATE_RECEIVE_CLOSE_WAIT;
1329   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1330   socket->receive_buffer = NULL;
1331   socket->receive_buffer_size = 0;
1332 }
1333
1334
1335 /**
1336  * Callback to set state to TRANSMIT_CLOSE_WAIT
1337  *
1338  * @param cls the closure from queue_message
1339  * @param socket the socket requiring state change
1340  */
1341 static void
1342 set_state_transmit_close_wait (void *cls,
1343                                struct GNUNET_STREAM_Socket *socket)
1344 {
1345   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1346               "%x: Attaing TRANSMIT_CLOSE_WAIT state\n",
1347               socket->our_id);
1348   socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1349 }
1350
1351
1352 /**
1353  * Callback to set state to CLOSED
1354  *
1355  * @param cls the closure from queue_message
1356  * @param socket the socket requiring state change
1357  */
1358 static void
1359 set_state_closed (void *cls,
1360                   struct GNUNET_STREAM_Socket *socket)
1361 {
1362   socket->state = STATE_CLOSED;
1363 }
1364
1365 /**
1366  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1367  * socket
1368  *
1369  * @param socket the socket for which this HelloAckMessage has to be generated
1370  * @return the HelloAckMessage
1371  */
1372 static struct GNUNET_STREAM_HelloAckMessage *
1373 generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket)
1374 {
1375   struct GNUNET_STREAM_HelloAckMessage *msg;
1376
1377   /* Get the random sequence number */
1378   socket->write_sequence_number = 
1379     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1380   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1381               "%x: Generated write sequence number %u\n",
1382               socket->our_id,
1383               (unsigned int) socket->write_sequence_number);
1384   
1385   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1386   msg->header.header.size = 
1387     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1388   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1389   msg->sequence_number = htonl (socket->write_sequence_number);
1390   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1391
1392   return msg;
1393 }
1394
1395
1396 /**
1397  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1398  *
1399  * @param cls the socket (set from GNUNET_MESH_connect)
1400  * @param tunnel connection to the other end
1401  * @param tunnel_ctx this is NULL
1402  * @param sender who sent the message
1403  * @param message the actual message
1404  * @param atsi performance data for the connection
1405  * @return GNUNET_OK to keep the connection open,
1406  *         GNUNET_SYSERR to close it (signal serious error)
1407  */
1408 static int
1409 client_handle_hello_ack (void *cls,
1410                          struct GNUNET_MESH_Tunnel *tunnel,
1411                          void **tunnel_ctx,
1412                          const struct GNUNET_PeerIdentity *sender,
1413                          const struct GNUNET_MessageHeader *message,
1414                          const struct GNUNET_ATS_Information*atsi)
1415 {
1416   struct GNUNET_STREAM_Socket *socket = cls;
1417   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1418   struct GNUNET_STREAM_HelloAckMessage *reply;
1419
1420   if (GNUNET_PEER_search (sender) != socket->other_peer)
1421     {
1422       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1423                   "%x: Received HELLO_ACK from non-confirming peer\n",
1424                   socket->our_id);
1425       return GNUNET_YES;
1426     }
1427   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1428   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1429               "%x: Received HELLO_ACK from %x\n",
1430               socket->our_id,
1431               socket->other_peer);
1432
1433   GNUNET_assert (socket->tunnel == tunnel);
1434   switch (socket->state)
1435   {
1436   case STATE_HELLO_WAIT:
1437     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1438     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1439                 "%x: Read sequence number %u\n",
1440                 socket->our_id,
1441                 (unsigned int) socket->read_sequence_number);
1442     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1443     reply = generate_hello_ack_msg (socket);
1444     queue_message (socket,
1445                    &reply->header, 
1446                    &set_state_established, 
1447                    NULL);      
1448     return GNUNET_OK;
1449   case STATE_ESTABLISHED:
1450   case STATE_RECEIVE_CLOSE_WAIT:
1451     // call statistics (# ACKs ignored++)
1452     return GNUNET_OK;
1453   case STATE_INIT:
1454   default:
1455     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1456                 "%x: Server %x sent HELLO_ACK when in state %d\n", 
1457                 socket->our_id,
1458                 socket->other_peer,
1459                 socket->state);
1460     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1461     return GNUNET_SYSERR;
1462   }
1463
1464 }
1465
1466
1467 /**
1468  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1469  *
1470  * @param cls the socket (set from GNUNET_MESH_connect)
1471  * @param tunnel connection to the other end
1472  * @param tunnel_ctx this is NULL
1473  * @param sender who sent the message
1474  * @param message the actual message
1475  * @param atsi performance data for the connection
1476  * @return GNUNET_OK to keep the connection open,
1477  *         GNUNET_SYSERR to close it (signal serious error)
1478  */
1479 static int
1480 client_handle_reset (void *cls,
1481                      struct GNUNET_MESH_Tunnel *tunnel,
1482                      void **tunnel_ctx,
1483                      const struct GNUNET_PeerIdentity *sender,
1484                      const struct GNUNET_MessageHeader *message,
1485                      const struct GNUNET_ATS_Information*atsi)
1486 {
1487   struct GNUNET_STREAM_Socket *socket = cls;
1488
1489   return GNUNET_OK;
1490 }
1491
1492
1493 /**
1494  * Common message handler for handling TRANSMIT_CLOSE messages
1495  *
1496  * @param socket the socket through which the ack was received
1497  * @param tunnel connection to the other end
1498  * @param sender who sent the message
1499  * @param msg the transmit close message
1500  * @param atsi performance data for the connection
1501  * @return GNUNET_OK to keep the connection open,
1502  *         GNUNET_SYSERR to close it (signal serious error)
1503  */
1504 static int
1505 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1506                        struct GNUNET_MESH_Tunnel *tunnel,
1507                        const struct GNUNET_PeerIdentity *sender,
1508                        const struct GNUNET_STREAM_MessageHeader *msg,
1509                        const struct GNUNET_ATS_Information*atsi)
1510 {
1511   struct GNUNET_STREAM_MessageHeader *reply;
1512
1513   switch (socket->state)
1514     {
1515     case STATE_ESTABLISHED:
1516       socket->state = STATE_RECEIVE_CLOSED;
1517
1518       /* Send TRANSMIT_CLOSE_ACK */
1519       reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1520       reply->header.type = 
1521         htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1522       reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1523       queue_message (socket, reply, NULL, NULL);
1524       break;
1525
1526     default:
1527       /* FIXME: Call statistics? */
1528       break;
1529     }
1530   return GNUNET_YES;
1531 }
1532
1533
1534 /**
1535  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1536  *
1537  * @param cls the socket (set from GNUNET_MESH_connect)
1538  * @param tunnel connection to the other end
1539  * @param tunnel_ctx this is NULL
1540  * @param sender who sent the message
1541  * @param message the actual message
1542  * @param atsi performance data for the connection
1543  * @return GNUNET_OK to keep the connection open,
1544  *         GNUNET_SYSERR to close it (signal serious error)
1545  */
1546 static int
1547 client_handle_transmit_close (void *cls,
1548                               struct GNUNET_MESH_Tunnel *tunnel,
1549                               void **tunnel_ctx,
1550                               const struct GNUNET_PeerIdentity *sender,
1551                               const struct GNUNET_MessageHeader *message,
1552                               const struct GNUNET_ATS_Information*atsi)
1553 {
1554   struct GNUNET_STREAM_Socket *socket = cls;
1555   
1556   return handle_transmit_close (socket,
1557                                 tunnel,
1558                                 sender,
1559                                 (struct GNUNET_STREAM_MessageHeader *)message,
1560                                 atsi);
1561 }
1562
1563
1564 /**
1565  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1566  *
1567  * @param cls the socket (set from GNUNET_MESH_connect)
1568  * @param tunnel connection to the other end
1569  * @param tunnel_ctx this is NULL
1570  * @param sender who sent the message
1571  * @param message the actual message
1572  * @param atsi performance data for the connection
1573  * @return GNUNET_OK to keep the connection open,
1574  *         GNUNET_SYSERR to close it (signal serious error)
1575  */
1576 static int
1577 client_handle_transmit_close_ack (void *cls,
1578                                   struct GNUNET_MESH_Tunnel *tunnel,
1579                                   void **tunnel_ctx,
1580                                   const struct GNUNET_PeerIdentity *sender,
1581                                   const struct GNUNET_MessageHeader *message,
1582                                   const struct GNUNET_ATS_Information*atsi)
1583 {
1584   struct GNUNET_STREAM_Socket *socket = cls;
1585
1586   return GNUNET_OK;
1587 }
1588
1589
1590 /**
1591  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1592  *
1593  * @param cls the socket (set from GNUNET_MESH_connect)
1594  * @param tunnel connection to the other end
1595  * @param tunnel_ctx this is NULL
1596  * @param sender who sent the message
1597  * @param message the actual message
1598  * @param atsi performance data for the connection
1599  * @return GNUNET_OK to keep the connection open,
1600  *         GNUNET_SYSERR to close it (signal serious error)
1601  */
1602 static int
1603 client_handle_receive_close (void *cls,
1604                              struct GNUNET_MESH_Tunnel *tunnel,
1605                              void **tunnel_ctx,
1606                              const struct GNUNET_PeerIdentity *sender,
1607                              const struct GNUNET_MessageHeader *message,
1608                              const struct GNUNET_ATS_Information*atsi)
1609 {
1610   struct GNUNET_STREAM_Socket *socket = cls;
1611
1612   return GNUNET_OK;
1613 }
1614
1615
1616 /**
1617  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1618  *
1619  * @param cls the socket (set from GNUNET_MESH_connect)
1620  * @param tunnel connection to the other end
1621  * @param tunnel_ctx this is NULL
1622  * @param sender who sent the message
1623  * @param message the actual message
1624  * @param atsi performance data for the connection
1625  * @return GNUNET_OK to keep the connection open,
1626  *         GNUNET_SYSERR to close it (signal serious error)
1627  */
1628 static int
1629 client_handle_receive_close_ack (void *cls,
1630                                  struct GNUNET_MESH_Tunnel *tunnel,
1631                                  void **tunnel_ctx,
1632                                  const struct GNUNET_PeerIdentity *sender,
1633                                  const struct GNUNET_MessageHeader *message,
1634                                  const struct GNUNET_ATS_Information*atsi)
1635 {
1636   struct GNUNET_STREAM_Socket *socket = cls;
1637
1638   return GNUNET_OK;
1639 }
1640
1641
1642 /**
1643  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1644  *
1645  * @param socket the socket
1646  * @param tunnel connection to the other end
1647  * @param sender who sent the message
1648  * @param message the actual message
1649  * @param atsi performance data for the connection
1650  * @return GNUNET_OK to keep the connection open,
1651  *         GNUNET_SYSERR to close it (signal serious error)
1652  */
1653 static int
1654 handle_close (struct GNUNET_STREAM_Socket *socket,
1655               struct GNUNET_MESH_Tunnel *tunnel,
1656               const struct GNUNET_PeerIdentity *sender,
1657               const struct GNUNET_STREAM_MessageHeader *message,
1658               const struct GNUNET_ATS_Information*atsi)
1659 {
1660   struct GNUNET_STREAM_MessageHeader *close_ack;
1661
1662   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1663               "%x: Received CLOSE from %x\n",
1664               socket->our_id,
1665               socket->other_peer);
1666   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1667   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1668   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
1669   queue_message (socket,
1670                  close_ack,
1671                  &set_state_closed,
1672                  NULL);
1673   if (socket->state == STATE_CLOSED)
1674     return GNUNET_OK;
1675
1676   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1677   socket->receive_buffer = NULL;
1678   socket->receive_buffer_size = 0;
1679   return GNUNET_OK;
1680 }
1681
1682
1683 /**
1684  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1685  *
1686  * @param cls the socket (set from GNUNET_MESH_connect)
1687  * @param tunnel connection to the other end
1688  * @param tunnel_ctx this is NULL
1689  * @param sender who sent the message
1690  * @param message the actual message
1691  * @param atsi performance data for the connection
1692  * @return GNUNET_OK to keep the connection open,
1693  *         GNUNET_SYSERR to close it (signal serious error)
1694  */
1695 static int
1696 client_handle_close (void *cls,
1697                      struct GNUNET_MESH_Tunnel *tunnel,
1698                      void **tunnel_ctx,
1699                      const struct GNUNET_PeerIdentity *sender,
1700                      const struct GNUNET_MessageHeader *message,
1701                      const struct GNUNET_ATS_Information*atsi)
1702 {
1703   struct GNUNET_STREAM_Socket *socket = cls;
1704
1705   return handle_close (socket,
1706                        tunnel,
1707                        sender,
1708                        (const struct GNUNET_STREAM_MessageHeader *) message,
1709                        atsi);
1710 }
1711
1712
1713 /**
1714  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1715  *
1716  * @param socket the socket
1717  * @param tunnel connection to the other end
1718  * @param sender who sent the message
1719  * @param message the actual message
1720  * @param atsi performance data for the connection
1721  * @return GNUNET_OK to keep the connection open,
1722  *         GNUNET_SYSERR to close it (signal serious error)
1723  */
1724 static int
1725 handle_close_ack (struct GNUNET_STREAM_Socket *socket,
1726                   struct GNUNET_MESH_Tunnel *tunnel,
1727                   const struct GNUNET_PeerIdentity *sender,
1728                   const struct GNUNET_STREAM_MessageHeader *message,
1729                   const struct GNUNET_ATS_Information *atsi)
1730 {
1731   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1732
1733   shutdown_handle = socket->shutdown_handle;
1734   switch (socket->state)
1735     {
1736     case STATE_CLOSE_WAIT:
1737       if ( (NULL == shutdown_handle) ||
1738            (SHUT_RDWR != shutdown_handle->operation) )
1739         {
1740           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1741                       "%x: Received CLOSE_ACK when shutdown handle is NULL or "
1742                       "not for SHUT_RDWR\n",
1743                       socket->our_id);
1744           return GNUNET_OK;
1745         }
1746
1747       if (GNUNET_SCHEDULER_NO_TASK
1748           != shutdown_handle->close_msg_retransmission_task_id)
1749         {
1750           GNUNET_SCHEDULER_cancel 
1751             (shutdown_handle->close_msg_retransmission_task_id);
1752           shutdown_handle->close_msg_retransmission_task_id =
1753             GNUNET_SCHEDULER_NO_TASK;
1754         }
1755
1756       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1757                   "%x: Received CLOSE_ACK from %x\n",
1758                   socket->our_id,
1759                   socket->other_peer);
1760       socket->state = STATE_CLOSED;
1761       if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
1762         shutdown_handle->completion_cb(shutdown_handle->completion_cls,
1763                                        SHUT_RDWR);
1764       GNUNET_free (shutdown_handle); /* Free shutdown handle */
1765       break;
1766     default:
1767       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1768                   "%x: Received CLOSE_ACK when in it not expected\n",
1769                   socket->our_id);
1770       break;
1771     }
1772   return GNUNET_OK;
1773 }
1774
1775
1776 /**
1777  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1778  *
1779  * @param cls the socket (set from GNUNET_MESH_connect)
1780  * @param tunnel connection to the other end
1781  * @param tunnel_ctx this is NULL
1782  * @param sender who sent the message
1783  * @param message the actual message
1784  * @param atsi performance data for the connection
1785  * @return GNUNET_OK to keep the connection open,
1786  *         GNUNET_SYSERR to close it (signal serious error)
1787  */
1788 static int
1789 client_handle_close_ack (void *cls,
1790                          struct GNUNET_MESH_Tunnel *tunnel,
1791                          void **tunnel_ctx,
1792                          const struct GNUNET_PeerIdentity *sender,
1793                          const struct GNUNET_MessageHeader *message,
1794                          const struct GNUNET_ATS_Information *atsi)
1795 {
1796   struct GNUNET_STREAM_Socket *socket = cls;
1797
1798   return handle_close_ack (socket,
1799                            tunnel,
1800                            sender,
1801                            (const struct GNUNET_STREAM_MessageHeader *) 
1802                            message,
1803                            atsi);
1804 }
1805
1806 /*****************************/
1807 /* Server's Message Handlers */
1808 /*****************************/
1809
1810 /**
1811  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1812  *
1813  * @param cls the closure
1814  * @param tunnel connection to the other end
1815  * @param tunnel_ctx the socket
1816  * @param sender who sent the message
1817  * @param message the actual message
1818  * @param atsi performance data for the connection
1819  * @return GNUNET_OK to keep the connection open,
1820  *         GNUNET_SYSERR to close it (signal serious error)
1821  */
1822 static int
1823 server_handle_data (void *cls,
1824                     struct GNUNET_MESH_Tunnel *tunnel,
1825                     void **tunnel_ctx,
1826                     const struct GNUNET_PeerIdentity *sender,
1827                     const struct GNUNET_MessageHeader *message,
1828                     const struct GNUNET_ATS_Information*atsi)
1829 {
1830   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1831
1832   return handle_data (socket,
1833                       tunnel,
1834                       sender,
1835                       (const struct GNUNET_STREAM_DataMessage *)message,
1836                       atsi);
1837 }
1838
1839
1840 /**
1841  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
1842  *
1843  * @param cls the closure
1844  * @param tunnel connection to the other end
1845  * @param tunnel_ctx the socket
1846  * @param sender who sent the message
1847  * @param message the actual message
1848  * @param atsi performance data for the connection
1849  * @return GNUNET_OK to keep the connection open,
1850  *         GNUNET_SYSERR to close it (signal serious error)
1851  */
1852 static int
1853 server_handle_hello (void *cls,
1854                      struct GNUNET_MESH_Tunnel *tunnel,
1855                      void **tunnel_ctx,
1856                      const struct GNUNET_PeerIdentity *sender,
1857                      const struct GNUNET_MessageHeader *message,
1858                      const struct GNUNET_ATS_Information*atsi)
1859 {
1860   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1861   struct GNUNET_STREAM_HelloAckMessage *reply;
1862
1863   if (GNUNET_PEER_search (sender) != socket->other_peer)
1864     {
1865       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1866                   "%x: Received HELLO from non-confirming peer\n",
1867                   socket->our_id);
1868       return GNUNET_YES;
1869     }
1870
1871   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == 
1872                  ntohs (message->type));
1873   GNUNET_assert (socket->tunnel == tunnel);
1874   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1875               "%x: Received HELLO from %x\n", 
1876               socket->our_id,
1877               socket->other_peer);
1878
1879   if (STATE_INIT == socket->state)
1880     {
1881       reply = generate_hello_ack_msg (socket);
1882       queue_message (socket, 
1883                      &reply->header,
1884                      &set_state_hello_wait, 
1885                      NULL);
1886     }
1887   else
1888     {
1889       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1890                   "Client sent HELLO when in state %d\n", socket->state);
1891       /* FIXME: Send RESET? */
1892       
1893     }
1894   return GNUNET_OK;
1895 }
1896
1897
1898 /**
1899  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1900  *
1901  * @param cls the closure
1902  * @param tunnel connection to the other end
1903  * @param tunnel_ctx the socket
1904  * @param sender who sent the message
1905  * @param message the actual message
1906  * @param atsi performance data for the connection
1907  * @return GNUNET_OK to keep the connection open,
1908  *         GNUNET_SYSERR to close it (signal serious error)
1909  */
1910 static int
1911 server_handle_hello_ack (void *cls,
1912                          struct GNUNET_MESH_Tunnel *tunnel,
1913                          void **tunnel_ctx,
1914                          const struct GNUNET_PeerIdentity *sender,
1915                          const struct GNUNET_MessageHeader *message,
1916                          const struct GNUNET_ATS_Information*atsi)
1917 {
1918   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1919   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
1920
1921   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
1922                  ntohs (message->type));
1923   GNUNET_assert (socket->tunnel == tunnel);
1924   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
1925   if (STATE_HELLO_WAIT == socket->state)
1926     {
1927       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1928                   "%x: Received HELLO_ACK from %x\n",
1929                   socket->our_id,
1930                   socket->other_peer);
1931       socket->read_sequence_number = ntohl (ack_message->sequence_number);
1932       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1933                   "%x: Read sequence number %u\n",
1934                   socket->our_id,
1935                   (unsigned int) socket->read_sequence_number);
1936       socket->receiver_window_available = 
1937         ntohl (ack_message->receiver_window_size);
1938       /* Attain ESTABLISHED state */
1939       set_state_established (NULL, socket);
1940     }
1941   else
1942     {
1943       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1944                   "Client sent HELLO_ACK when in state %d\n", socket->state);
1945       /* FIXME: Send RESET? */
1946       
1947     }
1948   return GNUNET_OK;
1949 }
1950
1951
1952 /**
1953  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1954  *
1955  * @param cls the closure
1956  * @param tunnel connection to the other end
1957  * @param tunnel_ctx the socket
1958  * @param sender who sent the message
1959  * @param message the actual message
1960  * @param atsi performance data for the connection
1961  * @return GNUNET_OK to keep the connection open,
1962  *         GNUNET_SYSERR to close it (signal serious error)
1963  */
1964 static int
1965 server_handle_reset (void *cls,
1966                      struct GNUNET_MESH_Tunnel *tunnel,
1967                      void **tunnel_ctx,
1968                      const struct GNUNET_PeerIdentity *sender,
1969                      const struct GNUNET_MessageHeader *message,
1970                      const struct GNUNET_ATS_Information*atsi)
1971 {
1972   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1973
1974   return GNUNET_OK;
1975 }
1976
1977
1978 /**
1979  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1980  *
1981  * @param cls the closure
1982  * @param tunnel connection to the other end
1983  * @param tunnel_ctx the socket
1984  * @param sender who sent the message
1985  * @param message the actual message
1986  * @param atsi performance data for the connection
1987  * @return GNUNET_OK to keep the connection open,
1988  *         GNUNET_SYSERR to close it (signal serious error)
1989  */
1990 static int
1991 server_handle_transmit_close (void *cls,
1992                               struct GNUNET_MESH_Tunnel *tunnel,
1993                               void **tunnel_ctx,
1994                               const struct GNUNET_PeerIdentity *sender,
1995                               const struct GNUNET_MessageHeader *message,
1996                               const struct GNUNET_ATS_Information*atsi)
1997 {
1998   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1999
2000   return handle_transmit_close (socket,
2001                                 tunnel,
2002                                 sender,
2003                                 (struct GNUNET_STREAM_MessageHeader *)message,
2004                                 atsi);
2005 }
2006
2007
2008 /**
2009  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2010  *
2011  * @param cls the closure
2012  * @param tunnel connection to the other end
2013  * @param tunnel_ctx the socket
2014  * @param sender who sent the message
2015  * @param message the actual message
2016  * @param atsi performance data for the connection
2017  * @return GNUNET_OK to keep the connection open,
2018  *         GNUNET_SYSERR to close it (signal serious error)
2019  */
2020 static int
2021 server_handle_transmit_close_ack (void *cls,
2022                                   struct GNUNET_MESH_Tunnel *tunnel,
2023                                   void **tunnel_ctx,
2024                                   const struct GNUNET_PeerIdentity *sender,
2025                                   const struct GNUNET_MessageHeader *message,
2026                                   const struct GNUNET_ATS_Information*atsi)
2027 {
2028   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2029
2030   return GNUNET_OK;
2031 }
2032
2033
2034 /**
2035  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2036  *
2037  * @param cls the closure
2038  * @param tunnel connection to the other end
2039  * @param tunnel_ctx the socket
2040  * @param sender who sent the message
2041  * @param message the actual message
2042  * @param atsi performance data for the connection
2043  * @return GNUNET_OK to keep the connection open,
2044  *         GNUNET_SYSERR to close it (signal serious error)
2045  */
2046 static int
2047 server_handle_receive_close (void *cls,
2048                              struct GNUNET_MESH_Tunnel *tunnel,
2049                              void **tunnel_ctx,
2050                              const struct GNUNET_PeerIdentity *sender,
2051                              const struct GNUNET_MessageHeader *message,
2052                              const struct GNUNET_ATS_Information*atsi)
2053 {
2054   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2055
2056   return GNUNET_OK;
2057 }
2058
2059
2060 /**
2061  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2062  *
2063  * @param cls the closure
2064  * @param tunnel connection to the other end
2065  * @param tunnel_ctx the socket
2066  * @param sender who sent the message
2067  * @param message the actual message
2068  * @param atsi performance data for the connection
2069  * @return GNUNET_OK to keep the connection open,
2070  *         GNUNET_SYSERR to close it (signal serious error)
2071  */
2072 static int
2073 server_handle_receive_close_ack (void *cls,
2074                                  struct GNUNET_MESH_Tunnel *tunnel,
2075                                  void **tunnel_ctx,
2076                                  const struct GNUNET_PeerIdentity *sender,
2077                                  const struct GNUNET_MessageHeader *message,
2078                                  const struct GNUNET_ATS_Information*atsi)
2079 {
2080   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2081
2082   return GNUNET_OK;
2083 }
2084
2085
2086 /**
2087  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2088  *
2089  * @param cls the listen socket (from GNUNET_MESH_connect in
2090  *          GNUNET_STREAM_listen) 
2091  * @param tunnel connection to the other end
2092  * @param tunnel_ctx the socket
2093  * @param sender who sent the message
2094  * @param message the actual message
2095  * @param atsi performance data for the connection
2096  * @return GNUNET_OK to keep the connection open,
2097  *         GNUNET_SYSERR to close it (signal serious error)
2098  */
2099 static int
2100 server_handle_close (void *cls,
2101                      struct GNUNET_MESH_Tunnel *tunnel,
2102                      void **tunnel_ctx,
2103                      const struct GNUNET_PeerIdentity *sender,
2104                      const struct GNUNET_MessageHeader *message,
2105                      const struct GNUNET_ATS_Information*atsi)
2106 {
2107   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2108   
2109   return handle_close (socket,
2110                        tunnel,
2111                        sender,
2112                        (const struct GNUNET_STREAM_MessageHeader *) message,
2113                        atsi);
2114 }
2115
2116
2117 /**
2118  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2119  *
2120  * @param cls the closure
2121  * @param tunnel connection to the other end
2122  * @param tunnel_ctx the socket
2123  * @param sender who sent the message
2124  * @param message the actual message
2125  * @param atsi performance data for the connection
2126  * @return GNUNET_OK to keep the connection open,
2127  *         GNUNET_SYSERR to close it (signal serious error)
2128  */
2129 static int
2130 server_handle_close_ack (void *cls,
2131                          struct GNUNET_MESH_Tunnel *tunnel,
2132                          void **tunnel_ctx,
2133                          const struct GNUNET_PeerIdentity *sender,
2134                          const struct GNUNET_MessageHeader *message,
2135                          const struct GNUNET_ATS_Information*atsi)
2136 {
2137   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2138
2139   return handle_close_ack (socket,
2140                            tunnel,
2141                            sender,
2142                            (const struct GNUNET_STREAM_MessageHeader *) message,
2143                            atsi);
2144 }
2145
2146
2147 /**
2148  * Message Handler for mesh
2149  *
2150  * @param socket the socket through which the ack was received
2151  * @param tunnel connection to the other end
2152  * @param sender who sent the message
2153  * @param ack the acknowledgment message
2154  * @param atsi performance data for the connection
2155  * @return GNUNET_OK to keep the connection open,
2156  *         GNUNET_SYSERR to close it (signal serious error)
2157  */
2158 static int
2159 handle_ack (struct GNUNET_STREAM_Socket *socket,
2160             struct GNUNET_MESH_Tunnel *tunnel,
2161             const struct GNUNET_PeerIdentity *sender,
2162             const struct GNUNET_STREAM_AckMessage *ack,
2163             const struct GNUNET_ATS_Information*atsi)
2164 {
2165   unsigned int packet;
2166   int need_retransmission;
2167   
2168
2169   if (GNUNET_PEER_search (sender) != socket->other_peer)
2170     {
2171       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2172                   "%x: Received ACK from non-confirming peer\n",
2173                   socket->our_id);
2174       return GNUNET_YES;
2175     }
2176
2177   switch (socket->state)
2178     {
2179     case (STATE_ESTABLISHED):
2180       if (NULL == socket->write_handle)
2181         {
2182           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2183                       "%x: Received DATA_ACK when write_handle is NULL\n",
2184                       socket->our_id);
2185           return GNUNET_OK;
2186         }
2187       /* FIXME: increment in the base sequence number is breaking current flow
2188        */
2189       if (!((socket->write_sequence_number 
2190              - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2191         {
2192           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2193                       "%x: Received DATA_ACK with unexpected base sequence "
2194                       "number\n",
2195                       socket->our_id);
2196           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2197                       "%x: Current write sequence: %u; Ack's base sequence: %u\n",
2198                       socket->our_id,
2199                       socket->write_sequence_number,
2200                       ntohl (ack->base_sequence_number));
2201           return GNUNET_OK;
2202         }
2203       /* FIXME: include the case when write_handle is cancelled - ignore the 
2204          acks */
2205
2206       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2207                   "%x: Received DATA_ACK from %x\n",
2208                   socket->our_id,
2209                   socket->other_peer);
2210       
2211       /* Cancel the retransmission task */
2212       if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2213         {
2214           GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2215           socket->retransmission_timeout_task_id = 
2216             GNUNET_SCHEDULER_NO_TASK;
2217         }
2218
2219       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2220         {
2221           if (NULL == socket->write_handle->messages[packet]) break;
2222           if (ntohl (ack->base_sequence_number)
2223               >= ntohl (socket->write_handle->messages[packet]->sequence_number))
2224             ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2225                                   packet,
2226                                   GNUNET_YES);
2227           else
2228             if (GNUNET_YES == 
2229                 ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
2230                                       ntohl (socket->write_handle->messages[packet]->sequence_number)
2231                                       - ntohl (ack->base_sequence_number)))
2232               ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2233                                     packet,
2234                                     GNUNET_YES);
2235         }
2236
2237       /* Update the receive window remaining
2238        FIXME : Should update with the value from a data ack with greater
2239        sequence number */
2240       socket->receiver_window_available = 
2241         ntohl (ack->receive_window_remaining);
2242
2243       /* Check if we have received all acknowledgements */
2244       need_retransmission = GNUNET_NO;
2245       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2246         {
2247           if (NULL == socket->write_handle->messages[packet]) break;
2248           if (GNUNET_YES != ackbitmap_is_bit_set 
2249               (&socket->write_handle->ack_bitmap,packet))
2250             {
2251               need_retransmission = GNUNET_YES;
2252               break;
2253             }
2254         }
2255       if (GNUNET_YES == need_retransmission)
2256         {
2257           write_data (socket);
2258         }
2259       else      /* We have to call the write continuation callback now */
2260         {
2261           /* Free the packets */
2262           for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2263             {
2264               GNUNET_free_non_null (socket->write_handle->messages[packet]);
2265             }
2266           if (NULL != socket->write_handle->write_cont)
2267             socket->write_handle->write_cont
2268               (socket->write_handle->write_cont_cls,
2269                socket->status,
2270                socket->write_handle->size);
2271           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2272                       "%x: Write completion callback completed\n",
2273                       socket->our_id);
2274           /* We are done with the write handle - Freeing it */
2275           GNUNET_free (socket->write_handle);
2276           socket->write_handle = NULL;
2277         }
2278       break;
2279     default:
2280       break;
2281     }
2282   return GNUNET_OK;
2283 }
2284
2285
2286 /**
2287  * Message Handler for mesh
2288  *
2289  * @param cls the 'struct GNUNET_STREAM_Socket'
2290  * @param tunnel connection to the other end
2291  * @param tunnel_ctx unused
2292  * @param sender who sent the message
2293  * @param message the actual message
2294  * @param atsi performance data for the connection
2295  * @return GNUNET_OK to keep the connection open,
2296  *         GNUNET_SYSERR to close it (signal serious error)
2297  */
2298 static int
2299 client_handle_ack (void *cls,
2300                    struct GNUNET_MESH_Tunnel *tunnel,
2301                    void **tunnel_ctx,
2302                    const struct GNUNET_PeerIdentity *sender,
2303                    const struct GNUNET_MessageHeader *message,
2304                    const struct GNUNET_ATS_Information*atsi)
2305 {
2306   struct GNUNET_STREAM_Socket *socket = cls;
2307   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2308  
2309   return handle_ack (socket, tunnel, sender, ack, atsi);
2310 }
2311
2312
2313 /**
2314  * Message Handler for mesh
2315  *
2316  * @param cls the server's listen socket
2317  * @param tunnel connection to the other end
2318  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2319  * @param sender who sent the message
2320  * @param message the actual message
2321  * @param atsi performance data for the connection
2322  * @return GNUNET_OK to keep the connection open,
2323  *         GNUNET_SYSERR to close it (signal serious error)
2324  */
2325 static int
2326 server_handle_ack (void *cls,
2327                    struct GNUNET_MESH_Tunnel *tunnel,
2328                    void **tunnel_ctx,
2329                    const struct GNUNET_PeerIdentity *sender,
2330                    const struct GNUNET_MessageHeader *message,
2331                    const struct GNUNET_ATS_Information*atsi)
2332 {
2333   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2334   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2335  
2336   return handle_ack (socket, tunnel, sender, ack, atsi);
2337 }
2338
2339
2340 /**
2341  * For client message handlers, the stream socket is in the
2342  * closure argument.
2343  */
2344 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2345   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2346   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2347    sizeof (struct GNUNET_STREAM_AckMessage) },
2348   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2349    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2350   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2351    sizeof (struct GNUNET_STREAM_MessageHeader)},
2352   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2353    sizeof (struct GNUNET_STREAM_MessageHeader)},
2354   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2355    sizeof (struct GNUNET_STREAM_MessageHeader)},
2356   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2357    sizeof (struct GNUNET_STREAM_MessageHeader)},
2358   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2359    sizeof (struct GNUNET_STREAM_MessageHeader)},
2360   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2361    sizeof (struct GNUNET_STREAM_MessageHeader)},
2362   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2363    sizeof (struct GNUNET_STREAM_MessageHeader)},
2364   {NULL, 0, 0}
2365 };
2366
2367
2368 /**
2369  * For server message handlers, the stream socket is in the
2370  * tunnel context, and the listen socket in the closure argument.
2371  */
2372 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2373   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2374   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2375    sizeof (struct GNUNET_STREAM_AckMessage) },
2376   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2377    sizeof (struct GNUNET_STREAM_MessageHeader)},
2378   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2379    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2380   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2381    sizeof (struct GNUNET_STREAM_MessageHeader)},
2382   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2383    sizeof (struct GNUNET_STREAM_MessageHeader)},
2384   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2385    sizeof (struct GNUNET_STREAM_MessageHeader)},
2386   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2387    sizeof (struct GNUNET_STREAM_MessageHeader)},
2388   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2389    sizeof (struct GNUNET_STREAM_MessageHeader)},
2390   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2391    sizeof (struct GNUNET_STREAM_MessageHeader)},
2392   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2393    sizeof (struct GNUNET_STREAM_MessageHeader)},
2394   {NULL, 0, 0}
2395 };
2396
2397
2398 /**
2399  * Function called when our target peer is connected to our tunnel
2400  *
2401  * @param cls the socket for which this tunnel is created
2402  * @param peer the peer identity of the target
2403  * @param atsi performance data for the connection
2404  */
2405 static void
2406 mesh_peer_connect_callback (void *cls,
2407                             const struct GNUNET_PeerIdentity *peer,
2408                             const struct GNUNET_ATS_Information * atsi)
2409 {
2410   struct GNUNET_STREAM_Socket *socket = cls;
2411   struct GNUNET_STREAM_MessageHeader *message;
2412   GNUNET_PEER_Id connected_peer;
2413
2414   connected_peer = GNUNET_PEER_search (peer);
2415   
2416   if (connected_peer != socket->other_peer)
2417     {
2418       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2419                   "%x: A peer which is not our target has connected",
2420                   "to our tunnel\n",
2421                   socket->our_id);
2422       return;
2423     }
2424   
2425   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2426               "%x: Target peer %x connected\n", 
2427               socket->our_id,
2428               connected_peer);
2429   
2430   /* Set state to INIT */
2431   socket->state = STATE_INIT;
2432
2433   /* Send HELLO message */
2434   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2435   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
2436   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2437   queue_message (socket,
2438                  message,
2439                  &set_state_hello_wait,
2440                  NULL);
2441
2442   /* Call open callback */
2443   if (NULL == socket->open_cb)
2444     {
2445       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2446                   "STREAM_open callback is NULL\n");
2447     }
2448 }
2449
2450
2451 /**
2452  * Function called when our target peer is disconnected from our tunnel
2453  *
2454  * @param cls the socket associated which this tunnel
2455  * @param peer the peer identity of the target
2456  */
2457 static void
2458 mesh_peer_disconnect_callback (void *cls,
2459                                const struct GNUNET_PeerIdentity *peer)
2460 {
2461
2462 }
2463
2464
2465 /**
2466  * Method called whenever a peer creates a tunnel to us
2467  *
2468  * @param cls closure
2469  * @param tunnel new handle to the tunnel
2470  * @param initiator peer that started the tunnel
2471  * @param atsi performance information for the tunnel
2472  * @return initial tunnel context for the tunnel
2473  *         (can be NULL -- that's not an error)
2474  */
2475 static void *
2476 new_tunnel_notify (void *cls,
2477                    struct GNUNET_MESH_Tunnel *tunnel,
2478                    const struct GNUNET_PeerIdentity *initiator,
2479                    const struct GNUNET_ATS_Information *atsi)
2480 {
2481   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2482   struct GNUNET_STREAM_Socket *socket;
2483
2484   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2485      from the same peer again until the socket is closed */
2486
2487   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2488   socket->other_peer = GNUNET_PEER_intern (initiator);
2489   socket->tunnel = tunnel;
2490   socket->session_id = 0;       /* FIXME */
2491   socket->state = STATE_INIT;
2492   socket->lsocket = lsocket;
2493   socket->our_id = lsocket->our_id;
2494   
2495   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2496               "%x: Peer %x initiated tunnel to us\n", 
2497               socket->our_id,
2498               socket->other_peer);
2499   
2500   /* FIXME: Copy MESH handle from lsocket to socket */
2501   
2502   return socket;
2503 }
2504
2505
2506 /**
2507  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2508  * any associated state.  This function is NOT called if the client has
2509  * explicitly asked for the tunnel to be destroyed using
2510  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2511  * the tunnel.
2512  *
2513  * @param cls closure (set from GNUNET_MESH_connect)
2514  * @param tunnel connection to the other end (henceforth invalid)
2515  * @param tunnel_ctx place where local state associated
2516  *                   with the tunnel is stored
2517  */
2518 static void 
2519 tunnel_cleaner (void *cls,
2520                 const struct GNUNET_MESH_Tunnel *tunnel,
2521                 void *tunnel_ctx)
2522 {
2523   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2524   
2525   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2526               "%x: Peer %x has terminated connection abruptly\n",
2527               socket->our_id,
2528               socket->other_peer);
2529
2530   socket->status = GNUNET_STREAM_SHUTDOWN;
2531
2532   /* Clear Transmit handles */
2533   if (NULL != socket->transmit_handle)
2534     {
2535       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2536       socket->transmit_handle = NULL;
2537     }
2538   socket->tunnel = NULL;
2539 }
2540
2541
2542 /*****************/
2543 /* API functions */
2544 /*****************/
2545
2546
2547 /**
2548  * Tries to open a stream to the target peer
2549  *
2550  * @param cfg configuration to use
2551  * @param target the target peer to which the stream has to be opened
2552  * @param app_port the application port number which uniquely identifies this
2553  *            stream
2554  * @param open_cb this function will be called after stream has be established 
2555  * @param open_cb_cls the closure for open_cb
2556  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2557  * @return if successful it returns the stream socket; NULL if stream cannot be
2558  *         opened 
2559  */
2560 struct GNUNET_STREAM_Socket *
2561 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2562                     const struct GNUNET_PeerIdentity *target,
2563                     GNUNET_MESH_ApplicationType app_port,
2564                     GNUNET_STREAM_OpenCallback open_cb,
2565                     void *open_cb_cls,
2566                     ...)
2567 {
2568   struct GNUNET_STREAM_Socket *socket;
2569   struct GNUNET_PeerIdentity own_peer_id;
2570   enum GNUNET_STREAM_Option option;
2571   va_list vargs;                /* Variable arguments */
2572
2573   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2574               "%s\n", __func__);
2575
2576   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2577   socket->other_peer = GNUNET_PEER_intern (target);
2578   socket->open_cb = open_cb;
2579   socket->open_cls = open_cb_cls;
2580   GNUNET_TESTING_get_peer_identity (cfg, &own_peer_id);
2581   socket->our_id = GNUNET_PEER_intern (&own_peer_id);
2582   
2583   /* Set defaults */
2584   socket->retransmit_timeout = 
2585     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
2586
2587   va_start (vargs, open_cb_cls); /* Parse variable args */
2588   do {
2589     option = va_arg (vargs, enum GNUNET_STREAM_Option);
2590     switch (option)
2591       {
2592       case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2593         /* Expect struct GNUNET_TIME_Relative */
2594         socket->retransmit_timeout = va_arg (vargs,
2595                                              struct GNUNET_TIME_Relative);
2596         break;
2597       case GNUNET_STREAM_OPTION_END:
2598         break;
2599       }
2600   } while (GNUNET_STREAM_OPTION_END != option);
2601   va_end (vargs);               /* End of variable args parsing */
2602   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2603                                       10,  /* QUEUE size as parameter? */
2604                                       socket, /* cls */
2605                                       NULL, /* No inbound tunnel handler */
2606                                       &tunnel_cleaner, /* FIXME: not required? */
2607                                       client_message_handlers,
2608                                       &app_port); /* We don't get inbound tunnels */
2609   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
2610     {
2611       GNUNET_free (socket);
2612       return NULL;
2613     }
2614
2615   /* Now create the mesh tunnel to target */
2616   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2617               "Creating MESH Tunnel\n");
2618   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2619                                               NULL, /* Tunnel context */
2620                                               &mesh_peer_connect_callback,
2621                                               &mesh_peer_disconnect_callback,
2622                                               socket);
2623   GNUNET_assert (NULL != socket->tunnel);
2624   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2625                                         target);
2626   
2627   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2628               "%s() END\n", __func__);
2629   return socket;
2630 }
2631
2632
2633 /**
2634  * Shutdown the stream for reading or writing (similar to man 2 shutdown).
2635  *
2636  * @param socket the stream socket
2637  * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
2638  * @param completion_cb the callback that will be called upon successful
2639  *          shutdown of given operation
2640  * @param completion_cls the closure for the completion callback
2641  * @return the shutdown handle
2642  */
2643 struct GNUNET_STREAM_ShutdownHandle *
2644 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2645                         int operation,
2646                         GNUNET_STREAM_ShutdownCompletion completion_cb,
2647                         void *completion_cls)
2648 {
2649   struct GNUNET_STREAM_ShutdownHandle *handle;
2650   struct GNUNET_STREAM_MessageHeader *msg;
2651   
2652   GNUNET_assert (NULL == socket->shutdown_handle);
2653
2654   handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
2655   handle->socket = socket;
2656   handle->completion_cb = completion_cb;
2657   handle->completion_cls = completion_cls;
2658   socket->shutdown_handle = handle;
2659
2660   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2661   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2662   switch (operation)
2663     {
2664     case SHUT_RD:
2665       handle->operation = SHUT_RD;
2666       if (NULL != socket->read_handle)
2667         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2668                     "Existing read handle should be cancelled before shutting"
2669                     " down reading\n");
2670       msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
2671       queue_message (socket,
2672                      msg,
2673                      &set_state_receive_close_wait,
2674                      NULL);
2675       break;
2676     case SHUT_WR:
2677       handle->operation = SHUT_WR;
2678       if (NULL != socket->write_handle)
2679         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2680                     "Existing write handle should be cancelled before shutting"
2681                     " down writing\n");
2682       msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
2683       queue_message (socket,
2684                      msg,
2685                      &set_state_transmit_close_wait,
2686                      NULL);
2687       break;
2688     case SHUT_RDWR:
2689       handle->operation = SHUT_RDWR;
2690       if (NULL != socket->write_handle)
2691         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2692                     "Existing write handle should be cancelled before shutting"
2693                     " down writing\n");
2694       if (NULL != socket->read_handle)
2695         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2696                     "Existing read handle should be cancelled before shutting"
2697                     " down reading\n");
2698       msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
2699       queue_message (socket,
2700                      msg,
2701                      &set_state_close_wait,
2702                      NULL);
2703       break;
2704     default:
2705       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2706                   "GNUNET_STREAM_shutdown called with invalid value for "
2707                   "parameter operation -- Ignoring\n");
2708       GNUNET_free (msg);
2709       GNUNET_free (handle);
2710       return NULL;
2711     }
2712   handle->close_msg_retransmission_task_id =
2713     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2714                                   &close_msg_retransmission_task,
2715                                   handle);
2716   return handle;
2717 }
2718
2719
2720 /**
2721  * Cancels a pending shutdown
2722  *
2723  * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
2724  */
2725 void
2726 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
2727 {
2728   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
2729     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
2730   GNUNET_free (handle);
2731   return;
2732 }
2733
2734
2735 /**
2736  * Closes the stream
2737  *
2738  * @param socket the stream socket
2739  */
2740 void
2741 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
2742 {
2743   struct MessageQueue *head;
2744
2745   GNUNET_break (NULL == socket->read_handle);
2746   GNUNET_break (NULL == socket->write_handle);
2747
2748   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
2749     {
2750       /* socket closed with read task pending!? */
2751       GNUNET_break (0);
2752       GNUNET_SCHEDULER_cancel (socket->read_task_id);
2753       socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
2754     }
2755   
2756   /* Terminate the ack'ing tasks if they are still present */
2757   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
2758     {
2759       GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2760       socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2761     }
2762
2763   /* Clear Transmit handles */
2764   if (NULL != socket->transmit_handle)
2765     {
2766       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2767       socket->transmit_handle = NULL;
2768     }
2769
2770   /* Clear existing message queue */
2771   while (NULL != (head = socket->queue_head)) {
2772     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2773                                  socket->queue_tail,
2774                                  head);
2775     GNUNET_free (head->message);
2776     GNUNET_free (head);
2777   }
2778
2779   /* Close associated tunnel */
2780   if (NULL != socket->tunnel)
2781     {
2782       GNUNET_MESH_tunnel_destroy (socket->tunnel);
2783       socket->tunnel = NULL;
2784     }
2785
2786   /* Close mesh connection */
2787   if (NULL != socket->mesh && NULL == socket->lsocket)
2788     {
2789       GNUNET_MESH_disconnect (socket->mesh);
2790       socket->mesh = NULL;
2791     }
2792   
2793   /* Release receive buffer */
2794   if (NULL != socket->receive_buffer)
2795     {
2796       GNUNET_free (socket->receive_buffer);
2797     }
2798
2799   GNUNET_free (socket);
2800 }
2801
2802
2803 /**
2804  * Listens for stream connections for a specific application ports
2805  *
2806  * @param cfg the configuration to use
2807  * @param app_port the application port for which new streams will be accepted
2808  * @param listen_cb this function will be called when a peer tries to establish
2809  *            a stream with us
2810  * @param listen_cb_cls closure for listen_cb
2811  * @return listen socket, NULL for any error
2812  */
2813 struct GNUNET_STREAM_ListenSocket *
2814 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
2815                       GNUNET_MESH_ApplicationType app_port,
2816                       GNUNET_STREAM_ListenCallback listen_cb,
2817                       void *listen_cb_cls)
2818 {
2819   /* FIXME: Add variable args for passing configration options? */
2820   struct GNUNET_STREAM_ListenSocket *lsocket;
2821   struct GNUNET_PeerIdentity our_peer_id;
2822
2823   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
2824   lsocket->port = app_port;
2825   lsocket->listen_cb = listen_cb;
2826   lsocket->listen_cb_cls = listen_cb_cls;
2827   GNUNET_TESTING_get_peer_identity (cfg, &our_peer_id);
2828   lsocket->our_id = GNUNET_PEER_intern (&our_peer_id);
2829   lsocket->mesh = GNUNET_MESH_connect (cfg,
2830                                        10, /* FIXME: QUEUE size as parameter? */
2831                                        lsocket, /* Closure */
2832                                        &new_tunnel_notify,
2833                                        &tunnel_cleaner,
2834                                        server_message_handlers,
2835                                        &app_port);
2836   GNUNET_assert (NULL != lsocket->mesh);
2837   return lsocket;
2838 }
2839
2840
2841 /**
2842  * Closes the listen socket
2843  *
2844  * @param lsocket the listen socket
2845  */
2846 void
2847 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
2848 {
2849   /* Close MESH connection */
2850   GNUNET_assert (NULL != lsocket->mesh);
2851   GNUNET_MESH_disconnect (lsocket->mesh);
2852   
2853   GNUNET_free (lsocket);
2854 }
2855
2856
2857 /**
2858  * Tries to write the given data to the stream
2859  *
2860  * @param socket the socket representing a stream
2861  * @param data the data buffer from where the data is written into the stream
2862  * @param size the number of bytes to be written from the data buffer
2863  * @param timeout the timeout period
2864  * @param write_cont the function to call upon writing some bytes into the stream
2865  * @param write_cont_cls the closure
2866  * @return handle to cancel the operation
2867  */
2868 struct GNUNET_STREAM_IOWriteHandle *
2869 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
2870                      const void *data,
2871                      size_t size,
2872                      struct GNUNET_TIME_Relative timeout,
2873                      GNUNET_STREAM_CompletionContinuation write_cont,
2874                      void *write_cont_cls)
2875 {
2876   unsigned int num_needed_packets;
2877   unsigned int packet;
2878   struct GNUNET_STREAM_IOWriteHandle *io_handle;
2879   uint32_t packet_size;
2880   uint32_t payload_size;
2881   struct GNUNET_STREAM_DataMessage *data_msg;
2882   const void *sweep;
2883   struct GNUNET_TIME_Relative ack_deadline;
2884
2885   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2886               "%s\n", __func__);
2887
2888   /* Return NULL if there is already a write request pending */
2889   if (NULL != socket->write_handle)
2890   {
2891     GNUNET_break (0);
2892     return NULL;
2893   }
2894   if (!((STATE_ESTABLISHED == socket->state)
2895         || (STATE_RECEIVE_CLOSE_WAIT == socket->state)
2896         || (STATE_RECEIVE_CLOSED == socket->state)))
2897     {
2898       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2899                   "%x: Attempting to write on a closed (OR) not-yet-established"
2900                   "stream\n",
2901                   socket->our_id);
2902       return NULL;
2903     } 
2904   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
2905     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
2906   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
2907   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
2908   io_handle->socket = socket;
2909   io_handle->write_cont = write_cont;
2910   io_handle->write_cont_cls = write_cont_cls;
2911   io_handle->size = size;
2912   sweep = data;
2913   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2914      determined from RTT */
2915   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
2916   /* Divide the given buffer into packets for sending */
2917   for (packet=0; packet < num_needed_packets; packet++)
2918     {
2919       if ((packet + 1) * max_payload_size < size) 
2920         {
2921           payload_size = max_payload_size;
2922           packet_size = MAX_PACKET_SIZE;
2923         }
2924       else 
2925         {
2926           payload_size = size - packet * max_payload_size;
2927           packet_size =  payload_size + sizeof (struct
2928                                                 GNUNET_STREAM_DataMessage); 
2929         }
2930       io_handle->messages[packet] = GNUNET_malloc (packet_size);
2931       io_handle->messages[packet]->header.header.size = htons (packet_size);
2932       io_handle->messages[packet]->header.header.type =
2933         htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
2934       io_handle->messages[packet]->sequence_number =
2935         htonl (socket->write_sequence_number++);
2936       io_handle->messages[packet]->offset = htonl (socket->write_offset);
2937
2938       /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2939          determined from RTT */
2940       io_handle->messages[packet]->ack_deadline =
2941         GNUNET_TIME_relative_hton (ack_deadline);
2942       data_msg = io_handle->messages[packet];
2943       /* Copy data from given buffer to the packet */
2944       memcpy (&data_msg[1],
2945               sweep,
2946               payload_size);
2947       sweep += payload_size;
2948       socket->write_offset += payload_size;
2949     }
2950   socket->write_handle = io_handle;
2951   write_data (socket);
2952
2953   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2954               "%s() END\n", __func__);
2955
2956   return io_handle;
2957 }
2958
2959
2960 /**
2961  * Tries to read data from the stream
2962  *
2963  * @param socket the socket representing a stream
2964  * @param timeout the timeout period
2965  * @param proc function to call with data (once only)
2966  * @param proc_cls the closure for proc
2967  * @return handle to cancel the operation
2968  */
2969 struct GNUNET_STREAM_IOReadHandle *
2970 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
2971                     struct GNUNET_TIME_Relative timeout,
2972                     GNUNET_STREAM_DataProcessor proc,
2973                     void *proc_cls)
2974 {
2975   struct GNUNET_STREAM_IOReadHandle *read_handle;
2976   
2977   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2978               "%x: %s()\n", 
2979               socket->our_id,
2980               __func__);
2981
2982   /* Return NULL if there is already a read handle; the user has to cancel that
2983   first before continuing or has to wait until it is completed */
2984   if (NULL != socket->read_handle) return NULL;
2985
2986   GNUNET_assert (NULL != proc);
2987
2988   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
2989   read_handle->proc = proc;
2990   read_handle->proc_cls = proc_cls;
2991   socket->read_handle = read_handle;
2992
2993   /* Check if we have a packet at bitmap 0 */
2994   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
2995                                           0))
2996     {
2997       socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
2998                                                        socket);
2999    
3000     }
3001   
3002   /* Setup the read timeout task */
3003   socket->read_io_timeout_task_id =
3004     GNUNET_SCHEDULER_add_delayed (timeout,
3005                                   &read_io_timeout,
3006                                   socket);
3007   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3008               "%x: %s() END\n",
3009               socket->our_id,
3010               __func__);
3011   return read_handle;
3012 }
3013
3014
3015 /**
3016  * Cancel pending write operation.
3017  *
3018  * @param ioh handle to operation to cancel
3019  */
3020 void
3021 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3022 {
3023   struct GNUNET_STREAM_Socket *socket = ioh->socket;
3024   unsigned int packet;
3025
3026   GNUNET_assert (NULL != socket->write_handle);
3027   GNUNET_assert (socket->write_handle == ioh);
3028
3029   if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
3030     {
3031       GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
3032       socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
3033     }
3034
3035   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3036     {
3037       if (NULL == ioh->messages[packet]) break;
3038       GNUNET_free (ioh->messages[packet]);
3039     }
3040       
3041   GNUNET_free (socket->write_handle);
3042   socket->write_handle = NULL;
3043   return;
3044 }
3045
3046
3047 /**
3048  * Cancel pending read operation.
3049  *
3050  * @param ioh handle to operation to cancel
3051  */
3052 void
3053 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
3054 {
3055   return;
3056 }