-bugfixes and testcases
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  * Copy MESH handle from lsocket to socket
23  *
24  * Checks for matching the sender and socket->other_peer in server
25  * message handlers  */
26
27 /**
28  * @file stream/stream_api.c
29  * @brief Implementation of the stream library
30  * @author Sree Harsha Totakura
31  */
32 #include "platform.h"
33 #include "gnunet_common.h"
34 #include "gnunet_crypto_lib.h"
35 #include "gnunet_stream_lib.h"
36 #include "stream_protocol.h"
37
38
39 /**
40  * The maximum packet size of a stream packet
41  */
42 #define MAX_PACKET_SIZE 64000
43
44 /**
45  * The maximum payload a data message packet can carry
46  */
47 static size_t max_payload_size = 
48   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
49
50 /**
51  * Receive buffer
52  */
53 #define RECEIVE_BUFFER_SIZE 4096000
54
55 /**
56  * states in the Protocol
57  */
58 enum State
59   {
60     /**
61      * Client initialization state
62      */
63     STATE_INIT,
64
65     /**
66      * Listener initialization state 
67      */
68     STATE_LISTEN,
69
70     /**
71      * Pre-connection establishment state
72      */
73     STATE_HELLO_WAIT,
74
75     /**
76      * State where a connection has been established
77      */
78     STATE_ESTABLISHED,
79
80     /**
81      * State where the socket is closed on our side and waiting to be ACK'ed
82      */
83     STATE_RECEIVE_CLOSE_WAIT,
84
85     /**
86      * State where the socket is closed for reading
87      */
88     STATE_RECEIVE_CLOSED,
89
90     /**
91      * State where the socket is closed on our side and waiting to be ACK'ed
92      */
93     STATE_TRANSMIT_CLOSE_WAIT,
94
95     /**
96      * State where the socket is closed for writing
97      */
98     STATE_TRANSMIT_CLOSED,
99
100     /**
101      * State where the socket is closed on our side and waiting to be ACK'ed
102      */
103     STATE_CLOSE_WAIT,
104
105     /**
106      * State where the socket is closed
107      */
108     STATE_CLOSED 
109   };
110
111
112 /**
113  * Functions of this type are called when a message is written
114  *
115  * @param cls the closure from queue_message
116  * @param socket the socket the written message was bound to
117  */
118 typedef void (*SendFinishCallback) (void *cls,
119                                     struct GNUNET_STREAM_Socket *socket);
120
121
122 /**
123  * The send message queue
124  */
125 struct MessageQueue
126 {
127   /**
128    * The message
129    */
130   struct GNUNET_STREAM_MessageHeader *message;
131
132   /**
133    * Callback to be called when the message is sent
134    */
135   SendFinishCallback finish_cb;
136
137   /**
138    * The closure for finish_cb
139    */
140   void *finish_cb_cls;
141
142   /**
143    * The next message in queue. Should be NULL in the last message
144    */
145   struct MessageQueue *next;
146
147   /**
148    * The next message in queue. Should be NULL in the first message
149    */
150   struct MessageQueue *prev;
151 };
152
153
154 /**
155  * The STREAM Socket Handler
156  */
157 struct GNUNET_STREAM_Socket
158 {
159
160   /**
161    * The peer identity of the peer at the other end of the stream
162    */
163   struct GNUNET_PeerIdentity other_peer;
164
165   /**
166    * Retransmission timeout
167    */
168   struct GNUNET_TIME_Relative retransmit_timeout;
169
170   /**
171    * The Acknowledgement Bitmap
172    */
173   GNUNET_STREAM_AckBitmap ack_bitmap;
174
175   /**
176    * Time when the Acknowledgement was queued
177    */
178   struct GNUNET_TIME_Absolute ack_time_registered;
179
180   /**
181    * Queued Acknowledgement deadline
182    */
183   struct GNUNET_TIME_Relative ack_time_deadline;
184
185   /**
186    * The task for sending timely Acks
187    */
188   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
189
190   /**
191    * Task scheduled to continue a read operation.
192    */
193   GNUNET_SCHEDULER_TaskIdentifier read_task;
194
195   /**
196    * The mesh handle
197    */
198   struct GNUNET_MESH_Handle *mesh;
199
200   /**
201    * The mesh tunnel handle
202    */
203   struct GNUNET_MESH_Tunnel *tunnel;
204
205   /**
206    * Stream open closure
207    */
208   void *open_cls;
209
210   /**
211    * Stream open callback
212    */
213   GNUNET_STREAM_OpenCallback open_cb;
214
215   /**
216    * The current transmit handle (if a pending transmit request exists)
217    */
218   struct GNUNET_MESH_TransmitHandle *transmit_handle;
219
220   /**
221    * The current message associated with the transmit handle
222    */
223   struct MessageQueue *queue_head;
224
225   /**
226    * The queue tail, should always point to the last message in queue
227    */
228   struct MessageQueue *queue_tail;
229
230   /**
231    * The write IO_handle associated with this socket
232    */
233   struct GNUNET_STREAM_IOWriteHandle *write_handle;
234
235   /**
236    * The read IO_handle associated with this socket
237    */
238   struct GNUNET_STREAM_IOReadHandle *read_handle;
239
240   /**
241    * Buffer for storing received messages
242    */
243   void *receive_buffer;
244
245   /**
246    * Task identifier for the read io timeout task
247    */
248   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task;
249
250   /**
251    * The state of the protocol associated with this socket
252    */
253   enum State state;
254
255   /**
256    * The status of the socket
257    */
258   enum GNUNET_STREAM_Status status;
259
260   /**
261    * The number of previous timeouts; FIXME: currently not used
262    */
263   unsigned int retries;
264
265   /**
266    * The application port number (type: uint32_t)
267    */
268   GNUNET_MESH_ApplicationType app_port;
269
270   /**
271    * The session id associated with this stream connection
272    * FIXME: Not used currently, may be removed
273    */
274   uint32_t session_id;
275
276   /**
277    * Write sequence number. Set to random when sending HELLO(client) and
278    * HELLO_ACK(server) 
279    */
280   uint32_t write_sequence_number;
281
282   /**
283    * Read sequence number. This number's value is determined during handshake
284    */
285   uint32_t read_sequence_number;
286
287   /**
288    * The receiver buffer size
289    */
290   uint32_t receive_buffer_size;
291
292   /**
293    * The receiver buffer boundaries
294    */
295   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
296
297   /**
298    * receiver's available buffer after the last acknowledged packet
299    */
300   uint32_t receive_window_available;
301
302   /**
303    * The offset pointer used during write operation
304    */
305   uint32_t write_offset;
306
307   /**
308    * The offset after which we are expecting data
309    */
310   uint32_t read_offset;
311
312   /**
313    * The offset upto which user has read from the received buffer
314    */
315   uint32_t copy_offset;
316 };
317
318
319 /**
320  * A socket for listening
321  */
322 struct GNUNET_STREAM_ListenSocket
323 {
324
325   /**
326    * The mesh handle
327    */
328   struct GNUNET_MESH_Handle *mesh;
329
330   /**
331    * The callback function which is called after successful opening socket
332    */
333   GNUNET_STREAM_ListenCallback listen_cb;
334
335   /**
336    * The call back closure
337    */
338   void *listen_cb_cls;
339
340   /**
341    * The service port
342    * FIXME: Remove if not required!
343    */
344   GNUNET_MESH_ApplicationType port;
345 };
346
347
348 /**
349  * The IO Write Handle
350  */
351 struct GNUNET_STREAM_IOWriteHandle
352 {
353   /**
354    * The packet_buffers associated with this Handle
355    */
356   struct GNUNET_STREAM_DataMessage *messages[64];
357
358   /**
359    * The bitmap of this IOHandle; Corresponding bit for a message is set when
360    * it has been acknowledged by the receiver
361    */
362   GNUNET_STREAM_AckBitmap ack_bitmap;
363
364   /**
365    * Number of packets sent before waiting for an ack
366    *
367    * FIXME: Do we need this?
368    */
369   unsigned int sent_packets;
370 };
371
372
373 /**
374  * The IO Read Handle
375  */
376 struct GNUNET_STREAM_IOReadHandle
377 {
378   /**
379    * Callback for the read processor
380    */
381   GNUNET_STREAM_DataProcessor proc;
382
383   /**
384    * The closure pointer for the read processor callback
385    */
386   void *proc_cls;
387 };
388
389
390 /**
391  * Default value in seconds for various timeouts
392  */
393 static unsigned int default_timeout = 300;
394
395
396 /**
397  * Callback function for sending hello message
398  *
399  * @param cls closure the socket
400  * @param size number of bytes available in buf
401  * @param buf where the callee should write the message
402  * @return number of bytes written to buf
403  */
404 static size_t
405 send_message_notify (void *cls, size_t size, void *buf)
406 {
407   struct GNUNET_STREAM_Socket *socket = cls;
408   struct MessageQueue *head;
409   size_t ret;
410
411   socket->transmit_handle = NULL; /* Remove the transmit handle */
412   head = socket->queue_head;
413   if (NULL == head)
414     return 0; /* just to be safe */
415   if (0 == size)                /* request timed out */
416     {
417       socket->retries++;
418       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
419                   "Message sending timed out. Retry %d \n",
420                   socket->retries);
421       socket->transmit_handle = 
422         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
423                                            0, /* Corking */
424                                            1, /* Priority */
425                                            /* FIXME: exponential backoff */
426                                            socket->retransmit_timeout,
427                                            &socket->other_peer,
428                                            ntohs (head->message->header.size),
429                                            &send_message_notify,
430                                            socket);
431       return 0;
432     }
433
434   ret = ntohs (head->message->header.size);
435   GNUNET_assert (size >= ret);
436   memcpy (buf, head->message, ret);
437   if (NULL != head->finish_cb)
438     {
439       head->finish_cb (head->finish_cb_cls, socket);
440     }
441   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
442                                socket->queue_tail,
443                                head);
444   GNUNET_free (head->message);
445   GNUNET_free (head);
446   head = socket->queue_head;
447   if (NULL != head)    /* more pending messages to send */
448     {
449       socket->retries = 0;
450       socket->transmit_handle = 
451         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
452                                            0, /* Corking */
453                                            1, /* Priority */
454                                            /* FIXME: exponential backoff */
455                                            socket->retransmit_timeout,
456                                            &socket->other_peer,
457                                            ntohs (head->message->header.size),
458                                            &send_message_notify,
459                                            socket);
460     }
461   return ret;
462 }
463
464
465 /**
466  * Queues a message for sending using the mesh connection of a socket
467  *
468  * @param socket the socket whose mesh connection is used
469  * @param message the message to be sent
470  * @param finish_cb the callback to be called when the message is sent
471  * @param finish_cb_cls the closure for the callback
472  */
473 static void
474 queue_message (struct GNUNET_STREAM_Socket *socket,
475                struct GNUNET_STREAM_MessageHeader *message,
476                SendFinishCallback finish_cb,
477                void *finish_cb_cls)
478 {
479   struct MessageQueue *queue_entity;
480
481   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
482               "Queueing message of type %d and size %d\n",
483               ntohs (message->header.type),
484               ntohs (message->header.size));
485   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
486   queue_entity->message = message;
487   queue_entity->finish_cb = finish_cb;
488   queue_entity->finish_cb_cls = finish_cb_cls;
489   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
490                                     socket->queue_tail,
491                                     queue_entity);
492   if (NULL == socket->transmit_handle)
493   {
494     socket->retries = 0;
495     socket->transmit_handle = 
496       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
497                                          0, /* Corking */
498                                          1, /* Priority */
499                                          socket->retransmit_timeout,
500                                          &socket->other_peer,
501                                          ntohs (message->header.size),
502                                          &send_message_notify,
503                                          socket);
504   }
505 }
506
507
508 /**
509  * Callback function for sending ack message
510  *
511  * @param cls closure the ACK message created in ack_task
512  * @param size number of bytes available in buffer
513  * @param buf where the callee should write the message
514  * @return number of bytes written to buf
515  */
516 static size_t
517 send_ack_notify (void *cls, size_t size, void *buf)
518 {
519   struct GNUNET_STREAM_AckMessage *ack_msg = cls;
520
521   if (0 == size)
522     {
523       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
524                   "%s called with size 0\n", __func__);
525       return 0;
526     }
527   GNUNET_assert (ack_msg->header.header.size <= size);
528   
529   size = ack_msg->header.header.size;
530   memcpy (buf, ack_msg, size);
531   return size;
532 }
533
534
535 /**
536  * Task for sending ACK message
537  *
538  * @param cls the socket
539  * @param tc the Task context
540  */
541 static void
542 ack_task (void *cls,
543           const struct GNUNET_SCHEDULER_TaskContext *tc)
544 {
545   struct GNUNET_STREAM_Socket *socket = cls;
546   struct GNUNET_STREAM_AckMessage *ack_msg;
547
548   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
549     {
550       return;
551     }
552
553   socket->ack_task_id = 0;
554
555   /* Create the ACK Message */
556   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
557   ack_msg->header.header.size = htons (sizeof (struct 
558                                                GNUNET_STREAM_AckMessage));
559   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
560   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
561   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
562   ack_msg->receive_window_remaining = 
563     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
564
565   /* Request MESH for sending ACK */
566   GNUNET_MESH_notify_transmit_ready (socket->tunnel,
567                                      0, /* Corking */
568                                      1, /* Priority */
569                                      socket->retransmit_timeout,
570                                      &socket->other_peer,
571                                      ntohs (ack_msg->header.header.size),
572                                      &send_ack_notify,
573                                      ack_msg);
574
575   
576 }
577
578
579 /**
580  * Function to modify a bit in GNUNET_STREAM_AckBitmap
581  *
582  * @param bitmap the bitmap to modify
583  * @param bit the bit number to modify
584  * @param value GNUNET_YES to on, GNUNET_NO to off
585  */
586 static void
587 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
588                       unsigned int bit, 
589                       int value)
590 {
591   GNUNET_assert (bit < 64);
592   if (GNUNET_YES == value)
593     *bitmap |= (1LL << bit);
594   else
595     *bitmap &= ~(1LL << bit);
596 }
597
598
599 /**
600  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
601  *
602  * @param bitmap address of the bitmap that has to be checked
603  * @param bit the bit number to check
604  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
605  */
606 static uint8_t
607 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
608                       unsigned int bit)
609 {
610   GNUNET_assert (bit < 64);
611   return 0 != (*bitmap & (1LL << bit));
612 }
613
614
615
616 /**
617  * Function called when Data Message is sent
618  *
619  * @param cls the io_handle corresponding to the Data Message
620  * @param socket the socket which was used
621  */
622 static void
623 write_data_finish_cb (void *cls,
624                       struct GNUNET_STREAM_Socket *socket)
625 {
626   struct GNUNET_STREAM_IOWriteHandle *io_handle = cls;
627
628   io_handle->sent_packets++;
629 }
630
631
632 /**
633  * Writes data using the given socket. The amount of data written is limited by
634  * the receive_window_size
635  *
636  * @param socket the socket to use
637  */
638 static void 
639 write_data (struct GNUNET_STREAM_Socket *socket)
640 {
641   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
642   unsigned int packet;
643   int ack_packet;
644
645   ack_packet = -1;
646   /* Find the last acknowledged packet */
647   for (packet=0; packet < 64; packet++)
648     {      
649       if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
650                                               packet))
651         ack_packet = packet;        
652       else if (NULL == io_handle->messages[packet])
653         break;
654     }
655   /* Resend packets which weren't ack'ed */
656   for (packet=0; packet < ack_packet; packet++)
657     {
658       if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
659                                              packet))
660         {
661           queue_message (socket,
662                          &io_handle->messages[packet]->header,
663                          NULL,
664                          NULL);
665         }
666     }
667   packet = ack_packet + 1;
668   /* Now send new packets if there is enough buffer space */
669   while ( (NULL != io_handle->messages[packet]) &&
670           (socket->receive_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) )
671     {
672       socket->receive_window_available -= ntohs (io_handle->messages[packet]->header.header.size);
673       queue_message (socket,
674                      &io_handle->messages[packet]->header,
675                      &write_data_finish_cb,
676                      io_handle);
677       packet++;
678     }
679 }
680
681
682 /**
683  * Task for calling the read processor
684  *
685  * @param cls the socket
686  * @param tc the task context
687  */
688 static void
689 call_read_processor (void *cls,
690                           const struct GNUNET_SCHEDULER_TaskContext *tc)
691 {
692   struct GNUNET_STREAM_Socket *socket = cls;
693   size_t read_size;
694   size_t valid_read_size;
695   unsigned int packet;
696   uint32_t sequence_increase;
697   uint32_t offset_increase;
698
699   socket->read_task = GNUNET_SCHEDULER_NO_TASK;
700   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
701     return;
702
703   GNUNET_assert (NULL != socket->read_handle);
704   GNUNET_assert (NULL != socket->read_handle->proc);
705
706   /* Check the bitmap for any holes */
707   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
708     {
709       if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
710                                              packet))
711         break;
712     }
713   /* We only call read processor if we have the first packet */
714   GNUNET_assert (0 < packet);
715
716   valid_read_size = 
717     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
718
719   GNUNET_assert (0 != valid_read_size);
720
721   /* Cancel the read_io_timeout_task */
722   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task);
723   socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
724
725   /* Call the data processor */
726   read_size = 
727     socket->read_handle->proc (socket->read_handle->proc_cls,
728                                socket->status,
729                                socket->receive_buffer + socket->copy_offset,
730                                valid_read_size);
731   /* Free the read handle */
732   GNUNET_free (socket->read_handle);
733   socket->read_handle = NULL;
734
735   GNUNET_assert (read_size <= valid_read_size);
736   socket->copy_offset += read_size;
737
738   /* Determine upto which packet we can remove from the buffer */
739   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
740     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
741       break;
742
743   /* If no packets can be removed we can't move the buffer */
744   if (0 == packet) return;
745
746   sequence_increase = packet;
747
748   /* Shift the data in the receive buffer */
749   memmove (socket->receive_buffer,
750            socket->receive_buffer 
751            + socket->receive_buffer_boundaries[sequence_increase-1],
752            socket->receive_buffer_size - socket->receive_buffer_boundaries[sequence_increase-1]);
753   
754   /* Shift the bitmap */
755   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
756   
757   /* Set read_sequence_number */
758   socket->read_sequence_number += sequence_increase;
759   
760   /* Set read_offset */
761   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
762   socket->read_offset += offset_increase;
763
764   /* Fix copy_offset */
765   GNUNET_assert (offset_increase <= socket->copy_offset);
766   socket->copy_offset -= offset_increase;
767   
768   /* Fix relative boundaries */
769   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
770     {
771       if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
772         {
773           socket->receive_buffer_boundaries[packet] = 
774             socket->receive_buffer_boundaries[packet + sequence_increase] 
775             - offset_increase;
776         }
777       else
778         socket->receive_buffer_boundaries[packet] = 0;
779     }
780 }
781
782
783 /**
784  * Cancels the existing read io handle
785  *
786  * @param cls the closure from the SCHEDULER call
787  * @param tc the task context
788  */
789 static void
790 read_io_timeout (void *cls, 
791                 const struct GNUNET_SCHEDULER_TaskContext *tc)
792 {
793   struct GNUNET_STREAM_Socket *socket = cls;
794
795   socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
796   if (socket->read_task != GNUNET_SCHEDULER_NO_TASK)
797   {
798     GNUNET_SCHEDULER_cancel (socket->read_task);
799     socket->read_task = GNUNET_SCHEDULER_NO_TASK;
800   }
801   GNUNET_assert (NULL != socket->read_handle);
802   
803   GNUNET_free (socket->read_handle);
804   socket->read_handle = NULL;
805 }
806
807
808 /**
809  * Handler for DATA messages; Same for both client and server
810  *
811  * @param socket the socket through which the ack was received
812  * @param tunnel connection to the other end
813  * @param sender who sent the message
814  * @param msg the data message
815  * @param atsi performance data for the connection
816  * @return GNUNET_OK to keep the connection open,
817  *         GNUNET_SYSERR to close it (signal serious error)
818  */
819 static int
820 handle_data (struct GNUNET_STREAM_Socket *socket,
821              struct GNUNET_MESH_Tunnel *tunnel,
822              const struct GNUNET_PeerIdentity *sender,
823              const struct GNUNET_STREAM_DataMessage *msg,
824              const struct GNUNET_ATS_Information*atsi)
825 {
826   const void *payload;
827   uint32_t bytes_needed;
828   uint32_t relative_offset;
829   uint32_t relative_sequence_number;
830   uint16_t size;
831
832   size = htons (msg->header.header.size);
833   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
834     {
835       GNUNET_break_op (0);
836       return GNUNET_SYSERR;
837     }
838
839   switch (socket->state)
840     {
841     case STATE_ESTABLISHED:
842     case STATE_TRANSMIT_CLOSED:
843     case STATE_TRANSMIT_CLOSE_WAIT:
844
845       /* check if the message's sequence number is in the range we are
846          expecting */
847       relative_sequence_number = 
848         ntohl (msg->sequence_number) - socket->read_sequence_number;
849       if ( relative_sequence_number > 64)
850         {
851           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
852                       "Ignoring received message with sequence number %d",
853                       ntohl (msg->sequence_number));
854           return GNUNET_YES;
855         }
856
857       /* Check if we have to allocate the buffer */
858       size -= sizeof (struct GNUNET_STREAM_DataMessage);
859       relative_offset = ntohl (msg->offset) - socket->read_offset;
860       bytes_needed = relative_offset + size;
861       
862       if (bytes_needed > socket->receive_buffer_size)
863         {
864           if (bytes_needed <= RECEIVE_BUFFER_SIZE)
865             {
866               socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
867                                                        bytes_needed);
868               socket->receive_buffer_size = bytes_needed;
869             }
870           else
871             {
872               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
873                           "Cannot accommodate packet %d as buffer is full\n",
874                           ntohl (msg->sequence_number));
875               return GNUNET_YES;
876             }
877         }
878       
879       /* Copy Data to buffer */
880       payload = &msg[1];
881       GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
882       memcpy (socket->receive_buffer + relative_offset,
883               payload,
884               size);
885       socket->receive_buffer_boundaries[relative_sequence_number] = 
886         relative_offset + size;
887       
888       /* Modify the ACK bitmap */
889       ackbitmap_modify_bit (&socket->ack_bitmap,
890                             relative_sequence_number,
891                             GNUNET_YES);
892
893       /* Start ACK sending task if one is not already present */
894       if (0 == socket->ack_task_id)
895        {
896          socket->ack_task_id = 
897            GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
898                                          (msg->ack_deadline),
899                                          &ack_task,
900                                          socket);
901        }
902
903       if ((NULL != socket->read_handle) /* A read handle is waiting */
904           /* There is no current read task */
905           && (GNUNET_SCHEDULER_NO_TASK == socket->read_task)
906           /* We have the first packet */
907           && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
908                                                  0)))
909         {
910           socket->read_task = 
911             GNUNET_SCHEDULER_add_now (&call_read_processor,
912                                       socket);
913         }
914       
915       break;
916
917     default:
918       /* FIXME: call statistics */
919       break;
920     }
921   return GNUNET_YES;
922 }
923
924 /**
925  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
926  *
927  * @param cls the socket (set from GNUNET_MESH_connect)
928  * @param tunnel connection to the other end
929  * @param tunnel_ctx place to store local state associated with the tunnel
930  * @param sender who sent the message
931  * @param message the actual message
932  * @param atsi performance data for the connection
933  * @return GNUNET_OK to keep the connection open,
934  *         GNUNET_SYSERR to close it (signal serious error)
935  */
936 static int
937 client_handle_data (void *cls,
938              struct GNUNET_MESH_Tunnel *tunnel,
939              void **tunnel_ctx,
940              const struct GNUNET_PeerIdentity *sender,
941              const struct GNUNET_MessageHeader *message,
942              const struct GNUNET_ATS_Information*atsi)
943 {
944   struct GNUNET_STREAM_Socket *socket = cls;
945
946   return handle_data (socket, 
947                       tunnel, 
948                       sender, 
949                       (const struct GNUNET_STREAM_DataMessage *) message, 
950                       atsi);
951 }
952
953
954 /**
955  * Callback to set state to ESTABLISHED
956  *
957  * @param cls the closure from queue_message FIXME: document
958  * @param socket the socket to requiring state change
959  */
960 static void
961 set_state_established (void *cls,
962                        struct GNUNET_STREAM_Socket *socket)
963 {
964   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining ESTABLISHED state\n");
965   socket->write_offset = 0;
966   socket->read_offset = 0;
967   socket->state = STATE_ESTABLISHED;
968   if (socket->open_cb)
969     socket->open_cb (socket->open_cls, socket);
970 }
971
972
973 /**
974  * Callback to set state to HELLO_WAIT
975  *
976  * @param cls the closure from queue_message
977  * @param socket the socket to requiring state change
978  */
979 static void
980 set_state_hello_wait (void *cls,
981                       struct GNUNET_STREAM_Socket *socket)
982 {
983   GNUNET_assert (STATE_INIT == socket->state);
984   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining HELLO_WAIT state\n");
985   socket->state = STATE_HELLO_WAIT;
986 }
987
988
989 /**
990  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
991  *
992  * @param cls the socket (set from GNUNET_MESH_connect)
993  * @param tunnel connection to the other end
994  * @param tunnel_ctx this is NULL
995  * @param sender who sent the message
996  * @param message the actual message
997  * @param atsi performance data for the connection
998  * @return GNUNET_OK to keep the connection open,
999  *         GNUNET_SYSERR to close it (signal serious error)
1000  */
1001 static int
1002 client_handle_hello_ack (void *cls,
1003                          struct GNUNET_MESH_Tunnel *tunnel,
1004                          void **tunnel_ctx,
1005                          const struct GNUNET_PeerIdentity *sender,
1006                          const struct GNUNET_MessageHeader *message,
1007                          const struct GNUNET_ATS_Information*atsi)
1008 {
1009   struct GNUNET_STREAM_Socket *socket = cls;
1010   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1011   struct GNUNET_STREAM_HelloAckMessage *reply;
1012
1013   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1014   GNUNET_assert (socket->tunnel == tunnel);
1015   switch (socket->state)
1016   {
1017   case STATE_HELLO_WAIT:
1018       socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1019       socket->receive_window_available = ntohl (ack_msg->receive_window_size);
1020       /* Get the random sequence number */
1021       socket->write_sequence_number = 
1022         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1023       reply = 
1024         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1025       reply->header.header.size = 
1026         htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1027       reply->header.header.type = 
1028         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1029       reply->sequence_number = htonl (socket->write_sequence_number);
1030       reply->receive_window_size = htonl (RECEIVE_BUFFER_SIZE);
1031       queue_message (socket, 
1032                      &reply->header, 
1033                      &set_state_established, 
1034                      NULL);      
1035       return GNUNET_OK;
1036   case STATE_ESTABLISHED:
1037   case STATE_RECEIVE_CLOSE_WAIT:
1038     // call statistics (# ACKs ignored++)
1039     return GNUNET_OK;
1040   case STATE_INIT:
1041   default:
1042     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1043                 "Server sent HELLO_ACK when in state %d\n", socket->state);
1044     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1045     return GNUNET_SYSERR;
1046   }
1047
1048 }
1049
1050
1051 /**
1052  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1053  *
1054  * @param cls the socket (set from GNUNET_MESH_connect)
1055  * @param tunnel connection to the other end
1056  * @param tunnel_ctx this is NULL
1057  * @param sender who sent the message
1058  * @param message the actual message
1059  * @param atsi performance data for the connection
1060  * @return GNUNET_OK to keep the connection open,
1061  *         GNUNET_SYSERR to close it (signal serious error)
1062  */
1063 static int
1064 client_handle_reset (void *cls,
1065                      struct GNUNET_MESH_Tunnel *tunnel,
1066                      void **tunnel_ctx,
1067                      const struct GNUNET_PeerIdentity *sender,
1068                      const struct GNUNET_MessageHeader *message,
1069                      const struct GNUNET_ATS_Information*atsi)
1070 {
1071   struct GNUNET_STREAM_Socket *socket = cls;
1072
1073   return GNUNET_OK;
1074 }
1075
1076
1077 /**
1078  * Common message handler for handling TRANSMIT_CLOSE messages
1079  *
1080  * @param socket the socket through which the ack was received
1081  * @param tunnel connection to the other end
1082  * @param sender who sent the message
1083  * @param msg the transmit close message
1084  * @param atsi performance data for the connection
1085  * @return GNUNET_OK to keep the connection open,
1086  *         GNUNET_SYSERR to close it (signal serious error)
1087  */
1088 static int
1089 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1090                        struct GNUNET_MESH_Tunnel *tunnel,
1091                        const struct GNUNET_PeerIdentity *sender,
1092                        const struct GNUNET_STREAM_MessageHeader *msg,
1093                        const struct GNUNET_ATS_Information*atsi)
1094 {
1095   struct GNUNET_STREAM_MessageHeader *reply;
1096
1097   switch (socket->state)
1098     {
1099     case STATE_ESTABLISHED:
1100       socket->state = STATE_RECEIVE_CLOSED;
1101
1102       /* Send TRANSMIT_CLOSE_ACK */
1103       reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1104       reply->header.type = 
1105         htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1106       reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1107       queue_message (socket, reply, NULL, NULL);
1108       break;
1109
1110     default:
1111       /* FIXME: Call statistics? */
1112       break;
1113     }
1114   return GNUNET_YES;
1115 }
1116
1117
1118 /**
1119  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1120  *
1121  * @param cls the socket (set from GNUNET_MESH_connect)
1122  * @param tunnel connection to the other end
1123  * @param tunnel_ctx this is NULL
1124  * @param sender who sent the message
1125  * @param message the actual message
1126  * @param atsi performance data for the connection
1127  * @return GNUNET_OK to keep the connection open,
1128  *         GNUNET_SYSERR to close it (signal serious error)
1129  */
1130 static int
1131 client_handle_transmit_close (void *cls,
1132                               struct GNUNET_MESH_Tunnel *tunnel,
1133                               void **tunnel_ctx,
1134                               const struct GNUNET_PeerIdentity *sender,
1135                               const struct GNUNET_MessageHeader *message,
1136                               const struct GNUNET_ATS_Information*atsi)
1137 {
1138   struct GNUNET_STREAM_Socket *socket = cls;
1139   
1140   return handle_transmit_close (socket,
1141                                 tunnel,
1142                                 sender,
1143                                 (struct GNUNET_STREAM_MessageHeader *)message,
1144                                 atsi);
1145 }
1146
1147
1148 /**
1149  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1150  *
1151  * @param cls the socket (set from GNUNET_MESH_connect)
1152  * @param tunnel connection to the other end
1153  * @param tunnel_ctx this is NULL
1154  * @param sender who sent the message
1155  * @param message the actual message
1156  * @param atsi performance data for the connection
1157  * @return GNUNET_OK to keep the connection open,
1158  *         GNUNET_SYSERR to close it (signal serious error)
1159  */
1160 static int
1161 client_handle_transmit_close_ack (void *cls,
1162                                   struct GNUNET_MESH_Tunnel *tunnel,
1163                                   void **tunnel_ctx,
1164                                   const struct GNUNET_PeerIdentity *sender,
1165                                   const struct GNUNET_MessageHeader *message,
1166                                   const struct GNUNET_ATS_Information*atsi)
1167 {
1168   struct GNUNET_STREAM_Socket *socket = cls;
1169
1170   return GNUNET_OK;
1171 }
1172
1173
1174 /**
1175  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1176  *
1177  * @param cls the socket (set from GNUNET_MESH_connect)
1178  * @param tunnel connection to the other end
1179  * @param tunnel_ctx this is NULL
1180  * @param sender who sent the message
1181  * @param message the actual message
1182  * @param atsi performance data for the connection
1183  * @return GNUNET_OK to keep the connection open,
1184  *         GNUNET_SYSERR to close it (signal serious error)
1185  */
1186 static int
1187 client_handle_receive_close (void *cls,
1188                              struct GNUNET_MESH_Tunnel *tunnel,
1189                              void **tunnel_ctx,
1190                              const struct GNUNET_PeerIdentity *sender,
1191                              const struct GNUNET_MessageHeader *message,
1192                              const struct GNUNET_ATS_Information*atsi)
1193 {
1194   struct GNUNET_STREAM_Socket *socket = cls;
1195
1196   return GNUNET_OK;
1197 }
1198
1199
1200 /**
1201  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1202  *
1203  * @param cls the socket (set from GNUNET_MESH_connect)
1204  * @param tunnel connection to the other end
1205  * @param tunnel_ctx this is NULL
1206  * @param sender who sent the message
1207  * @param message the actual message
1208  * @param atsi performance data for the connection
1209  * @return GNUNET_OK to keep the connection open,
1210  *         GNUNET_SYSERR to close it (signal serious error)
1211  */
1212 static int
1213 client_handle_receive_close_ack (void *cls,
1214                                  struct GNUNET_MESH_Tunnel *tunnel,
1215                                  void **tunnel_ctx,
1216                                  const struct GNUNET_PeerIdentity *sender,
1217                                  const struct GNUNET_MessageHeader *message,
1218                                  const struct GNUNET_ATS_Information*atsi)
1219 {
1220   struct GNUNET_STREAM_Socket *socket = cls;
1221
1222   return GNUNET_OK;
1223 }
1224
1225
1226 /**
1227  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1228  *
1229  * @param cls the socket (set from GNUNET_MESH_connect)
1230  * @param tunnel connection to the other end
1231  * @param tunnel_ctx this is NULL
1232  * @param sender who sent the message
1233  * @param message the actual message
1234  * @param atsi performance data for the connection
1235  * @return GNUNET_OK to keep the connection open,
1236  *         GNUNET_SYSERR to close it (signal serious error)
1237  */
1238 static int
1239 client_handle_close (void *cls,
1240                      struct GNUNET_MESH_Tunnel *tunnel,
1241                      void **tunnel_ctx,
1242                      const struct GNUNET_PeerIdentity *sender,
1243                      const struct GNUNET_MessageHeader *message,
1244                      const struct GNUNET_ATS_Information*atsi)
1245 {
1246   struct GNUNET_STREAM_Socket *socket = cls;
1247
1248   return GNUNET_OK;
1249 }
1250
1251
1252 /**
1253  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1254  *
1255  * @param cls the socket (set from GNUNET_MESH_connect)
1256  * @param tunnel connection to the other end
1257  * @param tunnel_ctx this is NULL
1258  * @param sender who sent the message
1259  * @param message the actual message
1260  * @param atsi performance data for the connection
1261  * @return GNUNET_OK to keep the connection open,
1262  *         GNUNET_SYSERR to close it (signal serious error)
1263  */
1264 static int
1265 client_handle_close_ack (void *cls,
1266                          struct GNUNET_MESH_Tunnel *tunnel,
1267                          void **tunnel_ctx,
1268                          const struct GNUNET_PeerIdentity *sender,
1269                          const struct GNUNET_MessageHeader *message,
1270                          const struct GNUNET_ATS_Information*atsi)
1271 {
1272   struct GNUNET_STREAM_Socket *socket = cls;
1273
1274   return GNUNET_OK;
1275 }
1276
1277 /*****************************/
1278 /* Server's Message Handlers */
1279 /*****************************/
1280
1281 /**
1282  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1283  *
1284  * @param cls the closure
1285  * @param tunnel connection to the other end
1286  * @param tunnel_ctx the socket
1287  * @param sender who sent the message
1288  * @param message the actual message
1289  * @param atsi performance data for the connection
1290  * @return GNUNET_OK to keep the connection open,
1291  *         GNUNET_SYSERR to close it (signal serious error)
1292  */
1293 static int
1294 server_handle_data (void *cls,
1295                     struct GNUNET_MESH_Tunnel *tunnel,
1296                     void **tunnel_ctx,
1297                     const struct GNUNET_PeerIdentity *sender,
1298                     const struct GNUNET_MessageHeader *message,
1299                     const struct GNUNET_ATS_Information*atsi)
1300 {
1301   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1302
1303   return handle_data (socket,
1304                       tunnel,
1305                       sender,
1306                       (const struct GNUNET_STREAM_DataMessage *)message,
1307                       atsi);
1308 }
1309
1310
1311 /**
1312  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
1313  *
1314  * @param cls the closure
1315  * @param tunnel connection to the other end
1316  * @param tunnel_ctx the socket
1317  * @param sender who sent the message
1318  * @param message the actual message
1319  * @param atsi performance data for the connection
1320  * @return GNUNET_OK to keep the connection open,
1321  *         GNUNET_SYSERR to close it (signal serious error)
1322  */
1323 static int
1324 server_handle_hello (void *cls,
1325                      struct GNUNET_MESH_Tunnel *tunnel,
1326                      void **tunnel_ctx,
1327                      const struct GNUNET_PeerIdentity *sender,
1328                      const struct GNUNET_MessageHeader *message,
1329                      const struct GNUNET_ATS_Information*atsi)
1330 {
1331   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1332   struct GNUNET_STREAM_HelloAckMessage *reply;
1333
1334   GNUNET_assert (socket->tunnel == tunnel);
1335   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1336               "Received HELLO from %s\n", GNUNET_i2s(sender));
1337
1338   /* Catch possible protocol breaks */
1339   GNUNET_break_op (0 != memcmp (&socket->other_peer, 
1340                                 sender,
1341                                 sizeof (struct GNUNET_PeerIdentity)));
1342
1343   if (STATE_INIT == socket->state)
1344     {
1345       /* Get the random sequence number */
1346       socket->write_sequence_number = 
1347         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1348       reply = 
1349         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1350       reply->header.header.size = 
1351         htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1352       reply->header.header.type = 
1353         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1354       reply->sequence_number = htonl (socket->write_sequence_number);
1355       queue_message (socket, 
1356                      &reply->header,
1357                      &set_state_hello_wait, 
1358                      NULL);
1359     }
1360   else
1361     {
1362       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1363                   "Client sent HELLO when in state %d\n", socket->state);
1364       /* FIXME: Send RESET? */
1365       
1366     }
1367   return GNUNET_OK;
1368 }
1369
1370
1371 /**
1372  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1373  *
1374  * @param cls the closure
1375  * @param tunnel connection to the other end
1376  * @param tunnel_ctx the socket
1377  * @param sender who sent the message
1378  * @param message the actual message
1379  * @param atsi performance data for the connection
1380  * @return GNUNET_OK to keep the connection open,
1381  *         GNUNET_SYSERR to close it (signal serious error)
1382  */
1383 static int
1384 server_handle_hello_ack (void *cls,
1385                          struct GNUNET_MESH_Tunnel *tunnel,
1386                          void **tunnel_ctx,
1387                          const struct GNUNET_PeerIdentity *sender,
1388                          const struct GNUNET_MessageHeader *message,
1389                          const struct GNUNET_ATS_Information*atsi)
1390 {
1391   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1392   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
1393
1394   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
1395   GNUNET_assert (socket->tunnel == tunnel);
1396   if (STATE_HELLO_WAIT == socket->state)
1397     {
1398       socket->read_sequence_number = ntohl (ack_message->sequence_number);
1399       socket->receive_window_available = 
1400         ntohl (ack_message->receive_window_size);
1401       /* Attain ESTABLISHED state */
1402       set_state_established (NULL, socket);
1403     }
1404   else
1405     {
1406       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1407                   "Client sent HELLO_ACK when in state %d\n", socket->state);
1408       /* FIXME: Send RESET? */
1409       
1410     }
1411   return GNUNET_OK;
1412 }
1413
1414
1415 /**
1416  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1417  *
1418  * @param cls the closure
1419  * @param tunnel connection to the other end
1420  * @param tunnel_ctx the socket
1421  * @param sender who sent the message
1422  * @param message the actual message
1423  * @param atsi performance data for the connection
1424  * @return GNUNET_OK to keep the connection open,
1425  *         GNUNET_SYSERR to close it (signal serious error)
1426  */
1427 static int
1428 server_handle_reset (void *cls,
1429                      struct GNUNET_MESH_Tunnel *tunnel,
1430                      void **tunnel_ctx,
1431                      const struct GNUNET_PeerIdentity *sender,
1432                      const struct GNUNET_MessageHeader *message,
1433                      const struct GNUNET_ATS_Information*atsi)
1434 {
1435   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1436
1437   return GNUNET_OK;
1438 }
1439
1440
1441 /**
1442  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1443  *
1444  * @param cls the closure
1445  * @param tunnel connection to the other end
1446  * @param tunnel_ctx the socket
1447  * @param sender who sent the message
1448  * @param message the actual message
1449  * @param atsi performance data for the connection
1450  * @return GNUNET_OK to keep the connection open,
1451  *         GNUNET_SYSERR to close it (signal serious error)
1452  */
1453 static int
1454 server_handle_transmit_close (void *cls,
1455                               struct GNUNET_MESH_Tunnel *tunnel,
1456                               void **tunnel_ctx,
1457                               const struct GNUNET_PeerIdentity *sender,
1458                               const struct GNUNET_MessageHeader *message,
1459                               const struct GNUNET_ATS_Information*atsi)
1460 {
1461   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1462
1463   return handle_transmit_close (socket,
1464                                 tunnel,
1465                                 sender,
1466                                 (struct GNUNET_STREAM_MessageHeader *)message,
1467                                 atsi);
1468 }
1469
1470
1471 /**
1472  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1473  *
1474  * @param cls the closure
1475  * @param tunnel connection to the other end
1476  * @param tunnel_ctx the socket
1477  * @param sender who sent the message
1478  * @param message the actual message
1479  * @param atsi performance data for the connection
1480  * @return GNUNET_OK to keep the connection open,
1481  *         GNUNET_SYSERR to close it (signal serious error)
1482  */
1483 static int
1484 server_handle_transmit_close_ack (void *cls,
1485                                   struct GNUNET_MESH_Tunnel *tunnel,
1486                                   void **tunnel_ctx,
1487                                   const struct GNUNET_PeerIdentity *sender,
1488                                   const struct GNUNET_MessageHeader *message,
1489                                   const struct GNUNET_ATS_Information*atsi)
1490 {
1491   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1492
1493   return GNUNET_OK;
1494 }
1495
1496
1497 /**
1498  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1499  *
1500  * @param cls the closure
1501  * @param tunnel connection to the other end
1502  * @param tunnel_ctx the socket
1503  * @param sender who sent the message
1504  * @param message the actual message
1505  * @param atsi performance data for the connection
1506  * @return GNUNET_OK to keep the connection open,
1507  *         GNUNET_SYSERR to close it (signal serious error)
1508  */
1509 static int
1510 server_handle_receive_close (void *cls,
1511                              struct GNUNET_MESH_Tunnel *tunnel,
1512                              void **tunnel_ctx,
1513                              const struct GNUNET_PeerIdentity *sender,
1514                              const struct GNUNET_MessageHeader *message,
1515                              const struct GNUNET_ATS_Information*atsi)
1516 {
1517   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1518
1519   return GNUNET_OK;
1520 }
1521
1522
1523 /**
1524  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1525  *
1526  * @param cls the closure
1527  * @param tunnel connection to the other end
1528  * @param tunnel_ctx the socket
1529  * @param sender who sent the message
1530  * @param message the actual message
1531  * @param atsi performance data for the connection
1532  * @return GNUNET_OK to keep the connection open,
1533  *         GNUNET_SYSERR to close it (signal serious error)
1534  */
1535 static int
1536 server_handle_receive_close_ack (void *cls,
1537                                  struct GNUNET_MESH_Tunnel *tunnel,
1538                                  void **tunnel_ctx,
1539                                  const struct GNUNET_PeerIdentity *sender,
1540                                  const struct GNUNET_MessageHeader *message,
1541                                  const struct GNUNET_ATS_Information*atsi)
1542 {
1543   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1544
1545   return GNUNET_OK;
1546 }
1547
1548
1549 /**
1550  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1551  *
1552  * @param cls the closure
1553  * @param tunnel connection to the other end
1554  * @param tunnel_ctx the socket
1555  * @param sender who sent the message
1556  * @param message the actual message
1557  * @param atsi performance data for the connection
1558  * @return GNUNET_OK to keep the connection open,
1559  *         GNUNET_SYSERR to close it (signal serious error)
1560  */
1561 static int
1562 server_handle_close (void *cls,
1563                      struct GNUNET_MESH_Tunnel *tunnel,
1564                      void **tunnel_ctx,
1565                      const struct GNUNET_PeerIdentity *sender,
1566                      const struct GNUNET_MessageHeader *message,
1567                      const struct GNUNET_ATS_Information*atsi)
1568 {
1569   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1570
1571   return GNUNET_OK;
1572 }
1573
1574
1575 /**
1576  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1577  *
1578  * @param cls the closure
1579  * @param tunnel connection to the other end
1580  * @param tunnel_ctx the socket
1581  * @param sender who sent the message
1582  * @param message the actual message
1583  * @param atsi performance data for the connection
1584  * @return GNUNET_OK to keep the connection open,
1585  *         GNUNET_SYSERR to close it (signal serious error)
1586  */
1587 static int
1588 server_handle_close_ack (void *cls,
1589                          struct GNUNET_MESH_Tunnel *tunnel,
1590                          void **tunnel_ctx,
1591                          const struct GNUNET_PeerIdentity *sender,
1592                          const struct GNUNET_MessageHeader *message,
1593                          const struct GNUNET_ATS_Information*atsi)
1594 {
1595   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1596
1597   return GNUNET_OK;
1598 }
1599
1600
1601 /**
1602  * Message Handler for mesh
1603  *
1604  * @param socket the socket through which the ack was received
1605  * @param tunnel connection to the other end
1606  * @param sender who sent the message
1607  * @param ack the acknowledgment message
1608  * @param atsi performance data for the connection
1609  * @return GNUNET_OK to keep the connection open,
1610  *         GNUNET_SYSERR to close it (signal serious error)
1611  */
1612 static int
1613 handle_ack (struct GNUNET_STREAM_Socket *socket,
1614             struct GNUNET_MESH_Tunnel *tunnel,
1615             const struct GNUNET_PeerIdentity *sender,
1616             const struct GNUNET_STREAM_AckMessage *ack,
1617             const struct GNUNET_ATS_Information*atsi)
1618 {
1619   switch (socket->state)
1620     {
1621     case (STATE_ESTABLISHED):
1622       if (NULL == socket->write_handle)
1623         {
1624           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1625                       "Received DATA ACK when write_handle is NULL\n");
1626           return GNUNET_OK;
1627         }
1628
1629       socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap);
1630       socket->receive_window_available = 
1631         ntohl (ack->receive_window_remaining);
1632       write_data (socket);
1633       break;
1634     default:
1635       break;
1636     }
1637   return GNUNET_OK;
1638 }
1639
1640
1641 /**
1642  * Message Handler for mesh
1643  *
1644  * @param cls the 'struct GNUNET_STREAM_Socket'
1645  * @param tunnel connection to the other end
1646  * @param tunnel_ctx unused
1647  * @param sender who sent the message
1648  * @param message the actual message
1649  * @param atsi performance data for the connection
1650  * @return GNUNET_OK to keep the connection open,
1651  *         GNUNET_SYSERR to close it (signal serious error)
1652  */
1653 static int
1654 client_handle_ack (void *cls,
1655                    struct GNUNET_MESH_Tunnel *tunnel,
1656                    void **tunnel_ctx,
1657                    const struct GNUNET_PeerIdentity *sender,
1658                    const struct GNUNET_MessageHeader *message,
1659                    const struct GNUNET_ATS_Information*atsi)
1660 {
1661   struct GNUNET_STREAM_Socket *socket = cls;
1662   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1663  
1664   return handle_ack (socket, tunnel, sender, ack, atsi);
1665 }
1666
1667
1668 /**
1669  * Message Handler for mesh
1670  *
1671  * @param cls the server's listen socket
1672  * @param tunnel connection to the other end
1673  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
1674  * @param sender who sent the message
1675  * @param message the actual message
1676  * @param atsi performance data for the connection
1677  * @return GNUNET_OK to keep the connection open,
1678  *         GNUNET_SYSERR to close it (signal serious error)
1679  */
1680 static int
1681 server_handle_ack (void *cls,
1682                    struct GNUNET_MESH_Tunnel *tunnel,
1683                    void **tunnel_ctx,
1684                    const struct GNUNET_PeerIdentity *sender,
1685                    const struct GNUNET_MessageHeader *message,
1686                    const struct GNUNET_ATS_Information*atsi)
1687 {
1688   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1689   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1690  
1691   return handle_ack (socket, tunnel, sender, ack, atsi);
1692 }
1693
1694
1695 /**
1696  * For client message handlers, the stream socket is in the
1697  * closure argument.
1698  */
1699 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
1700   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1701   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
1702    sizeof (struct GNUNET_STREAM_AckMessage) },
1703   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1704    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1705   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1706    sizeof (struct GNUNET_STREAM_MessageHeader)},
1707   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1708    sizeof (struct GNUNET_STREAM_MessageHeader)},
1709   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1710    sizeof (struct GNUNET_STREAM_MessageHeader)},
1711   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1712    sizeof (struct GNUNET_STREAM_MessageHeader)},
1713   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1714    sizeof (struct GNUNET_STREAM_MessageHeader)},
1715   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1716    sizeof (struct GNUNET_STREAM_MessageHeader)},
1717   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1718    sizeof (struct GNUNET_STREAM_MessageHeader)},
1719   {NULL, 0, 0}
1720 };
1721
1722
1723 /**
1724  * For server message handlers, the stream socket is in the
1725  * tunnel context, and the listen socket in the closure argument.
1726  */
1727 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
1728   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1729   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
1730    sizeof (struct GNUNET_STREAM_AckMessage) },
1731   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
1732    sizeof (struct GNUNET_STREAM_MessageHeader)},
1733   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1734    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1735   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1736    sizeof (struct GNUNET_STREAM_MessageHeader)},
1737   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1738    sizeof (struct GNUNET_STREAM_MessageHeader)},
1739   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1740    sizeof (struct GNUNET_STREAM_MessageHeader)},
1741   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1742    sizeof (struct GNUNET_STREAM_MessageHeader)},
1743   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1744    sizeof (struct GNUNET_STREAM_MessageHeader)},
1745   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1746    sizeof (struct GNUNET_STREAM_MessageHeader)},
1747   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1748    sizeof (struct GNUNET_STREAM_MessageHeader)},
1749   {NULL, 0, 0}
1750 };
1751
1752
1753 /**
1754  * Function called when our target peer is connected to our tunnel
1755  *
1756  * @param cls the socket for which this tunnel is created
1757  * @param peer the peer identity of the target
1758  * @param atsi performance data for the connection
1759  */
1760 static void
1761 mesh_peer_connect_callback (void *cls,
1762                             const struct GNUNET_PeerIdentity *peer,
1763                             const struct GNUNET_ATS_Information * atsi)
1764 {
1765   struct GNUNET_STREAM_Socket *socket = cls;
1766   struct GNUNET_STREAM_MessageHeader *message;
1767
1768   if (0 != memcmp (&socket->other_peer, 
1769                    peer, 
1770                    sizeof (struct GNUNET_PeerIdentity)))
1771     {
1772       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1773                   "A peer (%s) which is not our target has connected to our tunnel", 
1774                   GNUNET_i2s (peer));
1775       return;
1776     }
1777   
1778   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1779               "Target peer %s connected\n", GNUNET_i2s (peer));
1780   
1781   /* Set state to INIT */
1782   socket->state = STATE_INIT;
1783
1784   /* Send HELLO message */
1785   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1786   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1787   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1788   queue_message (socket,
1789                  message,
1790                  &set_state_hello_wait,
1791                  NULL);
1792
1793   /* Call open callback */
1794   if (NULL == socket->open_cb)
1795     {
1796       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1797                   "STREAM_open callback is NULL\n");
1798     }
1799 }
1800
1801
1802 /**
1803  * Function called when our target peer is disconnected from our tunnel
1804  *
1805  * @param cls the socket associated which this tunnel
1806  * @param peer the peer identity of the target
1807  */
1808 static void
1809 mesh_peer_disconnect_callback (void *cls,
1810                                const struct GNUNET_PeerIdentity *peer)
1811 {
1812
1813 }
1814
1815
1816 /**
1817  * Method called whenever a peer creates a tunnel to us
1818  *
1819  * @param cls closure
1820  * @param tunnel new handle to the tunnel
1821  * @param initiator peer that started the tunnel
1822  * @param atsi performance information for the tunnel
1823  * @return initial tunnel context for the tunnel
1824  *         (can be NULL -- that's not an error)
1825  */
1826 static void *
1827 new_tunnel_notify (void *cls,
1828                    struct GNUNET_MESH_Tunnel *tunnel,
1829                    const struct GNUNET_PeerIdentity *initiator,
1830                    const struct GNUNET_ATS_Information *atsi)
1831 {
1832   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
1833   struct GNUNET_STREAM_Socket *socket;
1834
1835   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1836               "Peer %s initiated tunnel to us\n", GNUNET_i2s (initiator));
1837   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
1838   socket->tunnel = tunnel;
1839   socket->session_id = 0;       /* FIXME */
1840   socket->other_peer = *initiator;
1841   socket->state = STATE_INIT;
1842   /* FIXME: Copy MESH handle from lsocket to socket */
1843
1844   if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
1845                                            socket,
1846                                            &socket->other_peer))
1847     {
1848       socket->state = STATE_CLOSED;
1849       /* FIXME: Send CLOSE message and then free */
1850       GNUNET_free (socket);
1851       GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
1852     }
1853   return socket;
1854 }
1855
1856
1857 /**
1858  * Function called whenever an inbound tunnel is destroyed.  Should clean up
1859  * any associated state.  This function is NOT called if the client has
1860  * explicitly asked for the tunnel to be destroyed using
1861  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
1862  * the tunnel.
1863  *
1864  * @param cls closure (set from GNUNET_MESH_connect)
1865  * @param tunnel connection to the other end (henceforth invalid)
1866  * @param tunnel_ctx place where local state associated
1867  *                   with the tunnel is stored
1868  */
1869 static void 
1870 tunnel_cleaner (void *cls,
1871                 const struct GNUNET_MESH_Tunnel *tunnel,
1872                 void *tunnel_ctx)
1873 {
1874   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
1875   
1876   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1877               "Peer %s has terminated connection abruptly\n",
1878               GNUNET_i2s (&socket->other_peer));
1879
1880   socket->status = GNUNET_STREAM_SHUTDOWN;
1881   /* Clear Transmit handles */
1882   if (NULL != socket->transmit_handle)
1883     {
1884       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
1885       socket->transmit_handle = NULL;
1886     }
1887   socket->tunnel = NULL;
1888 }
1889
1890
1891 /*****************/
1892 /* API functions */
1893 /*****************/
1894
1895
1896 /**
1897  * Tries to open a stream to the target peer
1898  *
1899  * @param cfg configuration to use
1900  * @param target the target peer to which the stream has to be opened
1901  * @param app_port the application port number which uniquely identifies this
1902  *            stream
1903  * @param open_cb this function will be called after stream has be established 
1904  * @param open_cb_cls the closure for open_cb
1905  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
1906  * @return if successful it returns the stream socket; NULL if stream cannot be
1907  *         opened 
1908  */
1909 struct GNUNET_STREAM_Socket *
1910 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
1911                     const struct GNUNET_PeerIdentity *target,
1912                     GNUNET_MESH_ApplicationType app_port,
1913                     GNUNET_STREAM_OpenCallback open_cb,
1914                     void *open_cb_cls,
1915                     ...)
1916 {
1917   struct GNUNET_STREAM_Socket *socket;
1918   enum GNUNET_STREAM_Option option;
1919   va_list vargs;                /* Variable arguments */
1920
1921   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1922               "%s\n", __func__);
1923
1924   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
1925   socket->other_peer = *target;
1926   socket->open_cb = open_cb;
1927   socket->open_cls = open_cb_cls;
1928
1929   /* Set defaults */
1930   socket->retransmit_timeout = 
1931     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
1932
1933   va_start (vargs, open_cb_cls); /* Parse variable args */
1934   do {
1935     option = va_arg (vargs, enum GNUNET_STREAM_Option);
1936     switch (option)
1937       {
1938       case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
1939         /* Expect struct GNUNET_TIME_Relative */
1940         socket->retransmit_timeout = va_arg (vargs,
1941                                              struct GNUNET_TIME_Relative);
1942         break;
1943       case GNUNET_STREAM_OPTION_END:
1944         break;
1945       }
1946   } while (GNUNET_STREAM_OPTION_END != option);
1947   va_end (vargs);               /* End of variable args parsing */
1948   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
1949                                       10,  /* QUEUE size as parameter? */
1950                                       socket, /* cls */
1951                                       NULL, /* No inbound tunnel handler */
1952                                       &tunnel_cleaner,
1953                                       client_message_handlers,
1954                                       &app_port); /* We don't get inbound tunnels */
1955   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
1956     {
1957       GNUNET_free (socket);
1958       return NULL;
1959     }
1960
1961   /* Now create the mesh tunnel to target */
1962   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1963               "Creating MESH Tunnel\n");
1964   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
1965                                               NULL, /* Tunnel context */
1966                                               &mesh_peer_connect_callback,
1967                                               &mesh_peer_disconnect_callback,
1968                                               socket);
1969   GNUNET_assert (NULL != socket->tunnel);
1970   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
1971                                         target);
1972   
1973   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1974               "%s() END\n", __func__);
1975   return socket;
1976 }
1977
1978
1979 /**
1980  * Shutdown the stream for reading or writing (man 2 shutdown).
1981  *
1982  * @param socket the stream socket
1983  * @param how SHUT_RD, SHUT_WR or SHUT_RDWR 
1984  */
1985 void
1986 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
1987                         int how)
1988 {
1989   return;
1990 }
1991
1992
1993 /**
1994  * Closes the stream
1995  *
1996  * @param socket the stream socket
1997  */
1998 void
1999 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
2000 {
2001   struct MessageQueue *head;
2002
2003   if (socket->read_task != GNUNET_SCHEDULER_NO_TASK)
2004   {
2005     /* socket closed with read task pending!? */
2006     GNUNET_break (0);
2007     GNUNET_SCHEDULER_cancel (socket->read_task);
2008     socket->read_task = GNUNET_SCHEDULER_NO_TASK;
2009   }
2010
2011   /* Clear Transmit handles */
2012   if (NULL != socket->transmit_handle)
2013     {
2014       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2015       socket->transmit_handle = NULL;
2016     }
2017
2018   /* Clear existing message queue */
2019   while (NULL != (head = socket->queue_head)) {
2020     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2021                                  socket->queue_tail,
2022                                  head);
2023     GNUNET_free (head->message);
2024     GNUNET_free (head);
2025   }
2026
2027   /* Close associated tunnel */
2028   if (NULL != socket->tunnel)
2029     {
2030       GNUNET_MESH_tunnel_destroy (socket->tunnel);
2031       socket->tunnel = NULL;
2032     }
2033
2034   /* Close mesh connection */
2035   if (NULL != socket->mesh)
2036     {
2037       GNUNET_MESH_disconnect (socket->mesh);
2038       socket->mesh = NULL;
2039     }
2040   
2041   /* Release receive buffer */
2042   if (NULL != socket->receive_buffer)
2043     {
2044       GNUNET_free (socket->receive_buffer);
2045     }
2046
2047   GNUNET_free (socket);
2048 }
2049
2050
2051 /**
2052  * Listens for stream connections for a specific application ports
2053  *
2054  * @param cfg the configuration to use
2055  * @param app_port the application port for which new streams will be accepted
2056  * @param listen_cb this function will be called when a peer tries to establish
2057  *            a stream with us
2058  * @param listen_cb_cls closure for listen_cb
2059  * @return listen socket, NULL for any error
2060  */
2061 struct GNUNET_STREAM_ListenSocket *
2062 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
2063                       GNUNET_MESH_ApplicationType app_port,
2064                       GNUNET_STREAM_ListenCallback listen_cb,
2065                       void *listen_cb_cls)
2066 {
2067   /* FIXME: Add variable args for passing configration options? */
2068   struct GNUNET_STREAM_ListenSocket *lsocket;
2069
2070   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
2071   lsocket->port = app_port;
2072   lsocket->listen_cb = listen_cb;
2073   lsocket->listen_cb_cls = listen_cb_cls;
2074   lsocket->mesh = GNUNET_MESH_connect (cfg,
2075                                        10, /* FIXME: QUEUE size as parameter? */
2076                                        lsocket, /* Closure */
2077                                        &new_tunnel_notify,
2078                                        &tunnel_cleaner,
2079                                        server_message_handlers,
2080                                        &app_port);
2081   GNUNET_assert (NULL != lsocket->mesh);
2082   return lsocket;
2083 }
2084
2085
2086 /**
2087  * Closes the listen socket
2088  *
2089  * @param lsocket the listen socket
2090  */
2091 void
2092 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
2093 {
2094   /* Close MESH connection */
2095   GNUNET_MESH_disconnect (lsocket->mesh);
2096   
2097   GNUNET_free (lsocket);
2098 }
2099
2100
2101 /**
2102  * Tries to write the given data to the stream
2103  *
2104  * @param socket the socket representing a stream
2105  * @param data the data buffer from where the data is written into the stream
2106  * @param size the number of bytes to be written from the data buffer
2107  * @param timeout the timeout period
2108  * @param write_cont the function to call upon writing some bytes into the stream
2109  * @param write_cont_cls the closure
2110  * @return handle to cancel the operation
2111  */
2112 struct GNUNET_STREAM_IOWriteHandle *
2113 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
2114                      const void *data,
2115                      size_t size,
2116                      struct GNUNET_TIME_Relative timeout,
2117                      GNUNET_STREAM_CompletionContinuation write_cont,
2118                      void *write_cont_cls)
2119 {
2120   unsigned int num_needed_packets;
2121   unsigned int packet;
2122   struct GNUNET_STREAM_IOWriteHandle *io_handle;
2123   uint32_t packet_size;
2124   uint32_t payload_size;
2125   struct GNUNET_STREAM_DataMessage *data_msg;
2126   const void *sweep;
2127
2128   /* Return NULL if there is already a write request pending */
2129   if (NULL != socket->write_handle)
2130   {
2131     GNUNET_break (0);
2132     return NULL;
2133   }
2134   if (!((STATE_ESTABLISHED == socket->state)
2135         || (STATE_RECEIVE_CLOSE_WAIT == socket->state)
2136         || (STATE_RECEIVE_CLOSED == socket->state)))
2137     {
2138       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2139                   "Attempting to write on a closed (OR) not-yet-established"
2140                   "stream\n"); 
2141       return NULL;
2142     } 
2143   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
2144     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
2145   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
2146   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
2147   sweep = data;
2148   /* Divide the given buffer into packets for sending */
2149   for (packet=0; packet < num_needed_packets; packet++)
2150     {
2151       if ((packet + 1) * max_payload_size < size) 
2152         {
2153           payload_size = max_payload_size;
2154           packet_size = MAX_PACKET_SIZE;
2155         }
2156       else 
2157         {
2158           payload_size = size - packet * max_payload_size;
2159           packet_size =  payload_size + sizeof (struct
2160                                                 GNUNET_STREAM_DataMessage); 
2161         }
2162       io_handle->messages[packet] = GNUNET_malloc (packet_size);
2163       io_handle->messages[packet]->header.header.size = htons (packet_size);
2164       io_handle->messages[packet]->header.header.type =
2165         htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
2166       io_handle->messages[packet]->sequence_number =
2167         htons (socket->write_sequence_number++);
2168       io_handle->messages[packet]->offset = htons (socket->write_offset);
2169
2170       /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2171          determined from RTT */
2172       io_handle->messages[packet]->ack_deadline = 
2173         GNUNET_TIME_relative_hton (GNUNET_TIME_relative_multiply 
2174                                    (GNUNET_TIME_UNIT_SECONDS, 5));
2175       data_msg = io_handle->messages[packet];
2176       /* Copy data from given buffer to the packet */
2177       memcpy (&data_msg[1],
2178               sweep,
2179               payload_size);
2180       sweep += payload_size;
2181       socket->write_offset += payload_size;
2182     }
2183   socket->write_handle = io_handle;
2184   write_data (socket);
2185
2186   return io_handle;
2187 }
2188
2189
2190 /**
2191  * Tries to read data from the stream
2192  *
2193  * @param socket the socket representing a stream
2194  * @param timeout the timeout period
2195  * @param proc function to call with data (once only)
2196  * @param proc_cls the closure for proc
2197  * @return handle to cancel the operation
2198  */
2199 struct GNUNET_STREAM_IOReadHandle *
2200 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
2201                     struct GNUNET_TIME_Relative timeout,
2202                     GNUNET_STREAM_DataProcessor proc,
2203                     void *proc_cls)
2204 {
2205   struct GNUNET_STREAM_IOReadHandle *read_handle;
2206   
2207   /* Return NULL if there is already a read handle; the user has to cancel that
2208   first before continuing or has to wait until it is completed */
2209   if (NULL != socket->read_handle) return NULL;
2210
2211   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
2212   read_handle->proc = proc;
2213   socket->read_handle = read_handle;
2214
2215   /* Check if we have a packet at bitmap 0 */
2216   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
2217                                           0))
2218     {
2219       socket->read_task = GNUNET_SCHEDULER_add_now (&call_read_processor,
2220                                                     socket);
2221    
2222     }
2223   
2224   /* Setup the read timeout task */
2225   socket->read_io_timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
2226                                                                &read_io_timeout,
2227                                                                socket);
2228   return read_handle;
2229 }
2230
2231
2232 /**
2233  * Cancel pending write operation.
2234  *
2235  * @param ioh handle to operation to cancel
2236  */
2237 void
2238 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
2239 {
2240   return;
2241 }
2242
2243
2244 /**
2245  * Cancel pending read operation.
2246  *
2247  * @param ioh handle to operation to cancel
2248  */
2249 void
2250 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
2251 {
2252   return;
2253 }