f3266cb138ab985ec134b215fca18936ad11af4d
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  * Copy MESH handle from lsocket to socket
23  *
24  * Checks for matching the sender and socket->other_peer in server
25  * message handlers  */
26
27 /**
28  * @file stream/stream_api.c
29  * @brief Implementation of the stream library
30  * @author Sree Harsha Totakura
31  */
32 #include "platform.h"
33 #include "gnunet_common.h"
34 #include "gnunet_crypto_lib.h"
35 #include "gnunet_stream_lib.h"
36 #include "gnunet_testing_lib.h"
37 #include "stream_protocol.h"
38
39
40 /**
41  * The maximum packet size of a stream packet
42  */
43 #define MAX_PACKET_SIZE 64000
44
45 /**
46  * The maximum payload a data message packet can carry
47  */
48 static size_t max_payload_size = 
49   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
50
51 /**
52  * Receive buffer
53  */
54 #define RECEIVE_BUFFER_SIZE 4096000
55
56 /**
57  * states in the Protocol
58  */
59 enum State
60   {
61     /**
62      * Client initialization state
63      */
64     STATE_INIT,
65
66     /**
67      * Listener initialization state 
68      */
69     STATE_LISTEN,
70
71     /**
72      * Pre-connection establishment state
73      */
74     STATE_HELLO_WAIT,
75
76     /**
77      * State where a connection has been established
78      */
79     STATE_ESTABLISHED,
80
81     /**
82      * State where the socket is closed on our side and waiting to be ACK'ed
83      */
84     STATE_RECEIVE_CLOSE_WAIT,
85
86     /**
87      * State where the socket is closed for reading
88      */
89     STATE_RECEIVE_CLOSED,
90
91     /**
92      * State where the socket is closed on our side and waiting to be ACK'ed
93      */
94     STATE_TRANSMIT_CLOSE_WAIT,
95
96     /**
97      * State where the socket is closed for writing
98      */
99     STATE_TRANSMIT_CLOSED,
100
101     /**
102      * State where the socket is closed on our side and waiting to be ACK'ed
103      */
104     STATE_CLOSE_WAIT,
105
106     /**
107      * State where the socket is closed
108      */
109     STATE_CLOSED 
110   };
111
112
113 /**
114  * Functions of this type are called when a message is written
115  *
116  * @param cls the closure from queue_message
117  * @param socket the socket the written message was bound to
118  */
119 typedef void (*SendFinishCallback) (void *cls,
120                                     struct GNUNET_STREAM_Socket *socket);
121
122
123 /**
124  * The send message queue
125  */
126 struct MessageQueue
127 {
128   /**
129    * The message
130    */
131   struct GNUNET_STREAM_MessageHeader *message;
132
133   /**
134    * Callback to be called when the message is sent
135    */
136   SendFinishCallback finish_cb;
137
138   /**
139    * The closure for finish_cb
140    */
141   void *finish_cb_cls;
142
143   /**
144    * The next message in queue. Should be NULL in the last message
145    */
146   struct MessageQueue *next;
147
148   /**
149    * The next message in queue. Should be NULL in the first message
150    */
151   struct MessageQueue *prev;
152 };
153
154
155 /**
156  * The STREAM Socket Handler
157  */
158 struct GNUNET_STREAM_Socket
159 {
160
161   /**
162    * The peer identity of the peer at the other end of the stream
163    */
164   struct GNUNET_PeerIdentity other_peer;
165
166   /**
167    * Our Peer Identity (for debugging)
168    */
169   struct GNUNET_PeerIdentity our_id;
170
171   /**
172    * Retransmission timeout
173    */
174   struct GNUNET_TIME_Relative retransmit_timeout;
175
176   /**
177    * The Acknowledgement Bitmap
178    */
179   GNUNET_STREAM_AckBitmap ack_bitmap;
180
181   /**
182    * Time when the Acknowledgement was queued
183    */
184   struct GNUNET_TIME_Absolute ack_time_registered;
185
186   /**
187    * Queued Acknowledgement deadline
188    */
189   struct GNUNET_TIME_Relative ack_time_deadline;
190
191   /**
192    * The task for sending timely Acks
193    */
194   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
195
196   /**
197    * Task scheduled to continue a read operation.
198    */
199   GNUNET_SCHEDULER_TaskIdentifier read_task;
200
201   /**
202    * The mesh handle
203    */
204   struct GNUNET_MESH_Handle *mesh;
205
206   /**
207    * The mesh tunnel handle
208    */
209   struct GNUNET_MESH_Tunnel *tunnel;
210
211   /**
212    * Stream open closure
213    */
214   void *open_cls;
215
216   /**
217    * Stream open callback
218    */
219   GNUNET_STREAM_OpenCallback open_cb;
220
221   /**
222    * The current transmit handle (if a pending transmit request exists)
223    */
224   struct GNUNET_MESH_TransmitHandle *transmit_handle;
225
226   /**
227    * The current message associated with the transmit handle
228    */
229   struct MessageQueue *queue_head;
230
231   /**
232    * The queue tail, should always point to the last message in queue
233    */
234   struct MessageQueue *queue_tail;
235
236   /**
237    * The write IO_handle associated with this socket
238    */
239   struct GNUNET_STREAM_IOWriteHandle *write_handle;
240
241   /**
242    * The read IO_handle associated with this socket
243    */
244   struct GNUNET_STREAM_IOReadHandle *read_handle;
245
246   /**
247    * Buffer for storing received messages
248    */
249   void *receive_buffer;
250
251   /**
252    * Task identifier for the read io timeout task
253    */
254   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task;
255
256   /**
257    * The state of the protocol associated with this socket
258    */
259   enum State state;
260
261   /**
262    * The status of the socket
263    */
264   enum GNUNET_STREAM_Status status;
265
266   /**
267    * The number of previous timeouts; FIXME: currently not used
268    */
269   unsigned int retries;
270
271   /**
272    * Is this socket derived from listen socket?
273    */
274   unsigned int derived;
275   
276   /**
277    * The application port number (type: uint32_t)
278    */
279   GNUNET_MESH_ApplicationType app_port;
280
281   /**
282    * The session id associated with this stream connection
283    * FIXME: Not used currently, may be removed
284    */
285   uint32_t session_id;
286
287   /**
288    * Write sequence number. Set to random when sending HELLO(client) and
289    * HELLO_ACK(server) 
290    */
291   uint32_t write_sequence_number;
292
293   /**
294    * Read sequence number. This number's value is determined during handshake
295    */
296   uint32_t read_sequence_number;
297
298   /**
299    * The receiver buffer size
300    */
301   uint32_t receive_buffer_size;
302
303   /**
304    * The receiver buffer boundaries
305    */
306   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
307
308   /**
309    * receiver's available buffer after the last acknowledged packet
310    */
311   uint32_t receive_window_available;
312
313   /**
314    * The offset pointer used during write operation
315    */
316   uint32_t write_offset;
317
318   /**
319    * The offset after which we are expecting data
320    */
321   uint32_t read_offset;
322
323   /**
324    * The offset upto which user has read from the received buffer
325    */
326   uint32_t copy_offset;
327 };
328
329
330 /**
331  * A socket for listening
332  */
333 struct GNUNET_STREAM_ListenSocket
334 {
335
336   /**
337    * Our Peer's identity
338    */
339   struct GNUNET_PeerIdentity our_id;
340
341   /**
342    * The mesh handle
343    */
344   struct GNUNET_MESH_Handle *mesh;
345
346   /**
347    * The callback function which is called after successful opening socket
348    */
349   GNUNET_STREAM_ListenCallback listen_cb;
350
351   /**
352    * The call back closure
353    */
354   void *listen_cb_cls;
355
356   /**
357    * The service port
358    * FIXME: Remove if not required!
359    */
360   GNUNET_MESH_ApplicationType port;
361 };
362
363
364 /**
365  * The IO Write Handle
366  */
367 struct GNUNET_STREAM_IOWriteHandle
368 {
369   /**
370    * The packet_buffers associated with this Handle
371    */
372   struct GNUNET_STREAM_DataMessage *messages[64];
373
374   /**
375    * The bitmap of this IOHandle; Corresponding bit for a message is set when
376    * it has been acknowledged by the receiver
377    */
378   GNUNET_STREAM_AckBitmap ack_bitmap;
379
380   /**
381    * Number of packets sent before waiting for an ack
382    *
383    * FIXME: Do we need this?
384    */
385   unsigned int sent_packets;
386 };
387
388
389 /**
390  * The IO Read Handle
391  */
392 struct GNUNET_STREAM_IOReadHandle
393 {
394   /**
395    * Callback for the read processor
396    */
397   GNUNET_STREAM_DataProcessor proc;
398
399   /**
400    * The closure pointer for the read processor callback
401    */
402   void *proc_cls;
403 };
404
405
406 /**
407  * Default value in seconds for various timeouts
408  */
409 static unsigned int default_timeout = 300;
410
411
412 /**
413  * Callback function for sending hello message
414  *
415  * @param cls closure the socket
416  * @param size number of bytes available in buf
417  * @param buf where the callee should write the message
418  * @return number of bytes written to buf
419  */
420 static size_t
421 send_message_notify (void *cls, size_t size, void *buf)
422 {
423   struct GNUNET_STREAM_Socket *socket = cls;
424   struct MessageQueue *head;
425   size_t ret;
426
427   socket->transmit_handle = NULL; /* Remove the transmit handle */
428   head = socket->queue_head;
429   if (NULL == head)
430     return 0; /* just to be safe */
431   if (0 == size)                /* request timed out */
432     {
433       socket->retries++;
434       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
435                   "Message sending timed out. Retry %d \n",
436                   socket->retries);
437       socket->transmit_handle = 
438         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
439                                            0, /* Corking */
440                                            1, /* Priority */
441                                            /* FIXME: exponential backoff */
442                                            socket->retransmit_timeout,
443                                            &socket->other_peer,
444                                            ntohs (head->message->header.size),
445                                            &send_message_notify,
446                                            socket);
447       return 0;
448     }
449
450   ret = ntohs (head->message->header.size);
451   GNUNET_assert (size >= ret);
452   memcpy (buf, head->message, ret);
453   if (NULL != head->finish_cb)
454     {
455       head->finish_cb (head->finish_cb_cls, socket);
456     }
457   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
458                                socket->queue_tail,
459                                head);
460   GNUNET_free (head->message);
461   GNUNET_free (head);
462   head = socket->queue_head;
463   if (NULL != head)    /* more pending messages to send */
464     {
465       socket->retries = 0;
466       socket->transmit_handle = 
467         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
468                                            0, /* Corking */
469                                            1, /* Priority */
470                                            /* FIXME: exponential backoff */
471                                            socket->retransmit_timeout,
472                                            &socket->other_peer,
473                                            ntohs (head->message->header.size),
474                                            &send_message_notify,
475                                            socket);
476     }
477   return ret;
478 }
479
480
481 /**
482  * Queues a message for sending using the mesh connection of a socket
483  *
484  * @param socket the socket whose mesh connection is used
485  * @param message the message to be sent
486  * @param finish_cb the callback to be called when the message is sent
487  * @param finish_cb_cls the closure for the callback
488  */
489 static void
490 queue_message (struct GNUNET_STREAM_Socket *socket,
491                struct GNUNET_STREAM_MessageHeader *message,
492                SendFinishCallback finish_cb,
493                void *finish_cb_cls)
494 {
495   struct MessageQueue *queue_entity;
496
497   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
498               "Queueing message of type %d and size %d\n",
499               ntohs (message->header.type),
500               ntohs (message->header.size));
501   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
502   queue_entity->message = message;
503   queue_entity->finish_cb = finish_cb;
504   queue_entity->finish_cb_cls = finish_cb_cls;
505   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
506                                     socket->queue_tail,
507                                     queue_entity);
508   if (NULL == socket->transmit_handle)
509   {
510     socket->retries = 0;
511     socket->transmit_handle = 
512       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
513                                          0, /* Corking */
514                                          1, /* Priority */
515                                          socket->retransmit_timeout,
516                                          &socket->other_peer,
517                                          ntohs (message->header.size),
518                                          &send_message_notify,
519                                          socket);
520   }
521 }
522
523
524 /**
525  * Callback function for sending ack message
526  *
527  * @param cls closure the ACK message created in ack_task
528  * @param size number of bytes available in buffer
529  * @param buf where the callee should write the message
530  * @return number of bytes written to buf
531  */
532 static size_t
533 send_ack_notify (void *cls, size_t size, void *buf)
534 {
535   struct GNUNET_STREAM_AckMessage *ack_msg = cls;
536
537   if (0 == size)
538     {
539       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
540                   "%s called with size 0\n", __func__);
541       return 0;
542     }
543   GNUNET_assert (ack_msg->header.header.size <= size);
544   
545   size = ack_msg->header.header.size;
546   memcpy (buf, ack_msg, size);
547   return size;
548 }
549
550
551 /**
552  * Task for sending ACK message
553  *
554  * @param cls the socket
555  * @param tc the Task context
556  */
557 static void
558 ack_task (void *cls,
559           const struct GNUNET_SCHEDULER_TaskContext *tc)
560 {
561   struct GNUNET_STREAM_Socket *socket = cls;
562   struct GNUNET_STREAM_AckMessage *ack_msg;
563
564   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
565     {
566       return;
567     }
568
569   socket->ack_task_id = 0;
570
571   /* Create the ACK Message */
572   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
573   ack_msg->header.header.size = htons (sizeof (struct 
574                                                GNUNET_STREAM_AckMessage));
575   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
576   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
577   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
578   ack_msg->receive_window_remaining = 
579     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
580
581   /* Request MESH for sending ACK */
582   GNUNET_MESH_notify_transmit_ready (socket->tunnel,
583                                      0, /* Corking */
584                                      1, /* Priority */
585                                      socket->retransmit_timeout,
586                                      &socket->other_peer,
587                                      ntohs (ack_msg->header.header.size),
588                                      &send_ack_notify,
589                                      ack_msg);
590
591   
592 }
593
594
595 /**
596  * Function to modify a bit in GNUNET_STREAM_AckBitmap
597  *
598  * @param bitmap the bitmap to modify
599  * @param bit the bit number to modify
600  * @param value GNUNET_YES to on, GNUNET_NO to off
601  */
602 static void
603 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
604                       unsigned int bit, 
605                       int value)
606 {
607   GNUNET_assert (bit < 64);
608   if (GNUNET_YES == value)
609     *bitmap |= (1LL << bit);
610   else
611     *bitmap &= ~(1LL << bit);
612 }
613
614
615 /**
616  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
617  *
618  * @param bitmap address of the bitmap that has to be checked
619  * @param bit the bit number to check
620  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
621  */
622 static uint8_t
623 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
624                       unsigned int bit)
625 {
626   GNUNET_assert (bit < 64);
627   return 0 != (*bitmap & (1LL << bit));
628 }
629
630
631
632 /**
633  * Function called when Data Message is sent
634  *
635  * @param cls the io_handle corresponding to the Data Message
636  * @param socket the socket which was used
637  */
638 static void
639 write_data_finish_cb (void *cls,
640                       struct GNUNET_STREAM_Socket *socket)
641 {
642   struct GNUNET_STREAM_IOWriteHandle *io_handle = cls;
643
644   io_handle->sent_packets++;
645 }
646
647
648 /**
649  * Writes data using the given socket. The amount of data written is limited by
650  * the receive_window_size
651  *
652  * @param socket the socket to use
653  */
654 static void 
655 write_data (struct GNUNET_STREAM_Socket *socket)
656 {
657   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
658   int packet;                   /* Although an int, should never be negative */
659   int ack_packet;
660
661   ack_packet = -1;
662   /* Find the last acknowledged packet */
663   for (packet=0; packet < 64; packet++)
664     {      
665       if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
666                                               packet))
667         ack_packet = packet;        
668       else if (NULL == io_handle->messages[packet])
669         break;
670     }
671   /* Resend packets which weren't ack'ed */
672   for (packet=0; packet < ack_packet; packet++)
673     {
674       if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
675                                              packet))
676         {
677           queue_message (socket,
678                          &io_handle->messages[packet]->header,
679                          NULL,
680                          NULL);
681         }
682     }
683   packet = ack_packet + 1;
684   /* Now send new packets if there is enough buffer space */
685   while ( (NULL != io_handle->messages[packet]) &&
686           (socket->receive_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) )
687     {
688       socket->receive_window_available -= ntohs (io_handle->messages[packet]->header.header.size);
689       queue_message (socket,
690                      &io_handle->messages[packet]->header,
691                      &write_data_finish_cb,
692                      io_handle);
693       packet++;
694     }
695 }
696
697
698 /**
699  * Task for calling the read processor
700  *
701  * @param cls the socket
702  * @param tc the task context
703  */
704 static void
705 call_read_processor (void *cls,
706                           const struct GNUNET_SCHEDULER_TaskContext *tc)
707 {
708   struct GNUNET_STREAM_Socket *socket = cls;
709   size_t read_size;
710   size_t valid_read_size;
711   unsigned int packet;
712   uint32_t sequence_increase;
713   uint32_t offset_increase;
714
715   socket->read_task = GNUNET_SCHEDULER_NO_TASK;
716   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
717     return;
718
719   GNUNET_assert (NULL != socket->read_handle);
720   GNUNET_assert (NULL != socket->read_handle->proc);
721
722   /* Check the bitmap for any holes */
723   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
724     {
725       if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
726                                              packet))
727         break;
728     }
729   /* We only call read processor if we have the first packet */
730   GNUNET_assert (0 < packet);
731
732   valid_read_size = 
733     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
734
735   GNUNET_assert (0 != valid_read_size);
736
737   /* Cancel the read_io_timeout_task */
738   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task);
739   socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
740
741   /* Call the data processor */
742   read_size = 
743     socket->read_handle->proc (socket->read_handle->proc_cls,
744                                socket->status,
745                                socket->receive_buffer + socket->copy_offset,
746                                valid_read_size);
747   /* Free the read handle */
748   GNUNET_free (socket->read_handle);
749   socket->read_handle = NULL;
750
751   GNUNET_assert (read_size <= valid_read_size);
752   socket->copy_offset += read_size;
753
754   /* Determine upto which packet we can remove from the buffer */
755   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
756     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
757       break;
758
759   /* If no packets can be removed we can't move the buffer */
760   if (0 == packet) return;
761
762   sequence_increase = packet;
763
764   /* Shift the data in the receive buffer */
765   memmove (socket->receive_buffer,
766            socket->receive_buffer 
767            + socket->receive_buffer_boundaries[sequence_increase-1],
768            socket->receive_buffer_size - socket->receive_buffer_boundaries[sequence_increase-1]);
769   
770   /* Shift the bitmap */
771   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
772   
773   /* Set read_sequence_number */
774   socket->read_sequence_number += sequence_increase;
775   
776   /* Set read_offset */
777   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
778   socket->read_offset += offset_increase;
779
780   /* Fix copy_offset */
781   GNUNET_assert (offset_increase <= socket->copy_offset);
782   socket->copy_offset -= offset_increase;
783   
784   /* Fix relative boundaries */
785   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
786     {
787       if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
788         {
789           socket->receive_buffer_boundaries[packet] = 
790             socket->receive_buffer_boundaries[packet + sequence_increase] 
791             - offset_increase;
792         }
793       else
794         socket->receive_buffer_boundaries[packet] = 0;
795     }
796 }
797
798
799 /**
800  * Cancels the existing read io handle
801  *
802  * @param cls the closure from the SCHEDULER call
803  * @param tc the task context
804  */
805 static void
806 read_io_timeout (void *cls, 
807                 const struct GNUNET_SCHEDULER_TaskContext *tc)
808 {
809   struct GNUNET_STREAM_Socket *socket = cls;
810
811   socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
812   if (socket->read_task != GNUNET_SCHEDULER_NO_TASK)
813   {
814     GNUNET_SCHEDULER_cancel (socket->read_task);
815     socket->read_task = GNUNET_SCHEDULER_NO_TASK;
816   }
817   GNUNET_assert (NULL != socket->read_handle);
818   
819   GNUNET_free (socket->read_handle);
820   socket->read_handle = NULL;
821 }
822
823
824 /**
825  * Handler for DATA messages; Same for both client and server
826  *
827  * @param socket the socket through which the ack was received
828  * @param tunnel connection to the other end
829  * @param sender who sent the message
830  * @param msg the data message
831  * @param atsi performance data for the connection
832  * @return GNUNET_OK to keep the connection open,
833  *         GNUNET_SYSERR to close it (signal serious error)
834  */
835 static int
836 handle_data (struct GNUNET_STREAM_Socket *socket,
837              struct GNUNET_MESH_Tunnel *tunnel,
838              const struct GNUNET_PeerIdentity *sender,
839              const struct GNUNET_STREAM_DataMessage *msg,
840              const struct GNUNET_ATS_Information*atsi)
841 {
842   const void *payload;
843   uint32_t bytes_needed;
844   uint32_t relative_offset;
845   uint32_t relative_sequence_number;
846   uint16_t size;
847
848   size = htons (msg->header.header.size);
849   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
850     {
851       GNUNET_break_op (0);
852       return GNUNET_SYSERR;
853     }
854
855   switch (socket->state)
856     {
857     case STATE_ESTABLISHED:
858     case STATE_TRANSMIT_CLOSED:
859     case STATE_TRANSMIT_CLOSE_WAIT:
860
861       /* check if the message's sequence number is in the range we are
862          expecting */
863       relative_sequence_number = 
864         ntohl (msg->sequence_number) - socket->read_sequence_number;
865       if ( relative_sequence_number > 64)
866         {
867           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
868                       "Ignoring received message with sequence number %d",
869                       ntohl (msg->sequence_number));
870           return GNUNET_YES;
871         }
872
873       /* Check if we have to allocate the buffer */
874       size -= sizeof (struct GNUNET_STREAM_DataMessage);
875       relative_offset = ntohl (msg->offset) - socket->read_offset;
876       bytes_needed = relative_offset + size;
877       
878       if (bytes_needed > socket->receive_buffer_size)
879         {
880           if (bytes_needed <= RECEIVE_BUFFER_SIZE)
881             {
882               socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
883                                                        bytes_needed);
884               socket->receive_buffer_size = bytes_needed;
885             }
886           else
887             {
888               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
889                           "Cannot accommodate packet %d as buffer is full\n",
890                           ntohl (msg->sequence_number));
891               return GNUNET_YES;
892             }
893         }
894       
895       /* Copy Data to buffer */
896       payload = &msg[1];
897       GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
898       memcpy (socket->receive_buffer + relative_offset,
899               payload,
900               size);
901       socket->receive_buffer_boundaries[relative_sequence_number] = 
902         relative_offset + size;
903       
904       /* Modify the ACK bitmap */
905       ackbitmap_modify_bit (&socket->ack_bitmap,
906                             relative_sequence_number,
907                             GNUNET_YES);
908
909       /* Start ACK sending task if one is not already present */
910       if (0 == socket->ack_task_id)
911        {
912          socket->ack_task_id = 
913            GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
914                                          (msg->ack_deadline),
915                                          &ack_task,
916                                          socket);
917        }
918
919       if ((NULL != socket->read_handle) /* A read handle is waiting */
920           /* There is no current read task */
921           && (GNUNET_SCHEDULER_NO_TASK == socket->read_task)
922           /* We have the first packet */
923           && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
924                                                  0)))
925         {
926           socket->read_task = 
927             GNUNET_SCHEDULER_add_now (&call_read_processor,
928                                       socket);
929         }
930       
931       break;
932
933     default:
934       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
935                   "%s: Received data message when it cannot be handled\n",
936                   GNUNET_i2s (&socket->our_id));
937       break;
938     }
939   return GNUNET_YES;
940 }
941
942 /**
943  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
944  *
945  * @param cls the socket (set from GNUNET_MESH_connect)
946  * @param tunnel connection to the other end
947  * @param tunnel_ctx place to store local state associated with the tunnel
948  * @param sender who sent the message
949  * @param message the actual message
950  * @param atsi performance data for the connection
951  * @return GNUNET_OK to keep the connection open,
952  *         GNUNET_SYSERR to close it (signal serious error)
953  */
954 static int
955 client_handle_data (void *cls,
956              struct GNUNET_MESH_Tunnel *tunnel,
957              void **tunnel_ctx,
958              const struct GNUNET_PeerIdentity *sender,
959              const struct GNUNET_MessageHeader *message,
960              const struct GNUNET_ATS_Information*atsi)
961 {
962   struct GNUNET_STREAM_Socket *socket = cls;
963
964   return handle_data (socket, 
965                       tunnel, 
966                       sender, 
967                       (const struct GNUNET_STREAM_DataMessage *) message, 
968                       atsi);
969 }
970
971
972 /**
973  * Callback to set state to ESTABLISHED
974  *
975  * @param cls the closure from queue_message FIXME: document
976  * @param socket the socket to requiring state change
977  */
978 static void
979 set_state_established (void *cls,
980                        struct GNUNET_STREAM_Socket *socket)
981 {
982   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
983               "%s: Attaining ESTABLISHED state\n",
984               GNUNET_i2s (&socket->our_id));
985   socket->write_offset = 0;
986   socket->read_offset = 0;
987   socket->state = STATE_ESTABLISHED;
988   if (socket->open_cb)
989     socket->open_cb (socket->open_cls, socket);
990 }
991
992
993 /**
994  * Callback to set state to HELLO_WAIT
995  *
996  * @param cls the closure from queue_message
997  * @param socket the socket to requiring state change
998  */
999 static void
1000 set_state_hello_wait (void *cls,
1001                       struct GNUNET_STREAM_Socket *socket)
1002 {
1003   GNUNET_assert (STATE_INIT == socket->state);
1004   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1005               "%s: Attaining HELLO_WAIT state\n",
1006               GNUNET_i2s (&socket->our_id));
1007   socket->state = STATE_HELLO_WAIT;
1008 }
1009
1010
1011 /**
1012  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1013  *
1014  * @param cls the socket (set from GNUNET_MESH_connect)
1015  * @param tunnel connection to the other end
1016  * @param tunnel_ctx this is NULL
1017  * @param sender who sent the message
1018  * @param message the actual message
1019  * @param atsi performance data for the connection
1020  * @return GNUNET_OK to keep the connection open,
1021  *         GNUNET_SYSERR to close it (signal serious error)
1022  */
1023 static int
1024 client_handle_hello_ack (void *cls,
1025                          struct GNUNET_MESH_Tunnel *tunnel,
1026                          void **tunnel_ctx,
1027                          const struct GNUNET_PeerIdentity *sender,
1028                          const struct GNUNET_MessageHeader *message,
1029                          const struct GNUNET_ATS_Information*atsi)
1030 {
1031   struct GNUNET_STREAM_Socket *socket = cls;
1032   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1033   struct GNUNET_STREAM_HelloAckMessage *reply;
1034
1035   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1036               "%s: Received HELLO_ACK from %s\n",
1037               GNUNET_i2s (&socket->our_id),
1038               GNUNET_i2s (sender));
1039   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1040   GNUNET_assert (socket->tunnel == tunnel);
1041   switch (socket->state)
1042   {
1043   case STATE_HELLO_WAIT:
1044       socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1045       socket->receive_window_available = ntohl (ack_msg->receive_window_size);
1046       /* Get the random sequence number */
1047       socket->write_sequence_number = 
1048         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1049       reply = 
1050         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1051       reply->header.header.size = 
1052         htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1053       reply->header.header.type = 
1054         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1055       reply->sequence_number = htonl (socket->write_sequence_number);
1056       reply->receive_window_size = htonl (RECEIVE_BUFFER_SIZE);
1057       queue_message (socket, 
1058                      &reply->header, 
1059                      &set_state_established, 
1060                      NULL);      
1061       return GNUNET_OK;
1062   case STATE_ESTABLISHED:
1063   case STATE_RECEIVE_CLOSE_WAIT:
1064     // call statistics (# ACKs ignored++)
1065     return GNUNET_OK;
1066   case STATE_INIT:
1067   default:
1068     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1069                 "%s: Server sent HELLO_ACK when in state %d\n", 
1070                 GNUNET_i2s (&socket->our_id),
1071                 socket->state);
1072     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1073     return GNUNET_SYSERR;
1074   }
1075
1076 }
1077
1078
1079 /**
1080  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1081  *
1082  * @param cls the socket (set from GNUNET_MESH_connect)
1083  * @param tunnel connection to the other end
1084  * @param tunnel_ctx this is NULL
1085  * @param sender who sent the message
1086  * @param message the actual message
1087  * @param atsi performance data for the connection
1088  * @return GNUNET_OK to keep the connection open,
1089  *         GNUNET_SYSERR to close it (signal serious error)
1090  */
1091 static int
1092 client_handle_reset (void *cls,
1093                      struct GNUNET_MESH_Tunnel *tunnel,
1094                      void **tunnel_ctx,
1095                      const struct GNUNET_PeerIdentity *sender,
1096                      const struct GNUNET_MessageHeader *message,
1097                      const struct GNUNET_ATS_Information*atsi)
1098 {
1099   struct GNUNET_STREAM_Socket *socket = cls;
1100
1101   return GNUNET_OK;
1102 }
1103
1104
1105 /**
1106  * Common message handler for handling TRANSMIT_CLOSE messages
1107  *
1108  * @param socket the socket through which the ack was received
1109  * @param tunnel connection to the other end
1110  * @param sender who sent the message
1111  * @param msg the transmit close message
1112  * @param atsi performance data for the connection
1113  * @return GNUNET_OK to keep the connection open,
1114  *         GNUNET_SYSERR to close it (signal serious error)
1115  */
1116 static int
1117 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1118                        struct GNUNET_MESH_Tunnel *tunnel,
1119                        const struct GNUNET_PeerIdentity *sender,
1120                        const struct GNUNET_STREAM_MessageHeader *msg,
1121                        const struct GNUNET_ATS_Information*atsi)
1122 {
1123   struct GNUNET_STREAM_MessageHeader *reply;
1124
1125   switch (socket->state)
1126     {
1127     case STATE_ESTABLISHED:
1128       socket->state = STATE_RECEIVE_CLOSED;
1129
1130       /* Send TRANSMIT_CLOSE_ACK */
1131       reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1132       reply->header.type = 
1133         htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1134       reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1135       queue_message (socket, reply, NULL, NULL);
1136       break;
1137
1138     default:
1139       /* FIXME: Call statistics? */
1140       break;
1141     }
1142   return GNUNET_YES;
1143 }
1144
1145
1146 /**
1147  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1148  *
1149  * @param cls the socket (set from GNUNET_MESH_connect)
1150  * @param tunnel connection to the other end
1151  * @param tunnel_ctx this is NULL
1152  * @param sender who sent the message
1153  * @param message the actual message
1154  * @param atsi performance data for the connection
1155  * @return GNUNET_OK to keep the connection open,
1156  *         GNUNET_SYSERR to close it (signal serious error)
1157  */
1158 static int
1159 client_handle_transmit_close (void *cls,
1160                               struct GNUNET_MESH_Tunnel *tunnel,
1161                               void **tunnel_ctx,
1162                               const struct GNUNET_PeerIdentity *sender,
1163                               const struct GNUNET_MessageHeader *message,
1164                               const struct GNUNET_ATS_Information*atsi)
1165 {
1166   struct GNUNET_STREAM_Socket *socket = cls;
1167   
1168   return handle_transmit_close (socket,
1169                                 tunnel,
1170                                 sender,
1171                                 (struct GNUNET_STREAM_MessageHeader *)message,
1172                                 atsi);
1173 }
1174
1175
1176 /**
1177  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1178  *
1179  * @param cls the socket (set from GNUNET_MESH_connect)
1180  * @param tunnel connection to the other end
1181  * @param tunnel_ctx this is NULL
1182  * @param sender who sent the message
1183  * @param message the actual message
1184  * @param atsi performance data for the connection
1185  * @return GNUNET_OK to keep the connection open,
1186  *         GNUNET_SYSERR to close it (signal serious error)
1187  */
1188 static int
1189 client_handle_transmit_close_ack (void *cls,
1190                                   struct GNUNET_MESH_Tunnel *tunnel,
1191                                   void **tunnel_ctx,
1192                                   const struct GNUNET_PeerIdentity *sender,
1193                                   const struct GNUNET_MessageHeader *message,
1194                                   const struct GNUNET_ATS_Information*atsi)
1195 {
1196   struct GNUNET_STREAM_Socket *socket = cls;
1197
1198   return GNUNET_OK;
1199 }
1200
1201
1202 /**
1203  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1204  *
1205  * @param cls the socket (set from GNUNET_MESH_connect)
1206  * @param tunnel connection to the other end
1207  * @param tunnel_ctx this is NULL
1208  * @param sender who sent the message
1209  * @param message the actual message
1210  * @param atsi performance data for the connection
1211  * @return GNUNET_OK to keep the connection open,
1212  *         GNUNET_SYSERR to close it (signal serious error)
1213  */
1214 static int
1215 client_handle_receive_close (void *cls,
1216                              struct GNUNET_MESH_Tunnel *tunnel,
1217                              void **tunnel_ctx,
1218                              const struct GNUNET_PeerIdentity *sender,
1219                              const struct GNUNET_MessageHeader *message,
1220                              const struct GNUNET_ATS_Information*atsi)
1221 {
1222   struct GNUNET_STREAM_Socket *socket = cls;
1223
1224   return GNUNET_OK;
1225 }
1226
1227
1228 /**
1229  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1230  *
1231  * @param cls the socket (set from GNUNET_MESH_connect)
1232  * @param tunnel connection to the other end
1233  * @param tunnel_ctx this is NULL
1234  * @param sender who sent the message
1235  * @param message the actual message
1236  * @param atsi performance data for the connection
1237  * @return GNUNET_OK to keep the connection open,
1238  *         GNUNET_SYSERR to close it (signal serious error)
1239  */
1240 static int
1241 client_handle_receive_close_ack (void *cls,
1242                                  struct GNUNET_MESH_Tunnel *tunnel,
1243                                  void **tunnel_ctx,
1244                                  const struct GNUNET_PeerIdentity *sender,
1245                                  const struct GNUNET_MessageHeader *message,
1246                                  const struct GNUNET_ATS_Information*atsi)
1247 {
1248   struct GNUNET_STREAM_Socket *socket = cls;
1249
1250   return GNUNET_OK;
1251 }
1252
1253
1254 /**
1255  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1256  *
1257  * @param cls the socket (set from GNUNET_MESH_connect)
1258  * @param tunnel connection to the other end
1259  * @param tunnel_ctx this is NULL
1260  * @param sender who sent the message
1261  * @param message the actual message
1262  * @param atsi performance data for the connection
1263  * @return GNUNET_OK to keep the connection open,
1264  *         GNUNET_SYSERR to close it (signal serious error)
1265  */
1266 static int
1267 client_handle_close (void *cls,
1268                      struct GNUNET_MESH_Tunnel *tunnel,
1269                      void **tunnel_ctx,
1270                      const struct GNUNET_PeerIdentity *sender,
1271                      const struct GNUNET_MessageHeader *message,
1272                      const struct GNUNET_ATS_Information*atsi)
1273 {
1274   struct GNUNET_STREAM_Socket *socket = cls;
1275
1276   return GNUNET_OK;
1277 }
1278
1279
1280 /**
1281  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1282  *
1283  * @param cls the socket (set from GNUNET_MESH_connect)
1284  * @param tunnel connection to the other end
1285  * @param tunnel_ctx this is NULL
1286  * @param sender who sent the message
1287  * @param message the actual message
1288  * @param atsi performance data for the connection
1289  * @return GNUNET_OK to keep the connection open,
1290  *         GNUNET_SYSERR to close it (signal serious error)
1291  */
1292 static int
1293 client_handle_close_ack (void *cls,
1294                          struct GNUNET_MESH_Tunnel *tunnel,
1295                          void **tunnel_ctx,
1296                          const struct GNUNET_PeerIdentity *sender,
1297                          const struct GNUNET_MessageHeader *message,
1298                          const struct GNUNET_ATS_Information*atsi)
1299 {
1300   struct GNUNET_STREAM_Socket *socket = cls;
1301
1302   return GNUNET_OK;
1303 }
1304
1305 /*****************************/
1306 /* Server's Message Handlers */
1307 /*****************************/
1308
1309 /**
1310  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1311  *
1312  * @param cls the closure
1313  * @param tunnel connection to the other end
1314  * @param tunnel_ctx the socket
1315  * @param sender who sent the message
1316  * @param message the actual message
1317  * @param atsi performance data for the connection
1318  * @return GNUNET_OK to keep the connection open,
1319  *         GNUNET_SYSERR to close it (signal serious error)
1320  */
1321 static int
1322 server_handle_data (void *cls,
1323                     struct GNUNET_MESH_Tunnel *tunnel,
1324                     void **tunnel_ctx,
1325                     const struct GNUNET_PeerIdentity *sender,
1326                     const struct GNUNET_MessageHeader *message,
1327                     const struct GNUNET_ATS_Information*atsi)
1328 {
1329   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1330
1331   return handle_data (socket,
1332                       tunnel,
1333                       sender,
1334                       (const struct GNUNET_STREAM_DataMessage *)message,
1335                       atsi);
1336 }
1337
1338
1339 /**
1340  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
1341  *
1342  * @param cls the closure
1343  * @param tunnel connection to the other end
1344  * @param tunnel_ctx the socket
1345  * @param sender who sent the message
1346  * @param message the actual message
1347  * @param atsi performance data for the connection
1348  * @return GNUNET_OK to keep the connection open,
1349  *         GNUNET_SYSERR to close it (signal serious error)
1350  */
1351 static int
1352 server_handle_hello (void *cls,
1353                      struct GNUNET_MESH_Tunnel *tunnel,
1354                      void **tunnel_ctx,
1355                      const struct GNUNET_PeerIdentity *sender,
1356                      const struct GNUNET_MessageHeader *message,
1357                      const struct GNUNET_ATS_Information*atsi)
1358 {
1359   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1360   struct GNUNET_STREAM_HelloAckMessage *reply;
1361
1362   GNUNET_assert (socket->tunnel == tunnel);
1363   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1364               "Received HELLO from %s\n", GNUNET_i2s(sender));
1365
1366   /* Catch possible protocol breaks */
1367   GNUNET_break_op (0 == memcmp (&socket->other_peer, 
1368                                 sender,
1369                                 sizeof (struct GNUNET_PeerIdentity)));
1370
1371   if (STATE_INIT == socket->state)
1372     {
1373       /* Get the random sequence number */
1374       socket->write_sequence_number = 
1375         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1376       reply = 
1377         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1378       reply->header.header.size = 
1379         htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1380       reply->header.header.type = 
1381         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1382       reply->sequence_number = htonl (socket->write_sequence_number);
1383       queue_message (socket, 
1384                      &reply->header,
1385                      &set_state_hello_wait, 
1386                      NULL);
1387     }
1388   else
1389     {
1390       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1391                   "Client sent HELLO when in state %d\n", socket->state);
1392       /* FIXME: Send RESET? */
1393       
1394     }
1395   return GNUNET_OK;
1396 }
1397
1398
1399 /**
1400  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1401  *
1402  * @param cls the closure
1403  * @param tunnel connection to the other end
1404  * @param tunnel_ctx the socket
1405  * @param sender who sent the message
1406  * @param message the actual message
1407  * @param atsi performance data for the connection
1408  * @return GNUNET_OK to keep the connection open,
1409  *         GNUNET_SYSERR to close it (signal serious error)
1410  */
1411 static int
1412 server_handle_hello_ack (void *cls,
1413                          struct GNUNET_MESH_Tunnel *tunnel,
1414                          void **tunnel_ctx,
1415                          const struct GNUNET_PeerIdentity *sender,
1416                          const struct GNUNET_MessageHeader *message,
1417                          const struct GNUNET_ATS_Information*atsi)
1418 {
1419   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1420   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
1421
1422   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
1423   GNUNET_assert (socket->tunnel == tunnel);
1424   if (STATE_HELLO_WAIT == socket->state)
1425     {
1426       socket->read_sequence_number = ntohl (ack_message->sequence_number);
1427       socket->receive_window_available = 
1428         ntohl (ack_message->receive_window_size);
1429       /* Attain ESTABLISHED state */
1430       set_state_established (NULL, socket);
1431     }
1432   else
1433     {
1434       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1435                   "Client sent HELLO_ACK when in state %d\n", socket->state);
1436       /* FIXME: Send RESET? */
1437       
1438     }
1439   return GNUNET_OK;
1440 }
1441
1442
1443 /**
1444  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1445  *
1446  * @param cls the closure
1447  * @param tunnel connection to the other end
1448  * @param tunnel_ctx the socket
1449  * @param sender who sent the message
1450  * @param message the actual message
1451  * @param atsi performance data for the connection
1452  * @return GNUNET_OK to keep the connection open,
1453  *         GNUNET_SYSERR to close it (signal serious error)
1454  */
1455 static int
1456 server_handle_reset (void *cls,
1457                      struct GNUNET_MESH_Tunnel *tunnel,
1458                      void **tunnel_ctx,
1459                      const struct GNUNET_PeerIdentity *sender,
1460                      const struct GNUNET_MessageHeader *message,
1461                      const struct GNUNET_ATS_Information*atsi)
1462 {
1463   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1464
1465   return GNUNET_OK;
1466 }
1467
1468
1469 /**
1470  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1471  *
1472  * @param cls the closure
1473  * @param tunnel connection to the other end
1474  * @param tunnel_ctx the socket
1475  * @param sender who sent the message
1476  * @param message the actual message
1477  * @param atsi performance data for the connection
1478  * @return GNUNET_OK to keep the connection open,
1479  *         GNUNET_SYSERR to close it (signal serious error)
1480  */
1481 static int
1482 server_handle_transmit_close (void *cls,
1483                               struct GNUNET_MESH_Tunnel *tunnel,
1484                               void **tunnel_ctx,
1485                               const struct GNUNET_PeerIdentity *sender,
1486                               const struct GNUNET_MessageHeader *message,
1487                               const struct GNUNET_ATS_Information*atsi)
1488 {
1489   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1490
1491   return handle_transmit_close (socket,
1492                                 tunnel,
1493                                 sender,
1494                                 (struct GNUNET_STREAM_MessageHeader *)message,
1495                                 atsi);
1496 }
1497
1498
1499 /**
1500  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1501  *
1502  * @param cls the closure
1503  * @param tunnel connection to the other end
1504  * @param tunnel_ctx the socket
1505  * @param sender who sent the message
1506  * @param message the actual message
1507  * @param atsi performance data for the connection
1508  * @return GNUNET_OK to keep the connection open,
1509  *         GNUNET_SYSERR to close it (signal serious error)
1510  */
1511 static int
1512 server_handle_transmit_close_ack (void *cls,
1513                                   struct GNUNET_MESH_Tunnel *tunnel,
1514                                   void **tunnel_ctx,
1515                                   const struct GNUNET_PeerIdentity *sender,
1516                                   const struct GNUNET_MessageHeader *message,
1517                                   const struct GNUNET_ATS_Information*atsi)
1518 {
1519   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1520
1521   return GNUNET_OK;
1522 }
1523
1524
1525 /**
1526  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1527  *
1528  * @param cls the closure
1529  * @param tunnel connection to the other end
1530  * @param tunnel_ctx the socket
1531  * @param sender who sent the message
1532  * @param message the actual message
1533  * @param atsi performance data for the connection
1534  * @return GNUNET_OK to keep the connection open,
1535  *         GNUNET_SYSERR to close it (signal serious error)
1536  */
1537 static int
1538 server_handle_receive_close (void *cls,
1539                              struct GNUNET_MESH_Tunnel *tunnel,
1540                              void **tunnel_ctx,
1541                              const struct GNUNET_PeerIdentity *sender,
1542                              const struct GNUNET_MessageHeader *message,
1543                              const struct GNUNET_ATS_Information*atsi)
1544 {
1545   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1546
1547   return GNUNET_OK;
1548 }
1549
1550
1551 /**
1552  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1553  *
1554  * @param cls the closure
1555  * @param tunnel connection to the other end
1556  * @param tunnel_ctx the socket
1557  * @param sender who sent the message
1558  * @param message the actual message
1559  * @param atsi performance data for the connection
1560  * @return GNUNET_OK to keep the connection open,
1561  *         GNUNET_SYSERR to close it (signal serious error)
1562  */
1563 static int
1564 server_handle_receive_close_ack (void *cls,
1565                                  struct GNUNET_MESH_Tunnel *tunnel,
1566                                  void **tunnel_ctx,
1567                                  const struct GNUNET_PeerIdentity *sender,
1568                                  const struct GNUNET_MessageHeader *message,
1569                                  const struct GNUNET_ATS_Information*atsi)
1570 {
1571   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1572
1573   return GNUNET_OK;
1574 }
1575
1576
1577 /**
1578  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1579  *
1580  * @param cls the closure
1581  * @param tunnel connection to the other end
1582  * @param tunnel_ctx the socket
1583  * @param sender who sent the message
1584  * @param message the actual message
1585  * @param atsi performance data for the connection
1586  * @return GNUNET_OK to keep the connection open,
1587  *         GNUNET_SYSERR to close it (signal serious error)
1588  */
1589 static int
1590 server_handle_close (void *cls,
1591                      struct GNUNET_MESH_Tunnel *tunnel,
1592                      void **tunnel_ctx,
1593                      const struct GNUNET_PeerIdentity *sender,
1594                      const struct GNUNET_MessageHeader *message,
1595                      const struct GNUNET_ATS_Information*atsi)
1596 {
1597   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1598
1599   return GNUNET_OK;
1600 }
1601
1602
1603 /**
1604  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1605  *
1606  * @param cls the closure
1607  * @param tunnel connection to the other end
1608  * @param tunnel_ctx the socket
1609  * @param sender who sent the message
1610  * @param message the actual message
1611  * @param atsi performance data for the connection
1612  * @return GNUNET_OK to keep the connection open,
1613  *         GNUNET_SYSERR to close it (signal serious error)
1614  */
1615 static int
1616 server_handle_close_ack (void *cls,
1617                          struct GNUNET_MESH_Tunnel *tunnel,
1618                          void **tunnel_ctx,
1619                          const struct GNUNET_PeerIdentity *sender,
1620                          const struct GNUNET_MessageHeader *message,
1621                          const struct GNUNET_ATS_Information*atsi)
1622 {
1623   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1624
1625   return GNUNET_OK;
1626 }
1627
1628
1629 /**
1630  * Message Handler for mesh
1631  *
1632  * @param socket the socket through which the ack was received
1633  * @param tunnel connection to the other end
1634  * @param sender who sent the message
1635  * @param ack the acknowledgment message
1636  * @param atsi performance data for the connection
1637  * @return GNUNET_OK to keep the connection open,
1638  *         GNUNET_SYSERR to close it (signal serious error)
1639  */
1640 static int
1641 handle_ack (struct GNUNET_STREAM_Socket *socket,
1642             struct GNUNET_MESH_Tunnel *tunnel,
1643             const struct GNUNET_PeerIdentity *sender,
1644             const struct GNUNET_STREAM_AckMessage *ack,
1645             const struct GNUNET_ATS_Information*atsi)
1646 {
1647   switch (socket->state)
1648     {
1649     case (STATE_ESTABLISHED):
1650       if (NULL == socket->write_handle)
1651         {
1652           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1653                       "Received DATA ACK when write_handle is NULL\n");
1654           return GNUNET_OK;
1655         }
1656
1657       socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap);
1658       socket->receive_window_available = 
1659         ntohl (ack->receive_window_remaining);
1660       write_data (socket);
1661       break;
1662     default:
1663       break;
1664     }
1665   return GNUNET_OK;
1666 }
1667
1668
1669 /**
1670  * Message Handler for mesh
1671  *
1672  * @param cls the 'struct GNUNET_STREAM_Socket'
1673  * @param tunnel connection to the other end
1674  * @param tunnel_ctx unused
1675  * @param sender who sent the message
1676  * @param message the actual message
1677  * @param atsi performance data for the connection
1678  * @return GNUNET_OK to keep the connection open,
1679  *         GNUNET_SYSERR to close it (signal serious error)
1680  */
1681 static int
1682 client_handle_ack (void *cls,
1683                    struct GNUNET_MESH_Tunnel *tunnel,
1684                    void **tunnel_ctx,
1685                    const struct GNUNET_PeerIdentity *sender,
1686                    const struct GNUNET_MessageHeader *message,
1687                    const struct GNUNET_ATS_Information*atsi)
1688 {
1689   struct GNUNET_STREAM_Socket *socket = cls;
1690   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1691  
1692   return handle_ack (socket, tunnel, sender, ack, atsi);
1693 }
1694
1695
1696 /**
1697  * Message Handler for mesh
1698  *
1699  * @param cls the server's listen socket
1700  * @param tunnel connection to the other end
1701  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
1702  * @param sender who sent the message
1703  * @param message the actual message
1704  * @param atsi performance data for the connection
1705  * @return GNUNET_OK to keep the connection open,
1706  *         GNUNET_SYSERR to close it (signal serious error)
1707  */
1708 static int
1709 server_handle_ack (void *cls,
1710                    struct GNUNET_MESH_Tunnel *tunnel,
1711                    void **tunnel_ctx,
1712                    const struct GNUNET_PeerIdentity *sender,
1713                    const struct GNUNET_MessageHeader *message,
1714                    const struct GNUNET_ATS_Information*atsi)
1715 {
1716   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1717   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1718  
1719   return handle_ack (socket, tunnel, sender, ack, atsi);
1720 }
1721
1722
1723 /**
1724  * For client message handlers, the stream socket is in the
1725  * closure argument.
1726  */
1727 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
1728   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1729   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
1730    sizeof (struct GNUNET_STREAM_AckMessage) },
1731   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1732    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1733   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1734    sizeof (struct GNUNET_STREAM_MessageHeader)},
1735   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1736    sizeof (struct GNUNET_STREAM_MessageHeader)},
1737   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1738    sizeof (struct GNUNET_STREAM_MessageHeader)},
1739   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1740    sizeof (struct GNUNET_STREAM_MessageHeader)},
1741   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1742    sizeof (struct GNUNET_STREAM_MessageHeader)},
1743   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1744    sizeof (struct GNUNET_STREAM_MessageHeader)},
1745   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1746    sizeof (struct GNUNET_STREAM_MessageHeader)},
1747   {NULL, 0, 0}
1748 };
1749
1750
1751 /**
1752  * For server message handlers, the stream socket is in the
1753  * tunnel context, and the listen socket in the closure argument.
1754  */
1755 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
1756   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1757   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
1758    sizeof (struct GNUNET_STREAM_AckMessage) },
1759   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
1760    sizeof (struct GNUNET_STREAM_MessageHeader)},
1761   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1762    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1763   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1764    sizeof (struct GNUNET_STREAM_MessageHeader)},
1765   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1766    sizeof (struct GNUNET_STREAM_MessageHeader)},
1767   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1768    sizeof (struct GNUNET_STREAM_MessageHeader)},
1769   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1770    sizeof (struct GNUNET_STREAM_MessageHeader)},
1771   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1772    sizeof (struct GNUNET_STREAM_MessageHeader)},
1773   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1774    sizeof (struct GNUNET_STREAM_MessageHeader)},
1775   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1776    sizeof (struct GNUNET_STREAM_MessageHeader)},
1777   {NULL, 0, 0}
1778 };
1779
1780
1781 /**
1782  * Function called when our target peer is connected to our tunnel
1783  *
1784  * @param cls the socket for which this tunnel is created
1785  * @param peer the peer identity of the target
1786  * @param atsi performance data for the connection
1787  */
1788 static void
1789 mesh_peer_connect_callback (void *cls,
1790                             const struct GNUNET_PeerIdentity *peer,
1791                             const struct GNUNET_ATS_Information * atsi)
1792 {
1793   struct GNUNET_STREAM_Socket *socket = cls;
1794   struct GNUNET_STREAM_MessageHeader *message;
1795
1796   if (0 != memcmp (&socket->other_peer, 
1797                    peer, 
1798                    sizeof (struct GNUNET_PeerIdentity)))
1799     {
1800       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1801                   "%s: A peer (%s) which is not our target has connected",
1802                   "to our tunnel",
1803                   GNUNET_i2s (&socket->our_id),
1804                   GNUNET_i2s (peer));
1805       return;
1806     }
1807   
1808   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1809               "Target peer %s connected\n", GNUNET_i2s (peer));
1810   
1811   /* Set state to INIT */
1812   socket->state = STATE_INIT;
1813
1814   /* Send HELLO message */
1815   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1816   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1817   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1818   queue_message (socket,
1819                  message,
1820                  &set_state_hello_wait,
1821                  NULL);
1822
1823   /* Call open callback */
1824   if (NULL == socket->open_cb)
1825     {
1826       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1827                   "STREAM_open callback is NULL\n");
1828     }
1829 }
1830
1831
1832 /**
1833  * Function called when our target peer is disconnected from our tunnel
1834  *
1835  * @param cls the socket associated which this tunnel
1836  * @param peer the peer identity of the target
1837  */
1838 static void
1839 mesh_peer_disconnect_callback (void *cls,
1840                                const struct GNUNET_PeerIdentity *peer)
1841 {
1842
1843 }
1844
1845
1846 /**
1847  * Method called whenever a peer creates a tunnel to us
1848  *
1849  * @param cls closure
1850  * @param tunnel new handle to the tunnel
1851  * @param initiator peer that started the tunnel
1852  * @param atsi performance information for the tunnel
1853  * @return initial tunnel context for the tunnel
1854  *         (can be NULL -- that's not an error)
1855  */
1856 static void *
1857 new_tunnel_notify (void *cls,
1858                    struct GNUNET_MESH_Tunnel *tunnel,
1859                    const struct GNUNET_PeerIdentity *initiator,
1860                    const struct GNUNET_ATS_Information *atsi)
1861 {
1862   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
1863   struct GNUNET_STREAM_Socket *socket;
1864
1865   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1866               "%s: Peer %s initiated tunnel to us\n", 
1867               GNUNET_i2s (&lsocket->our_id),
1868               GNUNET_i2s (initiator));
1869
1870   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
1871   socket->tunnel = tunnel;
1872   socket->session_id = 0;       /* FIXME */
1873   socket->other_peer = *initiator;
1874   socket->state = STATE_INIT;
1875   socket->derived = GNUNET_YES;
1876   socket->our_id = lsocket->our_id;
1877   
1878   /* FIXME: Copy MESH handle from lsocket to socket */
1879
1880   if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
1881                                            socket,
1882                                            &socket->other_peer))
1883     {
1884       socket->state = STATE_CLOSED;
1885       /* FIXME: Send CLOSE message and then free */
1886       GNUNET_free (socket);
1887       GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
1888     }
1889   return socket;
1890 }
1891
1892
1893 /**
1894  * Function called whenever an inbound tunnel is destroyed.  Should clean up
1895  * any associated state.  This function is NOT called if the client has
1896  * explicitly asked for the tunnel to be destroyed using
1897  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
1898  * the tunnel.
1899  *
1900  * @param cls closure (set from GNUNET_MESH_connect)
1901  * @param tunnel connection to the other end (henceforth invalid)
1902  * @param tunnel_ctx place where local state associated
1903  *                   with the tunnel is stored
1904  */
1905 static void 
1906 tunnel_cleaner (void *cls,
1907                 const struct GNUNET_MESH_Tunnel *tunnel,
1908                 void *tunnel_ctx)
1909 {
1910   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
1911   
1912   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1913               "%s: Peer %s has terminated connection abruptly\n",
1914               GNUNET_i2s (&socket->our_id),
1915               GNUNET_i2s (&socket->other_peer));
1916
1917   socket->status = GNUNET_STREAM_SHUTDOWN;
1918   /* Clear Transmit handles */
1919   if (NULL != socket->transmit_handle)
1920     {
1921       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
1922       socket->transmit_handle = NULL;
1923     }
1924   socket->tunnel = NULL;
1925 }
1926
1927
1928 /*****************/
1929 /* API functions */
1930 /*****************/
1931
1932
1933 /**
1934  * Tries to open a stream to the target peer
1935  *
1936  * @param cfg configuration to use
1937  * @param target the target peer to which the stream has to be opened
1938  * @param app_port the application port number which uniquely identifies this
1939  *            stream
1940  * @param open_cb this function will be called after stream has be established 
1941  * @param open_cb_cls the closure for open_cb
1942  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
1943  * @return if successful it returns the stream socket; NULL if stream cannot be
1944  *         opened 
1945  */
1946 struct GNUNET_STREAM_Socket *
1947 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
1948                     const struct GNUNET_PeerIdentity *target,
1949                     GNUNET_MESH_ApplicationType app_port,
1950                     GNUNET_STREAM_OpenCallback open_cb,
1951                     void *open_cb_cls,
1952                     ...)
1953 {
1954   struct GNUNET_STREAM_Socket *socket;
1955   enum GNUNET_STREAM_Option option;
1956   va_list vargs;                /* Variable arguments */
1957
1958   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1959               "%s\n", __func__);
1960
1961   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
1962   socket->other_peer = *target;
1963   socket->open_cb = open_cb;
1964   socket->open_cls = open_cb_cls;
1965   GNUNET_TESTING_get_peer_identity (cfg, &socket->our_id);
1966
1967   /* Set defaults */
1968   socket->retransmit_timeout = 
1969     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
1970
1971   va_start (vargs, open_cb_cls); /* Parse variable args */
1972   do {
1973     option = va_arg (vargs, enum GNUNET_STREAM_Option);
1974     switch (option)
1975       {
1976       case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
1977         /* Expect struct GNUNET_TIME_Relative */
1978         socket->retransmit_timeout = va_arg (vargs,
1979                                              struct GNUNET_TIME_Relative);
1980         break;
1981       case GNUNET_STREAM_OPTION_END:
1982         break;
1983       }
1984   } while (GNUNET_STREAM_OPTION_END != option);
1985   va_end (vargs);               /* End of variable args parsing */
1986   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
1987                                       10,  /* QUEUE size as parameter? */
1988                                       socket, /* cls */
1989                                       NULL, /* No inbound tunnel handler */
1990                                       &tunnel_cleaner,
1991                                       client_message_handlers,
1992                                       &app_port); /* We don't get inbound tunnels */
1993   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
1994     {
1995       GNUNET_free (socket);
1996       return NULL;
1997     }
1998
1999   /* Now create the mesh tunnel to target */
2000   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2001               "Creating MESH Tunnel\n");
2002   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2003                                               NULL, /* Tunnel context */
2004                                               &mesh_peer_connect_callback,
2005                                               &mesh_peer_disconnect_callback,
2006                                               socket);
2007   GNUNET_assert (NULL != socket->tunnel);
2008   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2009                                         target);
2010   
2011   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2012               "%s() END\n", __func__);
2013   return socket;
2014 }
2015
2016
2017 /**
2018  * Shutdown the stream for reading or writing (man 2 shutdown).
2019  *
2020  * @param socket the stream socket
2021  * @param how SHUT_RD, SHUT_WR or SHUT_RDWR 
2022  */
2023 void
2024 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2025                         int how)
2026 {
2027   return;
2028 }
2029
2030
2031 /**
2032  * Closes the stream
2033  *
2034  * @param socket the stream socket
2035  */
2036 void
2037 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
2038 {
2039   struct MessageQueue *head;
2040
2041   if (socket->read_task != GNUNET_SCHEDULER_NO_TASK)
2042   {
2043     /* socket closed with read task pending!? */
2044     GNUNET_break (0);
2045     GNUNET_SCHEDULER_cancel (socket->read_task);
2046     socket->read_task = GNUNET_SCHEDULER_NO_TASK;
2047   }
2048
2049   /* Clear Transmit handles */
2050   if (NULL != socket->transmit_handle)
2051     {
2052       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2053       socket->transmit_handle = NULL;
2054     }
2055
2056   /* Clear existing message queue */
2057   while (NULL != (head = socket->queue_head)) {
2058     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2059                                  socket->queue_tail,
2060                                  head);
2061     GNUNET_free (head->message);
2062     GNUNET_free (head);
2063   }
2064
2065   /* Close associated tunnel */
2066   if (NULL != socket->tunnel)
2067     {
2068       GNUNET_MESH_tunnel_destroy (socket->tunnel);
2069       socket->tunnel = NULL;
2070     }
2071
2072   /* Close mesh connection */
2073   if (NULL != socket->mesh && GNUNET_YES != socket->derived)
2074     {
2075       GNUNET_MESH_disconnect (socket->mesh);
2076       socket->mesh = NULL;
2077     }
2078   
2079   /* Release receive buffer */
2080   if (NULL != socket->receive_buffer)
2081     {
2082       GNUNET_free (socket->receive_buffer);
2083     }
2084
2085   GNUNET_free (socket);
2086 }
2087
2088
2089 /**
2090  * Listens for stream connections for a specific application ports
2091  *
2092  * @param cfg the configuration to use
2093  * @param app_port the application port for which new streams will be accepted
2094  * @param listen_cb this function will be called when a peer tries to establish
2095  *            a stream with us
2096  * @param listen_cb_cls closure for listen_cb
2097  * @return listen socket, NULL for any error
2098  */
2099 struct GNUNET_STREAM_ListenSocket *
2100 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
2101                       GNUNET_MESH_ApplicationType app_port,
2102                       GNUNET_STREAM_ListenCallback listen_cb,
2103                       void *listen_cb_cls)
2104 {
2105   /* FIXME: Add variable args for passing configration options? */
2106   struct GNUNET_STREAM_ListenSocket *lsocket;
2107
2108   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
2109   lsocket->port = app_port;
2110   lsocket->listen_cb = listen_cb;
2111   lsocket->listen_cb_cls = listen_cb_cls;
2112   GNUNET_TESTING_get_peer_identity (cfg, &lsocket->our_id);
2113   lsocket->mesh = GNUNET_MESH_connect (cfg,
2114                                        10, /* FIXME: QUEUE size as parameter? */
2115                                        lsocket, /* Closure */
2116                                        &new_tunnel_notify,
2117                                        &tunnel_cleaner,
2118                                        server_message_handlers,
2119                                        &app_port);
2120   GNUNET_assert (NULL != lsocket->mesh);
2121   return lsocket;
2122 }
2123
2124
2125 /**
2126  * Closes the listen socket
2127  *
2128  * @param lsocket the listen socket
2129  */
2130 void
2131 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
2132 {
2133   /* Close MESH connection */
2134   GNUNET_assert (NULL != lsocket->mesh);
2135   GNUNET_MESH_disconnect (lsocket->mesh);
2136   
2137   GNUNET_free (lsocket);
2138 }
2139
2140
2141 /**
2142  * Tries to write the given data to the stream
2143  *
2144  * @param socket the socket representing a stream
2145  * @param data the data buffer from where the data is written into the stream
2146  * @param size the number of bytes to be written from the data buffer
2147  * @param timeout the timeout period
2148  * @param write_cont the function to call upon writing some bytes into the stream
2149  * @param write_cont_cls the closure
2150  * @return handle to cancel the operation
2151  */
2152 struct GNUNET_STREAM_IOWriteHandle *
2153 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
2154                      const void *data,
2155                      size_t size,
2156                      struct GNUNET_TIME_Relative timeout,
2157                      GNUNET_STREAM_CompletionContinuation write_cont,
2158                      void *write_cont_cls)
2159 {
2160   unsigned int num_needed_packets;
2161   unsigned int packet;
2162   struct GNUNET_STREAM_IOWriteHandle *io_handle;
2163   uint32_t packet_size;
2164   uint32_t payload_size;
2165   struct GNUNET_STREAM_DataMessage *data_msg;
2166   const void *sweep;
2167
2168   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2169               "%s\n", __func__);
2170
2171   /* Return NULL if there is already a write request pending */
2172   if (NULL != socket->write_handle)
2173   {
2174     GNUNET_break (0);
2175     return NULL;
2176   }
2177   if (!((STATE_ESTABLISHED == socket->state)
2178         || (STATE_RECEIVE_CLOSE_WAIT == socket->state)
2179         || (STATE_RECEIVE_CLOSED == socket->state)))
2180     {
2181       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2182                   "%s: Attempting to write on a closed (OR) not-yet-established"
2183                   "stream\n",
2184                   GNUNET_i2s (&socket->our_id));
2185       return NULL;
2186     } 
2187   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
2188     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
2189   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
2190   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
2191   sweep = data;
2192   /* Divide the given buffer into packets for sending */
2193   for (packet=0; packet < num_needed_packets; packet++)
2194     {
2195       if ((packet + 1) * max_payload_size < size) 
2196         {
2197           payload_size = max_payload_size;
2198           packet_size = MAX_PACKET_SIZE;
2199         }
2200       else 
2201         {
2202           payload_size = size - packet * max_payload_size;
2203           packet_size =  payload_size + sizeof (struct
2204                                                 GNUNET_STREAM_DataMessage); 
2205         }
2206       io_handle->messages[packet] = GNUNET_malloc (packet_size);
2207       io_handle->messages[packet]->header.header.size = htons (packet_size);
2208       io_handle->messages[packet]->header.header.type =
2209         htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
2210       io_handle->messages[packet]->sequence_number =
2211         htons (socket->write_sequence_number++);
2212       io_handle->messages[packet]->offset = htons (socket->write_offset);
2213
2214       /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2215          determined from RTT */
2216       io_handle->messages[packet]->ack_deadline = 
2217         GNUNET_TIME_relative_hton (GNUNET_TIME_relative_multiply 
2218                                    (GNUNET_TIME_UNIT_SECONDS, 5));
2219       data_msg = io_handle->messages[packet];
2220       /* Copy data from given buffer to the packet */
2221       memcpy (&data_msg[1],
2222               sweep,
2223               payload_size);
2224       sweep += payload_size;
2225       socket->write_offset += payload_size;
2226     }
2227   socket->write_handle = io_handle;
2228   write_data (socket);
2229
2230   return io_handle;
2231
2232   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2233               "%s() END\n", __func__);
2234 }
2235
2236
2237 /**
2238  * Tries to read data from the stream
2239  *
2240  * @param socket the socket representing a stream
2241  * @param timeout the timeout period
2242  * @param proc function to call with data (once only)
2243  * @param proc_cls the closure for proc
2244  * @return handle to cancel the operation
2245  */
2246 struct GNUNET_STREAM_IOReadHandle *
2247 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
2248                     struct GNUNET_TIME_Relative timeout,
2249                     GNUNET_STREAM_DataProcessor proc,
2250                     void *proc_cls)
2251 {
2252   struct GNUNET_STREAM_IOReadHandle *read_handle;
2253   
2254   /* Return NULL if there is already a read handle; the user has to cancel that
2255   first before continuing or has to wait until it is completed */
2256   if (NULL != socket->read_handle) return NULL;
2257
2258   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
2259   read_handle->proc = proc;
2260   socket->read_handle = read_handle;
2261
2262   /* Check if we have a packet at bitmap 0 */
2263   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
2264                                           0))
2265     {
2266       socket->read_task = GNUNET_SCHEDULER_add_now (&call_read_processor,
2267                                                     socket);
2268    
2269     }
2270   
2271   /* Setup the read timeout task */
2272   socket->read_io_timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
2273                                                                &read_io_timeout,
2274                                                                socket);
2275   return read_handle;
2276 }
2277
2278
2279 /**
2280  * Cancel pending write operation.
2281  *
2282  * @param ioh handle to operation to cancel
2283  */
2284 void
2285 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
2286 {
2287   return;
2288 }
2289
2290
2291 /**
2292  * Cancel pending read operation.
2293  *
2294  * @param ioh handle to operation to cancel
2295  */
2296 void
2297 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
2298 {
2299   return;
2300 }