ef0065b22e0503a11a85533e2cf1b27b207514b0
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Decrement PEER intern count during socket close and listen close to free the
27  * memory used for PEER interning
28  *
29  * Add code for write io timeout
30  *
31  * Include retransmission for control messages
32  **/
33
34 /**
35  * @file stream/stream_api.c
36  * @brief Implementation of the stream library
37  * @author Sree Harsha Totakura
38  */
39
40
41 #include "platform.h"
42 #include "gnunet_common.h"
43 #include "gnunet_crypto_lib.h"
44 #include "gnunet_stream_lib.h"
45 #include "gnunet_testing_lib.h"
46 #include "stream_protocol.h"
47
48
49 /**
50  * The maximum packet size of a stream packet
51  */
52 #define MAX_PACKET_SIZE 64000
53
54 /**
55  * Receive buffer
56  */
57 #define RECEIVE_BUFFER_SIZE 4096000
58
59 /**
60  * The maximum payload a data message packet can carry
61  */
62 static size_t max_payload_size = 
63   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
64
65 /**
66  * states in the Protocol
67  */
68 enum State
69   {
70     /**
71      * Client initialization state
72      */
73     STATE_INIT,
74
75     /**
76      * Listener initialization state 
77      */
78     STATE_LISTEN,
79
80     /**
81      * Pre-connection establishment state
82      */
83     STATE_HELLO_WAIT,
84
85     /**
86      * State where a connection has been established
87      */
88     STATE_ESTABLISHED,
89
90     /**
91      * State where the socket is closed on our side and waiting to be ACK'ed
92      */
93     STATE_RECEIVE_CLOSE_WAIT,
94
95     /**
96      * State where the socket is closed for reading
97      */
98     STATE_RECEIVE_CLOSED,
99
100     /**
101      * State where the socket is closed on our side and waiting to be ACK'ed
102      */
103     STATE_TRANSMIT_CLOSE_WAIT,
104
105     /**
106      * State where the socket is closed for writing
107      */
108     STATE_TRANSMIT_CLOSED,
109
110     /**
111      * State where the socket is closed on our side and waiting to be ACK'ed
112      */
113     STATE_CLOSE_WAIT,
114
115     /**
116      * State where the socket is closed
117      */
118     STATE_CLOSED 
119   };
120
121
122 /**
123  * Functions of this type are called when a message is written
124  *
125  * @param cls the closure from queue_message
126  * @param socket the socket the written message was bound to
127  */
128 typedef void (*SendFinishCallback) (void *cls,
129                                     struct GNUNET_STREAM_Socket *socket);
130
131
132 /**
133  * The send message queue
134  */
135 struct MessageQueue
136 {
137   /**
138    * The message
139    */
140   struct GNUNET_STREAM_MessageHeader *message;
141
142   /**
143    * Callback to be called when the message is sent
144    */
145   SendFinishCallback finish_cb;
146
147   /**
148    * The closure for finish_cb
149    */
150   void *finish_cb_cls;
151
152   /**
153    * The next message in queue. Should be NULL in the last message
154    */
155   struct MessageQueue *next;
156
157   /**
158    * The next message in queue. Should be NULL in the first message
159    */
160   struct MessageQueue *prev;
161 };
162
163
164 /**
165  * The STREAM Socket Handler
166  */
167 struct GNUNET_STREAM_Socket
168 {
169   /**
170    * Retransmission timeout
171    */
172   struct GNUNET_TIME_Relative retransmit_timeout;
173
174   /**
175    * The Acknowledgement Bitmap
176    */
177   GNUNET_STREAM_AckBitmap ack_bitmap;
178
179   /**
180    * Time when the Acknowledgement was queued
181    */
182   struct GNUNET_TIME_Absolute ack_time_registered;
183
184   /**
185    * Queued Acknowledgement deadline
186    */
187   struct GNUNET_TIME_Relative ack_time_deadline;
188
189   /**
190    * The mesh handle
191    */
192   struct GNUNET_MESH_Handle *mesh;
193
194   /**
195    * The mesh tunnel handle
196    */
197   struct GNUNET_MESH_Tunnel *tunnel;
198
199   /**
200    * Stream open closure
201    */
202   void *open_cls;
203
204   /**
205    * Stream open callback
206    */
207   GNUNET_STREAM_OpenCallback open_cb;
208
209   /**
210    * The current transmit handle (if a pending transmit request exists)
211    */
212   struct GNUNET_MESH_TransmitHandle *transmit_handle;
213
214   /**
215    * The current message associated with the transmit handle
216    */
217   struct MessageQueue *queue_head;
218
219   /**
220    * The queue tail, should always point to the last message in queue
221    */
222   struct MessageQueue *queue_tail;
223
224   /**
225    * The write IO_handle associated with this socket
226    */
227   struct GNUNET_STREAM_IOWriteHandle *write_handle;
228
229   /**
230    * The read IO_handle associated with this socket
231    */
232   struct GNUNET_STREAM_IOReadHandle *read_handle;
233
234   /**
235    * The shutdown handle associated with this socket
236    */
237   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
238
239   /**
240    * Buffer for storing received messages
241    */
242   void *receive_buffer;
243
244   /**
245    * The listen socket from which this socket is derived. Should be NULL if it
246    * is not a derived socket
247    */
248   struct GNUNET_STREAM_ListenSocket *lsocket;
249
250   /**
251    * Task identifier for the read io timeout task
252    */
253   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
254
255   /**
256    * Task identifier for retransmission task after timeout
257    */
258   GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
259
260   /**
261    * The task for sending timely Acks
262    */
263   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
264
265   /**
266    * Task scheduled to continue a read operation.
267    */
268   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
269
270   /**
271    * The state of the protocol associated with this socket
272    */
273   enum State state;
274
275   /**
276    * The status of the socket
277    */
278   enum GNUNET_STREAM_Status status;
279
280   /**
281    * The number of previous timeouts; FIXME: currently not used
282    */
283   unsigned int retries;
284
285   /**
286    * The peer identity of the peer at the other end of the stream
287    */
288   GNUNET_PEER_Id other_peer;
289
290   /**
291    * Our Peer Identity (for debugging)
292    */
293   GNUNET_PEER_Id our_id;
294
295   /**
296    * The application port number (type: uint32_t)
297    */
298   GNUNET_MESH_ApplicationType app_port;
299
300   /**
301    * The session id associated with this stream connection
302    * FIXME: Not used currently, may be removed
303    */
304   uint32_t session_id;
305
306   /**
307    * Write sequence number. Set to random when sending HELLO(client) and
308    * HELLO_ACK(server) 
309    */
310   uint32_t write_sequence_number;
311
312   /**
313    * Read sequence number. This number's value is determined during handshake
314    */
315   uint32_t read_sequence_number;
316
317   /**
318    * The receiver buffer size
319    */
320   uint32_t receive_buffer_size;
321
322   /**
323    * The receiver buffer boundaries
324    */
325   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
326
327   /**
328    * receiver's available buffer after the last acknowledged packet
329    */
330   uint32_t receiver_window_available;
331
332   /**
333    * The offset pointer used during write operation
334    */
335   uint32_t write_offset;
336
337   /**
338    * The offset after which we are expecting data
339    */
340   uint32_t read_offset;
341
342   /**
343    * The offset upto which user has read from the received buffer
344    */
345   uint32_t copy_offset;
346 };
347
348
349 /**
350  * A socket for listening
351  */
352 struct GNUNET_STREAM_ListenSocket
353 {
354   /**
355    * The mesh handle
356    */
357   struct GNUNET_MESH_Handle *mesh;
358
359   /**
360    * The callback function which is called after successful opening socket
361    */
362   GNUNET_STREAM_ListenCallback listen_cb;
363
364   /**
365    * The call back closure
366    */
367   void *listen_cb_cls;
368
369   /**
370    * Our interned Peer's identity
371    */
372   GNUNET_PEER_Id our_id;
373
374   /**
375    * The service port
376    * FIXME: Remove if not required!
377    */
378   GNUNET_MESH_ApplicationType port;
379 };
380
381
382 /**
383  * The IO Write Handle
384  */
385 struct GNUNET_STREAM_IOWriteHandle
386 {
387   /**
388    * The socket to which this write handle is associated
389    */
390   struct GNUNET_STREAM_Socket *socket;
391
392   /**
393    * The packet_buffers associated with this Handle
394    */
395   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
396
397   /**
398    * The write continuation callback
399    */
400   GNUNET_STREAM_CompletionContinuation write_cont;
401
402   /**
403    * Write continuation closure
404    */
405   void *write_cont_cls;
406
407   /**
408    * The bitmap of this IOHandle; Corresponding bit for a message is set when
409    * it has been acknowledged by the receiver
410    */
411   GNUNET_STREAM_AckBitmap ack_bitmap;
412
413   /**
414    * Number of bytes in this write handle
415    */
416   size_t size;
417 };
418
419
420 /**
421  * The IO Read Handle
422  */
423 struct GNUNET_STREAM_IOReadHandle
424 {
425   /**
426    * Callback for the read processor
427    */
428   GNUNET_STREAM_DataProcessor proc;
429
430   /**
431    * The closure pointer for the read processor callback
432    */
433   void *proc_cls;
434 };
435
436
437 /**
438  * Handle for Shutdown
439  */
440 struct GNUNET_STREAM_ShutdownHandle
441 {
442   /**
443    * The socket associated with this shutdown handle
444    */
445   struct GNUNET_STREAM_Socket *socket;
446
447   /**
448    * Shutdown completion callback
449    */
450   GNUNET_STREAM_ShutdownCompletion completion_cb;
451
452   /**
453    * Closure for completion callback
454    */
455   void *completion_cls;
456
457   /**
458    * Close message retransmission task id
459    */
460   GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
461
462   /**
463    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
464    */
465   int operation;  
466 };
467
468
469 /**
470  * Default value in seconds for various timeouts
471  */
472 static unsigned int default_timeout = 10;
473
474
475 /**
476  * Callback function for sending queued message
477  *
478  * @param cls closure the socket
479  * @param size number of bytes available in buf
480  * @param buf where the callee should write the message
481  * @return number of bytes written to buf
482  */
483 static size_t
484 send_message_notify (void *cls, size_t size, void *buf)
485 {
486   struct GNUNET_STREAM_Socket *socket = cls;
487   struct GNUNET_PeerIdentity target;
488   struct MessageQueue *head;
489   size_t ret;
490
491   socket->transmit_handle = NULL; /* Remove the transmit handle */
492   head = socket->queue_head;
493   if (NULL == head)
494     return 0; /* just to be safe */
495   GNUNET_PEER_resolve (socket->other_peer, &target);
496   if (0 == size)                /* request timed out */
497     {
498       socket->retries++;
499       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
500                   "Message sending timed out. Retry %d \n",
501                   socket->retries);
502       socket->transmit_handle = 
503         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
504                                            0, /* Corking */
505                                            1, /* Priority */
506                                            /* FIXME: exponential backoff */
507                                            socket->retransmit_timeout,
508                                            &target,
509                                            ntohs (head->message->header.size),
510                                            &send_message_notify,
511                                            socket);
512       return 0;
513     }
514
515   ret = ntohs (head->message->header.size);
516   GNUNET_assert (size >= ret);
517   memcpy (buf, head->message, ret);
518   if (NULL != head->finish_cb)
519     {
520       head->finish_cb (head->finish_cb_cls, socket);
521     }
522   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
523                                socket->queue_tail,
524                                head);
525   GNUNET_free (head->message);
526   GNUNET_free (head);
527   head = socket->queue_head;
528   if (NULL != head)    /* more pending messages to send */
529     {
530       socket->retries = 0;
531       socket->transmit_handle = 
532         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
533                                            0, /* Corking */
534                                            1, /* Priority */
535                                            /* FIXME: exponential backoff */
536                                            socket->retransmit_timeout,
537                                            &target,
538                                            ntohs (head->message->header.size),
539                                            &send_message_notify,
540                                            socket);
541     }
542   return ret;
543 }
544
545
546 /**
547  * Queues a message for sending using the mesh connection of a socket
548  *
549  * @param socket the socket whose mesh connection is used
550  * @param message the message to be sent
551  * @param finish_cb the callback to be called when the message is sent
552  * @param finish_cb_cls the closure for the callback
553  */
554 static void
555 queue_message (struct GNUNET_STREAM_Socket *socket,
556                struct GNUNET_STREAM_MessageHeader *message,
557                SendFinishCallback finish_cb,
558                void *finish_cb_cls)
559 {
560   struct MessageQueue *queue_entity;
561   struct GNUNET_PeerIdentity target;
562
563   GNUNET_assert 
564     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
565      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
566
567   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
568               "%x: Queueing message of type %d and size %d\n",
569               socket->our_id,
570               ntohs (message->header.type),
571               ntohs (message->header.size));
572   GNUNET_assert (NULL != message);
573   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
574   queue_entity->message = message;
575   queue_entity->finish_cb = finish_cb;
576   queue_entity->finish_cb_cls = finish_cb_cls;
577   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
578                                     socket->queue_tail,
579                                     queue_entity);
580   if (NULL == socket->transmit_handle)
581   {
582     socket->retries = 0;
583     GNUNET_PEER_resolve (socket->other_peer, &target);
584     socket->transmit_handle = 
585       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
586                                          0, /* Corking */
587                                          1, /* Priority */
588                                          socket->retransmit_timeout,
589                                          &target,
590                                          ntohs (message->header.size),
591                                          &send_message_notify,
592                                          socket);
593   }
594 }
595
596
597 /**
598  * Copies a message and queues it for sending using the mesh connection of
599  * given socket 
600  *
601  * @param socket the socket whose mesh connection is used
602  * @param message the message to be sent
603  * @param finish_cb the callback to be called when the message is sent
604  * @param finish_cb_cls the closure for the callback
605  */
606 static void
607 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
608                         const struct GNUNET_STREAM_MessageHeader *message,
609                         SendFinishCallback finish_cb,
610                         void *finish_cb_cls)
611 {
612   struct GNUNET_STREAM_MessageHeader *msg_copy;
613   uint16_t size;
614   
615   size = ntohs (message->header.size);
616   msg_copy = GNUNET_malloc (size);
617   memcpy (msg_copy, message, size);
618   queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
619 }
620
621
622 /**
623  * Callback function for sending ack message
624  *
625  * @param cls closure the ACK message created in ack_task
626  * @param size number of bytes available in buffer
627  * @param buf where the callee should write the message
628  * @return number of bytes written to buf
629  */
630 static size_t
631 send_ack_notify (void *cls, size_t size, void *buf)
632 {
633   struct GNUNET_STREAM_AckMessage *ack_msg = cls;
634
635   if (0 == size)
636     {
637       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
638                   "%s called with size 0\n", __func__);
639       return 0;
640     }
641   GNUNET_assert (ntohs (ack_msg->header.header.size) <= size);
642   
643   size = ntohs (ack_msg->header.header.size);
644   memcpy (buf, ack_msg, size);
645   return size;
646 }
647
648 /**
649  * Writes data using the given socket. The amount of data written is limited by
650  * the receiver_window_size
651  *
652  * @param socket the socket to use
653  */
654 static void 
655 write_data (struct GNUNET_STREAM_Socket *socket);
656
657 /**
658  * Task for retransmitting data messages if they aren't ACK before their ack
659  * deadline 
660  *
661  * @param cls the socket
662  * @param tc the Task context
663  */
664 static void
665 retransmission_timeout_task (void *cls,
666                              const struct GNUNET_SCHEDULER_TaskContext *tc)
667 {
668   struct GNUNET_STREAM_Socket *socket = cls;
669   
670   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
671     return;
672
673   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
674               "%x: Retransmitting DATA...\n", socket->our_id);
675   socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
676   write_data (socket);
677 }
678
679
680 /**
681  * Task for sending ACK message
682  *
683  * @param cls the socket
684  * @param tc the Task context
685  */
686 static void
687 ack_task (void *cls,
688           const struct GNUNET_SCHEDULER_TaskContext *tc)
689 {
690   struct GNUNET_STREAM_Socket *socket = cls;
691   struct GNUNET_STREAM_AckMessage *ack_msg;
692   struct GNUNET_PeerIdentity target;
693
694   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
695     {
696       return;
697     }
698
699   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
700
701   /* Create the ACK Message */
702   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
703   ack_msg->header.header.size = htons (sizeof (struct 
704                                                GNUNET_STREAM_AckMessage));
705   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
706   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
707   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
708   ack_msg->receive_window_remaining = 
709     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
710
711   GNUNET_PEER_resolve (socket->other_peer, &target);
712   /* Request MESH for sending ACK */
713   GNUNET_MESH_notify_transmit_ready (socket->tunnel,
714                                      0, /* Corking */
715                                      1, /* Priority */
716                                      socket->retransmit_timeout,
717                                      &target,
718                                      ntohs (ack_msg->header.header.size),
719                                      &send_ack_notify,
720                                      ack_msg);
721
722   
723 }
724
725
726 /**
727  * Retransmission task for shutdown messages
728  *
729  * @param cls the shutdown handle
730  * @param tc the Task Context
731  */
732 static void
733 close_msg_retransmission_task (void *cls,
734                                const struct GNUNET_SCHEDULER_TaskContext *tc)
735 {
736   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
737   struct GNUNET_STREAM_MessageHeader *msg;
738   struct GNUNET_STREAM_Socket *socket;
739
740   GNUNET_assert (NULL != shutdown_handle);
741   socket = shutdown_handle->socket;
742
743   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
744   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
745   switch (shutdown_handle->operation)
746     {
747     case SHUT_RDWR:
748       msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
749       break;
750     case SHUT_RD:
751       msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
752       break;
753     case SHUT_WR:
754       msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
755       break;
756     default:
757       GNUNET_free (msg);
758       shutdown_handle->close_msg_retransmission_task_id = 
759         GNUNET_SCHEDULER_NO_TASK;
760       return;
761     }
762   queue_message (socket, msg, NULL, NULL);
763   shutdown_handle->close_msg_retransmission_task_id =
764     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
765                                   &close_msg_retransmission_task,
766                                   shutdown_handle);
767 }
768
769
770 /**
771  * Function to modify a bit in GNUNET_STREAM_AckBitmap
772  *
773  * @param bitmap the bitmap to modify
774  * @param bit the bit number to modify
775  * @param value GNUNET_YES to on, GNUNET_NO to off
776  */
777 static void
778 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
779                       unsigned int bit, 
780                       int value)
781 {
782   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
783   if (GNUNET_YES == value)
784     *bitmap |= (1LL << bit);
785   else
786     *bitmap &= ~(1LL << bit);
787 }
788
789
790 /**
791  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
792  *
793  * @param bitmap address of the bitmap that has to be checked
794  * @param bit the bit number to check
795  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
796  */
797 static uint8_t
798 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
799                       unsigned int bit)
800 {
801   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
802   return 0 != (*bitmap & (1LL << bit));
803 }
804
805
806 /**
807  * Writes data using the given socket. The amount of data written is limited by
808  * the receiver_window_size
809  *
810  * @param socket the socket to use
811  */
812 static void 
813 write_data (struct GNUNET_STREAM_Socket *socket)
814 {
815   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
816   int packet;                   /* Although an int, should never be negative */
817   int ack_packet;
818
819   ack_packet = -1;
820   /* Find the last acknowledged packet */
821   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
822     {      
823       if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
824                                               packet))
825         ack_packet = packet;        
826       else if (NULL == io_handle->messages[packet])
827         break;
828     }
829   /* Resend packets which weren't ack'ed */
830   for (packet=0; packet < ack_packet; packet++)
831     {
832       if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
833                                              packet))
834         {
835           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
836                       "%x: Placing DATA message with sequence %u in send queue\n",
837                       socket->our_id,
838                       ntohl (io_handle->messages[packet]->sequence_number));
839
840           copy_and_queue_message (socket,
841                                   &io_handle->messages[packet]->header,
842                                   NULL,
843                                   NULL);
844         }
845     }
846   packet = ack_packet + 1;
847   /* Now send new packets if there is enough buffer space */
848   while ( (NULL != io_handle->messages[packet]) &&
849           (socket->receiver_window_available 
850            >= ntohs (io_handle->messages[packet]->header.header.size)) )
851     {
852       socket->receiver_window_available -= 
853         ntohs (io_handle->messages[packet]->header.header.size);
854       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
855                   "%x: Placing DATA message with sequence %u in send queue\n",
856                   socket->our_id,
857                   ntohl (io_handle->messages[packet]->sequence_number));
858       copy_and_queue_message (socket,
859                               &io_handle->messages[packet]->header,
860                               NULL,
861                               NULL);
862       packet++;
863     }
864
865   if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
866     socket->retransmission_timeout_task_id = 
867       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
868                                     (GNUNET_TIME_UNIT_SECONDS, 8),
869                                     &retransmission_timeout_task,
870                                     socket);
871 }
872
873
874 /**
875  * Task for calling the read processor
876  *
877  * @param cls the socket
878  * @param tc the task context
879  */
880 static void
881 call_read_processor (void *cls,
882                      const struct GNUNET_SCHEDULER_TaskContext *tc)
883 {
884   struct GNUNET_STREAM_Socket *socket = cls;
885   size_t read_size;
886   size_t valid_read_size;
887   unsigned int packet;
888   uint32_t sequence_increase;
889   uint32_t offset_increase;
890
891   socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
892   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
893     return;
894
895   if (NULL == socket->receive_buffer) 
896     return;
897
898   GNUNET_assert (NULL != socket->read_handle);
899   GNUNET_assert (NULL != socket->read_handle->proc);
900
901   /* Check the bitmap for any holes */
902   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
903     {
904       if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
905                                              packet))
906         break;
907     }
908   /* We only call read processor if we have the first packet */
909   GNUNET_assert (0 < packet);
910
911   valid_read_size = 
912     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
913
914   GNUNET_assert (0 != valid_read_size);
915
916   /* Cancel the read_io_timeout_task */
917   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
918   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
919
920   /* Call the data processor */
921   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
922               "%x: Calling read processor\n",
923               socket->our_id);
924   read_size = 
925     socket->read_handle->proc (socket->read_handle->proc_cls,
926                                socket->status,
927                                socket->receive_buffer + socket->copy_offset,
928                                valid_read_size);
929   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
930               "%x: Read processor read %d bytes\n",
931               socket->our_id,
932               read_size);
933   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
934               "%x: Read processor completed successfully\n",
935               socket->our_id);
936
937   /* Free the read handle */
938   GNUNET_free (socket->read_handle);
939   socket->read_handle = NULL;
940
941   GNUNET_assert (read_size <= valid_read_size);
942   socket->copy_offset += read_size;
943
944   /* Determine upto which packet we can remove from the buffer */
945   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
946     {
947       if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
948         { packet++; break; }
949       if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
950         break;
951     }
952
953   /* If no packets can be removed we can't move the buffer */
954   if (0 == packet) return;
955
956   sequence_increase = packet;
957   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
958               "%x: Sequence increase after read processor completion: %u\n",
959               socket->our_id,
960               sequence_increase);
961
962   /* Shift the data in the receive buffer */
963   memmove (socket->receive_buffer,
964            socket->receive_buffer 
965            + socket->receive_buffer_boundaries[sequence_increase-1],
966            socket->receive_buffer_size
967            - socket->receive_buffer_boundaries[sequence_increase-1]);
968   
969   /* Shift the bitmap */
970   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
971   
972   /* Set read_sequence_number */
973   socket->read_sequence_number += sequence_increase;
974   
975   /* Set read_offset */
976   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
977   socket->read_offset += offset_increase;
978
979   /* Fix copy_offset */
980   GNUNET_assert (offset_increase <= socket->copy_offset);
981   socket->copy_offset -= offset_increase;
982   
983   /* Fix relative boundaries */
984   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
985     {
986       if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
987         {
988           socket->receive_buffer_boundaries[packet] = 
989             socket->receive_buffer_boundaries[packet + sequence_increase] 
990             - offset_increase;
991         }
992       else
993         socket->receive_buffer_boundaries[packet] = 0;
994     }
995 }
996
997
998 /**
999  * Cancels the existing read io handle
1000  *
1001  * @param cls the closure from the SCHEDULER call
1002  * @param tc the task context
1003  */
1004 static void
1005 read_io_timeout (void *cls, 
1006                 const struct GNUNET_SCHEDULER_TaskContext *tc)
1007 {
1008   struct GNUNET_STREAM_Socket *socket = cls;
1009   GNUNET_STREAM_DataProcessor proc;
1010   void *proc_cls;
1011
1012   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1013   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1014   {
1015     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1016                 "%x: Read task timedout - Cancelling it\n",
1017                 socket->our_id);
1018     GNUNET_SCHEDULER_cancel (socket->read_task_id);
1019     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1020   }
1021   GNUNET_assert (NULL != socket->read_handle);
1022   proc = socket->read_handle->proc;
1023   proc_cls = socket->read_handle->proc_cls;
1024
1025   GNUNET_free (socket->read_handle);
1026   socket->read_handle = NULL;
1027   /* Call the read processor to signal timeout */
1028   proc (proc_cls,
1029         GNUNET_STREAM_TIMEOUT,
1030         NULL,
1031         0);
1032 }
1033
1034
1035 /**
1036  * Handler for DATA messages; Same for both client and server
1037  *
1038  * @param socket the socket through which the ack was received
1039  * @param tunnel connection to the other end
1040  * @param sender who sent the message
1041  * @param msg the data message
1042  * @param atsi performance data for the connection
1043  * @return GNUNET_OK to keep the connection open,
1044  *         GNUNET_SYSERR to close it (signal serious error)
1045  */
1046 static int
1047 handle_data (struct GNUNET_STREAM_Socket *socket,
1048              struct GNUNET_MESH_Tunnel *tunnel,
1049              const struct GNUNET_PeerIdentity *sender,
1050              const struct GNUNET_STREAM_DataMessage *msg,
1051              const struct GNUNET_ATS_Information*atsi)
1052 {
1053   const void *payload;
1054   uint32_t bytes_needed;
1055   uint32_t relative_offset;
1056   uint32_t relative_sequence_number;
1057   uint16_t size;
1058
1059   size = htons (msg->header.header.size);
1060   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1061     {
1062       GNUNET_break_op (0);
1063       return GNUNET_SYSERR;
1064     }
1065
1066   if (GNUNET_PEER_search (sender) != socket->other_peer)
1067     {
1068       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1069                   "%x: Received DATA from non-confirming peer\n",
1070                   socket->our_id);
1071       return GNUNET_YES;
1072     }
1073
1074   switch (socket->state)
1075     {
1076     case STATE_ESTABLISHED:
1077     case STATE_TRANSMIT_CLOSED:
1078     case STATE_TRANSMIT_CLOSE_WAIT:
1079       
1080       /* check if the message's sequence number is in the range we are
1081          expecting */
1082       relative_sequence_number = 
1083         ntohl (msg->sequence_number) - socket->read_sequence_number;
1084       if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1085         {
1086           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1087                       "%x: Ignoring received message with sequence number %u\n",
1088                       socket->our_id,
1089                       ntohl (msg->sequence_number));
1090           /* Start ACK sending task if one is not already present */
1091           if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1092             {
1093               socket->ack_task_id = 
1094                 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1095                                               (msg->ack_deadline),
1096                                               &ack_task,
1097                                               socket);
1098             }
1099           return GNUNET_YES;
1100         }
1101       
1102       /* Check if we have already seen this message */
1103       if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1104                                               relative_sequence_number))
1105         {
1106           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1107                       "%x: Ignoring already received message with sequence "
1108                       "number %u\n",
1109                       socket->our_id,
1110                       ntohl (msg->sequence_number));
1111           /* Start ACK sending task if one is not already present */
1112           if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1113             {
1114               socket->ack_task_id = 
1115                 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1116                                               (msg->ack_deadline),
1117                                               &ack_task,
1118                                               socket);
1119             }
1120           return GNUNET_YES;
1121         }
1122
1123       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1124                   "%x: Receiving DATA with sequence number: %u and size: %d "
1125                   "from %x\n",
1126                   socket->our_id,
1127                   ntohl (msg->sequence_number),
1128                   ntohs (msg->header.header.size),
1129                   socket->other_peer);
1130
1131       /* Check if we have to allocate the buffer */
1132       size -= sizeof (struct GNUNET_STREAM_DataMessage);
1133       relative_offset = ntohl (msg->offset) - socket->read_offset;
1134       bytes_needed = relative_offset + size;
1135       if (bytes_needed > socket->receive_buffer_size)
1136         {
1137           if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1138             {
1139               socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1140                                                        bytes_needed);
1141               socket->receive_buffer_size = bytes_needed;
1142             }
1143           else
1144             {
1145               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1146                           "%x: Cannot accommodate packet %d as buffer is",
1147                           "full\n",
1148                           socket->our_id,
1149                           ntohl (msg->sequence_number));
1150               return GNUNET_YES;
1151             }
1152         }
1153       
1154       /* Copy Data to buffer */
1155       payload = &msg[1];
1156       GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1157       memcpy (socket->receive_buffer + relative_offset,
1158               payload,
1159               size);
1160       socket->receive_buffer_boundaries[relative_sequence_number] = 
1161         relative_offset + size;
1162       
1163       /* Modify the ACK bitmap */
1164       ackbitmap_modify_bit (&socket->ack_bitmap,
1165                             relative_sequence_number,
1166                             GNUNET_YES);
1167
1168       /* Start ACK sending task if one is not already present */
1169       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1170        {
1171          socket->ack_task_id = 
1172            GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1173                                          (msg->ack_deadline),
1174                                          &ack_task,
1175                                          socket);
1176        }
1177
1178       if ((NULL != socket->read_handle) /* A read handle is waiting */
1179           /* There is no current read task */
1180           && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1181           /* We have the first packet */
1182           && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1183                                                  0)))
1184         {
1185           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1186                       "%x: Scheduling read processor\n",
1187                       socket->our_id);
1188
1189           socket->read_task_id = 
1190             GNUNET_SCHEDULER_add_now (&call_read_processor,
1191                                       socket);
1192         }
1193       
1194       break;
1195
1196     default:
1197       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1198                   "%x: Received data message when it cannot be handled\n",
1199                   socket->our_id);
1200       break;
1201     }
1202   return GNUNET_YES;
1203 }
1204
1205
1206 /**
1207  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1208  *
1209  * @param cls the socket (set from GNUNET_MESH_connect)
1210  * @param tunnel connection to the other end
1211  * @param tunnel_ctx place to store local state associated with the tunnel
1212  * @param sender who sent the message
1213  * @param message the actual message
1214  * @param atsi performance data for the connection
1215  * @return GNUNET_OK to keep the connection open,
1216  *         GNUNET_SYSERR to close it (signal serious error)
1217  */
1218 static int
1219 client_handle_data (void *cls,
1220              struct GNUNET_MESH_Tunnel *tunnel,
1221              void **tunnel_ctx,
1222              const struct GNUNET_PeerIdentity *sender,
1223              const struct GNUNET_MessageHeader *message,
1224              const struct GNUNET_ATS_Information*atsi)
1225 {
1226   struct GNUNET_STREAM_Socket *socket = cls;
1227
1228   return handle_data (socket, 
1229                       tunnel, 
1230                       sender, 
1231                       (const struct GNUNET_STREAM_DataMessage *) message, 
1232                       atsi);
1233 }
1234
1235
1236 /**
1237  * Callback to set state to ESTABLISHED
1238  *
1239  * @param cls the closure from queue_message FIXME: document
1240  * @param socket the socket to requiring state change
1241  */
1242 static void
1243 set_state_established (void *cls,
1244                        struct GNUNET_STREAM_Socket *socket)
1245 {
1246   struct GNUNET_PeerIdentity initiator_pid;
1247
1248   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1249               "%x: Attaining ESTABLISHED state\n",
1250               socket->our_id);
1251   socket->write_offset = 0;
1252   socket->read_offset = 0;
1253   socket->state = STATE_ESTABLISHED;
1254   /* FIXME: What if listen_cb is NULL */
1255   if (NULL != socket->lsocket)
1256     {
1257       GNUNET_PEER_resolve (socket->other_peer, &initiator_pid);
1258       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1259                   "%x: Calling listen callback\n",
1260                   socket->our_id);
1261       if (GNUNET_SYSERR == 
1262           socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1263                                       socket,
1264                                       &initiator_pid))
1265         {
1266           socket->state = STATE_CLOSED;
1267           /* FIXME: We should close in a decent way */
1268           GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1269           GNUNET_free (socket);
1270         }
1271     }
1272   else if (socket->open_cb)
1273     socket->open_cb (socket->open_cls, socket);
1274 }
1275
1276
1277 /**
1278  * Callback to set state to HELLO_WAIT
1279  *
1280  * @param cls the closure from queue_message
1281  * @param socket the socket to requiring state change
1282  */
1283 static void
1284 set_state_hello_wait (void *cls,
1285                       struct GNUNET_STREAM_Socket *socket)
1286 {
1287   GNUNET_assert (STATE_INIT == socket->state);
1288   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1289               "%x: Attaining HELLO_WAIT state\n",
1290               socket->our_id);
1291   socket->state = STATE_HELLO_WAIT;
1292 }
1293
1294
1295 /**
1296  * Callback to set state to CLOSE_WAIT
1297  *
1298  * @param cls the closure from queue_message
1299  * @param socket the socket requiring state change
1300  */
1301 static void
1302 set_state_close_wait (void *cls,
1303                       struct GNUNET_STREAM_Socket *socket)
1304 {
1305   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1306               "%x: Attaing CLOSE_WAIT state\n",
1307               socket->our_id);
1308   socket->state = STATE_CLOSE_WAIT;
1309   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1310   socket->receive_buffer = NULL;
1311   socket->receive_buffer_size = 0;
1312 }
1313
1314
1315 /**
1316  * Callback to set state to RECEIVE_CLOSE_WAIT
1317  *
1318  * @param cls the closure from queue_message
1319  * @param socket the socket requiring state change
1320  */
1321 static void
1322 set_state_receive_close_wait (void *cls,
1323                               struct GNUNET_STREAM_Socket *socket)
1324 {
1325   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1326               "%x: Attaing RECEIVE_CLOSE_WAIT state\n",
1327               socket->our_id);
1328   socket->state = STATE_RECEIVE_CLOSE_WAIT;
1329   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1330   socket->receive_buffer = NULL;
1331   socket->receive_buffer_size = 0;
1332 }
1333
1334
1335 /**
1336  * Callback to set state to TRANSMIT_CLOSE_WAIT
1337  *
1338  * @param cls the closure from queue_message
1339  * @param socket the socket requiring state change
1340  */
1341 static void
1342 set_state_transmit_close_wait (void *cls,
1343                                struct GNUNET_STREAM_Socket *socket)
1344 {
1345   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1346               "%x: Attaing TRANSMIT_CLOSE_WAIT state\n",
1347               socket->our_id);
1348   socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1349 }
1350
1351
1352 /**
1353  * Callback to set state to CLOSED
1354  *
1355  * @param cls the closure from queue_message
1356  * @param socket the socket requiring state change
1357  */
1358 static void
1359 set_state_closed (void *cls,
1360                   struct GNUNET_STREAM_Socket *socket)
1361 {
1362   socket->state = STATE_CLOSED;
1363 }
1364
1365 /**
1366  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1367  * socket
1368  *
1369  * @param socket the socket for which this HelloAckMessage has to be generated
1370  * @return the HelloAckMessage
1371  */
1372 static struct GNUNET_STREAM_HelloAckMessage *
1373 generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket)
1374 {
1375   struct GNUNET_STREAM_HelloAckMessage *msg;
1376
1377   /* Get the random sequence number */
1378   socket->write_sequence_number = 
1379     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1380   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1381               "%x: Generated write sequence number %u\n",
1382               socket->our_id,
1383               (unsigned int) socket->write_sequence_number);
1384   
1385   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1386   msg->header.header.size = 
1387     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1388   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1389   msg->sequence_number = htonl (socket->write_sequence_number);
1390   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1391
1392   return msg;
1393 }
1394
1395
1396 /**
1397  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1398  *
1399  * @param cls the socket (set from GNUNET_MESH_connect)
1400  * @param tunnel connection to the other end
1401  * @param tunnel_ctx this is NULL
1402  * @param sender who sent the message
1403  * @param message the actual message
1404  * @param atsi performance data for the connection
1405  * @return GNUNET_OK to keep the connection open,
1406  *         GNUNET_SYSERR to close it (signal serious error)
1407  */
1408 static int
1409 client_handle_hello_ack (void *cls,
1410                          struct GNUNET_MESH_Tunnel *tunnel,
1411                          void **tunnel_ctx,
1412                          const struct GNUNET_PeerIdentity *sender,
1413                          const struct GNUNET_MessageHeader *message,
1414                          const struct GNUNET_ATS_Information*atsi)
1415 {
1416   struct GNUNET_STREAM_Socket *socket = cls;
1417   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1418   struct GNUNET_STREAM_HelloAckMessage *reply;
1419
1420   if (GNUNET_PEER_search (sender) != socket->other_peer)
1421     {
1422       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1423                   "%x: Received HELLO_ACK from non-confirming peer\n",
1424                   socket->our_id);
1425       return GNUNET_YES;
1426     }
1427   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1428   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1429               "%x: Received HELLO_ACK from %x\n",
1430               socket->our_id,
1431               socket->other_peer);
1432
1433   GNUNET_assert (socket->tunnel == tunnel);
1434   switch (socket->state)
1435   {
1436   case STATE_HELLO_WAIT:
1437     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1438     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1439                 "%x: Read sequence number %u\n",
1440                 socket->our_id,
1441                 (unsigned int) socket->read_sequence_number);
1442     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1443     reply = generate_hello_ack_msg (socket);
1444     queue_message (socket,
1445                    &reply->header, 
1446                    &set_state_established, 
1447                    NULL);      
1448     return GNUNET_OK;
1449   case STATE_ESTABLISHED:
1450   case STATE_RECEIVE_CLOSE_WAIT:
1451     // call statistics (# ACKs ignored++)
1452     return GNUNET_OK;
1453   case STATE_INIT:
1454   default:
1455     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1456                 "%x: Server %x sent HELLO_ACK when in state %d\n", 
1457                 socket->our_id,
1458                 socket->other_peer,
1459                 socket->state);
1460     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1461     return GNUNET_SYSERR;
1462   }
1463
1464 }
1465
1466
1467 /**
1468  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1469  *
1470  * @param cls the socket (set from GNUNET_MESH_connect)
1471  * @param tunnel connection to the other end
1472  * @param tunnel_ctx this is NULL
1473  * @param sender who sent the message
1474  * @param message the actual message
1475  * @param atsi performance data for the connection
1476  * @return GNUNET_OK to keep the connection open,
1477  *         GNUNET_SYSERR to close it (signal serious error)
1478  */
1479 static int
1480 client_handle_reset (void *cls,
1481                      struct GNUNET_MESH_Tunnel *tunnel,
1482                      void **tunnel_ctx,
1483                      const struct GNUNET_PeerIdentity *sender,
1484                      const struct GNUNET_MessageHeader *message,
1485                      const struct GNUNET_ATS_Information*atsi)
1486 {
1487   struct GNUNET_STREAM_Socket *socket = cls;
1488
1489   return GNUNET_OK;
1490 }
1491
1492
1493 /**
1494  * Common message handler for handling TRANSMIT_CLOSE messages
1495  *
1496  * @param socket the socket through which the ack was received
1497  * @param tunnel connection to the other end
1498  * @param sender who sent the message
1499  * @param msg the transmit close message
1500  * @param atsi performance data for the connection
1501  * @return GNUNET_OK to keep the connection open,
1502  *         GNUNET_SYSERR to close it (signal serious error)
1503  */
1504 static int
1505 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1506                        struct GNUNET_MESH_Tunnel *tunnel,
1507                        const struct GNUNET_PeerIdentity *sender,
1508                        const struct GNUNET_STREAM_MessageHeader *msg,
1509                        const struct GNUNET_ATS_Information*atsi)
1510 {
1511   struct GNUNET_STREAM_MessageHeader *reply;
1512
1513   switch (socket->state)
1514     {
1515     case STATE_ESTABLISHED:
1516       socket->state = STATE_RECEIVE_CLOSED;
1517
1518       /* Send TRANSMIT_CLOSE_ACK */
1519       reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1520       reply->header.type = 
1521         htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1522       reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1523       queue_message (socket, reply, NULL, NULL);
1524       break;
1525
1526     default:
1527       /* FIXME: Call statistics? */
1528       break;
1529     }
1530   return GNUNET_YES;
1531 }
1532
1533
1534 /**
1535  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1536  *
1537  * @param cls the socket (set from GNUNET_MESH_connect)
1538  * @param tunnel connection to the other end
1539  * @param tunnel_ctx this is NULL
1540  * @param sender who sent the message
1541  * @param message the actual message
1542  * @param atsi performance data for the connection
1543  * @return GNUNET_OK to keep the connection open,
1544  *         GNUNET_SYSERR to close it (signal serious error)
1545  */
1546 static int
1547 client_handle_transmit_close (void *cls,
1548                               struct GNUNET_MESH_Tunnel *tunnel,
1549                               void **tunnel_ctx,
1550                               const struct GNUNET_PeerIdentity *sender,
1551                               const struct GNUNET_MessageHeader *message,
1552                               const struct GNUNET_ATS_Information*atsi)
1553 {
1554   struct GNUNET_STREAM_Socket *socket = cls;
1555   
1556   return handle_transmit_close (socket,
1557                                 tunnel,
1558                                 sender,
1559                                 (struct GNUNET_STREAM_MessageHeader *)message,
1560                                 atsi);
1561 }
1562
1563
1564 /**
1565  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1566  *
1567  * @param socket the socket
1568  * @param tunnel connection to the other end
1569  * @param sender who sent the message
1570  * @param message the actual message
1571  * @param atsi performance data for the connection
1572  * @param operation the close operation which is being ACK'ed
1573  * @return GNUNET_OK to keep the connection open,
1574  *         GNUNET_SYSERR to close it (signal serious error)
1575  */
1576 static int
1577 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1578                           struct GNUNET_MESH_Tunnel *tunnel,
1579                           const struct GNUNET_PeerIdentity *sender,
1580                           const struct GNUNET_STREAM_MessageHeader *message,
1581                           const struct GNUNET_ATS_Information *atsi,
1582                           int operation)
1583 {
1584   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1585
1586   shutdown_handle = socket->shutdown_handle;
1587   if (NULL == shutdown_handle)
1588     {
1589       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1590                   "%x: Received *CLOSE_ACK when shutdown handle is NULL\n",
1591                   socket->our_id);
1592       return GNUNET_OK;
1593     }
1594
1595   switch (operation)
1596     {
1597     case SHUT_RDWR:
1598       switch (socket->state)
1599         {
1600         case STATE_CLOSE_WAIT:
1601           if (SHUT_RDWR != shutdown_handle->operation)
1602             {
1603               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1604                           "%x: Received CLOSE_ACK when shutdown handle "
1605                           "is not for SHUT_RDWR\n",
1606                           socket->our_id);
1607               return GNUNET_OK;
1608             }
1609
1610           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1611                       "%x: Received CLOSE_ACK from %x\n",
1612                       socket->our_id,
1613                       socket->other_peer);
1614           socket->state = STATE_CLOSED;
1615           break;
1616         default:
1617           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1618                       "%x: Received CLOSE_ACK when in it not expected\n",
1619                       socket->our_id);
1620           return GNUNET_OK;
1621         }
1622       break;
1623
1624     case SHUT_RD:
1625       switch (socket->state)
1626         {
1627         case STATE_RECEIVE_CLOSE_WAIT:
1628           if (SHUT_RD != shutdown_handle->operation)
1629             {
1630               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1631                           "%x: Received RECEIVE_CLOSE_ACK when shutdown handle "
1632                           "is not for SHUT_RD\n",
1633                           socket->our_id);
1634               return GNUNET_OK;
1635             }
1636
1637           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1638                       "%x: Received RECEIVE_CLOSE_ACK from %x\n",
1639                       socket->our_id,
1640                       socket->other_peer);
1641           socket->state = STATE_RECEIVE_CLOSED;
1642           break;
1643         default:
1644           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1645                       "%x: Received RECEIVE_CLOSE_ACK when in it not expected\n",
1646                       socket->our_id);
1647           return GNUNET_OK;
1648         }
1649
1650       break;
1651     case SHUT_WR:
1652       switch (socket->state)
1653         {
1654         case STATE_TRANSMIT_CLOSE_WAIT:
1655           if (SHUT_WR != shutdown_handle->operation)
1656             {
1657               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1658                           "%x: Received TRANSMIT_CLOSE_ACK when shutdown handle "
1659                           "is not for SHUT_WR\n",
1660                           socket->our_id);
1661               return GNUNET_OK;
1662             }
1663
1664           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1665                       "%x: Received TRAMSMIT_CLOSE_ACK from %x\n",
1666                       socket->our_id,
1667                       socket->other_peer);
1668           socket->state = STATE_TRANSMIT_CLOSED;
1669           break;
1670         default:
1671           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1672                       "%x: Received TRANSMIT_CLOSE_ACK when in it not expected\n",
1673                       socket->our_id);
1674           
1675           return GNUNET_OK;
1676         }
1677       break;
1678     default:
1679       GNUNET_assert (0);
1680     }
1681
1682   if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
1683     shutdown_handle->completion_cb(shutdown_handle->completion_cls,
1684                                    operation);
1685   GNUNET_free (shutdown_handle); /* Free shutdown handle */
1686   socket->shutdown_handle = NULL;
1687   if (GNUNET_SCHEDULER_NO_TASK
1688       != shutdown_handle->close_msg_retransmission_task_id)
1689     {
1690       GNUNET_SCHEDULER_cancel
1691         (shutdown_handle->close_msg_retransmission_task_id);
1692       shutdown_handle->close_msg_retransmission_task_id =
1693         GNUNET_SCHEDULER_NO_TASK;
1694     }
1695   return GNUNET_OK;
1696 }
1697
1698
1699 /**
1700  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1701  *
1702  * @param cls the socket (set from GNUNET_MESH_connect)
1703  * @param tunnel connection to the other end
1704  * @param tunnel_ctx this is NULL
1705  * @param sender who sent the message
1706  * @param message the actual message
1707  * @param atsi performance data for the connection
1708  * @return GNUNET_OK to keep the connection open,
1709  *         GNUNET_SYSERR to close it (signal serious error)
1710  */
1711 static int
1712 client_handle_transmit_close_ack (void *cls,
1713                                   struct GNUNET_MESH_Tunnel *tunnel,
1714                                   void **tunnel_ctx,
1715                                   const struct GNUNET_PeerIdentity *sender,
1716                                   const struct GNUNET_MessageHeader *message,
1717                                   const struct GNUNET_ATS_Information*atsi)
1718 {
1719   struct GNUNET_STREAM_Socket *socket = cls;
1720
1721   return handle_generic_close_ack (socket,
1722                                    tunnel,
1723                                    sender,
1724                                    (const struct GNUNET_STREAM_MessageHeader *)
1725                                    message,
1726                                    atsi,
1727                                    SHUT_WR);
1728 }
1729
1730
1731 /**
1732  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1733  *
1734  * @param socket the socket
1735  * @param tunnel connection to the other end
1736  * @param sender who sent the message
1737  * @param message the actual message
1738  * @param atsi performance data for the connection
1739  * @return GNUNET_OK to keep the connection open,
1740  *         GNUNET_SYSERR to close it (signal serious error)
1741  */
1742 static int
1743 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1744                       struct GNUNET_MESH_Tunnel *tunnel,
1745                       const struct GNUNET_PeerIdentity *sender,
1746                       const struct GNUNET_STREAM_MessageHeader *message,
1747                       const struct GNUNET_ATS_Information *atsi)
1748 {
1749   struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1750
1751   switch (socket->state)
1752     {
1753     case STATE_INIT:
1754     case STATE_LISTEN:
1755     case STATE_HELLO_WAIT:
1756       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1757                   "%x: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1758                   socket->our_id);
1759       return GNUNET_OK;
1760     default:
1761       break;
1762     }
1763   
1764   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1765               "%x: Received RECEIVE_CLOSE from %x\n",
1766               socket->our_id,
1767               socket->other_peer);
1768   receive_close_ack =
1769     GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1770   receive_close_ack->header.size =
1771     htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1772   receive_close_ack->header.type =
1773     htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1774   queue_message (socket,
1775                  receive_close_ack,
1776                  &set_state_closed,
1777                  NULL);
1778   
1779   /* FIXME: Handle the case where write handle is present; the write operation
1780               should be deemed as finised and the write continuation callback
1781               has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
1782   return GNUNET_OK;
1783 }
1784
1785
1786 /**
1787  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1788  *
1789  * @param cls the socket (set from GNUNET_MESH_connect)
1790  * @param tunnel connection to the other end
1791  * @param tunnel_ctx this is NULL
1792  * @param sender who sent the message
1793  * @param message the actual message
1794  * @param atsi performance data for the connection
1795  * @return GNUNET_OK to keep the connection open,
1796  *         GNUNET_SYSERR to close it (signal serious error)
1797  */
1798 static int
1799 client_handle_receive_close (void *cls,
1800                              struct GNUNET_MESH_Tunnel *tunnel,
1801                              void **tunnel_ctx,
1802                              const struct GNUNET_PeerIdentity *sender,
1803                              const struct GNUNET_MessageHeader *message,
1804                              const struct GNUNET_ATS_Information*atsi)
1805 {
1806   struct GNUNET_STREAM_Socket *socket = cls;
1807
1808   return
1809     handle_receive_close (socket,
1810                           tunnel,
1811                           sender,
1812                           (const struct GNUNET_STREAM_MessageHeader *) message,
1813                           atsi);
1814 }
1815
1816
1817 /**
1818  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1819  *
1820  * @param cls the socket (set from GNUNET_MESH_connect)
1821  * @param tunnel connection to the other end
1822  * @param tunnel_ctx this is NULL
1823  * @param sender who sent the message
1824  * @param message the actual message
1825  * @param atsi performance data for the connection
1826  * @return GNUNET_OK to keep the connection open,
1827  *         GNUNET_SYSERR to close it (signal serious error)
1828  */
1829 static int
1830 client_handle_receive_close_ack (void *cls,
1831                                  struct GNUNET_MESH_Tunnel *tunnel,
1832                                  void **tunnel_ctx,
1833                                  const struct GNUNET_PeerIdentity *sender,
1834                                  const struct GNUNET_MessageHeader *message,
1835                                  const struct GNUNET_ATS_Information*atsi)
1836 {
1837   struct GNUNET_STREAM_Socket *socket = cls;
1838
1839   return handle_generic_close_ack (socket,
1840                                    tunnel,
1841                                    sender,
1842                                    (const struct GNUNET_STREAM_MessageHeader *)
1843                                    message,
1844                                    atsi,
1845                                    SHUT_RD);
1846 }
1847
1848
1849 /**
1850  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1851  *
1852  * @param socket the socket
1853  * @param tunnel connection to the other end
1854  * @param sender who sent the message
1855  * @param message the actual message
1856  * @param atsi performance data for the connection
1857  * @return GNUNET_OK to keep the connection open,
1858  *         GNUNET_SYSERR to close it (signal serious error)
1859  */
1860 static int
1861 handle_close (struct GNUNET_STREAM_Socket *socket,
1862               struct GNUNET_MESH_Tunnel *tunnel,
1863               const struct GNUNET_PeerIdentity *sender,
1864               const struct GNUNET_STREAM_MessageHeader *message,
1865               const struct GNUNET_ATS_Information*atsi)
1866 {
1867   struct GNUNET_STREAM_MessageHeader *close_ack;
1868
1869   switch (socket->state)
1870     {
1871     case STATE_INIT:
1872     case STATE_LISTEN:
1873     case STATE_HELLO_WAIT:
1874       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1875                   "%x: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1876                   socket->our_id);
1877       return GNUNET_OK;
1878     default:
1879       break;
1880     }
1881
1882   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1883               "%x: Received CLOSE from %x\n",
1884               socket->our_id,
1885               socket->other_peer);
1886   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1887   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1888   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
1889   queue_message (socket,
1890                  close_ack,
1891                  &set_state_closed,
1892                  NULL);
1893   if (socket->state == STATE_CLOSED)
1894     return GNUNET_OK;
1895
1896   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1897   socket->receive_buffer = NULL;
1898   socket->receive_buffer_size = 0;
1899   return GNUNET_OK;
1900 }
1901
1902
1903 /**
1904  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1905  *
1906  * @param cls the socket (set from GNUNET_MESH_connect)
1907  * @param tunnel connection to the other end
1908  * @param tunnel_ctx this is NULL
1909  * @param sender who sent the message
1910  * @param message the actual message
1911  * @param atsi performance data for the connection
1912  * @return GNUNET_OK to keep the connection open,
1913  *         GNUNET_SYSERR to close it (signal serious error)
1914  */
1915 static int
1916 client_handle_close (void *cls,
1917                      struct GNUNET_MESH_Tunnel *tunnel,
1918                      void **tunnel_ctx,
1919                      const struct GNUNET_PeerIdentity *sender,
1920                      const struct GNUNET_MessageHeader *message,
1921                      const struct GNUNET_ATS_Information*atsi)
1922 {
1923   struct GNUNET_STREAM_Socket *socket = cls;
1924
1925   return handle_close (socket,
1926                        tunnel,
1927                        sender,
1928                        (const struct GNUNET_STREAM_MessageHeader *) message,
1929                        atsi);
1930 }
1931
1932
1933 /**
1934  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1935  *
1936  * @param cls the socket (set from GNUNET_MESH_connect)
1937  * @param tunnel connection to the other end
1938  * @param tunnel_ctx this is NULL
1939  * @param sender who sent the message
1940  * @param message the actual message
1941  * @param atsi performance data for the connection
1942  * @return GNUNET_OK to keep the connection open,
1943  *         GNUNET_SYSERR to close it (signal serious error)
1944  */
1945 static int
1946 client_handle_close_ack (void *cls,
1947                          struct GNUNET_MESH_Tunnel *tunnel,
1948                          void **tunnel_ctx,
1949                          const struct GNUNET_PeerIdentity *sender,
1950                          const struct GNUNET_MessageHeader *message,
1951                          const struct GNUNET_ATS_Information *atsi)
1952 {
1953   struct GNUNET_STREAM_Socket *socket = cls;
1954
1955   return handle_generic_close_ack (socket,
1956                                    tunnel,
1957                                    sender,
1958                                    (const struct GNUNET_STREAM_MessageHeader *) 
1959                                    message,
1960                                    atsi,
1961                                    SHUT_RDWR);
1962 }
1963
1964 /*****************************/
1965 /* Server's Message Handlers */
1966 /*****************************/
1967
1968 /**
1969  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1970  *
1971  * @param cls the closure
1972  * @param tunnel connection to the other end
1973  * @param tunnel_ctx the socket
1974  * @param sender who sent the message
1975  * @param message the actual message
1976  * @param atsi performance data for the connection
1977  * @return GNUNET_OK to keep the connection open,
1978  *         GNUNET_SYSERR to close it (signal serious error)
1979  */
1980 static int
1981 server_handle_data (void *cls,
1982                     struct GNUNET_MESH_Tunnel *tunnel,
1983                     void **tunnel_ctx,
1984                     const struct GNUNET_PeerIdentity *sender,
1985                     const struct GNUNET_MessageHeader *message,
1986                     const struct GNUNET_ATS_Information*atsi)
1987 {
1988   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1989
1990   return handle_data (socket,
1991                       tunnel,
1992                       sender,
1993                       (const struct GNUNET_STREAM_DataMessage *)message,
1994                       atsi);
1995 }
1996
1997
1998 /**
1999  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
2000  *
2001  * @param cls the closure
2002  * @param tunnel connection to the other end
2003  * @param tunnel_ctx the socket
2004  * @param sender who sent the message
2005  * @param message the actual message
2006  * @param atsi performance data for the connection
2007  * @return GNUNET_OK to keep the connection open,
2008  *         GNUNET_SYSERR to close it (signal serious error)
2009  */
2010 static int
2011 server_handle_hello (void *cls,
2012                      struct GNUNET_MESH_Tunnel *tunnel,
2013                      void **tunnel_ctx,
2014                      const struct GNUNET_PeerIdentity *sender,
2015                      const struct GNUNET_MessageHeader *message,
2016                      const struct GNUNET_ATS_Information*atsi)
2017 {
2018   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2019   struct GNUNET_STREAM_HelloAckMessage *reply;
2020
2021   if (GNUNET_PEER_search (sender) != socket->other_peer)
2022     {
2023       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2024                   "%x: Received HELLO from non-confirming peer\n",
2025                   socket->our_id);
2026       return GNUNET_YES;
2027     }
2028
2029   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == 
2030                  ntohs (message->type));
2031   GNUNET_assert (socket->tunnel == tunnel);
2032   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2033               "%x: Received HELLO from %x\n", 
2034               socket->our_id,
2035               socket->other_peer);
2036
2037   if (STATE_INIT == socket->state)
2038     {
2039       reply = generate_hello_ack_msg (socket);
2040       queue_message (socket, 
2041                      &reply->header,
2042                      &set_state_hello_wait, 
2043                      NULL);
2044     }
2045   else
2046     {
2047       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2048                   "Client sent HELLO when in state %d\n", socket->state);
2049       /* FIXME: Send RESET? */
2050       
2051     }
2052   return GNUNET_OK;
2053 }
2054
2055
2056 /**
2057  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2058  *
2059  * @param cls the closure
2060  * @param tunnel connection to the other end
2061  * @param tunnel_ctx the socket
2062  * @param sender who sent the message
2063  * @param message the actual message
2064  * @param atsi performance data for the connection
2065  * @return GNUNET_OK to keep the connection open,
2066  *         GNUNET_SYSERR to close it (signal serious error)
2067  */
2068 static int
2069 server_handle_hello_ack (void *cls,
2070                          struct GNUNET_MESH_Tunnel *tunnel,
2071                          void **tunnel_ctx,
2072                          const struct GNUNET_PeerIdentity *sender,
2073                          const struct GNUNET_MessageHeader *message,
2074                          const struct GNUNET_ATS_Information*atsi)
2075 {
2076   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2077   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2078
2079   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2080                  ntohs (message->type));
2081   GNUNET_assert (socket->tunnel == tunnel);
2082   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2083   if (STATE_HELLO_WAIT == socket->state)
2084     {
2085       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2086                   "%x: Received HELLO_ACK from %x\n",
2087                   socket->our_id,
2088                   socket->other_peer);
2089       socket->read_sequence_number = ntohl (ack_message->sequence_number);
2090       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2091                   "%x: Read sequence number %u\n",
2092                   socket->our_id,
2093                   (unsigned int) socket->read_sequence_number);
2094       socket->receiver_window_available = 
2095         ntohl (ack_message->receiver_window_size);
2096       /* Attain ESTABLISHED state */
2097       set_state_established (NULL, socket);
2098     }
2099   else
2100     {
2101       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2102                   "Client sent HELLO_ACK when in state %d\n", socket->state);
2103       /* FIXME: Send RESET? */
2104       
2105     }
2106   return GNUNET_OK;
2107 }
2108
2109
2110 /**
2111  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2112  *
2113  * @param cls the closure
2114  * @param tunnel connection to the other end
2115  * @param tunnel_ctx the socket
2116  * @param sender who sent the message
2117  * @param message the actual message
2118  * @param atsi performance data for the connection
2119  * @return GNUNET_OK to keep the connection open,
2120  *         GNUNET_SYSERR to close it (signal serious error)
2121  */
2122 static int
2123 server_handle_reset (void *cls,
2124                      struct GNUNET_MESH_Tunnel *tunnel,
2125                      void **tunnel_ctx,
2126                      const struct GNUNET_PeerIdentity *sender,
2127                      const struct GNUNET_MessageHeader *message,
2128                      const struct GNUNET_ATS_Information*atsi)
2129 {
2130   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2131
2132   return GNUNET_OK;
2133 }
2134
2135
2136 /**
2137  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2138  *
2139  * @param cls the closure
2140  * @param tunnel connection to the other end
2141  * @param tunnel_ctx the socket
2142  * @param sender who sent the message
2143  * @param message the actual message
2144  * @param atsi performance data for the connection
2145  * @return GNUNET_OK to keep the connection open,
2146  *         GNUNET_SYSERR to close it (signal serious error)
2147  */
2148 static int
2149 server_handle_transmit_close (void *cls,
2150                               struct GNUNET_MESH_Tunnel *tunnel,
2151                               void **tunnel_ctx,
2152                               const struct GNUNET_PeerIdentity *sender,
2153                               const struct GNUNET_MessageHeader *message,
2154                               const struct GNUNET_ATS_Information*atsi)
2155 {
2156   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2157
2158   return handle_transmit_close (socket,
2159                                 tunnel,
2160                                 sender,
2161                                 (struct GNUNET_STREAM_MessageHeader *)message,
2162                                 atsi);
2163 }
2164
2165
2166 /**
2167  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2168  *
2169  * @param cls the closure
2170  * @param tunnel connection to the other end
2171  * @param tunnel_ctx the socket
2172  * @param sender who sent the message
2173  * @param message the actual message
2174  * @param atsi performance data for the connection
2175  * @return GNUNET_OK to keep the connection open,
2176  *         GNUNET_SYSERR to close it (signal serious error)
2177  */
2178 static int
2179 server_handle_transmit_close_ack (void *cls,
2180                                   struct GNUNET_MESH_Tunnel *tunnel,
2181                                   void **tunnel_ctx,
2182                                   const struct GNUNET_PeerIdentity *sender,
2183                                   const struct GNUNET_MessageHeader *message,
2184                                   const struct GNUNET_ATS_Information*atsi)
2185 {
2186   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2187
2188   return handle_generic_close_ack (socket,
2189                                    tunnel,
2190                                    sender,
2191                                    (const struct GNUNET_STREAM_MessageHeader *)
2192                                    message,
2193                                    atsi,
2194                                    SHUT_WR);
2195 }
2196
2197
2198 /**
2199  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2200  *
2201  * @param cls the closure
2202  * @param tunnel connection to the other end
2203  * @param tunnel_ctx the socket
2204  * @param sender who sent the message
2205  * @param message the actual message
2206  * @param atsi performance data for the connection
2207  * @return GNUNET_OK to keep the connection open,
2208  *         GNUNET_SYSERR to close it (signal serious error)
2209  */
2210 static int
2211 server_handle_receive_close (void *cls,
2212                              struct GNUNET_MESH_Tunnel *tunnel,
2213                              void **tunnel_ctx,
2214                              const struct GNUNET_PeerIdentity *sender,
2215                              const struct GNUNET_MessageHeader *message,
2216                              const struct GNUNET_ATS_Information*atsi)
2217 {
2218   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2219
2220   return
2221     handle_receive_close (socket,
2222                           tunnel,
2223                           sender,
2224                           (const struct GNUNET_STREAM_MessageHeader *) message,
2225                           atsi);
2226 }
2227
2228
2229 /**
2230  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2231  *
2232  * @param cls the closure
2233  * @param tunnel connection to the other end
2234  * @param tunnel_ctx the socket
2235  * @param sender who sent the message
2236  * @param message the actual message
2237  * @param atsi performance data for the connection
2238  * @return GNUNET_OK to keep the connection open,
2239  *         GNUNET_SYSERR to close it (signal serious error)
2240  */
2241 static int
2242 server_handle_receive_close_ack (void *cls,
2243                                  struct GNUNET_MESH_Tunnel *tunnel,
2244                                  void **tunnel_ctx,
2245                                  const struct GNUNET_PeerIdentity *sender,
2246                                  const struct GNUNET_MessageHeader *message,
2247                                  const struct GNUNET_ATS_Information*atsi)
2248 {
2249   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2250
2251   return handle_generic_close_ack (socket,
2252                                    tunnel,
2253                                    sender,
2254                                    (const struct GNUNET_STREAM_MessageHeader *)
2255                                    message,
2256                                    atsi,
2257                                    SHUT_RD);
2258 }
2259
2260
2261 /**
2262  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2263  *
2264  * @param cls the listen socket (from GNUNET_MESH_connect in
2265  *          GNUNET_STREAM_listen) 
2266  * @param tunnel connection to the other end
2267  * @param tunnel_ctx the socket
2268  * @param sender who sent the message
2269  * @param message the actual message
2270  * @param atsi performance data for the connection
2271  * @return GNUNET_OK to keep the connection open,
2272  *         GNUNET_SYSERR to close it (signal serious error)
2273  */
2274 static int
2275 server_handle_close (void *cls,
2276                      struct GNUNET_MESH_Tunnel *tunnel,
2277                      void **tunnel_ctx,
2278                      const struct GNUNET_PeerIdentity *sender,
2279                      const struct GNUNET_MessageHeader *message,
2280                      const struct GNUNET_ATS_Information*atsi)
2281 {
2282   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2283   
2284   return handle_close (socket,
2285                        tunnel,
2286                        sender,
2287                        (const struct GNUNET_STREAM_MessageHeader *) message,
2288                        atsi);
2289 }
2290
2291
2292 /**
2293  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2294  *
2295  * @param cls the closure
2296  * @param tunnel connection to the other end
2297  * @param tunnel_ctx the socket
2298  * @param sender who sent the message
2299  * @param message the actual message
2300  * @param atsi performance data for the connection
2301  * @return GNUNET_OK to keep the connection open,
2302  *         GNUNET_SYSERR to close it (signal serious error)
2303  */
2304 static int
2305 server_handle_close_ack (void *cls,
2306                          struct GNUNET_MESH_Tunnel *tunnel,
2307                          void **tunnel_ctx,
2308                          const struct GNUNET_PeerIdentity *sender,
2309                          const struct GNUNET_MessageHeader *message,
2310                          const struct GNUNET_ATS_Information*atsi)
2311 {
2312   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2313
2314   return handle_generic_close_ack (socket,
2315                                    tunnel,
2316                                    sender,
2317                                    (const struct GNUNET_STREAM_MessageHeader *) 
2318                                    message,
2319                                    atsi,
2320                                    SHUT_RDWR);
2321 }
2322
2323
2324 /**
2325  * Handler for DATA_ACK messages
2326  *
2327  * @param socket the socket through which the ack was received
2328  * @param tunnel connection to the other end
2329  * @param sender who sent the message
2330  * @param ack the acknowledgment message
2331  * @param atsi performance data for the connection
2332  * @return GNUNET_OK to keep the connection open,
2333  *         GNUNET_SYSERR to close it (signal serious error)
2334  */
2335 static int
2336 handle_ack (struct GNUNET_STREAM_Socket *socket,
2337             struct GNUNET_MESH_Tunnel *tunnel,
2338             const struct GNUNET_PeerIdentity *sender,
2339             const struct GNUNET_STREAM_AckMessage *ack,
2340             const struct GNUNET_ATS_Information*atsi)
2341 {
2342   unsigned int packet;
2343   int need_retransmission;
2344   
2345
2346   if (GNUNET_PEER_search (sender) != socket->other_peer)
2347     {
2348       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2349                   "%x: Received ACK from non-confirming peer\n",
2350                   socket->our_id);
2351       return GNUNET_YES;
2352     }
2353
2354   switch (socket->state)
2355     {
2356     case (STATE_ESTABLISHED):
2357     case (STATE_RECEIVE_CLOSED):
2358     case (STATE_RECEIVE_CLOSE_WAIT):
2359       if (NULL == socket->write_handle)
2360         {
2361           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2362                       "%x: Received DATA_ACK when write_handle is NULL\n",
2363                       socket->our_id);
2364           return GNUNET_OK;
2365         }
2366       /* FIXME: increment in the base sequence number is breaking current flow
2367        */
2368       if (!((socket->write_sequence_number 
2369              - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2370         {
2371           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2372                       "%x: Received DATA_ACK with unexpected base sequence "
2373                       "number\n",
2374                       socket->our_id);
2375           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2376                       "%x: Current write sequence: %u; Ack's base sequence: %u\n",
2377                       socket->our_id,
2378                       socket->write_sequence_number,
2379                       ntohl (ack->base_sequence_number));
2380           return GNUNET_OK;
2381         }
2382       /* FIXME: include the case when write_handle is cancelled - ignore the 
2383          acks */
2384
2385       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2386                   "%x: Received DATA_ACK from %x\n",
2387                   socket->our_id,
2388                   socket->other_peer);
2389       
2390       /* Cancel the retransmission task */
2391       if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2392         {
2393           GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2394           socket->retransmission_timeout_task_id = 
2395             GNUNET_SCHEDULER_NO_TASK;
2396         }
2397
2398       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2399         {
2400           if (NULL == socket->write_handle->messages[packet]) break;
2401           if (ntohl (ack->base_sequence_number)
2402               >= ntohl (socket->write_handle->messages[packet]->sequence_number))
2403             ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2404                                   packet,
2405                                   GNUNET_YES);
2406           else
2407             if (GNUNET_YES == 
2408                 ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
2409                                       ntohl (socket->write_handle->messages[packet]->sequence_number)
2410                                       - ntohl (ack->base_sequence_number)))
2411               ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2412                                     packet,
2413                                     GNUNET_YES);
2414         }
2415
2416       /* Update the receive window remaining
2417        FIXME : Should update with the value from a data ack with greater
2418        sequence number */
2419       socket->receiver_window_available = 
2420         ntohl (ack->receive_window_remaining);
2421
2422       /* Check if we have received all acknowledgements */
2423       need_retransmission = GNUNET_NO;
2424       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2425         {
2426           if (NULL == socket->write_handle->messages[packet]) break;
2427           if (GNUNET_YES != ackbitmap_is_bit_set 
2428               (&socket->write_handle->ack_bitmap,packet))
2429             {
2430               need_retransmission = GNUNET_YES;
2431               break;
2432             }
2433         }
2434       if (GNUNET_YES == need_retransmission)
2435         {
2436           write_data (socket);
2437         }
2438       else      /* We have to call the write continuation callback now */
2439         {
2440           /* Free the packets */
2441           for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2442             {
2443               GNUNET_free_non_null (socket->write_handle->messages[packet]);
2444             }
2445           if (NULL != socket->write_handle->write_cont)
2446             socket->write_handle->write_cont
2447               (socket->write_handle->write_cont_cls,
2448                socket->status,
2449                socket->write_handle->size);
2450           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2451                       "%x: Write completion callback completed\n",
2452                       socket->our_id);
2453           /* We are done with the write handle - Freeing it */
2454           GNUNET_free (socket->write_handle);
2455           socket->write_handle = NULL;
2456         }
2457       break;
2458     default:
2459       break;
2460     }
2461   return GNUNET_OK;
2462 }
2463
2464
2465 /**
2466  * Handler for DATA_ACK messages
2467  *
2468  * @param cls the 'struct GNUNET_STREAM_Socket'
2469  * @param tunnel connection to the other end
2470  * @param tunnel_ctx unused
2471  * @param sender who sent the message
2472  * @param message the actual message
2473  * @param atsi performance data for the connection
2474  * @return GNUNET_OK to keep the connection open,
2475  *         GNUNET_SYSERR to close it (signal serious error)
2476  */
2477 static int
2478 client_handle_ack (void *cls,
2479                    struct GNUNET_MESH_Tunnel *tunnel,
2480                    void **tunnel_ctx,
2481                    const struct GNUNET_PeerIdentity *sender,
2482                    const struct GNUNET_MessageHeader *message,
2483                    const struct GNUNET_ATS_Information*atsi)
2484 {
2485   struct GNUNET_STREAM_Socket *socket = cls;
2486   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2487  
2488   return handle_ack (socket, tunnel, sender, ack, atsi);
2489 }
2490
2491
2492 /**
2493  * Handler for DATA_ACK messages
2494  *
2495  * @param cls the server's listen socket
2496  * @param tunnel connection to the other end
2497  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2498  * @param sender who sent the message
2499  * @param message the actual message
2500  * @param atsi performance data for the connection
2501  * @return GNUNET_OK to keep the connection open,
2502  *         GNUNET_SYSERR to close it (signal serious error)
2503  */
2504 static int
2505 server_handle_ack (void *cls,
2506                    struct GNUNET_MESH_Tunnel *tunnel,
2507                    void **tunnel_ctx,
2508                    const struct GNUNET_PeerIdentity *sender,
2509                    const struct GNUNET_MessageHeader *message,
2510                    const struct GNUNET_ATS_Information*atsi)
2511 {
2512   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2513   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2514  
2515   return handle_ack (socket, tunnel, sender, ack, atsi);
2516 }
2517
2518
2519 /**
2520  * For client message handlers, the stream socket is in the
2521  * closure argument.
2522  */
2523 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2524   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2525   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2526    sizeof (struct GNUNET_STREAM_AckMessage) },
2527   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2528    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2529   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2530    sizeof (struct GNUNET_STREAM_MessageHeader)},
2531   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2532    sizeof (struct GNUNET_STREAM_MessageHeader)},
2533   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2534    sizeof (struct GNUNET_STREAM_MessageHeader)},
2535   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2536    sizeof (struct GNUNET_STREAM_MessageHeader)},
2537   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2538    sizeof (struct GNUNET_STREAM_MessageHeader)},
2539   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2540    sizeof (struct GNUNET_STREAM_MessageHeader)},
2541   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2542    sizeof (struct GNUNET_STREAM_MessageHeader)},
2543   {NULL, 0, 0}
2544 };
2545
2546
2547 /**
2548  * For server message handlers, the stream socket is in the
2549  * tunnel context, and the listen socket in the closure argument.
2550  */
2551 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2552   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2553   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2554    sizeof (struct GNUNET_STREAM_AckMessage) },
2555   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2556    sizeof (struct GNUNET_STREAM_MessageHeader)},
2557   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2558    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2559   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2560    sizeof (struct GNUNET_STREAM_MessageHeader)},
2561   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2562    sizeof (struct GNUNET_STREAM_MessageHeader)},
2563   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2564    sizeof (struct GNUNET_STREAM_MessageHeader)},
2565   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2566    sizeof (struct GNUNET_STREAM_MessageHeader)},
2567   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2568    sizeof (struct GNUNET_STREAM_MessageHeader)},
2569   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2570    sizeof (struct GNUNET_STREAM_MessageHeader)},
2571   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2572    sizeof (struct GNUNET_STREAM_MessageHeader)},
2573   {NULL, 0, 0}
2574 };
2575
2576
2577 /**
2578  * Function called when our target peer is connected to our tunnel
2579  *
2580  * @param cls the socket for which this tunnel is created
2581  * @param peer the peer identity of the target
2582  * @param atsi performance data for the connection
2583  */
2584 static void
2585 mesh_peer_connect_callback (void *cls,
2586                             const struct GNUNET_PeerIdentity *peer,
2587                             const struct GNUNET_ATS_Information * atsi)
2588 {
2589   struct GNUNET_STREAM_Socket *socket = cls;
2590   struct GNUNET_STREAM_MessageHeader *message;
2591   GNUNET_PEER_Id connected_peer;
2592
2593   connected_peer = GNUNET_PEER_search (peer);
2594   
2595   if (connected_peer != socket->other_peer)
2596     {
2597       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2598                   "%x: A peer which is not our target has connected",
2599                   "to our tunnel\n",
2600                   socket->our_id);
2601       return;
2602     }
2603   
2604   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2605               "%x: Target peer %x connected\n", 
2606               socket->our_id,
2607               connected_peer);
2608   
2609   /* Set state to INIT */
2610   socket->state = STATE_INIT;
2611
2612   /* Send HELLO message */
2613   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2614   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
2615   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2616   queue_message (socket,
2617                  message,
2618                  &set_state_hello_wait,
2619                  NULL);
2620
2621   /* Call open callback */
2622   if (NULL == socket->open_cb)
2623     {
2624       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2625                   "STREAM_open callback is NULL\n");
2626     }
2627 }
2628
2629
2630 /**
2631  * Function called when our target peer is disconnected from our tunnel
2632  *
2633  * @param cls the socket associated which this tunnel
2634  * @param peer the peer identity of the target
2635  */
2636 static void
2637 mesh_peer_disconnect_callback (void *cls,
2638                                const struct GNUNET_PeerIdentity *peer)
2639 {
2640
2641 }
2642
2643
2644 /**
2645  * Method called whenever a peer creates a tunnel to us
2646  *
2647  * @param cls closure
2648  * @param tunnel new handle to the tunnel
2649  * @param initiator peer that started the tunnel
2650  * @param atsi performance information for the tunnel
2651  * @return initial tunnel context for the tunnel
2652  *         (can be NULL -- that's not an error)
2653  */
2654 static void *
2655 new_tunnel_notify (void *cls,
2656                    struct GNUNET_MESH_Tunnel *tunnel,
2657                    const struct GNUNET_PeerIdentity *initiator,
2658                    const struct GNUNET_ATS_Information *atsi)
2659 {
2660   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2661   struct GNUNET_STREAM_Socket *socket;
2662
2663   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2664      from the same peer again until the socket is closed */
2665
2666   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2667   socket->other_peer = GNUNET_PEER_intern (initiator);
2668   socket->tunnel = tunnel;
2669   socket->session_id = 0;       /* FIXME */
2670   socket->state = STATE_INIT;
2671   socket->lsocket = lsocket;
2672   socket->our_id = lsocket->our_id;
2673   
2674   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2675               "%x: Peer %x initiated tunnel to us\n", 
2676               socket->our_id,
2677               socket->other_peer);
2678   
2679   /* FIXME: Copy MESH handle from lsocket to socket */
2680   
2681   return socket;
2682 }
2683
2684
2685 /**
2686  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2687  * any associated state.  This function is NOT called if the client has
2688  * explicitly asked for the tunnel to be destroyed using
2689  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2690  * the tunnel.
2691  *
2692  * @param cls closure (set from GNUNET_MESH_connect)
2693  * @param tunnel connection to the other end (henceforth invalid)
2694  * @param tunnel_ctx place where local state associated
2695  *                   with the tunnel is stored
2696  */
2697 static void 
2698 tunnel_cleaner (void *cls,
2699                 const struct GNUNET_MESH_Tunnel *tunnel,
2700                 void *tunnel_ctx)
2701 {
2702   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2703   
2704   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2705               "%x: Peer %x has terminated connection abruptly\n",
2706               socket->our_id,
2707               socket->other_peer);
2708
2709   socket->status = GNUNET_STREAM_SHUTDOWN;
2710
2711   /* Clear Transmit handles */
2712   if (NULL != socket->transmit_handle)
2713     {
2714       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2715       socket->transmit_handle = NULL;
2716     }
2717   socket->tunnel = NULL;
2718 }
2719
2720
2721 /*****************/
2722 /* API functions */
2723 /*****************/
2724
2725
2726 /**
2727  * Tries to open a stream to the target peer
2728  *
2729  * @param cfg configuration to use
2730  * @param target the target peer to which the stream has to be opened
2731  * @param app_port the application port number which uniquely identifies this
2732  *            stream
2733  * @param open_cb this function will be called after stream has be established 
2734  * @param open_cb_cls the closure for open_cb
2735  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2736  * @return if successful it returns the stream socket; NULL if stream cannot be
2737  *         opened 
2738  */
2739 struct GNUNET_STREAM_Socket *
2740 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2741                     const struct GNUNET_PeerIdentity *target,
2742                     GNUNET_MESH_ApplicationType app_port,
2743                     GNUNET_STREAM_OpenCallback open_cb,
2744                     void *open_cb_cls,
2745                     ...)
2746 {
2747   struct GNUNET_STREAM_Socket *socket;
2748   struct GNUNET_PeerIdentity own_peer_id;
2749   enum GNUNET_STREAM_Option option;
2750   va_list vargs;                /* Variable arguments */
2751
2752   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2753               "%s\n", __func__);
2754
2755   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2756   socket->other_peer = GNUNET_PEER_intern (target);
2757   socket->open_cb = open_cb;
2758   socket->open_cls = open_cb_cls;
2759   GNUNET_TESTING_get_peer_identity (cfg, &own_peer_id);
2760   socket->our_id = GNUNET_PEER_intern (&own_peer_id);
2761   
2762   /* Set defaults */
2763   socket->retransmit_timeout = 
2764     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
2765
2766   va_start (vargs, open_cb_cls); /* Parse variable args */
2767   do {
2768     option = va_arg (vargs, enum GNUNET_STREAM_Option);
2769     switch (option)
2770       {
2771       case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2772         /* Expect struct GNUNET_TIME_Relative */
2773         socket->retransmit_timeout = va_arg (vargs,
2774                                              struct GNUNET_TIME_Relative);
2775         break;
2776       case GNUNET_STREAM_OPTION_END:
2777         break;
2778       }
2779   } while (GNUNET_STREAM_OPTION_END != option);
2780   va_end (vargs);               /* End of variable args parsing */
2781   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2782                                       10,  /* QUEUE size as parameter? */
2783                                       socket, /* cls */
2784                                       NULL, /* No inbound tunnel handler */
2785                                       &tunnel_cleaner, /* FIXME: not required? */
2786                                       client_message_handlers,
2787                                       &app_port); /* We don't get inbound tunnels */
2788   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
2789     {
2790       GNUNET_free (socket);
2791       return NULL;
2792     }
2793
2794   /* Now create the mesh tunnel to target */
2795   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2796               "Creating MESH Tunnel\n");
2797   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2798                                               NULL, /* Tunnel context */
2799                                               &mesh_peer_connect_callback,
2800                                               &mesh_peer_disconnect_callback,
2801                                               socket);
2802   GNUNET_assert (NULL != socket->tunnel);
2803   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2804                                         target);
2805   
2806   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2807               "%s() END\n", __func__);
2808   return socket;
2809 }
2810
2811
2812 /**
2813  * Shutdown the stream for reading or writing (similar to man 2 shutdown).
2814  *
2815  * @param socket the stream socket
2816  * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
2817  * @param completion_cb the callback that will be called upon successful
2818  *          shutdown of given operation
2819  * @param completion_cls the closure for the completion callback
2820  * @return the shutdown handle
2821  */
2822 struct GNUNET_STREAM_ShutdownHandle *
2823 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2824                         int operation,
2825                         GNUNET_STREAM_ShutdownCompletion completion_cb,
2826                         void *completion_cls)
2827 {
2828   struct GNUNET_STREAM_ShutdownHandle *handle;
2829   struct GNUNET_STREAM_MessageHeader *msg;
2830   
2831   GNUNET_assert (NULL == socket->shutdown_handle);
2832
2833   handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
2834   handle->socket = socket;
2835   handle->completion_cb = completion_cb;
2836   handle->completion_cls = completion_cls;
2837   socket->shutdown_handle = handle;
2838
2839   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2840   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2841   switch (operation)
2842     {
2843     case SHUT_RD:
2844       handle->operation = SHUT_RD;
2845       if (NULL != socket->read_handle)
2846         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2847                     "Existing read handle should be cancelled before shutting"
2848                     " down reading\n");
2849       msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
2850       queue_message (socket,
2851                      msg,
2852                      &set_state_receive_close_wait,
2853                      NULL);
2854       break;
2855     case SHUT_WR:
2856       handle->operation = SHUT_WR;
2857       if (NULL != socket->write_handle)
2858         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2859                     "Existing write handle should be cancelled before shutting"
2860                     " down writing\n");
2861       msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
2862       queue_message (socket,
2863                      msg,
2864                      &set_state_transmit_close_wait,
2865                      NULL);
2866       break;
2867     case SHUT_RDWR:
2868       handle->operation = SHUT_RDWR;
2869       if (NULL != socket->write_handle)
2870         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2871                     "Existing write handle should be cancelled before shutting"
2872                     " down writing\n");
2873       if (NULL != socket->read_handle)
2874         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2875                     "Existing read handle should be cancelled before shutting"
2876                     " down reading\n");
2877       msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
2878       queue_message (socket,
2879                      msg,
2880                      &set_state_close_wait,
2881                      NULL);
2882       break;
2883     default:
2884       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2885                   "GNUNET_STREAM_shutdown called with invalid value for "
2886                   "parameter operation -- Ignoring\n");
2887       GNUNET_free (msg);
2888       GNUNET_free (handle);
2889       return NULL;
2890     }
2891   handle->close_msg_retransmission_task_id =
2892     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2893                                   &close_msg_retransmission_task,
2894                                   handle);
2895   return handle;
2896 }
2897
2898
2899 /**
2900  * Cancels a pending shutdown
2901  *
2902  * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
2903  */
2904 void
2905 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
2906 {
2907   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
2908     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
2909   GNUNET_free (handle);
2910   return;
2911 }
2912
2913
2914 /**
2915  * Closes the stream
2916  *
2917  * @param socket the stream socket
2918  */
2919 void
2920 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
2921 {
2922   struct MessageQueue *head;
2923
2924   GNUNET_break (NULL == socket->read_handle);
2925   GNUNET_break (NULL == socket->write_handle);
2926
2927   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
2928     {
2929       /* socket closed with read task pending!? */
2930       GNUNET_break (0);
2931       GNUNET_SCHEDULER_cancel (socket->read_task_id);
2932       socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
2933     }
2934   
2935   /* Terminate the ack'ing tasks if they are still present */
2936   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
2937     {
2938       GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2939       socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2940     }
2941
2942   /* Clear Transmit handles */
2943   if (NULL != socket->transmit_handle)
2944     {
2945       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2946       socket->transmit_handle = NULL;
2947     }
2948
2949   /* Clear existing message queue */
2950   while (NULL != (head = socket->queue_head)) {
2951     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2952                                  socket->queue_tail,
2953                                  head);
2954     GNUNET_free (head->message);
2955     GNUNET_free (head);
2956   }
2957
2958   /* Close associated tunnel */
2959   if (NULL != socket->tunnel)
2960     {
2961       GNUNET_MESH_tunnel_destroy (socket->tunnel);
2962       socket->tunnel = NULL;
2963     }
2964
2965   /* Close mesh connection */
2966   if (NULL != socket->mesh && NULL == socket->lsocket)
2967     {
2968       GNUNET_MESH_disconnect (socket->mesh);
2969       socket->mesh = NULL;
2970     }
2971   
2972   /* Release receive buffer */
2973   if (NULL != socket->receive_buffer)
2974     {
2975       GNUNET_free (socket->receive_buffer);
2976     }
2977
2978   GNUNET_free (socket);
2979 }
2980
2981
2982 /**
2983  * Listens for stream connections for a specific application ports
2984  *
2985  * @param cfg the configuration to use
2986  * @param app_port the application port for which new streams will be accepted
2987  * @param listen_cb this function will be called when a peer tries to establish
2988  *            a stream with us
2989  * @param listen_cb_cls closure for listen_cb
2990  * @return listen socket, NULL for any error
2991  */
2992 struct GNUNET_STREAM_ListenSocket *
2993 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
2994                       GNUNET_MESH_ApplicationType app_port,
2995                       GNUNET_STREAM_ListenCallback listen_cb,
2996                       void *listen_cb_cls)
2997 {
2998   /* FIXME: Add variable args for passing configration options? */
2999   struct GNUNET_STREAM_ListenSocket *lsocket;
3000   struct GNUNET_PeerIdentity our_peer_id;
3001
3002   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
3003   lsocket->port = app_port;
3004   lsocket->listen_cb = listen_cb;
3005   lsocket->listen_cb_cls = listen_cb_cls;
3006   GNUNET_TESTING_get_peer_identity (cfg, &our_peer_id);
3007   lsocket->our_id = GNUNET_PEER_intern (&our_peer_id);
3008   lsocket->mesh = GNUNET_MESH_connect (cfg,
3009                                        10, /* FIXME: QUEUE size as parameter? */
3010                                        lsocket, /* Closure */
3011                                        &new_tunnel_notify,
3012                                        &tunnel_cleaner,
3013                                        server_message_handlers,
3014                                        &app_port);
3015   GNUNET_assert (NULL != lsocket->mesh);
3016   return lsocket;
3017 }
3018
3019
3020 /**
3021  * Closes the listen socket
3022  *
3023  * @param lsocket the listen socket
3024  */
3025 void
3026 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3027 {
3028   /* Close MESH connection */
3029   GNUNET_assert (NULL != lsocket->mesh);
3030   GNUNET_MESH_disconnect (lsocket->mesh);
3031   
3032   GNUNET_free (lsocket);
3033 }
3034
3035
3036 /**
3037  * Tries to write the given data to the stream. The maximum size of data that
3038  * can be written as part of a write operation is (64 * (64000 - sizeof (struct
3039  * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3040  * violation, however only the said number of maximum bytes will be written.
3041  *
3042  * @param socket the socket representing a stream
3043  * @param data the data buffer from where the data is written into the stream
3044  * @param size the number of bytes to be written from the data buffer
3045  * @param timeout the timeout period
3046  * @param write_cont the function to call upon writing some bytes into the
3047  *          stream 
3048  * @param write_cont_cls the closure
3049  *
3050  * @return handle to cancel the operation; if a previous write is pending or
3051  *           the stream has been shutdown for this operation then write_cont is
3052  *           immediately called and NULL is returned.
3053  */
3054 struct GNUNET_STREAM_IOWriteHandle *
3055 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3056                      const void *data,
3057                      size_t size,
3058                      struct GNUNET_TIME_Relative timeout,
3059                      GNUNET_STREAM_CompletionContinuation write_cont,
3060                      void *write_cont_cls)
3061 {
3062   unsigned int num_needed_packets;
3063   unsigned int packet;
3064   struct GNUNET_STREAM_IOWriteHandle *io_handle;
3065   uint32_t packet_size;
3066   uint32_t payload_size;
3067   struct GNUNET_STREAM_DataMessage *data_msg;
3068   const void *sweep;
3069   struct GNUNET_TIME_Relative ack_deadline;
3070
3071   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3072               "%s\n", __func__);
3073
3074   /* Return NULL if there is already a write request pending */
3075   if (NULL != socket->write_handle)
3076   {
3077     GNUNET_break (0);
3078     return NULL;
3079   }
3080
3081   switch (socket->state)
3082     {
3083     case STATE_TRANSMIT_CLOSED:
3084     case STATE_TRANSMIT_CLOSE_WAIT:
3085     case STATE_CLOSED:
3086     case STATE_CLOSE_WAIT:
3087       if (NULL != write_cont)
3088         write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3089       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3090                   "%s() END\n", __func__);
3091       return NULL;
3092     case STATE_INIT:
3093     case STATE_LISTEN:
3094     case STATE_HELLO_WAIT:
3095       if (NULL != write_cont)
3096         /* FIXME: GNUNET_STREAM_SYSERR?? */
3097         write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3098       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3099                   "%s() END\n", __func__);
3100       return NULL;
3101     case STATE_ESTABLISHED:
3102     case STATE_RECEIVE_CLOSED:
3103     case STATE_RECEIVE_CLOSE_WAIT:
3104       break;
3105     }
3106
3107   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
3108     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
3109   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
3110   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
3111   io_handle->socket = socket;
3112   io_handle->write_cont = write_cont;
3113   io_handle->write_cont_cls = write_cont_cls;
3114   io_handle->size = size;
3115   sweep = data;
3116   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3117      determined from RTT */
3118   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3119   /* Divide the given buffer into packets for sending */
3120   for (packet=0; packet < num_needed_packets; packet++)
3121     {
3122       if ((packet + 1) * max_payload_size < size) 
3123         {
3124           payload_size = max_payload_size;
3125           packet_size = MAX_PACKET_SIZE;
3126         }
3127       else 
3128         {
3129           payload_size = size - packet * max_payload_size;
3130           packet_size =  payload_size + sizeof (struct
3131                                                 GNUNET_STREAM_DataMessage); 
3132         }
3133       io_handle->messages[packet] = GNUNET_malloc (packet_size);
3134       io_handle->messages[packet]->header.header.size = htons (packet_size);
3135       io_handle->messages[packet]->header.header.type =
3136         htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3137       io_handle->messages[packet]->sequence_number =
3138         htonl (socket->write_sequence_number++);
3139       io_handle->messages[packet]->offset = htonl (socket->write_offset);
3140
3141       /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3142          determined from RTT */
3143       io_handle->messages[packet]->ack_deadline =
3144         GNUNET_TIME_relative_hton (ack_deadline);
3145       data_msg = io_handle->messages[packet];
3146       /* Copy data from given buffer to the packet */
3147       memcpy (&data_msg[1],
3148               sweep,
3149               payload_size);
3150       sweep += payload_size;
3151       socket->write_offset += payload_size;
3152     }
3153   socket->write_handle = io_handle;
3154   write_data (socket);
3155
3156   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3157               "%s() END\n", __func__);
3158
3159   return io_handle;
3160 }
3161
3162
3163
3164 /**
3165  * Tries to read data from the stream.
3166  *
3167  * @param socket the socket representing a stream
3168  * @param timeout the timeout period
3169  * @param proc function to call with data (once only)
3170  * @param proc_cls the closure for proc
3171  *
3172  * @return handle to cancel the operation; if the stream has been shutdown for
3173  *           this type of opeartion then the DataProcessor is immediately
3174  *           called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned
3175  */
3176 struct GNUNET_STREAM_IOReadHandle *
3177 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3178                     struct GNUNET_TIME_Relative timeout,
3179                     GNUNET_STREAM_DataProcessor proc,
3180                     void *proc_cls)
3181 {
3182   struct GNUNET_STREAM_IOReadHandle *read_handle;
3183   
3184   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3185               "%x: %s()\n", 
3186               socket->our_id,
3187               __func__);
3188
3189   /* Return NULL if there is already a read handle; the user has to cancel that
3190   first before continuing or has to wait until it is completed */
3191   if (NULL != socket->read_handle) return NULL;
3192
3193   GNUNET_assert (NULL != proc);
3194
3195   switch (socket->state)
3196     {
3197     case STATE_RECEIVE_CLOSED:
3198     case STATE_RECEIVE_CLOSE_WAIT:
3199     case STATE_CLOSED:
3200     case STATE_CLOSE_WAIT:
3201       proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3202       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3203                   "%x: %s() END\n",
3204                   socket->our_id,
3205                   __func__);
3206       return NULL;
3207     default:
3208       break;
3209     }
3210
3211   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
3212   read_handle->proc = proc;
3213   read_handle->proc_cls = proc_cls;
3214   socket->read_handle = read_handle;
3215
3216   /* Check if we have a packet at bitmap 0 */
3217   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3218                                           0))
3219     {
3220       socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3221                                                        socket);
3222    
3223     }
3224   
3225   /* Setup the read timeout task */
3226   socket->read_io_timeout_task_id =
3227     GNUNET_SCHEDULER_add_delayed (timeout,
3228                                   &read_io_timeout,
3229                                   socket);
3230   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3231               "%x: %s() END\n",
3232               socket->our_id,
3233               __func__);
3234   return read_handle;
3235 }
3236
3237
3238 /**
3239  * Cancel pending write operation.
3240  *
3241  * @param ioh handle to operation to cancel
3242  */
3243 void
3244 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3245 {
3246   struct GNUNET_STREAM_Socket *socket = ioh->socket;
3247   unsigned int packet;
3248
3249   GNUNET_assert (NULL != socket->write_handle);
3250   GNUNET_assert (socket->write_handle == ioh);
3251
3252   if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
3253     {
3254       GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
3255       socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
3256     }
3257
3258   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3259     {
3260       if (NULL == ioh->messages[packet]) break;
3261       GNUNET_free (ioh->messages[packet]);
3262     }
3263       
3264   GNUNET_free (socket->write_handle);
3265   socket->write_handle = NULL;
3266   return;
3267 }
3268
3269
3270 /**
3271  * Cancel pending read operation.
3272  *
3273  * @param ioh handle to operation to cancel
3274  */
3275 void
3276 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
3277 {
3278   return;
3279 }