-added read io timeout
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file stream/stream_api.c
23  * @brief Implementation of the stream library
24  * @author Sree Harsha Totakura
25  */
26 #include "platform.h"
27 #include "gnunet_common.h"
28 #include "gnunet_crypto_lib.h"
29 #include "gnunet_stream_lib.h"
30 #include "stream_protocol.h"
31
32
33 /**
34  * The maximum packet size of a stream packet
35  */
36 #define MAX_PACKET_SIZE 64000
37
38 /**
39  * The maximum payload a data message packet can carry
40  */
41 static size_t max_payload_size = 
42   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
43
44 /**
45  * Receive buffer
46  */
47 #define RECEIVE_BUFFER_SIZE 4096000
48
49 /**
50  * states in the Protocol
51  */
52 enum State
53   {
54     /**
55      * Client initialization state
56      */
57     STATE_INIT,
58
59     /**
60      * Listener initialization state 
61      */
62     STATE_LISTEN,
63
64     /**
65      * Pre-connection establishment state
66      */
67     STATE_HELLO_WAIT,
68
69     /**
70      * State where a connection has been established
71      */
72     STATE_ESTABLISHED,
73
74     /**
75      * State where the socket is closed on our side and waiting to be ACK'ed
76      */
77     STATE_RECEIVE_CLOSE_WAIT,
78
79     /**
80      * State where the socket is closed for reading
81      */
82     STATE_RECEIVE_CLOSED,
83
84     /**
85      * State where the socket is closed on our side and waiting to be ACK'ed
86      */
87     STATE_TRANSMIT_CLOSE_WAIT,
88
89     /**
90      * State where the socket is closed for writing
91      */
92     STATE_TRANSMIT_CLOSED,
93
94     /**
95      * State where the socket is closed on our side and waiting to be ACK'ed
96      */
97     STATE_CLOSE_WAIT,
98
99     /**
100      * State where the socket is closed
101      */
102     STATE_CLOSED 
103   };
104
105
106 /**
107  * Functions of this type are called when a message is written
108  *
109  * @param cls the closure from queue_message
110  * @param socket the socket the written message was bound to
111  */
112 typedef void (*SendFinishCallback) (void *cls,
113                                     struct GNUNET_STREAM_Socket *socket);
114
115
116 /**
117  * The send message queue
118  */
119 struct MessageQueue
120 {
121   /**
122    * The message
123    */
124   struct GNUNET_STREAM_MessageHeader *message;
125
126   /**
127    * Callback to be called when the message is sent
128    */
129   SendFinishCallback finish_cb;
130
131   /**
132    * The closure for finish_cb
133    */
134   void *finish_cb_cls;
135
136   /**
137    * The next message in queue. Should be NULL in the last message
138    */
139   struct MessageQueue *next;
140
141   /**
142    * The next message in queue. Should be NULL in the first message
143    */
144   struct MessageQueue *prev;
145 };
146
147
148 /**
149  * The STREAM Socket Handler
150  */
151 struct GNUNET_STREAM_Socket
152 {
153
154   /**
155    * The peer identity of the peer at the other end of the stream
156    */
157   struct GNUNET_PeerIdentity other_peer;
158
159   /**
160    * Retransmission timeout
161    */
162   struct GNUNET_TIME_Relative retransmit_timeout;
163
164   /**
165    * The Acknowledgement Bitmap
166    */
167   GNUNET_STREAM_AckBitmap ack_bitmap;
168
169   /**
170    * Time when the Acknowledgement was queued
171    */
172   struct GNUNET_TIME_Absolute ack_time_registered;
173
174   /**
175    * Queued Acknowledgement deadline
176    */
177   struct GNUNET_TIME_Relative ack_time_deadline;
178
179   /**
180    * The task for sending timely Acks
181    */
182   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
183
184   /**
185    * The mesh handle
186    */
187   struct GNUNET_MESH_Handle *mesh;
188
189   /**
190    * The mesh tunnel handle
191    */
192   struct GNUNET_MESH_Tunnel *tunnel;
193
194   /**
195    * Stream open closure
196    */
197   void *open_cls;
198
199   /**
200    * Stream open callback
201    */
202   GNUNET_STREAM_OpenCallback open_cb;
203
204   /**
205    * The current transmit handle (if a pending transmit request exists)
206    */
207   struct GNUNET_MESH_TransmitHandle *transmit_handle;
208
209   /**
210    * The current message associated with the transmit handle
211    */
212   struct MessageQueue *queue_head;
213
214   /**
215    * The queue tail, should always point to the last message in queue
216    */
217   struct MessageQueue *queue_tail;
218
219   /**
220    * The write IO_handle associated with this socket
221    */
222   struct GNUNET_STREAM_IOWriteHandle *write_handle;
223
224   /**
225    * The read IO_handle associated with this socket
226    */
227   struct GNUNET_STREAM_IOReadHandle *read_handle;
228
229   /**
230    * Buffer for storing received messages
231    */
232   void *receive_buffer;
233
234   /**
235    * Copy buffer pointer; Used during read operations
236    */
237   void *copy_buffer;
238
239   /**
240    * Task identifier for the read io timeout task
241    */
242   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task;
243
244   /**
245    * The state of the protocol associated with this socket
246    */
247   enum State state;
248
249   /**
250    * The status of the socket
251    */
252   enum GNUNET_STREAM_Status status;
253
254   /**
255    * The number of previous timeouts; FIXME: currently not used
256    */
257   unsigned int retries;
258
259   /**
260    * The session id associated with this stream connection
261    * FIXME: Not used currently, may be removed
262    */
263   uint32_t session_id;
264
265   /**
266    * Write sequence number. Set to random when sending HELLO(client) and
267    * HELLO_ACK(server) 
268    */
269   uint32_t write_sequence_number;
270
271   /**
272    * Read sequence number. This number's value is determined during handshake
273    */
274   uint32_t read_sequence_number;
275
276   /**
277    * The receiver buffer size
278    */
279   uint32_t receive_buffer_size;
280
281   /**
282    * The receiver buffer boundaries
283    */
284   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
285
286   /**
287    * receiver's available buffer after the last acknowledged packet
288    */
289   uint32_t receive_window_available;
290
291   /**
292    * The offset pointer used during write operation
293    */
294   uint32_t write_offset;
295
296   /**
297    * The offset after which we are expecting data
298    */
299   uint32_t read_offset;
300
301   /**
302    * The size of the copy buffer
303    */
304   uint32_t copy_buffer_size;
305   
306   /**
307    * The read offset of copy buffer
308    */
309   uint32_t copy_buffer_read_offset;
310 };
311
312
313 /**
314  * A socket for listening
315  */
316 struct GNUNET_STREAM_ListenSocket
317 {
318
319   /**
320    * The mesh handle
321    */
322   struct GNUNET_MESH_Handle *mesh;
323
324   /**
325    * The callback function which is called after successful opening socket
326    */
327   GNUNET_STREAM_ListenCallback listen_cb;
328
329   /**
330    * The call back closure
331    */
332   void *listen_cb_cls;
333
334   /**
335    * The service port
336    */
337   GNUNET_MESH_ApplicationType port;
338 };
339
340
341 /**
342  * The IO Write Handle
343  */
344 struct GNUNET_STREAM_IOWriteHandle
345 {
346   /**
347    * The packet_buffers associated with this Handle
348    */
349   struct GNUNET_STREAM_DataMessage *messages[64];
350
351   /**
352    * The bitmap of this IOHandle; Corresponding bit for a message is set when
353    * it has been acknowledged by the receiver
354    */
355   GNUNET_STREAM_AckBitmap ack_bitmap;
356
357   /**
358    * Number of packets sent before waiting for an ack
359    *
360    * FIXME: Do we need this?
361    */
362   unsigned int sent_packets;
363 };
364
365
366 /**
367  * The IO Read Handle
368  */
369 struct GNUNET_STREAM_IOReadHandle
370 {
371   /**
372    * Callback for the read processor
373    */
374   GNUNET_STREAM_DataProcessor proc;
375
376   /**
377    * The closure pointer for the read processor callback
378    */
379   void *proc_cls;
380 };
381
382
383 /**
384  * Default value in seconds for various timeouts
385  */
386 static unsigned int default_timeout = 300;
387
388
389 /**
390  * Callback function for sending hello message
391  *
392  * @param cls closure the socket
393  * @param size number of bytes available in buf
394  * @param buf where the callee should write the message
395  * @return number of bytes written to buf
396  */
397 static size_t
398 send_message_notify (void *cls, size_t size, void *buf)
399 {
400   struct GNUNET_STREAM_Socket *socket = cls;
401   struct MessageQueue *head;
402   size_t ret;
403
404   socket->transmit_handle = NULL; /* Remove the transmit handle */
405   head = socket->queue_head;
406   if (NULL == head)
407     return 0; /* just to be safe */
408   if (0 == size)                /* request timed out */
409     {
410       socket->retries++;
411       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
412                   "Message sending timed out. Retry %d \n",
413                   socket->retries);
414       socket->transmit_handle = 
415         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
416                                            0, /* Corking */
417                                            1, /* Priority */
418                                            /* FIXME: exponential backoff */
419                                            socket->retransmit_timeout,
420                                            &socket->other_peer,
421                                            ntohs (head->message->header.size),
422                                            &send_message_notify,
423                                            socket);
424       return 0;
425     }
426
427   ret = ntohs (head->message->header.size);
428   GNUNET_assert (size >= ret);
429   memcpy (buf, head->message, ret);
430   if (NULL != head->finish_cb)
431     {
432       head->finish_cb (socket, head->finish_cb_cls);
433     }
434   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
435                                socket->queue_tail,
436                                head);
437   GNUNET_free (head->message);
438   GNUNET_free (head);
439   head = socket->queue_head;
440   if (NULL != head)    /* more pending messages to send */
441     {
442       socket->retries = 0;
443       socket->transmit_handle = 
444         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
445                                            0, /* Corking */
446                                            1, /* Priority */
447                                            /* FIXME: exponential backoff */
448                                            socket->retransmit_timeout,
449                                            &socket->other_peer,
450                                            ntohs (head->message->header.size),
451                                            &send_message_notify,
452                                            socket);
453     }
454   return ret;
455 }
456
457
458 /**
459  * Queues a message for sending using the mesh connection of a socket
460  *
461  * @param socket the socket whose mesh connection is used
462  * @param message the message to be sent
463  * @param finish_cb the callback to be called when the message is sent
464  * @param finish_cb_cls the closure for the callback
465  */
466 static void
467 queue_message (struct GNUNET_STREAM_Socket *socket,
468                struct GNUNET_STREAM_MessageHeader *message,
469                SendFinishCallback finish_cb,
470                void *finish_cb_cls)
471 {
472   struct MessageQueue *queue_entity;
473
474   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
475   queue_entity->message = message;
476   queue_entity->finish_cb = finish_cb;
477   queue_entity->finish_cb_cls = finish_cb_cls;
478   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
479                                     socket->queue_tail,
480                                     queue_entity);
481   if (NULL == socket->transmit_handle)
482   {
483     socket->retries = 0;
484     socket->transmit_handle = 
485       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
486                                          0, /* Corking */
487                                          1, /* Priority */
488                                          socket->retransmit_timeout,
489                                          &socket->other_peer,
490                                          ntohs (message->header.size),
491                                          &send_message_notify,
492                                          socket);
493   }
494 }
495
496
497 /**
498  * Callback function for sending ack message
499  *
500  * @param cls closure the ACK message created in ack_task
501  * @param size number of bytes available in buffer
502  * @param buf where the callee should write the message
503  * @return number of bytes written to buf
504  */
505 static size_t
506 send_ack_notify (void *cls, size_t size, void *buf)
507 {
508   struct GNUNET_STREAM_AckMessage *ack_msg = cls;
509
510   if (0 == size)
511     {
512       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
513                   "%s called with size 0\n", __func__);
514       return 0;
515     }
516   GNUNET_assert (ack_msg->header.header.size <= size);
517   
518   size = ack_msg->header.header.size;
519   memcpy (buf, ack_msg, size);
520   return size;
521 }
522
523
524 /**
525  * Task for sending ACK message
526  *
527  * @param cls the socket
528  * @param tc the Task context
529  */
530 static void
531 ack_task (void *cls,
532           const struct GNUNET_SCHEDULER_TaskContext *tc)
533 {
534   struct GNUNET_STREAM_Socket *socket = cls;
535   struct GNUNET_STREAM_AckMessage *ack_msg;
536
537   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
538     {
539       return;
540     }
541
542   socket->ack_task_id = 0;
543
544   /* Create the ACK Message */
545   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
546   ack_msg->header.header.size = htons (sizeof (struct 
547                                                GNUNET_STREAM_AckMessage));
548   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
549   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
550   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
551   ack_msg->receive_window_remaining = 
552     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
553
554   /* Request MESH for sending ACK */
555   GNUNET_MESH_notify_transmit_ready (socket->tunnel,
556                                      0, /* Corking */
557                                      1, /* Priority */
558                                      socket->retransmit_timeout,
559                                      &socket->other_peer,
560                                      ntohs (ack_msg->header.header.size),
561                                      &send_ack_notify,
562                                      ack_msg);
563
564   
565 }
566
567
568 /**
569  * Function to modify a bit in GNUNET_STREAM_AckBitmap
570  *
571  * @param bitmap the bitmap to modify
572  * @param bit the bit number to modify
573  * @param value GNUNET_YES to on, GNUNET_NO to off
574  */
575 static void
576 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
577                       unsigned int bit, 
578                       int value)
579 {
580   GNUNET_assert (bit < 64);
581   if (GNUNET_YES == value)
582     *bitmap |= (1LL << bit);
583   else
584     *bitmap &= ~(1LL << bit);
585 }
586
587
588 /**
589  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
590  *
591  * @param bitmap address of the bitmap that has to be checked
592  * @param bit the bit number to check
593  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
594  */
595 static uint8_t
596 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
597                       unsigned int bit)
598 {
599   GNUNET_assert (bit < 64);
600   return 0 != (*bitmap & (1LL << bit));
601 }
602
603
604
605 /**
606  * Function called when Data Message is sent
607  *
608  * @param cls the io_handle corresponding to the Data Message
609  * @param socket the socket which was used
610  */
611 static void
612 write_data_finish_cb (void *cls,
613                       struct GNUNET_STREAM_Socket *socket)
614 {
615   struct GNUNET_STREAM_IOWriteHandle *io_handle = cls;
616
617   io_handle->sent_packets++;
618 }
619
620
621 /**
622  * Writes data using the given socket. The amount of data written is limited by
623  * the receive_window_size
624  *
625  * @param socket the socket to use
626  */
627 static void 
628 write_data (struct GNUNET_STREAM_Socket *socket)
629 {
630   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
631   unsigned int packet;
632   int ack_packet;
633
634   ack_packet = -1;
635   /* Find the last acknowledged packet */
636   for (packet=0; packet < 64; packet++)
637     {      
638       if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
639                                               packet))
640         ack_packet = packet;        
641       else if (NULL == io_handle->messages[packet])
642         break;
643     }
644   /* Resend packets which weren't ack'ed */
645   for (packet=0; packet < ack_packet; packet++)
646     {
647       if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
648                                              packet))
649         {
650           queue_message (socket,
651                          &io_handle->messages[packet]->header,
652                          NULL,
653                          NULL);
654         }
655     }
656   packet = ack_packet + 1;
657   /* Now send new packets if there is enough buffer space */
658   while ( (NULL != io_handle->messages[packet]) &&
659           (socket->receive_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) )
660     {
661       socket->receive_window_available -= ntohs (io_handle->messages[packet]->header.header.size);
662       queue_message (socket,
663                      &io_handle->messages[packet]->header,
664                      &write_data_finish_cb,
665                      io_handle);
666       packet++;
667     }
668 }
669
670
671 /**
672  * Task for calling the read processor
673  *
674  * @param cls the socket
675  * @param tc the task context
676  */
677 static void
678 call_read_processor_task (void *cls,
679                           const struct GNUNET_SCHEDULER_TaskContext *tc)
680 {
681   struct GNUNET_STREAM_Socket *socket = cls;
682   size_t read_size;
683   size_t valid_read_size;
684
685   if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN) return;
686
687   GNUNET_assert (NULL != socket->read_handle);
688   GNUNET_assert (NULL != socket->read_handle->proc);
689   GNUNET_assert (NULL != socket->copy_buffer);
690   GNUNET_assert (0 != socket->copy_buffer_size);
691
692   valid_read_size = socket->copy_buffer_size - socket->copy_buffer_read_offset;
693   GNUNET_assert (0 != valid_read_size);
694
695   /* Cancel the read_io_timeout_task */
696   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task);
697   /* Call the data processor */
698   read_size = socket->read_handle->proc (socket->read_handle->proc_cls,
699                                          socket->status,
700                                          socket->copy_buffer 
701                                          + socket->copy_buffer_read_offset,
702                                          valid_read_size);
703
704   GNUNET_assert (read_size <= valid_read_size);
705   socket->copy_buffer_read_offset += read_size;
706
707   /* Free the copy buffer once it has been read entirely */
708   if (socket->copy_buffer_read_offset == socket->copy_buffer_size)
709     {
710       GNUNET_free (socket->copy_buffer);
711       socket->copy_buffer = NULL;
712       socket->copy_buffer_size = 0;
713       socket->copy_buffer_read_offset = 0;
714     }
715
716   /* Free the read handle */
717   GNUNET_free (socket->read_handle);
718   socket->read_handle = NULL;
719 }
720
721
722 /**
723  * Prepares the receive buffer for possible reads; Should only be called when
724  * there is a valid READ io request pending and socket->copy_buffer is empty
725  *
726  * @param socket the socket pointer
727  */
728 static void 
729 prepare_buffer_for_read (struct GNUNET_STREAM_Socket *socket)
730 {
731   unsigned int packet;
732   uint32_t offset_increase;
733   uint32_t sequence_increase;
734
735   GNUNET_assert (NULL == socket->copy_buffer);
736   GNUNET_assert (NULL != socket->read_handle);
737
738   /* Check the bitmap for any holes */
739   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
740     {
741       if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
742                                              packet))
743         break;
744     }
745
746   sequence_increase = packet;
747
748   if (0 == sequence_increase)              /* The first packet is still missing */
749     {
750       return;
751     }
752   
753   /* Copy data to copy buffer */
754   GNUNET_assert (0 < socket->receive_buffer_boundaries[sequence_increase-1]);
755   socket->copy_buffer = 
756     GNUNET_malloc (socket->receive_buffer_boundaries[sequence_increase-1]);
757   memcpy (socket->copy_buffer, 
758           socket->receive_buffer,
759           socket->receive_buffer_boundaries[sequence_increase-1]);
760   
761   /* Shift the data in the receive buffer */
762   memmove (socket->receive_buffer,
763            socket->receive_buffer 
764            + socket->receive_buffer_boundaries[sequence_increase-1],
765            socket->receive_buffer_size - socket->receive_buffer_boundaries[sequence_increase-1]);
766   
767   /* Shift the bitmap */
768   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
769   
770   /* Set read_sequence_number */
771   socket->read_sequence_number += sequence_increase;
772   
773   /* Set read_offset */
774   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
775   socket->read_offset += offset_increase;
776   
777   /* Fix relative boundaries */
778   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
779     {
780       if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
781         {
782           socket->receive_buffer_boundaries[packet] = 
783             socket->receive_buffer_boundaries[packet + sequence_increase] 
784             - offset_increase;
785         }
786       else
787         socket->receive_buffer_boundaries[packet] = 0;
788     }
789
790   GNUNET_SCHEDULER_add_continuation (&call_read_processor_task,
791                                      socket,
792                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
793 }
794
795
796
797 /**
798  * Cancels the existing read io handle
799  *
800  * @param cls the closure from the SCHEDULER call
801  * @param tc the task context
802  */
803 static void
804 cancel_read_io (void *cls, 
805                 const struct GNUNET_SCHEDULER_TaskContext *tc)
806 {
807   struct GNUNET_STREAM_Socket *socket = cls;
808
809   GNUNET_assert (NULL != socket->read_handle);
810   
811   GNUNET_free (socket->read_handle);
812   socket->read_handle = NULL;
813 }
814
815
816 /**
817  * Handler for DATA messages; Same for both client and server
818  *
819  * @param socket the socket through which the ack was received
820  * @param tunnel connection to the other end
821  * @param sender who sent the message
822  * @param msg the data message
823  * @param atsi performance data for the connection
824  * @return GNUNET_OK to keep the connection open,
825  *         GNUNET_SYSERR to close it (signal serious error)
826  */
827 static int
828 handle_data (struct GNUNET_STREAM_Socket *socket,
829              struct GNUNET_MESH_Tunnel *tunnel,
830              const struct GNUNET_PeerIdentity *sender,
831              const struct GNUNET_STREAM_DataMessage *msg,
832              const struct GNUNET_ATS_Information*atsi)
833 {
834   const void *payload;
835   uint32_t bytes_needed;
836   uint32_t relative_offset;
837   uint32_t relative_sequence_number;
838   uint16_t size;
839
840   size = htons (msg->header.header.size);
841   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
842     {
843       GNUNET_break_op (0);
844       return GNUNET_SYSERR;
845     }
846
847   switch (socket->state)
848     {
849     case STATE_ESTABLISHED:
850     case STATE_TRANSMIT_CLOSED:
851     case STATE_TRANSMIT_CLOSE_WAIT:
852
853       /* check if the message's sequence number is in the range we are
854          expecting */
855       relative_sequence_number = 
856         ntohl (msg->sequence_number) - socket->read_sequence_number;
857       if ( relative_sequence_number > 64)
858         {
859           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
860                       "Ignoring received message with sequence number %d",
861                       ntohl (msg->sequence_number));
862           return GNUNET_YES;
863         }
864
865       /* Check if we have to allocate the buffer */
866       size -= sizeof (struct GNUNET_STREAM_DataMessage);
867       relative_offset = ntohl (msg->offset) - socket->read_offset;
868       bytes_needed = relative_offset + size;
869       
870       if (bytes_needed > socket->receive_buffer_size)
871         {
872           if (bytes_needed <= RECEIVE_BUFFER_SIZE)
873             {
874               socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
875                                                        bytes_needed);
876               socket->receive_buffer_size = bytes_needed;
877             }
878           else
879             {
880               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
881                           "Cannot accommodate packet %d as buffer is full\n",
882                           ntohl (msg->sequence_number));
883               return GNUNET_YES;
884             }
885         }
886       
887       /* Copy Data to buffer */
888       payload = &msg[1];
889       GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
890       memcpy (socket->receive_buffer + relative_offset,
891               payload,
892               size);
893       socket->receive_buffer_boundaries[relative_sequence_number] = 
894         relative_offset + size;
895       
896       /* Modify the ACK bitmap */
897       ackbitmap_modify_bit (&socket->ack_bitmap,
898                             relative_sequence_number,
899                             GNUNET_YES);
900
901       /* Start ACK sending task if one is not already present */
902       if (0 == socket->ack_task_id)
903        {
904          socket->ack_task_id = 
905            GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
906                                          (msg->ack_deadline),
907                                          &ack_task,
908                                          socket);
909        }
910
911       if ((NULL != socket->read_handle) /* A read handle is waiting */
912           && (NULL == socket->copy_buffer)) /* And the copy buffer is empty */
913         {
914           prepare_buffer_for_read (socket);
915         }
916       
917       break;
918
919     default:
920       /* FIXME: call statistics */
921       break;
922     }
923   return GNUNET_YES;
924 }
925
926 /**
927  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
928  *
929  * @param cls the socket (set from GNUNET_MESH_connect)
930  * @param tunnel connection to the other end
931  * @param tunnel_ctx place to store local state associated with the tunnel
932  * @param sender who sent the message
933  * @param message the actual message
934  * @param atsi performance data for the connection
935  * @return GNUNET_OK to keep the connection open,
936  *         GNUNET_SYSERR to close it (signal serious error)
937  */
938 static int
939 client_handle_data (void *cls,
940              struct GNUNET_MESH_Tunnel *tunnel,
941              void **tunnel_ctx,
942              const struct GNUNET_PeerIdentity *sender,
943              const struct GNUNET_MessageHeader *message,
944              const struct GNUNET_ATS_Information*atsi)
945 {
946   struct GNUNET_STREAM_Socket *socket = cls;
947
948   return handle_data (socket, 
949                       tunnel, 
950                       sender, 
951                       (const struct GNUNET_STREAM_DataMessage *) message, 
952                       atsi);
953 }
954
955
956 /**
957  * Callback to set state to ESTABLISHED
958  *
959  * @param cls the closure from queue_message FIXME: document
960  * @param socket the socket to requiring state change
961  */
962 static void
963 set_state_established (void *cls,
964                        struct GNUNET_STREAM_Socket *socket)
965 {
966   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining ESTABLISHED state\n");
967   socket->write_offset = 0;
968   socket->read_offset = 0;
969   socket->state = STATE_ESTABLISHED;
970 }
971
972
973 /**
974  * Callback to set state to HELLO_WAIT
975  *
976  * @param cls the closure from queue_message
977  * @param socket the socket to requiring state change
978  */
979 static void
980 set_state_hello_wait (void *cls,
981                       struct GNUNET_STREAM_Socket *socket)
982 {
983   GNUNET_assert (STATE_INIT == socket->state);
984   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining HELLO_WAIT state\n");
985   socket->state = STATE_HELLO_WAIT;
986 }
987
988
989 /**
990  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
991  *
992  * @param cls the socket (set from GNUNET_MESH_connect)
993  * @param tunnel connection to the other end
994  * @param tunnel_ctx this is NULL
995  * @param sender who sent the message
996  * @param message the actual message
997  * @param atsi performance data for the connection
998  * @return GNUNET_OK to keep the connection open,
999  *         GNUNET_SYSERR to close it (signal serious error)
1000  */
1001 static int
1002 client_handle_hello_ack (void *cls,
1003                          struct GNUNET_MESH_Tunnel *tunnel,
1004                          void **tunnel_ctx,
1005                          const struct GNUNET_PeerIdentity *sender,
1006                          const struct GNUNET_MessageHeader *message,
1007                          const struct GNUNET_ATS_Information*atsi)
1008 {
1009   struct GNUNET_STREAM_Socket *socket = cls;
1010   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1011   struct GNUNET_STREAM_HelloAckMessage *reply;
1012
1013   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1014   GNUNET_assert (socket->tunnel == tunnel);
1015   switch (socket->state)
1016   {
1017   case STATE_HELLO_WAIT:
1018       socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1019       socket->receive_window_available = ntohl (ack_msg->receive_window_size);
1020       /* Get the random sequence number */
1021       socket->write_sequence_number = 
1022         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1023       reply = 
1024         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1025       reply->header.header.size = 
1026         htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1027       reply->header.header.type = 
1028         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1029       reply->sequence_number = htonl (socket->write_sequence_number);
1030       reply->receive_window_size = htonl (RECEIVE_BUFFER_SIZE);
1031       queue_message (socket, 
1032                      &reply->header, 
1033                      &set_state_established, 
1034                      NULL);      
1035       return GNUNET_OK;
1036   case STATE_ESTABLISHED:
1037   case STATE_RECEIVE_CLOSE_WAIT:
1038     // call statistics (# ACKs ignored++)
1039     return GNUNET_OK;
1040   case STATE_INIT:
1041   default:
1042     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1043                 "Server sent HELLO_ACK when in state %d\n", socket->state);
1044     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1045     return GNUNET_SYSERR;
1046   }
1047
1048 }
1049
1050
1051 /**
1052  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1053  *
1054  * @param cls the socket (set from GNUNET_MESH_connect)
1055  * @param tunnel connection to the other end
1056  * @param tunnel_ctx this is NULL
1057  * @param sender who sent the message
1058  * @param message the actual message
1059  * @param atsi performance data for the connection
1060  * @return GNUNET_OK to keep the connection open,
1061  *         GNUNET_SYSERR to close it (signal serious error)
1062  */
1063 static int
1064 client_handle_reset (void *cls,
1065                      struct GNUNET_MESH_Tunnel *tunnel,
1066                      void **tunnel_ctx,
1067                      const struct GNUNET_PeerIdentity *sender,
1068                      const struct GNUNET_MessageHeader *message,
1069                      const struct GNUNET_ATS_Information*atsi)
1070 {
1071   struct GNUNET_STREAM_Socket *socket = cls;
1072
1073   return GNUNET_OK;
1074 }
1075
1076
1077 /**
1078  * Common message handler for handling TRANSMIT_CLOSE messages
1079  *
1080  * @param socket the socket through which the ack was received
1081  * @param tunnel connection to the other end
1082  * @param sender who sent the message
1083  * @param msg the transmit close message
1084  * @param atsi performance data for the connection
1085  * @return GNUNET_OK to keep the connection open,
1086  *         GNUNET_SYSERR to close it (signal serious error)
1087  */
1088 static int
1089 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1090                        struct GNUNET_MESH_Tunnel *tunnel,
1091                        const struct GNUNET_PeerIdentity *sender,
1092                        const struct GNUNET_STREAM_MessageHeader *msg,
1093                        const struct GNUNET_ATS_Information*atsi)
1094 {
1095   struct GNUNET_STREAM_MessageHeader *reply;
1096
1097   switch (socket->state)
1098     {
1099     case STATE_ESTABLISHED:
1100       socket->state = STATE_RECEIVE_CLOSED;
1101
1102       /* Send TRANSMIT_CLOSE_ACK */
1103       reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1104       reply->header.type = 
1105         htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1106       reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1107       queue_message (socket, reply, NULL, NULL);
1108       break;
1109
1110     default:
1111       /* FIXME: Call statistics? */
1112       break;
1113     }
1114   return GNUNET_YES;
1115 }
1116
1117
1118 /**
1119  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1120  *
1121  * @param cls the socket (set from GNUNET_MESH_connect)
1122  * @param tunnel connection to the other end
1123  * @param tunnel_ctx this is NULL
1124  * @param sender who sent the message
1125  * @param message the actual message
1126  * @param atsi performance data for the connection
1127  * @return GNUNET_OK to keep the connection open,
1128  *         GNUNET_SYSERR to close it (signal serious error)
1129  */
1130 static int
1131 client_handle_transmit_close (void *cls,
1132                               struct GNUNET_MESH_Tunnel *tunnel,
1133                               void **tunnel_ctx,
1134                               const struct GNUNET_PeerIdentity *sender,
1135                               const struct GNUNET_MessageHeader *message,
1136                               const struct GNUNET_ATS_Information*atsi)
1137 {
1138   struct GNUNET_STREAM_Socket *socket = cls;
1139   
1140   return handle_transmit_close (socket,
1141                                 tunnel,
1142                                 sender,
1143                                 (struct GNUNET_STREAM_MessageHeader *)message,
1144                                 atsi);
1145 }
1146
1147
1148 /**
1149  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1150  *
1151  * @param cls the socket (set from GNUNET_MESH_connect)
1152  * @param tunnel connection to the other end
1153  * @param tunnel_ctx this is NULL
1154  * @param sender who sent the message
1155  * @param message the actual message
1156  * @param atsi performance data for the connection
1157  * @return GNUNET_OK to keep the connection open,
1158  *         GNUNET_SYSERR to close it (signal serious error)
1159  */
1160 static int
1161 client_handle_transmit_close_ack (void *cls,
1162                                   struct GNUNET_MESH_Tunnel *tunnel,
1163                                   void **tunnel_ctx,
1164                                   const struct GNUNET_PeerIdentity *sender,
1165                                   const struct GNUNET_MessageHeader *message,
1166                                   const struct GNUNET_ATS_Information*atsi)
1167 {
1168   struct GNUNET_STREAM_Socket *socket = cls;
1169
1170   return GNUNET_OK;
1171 }
1172
1173
1174 /**
1175  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1176  *
1177  * @param cls the socket (set from GNUNET_MESH_connect)
1178  * @param tunnel connection to the other end
1179  * @param tunnel_ctx this is NULL
1180  * @param sender who sent the message
1181  * @param message the actual message
1182  * @param atsi performance data for the connection
1183  * @return GNUNET_OK to keep the connection open,
1184  *         GNUNET_SYSERR to close it (signal serious error)
1185  */
1186 static int
1187 client_handle_receive_close (void *cls,
1188                              struct GNUNET_MESH_Tunnel *tunnel,
1189                              void **tunnel_ctx,
1190                              const struct GNUNET_PeerIdentity *sender,
1191                              const struct GNUNET_MessageHeader *message,
1192                              const struct GNUNET_ATS_Information*atsi)
1193 {
1194   struct GNUNET_STREAM_Socket *socket = cls;
1195
1196   return GNUNET_OK;
1197 }
1198
1199
1200 /**
1201  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1202  *
1203  * @param cls the socket (set from GNUNET_MESH_connect)
1204  * @param tunnel connection to the other end
1205  * @param tunnel_ctx this is NULL
1206  * @param sender who sent the message
1207  * @param message the actual message
1208  * @param atsi performance data for the connection
1209  * @return GNUNET_OK to keep the connection open,
1210  *         GNUNET_SYSERR to close it (signal serious error)
1211  */
1212 static int
1213 client_handle_receive_close_ack (void *cls,
1214                                  struct GNUNET_MESH_Tunnel *tunnel,
1215                                  void **tunnel_ctx,
1216                                  const struct GNUNET_PeerIdentity *sender,
1217                                  const struct GNUNET_MessageHeader *message,
1218                                  const struct GNUNET_ATS_Information*atsi)
1219 {
1220   struct GNUNET_STREAM_Socket *socket = cls;
1221
1222   return GNUNET_OK;
1223 }
1224
1225
1226 /**
1227  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1228  *
1229  * @param cls the socket (set from GNUNET_MESH_connect)
1230  * @param tunnel connection to the other end
1231  * @param tunnel_ctx this is NULL
1232  * @param sender who sent the message
1233  * @param message the actual message
1234  * @param atsi performance data for the connection
1235  * @return GNUNET_OK to keep the connection open,
1236  *         GNUNET_SYSERR to close it (signal serious error)
1237  */
1238 static int
1239 client_handle_close (void *cls,
1240                      struct GNUNET_MESH_Tunnel *tunnel,
1241                      void **tunnel_ctx,
1242                      const struct GNUNET_PeerIdentity *sender,
1243                      const struct GNUNET_MessageHeader *message,
1244                      const struct GNUNET_ATS_Information*atsi)
1245 {
1246   struct GNUNET_STREAM_Socket *socket = cls;
1247
1248   return GNUNET_OK;
1249 }
1250
1251
1252 /**
1253  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1254  *
1255  * @param cls the socket (set from GNUNET_MESH_connect)
1256  * @param tunnel connection to the other end
1257  * @param tunnel_ctx this is NULL
1258  * @param sender who sent the message
1259  * @param message the actual message
1260  * @param atsi performance data for the connection
1261  * @return GNUNET_OK to keep the connection open,
1262  *         GNUNET_SYSERR to close it (signal serious error)
1263  */
1264 static int
1265 client_handle_close_ack (void *cls,
1266                          struct GNUNET_MESH_Tunnel *tunnel,
1267                          void **tunnel_ctx,
1268                          const struct GNUNET_PeerIdentity *sender,
1269                          const struct GNUNET_MessageHeader *message,
1270                          const struct GNUNET_ATS_Information*atsi)
1271 {
1272   struct GNUNET_STREAM_Socket *socket = cls;
1273
1274   return GNUNET_OK;
1275 }
1276
1277 /*****************************/
1278 /* Server's Message Handlers */
1279 /*****************************/
1280
1281 /**
1282  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1283  *
1284  * @param cls the closure
1285  * @param tunnel connection to the other end
1286  * @param tunnel_ctx the socket
1287  * @param sender who sent the message
1288  * @param message the actual message
1289  * @param atsi performance data for the connection
1290  * @return GNUNET_OK to keep the connection open,
1291  *         GNUNET_SYSERR to close it (signal serious error)
1292  */
1293 static int
1294 server_handle_data (void *cls,
1295                     struct GNUNET_MESH_Tunnel *tunnel,
1296                     void **tunnel_ctx,
1297                     const struct GNUNET_PeerIdentity *sender,
1298                     const struct GNUNET_MessageHeader *message,
1299                     const struct GNUNET_ATS_Information*atsi)
1300 {
1301   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1302
1303   return handle_data (socket,
1304                       tunnel,
1305                       sender,
1306                       (const struct GNUNET_STREAM_DataMessage *)message,
1307                       atsi);
1308 }
1309
1310
1311 /**
1312  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
1313  *
1314  * @param cls the closure
1315  * @param tunnel connection to the other end
1316  * @param tunnel_ctx the socket
1317  * @param sender who sent the message
1318  * @param message the actual message
1319  * @param atsi performance data for the connection
1320  * @return GNUNET_OK to keep the connection open,
1321  *         GNUNET_SYSERR to close it (signal serious error)
1322  */
1323 static int
1324 server_handle_hello (void *cls,
1325                      struct GNUNET_MESH_Tunnel *tunnel,
1326                      void **tunnel_ctx,
1327                      const struct GNUNET_PeerIdentity *sender,
1328                      const struct GNUNET_MessageHeader *message,
1329                      const struct GNUNET_ATS_Information*atsi)
1330 {
1331   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1332   struct GNUNET_STREAM_HelloAckMessage *reply;
1333
1334   GNUNET_assert (socket->tunnel == tunnel);
1335   if (STATE_INIT == socket->state)
1336     {
1337       /* Get the random sequence number */
1338       socket->write_sequence_number = 
1339         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1340       reply = 
1341         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1342       reply->header.header.size = 
1343         htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1344       reply->header.header.type = 
1345         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1346       reply->sequence_number = htonl (socket->write_sequence_number);
1347       queue_message (socket, 
1348                      &reply->header,
1349                      &set_state_hello_wait, 
1350                      NULL);
1351     }
1352   else
1353     {
1354       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1355                   "Client sent HELLO when in state %d\n", socket->state);
1356       /* FIXME: Send RESET? */
1357       
1358     }
1359   return GNUNET_OK;
1360 }
1361
1362
1363 /**
1364  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1365  *
1366  * @param cls the closure
1367  * @param tunnel connection to the other end
1368  * @param tunnel_ctx the socket
1369  * @param sender who sent the message
1370  * @param message the actual message
1371  * @param atsi performance data for the connection
1372  * @return GNUNET_OK to keep the connection open,
1373  *         GNUNET_SYSERR to close it (signal serious error)
1374  */
1375 static int
1376 server_handle_hello_ack (void *cls,
1377                          struct GNUNET_MESH_Tunnel *tunnel,
1378                          void **tunnel_ctx,
1379                          const struct GNUNET_PeerIdentity *sender,
1380                          const struct GNUNET_MessageHeader *message,
1381                          const struct GNUNET_ATS_Information*atsi)
1382 {
1383   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1384   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
1385
1386   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
1387   GNUNET_assert (socket->tunnel == tunnel);
1388   if (STATE_HELLO_WAIT == socket->state)
1389     {
1390       socket->read_sequence_number = ntohl (ack_message->sequence_number);
1391       socket->receive_window_available = 
1392         ntohl (ack_message->receive_window_size);
1393       /* Attain ESTABLISHED state */
1394       set_state_established (NULL, socket);
1395     }
1396   else
1397     {
1398       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1399                   "Client sent HELLO_ACK when in state %d\n", socket->state);
1400       /* FIXME: Send RESET? */
1401       
1402     }
1403   return GNUNET_OK;
1404 }
1405
1406
1407 /**
1408  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1409  *
1410  * @param cls the closure
1411  * @param tunnel connection to the other end
1412  * @param tunnel_ctx the socket
1413  * @param sender who sent the message
1414  * @param message the actual message
1415  * @param atsi performance data for the connection
1416  * @return GNUNET_OK to keep the connection open,
1417  *         GNUNET_SYSERR to close it (signal serious error)
1418  */
1419 static int
1420 server_handle_reset (void *cls,
1421                      struct GNUNET_MESH_Tunnel *tunnel,
1422                      void **tunnel_ctx,
1423                      const struct GNUNET_PeerIdentity *sender,
1424                      const struct GNUNET_MessageHeader *message,
1425                      const struct GNUNET_ATS_Information*atsi)
1426 {
1427   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1428
1429   return GNUNET_OK;
1430 }
1431
1432
1433 /**
1434  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1435  *
1436  * @param cls the closure
1437  * @param tunnel connection to the other end
1438  * @param tunnel_ctx the socket
1439  * @param sender who sent the message
1440  * @param message the actual message
1441  * @param atsi performance data for the connection
1442  * @return GNUNET_OK to keep the connection open,
1443  *         GNUNET_SYSERR to close it (signal serious error)
1444  */
1445 static int
1446 server_handle_transmit_close (void *cls,
1447                               struct GNUNET_MESH_Tunnel *tunnel,
1448                               void **tunnel_ctx,
1449                               const struct GNUNET_PeerIdentity *sender,
1450                               const struct GNUNET_MessageHeader *message,
1451                               const struct GNUNET_ATS_Information*atsi)
1452 {
1453   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1454
1455   return handle_transmit_close (socket,
1456                                 tunnel,
1457                                 sender,
1458                                 (struct GNUNET_STREAM_MessageHeader *)message,
1459                                 atsi);
1460 }
1461
1462
1463 /**
1464  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1465  *
1466  * @param cls the closure
1467  * @param tunnel connection to the other end
1468  * @param tunnel_ctx the socket
1469  * @param sender who sent the message
1470  * @param message the actual message
1471  * @param atsi performance data for the connection
1472  * @return GNUNET_OK to keep the connection open,
1473  *         GNUNET_SYSERR to close it (signal serious error)
1474  */
1475 static int
1476 server_handle_transmit_close_ack (void *cls,
1477                                   struct GNUNET_MESH_Tunnel *tunnel,
1478                                   void **tunnel_ctx,
1479                                   const struct GNUNET_PeerIdentity *sender,
1480                                   const struct GNUNET_MessageHeader *message,
1481                                   const struct GNUNET_ATS_Information*atsi)
1482 {
1483   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1484
1485   return GNUNET_OK;
1486 }
1487
1488
1489 /**
1490  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1491  *
1492  * @param cls the closure
1493  * @param tunnel connection to the other end
1494  * @param tunnel_ctx the socket
1495  * @param sender who sent the message
1496  * @param message the actual message
1497  * @param atsi performance data for the connection
1498  * @return GNUNET_OK to keep the connection open,
1499  *         GNUNET_SYSERR to close it (signal serious error)
1500  */
1501 static int
1502 server_handle_receive_close (void *cls,
1503                              struct GNUNET_MESH_Tunnel *tunnel,
1504                              void **tunnel_ctx,
1505                              const struct GNUNET_PeerIdentity *sender,
1506                              const struct GNUNET_MessageHeader *message,
1507                              const struct GNUNET_ATS_Information*atsi)
1508 {
1509   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1510
1511   return GNUNET_OK;
1512 }
1513
1514
1515 /**
1516  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1517  *
1518  * @param cls the closure
1519  * @param tunnel connection to the other end
1520  * @param tunnel_ctx the socket
1521  * @param sender who sent the message
1522  * @param message the actual message
1523  * @param atsi performance data for the connection
1524  * @return GNUNET_OK to keep the connection open,
1525  *         GNUNET_SYSERR to close it (signal serious error)
1526  */
1527 static int
1528 server_handle_receive_close_ack (void *cls,
1529                                  struct GNUNET_MESH_Tunnel *tunnel,
1530                                  void **tunnel_ctx,
1531                                  const struct GNUNET_PeerIdentity *sender,
1532                                  const struct GNUNET_MessageHeader *message,
1533                                  const struct GNUNET_ATS_Information*atsi)
1534 {
1535   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1536
1537   return GNUNET_OK;
1538 }
1539
1540
1541 /**
1542  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1543  *
1544  * @param cls the closure
1545  * @param tunnel connection to the other end
1546  * @param tunnel_ctx the socket
1547  * @param sender who sent the message
1548  * @param message the actual message
1549  * @param atsi performance data for the connection
1550  * @return GNUNET_OK to keep the connection open,
1551  *         GNUNET_SYSERR to close it (signal serious error)
1552  */
1553 static int
1554 server_handle_close (void *cls,
1555                      struct GNUNET_MESH_Tunnel *tunnel,
1556                      void **tunnel_ctx,
1557                      const struct GNUNET_PeerIdentity *sender,
1558                      const struct GNUNET_MessageHeader *message,
1559                      const struct GNUNET_ATS_Information*atsi)
1560 {
1561   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1562
1563   return GNUNET_OK;
1564 }
1565
1566
1567 /**
1568  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1569  *
1570  * @param cls the closure
1571  * @param tunnel connection to the other end
1572  * @param tunnel_ctx the socket
1573  * @param sender who sent the message
1574  * @param message the actual message
1575  * @param atsi performance data for the connection
1576  * @return GNUNET_OK to keep the connection open,
1577  *         GNUNET_SYSERR to close it (signal serious error)
1578  */
1579 static int
1580 server_handle_close_ack (void *cls,
1581                          struct GNUNET_MESH_Tunnel *tunnel,
1582                          void **tunnel_ctx,
1583                          const struct GNUNET_PeerIdentity *sender,
1584                          const struct GNUNET_MessageHeader *message,
1585                          const struct GNUNET_ATS_Information*atsi)
1586 {
1587   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1588
1589   return GNUNET_OK;
1590 }
1591
1592
1593 /**
1594  * Message Handler for mesh
1595  *
1596  * @param socket the socket through which the ack was received
1597  * @param tunnel connection to the other end
1598  * @param sender who sent the message
1599  * @param ack the acknowledgment message
1600  * @param atsi performance data for the connection
1601  * @return GNUNET_OK to keep the connection open,
1602  *         GNUNET_SYSERR to close it (signal serious error)
1603  */
1604 static int
1605 handle_ack (struct GNUNET_STREAM_Socket *socket,
1606             struct GNUNET_MESH_Tunnel *tunnel,
1607             const struct GNUNET_PeerIdentity *sender,
1608             const struct GNUNET_STREAM_AckMessage *ack,
1609             const struct GNUNET_ATS_Information*atsi)
1610 {
1611   switch (socket->state)
1612     {
1613     case (STATE_ESTABLISHED):
1614       if (NULL == socket->write_handle)
1615         {
1616           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1617                       "Received DATA ACK when write_handle is NULL\n");
1618           return GNUNET_OK;
1619         }
1620
1621       socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap);
1622       socket->receive_window_available = 
1623         ntohl (ack->receive_window_remaining);
1624       write_data (socket);
1625       break;
1626     default:
1627       break;
1628     }
1629   return GNUNET_OK;
1630 }
1631
1632
1633 /**
1634  * Message Handler for mesh
1635  *
1636  * @param cls the 'struct GNUNET_STREAM_Socket'
1637  * @param tunnel connection to the other end
1638  * @param tunnel_ctx unused
1639  * @param sender who sent the message
1640  * @param message the actual message
1641  * @param atsi performance data for the connection
1642  * @return GNUNET_OK to keep the connection open,
1643  *         GNUNET_SYSERR to close it (signal serious error)
1644  */
1645 static int
1646 client_handle_ack (void *cls,
1647                    struct GNUNET_MESH_Tunnel *tunnel,
1648                    void **tunnel_ctx,
1649                    const struct GNUNET_PeerIdentity *sender,
1650                    const struct GNUNET_MessageHeader *message,
1651                    const struct GNUNET_ATS_Information*atsi)
1652 {
1653   struct GNUNET_STREAM_Socket *socket = cls;
1654   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1655  
1656   return handle_ack (socket, tunnel, sender, ack, atsi);
1657 }
1658
1659
1660 /**
1661  * Message Handler for mesh
1662  *
1663  * @param cls the server's listen socket
1664  * @param tunnel connection to the other end
1665  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
1666  * @param sender who sent the message
1667  * @param message the actual message
1668  * @param atsi performance data for the connection
1669  * @return GNUNET_OK to keep the connection open,
1670  *         GNUNET_SYSERR to close it (signal serious error)
1671  */
1672 static int
1673 server_handle_ack (void *cls,
1674                    struct GNUNET_MESH_Tunnel *tunnel,
1675                    void **tunnel_ctx,
1676                    const struct GNUNET_PeerIdentity *sender,
1677                    const struct GNUNET_MessageHeader *message,
1678                    const struct GNUNET_ATS_Information*atsi)
1679 {
1680   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1681   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1682  
1683   return handle_ack (socket, tunnel, sender, ack, atsi);
1684 }
1685
1686
1687 /**
1688  * For client message handlers, the stream socket is in the
1689  * closure argument.
1690  */
1691 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
1692   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1693   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
1694    sizeof (struct GNUNET_STREAM_AckMessage) },
1695   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1696    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1697   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1698    sizeof (struct GNUNET_STREAM_MessageHeader)},
1699   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1700    sizeof (struct GNUNET_STREAM_MessageHeader)},
1701   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1702    sizeof (struct GNUNET_STREAM_MessageHeader)},
1703   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1704    sizeof (struct GNUNET_STREAM_MessageHeader)},
1705   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1706    sizeof (struct GNUNET_STREAM_MessageHeader)},
1707   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1708    sizeof (struct GNUNET_STREAM_MessageHeader)},
1709   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1710    sizeof (struct GNUNET_STREAM_MessageHeader)},
1711   {NULL, 0, 0}
1712 };
1713
1714
1715 /**
1716  * For server message handlers, the stream socket is in the
1717  * tunnel context, and the listen socket in the closure argument.
1718  */
1719 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
1720   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1721   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
1722    sizeof (struct GNUNET_STREAM_AckMessage) },
1723   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
1724    sizeof (struct GNUNET_STREAM_MessageHeader)},
1725   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1726    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1727   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1728    sizeof (struct GNUNET_STREAM_MessageHeader)},
1729   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1730    sizeof (struct GNUNET_STREAM_MessageHeader)},
1731   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1732    sizeof (struct GNUNET_STREAM_MessageHeader)},
1733   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1734    sizeof (struct GNUNET_STREAM_MessageHeader)},
1735   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1736    sizeof (struct GNUNET_STREAM_MessageHeader)},
1737   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1738    sizeof (struct GNUNET_STREAM_MessageHeader)},
1739   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1740    sizeof (struct GNUNET_STREAM_MessageHeader)},
1741   {NULL, 0, 0}
1742 };
1743
1744
1745 /**
1746  * Function called when our target peer is connected to our tunnel
1747  *
1748  * @param cls the socket for which this tunnel is created
1749  * @param peer the peer identity of the target
1750  * @param atsi performance data for the connection
1751  */
1752 static void
1753 mesh_peer_connect_callback (void *cls,
1754                             const struct GNUNET_PeerIdentity *peer,
1755                             const struct GNUNET_ATS_Information * atsi)
1756 {
1757   struct GNUNET_STREAM_Socket *socket = cls;
1758   struct GNUNET_STREAM_MessageHeader *message;
1759
1760   if (0 != memcmp (&socket->other_peer, 
1761                    peer, 
1762                    sizeof (struct GNUNET_PeerIdentity)))
1763     {
1764       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1765                   "A peer (%s) which is not our target has connected to our tunnel", 
1766                   GNUNET_i2s (peer));
1767       return;
1768     }
1769   
1770   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1771               "Target peer %s connected\n", GNUNET_i2s (peer));
1772   
1773   /* Set state to INIT */
1774   socket->state = STATE_INIT;
1775
1776   /* Send HELLO message */
1777   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1778   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1779   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1780   queue_message (socket,
1781                  message,
1782                  &set_state_hello_wait,
1783                  NULL);
1784
1785   /* Call open callback */
1786   if (NULL == socket->open_cb)
1787     {
1788       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1789                   "STREAM_open callback is NULL\n");
1790     }
1791   else
1792     {
1793       socket->open_cb (socket->open_cls, socket);
1794     }
1795 }
1796
1797
1798 /**
1799  * Function called when our target peer is disconnected from our tunnel
1800  *
1801  * @param cls the socket associated which this tunnel
1802  * @param peer the peer identity of the target
1803  */
1804 static void
1805 mesh_peer_disconnect_callback (void *cls,
1806                                const struct GNUNET_PeerIdentity *peer)
1807 {
1808
1809 }
1810
1811
1812 /*****************/
1813 /* API functions */
1814 /*****************/
1815
1816
1817 /**
1818  * Tries to open a stream to the target peer
1819  *
1820  * @param cfg configuration to use
1821  * @param target the target peer to which the stream has to be opened
1822  * @param app_port the application port number which uniquely identifies this
1823  *            stream
1824  * @param open_cb this function will be called after stream has be established 
1825  * @param open_cb_cls the closure for open_cb
1826  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
1827  * @return if successful it returns the stream socket; NULL if stream cannot be
1828  *         opened 
1829  */
1830 struct GNUNET_STREAM_Socket *
1831 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
1832                     const struct GNUNET_PeerIdentity *target,
1833                     GNUNET_MESH_ApplicationType app_port,
1834                     GNUNET_STREAM_OpenCallback open_cb,
1835                     void *open_cb_cls,
1836                     ...)
1837 {
1838   struct GNUNET_STREAM_Socket *socket;
1839   enum GNUNET_STREAM_Option option;
1840   va_list vargs;                /* Variable arguments */
1841
1842   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
1843   socket->other_peer = *target;
1844   socket->open_cb = open_cb;
1845   socket->open_cls = open_cb_cls;
1846
1847   /* Set defaults */
1848   socket->retransmit_timeout = 
1849     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
1850
1851   va_start (vargs, open_cb_cls); /* Parse variable args */
1852   do {
1853     option = va_arg (vargs, enum GNUNET_STREAM_Option);
1854     switch (option)
1855       {
1856       case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
1857         /* Expect struct GNUNET_TIME_Relative */
1858         socket->retransmit_timeout = va_arg (vargs,
1859                                              struct GNUNET_TIME_Relative);
1860         break;
1861       case GNUNET_STREAM_OPTION_END:
1862         break;
1863       }
1864   } while (GNUNET_STREAM_OPTION_END != option);
1865   va_end (vargs);               /* End of variable args parsing */
1866
1867   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
1868                                       1,  /* QUEUE size as parameter? */
1869                                       socket, /* cls */
1870                                       NULL, /* No inbound tunnel handler */
1871                                       NULL, /* No inbound tunnel cleaner */
1872                                       client_message_handlers,
1873                                       NULL); /* We don't get inbound tunnels */
1874   // FIXME: if (NULL == socket->mesh) ...
1875
1876   /* Now create the mesh tunnel to target */
1877   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
1878                                               NULL, /* Tunnel context */
1879                                               &mesh_peer_connect_callback,
1880                                               &mesh_peer_disconnect_callback,
1881                                               socket);
1882   // FIXME: if (NULL == socket->tunnel) ...
1883
1884   return socket;
1885 }
1886
1887
1888 /**
1889  * Closes the stream
1890  *
1891  * @param socket the stream socket
1892  */
1893 void
1894 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
1895 {
1896   struct MessageQueue *head;
1897
1898   /* Clear Transmit handles */
1899   if (NULL != socket->transmit_handle)
1900     {
1901       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
1902       socket->transmit_handle = NULL;
1903     }
1904
1905   /* Clear existing message queue */
1906   while (NULL != (head = socket->queue_head)) {
1907     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
1908                                  socket->queue_tail,
1909                                  head);
1910     GNUNET_free (head->message);
1911     GNUNET_free (head);
1912   }
1913
1914   /* Close associated tunnel */
1915   if (NULL != socket->tunnel)
1916     {
1917       GNUNET_MESH_tunnel_destroy (socket->tunnel);
1918       socket->tunnel = NULL;
1919     }
1920
1921   /* Close mesh connection */
1922   if (NULL != socket->mesh)
1923     {
1924       GNUNET_MESH_disconnect (socket->mesh);
1925       socket->mesh = NULL;
1926     }
1927   
1928   /* Release receive buffer */
1929   if (NULL != socket->receive_buffer)
1930     {
1931       GNUNET_free (socket->receive_buffer);
1932     }
1933
1934   GNUNET_free (socket);
1935 }
1936
1937
1938 /**
1939  * Method called whenever a peer creates a tunnel to us
1940  *
1941  * @param cls closure
1942  * @param tunnel new handle to the tunnel
1943  * @param initiator peer that started the tunnel
1944  * @param atsi performance information for the tunnel
1945  * @return initial tunnel context for the tunnel
1946  *         (can be NULL -- that's not an error)
1947  */
1948 static void *
1949 new_tunnel_notify (void *cls,
1950                    struct GNUNET_MESH_Tunnel *tunnel,
1951                    const struct GNUNET_PeerIdentity *initiator,
1952                    const struct GNUNET_ATS_Information *atsi)
1953 {
1954   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
1955   struct GNUNET_STREAM_Socket *socket;
1956
1957   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
1958   socket->tunnel = tunnel;
1959   socket->session_id = 0;       /* FIXME */
1960   socket->other_peer = *initiator;
1961   socket->state = STATE_INIT;
1962
1963   if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
1964                                            socket,
1965                                            &socket->other_peer))
1966     {
1967       socket->state = STATE_CLOSED;
1968       /* FIXME: Send CLOSE message and then free */
1969       GNUNET_free (socket);
1970       GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
1971     }
1972   return socket;
1973 }
1974
1975
1976 /**
1977  * Function called whenever an inbound tunnel is destroyed.  Should clean up
1978  * any associated state.  This function is NOT called if the client has
1979  * explicitly asked for the tunnel to be destroyed using
1980  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
1981  * the tunnel.
1982  *
1983  * @param cls closure (set from GNUNET_MESH_connect)
1984  * @param tunnel connection to the other end (henceforth invalid)
1985  * @param tunnel_ctx place where local state associated
1986  *                   with the tunnel is stored
1987  */
1988 static void 
1989 tunnel_cleaner (void *cls,
1990                 const struct GNUNET_MESH_Tunnel *tunnel,
1991                 void *tunnel_ctx)
1992 {
1993   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
1994   
1995   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1996               "Peer %s has terminated connection abruptly\n",
1997               GNUNET_i2s (&socket->other_peer));
1998
1999   socket->status = GNUNET_STREAM_SHUTDOWN;
2000   /* Clear Transmit handles */
2001   if (NULL != socket->transmit_handle)
2002     {
2003       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2004       socket->transmit_handle = NULL;
2005     }
2006   socket->tunnel = NULL;
2007 }
2008
2009
2010 /**
2011  * Listens for stream connections for a specific application ports
2012  *
2013  * @param cfg the configuration to use
2014  * @param app_port the application port for which new streams will be accepted
2015  * @param listen_cb this function will be called when a peer tries to establish
2016  *            a stream with us
2017  * @param listen_cb_cls closure for listen_cb
2018  * @return listen socket, NULL for any error
2019  */
2020 struct GNUNET_STREAM_ListenSocket *
2021 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
2022                       GNUNET_MESH_ApplicationType app_port,
2023                       GNUNET_STREAM_ListenCallback listen_cb,
2024                       void *listen_cb_cls)
2025 {
2026   /* FIXME: Add variable args for passing configration options? */
2027   struct GNUNET_STREAM_ListenSocket *lsocket;
2028   GNUNET_MESH_ApplicationType app_types[2];
2029
2030   app_types[0] = app_port;
2031   app_types[1] = 0;
2032   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
2033   lsocket->port = app_port;
2034   lsocket->listen_cb = listen_cb;
2035   lsocket->listen_cb_cls = listen_cb_cls;
2036   lsocket->mesh = GNUNET_MESH_connect (cfg,
2037                                        10, /* FIXME: QUEUE size as parameter? */
2038                                        lsocket, /* Closure */
2039                                        &new_tunnel_notify,
2040                                        &tunnel_cleaner,
2041                                        server_message_handlers,
2042                                        app_types);
2043   return lsocket;
2044 }
2045
2046
2047 /**
2048  * Closes the listen socket
2049  *
2050  * @param lsocket the listen socket
2051  */
2052 void
2053 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
2054 {
2055   /* Close MESH connection */
2056   GNUNET_MESH_disconnect (lsocket->mesh);
2057   
2058   GNUNET_free (lsocket);
2059 }
2060
2061
2062 /**
2063  * Tries to write the given data to the stream
2064  *
2065  * @param socket the socket representing a stream
2066  * @param data the data buffer from where the data is written into the stream
2067  * @param size the number of bytes to be written from the data buffer
2068  * @param timeout the timeout period
2069  * @param write_cont the function to call upon writing some bytes into the stream
2070  * @param write_cont_cls the closure
2071  * @return handle to cancel the operation
2072  */
2073 struct GNUNET_STREAM_IOWriteHandle *
2074 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
2075                      const void *data,
2076                      size_t size,
2077                      struct GNUNET_TIME_Relative timeout,
2078                      GNUNET_STREAM_CompletionContinuation write_cont,
2079                      void *write_cont_cls)
2080 {
2081   unsigned int num_needed_packets;
2082   unsigned int packet;
2083   struct GNUNET_STREAM_IOWriteHandle *io_handle;
2084   uint32_t packet_size;
2085   uint32_t payload_size;
2086   struct GNUNET_STREAM_DataMessage *data_msg;
2087   const void *sweep;
2088
2089   /* Return NULL if there is already a write request pending */
2090   if (NULL != socket->write_handle)
2091   {
2092     GNUNET_break (0);
2093     return NULL;
2094   }
2095   if (!((STATE_ESTABLISHED == socket->state)
2096         || (STATE_RECEIVE_CLOSE_WAIT == socket->state)
2097         || (STATE_RECEIVE_CLOSED == socket->state)))
2098     {
2099       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2100                   "Attempting to write on a closed (OR) not-yet-established"
2101                   "stream\n"); 
2102       return NULL;
2103     } 
2104   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
2105     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
2106   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
2107   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
2108   sweep = data;
2109   /* Divide the given buffer into packets for sending */
2110   for (packet=0; packet < num_needed_packets; packet++)
2111     {
2112       if ((packet + 1) * max_payload_size < size) 
2113         {
2114           payload_size = max_payload_size;
2115           packet_size = MAX_PACKET_SIZE;
2116         }
2117       else 
2118         {
2119           payload_size = size - packet * max_payload_size;
2120           packet_size =  payload_size + sizeof (struct
2121                                                 GNUNET_STREAM_DataMessage); 
2122         }
2123       io_handle->messages[packet] = GNUNET_malloc (packet_size);
2124       io_handle->messages[packet]->header.header.size = htons (packet_size);
2125       io_handle->messages[packet]->header.header.type =
2126         htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
2127       io_handle->messages[packet]->sequence_number =
2128         htons (socket->write_sequence_number++);
2129       io_handle->messages[packet]->offset = htons (socket->write_offset);
2130
2131       /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2132          determined from RTT */
2133       io_handle->messages[packet]->ack_deadline = 
2134         GNUNET_TIME_relative_hton (GNUNET_TIME_relative_multiply 
2135                                    (GNUNET_TIME_UNIT_SECONDS, 5));
2136       data_msg = io_handle->messages[packet];
2137       /* Copy data from given buffer to the packet */
2138       memcpy (&data_msg[1],
2139               sweep,
2140               payload_size);
2141       sweep += payload_size;
2142       socket->write_offset += payload_size;
2143     }
2144   socket->write_handle = io_handle;
2145   write_data (socket);
2146
2147   return io_handle;
2148 }
2149
2150
2151 /**
2152  * Tries to read data from the stream
2153  *
2154  * @param socket the socket representing a stream
2155  * @param timeout the timeout period
2156  * @param proc function to call with data (once only)
2157  * @param proc_cls the closure for proc
2158  * @return handle to cancel the operation
2159  */
2160 struct GNUNET_STREAM_IOReadHandle *
2161 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
2162                     struct GNUNET_TIME_Relative timeout,
2163                     GNUNET_STREAM_DataProcessor proc,
2164                     void *proc_cls)
2165 {
2166   struct GNUNET_STREAM_IOReadHandle *read_handle;
2167   
2168   /* Return NULL if there is already a read handle; the user has to cancel that
2169   first before continuing or has to wait until it is completed */
2170   if (NULL != socket->read_handle) return NULL;
2171
2172   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
2173   read_handle->proc = proc;
2174   socket->read_handle = read_handle;
2175
2176   /* if previous copy buffer is still not read call the data processor on it */
2177   if (NULL != socket->copy_buffer)
2178     GNUNET_SCHEDULER_add_continuation (&call_read_processor_task,
2179                                        socket,
2180                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2181   else
2182     prepare_buffer_for_read (socket);
2183
2184   socket->read_io_timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
2185                                                                &cancel_read_io,
2186                                                                socket);
2187
2188   return read_handle;
2189 }