214ac4e41ef372e0007482112c24de9a5150f234
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Add code for write io timeout
27  *
28  * Include retransmission for control messages
29  **/
30
31 /**
32  * @file stream/stream_api.c
33  * @brief Implementation of the stream library
34  * @author Sree Harsha Totakura
35  */
36
37
38 #include "platform.h"
39 #include "gnunet_common.h"
40 #include "gnunet_crypto_lib.h"
41 #include "gnunet_stream_lib.h"
42 #include "stream_protocol.h"
43
44 #define LOG(kind,...)                                   \
45   GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
46
47 /**
48  * The maximum packet size of a stream packet
49  */
50 #define MAX_PACKET_SIZE 512//64000
51
52 /**
53  * Receive buffer
54  */
55 #define RECEIVE_BUFFER_SIZE 4096000
56
57 /**
58  * The maximum payload a data message packet can carry
59  */
60 static const size_t max_payload_size = 
61   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
62
63 /**
64  * states in the Protocol
65  */
66 enum State
67   {
68     /**
69      * Client initialization state
70      */
71     STATE_INIT,
72
73     /**
74      * Listener initialization state 
75      */
76     STATE_LISTEN,
77
78     /**
79      * Pre-connection establishment state
80      */
81     STATE_HELLO_WAIT,
82
83     /**
84      * State where a connection has been established
85      */
86     STATE_ESTABLISHED,
87
88     /**
89      * State where the socket is closed on our side and waiting to be ACK'ed
90      */
91     STATE_RECEIVE_CLOSE_WAIT,
92
93     /**
94      * State where the socket is closed for reading
95      */
96     STATE_RECEIVE_CLOSED,
97
98     /**
99      * State where the socket is closed on our side and waiting to be ACK'ed
100      */
101     STATE_TRANSMIT_CLOSE_WAIT,
102
103     /**
104      * State where the socket is closed for writing
105      */
106     STATE_TRANSMIT_CLOSED,
107
108     /**
109      * State where the socket is closed on our side and waiting to be ACK'ed
110      */
111     STATE_CLOSE_WAIT,
112
113     /**
114      * State where the socket is closed
115      */
116     STATE_CLOSED 
117   };
118
119
120 /**
121  * Functions of this type are called when a message is written
122  *
123  * @param cls the closure from queue_message
124  * @param socket the socket the written message was bound to
125  */
126 typedef void (*SendFinishCallback) (void *cls,
127                                     struct GNUNET_STREAM_Socket *socket);
128
129
130 /**
131  * The send message queue
132  */
133 struct MessageQueue
134 {
135   /**
136    * The message
137    */
138   struct GNUNET_STREAM_MessageHeader *message;
139
140   /**
141    * Callback to be called when the message is sent
142    */
143   SendFinishCallback finish_cb;
144
145   /**
146    * The closure for finish_cb
147    */
148   void *finish_cb_cls;
149
150   /**
151    * The next message in queue. Should be NULL in the last message
152    */
153   struct MessageQueue *next;
154
155   /**
156    * The next message in queue. Should be NULL in the first message
157    */
158   struct MessageQueue *prev;
159 };
160
161
162 /**
163  * The STREAM Socket Handler
164  */
165 struct GNUNET_STREAM_Socket
166 {
167   /**
168    * Retransmission timeout
169    */
170   struct GNUNET_TIME_Relative retransmit_timeout;
171
172   /**
173    * The Acknowledgement Bitmap
174    */
175   GNUNET_STREAM_AckBitmap ack_bitmap;
176
177   /**
178    * Time when the Acknowledgement was queued
179    */
180   struct GNUNET_TIME_Absolute ack_time_registered;
181
182   /**
183    * Queued Acknowledgement deadline
184    */
185   struct GNUNET_TIME_Relative ack_time_deadline;
186
187   /**
188    * The mesh handle
189    */
190   struct GNUNET_MESH_Handle *mesh;
191
192   /**
193    * The mesh tunnel handle
194    */
195   struct GNUNET_MESH_Tunnel *tunnel;
196
197   /**
198    * Stream open closure
199    */
200   void *open_cls;
201
202   /**
203    * Stream open callback
204    */
205   GNUNET_STREAM_OpenCallback open_cb;
206
207   /**
208    * The current transmit handle (if a pending transmit request exists)
209    */
210   struct GNUNET_MESH_TransmitHandle *transmit_handle;
211
212   /**
213    * The current act transmit handle (if a pending ack transmit request exists)
214    */
215   struct GNUNET_MESH_TransmitHandle *ack_transmit_handle;
216
217   /**
218    * Pointer to the current ack message using in ack_task
219    */
220   struct GNUNET_STREAM_AckMessage *ack_msg;
221
222   /**
223    * The current message associated with the transmit handle
224    */
225   struct MessageQueue *queue_head;
226
227   /**
228    * The queue tail, should always point to the last message in queue
229    */
230   struct MessageQueue *queue_tail;
231
232   /**
233    * The write IO_handle associated with this socket
234    */
235   struct GNUNET_STREAM_IOWriteHandle *write_handle;
236
237   /**
238    * The read IO_handle associated with this socket
239    */
240   struct GNUNET_STREAM_IOReadHandle *read_handle;
241
242   /**
243    * The shutdown handle associated with this socket
244    */
245   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
246
247   /**
248    * Buffer for storing received messages
249    */
250   void *receive_buffer;
251
252   /**
253    * The listen socket from which this socket is derived. Should be NULL if it
254    * is not a derived socket
255    */
256   struct GNUNET_STREAM_ListenSocket *lsocket;
257
258   /**
259    * The peer identity of the peer at the other end of the stream
260    */
261   struct GNUNET_PeerIdentity other_peer;
262
263   /**
264    * Task identifier for the read io timeout task
265    */
266   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
267
268   /**
269    * Task identifier for retransmission task after timeout
270    */
271   GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
272
273   /**
274    * The task for sending timely Acks
275    */
276   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
277
278   /**
279    * Task scheduled to continue a read operation.
280    */
281   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
282
283   /**
284    * The state of the protocol associated with this socket
285    */
286   enum State state;
287
288   /**
289    * The status of the socket
290    */
291   enum GNUNET_STREAM_Status status;
292
293   /**
294    * The number of previous timeouts; FIXME: currently not used
295    */
296   unsigned int retries;
297
298   /**
299    * The application port number (type: uint32_t)
300    */
301   GNUNET_MESH_ApplicationType app_port;
302
303   /**
304    * Whether testing mode is active or not
305    */
306   int testing_active;
307
308   /**
309    * The write sequence number to be set incase of testing
310    */
311   uint32_t testing_set_write_sequence_number_value;
312
313   /**
314    * The session id associated with this stream connection
315    * FIXME: Not used currently, may be removed
316    */
317   uint32_t session_id;
318
319   /**
320    * Write sequence number. Set to random when sending HELLO(client) and
321    * HELLO_ACK(server) 
322    */
323   uint32_t write_sequence_number;
324
325   /**
326    * Read sequence number. This number's value is determined during handshake
327    */
328   uint32_t read_sequence_number;
329
330   /**
331    * The receiver buffer size
332    */
333   uint32_t receive_buffer_size;
334
335   /**
336    * The receiver buffer boundaries
337    */
338   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
339
340   /**
341    * receiver's available buffer after the last acknowledged packet
342    */
343   uint32_t receiver_window_available;
344
345   /**
346    * The offset pointer used during write operation
347    */
348   uint32_t write_offset;
349
350   /**
351    * The offset after which we are expecting data
352    */
353   uint32_t read_offset;
354
355   /**
356    * The offset upto which user has read from the received buffer
357    */
358   uint32_t copy_offset;
359 };
360
361
362 /**
363  * A socket for listening
364  */
365 struct GNUNET_STREAM_ListenSocket
366 {
367   /**
368    * The mesh handle
369    */
370   struct GNUNET_MESH_Handle *mesh;
371
372   /**
373    * The callback function which is called after successful opening socket
374    */
375   GNUNET_STREAM_ListenCallback listen_cb;
376
377   /**
378    * The call back closure
379    */
380   void *listen_cb_cls;
381
382   /**
383    * The service port
384    * FIXME: Remove if not required!
385    */
386   GNUNET_MESH_ApplicationType port;
387
388   /**
389    * The retransmit timeout
390    */
391   struct GNUNET_TIME_Relative retransmit_timeout;
392   
393   /**
394    * Whether testing mode is active or not
395    */
396   int testing_active;
397
398   /**
399    * The write sequence number to be set incase of testing
400    */
401   uint32_t testing_set_write_sequence_number_value;
402 };
403
404
405 /**
406  * The IO Write Handle
407  */
408 struct GNUNET_STREAM_IOWriteHandle
409 {
410   /**
411    * The socket to which this write handle is associated
412    */
413   struct GNUNET_STREAM_Socket *socket;
414
415   /**
416    * The packet_buffers associated with this Handle
417    */
418   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
419
420   /**
421    * The write continuation callback
422    */
423   GNUNET_STREAM_CompletionContinuation write_cont;
424
425   /**
426    * Write continuation closure
427    */
428   void *write_cont_cls;
429
430   /**
431    * The bitmap of this IOHandle; Corresponding bit for a message is set when
432    * it has been acknowledged by the receiver
433    */
434   GNUNET_STREAM_AckBitmap ack_bitmap;
435
436   /**
437    * Number of bytes in this write handle
438    */
439   size_t size;
440 };
441
442
443 /**
444  * The IO Read Handle
445  */
446 struct GNUNET_STREAM_IOReadHandle
447 {
448   /**
449    * Callback for the read processor
450    */
451   GNUNET_STREAM_DataProcessor proc;
452
453   /**
454    * The closure pointer for the read processor callback
455    */
456   void *proc_cls;
457 };
458
459
460 /**
461  * Handle for Shutdown
462  */
463 struct GNUNET_STREAM_ShutdownHandle
464 {
465   /**
466    * The socket associated with this shutdown handle
467    */
468   struct GNUNET_STREAM_Socket *socket;
469
470   /**
471    * Shutdown completion callback
472    */
473   GNUNET_STREAM_ShutdownCompletion completion_cb;
474
475   /**
476    * Closure for completion callback
477    */
478   void *completion_cls;
479
480   /**
481    * Close message retransmission task id
482    */
483   GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
484
485   /**
486    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
487    */
488   int operation;  
489 };
490
491
492 /**
493  * Default value in seconds for various timeouts
494  */
495 static unsigned int default_timeout = 10;
496
497
498 /**
499  * Callback function for sending queued message
500  *
501  * @param cls closure the socket
502  * @param size number of bytes available in buf
503  * @param buf where the callee should write the message
504  * @return number of bytes written to buf
505  */
506 static size_t
507 send_message_notify (void *cls, size_t size, void *buf)
508 {
509   struct GNUNET_STREAM_Socket *socket = cls;
510   struct MessageQueue *head;
511   size_t ret;
512
513   socket->transmit_handle = NULL; /* Remove the transmit handle */
514   head = socket->queue_head;
515   if (NULL == head)
516     return 0; /* just to be safe */
517   if (0 == size)                /* request timed out */
518   {
519     socket->retries++;
520     LOG (GNUNET_ERROR_TYPE_DEBUG,
521          "%s: Message sending timed out. Retry %d \n",
522          GNUNET_i2s (&socket->other_peer),
523          socket->retries);
524     socket->transmit_handle = 
525       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
526                                          0, /* Corking */
527                                          1, /* Priority */
528                                          /* FIXME: exponential backoff */
529                                          socket->retransmit_timeout,
530                                          &socket->other_peer,
531                                          ntohs (head->message->header.size),
532                                          &send_message_notify,
533                                          socket);
534     return 0;
535   }
536
537   ret = ntohs (head->message->header.size);
538   GNUNET_assert (size >= ret);
539   memcpy (buf, head->message, ret);
540   if (NULL != head->finish_cb)
541   {
542     head->finish_cb (head->finish_cb_cls, socket);
543   }
544   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
545                                socket->queue_tail,
546                                head);
547   GNUNET_free (head->message);
548   GNUNET_free (head);
549   head = socket->queue_head;
550   if (NULL != head)    /* more pending messages to send */
551   {
552     socket->retries = 0;
553     socket->transmit_handle = 
554       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
555                                          0, /* Corking */
556                                          1, /* Priority */
557                                          /* FIXME: exponential backoff */
558                                          socket->retransmit_timeout,
559                                          &socket->other_peer,
560                                          ntohs (head->message->header.size),
561                                          &send_message_notify,
562                                          socket);
563   }
564   return ret;
565 }
566
567
568 /**
569  * Queues a message for sending using the mesh connection of a socket
570  *
571  * @param socket the socket whose mesh connection is used
572  * @param message the message to be sent
573  * @param finish_cb the callback to be called when the message is sent
574  * @param finish_cb_cls the closure for the callback
575  */
576 static void
577 queue_message (struct GNUNET_STREAM_Socket *socket,
578                struct GNUNET_STREAM_MessageHeader *message,
579                SendFinishCallback finish_cb,
580                void *finish_cb_cls)
581 {
582   struct MessageQueue *queue_entity;
583
584   GNUNET_assert 
585     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
586      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
587
588   LOG (GNUNET_ERROR_TYPE_DEBUG,
589        "%s: Queueing message of type %d and size %d\n",
590        GNUNET_i2s (&socket->other_peer),
591        ntohs (message->header.type),
592        ntohs (message->header.size));
593   GNUNET_assert (NULL != message);
594   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
595   queue_entity->message = message;
596   queue_entity->finish_cb = finish_cb;
597   queue_entity->finish_cb_cls = finish_cb_cls;
598   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
599                                     socket->queue_tail,
600                                     queue_entity);
601   if (NULL == socket->transmit_handle)
602   {
603     socket->retries = 0;
604     socket->transmit_handle = 
605       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
606                                          0, /* Corking */
607                                          1, /* Priority */
608                                          socket->retransmit_timeout,
609                                          &socket->other_peer,
610                                          ntohs (message->header.size),
611                                          &send_message_notify,
612                                          socket);
613   }
614 }
615
616
617 /**
618  * Copies a message and queues it for sending using the mesh connection of
619  * given socket 
620  *
621  * @param socket the socket whose mesh connection is used
622  * @param message the message to be sent
623  * @param finish_cb the callback to be called when the message is sent
624  * @param finish_cb_cls the closure for the callback
625  */
626 static void
627 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
628                         const struct GNUNET_STREAM_MessageHeader *message,
629                         SendFinishCallback finish_cb,
630                         void *finish_cb_cls)
631 {
632   struct GNUNET_STREAM_MessageHeader *msg_copy;
633   uint16_t size;
634   
635   size = ntohs (message->header.size);
636   msg_copy = GNUNET_malloc (size);
637   memcpy (msg_copy, message, size);
638   queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
639 }
640
641
642 /**
643  * Callback function for sending ack message
644  *
645  * @param cls closure the ACK message created in ack_task
646  * @param size number of bytes available in buffer
647  * @param buf where the callee should write the message
648  * @return number of bytes written to buf
649  */
650 static size_t
651 send_ack_notify (void *cls, size_t size, void *buf)
652 {
653   struct GNUNET_STREAM_Socket *socket = cls;
654
655   if (0 == size)
656   {
657     LOG (GNUNET_ERROR_TYPE_DEBUG,
658          "%s called with size 0\n", __func__);
659     return 0;
660   }
661   GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size);
662   
663   size = ntohs (socket->ack_msg->header.header.size);
664   memcpy (buf, socket->ack_msg, size);
665   
666   GNUNET_free (socket->ack_msg);
667   socket->ack_msg = NULL;
668   socket->ack_transmit_handle = NULL;
669   return size;
670 }
671
672 /**
673  * Writes data using the given socket. The amount of data written is limited by
674  * the receiver_window_size
675  *
676  * @param socket the socket to use
677  */
678 static void 
679 write_data (struct GNUNET_STREAM_Socket *socket);
680
681 /**
682  * Task for retransmitting data messages if they aren't ACK before their ack
683  * deadline 
684  *
685  * @param cls the socket
686  * @param tc the Task context
687  */
688 static void
689 retransmission_timeout_task (void *cls,
690                              const struct GNUNET_SCHEDULER_TaskContext *tc)
691 {
692   struct GNUNET_STREAM_Socket *socket = cls;
693   
694   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
695     return;
696
697   LOG (GNUNET_ERROR_TYPE_DEBUG,
698        "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
699   socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
700   write_data (socket);
701 }
702
703
704 /**
705  * Task for sending ACK message
706  *
707  * @param cls the socket
708  * @param tc the Task context
709  */
710 static void
711 ack_task (void *cls,
712           const struct GNUNET_SCHEDULER_TaskContext *tc)
713 {
714   struct GNUNET_STREAM_Socket *socket = cls;
715   struct GNUNET_STREAM_AckMessage *ack_msg;
716
717   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
718   {
719     return;
720   }
721   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
722   /* Create the ACK Message */
723   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
724   ack_msg->header.header.size = htons (sizeof (struct 
725                                                GNUNET_STREAM_AckMessage));
726   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
727   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
728   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
729   ack_msg->receive_window_remaining = 
730     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
731   socket->ack_msg = ack_msg;
732   /* Request MESH for sending ACK */
733   socket->ack_transmit_handle = 
734     GNUNET_MESH_notify_transmit_ready (socket->tunnel,
735                                        0, /* Corking */
736                                        1, /* Priority */
737                                        socket->retransmit_timeout,
738                                        &socket->other_peer,
739                                        ntohs (ack_msg->header.header.size),
740                                        &send_ack_notify,
741                                        socket);
742 }
743
744
745 /**
746  * Retransmission task for shutdown messages
747  *
748  * @param cls the shutdown handle
749  * @param tc the Task Context
750  */
751 static void
752 close_msg_retransmission_task (void *cls,
753                                const struct GNUNET_SCHEDULER_TaskContext *tc)
754 {
755   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
756   struct GNUNET_STREAM_MessageHeader *msg;
757   struct GNUNET_STREAM_Socket *socket;
758
759   GNUNET_assert (NULL != shutdown_handle);
760   socket = shutdown_handle->socket;
761
762   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
763   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
764   switch (shutdown_handle->operation)
765   {
766   case SHUT_RDWR:
767     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
768     break;
769   case SHUT_RD:
770     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
771     break;
772   case SHUT_WR:
773     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
774     break;
775   default:
776     GNUNET_free (msg);
777     shutdown_handle->close_msg_retransmission_task_id = 
778       GNUNET_SCHEDULER_NO_TASK;
779     return;
780   }
781   queue_message (socket, msg, NULL, NULL);
782   shutdown_handle->close_msg_retransmission_task_id =
783     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
784                                   &close_msg_retransmission_task,
785                                   shutdown_handle);
786 }
787
788
789 /**
790  * Function to modify a bit in GNUNET_STREAM_AckBitmap
791  *
792  * @param bitmap the bitmap to modify
793  * @param bit the bit number to modify
794  * @param value GNUNET_YES to on, GNUNET_NO to off
795  */
796 static void
797 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
798                       unsigned int bit, 
799                       int value)
800 {
801   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
802   if (GNUNET_YES == value)
803     *bitmap |= (1LL << bit);
804   else
805     *bitmap &= ~(1LL << bit);
806 }
807
808
809 /**
810  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
811  *
812  * @param bitmap address of the bitmap that has to be checked
813  * @param bit the bit number to check
814  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
815  */
816 static uint8_t
817 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
818                       unsigned int bit)
819 {
820   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
821   return 0 != (*bitmap & (1LL << bit));
822 }
823
824
825 /**
826  * Writes data using the given socket. The amount of data written is limited by
827  * the receiver_window_size
828  *
829  * @param socket the socket to use
830  */
831 static void 
832 write_data (struct GNUNET_STREAM_Socket *socket)
833 {
834   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
835   int packet;                   /* Although an int, should never be negative */
836   int ack_packet;
837
838   ack_packet = -1;
839   /* Find the last acknowledged packet */
840   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
841   {      
842     if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
843                                             packet))
844       ack_packet = packet;        
845     else if (NULL == io_handle->messages[packet])
846       break;
847   }
848   /* Resend packets which weren't ack'ed */
849   for (packet=0; packet < ack_packet; packet++)
850   {
851     if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
852                                            packet))
853     {
854       LOG (GNUNET_ERROR_TYPE_DEBUG,
855            "%s: Placing DATA message with sequence %u in send queue\n",
856            GNUNET_i2s (&socket->other_peer),
857            ntohl (io_handle->messages[packet]->sequence_number));
858       copy_and_queue_message (socket,
859                               &io_handle->messages[packet]->header,
860                               NULL,
861                               NULL);
862     }
863   }
864   packet = ack_packet + 1;
865   /* Now send new packets if there is enough buffer space */
866   while ( (NULL != io_handle->messages[packet]) &&
867           (socket->receiver_window_available 
868            >= ntohs (io_handle->messages[packet]->header.header.size)) &&
869           (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
870   {
871     socket->receiver_window_available -= 
872       ntohs (io_handle->messages[packet]->header.header.size);
873     LOG (GNUNET_ERROR_TYPE_DEBUG,
874          "%s: Placing DATA message with sequence %u in send queue\n",
875          GNUNET_i2s (&socket->other_peer),
876          ntohl (io_handle->messages[packet]->sequence_number));
877     copy_and_queue_message (socket,
878                             &io_handle->messages[packet]->header,
879                             NULL,
880                             NULL);
881     packet++;
882   }
883   if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
884     socket->retransmission_timeout_task_id = 
885       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
886                                     (GNUNET_TIME_UNIT_SECONDS, 8),
887                                     &retransmission_timeout_task,
888                                     socket);
889 }
890
891
892 /**
893  * Task for calling the read processor
894  *
895  * @param cls the socket
896  * @param tc the task context
897  */
898 static void
899 call_read_processor (void *cls,
900                      const struct GNUNET_SCHEDULER_TaskContext *tc)
901 {
902   struct GNUNET_STREAM_Socket *socket = cls;
903   size_t read_size;
904   size_t valid_read_size;
905   unsigned int packet;
906   uint32_t sequence_increase;
907   uint32_t offset_increase;
908
909   socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
910   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
911     return;
912
913   if (NULL == socket->receive_buffer) 
914     return;
915
916   GNUNET_assert (NULL != socket->read_handle);
917   GNUNET_assert (NULL != socket->read_handle->proc);
918
919   /* Check the bitmap for any holes */
920   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
921   {
922     if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
923                                            packet))
924       break;
925   }
926   /* We only call read processor if we have the first packet */
927   GNUNET_assert (0 < packet);
928   valid_read_size = 
929     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
930   GNUNET_assert (0 != valid_read_size);
931   /* Cancel the read_io_timeout_task */
932   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
933   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
934   /* Call the data processor */
935   LOG (GNUNET_ERROR_TYPE_DEBUG,
936        "%s: Calling read processor\n",
937        GNUNET_i2s (&socket->other_peer));
938   read_size = 
939     socket->read_handle->proc (socket->read_handle->proc_cls,
940                                socket->status,
941                                socket->receive_buffer + socket->copy_offset,
942                                valid_read_size);
943   LOG (GNUNET_ERROR_TYPE_DEBUG,
944        "%s: Read processor read %d bytes\n",
945        GNUNET_i2s (&socket->other_peer), read_size);
946   LOG (GNUNET_ERROR_TYPE_DEBUG,
947        "%s: Read processor completed successfully\n",
948        GNUNET_i2s (&socket->other_peer));
949   /* Free the read handle */
950   GNUNET_free (socket->read_handle);
951   socket->read_handle = NULL;
952   GNUNET_assert (read_size <= valid_read_size);
953   socket->copy_offset += read_size;
954   /* Determine upto which packet we can remove from the buffer */
955   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
956   {
957     if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
958     { packet++; break; }
959     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
960       break;
961   }
962
963   /* If no packets can be removed we can't move the buffer */
964   if (0 == packet) return;
965   sequence_increase = packet;
966   LOG (GNUNET_ERROR_TYPE_DEBUG,
967        "%s: Sequence increase after read processor completion: %u\n",
968        GNUNET_i2s (&socket->other_peer), sequence_increase);
969
970   /* Shift the data in the receive buffer */
971   socket->receive_buffer = 
972     memmove (socket->receive_buffer,
973              socket->receive_buffer 
974              + socket->receive_buffer_boundaries[sequence_increase-1],
975              socket->receive_buffer_size
976              - socket->receive_buffer_boundaries[sequence_increase-1]);
977   /* Shift the bitmap */
978   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
979   /* Set read_sequence_number */
980   socket->read_sequence_number += sequence_increase;
981   /* Set read_offset */
982   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
983   socket->read_offset += offset_increase;
984   /* Fix copy_offset */
985   GNUNET_assert (offset_increase <= socket->copy_offset);
986   socket->copy_offset -= offset_increase;
987   /* Fix relative boundaries */
988   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
989   {
990     if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
991     {
992       uint32_t ahead_buffer_boundary;
993
994       ahead_buffer_boundary = 
995         socket->receive_buffer_boundaries[packet + sequence_increase];
996       if (0 == ahead_buffer_boundary)
997         socket->receive_buffer_boundaries[packet] = 0;
998       else
999       {
1000         GNUNET_assert (offset_increase < ahead_buffer_boundary);
1001         socket->receive_buffer_boundaries[packet] = 
1002           ahead_buffer_boundary - offset_increase;
1003       }
1004     }
1005     else
1006       socket->receive_buffer_boundaries[packet] = 0;
1007   }
1008 }
1009
1010
1011 /**
1012  * Cancels the existing read io handle
1013  *
1014  * @param cls the closure from the SCHEDULER call
1015  * @param tc the task context
1016  */
1017 static void
1018 read_io_timeout (void *cls, 
1019                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1020 {
1021   struct GNUNET_STREAM_Socket *socket = cls;
1022   GNUNET_STREAM_DataProcessor proc;
1023   void *proc_cls;
1024
1025   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1026   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1027   {
1028     LOG (GNUNET_ERROR_TYPE_DEBUG,
1029          "%s: Read task timedout - Cancelling it\n",
1030          GNUNET_i2s (&socket->other_peer));
1031     GNUNET_SCHEDULER_cancel (socket->read_task_id);
1032     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1033   }
1034   GNUNET_assert (NULL != socket->read_handle);
1035   proc = socket->read_handle->proc;
1036   proc_cls = socket->read_handle->proc_cls;
1037
1038   GNUNET_free (socket->read_handle);
1039   socket->read_handle = NULL;
1040   /* Call the read processor to signal timeout */
1041   proc (proc_cls,
1042         GNUNET_STREAM_TIMEOUT,
1043         NULL,
1044         0);
1045 }
1046
1047
1048 /**
1049  * Handler for DATA messages; Same for both client and server
1050  *
1051  * @param socket the socket through which the ack was received
1052  * @param tunnel connection to the other end
1053  * @param sender who sent the message
1054  * @param msg the data message
1055  * @param atsi performance data for the connection
1056  * @return GNUNET_OK to keep the connection open,
1057  *         GNUNET_SYSERR to close it (signal serious error)
1058  */
1059 static int
1060 handle_data (struct GNUNET_STREAM_Socket *socket,
1061              struct GNUNET_MESH_Tunnel *tunnel,
1062              const struct GNUNET_PeerIdentity *sender,
1063              const struct GNUNET_STREAM_DataMessage *msg,
1064              const struct GNUNET_ATS_Information*atsi)
1065 {
1066   const void *payload;
1067   uint32_t bytes_needed;
1068   uint32_t relative_offset;
1069   uint32_t relative_sequence_number;
1070   uint16_t size;
1071
1072   size = htons (msg->header.header.size);
1073   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1074   {
1075     GNUNET_break_op (0);
1076     return GNUNET_SYSERR;
1077   }
1078
1079   if (0 != memcmp (sender,
1080                    &socket->other_peer,
1081                    sizeof (struct GNUNET_PeerIdentity)))
1082   {
1083     LOG (GNUNET_ERROR_TYPE_DEBUG,
1084          "%s: Received DATA from non-confirming peer\n",
1085          GNUNET_i2s (&socket->other_peer));
1086     return GNUNET_YES;
1087   }
1088
1089   switch (socket->state)
1090   {
1091   case STATE_ESTABLISHED:
1092   case STATE_TRANSMIT_CLOSED:
1093   case STATE_TRANSMIT_CLOSE_WAIT:
1094       
1095     /* check if the message's sequence number is in the range we are
1096        expecting */
1097     relative_sequence_number = 
1098       ntohl (msg->sequence_number) - socket->read_sequence_number;
1099     if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1100     {
1101       LOG (GNUNET_ERROR_TYPE_DEBUG,
1102            "%s: Ignoring received message with sequence number %u\n",
1103            GNUNET_i2s (&socket->other_peer),
1104            ntohl (msg->sequence_number));
1105       /* Start ACK sending task if one is not already present */
1106       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1107       {
1108         socket->ack_task_id = 
1109           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1110                                         (msg->ack_deadline),
1111                                         &ack_task,
1112                                         socket);
1113       }
1114       return GNUNET_YES;
1115     }
1116       
1117     /* Check if we have already seen this message */
1118     if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1119                                             relative_sequence_number))
1120     {
1121       LOG (GNUNET_ERROR_TYPE_DEBUG,
1122            "%s: Ignoring already received message with sequence number %u\n",
1123            GNUNET_i2s (&socket->other_peer),
1124            ntohl (msg->sequence_number));
1125       /* Start ACK sending task if one is not already present */
1126       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1127       {
1128         socket->ack_task_id = 
1129           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1130                                         (msg->ack_deadline),
1131                                         &ack_task,
1132                                         socket);
1133       }
1134       return GNUNET_YES;
1135     }
1136
1137     LOG (GNUNET_ERROR_TYPE_DEBUG,
1138          "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
1139          GNUNET_i2s (&socket->other_peer),
1140          ntohl (msg->sequence_number),
1141          ntohs (msg->header.header.size),
1142          GNUNET_i2s (&socket->other_peer));
1143       
1144     /* Check if we have to allocate the buffer */
1145     size -= sizeof (struct GNUNET_STREAM_DataMessage);
1146     relative_offset = ntohl (msg->offset) - socket->read_offset;
1147     bytes_needed = relative_offset + size;
1148     if (bytes_needed > socket->receive_buffer_size)
1149     {
1150       if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1151       {
1152         socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1153                                                  bytes_needed);
1154         socket->receive_buffer_size = bytes_needed;
1155       }
1156       else
1157       {
1158         LOG (GNUNET_ERROR_TYPE_DEBUG,
1159              "%s: Cannot accommodate packet %d as buffer is full\n",
1160              GNUNET_i2s (&socket->other_peer),
1161              ntohl (msg->sequence_number));
1162         return GNUNET_YES;
1163       }
1164     }
1165       
1166     /* Copy Data to buffer */
1167     payload = &msg[1];
1168     GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1169     memcpy (socket->receive_buffer + relative_offset,
1170             payload,
1171             size);
1172     socket->receive_buffer_boundaries[relative_sequence_number] = 
1173       relative_offset + size;
1174       
1175     /* Modify the ACK bitmap */
1176     ackbitmap_modify_bit (&socket->ack_bitmap,
1177                           relative_sequence_number,
1178                           GNUNET_YES);
1179
1180     /* Start ACK sending task if one is not already present */
1181     if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1182     {
1183       socket->ack_task_id = 
1184         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1185                                       (msg->ack_deadline),
1186                                       &ack_task,
1187                                       socket);
1188     }
1189
1190     if ((NULL != socket->read_handle) /* A read handle is waiting */
1191         /* There is no current read task */
1192         && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1193         /* We have the first packet */
1194         && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1195                                                0)))
1196     {
1197       LOG (GNUNET_ERROR_TYPE_DEBUG,
1198            "%s: Scheduling read processor\n",
1199            GNUNET_i2s (&socket->other_peer));
1200           
1201       socket->read_task_id = 
1202         GNUNET_SCHEDULER_add_now (&call_read_processor,
1203                                   socket);
1204     }
1205       
1206     break;
1207
1208   default:
1209     LOG (GNUNET_ERROR_TYPE_DEBUG,
1210          "%s: Received data message when it cannot be handled\n",
1211          GNUNET_i2s (&socket->other_peer));
1212     break;
1213   }
1214   return GNUNET_YES;
1215 }
1216
1217
1218 /**
1219  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1220  *
1221  * @param cls the socket (set from GNUNET_MESH_connect)
1222  * @param tunnel connection to the other end
1223  * @param tunnel_ctx place to store local state associated with the tunnel
1224  * @param sender who sent the message
1225  * @param message the actual message
1226  * @param atsi performance data for the connection
1227  * @return GNUNET_OK to keep the connection open,
1228  *         GNUNET_SYSERR to close it (signal serious error)
1229  */
1230 static int
1231 client_handle_data (void *cls,
1232                     struct GNUNET_MESH_Tunnel *tunnel,
1233                     void **tunnel_ctx,
1234                     const struct GNUNET_PeerIdentity *sender,
1235                     const struct GNUNET_MessageHeader *message,
1236                     const struct GNUNET_ATS_Information*atsi)
1237 {
1238   struct GNUNET_STREAM_Socket *socket = cls;
1239
1240   return handle_data (socket, 
1241                       tunnel, 
1242                       sender, 
1243                       (const struct GNUNET_STREAM_DataMessage *) message, 
1244                       atsi);
1245 }
1246
1247
1248 /**
1249  * Callback to set state to ESTABLISHED
1250  *
1251  * @param cls the closure from queue_message FIXME: document
1252  * @param socket the socket to requiring state change
1253  */
1254 static void
1255 set_state_established (void *cls,
1256                        struct GNUNET_STREAM_Socket *socket)
1257 {
1258   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1259        "%s: Attaining ESTABLISHED state\n",
1260        GNUNET_i2s (&socket->other_peer));
1261   socket->write_offset = 0;
1262   socket->read_offset = 0;
1263   socket->state = STATE_ESTABLISHED;
1264   /* FIXME: What if listen_cb is NULL */
1265   if (NULL != socket->lsocket)
1266   {
1267     LOG (GNUNET_ERROR_TYPE_DEBUG,
1268          "%s: Calling listen callback\n",
1269          GNUNET_i2s (&socket->other_peer));
1270     if (GNUNET_SYSERR == 
1271         socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1272                                     socket,
1273                                     &socket->other_peer))
1274     {
1275       socket->state = STATE_CLOSED;
1276       /* FIXME: We should close in a decent way */
1277       GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1278       GNUNET_free (socket);
1279     }
1280   }
1281   else if (socket->open_cb)
1282     socket->open_cb (socket->open_cls, socket);
1283 }
1284
1285
1286 /**
1287  * Callback to set state to HELLO_WAIT
1288  *
1289  * @param cls the closure from queue_message
1290  * @param socket the socket to requiring state change
1291  */
1292 static void
1293 set_state_hello_wait (void *cls,
1294                       struct GNUNET_STREAM_Socket *socket)
1295 {
1296   GNUNET_assert (STATE_INIT == socket->state);
1297   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1298        "%s: Attaining HELLO_WAIT state\n",
1299        GNUNET_i2s (&socket->other_peer));
1300   socket->state = STATE_HELLO_WAIT;
1301 }
1302
1303
1304 /**
1305  * Callback to set state to CLOSE_WAIT
1306  *
1307  * @param cls the closure from queue_message
1308  * @param socket the socket requiring state change
1309  */
1310 static void
1311 set_state_close_wait (void *cls,
1312                       struct GNUNET_STREAM_Socket *socket)
1313 {
1314   LOG (GNUNET_ERROR_TYPE_DEBUG,
1315        "%s: Attaing CLOSE_WAIT state\n",
1316        GNUNET_i2s (&socket->other_peer));
1317   socket->state = STATE_CLOSE_WAIT;
1318   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1319   socket->receive_buffer = NULL;
1320   socket->receive_buffer_size = 0;
1321 }
1322
1323
1324 /**
1325  * Callback to set state to RECEIVE_CLOSE_WAIT
1326  *
1327  * @param cls the closure from queue_message
1328  * @param socket the socket requiring state change
1329  */
1330 static void
1331 set_state_receive_close_wait (void *cls,
1332                               struct GNUNET_STREAM_Socket *socket)
1333 {
1334   LOG (GNUNET_ERROR_TYPE_DEBUG,
1335        "%s: Attaing RECEIVE_CLOSE_WAIT state\n",
1336        GNUNET_i2s (&socket->other_peer));
1337   socket->state = STATE_RECEIVE_CLOSE_WAIT;
1338   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1339   socket->receive_buffer = NULL;
1340   socket->receive_buffer_size = 0;
1341 }
1342
1343
1344 /**
1345  * Callback to set state to TRANSMIT_CLOSE_WAIT
1346  *
1347  * @param cls the closure from queue_message
1348  * @param socket the socket requiring state change
1349  */
1350 static void
1351 set_state_transmit_close_wait (void *cls,
1352                                struct GNUNET_STREAM_Socket *socket)
1353 {
1354   LOG (GNUNET_ERROR_TYPE_DEBUG,
1355        "%s: Attaing TRANSMIT_CLOSE_WAIT state\n",
1356        GNUNET_i2s (&socket->other_peer));
1357   socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1358 }
1359
1360
1361 /**
1362  * Callback to set state to CLOSED
1363  *
1364  * @param cls the closure from queue_message
1365  * @param socket the socket requiring state change
1366  */
1367 static void
1368 set_state_closed (void *cls,
1369                   struct GNUNET_STREAM_Socket *socket)
1370 {
1371   socket->state = STATE_CLOSED;
1372 }
1373
1374 /**
1375  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1376  * socket
1377  *
1378  * @param socket the socket for which this HelloAckMessage has to be generated
1379  * @return the HelloAckMessage
1380  */
1381 static struct GNUNET_STREAM_HelloAckMessage *
1382 generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket)
1383 {
1384   struct GNUNET_STREAM_HelloAckMessage *msg;
1385
1386   /* Get the random sequence number */
1387   if (GNUNET_YES == socket->testing_active)
1388     socket->write_sequence_number =
1389       socket->testing_set_write_sequence_number_value;
1390   else
1391     socket->write_sequence_number = 
1392       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1393   LOG (GNUNET_ERROR_TYPE_DEBUG,
1394        "%s: write sequence number %u\n",
1395        GNUNET_i2s (&socket->other_peer),
1396        (unsigned int) socket->write_sequence_number);
1397   
1398   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1399   msg->header.header.size = 
1400     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1401   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1402   msg->sequence_number = htonl (socket->write_sequence_number);
1403   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1404
1405   return msg;
1406 }
1407
1408
1409 /**
1410  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1411  *
1412  * @param cls the socket (set from GNUNET_MESH_connect)
1413  * @param tunnel connection to the other end
1414  * @param tunnel_ctx this is NULL
1415  * @param sender who sent the message
1416  * @param message the actual message
1417  * @param atsi performance data for the connection
1418  * @return GNUNET_OK to keep the connection open,
1419  *         GNUNET_SYSERR to close it (signal serious error)
1420  */
1421 static int
1422 client_handle_hello_ack (void *cls,
1423                          struct GNUNET_MESH_Tunnel *tunnel,
1424                          void **tunnel_ctx,
1425                          const struct GNUNET_PeerIdentity *sender,
1426                          const struct GNUNET_MessageHeader *message,
1427                          const struct GNUNET_ATS_Information*atsi)
1428 {
1429   struct GNUNET_STREAM_Socket *socket = cls;
1430   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1431   struct GNUNET_STREAM_HelloAckMessage *reply;
1432
1433   if (0 != memcmp (sender,
1434                    &socket->other_peer,
1435                    sizeof (struct GNUNET_PeerIdentity)))
1436   {
1437     LOG (GNUNET_ERROR_TYPE_DEBUG,
1438          "%s: Received HELLO_ACK from non-confirming peer\n",
1439          GNUNET_i2s (&socket->other_peer));
1440     return GNUNET_YES;
1441   }
1442   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1443   LOG (GNUNET_ERROR_TYPE_DEBUG,
1444        "%s: Received HELLO_ACK from %s\n",
1445        GNUNET_i2s (&socket->other_peer),
1446        GNUNET_i2s (&socket->other_peer));
1447
1448   GNUNET_assert (socket->tunnel == tunnel);
1449   switch (socket->state)
1450   {
1451   case STATE_HELLO_WAIT:
1452     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1453     LOG (GNUNET_ERROR_TYPE_DEBUG,
1454          "%s: Read sequence number %u\n",
1455          GNUNET_i2s (&socket->other_peer),
1456          (unsigned int) socket->read_sequence_number);
1457     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1458     reply = generate_hello_ack_msg (socket);
1459     queue_message (socket,
1460                    &reply->header, 
1461                    &set_state_established, 
1462                    NULL);      
1463     return GNUNET_OK;
1464   case STATE_ESTABLISHED:
1465   case STATE_RECEIVE_CLOSE_WAIT:
1466     // call statistics (# ACKs ignored++)
1467     return GNUNET_OK;
1468   case STATE_INIT:
1469   default:
1470     LOG (GNUNET_ERROR_TYPE_DEBUG,
1471          "%s: Server %s sent HELLO_ACK when in state %d\n", 
1472          GNUNET_i2s (&socket->other_peer),
1473          GNUNET_i2s (&socket->other_peer),
1474          socket->state);
1475     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1476     return GNUNET_SYSERR;
1477   }
1478
1479 }
1480
1481
1482 /**
1483  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1484  *
1485  * @param cls the socket (set from GNUNET_MESH_connect)
1486  * @param tunnel connection to the other end
1487  * @param tunnel_ctx this is NULL
1488  * @param sender who sent the message
1489  * @param message the actual message
1490  * @param atsi performance data for the connection
1491  * @return GNUNET_OK to keep the connection open,
1492  *         GNUNET_SYSERR to close it (signal serious error)
1493  */
1494 static int
1495 client_handle_reset (void *cls,
1496                      struct GNUNET_MESH_Tunnel *tunnel,
1497                      void **tunnel_ctx,
1498                      const struct GNUNET_PeerIdentity *sender,
1499                      const struct GNUNET_MessageHeader *message,
1500                      const struct GNUNET_ATS_Information*atsi)
1501 {
1502   // struct GNUNET_STREAM_Socket *socket = cls;
1503
1504   return GNUNET_OK;
1505 }
1506
1507
1508 /**
1509  * Common message handler for handling TRANSMIT_CLOSE messages
1510  *
1511  * @param socket the socket through which the ack was received
1512  * @param tunnel connection to the other end
1513  * @param sender who sent the message
1514  * @param msg the transmit close message
1515  * @param atsi performance data for the connection
1516  * @return GNUNET_OK to keep the connection open,
1517  *         GNUNET_SYSERR to close it (signal serious error)
1518  */
1519 static int
1520 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1521                        struct GNUNET_MESH_Tunnel *tunnel,
1522                        const struct GNUNET_PeerIdentity *sender,
1523                        const struct GNUNET_STREAM_MessageHeader *msg,
1524                        const struct GNUNET_ATS_Information*atsi)
1525 {
1526   struct GNUNET_STREAM_MessageHeader *reply;
1527
1528   switch (socket->state)
1529   {
1530   case STATE_ESTABLISHED:
1531     socket->state = STATE_RECEIVE_CLOSED;
1532
1533     /* Send TRANSMIT_CLOSE_ACK */
1534     reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1535     reply->header.type = 
1536       htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1537     reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1538     queue_message (socket, reply, NULL, NULL);
1539     break;
1540
1541   default:
1542     /* FIXME: Call statistics? */
1543     break;
1544   }
1545   return GNUNET_YES;
1546 }
1547
1548
1549 /**
1550  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1551  *
1552  * @param cls the socket (set from GNUNET_MESH_connect)
1553  * @param tunnel connection to the other end
1554  * @param tunnel_ctx this is NULL
1555  * @param sender who sent the message
1556  * @param message the actual message
1557  * @param atsi performance data for the connection
1558  * @return GNUNET_OK to keep the connection open,
1559  *         GNUNET_SYSERR to close it (signal serious error)
1560  */
1561 static int
1562 client_handle_transmit_close (void *cls,
1563                               struct GNUNET_MESH_Tunnel *tunnel,
1564                               void **tunnel_ctx,
1565                               const struct GNUNET_PeerIdentity *sender,
1566                               const struct GNUNET_MessageHeader *message,
1567                               const struct GNUNET_ATS_Information*atsi)
1568 {
1569   struct GNUNET_STREAM_Socket *socket = cls;
1570   
1571   return handle_transmit_close (socket,
1572                                 tunnel,
1573                                 sender,
1574                                 (struct GNUNET_STREAM_MessageHeader *)message,
1575                                 atsi);
1576 }
1577
1578
1579 /**
1580  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1581  *
1582  * @param socket the socket
1583  * @param tunnel connection to the other end
1584  * @param sender who sent the message
1585  * @param message the actual message
1586  * @param atsi performance data for the connection
1587  * @param operation the close operation which is being ACK'ed
1588  * @return GNUNET_OK to keep the connection open,
1589  *         GNUNET_SYSERR to close it (signal serious error)
1590  */
1591 static int
1592 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1593                           struct GNUNET_MESH_Tunnel *tunnel,
1594                           const struct GNUNET_PeerIdentity *sender,
1595                           const struct GNUNET_STREAM_MessageHeader *message,
1596                           const struct GNUNET_ATS_Information *atsi,
1597                           int operation)
1598 {
1599   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1600
1601   shutdown_handle = socket->shutdown_handle;
1602   if (NULL == shutdown_handle)
1603   {
1604     LOG (GNUNET_ERROR_TYPE_DEBUG,
1605          "%s: Received CLOSE_ACK when shutdown handle is NULL\n",
1606          GNUNET_i2s (&socket->other_peer));
1607     return GNUNET_OK;
1608   }
1609
1610   switch (operation)
1611   {
1612   case SHUT_RDWR:
1613     switch (socket->state)
1614     {
1615     case STATE_CLOSE_WAIT:
1616       if (SHUT_RDWR != shutdown_handle->operation)
1617       {
1618         LOG (GNUNET_ERROR_TYPE_DEBUG,
1619              "%s: Received CLOSE_ACK when shutdown handle is not for "
1620              "SHUT_RDWR\n",
1621              GNUNET_i2s (&socket->other_peer));
1622         return GNUNET_OK;
1623       }
1624
1625       LOG (GNUNET_ERROR_TYPE_DEBUG,
1626            "%s: Received CLOSE_ACK from %s\n",
1627            GNUNET_i2s (&socket->other_peer),
1628            GNUNET_i2s (&socket->other_peer));
1629       socket->state = STATE_CLOSED;
1630       break;
1631     default:
1632       LOG (GNUNET_ERROR_TYPE_DEBUG,
1633            "%s: Received CLOSE_ACK when in it not expected\n",
1634            GNUNET_i2s (&socket->other_peer));
1635       return GNUNET_OK;
1636     }
1637     break;
1638
1639   case SHUT_RD:
1640     switch (socket->state)
1641     {
1642     case STATE_RECEIVE_CLOSE_WAIT:
1643       if (SHUT_RD != shutdown_handle->operation)
1644       {
1645         LOG (GNUNET_ERROR_TYPE_DEBUG,
1646              "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
1647              "is not for SHUT_RD\n",
1648              GNUNET_i2s (&socket->other_peer));
1649         return GNUNET_OK;
1650       }
1651
1652       LOG (GNUNET_ERROR_TYPE_DEBUG,
1653            "%s: Received RECEIVE_CLOSE_ACK from %s\n",
1654            GNUNET_i2s (&socket->other_peer),
1655            GNUNET_i2s (&socket->other_peer));
1656       socket->state = STATE_RECEIVE_CLOSED;
1657       break;
1658     default:
1659       LOG (GNUNET_ERROR_TYPE_DEBUG,
1660            "%s: Received RECEIVE_CLOSE_ACK when in it not expected\n",
1661            GNUNET_i2s (&socket->other_peer));
1662       return GNUNET_OK;
1663     }
1664
1665     break;
1666   case SHUT_WR:
1667     switch (socket->state)
1668     {
1669     case STATE_TRANSMIT_CLOSE_WAIT:
1670       if (SHUT_WR != shutdown_handle->operation)
1671       {
1672         LOG (GNUNET_ERROR_TYPE_DEBUG,
1673              "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle "
1674              "is not for SHUT_WR\n",
1675              GNUNET_i2s (&socket->other_peer));
1676         return GNUNET_OK;
1677       }
1678
1679       LOG (GNUNET_ERROR_TYPE_DEBUG,
1680            "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
1681            GNUNET_i2s (&socket->other_peer),
1682            GNUNET_i2s (&socket->other_peer));
1683       socket->state = STATE_TRANSMIT_CLOSED;
1684       break;
1685     default:
1686       LOG (GNUNET_ERROR_TYPE_DEBUG,
1687            "%s: Received TRANSMIT_CLOSE_ACK when in it not expected\n",
1688            GNUNET_i2s (&socket->other_peer));
1689           
1690       return GNUNET_OK;
1691     }
1692     break;
1693   default:
1694     GNUNET_assert (0);
1695   }
1696
1697   if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
1698     shutdown_handle->completion_cb(shutdown_handle->completion_cls,
1699                                    operation);
1700   GNUNET_free (shutdown_handle); /* Free shutdown handle */
1701   socket->shutdown_handle = NULL;
1702   if (GNUNET_SCHEDULER_NO_TASK
1703       != shutdown_handle->close_msg_retransmission_task_id)
1704   {
1705     GNUNET_SCHEDULER_cancel
1706       (shutdown_handle->close_msg_retransmission_task_id);
1707     shutdown_handle->close_msg_retransmission_task_id =
1708       GNUNET_SCHEDULER_NO_TASK;
1709   }
1710   return GNUNET_OK;
1711 }
1712
1713
1714 /**
1715  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1716  *
1717  * @param cls the socket (set from GNUNET_MESH_connect)
1718  * @param tunnel connection to the other end
1719  * @param tunnel_ctx this is NULL
1720  * @param sender who sent the message
1721  * @param message the actual message
1722  * @param atsi performance data for the connection
1723  * @return GNUNET_OK to keep the connection open,
1724  *         GNUNET_SYSERR to close it (signal serious error)
1725  */
1726 static int
1727 client_handle_transmit_close_ack (void *cls,
1728                                   struct GNUNET_MESH_Tunnel *tunnel,
1729                                   void **tunnel_ctx,
1730                                   const struct GNUNET_PeerIdentity *sender,
1731                                   const struct GNUNET_MessageHeader *message,
1732                                   const struct GNUNET_ATS_Information*atsi)
1733 {
1734   struct GNUNET_STREAM_Socket *socket = cls;
1735
1736   return handle_generic_close_ack (socket,
1737                                    tunnel,
1738                                    sender,
1739                                    (const struct GNUNET_STREAM_MessageHeader *)
1740                                    message,
1741                                    atsi,
1742                                    SHUT_WR);
1743 }
1744
1745
1746 /**
1747  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1748  *
1749  * @param socket the socket
1750  * @param tunnel connection to the other end
1751  * @param sender who sent the message
1752  * @param message the actual message
1753  * @param atsi performance data for the connection
1754  * @return GNUNET_OK to keep the connection open,
1755  *         GNUNET_SYSERR to close it (signal serious error)
1756  */
1757 static int
1758 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1759                       struct GNUNET_MESH_Tunnel *tunnel,
1760                       const struct GNUNET_PeerIdentity *sender,
1761                       const struct GNUNET_STREAM_MessageHeader *message,
1762                       const struct GNUNET_ATS_Information *atsi)
1763 {
1764   struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1765
1766   switch (socket->state)
1767   {
1768   case STATE_INIT:
1769   case STATE_LISTEN:
1770   case STATE_HELLO_WAIT:
1771     LOG (GNUNET_ERROR_TYPE_DEBUG,
1772          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1773          GNUNET_i2s (&socket->other_peer));
1774     return GNUNET_OK;
1775   default:
1776     break;
1777   }
1778   
1779   LOG (GNUNET_ERROR_TYPE_DEBUG,
1780        "%s: Received RECEIVE_CLOSE from %s\n",
1781        GNUNET_i2s (&socket->other_peer),
1782        GNUNET_i2s (&socket->other_peer));
1783   receive_close_ack =
1784     GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1785   receive_close_ack->header.size =
1786     htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1787   receive_close_ack->header.type =
1788     htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1789   queue_message (socket,
1790                  receive_close_ack,
1791                  &set_state_closed,
1792                  NULL);
1793   
1794   /* FIXME: Handle the case where write handle is present; the write operation
1795      should be deemed as finised and the write continuation callback
1796      has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
1797   return GNUNET_OK;
1798 }
1799
1800
1801 /**
1802  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1803  *
1804  * @param cls the socket (set from GNUNET_MESH_connect)
1805  * @param tunnel connection to the other end
1806  * @param tunnel_ctx this is NULL
1807  * @param sender who sent the message
1808  * @param message the actual message
1809  * @param atsi performance data for the connection
1810  * @return GNUNET_OK to keep the connection open,
1811  *         GNUNET_SYSERR to close it (signal serious error)
1812  */
1813 static int
1814 client_handle_receive_close (void *cls,
1815                              struct GNUNET_MESH_Tunnel *tunnel,
1816                              void **tunnel_ctx,
1817                              const struct GNUNET_PeerIdentity *sender,
1818                              const struct GNUNET_MessageHeader *message,
1819                              const struct GNUNET_ATS_Information*atsi)
1820 {
1821   struct GNUNET_STREAM_Socket *socket = cls;
1822
1823   return
1824     handle_receive_close (socket,
1825                           tunnel,
1826                           sender,
1827                           (const struct GNUNET_STREAM_MessageHeader *) message,
1828                           atsi);
1829 }
1830
1831
1832 /**
1833  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1834  *
1835  * @param cls the socket (set from GNUNET_MESH_connect)
1836  * @param tunnel connection to the other end
1837  * @param tunnel_ctx this is NULL
1838  * @param sender who sent the message
1839  * @param message the actual message
1840  * @param atsi performance data for the connection
1841  * @return GNUNET_OK to keep the connection open,
1842  *         GNUNET_SYSERR to close it (signal serious error)
1843  */
1844 static int
1845 client_handle_receive_close_ack (void *cls,
1846                                  struct GNUNET_MESH_Tunnel *tunnel,
1847                                  void **tunnel_ctx,
1848                                  const struct GNUNET_PeerIdentity *sender,
1849                                  const struct GNUNET_MessageHeader *message,
1850                                  const struct GNUNET_ATS_Information*atsi)
1851 {
1852   struct GNUNET_STREAM_Socket *socket = cls;
1853
1854   return handle_generic_close_ack (socket,
1855                                    tunnel,
1856                                    sender,
1857                                    (const struct GNUNET_STREAM_MessageHeader *)
1858                                    message,
1859                                    atsi,
1860                                    SHUT_RD);
1861 }
1862
1863
1864 /**
1865  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1866  *
1867  * @param socket the socket
1868  * @param tunnel connection to the other end
1869  * @param sender who sent the message
1870  * @param message the actual message
1871  * @param atsi performance data for the connection
1872  * @return GNUNET_OK to keep the connection open,
1873  *         GNUNET_SYSERR to close it (signal serious error)
1874  */
1875 static int
1876 handle_close (struct GNUNET_STREAM_Socket *socket,
1877               struct GNUNET_MESH_Tunnel *tunnel,
1878               const struct GNUNET_PeerIdentity *sender,
1879               const struct GNUNET_STREAM_MessageHeader *message,
1880               const struct GNUNET_ATS_Information*atsi)
1881 {
1882   struct GNUNET_STREAM_MessageHeader *close_ack;
1883
1884   switch (socket->state)
1885   {
1886   case STATE_INIT:
1887   case STATE_LISTEN:
1888   case STATE_HELLO_WAIT:
1889     LOG (GNUNET_ERROR_TYPE_DEBUG,
1890          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1891          GNUNET_i2s (&socket->other_peer));
1892     return GNUNET_OK;
1893   default:
1894     break;
1895   }
1896
1897   LOG (GNUNET_ERROR_TYPE_DEBUG,
1898        "%s: Received CLOSE from %s\n",
1899        GNUNET_i2s (&socket->other_peer),
1900        GNUNET_i2s (&socket->other_peer));
1901   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1902   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1903   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
1904   queue_message (socket,
1905                  close_ack,
1906                  &set_state_closed,
1907                  NULL);
1908   if (socket->state == STATE_CLOSED)
1909     return GNUNET_OK;
1910
1911   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1912   socket->receive_buffer = NULL;
1913   socket->receive_buffer_size = 0;
1914   return GNUNET_OK;
1915 }
1916
1917
1918 /**
1919  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1920  *
1921  * @param cls the socket (set from GNUNET_MESH_connect)
1922  * @param tunnel connection to the other end
1923  * @param tunnel_ctx this is NULL
1924  * @param sender who sent the message
1925  * @param message the actual message
1926  * @param atsi performance data for the connection
1927  * @return GNUNET_OK to keep the connection open,
1928  *         GNUNET_SYSERR to close it (signal serious error)
1929  */
1930 static int
1931 client_handle_close (void *cls,
1932                      struct GNUNET_MESH_Tunnel *tunnel,
1933                      void **tunnel_ctx,
1934                      const struct GNUNET_PeerIdentity *sender,
1935                      const struct GNUNET_MessageHeader *message,
1936                      const struct GNUNET_ATS_Information*atsi)
1937 {
1938   struct GNUNET_STREAM_Socket *socket = cls;
1939
1940   return handle_close (socket,
1941                        tunnel,
1942                        sender,
1943                        (const struct GNUNET_STREAM_MessageHeader *) message,
1944                        atsi);
1945 }
1946
1947
1948 /**
1949  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1950  *
1951  * @param cls the socket (set from GNUNET_MESH_connect)
1952  * @param tunnel connection to the other end
1953  * @param tunnel_ctx this is NULL
1954  * @param sender who sent the message
1955  * @param message the actual message
1956  * @param atsi performance data for the connection
1957  * @return GNUNET_OK to keep the connection open,
1958  *         GNUNET_SYSERR to close it (signal serious error)
1959  */
1960 static int
1961 client_handle_close_ack (void *cls,
1962                          struct GNUNET_MESH_Tunnel *tunnel,
1963                          void **tunnel_ctx,
1964                          const struct GNUNET_PeerIdentity *sender,
1965                          const struct GNUNET_MessageHeader *message,
1966                          const struct GNUNET_ATS_Information *atsi)
1967 {
1968   struct GNUNET_STREAM_Socket *socket = cls;
1969
1970   return handle_generic_close_ack (socket,
1971                                    tunnel,
1972                                    sender,
1973                                    (const struct GNUNET_STREAM_MessageHeader *) 
1974                                    message,
1975                                    atsi,
1976                                    SHUT_RDWR);
1977 }
1978
1979 /*****************************/
1980 /* Server's Message Handlers */
1981 /*****************************/
1982
1983 /**
1984  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1985  *
1986  * @param cls the closure
1987  * @param tunnel connection to the other end
1988  * @param tunnel_ctx the socket
1989  * @param sender who sent the message
1990  * @param message the actual message
1991  * @param atsi performance data for the connection
1992  * @return GNUNET_OK to keep the connection open,
1993  *         GNUNET_SYSERR to close it (signal serious error)
1994  */
1995 static int
1996 server_handle_data (void *cls,
1997                     struct GNUNET_MESH_Tunnel *tunnel,
1998                     void **tunnel_ctx,
1999                     const struct GNUNET_PeerIdentity *sender,
2000                     const struct GNUNET_MessageHeader *message,
2001                     const struct GNUNET_ATS_Information*atsi)
2002 {
2003   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2004
2005   return handle_data (socket,
2006                       tunnel,
2007                       sender,
2008                       (const struct GNUNET_STREAM_DataMessage *)message,
2009                       atsi);
2010 }
2011
2012
2013 /**
2014  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
2015  *
2016  * @param cls the closure
2017  * @param tunnel connection to the other end
2018  * @param tunnel_ctx the socket
2019  * @param sender who sent the message
2020  * @param message the actual message
2021  * @param atsi performance data for the connection
2022  * @return GNUNET_OK to keep the connection open,
2023  *         GNUNET_SYSERR to close it (signal serious error)
2024  */
2025 static int
2026 server_handle_hello (void *cls,
2027                      struct GNUNET_MESH_Tunnel *tunnel,
2028                      void **tunnel_ctx,
2029                      const struct GNUNET_PeerIdentity *sender,
2030                      const struct GNUNET_MessageHeader *message,
2031                      const struct GNUNET_ATS_Information*atsi)
2032 {
2033   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2034   struct GNUNET_STREAM_HelloAckMessage *reply;
2035
2036   if (0 != memcmp (sender,
2037                    &socket->other_peer,
2038                    sizeof (struct GNUNET_PeerIdentity)))
2039   {
2040     LOG (GNUNET_ERROR_TYPE_DEBUG,
2041          "%s: Received HELLO from non-confirming peer\n",
2042          GNUNET_i2s (&socket->other_peer));
2043     return GNUNET_YES;
2044   }
2045
2046   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == 
2047                  ntohs (message->type));
2048   GNUNET_assert (socket->tunnel == tunnel);
2049   LOG (GNUNET_ERROR_TYPE_DEBUG,
2050        "%s: Received HELLO from %s\n", 
2051        GNUNET_i2s (&socket->other_peer),
2052        GNUNET_i2s (&socket->other_peer));
2053
2054   if (STATE_INIT == socket->state)
2055   {
2056     reply = generate_hello_ack_msg (socket);
2057     queue_message (socket, 
2058                    &reply->header,
2059                    &set_state_hello_wait, 
2060                    NULL);
2061   }
2062   else
2063   {
2064     LOG (GNUNET_ERROR_TYPE_DEBUG,
2065          "%s: Client sent HELLO when in state %d\n", 
2066          GNUNET_i2s (&socket->other_peer),
2067          socket->state);
2068     /* FIXME: Send RESET? */
2069       
2070   }
2071   return GNUNET_OK;
2072 }
2073
2074
2075 /**
2076  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2077  *
2078  * @param cls the closure
2079  * @param tunnel connection to the other end
2080  * @param tunnel_ctx the socket
2081  * @param sender who sent the message
2082  * @param message the actual message
2083  * @param atsi performance data for the connection
2084  * @return GNUNET_OK to keep the connection open,
2085  *         GNUNET_SYSERR to close it (signal serious error)
2086  */
2087 static int
2088 server_handle_hello_ack (void *cls,
2089                          struct GNUNET_MESH_Tunnel *tunnel,
2090                          void **tunnel_ctx,
2091                          const struct GNUNET_PeerIdentity *sender,
2092                          const struct GNUNET_MessageHeader *message,
2093                          const struct GNUNET_ATS_Information*atsi)
2094 {
2095   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2096   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2097
2098   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2099                  ntohs (message->type));
2100   GNUNET_assert (socket->tunnel == tunnel);
2101   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2102   if (STATE_HELLO_WAIT == socket->state)
2103   {
2104     LOG (GNUNET_ERROR_TYPE_DEBUG,
2105          "%s: Received HELLO_ACK from %s\n",
2106          GNUNET_i2s (&socket->other_peer),
2107          GNUNET_i2s (&socket->other_peer));
2108     socket->read_sequence_number = ntohl (ack_message->sequence_number);
2109     LOG (GNUNET_ERROR_TYPE_DEBUG,
2110          "%s: Read sequence number %u\n",
2111          GNUNET_i2s (&socket->other_peer),
2112          (unsigned int) socket->read_sequence_number);
2113     socket->receiver_window_available = 
2114       ntohl (ack_message->receiver_window_size);
2115     /* Attain ESTABLISHED state */
2116     set_state_established (NULL, socket);
2117   }
2118   else
2119   {
2120     LOG (GNUNET_ERROR_TYPE_DEBUG,
2121          "Client sent HELLO_ACK when in state %d\n", socket->state);
2122     /* FIXME: Send RESET? */
2123       
2124   }
2125   return GNUNET_OK;
2126 }
2127
2128
2129 /**
2130  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2131  *
2132  * @param cls the closure
2133  * @param tunnel connection to the other end
2134  * @param tunnel_ctx the socket
2135  * @param sender who sent the message
2136  * @param message the actual message
2137  * @param atsi performance data for the connection
2138  * @return GNUNET_OK to keep the connection open,
2139  *         GNUNET_SYSERR to close it (signal serious error)
2140  */
2141 static int
2142 server_handle_reset (void *cls,
2143                      struct GNUNET_MESH_Tunnel *tunnel,
2144                      void **tunnel_ctx,
2145                      const struct GNUNET_PeerIdentity *sender,
2146                      const struct GNUNET_MessageHeader *message,
2147                      const struct GNUNET_ATS_Information*atsi)
2148 {
2149   // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2150
2151   return GNUNET_OK;
2152 }
2153
2154
2155 /**
2156  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2157  *
2158  * @param cls the closure
2159  * @param tunnel connection to the other end
2160  * @param tunnel_ctx the socket
2161  * @param sender who sent the message
2162  * @param message the actual message
2163  * @param atsi performance data for the connection
2164  * @return GNUNET_OK to keep the connection open,
2165  *         GNUNET_SYSERR to close it (signal serious error)
2166  */
2167 static int
2168 server_handle_transmit_close (void *cls,
2169                               struct GNUNET_MESH_Tunnel *tunnel,
2170                               void **tunnel_ctx,
2171                               const struct GNUNET_PeerIdentity *sender,
2172                               const struct GNUNET_MessageHeader *message,
2173                               const struct GNUNET_ATS_Information*atsi)
2174 {
2175   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2176
2177   return handle_transmit_close (socket,
2178                                 tunnel,
2179                                 sender,
2180                                 (struct GNUNET_STREAM_MessageHeader *)message,
2181                                 atsi);
2182 }
2183
2184
2185 /**
2186  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2187  *
2188  * @param cls the closure
2189  * @param tunnel connection to the other end
2190  * @param tunnel_ctx the socket
2191  * @param sender who sent the message
2192  * @param message the actual message
2193  * @param atsi performance data for the connection
2194  * @return GNUNET_OK to keep the connection open,
2195  *         GNUNET_SYSERR to close it (signal serious error)
2196  */
2197 static int
2198 server_handle_transmit_close_ack (void *cls,
2199                                   struct GNUNET_MESH_Tunnel *tunnel,
2200                                   void **tunnel_ctx,
2201                                   const struct GNUNET_PeerIdentity *sender,
2202                                   const struct GNUNET_MessageHeader *message,
2203                                   const struct GNUNET_ATS_Information*atsi)
2204 {
2205   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2206
2207   return handle_generic_close_ack (socket,
2208                                    tunnel,
2209                                    sender,
2210                                    (const struct GNUNET_STREAM_MessageHeader *)
2211                                    message,
2212                                    atsi,
2213                                    SHUT_WR);
2214 }
2215
2216
2217 /**
2218  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2219  *
2220  * @param cls the closure
2221  * @param tunnel connection to the other end
2222  * @param tunnel_ctx the socket
2223  * @param sender who sent the message
2224  * @param message the actual message
2225  * @param atsi performance data for the connection
2226  * @return GNUNET_OK to keep the connection open,
2227  *         GNUNET_SYSERR to close it (signal serious error)
2228  */
2229 static int
2230 server_handle_receive_close (void *cls,
2231                              struct GNUNET_MESH_Tunnel *tunnel,
2232                              void **tunnel_ctx,
2233                              const struct GNUNET_PeerIdentity *sender,
2234                              const struct GNUNET_MessageHeader *message,
2235                              const struct GNUNET_ATS_Information*atsi)
2236 {
2237   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2238
2239   return
2240     handle_receive_close (socket,
2241                           tunnel,
2242                           sender,
2243                           (const struct GNUNET_STREAM_MessageHeader *) message,
2244                           atsi);
2245 }
2246
2247
2248 /**
2249  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2250  *
2251  * @param cls the closure
2252  * @param tunnel connection to the other end
2253  * @param tunnel_ctx the socket
2254  * @param sender who sent the message
2255  * @param message the actual message
2256  * @param atsi performance data for the connection
2257  * @return GNUNET_OK to keep the connection open,
2258  *         GNUNET_SYSERR to close it (signal serious error)
2259  */
2260 static int
2261 server_handle_receive_close_ack (void *cls,
2262                                  struct GNUNET_MESH_Tunnel *tunnel,
2263                                  void **tunnel_ctx,
2264                                  const struct GNUNET_PeerIdentity *sender,
2265                                  const struct GNUNET_MessageHeader *message,
2266                                  const struct GNUNET_ATS_Information*atsi)
2267 {
2268   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2269
2270   return handle_generic_close_ack (socket,
2271                                    tunnel,
2272                                    sender,
2273                                    (const struct GNUNET_STREAM_MessageHeader *)
2274                                    message,
2275                                    atsi,
2276                                    SHUT_RD);
2277 }
2278
2279
2280 /**
2281  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2282  *
2283  * @param cls the listen socket (from GNUNET_MESH_connect in
2284  *          GNUNET_STREAM_listen) 
2285  * @param tunnel connection to the other end
2286  * @param tunnel_ctx the socket
2287  * @param sender who sent the message
2288  * @param message the actual message
2289  * @param atsi performance data for the connection
2290  * @return GNUNET_OK to keep the connection open,
2291  *         GNUNET_SYSERR to close it (signal serious error)
2292  */
2293 static int
2294 server_handle_close (void *cls,
2295                      struct GNUNET_MESH_Tunnel *tunnel,
2296                      void **tunnel_ctx,
2297                      const struct GNUNET_PeerIdentity *sender,
2298                      const struct GNUNET_MessageHeader *message,
2299                      const struct GNUNET_ATS_Information*atsi)
2300 {
2301   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2302   
2303   return handle_close (socket,
2304                        tunnel,
2305                        sender,
2306                        (const struct GNUNET_STREAM_MessageHeader *) message,
2307                        atsi);
2308 }
2309
2310
2311 /**
2312  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2313  *
2314  * @param cls the closure
2315  * @param tunnel connection to the other end
2316  * @param tunnel_ctx the socket
2317  * @param sender who sent the message
2318  * @param message the actual message
2319  * @param atsi performance data for the connection
2320  * @return GNUNET_OK to keep the connection open,
2321  *         GNUNET_SYSERR to close it (signal serious error)
2322  */
2323 static int
2324 server_handle_close_ack (void *cls,
2325                          struct GNUNET_MESH_Tunnel *tunnel,
2326                          void **tunnel_ctx,
2327                          const struct GNUNET_PeerIdentity *sender,
2328                          const struct GNUNET_MessageHeader *message,
2329                          const struct GNUNET_ATS_Information*atsi)
2330 {
2331   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2332
2333   return handle_generic_close_ack (socket,
2334                                    tunnel,
2335                                    sender,
2336                                    (const struct GNUNET_STREAM_MessageHeader *) 
2337                                    message,
2338                                    atsi,
2339                                    SHUT_RDWR);
2340 }
2341
2342
2343 /**
2344  * Handler for DATA_ACK messages
2345  *
2346  * @param socket the socket through which the ack was received
2347  * @param tunnel connection to the other end
2348  * @param sender who sent the message
2349  * @param ack the acknowledgment message
2350  * @param atsi performance data for the connection
2351  * @return GNUNET_OK to keep the connection open,
2352  *         GNUNET_SYSERR to close it (signal serious error)
2353  */
2354 static int
2355 handle_ack (struct GNUNET_STREAM_Socket *socket,
2356             struct GNUNET_MESH_Tunnel *tunnel,
2357             const struct GNUNET_PeerIdentity *sender,
2358             const struct GNUNET_STREAM_AckMessage *ack,
2359             const struct GNUNET_ATS_Information*atsi)
2360 {
2361   unsigned int packet;
2362   int need_retransmission;
2363   uint32_t sequence_difference;
2364   
2365   if (0 != memcmp (sender,
2366                    &socket->other_peer,
2367                    sizeof (struct GNUNET_PeerIdentity)))
2368   {
2369     LOG (GNUNET_ERROR_TYPE_DEBUG,
2370          "%s: Received ACK from non-confirming peer\n",
2371          GNUNET_i2s (&socket->other_peer));
2372     return GNUNET_YES;
2373   }
2374   switch (socket->state)
2375   {
2376   case (STATE_ESTABLISHED):
2377   case (STATE_RECEIVE_CLOSED):
2378   case (STATE_RECEIVE_CLOSE_WAIT):
2379     if (NULL == socket->write_handle)
2380     {
2381       LOG (GNUNET_ERROR_TYPE_DEBUG,
2382            "%s: Received DATA_ACK when write_handle is NULL\n",
2383            GNUNET_i2s (&socket->other_peer));
2384       return GNUNET_OK;
2385     }
2386     /* FIXME: increment in the base sequence number is breaking current flow
2387      */
2388     if (!((socket->write_sequence_number 
2389            - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2390     {
2391       LOG (GNUNET_ERROR_TYPE_DEBUG,
2392            "%s: Received DATA_ACK with unexpected base sequence number\n",
2393            GNUNET_i2s (&socket->other_peer));
2394       LOG (GNUNET_ERROR_TYPE_DEBUG,
2395            "%s: Current write sequence: %u; Ack's base sequence: %u\n",
2396            GNUNET_i2s (&socket->other_peer),
2397            socket->write_sequence_number,
2398            ntohl (ack->base_sequence_number));
2399       return GNUNET_OK;
2400     }
2401     /* FIXME: include the case when write_handle is cancelled - ignore the 
2402        acks */
2403     LOG (GNUNET_ERROR_TYPE_DEBUG,
2404          "%s: Received DATA_ACK from %s\n",
2405          GNUNET_i2s (&socket->other_peer),
2406          GNUNET_i2s (&socket->other_peer));
2407       
2408     /* Cancel the retransmission task */
2409     if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2410     {
2411       GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2412       socket->retransmission_timeout_task_id = 
2413         GNUNET_SCHEDULER_NO_TASK;
2414     }
2415     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2416     {
2417       if (NULL == socket->write_handle->messages[packet]) break;
2418       /* BS: Base sequence from ack; PS: sequence num of current packet */
2419       sequence_difference = ntohl (ack->base_sequence_number)
2420         - ntohl (socket->write_handle->messages[packet]->sequence_number);
2421       /* case where BS = PS + GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2422       if ((sequence_difference == GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
2423           || ((sequence_difference < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
2424               && (0 != sequence_difference))) /* case: BS > PS and BS != PS*/
2425       {
2426         ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, packet,
2427                               GNUNET_YES);
2428         continue;
2429       }
2430       if (GNUNET_YES == 
2431           ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
2432                                 -sequence_difference))/*inversion as PS >= BS */
2433       {
2434         ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, packet,
2435                               GNUNET_YES);
2436       }
2437     }
2438     /* Update the receive window remaining
2439        FIXME : Should update with the value from a data ack with greater
2440        sequence number */
2441     socket->receiver_window_available = 
2442       ntohl (ack->receive_window_remaining);
2443     /* Check if we have received all acknowledgements */
2444     need_retransmission = GNUNET_NO;
2445     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2446     {
2447       if (NULL == socket->write_handle->messages[packet]) break;
2448       if (GNUNET_YES != ackbitmap_is_bit_set 
2449           (&socket->write_handle->ack_bitmap,packet))
2450       {
2451         need_retransmission = GNUNET_YES;
2452         break;
2453       }
2454     }
2455     if (GNUNET_YES == need_retransmission)
2456     {
2457       write_data (socket);
2458     }
2459     else      /* We have to call the write continuation callback now */
2460     {
2461       struct GNUNET_STREAM_IOWriteHandle *write_handle;
2462       
2463       /* Free the packets */
2464       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2465       {
2466         GNUNET_free_non_null (socket->write_handle->messages[packet]);
2467       }
2468       write_handle = socket->write_handle;
2469       socket->write_handle = NULL;
2470       if (NULL != write_handle->write_cont)
2471         write_handle->write_cont (write_handle->write_cont_cls,
2472                                   socket->status,
2473                                   write_handle->size);
2474       /* We are done with the write handle - Freeing it */
2475       GNUNET_free (write_handle);
2476       LOG (GNUNET_ERROR_TYPE_DEBUG,
2477            "%s: Write completion callback completed\n",
2478            GNUNET_i2s (&socket->other_peer));      
2479     }
2480     break;
2481   default:
2482     break;
2483   }
2484   return GNUNET_OK;
2485 }
2486
2487
2488 /**
2489  * Handler for DATA_ACK messages
2490  *
2491  * @param cls the 'struct GNUNET_STREAM_Socket'
2492  * @param tunnel connection to the other end
2493  * @param tunnel_ctx unused
2494  * @param sender who sent the message
2495  * @param message the actual message
2496  * @param atsi performance data for the connection
2497  * @return GNUNET_OK to keep the connection open,
2498  *         GNUNET_SYSERR to close it (signal serious error)
2499  */
2500 static int
2501 client_handle_ack (void *cls,
2502                    struct GNUNET_MESH_Tunnel *tunnel,
2503                    void **tunnel_ctx,
2504                    const struct GNUNET_PeerIdentity *sender,
2505                    const struct GNUNET_MessageHeader *message,
2506                    const struct GNUNET_ATS_Information*atsi)
2507 {
2508   struct GNUNET_STREAM_Socket *socket = cls;
2509   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2510  
2511   return handle_ack (socket, tunnel, sender, ack, atsi);
2512 }
2513
2514
2515 /**
2516  * Handler for DATA_ACK messages
2517  *
2518  * @param cls the server's listen socket
2519  * @param tunnel connection to the other end
2520  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2521  * @param sender who sent the message
2522  * @param message the actual message
2523  * @param atsi performance data for the connection
2524  * @return GNUNET_OK to keep the connection open,
2525  *         GNUNET_SYSERR to close it (signal serious error)
2526  */
2527 static int
2528 server_handle_ack (void *cls,
2529                    struct GNUNET_MESH_Tunnel *tunnel,
2530                    void **tunnel_ctx,
2531                    const struct GNUNET_PeerIdentity *sender,
2532                    const struct GNUNET_MessageHeader *message,
2533                    const struct GNUNET_ATS_Information*atsi)
2534 {
2535   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2536   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2537  
2538   return handle_ack (socket, tunnel, sender, ack, atsi);
2539 }
2540
2541
2542 /**
2543  * For client message handlers, the stream socket is in the
2544  * closure argument.
2545  */
2546 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2547   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2548   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2549    sizeof (struct GNUNET_STREAM_AckMessage) },
2550   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2551    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2552   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2553    sizeof (struct GNUNET_STREAM_MessageHeader)},
2554   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2555    sizeof (struct GNUNET_STREAM_MessageHeader)},
2556   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2557    sizeof (struct GNUNET_STREAM_MessageHeader)},
2558   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2559    sizeof (struct GNUNET_STREAM_MessageHeader)},
2560   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2561    sizeof (struct GNUNET_STREAM_MessageHeader)},
2562   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2563    sizeof (struct GNUNET_STREAM_MessageHeader)},
2564   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2565    sizeof (struct GNUNET_STREAM_MessageHeader)},
2566   {NULL, 0, 0}
2567 };
2568
2569
2570 /**
2571  * For server message handlers, the stream socket is in the
2572  * tunnel context, and the listen socket in the closure argument.
2573  */
2574 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2575   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2576   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2577    sizeof (struct GNUNET_STREAM_AckMessage) },
2578   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2579    sizeof (struct GNUNET_STREAM_MessageHeader)},
2580   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2581    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2582   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2583    sizeof (struct GNUNET_STREAM_MessageHeader)},
2584   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2585    sizeof (struct GNUNET_STREAM_MessageHeader)},
2586   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2587    sizeof (struct GNUNET_STREAM_MessageHeader)},
2588   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2589    sizeof (struct GNUNET_STREAM_MessageHeader)},
2590   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2591    sizeof (struct GNUNET_STREAM_MessageHeader)},
2592   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2593    sizeof (struct GNUNET_STREAM_MessageHeader)},
2594   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2595    sizeof (struct GNUNET_STREAM_MessageHeader)},
2596   {NULL, 0, 0}
2597 };
2598
2599
2600 /**
2601  * Function called when our target peer is connected to our tunnel
2602  *
2603  * @param cls the socket for which this tunnel is created
2604  * @param peer the peer identity of the target
2605  * @param atsi performance data for the connection
2606  */
2607 static void
2608 mesh_peer_connect_callback (void *cls,
2609                             const struct GNUNET_PeerIdentity *peer,
2610                             const struct GNUNET_ATS_Information * atsi)
2611 {
2612   struct GNUNET_STREAM_Socket *socket = cls;
2613   struct GNUNET_STREAM_MessageHeader *message;
2614   
2615   if (0 != memcmp (peer,
2616                    &socket->other_peer,
2617                    sizeof (struct GNUNET_PeerIdentity)))
2618   {
2619     LOG (GNUNET_ERROR_TYPE_DEBUG,
2620          "%s: A peer which is not our target has connected to our tunnel\n",
2621          GNUNET_i2s(peer));
2622     return;
2623   }
2624   
2625   LOG (GNUNET_ERROR_TYPE_DEBUG,
2626        "%s: Target peer %s connected\n",
2627        GNUNET_i2s (&socket->other_peer),
2628        GNUNET_i2s (&socket->other_peer));
2629   
2630   /* Set state to INIT */
2631   socket->state = STATE_INIT;
2632
2633   /* Send HELLO message */
2634   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2635   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
2636   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2637   queue_message (socket,
2638                  message,
2639                  &set_state_hello_wait,
2640                  NULL);
2641
2642   /* Call open callback */
2643   if (NULL == socket->open_cb)
2644   {
2645     LOG (GNUNET_ERROR_TYPE_DEBUG,
2646          "STREAM_open callback is NULL\n");
2647   }
2648 }
2649
2650
2651 /**
2652  * Function called when our target peer is disconnected from our tunnel
2653  *
2654  * @param cls the socket associated which this tunnel
2655  * @param peer the peer identity of the target
2656  */
2657 static void
2658 mesh_peer_disconnect_callback (void *cls,
2659                                const struct GNUNET_PeerIdentity *peer)
2660 {
2661   struct GNUNET_STREAM_Socket *socket=cls;
2662   
2663   /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2664   LOG (GNUNET_ERROR_TYPE_DEBUG,
2665        "%s: Other peer %s disconnected \n",
2666        GNUNET_i2s (&socket->other_peer),
2667        GNUNET_i2s (&socket->other_peer));
2668 }
2669
2670
2671 /**
2672  * Method called whenever a peer creates a tunnel to us
2673  *
2674  * @param cls closure
2675  * @param tunnel new handle to the tunnel
2676  * @param initiator peer that started the tunnel
2677  * @param atsi performance information for the tunnel
2678  * @return initial tunnel context for the tunnel
2679  *         (can be NULL -- that's not an error)
2680  */
2681 static void *
2682 new_tunnel_notify (void *cls,
2683                    struct GNUNET_MESH_Tunnel *tunnel,
2684                    const struct GNUNET_PeerIdentity *initiator,
2685                    const struct GNUNET_ATS_Information *atsi)
2686 {
2687   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2688   struct GNUNET_STREAM_Socket *socket;
2689
2690   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2691      from the same peer again until the socket is closed */
2692
2693   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2694   socket->other_peer = *initiator;
2695   socket->tunnel = tunnel;
2696   socket->session_id = 0;       /* FIXME */
2697   socket->state = STATE_INIT;
2698   socket->lsocket = lsocket;
2699   socket->retransmit_timeout = lsocket->retransmit_timeout;
2700   socket->testing_active = lsocket->testing_active;
2701   socket->testing_set_write_sequence_number_value =
2702     lsocket->testing_set_write_sequence_number_value;
2703     
2704   LOG (GNUNET_ERROR_TYPE_DEBUG,
2705        "%s: Peer %s initiated tunnel to us\n", 
2706        GNUNET_i2s (&socket->other_peer),
2707        GNUNET_i2s (&socket->other_peer));
2708   
2709   /* FIXME: Copy MESH handle from lsocket to socket */
2710   
2711   return socket;
2712 }
2713
2714
2715 /**
2716  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2717  * any associated state.  This function is NOT called if the client has
2718  * explicitly asked for the tunnel to be destroyed using
2719  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2720  * the tunnel.
2721  *
2722  * @param cls closure (set from GNUNET_MESH_connect)
2723  * @param tunnel connection to the other end (henceforth invalid)
2724  * @param tunnel_ctx place where local state associated
2725  *                   with the tunnel is stored
2726  */
2727 static void 
2728 tunnel_cleaner (void *cls,
2729                 const struct GNUNET_MESH_Tunnel *tunnel,
2730                 void *tunnel_ctx)
2731 {
2732   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2733
2734   if (tunnel != socket->tunnel)
2735     return;
2736
2737   GNUNET_break_op(0);
2738   LOG (GNUNET_ERROR_TYPE_DEBUG,
2739        "%s: Peer %s has terminated connection abruptly\n",
2740        GNUNET_i2s (&socket->other_peer),
2741        GNUNET_i2s (&socket->other_peer));
2742
2743   socket->status = GNUNET_STREAM_SHUTDOWN;
2744
2745   /* Clear Transmit handles */
2746   if (NULL != socket->transmit_handle)
2747   {
2748     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2749     socket->transmit_handle = NULL;
2750   }
2751   if (NULL != socket->ack_transmit_handle)
2752   {
2753     GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
2754     GNUNET_free (socket->ack_msg);
2755     socket->ack_msg = NULL;
2756     socket->ack_transmit_handle = NULL;
2757   }
2758   /* Stop Tasks using socket->tunnel */
2759   if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2760   {
2761     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2762     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2763   }
2764   if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2765   {
2766     GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2767     socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
2768   }
2769   /* FIXME: Cancel all other tasks using socket->tunnel */
2770   socket->tunnel = NULL;
2771 }
2772
2773
2774 /*****************/
2775 /* API functions */
2776 /*****************/
2777
2778
2779 /**
2780  * Tries to open a stream to the target peer
2781  *
2782  * @param cfg configuration to use
2783  * @param target the target peer to which the stream has to be opened
2784  * @param app_port the application port number which uniquely identifies this
2785  *            stream
2786  * @param open_cb this function will be called after stream has be established 
2787  * @param open_cb_cls the closure for open_cb
2788  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2789  * @return if successful it returns the stream socket; NULL if stream cannot be
2790  *         opened 
2791  */
2792 struct GNUNET_STREAM_Socket *
2793 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2794                     const struct GNUNET_PeerIdentity *target,
2795                     GNUNET_MESH_ApplicationType app_port,
2796                     GNUNET_STREAM_OpenCallback open_cb,
2797                     void *open_cb_cls,
2798                     ...)
2799 {
2800   struct GNUNET_STREAM_Socket *socket;
2801   enum GNUNET_STREAM_Option option;
2802   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
2803   va_list vargs;                /* Variable arguments */
2804
2805   LOG (GNUNET_ERROR_TYPE_DEBUG,
2806        "%s\n", __func__);
2807   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2808   socket->other_peer = *target;
2809   socket->open_cb = open_cb;
2810   socket->open_cls = open_cb_cls;
2811   /* Set defaults */
2812   socket->retransmit_timeout = 
2813     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
2814   socket->testing_active = GNUNET_NO;
2815   va_start (vargs, open_cb_cls); /* Parse variable args */
2816   do {
2817     option = va_arg (vargs, enum GNUNET_STREAM_Option);
2818     switch (option)
2819     {
2820     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2821       /* Expect struct GNUNET_TIME_Relative */
2822       socket->retransmit_timeout = va_arg (vargs,
2823                                            struct GNUNET_TIME_Relative);
2824       break;
2825     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
2826       socket->testing_active = GNUNET_YES;
2827       socket->testing_set_write_sequence_number_value = va_arg (vargs,
2828                                                                 uint32_t);
2829       break;
2830     case GNUNET_STREAM_OPTION_END:
2831       break;
2832     }
2833   } while (GNUNET_STREAM_OPTION_END != option);
2834   va_end (vargs);               /* End of variable args parsing */
2835   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2836                                       RECEIVE_BUFFER_SIZE,  /* QUEUE size as parameter? */
2837                                       socket, /* cls */
2838                                       NULL, /* No inbound tunnel handler */
2839                                       NULL, /* No in-tunnel cleaner */
2840                                       client_message_handlers,
2841                                       ports); /* We don't get inbound tunnels */
2842   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
2843   {
2844     GNUNET_free (socket);
2845     return NULL;
2846   }
2847
2848   /* Now create the mesh tunnel to target */
2849   LOG (GNUNET_ERROR_TYPE_DEBUG,
2850        "Creating MESH Tunnel\n");
2851   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2852                                               NULL, /* Tunnel context */
2853                                               &mesh_peer_connect_callback,
2854                                               &mesh_peer_disconnect_callback,
2855                                               socket);
2856   GNUNET_assert (NULL != socket->tunnel);
2857   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2858                                         &socket->other_peer);
2859   
2860   LOG (GNUNET_ERROR_TYPE_DEBUG,
2861        "%s() END\n", __func__);
2862   return socket;
2863 }
2864
2865
2866 /**
2867  * Shutdown the stream for reading or writing (similar to man 2 shutdown).
2868  *
2869  * @param socket the stream socket
2870  * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
2871  * @param completion_cb the callback that will be called upon successful
2872  *          shutdown of given operation
2873  * @param completion_cls the closure for the completion callback
2874  * @return the shutdown handle
2875  */
2876 struct GNUNET_STREAM_ShutdownHandle *
2877 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2878                         int operation,
2879                         GNUNET_STREAM_ShutdownCompletion completion_cb,
2880                         void *completion_cls)
2881 {
2882   struct GNUNET_STREAM_ShutdownHandle *handle;
2883   struct GNUNET_STREAM_MessageHeader *msg;
2884   
2885   GNUNET_assert (NULL == socket->shutdown_handle);
2886
2887   handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
2888   handle->socket = socket;
2889   handle->completion_cb = completion_cb;
2890   handle->completion_cls = completion_cls;
2891   socket->shutdown_handle = handle;
2892
2893   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2894   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2895   switch (operation)
2896   {
2897   case SHUT_RD:
2898     handle->operation = SHUT_RD;
2899     if (NULL != socket->read_handle)
2900       LOG (GNUNET_ERROR_TYPE_WARNING,
2901            "Existing read handle should be cancelled before shutting"
2902            " down reading\n");
2903     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
2904     queue_message (socket,
2905                    msg,
2906                    &set_state_receive_close_wait,
2907                    NULL);
2908     break;
2909   case SHUT_WR:
2910     handle->operation = SHUT_WR;
2911     if (NULL != socket->write_handle)
2912       LOG (GNUNET_ERROR_TYPE_WARNING,
2913            "Existing write handle should be cancelled before shutting"
2914            " down writing\n");
2915     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
2916     queue_message (socket,
2917                    msg,
2918                    &set_state_transmit_close_wait,
2919                    NULL);
2920     break;
2921   case SHUT_RDWR:
2922     handle->operation = SHUT_RDWR;
2923     if (NULL != socket->write_handle)
2924       LOG (GNUNET_ERROR_TYPE_WARNING,
2925            "Existing write handle should be cancelled before shutting"
2926            " down writing\n");
2927     if (NULL != socket->read_handle)
2928       LOG (GNUNET_ERROR_TYPE_WARNING,
2929            "Existing read handle should be cancelled before shutting"
2930            " down reading\n");
2931     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
2932     queue_message (socket,
2933                    msg,
2934                    &set_state_close_wait,
2935                    NULL);
2936     break;
2937   default:
2938     LOG (GNUNET_ERROR_TYPE_WARNING,
2939          "GNUNET_STREAM_shutdown called with invalid value for "
2940          "parameter operation -- Ignoring\n");
2941     GNUNET_free (msg);
2942     GNUNET_free (handle);
2943     return NULL;
2944   }
2945   handle->close_msg_retransmission_task_id =
2946     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2947                                   &close_msg_retransmission_task,
2948                                   handle);
2949   return handle;
2950 }
2951
2952
2953 /**
2954  * Cancels a pending shutdown
2955  *
2956  * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
2957  */
2958 void
2959 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
2960 {
2961   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
2962     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
2963   GNUNET_free (handle);
2964   return;
2965 }
2966
2967
2968 /**
2969  * Closes the stream
2970  *
2971  * @param socket the stream socket
2972  */
2973 void
2974 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
2975 {
2976   struct MessageQueue *head;
2977
2978   if (NULL != socket->read_handle)
2979   {
2980     LOG (GNUNET_ERROR_TYPE_WARNING,
2981          "Closing STREAM socket when a read handle is pending\n");
2982   }
2983   if (NULL != socket->write_handle)
2984   {
2985     LOG (GNUNET_ERROR_TYPE_WARNING,
2986          "Closing STREAM socket when a write handle is pending\n");
2987     GNUNET_STREAM_io_write_cancel (socket->write_handle);
2988     //socket->write_handle = NULL;
2989   }
2990
2991   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
2992   {
2993     /* socket closed with read task pending!? */
2994     GNUNET_break (0);
2995     GNUNET_SCHEDULER_cancel (socket->read_task_id);
2996     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
2997   }
2998   
2999   /* Terminate the ack'ing tasks if they are still present */
3000   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
3001   {
3002     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
3003     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
3004   }
3005
3006   /* Clear Transmit handles */
3007   if (NULL != socket->transmit_handle)
3008   {
3009     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
3010     socket->transmit_handle = NULL;
3011   }
3012   if (NULL != socket->ack_transmit_handle)
3013   {
3014     GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
3015     GNUNET_free (socket->ack_msg);
3016     socket->ack_msg = NULL;
3017     socket->ack_transmit_handle = NULL;
3018   }
3019
3020   /* Clear existing message queue */
3021   while (NULL != (head = socket->queue_head)) {
3022     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
3023                                  socket->queue_tail,
3024                                  head);
3025     GNUNET_free (head->message);
3026     GNUNET_free (head);
3027   }
3028
3029   /* Close associated tunnel */
3030   if (NULL != socket->tunnel)
3031   {
3032     GNUNET_MESH_tunnel_destroy (socket->tunnel);
3033     socket->tunnel = NULL;
3034   }
3035
3036   /* Close mesh connection */
3037   if (NULL != socket->mesh && NULL == socket->lsocket)
3038   {
3039     GNUNET_MESH_disconnect (socket->mesh);
3040     socket->mesh = NULL;
3041   }
3042   
3043   /* Release receive buffer */
3044   if (NULL != socket->receive_buffer)
3045   {
3046     GNUNET_free (socket->receive_buffer);
3047   }
3048
3049   GNUNET_free (socket);
3050 }
3051
3052
3053 /**
3054  * Listens for stream connections for a specific application ports
3055  *
3056  * @param cfg the configuration to use
3057  * @param app_port the application port for which new streams will be accepted
3058  * @param listen_cb this function will be called when a peer tries to establish
3059  *            a stream with us
3060  * @param listen_cb_cls closure for listen_cb
3061  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3062  * @return listen socket, NULL for any error
3063  */
3064 struct GNUNET_STREAM_ListenSocket *
3065 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
3066                       GNUNET_MESH_ApplicationType app_port,
3067                       GNUNET_STREAM_ListenCallback listen_cb,
3068                       void *listen_cb_cls,
3069                       ...)
3070 {
3071   /* FIXME: Add variable args for passing configration options? */
3072   struct GNUNET_STREAM_ListenSocket *lsocket;
3073   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
3074   enum GNUNET_STREAM_Option option;
3075   va_list vargs;
3076
3077   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
3078   /* Set defaults */
3079   lsocket->retransmit_timeout = 
3080     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
3081   lsocket->testing_active = GNUNET_NO;
3082   va_start (vargs, listen_cb_cls);
3083   do {
3084     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3085     switch (option)
3086     {
3087     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3088       lsocket->retransmit_timeout = va_arg (vargs,
3089                                             struct GNUNET_TIME_Relative);
3090       break;
3091     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3092       lsocket->testing_active = GNUNET_YES;
3093       lsocket->testing_set_write_sequence_number_value = va_arg (vargs,
3094                                                                  uint32_t);
3095       break;
3096     case GNUNET_STREAM_OPTION_END:
3097       break;
3098     }
3099   } while (GNUNET_STREAM_OPTION_END != option);
3100   va_end (vargs);
3101   lsocket->port = app_port;
3102   lsocket->listen_cb = listen_cb;
3103   lsocket->listen_cb_cls = listen_cb_cls;
3104   lsocket->mesh = GNUNET_MESH_connect (cfg,
3105                                        RECEIVE_BUFFER_SIZE, /* FIXME: QUEUE size as parameter? */
3106                                        lsocket, /* Closure */
3107                                        &new_tunnel_notify,
3108                                        &tunnel_cleaner,
3109                                        server_message_handlers,
3110                                        ports);
3111   GNUNET_assert (NULL != lsocket->mesh);
3112   return lsocket;
3113 }
3114
3115
3116 /**
3117  * Closes the listen socket
3118  *
3119  * @param lsocket the listen socket
3120  */
3121 void
3122 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3123 {
3124   /* Close MESH connection */
3125   GNUNET_assert (NULL != lsocket->mesh);
3126   GNUNET_MESH_disconnect (lsocket->mesh);
3127   
3128   GNUNET_free (lsocket);
3129 }
3130
3131
3132 /**
3133  * Tries to write the given data to the stream. The maximum size of data that
3134  * can be written as part of a write operation is (64 * (64000 - sizeof (struct
3135  * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3136  * violation, however only the said number of maximum bytes will be written.
3137  *
3138  * @param socket the socket representing a stream
3139  * @param data the data buffer from where the data is written into the stream
3140  * @param size the number of bytes to be written from the data buffer
3141  * @param timeout the timeout period
3142  * @param write_cont the function to call upon writing some bytes into the
3143  *          stream 
3144  * @param write_cont_cls the closure
3145  *
3146  * @return handle to cancel the operation; if a previous write is pending or
3147  *           the stream has been shutdown for this operation then write_cont is
3148  *           immediately called and NULL is returned.
3149  */
3150 struct GNUNET_STREAM_IOWriteHandle *
3151 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3152                      const void *data,
3153                      size_t size,
3154                      struct GNUNET_TIME_Relative timeout,
3155                      GNUNET_STREAM_CompletionContinuation write_cont,
3156                      void *write_cont_cls)
3157 {
3158   unsigned int num_needed_packets;
3159   unsigned int packet;
3160   struct GNUNET_STREAM_IOWriteHandle *io_handle;
3161   uint32_t packet_size;
3162   uint32_t payload_size;
3163   struct GNUNET_STREAM_DataMessage *data_msg;
3164   const void *sweep;
3165   struct GNUNET_TIME_Relative ack_deadline;
3166
3167   LOG (GNUNET_ERROR_TYPE_DEBUG,
3168        "%s\n", __func__);
3169
3170   /* Return NULL if there is already a write request pending */
3171   if (NULL != socket->write_handle)
3172   {
3173     GNUNET_break (0);
3174     return NULL;
3175   }
3176
3177   switch (socket->state)
3178   {
3179   case STATE_TRANSMIT_CLOSED:
3180   case STATE_TRANSMIT_CLOSE_WAIT:
3181   case STATE_CLOSED:
3182   case STATE_CLOSE_WAIT:
3183     if (NULL != write_cont)
3184       write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3185     LOG (GNUNET_ERROR_TYPE_DEBUG,
3186          "%s() END\n", __func__);
3187     return NULL;
3188   case STATE_INIT:
3189   case STATE_LISTEN:
3190   case STATE_HELLO_WAIT:
3191     if (NULL != write_cont)
3192       /* FIXME: GNUNET_STREAM_SYSERR?? */
3193       write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3194     LOG (GNUNET_ERROR_TYPE_DEBUG,
3195          "%s() END\n", __func__);
3196     return NULL;
3197   case STATE_ESTABLISHED:
3198   case STATE_RECEIVE_CLOSED:
3199   case STATE_RECEIVE_CLOSE_WAIT:
3200     break;
3201   }
3202
3203   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
3204     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
3205   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
3206   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
3207   io_handle->socket = socket;
3208   io_handle->write_cont = write_cont;
3209   io_handle->write_cont_cls = write_cont_cls;
3210   io_handle->size = size;
3211   sweep = data;
3212   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3213      determined from RTT */
3214   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3215   /* Divide the given buffer into packets for sending */
3216   for (packet=0; packet < num_needed_packets; packet++)
3217   {
3218     if ((packet + 1) * max_payload_size < size) 
3219     {
3220       payload_size = max_payload_size;
3221       packet_size = MAX_PACKET_SIZE;
3222     }
3223     else 
3224     {
3225       payload_size = size - packet * max_payload_size;
3226       packet_size =  payload_size + sizeof (struct
3227                                             GNUNET_STREAM_DataMessage); 
3228     }
3229     io_handle->messages[packet] = GNUNET_malloc (packet_size);
3230     io_handle->messages[packet]->header.header.size = htons (packet_size);
3231     io_handle->messages[packet]->header.header.type =
3232       htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3233     io_handle->messages[packet]->sequence_number =
3234       htonl (socket->write_sequence_number++);
3235     io_handle->messages[packet]->offset = htonl (socket->write_offset);
3236
3237     /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3238        determined from RTT */
3239     io_handle->messages[packet]->ack_deadline =
3240       GNUNET_TIME_relative_hton (ack_deadline);
3241     data_msg = io_handle->messages[packet];
3242     /* Copy data from given buffer to the packet */
3243     memcpy (&data_msg[1],
3244             sweep,
3245             payload_size);
3246     sweep += payload_size;
3247     socket->write_offset += payload_size;
3248   }
3249   socket->write_handle = io_handle;
3250   write_data (socket);
3251
3252   LOG (GNUNET_ERROR_TYPE_DEBUG,
3253        "%s() END\n", __func__);
3254
3255   return io_handle;
3256 }
3257
3258
3259
3260 /**
3261  * Tries to read data from the stream.
3262  *
3263  * @param socket the socket representing a stream
3264  * @param timeout the timeout period
3265  * @param proc function to call with data (once only)
3266  * @param proc_cls the closure for proc
3267  *
3268  * @return handle to cancel the operation; if the stream has been shutdown for
3269  *           this type of opeartion then the DataProcessor is immediately
3270  *           called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned
3271  */
3272 struct GNUNET_STREAM_IOReadHandle *
3273 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3274                     struct GNUNET_TIME_Relative timeout,
3275                     GNUNET_STREAM_DataProcessor proc,
3276                     void *proc_cls)
3277 {
3278   struct GNUNET_STREAM_IOReadHandle *read_handle;
3279   
3280   LOG (GNUNET_ERROR_TYPE_DEBUG,
3281        "%s: %s()\n", 
3282        GNUNET_i2s (&socket->other_peer),
3283        __func__);
3284
3285   /* Return NULL if there is already a read handle; the user has to cancel that
3286      first before continuing or has to wait until it is completed */
3287   if (NULL != socket->read_handle) return NULL;
3288
3289   GNUNET_assert (NULL != proc);
3290
3291   switch (socket->state)
3292   {
3293   case STATE_RECEIVE_CLOSED:
3294   case STATE_RECEIVE_CLOSE_WAIT:
3295   case STATE_CLOSED:
3296   case STATE_CLOSE_WAIT:
3297     proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3298     LOG (GNUNET_ERROR_TYPE_DEBUG,
3299          "%s: %s() END\n",
3300          GNUNET_i2s (&socket->other_peer),
3301          __func__);
3302     return NULL;
3303   default:
3304     break;
3305   }
3306
3307   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
3308   read_handle->proc = proc;
3309   read_handle->proc_cls = proc_cls;
3310   socket->read_handle = read_handle;
3311
3312   /* Check if we have a packet at bitmap 0 */
3313   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3314                                           0))
3315   {
3316     socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3317                                                      socket);
3318    
3319   }
3320   
3321   /* Setup the read timeout task */
3322   socket->read_io_timeout_task_id =
3323     GNUNET_SCHEDULER_add_delayed (timeout,
3324                                   &read_io_timeout,
3325                                   socket);
3326   LOG (GNUNET_ERROR_TYPE_DEBUG,
3327        "%s: %s() END\n",
3328        GNUNET_i2s (&socket->other_peer),
3329        __func__);
3330   return read_handle;
3331 }
3332
3333
3334 /**
3335  * Cancel pending write operation.
3336  *
3337  * @param ioh handle to operation to cancel
3338  */
3339 void
3340 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3341 {
3342   struct GNUNET_STREAM_Socket *socket = ioh->socket;
3343   unsigned int packet;
3344
3345   GNUNET_assert (NULL != socket->write_handle);
3346   GNUNET_assert (socket->write_handle == ioh);
3347
3348   if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
3349   {
3350     GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
3351     socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
3352   }
3353
3354   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3355   {
3356     if (NULL == ioh->messages[packet]) break;
3357     GNUNET_free (ioh->messages[packet]);
3358   }
3359       
3360   GNUNET_free (socket->write_handle);
3361   socket->write_handle = NULL;
3362   return;
3363 }
3364
3365
3366 /**
3367  * Cancel pending read operation.
3368  *
3369  * @param ioh handle to operation to cancel
3370  */
3371 void
3372 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
3373 {
3374   return;
3375 }