-added STREAM_shutdown
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Decrement PEER intern count during socket close and listen close to free the
27  * memory used for PEER interning
28  *
29  * Add code for write io timeout
30  *
31  * Include retransmission for control messages
32  **/
33
34 /**
35  * @file stream/stream_api.c
36  * @brief Implementation of the stream library
37  * @author Sree Harsha Totakura
38  */
39
40
41 #include "platform.h"
42 #include "gnunet_common.h"
43 #include "gnunet_crypto_lib.h"
44 #include "gnunet_stream_lib.h"
45 #include "gnunet_testing_lib.h"
46 #include "stream_protocol.h"
47
48
49 /**
50  * The maximum packet size of a stream packet
51  */
52 #define MAX_PACKET_SIZE 64000
53
54 /**
55  * Receive buffer
56  */
57 #define RECEIVE_BUFFER_SIZE 4096000
58
59 /**
60  * The maximum payload a data message packet can carry
61  */
62 static size_t max_payload_size = 
63   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
64
65 /**
66  * states in the Protocol
67  */
68 enum State
69   {
70     /**
71      * Client initialization state
72      */
73     STATE_INIT,
74
75     /**
76      * Listener initialization state 
77      */
78     STATE_LISTEN,
79
80     /**
81      * Pre-connection establishment state
82      */
83     STATE_HELLO_WAIT,
84
85     /**
86      * State where a connection has been established
87      */
88     STATE_ESTABLISHED,
89
90     /**
91      * State where the socket is closed on our side and waiting to be ACK'ed
92      */
93     STATE_RECEIVE_CLOSE_WAIT,
94
95     /**
96      * State where the socket is closed for reading
97      */
98     STATE_RECEIVE_CLOSED,
99
100     /**
101      * State where the socket is closed on our side and waiting to be ACK'ed
102      */
103     STATE_TRANSMIT_CLOSE_WAIT,
104
105     /**
106      * State where the socket is closed for writing
107      */
108     STATE_TRANSMIT_CLOSED,
109
110     /**
111      * State where the socket is closed on our side and waiting to be ACK'ed
112      */
113     STATE_CLOSE_WAIT,
114
115     /**
116      * State where the socket is closed
117      */
118     STATE_CLOSED 
119   };
120
121
122 /**
123  * Functions of this type are called when a message is written
124  *
125  * @param cls the closure from queue_message
126  * @param socket the socket the written message was bound to
127  */
128 typedef void (*SendFinishCallback) (void *cls,
129                                     struct GNUNET_STREAM_Socket *socket);
130
131
132 /**
133  * The send message queue
134  */
135 struct MessageQueue
136 {
137   /**
138    * The message
139    */
140   struct GNUNET_STREAM_MessageHeader *message;
141
142   /**
143    * Callback to be called when the message is sent
144    */
145   SendFinishCallback finish_cb;
146
147   /**
148    * The closure for finish_cb
149    */
150   void *finish_cb_cls;
151
152   /**
153    * The next message in queue. Should be NULL in the last message
154    */
155   struct MessageQueue *next;
156
157   /**
158    * The next message in queue. Should be NULL in the first message
159    */
160   struct MessageQueue *prev;
161 };
162
163
164 /**
165  * The STREAM Socket Handler
166  */
167 struct GNUNET_STREAM_Socket
168 {
169   /**
170    * Retransmission timeout
171    */
172   struct GNUNET_TIME_Relative retransmit_timeout;
173
174   /**
175    * The Acknowledgement Bitmap
176    */
177   GNUNET_STREAM_AckBitmap ack_bitmap;
178
179   /**
180    * Time when the Acknowledgement was queued
181    */
182   struct GNUNET_TIME_Absolute ack_time_registered;
183
184   /**
185    * Queued Acknowledgement deadline
186    */
187   struct GNUNET_TIME_Relative ack_time_deadline;
188
189   /**
190    * The mesh handle
191    */
192   struct GNUNET_MESH_Handle *mesh;
193
194   /**
195    * The mesh tunnel handle
196    */
197   struct GNUNET_MESH_Tunnel *tunnel;
198
199   /**
200    * Stream open closure
201    */
202   void *open_cls;
203
204   /**
205    * Stream open callback
206    */
207   GNUNET_STREAM_OpenCallback open_cb;
208
209   /**
210    * The current transmit handle (if a pending transmit request exists)
211    */
212   struct GNUNET_MESH_TransmitHandle *transmit_handle;
213
214   /**
215    * The current message associated with the transmit handle
216    */
217   struct MessageQueue *queue_head;
218
219   /**
220    * The queue tail, should always point to the last message in queue
221    */
222   struct MessageQueue *queue_tail;
223
224   /**
225    * The write IO_handle associated with this socket
226    */
227   struct GNUNET_STREAM_IOWriteHandle *write_handle;
228
229   /**
230    * The read IO_handle associated with this socket
231    */
232   struct GNUNET_STREAM_IOReadHandle *read_handle;
233
234   /**
235    * Buffer for storing received messages
236    */
237   void *receive_buffer;
238
239   /**
240    * The listen socket from which this socket is derived. Should be NULL if it
241    * is not a derived socket
242    */
243   struct GNUNET_STREAM_ListenSocket *lsocket;
244
245   /**
246    * Task identifier for the read io timeout task
247    */
248   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
249
250   /**
251    * Task identifier for retransmission task after timeout
252    */
253   GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
254
255   /**
256    * The task for sending timely Acks
257    */
258   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
259
260   /**
261    * Task scheduled to continue a read operation.
262    */
263   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
264
265   /**
266    * The state of the protocol associated with this socket
267    */
268   enum State state;
269
270   /**
271    * The status of the socket
272    */
273   enum GNUNET_STREAM_Status status;
274
275   /**
276    * The number of previous timeouts; FIXME: currently not used
277    */
278   unsigned int retries;
279
280   /**
281    * The peer identity of the peer at the other end of the stream
282    */
283   GNUNET_PEER_Id other_peer;
284
285   /**
286    * Our Peer Identity (for debugging)
287    */
288   GNUNET_PEER_Id our_id;
289
290   /**
291    * The application port number (type: uint32_t)
292    */
293   GNUNET_MESH_ApplicationType app_port;
294
295   /**
296    * The session id associated with this stream connection
297    * FIXME: Not used currently, may be removed
298    */
299   uint32_t session_id;
300
301   /**
302    * Write sequence number. Set to random when sending HELLO(client) and
303    * HELLO_ACK(server) 
304    */
305   uint32_t write_sequence_number;
306
307   /**
308    * Read sequence number. This number's value is determined during handshake
309    */
310   uint32_t read_sequence_number;
311
312   /**
313    * The receiver buffer size
314    */
315   uint32_t receive_buffer_size;
316
317   /**
318    * The receiver buffer boundaries
319    */
320   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
321
322   /**
323    * receiver's available buffer after the last acknowledged packet
324    */
325   uint32_t receiver_window_available;
326
327   /**
328    * The offset pointer used during write operation
329    */
330   uint32_t write_offset;
331
332   /**
333    * The offset after which we are expecting data
334    */
335   uint32_t read_offset;
336
337   /**
338    * The offset upto which user has read from the received buffer
339    */
340   uint32_t copy_offset;
341 };
342
343
344 /**
345  * A socket for listening
346  */
347 struct GNUNET_STREAM_ListenSocket
348 {
349   /**
350    * The mesh handle
351    */
352   struct GNUNET_MESH_Handle *mesh;
353
354   /**
355    * The callback function which is called after successful opening socket
356    */
357   GNUNET_STREAM_ListenCallback listen_cb;
358
359   /**
360    * The call back closure
361    */
362   void *listen_cb_cls;
363
364   /**
365    * Our interned Peer's identity
366    */
367   GNUNET_PEER_Id our_id;
368
369   /**
370    * The service port
371    * FIXME: Remove if not required!
372    */
373   GNUNET_MESH_ApplicationType port;
374 };
375
376
377 /**
378  * The IO Write Handle
379  */
380 struct GNUNET_STREAM_IOWriteHandle
381 {
382   /**
383    * The socket to which this write handle is associated
384    */
385   struct GNUNET_STREAM_Socket *socket;
386
387   /**
388    * The packet_buffers associated with this Handle
389    */
390   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
391
392   /**
393    * The write continuation callback
394    */
395   GNUNET_STREAM_CompletionContinuation write_cont;
396
397   /**
398    * Write continuation closure
399    */
400   void *write_cont_cls;
401
402   /**
403    * The bitmap of this IOHandle; Corresponding bit for a message is set when
404    * it has been acknowledged by the receiver
405    */
406   GNUNET_STREAM_AckBitmap ack_bitmap;
407
408   /**
409    * Number of bytes in this write handle
410    */
411   size_t size;
412 };
413
414
415 /**
416  * The IO Read Handle
417  */
418 struct GNUNET_STREAM_IOReadHandle
419 {
420   /**
421    * Callback for the read processor
422    */
423   GNUNET_STREAM_DataProcessor proc;
424
425   /**
426    * The closure pointer for the read processor callback
427    */
428   void *proc_cls;
429 };
430
431
432 /**
433  * Handle for Shutdown
434  */
435 struct GNUNET_STREAM_ShutdownHandle
436 {
437   /**
438    * The socket associated with this shutdown handle
439    */
440   struct GNUNET_STREAM_Socket *socket;
441
442   /**
443    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
444    */
445   int operation;
446 };
447
448
449 /**
450  * Default value in seconds for various timeouts
451  */
452 static unsigned int default_timeout = 10;
453
454
455 /**
456  * Callback function for sending queued message
457  *
458  * @param cls closure the socket
459  * @param size number of bytes available in buf
460  * @param buf where the callee should write the message
461  * @return number of bytes written to buf
462  */
463 static size_t
464 send_message_notify (void *cls, size_t size, void *buf)
465 {
466   struct GNUNET_STREAM_Socket *socket = cls;
467   struct GNUNET_PeerIdentity target;
468   struct MessageQueue *head;
469   size_t ret;
470
471   socket->transmit_handle = NULL; /* Remove the transmit handle */
472   head = socket->queue_head;
473   if (NULL == head)
474     return 0; /* just to be safe */
475   GNUNET_PEER_resolve (socket->other_peer, &target);
476   if (0 == size)                /* request timed out */
477     {
478       socket->retries++;
479       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
480                   "Message sending timed out. Retry %d \n",
481                   socket->retries);
482       socket->transmit_handle = 
483         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
484                                            0, /* Corking */
485                                            1, /* Priority */
486                                            /* FIXME: exponential backoff */
487                                            socket->retransmit_timeout,
488                                            &target,
489                                            ntohs (head->message->header.size),
490                                            &send_message_notify,
491                                            socket);
492       return 0;
493     }
494
495   ret = ntohs (head->message->header.size);
496   GNUNET_assert (size >= ret);
497   memcpy (buf, head->message, ret);
498   if (NULL != head->finish_cb)
499     {
500       head->finish_cb (head->finish_cb_cls, socket);
501     }
502   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
503                                socket->queue_tail,
504                                head);
505   GNUNET_free (head->message);
506   GNUNET_free (head);
507   head = socket->queue_head;
508   if (NULL != head)    /* more pending messages to send */
509     {
510       socket->retries = 0;
511       socket->transmit_handle = 
512         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
513                                            0, /* Corking */
514                                            1, /* Priority */
515                                            /* FIXME: exponential backoff */
516                                            socket->retransmit_timeout,
517                                            &target,
518                                            ntohs (head->message->header.size),
519                                            &send_message_notify,
520                                            socket);
521     }
522   return ret;
523 }
524
525
526 /**
527  * Queues a message for sending using the mesh connection of a socket
528  *
529  * @param socket the socket whose mesh connection is used
530  * @param message the message to be sent
531  * @param finish_cb the callback to be called when the message is sent
532  * @param finish_cb_cls the closure for the callback
533  */
534 static void
535 queue_message (struct GNUNET_STREAM_Socket *socket,
536                struct GNUNET_STREAM_MessageHeader *message,
537                SendFinishCallback finish_cb,
538                void *finish_cb_cls)
539 {
540   struct MessageQueue *queue_entity;
541   struct GNUNET_PeerIdentity target;
542
543   GNUNET_assert 
544     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
545      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
546
547   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
548               "%x: Queueing message of type %d and size %d\n",
549               socket->our_id,
550               ntohs (message->header.type),
551               ntohs (message->header.size));
552   GNUNET_assert (NULL != message);
553   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
554   queue_entity->message = message;
555   queue_entity->finish_cb = finish_cb;
556   queue_entity->finish_cb_cls = finish_cb_cls;
557   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
558                                     socket->queue_tail,
559                                     queue_entity);
560   if (NULL == socket->transmit_handle)
561   {
562     socket->retries = 0;
563     GNUNET_PEER_resolve (socket->other_peer, &target);
564     socket->transmit_handle = 
565       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
566                                          0, /* Corking */
567                                          1, /* Priority */
568                                          socket->retransmit_timeout,
569                                          &target,
570                                          ntohs (message->header.size),
571                                          &send_message_notify,
572                                          socket);
573   }
574 }
575
576
577 /**
578  * Copies a message and queues it for sending using the mesh connection of
579  * given socket 
580  *
581  * @param socket the socket whose mesh connection is used
582  * @param message the message to be sent
583  * @param finish_cb the callback to be called when the message is sent
584  * @param finish_cb_cls the closure for the callback
585  */
586 static void
587 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
588                         const struct GNUNET_STREAM_MessageHeader *message,
589                         SendFinishCallback finish_cb,
590                         void *finish_cb_cls)
591 {
592   struct GNUNET_STREAM_MessageHeader *msg_copy;
593   uint16_t size;
594   
595   size = ntohs (message->header.size);
596   msg_copy = GNUNET_malloc (size);
597   memcpy (msg_copy, message, size);
598   queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
599 }
600
601
602 /**
603  * Callback function for sending ack message
604  *
605  * @param cls closure the ACK message created in ack_task
606  * @param size number of bytes available in buffer
607  * @param buf where the callee should write the message
608  * @return number of bytes written to buf
609  */
610 static size_t
611 send_ack_notify (void *cls, size_t size, void *buf)
612 {
613   struct GNUNET_STREAM_AckMessage *ack_msg = cls;
614
615   if (0 == size)
616     {
617       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
618                   "%s called with size 0\n", __func__);
619       return 0;
620     }
621   GNUNET_assert (ntohs (ack_msg->header.header.size) <= size);
622   
623   size = ntohs (ack_msg->header.header.size);
624   memcpy (buf, ack_msg, size);
625   return size;
626 }
627
628 /**
629  * Writes data using the given socket. The amount of data written is limited by
630  * the receiver_window_size
631  *
632  * @param socket the socket to use
633  */
634 static void 
635 write_data (struct GNUNET_STREAM_Socket *socket);
636
637 /**
638  * Task for retransmitting data messages if they aren't ACK before their ack
639  * deadline 
640  *
641  * @param cls the socket
642  * @param tc the Task context
643  */
644 static void
645 retransmission_timeout_task (void *cls,
646                              const struct GNUNET_SCHEDULER_TaskContext *tc)
647 {
648   struct GNUNET_STREAM_Socket *socket = cls;
649   
650   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
651     return;
652
653   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
654               "%x: Retransmitting DATA...\n", socket->our_id);
655   socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
656   write_data (socket);
657 }
658
659
660 /**
661  * Task for sending ACK message
662  *
663  * @param cls the socket
664  * @param tc the Task context
665  */
666 static void
667 ack_task (void *cls,
668           const struct GNUNET_SCHEDULER_TaskContext *tc)
669 {
670   struct GNUNET_STREAM_Socket *socket = cls;
671   struct GNUNET_STREAM_AckMessage *ack_msg;
672   struct GNUNET_PeerIdentity target;
673
674   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
675     {
676       return;
677     }
678
679   socket->ack_task_id = 0;
680
681   /* Create the ACK Message */
682   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
683   ack_msg->header.header.size = htons (sizeof (struct 
684                                                GNUNET_STREAM_AckMessage));
685   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
686   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
687   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
688   ack_msg->receive_window_remaining = 
689     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
690
691   GNUNET_PEER_resolve (socket->other_peer, &target);
692   /* Request MESH for sending ACK */
693   GNUNET_MESH_notify_transmit_ready (socket->tunnel,
694                                      0, /* Corking */
695                                      1, /* Priority */
696                                      socket->retransmit_timeout,
697                                      &target,
698                                      ntohs (ack_msg->header.header.size),
699                                      &send_ack_notify,
700                                      ack_msg);
701
702   
703 }
704
705
706 /**
707  * Function to modify a bit in GNUNET_STREAM_AckBitmap
708  *
709  * @param bitmap the bitmap to modify
710  * @param bit the bit number to modify
711  * @param value GNUNET_YES to on, GNUNET_NO to off
712  */
713 static void
714 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
715                       unsigned int bit, 
716                       int value)
717 {
718   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
719   if (GNUNET_YES == value)
720     *bitmap |= (1LL << bit);
721   else
722     *bitmap &= ~(1LL << bit);
723 }
724
725
726 /**
727  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
728  *
729  * @param bitmap address of the bitmap that has to be checked
730  * @param bit the bit number to check
731  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
732  */
733 static uint8_t
734 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
735                       unsigned int bit)
736 {
737   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
738   return 0 != (*bitmap & (1LL << bit));
739 }
740
741
742 /**
743  * Writes data using the given socket. The amount of data written is limited by
744  * the receiver_window_size
745  *
746  * @param socket the socket to use
747  */
748 static void 
749 write_data (struct GNUNET_STREAM_Socket *socket)
750 {
751   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
752   int packet;                   /* Although an int, should never be negative */
753   int ack_packet;
754
755   ack_packet = -1;
756   /* Find the last acknowledged packet */
757   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
758     {      
759       if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
760                                               packet))
761         ack_packet = packet;        
762       else if (NULL == io_handle->messages[packet])
763         break;
764     }
765   /* Resend packets which weren't ack'ed */
766   for (packet=0; packet < ack_packet; packet++)
767     {
768       if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
769                                              packet))
770         {
771           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
772                       "%x: Placing DATA message with sequence %u in send queue\n",
773                       socket->our_id,
774                       ntohl (io_handle->messages[packet]->sequence_number));
775
776           copy_and_queue_message (socket,
777                                   &io_handle->messages[packet]->header,
778                                   NULL,
779                                   NULL);
780         }
781     }
782   packet = ack_packet + 1;
783   /* Now send new packets if there is enough buffer space */
784   while ( (NULL != io_handle->messages[packet]) &&
785           (socket->receiver_window_available 
786            >= ntohs (io_handle->messages[packet]->header.header.size)) )
787     {
788       socket->receiver_window_available -= 
789         ntohs (io_handle->messages[packet]->header.header.size);
790       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
791                   "%x: Placing DATA message with sequence %u in send queue\n",
792                   socket->our_id,
793                   ntohl (io_handle->messages[packet]->sequence_number));
794       copy_and_queue_message (socket,
795                               &io_handle->messages[packet]->header,
796                               NULL,
797                               NULL);
798       packet++;
799     }
800
801   if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
802     socket->retransmission_timeout_task_id = 
803       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
804                                     (GNUNET_TIME_UNIT_SECONDS, 8),
805                                     &retransmission_timeout_task,
806                                     socket);
807 }
808
809
810 /**
811  * Task for calling the read processor
812  *
813  * @param cls the socket
814  * @param tc the task context
815  */
816 static void
817 call_read_processor (void *cls,
818                      const struct GNUNET_SCHEDULER_TaskContext *tc)
819 {
820   struct GNUNET_STREAM_Socket *socket = cls;
821   size_t read_size;
822   size_t valid_read_size;
823   unsigned int packet;
824   uint32_t sequence_increase;
825   uint32_t offset_increase;
826
827   socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
828   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
829     return;
830
831   if (NULL == socket->receive_buffer) 
832     return;
833
834   GNUNET_assert (NULL != socket->read_handle);
835   GNUNET_assert (NULL != socket->read_handle->proc);
836
837   /* Check the bitmap for any holes */
838   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
839     {
840       if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
841                                              packet))
842         break;
843     }
844   /* We only call read processor if we have the first packet */
845   GNUNET_assert (0 < packet);
846
847   valid_read_size = 
848     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
849
850   GNUNET_assert (0 != valid_read_size);
851
852   /* Cancel the read_io_timeout_task */
853   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
854   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
855
856   /* Call the data processor */
857   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
858               "%x: Calling read processor\n",
859               socket->our_id);
860   read_size = 
861     socket->read_handle->proc (socket->read_handle->proc_cls,
862                                socket->status,
863                                socket->receive_buffer + socket->copy_offset,
864                                valid_read_size);
865   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
866               "%x: Read processor read %d bytes\n",
867               socket->our_id,
868               read_size);
869   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
870               "%x: Read processor completed successfully\n",
871               socket->our_id);
872
873   /* Free the read handle */
874   GNUNET_free (socket->read_handle);
875   socket->read_handle = NULL;
876
877   GNUNET_assert (read_size <= valid_read_size);
878   socket->copy_offset += read_size;
879
880   /* Determine upto which packet we can remove from the buffer */
881   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
882     {
883       if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
884         { packet++; break; }
885       if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
886         break;
887     }
888
889   /* If no packets can be removed we can't move the buffer */
890   if (0 == packet) return;
891
892   sequence_increase = packet;
893   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
894               "%x: Sequence increase after read processor completion: %u\n",
895               socket->our_id,
896               sequence_increase);
897
898   /* Shift the data in the receive buffer */
899   memmove (socket->receive_buffer,
900            socket->receive_buffer 
901            + socket->receive_buffer_boundaries[sequence_increase-1],
902            socket->receive_buffer_size
903            - socket->receive_buffer_boundaries[sequence_increase-1]);
904   
905   /* Shift the bitmap */
906   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
907   
908   /* Set read_sequence_number */
909   socket->read_sequence_number += sequence_increase;
910   
911   /* Set read_offset */
912   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
913   socket->read_offset += offset_increase;
914
915   /* Fix copy_offset */
916   GNUNET_assert (offset_increase <= socket->copy_offset);
917   socket->copy_offset -= offset_increase;
918   
919   /* Fix relative boundaries */
920   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
921     {
922       if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
923         {
924           socket->receive_buffer_boundaries[packet] = 
925             socket->receive_buffer_boundaries[packet + sequence_increase] 
926             - offset_increase;
927         }
928       else
929         socket->receive_buffer_boundaries[packet] = 0;
930     }
931 }
932
933
934 /**
935  * Cancels the existing read io handle
936  *
937  * @param cls the closure from the SCHEDULER call
938  * @param tc the task context
939  */
940 static void
941 read_io_timeout (void *cls, 
942                 const struct GNUNET_SCHEDULER_TaskContext *tc)
943 {
944   struct GNUNET_STREAM_Socket *socket = cls;
945   GNUNET_STREAM_DataProcessor proc;
946   void *proc_cls;
947
948   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
949   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
950   {
951     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
952                 "%x: Read task timedout - Cancelling it\n",
953                 socket->our_id);
954     GNUNET_SCHEDULER_cancel (socket->read_task_id);
955     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
956   }
957   GNUNET_assert (NULL != socket->read_handle);
958   proc = socket->read_handle->proc;
959   proc_cls = socket->read_handle->proc_cls;
960
961   GNUNET_free (socket->read_handle);
962   socket->read_handle = NULL;
963   /* Call the read processor to signal timeout */
964   proc (proc_cls,
965         GNUNET_STREAM_TIMEOUT,
966         NULL,
967         0);
968 }
969
970
971 /**
972  * Handler for DATA messages; Same for both client and server
973  *
974  * @param socket the socket through which the ack was received
975  * @param tunnel connection to the other end
976  * @param sender who sent the message
977  * @param msg the data message
978  * @param atsi performance data for the connection
979  * @return GNUNET_OK to keep the connection open,
980  *         GNUNET_SYSERR to close it (signal serious error)
981  */
982 static int
983 handle_data (struct GNUNET_STREAM_Socket *socket,
984              struct GNUNET_MESH_Tunnel *tunnel,
985              const struct GNUNET_PeerIdentity *sender,
986              const struct GNUNET_STREAM_DataMessage *msg,
987              const struct GNUNET_ATS_Information*atsi)
988 {
989   const void *payload;
990   uint32_t bytes_needed;
991   uint32_t relative_offset;
992   uint32_t relative_sequence_number;
993   uint16_t size;
994
995   size = htons (msg->header.header.size);
996   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
997     {
998       GNUNET_break_op (0);
999       return GNUNET_SYSERR;
1000     }
1001
1002   if (GNUNET_PEER_search (sender) != socket->other_peer)
1003     {
1004       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1005                   "%x: Received DATA from non-confirming peer\n",
1006                   socket->our_id);
1007       return GNUNET_YES;
1008     }
1009
1010   switch (socket->state)
1011     {
1012     case STATE_ESTABLISHED:
1013     case STATE_TRANSMIT_CLOSED:
1014     case STATE_TRANSMIT_CLOSE_WAIT:
1015       
1016       /* check if the message's sequence number is in the range we are
1017          expecting */
1018       relative_sequence_number = 
1019         ntohl (msg->sequence_number) - socket->read_sequence_number;
1020       if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1021         {
1022           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1023                       "%x: Ignoring received message with sequence number %u\n",
1024                       socket->our_id,
1025                       ntohl (msg->sequence_number));
1026           /* Start ACK sending task if one is not already present */
1027           if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1028             {
1029               socket->ack_task_id = 
1030                 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1031                                               (msg->ack_deadline),
1032                                               &ack_task,
1033                                               socket);
1034             }
1035           return GNUNET_YES;
1036         }
1037       
1038       /* Check if we have already seen this message */
1039       if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1040                                               relative_sequence_number))
1041         {
1042           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1043                       "%x: Ignoring already received message with sequence "
1044                       "number %u\n",
1045                       socket->our_id,
1046                       ntohl (msg->sequence_number));
1047           /* Start ACK sending task if one is not already present */
1048           if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1049             {
1050               socket->ack_task_id = 
1051                 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1052                                               (msg->ack_deadline),
1053                                               &ack_task,
1054                                               socket);
1055             }
1056           return GNUNET_YES;
1057         }
1058
1059       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1060                   "%x: Receiving DATA with sequence number: %u and size: %d "
1061                   "from %x\n",
1062                   socket->our_id,
1063                   ntohl (msg->sequence_number),
1064                   ntohs (msg->header.header.size),
1065                   socket->other_peer);
1066
1067       /* Check if we have to allocate the buffer */
1068       size -= sizeof (struct GNUNET_STREAM_DataMessage);
1069       relative_offset = ntohl (msg->offset) - socket->read_offset;
1070       bytes_needed = relative_offset + size;
1071       if (bytes_needed > socket->receive_buffer_size)
1072         {
1073           if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1074             {
1075               socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1076                                                        bytes_needed);
1077               socket->receive_buffer_size = bytes_needed;
1078             }
1079           else
1080             {
1081               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1082                           "%x: Cannot accommodate packet %d as buffer is",
1083                           "full\n",
1084                           socket->our_id,
1085                           ntohl (msg->sequence_number));
1086               return GNUNET_YES;
1087             }
1088         }
1089       
1090       /* Copy Data to buffer */
1091       payload = &msg[1];
1092       GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1093       memcpy (socket->receive_buffer + relative_offset,
1094               payload,
1095               size);
1096       socket->receive_buffer_boundaries[relative_sequence_number] = 
1097         relative_offset + size;
1098       
1099       /* Modify the ACK bitmap */
1100       ackbitmap_modify_bit (&socket->ack_bitmap,
1101                             relative_sequence_number,
1102                             GNUNET_YES);
1103
1104       /* Start ACK sending task if one is not already present */
1105       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1106        {
1107          socket->ack_task_id = 
1108            GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1109                                          (msg->ack_deadline),
1110                                          &ack_task,
1111                                          socket);
1112        }
1113
1114       if ((NULL != socket->read_handle) /* A read handle is waiting */
1115           /* There is no current read task */
1116           && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1117           /* We have the first packet */
1118           && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1119                                                  0)))
1120         {
1121           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1122                       "%x: Scheduling read processor\n",
1123                       socket->our_id);
1124
1125           socket->read_task_id = 
1126             GNUNET_SCHEDULER_add_now (&call_read_processor,
1127                                       socket);
1128         }
1129       
1130       break;
1131
1132     default:
1133       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1134                   "%x: Received data message when it cannot be handled\n",
1135                   socket->our_id);
1136       break;
1137     }
1138   return GNUNET_YES;
1139 }
1140
1141
1142 /**
1143  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1144  *
1145  * @param cls the socket (set from GNUNET_MESH_connect)
1146  * @param tunnel connection to the other end
1147  * @param tunnel_ctx place to store local state associated with the tunnel
1148  * @param sender who sent the message
1149  * @param message the actual message
1150  * @param atsi performance data for the connection
1151  * @return GNUNET_OK to keep the connection open,
1152  *         GNUNET_SYSERR to close it (signal serious error)
1153  */
1154 static int
1155 client_handle_data (void *cls,
1156              struct GNUNET_MESH_Tunnel *tunnel,
1157              void **tunnel_ctx,
1158              const struct GNUNET_PeerIdentity *sender,
1159              const struct GNUNET_MessageHeader *message,
1160              const struct GNUNET_ATS_Information*atsi)
1161 {
1162   struct GNUNET_STREAM_Socket *socket = cls;
1163
1164   return handle_data (socket, 
1165                       tunnel, 
1166                       sender, 
1167                       (const struct GNUNET_STREAM_DataMessage *) message, 
1168                       atsi);
1169 }
1170
1171
1172 /**
1173  * Callback to set state to ESTABLISHED
1174  *
1175  * @param cls the closure from queue_message FIXME: document
1176  * @param socket the socket to requiring state change
1177  */
1178 static void
1179 set_state_established (void *cls,
1180                        struct GNUNET_STREAM_Socket *socket)
1181 {
1182   struct GNUNET_PeerIdentity initiator_pid;
1183
1184   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1185               "%x: Attaining ESTABLISHED state\n",
1186               socket->our_id);
1187   socket->write_offset = 0;
1188   socket->read_offset = 0;
1189   socket->state = STATE_ESTABLISHED;
1190   /* FIXME: What if listen_cb is NULL */
1191   if (NULL != socket->lsocket)
1192     {
1193       GNUNET_PEER_resolve (socket->other_peer, &initiator_pid);
1194       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1195                   "%x: Calling listen callback\n",
1196                   socket->our_id);
1197       if (GNUNET_SYSERR == 
1198           socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1199                                       socket,
1200                                       &initiator_pid))
1201         {
1202           socket->state = STATE_CLOSED;
1203           /* FIXME: We should close in a decent way */
1204           GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1205           GNUNET_free (socket);
1206         }
1207     }
1208   else if (socket->open_cb)
1209     socket->open_cb (socket->open_cls, socket);
1210 }
1211
1212
1213 /**
1214  * Callback to set state to HELLO_WAIT
1215  *
1216  * @param cls the closure from queue_message
1217  * @param socket the socket to requiring state change
1218  */
1219 static void
1220 set_state_hello_wait (void *cls,
1221                       struct GNUNET_STREAM_Socket *socket)
1222 {
1223   GNUNET_assert (STATE_INIT == socket->state);
1224   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1225               "%x: Attaining HELLO_WAIT state\n",
1226               socket->our_id);
1227   socket->state = STATE_HELLO_WAIT;
1228 }
1229
1230
1231 /**
1232  * Callback to set state to CLOSE_WAIT
1233  *
1234  * @param cls the closure from queue_message
1235  * @param socket the socket requiring state change
1236  */
1237 static void
1238 set_state_close_wait (void *cls,
1239                       struct GNUNET_STREAM_Socket *socket)
1240 {
1241   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1242               "%x: Attaing CLOSE_WAIT state\n",
1243               socket->our_id);
1244   socket->state = STATE_CLOSE_WAIT;
1245   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1246   socket->receive_buffer = NULL;
1247   socket->receive_buffer_size = 0;
1248 }
1249
1250
1251 /**
1252  * Callback to set state to CLOSED
1253  *
1254  * @param cls the closure from queue_message
1255  * @param socket the socket requiring state change
1256  */
1257 static void
1258 set_state_closed (void *cls,
1259                   struct GNUNET_STREAM_Socket *socket)
1260 {
1261   socket->state = STATE_CLOSED;
1262 }
1263
1264 /**
1265  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1266  * socket
1267  *
1268  * @param socket the socket for which this HelloAckMessage has to be generated
1269  * @return the HelloAckMessage
1270  */
1271 static struct GNUNET_STREAM_HelloAckMessage *
1272 generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket)
1273 {
1274   struct GNUNET_STREAM_HelloAckMessage *msg;
1275
1276   /* Get the random sequence number */
1277   socket->write_sequence_number = 
1278     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1279   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1280               "%x: Generated write sequence number %u\n",
1281               socket->our_id,
1282               (unsigned int) socket->write_sequence_number);
1283   
1284   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1285   msg->header.header.size = 
1286     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1287   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1288   msg->sequence_number = htonl (socket->write_sequence_number);
1289   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1290
1291   return msg;
1292 }
1293
1294
1295 /**
1296  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1297  *
1298  * @param cls the socket (set from GNUNET_MESH_connect)
1299  * @param tunnel connection to the other end
1300  * @param tunnel_ctx this is NULL
1301  * @param sender who sent the message
1302  * @param message the actual message
1303  * @param atsi performance data for the connection
1304  * @return GNUNET_OK to keep the connection open,
1305  *         GNUNET_SYSERR to close it (signal serious error)
1306  */
1307 static int
1308 client_handle_hello_ack (void *cls,
1309                          struct GNUNET_MESH_Tunnel *tunnel,
1310                          void **tunnel_ctx,
1311                          const struct GNUNET_PeerIdentity *sender,
1312                          const struct GNUNET_MessageHeader *message,
1313                          const struct GNUNET_ATS_Information*atsi)
1314 {
1315   struct GNUNET_STREAM_Socket *socket = cls;
1316   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1317   struct GNUNET_STREAM_HelloAckMessage *reply;
1318
1319   if (GNUNET_PEER_search (sender) != socket->other_peer)
1320     {
1321       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1322                   "%x: Received HELLO_ACK from non-confirming peer\n",
1323                   socket->our_id);
1324       return GNUNET_YES;
1325     }
1326   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1327   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1328               "%x: Received HELLO_ACK from %x\n",
1329               socket->our_id,
1330               socket->other_peer);
1331
1332   GNUNET_assert (socket->tunnel == tunnel);
1333   switch (socket->state)
1334   {
1335   case STATE_HELLO_WAIT:
1336     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1337     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1338                 "%x: Read sequence number %u\n",
1339                 socket->our_id,
1340                 (unsigned int) socket->read_sequence_number);
1341     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1342     reply = generate_hello_ack_msg (socket);
1343     queue_message (socket,
1344                    &reply->header, 
1345                    &set_state_established, 
1346                    NULL);      
1347     return GNUNET_OK;
1348   case STATE_ESTABLISHED:
1349   case STATE_RECEIVE_CLOSE_WAIT:
1350     // call statistics (# ACKs ignored++)
1351     return GNUNET_OK;
1352   case STATE_INIT:
1353   default:
1354     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1355                 "%x: Server %x sent HELLO_ACK when in state %d\n", 
1356                 socket->our_id,
1357                 socket->other_peer,
1358                 socket->state);
1359     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1360     return GNUNET_SYSERR;
1361   }
1362
1363 }
1364
1365
1366 /**
1367  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1368  *
1369  * @param cls the socket (set from GNUNET_MESH_connect)
1370  * @param tunnel connection to the other end
1371  * @param tunnel_ctx this is NULL
1372  * @param sender who sent the message
1373  * @param message the actual message
1374  * @param atsi performance data for the connection
1375  * @return GNUNET_OK to keep the connection open,
1376  *         GNUNET_SYSERR to close it (signal serious error)
1377  */
1378 static int
1379 client_handle_reset (void *cls,
1380                      struct GNUNET_MESH_Tunnel *tunnel,
1381                      void **tunnel_ctx,
1382                      const struct GNUNET_PeerIdentity *sender,
1383                      const struct GNUNET_MessageHeader *message,
1384                      const struct GNUNET_ATS_Information*atsi)
1385 {
1386   struct GNUNET_STREAM_Socket *socket = cls;
1387
1388   return GNUNET_OK;
1389 }
1390
1391
1392 /**
1393  * Common message handler for handling TRANSMIT_CLOSE messages
1394  *
1395  * @param socket the socket through which the ack was received
1396  * @param tunnel connection to the other end
1397  * @param sender who sent the message
1398  * @param msg the transmit close message
1399  * @param atsi performance data for the connection
1400  * @return GNUNET_OK to keep the connection open,
1401  *         GNUNET_SYSERR to close it (signal serious error)
1402  */
1403 static int
1404 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1405                        struct GNUNET_MESH_Tunnel *tunnel,
1406                        const struct GNUNET_PeerIdentity *sender,
1407                        const struct GNUNET_STREAM_MessageHeader *msg,
1408                        const struct GNUNET_ATS_Information*atsi)
1409 {
1410   struct GNUNET_STREAM_MessageHeader *reply;
1411
1412   switch (socket->state)
1413     {
1414     case STATE_ESTABLISHED:
1415       socket->state = STATE_RECEIVE_CLOSED;
1416
1417       /* Send TRANSMIT_CLOSE_ACK */
1418       reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1419       reply->header.type = 
1420         htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1421       reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1422       queue_message (socket, reply, NULL, NULL);
1423       break;
1424
1425     default:
1426       /* FIXME: Call statistics? */
1427       break;
1428     }
1429   return GNUNET_YES;
1430 }
1431
1432
1433 /**
1434  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1435  *
1436  * @param cls the socket (set from GNUNET_MESH_connect)
1437  * @param tunnel connection to the other end
1438  * @param tunnel_ctx this is NULL
1439  * @param sender who sent the message
1440  * @param message the actual message
1441  * @param atsi performance data for the connection
1442  * @return GNUNET_OK to keep the connection open,
1443  *         GNUNET_SYSERR to close it (signal serious error)
1444  */
1445 static int
1446 client_handle_transmit_close (void *cls,
1447                               struct GNUNET_MESH_Tunnel *tunnel,
1448                               void **tunnel_ctx,
1449                               const struct GNUNET_PeerIdentity *sender,
1450                               const struct GNUNET_MessageHeader *message,
1451                               const struct GNUNET_ATS_Information*atsi)
1452 {
1453   struct GNUNET_STREAM_Socket *socket = cls;
1454   
1455   return handle_transmit_close (socket,
1456                                 tunnel,
1457                                 sender,
1458                                 (struct GNUNET_STREAM_MessageHeader *)message,
1459                                 atsi);
1460 }
1461
1462
1463 /**
1464  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1465  *
1466  * @param cls the socket (set from GNUNET_MESH_connect)
1467  * @param tunnel connection to the other end
1468  * @param tunnel_ctx this is NULL
1469  * @param sender who sent the message
1470  * @param message the actual message
1471  * @param atsi performance data for the connection
1472  * @return GNUNET_OK to keep the connection open,
1473  *         GNUNET_SYSERR to close it (signal serious error)
1474  */
1475 static int
1476 client_handle_transmit_close_ack (void *cls,
1477                                   struct GNUNET_MESH_Tunnel *tunnel,
1478                                   void **tunnel_ctx,
1479                                   const struct GNUNET_PeerIdentity *sender,
1480                                   const struct GNUNET_MessageHeader *message,
1481                                   const struct GNUNET_ATS_Information*atsi)
1482 {
1483   struct GNUNET_STREAM_Socket *socket = cls;
1484
1485   return GNUNET_OK;
1486 }
1487
1488
1489 /**
1490  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1491  *
1492  * @param cls the socket (set from GNUNET_MESH_connect)
1493  * @param tunnel connection to the other end
1494  * @param tunnel_ctx this is NULL
1495  * @param sender who sent the message
1496  * @param message the actual message
1497  * @param atsi performance data for the connection
1498  * @return GNUNET_OK to keep the connection open,
1499  *         GNUNET_SYSERR to close it (signal serious error)
1500  */
1501 static int
1502 client_handle_receive_close (void *cls,
1503                              struct GNUNET_MESH_Tunnel *tunnel,
1504                              void **tunnel_ctx,
1505                              const struct GNUNET_PeerIdentity *sender,
1506                              const struct GNUNET_MessageHeader *message,
1507                              const struct GNUNET_ATS_Information*atsi)
1508 {
1509   struct GNUNET_STREAM_Socket *socket = cls;
1510
1511   return GNUNET_OK;
1512 }
1513
1514
1515 /**
1516  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1517  *
1518  * @param cls the socket (set from GNUNET_MESH_connect)
1519  * @param tunnel connection to the other end
1520  * @param tunnel_ctx this is NULL
1521  * @param sender who sent the message
1522  * @param message the actual message
1523  * @param atsi performance data for the connection
1524  * @return GNUNET_OK to keep the connection open,
1525  *         GNUNET_SYSERR to close it (signal serious error)
1526  */
1527 static int
1528 client_handle_receive_close_ack (void *cls,
1529                                  struct GNUNET_MESH_Tunnel *tunnel,
1530                                  void **tunnel_ctx,
1531                                  const struct GNUNET_PeerIdentity *sender,
1532                                  const struct GNUNET_MessageHeader *message,
1533                                  const struct GNUNET_ATS_Information*atsi)
1534 {
1535   struct GNUNET_STREAM_Socket *socket = cls;
1536
1537   return GNUNET_OK;
1538 }
1539
1540
1541 /**
1542  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1543  *
1544  * @param socket the socket
1545  * @param tunnel connection to the other end
1546  * @param tunnel_ctx this is NULL
1547  * @param sender who sent the message
1548  * @param message the actual message
1549  * @param atsi performance data for the connection
1550  * @return GNUNET_OK to keep the connection open,
1551  *         GNUNET_SYSERR to close it (signal serious error)
1552  */
1553 static int
1554 handle_close (struct GNUNET_STREAM_Socket *socket,
1555               struct GNUNET_MESH_Tunnel *tunnel,
1556               const struct GNUNET_PeerIdentity *sender,
1557               const struct GNUNET_STREAM_MessageHeader *message,
1558               const struct GNUNET_ATS_Information*atsi)
1559 {
1560   struct GNUNET_STREAM_MessageHeader *close_ack;
1561
1562   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1563   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1564   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
1565   queue_message (socket,
1566                  close_ack,
1567                  &set_state_closed,
1568                  NULL);
1569   if (socket->state == STATE_CLOSED)
1570     return GNUNET_OK;
1571
1572   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1573   socket->receive_buffer = NULL;
1574   socket->receive_buffer_size = 0;
1575   return GNUNET_OK;
1576 }
1577
1578
1579 /**
1580  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1581  *
1582  * @param cls the socket (set from GNUNET_MESH_connect)
1583  * @param tunnel connection to the other end
1584  * @param tunnel_ctx this is NULL
1585  * @param sender who sent the message
1586  * @param message the actual message
1587  * @param atsi performance data for the connection
1588  * @return GNUNET_OK to keep the connection open,
1589  *         GNUNET_SYSERR to close it (signal serious error)
1590  */
1591 static int
1592 client_handle_close (void *cls,
1593                      struct GNUNET_MESH_Tunnel *tunnel,
1594                      void **tunnel_ctx,
1595                      const struct GNUNET_PeerIdentity *sender,
1596                      const struct GNUNET_MessageHeader *message,
1597                      const struct GNUNET_ATS_Information*atsi)
1598 {
1599   struct GNUNET_STREAM_Socket *socket = cls;
1600
1601   return handle_close (socket,
1602                        tunnel,
1603                        sender,
1604                        (const struct GNUNET_STREAM_MessageHeader *) message,
1605                        atsi);
1606 }
1607
1608
1609 /**
1610  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1611  *
1612  * @param socket the socket
1613  * @param tunnel connection to the other end
1614  * @param tunnel_ctx this is NULL
1615  * @param sender who sent the message
1616  * @param message the actual message
1617  * @param atsi performance data for the connection
1618  * @return GNUNET_OK to keep the connection open,
1619  *         GNUNET_SYSERR to close it (signal serious error)
1620  */
1621 static int
1622 handle_close_ack (struct GNUNET_STREAM_Socket *socket,
1623                   struct GNUNET_MESH_Tunnel *tunnel,
1624                   const struct GNUNET_PeerIdentity *sender,
1625                   const struct GNUNET_STREAM_MessageHeader *message,
1626                   const struct GNUNET_ATS_Information*atsi)
1627 {
1628   switch (socket->state)
1629     {
1630     case STATE_CLOSE_WAIT:
1631       socket->state = STATE_CLOSED;
1632       break;
1633     default:
1634       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1635                   "%x: Received CLOSE_ACK when in it not expected\n",
1636                   socket->our_id);
1637       break;
1638     }
1639   return GNUNET_OK;
1640 }
1641
1642
1643 /**
1644  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1645  *
1646  * @param cls the socket (set from GNUNET_MESH_connect)
1647  * @param tunnel connection to the other end
1648  * @param tunnel_ctx this is NULL
1649  * @param sender who sent the message
1650  * @param message the actual message
1651  * @param atsi performance data for the connection
1652  * @return GNUNET_OK to keep the connection open,
1653  *         GNUNET_SYSERR to close it (signal serious error)
1654  */
1655 static int
1656 client_handle_close_ack (void *cls,
1657                          struct GNUNET_MESH_Tunnel *tunnel,
1658                          void **tunnel_ctx,
1659                          const struct GNUNET_PeerIdentity *sender,
1660                          const struct GNUNET_MessageHeader *message,
1661                          const struct GNUNET_ATS_Information*atsi)
1662 {
1663   struct GNUNET_STREAM_Socket *socket = cls;
1664
1665   return handle_close_ack (socket,
1666                            tunnel,
1667                            sender,
1668                            (const struct GNUNET_STREAM_MessageHeader *) 
1669                            message,
1670                            atsi);
1671 }
1672
1673 /*****************************/
1674 /* Server's Message Handlers */
1675 /*****************************/
1676
1677 /**
1678  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1679  *
1680  * @param cls the closure
1681  * @param tunnel connection to the other end
1682  * @param tunnel_ctx the socket
1683  * @param sender who sent the message
1684  * @param message the actual message
1685  * @param atsi performance data for the connection
1686  * @return GNUNET_OK to keep the connection open,
1687  *         GNUNET_SYSERR to close it (signal serious error)
1688  */
1689 static int
1690 server_handle_data (void *cls,
1691                     struct GNUNET_MESH_Tunnel *tunnel,
1692                     void **tunnel_ctx,
1693                     const struct GNUNET_PeerIdentity *sender,
1694                     const struct GNUNET_MessageHeader *message,
1695                     const struct GNUNET_ATS_Information*atsi)
1696 {
1697   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1698
1699   return handle_data (socket,
1700                       tunnel,
1701                       sender,
1702                       (const struct GNUNET_STREAM_DataMessage *)message,
1703                       atsi);
1704 }
1705
1706
1707 /**
1708  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
1709  *
1710  * @param cls the closure
1711  * @param tunnel connection to the other end
1712  * @param tunnel_ctx the socket
1713  * @param sender who sent the message
1714  * @param message the actual message
1715  * @param atsi performance data for the connection
1716  * @return GNUNET_OK to keep the connection open,
1717  *         GNUNET_SYSERR to close it (signal serious error)
1718  */
1719 static int
1720 server_handle_hello (void *cls,
1721                      struct GNUNET_MESH_Tunnel *tunnel,
1722                      void **tunnel_ctx,
1723                      const struct GNUNET_PeerIdentity *sender,
1724                      const struct GNUNET_MessageHeader *message,
1725                      const struct GNUNET_ATS_Information*atsi)
1726 {
1727   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1728   struct GNUNET_STREAM_HelloAckMessage *reply;
1729
1730   if (GNUNET_PEER_search (sender) != socket->other_peer)
1731     {
1732       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1733                   "%x: Received HELLO from non-confirming peer\n",
1734                   socket->our_id);
1735       return GNUNET_YES;
1736     }
1737
1738   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == 
1739                  ntohs (message->type));
1740   GNUNET_assert (socket->tunnel == tunnel);
1741   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1742               "%x: Received HELLO from %x\n", 
1743               socket->our_id,
1744               socket->other_peer);
1745
1746   if (STATE_INIT == socket->state)
1747     {
1748       reply = generate_hello_ack_msg (socket);
1749       queue_message (socket, 
1750                      &reply->header,
1751                      &set_state_hello_wait, 
1752                      NULL);
1753     }
1754   else
1755     {
1756       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1757                   "Client sent HELLO when in state %d\n", socket->state);
1758       /* FIXME: Send RESET? */
1759       
1760     }
1761   return GNUNET_OK;
1762 }
1763
1764
1765 /**
1766  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1767  *
1768  * @param cls the closure
1769  * @param tunnel connection to the other end
1770  * @param tunnel_ctx the socket
1771  * @param sender who sent the message
1772  * @param message the actual message
1773  * @param atsi performance data for the connection
1774  * @return GNUNET_OK to keep the connection open,
1775  *         GNUNET_SYSERR to close it (signal serious error)
1776  */
1777 static int
1778 server_handle_hello_ack (void *cls,
1779                          struct GNUNET_MESH_Tunnel *tunnel,
1780                          void **tunnel_ctx,
1781                          const struct GNUNET_PeerIdentity *sender,
1782                          const struct GNUNET_MessageHeader *message,
1783                          const struct GNUNET_ATS_Information*atsi)
1784 {
1785   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1786   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
1787
1788   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
1789                  ntohs (message->type));
1790   GNUNET_assert (socket->tunnel == tunnel);
1791   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
1792   if (STATE_HELLO_WAIT == socket->state)
1793     {
1794       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1795                   "%x: Received HELLO_ACK from %x\n",
1796                   socket->our_id,
1797                   socket->other_peer);
1798       socket->read_sequence_number = ntohl (ack_message->sequence_number);
1799       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1800                   "%x: Read sequence number %u\n",
1801                   socket->our_id,
1802                   (unsigned int) socket->read_sequence_number);
1803       socket->receiver_window_available = 
1804         ntohl (ack_message->receiver_window_size);
1805       /* Attain ESTABLISHED state */
1806       set_state_established (NULL, socket);
1807     }
1808   else
1809     {
1810       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1811                   "Client sent HELLO_ACK when in state %d\n", socket->state);
1812       /* FIXME: Send RESET? */
1813       
1814     }
1815   return GNUNET_OK;
1816 }
1817
1818
1819 /**
1820  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1821  *
1822  * @param cls the closure
1823  * @param tunnel connection to the other end
1824  * @param tunnel_ctx the socket
1825  * @param sender who sent the message
1826  * @param message the actual message
1827  * @param atsi performance data for the connection
1828  * @return GNUNET_OK to keep the connection open,
1829  *         GNUNET_SYSERR to close it (signal serious error)
1830  */
1831 static int
1832 server_handle_reset (void *cls,
1833                      struct GNUNET_MESH_Tunnel *tunnel,
1834                      void **tunnel_ctx,
1835                      const struct GNUNET_PeerIdentity *sender,
1836                      const struct GNUNET_MessageHeader *message,
1837                      const struct GNUNET_ATS_Information*atsi)
1838 {
1839   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1840
1841   return GNUNET_OK;
1842 }
1843
1844
1845 /**
1846  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1847  *
1848  * @param cls the closure
1849  * @param tunnel connection to the other end
1850  * @param tunnel_ctx the socket
1851  * @param sender who sent the message
1852  * @param message the actual message
1853  * @param atsi performance data for the connection
1854  * @return GNUNET_OK to keep the connection open,
1855  *         GNUNET_SYSERR to close it (signal serious error)
1856  */
1857 static int
1858 server_handle_transmit_close (void *cls,
1859                               struct GNUNET_MESH_Tunnel *tunnel,
1860                               void **tunnel_ctx,
1861                               const struct GNUNET_PeerIdentity *sender,
1862                               const struct GNUNET_MessageHeader *message,
1863                               const struct GNUNET_ATS_Information*atsi)
1864 {
1865   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1866
1867   return handle_transmit_close (socket,
1868                                 tunnel,
1869                                 sender,
1870                                 (struct GNUNET_STREAM_MessageHeader *)message,
1871                                 atsi);
1872 }
1873
1874
1875 /**
1876  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1877  *
1878  * @param cls the closure
1879  * @param tunnel connection to the other end
1880  * @param tunnel_ctx the socket
1881  * @param sender who sent the message
1882  * @param message the actual message
1883  * @param atsi performance data for the connection
1884  * @return GNUNET_OK to keep the connection open,
1885  *         GNUNET_SYSERR to close it (signal serious error)
1886  */
1887 static int
1888 server_handle_transmit_close_ack (void *cls,
1889                                   struct GNUNET_MESH_Tunnel *tunnel,
1890                                   void **tunnel_ctx,
1891                                   const struct GNUNET_PeerIdentity *sender,
1892                                   const struct GNUNET_MessageHeader *message,
1893                                   const struct GNUNET_ATS_Information*atsi)
1894 {
1895   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1896
1897   return GNUNET_OK;
1898 }
1899
1900
1901 /**
1902  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1903  *
1904  * @param cls the closure
1905  * @param tunnel connection to the other end
1906  * @param tunnel_ctx the socket
1907  * @param sender who sent the message
1908  * @param message the actual message
1909  * @param atsi performance data for the connection
1910  * @return GNUNET_OK to keep the connection open,
1911  *         GNUNET_SYSERR to close it (signal serious error)
1912  */
1913 static int
1914 server_handle_receive_close (void *cls,
1915                              struct GNUNET_MESH_Tunnel *tunnel,
1916                              void **tunnel_ctx,
1917                              const struct GNUNET_PeerIdentity *sender,
1918                              const struct GNUNET_MessageHeader *message,
1919                              const struct GNUNET_ATS_Information*atsi)
1920 {
1921   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1922
1923   return GNUNET_OK;
1924 }
1925
1926
1927 /**
1928  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1929  *
1930  * @param cls the closure
1931  * @param tunnel connection to the other end
1932  * @param tunnel_ctx the socket
1933  * @param sender who sent the message
1934  * @param message the actual message
1935  * @param atsi performance data for the connection
1936  * @return GNUNET_OK to keep the connection open,
1937  *         GNUNET_SYSERR to close it (signal serious error)
1938  */
1939 static int
1940 server_handle_receive_close_ack (void *cls,
1941                                  struct GNUNET_MESH_Tunnel *tunnel,
1942                                  void **tunnel_ctx,
1943                                  const struct GNUNET_PeerIdentity *sender,
1944                                  const struct GNUNET_MessageHeader *message,
1945                                  const struct GNUNET_ATS_Information*atsi)
1946 {
1947   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1948
1949   return GNUNET_OK;
1950 }
1951
1952
1953 /**
1954  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1955  *
1956  * @param cls the listen socket (from GNUNET_MESH_connect in
1957  *          GNUNET_STREAM_listen) 
1958  * @param tunnel connection to the other end
1959  * @param tunnel_ctx the socket
1960  * @param sender who sent the message
1961  * @param message the actual message
1962  * @param atsi performance data for the connection
1963  * @return GNUNET_OK to keep the connection open,
1964  *         GNUNET_SYSERR to close it (signal serious error)
1965  */
1966 static int
1967 server_handle_close (void *cls,
1968                      struct GNUNET_MESH_Tunnel *tunnel,
1969                      void **tunnel_ctx,
1970                      const struct GNUNET_PeerIdentity *sender,
1971                      const struct GNUNET_MessageHeader *message,
1972                      const struct GNUNET_ATS_Information*atsi)
1973 {
1974   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1975   
1976   return handle_close (socket,
1977                        tunnel,
1978                        sender,
1979                        (const struct GNUNET_STREAM_MessageHeader *) message,
1980                        atsi);
1981 }
1982
1983
1984 /**
1985  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1986  *
1987  * @param cls the closure
1988  * @param tunnel connection to the other end
1989  * @param tunnel_ctx the socket
1990  * @param sender who sent the message
1991  * @param message the actual message
1992  * @param atsi performance data for the connection
1993  * @return GNUNET_OK to keep the connection open,
1994  *         GNUNET_SYSERR to close it (signal serious error)
1995  */
1996 static int
1997 server_handle_close_ack (void *cls,
1998                          struct GNUNET_MESH_Tunnel *tunnel,
1999                          void **tunnel_ctx,
2000                          const struct GNUNET_PeerIdentity *sender,
2001                          const struct GNUNET_MessageHeader *message,
2002                          const struct GNUNET_ATS_Information*atsi)
2003 {
2004   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2005
2006   return handle_close_ack (socket,
2007                            tunnel,
2008                            sender,
2009                            (const struct GNUNET_STREAM_MessageHeader *) message,
2010                            atsi);
2011 }
2012
2013
2014 /**
2015  * Message Handler for mesh
2016  *
2017  * @param socket the socket through which the ack was received
2018  * @param tunnel connection to the other end
2019  * @param sender who sent the message
2020  * @param ack the acknowledgment message
2021  * @param atsi performance data for the connection
2022  * @return GNUNET_OK to keep the connection open,
2023  *         GNUNET_SYSERR to close it (signal serious error)
2024  */
2025 static int
2026 handle_ack (struct GNUNET_STREAM_Socket *socket,
2027             struct GNUNET_MESH_Tunnel *tunnel,
2028             const struct GNUNET_PeerIdentity *sender,
2029             const struct GNUNET_STREAM_AckMessage *ack,
2030             const struct GNUNET_ATS_Information*atsi)
2031 {
2032   unsigned int packet;
2033   int need_retransmission;
2034   
2035
2036   if (GNUNET_PEER_search (sender) != socket->other_peer)
2037     {
2038       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2039                   "%x: Received ACK from non-confirming peer\n",
2040                   socket->our_id);
2041       return GNUNET_YES;
2042     }
2043
2044   switch (socket->state)
2045     {
2046     case (STATE_ESTABLISHED):
2047       if (NULL == socket->write_handle)
2048         {
2049           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2050                       "%x: Received DATA_ACK when write_handle is NULL\n",
2051                       socket->our_id);
2052           return GNUNET_OK;
2053         }
2054       /* FIXME: increment in the base sequence number is breaking current flow
2055        */
2056       if (!((socket->write_sequence_number 
2057              - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2058         {
2059           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2060                       "%x: Received DATA_ACK with unexpected base sequence "
2061                       "number\n",
2062                       socket->our_id);
2063           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2064                       "%x: Current write sequence: %u; Ack's base sequence: %u\n",
2065                       socket->our_id,
2066                       socket->write_sequence_number,
2067                       ntohl (ack->base_sequence_number));
2068           return GNUNET_OK;
2069         }
2070       /* FIXME: include the case when write_handle is cancelled - ignore the 
2071          acks */
2072
2073       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2074                   "%x: Received DATA_ACK from %x\n",
2075                   socket->our_id,
2076                   socket->other_peer);
2077       
2078       /* Cancel the retransmission task */
2079       if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2080         {
2081           GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2082           socket->retransmission_timeout_task_id = 
2083             GNUNET_SCHEDULER_NO_TASK;
2084         }
2085
2086       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2087         {
2088           if (NULL == socket->write_handle->messages[packet]) break;
2089           if (ntohl (ack->base_sequence_number)
2090               >= ntohl (socket->write_handle->messages[packet]->sequence_number))
2091             ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2092                                   packet,
2093                                   GNUNET_YES);
2094           else
2095             if (GNUNET_YES == 
2096                 ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
2097                                       ntohl (socket->write_handle->messages[packet]->sequence_number)
2098                                       - ntohl (ack->base_sequence_number)))
2099               ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2100                                     packet,
2101                                     GNUNET_YES);
2102         }
2103
2104       /* Update the receive window remaining
2105        FIXME : Should update with the value from a data ack with greater
2106        sequence number */
2107       socket->receiver_window_available = 
2108         ntohl (ack->receive_window_remaining);
2109
2110       /* Check if we have received all acknowledgements */
2111       need_retransmission = GNUNET_NO;
2112       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2113         {
2114           if (NULL == socket->write_handle->messages[packet]) break;
2115           if (GNUNET_YES != ackbitmap_is_bit_set 
2116               (&socket->write_handle->ack_bitmap,packet))
2117             {
2118               need_retransmission = GNUNET_YES;
2119               break;
2120             }
2121         }
2122       if (GNUNET_YES == need_retransmission)
2123         {
2124           write_data (socket);
2125         }
2126       else      /* We have to call the write continuation callback now */
2127         {
2128           /* Free the packets */
2129           for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2130             {
2131               GNUNET_free_non_null (socket->write_handle->messages[packet]);
2132             }
2133           if (NULL != socket->write_handle->write_cont)
2134             socket->write_handle->write_cont
2135               (socket->write_handle->write_cont_cls,
2136                socket->status,
2137                socket->write_handle->size);
2138           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2139                       "%x: Write completion callback completed\n",
2140                       socket->our_id);
2141           /* We are done with the write handle - Freeing it */
2142           GNUNET_free (socket->write_handle);
2143           socket->write_handle = NULL;
2144         }
2145       break;
2146     default:
2147       break;
2148     }
2149   return GNUNET_OK;
2150 }
2151
2152
2153 /**
2154  * Message Handler for mesh
2155  *
2156  * @param cls the 'struct GNUNET_STREAM_Socket'
2157  * @param tunnel connection to the other end
2158  * @param tunnel_ctx unused
2159  * @param sender who sent the message
2160  * @param message the actual message
2161  * @param atsi performance data for the connection
2162  * @return GNUNET_OK to keep the connection open,
2163  *         GNUNET_SYSERR to close it (signal serious error)
2164  */
2165 static int
2166 client_handle_ack (void *cls,
2167                    struct GNUNET_MESH_Tunnel *tunnel,
2168                    void **tunnel_ctx,
2169                    const struct GNUNET_PeerIdentity *sender,
2170                    const struct GNUNET_MessageHeader *message,
2171                    const struct GNUNET_ATS_Information*atsi)
2172 {
2173   struct GNUNET_STREAM_Socket *socket = cls;
2174   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2175  
2176   return handle_ack (socket, tunnel, sender, ack, atsi);
2177 }
2178
2179
2180 /**
2181  * Message Handler for mesh
2182  *
2183  * @param cls the server's listen socket
2184  * @param tunnel connection to the other end
2185  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2186  * @param sender who sent the message
2187  * @param message the actual message
2188  * @param atsi performance data for the connection
2189  * @return GNUNET_OK to keep the connection open,
2190  *         GNUNET_SYSERR to close it (signal serious error)
2191  */
2192 static int
2193 server_handle_ack (void *cls,
2194                    struct GNUNET_MESH_Tunnel *tunnel,
2195                    void **tunnel_ctx,
2196                    const struct GNUNET_PeerIdentity *sender,
2197                    const struct GNUNET_MessageHeader *message,
2198                    const struct GNUNET_ATS_Information*atsi)
2199 {
2200   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2201   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2202  
2203   return handle_ack (socket, tunnel, sender, ack, atsi);
2204 }
2205
2206
2207 /**
2208  * For client message handlers, the stream socket is in the
2209  * closure argument.
2210  */
2211 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2212   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2213   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2214    sizeof (struct GNUNET_STREAM_AckMessage) },
2215   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2216    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2217   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2218    sizeof (struct GNUNET_STREAM_MessageHeader)},
2219   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2220    sizeof (struct GNUNET_STREAM_MessageHeader)},
2221   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2222    sizeof (struct GNUNET_STREAM_MessageHeader)},
2223   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2224    sizeof (struct GNUNET_STREAM_MessageHeader)},
2225   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2226    sizeof (struct GNUNET_STREAM_MessageHeader)},
2227   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2228    sizeof (struct GNUNET_STREAM_MessageHeader)},
2229   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2230    sizeof (struct GNUNET_STREAM_MessageHeader)},
2231   {NULL, 0, 0}
2232 };
2233
2234
2235 /**
2236  * For server message handlers, the stream socket is in the
2237  * tunnel context, and the listen socket in the closure argument.
2238  */
2239 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2240   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2241   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2242    sizeof (struct GNUNET_STREAM_AckMessage) },
2243   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2244    sizeof (struct GNUNET_STREAM_MessageHeader)},
2245   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2246    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2247   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2248    sizeof (struct GNUNET_STREAM_MessageHeader)},
2249   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2250    sizeof (struct GNUNET_STREAM_MessageHeader)},
2251   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2252    sizeof (struct GNUNET_STREAM_MessageHeader)},
2253   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2254    sizeof (struct GNUNET_STREAM_MessageHeader)},
2255   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2256    sizeof (struct GNUNET_STREAM_MessageHeader)},
2257   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2258    sizeof (struct GNUNET_STREAM_MessageHeader)},
2259   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2260    sizeof (struct GNUNET_STREAM_MessageHeader)},
2261   {NULL, 0, 0}
2262 };
2263
2264
2265 /**
2266  * Function called when our target peer is connected to our tunnel
2267  *
2268  * @param cls the socket for which this tunnel is created
2269  * @param peer the peer identity of the target
2270  * @param atsi performance data for the connection
2271  */
2272 static void
2273 mesh_peer_connect_callback (void *cls,
2274                             const struct GNUNET_PeerIdentity *peer,
2275                             const struct GNUNET_ATS_Information * atsi)
2276 {
2277   struct GNUNET_STREAM_Socket *socket = cls;
2278   struct GNUNET_STREAM_MessageHeader *message;
2279   GNUNET_PEER_Id connected_peer;
2280
2281   connected_peer = GNUNET_PEER_search (peer);
2282   
2283   if (connected_peer != socket->other_peer)
2284     {
2285       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2286                   "%x: A peer which is not our target has connected",
2287                   "to our tunnel\n",
2288                   socket->our_id);
2289       return;
2290     }
2291   
2292   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2293               "%x: Target peer %x connected\n", 
2294               socket->our_id,
2295               connected_peer);
2296   
2297   /* Set state to INIT */
2298   socket->state = STATE_INIT;
2299
2300   /* Send HELLO message */
2301   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2302   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
2303   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2304   queue_message (socket,
2305                  message,
2306                  &set_state_hello_wait,
2307                  NULL);
2308
2309   /* Call open callback */
2310   if (NULL == socket->open_cb)
2311     {
2312       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2313                   "STREAM_open callback is NULL\n");
2314     }
2315 }
2316
2317
2318 /**
2319  * Function called when our target peer is disconnected from our tunnel
2320  *
2321  * @param cls the socket associated which this tunnel
2322  * @param peer the peer identity of the target
2323  */
2324 static void
2325 mesh_peer_disconnect_callback (void *cls,
2326                                const struct GNUNET_PeerIdentity *peer)
2327 {
2328
2329 }
2330
2331
2332 /**
2333  * Method called whenever a peer creates a tunnel to us
2334  *
2335  * @param cls closure
2336  * @param tunnel new handle to the tunnel
2337  * @param initiator peer that started the tunnel
2338  * @param atsi performance information for the tunnel
2339  * @return initial tunnel context for the tunnel
2340  *         (can be NULL -- that's not an error)
2341  */
2342 static void *
2343 new_tunnel_notify (void *cls,
2344                    struct GNUNET_MESH_Tunnel *tunnel,
2345                    const struct GNUNET_PeerIdentity *initiator,
2346                    const struct GNUNET_ATS_Information *atsi)
2347 {
2348   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2349   struct GNUNET_STREAM_Socket *socket;
2350
2351   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2352      from the same peer again until the socket is closed */
2353
2354   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2355   socket->other_peer = GNUNET_PEER_intern (initiator);
2356   socket->tunnel = tunnel;
2357   socket->session_id = 0;       /* FIXME */
2358   socket->state = STATE_INIT;
2359   socket->lsocket = lsocket;
2360   socket->our_id = lsocket->our_id;
2361   
2362   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2363               "%x: Peer %x initiated tunnel to us\n", 
2364               socket->our_id,
2365               socket->other_peer);
2366   
2367   /* FIXME: Copy MESH handle from lsocket to socket */
2368   
2369   return socket;
2370 }
2371
2372
2373 /**
2374  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2375  * any associated state.  This function is NOT called if the client has
2376  * explicitly asked for the tunnel to be destroyed using
2377  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2378  * the tunnel.
2379  *
2380  * @param cls closure (set from GNUNET_MESH_connect)
2381  * @param tunnel connection to the other end (henceforth invalid)
2382  * @param tunnel_ctx place where local state associated
2383  *                   with the tunnel is stored
2384  */
2385 static void 
2386 tunnel_cleaner (void *cls,
2387                 const struct GNUNET_MESH_Tunnel *tunnel,
2388                 void *tunnel_ctx)
2389 {
2390   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2391   
2392   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2393               "%x: Peer %x has terminated connection abruptly\n",
2394               socket->our_id,
2395               socket->other_peer);
2396
2397   socket->status = GNUNET_STREAM_SHUTDOWN;
2398
2399   /* Clear Transmit handles */
2400   if (NULL != socket->transmit_handle)
2401     {
2402       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2403       socket->transmit_handle = NULL;
2404     }
2405   socket->tunnel = NULL;
2406 }
2407
2408
2409 /*****************/
2410 /* API functions */
2411 /*****************/
2412
2413
2414 /**
2415  * Tries to open a stream to the target peer
2416  *
2417  * @param cfg configuration to use
2418  * @param target the target peer to which the stream has to be opened
2419  * @param app_port the application port number which uniquely identifies this
2420  *            stream
2421  * @param open_cb this function will be called after stream has be established 
2422  * @param open_cb_cls the closure for open_cb
2423  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2424  * @return if successful it returns the stream socket; NULL if stream cannot be
2425  *         opened 
2426  */
2427 struct GNUNET_STREAM_Socket *
2428 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2429                     const struct GNUNET_PeerIdentity *target,
2430                     GNUNET_MESH_ApplicationType app_port,
2431                     GNUNET_STREAM_OpenCallback open_cb,
2432                     void *open_cb_cls,
2433                     ...)
2434 {
2435   struct GNUNET_STREAM_Socket *socket;
2436   struct GNUNET_PeerIdentity own_peer_id;
2437   enum GNUNET_STREAM_Option option;
2438   va_list vargs;                /* Variable arguments */
2439
2440   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2441               "%s\n", __func__);
2442
2443   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2444   socket->other_peer = GNUNET_PEER_intern (target);
2445   socket->open_cb = open_cb;
2446   socket->open_cls = open_cb_cls;
2447   GNUNET_TESTING_get_peer_identity (cfg, &own_peer_id);
2448   socket->our_id = GNUNET_PEER_intern (&own_peer_id);
2449   
2450   /* Set defaults */
2451   socket->retransmit_timeout = 
2452     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
2453
2454   va_start (vargs, open_cb_cls); /* Parse variable args */
2455   do {
2456     option = va_arg (vargs, enum GNUNET_STREAM_Option);
2457     switch (option)
2458       {
2459       case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2460         /* Expect struct GNUNET_TIME_Relative */
2461         socket->retransmit_timeout = va_arg (vargs,
2462                                              struct GNUNET_TIME_Relative);
2463         break;
2464       case GNUNET_STREAM_OPTION_END:
2465         break;
2466       }
2467   } while (GNUNET_STREAM_OPTION_END != option);
2468   va_end (vargs);               /* End of variable args parsing */
2469   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2470                                       10,  /* QUEUE size as parameter? */
2471                                       socket, /* cls */
2472                                       NULL, /* No inbound tunnel handler */
2473                                       &tunnel_cleaner, /* FIXME: not required? */
2474                                       client_message_handlers,
2475                                       &app_port); /* We don't get inbound tunnels */
2476   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
2477     {
2478       GNUNET_free (socket);
2479       return NULL;
2480     }
2481
2482   /* Now create the mesh tunnel to target */
2483   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2484               "Creating MESH Tunnel\n");
2485   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2486                                               NULL, /* Tunnel context */
2487                                               &mesh_peer_connect_callback,
2488                                               &mesh_peer_disconnect_callback,
2489                                               socket);
2490   GNUNET_assert (NULL != socket->tunnel);
2491   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2492                                         target);
2493   
2494   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2495               "%s() END\n", __func__);
2496   return socket;
2497 }
2498
2499
2500 /**
2501  * Shutdown the stream for reading or writing (similar to man 2 shutdown).
2502  *
2503  * @param socket the stream socket
2504  * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
2505  * @param completion_cb the callback that will be called upon successful
2506  *          shutdown of given operation
2507  * @param completion_cls the closure for the completion callback
2508  * @return the shutdown handle
2509  */
2510 struct GNUNET_STREAM_ShutdownHandle *
2511 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2512                         int operation,
2513                         GNUNET_STREAM_ShutdownCompletion completion_cb,
2514                         void *completion_cls)
2515 {
2516   struct GNUNET_STREAM_ShutdownHandle *handle;
2517   struct GNUNET_STREAM_MessageHeader *msg;
2518
2519   handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
2520   handle->socket = socket;
2521   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2522   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2523   switch (operation)
2524     {
2525     case SHUT_RD:
2526       handle->operation = SHUT_RD;
2527
2528       break;
2529     case SHUT_WR:
2530       handle->operation = SHUT_WR;
2531       
2532       break;
2533     case SHUT_RDWR:
2534       handle->operation = SHUT_RDWR;
2535       if (NULL != socket->write_handle)
2536         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2537                     "Existing write handle should be cancelled before shutting"
2538                     " down writing\n");
2539       if (NULL != socket->read_handle)
2540         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2541                     "Existing read handle should be cancelled before shutting"
2542                     " down reading\n");
2543       msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
2544       queue_message (socket,
2545                      msg,
2546                      &set_state_close_wait,
2547                      NULL);
2548       break;
2549     default:
2550       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2551                   "GNUNET_STREAM_shutdown called with invalid value for "
2552                   "parameter operation -- Ignoring\n");
2553       GNUNET_free (handle);
2554       return NULL;
2555     }
2556   return handle;
2557 }
2558
2559
2560 /**
2561  * Cancels a pending shutdown
2562  *
2563  * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
2564  */
2565 void
2566 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
2567 {
2568   return;
2569 }
2570
2571
2572 /**
2573  * Closes the stream
2574  *
2575  * @param socket the stream socket
2576  */
2577 void
2578 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
2579 {
2580   struct MessageQueue *head;
2581
2582   GNUNET_break (NULL == socket->read_handle);
2583   GNUNET_break (NULL == socket->write_handle);
2584
2585   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
2586     {
2587       /* socket closed with read task pending!? */
2588       GNUNET_break (0);
2589       GNUNET_SCHEDULER_cancel (socket->read_task_id);
2590       socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
2591     }
2592   
2593   /* Terminate the ack'ing tasks if they are still present */
2594   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
2595     {
2596       GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2597       socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2598     }
2599
2600   /* Clear Transmit handles */
2601   if (NULL != socket->transmit_handle)
2602     {
2603       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2604       socket->transmit_handle = NULL;
2605     }
2606
2607   /* Clear existing message queue */
2608   while (NULL != (head = socket->queue_head)) {
2609     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2610                                  socket->queue_tail,
2611                                  head);
2612     GNUNET_free (head->message);
2613     GNUNET_free (head);
2614   }
2615
2616   /* Close associated tunnel */
2617   if (NULL != socket->tunnel)
2618     {
2619       GNUNET_MESH_tunnel_destroy (socket->tunnel);
2620       socket->tunnel = NULL;
2621     }
2622
2623   /* Close mesh connection */
2624   if (NULL != socket->mesh && NULL == socket->lsocket)
2625     {
2626       GNUNET_MESH_disconnect (socket->mesh);
2627       socket->mesh = NULL;
2628     }
2629   
2630   /* Release receive buffer */
2631   if (NULL != socket->receive_buffer)
2632     {
2633       GNUNET_free (socket->receive_buffer);
2634     }
2635
2636   GNUNET_free (socket);
2637 }
2638
2639
2640 /**
2641  * Listens for stream connections for a specific application ports
2642  *
2643  * @param cfg the configuration to use
2644  * @param app_port the application port for which new streams will be accepted
2645  * @param listen_cb this function will be called when a peer tries to establish
2646  *            a stream with us
2647  * @param listen_cb_cls closure for listen_cb
2648  * @return listen socket, NULL for any error
2649  */
2650 struct GNUNET_STREAM_ListenSocket *
2651 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
2652                       GNUNET_MESH_ApplicationType app_port,
2653                       GNUNET_STREAM_ListenCallback listen_cb,
2654                       void *listen_cb_cls)
2655 {
2656   /* FIXME: Add variable args for passing configration options? */
2657   struct GNUNET_STREAM_ListenSocket *lsocket;
2658   struct GNUNET_PeerIdentity our_peer_id;
2659
2660   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
2661   lsocket->port = app_port;
2662   lsocket->listen_cb = listen_cb;
2663   lsocket->listen_cb_cls = listen_cb_cls;
2664   GNUNET_TESTING_get_peer_identity (cfg, &our_peer_id);
2665   lsocket->our_id = GNUNET_PEER_intern (&our_peer_id);
2666   lsocket->mesh = GNUNET_MESH_connect (cfg,
2667                                        10, /* FIXME: QUEUE size as parameter? */
2668                                        lsocket, /* Closure */
2669                                        &new_tunnel_notify,
2670                                        &tunnel_cleaner,
2671                                        server_message_handlers,
2672                                        &app_port);
2673   GNUNET_assert (NULL != lsocket->mesh);
2674   return lsocket;
2675 }
2676
2677
2678 /**
2679  * Closes the listen socket
2680  *
2681  * @param lsocket the listen socket
2682  */
2683 void
2684 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
2685 {
2686   /* Close MESH connection */
2687   GNUNET_assert (NULL != lsocket->mesh);
2688   GNUNET_MESH_disconnect (lsocket->mesh);
2689   
2690   GNUNET_free (lsocket);
2691 }
2692
2693
2694 /**
2695  * Tries to write the given data to the stream
2696  *
2697  * @param socket the socket representing a stream
2698  * @param data the data buffer from where the data is written into the stream
2699  * @param size the number of bytes to be written from the data buffer
2700  * @param timeout the timeout period
2701  * @param write_cont the function to call upon writing some bytes into the stream
2702  * @param write_cont_cls the closure
2703  * @return handle to cancel the operation
2704  */
2705 struct GNUNET_STREAM_IOWriteHandle *
2706 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
2707                      const void *data,
2708                      size_t size,
2709                      struct GNUNET_TIME_Relative timeout,
2710                      GNUNET_STREAM_CompletionContinuation write_cont,
2711                      void *write_cont_cls)
2712 {
2713   unsigned int num_needed_packets;
2714   unsigned int packet;
2715   struct GNUNET_STREAM_IOWriteHandle *io_handle;
2716   uint32_t packet_size;
2717   uint32_t payload_size;
2718   struct GNUNET_STREAM_DataMessage *data_msg;
2719   const void *sweep;
2720   struct GNUNET_TIME_Relative ack_deadline;
2721
2722   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2723               "%s\n", __func__);
2724
2725   /* Return NULL if there is already a write request pending */
2726   if (NULL != socket->write_handle)
2727   {
2728     GNUNET_break (0);
2729     return NULL;
2730   }
2731   if (!((STATE_ESTABLISHED == socket->state)
2732         || (STATE_RECEIVE_CLOSE_WAIT == socket->state)
2733         || (STATE_RECEIVE_CLOSED == socket->state)))
2734     {
2735       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2736                   "%x: Attempting to write on a closed (OR) not-yet-established"
2737                   "stream\n",
2738                   socket->our_id);
2739       return NULL;
2740     } 
2741   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
2742     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
2743   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
2744   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
2745   io_handle->socket = socket;
2746   io_handle->write_cont = write_cont;
2747   io_handle->write_cont_cls = write_cont_cls;
2748   io_handle->size = size;
2749   sweep = data;
2750   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2751      determined from RTT */
2752   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
2753   /* Divide the given buffer into packets for sending */
2754   for (packet=0; packet < num_needed_packets; packet++)
2755     {
2756       if ((packet + 1) * max_payload_size < size) 
2757         {
2758           payload_size = max_payload_size;
2759           packet_size = MAX_PACKET_SIZE;
2760         }
2761       else 
2762         {
2763           payload_size = size - packet * max_payload_size;
2764           packet_size =  payload_size + sizeof (struct
2765                                                 GNUNET_STREAM_DataMessage); 
2766         }
2767       io_handle->messages[packet] = GNUNET_malloc (packet_size);
2768       io_handle->messages[packet]->header.header.size = htons (packet_size);
2769       io_handle->messages[packet]->header.header.type =
2770         htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
2771       io_handle->messages[packet]->sequence_number =
2772         htonl (socket->write_sequence_number++);
2773       io_handle->messages[packet]->offset = htonl (socket->write_offset);
2774
2775       /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2776          determined from RTT */
2777       io_handle->messages[packet]->ack_deadline =
2778         GNUNET_TIME_relative_hton (ack_deadline);
2779       data_msg = io_handle->messages[packet];
2780       /* Copy data from given buffer to the packet */
2781       memcpy (&data_msg[1],
2782               sweep,
2783               payload_size);
2784       sweep += payload_size;
2785       socket->write_offset += payload_size;
2786     }
2787   socket->write_handle = io_handle;
2788   write_data (socket);
2789
2790   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2791               "%s() END\n", __func__);
2792
2793   return io_handle;
2794 }
2795
2796
2797 /**
2798  * Tries to read data from the stream
2799  *
2800  * @param socket the socket representing a stream
2801  * @param timeout the timeout period
2802  * @param proc function to call with data (once only)
2803  * @param proc_cls the closure for proc
2804  * @return handle to cancel the operation
2805  */
2806 struct GNUNET_STREAM_IOReadHandle *
2807 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
2808                     struct GNUNET_TIME_Relative timeout,
2809                     GNUNET_STREAM_DataProcessor proc,
2810                     void *proc_cls)
2811 {
2812   struct GNUNET_STREAM_IOReadHandle *read_handle;
2813   
2814   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2815               "%x: %s()\n", 
2816               socket->our_id,
2817               __func__);
2818
2819   /* Return NULL if there is already a read handle; the user has to cancel that
2820   first before continuing or has to wait until it is completed */
2821   if (NULL != socket->read_handle) return NULL;
2822
2823   GNUNET_assert (NULL != proc);
2824
2825   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
2826   read_handle->proc = proc;
2827   read_handle->proc_cls = proc_cls;
2828   socket->read_handle = read_handle;
2829
2830   /* Check if we have a packet at bitmap 0 */
2831   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
2832                                           0))
2833     {
2834       socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
2835                                                        socket);
2836    
2837     }
2838   
2839   /* Setup the read timeout task */
2840   socket->read_io_timeout_task_id =
2841     GNUNET_SCHEDULER_add_delayed (timeout,
2842                                   &read_io_timeout,
2843                                   socket);
2844   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2845               "%x: %s() END\n",
2846               socket->our_id,
2847               __func__);
2848   return read_handle;
2849 }
2850
2851
2852 /**
2853  * Cancel pending write operation.
2854  *
2855  * @param ioh handle to operation to cancel
2856  */
2857 void
2858 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
2859 {
2860   struct GNUNET_STREAM_Socket *socket = ioh->socket;
2861   unsigned int packet;
2862
2863   GNUNET_assert (NULL != socket->write_handle);
2864   GNUNET_assert (socket->write_handle == ioh);
2865
2866   if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2867     {
2868       GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2869       socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
2870     }
2871
2872   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2873     {
2874       if (NULL == ioh->messages[packet]) break;
2875       GNUNET_free (ioh->messages[packet]);
2876     }
2877       
2878   GNUNET_free (socket->write_handle);
2879   socket->write_handle = NULL;
2880   return;
2881 }
2882
2883
2884 /**
2885  * Cancel pending read operation.
2886  *
2887  * @param ioh handle to operation to cancel
2888  */
2889 void
2890 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
2891 {
2892   return;
2893 }