-added write io cancellation
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Decrement PEER intern count during socket close and listen close to free the
27  * memory used for PEER interning
28  *
29  * Add code for write io timeout
30  **/
31
32 /**
33  * @file stream/stream_api.c
34  * @brief Implementation of the stream library
35  * @author Sree Harsha Totakura
36  */
37
38
39 #include "platform.h"
40 #include "gnunet_common.h"
41 #include "gnunet_crypto_lib.h"
42 #include "gnunet_stream_lib.h"
43 #include "gnunet_testing_lib.h"
44 #include "stream_protocol.h"
45
46
47 /**
48  * The maximum packet size of a stream packet
49  */
50 #define MAX_PACKET_SIZE 64000
51
52 /**
53  * Receive buffer
54  */
55 #define RECEIVE_BUFFER_SIZE 4096000
56
57 /**
58  * The maximum payload a data message packet can carry
59  */
60 static size_t max_payload_size = 
61   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
62
63 /**
64  * states in the Protocol
65  */
66 enum State
67   {
68     /**
69      * Client initialization state
70      */
71     STATE_INIT,
72
73     /**
74      * Listener initialization state 
75      */
76     STATE_LISTEN,
77
78     /**
79      * Pre-connection establishment state
80      */
81     STATE_HELLO_WAIT,
82
83     /**
84      * State where a connection has been established
85      */
86     STATE_ESTABLISHED,
87
88     /**
89      * State where the socket is closed on our side and waiting to be ACK'ed
90      */
91     STATE_RECEIVE_CLOSE_WAIT,
92
93     /**
94      * State where the socket is closed for reading
95      */
96     STATE_RECEIVE_CLOSED,
97
98     /**
99      * State where the socket is closed on our side and waiting to be ACK'ed
100      */
101     STATE_TRANSMIT_CLOSE_WAIT,
102
103     /**
104      * State where the socket is closed for writing
105      */
106     STATE_TRANSMIT_CLOSED,
107
108     /**
109      * State where the socket is closed on our side and waiting to be ACK'ed
110      */
111     STATE_CLOSE_WAIT,
112
113     /**
114      * State where the socket is closed
115      */
116     STATE_CLOSED 
117   };
118
119
120 /**
121  * Functions of this type are called when a message is written
122  *
123  * @param cls the closure from queue_message
124  * @param socket the socket the written message was bound to
125  */
126 typedef void (*SendFinishCallback) (void *cls,
127                                     struct GNUNET_STREAM_Socket *socket);
128
129
130 /**
131  * The send message queue
132  */
133 struct MessageQueue
134 {
135   /**
136    * The message
137    */
138   struct GNUNET_STREAM_MessageHeader *message;
139
140   /**
141    * Callback to be called when the message is sent
142    */
143   SendFinishCallback finish_cb;
144
145   /**
146    * The closure for finish_cb
147    */
148   void *finish_cb_cls;
149
150   /**
151    * The next message in queue. Should be NULL in the last message
152    */
153   struct MessageQueue *next;
154
155   /**
156    * The next message in queue. Should be NULL in the first message
157    */
158   struct MessageQueue *prev;
159 };
160
161
162 /**
163  * The STREAM Socket Handler
164  */
165 struct GNUNET_STREAM_Socket
166 {
167   /**
168    * Retransmission timeout
169    */
170   struct GNUNET_TIME_Relative retransmit_timeout;
171
172   /**
173    * The Acknowledgement Bitmap
174    */
175   GNUNET_STREAM_AckBitmap ack_bitmap;
176
177   /**
178    * Time when the Acknowledgement was queued
179    */
180   struct GNUNET_TIME_Absolute ack_time_registered;
181
182   /**
183    * Queued Acknowledgement deadline
184    */
185   struct GNUNET_TIME_Relative ack_time_deadline;
186
187   /**
188    * The mesh handle
189    */
190   struct GNUNET_MESH_Handle *mesh;
191
192   /**
193    * The mesh tunnel handle
194    */
195   struct GNUNET_MESH_Tunnel *tunnel;
196
197   /**
198    * Stream open closure
199    */
200   void *open_cls;
201
202   /**
203    * Stream open callback
204    */
205   GNUNET_STREAM_OpenCallback open_cb;
206
207   /**
208    * The current transmit handle (if a pending transmit request exists)
209    */
210   struct GNUNET_MESH_TransmitHandle *transmit_handle;
211
212   /**
213    * The current message associated with the transmit handle
214    */
215   struct MessageQueue *queue_head;
216
217   /**
218    * The queue tail, should always point to the last message in queue
219    */
220   struct MessageQueue *queue_tail;
221
222   /**
223    * The write IO_handle associated with this socket
224    */
225   struct GNUNET_STREAM_IOWriteHandle *write_handle;
226
227   /**
228    * The read IO_handle associated with this socket
229    */
230   struct GNUNET_STREAM_IOReadHandle *read_handle;
231
232   /**
233    * Buffer for storing received messages
234    */
235   void *receive_buffer;
236
237   /**
238    * The listen socket from which this socket is derived. Should be NULL if it
239    * is not a derived socket
240    */
241   struct GNUNET_STREAM_ListenSocket *lsocket;
242
243   /**
244    * Task identifier for the read io timeout task
245    */
246   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
247
248   /**
249    * Task identifier for retransmission task after timeout
250    */
251   GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
252
253   /**
254    * The task for sending timely Acks
255    */
256   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
257
258   /**
259    * Task scheduled to continue a read operation.
260    */
261   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
262
263   /**
264    * The state of the protocol associated with this socket
265    */
266   enum State state;
267
268   /**
269    * The status of the socket
270    */
271   enum GNUNET_STREAM_Status status;
272
273   /**
274    * The number of previous timeouts; FIXME: currently not used
275    */
276   unsigned int retries;
277
278   /**
279    * The peer identity of the peer at the other end of the stream
280    */
281   GNUNET_PEER_Id other_peer;
282
283   /**
284    * Our Peer Identity (for debugging)
285    */
286   GNUNET_PEER_Id our_id;
287
288   /**
289    * The application port number (type: uint32_t)
290    */
291   GNUNET_MESH_ApplicationType app_port;
292
293   /**
294    * The session id associated with this stream connection
295    * FIXME: Not used currently, may be removed
296    */
297   uint32_t session_id;
298
299   /**
300    * Write sequence number. Set to random when sending HELLO(client) and
301    * HELLO_ACK(server) 
302    */
303   uint32_t write_sequence_number;
304
305   /**
306    * Read sequence number. This number's value is determined during handshake
307    */
308   uint32_t read_sequence_number;
309
310   /**
311    * The receiver buffer size
312    */
313   uint32_t receive_buffer_size;
314
315   /**
316    * The receiver buffer boundaries
317    */
318   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
319
320   /**
321    * receiver's available buffer after the last acknowledged packet
322    */
323   uint32_t receiver_window_available;
324
325   /**
326    * The offset pointer used during write operation
327    */
328   uint32_t write_offset;
329
330   /**
331    * The offset after which we are expecting data
332    */
333   uint32_t read_offset;
334
335   /**
336    * The offset upto which user has read from the received buffer
337    */
338   uint32_t copy_offset;
339 };
340
341
342 /**
343  * A socket for listening
344  */
345 struct GNUNET_STREAM_ListenSocket
346 {
347   /**
348    * The mesh handle
349    */
350   struct GNUNET_MESH_Handle *mesh;
351
352   /**
353    * The callback function which is called after successful opening socket
354    */
355   GNUNET_STREAM_ListenCallback listen_cb;
356
357   /**
358    * The call back closure
359    */
360   void *listen_cb_cls;
361
362   /**
363    * Our interned Peer's identity
364    */
365   GNUNET_PEER_Id our_id;
366
367   /**
368    * The service port
369    * FIXME: Remove if not required!
370    */
371   GNUNET_MESH_ApplicationType port;
372 };
373
374
375 /**
376  * The IO Write Handle
377  */
378 struct GNUNET_STREAM_IOWriteHandle
379 {
380   /**
381    * The socket to which this write handle is associated
382    */
383   struct GNUNET_STREAM_Socket *socket;
384
385   /**
386    * The packet_buffers associated with this Handle
387    */
388   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
389
390   /**
391    * The write continuation callback
392    */
393   GNUNET_STREAM_CompletionContinuation write_cont;
394
395   /**
396    * Write continuation closure
397    */
398   void *write_cont_cls;
399
400   /**
401    * The bitmap of this IOHandle; Corresponding bit for a message is set when
402    * it has been acknowledged by the receiver
403    */
404   GNUNET_STREAM_AckBitmap ack_bitmap;
405
406   /**
407    * Number of bytes in this write handle
408    */
409   size_t size;
410 };
411
412
413 /**
414  * The IO Read Handle
415  */
416 struct GNUNET_STREAM_IOReadHandle
417 {
418   /**
419    * Callback for the read processor
420    */
421   GNUNET_STREAM_DataProcessor proc;
422
423   /**
424    * The closure pointer for the read processor callback
425    */
426   void *proc_cls;
427 };
428
429
430 /**
431  * Default value in seconds for various timeouts
432  */
433 static unsigned int default_timeout = 10;
434
435 /**
436  * Callback function for sending queued message
437  *
438  * @param cls closure the socket
439  * @param size number of bytes available in buf
440  * @param buf where the callee should write the message
441  * @return number of bytes written to buf
442  */
443 static size_t
444 send_message_notify (void *cls, size_t size, void *buf)
445 {
446   struct GNUNET_STREAM_Socket *socket = cls;
447   struct GNUNET_PeerIdentity target;
448   struct MessageQueue *head;
449   size_t ret;
450
451   socket->transmit_handle = NULL; /* Remove the transmit handle */
452   head = socket->queue_head;
453   if (NULL == head)
454     return 0; /* just to be safe */
455   GNUNET_PEER_resolve (socket->other_peer, &target);
456   if (0 == size)                /* request timed out */
457     {
458       socket->retries++;
459       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
460                   "Message sending timed out. Retry %d \n",
461                   socket->retries);
462       socket->transmit_handle = 
463         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
464                                            0, /* Corking */
465                                            1, /* Priority */
466                                            /* FIXME: exponential backoff */
467                                            socket->retransmit_timeout,
468                                            &target,
469                                            ntohs (head->message->header.size),
470                                            &send_message_notify,
471                                            socket);
472       return 0;
473     }
474
475   ret = ntohs (head->message->header.size);
476   GNUNET_assert (size >= ret);
477   memcpy (buf, head->message, ret);
478   if (NULL != head->finish_cb)
479     {
480       head->finish_cb (head->finish_cb_cls, socket);
481     }
482   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
483                                socket->queue_tail,
484                                head);
485   GNUNET_free (head->message);
486   GNUNET_free (head);
487   head = socket->queue_head;
488   if (NULL != head)    /* more pending messages to send */
489     {
490       socket->retries = 0;
491       socket->transmit_handle = 
492         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
493                                            0, /* Corking */
494                                            1, /* Priority */
495                                            /* FIXME: exponential backoff */
496                                            socket->retransmit_timeout,
497                                            &target,
498                                            ntohs (head->message->header.size),
499                                            &send_message_notify,
500                                            socket);
501     }
502   return ret;
503 }
504
505
506 /**
507  * Queues a message for sending using the mesh connection of a socket
508  *
509  * @param socket the socket whose mesh connection is used
510  * @param message the message to be sent
511  * @param finish_cb the callback to be called when the message is sent
512  * @param finish_cb_cls the closure for the callback
513  */
514 static void
515 queue_message (struct GNUNET_STREAM_Socket *socket,
516                struct GNUNET_STREAM_MessageHeader *message,
517                SendFinishCallback finish_cb,
518                void *finish_cb_cls)
519 {
520   struct MessageQueue *queue_entity;
521   struct GNUNET_PeerIdentity target;
522
523   GNUNET_assert 
524     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
525      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
526
527   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
528               "%x: Queueing message of type %d and size %d\n",
529               socket->our_id,
530               ntohs (message->header.type),
531               ntohs (message->header.size));
532   GNUNET_assert (NULL != message);
533   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
534   queue_entity->message = message;
535   queue_entity->finish_cb = finish_cb;
536   queue_entity->finish_cb_cls = finish_cb_cls;
537   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
538                                     socket->queue_tail,
539                                     queue_entity);
540   if (NULL == socket->transmit_handle)
541   {
542     socket->retries = 0;
543     GNUNET_PEER_resolve (socket->other_peer, &target);
544     socket->transmit_handle = 
545       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
546                                          0, /* Corking */
547                                          1, /* Priority */
548                                          socket->retransmit_timeout,
549                                          &target,
550                                          ntohs (message->header.size),
551                                          &send_message_notify,
552                                          socket);
553   }
554 }
555
556
557 /**
558  * Copies a message and queues it for sending using the mesh connection of
559  * given socket 
560  *
561  * @param socket the socket whose mesh connection is used
562  * @param message the message to be sent
563  * @param finish_cb the callback to be called when the message is sent
564  * @param finish_cb_cls the closure for the callback
565  */
566 static void
567 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
568                         const struct GNUNET_STREAM_MessageHeader *message,
569                         SendFinishCallback finish_cb,
570                         void *finish_cb_cls)
571 {
572   struct GNUNET_STREAM_MessageHeader *msg_copy;
573   uint16_t size;
574   
575   size = ntohs (message->header.size);
576   msg_copy = GNUNET_malloc (size);
577   memcpy (msg_copy, message, size);
578   queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
579 }
580
581
582 /**
583  * Callback function for sending ack message
584  *
585  * @param cls closure the ACK message created in ack_task
586  * @param size number of bytes available in buffer
587  * @param buf where the callee should write the message
588  * @return number of bytes written to buf
589  */
590 static size_t
591 send_ack_notify (void *cls, size_t size, void *buf)
592 {
593   struct GNUNET_STREAM_AckMessage *ack_msg = cls;
594
595   if (0 == size)
596     {
597       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
598                   "%s called with size 0\n", __func__);
599       return 0;
600     }
601   GNUNET_assert (ntohs (ack_msg->header.header.size) <= size);
602   
603   size = ntohs (ack_msg->header.header.size);
604   memcpy (buf, ack_msg, size);
605   return size;
606 }
607
608 /**
609  * Writes data using the given socket. The amount of data written is limited by
610  * the receiver_window_size
611  *
612  * @param socket the socket to use
613  */
614 static void 
615 write_data (struct GNUNET_STREAM_Socket *socket);
616
617 /**
618  * Task for retransmitting data messages if they aren't ACK before their ack
619  * deadline 
620  *
621  * @param cls the socket
622  * @param tc the Task context
623  */
624 static void
625 retransmission_timeout_task (void *cls,
626                              const struct GNUNET_SCHEDULER_TaskContext *tc)
627 {
628   struct GNUNET_STREAM_Socket *socket = cls;
629   
630   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
631     return;
632
633   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
634               "%x: Retransmitting DATA...\n", socket->our_id);
635   socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
636   write_data (socket);
637 }
638
639
640 /**
641  * Task for sending ACK message
642  *
643  * @param cls the socket
644  * @param tc the Task context
645  */
646 static void
647 ack_task (void *cls,
648           const struct GNUNET_SCHEDULER_TaskContext *tc)
649 {
650   struct GNUNET_STREAM_Socket *socket = cls;
651   struct GNUNET_STREAM_AckMessage *ack_msg;
652   struct GNUNET_PeerIdentity target;
653
654   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
655     {
656       return;
657     }
658
659   socket->ack_task_id = 0;
660
661   /* Create the ACK Message */
662   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
663   ack_msg->header.header.size = htons (sizeof (struct 
664                                                GNUNET_STREAM_AckMessage));
665   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
666   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
667   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
668   ack_msg->receive_window_remaining = 
669     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
670
671   GNUNET_PEER_resolve (socket->other_peer, &target);
672   /* Request MESH for sending ACK */
673   GNUNET_MESH_notify_transmit_ready (socket->tunnel,
674                                      0, /* Corking */
675                                      1, /* Priority */
676                                      socket->retransmit_timeout,
677                                      &target,
678                                      ntohs (ack_msg->header.header.size),
679                                      &send_ack_notify,
680                                      ack_msg);
681
682   
683 }
684
685
686 /**
687  * Function to modify a bit in GNUNET_STREAM_AckBitmap
688  *
689  * @param bitmap the bitmap to modify
690  * @param bit the bit number to modify
691  * @param value GNUNET_YES to on, GNUNET_NO to off
692  */
693 static void
694 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
695                       unsigned int bit, 
696                       int value)
697 {
698   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
699   if (GNUNET_YES == value)
700     *bitmap |= (1LL << bit);
701   else
702     *bitmap &= ~(1LL << bit);
703 }
704
705
706 /**
707  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
708  *
709  * @param bitmap address of the bitmap that has to be checked
710  * @param bit the bit number to check
711  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
712  */
713 static uint8_t
714 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
715                       unsigned int bit)
716 {
717   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
718   return 0 != (*bitmap & (1LL << bit));
719 }
720
721
722 /**
723  * Writes data using the given socket. The amount of data written is limited by
724  * the receiver_window_size
725  *
726  * @param socket the socket to use
727  */
728 static void 
729 write_data (struct GNUNET_STREAM_Socket *socket)
730 {
731   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
732   int packet;                   /* Although an int, should never be negative */
733   int ack_packet;
734
735   ack_packet = -1;
736   /* Find the last acknowledged packet */
737   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
738     {      
739       if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
740                                               packet))
741         ack_packet = packet;        
742       else if (NULL == io_handle->messages[packet])
743         break;
744     }
745   /* Resend packets which weren't ack'ed */
746   for (packet=0; packet < ack_packet; packet++)
747     {
748       if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
749                                              packet))
750         {
751           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
752                       "%x: Placing DATA message with sequence %u in send queue\n",
753                       socket->our_id,
754                       ntohl (io_handle->messages[packet]->sequence_number));
755
756           copy_and_queue_message (socket,
757                                   &io_handle->messages[packet]->header,
758                                   NULL,
759                                   NULL);
760         }
761     }
762   packet = ack_packet + 1;
763   /* Now send new packets if there is enough buffer space */
764   while ( (NULL != io_handle->messages[packet]) &&
765           (socket->receiver_window_available 
766            >= ntohs (io_handle->messages[packet]->header.header.size)) )
767     {
768       socket->receiver_window_available -= 
769         ntohs (io_handle->messages[packet]->header.header.size);
770       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
771                   "%x: Placing DATA message with sequence %u in send queue\n",
772                   socket->our_id,
773                   ntohl (io_handle->messages[packet]->sequence_number));
774       copy_and_queue_message (socket,
775                               &io_handle->messages[packet]->header,
776                               NULL,
777                               NULL);
778       packet++;
779     }
780
781   if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
782     socket->retransmission_timeout_task_id = 
783       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
784                                     (GNUNET_TIME_UNIT_SECONDS, 8),
785                                     &retransmission_timeout_task,
786                                     socket);
787 }
788
789
790 /**
791  * Task for calling the read processor
792  *
793  * @param cls the socket
794  * @param tc the task context
795  */
796 static void
797 call_read_processor (void *cls,
798                      const struct GNUNET_SCHEDULER_TaskContext *tc)
799 {
800   struct GNUNET_STREAM_Socket *socket = cls;
801   size_t read_size;
802   size_t valid_read_size;
803   unsigned int packet;
804   uint32_t sequence_increase;
805   uint32_t offset_increase;
806
807   socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
808   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
809     return;
810
811   GNUNET_assert (NULL != socket->read_handle);
812   GNUNET_assert (NULL != socket->read_handle->proc);
813
814   /* Check the bitmap for any holes */
815   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
816     {
817       if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
818                                              packet))
819         break;
820     }
821   /* We only call read processor if we have the first packet */
822   GNUNET_assert (0 < packet);
823
824   valid_read_size = 
825     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
826
827   GNUNET_assert (0 != valid_read_size);
828
829   /* Cancel the read_io_timeout_task */
830   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
831   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
832
833   /* Call the data processor */
834   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
835               "%x: Calling read processor\n",
836               socket->our_id);
837   read_size = 
838     socket->read_handle->proc (socket->read_handle->proc_cls,
839                                socket->status,
840                                socket->receive_buffer + socket->copy_offset,
841                                valid_read_size);
842   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
843               "%x: Read processor read %d bytes\n",
844               socket->our_id,
845               read_size);
846   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
847               "%x: Read processor completed successfully\n",
848               socket->our_id);
849
850   /* Free the read handle */
851   GNUNET_free (socket->read_handle);
852   socket->read_handle = NULL;
853
854   GNUNET_assert (read_size <= valid_read_size);
855   socket->copy_offset += read_size;
856
857   /* Determine upto which packet we can remove from the buffer */
858   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
859     {
860       if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
861         { packet++; break; }
862       if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
863         break;
864     }
865
866   /* If no packets can be removed we can't move the buffer */
867   if (0 == packet) return;
868
869   sequence_increase = packet;
870   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
871               "%x: Sequence increase after read processor completion: %u\n",
872               socket->our_id,
873               sequence_increase);
874
875   /* Shift the data in the receive buffer */
876   memmove (socket->receive_buffer,
877            socket->receive_buffer 
878            + socket->receive_buffer_boundaries[sequence_increase-1],
879            socket->receive_buffer_size
880            - socket->receive_buffer_boundaries[sequence_increase-1]);
881   
882   /* Shift the bitmap */
883   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
884   
885   /* Set read_sequence_number */
886   socket->read_sequence_number += sequence_increase;
887   
888   /* Set read_offset */
889   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
890   socket->read_offset += offset_increase;
891
892   /* Fix copy_offset */
893   GNUNET_assert (offset_increase <= socket->copy_offset);
894   socket->copy_offset -= offset_increase;
895   
896   /* Fix relative boundaries */
897   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
898     {
899       if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
900         {
901           socket->receive_buffer_boundaries[packet] = 
902             socket->receive_buffer_boundaries[packet + sequence_increase] 
903             - offset_increase;
904         }
905       else
906         socket->receive_buffer_boundaries[packet] = 0;
907     }
908 }
909
910
911 /**
912  * Cancels the existing read io handle
913  *
914  * @param cls the closure from the SCHEDULER call
915  * @param tc the task context
916  */
917 static void
918 read_io_timeout (void *cls, 
919                 const struct GNUNET_SCHEDULER_TaskContext *tc)
920 {
921   struct GNUNET_STREAM_Socket *socket = cls;
922   GNUNET_STREAM_DataProcessor proc;
923   void *proc_cls;
924
925   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
926   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
927   {
928     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
929                 "%x: Read task timedout - Cancelling it\n",
930                 socket->our_id);
931     GNUNET_SCHEDULER_cancel (socket->read_task_id);
932     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
933   }
934   GNUNET_assert (NULL != socket->read_handle);
935   proc = socket->read_handle->proc;
936   proc_cls = socket->read_handle->proc_cls;
937
938   GNUNET_free (socket->read_handle);
939   socket->read_handle = NULL;
940   /* Call the read processor to signal timeout */
941   proc (proc_cls,
942         GNUNET_STREAM_TIMEOUT,
943         NULL,
944         0);
945 }
946
947
948 /**
949  * Handler for DATA messages; Same for both client and server
950  *
951  * @param socket the socket through which the ack was received
952  * @param tunnel connection to the other end
953  * @param sender who sent the message
954  * @param msg the data message
955  * @param atsi performance data for the connection
956  * @return GNUNET_OK to keep the connection open,
957  *         GNUNET_SYSERR to close it (signal serious error)
958  */
959 static int
960 handle_data (struct GNUNET_STREAM_Socket *socket,
961              struct GNUNET_MESH_Tunnel *tunnel,
962              const struct GNUNET_PeerIdentity *sender,
963              const struct GNUNET_STREAM_DataMessage *msg,
964              const struct GNUNET_ATS_Information*atsi)
965 {
966   const void *payload;
967   uint32_t bytes_needed;
968   uint32_t relative_offset;
969   uint32_t relative_sequence_number;
970   uint16_t size;
971
972   size = htons (msg->header.header.size);
973   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
974     {
975       GNUNET_break_op (0);
976       return GNUNET_SYSERR;
977     }
978
979   if (GNUNET_PEER_search (sender) != socket->other_peer)
980     {
981       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
982                   "%x: Received DATA from non-confirming peer\n",
983                   socket->our_id);
984       return GNUNET_YES;
985     }
986
987   switch (socket->state)
988     {
989     case STATE_ESTABLISHED:
990     case STATE_TRANSMIT_CLOSED:
991     case STATE_TRANSMIT_CLOSE_WAIT:
992       
993       /* check if the message's sequence number is in the range we are
994          expecting */
995       relative_sequence_number = 
996         ntohl (msg->sequence_number) - socket->read_sequence_number;
997       if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
998         {
999           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1000                       "%x: Ignoring received message with sequence number %u\n",
1001                       socket->our_id,
1002                       ntohl (msg->sequence_number));
1003           /* Start ACK sending task if one is not already present */
1004           if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1005             {
1006               socket->ack_task_id = 
1007                 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1008                                               (msg->ack_deadline),
1009                                               &ack_task,
1010                                               socket);
1011             }
1012           return GNUNET_YES;
1013         }
1014       
1015       /* Check if we have already seen this message */
1016       if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1017                                               relative_sequence_number))
1018         {
1019           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1020                       "%x: Ignoring already received message with sequence "
1021                       "number %u\n",
1022                       socket->our_id,
1023                       ntohl (msg->sequence_number));
1024           /* Start ACK sending task if one is not already present */
1025           if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1026             {
1027               socket->ack_task_id = 
1028                 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1029                                               (msg->ack_deadline),
1030                                               &ack_task,
1031                                               socket);
1032             }
1033           return GNUNET_YES;
1034         }
1035
1036       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1037                   "%x: Receiving DATA with sequence number: %u and size: %d "
1038                   "from %x\n",
1039                   socket->our_id,
1040                   ntohl (msg->sequence_number),
1041                   ntohs (msg->header.header.size),
1042                   socket->other_peer);
1043
1044       /* Check if we have to allocate the buffer */
1045       size -= sizeof (struct GNUNET_STREAM_DataMessage);
1046       relative_offset = ntohl (msg->offset) - socket->read_offset;
1047       bytes_needed = relative_offset + size;
1048       if (bytes_needed > socket->receive_buffer_size)
1049         {
1050           if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1051             {
1052               socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1053                                                        bytes_needed);
1054               socket->receive_buffer_size = bytes_needed;
1055             }
1056           else
1057             {
1058               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1059                           "%x: Cannot accommodate packet %d as buffer is",
1060                           "full\n",
1061                           socket->our_id,
1062                           ntohl (msg->sequence_number));
1063               return GNUNET_YES;
1064             }
1065         }
1066       
1067       /* Copy Data to buffer */
1068       payload = &msg[1];
1069       GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1070       memcpy (socket->receive_buffer + relative_offset,
1071               payload,
1072               size);
1073       socket->receive_buffer_boundaries[relative_sequence_number] = 
1074         relative_offset + size;
1075       
1076       /* Modify the ACK bitmap */
1077       ackbitmap_modify_bit (&socket->ack_bitmap,
1078                             relative_sequence_number,
1079                             GNUNET_YES);
1080
1081       /* Start ACK sending task if one is not already present */
1082       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1083        {
1084          socket->ack_task_id = 
1085            GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1086                                          (msg->ack_deadline),
1087                                          &ack_task,
1088                                          socket);
1089        }
1090
1091       if ((NULL != socket->read_handle) /* A read handle is waiting */
1092           /* There is no current read task */
1093           && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1094           /* We have the first packet */
1095           && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1096                                                  0)))
1097         {
1098           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1099                       "%x: Scheduling read processor\n",
1100                       socket->our_id);
1101
1102           socket->read_task_id = 
1103             GNUNET_SCHEDULER_add_now (&call_read_processor,
1104                                       socket);
1105         }
1106       
1107       break;
1108
1109     default:
1110       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1111                   "%x: Received data message when it cannot be handled\n",
1112                   socket->our_id);
1113       break;
1114     }
1115   return GNUNET_YES;
1116 }
1117
1118
1119 /**
1120  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1121  *
1122  * @param cls the socket (set from GNUNET_MESH_connect)
1123  * @param tunnel connection to the other end
1124  * @param tunnel_ctx place to store local state associated with the tunnel
1125  * @param sender who sent the message
1126  * @param message the actual message
1127  * @param atsi performance data for the connection
1128  * @return GNUNET_OK to keep the connection open,
1129  *         GNUNET_SYSERR to close it (signal serious error)
1130  */
1131 static int
1132 client_handle_data (void *cls,
1133              struct GNUNET_MESH_Tunnel *tunnel,
1134              void **tunnel_ctx,
1135              const struct GNUNET_PeerIdentity *sender,
1136              const struct GNUNET_MessageHeader *message,
1137              const struct GNUNET_ATS_Information*atsi)
1138 {
1139   struct GNUNET_STREAM_Socket *socket = cls;
1140
1141   return handle_data (socket, 
1142                       tunnel, 
1143                       sender, 
1144                       (const struct GNUNET_STREAM_DataMessage *) message, 
1145                       atsi);
1146 }
1147
1148
1149 /**
1150  * Callback to set state to ESTABLISHED
1151  *
1152  * @param cls the closure from queue_message FIXME: document
1153  * @param socket the socket to requiring state change
1154  */
1155 static void
1156 set_state_established (void *cls,
1157                        struct GNUNET_STREAM_Socket *socket)
1158 {
1159   struct GNUNET_PeerIdentity initiator_pid;
1160
1161   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1162               "%x: Attaining ESTABLISHED state\n",
1163               socket->our_id);
1164   socket->write_offset = 0;
1165   socket->read_offset = 0;
1166   socket->state = STATE_ESTABLISHED;
1167   /* FIXME: What if listen_cb is NULL */
1168   if (NULL != socket->lsocket)
1169     {
1170       GNUNET_PEER_resolve (socket->other_peer, &initiator_pid);
1171       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1172                   "%x: Calling listen callback\n",
1173                   socket->our_id);
1174       if (GNUNET_SYSERR == 
1175           socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1176                                       socket,
1177                                       &initiator_pid))
1178         {
1179           socket->state = STATE_CLOSED;
1180           /* FIXME: We should close in a decent way */
1181           GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1182           GNUNET_free (socket);
1183         }
1184     }
1185   else if (socket->open_cb)
1186     socket->open_cb (socket->open_cls, socket);
1187 }
1188
1189
1190 /**
1191  * Callback to set state to HELLO_WAIT
1192  *
1193  * @param cls the closure from queue_message
1194  * @param socket the socket to requiring state change
1195  */
1196 static void
1197 set_state_hello_wait (void *cls,
1198                       struct GNUNET_STREAM_Socket *socket)
1199 {
1200   GNUNET_assert (STATE_INIT == socket->state);
1201   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1202               "%x: Attaining HELLO_WAIT state\n",
1203               socket->our_id);
1204   socket->state = STATE_HELLO_WAIT;
1205 }
1206
1207
1208 /**
1209  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1210  * socket
1211  *
1212  * @param socket the socket for which this HelloAckMessage has to be generated
1213  * @return the HelloAckMessage
1214  */
1215 static struct GNUNET_STREAM_HelloAckMessage *
1216 generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket)
1217 {
1218   struct GNUNET_STREAM_HelloAckMessage *msg;
1219
1220   /* Get the random sequence number */
1221   socket->write_sequence_number = 
1222     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1223   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1224               "%x: Generated write sequence number %u\n",
1225               socket->our_id,
1226               (unsigned int) socket->write_sequence_number);
1227   
1228   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1229   msg->header.header.size = 
1230     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1231   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1232   msg->sequence_number = htonl (socket->write_sequence_number);
1233   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1234
1235   return msg;
1236 }
1237
1238
1239 /**
1240  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1241  *
1242  * @param cls the socket (set from GNUNET_MESH_connect)
1243  * @param tunnel connection to the other end
1244  * @param tunnel_ctx this is NULL
1245  * @param sender who sent the message
1246  * @param message the actual message
1247  * @param atsi performance data for the connection
1248  * @return GNUNET_OK to keep the connection open,
1249  *         GNUNET_SYSERR to close it (signal serious error)
1250  */
1251 static int
1252 client_handle_hello_ack (void *cls,
1253                          struct GNUNET_MESH_Tunnel *tunnel,
1254                          void **tunnel_ctx,
1255                          const struct GNUNET_PeerIdentity *sender,
1256                          const struct GNUNET_MessageHeader *message,
1257                          const struct GNUNET_ATS_Information*atsi)
1258 {
1259   struct GNUNET_STREAM_Socket *socket = cls;
1260   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1261   struct GNUNET_STREAM_HelloAckMessage *reply;
1262
1263   if (GNUNET_PEER_search (sender) != socket->other_peer)
1264     {
1265       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1266                   "%x: Received HELLO_ACK from non-confirming peer\n",
1267                   socket->our_id);
1268       return GNUNET_YES;
1269     }
1270   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1271   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1272               "%x: Received HELLO_ACK from %x\n",
1273               socket->our_id,
1274               socket->other_peer);
1275
1276   GNUNET_assert (socket->tunnel == tunnel);
1277   switch (socket->state)
1278   {
1279   case STATE_HELLO_WAIT:
1280     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1281     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1282                 "%x: Read sequence number %u\n",
1283                 socket->our_id,
1284                 (unsigned int) socket->read_sequence_number);
1285     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1286     reply = generate_hello_ack_msg (socket);
1287     queue_message (socket,
1288                    &reply->header, 
1289                    &set_state_established, 
1290                    NULL);      
1291     return GNUNET_OK;
1292   case STATE_ESTABLISHED:
1293   case STATE_RECEIVE_CLOSE_WAIT:
1294     // call statistics (# ACKs ignored++)
1295     return GNUNET_OK;
1296   case STATE_INIT:
1297   default:
1298     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1299                 "%x: Server %x sent HELLO_ACK when in state %d\n", 
1300                 socket->our_id,
1301                 socket->other_peer,
1302                 socket->state);
1303     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1304     return GNUNET_SYSERR;
1305   }
1306
1307 }
1308
1309
1310 /**
1311  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1312  *
1313  * @param cls the socket (set from GNUNET_MESH_connect)
1314  * @param tunnel connection to the other end
1315  * @param tunnel_ctx this is NULL
1316  * @param sender who sent the message
1317  * @param message the actual message
1318  * @param atsi performance data for the connection
1319  * @return GNUNET_OK to keep the connection open,
1320  *         GNUNET_SYSERR to close it (signal serious error)
1321  */
1322 static int
1323 client_handle_reset (void *cls,
1324                      struct GNUNET_MESH_Tunnel *tunnel,
1325                      void **tunnel_ctx,
1326                      const struct GNUNET_PeerIdentity *sender,
1327                      const struct GNUNET_MessageHeader *message,
1328                      const struct GNUNET_ATS_Information*atsi)
1329 {
1330   struct GNUNET_STREAM_Socket *socket = cls;
1331
1332   return GNUNET_OK;
1333 }
1334
1335
1336 /**
1337  * Common message handler for handling TRANSMIT_CLOSE messages
1338  *
1339  * @param socket the socket through which the ack was received
1340  * @param tunnel connection to the other end
1341  * @param sender who sent the message
1342  * @param msg the transmit close message
1343  * @param atsi performance data for the connection
1344  * @return GNUNET_OK to keep the connection open,
1345  *         GNUNET_SYSERR to close it (signal serious error)
1346  */
1347 static int
1348 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1349                        struct GNUNET_MESH_Tunnel *tunnel,
1350                        const struct GNUNET_PeerIdentity *sender,
1351                        const struct GNUNET_STREAM_MessageHeader *msg,
1352                        const struct GNUNET_ATS_Information*atsi)
1353 {
1354   struct GNUNET_STREAM_MessageHeader *reply;
1355
1356   switch (socket->state)
1357     {
1358     case STATE_ESTABLISHED:
1359       socket->state = STATE_RECEIVE_CLOSED;
1360
1361       /* Send TRANSMIT_CLOSE_ACK */
1362       reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1363       reply->header.type = 
1364         htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1365       reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1366       queue_message (socket, reply, NULL, NULL);
1367       break;
1368
1369     default:
1370       /* FIXME: Call statistics? */
1371       break;
1372     }
1373   return GNUNET_YES;
1374 }
1375
1376
1377 /**
1378  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1379  *
1380  * @param cls the socket (set from GNUNET_MESH_connect)
1381  * @param tunnel connection to the other end
1382  * @param tunnel_ctx this is NULL
1383  * @param sender who sent the message
1384  * @param message the actual message
1385  * @param atsi performance data for the connection
1386  * @return GNUNET_OK to keep the connection open,
1387  *         GNUNET_SYSERR to close it (signal serious error)
1388  */
1389 static int
1390 client_handle_transmit_close (void *cls,
1391                               struct GNUNET_MESH_Tunnel *tunnel,
1392                               void **tunnel_ctx,
1393                               const struct GNUNET_PeerIdentity *sender,
1394                               const struct GNUNET_MessageHeader *message,
1395                               const struct GNUNET_ATS_Information*atsi)
1396 {
1397   struct GNUNET_STREAM_Socket *socket = cls;
1398   
1399   return handle_transmit_close (socket,
1400                                 tunnel,
1401                                 sender,
1402                                 (struct GNUNET_STREAM_MessageHeader *)message,
1403                                 atsi);
1404 }
1405
1406
1407 /**
1408  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1409  *
1410  * @param cls the socket (set from GNUNET_MESH_connect)
1411  * @param tunnel connection to the other end
1412  * @param tunnel_ctx this is NULL
1413  * @param sender who sent the message
1414  * @param message the actual message
1415  * @param atsi performance data for the connection
1416  * @return GNUNET_OK to keep the connection open,
1417  *         GNUNET_SYSERR to close it (signal serious error)
1418  */
1419 static int
1420 client_handle_transmit_close_ack (void *cls,
1421                                   struct GNUNET_MESH_Tunnel *tunnel,
1422                                   void **tunnel_ctx,
1423                                   const struct GNUNET_PeerIdentity *sender,
1424                                   const struct GNUNET_MessageHeader *message,
1425                                   const struct GNUNET_ATS_Information*atsi)
1426 {
1427   struct GNUNET_STREAM_Socket *socket = cls;
1428
1429   return GNUNET_OK;
1430 }
1431
1432
1433 /**
1434  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1435  *
1436  * @param cls the socket (set from GNUNET_MESH_connect)
1437  * @param tunnel connection to the other end
1438  * @param tunnel_ctx this is NULL
1439  * @param sender who sent the message
1440  * @param message the actual message
1441  * @param atsi performance data for the connection
1442  * @return GNUNET_OK to keep the connection open,
1443  *         GNUNET_SYSERR to close it (signal serious error)
1444  */
1445 static int
1446 client_handle_receive_close (void *cls,
1447                              struct GNUNET_MESH_Tunnel *tunnel,
1448                              void **tunnel_ctx,
1449                              const struct GNUNET_PeerIdentity *sender,
1450                              const struct GNUNET_MessageHeader *message,
1451                              const struct GNUNET_ATS_Information*atsi)
1452 {
1453   struct GNUNET_STREAM_Socket *socket = cls;
1454
1455   return GNUNET_OK;
1456 }
1457
1458
1459 /**
1460  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1461  *
1462  * @param cls the socket (set from GNUNET_MESH_connect)
1463  * @param tunnel connection to the other end
1464  * @param tunnel_ctx this is NULL
1465  * @param sender who sent the message
1466  * @param message the actual message
1467  * @param atsi performance data for the connection
1468  * @return GNUNET_OK to keep the connection open,
1469  *         GNUNET_SYSERR to close it (signal serious error)
1470  */
1471 static int
1472 client_handle_receive_close_ack (void *cls,
1473                                  struct GNUNET_MESH_Tunnel *tunnel,
1474                                  void **tunnel_ctx,
1475                                  const struct GNUNET_PeerIdentity *sender,
1476                                  const struct GNUNET_MessageHeader *message,
1477                                  const struct GNUNET_ATS_Information*atsi)
1478 {
1479   struct GNUNET_STREAM_Socket *socket = cls;
1480
1481   return GNUNET_OK;
1482 }
1483
1484
1485 /**
1486  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1487  *
1488  * @param cls the socket (set from GNUNET_MESH_connect)
1489  * @param tunnel connection to the other end
1490  * @param tunnel_ctx this is NULL
1491  * @param sender who sent the message
1492  * @param message the actual message
1493  * @param atsi performance data for the connection
1494  * @return GNUNET_OK to keep the connection open,
1495  *         GNUNET_SYSERR to close it (signal serious error)
1496  */
1497 static int
1498 client_handle_close (void *cls,
1499                      struct GNUNET_MESH_Tunnel *tunnel,
1500                      void **tunnel_ctx,
1501                      const struct GNUNET_PeerIdentity *sender,
1502                      const struct GNUNET_MessageHeader *message,
1503                      const struct GNUNET_ATS_Information*atsi)
1504 {
1505   struct GNUNET_STREAM_Socket *socket = cls;
1506
1507   return GNUNET_OK;
1508 }
1509
1510
1511 /**
1512  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1513  *
1514  * @param cls the socket (set from GNUNET_MESH_connect)
1515  * @param tunnel connection to the other end
1516  * @param tunnel_ctx this is NULL
1517  * @param sender who sent the message
1518  * @param message the actual message
1519  * @param atsi performance data for the connection
1520  * @return GNUNET_OK to keep the connection open,
1521  *         GNUNET_SYSERR to close it (signal serious error)
1522  */
1523 static int
1524 client_handle_close_ack (void *cls,
1525                          struct GNUNET_MESH_Tunnel *tunnel,
1526                          void **tunnel_ctx,
1527                          const struct GNUNET_PeerIdentity *sender,
1528                          const struct GNUNET_MessageHeader *message,
1529                          const struct GNUNET_ATS_Information*atsi)
1530 {
1531   struct GNUNET_STREAM_Socket *socket = cls;
1532
1533   return GNUNET_OK;
1534 }
1535
1536 /*****************************/
1537 /* Server's Message Handlers */
1538 /*****************************/
1539
1540 /**
1541  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1542  *
1543  * @param cls the closure
1544  * @param tunnel connection to the other end
1545  * @param tunnel_ctx the socket
1546  * @param sender who sent the message
1547  * @param message the actual message
1548  * @param atsi performance data for the connection
1549  * @return GNUNET_OK to keep the connection open,
1550  *         GNUNET_SYSERR to close it (signal serious error)
1551  */
1552 static int
1553 server_handle_data (void *cls,
1554                     struct GNUNET_MESH_Tunnel *tunnel,
1555                     void **tunnel_ctx,
1556                     const struct GNUNET_PeerIdentity *sender,
1557                     const struct GNUNET_MessageHeader *message,
1558                     const struct GNUNET_ATS_Information*atsi)
1559 {
1560   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1561
1562   return handle_data (socket,
1563                       tunnel,
1564                       sender,
1565                       (const struct GNUNET_STREAM_DataMessage *)message,
1566                       atsi);
1567 }
1568
1569
1570 /**
1571  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
1572  *
1573  * @param cls the closure
1574  * @param tunnel connection to the other end
1575  * @param tunnel_ctx the socket
1576  * @param sender who sent the message
1577  * @param message the actual message
1578  * @param atsi performance data for the connection
1579  * @return GNUNET_OK to keep the connection open,
1580  *         GNUNET_SYSERR to close it (signal serious error)
1581  */
1582 static int
1583 server_handle_hello (void *cls,
1584                      struct GNUNET_MESH_Tunnel *tunnel,
1585                      void **tunnel_ctx,
1586                      const struct GNUNET_PeerIdentity *sender,
1587                      const struct GNUNET_MessageHeader *message,
1588                      const struct GNUNET_ATS_Information*atsi)
1589 {
1590   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1591   struct GNUNET_STREAM_HelloAckMessage *reply;
1592
1593   if (GNUNET_PEER_search (sender) != socket->other_peer)
1594     {
1595       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1596                   "%x: Received HELLO from non-confirming peer\n",
1597                   socket->our_id);
1598       return GNUNET_YES;
1599     }
1600
1601   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == 
1602                  ntohs (message->type));
1603   GNUNET_assert (socket->tunnel == tunnel);
1604   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1605               "%x: Received HELLO from %x\n", 
1606               socket->our_id,
1607               socket->other_peer);
1608
1609   if (STATE_INIT == socket->state)
1610     {
1611       reply = generate_hello_ack_msg (socket);
1612       queue_message (socket, 
1613                      &reply->header,
1614                      &set_state_hello_wait, 
1615                      NULL);
1616     }
1617   else
1618     {
1619       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1620                   "Client sent HELLO when in state %d\n", socket->state);
1621       /* FIXME: Send RESET? */
1622       
1623     }
1624   return GNUNET_OK;
1625 }
1626
1627
1628 /**
1629  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1630  *
1631  * @param cls the closure
1632  * @param tunnel connection to the other end
1633  * @param tunnel_ctx the socket
1634  * @param sender who sent the message
1635  * @param message the actual message
1636  * @param atsi performance data for the connection
1637  * @return GNUNET_OK to keep the connection open,
1638  *         GNUNET_SYSERR to close it (signal serious error)
1639  */
1640 static int
1641 server_handle_hello_ack (void *cls,
1642                          struct GNUNET_MESH_Tunnel *tunnel,
1643                          void **tunnel_ctx,
1644                          const struct GNUNET_PeerIdentity *sender,
1645                          const struct GNUNET_MessageHeader *message,
1646                          const struct GNUNET_ATS_Information*atsi)
1647 {
1648   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1649   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
1650
1651   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
1652                  ntohs (message->type));
1653   GNUNET_assert (socket->tunnel == tunnel);
1654   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
1655   if (STATE_HELLO_WAIT == socket->state)
1656     {
1657       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1658                   "%x: Received HELLO_ACK from %x\n",
1659                   socket->our_id,
1660                   socket->other_peer);
1661       socket->read_sequence_number = ntohl (ack_message->sequence_number);
1662       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1663                   "%x: Read sequence number %u\n",
1664                   socket->our_id,
1665                   (unsigned int) socket->read_sequence_number);
1666       socket->receiver_window_available = 
1667         ntohl (ack_message->receiver_window_size);
1668       /* Attain ESTABLISHED state */
1669       set_state_established (NULL, socket);
1670     }
1671   else
1672     {
1673       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1674                   "Client sent HELLO_ACK when in state %d\n", socket->state);
1675       /* FIXME: Send RESET? */
1676       
1677     }
1678   return GNUNET_OK;
1679 }
1680
1681
1682 /**
1683  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1684  *
1685  * @param cls the closure
1686  * @param tunnel connection to the other end
1687  * @param tunnel_ctx the socket
1688  * @param sender who sent the message
1689  * @param message the actual message
1690  * @param atsi performance data for the connection
1691  * @return GNUNET_OK to keep the connection open,
1692  *         GNUNET_SYSERR to close it (signal serious error)
1693  */
1694 static int
1695 server_handle_reset (void *cls,
1696                      struct GNUNET_MESH_Tunnel *tunnel,
1697                      void **tunnel_ctx,
1698                      const struct GNUNET_PeerIdentity *sender,
1699                      const struct GNUNET_MessageHeader *message,
1700                      const struct GNUNET_ATS_Information*atsi)
1701 {
1702   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1703
1704   return GNUNET_OK;
1705 }
1706
1707
1708 /**
1709  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1710  *
1711  * @param cls the closure
1712  * @param tunnel connection to the other end
1713  * @param tunnel_ctx the socket
1714  * @param sender who sent the message
1715  * @param message the actual message
1716  * @param atsi performance data for the connection
1717  * @return GNUNET_OK to keep the connection open,
1718  *         GNUNET_SYSERR to close it (signal serious error)
1719  */
1720 static int
1721 server_handle_transmit_close (void *cls,
1722                               struct GNUNET_MESH_Tunnel *tunnel,
1723                               void **tunnel_ctx,
1724                               const struct GNUNET_PeerIdentity *sender,
1725                               const struct GNUNET_MessageHeader *message,
1726                               const struct GNUNET_ATS_Information*atsi)
1727 {
1728   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1729
1730   return handle_transmit_close (socket,
1731                                 tunnel,
1732                                 sender,
1733                                 (struct GNUNET_STREAM_MessageHeader *)message,
1734                                 atsi);
1735 }
1736
1737
1738 /**
1739  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1740  *
1741  * @param cls the closure
1742  * @param tunnel connection to the other end
1743  * @param tunnel_ctx the socket
1744  * @param sender who sent the message
1745  * @param message the actual message
1746  * @param atsi performance data for the connection
1747  * @return GNUNET_OK to keep the connection open,
1748  *         GNUNET_SYSERR to close it (signal serious error)
1749  */
1750 static int
1751 server_handle_transmit_close_ack (void *cls,
1752                                   struct GNUNET_MESH_Tunnel *tunnel,
1753                                   void **tunnel_ctx,
1754                                   const struct GNUNET_PeerIdentity *sender,
1755                                   const struct GNUNET_MessageHeader *message,
1756                                   const struct GNUNET_ATS_Information*atsi)
1757 {
1758   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1759
1760   return GNUNET_OK;
1761 }
1762
1763
1764 /**
1765  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1766  *
1767  * @param cls the closure
1768  * @param tunnel connection to the other end
1769  * @param tunnel_ctx the socket
1770  * @param sender who sent the message
1771  * @param message the actual message
1772  * @param atsi performance data for the connection
1773  * @return GNUNET_OK to keep the connection open,
1774  *         GNUNET_SYSERR to close it (signal serious error)
1775  */
1776 static int
1777 server_handle_receive_close (void *cls,
1778                              struct GNUNET_MESH_Tunnel *tunnel,
1779                              void **tunnel_ctx,
1780                              const struct GNUNET_PeerIdentity *sender,
1781                              const struct GNUNET_MessageHeader *message,
1782                              const struct GNUNET_ATS_Information*atsi)
1783 {
1784   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1785
1786   return GNUNET_OK;
1787 }
1788
1789
1790 /**
1791  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1792  *
1793  * @param cls the closure
1794  * @param tunnel connection to the other end
1795  * @param tunnel_ctx the socket
1796  * @param sender who sent the message
1797  * @param message the actual message
1798  * @param atsi performance data for the connection
1799  * @return GNUNET_OK to keep the connection open,
1800  *         GNUNET_SYSERR to close it (signal serious error)
1801  */
1802 static int
1803 server_handle_receive_close_ack (void *cls,
1804                                  struct GNUNET_MESH_Tunnel *tunnel,
1805                                  void **tunnel_ctx,
1806                                  const struct GNUNET_PeerIdentity *sender,
1807                                  const struct GNUNET_MessageHeader *message,
1808                                  const struct GNUNET_ATS_Information*atsi)
1809 {
1810   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1811
1812   return GNUNET_OK;
1813 }
1814
1815
1816 /**
1817  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1818  *
1819  * @param cls the closure
1820  * @param tunnel connection to the other end
1821  * @param tunnel_ctx the socket
1822  * @param sender who sent the message
1823  * @param message the actual message
1824  * @param atsi performance data for the connection
1825  * @return GNUNET_OK to keep the connection open,
1826  *         GNUNET_SYSERR to close it (signal serious error)
1827  */
1828 static int
1829 server_handle_close (void *cls,
1830                      struct GNUNET_MESH_Tunnel *tunnel,
1831                      void **tunnel_ctx,
1832                      const struct GNUNET_PeerIdentity *sender,
1833                      const struct GNUNET_MessageHeader *message,
1834                      const struct GNUNET_ATS_Information*atsi)
1835 {
1836   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1837
1838   return GNUNET_OK;
1839 }
1840
1841
1842 /**
1843  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1844  *
1845  * @param cls the closure
1846  * @param tunnel connection to the other end
1847  * @param tunnel_ctx the socket
1848  * @param sender who sent the message
1849  * @param message the actual message
1850  * @param atsi performance data for the connection
1851  * @return GNUNET_OK to keep the connection open,
1852  *         GNUNET_SYSERR to close it (signal serious error)
1853  */
1854 static int
1855 server_handle_close_ack (void *cls,
1856                          struct GNUNET_MESH_Tunnel *tunnel,
1857                          void **tunnel_ctx,
1858                          const struct GNUNET_PeerIdentity *sender,
1859                          const struct GNUNET_MessageHeader *message,
1860                          const struct GNUNET_ATS_Information*atsi)
1861 {
1862   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1863
1864   return GNUNET_OK;
1865 }
1866
1867
1868 /**
1869  * Message Handler for mesh
1870  *
1871  * @param socket the socket through which the ack was received
1872  * @param tunnel connection to the other end
1873  * @param sender who sent the message
1874  * @param ack the acknowledgment message
1875  * @param atsi performance data for the connection
1876  * @return GNUNET_OK to keep the connection open,
1877  *         GNUNET_SYSERR to close it (signal serious error)
1878  */
1879 static int
1880 handle_ack (struct GNUNET_STREAM_Socket *socket,
1881             struct GNUNET_MESH_Tunnel *tunnel,
1882             const struct GNUNET_PeerIdentity *sender,
1883             const struct GNUNET_STREAM_AckMessage *ack,
1884             const struct GNUNET_ATS_Information*atsi)
1885 {
1886   unsigned int packet;
1887   int need_retransmission;
1888   
1889
1890   if (GNUNET_PEER_search (sender) != socket->other_peer)
1891     {
1892       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1893                   "%x: Received ACK from non-confirming peer\n",
1894                   socket->our_id);
1895       return GNUNET_YES;
1896     }
1897
1898   switch (socket->state)
1899     {
1900     case (STATE_ESTABLISHED):
1901       if (NULL == socket->write_handle)
1902         {
1903           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1904                       "%x: Received DATA_ACK when write_handle is NULL\n",
1905                       socket->our_id);
1906           return GNUNET_OK;
1907         }
1908       /* FIXME: increment in the base sequence number is breaking current flow
1909        */
1910       if (!((socket->write_sequence_number 
1911              - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
1912         {
1913           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1914                       "%x: Received DATA_ACK with unexpected base sequence "
1915                       "number\n",
1916                       socket->our_id);
1917           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1918                       "%x: Current write sequence: %u; Ack's base sequence: %u\n",
1919                       socket->our_id,
1920                       socket->write_sequence_number,
1921                       ntohl (ack->base_sequence_number));
1922           return GNUNET_OK;
1923         }
1924       /* FIXME: include the case when write_handle is cancelled - ignore the 
1925          acks */
1926
1927       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1928                   "%x: Received DATA_ACK from %x\n",
1929                   socket->our_id,
1930                   socket->other_peer);
1931       
1932       /* Cancel the retransmission task */
1933       if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
1934         {
1935           GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
1936           socket->retransmission_timeout_task_id = 
1937             GNUNET_SCHEDULER_NO_TASK;
1938         }
1939
1940       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1941         {
1942           if (NULL == socket->write_handle->messages[packet]) break;
1943           if (ntohl (ack->base_sequence_number)
1944               >= ntohl (socket->write_handle->messages[packet]->sequence_number))
1945             ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
1946                                   packet,
1947                                   GNUNET_YES);
1948           else
1949             if (GNUNET_YES == 
1950                 ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
1951                                       ntohl (socket->write_handle->messages[packet]->sequence_number)
1952                                       - ntohl (ack->base_sequence_number)))
1953               ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
1954                                     packet,
1955                                     GNUNET_YES);
1956         }
1957
1958       /* Update the receive window remaining
1959        FIXME : Should update with the value from a data ack with greater
1960        sequence number */
1961       socket->receiver_window_available = 
1962         ntohl (ack->receive_window_remaining);
1963
1964       /* Check if we have received all acknowledgements */
1965       need_retransmission = GNUNET_NO;
1966       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1967         {
1968           if (NULL == socket->write_handle->messages[packet]) break;
1969           if (GNUNET_YES != ackbitmap_is_bit_set 
1970               (&socket->write_handle->ack_bitmap,packet))
1971             {
1972               need_retransmission = GNUNET_YES;
1973               break;
1974             }
1975         }
1976       if (GNUNET_YES == need_retransmission)
1977         {
1978           write_data (socket);
1979         }
1980       else      /* We have to call the write continuation callback now */
1981         {
1982           /* Free the packets */
1983           for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1984             {
1985               GNUNET_free_non_null (socket->write_handle->messages[packet]);
1986             }
1987           if (NULL != socket->write_handle->write_cont)
1988             socket->write_handle->write_cont
1989               (socket->write_handle->write_cont_cls,
1990                socket->status,
1991                socket->write_handle->size);
1992           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1993                       "%x: Write completion callback completed\n",
1994                       socket->our_id);
1995           /* We are done with the write handle - Freeing it */
1996           GNUNET_free (socket->write_handle);
1997           socket->write_handle = NULL;
1998         }
1999       break;
2000     default:
2001       break;
2002     }
2003   return GNUNET_OK;
2004 }
2005
2006
2007 /**
2008  * Message Handler for mesh
2009  *
2010  * @param cls the 'struct GNUNET_STREAM_Socket'
2011  * @param tunnel connection to the other end
2012  * @param tunnel_ctx unused
2013  * @param sender who sent the message
2014  * @param message the actual message
2015  * @param atsi performance data for the connection
2016  * @return GNUNET_OK to keep the connection open,
2017  *         GNUNET_SYSERR to close it (signal serious error)
2018  */
2019 static int
2020 client_handle_ack (void *cls,
2021                    struct GNUNET_MESH_Tunnel *tunnel,
2022                    void **tunnel_ctx,
2023                    const struct GNUNET_PeerIdentity *sender,
2024                    const struct GNUNET_MessageHeader *message,
2025                    const struct GNUNET_ATS_Information*atsi)
2026 {
2027   struct GNUNET_STREAM_Socket *socket = cls;
2028   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2029  
2030   return handle_ack (socket, tunnel, sender, ack, atsi);
2031 }
2032
2033
2034 /**
2035  * Message Handler for mesh
2036  *
2037  * @param cls the server's listen socket
2038  * @param tunnel connection to the other end
2039  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2040  * @param sender who sent the message
2041  * @param message the actual message
2042  * @param atsi performance data for the connection
2043  * @return GNUNET_OK to keep the connection open,
2044  *         GNUNET_SYSERR to close it (signal serious error)
2045  */
2046 static int
2047 server_handle_ack (void *cls,
2048                    struct GNUNET_MESH_Tunnel *tunnel,
2049                    void **tunnel_ctx,
2050                    const struct GNUNET_PeerIdentity *sender,
2051                    const struct GNUNET_MessageHeader *message,
2052                    const struct GNUNET_ATS_Information*atsi)
2053 {
2054   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2055   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2056  
2057   return handle_ack (socket, tunnel, sender, ack, atsi);
2058 }
2059
2060
2061 /**
2062  * For client message handlers, the stream socket is in the
2063  * closure argument.
2064  */
2065 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2066   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2067   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2068    sizeof (struct GNUNET_STREAM_AckMessage) },
2069   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2070    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2071   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2072    sizeof (struct GNUNET_STREAM_MessageHeader)},
2073   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2074    sizeof (struct GNUNET_STREAM_MessageHeader)},
2075   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2076    sizeof (struct GNUNET_STREAM_MessageHeader)},
2077   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2078    sizeof (struct GNUNET_STREAM_MessageHeader)},
2079   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2080    sizeof (struct GNUNET_STREAM_MessageHeader)},
2081   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2082    sizeof (struct GNUNET_STREAM_MessageHeader)},
2083   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2084    sizeof (struct GNUNET_STREAM_MessageHeader)},
2085   {NULL, 0, 0}
2086 };
2087
2088
2089 /**
2090  * For server message handlers, the stream socket is in the
2091  * tunnel context, and the listen socket in the closure argument.
2092  */
2093 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2094   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2095   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2096    sizeof (struct GNUNET_STREAM_AckMessage) },
2097   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2098    sizeof (struct GNUNET_STREAM_MessageHeader)},
2099   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2100    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2101   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2102    sizeof (struct GNUNET_STREAM_MessageHeader)},
2103   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2104    sizeof (struct GNUNET_STREAM_MessageHeader)},
2105   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2106    sizeof (struct GNUNET_STREAM_MessageHeader)},
2107   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2108    sizeof (struct GNUNET_STREAM_MessageHeader)},
2109   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2110    sizeof (struct GNUNET_STREAM_MessageHeader)},
2111   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2112    sizeof (struct GNUNET_STREAM_MessageHeader)},
2113   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2114    sizeof (struct GNUNET_STREAM_MessageHeader)},
2115   {NULL, 0, 0}
2116 };
2117
2118
2119 /**
2120  * Function called when our target peer is connected to our tunnel
2121  *
2122  * @param cls the socket for which this tunnel is created
2123  * @param peer the peer identity of the target
2124  * @param atsi performance data for the connection
2125  */
2126 static void
2127 mesh_peer_connect_callback (void *cls,
2128                             const struct GNUNET_PeerIdentity *peer,
2129                             const struct GNUNET_ATS_Information * atsi)
2130 {
2131   struct GNUNET_STREAM_Socket *socket = cls;
2132   struct GNUNET_STREAM_MessageHeader *message;
2133   GNUNET_PEER_Id connected_peer;
2134
2135   connected_peer = GNUNET_PEER_search (peer);
2136   
2137   if (connected_peer != socket->other_peer)
2138     {
2139       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2140                   "%x: A peer which is not our target has connected",
2141                   "to our tunnel\n",
2142                   socket->our_id);
2143       return;
2144     }
2145   
2146   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2147               "%x: Target peer %x connected\n", 
2148               socket->our_id,
2149               connected_peer);
2150   
2151   /* Set state to INIT */
2152   socket->state = STATE_INIT;
2153
2154   /* Send HELLO message */
2155   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2156   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
2157   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2158   queue_message (socket,
2159                  message,
2160                  &set_state_hello_wait,
2161                  NULL);
2162
2163   /* Call open callback */
2164   if (NULL == socket->open_cb)
2165     {
2166       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2167                   "STREAM_open callback is NULL\n");
2168     }
2169 }
2170
2171
2172 /**
2173  * Function called when our target peer is disconnected from our tunnel
2174  *
2175  * @param cls the socket associated which this tunnel
2176  * @param peer the peer identity of the target
2177  */
2178 static void
2179 mesh_peer_disconnect_callback (void *cls,
2180                                const struct GNUNET_PeerIdentity *peer)
2181 {
2182
2183 }
2184
2185
2186 /**
2187  * Method called whenever a peer creates a tunnel to us
2188  *
2189  * @param cls closure
2190  * @param tunnel new handle to the tunnel
2191  * @param initiator peer that started the tunnel
2192  * @param atsi performance information for the tunnel
2193  * @return initial tunnel context for the tunnel
2194  *         (can be NULL -- that's not an error)
2195  */
2196 static void *
2197 new_tunnel_notify (void *cls,
2198                    struct GNUNET_MESH_Tunnel *tunnel,
2199                    const struct GNUNET_PeerIdentity *initiator,
2200                    const struct GNUNET_ATS_Information *atsi)
2201 {
2202   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2203   struct GNUNET_STREAM_Socket *socket;
2204
2205   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2206      from the same peer again until the socket is closed */
2207
2208   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2209   socket->other_peer = GNUNET_PEER_intern (initiator);
2210   socket->tunnel = tunnel;
2211   socket->session_id = 0;       /* FIXME */
2212   socket->state = STATE_INIT;
2213   socket->lsocket = lsocket;
2214   socket->our_id = lsocket->our_id;
2215   
2216   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2217               "%x: Peer %x initiated tunnel to us\n", 
2218               socket->our_id,
2219               socket->other_peer);
2220   
2221   /* FIXME: Copy MESH handle from lsocket to socket */
2222   
2223   return socket;
2224 }
2225
2226
2227 /**
2228  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2229  * any associated state.  This function is NOT called if the client has
2230  * explicitly asked for the tunnel to be destroyed using
2231  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2232  * the tunnel.
2233  *
2234  * @param cls closure (set from GNUNET_MESH_connect)
2235  * @param tunnel connection to the other end (henceforth invalid)
2236  * @param tunnel_ctx place where local state associated
2237  *                   with the tunnel is stored
2238  */
2239 static void 
2240 tunnel_cleaner (void *cls,
2241                 const struct GNUNET_MESH_Tunnel *tunnel,
2242                 void *tunnel_ctx)
2243 {
2244   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2245   
2246   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2247               "%x: Peer %x has terminated connection abruptly\n",
2248               socket->our_id,
2249               socket->other_peer);
2250
2251   socket->status = GNUNET_STREAM_SHUTDOWN;
2252
2253   /* Clear Transmit handles */
2254   if (NULL != socket->transmit_handle)
2255     {
2256       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2257       socket->transmit_handle = NULL;
2258     }
2259   socket->tunnel = NULL;
2260 }
2261
2262
2263 /*****************/
2264 /* API functions */
2265 /*****************/
2266
2267
2268 /**
2269  * Tries to open a stream to the target peer
2270  *
2271  * @param cfg configuration to use
2272  * @param target the target peer to which the stream has to be opened
2273  * @param app_port the application port number which uniquely identifies this
2274  *            stream
2275  * @param open_cb this function will be called after stream has be established 
2276  * @param open_cb_cls the closure for open_cb
2277  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2278  * @return if successful it returns the stream socket; NULL if stream cannot be
2279  *         opened 
2280  */
2281 struct GNUNET_STREAM_Socket *
2282 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2283                     const struct GNUNET_PeerIdentity *target,
2284                     GNUNET_MESH_ApplicationType app_port,
2285                     GNUNET_STREAM_OpenCallback open_cb,
2286                     void *open_cb_cls,
2287                     ...)
2288 {
2289   struct GNUNET_STREAM_Socket *socket;
2290   struct GNUNET_PeerIdentity own_peer_id;
2291   enum GNUNET_STREAM_Option option;
2292   va_list vargs;                /* Variable arguments */
2293
2294   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2295               "%s\n", __func__);
2296
2297   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2298   socket->other_peer = GNUNET_PEER_intern (target);
2299   socket->open_cb = open_cb;
2300   socket->open_cls = open_cb_cls;
2301   GNUNET_TESTING_get_peer_identity (cfg, &own_peer_id);
2302   socket->our_id = GNUNET_PEER_intern (&own_peer_id);
2303   
2304   /* Set defaults */
2305   socket->retransmit_timeout = 
2306     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
2307
2308   va_start (vargs, open_cb_cls); /* Parse variable args */
2309   do {
2310     option = va_arg (vargs, enum GNUNET_STREAM_Option);
2311     switch (option)
2312       {
2313       case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2314         /* Expect struct GNUNET_TIME_Relative */
2315         socket->retransmit_timeout = va_arg (vargs,
2316                                              struct GNUNET_TIME_Relative);
2317         break;
2318       case GNUNET_STREAM_OPTION_END:
2319         break;
2320       }
2321   } while (GNUNET_STREAM_OPTION_END != option);
2322   va_end (vargs);               /* End of variable args parsing */
2323   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2324                                       10,  /* QUEUE size as parameter? */
2325                                       socket, /* cls */
2326                                       NULL, /* No inbound tunnel handler */
2327                                       &tunnel_cleaner, /* FIXME: not required? */
2328                                       client_message_handlers,
2329                                       &app_port); /* We don't get inbound tunnels */
2330   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
2331     {
2332       GNUNET_free (socket);
2333       return NULL;
2334     }
2335
2336   /* Now create the mesh tunnel to target */
2337   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2338               "Creating MESH Tunnel\n");
2339   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2340                                               NULL, /* Tunnel context */
2341                                               &mesh_peer_connect_callback,
2342                                               &mesh_peer_disconnect_callback,
2343                                               socket);
2344   GNUNET_assert (NULL != socket->tunnel);
2345   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2346                                         target);
2347   
2348   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2349               "%s() END\n", __func__);
2350   return socket;
2351 }
2352
2353
2354 /**
2355  * Shutdown the stream for reading or writing (man 2 shutdown).
2356  *
2357  * @param socket the stream socket
2358  * @param how SHUT_RD, SHUT_WR or SHUT_RDWR 
2359  */
2360 void
2361 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2362                         int how)
2363 {
2364   return;
2365 }
2366
2367
2368 /**
2369  * Closes the stream
2370  *
2371  * @param socket the stream socket
2372  */
2373 void
2374 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
2375 {
2376   struct MessageQueue *head;
2377
2378   GNUNET_break (NULL == socket->read_handle);
2379   GNUNET_break (NULL == socket->write_handle);
2380
2381   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
2382     {
2383       /* socket closed with read task pending!? */
2384       GNUNET_break (0);
2385       GNUNET_SCHEDULER_cancel (socket->read_task_id);
2386       socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
2387     }
2388   
2389   /* Terminate the ack'ing tasks if they are still present */
2390   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
2391     {
2392       GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2393       socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2394     }
2395
2396   /* Clear Transmit handles */
2397   if (NULL != socket->transmit_handle)
2398     {
2399       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2400       socket->transmit_handle = NULL;
2401     }
2402
2403   /* Clear existing message queue */
2404   while (NULL != (head = socket->queue_head)) {
2405     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2406                                  socket->queue_tail,
2407                                  head);
2408     GNUNET_free (head->message);
2409     GNUNET_free (head);
2410   }
2411
2412   /* Close associated tunnel */
2413   if (NULL != socket->tunnel)
2414     {
2415       GNUNET_MESH_tunnel_destroy (socket->tunnel);
2416       socket->tunnel = NULL;
2417     }
2418
2419   /* Close mesh connection */
2420   if (NULL != socket->mesh && NULL == socket->lsocket)
2421     {
2422       GNUNET_MESH_disconnect (socket->mesh);
2423       socket->mesh = NULL;
2424     }
2425   
2426   /* Release receive buffer */
2427   if (NULL != socket->receive_buffer)
2428     {
2429       GNUNET_free (socket->receive_buffer);
2430     }
2431
2432   GNUNET_free (socket);
2433 }
2434
2435
2436 /**
2437  * Listens for stream connections for a specific application ports
2438  *
2439  * @param cfg the configuration to use
2440  * @param app_port the application port for which new streams will be accepted
2441  * @param listen_cb this function will be called when a peer tries to establish
2442  *            a stream with us
2443  * @param listen_cb_cls closure for listen_cb
2444  * @return listen socket, NULL for any error
2445  */
2446 struct GNUNET_STREAM_ListenSocket *
2447 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
2448                       GNUNET_MESH_ApplicationType app_port,
2449                       GNUNET_STREAM_ListenCallback listen_cb,
2450                       void *listen_cb_cls)
2451 {
2452   /* FIXME: Add variable args for passing configration options? */
2453   struct GNUNET_STREAM_ListenSocket *lsocket;
2454   struct GNUNET_PeerIdentity our_peer_id;
2455
2456   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
2457   lsocket->port = app_port;
2458   lsocket->listen_cb = listen_cb;
2459   lsocket->listen_cb_cls = listen_cb_cls;
2460   GNUNET_TESTING_get_peer_identity (cfg, &our_peer_id);
2461   lsocket->our_id = GNUNET_PEER_intern (&our_peer_id);
2462   lsocket->mesh = GNUNET_MESH_connect (cfg,
2463                                        10, /* FIXME: QUEUE size as parameter? */
2464                                        lsocket, /* Closure */
2465                                        &new_tunnel_notify,
2466                                        &tunnel_cleaner,
2467                                        server_message_handlers,
2468                                        &app_port);
2469   GNUNET_assert (NULL != lsocket->mesh);
2470   return lsocket;
2471 }
2472
2473
2474 /**
2475  * Closes the listen socket
2476  *
2477  * @param lsocket the listen socket
2478  */
2479 void
2480 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
2481 {
2482   /* Close MESH connection */
2483   GNUNET_assert (NULL != lsocket->mesh);
2484   GNUNET_MESH_disconnect (lsocket->mesh);
2485   
2486   GNUNET_free (lsocket);
2487 }
2488
2489
2490 /**
2491  * Tries to write the given data to the stream
2492  *
2493  * @param socket the socket representing a stream
2494  * @param data the data buffer from where the data is written into the stream
2495  * @param size the number of bytes to be written from the data buffer
2496  * @param timeout the timeout period
2497  * @param write_cont the function to call upon writing some bytes into the stream
2498  * @param write_cont_cls the closure
2499  * @return handle to cancel the operation
2500  */
2501 struct GNUNET_STREAM_IOWriteHandle *
2502 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
2503                      const void *data,
2504                      size_t size,
2505                      struct GNUNET_TIME_Relative timeout,
2506                      GNUNET_STREAM_CompletionContinuation write_cont,
2507                      void *write_cont_cls)
2508 {
2509   unsigned int num_needed_packets;
2510   unsigned int packet;
2511   struct GNUNET_STREAM_IOWriteHandle *io_handle;
2512   uint32_t packet_size;
2513   uint32_t payload_size;
2514   struct GNUNET_STREAM_DataMessage *data_msg;
2515   const void *sweep;
2516   struct GNUNET_TIME_Relative ack_deadline;
2517
2518   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2519               "%s\n", __func__);
2520
2521   /* Return NULL if there is already a write request pending */
2522   if (NULL != socket->write_handle)
2523   {
2524     GNUNET_break (0);
2525     return NULL;
2526   }
2527   if (!((STATE_ESTABLISHED == socket->state)
2528         || (STATE_RECEIVE_CLOSE_WAIT == socket->state)
2529         || (STATE_RECEIVE_CLOSED == socket->state)))
2530     {
2531       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2532                   "%x: Attempting to write on a closed (OR) not-yet-established"
2533                   "stream\n",
2534                   socket->our_id);
2535       return NULL;
2536     } 
2537   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
2538     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
2539   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
2540   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
2541   io_handle->socket = socket;
2542   io_handle->write_cont = write_cont;
2543   io_handle->write_cont_cls = write_cont_cls;
2544   io_handle->size = size;
2545   sweep = data;
2546   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2547      determined from RTT */
2548   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
2549   /* Divide the given buffer into packets for sending */
2550   for (packet=0; packet < num_needed_packets; packet++)
2551     {
2552       if ((packet + 1) * max_payload_size < size) 
2553         {
2554           payload_size = max_payload_size;
2555           packet_size = MAX_PACKET_SIZE;
2556         }
2557       else 
2558         {
2559           payload_size = size - packet * max_payload_size;
2560           packet_size =  payload_size + sizeof (struct
2561                                                 GNUNET_STREAM_DataMessage); 
2562         }
2563       io_handle->messages[packet] = GNUNET_malloc (packet_size);
2564       io_handle->messages[packet]->header.header.size = htons (packet_size);
2565       io_handle->messages[packet]->header.header.type =
2566         htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
2567       io_handle->messages[packet]->sequence_number =
2568         htonl (socket->write_sequence_number++);
2569       io_handle->messages[packet]->offset = htonl (socket->write_offset);
2570
2571       /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2572          determined from RTT */
2573       io_handle->messages[packet]->ack_deadline =
2574         GNUNET_TIME_relative_hton (ack_deadline);
2575       data_msg = io_handle->messages[packet];
2576       /* Copy data from given buffer to the packet */
2577       memcpy (&data_msg[1],
2578               sweep,
2579               payload_size);
2580       sweep += payload_size;
2581       socket->write_offset += payload_size;
2582     }
2583   socket->write_handle = io_handle;
2584   write_data (socket);
2585
2586   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2587               "%s() END\n", __func__);
2588
2589   return io_handle;
2590 }
2591
2592
2593 /**
2594  * Tries to read data from the stream
2595  *
2596  * @param socket the socket representing a stream
2597  * @param timeout the timeout period
2598  * @param proc function to call with data (once only)
2599  * @param proc_cls the closure for proc
2600  * @return handle to cancel the operation
2601  */
2602 struct GNUNET_STREAM_IOReadHandle *
2603 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
2604                     struct GNUNET_TIME_Relative timeout,
2605                     GNUNET_STREAM_DataProcessor proc,
2606                     void *proc_cls)
2607 {
2608   struct GNUNET_STREAM_IOReadHandle *read_handle;
2609   
2610   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2611               "%x: %s()\n", 
2612               socket->our_id,
2613               __func__);
2614
2615   /* Return NULL if there is already a read handle; the user has to cancel that
2616   first before continuing or has to wait until it is completed */
2617   if (NULL != socket->read_handle) return NULL;
2618
2619   GNUNET_assert (NULL != proc);
2620
2621   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
2622   read_handle->proc = proc;
2623   read_handle->proc_cls = proc_cls;
2624   socket->read_handle = read_handle;
2625
2626   /* Check if we have a packet at bitmap 0 */
2627   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
2628                                           0))
2629     {
2630       socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
2631                                                        socket);
2632    
2633     }
2634   
2635   /* Setup the read timeout task */
2636   socket->read_io_timeout_task_id =
2637     GNUNET_SCHEDULER_add_delayed (timeout,
2638                                   &read_io_timeout,
2639                                   socket);
2640   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2641               "%x: %s() END\n",
2642               socket->our_id,
2643               __func__);
2644   return read_handle;
2645 }
2646
2647
2648 /**
2649  * Cancel pending write operation.
2650  *
2651  * @param ioh handle to operation to cancel
2652  */
2653 void
2654 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
2655 {
2656   struct GNUNET_STREAM_Socket *socket = ioh->socket;
2657   unsigned int packet;
2658
2659   GNUNET_assert (NULL != socket->write_handle);
2660   GNUNET_assert (socket->write_handle == ioh);
2661
2662   if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2663     {
2664       GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2665       socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
2666     }
2667
2668   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2669     {
2670       if (NULL == ioh->messages[packet]) break;
2671       GNUNET_free (ioh->messages[packet]);
2672     }
2673       
2674   GNUNET_free (socket->write_handle);
2675   socket->write_handle = NULL;
2676   return;
2677 }
2678
2679
2680 /**
2681  * Cancel pending read operation.
2682  *
2683  * @param ioh handle to operation to cancel
2684  */
2685 void
2686 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
2687 {
2688   return;
2689 }