8c25965f17ccb5b97429b579d2760f5954052b36
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Add code for write io timeout
27  *
28  * Include retransmission for control messages
29  **/
30
31 /**
32  * @file stream/stream_api.c
33  * @brief Implementation of the stream library
34  * @author Sree Harsha Totakura
35  */
36
37
38 #include "platform.h"
39 #include "gnunet_common.h"
40 #include "gnunet_crypto_lib.h"
41 #include "gnunet_stream_lib.h"
42 #include "stream_protocol.h"
43
44 #define LOG(kind,...)                                   \
45   GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
46
47 /**
48  * The maximum packet size of a stream packet
49  */
50 #define MAX_PACKET_SIZE 512//64000
51
52 /**
53  * Receive buffer
54  */
55 #define RECEIVE_BUFFER_SIZE 4096000
56
57 /**
58  * The maximum payload a data message packet can carry
59  */
60 static const size_t max_payload_size = 
61   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
62
63 /**
64  * states in the Protocol
65  */
66 enum State
67   {
68     /**
69      * Client initialization state
70      */
71     STATE_INIT,
72
73     /**
74      * Listener initialization state 
75      */
76     STATE_LISTEN,
77
78     /**
79      * Pre-connection establishment state
80      */
81     STATE_HELLO_WAIT,
82
83     /**
84      * State where a connection has been established
85      */
86     STATE_ESTABLISHED,
87
88     /**
89      * State where the socket is closed on our side and waiting to be ACK'ed
90      */
91     STATE_RECEIVE_CLOSE_WAIT,
92
93     /**
94      * State where the socket is closed for reading
95      */
96     STATE_RECEIVE_CLOSED,
97
98     /**
99      * State where the socket is closed on our side and waiting to be ACK'ed
100      */
101     STATE_TRANSMIT_CLOSE_WAIT,
102
103     /**
104      * State where the socket is closed for writing
105      */
106     STATE_TRANSMIT_CLOSED,
107
108     /**
109      * State where the socket is closed on our side and waiting to be ACK'ed
110      */
111     STATE_CLOSE_WAIT,
112
113     /**
114      * State where the socket is closed
115      */
116     STATE_CLOSED 
117   };
118
119
120 /**
121  * Functions of this type are called when a message is written
122  *
123  * @param cls the closure from queue_message
124  * @param socket the socket the written message was bound to
125  */
126 typedef void (*SendFinishCallback) (void *cls,
127                                     struct GNUNET_STREAM_Socket *socket);
128
129
130 /**
131  * The send message queue
132  */
133 struct MessageQueue
134 {
135   /**
136    * The message
137    */
138   struct GNUNET_STREAM_MessageHeader *message;
139
140   /**
141    * Callback to be called when the message is sent
142    */
143   SendFinishCallback finish_cb;
144
145   /**
146    * The closure for finish_cb
147    */
148   void *finish_cb_cls;
149
150   /**
151    * The next message in queue. Should be NULL in the last message
152    */
153   struct MessageQueue *next;
154
155   /**
156    * The next message in queue. Should be NULL in the first message
157    */
158   struct MessageQueue *prev;
159 };
160
161
162 /**
163  * The STREAM Socket Handler
164  */
165 struct GNUNET_STREAM_Socket
166 {
167   /**
168    * Retransmission timeout
169    */
170   struct GNUNET_TIME_Relative retransmit_timeout;
171
172   /**
173    * The Acknowledgement Bitmap
174    */
175   GNUNET_STREAM_AckBitmap ack_bitmap;
176
177   /**
178    * Time when the Acknowledgement was queued
179    */
180   struct GNUNET_TIME_Absolute ack_time_registered;
181
182   /**
183    * Queued Acknowledgement deadline
184    */
185   struct GNUNET_TIME_Relative ack_time_deadline;
186
187   /**
188    * The mesh handle
189    */
190   struct GNUNET_MESH_Handle *mesh;
191
192   /**
193    * The mesh tunnel handle
194    */
195   struct GNUNET_MESH_Tunnel *tunnel;
196
197   /**
198    * Stream open closure
199    */
200   void *open_cls;
201
202   /**
203    * Stream open callback
204    */
205   GNUNET_STREAM_OpenCallback open_cb;
206
207   /**
208    * The current transmit handle (if a pending transmit request exists)
209    */
210   struct GNUNET_MESH_TransmitHandle *transmit_handle;
211
212   /**
213    * The current act transmit handle (if a pending ack transmit request exists)
214    */
215   struct GNUNET_MESH_TransmitHandle *ack_transmit_handle;
216
217   /**
218    * Pointer to the current ack message using in ack_task
219    */
220   struct GNUNET_STREAM_AckMessage *ack_msg;
221
222   /**
223    * The current message associated with the transmit handle
224    */
225   struct MessageQueue *queue_head;
226
227   /**
228    * The queue tail, should always point to the last message in queue
229    */
230   struct MessageQueue *queue_tail;
231
232   /**
233    * The write IO_handle associated with this socket
234    */
235   struct GNUNET_STREAM_IOWriteHandle *write_handle;
236
237   /**
238    * The read IO_handle associated with this socket
239    */
240   struct GNUNET_STREAM_IOReadHandle *read_handle;
241
242   /**
243    * The shutdown handle associated with this socket
244    */
245   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
246
247   /**
248    * Buffer for storing received messages
249    */
250   void *receive_buffer;
251
252   /**
253    * The listen socket from which this socket is derived. Should be NULL if it
254    * is not a derived socket
255    */
256   struct GNUNET_STREAM_ListenSocket *lsocket;
257
258   /**
259    * The peer identity of the peer at the other end of the stream
260    */
261   struct GNUNET_PeerIdentity other_peer;
262
263   /**
264    * Task identifier for the read io timeout task
265    */
266   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
267
268   /**
269    * Task identifier for retransmission task after timeout
270    */
271   GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
272
273   /**
274    * The task for sending timely Acks
275    */
276   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
277
278   /**
279    * Task scheduled to continue a read operation.
280    */
281   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
282
283   /**
284    * The state of the protocol associated with this socket
285    */
286   enum State state;
287
288   /**
289    * The status of the socket
290    */
291   enum GNUNET_STREAM_Status status;
292
293   /**
294    * The number of previous timeouts; FIXME: currently not used
295    */
296   unsigned int retries;
297
298   /**
299    * The application port number (type: uint32_t)
300    */
301   GNUNET_MESH_ApplicationType app_port;
302
303   /**
304    * Whether testing mode is active or not
305    */
306   int testing_active;
307
308   /**
309    * The write sequence number to be set incase of testing
310    */
311   uint32_t testing_set_write_sequence_number_value;
312
313   /**
314    * The session id associated with this stream connection
315    * FIXME: Not used currently, may be removed
316    */
317   uint32_t session_id;
318
319   /**
320    * Write sequence number. Set to random when sending HELLO(client) and
321    * HELLO_ACK(server) 
322    */
323   uint32_t write_sequence_number;
324
325   /**
326    * Read sequence number. This number's value is determined during handshake
327    */
328   uint32_t read_sequence_number;
329
330   /**
331    * The receiver buffer size
332    */
333   uint32_t receive_buffer_size;
334
335   /**
336    * The receiver buffer boundaries
337    */
338   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
339
340   /**
341    * receiver's available buffer after the last acknowledged packet
342    */
343   uint32_t receiver_window_available;
344
345   /**
346    * The offset pointer used during write operation
347    */
348   uint32_t write_offset;
349
350   /**
351    * The offset after which we are expecting data
352    */
353   uint32_t read_offset;
354
355   /**
356    * The offset upto which user has read from the received buffer
357    */
358   uint32_t copy_offset;
359 };
360
361
362 /**
363  * A socket for listening
364  */
365 struct GNUNET_STREAM_ListenSocket
366 {
367   /**
368    * The mesh handle
369    */
370   struct GNUNET_MESH_Handle *mesh;
371
372   /**
373    * The callback function which is called after successful opening socket
374    */
375   GNUNET_STREAM_ListenCallback listen_cb;
376
377   /**
378    * The call back closure
379    */
380   void *listen_cb_cls;
381
382   /**
383    * The service port
384    * FIXME: Remove if not required!
385    */
386   GNUNET_MESH_ApplicationType port;
387
388   /**
389    * The retransmit timeout
390    */
391   struct GNUNET_TIME_Relative retransmit_timeout;
392   
393   /**
394    * Whether testing mode is active or not
395    */
396   int testing_active;
397
398   /**
399    * The write sequence number to be set incase of testing
400    */
401   uint32_t testing_set_write_sequence_number_value;
402 };
403
404
405 /**
406  * The IO Write Handle
407  */
408 struct GNUNET_STREAM_IOWriteHandle
409 {
410   /**
411    * The socket to which this write handle is associated
412    */
413   struct GNUNET_STREAM_Socket *socket;
414
415   /**
416    * The packet_buffers associated with this Handle
417    */
418   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
419
420   /**
421    * The write continuation callback
422    */
423   GNUNET_STREAM_CompletionContinuation write_cont;
424
425   /**
426    * Write continuation closure
427    */
428   void *write_cont_cls;
429
430   /**
431    * The bitmap of this IOHandle; Corresponding bit for a message is set when
432    * it has been acknowledged by the receiver
433    */
434   GNUNET_STREAM_AckBitmap ack_bitmap;
435
436   /**
437    * Number of bytes in this write handle
438    */
439   size_t size;
440 };
441
442
443 /**
444  * The IO Read Handle
445  */
446 struct GNUNET_STREAM_IOReadHandle
447 {
448   /**
449    * Callback for the read processor
450    */
451   GNUNET_STREAM_DataProcessor proc;
452
453   /**
454    * The closure pointer for the read processor callback
455    */
456   void *proc_cls;
457 };
458
459
460 /**
461  * Handle for Shutdown
462  */
463 struct GNUNET_STREAM_ShutdownHandle
464 {
465   /**
466    * The socket associated with this shutdown handle
467    */
468   struct GNUNET_STREAM_Socket *socket;
469
470   /**
471    * Shutdown completion callback
472    */
473   GNUNET_STREAM_ShutdownCompletion completion_cb;
474
475   /**
476    * Closure for completion callback
477    */
478   void *completion_cls;
479
480   /**
481    * Close message retransmission task id
482    */
483   GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
484
485   /**
486    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
487    */
488   int operation;  
489 };
490
491
492 /**
493  * Default value in seconds for various timeouts
494  */
495 static unsigned int default_timeout = 10;
496
497
498 /**
499  * Callback function for sending queued message
500  *
501  * @param cls closure the socket
502  * @param size number of bytes available in buf
503  * @param buf where the callee should write the message
504  * @return number of bytes written to buf
505  */
506 static size_t
507 send_message_notify (void *cls, size_t size, void *buf)
508 {
509   struct GNUNET_STREAM_Socket *socket = cls;
510   struct MessageQueue *head;
511   size_t ret;
512
513   socket->transmit_handle = NULL; /* Remove the transmit handle */
514   head = socket->queue_head;
515   if (NULL == head)
516     return 0; /* just to be safe */
517   if (0 == size)                /* request timed out */
518   {
519     socket->retries++;
520     LOG (GNUNET_ERROR_TYPE_DEBUG,
521          "%s: Message sending timed out. Retry %d \n",
522          GNUNET_i2s (&socket->other_peer),
523          socket->retries);
524     socket->transmit_handle = 
525       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
526                                          0, /* Corking */
527                                          1, /* Priority */
528                                          /* FIXME: exponential backoff */
529                                          socket->retransmit_timeout,
530                                          &socket->other_peer,
531                                          ntohs (head->message->header.size),
532                                          &send_message_notify,
533                                          socket);
534     return 0;
535   }
536
537   ret = ntohs (head->message->header.size);
538   GNUNET_assert (size >= ret);
539   memcpy (buf, head->message, ret);
540   if (NULL != head->finish_cb)
541   {
542     head->finish_cb (head->finish_cb_cls, socket);
543   }
544   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
545                                socket->queue_tail,
546                                head);
547   GNUNET_free (head->message);
548   GNUNET_free (head);
549   head = socket->queue_head;
550   if (NULL != head)    /* more pending messages to send */
551   {
552     socket->retries = 0;
553     socket->transmit_handle = 
554       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
555                                          0, /* Corking */
556                                          1, /* Priority */
557                                          /* FIXME: exponential backoff */
558                                          socket->retransmit_timeout,
559                                          &socket->other_peer,
560                                          ntohs (head->message->header.size),
561                                          &send_message_notify,
562                                          socket);
563   }
564   return ret;
565 }
566
567
568 /**
569  * Queues a message for sending using the mesh connection of a socket
570  *
571  * @param socket the socket whose mesh connection is used
572  * @param message the message to be sent
573  * @param finish_cb the callback to be called when the message is sent
574  * @param finish_cb_cls the closure for the callback
575  */
576 static void
577 queue_message (struct GNUNET_STREAM_Socket *socket,
578                struct GNUNET_STREAM_MessageHeader *message,
579                SendFinishCallback finish_cb,
580                void *finish_cb_cls)
581 {
582   struct MessageQueue *queue_entity;
583
584   GNUNET_assert 
585     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
586      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
587
588   LOG (GNUNET_ERROR_TYPE_DEBUG,
589        "%s: Queueing message of type %d and size %d\n",
590        GNUNET_i2s (&socket->other_peer),
591        ntohs (message->header.type),
592        ntohs (message->header.size));
593   GNUNET_assert (NULL != message);
594   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
595   queue_entity->message = message;
596   queue_entity->finish_cb = finish_cb;
597   queue_entity->finish_cb_cls = finish_cb_cls;
598   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
599                                     socket->queue_tail,
600                                     queue_entity);
601   if (NULL == socket->transmit_handle)
602   {
603     socket->retries = 0;
604     socket->transmit_handle = 
605       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
606                                          0, /* Corking */
607                                          1, /* Priority */
608                                          socket->retransmit_timeout,
609                                          &socket->other_peer,
610                                          ntohs (message->header.size),
611                                          &send_message_notify,
612                                          socket);
613   }
614 }
615
616
617 /**
618  * Copies a message and queues it for sending using the mesh connection of
619  * given socket 
620  *
621  * @param socket the socket whose mesh connection is used
622  * @param message the message to be sent
623  * @param finish_cb the callback to be called when the message is sent
624  * @param finish_cb_cls the closure for the callback
625  */
626 static void
627 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
628                         const struct GNUNET_STREAM_MessageHeader *message,
629                         SendFinishCallback finish_cb,
630                         void *finish_cb_cls)
631 {
632   struct GNUNET_STREAM_MessageHeader *msg_copy;
633   uint16_t size;
634   
635   size = ntohs (message->header.size);
636   msg_copy = GNUNET_malloc (size);
637   memcpy (msg_copy, message, size);
638   queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
639 }
640
641
642 /**
643  * Callback function for sending ack message
644  *
645  * @param cls closure the ACK message created in ack_task
646  * @param size number of bytes available in buffer
647  * @param buf where the callee should write the message
648  * @return number of bytes written to buf
649  */
650 static size_t
651 send_ack_notify (void *cls, size_t size, void *buf)
652 {
653   struct GNUNET_STREAM_Socket *socket = cls;
654
655   if (0 == size)
656   {
657     LOG (GNUNET_ERROR_TYPE_DEBUG,
658          "%s called with size 0\n", __func__);
659     return 0;
660   }
661   GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size);
662   
663   size = ntohs (socket->ack_msg->header.header.size);
664   memcpy (buf, socket->ack_msg, size);
665   
666   GNUNET_free (socket->ack_msg);
667   socket->ack_msg = NULL;
668   socket->ack_transmit_handle = NULL;
669   return size;
670 }
671
672 /**
673  * Writes data using the given socket. The amount of data written is limited by
674  * the receiver_window_size
675  *
676  * @param socket the socket to use
677  */
678 static void 
679 write_data (struct GNUNET_STREAM_Socket *socket);
680
681 /**
682  * Task for retransmitting data messages if they aren't ACK before their ack
683  * deadline 
684  *
685  * @param cls the socket
686  * @param tc the Task context
687  */
688 static void
689 retransmission_timeout_task (void *cls,
690                              const struct GNUNET_SCHEDULER_TaskContext *tc)
691 {
692   struct GNUNET_STREAM_Socket *socket = cls;
693   
694   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
695     return;
696
697   LOG (GNUNET_ERROR_TYPE_DEBUG,
698        "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
699   socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
700   write_data (socket);
701 }
702
703
704 /**
705  * Task for sending ACK message
706  *
707  * @param cls the socket
708  * @param tc the Task context
709  */
710 static void
711 ack_task (void *cls,
712           const struct GNUNET_SCHEDULER_TaskContext *tc)
713 {
714   struct GNUNET_STREAM_Socket *socket = cls;
715   struct GNUNET_STREAM_AckMessage *ack_msg;
716
717   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
718   {
719     return;
720   }
721   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
722   /* Create the ACK Message */
723   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
724   ack_msg->header.header.size = htons (sizeof (struct 
725                                                GNUNET_STREAM_AckMessage));
726   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
727   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
728   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
729   ack_msg->receive_window_remaining = 
730     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
731   socket->ack_msg = ack_msg;
732   /* Request MESH for sending ACK */
733   socket->ack_transmit_handle = 
734     GNUNET_MESH_notify_transmit_ready (socket->tunnel,
735                                        0, /* Corking */
736                                        1, /* Priority */
737                                        socket->retransmit_timeout,
738                                        &socket->other_peer,
739                                        ntohs (ack_msg->header.header.size),
740                                        &send_ack_notify,
741                                        socket);
742 }
743
744
745 /**
746  * Retransmission task for shutdown messages
747  *
748  * @param cls the shutdown handle
749  * @param tc the Task Context
750  */
751 static void
752 close_msg_retransmission_task (void *cls,
753                                const struct GNUNET_SCHEDULER_TaskContext *tc)
754 {
755   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
756   struct GNUNET_STREAM_MessageHeader *msg;
757   struct GNUNET_STREAM_Socket *socket;
758
759   GNUNET_assert (NULL != shutdown_handle);
760   socket = shutdown_handle->socket;
761
762   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
763   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
764   switch (shutdown_handle->operation)
765   {
766   case SHUT_RDWR:
767     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
768     break;
769   case SHUT_RD:
770     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
771     break;
772   case SHUT_WR:
773     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
774     break;
775   default:
776     GNUNET_free (msg);
777     shutdown_handle->close_msg_retransmission_task_id = 
778       GNUNET_SCHEDULER_NO_TASK;
779     return;
780   }
781   queue_message (socket, msg, NULL, NULL);
782   shutdown_handle->close_msg_retransmission_task_id =
783     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
784                                   &close_msg_retransmission_task,
785                                   shutdown_handle);
786 }
787
788
789 /**
790  * Function to modify a bit in GNUNET_STREAM_AckBitmap
791  *
792  * @param bitmap the bitmap to modify
793  * @param bit the bit number to modify
794  * @param value GNUNET_YES to on, GNUNET_NO to off
795  */
796 static void
797 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
798                       unsigned int bit, 
799                       int value)
800 {
801   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
802   if (GNUNET_YES == value)
803     *bitmap |= (1LL << bit);
804   else
805     *bitmap &= ~(1LL << bit);
806 }
807
808
809 /**
810  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
811  *
812  * @param bitmap address of the bitmap that has to be checked
813  * @param bit the bit number to check
814  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
815  */
816 static uint8_t
817 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
818                       unsigned int bit)
819 {
820   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
821   return 0 != (*bitmap & (1LL << bit));
822 }
823
824
825 /**
826  * Writes data using the given socket. The amount of data written is limited by
827  * the receiver_window_size
828  *
829  * @param socket the socket to use
830  */
831 static void 
832 write_data (struct GNUNET_STREAM_Socket *socket)
833 {
834   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
835   int packet;                   /* Although an int, should never be negative */
836   int ack_packet;
837
838   ack_packet = -1;
839   /* Find the last acknowledged packet */
840   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
841   {      
842     if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
843                                             packet))
844       ack_packet = packet;        
845     else if (NULL == io_handle->messages[packet])
846       break;
847   }
848   /* Resend packets which weren't ack'ed */
849   for (packet=0; packet < ack_packet; packet++)
850   {
851     if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
852                                            packet))
853     {
854       LOG (GNUNET_ERROR_TYPE_DEBUG,
855            "%s: Placing DATA message with sequence %u in send queue\n",
856            GNUNET_i2s (&socket->other_peer),
857            ntohl (io_handle->messages[packet]->sequence_number));
858       copy_and_queue_message (socket,
859                               &io_handle->messages[packet]->header,
860                               NULL,
861                               NULL);
862     }
863   }
864   packet = ack_packet + 1;
865   /* Now send new packets if there is enough buffer space */
866   while ( (NULL != io_handle->messages[packet]) &&
867           (socket->receiver_window_available 
868            >= ntohs (io_handle->messages[packet]->header.header.size)) &&
869           (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
870   {
871     socket->receiver_window_available -= 
872       ntohs (io_handle->messages[packet]->header.header.size);
873     LOG (GNUNET_ERROR_TYPE_DEBUG,
874          "%s: Placing DATA message with sequence %u in send queue\n",
875          GNUNET_i2s (&socket->other_peer),
876          ntohl (io_handle->messages[packet]->sequence_number));
877     copy_and_queue_message (socket,
878                             &io_handle->messages[packet]->header,
879                             NULL,
880                             NULL);
881     packet++;
882   }
883   if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
884     socket->retransmission_timeout_task_id = 
885       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
886                                     (GNUNET_TIME_UNIT_SECONDS, 8),
887                                     &retransmission_timeout_task,
888                                     socket);
889 }
890
891
892 /**
893  * Task for calling the read processor
894  *
895  * @param cls the socket
896  * @param tc the task context
897  */
898 static void
899 call_read_processor (void *cls,
900                      const struct GNUNET_SCHEDULER_TaskContext *tc)
901 {
902   struct GNUNET_STREAM_Socket *socket = cls;
903   size_t read_size;
904   size_t valid_read_size;
905   unsigned int packet;
906   uint32_t sequence_increase;
907   uint32_t offset_increase;
908
909   socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
910   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
911     return;
912
913   if (NULL == socket->receive_buffer) 
914     return;
915
916   GNUNET_assert (NULL != socket->read_handle);
917   GNUNET_assert (NULL != socket->read_handle->proc);
918
919   /* Check the bitmap for any holes */
920   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
921   {
922     if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
923                                            packet))
924       break;
925   }
926   /* We only call read processor if we have the first packet */
927   GNUNET_assert (0 < packet);
928   valid_read_size = 
929     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
930   GNUNET_assert (0 != valid_read_size);
931   /* Cancel the read_io_timeout_task */
932   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
933   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
934   /* Call the data processor */
935   LOG (GNUNET_ERROR_TYPE_DEBUG,
936        "%s: Calling read processor\n",
937        GNUNET_i2s (&socket->other_peer));
938   read_size = 
939     socket->read_handle->proc (socket->read_handle->proc_cls,
940                                socket->status,
941                                socket->receive_buffer + socket->copy_offset,
942                                valid_read_size);
943   LOG (GNUNET_ERROR_TYPE_DEBUG,
944        "%s: Read processor read %d bytes\n",
945        GNUNET_i2s (&socket->other_peer), read_size);
946   LOG (GNUNET_ERROR_TYPE_DEBUG,
947        "%s: Read processor completed successfully\n",
948        GNUNET_i2s (&socket->other_peer));
949   /* Free the read handle */
950   GNUNET_free (socket->read_handle);
951   socket->read_handle = NULL;
952   GNUNET_assert (read_size <= valid_read_size);
953   socket->copy_offset += read_size;
954   /* Determine upto which packet we can remove from the buffer */
955   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
956   {
957     if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
958     { packet++; break; }
959     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
960       break;
961   }
962
963   /* If no packets can be removed we can't move the buffer */
964   if (0 == packet) return;
965   sequence_increase = packet;
966   LOG (GNUNET_ERROR_TYPE_DEBUG,
967        "%s: Sequence increase after read processor completion: %u\n",
968        GNUNET_i2s (&socket->other_peer), sequence_increase);
969
970   /* Shift the data in the receive buffer */
971   socket->receive_buffer = 
972     memmove (socket->receive_buffer,
973              socket->receive_buffer 
974              + socket->receive_buffer_boundaries[sequence_increase-1],
975              socket->receive_buffer_size
976              - socket->receive_buffer_boundaries[sequence_increase-1]);
977   /* Shift the bitmap */
978   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
979   /* Set read_sequence_number */
980   socket->read_sequence_number += sequence_increase;
981   /* Set read_offset */
982   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
983   socket->read_offset += offset_increase;
984   /* Fix copy_offset */
985   GNUNET_assert (offset_increase <= socket->copy_offset);
986   socket->copy_offset -= offset_increase;
987   /* Fix relative boundaries */
988   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
989   {
990     if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
991     {
992       uint32_t ahead_buffer_boundary;
993
994       ahead_buffer_boundary = 
995         socket->receive_buffer_boundaries[packet + sequence_increase];
996       if (0 == ahead_buffer_boundary)
997         socket->receive_buffer_boundaries[packet] = 0;
998       else
999       {
1000         GNUNET_assert (offset_increase < ahead_buffer_boundary);
1001         socket->receive_buffer_boundaries[packet] = 
1002           ahead_buffer_boundary - offset_increase;
1003       }
1004     }
1005     else
1006       socket->receive_buffer_boundaries[packet] = 0;
1007   }
1008 }
1009
1010
1011 /**
1012  * Cancels the existing read io handle
1013  *
1014  * @param cls the closure from the SCHEDULER call
1015  * @param tc the task context
1016  */
1017 static void
1018 read_io_timeout (void *cls, 
1019                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1020 {
1021   struct GNUNET_STREAM_Socket *socket = cls;
1022   GNUNET_STREAM_DataProcessor proc;
1023   void *proc_cls;
1024
1025   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1026   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1027   {
1028     LOG (GNUNET_ERROR_TYPE_DEBUG,
1029          "%s: Read task timedout - Cancelling it\n",
1030          GNUNET_i2s (&socket->other_peer));
1031     GNUNET_SCHEDULER_cancel (socket->read_task_id);
1032     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1033   }
1034   GNUNET_assert (NULL != socket->read_handle);
1035   proc = socket->read_handle->proc;
1036   proc_cls = socket->read_handle->proc_cls;
1037
1038   GNUNET_free (socket->read_handle);
1039   socket->read_handle = NULL;
1040   /* Call the read processor to signal timeout */
1041   proc (proc_cls,
1042         GNUNET_STREAM_TIMEOUT,
1043         NULL,
1044         0);
1045 }
1046
1047
1048 /**
1049  * Handler for DATA messages; Same for both client and server
1050  *
1051  * @param socket the socket through which the ack was received
1052  * @param tunnel connection to the other end
1053  * @param sender who sent the message
1054  * @param msg the data message
1055  * @param atsi performance data for the connection
1056  * @return GNUNET_OK to keep the connection open,
1057  *         GNUNET_SYSERR to close it (signal serious error)
1058  */
1059 static int
1060 handle_data (struct GNUNET_STREAM_Socket *socket,
1061              struct GNUNET_MESH_Tunnel *tunnel,
1062              const struct GNUNET_PeerIdentity *sender,
1063              const struct GNUNET_STREAM_DataMessage *msg,
1064              const struct GNUNET_ATS_Information*atsi)
1065 {
1066   const void *payload;
1067   uint32_t bytes_needed;
1068   uint32_t relative_offset;
1069   uint32_t relative_sequence_number;
1070   uint16_t size;
1071
1072   size = htons (msg->header.header.size);
1073   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1074   {
1075     GNUNET_break_op (0);
1076     return GNUNET_SYSERR;
1077   }
1078
1079   if (0 != memcmp (sender,
1080                    &socket->other_peer,
1081                    sizeof (struct GNUNET_PeerIdentity)))
1082   {
1083     LOG (GNUNET_ERROR_TYPE_DEBUG,
1084          "%s: Received DATA from non-confirming peer\n",
1085          GNUNET_i2s (&socket->other_peer));
1086     return GNUNET_YES;
1087   }
1088
1089   switch (socket->state)
1090   {
1091   case STATE_ESTABLISHED:
1092   case STATE_TRANSMIT_CLOSED:
1093   case STATE_TRANSMIT_CLOSE_WAIT:
1094       
1095     /* check if the message's sequence number is in the range we are
1096        expecting */
1097     relative_sequence_number = 
1098       ntohl (msg->sequence_number) - socket->read_sequence_number;
1099     if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1100     {
1101       LOG (GNUNET_ERROR_TYPE_DEBUG,
1102            "%s: Ignoring received message with sequence number %u\n",
1103            GNUNET_i2s (&socket->other_peer),
1104            ntohl (msg->sequence_number));
1105       /* Start ACK sending task if one is not already present */
1106       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1107       {
1108         socket->ack_task_id = 
1109           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1110                                         (msg->ack_deadline),
1111                                         &ack_task,
1112                                         socket);
1113       }
1114       return GNUNET_YES;
1115     }
1116       
1117     /* Check if we have already seen this message */
1118     if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1119                                             relative_sequence_number))
1120     {
1121       LOG (GNUNET_ERROR_TYPE_DEBUG,
1122            "%s: Ignoring already received message with sequence number %u\n",
1123            GNUNET_i2s (&socket->other_peer),
1124            ntohl (msg->sequence_number));
1125       /* Start ACK sending task if one is not already present */
1126       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1127       {
1128         socket->ack_task_id = 
1129           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1130                                         (msg->ack_deadline),
1131                                         &ack_task,
1132                                         socket);
1133       }
1134       return GNUNET_YES;
1135     }
1136
1137     LOG (GNUNET_ERROR_TYPE_DEBUG,
1138          "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
1139          GNUNET_i2s (&socket->other_peer),
1140          ntohl (msg->sequence_number),
1141          ntohs (msg->header.header.size),
1142          GNUNET_i2s (&socket->other_peer));
1143       
1144     /* Check if we have to allocate the buffer */
1145     size -= sizeof (struct GNUNET_STREAM_DataMessage);
1146     relative_offset = ntohl (msg->offset) - socket->read_offset;
1147     bytes_needed = relative_offset + size;
1148     if (bytes_needed > socket->receive_buffer_size)
1149     {
1150       if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1151       {
1152         socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1153                                                  bytes_needed);
1154         socket->receive_buffer_size = bytes_needed;
1155       }
1156       else
1157       {
1158         LOG (GNUNET_ERROR_TYPE_DEBUG,
1159              "%s: Cannot accommodate packet %d as buffer is full\n",
1160              GNUNET_i2s (&socket->other_peer),
1161              ntohl (msg->sequence_number));
1162         return GNUNET_YES;
1163       }
1164     }
1165       
1166     /* Copy Data to buffer */
1167     payload = &msg[1];
1168     GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1169     memcpy (socket->receive_buffer + relative_offset,
1170             payload,
1171             size);
1172     socket->receive_buffer_boundaries[relative_sequence_number] = 
1173       relative_offset + size;
1174       
1175     /* Modify the ACK bitmap */
1176     ackbitmap_modify_bit (&socket->ack_bitmap,
1177                           relative_sequence_number,
1178                           GNUNET_YES);
1179
1180     /* Start ACK sending task if one is not already present */
1181     if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1182     {
1183       socket->ack_task_id = 
1184         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1185                                       (msg->ack_deadline),
1186                                       &ack_task,
1187                                       socket);
1188     }
1189
1190     if ((NULL != socket->read_handle) /* A read handle is waiting */
1191         /* There is no current read task */
1192         && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1193         /* We have the first packet */
1194         && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1195                                                0)))
1196     {
1197       LOG (GNUNET_ERROR_TYPE_DEBUG,
1198            "%s: Scheduling read processor\n",
1199            GNUNET_i2s (&socket->other_peer));
1200           
1201       socket->read_task_id = 
1202         GNUNET_SCHEDULER_add_now (&call_read_processor,
1203                                   socket);
1204     }
1205       
1206     break;
1207
1208   default:
1209     LOG (GNUNET_ERROR_TYPE_DEBUG,
1210          "%s: Received data message when it cannot be handled\n",
1211          GNUNET_i2s (&socket->other_peer));
1212     break;
1213   }
1214   return GNUNET_YES;
1215 }
1216
1217
1218 /**
1219  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1220  *
1221  * @param cls the socket (set from GNUNET_MESH_connect)
1222  * @param tunnel connection to the other end
1223  * @param tunnel_ctx place to store local state associated with the tunnel
1224  * @param sender who sent the message
1225  * @param message the actual message
1226  * @param atsi performance data for the connection
1227  * @return GNUNET_OK to keep the connection open,
1228  *         GNUNET_SYSERR to close it (signal serious error)
1229  */
1230 static int
1231 client_handle_data (void *cls,
1232                     struct GNUNET_MESH_Tunnel *tunnel,
1233                     void **tunnel_ctx,
1234                     const struct GNUNET_PeerIdentity *sender,
1235                     const struct GNUNET_MessageHeader *message,
1236                     const struct GNUNET_ATS_Information*atsi)
1237 {
1238   struct GNUNET_STREAM_Socket *socket = cls;
1239
1240   return handle_data (socket, 
1241                       tunnel, 
1242                       sender, 
1243                       (const struct GNUNET_STREAM_DataMessage *) message, 
1244                       atsi);
1245 }
1246
1247
1248 /**
1249  * Callback to set state to ESTABLISHED
1250  *
1251  * @param cls the closure from queue_message FIXME: document
1252  * @param socket the socket to requiring state change
1253  */
1254 static void
1255 set_state_established (void *cls,
1256                        struct GNUNET_STREAM_Socket *socket)
1257 {
1258   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1259        "%s: Attaining ESTABLISHED state\n",
1260        GNUNET_i2s (&socket->other_peer));
1261   socket->write_offset = 0;
1262   socket->read_offset = 0;
1263   socket->state = STATE_ESTABLISHED;
1264   /* FIXME: What if listen_cb is NULL */
1265   if (NULL != socket->lsocket)
1266   {
1267     LOG (GNUNET_ERROR_TYPE_DEBUG,
1268          "%s: Calling listen callback\n",
1269          GNUNET_i2s (&socket->other_peer));
1270     if (GNUNET_SYSERR == 
1271         socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1272                                     socket,
1273                                     &socket->other_peer))
1274     {
1275       socket->state = STATE_CLOSED;
1276       /* FIXME: We should close in a decent way */
1277       GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1278       GNUNET_free (socket);
1279     }
1280   }
1281   else if (socket->open_cb)
1282     socket->open_cb (socket->open_cls, socket);
1283 }
1284
1285
1286 /**
1287  * Callback to set state to HELLO_WAIT
1288  *
1289  * @param cls the closure from queue_message
1290  * @param socket the socket to requiring state change
1291  */
1292 static void
1293 set_state_hello_wait (void *cls,
1294                       struct GNUNET_STREAM_Socket *socket)
1295 {
1296   GNUNET_assert (STATE_INIT == socket->state);
1297   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1298        "%s: Attaining HELLO_WAIT state\n",
1299        GNUNET_i2s (&socket->other_peer));
1300   socket->state = STATE_HELLO_WAIT;
1301 }
1302
1303
1304 /**
1305  * Callback to set state to CLOSE_WAIT
1306  *
1307  * @param cls the closure from queue_message
1308  * @param socket the socket requiring state change
1309  */
1310 static void
1311 set_state_close_wait (void *cls,
1312                       struct GNUNET_STREAM_Socket *socket)
1313 {
1314   LOG (GNUNET_ERROR_TYPE_DEBUG,
1315        "%s: Attaing CLOSE_WAIT state\n",
1316        GNUNET_i2s (&socket->other_peer));
1317   socket->state = STATE_CLOSE_WAIT;
1318   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1319   socket->receive_buffer = NULL;
1320   socket->receive_buffer_size = 0;
1321 }
1322
1323
1324 /**
1325  * Callback to set state to RECEIVE_CLOSE_WAIT
1326  *
1327  * @param cls the closure from queue_message
1328  * @param socket the socket requiring state change
1329  */
1330 static void
1331 set_state_receive_close_wait (void *cls,
1332                               struct GNUNET_STREAM_Socket *socket)
1333 {
1334   LOG (GNUNET_ERROR_TYPE_DEBUG,
1335        "%s: Attaing RECEIVE_CLOSE_WAIT state\n",
1336        GNUNET_i2s (&socket->other_peer));
1337   socket->state = STATE_RECEIVE_CLOSE_WAIT;
1338   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1339   socket->receive_buffer = NULL;
1340   socket->receive_buffer_size = 0;
1341 }
1342
1343
1344 /**
1345  * Callback to set state to TRANSMIT_CLOSE_WAIT
1346  *
1347  * @param cls the closure from queue_message
1348  * @param socket the socket requiring state change
1349  */
1350 static void
1351 set_state_transmit_close_wait (void *cls,
1352                                struct GNUNET_STREAM_Socket *socket)
1353 {
1354   LOG (GNUNET_ERROR_TYPE_DEBUG,
1355        "%s: Attaing TRANSMIT_CLOSE_WAIT state\n",
1356        GNUNET_i2s (&socket->other_peer));
1357   socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1358 }
1359
1360
1361 /**
1362  * Callback to set state to CLOSED
1363  *
1364  * @param cls the closure from queue_message
1365  * @param socket the socket requiring state change
1366  */
1367 static void
1368 set_state_closed (void *cls,
1369                   struct GNUNET_STREAM_Socket *socket)
1370 {
1371   socket->state = STATE_CLOSED;
1372 }
1373
1374 /**
1375  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1376  * socket
1377  *
1378  * @param socket the socket for which this HelloAckMessage has to be generated
1379  * @return the HelloAckMessage
1380  */
1381 static struct GNUNET_STREAM_HelloAckMessage *
1382 generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket)
1383 {
1384   struct GNUNET_STREAM_HelloAckMessage *msg;
1385
1386   /* Get the random sequence number */
1387   socket->write_sequence_number = 
1388     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1389   LOG (GNUNET_ERROR_TYPE_DEBUG,
1390        "%s: Generated write sequence number %u\n",
1391        GNUNET_i2s (&socket->other_peer),
1392        (unsigned int) socket->write_sequence_number);
1393   
1394   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1395   msg->header.header.size = 
1396     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1397   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1398   msg->sequence_number = htonl (socket->write_sequence_number);
1399   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1400
1401   return msg;
1402 }
1403
1404
1405 /**
1406  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1407  *
1408  * @param cls the socket (set from GNUNET_MESH_connect)
1409  * @param tunnel connection to the other end
1410  * @param tunnel_ctx this is NULL
1411  * @param sender who sent the message
1412  * @param message the actual message
1413  * @param atsi performance data for the connection
1414  * @return GNUNET_OK to keep the connection open,
1415  *         GNUNET_SYSERR to close it (signal serious error)
1416  */
1417 static int
1418 client_handle_hello_ack (void *cls,
1419                          struct GNUNET_MESH_Tunnel *tunnel,
1420                          void **tunnel_ctx,
1421                          const struct GNUNET_PeerIdentity *sender,
1422                          const struct GNUNET_MessageHeader *message,
1423                          const struct GNUNET_ATS_Information*atsi)
1424 {
1425   struct GNUNET_STREAM_Socket *socket = cls;
1426   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1427   struct GNUNET_STREAM_HelloAckMessage *reply;
1428
1429   if (0 != memcmp (sender,
1430                    &socket->other_peer,
1431                    sizeof (struct GNUNET_PeerIdentity)))
1432   {
1433     LOG (GNUNET_ERROR_TYPE_DEBUG,
1434          "%s: Received HELLO_ACK from non-confirming peer\n",
1435          GNUNET_i2s (&socket->other_peer));
1436     return GNUNET_YES;
1437   }
1438   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1439   LOG (GNUNET_ERROR_TYPE_DEBUG,
1440        "%s: Received HELLO_ACK from %s\n",
1441        GNUNET_i2s (&socket->other_peer),
1442        GNUNET_i2s (&socket->other_peer));
1443
1444   GNUNET_assert (socket->tunnel == tunnel);
1445   switch (socket->state)
1446   {
1447   case STATE_HELLO_WAIT:
1448     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1449     LOG (GNUNET_ERROR_TYPE_DEBUG,
1450          "%s: Read sequence number %u\n",
1451          GNUNET_i2s (&socket->other_peer),
1452          (unsigned int) socket->read_sequence_number);
1453     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1454     reply = generate_hello_ack_msg (socket);
1455     queue_message (socket,
1456                    &reply->header, 
1457                    &set_state_established, 
1458                    NULL);      
1459     return GNUNET_OK;
1460   case STATE_ESTABLISHED:
1461   case STATE_RECEIVE_CLOSE_WAIT:
1462     // call statistics (# ACKs ignored++)
1463     return GNUNET_OK;
1464   case STATE_INIT:
1465   default:
1466     LOG (GNUNET_ERROR_TYPE_DEBUG,
1467          "%s: Server %s sent HELLO_ACK when in state %d\n", 
1468          GNUNET_i2s (&socket->other_peer),
1469          GNUNET_i2s (&socket->other_peer),
1470          socket->state);
1471     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1472     return GNUNET_SYSERR;
1473   }
1474
1475 }
1476
1477
1478 /**
1479  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1480  *
1481  * @param cls the socket (set from GNUNET_MESH_connect)
1482  * @param tunnel connection to the other end
1483  * @param tunnel_ctx this is NULL
1484  * @param sender who sent the message
1485  * @param message the actual message
1486  * @param atsi performance data for the connection
1487  * @return GNUNET_OK to keep the connection open,
1488  *         GNUNET_SYSERR to close it (signal serious error)
1489  */
1490 static int
1491 client_handle_reset (void *cls,
1492                      struct GNUNET_MESH_Tunnel *tunnel,
1493                      void **tunnel_ctx,
1494                      const struct GNUNET_PeerIdentity *sender,
1495                      const struct GNUNET_MessageHeader *message,
1496                      const struct GNUNET_ATS_Information*atsi)
1497 {
1498   // struct GNUNET_STREAM_Socket *socket = cls;
1499
1500   return GNUNET_OK;
1501 }
1502
1503
1504 /**
1505  * Common message handler for handling TRANSMIT_CLOSE messages
1506  *
1507  * @param socket the socket through which the ack was received
1508  * @param tunnel connection to the other end
1509  * @param sender who sent the message
1510  * @param msg the transmit close message
1511  * @param atsi performance data for the connection
1512  * @return GNUNET_OK to keep the connection open,
1513  *         GNUNET_SYSERR to close it (signal serious error)
1514  */
1515 static int
1516 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1517                        struct GNUNET_MESH_Tunnel *tunnel,
1518                        const struct GNUNET_PeerIdentity *sender,
1519                        const struct GNUNET_STREAM_MessageHeader *msg,
1520                        const struct GNUNET_ATS_Information*atsi)
1521 {
1522   struct GNUNET_STREAM_MessageHeader *reply;
1523
1524   switch (socket->state)
1525   {
1526   case STATE_ESTABLISHED:
1527     socket->state = STATE_RECEIVE_CLOSED;
1528
1529     /* Send TRANSMIT_CLOSE_ACK */
1530     reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1531     reply->header.type = 
1532       htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1533     reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1534     queue_message (socket, reply, NULL, NULL);
1535     break;
1536
1537   default:
1538     /* FIXME: Call statistics? */
1539     break;
1540   }
1541   return GNUNET_YES;
1542 }
1543
1544
1545 /**
1546  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1547  *
1548  * @param cls the socket (set from GNUNET_MESH_connect)
1549  * @param tunnel connection to the other end
1550  * @param tunnel_ctx this is NULL
1551  * @param sender who sent the message
1552  * @param message the actual message
1553  * @param atsi performance data for the connection
1554  * @return GNUNET_OK to keep the connection open,
1555  *         GNUNET_SYSERR to close it (signal serious error)
1556  */
1557 static int
1558 client_handle_transmit_close (void *cls,
1559                               struct GNUNET_MESH_Tunnel *tunnel,
1560                               void **tunnel_ctx,
1561                               const struct GNUNET_PeerIdentity *sender,
1562                               const struct GNUNET_MessageHeader *message,
1563                               const struct GNUNET_ATS_Information*atsi)
1564 {
1565   struct GNUNET_STREAM_Socket *socket = cls;
1566   
1567   return handle_transmit_close (socket,
1568                                 tunnel,
1569                                 sender,
1570                                 (struct GNUNET_STREAM_MessageHeader *)message,
1571                                 atsi);
1572 }
1573
1574
1575 /**
1576  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1577  *
1578  * @param socket the socket
1579  * @param tunnel connection to the other end
1580  * @param sender who sent the message
1581  * @param message the actual message
1582  * @param atsi performance data for the connection
1583  * @param operation the close operation which is being ACK'ed
1584  * @return GNUNET_OK to keep the connection open,
1585  *         GNUNET_SYSERR to close it (signal serious error)
1586  */
1587 static int
1588 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1589                           struct GNUNET_MESH_Tunnel *tunnel,
1590                           const struct GNUNET_PeerIdentity *sender,
1591                           const struct GNUNET_STREAM_MessageHeader *message,
1592                           const struct GNUNET_ATS_Information *atsi,
1593                           int operation)
1594 {
1595   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1596
1597   shutdown_handle = socket->shutdown_handle;
1598   if (NULL == shutdown_handle)
1599   {
1600     LOG (GNUNET_ERROR_TYPE_DEBUG,
1601          "%s: Received CLOSE_ACK when shutdown handle is NULL\n",
1602          GNUNET_i2s (&socket->other_peer));
1603     return GNUNET_OK;
1604   }
1605
1606   switch (operation)
1607   {
1608   case SHUT_RDWR:
1609     switch (socket->state)
1610     {
1611     case STATE_CLOSE_WAIT:
1612       if (SHUT_RDWR != shutdown_handle->operation)
1613       {
1614         LOG (GNUNET_ERROR_TYPE_DEBUG,
1615              "%s: Received CLOSE_ACK when shutdown handle is not for "
1616              "SHUT_RDWR\n",
1617              GNUNET_i2s (&socket->other_peer));
1618         return GNUNET_OK;
1619       }
1620
1621       LOG (GNUNET_ERROR_TYPE_DEBUG,
1622            "%s: Received CLOSE_ACK from %s\n",
1623            GNUNET_i2s (&socket->other_peer),
1624            GNUNET_i2s (&socket->other_peer));
1625       socket->state = STATE_CLOSED;
1626       break;
1627     default:
1628       LOG (GNUNET_ERROR_TYPE_DEBUG,
1629            "%s: Received CLOSE_ACK when in it not expected\n",
1630            GNUNET_i2s (&socket->other_peer));
1631       return GNUNET_OK;
1632     }
1633     break;
1634
1635   case SHUT_RD:
1636     switch (socket->state)
1637     {
1638     case STATE_RECEIVE_CLOSE_WAIT:
1639       if (SHUT_RD != shutdown_handle->operation)
1640       {
1641         LOG (GNUNET_ERROR_TYPE_DEBUG,
1642              "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
1643              "is not for SHUT_RD\n",
1644              GNUNET_i2s (&socket->other_peer));
1645         return GNUNET_OK;
1646       }
1647
1648       LOG (GNUNET_ERROR_TYPE_DEBUG,
1649            "%s: Received RECEIVE_CLOSE_ACK from %s\n",
1650            GNUNET_i2s (&socket->other_peer),
1651            GNUNET_i2s (&socket->other_peer));
1652       socket->state = STATE_RECEIVE_CLOSED;
1653       break;
1654     default:
1655       LOG (GNUNET_ERROR_TYPE_DEBUG,
1656            "%s: Received RECEIVE_CLOSE_ACK when in it not expected\n",
1657            GNUNET_i2s (&socket->other_peer));
1658       return GNUNET_OK;
1659     }
1660
1661     break;
1662   case SHUT_WR:
1663     switch (socket->state)
1664     {
1665     case STATE_TRANSMIT_CLOSE_WAIT:
1666       if (SHUT_WR != shutdown_handle->operation)
1667       {
1668         LOG (GNUNET_ERROR_TYPE_DEBUG,
1669              "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle "
1670              "is not for SHUT_WR\n",
1671              GNUNET_i2s (&socket->other_peer));
1672         return GNUNET_OK;
1673       }
1674
1675       LOG (GNUNET_ERROR_TYPE_DEBUG,
1676            "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
1677            GNUNET_i2s (&socket->other_peer),
1678            GNUNET_i2s (&socket->other_peer));
1679       socket->state = STATE_TRANSMIT_CLOSED;
1680       break;
1681     default:
1682       LOG (GNUNET_ERROR_TYPE_DEBUG,
1683            "%s: Received TRANSMIT_CLOSE_ACK when in it not expected\n",
1684            GNUNET_i2s (&socket->other_peer));
1685           
1686       return GNUNET_OK;
1687     }
1688     break;
1689   default:
1690     GNUNET_assert (0);
1691   }
1692
1693   if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
1694     shutdown_handle->completion_cb(shutdown_handle->completion_cls,
1695                                    operation);
1696   GNUNET_free (shutdown_handle); /* Free shutdown handle */
1697   socket->shutdown_handle = NULL;
1698   if (GNUNET_SCHEDULER_NO_TASK
1699       != shutdown_handle->close_msg_retransmission_task_id)
1700   {
1701     GNUNET_SCHEDULER_cancel
1702       (shutdown_handle->close_msg_retransmission_task_id);
1703     shutdown_handle->close_msg_retransmission_task_id =
1704       GNUNET_SCHEDULER_NO_TASK;
1705   }
1706   return GNUNET_OK;
1707 }
1708
1709
1710 /**
1711  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1712  *
1713  * @param cls the socket (set from GNUNET_MESH_connect)
1714  * @param tunnel connection to the other end
1715  * @param tunnel_ctx this is NULL
1716  * @param sender who sent the message
1717  * @param message the actual message
1718  * @param atsi performance data for the connection
1719  * @return GNUNET_OK to keep the connection open,
1720  *         GNUNET_SYSERR to close it (signal serious error)
1721  */
1722 static int
1723 client_handle_transmit_close_ack (void *cls,
1724                                   struct GNUNET_MESH_Tunnel *tunnel,
1725                                   void **tunnel_ctx,
1726                                   const struct GNUNET_PeerIdentity *sender,
1727                                   const struct GNUNET_MessageHeader *message,
1728                                   const struct GNUNET_ATS_Information*atsi)
1729 {
1730   struct GNUNET_STREAM_Socket *socket = cls;
1731
1732   return handle_generic_close_ack (socket,
1733                                    tunnel,
1734                                    sender,
1735                                    (const struct GNUNET_STREAM_MessageHeader *)
1736                                    message,
1737                                    atsi,
1738                                    SHUT_WR);
1739 }
1740
1741
1742 /**
1743  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1744  *
1745  * @param socket the socket
1746  * @param tunnel connection to the other end
1747  * @param sender who sent the message
1748  * @param message the actual message
1749  * @param atsi performance data for the connection
1750  * @return GNUNET_OK to keep the connection open,
1751  *         GNUNET_SYSERR to close it (signal serious error)
1752  */
1753 static int
1754 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1755                       struct GNUNET_MESH_Tunnel *tunnel,
1756                       const struct GNUNET_PeerIdentity *sender,
1757                       const struct GNUNET_STREAM_MessageHeader *message,
1758                       const struct GNUNET_ATS_Information *atsi)
1759 {
1760   struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1761
1762   switch (socket->state)
1763   {
1764   case STATE_INIT:
1765   case STATE_LISTEN:
1766   case STATE_HELLO_WAIT:
1767     LOG (GNUNET_ERROR_TYPE_DEBUG,
1768          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1769          GNUNET_i2s (&socket->other_peer));
1770     return GNUNET_OK;
1771   default:
1772     break;
1773   }
1774   
1775   LOG (GNUNET_ERROR_TYPE_DEBUG,
1776        "%s: Received RECEIVE_CLOSE from %s\n",
1777        GNUNET_i2s (&socket->other_peer),
1778        GNUNET_i2s (&socket->other_peer));
1779   receive_close_ack =
1780     GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1781   receive_close_ack->header.size =
1782     htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1783   receive_close_ack->header.type =
1784     htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1785   queue_message (socket,
1786                  receive_close_ack,
1787                  &set_state_closed,
1788                  NULL);
1789   
1790   /* FIXME: Handle the case where write handle is present; the write operation
1791      should be deemed as finised and the write continuation callback
1792      has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
1793   return GNUNET_OK;
1794 }
1795
1796
1797 /**
1798  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1799  *
1800  * @param cls the socket (set from GNUNET_MESH_connect)
1801  * @param tunnel connection to the other end
1802  * @param tunnel_ctx this is NULL
1803  * @param sender who sent the message
1804  * @param message the actual message
1805  * @param atsi performance data for the connection
1806  * @return GNUNET_OK to keep the connection open,
1807  *         GNUNET_SYSERR to close it (signal serious error)
1808  */
1809 static int
1810 client_handle_receive_close (void *cls,
1811                              struct GNUNET_MESH_Tunnel *tunnel,
1812                              void **tunnel_ctx,
1813                              const struct GNUNET_PeerIdentity *sender,
1814                              const struct GNUNET_MessageHeader *message,
1815                              const struct GNUNET_ATS_Information*atsi)
1816 {
1817   struct GNUNET_STREAM_Socket *socket = cls;
1818
1819   return
1820     handle_receive_close (socket,
1821                           tunnel,
1822                           sender,
1823                           (const struct GNUNET_STREAM_MessageHeader *) message,
1824                           atsi);
1825 }
1826
1827
1828 /**
1829  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1830  *
1831  * @param cls the socket (set from GNUNET_MESH_connect)
1832  * @param tunnel connection to the other end
1833  * @param tunnel_ctx this is NULL
1834  * @param sender who sent the message
1835  * @param message the actual message
1836  * @param atsi performance data for the connection
1837  * @return GNUNET_OK to keep the connection open,
1838  *         GNUNET_SYSERR to close it (signal serious error)
1839  */
1840 static int
1841 client_handle_receive_close_ack (void *cls,
1842                                  struct GNUNET_MESH_Tunnel *tunnel,
1843                                  void **tunnel_ctx,
1844                                  const struct GNUNET_PeerIdentity *sender,
1845                                  const struct GNUNET_MessageHeader *message,
1846                                  const struct GNUNET_ATS_Information*atsi)
1847 {
1848   struct GNUNET_STREAM_Socket *socket = cls;
1849
1850   return handle_generic_close_ack (socket,
1851                                    tunnel,
1852                                    sender,
1853                                    (const struct GNUNET_STREAM_MessageHeader *)
1854                                    message,
1855                                    atsi,
1856                                    SHUT_RD);
1857 }
1858
1859
1860 /**
1861  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1862  *
1863  * @param socket the socket
1864  * @param tunnel connection to the other end
1865  * @param sender who sent the message
1866  * @param message the actual message
1867  * @param atsi performance data for the connection
1868  * @return GNUNET_OK to keep the connection open,
1869  *         GNUNET_SYSERR to close it (signal serious error)
1870  */
1871 static int
1872 handle_close (struct GNUNET_STREAM_Socket *socket,
1873               struct GNUNET_MESH_Tunnel *tunnel,
1874               const struct GNUNET_PeerIdentity *sender,
1875               const struct GNUNET_STREAM_MessageHeader *message,
1876               const struct GNUNET_ATS_Information*atsi)
1877 {
1878   struct GNUNET_STREAM_MessageHeader *close_ack;
1879
1880   switch (socket->state)
1881   {
1882   case STATE_INIT:
1883   case STATE_LISTEN:
1884   case STATE_HELLO_WAIT:
1885     LOG (GNUNET_ERROR_TYPE_DEBUG,
1886          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1887          GNUNET_i2s (&socket->other_peer));
1888     return GNUNET_OK;
1889   default:
1890     break;
1891   }
1892
1893   LOG (GNUNET_ERROR_TYPE_DEBUG,
1894        "%s: Received CLOSE from %s\n",
1895        GNUNET_i2s (&socket->other_peer),
1896        GNUNET_i2s (&socket->other_peer));
1897   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1898   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1899   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
1900   queue_message (socket,
1901                  close_ack,
1902                  &set_state_closed,
1903                  NULL);
1904   if (socket->state == STATE_CLOSED)
1905     return GNUNET_OK;
1906
1907   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1908   socket->receive_buffer = NULL;
1909   socket->receive_buffer_size = 0;
1910   return GNUNET_OK;
1911 }
1912
1913
1914 /**
1915  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1916  *
1917  * @param cls the socket (set from GNUNET_MESH_connect)
1918  * @param tunnel connection to the other end
1919  * @param tunnel_ctx this is NULL
1920  * @param sender who sent the message
1921  * @param message the actual message
1922  * @param atsi performance data for the connection
1923  * @return GNUNET_OK to keep the connection open,
1924  *         GNUNET_SYSERR to close it (signal serious error)
1925  */
1926 static int
1927 client_handle_close (void *cls,
1928                      struct GNUNET_MESH_Tunnel *tunnel,
1929                      void **tunnel_ctx,
1930                      const struct GNUNET_PeerIdentity *sender,
1931                      const struct GNUNET_MessageHeader *message,
1932                      const struct GNUNET_ATS_Information*atsi)
1933 {
1934   struct GNUNET_STREAM_Socket *socket = cls;
1935
1936   return handle_close (socket,
1937                        tunnel,
1938                        sender,
1939                        (const struct GNUNET_STREAM_MessageHeader *) message,
1940                        atsi);
1941 }
1942
1943
1944 /**
1945  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1946  *
1947  * @param cls the socket (set from GNUNET_MESH_connect)
1948  * @param tunnel connection to the other end
1949  * @param tunnel_ctx this is NULL
1950  * @param sender who sent the message
1951  * @param message the actual message
1952  * @param atsi performance data for the connection
1953  * @return GNUNET_OK to keep the connection open,
1954  *         GNUNET_SYSERR to close it (signal serious error)
1955  */
1956 static int
1957 client_handle_close_ack (void *cls,
1958                          struct GNUNET_MESH_Tunnel *tunnel,
1959                          void **tunnel_ctx,
1960                          const struct GNUNET_PeerIdentity *sender,
1961                          const struct GNUNET_MessageHeader *message,
1962                          const struct GNUNET_ATS_Information *atsi)
1963 {
1964   struct GNUNET_STREAM_Socket *socket = cls;
1965
1966   return handle_generic_close_ack (socket,
1967                                    tunnel,
1968                                    sender,
1969                                    (const struct GNUNET_STREAM_MessageHeader *) 
1970                                    message,
1971                                    atsi,
1972                                    SHUT_RDWR);
1973 }
1974
1975 /*****************************/
1976 /* Server's Message Handlers */
1977 /*****************************/
1978
1979 /**
1980  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1981  *
1982  * @param cls the closure
1983  * @param tunnel connection to the other end
1984  * @param tunnel_ctx the socket
1985  * @param sender who sent the message
1986  * @param message the actual message
1987  * @param atsi performance data for the connection
1988  * @return GNUNET_OK to keep the connection open,
1989  *         GNUNET_SYSERR to close it (signal serious error)
1990  */
1991 static int
1992 server_handle_data (void *cls,
1993                     struct GNUNET_MESH_Tunnel *tunnel,
1994                     void **tunnel_ctx,
1995                     const struct GNUNET_PeerIdentity *sender,
1996                     const struct GNUNET_MessageHeader *message,
1997                     const struct GNUNET_ATS_Information*atsi)
1998 {
1999   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2000
2001   return handle_data (socket,
2002                       tunnel,
2003                       sender,
2004                       (const struct GNUNET_STREAM_DataMessage *)message,
2005                       atsi);
2006 }
2007
2008
2009 /**
2010  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
2011  *
2012  * @param cls the closure
2013  * @param tunnel connection to the other end
2014  * @param tunnel_ctx the socket
2015  * @param sender who sent the message
2016  * @param message the actual message
2017  * @param atsi performance data for the connection
2018  * @return GNUNET_OK to keep the connection open,
2019  *         GNUNET_SYSERR to close it (signal serious error)
2020  */
2021 static int
2022 server_handle_hello (void *cls,
2023                      struct GNUNET_MESH_Tunnel *tunnel,
2024                      void **tunnel_ctx,
2025                      const struct GNUNET_PeerIdentity *sender,
2026                      const struct GNUNET_MessageHeader *message,
2027                      const struct GNUNET_ATS_Information*atsi)
2028 {
2029   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2030   struct GNUNET_STREAM_HelloAckMessage *reply;
2031
2032   if (0 != memcmp (sender,
2033                    &socket->other_peer,
2034                    sizeof (struct GNUNET_PeerIdentity)))
2035   {
2036     LOG (GNUNET_ERROR_TYPE_DEBUG,
2037          "%s: Received HELLO from non-confirming peer\n",
2038          GNUNET_i2s (&socket->other_peer));
2039     return GNUNET_YES;
2040   }
2041
2042   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == 
2043                  ntohs (message->type));
2044   GNUNET_assert (socket->tunnel == tunnel);
2045   LOG (GNUNET_ERROR_TYPE_DEBUG,
2046        "%s: Received HELLO from %s\n", 
2047        GNUNET_i2s (&socket->other_peer),
2048        GNUNET_i2s (&socket->other_peer));
2049
2050   if (STATE_INIT == socket->state)
2051   {
2052     reply = generate_hello_ack_msg (socket);
2053     queue_message (socket, 
2054                    &reply->header,
2055                    &set_state_hello_wait, 
2056                    NULL);
2057   }
2058   else
2059   {
2060     LOG (GNUNET_ERROR_TYPE_DEBUG,
2061          "%s: Client sent HELLO when in state %d\n", 
2062          GNUNET_i2s (&socket->other_peer),
2063          socket->state);
2064     /* FIXME: Send RESET? */
2065       
2066   }
2067   return GNUNET_OK;
2068 }
2069
2070
2071 /**
2072  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2073  *
2074  * @param cls the closure
2075  * @param tunnel connection to the other end
2076  * @param tunnel_ctx the socket
2077  * @param sender who sent the message
2078  * @param message the actual message
2079  * @param atsi performance data for the connection
2080  * @return GNUNET_OK to keep the connection open,
2081  *         GNUNET_SYSERR to close it (signal serious error)
2082  */
2083 static int
2084 server_handle_hello_ack (void *cls,
2085                          struct GNUNET_MESH_Tunnel *tunnel,
2086                          void **tunnel_ctx,
2087                          const struct GNUNET_PeerIdentity *sender,
2088                          const struct GNUNET_MessageHeader *message,
2089                          const struct GNUNET_ATS_Information*atsi)
2090 {
2091   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2092   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2093
2094   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2095                  ntohs (message->type));
2096   GNUNET_assert (socket->tunnel == tunnel);
2097   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2098   if (STATE_HELLO_WAIT == socket->state)
2099   {
2100     LOG (GNUNET_ERROR_TYPE_DEBUG,
2101          "%s: Received HELLO_ACK from %s\n",
2102          GNUNET_i2s (&socket->other_peer),
2103          GNUNET_i2s (&socket->other_peer));
2104     socket->read_sequence_number = ntohl (ack_message->sequence_number);
2105     LOG (GNUNET_ERROR_TYPE_DEBUG,
2106          "%s: Read sequence number %u\n",
2107          GNUNET_i2s (&socket->other_peer),
2108          (unsigned int) socket->read_sequence_number);
2109     socket->receiver_window_available = 
2110       ntohl (ack_message->receiver_window_size);
2111     /* Attain ESTABLISHED state */
2112     set_state_established (NULL, socket);
2113   }
2114   else
2115   {
2116     LOG (GNUNET_ERROR_TYPE_DEBUG,
2117          "Client sent HELLO_ACK when in state %d\n", socket->state);
2118     /* FIXME: Send RESET? */
2119       
2120   }
2121   return GNUNET_OK;
2122 }
2123
2124
2125 /**
2126  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2127  *
2128  * @param cls the closure
2129  * @param tunnel connection to the other end
2130  * @param tunnel_ctx the socket
2131  * @param sender who sent the message
2132  * @param message the actual message
2133  * @param atsi performance data for the connection
2134  * @return GNUNET_OK to keep the connection open,
2135  *         GNUNET_SYSERR to close it (signal serious error)
2136  */
2137 static int
2138 server_handle_reset (void *cls,
2139                      struct GNUNET_MESH_Tunnel *tunnel,
2140                      void **tunnel_ctx,
2141                      const struct GNUNET_PeerIdentity *sender,
2142                      const struct GNUNET_MessageHeader *message,
2143                      const struct GNUNET_ATS_Information*atsi)
2144 {
2145   // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2146
2147   return GNUNET_OK;
2148 }
2149
2150
2151 /**
2152  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2153  *
2154  * @param cls the closure
2155  * @param tunnel connection to the other end
2156  * @param tunnel_ctx the socket
2157  * @param sender who sent the message
2158  * @param message the actual message
2159  * @param atsi performance data for the connection
2160  * @return GNUNET_OK to keep the connection open,
2161  *         GNUNET_SYSERR to close it (signal serious error)
2162  */
2163 static int
2164 server_handle_transmit_close (void *cls,
2165                               struct GNUNET_MESH_Tunnel *tunnel,
2166                               void **tunnel_ctx,
2167                               const struct GNUNET_PeerIdentity *sender,
2168                               const struct GNUNET_MessageHeader *message,
2169                               const struct GNUNET_ATS_Information*atsi)
2170 {
2171   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2172
2173   return handle_transmit_close (socket,
2174                                 tunnel,
2175                                 sender,
2176                                 (struct GNUNET_STREAM_MessageHeader *)message,
2177                                 atsi);
2178 }
2179
2180
2181 /**
2182  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2183  *
2184  * @param cls the closure
2185  * @param tunnel connection to the other end
2186  * @param tunnel_ctx the socket
2187  * @param sender who sent the message
2188  * @param message the actual message
2189  * @param atsi performance data for the connection
2190  * @return GNUNET_OK to keep the connection open,
2191  *         GNUNET_SYSERR to close it (signal serious error)
2192  */
2193 static int
2194 server_handle_transmit_close_ack (void *cls,
2195                                   struct GNUNET_MESH_Tunnel *tunnel,
2196                                   void **tunnel_ctx,
2197                                   const struct GNUNET_PeerIdentity *sender,
2198                                   const struct GNUNET_MessageHeader *message,
2199                                   const struct GNUNET_ATS_Information*atsi)
2200 {
2201   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2202
2203   return handle_generic_close_ack (socket,
2204                                    tunnel,
2205                                    sender,
2206                                    (const struct GNUNET_STREAM_MessageHeader *)
2207                                    message,
2208                                    atsi,
2209                                    SHUT_WR);
2210 }
2211
2212
2213 /**
2214  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2215  *
2216  * @param cls the closure
2217  * @param tunnel connection to the other end
2218  * @param tunnel_ctx the socket
2219  * @param sender who sent the message
2220  * @param message the actual message
2221  * @param atsi performance data for the connection
2222  * @return GNUNET_OK to keep the connection open,
2223  *         GNUNET_SYSERR to close it (signal serious error)
2224  */
2225 static int
2226 server_handle_receive_close (void *cls,
2227                              struct GNUNET_MESH_Tunnel *tunnel,
2228                              void **tunnel_ctx,
2229                              const struct GNUNET_PeerIdentity *sender,
2230                              const struct GNUNET_MessageHeader *message,
2231                              const struct GNUNET_ATS_Information*atsi)
2232 {
2233   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2234
2235   return
2236     handle_receive_close (socket,
2237                           tunnel,
2238                           sender,
2239                           (const struct GNUNET_STREAM_MessageHeader *) message,
2240                           atsi);
2241 }
2242
2243
2244 /**
2245  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2246  *
2247  * @param cls the closure
2248  * @param tunnel connection to the other end
2249  * @param tunnel_ctx the socket
2250  * @param sender who sent the message
2251  * @param message the actual message
2252  * @param atsi performance data for the connection
2253  * @return GNUNET_OK to keep the connection open,
2254  *         GNUNET_SYSERR to close it (signal serious error)
2255  */
2256 static int
2257 server_handle_receive_close_ack (void *cls,
2258                                  struct GNUNET_MESH_Tunnel *tunnel,
2259                                  void **tunnel_ctx,
2260                                  const struct GNUNET_PeerIdentity *sender,
2261                                  const struct GNUNET_MessageHeader *message,
2262                                  const struct GNUNET_ATS_Information*atsi)
2263 {
2264   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2265
2266   return handle_generic_close_ack (socket,
2267                                    tunnel,
2268                                    sender,
2269                                    (const struct GNUNET_STREAM_MessageHeader *)
2270                                    message,
2271                                    atsi,
2272                                    SHUT_RD);
2273 }
2274
2275
2276 /**
2277  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2278  *
2279  * @param cls the listen socket (from GNUNET_MESH_connect in
2280  *          GNUNET_STREAM_listen) 
2281  * @param tunnel connection to the other end
2282  * @param tunnel_ctx the socket
2283  * @param sender who sent the message
2284  * @param message the actual message
2285  * @param atsi performance data for the connection
2286  * @return GNUNET_OK to keep the connection open,
2287  *         GNUNET_SYSERR to close it (signal serious error)
2288  */
2289 static int
2290 server_handle_close (void *cls,
2291                      struct GNUNET_MESH_Tunnel *tunnel,
2292                      void **tunnel_ctx,
2293                      const struct GNUNET_PeerIdentity *sender,
2294                      const struct GNUNET_MessageHeader *message,
2295                      const struct GNUNET_ATS_Information*atsi)
2296 {
2297   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2298   
2299   return handle_close (socket,
2300                        tunnel,
2301                        sender,
2302                        (const struct GNUNET_STREAM_MessageHeader *) message,
2303                        atsi);
2304 }
2305
2306
2307 /**
2308  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2309  *
2310  * @param cls the closure
2311  * @param tunnel connection to the other end
2312  * @param tunnel_ctx the socket
2313  * @param sender who sent the message
2314  * @param message the actual message
2315  * @param atsi performance data for the connection
2316  * @return GNUNET_OK to keep the connection open,
2317  *         GNUNET_SYSERR to close it (signal serious error)
2318  */
2319 static int
2320 server_handle_close_ack (void *cls,
2321                          struct GNUNET_MESH_Tunnel *tunnel,
2322                          void **tunnel_ctx,
2323                          const struct GNUNET_PeerIdentity *sender,
2324                          const struct GNUNET_MessageHeader *message,
2325                          const struct GNUNET_ATS_Information*atsi)
2326 {
2327   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2328
2329   return handle_generic_close_ack (socket,
2330                                    tunnel,
2331                                    sender,
2332                                    (const struct GNUNET_STREAM_MessageHeader *) 
2333                                    message,
2334                                    atsi,
2335                                    SHUT_RDWR);
2336 }
2337
2338
2339 /**
2340  * Handler for DATA_ACK messages
2341  *
2342  * @param socket the socket through which the ack was received
2343  * @param tunnel connection to the other end
2344  * @param sender who sent the message
2345  * @param ack the acknowledgment message
2346  * @param atsi performance data for the connection
2347  * @return GNUNET_OK to keep the connection open,
2348  *         GNUNET_SYSERR to close it (signal serious error)
2349  */
2350 static int
2351 handle_ack (struct GNUNET_STREAM_Socket *socket,
2352             struct GNUNET_MESH_Tunnel *tunnel,
2353             const struct GNUNET_PeerIdentity *sender,
2354             const struct GNUNET_STREAM_AckMessage *ack,
2355             const struct GNUNET_ATS_Information*atsi)
2356 {
2357   unsigned int packet;
2358   int need_retransmission;
2359   
2360
2361   if (0 != memcmp (sender,
2362                    &socket->other_peer,
2363                    sizeof (struct GNUNET_PeerIdentity)))
2364   {
2365     LOG (GNUNET_ERROR_TYPE_DEBUG,
2366          "%s: Received ACK from non-confirming peer\n",
2367          GNUNET_i2s (&socket->other_peer));
2368     return GNUNET_YES;
2369   }
2370
2371   switch (socket->state)
2372   {
2373   case (STATE_ESTABLISHED):
2374   case (STATE_RECEIVE_CLOSED):
2375   case (STATE_RECEIVE_CLOSE_WAIT):
2376     if (NULL == socket->write_handle)
2377     {
2378       LOG (GNUNET_ERROR_TYPE_DEBUG,
2379            "%s: Received DATA_ACK when write_handle is NULL\n",
2380            GNUNET_i2s (&socket->other_peer));
2381       return GNUNET_OK;
2382     }
2383     /* FIXME: increment in the base sequence number is breaking current flow
2384      */
2385     if (!((socket->write_sequence_number 
2386            - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2387     {
2388       LOG (GNUNET_ERROR_TYPE_DEBUG,
2389            "%s: Received DATA_ACK with unexpected base sequence number\n",
2390            GNUNET_i2s (&socket->other_peer));
2391       LOG (GNUNET_ERROR_TYPE_DEBUG,
2392            "%s: Current write sequence: %u; Ack's base sequence: %u\n",
2393            GNUNET_i2s (&socket->other_peer),
2394            socket->write_sequence_number,
2395            ntohl (ack->base_sequence_number));
2396       return GNUNET_OK;
2397     }
2398     /* FIXME: include the case when write_handle is cancelled - ignore the 
2399        acks */
2400
2401     LOG (GNUNET_ERROR_TYPE_DEBUG,
2402          "%s: Received DATA_ACK from %s\n",
2403          GNUNET_i2s (&socket->other_peer),
2404          GNUNET_i2s (&socket->other_peer));
2405       
2406     /* Cancel the retransmission task */
2407     if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2408     {
2409       GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2410       socket->retransmission_timeout_task_id = 
2411         GNUNET_SCHEDULER_NO_TASK;
2412     }
2413
2414     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2415     {
2416       if (NULL == socket->write_handle->messages[packet]) break;
2417       if (ntohl (ack->base_sequence_number)
2418           >= ntohl (socket->write_handle->messages[packet]->sequence_number))
2419         ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2420                               packet,
2421                               GNUNET_YES);
2422       else
2423         if (GNUNET_YES == 
2424             ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
2425                                   ntohl (socket->write_handle->messages[packet]->sequence_number)
2426                                   - ntohl (ack->base_sequence_number)))
2427           ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2428                                 packet,
2429                                 GNUNET_YES);
2430     }
2431
2432     /* Update the receive window remaining
2433        FIXME : Should update with the value from a data ack with greater
2434        sequence number */
2435     socket->receiver_window_available = 
2436       ntohl (ack->receive_window_remaining);
2437
2438     /* Check if we have received all acknowledgements */
2439     need_retransmission = GNUNET_NO;
2440     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2441     {
2442       if (NULL == socket->write_handle->messages[packet]) break;
2443       if (GNUNET_YES != ackbitmap_is_bit_set 
2444           (&socket->write_handle->ack_bitmap,packet))
2445       {
2446         need_retransmission = GNUNET_YES;
2447         break;
2448       }
2449     }
2450     if (GNUNET_YES == need_retransmission)
2451     {
2452       write_data (socket);
2453     }
2454     else      /* We have to call the write continuation callback now */
2455     {
2456       struct GNUNET_STREAM_IOWriteHandle *write_handle;
2457       
2458       /* Free the packets */
2459       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2460       {
2461         GNUNET_free_non_null (socket->write_handle->messages[packet]);
2462       }
2463       write_handle = socket->write_handle;
2464       socket->write_handle = NULL;
2465       if (NULL != write_handle->write_cont)
2466         write_handle->write_cont (write_handle->write_cont_cls,
2467                                   socket->status,
2468                                   write_handle->size);
2469       /* We are done with the write handle - Freeing it */
2470       GNUNET_free (write_handle);
2471       LOG (GNUNET_ERROR_TYPE_DEBUG,
2472            "%s: Write completion callback completed\n",
2473            GNUNET_i2s (&socket->other_peer));      
2474     }
2475     break;
2476   default:
2477     break;
2478   }
2479   return GNUNET_OK;
2480 }
2481
2482
2483 /**
2484  * Handler for DATA_ACK messages
2485  *
2486  * @param cls the 'struct GNUNET_STREAM_Socket'
2487  * @param tunnel connection to the other end
2488  * @param tunnel_ctx unused
2489  * @param sender who sent the message
2490  * @param message the actual message
2491  * @param atsi performance data for the connection
2492  * @return GNUNET_OK to keep the connection open,
2493  *         GNUNET_SYSERR to close it (signal serious error)
2494  */
2495 static int
2496 client_handle_ack (void *cls,
2497                    struct GNUNET_MESH_Tunnel *tunnel,
2498                    void **tunnel_ctx,
2499                    const struct GNUNET_PeerIdentity *sender,
2500                    const struct GNUNET_MessageHeader *message,
2501                    const struct GNUNET_ATS_Information*atsi)
2502 {
2503   struct GNUNET_STREAM_Socket *socket = cls;
2504   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2505  
2506   return handle_ack (socket, tunnel, sender, ack, atsi);
2507 }
2508
2509
2510 /**
2511  * Handler for DATA_ACK messages
2512  *
2513  * @param cls the server's listen socket
2514  * @param tunnel connection to the other end
2515  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2516  * @param sender who sent the message
2517  * @param message the actual message
2518  * @param atsi performance data for the connection
2519  * @return GNUNET_OK to keep the connection open,
2520  *         GNUNET_SYSERR to close it (signal serious error)
2521  */
2522 static int
2523 server_handle_ack (void *cls,
2524                    struct GNUNET_MESH_Tunnel *tunnel,
2525                    void **tunnel_ctx,
2526                    const struct GNUNET_PeerIdentity *sender,
2527                    const struct GNUNET_MessageHeader *message,
2528                    const struct GNUNET_ATS_Information*atsi)
2529 {
2530   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2531   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2532  
2533   return handle_ack (socket, tunnel, sender, ack, atsi);
2534 }
2535
2536
2537 /**
2538  * For client message handlers, the stream socket is in the
2539  * closure argument.
2540  */
2541 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2542   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2543   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2544    sizeof (struct GNUNET_STREAM_AckMessage) },
2545   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2546    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2547   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2548    sizeof (struct GNUNET_STREAM_MessageHeader)},
2549   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2550    sizeof (struct GNUNET_STREAM_MessageHeader)},
2551   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2552    sizeof (struct GNUNET_STREAM_MessageHeader)},
2553   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2554    sizeof (struct GNUNET_STREAM_MessageHeader)},
2555   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2556    sizeof (struct GNUNET_STREAM_MessageHeader)},
2557   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2558    sizeof (struct GNUNET_STREAM_MessageHeader)},
2559   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2560    sizeof (struct GNUNET_STREAM_MessageHeader)},
2561   {NULL, 0, 0}
2562 };
2563
2564
2565 /**
2566  * For server message handlers, the stream socket is in the
2567  * tunnel context, and the listen socket in the closure argument.
2568  */
2569 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2570   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2571   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2572    sizeof (struct GNUNET_STREAM_AckMessage) },
2573   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2574    sizeof (struct GNUNET_STREAM_MessageHeader)},
2575   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2576    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2577   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2578    sizeof (struct GNUNET_STREAM_MessageHeader)},
2579   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2580    sizeof (struct GNUNET_STREAM_MessageHeader)},
2581   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2582    sizeof (struct GNUNET_STREAM_MessageHeader)},
2583   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2584    sizeof (struct GNUNET_STREAM_MessageHeader)},
2585   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2586    sizeof (struct GNUNET_STREAM_MessageHeader)},
2587   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2588    sizeof (struct GNUNET_STREAM_MessageHeader)},
2589   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2590    sizeof (struct GNUNET_STREAM_MessageHeader)},
2591   {NULL, 0, 0}
2592 };
2593
2594
2595 /**
2596  * Function called when our target peer is connected to our tunnel
2597  *
2598  * @param cls the socket for which this tunnel is created
2599  * @param peer the peer identity of the target
2600  * @param atsi performance data for the connection
2601  */
2602 static void
2603 mesh_peer_connect_callback (void *cls,
2604                             const struct GNUNET_PeerIdentity *peer,
2605                             const struct GNUNET_ATS_Information * atsi)
2606 {
2607   struct GNUNET_STREAM_Socket *socket = cls;
2608   struct GNUNET_STREAM_MessageHeader *message;
2609   
2610   if (0 != memcmp (peer,
2611                    &socket->other_peer,
2612                    sizeof (struct GNUNET_PeerIdentity)))
2613   {
2614     LOG (GNUNET_ERROR_TYPE_DEBUG,
2615          "%s: A peer which is not our target has connected to our tunnel\n",
2616          GNUNET_i2s(peer));
2617     return;
2618   }
2619   
2620   LOG (GNUNET_ERROR_TYPE_DEBUG,
2621        "%s: Target peer %s connected\n",
2622        GNUNET_i2s (&socket->other_peer),
2623        GNUNET_i2s (&socket->other_peer));
2624   
2625   /* Set state to INIT */
2626   socket->state = STATE_INIT;
2627
2628   /* Send HELLO message */
2629   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2630   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
2631   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2632   queue_message (socket,
2633                  message,
2634                  &set_state_hello_wait,
2635                  NULL);
2636
2637   /* Call open callback */
2638   if (NULL == socket->open_cb)
2639   {
2640     LOG (GNUNET_ERROR_TYPE_DEBUG,
2641          "STREAM_open callback is NULL\n");
2642   }
2643 }
2644
2645
2646 /**
2647  * Function called when our target peer is disconnected from our tunnel
2648  *
2649  * @param cls the socket associated which this tunnel
2650  * @param peer the peer identity of the target
2651  */
2652 static void
2653 mesh_peer_disconnect_callback (void *cls,
2654                                const struct GNUNET_PeerIdentity *peer)
2655 {
2656   struct GNUNET_STREAM_Socket *socket=cls;
2657   
2658   /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2659   LOG (GNUNET_ERROR_TYPE_DEBUG,
2660        "%s: Other peer %s disconnected \n",
2661        GNUNET_i2s (&socket->other_peer),
2662        GNUNET_i2s (&socket->other_peer));
2663 }
2664
2665
2666 /**
2667  * Method called whenever a peer creates a tunnel to us
2668  *
2669  * @param cls closure
2670  * @param tunnel new handle to the tunnel
2671  * @param initiator peer that started the tunnel
2672  * @param atsi performance information for the tunnel
2673  * @return initial tunnel context for the tunnel
2674  *         (can be NULL -- that's not an error)
2675  */
2676 static void *
2677 new_tunnel_notify (void *cls,
2678                    struct GNUNET_MESH_Tunnel *tunnel,
2679                    const struct GNUNET_PeerIdentity *initiator,
2680                    const struct GNUNET_ATS_Information *atsi)
2681 {
2682   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2683   struct GNUNET_STREAM_Socket *socket;
2684
2685   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2686      from the same peer again until the socket is closed */
2687
2688   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2689   socket->other_peer = *initiator;
2690   socket->tunnel = tunnel;
2691   socket->session_id = 0;       /* FIXME */
2692   socket->state = STATE_INIT;
2693   socket->lsocket = lsocket;
2694   socket->retransmit_timeout = lsocket->retransmit_timeout;
2695   socket->testing_active = lsocket->testing_active;
2696   socket->testing_set_write_sequence_number_value =
2697     lsocket->testing_set_write_sequence_number_value;
2698     
2699   LOG (GNUNET_ERROR_TYPE_DEBUG,
2700        "%s: Peer %s initiated tunnel to us\n", 
2701        GNUNET_i2s (&socket->other_peer),
2702        GNUNET_i2s (&socket->other_peer));
2703   
2704   /* FIXME: Copy MESH handle from lsocket to socket */
2705   
2706   return socket;
2707 }
2708
2709
2710 /**
2711  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2712  * any associated state.  This function is NOT called if the client has
2713  * explicitly asked for the tunnel to be destroyed using
2714  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2715  * the tunnel.
2716  *
2717  * @param cls closure (set from GNUNET_MESH_connect)
2718  * @param tunnel connection to the other end (henceforth invalid)
2719  * @param tunnel_ctx place where local state associated
2720  *                   with the tunnel is stored
2721  */
2722 static void 
2723 tunnel_cleaner (void *cls,
2724                 const struct GNUNET_MESH_Tunnel *tunnel,
2725                 void *tunnel_ctx)
2726 {
2727   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2728
2729   if (tunnel != socket->tunnel)
2730     return;
2731
2732   GNUNET_break_op(0);
2733   LOG (GNUNET_ERROR_TYPE_DEBUG,
2734        "%s: Peer %s has terminated connection abruptly\n",
2735        GNUNET_i2s (&socket->other_peer),
2736        GNUNET_i2s (&socket->other_peer));
2737
2738   socket->status = GNUNET_STREAM_SHUTDOWN;
2739
2740   /* Clear Transmit handles */
2741   if (NULL != socket->transmit_handle)
2742   {
2743     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2744     socket->transmit_handle = NULL;
2745   }
2746   if (NULL != socket->ack_transmit_handle)
2747   {
2748     GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
2749     GNUNET_free (socket->ack_msg);
2750     socket->ack_msg = NULL;
2751     socket->ack_transmit_handle = NULL;
2752   }
2753   /* Stop Tasks using socket->tunnel */
2754   if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2755   {
2756     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2757     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2758   }
2759   if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2760   {
2761     GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2762     socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
2763   }
2764   /* FIXME: Cancel all other tasks using socket->tunnel */
2765   socket->tunnel = NULL;
2766 }
2767
2768
2769 /*****************/
2770 /* API functions */
2771 /*****************/
2772
2773
2774 /**
2775  * Tries to open a stream to the target peer
2776  *
2777  * @param cfg configuration to use
2778  * @param target the target peer to which the stream has to be opened
2779  * @param app_port the application port number which uniquely identifies this
2780  *            stream
2781  * @param open_cb this function will be called after stream has be established 
2782  * @param open_cb_cls the closure for open_cb
2783  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2784  * @return if successful it returns the stream socket; NULL if stream cannot be
2785  *         opened 
2786  */
2787 struct GNUNET_STREAM_Socket *
2788 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2789                     const struct GNUNET_PeerIdentity *target,
2790                     GNUNET_MESH_ApplicationType app_port,
2791                     GNUNET_STREAM_OpenCallback open_cb,
2792                     void *open_cb_cls,
2793                     ...)
2794 {
2795   struct GNUNET_STREAM_Socket *socket;
2796   enum GNUNET_STREAM_Option option;
2797   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
2798   va_list vargs;                /* Variable arguments */
2799
2800   LOG (GNUNET_ERROR_TYPE_DEBUG,
2801        "%s\n", __func__);
2802   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2803   socket->other_peer = *target;
2804   socket->open_cb = open_cb;
2805   socket->open_cls = open_cb_cls;
2806   /* Set defaults */
2807   socket->retransmit_timeout = 
2808     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
2809   socket->testing_active = GNUNET_NO;
2810   va_start (vargs, open_cb_cls); /* Parse variable args */
2811   do {
2812     option = va_arg (vargs, enum GNUNET_STREAM_Option);
2813     switch (option)
2814     {
2815     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2816       /* Expect struct GNUNET_TIME_Relative */
2817       socket->retransmit_timeout = va_arg (vargs,
2818                                            struct GNUNET_TIME_Relative);
2819       break;
2820     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
2821       socket->testing_active = GNUNET_YES;
2822       socket->testing_set_write_sequence_number_value = va_arg (vargs,
2823                                                                 uint32_t);
2824       break;
2825     case GNUNET_STREAM_OPTION_END:
2826       break;
2827     }
2828   } while (GNUNET_STREAM_OPTION_END != option);
2829   va_end (vargs);               /* End of variable args parsing */
2830   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2831                                       RECEIVE_BUFFER_SIZE,  /* QUEUE size as parameter? */
2832                                       socket, /* cls */
2833                                       NULL, /* No inbound tunnel handler */
2834                                       NULL, /* No in-tunnel cleaner */
2835                                       client_message_handlers,
2836                                       ports); /* We don't get inbound tunnels */
2837   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
2838   {
2839     GNUNET_free (socket);
2840     return NULL;
2841   }
2842
2843   /* Now create the mesh tunnel to target */
2844   LOG (GNUNET_ERROR_TYPE_DEBUG,
2845        "Creating MESH Tunnel\n");
2846   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2847                                               NULL, /* Tunnel context */
2848                                               &mesh_peer_connect_callback,
2849                                               &mesh_peer_disconnect_callback,
2850                                               socket);
2851   GNUNET_assert (NULL != socket->tunnel);
2852   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2853                                         &socket->other_peer);
2854   
2855   LOG (GNUNET_ERROR_TYPE_DEBUG,
2856        "%s() END\n", __func__);
2857   return socket;
2858 }
2859
2860
2861 /**
2862  * Shutdown the stream for reading or writing (similar to man 2 shutdown).
2863  *
2864  * @param socket the stream socket
2865  * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
2866  * @param completion_cb the callback that will be called upon successful
2867  *          shutdown of given operation
2868  * @param completion_cls the closure for the completion callback
2869  * @return the shutdown handle
2870  */
2871 struct GNUNET_STREAM_ShutdownHandle *
2872 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2873                         int operation,
2874                         GNUNET_STREAM_ShutdownCompletion completion_cb,
2875                         void *completion_cls)
2876 {
2877   struct GNUNET_STREAM_ShutdownHandle *handle;
2878   struct GNUNET_STREAM_MessageHeader *msg;
2879   
2880   GNUNET_assert (NULL == socket->shutdown_handle);
2881
2882   handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
2883   handle->socket = socket;
2884   handle->completion_cb = completion_cb;
2885   handle->completion_cls = completion_cls;
2886   socket->shutdown_handle = handle;
2887
2888   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2889   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2890   switch (operation)
2891   {
2892   case SHUT_RD:
2893     handle->operation = SHUT_RD;
2894     if (NULL != socket->read_handle)
2895       LOG (GNUNET_ERROR_TYPE_WARNING,
2896            "Existing read handle should be cancelled before shutting"
2897            " down reading\n");
2898     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
2899     queue_message (socket,
2900                    msg,
2901                    &set_state_receive_close_wait,
2902                    NULL);
2903     break;
2904   case SHUT_WR:
2905     handle->operation = SHUT_WR;
2906     if (NULL != socket->write_handle)
2907       LOG (GNUNET_ERROR_TYPE_WARNING,
2908            "Existing write handle should be cancelled before shutting"
2909            " down writing\n");
2910     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
2911     queue_message (socket,
2912                    msg,
2913                    &set_state_transmit_close_wait,
2914                    NULL);
2915     break;
2916   case SHUT_RDWR:
2917     handle->operation = SHUT_RDWR;
2918     if (NULL != socket->write_handle)
2919       LOG (GNUNET_ERROR_TYPE_WARNING,
2920            "Existing write handle should be cancelled before shutting"
2921            " down writing\n");
2922     if (NULL != socket->read_handle)
2923       LOG (GNUNET_ERROR_TYPE_WARNING,
2924            "Existing read handle should be cancelled before shutting"
2925            " down reading\n");
2926     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
2927     queue_message (socket,
2928                    msg,
2929                    &set_state_close_wait,
2930                    NULL);
2931     break;
2932   default:
2933     LOG (GNUNET_ERROR_TYPE_WARNING,
2934          "GNUNET_STREAM_shutdown called with invalid value for "
2935          "parameter operation -- Ignoring\n");
2936     GNUNET_free (msg);
2937     GNUNET_free (handle);
2938     return NULL;
2939   }
2940   handle->close_msg_retransmission_task_id =
2941     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2942                                   &close_msg_retransmission_task,
2943                                   handle);
2944   return handle;
2945 }
2946
2947
2948 /**
2949  * Cancels a pending shutdown
2950  *
2951  * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
2952  */
2953 void
2954 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
2955 {
2956   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
2957     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
2958   GNUNET_free (handle);
2959   return;
2960 }
2961
2962
2963 /**
2964  * Closes the stream
2965  *
2966  * @param socket the stream socket
2967  */
2968 void
2969 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
2970 {
2971   struct MessageQueue *head;
2972
2973   if (NULL != socket->read_handle)
2974   {
2975     LOG (GNUNET_ERROR_TYPE_WARNING,
2976          "Closing STREAM socket when a read handle is pending\n");
2977   }
2978   if (NULL != socket->write_handle)
2979   {
2980     LOG (GNUNET_ERROR_TYPE_WARNING,
2981          "Closing STREAM socket when a write handle is pending\n");
2982     GNUNET_STREAM_io_write_cancel (socket->write_handle);
2983     //socket->write_handle = NULL;
2984   }
2985
2986   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
2987   {
2988     /* socket closed with read task pending!? */
2989     GNUNET_break (0);
2990     GNUNET_SCHEDULER_cancel (socket->read_task_id);
2991     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
2992   }
2993   
2994   /* Terminate the ack'ing tasks if they are still present */
2995   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
2996   {
2997     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2998     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2999   }
3000
3001   /* Clear Transmit handles */
3002   if (NULL != socket->transmit_handle)
3003   {
3004     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
3005     socket->transmit_handle = NULL;
3006   }
3007   if (NULL != socket->ack_transmit_handle)
3008   {
3009     GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
3010     GNUNET_free (socket->ack_msg);
3011     socket->ack_msg = NULL;
3012     socket->ack_transmit_handle = NULL;
3013   }
3014
3015   /* Clear existing message queue */
3016   while (NULL != (head = socket->queue_head)) {
3017     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
3018                                  socket->queue_tail,
3019                                  head);
3020     GNUNET_free (head->message);
3021     GNUNET_free (head);
3022   }
3023
3024   /* Close associated tunnel */
3025   if (NULL != socket->tunnel)
3026   {
3027     GNUNET_MESH_tunnel_destroy (socket->tunnel);
3028     socket->tunnel = NULL;
3029   }
3030
3031   /* Close mesh connection */
3032   if (NULL != socket->mesh && NULL == socket->lsocket)
3033   {
3034     GNUNET_MESH_disconnect (socket->mesh);
3035     socket->mesh = NULL;
3036   }
3037   
3038   /* Release receive buffer */
3039   if (NULL != socket->receive_buffer)
3040   {
3041     GNUNET_free (socket->receive_buffer);
3042   }
3043
3044   GNUNET_free (socket);
3045 }
3046
3047
3048 /**
3049  * Listens for stream connections for a specific application ports
3050  *
3051  * @param cfg the configuration to use
3052  * @param app_port the application port for which new streams will be accepted
3053  * @param listen_cb this function will be called when a peer tries to establish
3054  *            a stream with us
3055  * @param listen_cb_cls closure for listen_cb
3056  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3057  * @return listen socket, NULL for any error
3058  */
3059 struct GNUNET_STREAM_ListenSocket *
3060 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
3061                       GNUNET_MESH_ApplicationType app_port,
3062                       GNUNET_STREAM_ListenCallback listen_cb,
3063                       void *listen_cb_cls,
3064                       ...)
3065 {
3066   /* FIXME: Add variable args for passing configration options? */
3067   struct GNUNET_STREAM_ListenSocket *lsocket;
3068   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
3069   enum GNUNET_STREAM_Option option;
3070   va_list vargs;
3071
3072   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
3073   /* Set defaults */
3074   lsocket->retransmit_timeout = 
3075     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
3076   lsocket->testing_active = GNUNET_NO;
3077   va_start (vargs, listen_cb_cls);
3078   do {
3079     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3080     switch (option)
3081     {
3082     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3083       lsocket->retransmit_timeout = va_arg (vargs,
3084                                             struct GNUNET_TIME_Relative);
3085       break;
3086     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3087       lsocket->testing_active = GNUNET_YES;
3088       lsocket->testing_set_write_sequence_number_value = va_arg (vargs,
3089                                                                  uint32_t);
3090       break;
3091     case GNUNET_STREAM_OPTION_END:
3092       break;
3093     }
3094   } while (GNUNET_STREAM_OPTION_END != option);
3095   va_end (vargs);
3096   lsocket->port = app_port;
3097   lsocket->listen_cb = listen_cb;
3098   lsocket->listen_cb_cls = listen_cb_cls;
3099   lsocket->mesh = GNUNET_MESH_connect (cfg,
3100                                        RECEIVE_BUFFER_SIZE, /* FIXME: QUEUE size as parameter? */
3101                                        lsocket, /* Closure */
3102                                        &new_tunnel_notify,
3103                                        &tunnel_cleaner,
3104                                        server_message_handlers,
3105                                        ports);
3106   GNUNET_assert (NULL != lsocket->mesh);
3107   return lsocket;
3108 }
3109
3110
3111 /**
3112  * Closes the listen socket
3113  *
3114  * @param lsocket the listen socket
3115  */
3116 void
3117 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3118 {
3119   /* Close MESH connection */
3120   GNUNET_assert (NULL != lsocket->mesh);
3121   GNUNET_MESH_disconnect (lsocket->mesh);
3122   
3123   GNUNET_free (lsocket);
3124 }
3125
3126
3127 /**
3128  * Tries to write the given data to the stream. The maximum size of data that
3129  * can be written as part of a write operation is (64 * (64000 - sizeof (struct
3130  * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3131  * violation, however only the said number of maximum bytes will be written.
3132  *
3133  * @param socket the socket representing a stream
3134  * @param data the data buffer from where the data is written into the stream
3135  * @param size the number of bytes to be written from the data buffer
3136  * @param timeout the timeout period
3137  * @param write_cont the function to call upon writing some bytes into the
3138  *          stream 
3139  * @param write_cont_cls the closure
3140  *
3141  * @return handle to cancel the operation; if a previous write is pending or
3142  *           the stream has been shutdown for this operation then write_cont is
3143  *           immediately called and NULL is returned.
3144  */
3145 struct GNUNET_STREAM_IOWriteHandle *
3146 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3147                      const void *data,
3148                      size_t size,
3149                      struct GNUNET_TIME_Relative timeout,
3150                      GNUNET_STREAM_CompletionContinuation write_cont,
3151                      void *write_cont_cls)
3152 {
3153   unsigned int num_needed_packets;
3154   unsigned int packet;
3155   struct GNUNET_STREAM_IOWriteHandle *io_handle;
3156   uint32_t packet_size;
3157   uint32_t payload_size;
3158   struct GNUNET_STREAM_DataMessage *data_msg;
3159   const void *sweep;
3160   struct GNUNET_TIME_Relative ack_deadline;
3161
3162   LOG (GNUNET_ERROR_TYPE_DEBUG,
3163        "%s\n", __func__);
3164
3165   /* Return NULL if there is already a write request pending */
3166   if (NULL != socket->write_handle)
3167   {
3168     GNUNET_break (0);
3169     return NULL;
3170   }
3171
3172   switch (socket->state)
3173   {
3174   case STATE_TRANSMIT_CLOSED:
3175   case STATE_TRANSMIT_CLOSE_WAIT:
3176   case STATE_CLOSED:
3177   case STATE_CLOSE_WAIT:
3178     if (NULL != write_cont)
3179       write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3180     LOG (GNUNET_ERROR_TYPE_DEBUG,
3181          "%s() END\n", __func__);
3182     return NULL;
3183   case STATE_INIT:
3184   case STATE_LISTEN:
3185   case STATE_HELLO_WAIT:
3186     if (NULL != write_cont)
3187       /* FIXME: GNUNET_STREAM_SYSERR?? */
3188       write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3189     LOG (GNUNET_ERROR_TYPE_DEBUG,
3190          "%s() END\n", __func__);
3191     return NULL;
3192   case STATE_ESTABLISHED:
3193   case STATE_RECEIVE_CLOSED:
3194   case STATE_RECEIVE_CLOSE_WAIT:
3195     break;
3196   }
3197
3198   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
3199     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
3200   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
3201   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
3202   io_handle->socket = socket;
3203   io_handle->write_cont = write_cont;
3204   io_handle->write_cont_cls = write_cont_cls;
3205   io_handle->size = size;
3206   sweep = data;
3207   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3208      determined from RTT */
3209   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3210   /* Divide the given buffer into packets for sending */
3211   for (packet=0; packet < num_needed_packets; packet++)
3212   {
3213     if ((packet + 1) * max_payload_size < size) 
3214     {
3215       payload_size = max_payload_size;
3216       packet_size = MAX_PACKET_SIZE;
3217     }
3218     else 
3219     {
3220       payload_size = size - packet * max_payload_size;
3221       packet_size =  payload_size + sizeof (struct
3222                                             GNUNET_STREAM_DataMessage); 
3223     }
3224     io_handle->messages[packet] = GNUNET_malloc (packet_size);
3225     io_handle->messages[packet]->header.header.size = htons (packet_size);
3226     io_handle->messages[packet]->header.header.type =
3227       htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3228     io_handle->messages[packet]->sequence_number =
3229       htonl (socket->write_sequence_number++);
3230     io_handle->messages[packet]->offset = htonl (socket->write_offset);
3231
3232     /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3233        determined from RTT */
3234     io_handle->messages[packet]->ack_deadline =
3235       GNUNET_TIME_relative_hton (ack_deadline);
3236     data_msg = io_handle->messages[packet];
3237     /* Copy data from given buffer to the packet */
3238     memcpy (&data_msg[1],
3239             sweep,
3240             payload_size);
3241     sweep += payload_size;
3242     socket->write_offset += payload_size;
3243   }
3244   socket->write_handle = io_handle;
3245   write_data (socket);
3246
3247   LOG (GNUNET_ERROR_TYPE_DEBUG,
3248        "%s() END\n", __func__);
3249
3250   return io_handle;
3251 }
3252
3253
3254
3255 /**
3256  * Tries to read data from the stream.
3257  *
3258  * @param socket the socket representing a stream
3259  * @param timeout the timeout period
3260  * @param proc function to call with data (once only)
3261  * @param proc_cls the closure for proc
3262  *
3263  * @return handle to cancel the operation; if the stream has been shutdown for
3264  *           this type of opeartion then the DataProcessor is immediately
3265  *           called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned
3266  */
3267 struct GNUNET_STREAM_IOReadHandle *
3268 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3269                     struct GNUNET_TIME_Relative timeout,
3270                     GNUNET_STREAM_DataProcessor proc,
3271                     void *proc_cls)
3272 {
3273   struct GNUNET_STREAM_IOReadHandle *read_handle;
3274   
3275   LOG (GNUNET_ERROR_TYPE_DEBUG,
3276        "%s: %s()\n", 
3277        GNUNET_i2s (&socket->other_peer),
3278        __func__);
3279
3280   /* Return NULL if there is already a read handle; the user has to cancel that
3281      first before continuing or has to wait until it is completed */
3282   if (NULL != socket->read_handle) return NULL;
3283
3284   GNUNET_assert (NULL != proc);
3285
3286   switch (socket->state)
3287   {
3288   case STATE_RECEIVE_CLOSED:
3289   case STATE_RECEIVE_CLOSE_WAIT:
3290   case STATE_CLOSED:
3291   case STATE_CLOSE_WAIT:
3292     proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3293     LOG (GNUNET_ERROR_TYPE_DEBUG,
3294          "%s: %s() END\n",
3295          GNUNET_i2s (&socket->other_peer),
3296          __func__);
3297     return NULL;
3298   default:
3299     break;
3300   }
3301
3302   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
3303   read_handle->proc = proc;
3304   read_handle->proc_cls = proc_cls;
3305   socket->read_handle = read_handle;
3306
3307   /* Check if we have a packet at bitmap 0 */
3308   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3309                                           0))
3310   {
3311     socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3312                                                      socket);
3313    
3314   }
3315   
3316   /* Setup the read timeout task */
3317   socket->read_io_timeout_task_id =
3318     GNUNET_SCHEDULER_add_delayed (timeout,
3319                                   &read_io_timeout,
3320                                   socket);
3321   LOG (GNUNET_ERROR_TYPE_DEBUG,
3322        "%s: %s() END\n",
3323        GNUNET_i2s (&socket->other_peer),
3324        __func__);
3325   return read_handle;
3326 }
3327
3328
3329 /**
3330  * Cancel pending write operation.
3331  *
3332  * @param ioh handle to operation to cancel
3333  */
3334 void
3335 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3336 {
3337   struct GNUNET_STREAM_Socket *socket = ioh->socket;
3338   unsigned int packet;
3339
3340   GNUNET_assert (NULL != socket->write_handle);
3341   GNUNET_assert (socket->write_handle == ioh);
3342
3343   if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
3344   {
3345     GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
3346     socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
3347   }
3348
3349   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3350   {
3351     if (NULL == ioh->messages[packet]) break;
3352     GNUNET_free (ioh->messages[packet]);
3353   }
3354       
3355   GNUNET_free (socket->write_handle);
3356   socket->write_handle = NULL;
3357   return;
3358 }
3359
3360
3361 /**
3362  * Cancel pending read operation.
3363  *
3364  * @param ioh handle to operation to cancel
3365  */
3366 void
3367 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
3368 {
3369   return;
3370 }