19a2d918dd1005a32aca8bb634b03eabd244d3c1
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  * Copy MESH handle from lsocket to socket
23  *
24  * Checks for matching the sender and socket->other_peer in server
25  * message handlers  */
26
27 /**
28  * @file stream/stream_api.c
29  * @brief Implementation of the stream library
30  * @author Sree Harsha Totakura
31  */
32 #include "platform.h"
33 #include "gnunet_common.h"
34 #include "gnunet_crypto_lib.h"
35 #include "gnunet_stream_lib.h"
36 #include "gnunet_testing_lib.h"
37 #include "stream_protocol.h"
38
39
40 /**
41  * The maximum packet size of a stream packet
42  */
43 #define MAX_PACKET_SIZE 64000
44
45 /**
46  * The maximum payload a data message packet can carry
47  */
48 static size_t max_payload_size = 
49   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
50
51 /**
52  * Receive buffer
53  */
54 #define RECEIVE_BUFFER_SIZE 4096000
55
56 /**
57  * states in the Protocol
58  */
59 enum State
60   {
61     /**
62      * Client initialization state
63      */
64     STATE_INIT,
65
66     /**
67      * Listener initialization state 
68      */
69     STATE_LISTEN,
70
71     /**
72      * Pre-connection establishment state
73      */
74     STATE_HELLO_WAIT,
75
76     /**
77      * State where a connection has been established
78      */
79     STATE_ESTABLISHED,
80
81     /**
82      * State where the socket is closed on our side and waiting to be ACK'ed
83      */
84     STATE_RECEIVE_CLOSE_WAIT,
85
86     /**
87      * State where the socket is closed for reading
88      */
89     STATE_RECEIVE_CLOSED,
90
91     /**
92      * State where the socket is closed on our side and waiting to be ACK'ed
93      */
94     STATE_TRANSMIT_CLOSE_WAIT,
95
96     /**
97      * State where the socket is closed for writing
98      */
99     STATE_TRANSMIT_CLOSED,
100
101     /**
102      * State where the socket is closed on our side and waiting to be ACK'ed
103      */
104     STATE_CLOSE_WAIT,
105
106     /**
107      * State where the socket is closed
108      */
109     STATE_CLOSED 
110   };
111
112
113 /**
114  * Functions of this type are called when a message is written
115  *
116  * @param cls the closure from queue_message
117  * @param socket the socket the written message was bound to
118  */
119 typedef void (*SendFinishCallback) (void *cls,
120                                     struct GNUNET_STREAM_Socket *socket);
121
122
123 /**
124  * The send message queue
125  */
126 struct MessageQueue
127 {
128   /**
129    * The message
130    */
131   struct GNUNET_STREAM_MessageHeader *message;
132
133   /**
134    * Callback to be called when the message is sent
135    */
136   SendFinishCallback finish_cb;
137
138   /**
139    * The closure for finish_cb
140    */
141   void *finish_cb_cls;
142
143   /**
144    * The next message in queue. Should be NULL in the last message
145    */
146   struct MessageQueue *next;
147
148   /**
149    * The next message in queue. Should be NULL in the first message
150    */
151   struct MessageQueue *prev;
152 };
153
154
155 /**
156  * The STREAM Socket Handler
157  */
158 struct GNUNET_STREAM_Socket
159 {
160   /**
161    * The peer identity of the peer at the other end of the stream
162    */
163   struct GNUNET_PeerIdentity other_peer;
164
165   /**
166    * Our Peer Identity (for debugging)
167    */
168   struct GNUNET_PeerIdentity our_id;
169
170   /**
171    * Retransmission timeout
172    */
173   struct GNUNET_TIME_Relative retransmit_timeout;
174
175   /**
176    * The Acknowledgement Bitmap
177    */
178   GNUNET_STREAM_AckBitmap ack_bitmap;
179
180   /**
181    * Time when the Acknowledgement was queued
182    */
183   struct GNUNET_TIME_Absolute ack_time_registered;
184
185   /**
186    * Queued Acknowledgement deadline
187    */
188   struct GNUNET_TIME_Relative ack_time_deadline;
189
190   /**
191    * The task for sending timely Acks
192    */
193   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
194
195   /**
196    * Task scheduled to continue a read operation.
197    */
198   GNUNET_SCHEDULER_TaskIdentifier read_task;
199
200   /**
201    * The mesh handle
202    */
203   struct GNUNET_MESH_Handle *mesh;
204
205   /**
206    * The mesh tunnel handle
207    */
208   struct GNUNET_MESH_Tunnel *tunnel;
209
210   /**
211    * Stream open closure
212    */
213   void *open_cls;
214
215   /**
216    * Stream open callback
217    */
218   GNUNET_STREAM_OpenCallback open_cb;
219
220   /**
221    * The current transmit handle (if a pending transmit request exists)
222    */
223   struct GNUNET_MESH_TransmitHandle *transmit_handle;
224
225   /**
226    * The current message associated with the transmit handle
227    */
228   struct MessageQueue *queue_head;
229
230   /**
231    * The queue tail, should always point to the last message in queue
232    */
233   struct MessageQueue *queue_tail;
234
235   /**
236    * The write IO_handle associated with this socket
237    */
238   struct GNUNET_STREAM_IOWriteHandle *write_handle;
239
240   /**
241    * The read IO_handle associated with this socket
242    */
243   struct GNUNET_STREAM_IOReadHandle *read_handle;
244
245   /**
246    * Buffer for storing received messages
247    */
248   void *receive_buffer;
249
250   /**
251    * Task identifier for the read io timeout task
252    */
253   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task;
254
255   /**
256    * Task identifier for retransmission task after timeout
257    */
258   GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
259
260   /**
261    * The state of the protocol associated with this socket
262    */
263   enum State state;
264
265   /**
266    * The status of the socket
267    */
268   enum GNUNET_STREAM_Status status;
269
270   /**
271    * The number of previous timeouts; FIXME: currently not used
272    */
273   unsigned int retries;
274
275   /**
276    * Is this socket derived from listen socket?
277    */
278   unsigned int derived;
279   
280   /**
281    * The application port number (type: uint32_t)
282    */
283   GNUNET_MESH_ApplicationType app_port;
284
285   /**
286    * The session id associated with this stream connection
287    * FIXME: Not used currently, may be removed
288    */
289   uint32_t session_id;
290
291   /**
292    * Write sequence number. Set to random when sending HELLO(client) and
293    * HELLO_ACK(server) 
294    */
295   uint32_t write_sequence_number;
296
297   /**
298    * Read sequence number. This number's value is determined during handshake
299    */
300   uint32_t read_sequence_number;
301
302   /**
303    * The receiver buffer size
304    */
305   uint32_t receive_buffer_size;
306
307   /**
308    * The receiver buffer boundaries
309    */
310   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
311
312   /**
313    * receiver's available buffer after the last acknowledged packet
314    */
315   uint32_t receive_window_available;
316
317   /**
318    * The offset pointer used during write operation
319    */
320   uint32_t write_offset;
321
322   /**
323    * The offset after which we are expecting data
324    */
325   uint32_t read_offset;
326
327   /**
328    * The offset upto which user has read from the received buffer
329    */
330   uint32_t copy_offset;
331 };
332
333
334 /**
335  * A socket for listening
336  */
337 struct GNUNET_STREAM_ListenSocket
338 {
339
340   /**
341    * Our Peer's identity
342    */
343   struct GNUNET_PeerIdentity our_id;
344
345   /**
346    * The mesh handle
347    */
348   struct GNUNET_MESH_Handle *mesh;
349
350   /**
351    * The callback function which is called after successful opening socket
352    */
353   GNUNET_STREAM_ListenCallback listen_cb;
354
355   /**
356    * The call back closure
357    */
358   void *listen_cb_cls;
359
360   /**
361    * The service port
362    * FIXME: Remove if not required!
363    */
364   GNUNET_MESH_ApplicationType port;
365 };
366
367
368 /**
369  * The IO Write Handle
370  */
371 struct GNUNET_STREAM_IOWriteHandle
372 {
373   /**
374    * The packet_buffers associated with this Handle
375    */
376   struct GNUNET_STREAM_DataMessage *messages[64];
377
378   /**
379    * The write continuation callback
380    */
381   GNUNET_STREAM_CompletionContinuation write_cont;
382
383   /**
384    * Write continuation closure
385    */
386   void *write_cont_cls;
387
388   /**
389    * The bitmap of this IOHandle; Corresponding bit for a message is set when
390    * it has been acknowledged by the receiver
391    */
392   GNUNET_STREAM_AckBitmap ack_bitmap;
393
394   /**
395    * Number of bytes in this write handle
396    */
397   size_t size;
398
399   /**
400    * Number of packets sent before waiting for an ack
401    *
402    * FIXME: Do we need this?
403    */
404   unsigned int sent_packets;
405 };
406
407
408 /**
409  * The IO Read Handle
410  */
411 struct GNUNET_STREAM_IOReadHandle
412 {
413   /**
414    * Callback for the read processor
415    */
416   GNUNET_STREAM_DataProcessor proc;
417
418   /**
419    * The closure pointer for the read processor callback
420    */
421   void *proc_cls;
422 };
423
424
425 /**
426  * Default value in seconds for various timeouts
427  */
428 static unsigned int default_timeout = 10;
429
430
431 /**
432  * Callback function for sending hello message
433  *
434  * @param cls closure the socket
435  * @param size number of bytes available in buf
436  * @param buf where the callee should write the message
437  * @return number of bytes written to buf
438  */
439 static size_t
440 send_message_notify (void *cls, size_t size, void *buf)
441 {
442   struct GNUNET_STREAM_Socket *socket = cls;
443   struct MessageQueue *head;
444   size_t ret;
445
446   socket->transmit_handle = NULL; /* Remove the transmit handle */
447   head = socket->queue_head;
448   if (NULL == head)
449     return 0; /* just to be safe */
450   if (0 == size)                /* request timed out */
451     {
452       socket->retries++;
453       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
454                   "Message sending timed out. Retry %d \n",
455                   socket->retries);
456       socket->transmit_handle = 
457         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
458                                            0, /* Corking */
459                                            1, /* Priority */
460                                            /* FIXME: exponential backoff */
461                                            socket->retransmit_timeout,
462                                            &socket->other_peer,
463                                            ntohs (head->message->header.size),
464                                            &send_message_notify,
465                                            socket);
466       return 0;
467     }
468
469   ret = ntohs (head->message->header.size);
470   GNUNET_assert (size >= ret);
471   memcpy (buf, head->message, ret);
472   if (NULL != head->finish_cb)
473     {
474       head->finish_cb (head->finish_cb_cls, socket);
475     }
476   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
477                                socket->queue_tail,
478                                head);
479   GNUNET_free (head->message);
480   GNUNET_free (head);
481   head = socket->queue_head;
482   if (NULL != head)    /* more pending messages to send */
483     {
484       socket->retries = 0;
485       socket->transmit_handle = 
486         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
487                                            0, /* Corking */
488                                            1, /* Priority */
489                                            /* FIXME: exponential backoff */
490                                            socket->retransmit_timeout,
491                                            &socket->other_peer,
492                                            ntohs (head->message->header.size),
493                                            &send_message_notify,
494                                            socket);
495     }
496   return ret;
497 }
498
499
500 /**
501  * Queues a message for sending using the mesh connection of a socket
502  *
503  * @param socket the socket whose mesh connection is used
504  * @param message the message to be sent
505  * @param finish_cb the callback to be called when the message is sent
506  * @param finish_cb_cls the closure for the callback
507  */
508 static void
509 queue_message (struct GNUNET_STREAM_Socket *socket,
510                struct GNUNET_STREAM_MessageHeader *message,
511                SendFinishCallback finish_cb,
512                void *finish_cb_cls)
513 {
514   struct MessageQueue *queue_entity;
515
516   GNUNET_assert 
517     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
518      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
519
520   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
521               "%s: Queueing message of type %d and size %d\n",
522               GNUNET_i2s (&socket->our_id),
523               ntohs (message->header.type),
524               ntohs (message->header.size));
525   GNUNET_assert (NULL != message);
526   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
527   queue_entity->message = message;
528   queue_entity->finish_cb = finish_cb;
529   queue_entity->finish_cb_cls = finish_cb_cls;
530   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
531                                     socket->queue_tail,
532                                     queue_entity);
533   if (NULL == socket->transmit_handle)
534   {
535     socket->retries = 0;
536     socket->transmit_handle = 
537       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
538                                          0, /* Corking */
539                                          1, /* Priority */
540                                          socket->retransmit_timeout,
541                                          &socket->other_peer,
542                                          ntohs (message->header.size),
543                                          &send_message_notify,
544                                          socket);
545   }
546 }
547
548
549 /**
550  * Copies a message and queues it for sending using the mesh connection of
551  * given socket 
552  *
553  * @param socket the socket whose mesh connection is used
554  * @param message the message to be sent
555  * @param finish_cb the callback to be called when the message is sent
556  * @param finish_cb_cls the closure for the callback
557  */
558 static void
559 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
560                         const struct GNUNET_STREAM_MessageHeader *message,
561                         SendFinishCallback finish_cb,
562                         void *finish_cb_cls)
563 {
564   struct GNUNET_STREAM_MessageHeader *msg_copy;
565   uint16_t size;
566   
567   size = ntohs (message->header.size);
568   msg_copy = GNUNET_malloc (size);
569   memcpy (msg_copy, message, size);
570   queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
571 }
572
573
574 /**
575  * Callback function for sending ack message
576  *
577  * @param cls closure the ACK message created in ack_task
578  * @param size number of bytes available in buffer
579  * @param buf where the callee should write the message
580  * @return number of bytes written to buf
581  */
582 static size_t
583 send_ack_notify (void *cls, size_t size, void *buf)
584 {
585   struct GNUNET_STREAM_AckMessage *ack_msg = cls;
586
587   if (0 == size)
588     {
589       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
590                   "%s called with size 0\n", __func__);
591       return 0;
592     }
593   GNUNET_assert (ack_msg->header.header.size <= size);
594   
595   size = ack_msg->header.header.size;
596   memcpy (buf, ack_msg, size);
597   return size;
598 }
599
600 /**
601  * Writes data using the given socket. The amount of data written is limited by
602  * the receive_window_size
603  *
604  * @param socket the socket to use
605  */
606 static void 
607 write_data (struct GNUNET_STREAM_Socket *socket);
608
609 /**
610  * Task for retransmitting data messages if they aren't ACK before their ack
611  * deadline 
612  *
613  * @param cls the socket
614  * @param tc the Task context
615  */
616 static void
617 retransmission_timeout_task (void *cls,
618                              const struct GNUNET_SCHEDULER_TaskContext *tc)
619 {
620   struct GNUNET_STREAM_Socket *socket = cls;
621   
622   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
623     return;
624
625   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
626               "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->our_id));
627   socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
628   write_data (socket);
629 }
630
631
632 /**
633  * Task for sending ACK message
634  *
635  * @param cls the socket
636  * @param tc the Task context
637  */
638 static void
639 ack_task (void *cls,
640           const struct GNUNET_SCHEDULER_TaskContext *tc)
641 {
642   struct GNUNET_STREAM_Socket *socket = cls;
643   struct GNUNET_STREAM_AckMessage *ack_msg;
644
645   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
646     {
647       return;
648     }
649
650   socket->ack_task_id = 0;
651
652   /* Create the ACK Message */
653   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
654   ack_msg->header.header.size = htons (sizeof (struct 
655                                                GNUNET_STREAM_AckMessage));
656   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
657   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
658   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
659   ack_msg->receive_window_remaining = 
660     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
661
662   /* Request MESH for sending ACK */
663   GNUNET_MESH_notify_transmit_ready (socket->tunnel,
664                                      0, /* Corking */
665                                      1, /* Priority */
666                                      socket->retransmit_timeout,
667                                      &socket->other_peer,
668                                      ntohs (ack_msg->header.header.size),
669                                      &send_ack_notify,
670                                      ack_msg);
671
672   
673 }
674
675
676 /**
677  * Function to modify a bit in GNUNET_STREAM_AckBitmap
678  *
679  * @param bitmap the bitmap to modify
680  * @param bit the bit number to modify
681  * @param value GNUNET_YES to on, GNUNET_NO to off
682  */
683 static void
684 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
685                       unsigned int bit, 
686                       int value)
687 {
688   GNUNET_assert (bit < 64);
689   if (GNUNET_YES == value)
690     *bitmap |= (1LL << bit);
691   else
692     *bitmap &= ~(1LL << bit);
693 }
694
695
696 /**
697  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
698  *
699  * @param bitmap address of the bitmap that has to be checked
700  * @param bit the bit number to check
701  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
702  */
703 static uint8_t
704 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
705                       unsigned int bit)
706 {
707   GNUNET_assert (bit < 64);
708   return 0 != (*bitmap & (1LL << bit));
709 }
710
711
712
713 /**
714  * Function called when Data Message is sent
715  *
716  * @param cls the io_handle corresponding to the Data Message
717  * @param socket the socket which was used
718  */
719 static void
720 write_data_finish_cb (void *cls,
721                       struct GNUNET_STREAM_Socket *socket)
722 {
723   struct GNUNET_STREAM_IOWriteHandle *io_handle = cls;
724
725   io_handle->sent_packets++;
726 }
727
728
729 /**
730  * Writes data using the given socket. The amount of data written is limited by
731  * the receive_window_size
732  *
733  * @param socket the socket to use
734  */
735 static void 
736 write_data (struct GNUNET_STREAM_Socket *socket)
737 {
738   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
739   int packet;                   /* Although an int, should never be negative */
740   int ack_packet;
741
742   ack_packet = -1;
743   /* Find the last acknowledged packet */
744   for (packet=0; packet < 64; packet++)
745     {      
746       if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
747                                               packet))
748         ack_packet = packet;        
749       else if (NULL == io_handle->messages[packet])
750         break;
751     }
752   /* Resend packets which weren't ack'ed */
753   for (packet=0; packet < ack_packet; packet++)
754     {
755       if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
756                                              packet))
757         {
758           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
759                       "%s: Placing DATA message with sequence %u in send",
760                       "queue\n",
761                       GNUNET_i2s (&socket->our_id),
762                       (unsigned int) 
763                       io_handle->messages[packet]->sequence_number);
764
765           copy_and_queue_message (socket,
766                                   &io_handle->messages[packet]->header,
767                                   NULL,
768                                   NULL);
769         }
770     }
771   packet = ack_packet + 1;
772   /* Now send new packets if there is enough buffer space */
773   while ( (NULL != io_handle->messages[packet]) &&
774           (socket->receive_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) )
775     {
776       socket->receive_window_available -= 
777         ntohs (io_handle->messages[packet]->header.header.size);
778       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
779                   "%s: Placing DATA message with sequence %u in send",
780                   "queue\n",
781                   GNUNET_i2s (&socket->our_id),
782                   (unsigned int) 
783                   io_handle->messages[packet]->sequence_number);
784       copy_and_queue_message (socket,
785                               &io_handle->messages[packet]->header,
786                               &write_data_finish_cb,
787                               io_handle);
788       packet++;
789     }
790
791   GNUNET_SCHEDULER_add_delayed
792     (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5),
793      &retransmission_timeout_task,
794      socket);                                
795 }
796
797
798 /**
799  * Task for calling the read processor
800  *
801  * @param cls the socket
802  * @param tc the task context
803  */
804 static void
805 call_read_processor (void *cls,
806                           const struct GNUNET_SCHEDULER_TaskContext *tc)
807 {
808   struct GNUNET_STREAM_Socket *socket = cls;
809   size_t read_size;
810   size_t valid_read_size;
811   unsigned int packet;
812   uint32_t sequence_increase;
813   uint32_t offset_increase;
814
815   socket->read_task = GNUNET_SCHEDULER_NO_TASK;
816   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
817     return;
818
819   GNUNET_assert (NULL != socket->read_handle);
820   GNUNET_assert (NULL != socket->read_handle->proc);
821
822   /* Check the bitmap for any holes */
823   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
824     {
825       if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
826                                              packet))
827         break;
828     }
829   /* We only call read processor if we have the first packet */
830   GNUNET_assert (0 < packet);
831
832   valid_read_size = 
833     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
834
835   GNUNET_assert (0 != valid_read_size);
836
837   /* Cancel the read_io_timeout_task */
838   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task);
839   socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
840
841   /* Call the data processor */
842   read_size = 
843     socket->read_handle->proc (socket->read_handle->proc_cls,
844                                socket->status,
845                                socket->receive_buffer + socket->copy_offset,
846                                valid_read_size);
847   /* Free the read handle */
848   GNUNET_free (socket->read_handle);
849   socket->read_handle = NULL;
850
851   GNUNET_assert (read_size <= valid_read_size);
852   socket->copy_offset += read_size;
853
854   /* Determine upto which packet we can remove from the buffer */
855   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
856     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
857       break;
858
859   /* If no packets can be removed we can't move the buffer */
860   if (0 == packet) return;
861
862   sequence_increase = packet;
863
864   /* Shift the data in the receive buffer */
865   memmove (socket->receive_buffer,
866            socket->receive_buffer 
867            + socket->receive_buffer_boundaries[sequence_increase-1],
868            socket->receive_buffer_size - socket->receive_buffer_boundaries[sequence_increase-1]);
869   
870   /* Shift the bitmap */
871   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
872   
873   /* Set read_sequence_number */
874   socket->read_sequence_number += sequence_increase;
875   
876   /* Set read_offset */
877   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
878   socket->read_offset += offset_increase;
879
880   /* Fix copy_offset */
881   GNUNET_assert (offset_increase <= socket->copy_offset);
882   socket->copy_offset -= offset_increase;
883   
884   /* Fix relative boundaries */
885   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
886     {
887       if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
888         {
889           socket->receive_buffer_boundaries[packet] = 
890             socket->receive_buffer_boundaries[packet + sequence_increase] 
891             - offset_increase;
892         }
893       else
894         socket->receive_buffer_boundaries[packet] = 0;
895     }
896 }
897
898
899 /**
900  * Cancels the existing read io handle
901  *
902  * @param cls the closure from the SCHEDULER call
903  * @param tc the task context
904  */
905 static void
906 read_io_timeout (void *cls, 
907                 const struct GNUNET_SCHEDULER_TaskContext *tc)
908 {
909   struct GNUNET_STREAM_Socket *socket = cls;
910
911   socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
912   if (socket->read_task != GNUNET_SCHEDULER_NO_TASK)
913   {
914     GNUNET_SCHEDULER_cancel (socket->read_task);
915     socket->read_task = GNUNET_SCHEDULER_NO_TASK;
916   }
917   GNUNET_assert (NULL != socket->read_handle);
918   
919   GNUNET_free (socket->read_handle);
920   socket->read_handle = NULL;
921 }
922
923
924 /**
925  * Handler for DATA messages; Same for both client and server
926  *
927  * @param socket the socket through which the ack was received
928  * @param tunnel connection to the other end
929  * @param sender who sent the message
930  * @param msg the data message
931  * @param atsi performance data for the connection
932  * @return GNUNET_OK to keep the connection open,
933  *         GNUNET_SYSERR to close it (signal serious error)
934  */
935 static int
936 handle_data (struct GNUNET_STREAM_Socket *socket,
937              struct GNUNET_MESH_Tunnel *tunnel,
938              const struct GNUNET_PeerIdentity *sender,
939              const struct GNUNET_STREAM_DataMessage *msg,
940              const struct GNUNET_ATS_Information*atsi)
941 {
942   const void *payload;
943   uint32_t bytes_needed;
944   uint32_t relative_offset;
945   uint32_t relative_sequence_number;
946   uint16_t size;
947
948   size = htons (msg->header.header.size);
949   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
950     {
951       GNUNET_break_op (0);
952       return GNUNET_SYSERR;
953     }
954
955   switch (socket->state)
956     {
957     case STATE_ESTABLISHED:
958     case STATE_TRANSMIT_CLOSED:
959     case STATE_TRANSMIT_CLOSE_WAIT:
960
961       /* check if the message's sequence number is in the range we are
962          expecting */
963       relative_sequence_number = 
964         ntohl (msg->sequence_number) - socket->read_sequence_number;
965       if ( relative_sequence_number > 64)
966         {
967           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
968                       "%s: Ignoring received message with sequence number %u\n",
969                       GNUNET_i2s (&socket->our_id),
970                       ntohl (msg->sequence_number));
971           return GNUNET_YES;
972         }
973
974       /* Check if we have to allocate the buffer */
975       size -= sizeof (struct GNUNET_STREAM_DataMessage);
976       relative_offset = ntohl (msg->offset) - socket->read_offset;
977       bytes_needed = relative_offset + size;
978       
979       if (bytes_needed > socket->receive_buffer_size)
980         {
981           if (bytes_needed <= RECEIVE_BUFFER_SIZE)
982             {
983               socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
984                                                        bytes_needed);
985               socket->receive_buffer_size = bytes_needed;
986             }
987           else
988             {
989               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
990                           "Cannot accommodate packet %d as buffer is full\n",
991                           ntohl (msg->sequence_number));
992               return GNUNET_YES;
993             }
994         }
995       
996       /* Copy Data to buffer */
997       payload = &msg[1];
998       GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
999       memcpy (socket->receive_buffer + relative_offset,
1000               payload,
1001               size);
1002       socket->receive_buffer_boundaries[relative_sequence_number] = 
1003         relative_offset + size;
1004       
1005       /* Modify the ACK bitmap */
1006       ackbitmap_modify_bit (&socket->ack_bitmap,
1007                             relative_sequence_number,
1008                             GNUNET_YES);
1009
1010       /* Start ACK sending task if one is not already present */
1011       if (0 == socket->ack_task_id)
1012        {
1013          socket->ack_task_id = 
1014            GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1015                                          (msg->ack_deadline),
1016                                          &ack_task,
1017                                          socket);
1018        }
1019
1020       if ((NULL != socket->read_handle) /* A read handle is waiting */
1021           /* There is no current read task */
1022           && (GNUNET_SCHEDULER_NO_TASK == socket->read_task)
1023           /* We have the first packet */
1024           && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1025                                                  0)))
1026         {
1027           socket->read_task = 
1028             GNUNET_SCHEDULER_add_now (&call_read_processor,
1029                                       socket);
1030         }
1031       
1032       break;
1033
1034     default:
1035       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1036                   "%s: Received data message when it cannot be handled\n",
1037                   GNUNET_i2s (&socket->our_id));
1038       break;
1039     }
1040   return GNUNET_YES;
1041 }
1042
1043 /**
1044  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1045  *
1046  * @param cls the socket (set from GNUNET_MESH_connect)
1047  * @param tunnel connection to the other end
1048  * @param tunnel_ctx place to store local state associated with the tunnel
1049  * @param sender who sent the message
1050  * @param message the actual message
1051  * @param atsi performance data for the connection
1052  * @return GNUNET_OK to keep the connection open,
1053  *         GNUNET_SYSERR to close it (signal serious error)
1054  */
1055 static int
1056 client_handle_data (void *cls,
1057              struct GNUNET_MESH_Tunnel *tunnel,
1058              void **tunnel_ctx,
1059              const struct GNUNET_PeerIdentity *sender,
1060              const struct GNUNET_MessageHeader *message,
1061              const struct GNUNET_ATS_Information*atsi)
1062 {
1063   struct GNUNET_STREAM_Socket *socket = cls;
1064
1065   return handle_data (socket, 
1066                       tunnel, 
1067                       sender, 
1068                       (const struct GNUNET_STREAM_DataMessage *) message, 
1069                       atsi);
1070 }
1071
1072
1073 /**
1074  * Callback to set state to ESTABLISHED
1075  *
1076  * @param cls the closure from queue_message FIXME: document
1077  * @param socket the socket to requiring state change
1078  */
1079 static void
1080 set_state_established (void *cls,
1081                        struct GNUNET_STREAM_Socket *socket)
1082 {
1083   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1084               "%s: Attaining ESTABLISHED state\n",
1085               GNUNET_i2s (&socket->our_id));
1086   socket->write_offset = 0;
1087   socket->read_offset = 0;
1088   socket->state = STATE_ESTABLISHED;
1089   if (socket->open_cb)
1090     socket->open_cb (socket->open_cls, socket);
1091 }
1092
1093
1094 /**
1095  * Callback to set state to HELLO_WAIT
1096  *
1097  * @param cls the closure from queue_message
1098  * @param socket the socket to requiring state change
1099  */
1100 static void
1101 set_state_hello_wait (void *cls,
1102                       struct GNUNET_STREAM_Socket *socket)
1103 {
1104   GNUNET_assert (STATE_INIT == socket->state);
1105   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1106               "%s: Attaining HELLO_WAIT state\n",
1107               GNUNET_i2s (&socket->our_id));
1108   socket->state = STATE_HELLO_WAIT;
1109 }
1110
1111
1112 /**
1113  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1114  *
1115  * @param cls the socket (set from GNUNET_MESH_connect)
1116  * @param tunnel connection to the other end
1117  * @param tunnel_ctx this is NULL
1118  * @param sender who sent the message
1119  * @param message the actual message
1120  * @param atsi performance data for the connection
1121  * @return GNUNET_OK to keep the connection open,
1122  *         GNUNET_SYSERR to close it (signal serious error)
1123  */
1124 static int
1125 client_handle_hello_ack (void *cls,
1126                          struct GNUNET_MESH_Tunnel *tunnel,
1127                          void **tunnel_ctx,
1128                          const struct GNUNET_PeerIdentity *sender,
1129                          const struct GNUNET_MessageHeader *message,
1130                          const struct GNUNET_ATS_Information*atsi)
1131 {
1132   struct GNUNET_STREAM_Socket *socket = cls;
1133   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1134   struct GNUNET_STREAM_HelloAckMessage *reply;
1135
1136   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK == 
1137                  ntohs (message->type));
1138   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1139   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1140               "%s: Received HELLO_ACK from %s\n",
1141               GNUNET_i2s (&socket->our_id),
1142               GNUNET_i2s (sender));
1143
1144   GNUNET_assert (socket->tunnel == tunnel);
1145   switch (socket->state)
1146   {
1147   case STATE_HELLO_WAIT:
1148     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1149     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1150                 "%s: Read sequence number %u\n",
1151                 GNUNET_i2s (&socket->our_id),
1152                 (unsigned int) socket->read_sequence_number);
1153     socket->receive_window_available = ntohl (ack_msg->receive_window_size);
1154     /* Get the random sequence number */
1155     socket->write_sequence_number = 
1156       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1157       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1158                   "%s: Generated write sequence number %u\n",
1159                   GNUNET_i2s (&socket->our_id),
1160                   (unsigned int) socket->write_sequence_number);
1161     reply = 
1162       GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1163     reply->header.header.size = 
1164       htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1165     reply->header.header.type = 
1166       htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1167     reply->sequence_number = htonl (socket->write_sequence_number);
1168     reply->receive_window_size = htonl (RECEIVE_BUFFER_SIZE);
1169     queue_message (socket, 
1170                    &reply->header, 
1171                    &set_state_established, 
1172                    NULL);      
1173     return GNUNET_OK;
1174   case STATE_ESTABLISHED:
1175   case STATE_RECEIVE_CLOSE_WAIT:
1176     // call statistics (# ACKs ignored++)
1177     return GNUNET_OK;
1178   case STATE_INIT:
1179   default:
1180     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1181                 "%s: Server sent HELLO_ACK when in state %d\n", 
1182                 GNUNET_i2s (&socket->our_id),
1183                 socket->state);
1184     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1185     return GNUNET_SYSERR;
1186   }
1187
1188 }
1189
1190
1191 /**
1192  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1193  *
1194  * @param cls the socket (set from GNUNET_MESH_connect)
1195  * @param tunnel connection to the other end
1196  * @param tunnel_ctx this is NULL
1197  * @param sender who sent the message
1198  * @param message the actual message
1199  * @param atsi performance data for the connection
1200  * @return GNUNET_OK to keep the connection open,
1201  *         GNUNET_SYSERR to close it (signal serious error)
1202  */
1203 static int
1204 client_handle_reset (void *cls,
1205                      struct GNUNET_MESH_Tunnel *tunnel,
1206                      void **tunnel_ctx,
1207                      const struct GNUNET_PeerIdentity *sender,
1208                      const struct GNUNET_MessageHeader *message,
1209                      const struct GNUNET_ATS_Information*atsi)
1210 {
1211   struct GNUNET_STREAM_Socket *socket = cls;
1212
1213   return GNUNET_OK;
1214 }
1215
1216
1217 /**
1218  * Common message handler for handling TRANSMIT_CLOSE messages
1219  *
1220  * @param socket the socket through which the ack was received
1221  * @param tunnel connection to the other end
1222  * @param sender who sent the message
1223  * @param msg the transmit close message
1224  * @param atsi performance data for the connection
1225  * @return GNUNET_OK to keep the connection open,
1226  *         GNUNET_SYSERR to close it (signal serious error)
1227  */
1228 static int
1229 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1230                        struct GNUNET_MESH_Tunnel *tunnel,
1231                        const struct GNUNET_PeerIdentity *sender,
1232                        const struct GNUNET_STREAM_MessageHeader *msg,
1233                        const struct GNUNET_ATS_Information*atsi)
1234 {
1235   struct GNUNET_STREAM_MessageHeader *reply;
1236
1237   switch (socket->state)
1238     {
1239     case STATE_ESTABLISHED:
1240       socket->state = STATE_RECEIVE_CLOSED;
1241
1242       /* Send TRANSMIT_CLOSE_ACK */
1243       reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1244       reply->header.type = 
1245         htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1246       reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1247       queue_message (socket, reply, NULL, NULL);
1248       break;
1249
1250     default:
1251       /* FIXME: Call statistics? */
1252       break;
1253     }
1254   return GNUNET_YES;
1255 }
1256
1257
1258 /**
1259  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1260  *
1261  * @param cls the socket (set from GNUNET_MESH_connect)
1262  * @param tunnel connection to the other end
1263  * @param tunnel_ctx this is NULL
1264  * @param sender who sent the message
1265  * @param message the actual message
1266  * @param atsi performance data for the connection
1267  * @return GNUNET_OK to keep the connection open,
1268  *         GNUNET_SYSERR to close it (signal serious error)
1269  */
1270 static int
1271 client_handle_transmit_close (void *cls,
1272                               struct GNUNET_MESH_Tunnel *tunnel,
1273                               void **tunnel_ctx,
1274                               const struct GNUNET_PeerIdentity *sender,
1275                               const struct GNUNET_MessageHeader *message,
1276                               const struct GNUNET_ATS_Information*atsi)
1277 {
1278   struct GNUNET_STREAM_Socket *socket = cls;
1279   
1280   return handle_transmit_close (socket,
1281                                 tunnel,
1282                                 sender,
1283                                 (struct GNUNET_STREAM_MessageHeader *)message,
1284                                 atsi);
1285 }
1286
1287
1288 /**
1289  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1290  *
1291  * @param cls the socket (set from GNUNET_MESH_connect)
1292  * @param tunnel connection to the other end
1293  * @param tunnel_ctx this is NULL
1294  * @param sender who sent the message
1295  * @param message the actual message
1296  * @param atsi performance data for the connection
1297  * @return GNUNET_OK to keep the connection open,
1298  *         GNUNET_SYSERR to close it (signal serious error)
1299  */
1300 static int
1301 client_handle_transmit_close_ack (void *cls,
1302                                   struct GNUNET_MESH_Tunnel *tunnel,
1303                                   void **tunnel_ctx,
1304                                   const struct GNUNET_PeerIdentity *sender,
1305                                   const struct GNUNET_MessageHeader *message,
1306                                   const struct GNUNET_ATS_Information*atsi)
1307 {
1308   struct GNUNET_STREAM_Socket *socket = cls;
1309
1310   return GNUNET_OK;
1311 }
1312
1313
1314 /**
1315  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1316  *
1317  * @param cls the socket (set from GNUNET_MESH_connect)
1318  * @param tunnel connection to the other end
1319  * @param tunnel_ctx this is NULL
1320  * @param sender who sent the message
1321  * @param message the actual message
1322  * @param atsi performance data for the connection
1323  * @return GNUNET_OK to keep the connection open,
1324  *         GNUNET_SYSERR to close it (signal serious error)
1325  */
1326 static int
1327 client_handle_receive_close (void *cls,
1328                              struct GNUNET_MESH_Tunnel *tunnel,
1329                              void **tunnel_ctx,
1330                              const struct GNUNET_PeerIdentity *sender,
1331                              const struct GNUNET_MessageHeader *message,
1332                              const struct GNUNET_ATS_Information*atsi)
1333 {
1334   struct GNUNET_STREAM_Socket *socket = cls;
1335
1336   return GNUNET_OK;
1337 }
1338
1339
1340 /**
1341  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1342  *
1343  * @param cls the socket (set from GNUNET_MESH_connect)
1344  * @param tunnel connection to the other end
1345  * @param tunnel_ctx this is NULL
1346  * @param sender who sent the message
1347  * @param message the actual message
1348  * @param atsi performance data for the connection
1349  * @return GNUNET_OK to keep the connection open,
1350  *         GNUNET_SYSERR to close it (signal serious error)
1351  */
1352 static int
1353 client_handle_receive_close_ack (void *cls,
1354                                  struct GNUNET_MESH_Tunnel *tunnel,
1355                                  void **tunnel_ctx,
1356                                  const struct GNUNET_PeerIdentity *sender,
1357                                  const struct GNUNET_MessageHeader *message,
1358                                  const struct GNUNET_ATS_Information*atsi)
1359 {
1360   struct GNUNET_STREAM_Socket *socket = cls;
1361
1362   return GNUNET_OK;
1363 }
1364
1365
1366 /**
1367  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1368  *
1369  * @param cls the socket (set from GNUNET_MESH_connect)
1370  * @param tunnel connection to the other end
1371  * @param tunnel_ctx this is NULL
1372  * @param sender who sent the message
1373  * @param message the actual message
1374  * @param atsi performance data for the connection
1375  * @return GNUNET_OK to keep the connection open,
1376  *         GNUNET_SYSERR to close it (signal serious error)
1377  */
1378 static int
1379 client_handle_close (void *cls,
1380                      struct GNUNET_MESH_Tunnel *tunnel,
1381                      void **tunnel_ctx,
1382                      const struct GNUNET_PeerIdentity *sender,
1383                      const struct GNUNET_MessageHeader *message,
1384                      const struct GNUNET_ATS_Information*atsi)
1385 {
1386   struct GNUNET_STREAM_Socket *socket = cls;
1387
1388   return GNUNET_OK;
1389 }
1390
1391
1392 /**
1393  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1394  *
1395  * @param cls the socket (set from GNUNET_MESH_connect)
1396  * @param tunnel connection to the other end
1397  * @param tunnel_ctx this is NULL
1398  * @param sender who sent the message
1399  * @param message the actual message
1400  * @param atsi performance data for the connection
1401  * @return GNUNET_OK to keep the connection open,
1402  *         GNUNET_SYSERR to close it (signal serious error)
1403  */
1404 static int
1405 client_handle_close_ack (void *cls,
1406                          struct GNUNET_MESH_Tunnel *tunnel,
1407                          void **tunnel_ctx,
1408                          const struct GNUNET_PeerIdentity *sender,
1409                          const struct GNUNET_MessageHeader *message,
1410                          const struct GNUNET_ATS_Information*atsi)
1411 {
1412   struct GNUNET_STREAM_Socket *socket = cls;
1413
1414   return GNUNET_OK;
1415 }
1416
1417 /*****************************/
1418 /* Server's Message Handlers */
1419 /*****************************/
1420
1421 /**
1422  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1423  *
1424  * @param cls the closure
1425  * @param tunnel connection to the other end
1426  * @param tunnel_ctx the socket
1427  * @param sender who sent the message
1428  * @param message the actual message
1429  * @param atsi performance data for the connection
1430  * @return GNUNET_OK to keep the connection open,
1431  *         GNUNET_SYSERR to close it (signal serious error)
1432  */
1433 static int
1434 server_handle_data (void *cls,
1435                     struct GNUNET_MESH_Tunnel *tunnel,
1436                     void **tunnel_ctx,
1437                     const struct GNUNET_PeerIdentity *sender,
1438                     const struct GNUNET_MessageHeader *message,
1439                     const struct GNUNET_ATS_Information*atsi)
1440 {
1441   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1442
1443   return handle_data (socket,
1444                       tunnel,
1445                       sender,
1446                       (const struct GNUNET_STREAM_DataMessage *)message,
1447                       atsi);
1448 }
1449
1450
1451 /**
1452  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
1453  *
1454  * @param cls the closure
1455  * @param tunnel connection to the other end
1456  * @param tunnel_ctx the socket
1457  * @param sender who sent the message
1458  * @param message the actual message
1459  * @param atsi performance data for the connection
1460  * @return GNUNET_OK to keep the connection open,
1461  *         GNUNET_SYSERR to close it (signal serious error)
1462  */
1463 static int
1464 server_handle_hello (void *cls,
1465                      struct GNUNET_MESH_Tunnel *tunnel,
1466                      void **tunnel_ctx,
1467                      const struct GNUNET_PeerIdentity *sender,
1468                      const struct GNUNET_MessageHeader *message,
1469                      const struct GNUNET_ATS_Information*atsi)
1470 {
1471   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1472   struct GNUNET_STREAM_HelloAckMessage *reply;
1473
1474   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == 
1475                  ntohs (message->type));
1476   GNUNET_assert (socket->tunnel == tunnel);
1477   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1478               "%s: Received HELLO from %s\n", 
1479               GNUNET_i2s (&socket->our_id),
1480               GNUNET_i2s(sender));
1481
1482   /* Catch possible protocol breaks */
1483   GNUNET_break_op (0 == memcmp (&socket->other_peer, 
1484                                 sender,
1485                                 sizeof (struct GNUNET_PeerIdentity)));
1486
1487   if (STATE_INIT == socket->state)
1488     {
1489       /* Get the random sequence number */
1490       socket->write_sequence_number = 
1491         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1492       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1493                   "%s: Generated write sequence number %u\n",
1494                   GNUNET_i2s (&socket->our_id),
1495                   (unsigned int) socket->write_sequence_number);
1496       reply = 
1497         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1498       reply->header.header.size = 
1499         htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1500       reply->header.header.type = 
1501         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1502       reply->sequence_number = htonl (socket->write_sequence_number);
1503       queue_message (socket, 
1504                      &reply->header,
1505                      &set_state_hello_wait, 
1506                      NULL);
1507     }
1508   else
1509     {
1510       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1511                   "Client sent HELLO when in state %d\n", socket->state);
1512       /* FIXME: Send RESET? */
1513       
1514     }
1515   return GNUNET_OK;
1516 }
1517
1518
1519 /**
1520  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1521  *
1522  * @param cls the closure
1523  * @param tunnel connection to the other end
1524  * @param tunnel_ctx the socket
1525  * @param sender who sent the message
1526  * @param message the actual message
1527  * @param atsi performance data for the connection
1528  * @return GNUNET_OK to keep the connection open,
1529  *         GNUNET_SYSERR to close it (signal serious error)
1530  */
1531 static int
1532 server_handle_hello_ack (void *cls,
1533                          struct GNUNET_MESH_Tunnel *tunnel,
1534                          void **tunnel_ctx,
1535                          const struct GNUNET_PeerIdentity *sender,
1536                          const struct GNUNET_MessageHeader *message,
1537                          const struct GNUNET_ATS_Information*atsi)
1538 {
1539   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1540   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
1541
1542   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
1543                  ntohs (message->type));
1544   GNUNET_assert (socket->tunnel == tunnel);
1545   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
1546   if (STATE_HELLO_WAIT == socket->state)
1547     {
1548       socket->read_sequence_number = ntohl (ack_message->sequence_number);
1549       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1550                   "%s: Read sequence number %u\n",
1551                   GNUNET_i2s (&socket->our_id),
1552                   (unsigned int) socket->read_sequence_number);
1553       socket->receive_window_available = 
1554         ntohl (ack_message->receive_window_size);
1555       /* Attain ESTABLISHED state */
1556       set_state_established (NULL, socket);
1557     }
1558   else
1559     {
1560       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1561                   "Client sent HELLO_ACK when in state %d\n", socket->state);
1562       /* FIXME: Send RESET? */
1563       
1564     }
1565   return GNUNET_OK;
1566 }
1567
1568
1569 /**
1570  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1571  *
1572  * @param cls the closure
1573  * @param tunnel connection to the other end
1574  * @param tunnel_ctx the socket
1575  * @param sender who sent the message
1576  * @param message the actual message
1577  * @param atsi performance data for the connection
1578  * @return GNUNET_OK to keep the connection open,
1579  *         GNUNET_SYSERR to close it (signal serious error)
1580  */
1581 static int
1582 server_handle_reset (void *cls,
1583                      struct GNUNET_MESH_Tunnel *tunnel,
1584                      void **tunnel_ctx,
1585                      const struct GNUNET_PeerIdentity *sender,
1586                      const struct GNUNET_MessageHeader *message,
1587                      const struct GNUNET_ATS_Information*atsi)
1588 {
1589   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1590
1591   return GNUNET_OK;
1592 }
1593
1594
1595 /**
1596  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1597  *
1598  * @param cls the closure
1599  * @param tunnel connection to the other end
1600  * @param tunnel_ctx the socket
1601  * @param sender who sent the message
1602  * @param message the actual message
1603  * @param atsi performance data for the connection
1604  * @return GNUNET_OK to keep the connection open,
1605  *         GNUNET_SYSERR to close it (signal serious error)
1606  */
1607 static int
1608 server_handle_transmit_close (void *cls,
1609                               struct GNUNET_MESH_Tunnel *tunnel,
1610                               void **tunnel_ctx,
1611                               const struct GNUNET_PeerIdentity *sender,
1612                               const struct GNUNET_MessageHeader *message,
1613                               const struct GNUNET_ATS_Information*atsi)
1614 {
1615   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1616
1617   return handle_transmit_close (socket,
1618                                 tunnel,
1619                                 sender,
1620                                 (struct GNUNET_STREAM_MessageHeader *)message,
1621                                 atsi);
1622 }
1623
1624
1625 /**
1626  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1627  *
1628  * @param cls the closure
1629  * @param tunnel connection to the other end
1630  * @param tunnel_ctx the socket
1631  * @param sender who sent the message
1632  * @param message the actual message
1633  * @param atsi performance data for the connection
1634  * @return GNUNET_OK to keep the connection open,
1635  *         GNUNET_SYSERR to close it (signal serious error)
1636  */
1637 static int
1638 server_handle_transmit_close_ack (void *cls,
1639                                   struct GNUNET_MESH_Tunnel *tunnel,
1640                                   void **tunnel_ctx,
1641                                   const struct GNUNET_PeerIdentity *sender,
1642                                   const struct GNUNET_MessageHeader *message,
1643                                   const struct GNUNET_ATS_Information*atsi)
1644 {
1645   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1646
1647   return GNUNET_OK;
1648 }
1649
1650
1651 /**
1652  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1653  *
1654  * @param cls the closure
1655  * @param tunnel connection to the other end
1656  * @param tunnel_ctx the socket
1657  * @param sender who sent the message
1658  * @param message the actual message
1659  * @param atsi performance data for the connection
1660  * @return GNUNET_OK to keep the connection open,
1661  *         GNUNET_SYSERR to close it (signal serious error)
1662  */
1663 static int
1664 server_handle_receive_close (void *cls,
1665                              struct GNUNET_MESH_Tunnel *tunnel,
1666                              void **tunnel_ctx,
1667                              const struct GNUNET_PeerIdentity *sender,
1668                              const struct GNUNET_MessageHeader *message,
1669                              const struct GNUNET_ATS_Information*atsi)
1670 {
1671   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1672
1673   return GNUNET_OK;
1674 }
1675
1676
1677 /**
1678  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1679  *
1680  * @param cls the closure
1681  * @param tunnel connection to the other end
1682  * @param tunnel_ctx the socket
1683  * @param sender who sent the message
1684  * @param message the actual message
1685  * @param atsi performance data for the connection
1686  * @return GNUNET_OK to keep the connection open,
1687  *         GNUNET_SYSERR to close it (signal serious error)
1688  */
1689 static int
1690 server_handle_receive_close_ack (void *cls,
1691                                  struct GNUNET_MESH_Tunnel *tunnel,
1692                                  void **tunnel_ctx,
1693                                  const struct GNUNET_PeerIdentity *sender,
1694                                  const struct GNUNET_MessageHeader *message,
1695                                  const struct GNUNET_ATS_Information*atsi)
1696 {
1697   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1698
1699   return GNUNET_OK;
1700 }
1701
1702
1703 /**
1704  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1705  *
1706  * @param cls the closure
1707  * @param tunnel connection to the other end
1708  * @param tunnel_ctx the socket
1709  * @param sender who sent the message
1710  * @param message the actual message
1711  * @param atsi performance data for the connection
1712  * @return GNUNET_OK to keep the connection open,
1713  *         GNUNET_SYSERR to close it (signal serious error)
1714  */
1715 static int
1716 server_handle_close (void *cls,
1717                      struct GNUNET_MESH_Tunnel *tunnel,
1718                      void **tunnel_ctx,
1719                      const struct GNUNET_PeerIdentity *sender,
1720                      const struct GNUNET_MessageHeader *message,
1721                      const struct GNUNET_ATS_Information*atsi)
1722 {
1723   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1724
1725   return GNUNET_OK;
1726 }
1727
1728
1729 /**
1730  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1731  *
1732  * @param cls the closure
1733  * @param tunnel connection to the other end
1734  * @param tunnel_ctx the socket
1735  * @param sender who sent the message
1736  * @param message the actual message
1737  * @param atsi performance data for the connection
1738  * @return GNUNET_OK to keep the connection open,
1739  *         GNUNET_SYSERR to close it (signal serious error)
1740  */
1741 static int
1742 server_handle_close_ack (void *cls,
1743                          struct GNUNET_MESH_Tunnel *tunnel,
1744                          void **tunnel_ctx,
1745                          const struct GNUNET_PeerIdentity *sender,
1746                          const struct GNUNET_MessageHeader *message,
1747                          const struct GNUNET_ATS_Information*atsi)
1748 {
1749   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1750
1751   return GNUNET_OK;
1752 }
1753
1754
1755 /**
1756  * Message Handler for mesh
1757  *
1758  * @param socket the socket through which the ack was received
1759  * @param tunnel connection to the other end
1760  * @param sender who sent the message
1761  * @param ack the acknowledgment message
1762  * @param atsi performance data for the connection
1763  * @return GNUNET_OK to keep the connection open,
1764  *         GNUNET_SYSERR to close it (signal serious error)
1765  */
1766 static int
1767 handle_ack (struct GNUNET_STREAM_Socket *socket,
1768             struct GNUNET_MESH_Tunnel *tunnel,
1769             const struct GNUNET_PeerIdentity *sender,
1770             const struct GNUNET_STREAM_AckMessage *ack,
1771             const struct GNUNET_ATS_Information*atsi)
1772 {
1773   unsigned int packet;
1774   int need_retransmission;
1775
1776   switch (socket->state)
1777     {
1778     case (STATE_ESTABLISHED):
1779       if (NULL == socket->write_handle)
1780         {
1781           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1782                       "Received DATA_ACK when write_handle is NULL\n");
1783           return GNUNET_OK;
1784         }
1785       
1786       if (!((socket->write_sequence_number 
1787              - htonl (ack->base_sequence_number)) < 64))
1788         {
1789           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1790                       "%s: Received DATA_ACK with unexpected base sequence",
1791                       "number\n",
1792                       GNUNET_i2s (&socket->our_id));
1793           return GNUNET_OK;
1794         }
1795       /* FIXME: include the case when write_handle is cancelled - ignore the 
1796          acks */
1797       
1798       /* Cancel the retransmission task */
1799       if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
1800         {
1801           GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
1802           socket->retransmission_timeout_task_id = 
1803             GNUNET_SCHEDULER_NO_TASK;
1804         }
1805          
1806       socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap);
1807       socket->receive_window_available = 
1808         ntohl (ack->receive_window_remaining);
1809
1810       /* Check if we have received all acknowledgements */
1811       need_retransmission = GNUNET_NO;
1812       for (packet=0; packet < 64; packet++)
1813         {
1814           if (NULL == socket->write_handle->messages[packet]) break;
1815           if (GNUNET_YES != ackbitmap_is_bit_set 
1816               (&socket->write_handle->ack_bitmap,packet))
1817             {
1818               need_retransmission = GNUNET_YES;
1819               break;
1820             }
1821         }
1822       if (GNUNET_YES == need_retransmission)
1823         {
1824           write_data (socket);
1825         }
1826       else      /* We have to call the write continuation callback now */
1827         {
1828
1829           /* Free the packets */
1830           for (packet=0; packet < 64; packet++)
1831             {
1832               GNUNET_free_non_null (socket->write_handle->messages[packet]);
1833             }
1834           if (NULL != socket->write_handle->write_cont)
1835             socket->write_handle->write_cont
1836               (socket->write_handle->write_cont_cls,
1837                socket->status,
1838                socket->write_handle->size);
1839           /* We are done with the write handle - Freeing it */
1840           GNUNET_free (socket->write_handle);
1841           socket->write_handle = NULL;
1842         }
1843       break;
1844     default:
1845       break;
1846     }
1847   return GNUNET_OK;
1848 }
1849
1850
1851 /**
1852  * Message Handler for mesh
1853  *
1854  * @param cls the 'struct GNUNET_STREAM_Socket'
1855  * @param tunnel connection to the other end
1856  * @param tunnel_ctx unused
1857  * @param sender who sent the message
1858  * @param message the actual message
1859  * @param atsi performance data for the connection
1860  * @return GNUNET_OK to keep the connection open,
1861  *         GNUNET_SYSERR to close it (signal serious error)
1862  */
1863 static int
1864 client_handle_ack (void *cls,
1865                    struct GNUNET_MESH_Tunnel *tunnel,
1866                    void **tunnel_ctx,
1867                    const struct GNUNET_PeerIdentity *sender,
1868                    const struct GNUNET_MessageHeader *message,
1869                    const struct GNUNET_ATS_Information*atsi)
1870 {
1871   struct GNUNET_STREAM_Socket *socket = cls;
1872   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1873  
1874   return handle_ack (socket, tunnel, sender, ack, atsi);
1875 }
1876
1877
1878 /**
1879  * Message Handler for mesh
1880  *
1881  * @param cls the server's listen socket
1882  * @param tunnel connection to the other end
1883  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
1884  * @param sender who sent the message
1885  * @param message the actual message
1886  * @param atsi performance data for the connection
1887  * @return GNUNET_OK to keep the connection open,
1888  *         GNUNET_SYSERR to close it (signal serious error)
1889  */
1890 static int
1891 server_handle_ack (void *cls,
1892                    struct GNUNET_MESH_Tunnel *tunnel,
1893                    void **tunnel_ctx,
1894                    const struct GNUNET_PeerIdentity *sender,
1895                    const struct GNUNET_MessageHeader *message,
1896                    const struct GNUNET_ATS_Information*atsi)
1897 {
1898   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1899   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1900  
1901   return handle_ack (socket, tunnel, sender, ack, atsi);
1902 }
1903
1904
1905 /**
1906  * For client message handlers, the stream socket is in the
1907  * closure argument.
1908  */
1909 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
1910   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1911   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
1912    sizeof (struct GNUNET_STREAM_AckMessage) },
1913   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1914    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1915   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1916    sizeof (struct GNUNET_STREAM_MessageHeader)},
1917   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1918    sizeof (struct GNUNET_STREAM_MessageHeader)},
1919   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1920    sizeof (struct GNUNET_STREAM_MessageHeader)},
1921   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1922    sizeof (struct GNUNET_STREAM_MessageHeader)},
1923   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1924    sizeof (struct GNUNET_STREAM_MessageHeader)},
1925   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1926    sizeof (struct GNUNET_STREAM_MessageHeader)},
1927   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1928    sizeof (struct GNUNET_STREAM_MessageHeader)},
1929   {NULL, 0, 0}
1930 };
1931
1932
1933 /**
1934  * For server message handlers, the stream socket is in the
1935  * tunnel context, and the listen socket in the closure argument.
1936  */
1937 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
1938   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1939   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
1940    sizeof (struct GNUNET_STREAM_AckMessage) },
1941   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
1942    sizeof (struct GNUNET_STREAM_MessageHeader)},
1943   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1944    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1945   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1946    sizeof (struct GNUNET_STREAM_MessageHeader)},
1947   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1948    sizeof (struct GNUNET_STREAM_MessageHeader)},
1949   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1950    sizeof (struct GNUNET_STREAM_MessageHeader)},
1951   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1952    sizeof (struct GNUNET_STREAM_MessageHeader)},
1953   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1954    sizeof (struct GNUNET_STREAM_MessageHeader)},
1955   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1956    sizeof (struct GNUNET_STREAM_MessageHeader)},
1957   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1958    sizeof (struct GNUNET_STREAM_MessageHeader)},
1959   {NULL, 0, 0}
1960 };
1961
1962
1963 /**
1964  * Function called when our target peer is connected to our tunnel
1965  *
1966  * @param cls the socket for which this tunnel is created
1967  * @param peer the peer identity of the target
1968  * @param atsi performance data for the connection
1969  */
1970 static void
1971 mesh_peer_connect_callback (void *cls,
1972                             const struct GNUNET_PeerIdentity *peer,
1973                             const struct GNUNET_ATS_Information * atsi)
1974 {
1975   struct GNUNET_STREAM_Socket *socket = cls;
1976   struct GNUNET_STREAM_MessageHeader *message;
1977
1978   if (0 != memcmp (&socket->other_peer, 
1979                    peer, 
1980                    sizeof (struct GNUNET_PeerIdentity)))
1981     {
1982       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1983                   "%s: A peer (%s) which is not our target has connected",
1984                   "to our tunnel",
1985                   GNUNET_i2s (&socket->our_id),
1986                   GNUNET_i2s (peer));
1987       return;
1988     }
1989   
1990   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1991               "%s: Target peer %s connected\n", 
1992               GNUNET_i2s (&(socket->our_id)),
1993               GNUNET_i2s (&(socket->other_peer)));
1994   
1995   /* Set state to INIT */
1996   socket->state = STATE_INIT;
1997
1998   /* Send HELLO message */
1999   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2000   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
2001   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2002   queue_message (socket,
2003                  message,
2004                  &set_state_hello_wait,
2005                  NULL);
2006
2007   /* Call open callback */
2008   if (NULL == socket->open_cb)
2009     {
2010       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2011                   "STREAM_open callback is NULL\n");
2012     }
2013 }
2014
2015
2016 /**
2017  * Function called when our target peer is disconnected from our tunnel
2018  *
2019  * @param cls the socket associated which this tunnel
2020  * @param peer the peer identity of the target
2021  */
2022 static void
2023 mesh_peer_disconnect_callback (void *cls,
2024                                const struct GNUNET_PeerIdentity *peer)
2025 {
2026
2027 }
2028
2029
2030 /**
2031  * Method called whenever a peer creates a tunnel to us
2032  *
2033  * @param cls closure
2034  * @param tunnel new handle to the tunnel
2035  * @param initiator peer that started the tunnel
2036  * @param atsi performance information for the tunnel
2037  * @return initial tunnel context for the tunnel
2038  *         (can be NULL -- that's not an error)
2039  */
2040 static void *
2041 new_tunnel_notify (void *cls,
2042                    struct GNUNET_MESH_Tunnel *tunnel,
2043                    const struct GNUNET_PeerIdentity *initiator,
2044                    const struct GNUNET_ATS_Information *atsi)
2045 {
2046   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2047   struct GNUNET_STREAM_Socket *socket;
2048
2049   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2050               "%s: Peer %s initiated tunnel to us\n", 
2051               GNUNET_i2s (&lsocket->our_id),
2052               GNUNET_i2s (initiator));
2053
2054   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2055   socket->tunnel = tunnel;
2056   socket->session_id = 0;       /* FIXME */
2057   socket->other_peer = *initiator;
2058   socket->state = STATE_INIT;
2059   socket->derived = GNUNET_YES;
2060   socket->our_id = lsocket->our_id;
2061   
2062   /* FIXME: Copy MESH handle from lsocket to socket */
2063
2064   if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
2065                                            socket,
2066                                            &socket->other_peer))
2067     {
2068       socket->state = STATE_CLOSED;
2069       /* FIXME: Send CLOSE message and then free */
2070       GNUNET_free (socket);
2071       GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
2072     }
2073   return socket;
2074 }
2075
2076
2077 /**
2078  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2079  * any associated state.  This function is NOT called if the client has
2080  * explicitly asked for the tunnel to be destroyed using
2081  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2082  * the tunnel.
2083  *
2084  * @param cls closure (set from GNUNET_MESH_connect)
2085  * @param tunnel connection to the other end (henceforth invalid)
2086  * @param tunnel_ctx place where local state associated
2087  *                   with the tunnel is stored
2088  */
2089 static void 
2090 tunnel_cleaner (void *cls,
2091                 const struct GNUNET_MESH_Tunnel *tunnel,
2092                 void *tunnel_ctx)
2093 {
2094   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2095   
2096   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2097               "%s: Peer %s has terminated connection abruptly\n",
2098               GNUNET_i2s (&socket->our_id),
2099               GNUNET_i2s (&socket->other_peer));
2100
2101   socket->status = GNUNET_STREAM_SHUTDOWN;
2102   /* Clear Transmit handles */
2103   if (NULL != socket->transmit_handle)
2104     {
2105       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2106       socket->transmit_handle = NULL;
2107     }
2108   socket->tunnel = NULL;
2109 }
2110
2111
2112 /*****************/
2113 /* API functions */
2114 /*****************/
2115
2116
2117 /**
2118  * Tries to open a stream to the target peer
2119  *
2120  * @param cfg configuration to use
2121  * @param target the target peer to which the stream has to be opened
2122  * @param app_port the application port number which uniquely identifies this
2123  *            stream
2124  * @param open_cb this function will be called after stream has be established 
2125  * @param open_cb_cls the closure for open_cb
2126  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2127  * @return if successful it returns the stream socket; NULL if stream cannot be
2128  *         opened 
2129  */
2130 struct GNUNET_STREAM_Socket *
2131 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2132                     const struct GNUNET_PeerIdentity *target,
2133                     GNUNET_MESH_ApplicationType app_port,
2134                     GNUNET_STREAM_OpenCallback open_cb,
2135                     void *open_cb_cls,
2136                     ...)
2137 {
2138   struct GNUNET_STREAM_Socket *socket;
2139   enum GNUNET_STREAM_Option option;
2140   va_list vargs;                /* Variable arguments */
2141
2142   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2143               "%s\n", __func__);
2144
2145   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2146   socket->other_peer = *target;
2147   socket->open_cb = open_cb;
2148   socket->open_cls = open_cb_cls;
2149   GNUNET_TESTING_get_peer_identity (cfg, &socket->our_id);
2150
2151   /* Set defaults */
2152   socket->retransmit_timeout = 
2153     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
2154
2155   va_start (vargs, open_cb_cls); /* Parse variable args */
2156   do {
2157     option = va_arg (vargs, enum GNUNET_STREAM_Option);
2158     switch (option)
2159       {
2160       case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2161         /* Expect struct GNUNET_TIME_Relative */
2162         socket->retransmit_timeout = va_arg (vargs,
2163                                              struct GNUNET_TIME_Relative);
2164         break;
2165       case GNUNET_STREAM_OPTION_END:
2166         break;
2167       }
2168   } while (GNUNET_STREAM_OPTION_END != option);
2169   va_end (vargs);               /* End of variable args parsing */
2170   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2171                                       10,  /* QUEUE size as parameter? */
2172                                       socket, /* cls */
2173                                       NULL, /* No inbound tunnel handler */
2174                                       &tunnel_cleaner, /* FIXME: not required? */
2175                                       client_message_handlers,
2176                                       &app_port); /* We don't get inbound tunnels */
2177   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
2178     {
2179       GNUNET_free (socket);
2180       return NULL;
2181     }
2182
2183   /* Now create the mesh tunnel to target */
2184   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2185               "Creating MESH Tunnel\n");
2186   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2187                                               NULL, /* Tunnel context */
2188                                               &mesh_peer_connect_callback,
2189                                               &mesh_peer_disconnect_callback,
2190                                               socket);
2191   GNUNET_assert (NULL != socket->tunnel);
2192   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2193                                         target);
2194   
2195   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2196               "%s() END\n", __func__);
2197   return socket;
2198 }
2199
2200
2201 /**
2202  * Shutdown the stream for reading or writing (man 2 shutdown).
2203  *
2204  * @param socket the stream socket
2205  * @param how SHUT_RD, SHUT_WR or SHUT_RDWR 
2206  */
2207 void
2208 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2209                         int how)
2210 {
2211   return;
2212 }
2213
2214
2215 /**
2216  * Closes the stream
2217  *
2218  * @param socket the stream socket
2219  */
2220 void
2221 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
2222 {
2223   struct MessageQueue *head;
2224
2225   if (socket->read_task != GNUNET_SCHEDULER_NO_TASK)
2226   {
2227     /* socket closed with read task pending!? */
2228     GNUNET_break (0);
2229     GNUNET_SCHEDULER_cancel (socket->read_task);
2230     socket->read_task = GNUNET_SCHEDULER_NO_TASK;
2231   }
2232
2233   /* Clear Transmit handles */
2234   if (NULL != socket->transmit_handle)
2235     {
2236       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2237       socket->transmit_handle = NULL;
2238     }
2239
2240   /* Clear existing message queue */
2241   while (NULL != (head = socket->queue_head)) {
2242     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2243                                  socket->queue_tail,
2244                                  head);
2245     GNUNET_free (head->message);
2246     GNUNET_free (head);
2247   }
2248
2249   /* Close associated tunnel */
2250   if (NULL != socket->tunnel)
2251     {
2252       GNUNET_MESH_tunnel_destroy (socket->tunnel);
2253       socket->tunnel = NULL;
2254     }
2255
2256   /* Close mesh connection */
2257   if (NULL != socket->mesh && GNUNET_YES != socket->derived)
2258     {
2259       GNUNET_MESH_disconnect (socket->mesh);
2260       socket->mesh = NULL;
2261     }
2262   
2263   /* Release receive buffer */
2264   if (NULL != socket->receive_buffer)
2265     {
2266       GNUNET_free (socket->receive_buffer);
2267     }
2268
2269   GNUNET_free (socket);
2270 }
2271
2272
2273 /**
2274  * Listens for stream connections for a specific application ports
2275  *
2276  * @param cfg the configuration to use
2277  * @param app_port the application port for which new streams will be accepted
2278  * @param listen_cb this function will be called when a peer tries to establish
2279  *            a stream with us
2280  * @param listen_cb_cls closure for listen_cb
2281  * @return listen socket, NULL for any error
2282  */
2283 struct GNUNET_STREAM_ListenSocket *
2284 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
2285                       GNUNET_MESH_ApplicationType app_port,
2286                       GNUNET_STREAM_ListenCallback listen_cb,
2287                       void *listen_cb_cls)
2288 {
2289   /* FIXME: Add variable args for passing configration options? */
2290   struct GNUNET_STREAM_ListenSocket *lsocket;
2291
2292   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
2293   lsocket->port = app_port;
2294   lsocket->listen_cb = listen_cb;
2295   lsocket->listen_cb_cls = listen_cb_cls;
2296   GNUNET_TESTING_get_peer_identity (cfg, &lsocket->our_id);
2297   lsocket->mesh = GNUNET_MESH_connect (cfg,
2298                                        10, /* FIXME: QUEUE size as parameter? */
2299                                        lsocket, /* Closure */
2300                                        &new_tunnel_notify,
2301                                        &tunnel_cleaner,
2302                                        server_message_handlers,
2303                                        &app_port);
2304   GNUNET_assert (NULL != lsocket->mesh);
2305   return lsocket;
2306 }
2307
2308
2309 /**
2310  * Closes the listen socket
2311  *
2312  * @param lsocket the listen socket
2313  */
2314 void
2315 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
2316 {
2317   /* Close MESH connection */
2318   GNUNET_assert (NULL != lsocket->mesh);
2319   GNUNET_MESH_disconnect (lsocket->mesh);
2320   
2321   GNUNET_free (lsocket);
2322 }
2323
2324
2325 /**
2326  * Tries to write the given data to the stream
2327  *
2328  * @param socket the socket representing a stream
2329  * @param data the data buffer from where the data is written into the stream
2330  * @param size the number of bytes to be written from the data buffer
2331  * @param timeout the timeout period
2332  * @param write_cont the function to call upon writing some bytes into the stream
2333  * @param write_cont_cls the closure
2334  * @return handle to cancel the operation
2335  */
2336 struct GNUNET_STREAM_IOWriteHandle *
2337 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
2338                      const void *data,
2339                      size_t size,
2340                      struct GNUNET_TIME_Relative timeout,
2341                      GNUNET_STREAM_CompletionContinuation write_cont,
2342                      void *write_cont_cls)
2343 {
2344   unsigned int num_needed_packets;
2345   unsigned int packet;
2346   struct GNUNET_STREAM_IOWriteHandle *io_handle;
2347   uint32_t packet_size;
2348   uint32_t payload_size;
2349   struct GNUNET_STREAM_DataMessage *data_msg;
2350   const void *sweep;
2351   struct GNUNET_TIME_Relative ack_deadline;
2352
2353   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2354               "%s\n", __func__);
2355
2356   /* Return NULL if there is already a write request pending */
2357   if (NULL != socket->write_handle)
2358   {
2359     GNUNET_break (0);
2360     return NULL;
2361   }
2362   if (!((STATE_ESTABLISHED == socket->state)
2363         || (STATE_RECEIVE_CLOSE_WAIT == socket->state)
2364         || (STATE_RECEIVE_CLOSED == socket->state)))
2365     {
2366       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2367                   "%s: Attempting to write on a closed (OR) not-yet-established"
2368                   "stream\n",
2369                   GNUNET_i2s (&socket->our_id));
2370       return NULL;
2371     } 
2372   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
2373     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
2374   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
2375   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
2376   io_handle->write_cont = write_cont;
2377   io_handle->write_cont_cls = write_cont_cls;
2378   io_handle->size = size;
2379   sweep = data;
2380   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2381      determined from RTT */
2382   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
2383   /* Divide the given buffer into packets for sending */
2384   for (packet=0; packet < num_needed_packets; packet++)
2385     {
2386       if ((packet + 1) * max_payload_size < size) 
2387         {
2388           payload_size = max_payload_size;
2389           packet_size = MAX_PACKET_SIZE;
2390         }
2391       else 
2392         {
2393           payload_size = size - packet * max_payload_size;
2394           packet_size =  payload_size + sizeof (struct
2395                                                 GNUNET_STREAM_DataMessage); 
2396         }
2397       io_handle->messages[packet] = GNUNET_malloc (packet_size);
2398       io_handle->messages[packet]->header.header.size = htons (packet_size);
2399       io_handle->messages[packet]->header.header.type =
2400         htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
2401       io_handle->messages[packet]->sequence_number =
2402         htons (socket->write_sequence_number++);
2403       io_handle->messages[packet]->offset = htons (socket->write_offset);
2404
2405       /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2406          determined from RTT */
2407       io_handle->messages[packet]->ack_deadline =
2408         GNUNET_TIME_relative_hton (ack_deadline);
2409       data_msg = io_handle->messages[packet];
2410       /* Copy data from given buffer to the packet */
2411       memcpy (&data_msg[1],
2412               sweep,
2413               payload_size);
2414       sweep += payload_size;
2415       socket->write_offset += payload_size;
2416     }
2417   socket->write_handle = io_handle;
2418   write_data (socket);
2419
2420   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2421               "%s() END\n", __func__);
2422
2423   return io_handle;
2424 }
2425
2426
2427 /**
2428  * Tries to read data from the stream
2429  *
2430  * @param socket the socket representing a stream
2431  * @param timeout the timeout period
2432  * @param proc function to call with data (once only)
2433  * @param proc_cls the closure for proc
2434  * @return handle to cancel the operation
2435  */
2436 struct GNUNET_STREAM_IOReadHandle *
2437 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
2438                     struct GNUNET_TIME_Relative timeout,
2439                     GNUNET_STREAM_DataProcessor proc,
2440                     void *proc_cls)
2441 {
2442   struct GNUNET_STREAM_IOReadHandle *read_handle;
2443   
2444   /* Return NULL if there is already a read handle; the user has to cancel that
2445   first before continuing or has to wait until it is completed */
2446   if (NULL != socket->read_handle) return NULL;
2447
2448   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
2449   read_handle->proc = proc;
2450   socket->read_handle = read_handle;
2451
2452   /* Check if we have a packet at bitmap 0 */
2453   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
2454                                           0))
2455     {
2456       socket->read_task = GNUNET_SCHEDULER_add_now (&call_read_processor,
2457                                                     socket);
2458    
2459     }
2460   
2461   /* Setup the read timeout task */
2462   socket->read_io_timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
2463                                                                &read_io_timeout,
2464                                                                socket);
2465   return read_handle;
2466 }
2467
2468
2469 /**
2470  * Cancel pending write operation.
2471  *
2472  * @param ioh handle to operation to cancel
2473  */
2474 void
2475 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
2476 {
2477   return;
2478 }
2479
2480
2481 /**
2482  * Cancel pending read operation.
2483  *
2484  * @param ioh handle to operation to cancel
2485  */
2486 void
2487 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
2488 {
2489   return;
2490 }