-cleanup
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Decrement PEER intern count during socket close and listen close to free the
27  * memory used for PEER interning
28  *
29  * Add code for write io timeout
30  *
31  * Include retransmission for control messages
32  **/
33
34 /**
35  * @file stream/stream_api.c
36  * @brief Implementation of the stream library
37  * @author Sree Harsha Totakura
38  */
39
40
41 #include "platform.h"
42 #include "gnunet_common.h"
43 #include "gnunet_crypto_lib.h"
44 #include "gnunet_stream_lib.h"
45 #include "gnunet_testing_lib.h"
46 #include "stream_protocol.h"
47
48
49 /**
50  * The maximum packet size of a stream packet
51  */
52 #define MAX_PACKET_SIZE 64000
53
54 /**
55  * Receive buffer
56  */
57 #define RECEIVE_BUFFER_SIZE 4096000
58
59 /**
60  * The maximum payload a data message packet can carry
61  */
62 static size_t max_payload_size = 
63   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
64
65 /**
66  * states in the Protocol
67  */
68 enum State
69   {
70     /**
71      * Client initialization state
72      */
73     STATE_INIT,
74
75     /**
76      * Listener initialization state 
77      */
78     STATE_LISTEN,
79
80     /**
81      * Pre-connection establishment state
82      */
83     STATE_HELLO_WAIT,
84
85     /**
86      * State where a connection has been established
87      */
88     STATE_ESTABLISHED,
89
90     /**
91      * State where the socket is closed on our side and waiting to be ACK'ed
92      */
93     STATE_RECEIVE_CLOSE_WAIT,
94
95     /**
96      * State where the socket is closed for reading
97      */
98     STATE_RECEIVE_CLOSED,
99
100     /**
101      * State where the socket is closed on our side and waiting to be ACK'ed
102      */
103     STATE_TRANSMIT_CLOSE_WAIT,
104
105     /**
106      * State where the socket is closed for writing
107      */
108     STATE_TRANSMIT_CLOSED,
109
110     /**
111      * State where the socket is closed on our side and waiting to be ACK'ed
112      */
113     STATE_CLOSE_WAIT,
114
115     /**
116      * State where the socket is closed
117      */
118     STATE_CLOSED 
119   };
120
121
122 /**
123  * Functions of this type are called when a message is written
124  *
125  * @param cls the closure from queue_message
126  * @param socket the socket the written message was bound to
127  */
128 typedef void (*SendFinishCallback) (void *cls,
129                                     struct GNUNET_STREAM_Socket *socket);
130
131
132 /**
133  * The send message queue
134  */
135 struct MessageQueue
136 {
137   /**
138    * The message
139    */
140   struct GNUNET_STREAM_MessageHeader *message;
141
142   /**
143    * Callback to be called when the message is sent
144    */
145   SendFinishCallback finish_cb;
146
147   /**
148    * The closure for finish_cb
149    */
150   void *finish_cb_cls;
151
152   /**
153    * The next message in queue. Should be NULL in the last message
154    */
155   struct MessageQueue *next;
156
157   /**
158    * The next message in queue. Should be NULL in the first message
159    */
160   struct MessageQueue *prev;
161 };
162
163
164 /**
165  * The STREAM Socket Handler
166  */
167 struct GNUNET_STREAM_Socket
168 {
169   /**
170    * Retransmission timeout
171    */
172   struct GNUNET_TIME_Relative retransmit_timeout;
173
174   /**
175    * The Acknowledgement Bitmap
176    */
177   GNUNET_STREAM_AckBitmap ack_bitmap;
178
179   /**
180    * Time when the Acknowledgement was queued
181    */
182   struct GNUNET_TIME_Absolute ack_time_registered;
183
184   /**
185    * Queued Acknowledgement deadline
186    */
187   struct GNUNET_TIME_Relative ack_time_deadline;
188
189   /**
190    * The mesh handle
191    */
192   struct GNUNET_MESH_Handle *mesh;
193
194   /**
195    * The mesh tunnel handle
196    */
197   struct GNUNET_MESH_Tunnel *tunnel;
198
199   /**
200    * Stream open closure
201    */
202   void *open_cls;
203
204   /**
205    * Stream open callback
206    */
207   GNUNET_STREAM_OpenCallback open_cb;
208
209   /**
210    * The current transmit handle (if a pending transmit request exists)
211    */
212   struct GNUNET_MESH_TransmitHandle *transmit_handle;
213
214   /**
215    * The current act transmit handle (if a pending ack transmit request exists)
216    */
217   struct GNUNET_MESH_TransmitHandle *ack_transmit_handle;
218
219   /**
220    * Pointer to the current ack message using in ack_task
221    */
222   struct GNUNET_STREAM_AckMessage *ack_msg;
223
224   /**
225    * The current message associated with the transmit handle
226    */
227   struct MessageQueue *queue_head;
228
229   /**
230    * The queue tail, should always point to the last message in queue
231    */
232   struct MessageQueue *queue_tail;
233
234   /**
235    * The write IO_handle associated with this socket
236    */
237   struct GNUNET_STREAM_IOWriteHandle *write_handle;
238
239   /**
240    * The read IO_handle associated with this socket
241    */
242   struct GNUNET_STREAM_IOReadHandle *read_handle;
243
244   /**
245    * The shutdown handle associated with this socket
246    */
247   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
248
249   /**
250    * Buffer for storing received messages
251    */
252   void *receive_buffer;
253
254   /**
255    * The listen socket from which this socket is derived. Should be NULL if it
256    * is not a derived socket
257    */
258   struct GNUNET_STREAM_ListenSocket *lsocket;
259
260   /**
261    * Task identifier for the read io timeout task
262    */
263   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
264
265   /**
266    * Task identifier for retransmission task after timeout
267    */
268   GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
269
270   /**
271    * The task for sending timely Acks
272    */
273   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
274
275   /**
276    * Task scheduled to continue a read operation.
277    */
278   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
279
280   /**
281    * The state of the protocol associated with this socket
282    */
283   enum State state;
284
285   /**
286    * The status of the socket
287    */
288   enum GNUNET_STREAM_Status status;
289
290   /**
291    * The number of previous timeouts; FIXME: currently not used
292    */
293   unsigned int retries;
294
295   /**
296    * The peer identity of the peer at the other end of the stream
297    */
298   GNUNET_PEER_Id other_peer;
299
300   /**
301    * Our Peer Identity (for debugging)
302    */
303   GNUNET_PEER_Id our_id;
304
305   /**
306    * The application port number (type: uint32_t)
307    */
308   GNUNET_MESH_ApplicationType app_port;
309
310   /**
311    * The session id associated with this stream connection
312    * FIXME: Not used currently, may be removed
313    */
314   uint32_t session_id;
315
316   /**
317    * Write sequence number. Set to random when sending HELLO(client) and
318    * HELLO_ACK(server) 
319    */
320   uint32_t write_sequence_number;
321
322   /**
323    * Read sequence number. This number's value is determined during handshake
324    */
325   uint32_t read_sequence_number;
326
327   /**
328    * The receiver buffer size
329    */
330   uint32_t receive_buffer_size;
331
332   /**
333    * The receiver buffer boundaries
334    */
335   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
336
337   /**
338    * receiver's available buffer after the last acknowledged packet
339    */
340   uint32_t receiver_window_available;
341
342   /**
343    * The offset pointer used during write operation
344    */
345   uint32_t write_offset;
346
347   /**
348    * The offset after which we are expecting data
349    */
350   uint32_t read_offset;
351
352   /**
353    * The offset upto which user has read from the received buffer
354    */
355   uint32_t copy_offset;
356 };
357
358
359 /**
360  * A socket for listening
361  */
362 struct GNUNET_STREAM_ListenSocket
363 {
364   /**
365    * The mesh handle
366    */
367   struct GNUNET_MESH_Handle *mesh;
368
369   /**
370    * The callback function which is called after successful opening socket
371    */
372   GNUNET_STREAM_ListenCallback listen_cb;
373
374   /**
375    * The call back closure
376    */
377   void *listen_cb_cls;
378
379   /**
380    * Our interned Peer's identity
381    */
382   GNUNET_PEER_Id our_id;
383
384   /**
385    * The service port
386    * FIXME: Remove if not required!
387    */
388   GNUNET_MESH_ApplicationType port;
389 };
390
391
392 /**
393  * The IO Write Handle
394  */
395 struct GNUNET_STREAM_IOWriteHandle
396 {
397   /**
398    * The socket to which this write handle is associated
399    */
400   struct GNUNET_STREAM_Socket *socket;
401
402   /**
403    * The packet_buffers associated with this Handle
404    */
405   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
406
407   /**
408    * The write continuation callback
409    */
410   GNUNET_STREAM_CompletionContinuation write_cont;
411
412   /**
413    * Write continuation closure
414    */
415   void *write_cont_cls;
416
417   /**
418    * The bitmap of this IOHandle; Corresponding bit for a message is set when
419    * it has been acknowledged by the receiver
420    */
421   GNUNET_STREAM_AckBitmap ack_bitmap;
422
423   /**
424    * Number of bytes in this write handle
425    */
426   size_t size;
427 };
428
429
430 /**
431  * The IO Read Handle
432  */
433 struct GNUNET_STREAM_IOReadHandle
434 {
435   /**
436    * Callback for the read processor
437    */
438   GNUNET_STREAM_DataProcessor proc;
439
440   /**
441    * The closure pointer for the read processor callback
442    */
443   void *proc_cls;
444 };
445
446
447 /**
448  * Handle for Shutdown
449  */
450 struct GNUNET_STREAM_ShutdownHandle
451 {
452   /**
453    * The socket associated with this shutdown handle
454    */
455   struct GNUNET_STREAM_Socket *socket;
456
457   /**
458    * Shutdown completion callback
459    */
460   GNUNET_STREAM_ShutdownCompletion completion_cb;
461
462   /**
463    * Closure for completion callback
464    */
465   void *completion_cls;
466
467   /**
468    * Close message retransmission task id
469    */
470   GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
471
472   /**
473    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
474    */
475   int operation;  
476 };
477
478
479 /**
480  * Default value in seconds for various timeouts
481  */
482 static unsigned int default_timeout = 10;
483
484
485 /**
486  * Callback function for sending queued message
487  *
488  * @param cls closure the socket
489  * @param size number of bytes available in buf
490  * @param buf where the callee should write the message
491  * @return number of bytes written to buf
492  */
493 static size_t
494 send_message_notify (void *cls, size_t size, void *buf)
495 {
496   struct GNUNET_STREAM_Socket *socket = cls;
497   struct GNUNET_PeerIdentity target;
498   struct MessageQueue *head;
499   size_t ret;
500
501   socket->transmit_handle = NULL; /* Remove the transmit handle */
502   head = socket->queue_head;
503   if (NULL == head)
504     return 0; /* just to be safe */
505   GNUNET_PEER_resolve (socket->other_peer, &target);
506   if (0 == size)                /* request timed out */
507     {
508       socket->retries++;
509       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
510                   "Message sending timed out. Retry %d \n",
511                   socket->retries);
512       socket->transmit_handle = 
513         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
514                                            0, /* Corking */
515                                            1, /* Priority */
516                                            /* FIXME: exponential backoff */
517                                            socket->retransmit_timeout,
518                                            &target,
519                                            ntohs (head->message->header.size),
520                                            &send_message_notify,
521                                            socket);
522       return 0;
523     }
524
525   ret = ntohs (head->message->header.size);
526   GNUNET_assert (size >= ret);
527   memcpy (buf, head->message, ret);
528   if (NULL != head->finish_cb)
529     {
530       head->finish_cb (head->finish_cb_cls, socket);
531     }
532   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
533                                socket->queue_tail,
534                                head);
535   GNUNET_free (head->message);
536   GNUNET_free (head);
537   head = socket->queue_head;
538   if (NULL != head)    /* more pending messages to send */
539     {
540       socket->retries = 0;
541       socket->transmit_handle = 
542         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
543                                            0, /* Corking */
544                                            1, /* Priority */
545                                            /* FIXME: exponential backoff */
546                                            socket->retransmit_timeout,
547                                            &target,
548                                            ntohs (head->message->header.size),
549                                            &send_message_notify,
550                                            socket);
551     }
552   return ret;
553 }
554
555
556 /**
557  * Queues a message for sending using the mesh connection of a socket
558  *
559  * @param socket the socket whose mesh connection is used
560  * @param message the message to be sent
561  * @param finish_cb the callback to be called when the message is sent
562  * @param finish_cb_cls the closure for the callback
563  */
564 static void
565 queue_message (struct GNUNET_STREAM_Socket *socket,
566                struct GNUNET_STREAM_MessageHeader *message,
567                SendFinishCallback finish_cb,
568                void *finish_cb_cls)
569 {
570   struct MessageQueue *queue_entity;
571   struct GNUNET_PeerIdentity target;
572
573   GNUNET_assert 
574     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
575      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
576
577   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
578               "%x: Queueing message of type %d and size %d\n",
579               socket->our_id,
580               ntohs (message->header.type),
581               ntohs (message->header.size));
582   GNUNET_assert (NULL != message);
583   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
584   queue_entity->message = message;
585   queue_entity->finish_cb = finish_cb;
586   queue_entity->finish_cb_cls = finish_cb_cls;
587   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
588                                     socket->queue_tail,
589                                     queue_entity);
590   if (NULL == socket->transmit_handle)
591   {
592     socket->retries = 0;
593     GNUNET_PEER_resolve (socket->other_peer, &target);
594     socket->transmit_handle = 
595       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
596                                          0, /* Corking */
597                                          1, /* Priority */
598                                          socket->retransmit_timeout,
599                                          &target,
600                                          ntohs (message->header.size),
601                                          &send_message_notify,
602                                          socket);
603   }
604 }
605
606
607 /**
608  * Copies a message and queues it for sending using the mesh connection of
609  * given socket 
610  *
611  * @param socket the socket whose mesh connection is used
612  * @param message the message to be sent
613  * @param finish_cb the callback to be called when the message is sent
614  * @param finish_cb_cls the closure for the callback
615  */
616 static void
617 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
618                         const struct GNUNET_STREAM_MessageHeader *message,
619                         SendFinishCallback finish_cb,
620                         void *finish_cb_cls)
621 {
622   struct GNUNET_STREAM_MessageHeader *msg_copy;
623   uint16_t size;
624   
625   size = ntohs (message->header.size);
626   msg_copy = GNUNET_malloc (size);
627   memcpy (msg_copy, message, size);
628   queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
629 }
630
631
632 /**
633  * Callback function for sending ack message
634  *
635  * @param cls closure the ACK message created in ack_task
636  * @param size number of bytes available in buffer
637  * @param buf where the callee should write the message
638  * @return number of bytes written to buf
639  */
640 static size_t
641 send_ack_notify (void *cls, size_t size, void *buf)
642 {
643   struct GNUNET_STREAM_Socket *socket = cls;
644
645   if (0 == size)
646     {
647       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
648                   "%s called with size 0\n", __func__);
649       return 0;
650     }
651   GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size);
652   
653   size = ntohs (socket->ack_msg->header.header.size);
654   memcpy (buf, socket->ack_msg, size);
655   
656   GNUNET_free (socket->ack_msg);
657   socket->ack_msg = NULL;
658   socket->ack_transmit_handle = NULL;
659   return size;
660 }
661
662 /**
663  * Writes data using the given socket. The amount of data written is limited by
664  * the receiver_window_size
665  *
666  * @param socket the socket to use
667  */
668 static void 
669 write_data (struct GNUNET_STREAM_Socket *socket);
670
671 /**
672  * Task for retransmitting data messages if they aren't ACK before their ack
673  * deadline 
674  *
675  * @param cls the socket
676  * @param tc the Task context
677  */
678 static void
679 retransmission_timeout_task (void *cls,
680                              const struct GNUNET_SCHEDULER_TaskContext *tc)
681 {
682   struct GNUNET_STREAM_Socket *socket = cls;
683   
684   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
685     return;
686
687   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
688               "%x: Retransmitting DATA...\n", socket->our_id);
689   socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
690   write_data (socket);
691 }
692
693
694 /**
695  * Task for sending ACK message
696  *
697  * @param cls the socket
698  * @param tc the Task context
699  */
700 static void
701 ack_task (void *cls,
702           const struct GNUNET_SCHEDULER_TaskContext *tc)
703 {
704   struct GNUNET_STREAM_Socket *socket = cls;
705   struct GNUNET_STREAM_AckMessage *ack_msg;
706   struct GNUNET_PeerIdentity target;
707
708   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
709     {
710       return;
711     }
712
713   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
714
715   /* Create the ACK Message */
716   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
717   ack_msg->header.header.size = htons (sizeof (struct 
718                                                GNUNET_STREAM_AckMessage));
719   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
720   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
721   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
722   ack_msg->receive_window_remaining = 
723     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
724
725   socket->ack_msg = ack_msg;
726   GNUNET_PEER_resolve (socket->other_peer, &target);
727   /* Request MESH for sending ACK */
728   socket->ack_transmit_handle = 
729     GNUNET_MESH_notify_transmit_ready (socket->tunnel,
730                                        0, /* Corking */
731                                        1, /* Priority */
732                                        socket->retransmit_timeout,
733                                        &target,
734                                        ntohs (ack_msg->header.header.size),
735                                        &send_ack_notify,
736                                        socket);
737 }
738
739
740 /**
741  * Retransmission task for shutdown messages
742  *
743  * @param cls the shutdown handle
744  * @param tc the Task Context
745  */
746 static void
747 close_msg_retransmission_task (void *cls,
748                                const struct GNUNET_SCHEDULER_TaskContext *tc)
749 {
750   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
751   struct GNUNET_STREAM_MessageHeader *msg;
752   struct GNUNET_STREAM_Socket *socket;
753
754   GNUNET_assert (NULL != shutdown_handle);
755   socket = shutdown_handle->socket;
756
757   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
758   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
759   switch (shutdown_handle->operation)
760     {
761     case SHUT_RDWR:
762       msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
763       break;
764     case SHUT_RD:
765       msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
766       break;
767     case SHUT_WR:
768       msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
769       break;
770     default:
771       GNUNET_free (msg);
772       shutdown_handle->close_msg_retransmission_task_id = 
773         GNUNET_SCHEDULER_NO_TASK;
774       return;
775     }
776   queue_message (socket, msg, NULL, NULL);
777   shutdown_handle->close_msg_retransmission_task_id =
778     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
779                                   &close_msg_retransmission_task,
780                                   shutdown_handle);
781 }
782
783
784 /**
785  * Function to modify a bit in GNUNET_STREAM_AckBitmap
786  *
787  * @param bitmap the bitmap to modify
788  * @param bit the bit number to modify
789  * @param value GNUNET_YES to on, GNUNET_NO to off
790  */
791 static void
792 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
793                       unsigned int bit, 
794                       int value)
795 {
796   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
797   if (GNUNET_YES == value)
798     *bitmap |= (1LL << bit);
799   else
800     *bitmap &= ~(1LL << bit);
801 }
802
803
804 /**
805  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
806  *
807  * @param bitmap address of the bitmap that has to be checked
808  * @param bit the bit number to check
809  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
810  */
811 static uint8_t
812 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
813                       unsigned int bit)
814 {
815   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
816   return 0 != (*bitmap & (1LL << bit));
817 }
818
819
820 /**
821  * Writes data using the given socket. The amount of data written is limited by
822  * the receiver_window_size
823  *
824  * @param socket the socket to use
825  */
826 static void 
827 write_data (struct GNUNET_STREAM_Socket *socket)
828 {
829   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
830   int packet;                   /* Although an int, should never be negative */
831   int ack_packet;
832
833   ack_packet = -1;
834   /* Find the last acknowledged packet */
835   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
836     {      
837       if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
838                                               packet))
839         ack_packet = packet;        
840       else if (NULL == io_handle->messages[packet])
841         break;
842     }
843   /* Resend packets which weren't ack'ed */
844   for (packet=0; packet < ack_packet; packet++)
845     {
846       if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
847                                              packet))
848         {
849           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
850                       "%x: Placing DATA message with sequence %u in send queue\n",
851                       socket->our_id,
852                       ntohl (io_handle->messages[packet]->sequence_number));
853
854           copy_and_queue_message (socket,
855                                   &io_handle->messages[packet]->header,
856                                   NULL,
857                                   NULL);
858         }
859     }
860   packet = ack_packet + 1;
861   /* Now send new packets if there is enough buffer space */
862   while ( (NULL != io_handle->messages[packet]) &&
863           (socket->receiver_window_available 
864            >= ntohs (io_handle->messages[packet]->header.header.size)) )
865     {
866       socket->receiver_window_available -= 
867         ntohs (io_handle->messages[packet]->header.header.size);
868       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
869                   "%x: Placing DATA message with sequence %u in send queue\n",
870                   socket->our_id,
871                   ntohl (io_handle->messages[packet]->sequence_number));
872       copy_and_queue_message (socket,
873                               &io_handle->messages[packet]->header,
874                               NULL,
875                               NULL);
876       packet++;
877     }
878
879   if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
880     socket->retransmission_timeout_task_id = 
881       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
882                                     (GNUNET_TIME_UNIT_SECONDS, 8),
883                                     &retransmission_timeout_task,
884                                     socket);
885 }
886
887
888 /**
889  * Task for calling the read processor
890  *
891  * @param cls the socket
892  * @param tc the task context
893  */
894 static void
895 call_read_processor (void *cls,
896                      const struct GNUNET_SCHEDULER_TaskContext *tc)
897 {
898   struct GNUNET_STREAM_Socket *socket = cls;
899   size_t read_size;
900   size_t valid_read_size;
901   unsigned int packet;
902   uint32_t sequence_increase;
903   uint32_t offset_increase;
904
905   socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
906   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
907     return;
908
909   if (NULL == socket->receive_buffer) 
910     return;
911
912   GNUNET_assert (NULL != socket->read_handle);
913   GNUNET_assert (NULL != socket->read_handle->proc);
914
915   /* Check the bitmap for any holes */
916   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
917     {
918       if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
919                                              packet))
920         break;
921     }
922   /* We only call read processor if we have the first packet */
923   GNUNET_assert (0 < packet);
924
925   valid_read_size = 
926     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
927
928   GNUNET_assert (0 != valid_read_size);
929
930   /* Cancel the read_io_timeout_task */
931   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
932   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
933
934   /* Call the data processor */
935   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
936               "%x: Calling read processor\n",
937               socket->our_id);
938   read_size = 
939     socket->read_handle->proc (socket->read_handle->proc_cls,
940                                socket->status,
941                                socket->receive_buffer + socket->copy_offset,
942                                valid_read_size);
943   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
944               "%x: Read processor read %d bytes\n",
945               socket->our_id,
946               read_size);
947   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
948               "%x: Read processor completed successfully\n",
949               socket->our_id);
950
951   /* Free the read handle */
952   GNUNET_free (socket->read_handle);
953   socket->read_handle = NULL;
954
955   GNUNET_assert (read_size <= valid_read_size);
956   socket->copy_offset += read_size;
957
958   /* Determine upto which packet we can remove from the buffer */
959   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
960     {
961       if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
962         { packet++; break; }
963       if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
964         break;
965     }
966
967   /* If no packets can be removed we can't move the buffer */
968   if (0 == packet) return;
969
970   sequence_increase = packet;
971   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
972               "%x: Sequence increase after read processor completion: %u\n",
973               socket->our_id,
974               sequence_increase);
975
976   /* Shift the data in the receive buffer */
977   memmove (socket->receive_buffer,
978            socket->receive_buffer 
979            + socket->receive_buffer_boundaries[sequence_increase-1],
980            socket->receive_buffer_size
981            - socket->receive_buffer_boundaries[sequence_increase-1]);
982   
983   /* Shift the bitmap */
984   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
985   
986   /* Set read_sequence_number */
987   socket->read_sequence_number += sequence_increase;
988   
989   /* Set read_offset */
990   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
991   socket->read_offset += offset_increase;
992
993   /* Fix copy_offset */
994   GNUNET_assert (offset_increase <= socket->copy_offset);
995   socket->copy_offset -= offset_increase;
996   
997   /* Fix relative boundaries */
998   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
999     {
1000       if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
1001         {
1002           socket->receive_buffer_boundaries[packet] = 
1003             socket->receive_buffer_boundaries[packet + sequence_increase] 
1004             - offset_increase;
1005         }
1006       else
1007         socket->receive_buffer_boundaries[packet] = 0;
1008     }
1009 }
1010
1011
1012 /**
1013  * Cancels the existing read io handle
1014  *
1015  * @param cls the closure from the SCHEDULER call
1016  * @param tc the task context
1017  */
1018 static void
1019 read_io_timeout (void *cls, 
1020                 const struct GNUNET_SCHEDULER_TaskContext *tc)
1021 {
1022   struct GNUNET_STREAM_Socket *socket = cls;
1023   GNUNET_STREAM_DataProcessor proc;
1024   void *proc_cls;
1025
1026   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1027   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1028   {
1029     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1030                 "%x: Read task timedout - Cancelling it\n",
1031                 socket->our_id);
1032     GNUNET_SCHEDULER_cancel (socket->read_task_id);
1033     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1034   }
1035   GNUNET_assert (NULL != socket->read_handle);
1036   proc = socket->read_handle->proc;
1037   proc_cls = socket->read_handle->proc_cls;
1038
1039   GNUNET_free (socket->read_handle);
1040   socket->read_handle = NULL;
1041   /* Call the read processor to signal timeout */
1042   proc (proc_cls,
1043         GNUNET_STREAM_TIMEOUT,
1044         NULL,
1045         0);
1046 }
1047
1048
1049 /**
1050  * Handler for DATA messages; Same for both client and server
1051  *
1052  * @param socket the socket through which the ack was received
1053  * @param tunnel connection to the other end
1054  * @param sender who sent the message
1055  * @param msg the data message
1056  * @param atsi performance data for the connection
1057  * @return GNUNET_OK to keep the connection open,
1058  *         GNUNET_SYSERR to close it (signal serious error)
1059  */
1060 static int
1061 handle_data (struct GNUNET_STREAM_Socket *socket,
1062              struct GNUNET_MESH_Tunnel *tunnel,
1063              const struct GNUNET_PeerIdentity *sender,
1064              const struct GNUNET_STREAM_DataMessage *msg,
1065              const struct GNUNET_ATS_Information*atsi)
1066 {
1067   const void *payload;
1068   uint32_t bytes_needed;
1069   uint32_t relative_offset;
1070   uint32_t relative_sequence_number;
1071   uint16_t size;
1072
1073   size = htons (msg->header.header.size);
1074   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1075     {
1076       GNUNET_break_op (0);
1077       return GNUNET_SYSERR;
1078     }
1079
1080   if (GNUNET_PEER_search (sender) != socket->other_peer)
1081     {
1082       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1083                   "%x: Received DATA from non-confirming peer\n",
1084                   socket->our_id);
1085       return GNUNET_YES;
1086     }
1087
1088   switch (socket->state)
1089     {
1090     case STATE_ESTABLISHED:
1091     case STATE_TRANSMIT_CLOSED:
1092     case STATE_TRANSMIT_CLOSE_WAIT:
1093       
1094       /* check if the message's sequence number is in the range we are
1095          expecting */
1096       relative_sequence_number = 
1097         ntohl (msg->sequence_number) - socket->read_sequence_number;
1098       if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1099         {
1100           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1101                       "%x: Ignoring received message with sequence number %u\n",
1102                       socket->our_id,
1103                       ntohl (msg->sequence_number));
1104           /* Start ACK sending task if one is not already present */
1105           if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1106             {
1107               socket->ack_task_id = 
1108                 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1109                                               (msg->ack_deadline),
1110                                               &ack_task,
1111                                               socket);
1112             }
1113           return GNUNET_YES;
1114         }
1115       
1116       /* Check if we have already seen this message */
1117       if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1118                                               relative_sequence_number))
1119         {
1120           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1121                       "%x: Ignoring already received message with sequence "
1122                       "number %u\n",
1123                       socket->our_id,
1124                       ntohl (msg->sequence_number));
1125           /* Start ACK sending task if one is not already present */
1126           if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1127             {
1128               socket->ack_task_id = 
1129                 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1130                                               (msg->ack_deadline),
1131                                               &ack_task,
1132                                               socket);
1133             }
1134           return GNUNET_YES;
1135         }
1136
1137       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1138                   "%x: Receiving DATA with sequence number: %u and size: %d "
1139                   "from %x\n",
1140                   socket->our_id,
1141                   ntohl (msg->sequence_number),
1142                   ntohs (msg->header.header.size),
1143                   socket->other_peer);
1144
1145       /* Check if we have to allocate the buffer */
1146       size -= sizeof (struct GNUNET_STREAM_DataMessage);
1147       relative_offset = ntohl (msg->offset) - socket->read_offset;
1148       bytes_needed = relative_offset + size;
1149       if (bytes_needed > socket->receive_buffer_size)
1150         {
1151           if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1152             {
1153               socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1154                                                        bytes_needed);
1155               socket->receive_buffer_size = bytes_needed;
1156             }
1157           else
1158             {
1159               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1160                           "%x: Cannot accommodate packet %d as buffer is",
1161                           "full\n",
1162                           socket->our_id,
1163                           ntohl (msg->sequence_number));
1164               return GNUNET_YES;
1165             }
1166         }
1167       
1168       /* Copy Data to buffer */
1169       payload = &msg[1];
1170       GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1171       memcpy (socket->receive_buffer + relative_offset,
1172               payload,
1173               size);
1174       socket->receive_buffer_boundaries[relative_sequence_number] = 
1175         relative_offset + size;
1176       
1177       /* Modify the ACK bitmap */
1178       ackbitmap_modify_bit (&socket->ack_bitmap,
1179                             relative_sequence_number,
1180                             GNUNET_YES);
1181
1182       /* Start ACK sending task if one is not already present */
1183       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1184        {
1185          socket->ack_task_id = 
1186            GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1187                                          (msg->ack_deadline),
1188                                          &ack_task,
1189                                          socket);
1190        }
1191
1192       if ((NULL != socket->read_handle) /* A read handle is waiting */
1193           /* There is no current read task */
1194           && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1195           /* We have the first packet */
1196           && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1197                                                  0)))
1198         {
1199           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1200                       "%x: Scheduling read processor\n",
1201                       socket->our_id);
1202
1203           socket->read_task_id = 
1204             GNUNET_SCHEDULER_add_now (&call_read_processor,
1205                                       socket);
1206         }
1207       
1208       break;
1209
1210     default:
1211       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1212                   "%x: Received data message when it cannot be handled\n",
1213                   socket->our_id);
1214       break;
1215     }
1216   return GNUNET_YES;
1217 }
1218
1219
1220 /**
1221  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1222  *
1223  * @param cls the socket (set from GNUNET_MESH_connect)
1224  * @param tunnel connection to the other end
1225  * @param tunnel_ctx place to store local state associated with the tunnel
1226  * @param sender who sent the message
1227  * @param message the actual message
1228  * @param atsi performance data for the connection
1229  * @return GNUNET_OK to keep the connection open,
1230  *         GNUNET_SYSERR to close it (signal serious error)
1231  */
1232 static int
1233 client_handle_data (void *cls,
1234              struct GNUNET_MESH_Tunnel *tunnel,
1235              void **tunnel_ctx,
1236              const struct GNUNET_PeerIdentity *sender,
1237              const struct GNUNET_MessageHeader *message,
1238              const struct GNUNET_ATS_Information*atsi)
1239 {
1240   struct GNUNET_STREAM_Socket *socket = cls;
1241
1242   return handle_data (socket, 
1243                       tunnel, 
1244                       sender, 
1245                       (const struct GNUNET_STREAM_DataMessage *) message, 
1246                       atsi);
1247 }
1248
1249
1250 /**
1251  * Callback to set state to ESTABLISHED
1252  *
1253  * @param cls the closure from queue_message FIXME: document
1254  * @param socket the socket to requiring state change
1255  */
1256 static void
1257 set_state_established (void *cls,
1258                        struct GNUNET_STREAM_Socket *socket)
1259 {
1260   struct GNUNET_PeerIdentity initiator_pid;
1261
1262   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1263               "%x: Attaining ESTABLISHED state\n",
1264               socket->our_id);
1265   socket->write_offset = 0;
1266   socket->read_offset = 0;
1267   socket->state = STATE_ESTABLISHED;
1268   /* FIXME: What if listen_cb is NULL */
1269   if (NULL != socket->lsocket)
1270     {
1271       GNUNET_PEER_resolve (socket->other_peer, &initiator_pid);
1272       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1273                   "%x: Calling listen callback\n",
1274                   socket->our_id);
1275       if (GNUNET_SYSERR == 
1276           socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1277                                       socket,
1278                                       &initiator_pid))
1279         {
1280           socket->state = STATE_CLOSED;
1281           /* FIXME: We should close in a decent way */
1282           GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1283           GNUNET_free (socket);
1284         }
1285     }
1286   else if (socket->open_cb)
1287     socket->open_cb (socket->open_cls, socket);
1288 }
1289
1290
1291 /**
1292  * Callback to set state to HELLO_WAIT
1293  *
1294  * @param cls the closure from queue_message
1295  * @param socket the socket to requiring state change
1296  */
1297 static void
1298 set_state_hello_wait (void *cls,
1299                       struct GNUNET_STREAM_Socket *socket)
1300 {
1301   GNUNET_assert (STATE_INIT == socket->state);
1302   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1303               "%x: Attaining HELLO_WAIT state\n",
1304               socket->our_id);
1305   socket->state = STATE_HELLO_WAIT;
1306 }
1307
1308
1309 /**
1310  * Callback to set state to CLOSE_WAIT
1311  *
1312  * @param cls the closure from queue_message
1313  * @param socket the socket requiring state change
1314  */
1315 static void
1316 set_state_close_wait (void *cls,
1317                       struct GNUNET_STREAM_Socket *socket)
1318 {
1319   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1320               "%x: Attaing CLOSE_WAIT state\n",
1321               socket->our_id);
1322   socket->state = STATE_CLOSE_WAIT;
1323   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1324   socket->receive_buffer = NULL;
1325   socket->receive_buffer_size = 0;
1326 }
1327
1328
1329 /**
1330  * Callback to set state to RECEIVE_CLOSE_WAIT
1331  *
1332  * @param cls the closure from queue_message
1333  * @param socket the socket requiring state change
1334  */
1335 static void
1336 set_state_receive_close_wait (void *cls,
1337                               struct GNUNET_STREAM_Socket *socket)
1338 {
1339   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1340               "%x: Attaing RECEIVE_CLOSE_WAIT state\n",
1341               socket->our_id);
1342   socket->state = STATE_RECEIVE_CLOSE_WAIT;
1343   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1344   socket->receive_buffer = NULL;
1345   socket->receive_buffer_size = 0;
1346 }
1347
1348
1349 /**
1350  * Callback to set state to TRANSMIT_CLOSE_WAIT
1351  *
1352  * @param cls the closure from queue_message
1353  * @param socket the socket requiring state change
1354  */
1355 static void
1356 set_state_transmit_close_wait (void *cls,
1357                                struct GNUNET_STREAM_Socket *socket)
1358 {
1359   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1360               "%x: Attaing TRANSMIT_CLOSE_WAIT state\n",
1361               socket->our_id);
1362   socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1363 }
1364
1365
1366 /**
1367  * Callback to set state to CLOSED
1368  *
1369  * @param cls the closure from queue_message
1370  * @param socket the socket requiring state change
1371  */
1372 static void
1373 set_state_closed (void *cls,
1374                   struct GNUNET_STREAM_Socket *socket)
1375 {
1376   socket->state = STATE_CLOSED;
1377 }
1378
1379 /**
1380  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1381  * socket
1382  *
1383  * @param socket the socket for which this HelloAckMessage has to be generated
1384  * @return the HelloAckMessage
1385  */
1386 static struct GNUNET_STREAM_HelloAckMessage *
1387 generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket)
1388 {
1389   struct GNUNET_STREAM_HelloAckMessage *msg;
1390
1391   /* Get the random sequence number */
1392   socket->write_sequence_number = 
1393     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1394   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1395               "%x: Generated write sequence number %u\n",
1396               socket->our_id,
1397               (unsigned int) socket->write_sequence_number);
1398   
1399   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1400   msg->header.header.size = 
1401     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1402   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1403   msg->sequence_number = htonl (socket->write_sequence_number);
1404   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1405
1406   return msg;
1407 }
1408
1409
1410 /**
1411  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1412  *
1413  * @param cls the socket (set from GNUNET_MESH_connect)
1414  * @param tunnel connection to the other end
1415  * @param tunnel_ctx this is NULL
1416  * @param sender who sent the message
1417  * @param message the actual message
1418  * @param atsi performance data for the connection
1419  * @return GNUNET_OK to keep the connection open,
1420  *         GNUNET_SYSERR to close it (signal serious error)
1421  */
1422 static int
1423 client_handle_hello_ack (void *cls,
1424                          struct GNUNET_MESH_Tunnel *tunnel,
1425                          void **tunnel_ctx,
1426                          const struct GNUNET_PeerIdentity *sender,
1427                          const struct GNUNET_MessageHeader *message,
1428                          const struct GNUNET_ATS_Information*atsi)
1429 {
1430   struct GNUNET_STREAM_Socket *socket = cls;
1431   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1432   struct GNUNET_STREAM_HelloAckMessage *reply;
1433
1434   if (GNUNET_PEER_search (sender) != socket->other_peer)
1435     {
1436       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1437                   "%x: Received HELLO_ACK from non-confirming peer\n",
1438                   socket->our_id);
1439       return GNUNET_YES;
1440     }
1441   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1442   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1443               "%x: Received HELLO_ACK from %x\n",
1444               socket->our_id,
1445               socket->other_peer);
1446
1447   GNUNET_assert (socket->tunnel == tunnel);
1448   switch (socket->state)
1449   {
1450   case STATE_HELLO_WAIT:
1451     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1452     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1453                 "%x: Read sequence number %u\n",
1454                 socket->our_id,
1455                 (unsigned int) socket->read_sequence_number);
1456     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1457     reply = generate_hello_ack_msg (socket);
1458     queue_message (socket,
1459                    &reply->header, 
1460                    &set_state_established, 
1461                    NULL);      
1462     return GNUNET_OK;
1463   case STATE_ESTABLISHED:
1464   case STATE_RECEIVE_CLOSE_WAIT:
1465     // call statistics (# ACKs ignored++)
1466     return GNUNET_OK;
1467   case STATE_INIT:
1468   default:
1469     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1470                 "%x: Server %x sent HELLO_ACK when in state %d\n", 
1471                 socket->our_id,
1472                 socket->other_peer,
1473                 socket->state);
1474     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1475     return GNUNET_SYSERR;
1476   }
1477
1478 }
1479
1480
1481 /**
1482  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1483  *
1484  * @param cls the socket (set from GNUNET_MESH_connect)
1485  * @param tunnel connection to the other end
1486  * @param tunnel_ctx this is NULL
1487  * @param sender who sent the message
1488  * @param message the actual message
1489  * @param atsi performance data for the connection
1490  * @return GNUNET_OK to keep the connection open,
1491  *         GNUNET_SYSERR to close it (signal serious error)
1492  */
1493 static int
1494 client_handle_reset (void *cls,
1495                      struct GNUNET_MESH_Tunnel *tunnel,
1496                      void **tunnel_ctx,
1497                      const struct GNUNET_PeerIdentity *sender,
1498                      const struct GNUNET_MessageHeader *message,
1499                      const struct GNUNET_ATS_Information*atsi)
1500 {
1501   // struct GNUNET_STREAM_Socket *socket = cls;
1502
1503   return GNUNET_OK;
1504 }
1505
1506
1507 /**
1508  * Common message handler for handling TRANSMIT_CLOSE messages
1509  *
1510  * @param socket the socket through which the ack was received
1511  * @param tunnel connection to the other end
1512  * @param sender who sent the message
1513  * @param msg the transmit close message
1514  * @param atsi performance data for the connection
1515  * @return GNUNET_OK to keep the connection open,
1516  *         GNUNET_SYSERR to close it (signal serious error)
1517  */
1518 static int
1519 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1520                        struct GNUNET_MESH_Tunnel *tunnel,
1521                        const struct GNUNET_PeerIdentity *sender,
1522                        const struct GNUNET_STREAM_MessageHeader *msg,
1523                        const struct GNUNET_ATS_Information*atsi)
1524 {
1525   struct GNUNET_STREAM_MessageHeader *reply;
1526
1527   switch (socket->state)
1528     {
1529     case STATE_ESTABLISHED:
1530       socket->state = STATE_RECEIVE_CLOSED;
1531
1532       /* Send TRANSMIT_CLOSE_ACK */
1533       reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1534       reply->header.type = 
1535         htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1536       reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1537       queue_message (socket, reply, NULL, NULL);
1538       break;
1539
1540     default:
1541       /* FIXME: Call statistics? */
1542       break;
1543     }
1544   return GNUNET_YES;
1545 }
1546
1547
1548 /**
1549  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1550  *
1551  * @param cls the socket (set from GNUNET_MESH_connect)
1552  * @param tunnel connection to the other end
1553  * @param tunnel_ctx this is NULL
1554  * @param sender who sent the message
1555  * @param message the actual message
1556  * @param atsi performance data for the connection
1557  * @return GNUNET_OK to keep the connection open,
1558  *         GNUNET_SYSERR to close it (signal serious error)
1559  */
1560 static int
1561 client_handle_transmit_close (void *cls,
1562                               struct GNUNET_MESH_Tunnel *tunnel,
1563                               void **tunnel_ctx,
1564                               const struct GNUNET_PeerIdentity *sender,
1565                               const struct GNUNET_MessageHeader *message,
1566                               const struct GNUNET_ATS_Information*atsi)
1567 {
1568   struct GNUNET_STREAM_Socket *socket = cls;
1569   
1570   return handle_transmit_close (socket,
1571                                 tunnel,
1572                                 sender,
1573                                 (struct GNUNET_STREAM_MessageHeader *)message,
1574                                 atsi);
1575 }
1576
1577
1578 /**
1579  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1580  *
1581  * @param socket the socket
1582  * @param tunnel connection to the other end
1583  * @param sender who sent the message
1584  * @param message the actual message
1585  * @param atsi performance data for the connection
1586  * @param operation the close operation which is being ACK'ed
1587  * @return GNUNET_OK to keep the connection open,
1588  *         GNUNET_SYSERR to close it (signal serious error)
1589  */
1590 static int
1591 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1592                           struct GNUNET_MESH_Tunnel *tunnel,
1593                           const struct GNUNET_PeerIdentity *sender,
1594                           const struct GNUNET_STREAM_MessageHeader *message,
1595                           const struct GNUNET_ATS_Information *atsi,
1596                           int operation)
1597 {
1598   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1599
1600   shutdown_handle = socket->shutdown_handle;
1601   if (NULL == shutdown_handle)
1602     {
1603       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1604                   "%x: Received *CLOSE_ACK when shutdown handle is NULL\n",
1605                   socket->our_id);
1606       return GNUNET_OK;
1607     }
1608
1609   switch (operation)
1610     {
1611     case SHUT_RDWR:
1612       switch (socket->state)
1613         {
1614         case STATE_CLOSE_WAIT:
1615           if (SHUT_RDWR != shutdown_handle->operation)
1616             {
1617               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1618                           "%x: Received CLOSE_ACK when shutdown handle "
1619                           "is not for SHUT_RDWR\n",
1620                           socket->our_id);
1621               return GNUNET_OK;
1622             }
1623
1624           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1625                       "%x: Received CLOSE_ACK from %x\n",
1626                       socket->our_id,
1627                       socket->other_peer);
1628           socket->state = STATE_CLOSED;
1629           break;
1630         default:
1631           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1632                       "%x: Received CLOSE_ACK when in it not expected\n",
1633                       socket->our_id);
1634           return GNUNET_OK;
1635         }
1636       break;
1637
1638     case SHUT_RD:
1639       switch (socket->state)
1640         {
1641         case STATE_RECEIVE_CLOSE_WAIT:
1642           if (SHUT_RD != shutdown_handle->operation)
1643             {
1644               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1645                           "%x: Received RECEIVE_CLOSE_ACK when shutdown handle "
1646                           "is not for SHUT_RD\n",
1647                           socket->our_id);
1648               return GNUNET_OK;
1649             }
1650
1651           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1652                       "%x: Received RECEIVE_CLOSE_ACK from %x\n",
1653                       socket->our_id,
1654                       socket->other_peer);
1655           socket->state = STATE_RECEIVE_CLOSED;
1656           break;
1657         default:
1658           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1659                       "%x: Received RECEIVE_CLOSE_ACK when in it not expected\n",
1660                       socket->our_id);
1661           return GNUNET_OK;
1662         }
1663
1664       break;
1665     case SHUT_WR:
1666       switch (socket->state)
1667         {
1668         case STATE_TRANSMIT_CLOSE_WAIT:
1669           if (SHUT_WR != shutdown_handle->operation)
1670             {
1671               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1672                           "%x: Received TRANSMIT_CLOSE_ACK when shutdown handle "
1673                           "is not for SHUT_WR\n",
1674                           socket->our_id);
1675               return GNUNET_OK;
1676             }
1677
1678           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1679                       "%x: Received TRANSMIT_CLOSE_ACK from %x\n",
1680                       socket->our_id,
1681                       socket->other_peer);
1682           socket->state = STATE_TRANSMIT_CLOSED;
1683           break;
1684         default:
1685           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1686                       "%x: Received TRANSMIT_CLOSE_ACK when in it not expected\n",
1687                       socket->our_id);
1688           
1689           return GNUNET_OK;
1690         }
1691       break;
1692     default:
1693       GNUNET_assert (0);
1694     }
1695
1696   if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
1697     shutdown_handle->completion_cb(shutdown_handle->completion_cls,
1698                                    operation);
1699   GNUNET_free (shutdown_handle); /* Free shutdown handle */
1700   socket->shutdown_handle = NULL;
1701   if (GNUNET_SCHEDULER_NO_TASK
1702       != shutdown_handle->close_msg_retransmission_task_id)
1703     {
1704       GNUNET_SCHEDULER_cancel
1705         (shutdown_handle->close_msg_retransmission_task_id);
1706       shutdown_handle->close_msg_retransmission_task_id =
1707         GNUNET_SCHEDULER_NO_TASK;
1708     }
1709   return GNUNET_OK;
1710 }
1711
1712
1713 /**
1714  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1715  *
1716  * @param cls the socket (set from GNUNET_MESH_connect)
1717  * @param tunnel connection to the other end
1718  * @param tunnel_ctx this is NULL
1719  * @param sender who sent the message
1720  * @param message the actual message
1721  * @param atsi performance data for the connection
1722  * @return GNUNET_OK to keep the connection open,
1723  *         GNUNET_SYSERR to close it (signal serious error)
1724  */
1725 static int
1726 client_handle_transmit_close_ack (void *cls,
1727                                   struct GNUNET_MESH_Tunnel *tunnel,
1728                                   void **tunnel_ctx,
1729                                   const struct GNUNET_PeerIdentity *sender,
1730                                   const struct GNUNET_MessageHeader *message,
1731                                   const struct GNUNET_ATS_Information*atsi)
1732 {
1733   struct GNUNET_STREAM_Socket *socket = cls;
1734
1735   return handle_generic_close_ack (socket,
1736                                    tunnel,
1737                                    sender,
1738                                    (const struct GNUNET_STREAM_MessageHeader *)
1739                                    message,
1740                                    atsi,
1741                                    SHUT_WR);
1742 }
1743
1744
1745 /**
1746  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1747  *
1748  * @param socket the socket
1749  * @param tunnel connection to the other end
1750  * @param sender who sent the message
1751  * @param message the actual message
1752  * @param atsi performance data for the connection
1753  * @return GNUNET_OK to keep the connection open,
1754  *         GNUNET_SYSERR to close it (signal serious error)
1755  */
1756 static int
1757 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1758                       struct GNUNET_MESH_Tunnel *tunnel,
1759                       const struct GNUNET_PeerIdentity *sender,
1760                       const struct GNUNET_STREAM_MessageHeader *message,
1761                       const struct GNUNET_ATS_Information *atsi)
1762 {
1763   struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1764
1765   switch (socket->state)
1766     {
1767     case STATE_INIT:
1768     case STATE_LISTEN:
1769     case STATE_HELLO_WAIT:
1770       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1771                   "%x: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1772                   socket->our_id);
1773       return GNUNET_OK;
1774     default:
1775       break;
1776     }
1777   
1778   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1779               "%x: Received RECEIVE_CLOSE from %x\n",
1780               socket->our_id,
1781               socket->other_peer);
1782   receive_close_ack =
1783     GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1784   receive_close_ack->header.size =
1785     htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1786   receive_close_ack->header.type =
1787     htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1788   queue_message (socket,
1789                  receive_close_ack,
1790                  &set_state_closed,
1791                  NULL);
1792   
1793   /* FIXME: Handle the case where write handle is present; the write operation
1794               should be deemed as finised and the write continuation callback
1795               has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
1796   return GNUNET_OK;
1797 }
1798
1799
1800 /**
1801  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1802  *
1803  * @param cls the socket (set from GNUNET_MESH_connect)
1804  * @param tunnel connection to the other end
1805  * @param tunnel_ctx this is NULL
1806  * @param sender who sent the message
1807  * @param message the actual message
1808  * @param atsi performance data for the connection
1809  * @return GNUNET_OK to keep the connection open,
1810  *         GNUNET_SYSERR to close it (signal serious error)
1811  */
1812 static int
1813 client_handle_receive_close (void *cls,
1814                              struct GNUNET_MESH_Tunnel *tunnel,
1815                              void **tunnel_ctx,
1816                              const struct GNUNET_PeerIdentity *sender,
1817                              const struct GNUNET_MessageHeader *message,
1818                              const struct GNUNET_ATS_Information*atsi)
1819 {
1820   struct GNUNET_STREAM_Socket *socket = cls;
1821
1822   return
1823     handle_receive_close (socket,
1824                           tunnel,
1825                           sender,
1826                           (const struct GNUNET_STREAM_MessageHeader *) message,
1827                           atsi);
1828 }
1829
1830
1831 /**
1832  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1833  *
1834  * @param cls the socket (set from GNUNET_MESH_connect)
1835  * @param tunnel connection to the other end
1836  * @param tunnel_ctx this is NULL
1837  * @param sender who sent the message
1838  * @param message the actual message
1839  * @param atsi performance data for the connection
1840  * @return GNUNET_OK to keep the connection open,
1841  *         GNUNET_SYSERR to close it (signal serious error)
1842  */
1843 static int
1844 client_handle_receive_close_ack (void *cls,
1845                                  struct GNUNET_MESH_Tunnel *tunnel,
1846                                  void **tunnel_ctx,
1847                                  const struct GNUNET_PeerIdentity *sender,
1848                                  const struct GNUNET_MessageHeader *message,
1849                                  const struct GNUNET_ATS_Information*atsi)
1850 {
1851   struct GNUNET_STREAM_Socket *socket = cls;
1852
1853   return handle_generic_close_ack (socket,
1854                                    tunnel,
1855                                    sender,
1856                                    (const struct GNUNET_STREAM_MessageHeader *)
1857                                    message,
1858                                    atsi,
1859                                    SHUT_RD);
1860 }
1861
1862
1863 /**
1864  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1865  *
1866  * @param socket the socket
1867  * @param tunnel connection to the other end
1868  * @param sender who sent the message
1869  * @param message the actual message
1870  * @param atsi performance data for the connection
1871  * @return GNUNET_OK to keep the connection open,
1872  *         GNUNET_SYSERR to close it (signal serious error)
1873  */
1874 static int
1875 handle_close (struct GNUNET_STREAM_Socket *socket,
1876               struct GNUNET_MESH_Tunnel *tunnel,
1877               const struct GNUNET_PeerIdentity *sender,
1878               const struct GNUNET_STREAM_MessageHeader *message,
1879               const struct GNUNET_ATS_Information*atsi)
1880 {
1881   struct GNUNET_STREAM_MessageHeader *close_ack;
1882
1883   switch (socket->state)
1884     {
1885     case STATE_INIT:
1886     case STATE_LISTEN:
1887     case STATE_HELLO_WAIT:
1888       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1889                   "%x: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1890                   socket->our_id);
1891       return GNUNET_OK;
1892     default:
1893       break;
1894     }
1895
1896   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1897               "%x: Received CLOSE from %x\n",
1898               socket->our_id,
1899               socket->other_peer);
1900   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1901   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1902   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
1903   queue_message (socket,
1904                  close_ack,
1905                  &set_state_closed,
1906                  NULL);
1907   if (socket->state == STATE_CLOSED)
1908     return GNUNET_OK;
1909
1910   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1911   socket->receive_buffer = NULL;
1912   socket->receive_buffer_size = 0;
1913   return GNUNET_OK;
1914 }
1915
1916
1917 /**
1918  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1919  *
1920  * @param cls the socket (set from GNUNET_MESH_connect)
1921  * @param tunnel connection to the other end
1922  * @param tunnel_ctx this is NULL
1923  * @param sender who sent the message
1924  * @param message the actual message
1925  * @param atsi performance data for the connection
1926  * @return GNUNET_OK to keep the connection open,
1927  *         GNUNET_SYSERR to close it (signal serious error)
1928  */
1929 static int
1930 client_handle_close (void *cls,
1931                      struct GNUNET_MESH_Tunnel *tunnel,
1932                      void **tunnel_ctx,
1933                      const struct GNUNET_PeerIdentity *sender,
1934                      const struct GNUNET_MessageHeader *message,
1935                      const struct GNUNET_ATS_Information*atsi)
1936 {
1937   struct GNUNET_STREAM_Socket *socket = cls;
1938
1939   return handle_close (socket,
1940                        tunnel,
1941                        sender,
1942                        (const struct GNUNET_STREAM_MessageHeader *) message,
1943                        atsi);
1944 }
1945
1946
1947 /**
1948  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1949  *
1950  * @param cls the socket (set from GNUNET_MESH_connect)
1951  * @param tunnel connection to the other end
1952  * @param tunnel_ctx this is NULL
1953  * @param sender who sent the message
1954  * @param message the actual message
1955  * @param atsi performance data for the connection
1956  * @return GNUNET_OK to keep the connection open,
1957  *         GNUNET_SYSERR to close it (signal serious error)
1958  */
1959 static int
1960 client_handle_close_ack (void *cls,
1961                          struct GNUNET_MESH_Tunnel *tunnel,
1962                          void **tunnel_ctx,
1963                          const struct GNUNET_PeerIdentity *sender,
1964                          const struct GNUNET_MessageHeader *message,
1965                          const struct GNUNET_ATS_Information *atsi)
1966 {
1967   struct GNUNET_STREAM_Socket *socket = cls;
1968
1969   return handle_generic_close_ack (socket,
1970                                    tunnel,
1971                                    sender,
1972                                    (const struct GNUNET_STREAM_MessageHeader *) 
1973                                    message,
1974                                    atsi,
1975                                    SHUT_RDWR);
1976 }
1977
1978 /*****************************/
1979 /* Server's Message Handlers */
1980 /*****************************/
1981
1982 /**
1983  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1984  *
1985  * @param cls the closure
1986  * @param tunnel connection to the other end
1987  * @param tunnel_ctx the socket
1988  * @param sender who sent the message
1989  * @param message the actual message
1990  * @param atsi performance data for the connection
1991  * @return GNUNET_OK to keep the connection open,
1992  *         GNUNET_SYSERR to close it (signal serious error)
1993  */
1994 static int
1995 server_handle_data (void *cls,
1996                     struct GNUNET_MESH_Tunnel *tunnel,
1997                     void **tunnel_ctx,
1998                     const struct GNUNET_PeerIdentity *sender,
1999                     const struct GNUNET_MessageHeader *message,
2000                     const struct GNUNET_ATS_Information*atsi)
2001 {
2002   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2003
2004   return handle_data (socket,
2005                       tunnel,
2006                       sender,
2007                       (const struct GNUNET_STREAM_DataMessage *)message,
2008                       atsi);
2009 }
2010
2011
2012 /**
2013  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
2014  *
2015  * @param cls the closure
2016  * @param tunnel connection to the other end
2017  * @param tunnel_ctx the socket
2018  * @param sender who sent the message
2019  * @param message the actual message
2020  * @param atsi performance data for the connection
2021  * @return GNUNET_OK to keep the connection open,
2022  *         GNUNET_SYSERR to close it (signal serious error)
2023  */
2024 static int
2025 server_handle_hello (void *cls,
2026                      struct GNUNET_MESH_Tunnel *tunnel,
2027                      void **tunnel_ctx,
2028                      const struct GNUNET_PeerIdentity *sender,
2029                      const struct GNUNET_MessageHeader *message,
2030                      const struct GNUNET_ATS_Information*atsi)
2031 {
2032   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2033   struct GNUNET_STREAM_HelloAckMessage *reply;
2034
2035   if (GNUNET_PEER_search (sender) != socket->other_peer)
2036     {
2037       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2038                   "%x: Received HELLO from non-confirming peer\n",
2039                   socket->our_id);
2040       return GNUNET_YES;
2041     }
2042
2043   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == 
2044                  ntohs (message->type));
2045   GNUNET_assert (socket->tunnel == tunnel);
2046   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2047               "%x: Received HELLO from %x\n", 
2048               socket->our_id,
2049               socket->other_peer);
2050
2051   if (STATE_INIT == socket->state)
2052     {
2053       reply = generate_hello_ack_msg (socket);
2054       queue_message (socket, 
2055                      &reply->header,
2056                      &set_state_hello_wait, 
2057                      NULL);
2058     }
2059   else
2060     {
2061       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2062                   "Client sent HELLO when in state %d\n", socket->state);
2063       /* FIXME: Send RESET? */
2064       
2065     }
2066   return GNUNET_OK;
2067 }
2068
2069
2070 /**
2071  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2072  *
2073  * @param cls the closure
2074  * @param tunnel connection to the other end
2075  * @param tunnel_ctx the socket
2076  * @param sender who sent the message
2077  * @param message the actual message
2078  * @param atsi performance data for the connection
2079  * @return GNUNET_OK to keep the connection open,
2080  *         GNUNET_SYSERR to close it (signal serious error)
2081  */
2082 static int
2083 server_handle_hello_ack (void *cls,
2084                          struct GNUNET_MESH_Tunnel *tunnel,
2085                          void **tunnel_ctx,
2086                          const struct GNUNET_PeerIdentity *sender,
2087                          const struct GNUNET_MessageHeader *message,
2088                          const struct GNUNET_ATS_Information*atsi)
2089 {
2090   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2091   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2092
2093   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2094                  ntohs (message->type));
2095   GNUNET_assert (socket->tunnel == tunnel);
2096   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2097   if (STATE_HELLO_WAIT == socket->state)
2098     {
2099       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2100                   "%x: Received HELLO_ACK from %x\n",
2101                   socket->our_id,
2102                   socket->other_peer);
2103       socket->read_sequence_number = ntohl (ack_message->sequence_number);
2104       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2105                   "%x: Read sequence number %u\n",
2106                   socket->our_id,
2107                   (unsigned int) socket->read_sequence_number);
2108       socket->receiver_window_available = 
2109         ntohl (ack_message->receiver_window_size);
2110       /* Attain ESTABLISHED state */
2111       set_state_established (NULL, socket);
2112     }
2113   else
2114     {
2115       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2116                   "Client sent HELLO_ACK when in state %d\n", socket->state);
2117       /* FIXME: Send RESET? */
2118       
2119     }
2120   return GNUNET_OK;
2121 }
2122
2123
2124 /**
2125  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2126  *
2127  * @param cls the closure
2128  * @param tunnel connection to the other end
2129  * @param tunnel_ctx the socket
2130  * @param sender who sent the message
2131  * @param message the actual message
2132  * @param atsi performance data for the connection
2133  * @return GNUNET_OK to keep the connection open,
2134  *         GNUNET_SYSERR to close it (signal serious error)
2135  */
2136 static int
2137 server_handle_reset (void *cls,
2138                      struct GNUNET_MESH_Tunnel *tunnel,
2139                      void **tunnel_ctx,
2140                      const struct GNUNET_PeerIdentity *sender,
2141                      const struct GNUNET_MessageHeader *message,
2142                      const struct GNUNET_ATS_Information*atsi)
2143 {
2144   // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2145
2146   return GNUNET_OK;
2147 }
2148
2149
2150 /**
2151  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2152  *
2153  * @param cls the closure
2154  * @param tunnel connection to the other end
2155  * @param tunnel_ctx the socket
2156  * @param sender who sent the message
2157  * @param message the actual message
2158  * @param atsi performance data for the connection
2159  * @return GNUNET_OK to keep the connection open,
2160  *         GNUNET_SYSERR to close it (signal serious error)
2161  */
2162 static int
2163 server_handle_transmit_close (void *cls,
2164                               struct GNUNET_MESH_Tunnel *tunnel,
2165                               void **tunnel_ctx,
2166                               const struct GNUNET_PeerIdentity *sender,
2167                               const struct GNUNET_MessageHeader *message,
2168                               const struct GNUNET_ATS_Information*atsi)
2169 {
2170   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2171
2172   return handle_transmit_close (socket,
2173                                 tunnel,
2174                                 sender,
2175                                 (struct GNUNET_STREAM_MessageHeader *)message,
2176                                 atsi);
2177 }
2178
2179
2180 /**
2181  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2182  *
2183  * @param cls the closure
2184  * @param tunnel connection to the other end
2185  * @param tunnel_ctx the socket
2186  * @param sender who sent the message
2187  * @param message the actual message
2188  * @param atsi performance data for the connection
2189  * @return GNUNET_OK to keep the connection open,
2190  *         GNUNET_SYSERR to close it (signal serious error)
2191  */
2192 static int
2193 server_handle_transmit_close_ack (void *cls,
2194                                   struct GNUNET_MESH_Tunnel *tunnel,
2195                                   void **tunnel_ctx,
2196                                   const struct GNUNET_PeerIdentity *sender,
2197                                   const struct GNUNET_MessageHeader *message,
2198                                   const struct GNUNET_ATS_Information*atsi)
2199 {
2200   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2201
2202   return handle_generic_close_ack (socket,
2203                                    tunnel,
2204                                    sender,
2205                                    (const struct GNUNET_STREAM_MessageHeader *)
2206                                    message,
2207                                    atsi,
2208                                    SHUT_WR);
2209 }
2210
2211
2212 /**
2213  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2214  *
2215  * @param cls the closure
2216  * @param tunnel connection to the other end
2217  * @param tunnel_ctx the socket
2218  * @param sender who sent the message
2219  * @param message the actual message
2220  * @param atsi performance data for the connection
2221  * @return GNUNET_OK to keep the connection open,
2222  *         GNUNET_SYSERR to close it (signal serious error)
2223  */
2224 static int
2225 server_handle_receive_close (void *cls,
2226                              struct GNUNET_MESH_Tunnel *tunnel,
2227                              void **tunnel_ctx,
2228                              const struct GNUNET_PeerIdentity *sender,
2229                              const struct GNUNET_MessageHeader *message,
2230                              const struct GNUNET_ATS_Information*atsi)
2231 {
2232   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2233
2234   return
2235     handle_receive_close (socket,
2236                           tunnel,
2237                           sender,
2238                           (const struct GNUNET_STREAM_MessageHeader *) message,
2239                           atsi);
2240 }
2241
2242
2243 /**
2244  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2245  *
2246  * @param cls the closure
2247  * @param tunnel connection to the other end
2248  * @param tunnel_ctx the socket
2249  * @param sender who sent the message
2250  * @param message the actual message
2251  * @param atsi performance data for the connection
2252  * @return GNUNET_OK to keep the connection open,
2253  *         GNUNET_SYSERR to close it (signal serious error)
2254  */
2255 static int
2256 server_handle_receive_close_ack (void *cls,
2257                                  struct GNUNET_MESH_Tunnel *tunnel,
2258                                  void **tunnel_ctx,
2259                                  const struct GNUNET_PeerIdentity *sender,
2260                                  const struct GNUNET_MessageHeader *message,
2261                                  const struct GNUNET_ATS_Information*atsi)
2262 {
2263   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2264
2265   return handle_generic_close_ack (socket,
2266                                    tunnel,
2267                                    sender,
2268                                    (const struct GNUNET_STREAM_MessageHeader *)
2269                                    message,
2270                                    atsi,
2271                                    SHUT_RD);
2272 }
2273
2274
2275 /**
2276  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2277  *
2278  * @param cls the listen socket (from GNUNET_MESH_connect in
2279  *          GNUNET_STREAM_listen) 
2280  * @param tunnel connection to the other end
2281  * @param tunnel_ctx the socket
2282  * @param sender who sent the message
2283  * @param message the actual message
2284  * @param atsi performance data for the connection
2285  * @return GNUNET_OK to keep the connection open,
2286  *         GNUNET_SYSERR to close it (signal serious error)
2287  */
2288 static int
2289 server_handle_close (void *cls,
2290                      struct GNUNET_MESH_Tunnel *tunnel,
2291                      void **tunnel_ctx,
2292                      const struct GNUNET_PeerIdentity *sender,
2293                      const struct GNUNET_MessageHeader *message,
2294                      const struct GNUNET_ATS_Information*atsi)
2295 {
2296   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2297   
2298   return handle_close (socket,
2299                        tunnel,
2300                        sender,
2301                        (const struct GNUNET_STREAM_MessageHeader *) message,
2302                        atsi);
2303 }
2304
2305
2306 /**
2307  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2308  *
2309  * @param cls the closure
2310  * @param tunnel connection to the other end
2311  * @param tunnel_ctx the socket
2312  * @param sender who sent the message
2313  * @param message the actual message
2314  * @param atsi performance data for the connection
2315  * @return GNUNET_OK to keep the connection open,
2316  *         GNUNET_SYSERR to close it (signal serious error)
2317  */
2318 static int
2319 server_handle_close_ack (void *cls,
2320                          struct GNUNET_MESH_Tunnel *tunnel,
2321                          void **tunnel_ctx,
2322                          const struct GNUNET_PeerIdentity *sender,
2323                          const struct GNUNET_MessageHeader *message,
2324                          const struct GNUNET_ATS_Information*atsi)
2325 {
2326   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2327
2328   return handle_generic_close_ack (socket,
2329                                    tunnel,
2330                                    sender,
2331                                    (const struct GNUNET_STREAM_MessageHeader *) 
2332                                    message,
2333                                    atsi,
2334                                    SHUT_RDWR);
2335 }
2336
2337
2338 /**
2339  * Handler for DATA_ACK messages
2340  *
2341  * @param socket the socket through which the ack was received
2342  * @param tunnel connection to the other end
2343  * @param sender who sent the message
2344  * @param ack the acknowledgment message
2345  * @param atsi performance data for the connection
2346  * @return GNUNET_OK to keep the connection open,
2347  *         GNUNET_SYSERR to close it (signal serious error)
2348  */
2349 static int
2350 handle_ack (struct GNUNET_STREAM_Socket *socket,
2351             struct GNUNET_MESH_Tunnel *tunnel,
2352             const struct GNUNET_PeerIdentity *sender,
2353             const struct GNUNET_STREAM_AckMessage *ack,
2354             const struct GNUNET_ATS_Information*atsi)
2355 {
2356   unsigned int packet;
2357   int need_retransmission;
2358   
2359
2360   if (GNUNET_PEER_search (sender) != socket->other_peer)
2361     {
2362       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2363                   "%x: Received ACK from non-confirming peer\n",
2364                   socket->our_id);
2365       return GNUNET_YES;
2366     }
2367
2368   switch (socket->state)
2369     {
2370     case (STATE_ESTABLISHED):
2371     case (STATE_RECEIVE_CLOSED):
2372     case (STATE_RECEIVE_CLOSE_WAIT):
2373       if (NULL == socket->write_handle)
2374         {
2375           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2376                       "%x: Received DATA_ACK when write_handle is NULL\n",
2377                       socket->our_id);
2378           return GNUNET_OK;
2379         }
2380       /* FIXME: increment in the base sequence number is breaking current flow
2381        */
2382       if (!((socket->write_sequence_number 
2383              - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2384         {
2385           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2386                       "%x: Received DATA_ACK with unexpected base sequence "
2387                       "number\n",
2388                       socket->our_id);
2389           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2390                       "%x: Current write sequence: %u; Ack's base sequence: %u\n",
2391                       socket->our_id,
2392                       socket->write_sequence_number,
2393                       ntohl (ack->base_sequence_number));
2394           return GNUNET_OK;
2395         }
2396       /* FIXME: include the case when write_handle is cancelled - ignore the 
2397          acks */
2398
2399       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2400                   "%x: Received DATA_ACK from %x\n",
2401                   socket->our_id,
2402                   socket->other_peer);
2403       
2404       /* Cancel the retransmission task */
2405       if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2406         {
2407           GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2408           socket->retransmission_timeout_task_id = 
2409             GNUNET_SCHEDULER_NO_TASK;
2410         }
2411
2412       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2413         {
2414           if (NULL == socket->write_handle->messages[packet]) break;
2415           if (ntohl (ack->base_sequence_number)
2416               >= ntohl (socket->write_handle->messages[packet]->sequence_number))
2417             ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2418                                   packet,
2419                                   GNUNET_YES);
2420           else
2421             if (GNUNET_YES == 
2422                 ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
2423                                       ntohl (socket->write_handle->messages[packet]->sequence_number)
2424                                       - ntohl (ack->base_sequence_number)))
2425               ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2426                                     packet,
2427                                     GNUNET_YES);
2428         }
2429
2430       /* Update the receive window remaining
2431        FIXME : Should update with the value from a data ack with greater
2432        sequence number */
2433       socket->receiver_window_available = 
2434         ntohl (ack->receive_window_remaining);
2435
2436       /* Check if we have received all acknowledgements */
2437       need_retransmission = GNUNET_NO;
2438       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2439         {
2440           if (NULL == socket->write_handle->messages[packet]) break;
2441           if (GNUNET_YES != ackbitmap_is_bit_set 
2442               (&socket->write_handle->ack_bitmap,packet))
2443             {
2444               need_retransmission = GNUNET_YES;
2445               break;
2446             }
2447         }
2448       if (GNUNET_YES == need_retransmission)
2449         {
2450           write_data (socket);
2451         }
2452       else      /* We have to call the write continuation callback now */
2453         {
2454           /* Free the packets */
2455           for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2456             {
2457               GNUNET_free_non_null (socket->write_handle->messages[packet]);
2458             }
2459           if (NULL != socket->write_handle->write_cont)
2460             socket->write_handle->write_cont
2461               (socket->write_handle->write_cont_cls,
2462                socket->status,
2463                socket->write_handle->size);
2464           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2465                       "%x: Write completion callback completed\n",
2466                       socket->our_id);
2467           /* We are done with the write handle - Freeing it */
2468           GNUNET_free (socket->write_handle);
2469           socket->write_handle = NULL;
2470         }
2471       break;
2472     default:
2473       break;
2474     }
2475   return GNUNET_OK;
2476 }
2477
2478
2479 /**
2480  * Handler for DATA_ACK messages
2481  *
2482  * @param cls the 'struct GNUNET_STREAM_Socket'
2483  * @param tunnel connection to the other end
2484  * @param tunnel_ctx unused
2485  * @param sender who sent the message
2486  * @param message the actual message
2487  * @param atsi performance data for the connection
2488  * @return GNUNET_OK to keep the connection open,
2489  *         GNUNET_SYSERR to close it (signal serious error)
2490  */
2491 static int
2492 client_handle_ack (void *cls,
2493                    struct GNUNET_MESH_Tunnel *tunnel,
2494                    void **tunnel_ctx,
2495                    const struct GNUNET_PeerIdentity *sender,
2496                    const struct GNUNET_MessageHeader *message,
2497                    const struct GNUNET_ATS_Information*atsi)
2498 {
2499   struct GNUNET_STREAM_Socket *socket = cls;
2500   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2501  
2502   return handle_ack (socket, tunnel, sender, ack, atsi);
2503 }
2504
2505
2506 /**
2507  * Handler for DATA_ACK messages
2508  *
2509  * @param cls the server's listen socket
2510  * @param tunnel connection to the other end
2511  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2512  * @param sender who sent the message
2513  * @param message the actual message
2514  * @param atsi performance data for the connection
2515  * @return GNUNET_OK to keep the connection open,
2516  *         GNUNET_SYSERR to close it (signal serious error)
2517  */
2518 static int
2519 server_handle_ack (void *cls,
2520                    struct GNUNET_MESH_Tunnel *tunnel,
2521                    void **tunnel_ctx,
2522                    const struct GNUNET_PeerIdentity *sender,
2523                    const struct GNUNET_MessageHeader *message,
2524                    const struct GNUNET_ATS_Information*atsi)
2525 {
2526   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2527   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2528  
2529   return handle_ack (socket, tunnel, sender, ack, atsi);
2530 }
2531
2532
2533 /**
2534  * For client message handlers, the stream socket is in the
2535  * closure argument.
2536  */
2537 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2538   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2539   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2540    sizeof (struct GNUNET_STREAM_AckMessage) },
2541   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2542    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2543   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2544    sizeof (struct GNUNET_STREAM_MessageHeader)},
2545   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2546    sizeof (struct GNUNET_STREAM_MessageHeader)},
2547   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2548    sizeof (struct GNUNET_STREAM_MessageHeader)},
2549   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2550    sizeof (struct GNUNET_STREAM_MessageHeader)},
2551   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2552    sizeof (struct GNUNET_STREAM_MessageHeader)},
2553   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2554    sizeof (struct GNUNET_STREAM_MessageHeader)},
2555   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2556    sizeof (struct GNUNET_STREAM_MessageHeader)},
2557   {NULL, 0, 0}
2558 };
2559
2560
2561 /**
2562  * For server message handlers, the stream socket is in the
2563  * tunnel context, and the listen socket in the closure argument.
2564  */
2565 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2566   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2567   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2568    sizeof (struct GNUNET_STREAM_AckMessage) },
2569   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2570    sizeof (struct GNUNET_STREAM_MessageHeader)},
2571   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2572    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2573   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2574    sizeof (struct GNUNET_STREAM_MessageHeader)},
2575   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2576    sizeof (struct GNUNET_STREAM_MessageHeader)},
2577   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2578    sizeof (struct GNUNET_STREAM_MessageHeader)},
2579   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2580    sizeof (struct GNUNET_STREAM_MessageHeader)},
2581   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2582    sizeof (struct GNUNET_STREAM_MessageHeader)},
2583   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2584    sizeof (struct GNUNET_STREAM_MessageHeader)},
2585   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2586    sizeof (struct GNUNET_STREAM_MessageHeader)},
2587   {NULL, 0, 0}
2588 };
2589
2590
2591 /**
2592  * Function called when our target peer is connected to our tunnel
2593  *
2594  * @param cls the socket for which this tunnel is created
2595  * @param peer the peer identity of the target
2596  * @param atsi performance data for the connection
2597  */
2598 static void
2599 mesh_peer_connect_callback (void *cls,
2600                             const struct GNUNET_PeerIdentity *peer,
2601                             const struct GNUNET_ATS_Information * atsi)
2602 {
2603   struct GNUNET_STREAM_Socket *socket = cls;
2604   struct GNUNET_STREAM_MessageHeader *message;
2605   GNUNET_PEER_Id connected_peer;
2606
2607   connected_peer = GNUNET_PEER_search (peer);
2608   
2609   if (connected_peer != socket->other_peer)
2610     {
2611       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2612                   "%x: A peer which is not our target has connected",
2613                   "to our tunnel\n",
2614                   socket->our_id);
2615       return;
2616     }
2617   
2618   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2619               "%x: Target peer %x connected\n", 
2620               socket->our_id,
2621               connected_peer);
2622   
2623   /* Set state to INIT */
2624   socket->state = STATE_INIT;
2625
2626   /* Send HELLO message */
2627   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2628   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
2629   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2630   queue_message (socket,
2631                  message,
2632                  &set_state_hello_wait,
2633                  NULL);
2634
2635   /* Call open callback */
2636   if (NULL == socket->open_cb)
2637     {
2638       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2639                   "STREAM_open callback is NULL\n");
2640     }
2641 }
2642
2643
2644 /**
2645  * Function called when our target peer is disconnected from our tunnel
2646  *
2647  * @param cls the socket associated which this tunnel
2648  * @param peer the peer identity of the target
2649  */
2650 static void
2651 mesh_peer_disconnect_callback (void *cls,
2652                                const struct GNUNET_PeerIdentity *peer)
2653 {
2654   struct GNUNET_STREAM_Socket *socket=cls;
2655   
2656   /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2657   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2658               "%x: Other peer %x disconnected \n",
2659               socket->our_id,
2660               socket->other_peer);
2661 }
2662
2663
2664 /**
2665  * Method called whenever a peer creates a tunnel to us
2666  *
2667  * @param cls closure
2668  * @param tunnel new handle to the tunnel
2669  * @param initiator peer that started the tunnel
2670  * @param atsi performance information for the tunnel
2671  * @return initial tunnel context for the tunnel
2672  *         (can be NULL -- that's not an error)
2673  */
2674 static void *
2675 new_tunnel_notify (void *cls,
2676                    struct GNUNET_MESH_Tunnel *tunnel,
2677                    const struct GNUNET_PeerIdentity *initiator,
2678                    const struct GNUNET_ATS_Information *atsi)
2679 {
2680   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2681   struct GNUNET_STREAM_Socket *socket;
2682
2683   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2684      from the same peer again until the socket is closed */
2685
2686   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2687   socket->other_peer = GNUNET_PEER_intern (initiator);
2688   socket->tunnel = tunnel;
2689   socket->session_id = 0;       /* FIXME */
2690   socket->state = STATE_INIT;
2691   socket->lsocket = lsocket;
2692   socket->our_id = lsocket->our_id;
2693   
2694   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2695               "%x: Peer %x initiated tunnel to us\n", 
2696               socket->our_id,
2697               socket->other_peer);
2698   
2699   /* FIXME: Copy MESH handle from lsocket to socket */
2700   
2701   return socket;
2702 }
2703
2704
2705 /**
2706  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2707  * any associated state.  This function is NOT called if the client has
2708  * explicitly asked for the tunnel to be destroyed using
2709  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2710  * the tunnel.
2711  *
2712  * @param cls closure (set from GNUNET_MESH_connect)
2713  * @param tunnel connection to the other end (henceforth invalid)
2714  * @param tunnel_ctx place where local state associated
2715  *                   with the tunnel is stored
2716  */
2717 static void 
2718 tunnel_cleaner (void *cls,
2719                 const struct GNUNET_MESH_Tunnel *tunnel,
2720                 void *tunnel_ctx)
2721 {
2722   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2723
2724   if (tunnel != socket->tunnel)
2725     return;
2726
2727   GNUNET_break_op(0);
2728   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2729               "%x: Peer %x has terminated connection abruptly\n",
2730               socket->our_id,
2731               socket->other_peer);
2732
2733   socket->status = GNUNET_STREAM_SHUTDOWN;
2734
2735   /* Clear Transmit handles */
2736   if (NULL != socket->transmit_handle)
2737     {
2738       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2739       socket->transmit_handle = NULL;
2740     }
2741   if (NULL != socket->ack_transmit_handle)
2742     {
2743       GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
2744       GNUNET_free (socket->ack_msg);
2745       socket->ack_msg = NULL;
2746       socket->ack_transmit_handle = NULL;
2747     }
2748   /* Stop Tasks using socket->tunnel */
2749   if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2750     {
2751       GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2752       socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2753     }
2754   if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2755     {
2756       GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2757       socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
2758     }
2759   /* FIXME: Cancel all other tasks using socket->tunnel */
2760   socket->tunnel = NULL;
2761 }
2762
2763
2764 /*****************/
2765 /* API functions */
2766 /*****************/
2767
2768
2769 /**
2770  * Tries to open a stream to the target peer
2771  *
2772  * @param cfg configuration to use
2773  * @param target the target peer to which the stream has to be opened
2774  * @param app_port the application port number which uniquely identifies this
2775  *            stream
2776  * @param open_cb this function will be called after stream has be established 
2777  * @param open_cb_cls the closure for open_cb
2778  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2779  * @return if successful it returns the stream socket; NULL if stream cannot be
2780  *         opened 
2781  */
2782 struct GNUNET_STREAM_Socket *
2783 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2784                     const struct GNUNET_PeerIdentity *target,
2785                     GNUNET_MESH_ApplicationType app_port,
2786                     GNUNET_STREAM_OpenCallback open_cb,
2787                     void *open_cb_cls,
2788                     ...)
2789 {
2790   struct GNUNET_STREAM_Socket *socket;
2791   struct GNUNET_PeerIdentity own_peer_id;
2792   enum GNUNET_STREAM_Option option;
2793   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
2794   va_list vargs;                /* Variable arguments */
2795
2796   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2797               "%s\n", __func__);
2798
2799   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2800   socket->other_peer = GNUNET_PEER_intern (target);
2801   socket->open_cb = open_cb;
2802   socket->open_cls = open_cb_cls;
2803   GNUNET_TESTING_get_peer_identity (cfg, &own_peer_id);
2804   socket->our_id = GNUNET_PEER_intern (&own_peer_id);
2805   
2806   /* Set defaults */
2807   socket->retransmit_timeout = 
2808     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
2809
2810   va_start (vargs, open_cb_cls); /* Parse variable args */
2811   do {
2812     option = va_arg (vargs, enum GNUNET_STREAM_Option);
2813     switch (option)
2814       {
2815       case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2816         /* Expect struct GNUNET_TIME_Relative */
2817         socket->retransmit_timeout = va_arg (vargs,
2818                                              struct GNUNET_TIME_Relative);
2819         break;
2820       case GNUNET_STREAM_OPTION_END:
2821         break;
2822       }
2823   } while (GNUNET_STREAM_OPTION_END != option);
2824   va_end (vargs);               /* End of variable args parsing */
2825   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2826                                       10,  /* QUEUE size as parameter? */
2827                                       socket, /* cls */
2828                                       NULL, /* No inbound tunnel handler */
2829                                       NULL, /* No in-tunnel cleaner */
2830                                       client_message_handlers,
2831                                       ports); /* We don't get inbound tunnels */
2832   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
2833     {
2834       GNUNET_free (socket);
2835       return NULL;
2836     }
2837
2838   /* Now create the mesh tunnel to target */
2839   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2840               "Creating MESH Tunnel\n");
2841   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2842                                               NULL, /* Tunnel context */
2843                                               &mesh_peer_connect_callback,
2844                                               &mesh_peer_disconnect_callback,
2845                                               socket);
2846   GNUNET_assert (NULL != socket->tunnel);
2847   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2848                                         target);
2849   
2850   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2851               "%s() END\n", __func__);
2852   return socket;
2853 }
2854
2855
2856 /**
2857  * Shutdown the stream for reading or writing (similar to man 2 shutdown).
2858  *
2859  * @param socket the stream socket
2860  * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
2861  * @param completion_cb the callback that will be called upon successful
2862  *          shutdown of given operation
2863  * @param completion_cls the closure for the completion callback
2864  * @return the shutdown handle
2865  */
2866 struct GNUNET_STREAM_ShutdownHandle *
2867 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2868                         int operation,
2869                         GNUNET_STREAM_ShutdownCompletion completion_cb,
2870                         void *completion_cls)
2871 {
2872   struct GNUNET_STREAM_ShutdownHandle *handle;
2873   struct GNUNET_STREAM_MessageHeader *msg;
2874   
2875   GNUNET_assert (NULL == socket->shutdown_handle);
2876
2877   handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
2878   handle->socket = socket;
2879   handle->completion_cb = completion_cb;
2880   handle->completion_cls = completion_cls;
2881   socket->shutdown_handle = handle;
2882
2883   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2884   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2885   switch (operation)
2886     {
2887     case SHUT_RD:
2888       handle->operation = SHUT_RD;
2889       if (NULL != socket->read_handle)
2890         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2891                     "Existing read handle should be cancelled before shutting"
2892                     " down reading\n");
2893       msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
2894       queue_message (socket,
2895                      msg,
2896                      &set_state_receive_close_wait,
2897                      NULL);
2898       break;
2899     case SHUT_WR:
2900       handle->operation = SHUT_WR;
2901       if (NULL != socket->write_handle)
2902         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2903                     "Existing write handle should be cancelled before shutting"
2904                     " down writing\n");
2905       msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
2906       queue_message (socket,
2907                      msg,
2908                      &set_state_transmit_close_wait,
2909                      NULL);
2910       break;
2911     case SHUT_RDWR:
2912       handle->operation = SHUT_RDWR;
2913       if (NULL != socket->write_handle)
2914         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2915                     "Existing write handle should be cancelled before shutting"
2916                     " down writing\n");
2917       if (NULL != socket->read_handle)
2918         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2919                     "Existing read handle should be cancelled before shutting"
2920                     " down reading\n");
2921       msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
2922       queue_message (socket,
2923                      msg,
2924                      &set_state_close_wait,
2925                      NULL);
2926       break;
2927     default:
2928       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2929                   "GNUNET_STREAM_shutdown called with invalid value for "
2930                   "parameter operation -- Ignoring\n");
2931       GNUNET_free (msg);
2932       GNUNET_free (handle);
2933       return NULL;
2934     }
2935   handle->close_msg_retransmission_task_id =
2936     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2937                                   &close_msg_retransmission_task,
2938                                   handle);
2939   return handle;
2940 }
2941
2942
2943 /**
2944  * Cancels a pending shutdown
2945  *
2946  * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
2947  */
2948 void
2949 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
2950 {
2951   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
2952     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
2953   GNUNET_free (handle);
2954   return;
2955 }
2956
2957
2958 /**
2959  * Closes the stream
2960  *
2961  * @param socket the stream socket
2962  */
2963 void
2964 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
2965 {
2966   struct MessageQueue *head;
2967
2968   GNUNET_break (NULL == socket->read_handle);
2969   GNUNET_break (NULL == socket->write_handle);
2970
2971   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
2972     {
2973       /* socket closed with read task pending!? */
2974       GNUNET_break (0);
2975       GNUNET_SCHEDULER_cancel (socket->read_task_id);
2976       socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
2977     }
2978   
2979   /* Terminate the ack'ing tasks if they are still present */
2980   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
2981     {
2982       GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2983       socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2984     }
2985
2986   /* Clear Transmit handles */
2987   if (NULL != socket->transmit_handle)
2988     {
2989       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2990       socket->transmit_handle = NULL;
2991     }
2992   if (NULL != socket->ack_transmit_handle)
2993     {
2994       GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
2995       GNUNET_free (socket->ack_msg);
2996       socket->ack_msg = NULL;
2997       socket->ack_transmit_handle = NULL;
2998     }
2999
3000   /* Clear existing message queue */
3001   while (NULL != (head = socket->queue_head)) {
3002     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
3003                                  socket->queue_tail,
3004                                  head);
3005     GNUNET_free (head->message);
3006     GNUNET_free (head);
3007   }
3008
3009   /* Close associated tunnel */
3010   if (NULL != socket->tunnel)
3011     {
3012       GNUNET_MESH_tunnel_destroy (socket->tunnel);
3013       socket->tunnel = NULL;
3014     }
3015
3016   /* Close mesh connection */
3017   if (NULL != socket->mesh && NULL == socket->lsocket)
3018     {
3019       GNUNET_MESH_disconnect (socket->mesh);
3020       socket->mesh = NULL;
3021     }
3022   
3023   /* Release receive buffer */
3024   if (NULL != socket->receive_buffer)
3025     {
3026       GNUNET_free (socket->receive_buffer);
3027     }
3028
3029   GNUNET_free (socket);
3030 }
3031
3032
3033 /**
3034  * Listens for stream connections for a specific application ports
3035  *
3036  * @param cfg the configuration to use
3037  * @param app_port the application port for which new streams will be accepted
3038  * @param listen_cb this function will be called when a peer tries to establish
3039  *            a stream with us
3040  * @param listen_cb_cls closure for listen_cb
3041  * @return listen socket, NULL for any error
3042  */
3043 struct GNUNET_STREAM_ListenSocket *
3044 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
3045                       GNUNET_MESH_ApplicationType app_port,
3046                       GNUNET_STREAM_ListenCallback listen_cb,
3047                       void *listen_cb_cls)
3048 {
3049   /* FIXME: Add variable args for passing configration options? */
3050   struct GNUNET_STREAM_ListenSocket *lsocket;
3051   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
3052   struct GNUNET_PeerIdentity our_peer_id;
3053
3054   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
3055   lsocket->port = app_port;
3056   lsocket->listen_cb = listen_cb;
3057   lsocket->listen_cb_cls = listen_cb_cls;
3058   GNUNET_TESTING_get_peer_identity (cfg, &our_peer_id);
3059   lsocket->our_id = GNUNET_PEER_intern (&our_peer_id);
3060   lsocket->mesh = GNUNET_MESH_connect (cfg,
3061                                        10, /* FIXME: QUEUE size as parameter? */
3062                                        lsocket, /* Closure */
3063                                        &new_tunnel_notify,
3064                                        &tunnel_cleaner,
3065                                        server_message_handlers,
3066                                        ports);
3067   GNUNET_assert (NULL != lsocket->mesh);
3068   return lsocket;
3069 }
3070
3071
3072 /**
3073  * Closes the listen socket
3074  *
3075  * @param lsocket the listen socket
3076  */
3077 void
3078 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3079 {
3080   /* Close MESH connection */
3081   GNUNET_assert (NULL != lsocket->mesh);
3082   GNUNET_MESH_disconnect (lsocket->mesh);
3083   
3084   GNUNET_free (lsocket);
3085 }
3086
3087
3088 /**
3089  * Tries to write the given data to the stream. The maximum size of data that
3090  * can be written as part of a write operation is (64 * (64000 - sizeof (struct
3091  * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3092  * violation, however only the said number of maximum bytes will be written.
3093  *
3094  * @param socket the socket representing a stream
3095  * @param data the data buffer from where the data is written into the stream
3096  * @param size the number of bytes to be written from the data buffer
3097  * @param timeout the timeout period
3098  * @param write_cont the function to call upon writing some bytes into the
3099  *          stream 
3100  * @param write_cont_cls the closure
3101  *
3102  * @return handle to cancel the operation; if a previous write is pending or
3103  *           the stream has been shutdown for this operation then write_cont is
3104  *           immediately called and NULL is returned.
3105  */
3106 struct GNUNET_STREAM_IOWriteHandle *
3107 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3108                      const void *data,
3109                      size_t size,
3110                      struct GNUNET_TIME_Relative timeout,
3111                      GNUNET_STREAM_CompletionContinuation write_cont,
3112                      void *write_cont_cls)
3113 {
3114   unsigned int num_needed_packets;
3115   unsigned int packet;
3116   struct GNUNET_STREAM_IOWriteHandle *io_handle;
3117   uint32_t packet_size;
3118   uint32_t payload_size;
3119   struct GNUNET_STREAM_DataMessage *data_msg;
3120   const void *sweep;
3121   struct GNUNET_TIME_Relative ack_deadline;
3122
3123   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3124               "%s\n", __func__);
3125
3126   /* Return NULL if there is already a write request pending */
3127   if (NULL != socket->write_handle)
3128   {
3129     GNUNET_break (0);
3130     return NULL;
3131   }
3132
3133   switch (socket->state)
3134     {
3135     case STATE_TRANSMIT_CLOSED:
3136     case STATE_TRANSMIT_CLOSE_WAIT:
3137     case STATE_CLOSED:
3138     case STATE_CLOSE_WAIT:
3139       if (NULL != write_cont)
3140         write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3141       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3142                   "%s() END\n", __func__);
3143       return NULL;
3144     case STATE_INIT:
3145     case STATE_LISTEN:
3146     case STATE_HELLO_WAIT:
3147       if (NULL != write_cont)
3148         /* FIXME: GNUNET_STREAM_SYSERR?? */
3149         write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3150       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3151                   "%s() END\n", __func__);
3152       return NULL;
3153     case STATE_ESTABLISHED:
3154     case STATE_RECEIVE_CLOSED:
3155     case STATE_RECEIVE_CLOSE_WAIT:
3156       break;
3157     }
3158
3159   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
3160     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
3161   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
3162   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
3163   io_handle->socket = socket;
3164   io_handle->write_cont = write_cont;
3165   io_handle->write_cont_cls = write_cont_cls;
3166   io_handle->size = size;
3167   sweep = data;
3168   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3169      determined from RTT */
3170   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3171   /* Divide the given buffer into packets for sending */
3172   for (packet=0; packet < num_needed_packets; packet++)
3173     {
3174       if ((packet + 1) * max_payload_size < size) 
3175         {
3176           payload_size = max_payload_size;
3177           packet_size = MAX_PACKET_SIZE;
3178         }
3179       else 
3180         {
3181           payload_size = size - packet * max_payload_size;
3182           packet_size =  payload_size + sizeof (struct
3183                                                 GNUNET_STREAM_DataMessage); 
3184         }
3185       io_handle->messages[packet] = GNUNET_malloc (packet_size);
3186       io_handle->messages[packet]->header.header.size = htons (packet_size);
3187       io_handle->messages[packet]->header.header.type =
3188         htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3189       io_handle->messages[packet]->sequence_number =
3190         htonl (socket->write_sequence_number++);
3191       io_handle->messages[packet]->offset = htonl (socket->write_offset);
3192
3193       /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3194          determined from RTT */
3195       io_handle->messages[packet]->ack_deadline =
3196         GNUNET_TIME_relative_hton (ack_deadline);
3197       data_msg = io_handle->messages[packet];
3198       /* Copy data from given buffer to the packet */
3199       memcpy (&data_msg[1],
3200               sweep,
3201               payload_size);
3202       sweep += payload_size;
3203       socket->write_offset += payload_size;
3204     }
3205   socket->write_handle = io_handle;
3206   write_data (socket);
3207
3208   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3209               "%s() END\n", __func__);
3210
3211   return io_handle;
3212 }
3213
3214
3215
3216 /**
3217  * Tries to read data from the stream.
3218  *
3219  * @param socket the socket representing a stream
3220  * @param timeout the timeout period
3221  * @param proc function to call with data (once only)
3222  * @param proc_cls the closure for proc
3223  *
3224  * @return handle to cancel the operation; if the stream has been shutdown for
3225  *           this type of opeartion then the DataProcessor is immediately
3226  *           called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned
3227  */
3228 struct GNUNET_STREAM_IOReadHandle *
3229 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3230                     struct GNUNET_TIME_Relative timeout,
3231                     GNUNET_STREAM_DataProcessor proc,
3232                     void *proc_cls)
3233 {
3234   struct GNUNET_STREAM_IOReadHandle *read_handle;
3235   
3236   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3237               "%x: %s()\n", 
3238               socket->our_id,
3239               __func__);
3240
3241   /* Return NULL if there is already a read handle; the user has to cancel that
3242   first before continuing or has to wait until it is completed */
3243   if (NULL != socket->read_handle) return NULL;
3244
3245   GNUNET_assert (NULL != proc);
3246
3247   switch (socket->state)
3248     {
3249     case STATE_RECEIVE_CLOSED:
3250     case STATE_RECEIVE_CLOSE_WAIT:
3251     case STATE_CLOSED:
3252     case STATE_CLOSE_WAIT:
3253       proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3254       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3255                   "%x: %s() END\n",
3256                   socket->our_id,
3257                   __func__);
3258       return NULL;
3259     default:
3260       break;
3261     }
3262
3263   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
3264   read_handle->proc = proc;
3265   read_handle->proc_cls = proc_cls;
3266   socket->read_handle = read_handle;
3267
3268   /* Check if we have a packet at bitmap 0 */
3269   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3270                                           0))
3271     {
3272       socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3273                                                        socket);
3274    
3275     }
3276   
3277   /* Setup the read timeout task */
3278   socket->read_io_timeout_task_id =
3279     GNUNET_SCHEDULER_add_delayed (timeout,
3280                                   &read_io_timeout,
3281                                   socket);
3282   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3283               "%x: %s() END\n",
3284               socket->our_id,
3285               __func__);
3286   return read_handle;
3287 }
3288
3289
3290 /**
3291  * Cancel pending write operation.
3292  *
3293  * @param ioh handle to operation to cancel
3294  */
3295 void
3296 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3297 {
3298   struct GNUNET_STREAM_Socket *socket = ioh->socket;
3299   unsigned int packet;
3300
3301   GNUNET_assert (NULL != socket->write_handle);
3302   GNUNET_assert (socket->write_handle == ioh);
3303
3304   if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
3305     {
3306       GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
3307       socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
3308     }
3309
3310   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3311     {
3312       if (NULL == ioh->messages[packet]) break;
3313       GNUNET_free (ioh->messages[packet]);
3314     }
3315       
3316   GNUNET_free (socket->write_handle);
3317   socket->write_handle = NULL;
3318   return;
3319 }
3320
3321
3322 /**
3323  * Cancel pending read operation.
3324  *
3325  * @param ioh handle to operation to cancel
3326  */
3327 void
3328 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
3329 {
3330   return;
3331 }