-added prepare_buffer_for_read
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file stream/stream_api.c
23  * @brief Implementation of the stream library
24  * @author Sree Harsha Totakura
25  */
26 #include "platform.h"
27 #include "gnunet_common.h"
28 #include "gnunet_crypto_lib.h"
29 #include "gnunet_stream_lib.h"
30 #include "stream_protocol.h"
31
32
33 /**
34  * The maximum packet size of a stream packet
35  */
36 #define MAX_PACKET_SIZE 64000
37
38 /**
39  * The maximum payload a data message packet can carry
40  */
41 static size_t max_payload_size = 
42   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
43
44 /**
45  * Receive buffer
46  */
47 #define RECEIVE_BUFFER_SIZE 4096000
48
49 /**
50  * states in the Protocol
51  */
52 enum State
53   {
54     /**
55      * Client initialization state
56      */
57     STATE_INIT,
58
59     /**
60      * Listener initialization state 
61      */
62     STATE_LISTEN,
63
64     /**
65      * Pre-connection establishment state
66      */
67     STATE_HELLO_WAIT,
68
69     /**
70      * State where a connection has been established
71      */
72     STATE_ESTABLISHED,
73
74     /**
75      * State where the socket is closed on our side and waiting to be ACK'ed
76      */
77     STATE_RECEIVE_CLOSE_WAIT,
78
79     /**
80      * State where the socket is closed for reading
81      */
82     STATE_RECEIVE_CLOSED,
83
84     /**
85      * State where the socket is closed on our side and waiting to be ACK'ed
86      */
87     STATE_TRANSMIT_CLOSE_WAIT,
88
89     /**
90      * State where the socket is closed for writing
91      */
92     STATE_TRANSMIT_CLOSED,
93
94     /**
95      * State where the socket is closed on our side and waiting to be ACK'ed
96      */
97     STATE_CLOSE_WAIT,
98
99     /**
100      * State where the socket is closed
101      */
102     STATE_CLOSED 
103   };
104
105
106 /**
107  * Functions of this type are called when a message is written
108  *
109  * @param cls the closure from queue_message
110  * @param socket the socket the written message was bound to
111  */
112 typedef void (*SendFinishCallback) (void *cls,
113                                     struct GNUNET_STREAM_Socket *socket);
114
115
116 /**
117  * The send message queue
118  */
119 struct MessageQueue
120 {
121   /**
122    * The message
123    */
124   struct GNUNET_STREAM_MessageHeader *message;
125
126   /**
127    * Callback to be called when the message is sent
128    */
129   SendFinishCallback finish_cb;
130
131   /**
132    * The closure for finish_cb
133    */
134   void *finish_cb_cls;
135
136   /**
137    * The next message in queue. Should be NULL in the last message
138    */
139   struct MessageQueue *next;
140
141   /**
142    * The next message in queue. Should be NULL in the first message
143    */
144   struct MessageQueue *prev;
145 };
146
147
148 /**
149  * The STREAM Socket Handler
150  */
151 struct GNUNET_STREAM_Socket
152 {
153
154   /**
155    * The peer identity of the peer at the other end of the stream
156    */
157   struct GNUNET_PeerIdentity other_peer;
158
159   /**
160    * Retransmission timeout
161    */
162   struct GNUNET_TIME_Relative retransmit_timeout;
163
164   /**
165    * The Acknowledgement Bitmap
166    */
167   GNUNET_STREAM_AckBitmap ack_bitmap;
168
169   /**
170    * Time when the Acknowledgement was queued
171    */
172   struct GNUNET_TIME_Absolute ack_time_registered;
173
174   /**
175    * Queued Acknowledgement deadline
176    */
177   struct GNUNET_TIME_Relative ack_time_deadline;
178
179   /**
180    * The task for sending timely Acks
181    */
182   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
183
184   /**
185    * The mesh handle
186    */
187   struct GNUNET_MESH_Handle *mesh;
188
189   /**
190    * The mesh tunnel handle
191    */
192   struct GNUNET_MESH_Tunnel *tunnel;
193
194   /**
195    * Stream open closure
196    */
197   void *open_cls;
198
199   /**
200    * Stream open callback
201    */
202   GNUNET_STREAM_OpenCallback open_cb;
203
204   /**
205    * The current transmit handle (if a pending transmit request exists)
206    */
207   struct GNUNET_MESH_TransmitHandle *transmit_handle;
208
209   /**
210    * The current message associated with the transmit handle
211    */
212   struct MessageQueue *queue_head;
213
214   /**
215    * The queue tail, should always point to the last message in queue
216    */
217   struct MessageQueue *queue_tail;
218
219   /**
220    * The write IO_handle associated with this socket
221    */
222   struct GNUNET_STREAM_IOWriteHandle *write_handle;
223
224   /**
225    * The read IO_handle associated with this socket
226    */
227   struct GNUNET_STREAM_IOReadHandle *read_handle;
228
229   /**
230    * Buffer for storing received messages
231    */
232   void *receive_buffer;
233
234   /**
235    * Copy buffer pointer; Used during read operations
236    */
237   void *copy_buffer;
238
239   /**
240    * The state of the protocol associated with this socket
241    */
242   enum State state;
243
244   /**
245    * The status of the socket
246    */
247   enum GNUNET_STREAM_Status status;
248
249   /**
250    * The number of previous timeouts; FIXME: currently not used
251    */
252   unsigned int retries;
253
254   /**
255    * The session id associated with this stream connection
256    * FIXME: Not used currently, may be removed
257    */
258   uint32_t session_id;
259
260   /**
261    * Write sequence number. Set to random when sending HELLO(client) and
262    * HELLO_ACK(server) 
263    */
264   uint32_t write_sequence_number;
265
266   /**
267    * Read sequence number. This number's value is determined during handshake
268    */
269   uint32_t read_sequence_number;
270
271   /**
272    * The receiver buffer size
273    */
274   uint32_t receive_buffer_size;
275
276   /**
277    * The receiver buffer boundaries
278    */
279   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
280
281   /**
282    * receiver's available buffer after the last acknowledged packet
283    */
284   uint32_t receive_window_available;
285
286   /**
287    * The offset pointer used during write operation
288    */
289   uint32_t write_offset;
290
291   /**
292    * The offset after which we are expecting data
293    */
294   uint32_t read_offset;
295
296   /**
297    * The size of the copy buffer
298    */
299   uint32_t copy_buffer_size;
300   
301   /**
302    * The read offset of copy buffer
303    */
304   uint32_t copy_buffer_read_offset;
305 };
306
307
308 /**
309  * A socket for listening
310  */
311 struct GNUNET_STREAM_ListenSocket
312 {
313
314   /**
315    * The mesh handle
316    */
317   struct GNUNET_MESH_Handle *mesh;
318
319   /**
320    * The callback function which is called after successful opening socket
321    */
322   GNUNET_STREAM_ListenCallback listen_cb;
323
324   /**
325    * The call back closure
326    */
327   void *listen_cb_cls;
328
329   /**
330    * The service port
331    */
332   GNUNET_MESH_ApplicationType port;
333 };
334
335
336 /**
337  * The IO Write Handle
338  */
339 struct GNUNET_STREAM_IOWriteHandle
340 {
341   /**
342    * The packet_buffers associated with this Handle
343    */
344   struct GNUNET_STREAM_DataMessage *messages[64];
345
346   /**
347    * The bitmap of this IOHandle; Corresponding bit for a message is set when
348    * it has been acknowledged by the receiver
349    */
350   GNUNET_STREAM_AckBitmap ack_bitmap;
351
352   /**
353    * Number of packets sent before waiting for an ack
354    *
355    * FIXME: Do we need this?
356    */
357   unsigned int sent_packets;
358 };
359
360
361 /**
362  * The IO Read Handle
363  */
364 struct GNUNET_STREAM_IOReadHandle
365 {
366   /**
367    * Callback for the read processor
368    */
369   GNUNET_STREAM_DataProcessor proc;
370
371   /**
372    * The closure pointer for the read processor callback
373    */
374   void *proc_cls;
375 };
376
377
378 /**
379  * Default value in seconds for various timeouts
380  */
381 static unsigned int default_timeout = 300;
382
383
384 /**
385  * Callback function for sending hello message
386  *
387  * @param cls closure the socket
388  * @param size number of bytes available in buf
389  * @param buf where the callee should write the message
390  * @return number of bytes written to buf
391  */
392 static size_t
393 send_message_notify (void *cls, size_t size, void *buf)
394 {
395   struct GNUNET_STREAM_Socket *socket = cls;
396   struct MessageQueue *head;
397   size_t ret;
398
399   socket->transmit_handle = NULL; /* Remove the transmit handle */
400   head = socket->queue_head;
401   if (NULL == head)
402     return 0; /* just to be safe */
403   if (0 == size)                /* request timed out */
404     {
405       socket->retries++;
406       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
407                   "Message sending timed out. Retry %d \n",
408                   socket->retries);
409       socket->transmit_handle = 
410         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
411                                            0, /* Corking */
412                                            1, /* Priority */
413                                            /* FIXME: exponential backoff */
414                                            socket->retransmit_timeout,
415                                            &socket->other_peer,
416                                            ntohs (head->message->header.size),
417                                            &send_message_notify,
418                                            socket);
419       return 0;
420     }
421
422   ret = ntohs (head->message->header.size);
423   GNUNET_assert (size >= ret);
424   memcpy (buf, head->message, ret);
425   if (NULL != head->finish_cb)
426     {
427       head->finish_cb (socket, head->finish_cb_cls);
428     }
429   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
430                                socket->queue_tail,
431                                head);
432   GNUNET_free (head->message);
433   GNUNET_free (head);
434   head = socket->queue_head;
435   if (NULL != head)    /* more pending messages to send */
436     {
437       socket->retries = 0;
438       socket->transmit_handle = 
439         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
440                                            0, /* Corking */
441                                            1, /* Priority */
442                                            /* FIXME: exponential backoff */
443                                            socket->retransmit_timeout,
444                                            &socket->other_peer,
445                                            ntohs (head->message->header.size),
446                                            &send_message_notify,
447                                            socket);
448     }
449   return ret;
450 }
451
452
453 /**
454  * Queues a message for sending using the mesh connection of a socket
455  *
456  * @param socket the socket whose mesh connection is used
457  * @param message the message to be sent
458  * @param finish_cb the callback to be called when the message is sent
459  * @param finish_cb_cls the closure for the callback
460  */
461 static void
462 queue_message (struct GNUNET_STREAM_Socket *socket,
463                struct GNUNET_STREAM_MessageHeader *message,
464                SendFinishCallback finish_cb,
465                void *finish_cb_cls)
466 {
467   struct MessageQueue *queue_entity;
468
469   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
470   queue_entity->message = message;
471   queue_entity->finish_cb = finish_cb;
472   queue_entity->finish_cb_cls = finish_cb_cls;
473   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
474                                     socket->queue_tail,
475                                     queue_entity);
476   if (NULL == socket->transmit_handle)
477   {
478     socket->retries = 0;
479     socket->transmit_handle = 
480       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
481                                          0, /* Corking */
482                                          1, /* Priority */
483                                          socket->retransmit_timeout,
484                                          &socket->other_peer,
485                                          ntohs (message->header.size),
486                                          &send_message_notify,
487                                          socket);
488   }
489 }
490
491
492 /**
493  * Callback function for sending ack message
494  *
495  * @param cls closure the ACK message created in ack_task
496  * @param size number of bytes available in buffer
497  * @param buf where the callee should write the message
498  * @return number of bytes written to buf
499  */
500 static size_t
501 send_ack_notify (void *cls, size_t size, void *buf)
502 {
503   struct GNUNET_STREAM_AckMessage *ack_msg = cls;
504
505   if (0 == size)
506     {
507       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
508                   "%s called with size 0\n", __func__);
509       return 0;
510     }
511   GNUNET_assert (ack_msg->header.header.size <= size);
512   
513   size = ack_msg->header.header.size;
514   memcpy (buf, ack_msg, size);
515   return size;
516 }
517
518
519 /**
520  * Task for sending ACK message
521  *
522  * @param cls the socket
523  * @param tc the Task context
524  */
525 static void
526 ack_task (void *cls,
527           const struct GNUNET_SCHEDULER_TaskContext *tc)
528 {
529   struct GNUNET_STREAM_Socket *socket = cls;
530   struct GNUNET_STREAM_AckMessage *ack_msg;
531
532   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
533     {
534       return;
535     }
536
537   socket->ack_task_id = 0;
538
539   /* Create the ACK Message */
540   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
541   ack_msg->header.header.size = htons (sizeof (struct 
542                                                GNUNET_STREAM_AckMessage));
543   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
544   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
545   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
546   ack_msg->receive_window_remaining = 
547     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
548
549   /* Request MESH for sending ACK */
550   GNUNET_MESH_notify_transmit_ready (socket->tunnel,
551                                      0, /* Corking */
552                                      1, /* Priority */
553                                      socket->retransmit_timeout,
554                                      &socket->other_peer,
555                                      ntohs (ack_msg->header.header.size),
556                                      &send_ack_notify,
557                                      ack_msg);
558
559   
560 }
561
562
563 /**
564  * Function to modify a bit in GNUNET_STREAM_AckBitmap
565  *
566  * @param bitmap the bitmap to modify
567  * @param bit the bit number to modify
568  * @param value GNUNET_YES to on, GNUNET_NO to off
569  */
570 static void
571 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
572                       unsigned int bit, 
573                       int value)
574 {
575   GNUNET_assert (bit < 64);
576   if (GNUNET_YES == value)
577     *bitmap |= (1LL << bit);
578   else
579     *bitmap &= ~(1LL << bit);
580 }
581
582
583 /**
584  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
585  *
586  * @param bitmap address of the bitmap that has to be checked
587  * @param bit the bit number to check
588  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
589  */
590 static uint8_t
591 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
592                       unsigned int bit)
593 {
594   GNUNET_assert (bit < 64);
595   return 0 != (*bitmap & (1LL << bit));
596 }
597
598
599
600 /**
601  * Function called when Data Message is sent
602  *
603  * @param cls the io_handle corresponding to the Data Message
604  * @param socket the socket which was used
605  */
606 static void
607 write_data_finish_cb (void *cls,
608                       struct GNUNET_STREAM_Socket *socket)
609 {
610   struct GNUNET_STREAM_IOWriteHandle *io_handle = cls;
611
612   io_handle->sent_packets++;
613 }
614
615
616 /**
617  * Writes data using the given socket. The amount of data written is limited by
618  * the receive_window_size
619  *
620  * @param socket the socket to use
621  */
622 static void 
623 write_data (struct GNUNET_STREAM_Socket *socket)
624 {
625   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
626   unsigned int packet;
627   int ack_packet;
628
629   ack_packet = -1;
630   /* Find the last acknowledged packet */
631   for (packet=0; packet < 64; packet++)
632     {      
633       if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
634                                               packet))
635         ack_packet = packet;        
636       else if (NULL == io_handle->messages[packet])
637         break;
638     }
639   /* Resend packets which weren't ack'ed */
640   for (packet=0; packet < ack_packet; packet++)
641     {
642       if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
643                                              packet))
644         {
645           queue_message (socket,
646                          &io_handle->messages[packet]->header,
647                          NULL,
648                          NULL);
649         }
650     }
651   packet = ack_packet + 1;
652   /* Now send new packets if there is enough buffer space */
653   while ( (NULL != io_handle->messages[packet]) &&
654           (socket->receive_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) )
655     {
656       socket->receive_window_available -= ntohs (io_handle->messages[packet]->header.header.size);
657       queue_message (socket,
658                      &io_handle->messages[packet]->header,
659                      &write_data_finish_cb,
660                      io_handle);
661       packet++;
662     }
663 }
664
665
666 /**
667  * Task for calling the read processor
668  *
669  * @param cls the socket
670  * @param tc the task context
671  */
672 static void
673 call_read_processor_task (void *cls,
674                           const struct GNUNET_SCHEDULER_TaskContext *tc)
675 {
676   struct GNUNET_STREAM_Socket *socket = cls;
677   size_t read_size;
678   size_t valid_read_size;
679
680   if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN) return;
681
682   GNUNET_assert (NULL != socket->read_handle);
683   GNUNET_assert (NULL != socket->read_handle->proc);
684   GNUNET_assert (NULL != socket->copy_buffer);
685   GNUNET_assert (0 != socket->copy_buffer_size);
686
687   valid_read_size = socket->copy_buffer_size - socket->copy_buffer_read_offset;
688   GNUNET_assert (0 != valid_read_size);
689
690   read_size = socket->read_handle->proc (socket->read_handle->proc_cls,
691                                          socket->status,
692                                          socket->copy_buffer 
693                                          + socket->copy_buffer_read_offset,
694                                          valid_read_size);
695
696   GNUNET_assert (read_size <= valid_read_size);
697   socket->copy_buffer_read_offset += read_size;
698
699   /* Free the copy buffer once it has been read entirely */
700   if (socket->copy_buffer_read_offset == socket->copy_buffer_size)
701     {
702       GNUNET_free (socket->copy_buffer);
703       socket->copy_buffer = NULL;
704       socket->copy_buffer_size = 0;
705       socket->copy_buffer_read_offset = 0;
706     }
707
708   /* Free the read handle */
709   GNUNET_free (socket->read_handle);
710   socket->read_handle = NULL;
711 }
712
713
714 /**
715  * Prepares the receive buffer for possible reads; Should only be called when
716  * there is a valid READ io request pending and socket->copy_buffer is empty
717  *
718  * @param socket the socket pointer
719  */
720 static void 
721 prepare_buffer_for_read (struct GNUNET_STREAM_Socket *socket)
722 {
723   unsigned int packet;
724   uint32_t offset_increase;
725   uint32_t sequence_increase;
726
727   GNUNET_assert (NULL == socket->copy_buffer);
728   GNUNET_assert (NULL != socket->read_handle);
729
730   /* Check the bitmap for any holes */
731   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
732     {
733       if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
734                                              packet))
735         break;
736     }
737
738   sequence_increase = packet;
739
740   if (0 == sequence_increase)              /* The first packet is still missing */
741     {
742       return;
743     }
744   
745   /* Copy data to copy buffer */
746   GNUNET_assert (0 < socket->receive_buffer_boundaries[sequence_increase-1]);
747   socket->copy_buffer = 
748     GNUNET_malloc (socket->receive_buffer_boundaries[sequence_increase-1]);
749   memcpy (socket->copy_buffer, 
750           socket->receive_buffer,
751           socket->receive_buffer_boundaries[sequence_increase-1]);
752   
753   /* Shift the data in the receive buffer */
754   memmove (socket->receive_buffer,
755            socket->receive_buffer 
756            + socket->receive_buffer_boundaries[sequence_increase-1],
757            socket->receive_buffer_size - socket->receive_buffer_boundaries[sequence_increase-1]);
758   
759   /* Shift the bitmap */
760   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
761   
762   /* Set read_sequence_number */
763   socket->read_sequence_number += sequence_increase;
764   
765   /* Set read_offset */
766   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
767   socket->read_offset += offset_increase;
768   
769   /* Fix relative boundaries */
770   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
771     {
772       if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
773         {
774           socket->receive_buffer_boundaries[packet] = 
775             socket->receive_buffer_boundaries[packet + sequence_increase] 
776             - offset_increase;
777         }
778       else
779         socket->receive_buffer_boundaries[packet] = 0;
780     }
781
782   GNUNET_SCHEDULER_add_continuation (&call_read_processor_task,
783                                      socket,
784                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
785 }
786
787
788 /**
789  * Handler for DATA messages; Same for both client and server
790  *
791  * @param socket the socket through which the ack was received
792  * @param tunnel connection to the other end
793  * @param sender who sent the message
794  * @param msg the data message
795  * @param atsi performance data for the connection
796  * @return GNUNET_OK to keep the connection open,
797  *         GNUNET_SYSERR to close it (signal serious error)
798  */
799 static int
800 handle_data (struct GNUNET_STREAM_Socket *socket,
801              struct GNUNET_MESH_Tunnel *tunnel,
802              const struct GNUNET_PeerIdentity *sender,
803              const struct GNUNET_STREAM_DataMessage *msg,
804              const struct GNUNET_ATS_Information*atsi)
805 {
806   const void *payload;
807   uint32_t bytes_needed;
808   uint32_t relative_offset;
809   uint32_t relative_sequence_number;
810   uint16_t size;
811
812   size = htons (msg->header.header.size);
813   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
814     {
815       GNUNET_break_op (0);
816       return GNUNET_SYSERR;
817     }
818
819   switch (socket->state)
820     {
821     case STATE_ESTABLISHED:
822     case STATE_TRANSMIT_CLOSED:
823     case STATE_TRANSMIT_CLOSE_WAIT:
824
825       /* check if the message's sequence number is in the range we are
826          expecting */
827       relative_sequence_number = 
828         ntohl (msg->sequence_number) - socket->read_sequence_number;
829       if ( relative_sequence_number > 64)
830         {
831           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
832                       "Ignoring received message with sequence number %d",
833                       ntohl (msg->sequence_number));
834           return GNUNET_YES;
835         }
836
837       /* Check if we have to allocate the buffer */
838       size -= sizeof (struct GNUNET_STREAM_DataMessage);
839       relative_offset = ntohl (msg->offset) - socket->read_offset;
840       bytes_needed = relative_offset + size;
841       
842       if (bytes_needed > socket->receive_buffer_size)
843         {
844           if (bytes_needed <= RECEIVE_BUFFER_SIZE)
845             {
846               socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
847                                                        bytes_needed);
848               socket->receive_buffer_size = bytes_needed;
849             }
850           else
851             {
852               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
853                           "Cannot accommodate packet %d as buffer is full\n",
854                           ntohl (msg->sequence_number));
855               return GNUNET_YES;
856             }
857         }
858       
859       /* Copy Data to buffer */
860       payload = &msg[1];
861       GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
862       memcpy (socket->receive_buffer + relative_offset,
863               payload,
864               size);
865       socket->receive_buffer_boundaries[relative_sequence_number] = 
866         relative_offset + size;
867       
868       /* Modify the ACK bitmap */
869       ackbitmap_modify_bit (&socket->ack_bitmap,
870                             relative_sequence_number,
871                             GNUNET_YES);
872
873       /* Start ACK sending task if one is not already present */
874       if (0 == socket->ack_task_id)
875        {
876          socket->ack_task_id = 
877            GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
878                                          (msg->ack_deadline),
879                                          &ack_task,
880                                          socket);
881        }
882
883       if ((NULL != socket->read_handle) /* A read handle is waiting */
884           && (NULL == socket->copy_buffer)) /* And the copy buffer is empty */
885         {
886           prepare_buffer_for_read (socket);
887         }
888       
889       break;
890
891     default:
892       /* FIXME: call statistics */
893       break;
894     }
895   return GNUNET_YES;
896 }
897
898 /**
899  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
900  *
901  * @param cls the socket (set from GNUNET_MESH_connect)
902  * @param tunnel connection to the other end
903  * @param tunnel_ctx place to store local state associated with the tunnel
904  * @param sender who sent the message
905  * @param message the actual message
906  * @param atsi performance data for the connection
907  * @return GNUNET_OK to keep the connection open,
908  *         GNUNET_SYSERR to close it (signal serious error)
909  */
910 static int
911 client_handle_data (void *cls,
912              struct GNUNET_MESH_Tunnel *tunnel,
913              void **tunnel_ctx,
914              const struct GNUNET_PeerIdentity *sender,
915              const struct GNUNET_MessageHeader *message,
916              const struct GNUNET_ATS_Information*atsi)
917 {
918   struct GNUNET_STREAM_Socket *socket = cls;
919
920   return handle_data (socket, 
921                       tunnel, 
922                       sender, 
923                       (const struct GNUNET_STREAM_DataMessage *) message, 
924                       atsi);
925 }
926
927
928 /**
929  * Callback to set state to ESTABLISHED
930  *
931  * @param cls the closure from queue_message FIXME: document
932  * @param socket the socket to requiring state change
933  */
934 static void
935 set_state_established (void *cls,
936                        struct GNUNET_STREAM_Socket *socket)
937 {
938   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining ESTABLISHED state\n");
939   socket->write_offset = 0;
940   socket->read_offset = 0;
941   socket->state = STATE_ESTABLISHED;
942 }
943
944
945 /**
946  * Callback to set state to HELLO_WAIT
947  *
948  * @param cls the closure from queue_message
949  * @param socket the socket to requiring state change
950  */
951 static void
952 set_state_hello_wait (void *cls,
953                       struct GNUNET_STREAM_Socket *socket)
954 {
955   GNUNET_assert (STATE_INIT == socket->state);
956   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining HELLO_WAIT state\n");
957   socket->state = STATE_HELLO_WAIT;
958 }
959
960
961 /**
962  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
963  *
964  * @param cls the socket (set from GNUNET_MESH_connect)
965  * @param tunnel connection to the other end
966  * @param tunnel_ctx this is NULL
967  * @param sender who sent the message
968  * @param message the actual message
969  * @param atsi performance data for the connection
970  * @return GNUNET_OK to keep the connection open,
971  *         GNUNET_SYSERR to close it (signal serious error)
972  */
973 static int
974 client_handle_hello_ack (void *cls,
975                          struct GNUNET_MESH_Tunnel *tunnel,
976                          void **tunnel_ctx,
977                          const struct GNUNET_PeerIdentity *sender,
978                          const struct GNUNET_MessageHeader *message,
979                          const struct GNUNET_ATS_Information*atsi)
980 {
981   struct GNUNET_STREAM_Socket *socket = cls;
982   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
983   struct GNUNET_STREAM_HelloAckMessage *reply;
984
985   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
986   GNUNET_assert (socket->tunnel == tunnel);
987   switch (socket->state)
988   {
989   case STATE_HELLO_WAIT:
990       socket->read_sequence_number = ntohl (ack_msg->sequence_number);
991       socket->receive_window_available = ntohl (ack_msg->receive_window_size);
992       /* Get the random sequence number */
993       socket->write_sequence_number = 
994         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
995       reply = 
996         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
997       reply->header.header.size = 
998         htons (sizeof (struct GNUNET_STREAM_MessageHeader));
999       reply->header.header.type = 
1000         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1001       reply->sequence_number = htonl (socket->write_sequence_number);
1002       reply->receive_window_size = htonl (RECEIVE_BUFFER_SIZE);
1003       queue_message (socket, 
1004                      &reply->header, 
1005                      &set_state_established, 
1006                      NULL);      
1007       return GNUNET_OK;
1008   case STATE_ESTABLISHED:
1009   case STATE_RECEIVE_CLOSE_WAIT:
1010     // call statistics (# ACKs ignored++)
1011     return GNUNET_OK;
1012   case STATE_INIT:
1013   default:
1014     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1015                 "Server sent HELLO_ACK when in state %d\n", socket->state);
1016     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1017     return GNUNET_SYSERR;
1018   }
1019
1020 }
1021
1022
1023 /**
1024  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1025  *
1026  * @param cls the socket (set from GNUNET_MESH_connect)
1027  * @param tunnel connection to the other end
1028  * @param tunnel_ctx this is NULL
1029  * @param sender who sent the message
1030  * @param message the actual message
1031  * @param atsi performance data for the connection
1032  * @return GNUNET_OK to keep the connection open,
1033  *         GNUNET_SYSERR to close it (signal serious error)
1034  */
1035 static int
1036 client_handle_reset (void *cls,
1037                      struct GNUNET_MESH_Tunnel *tunnel,
1038                      void **tunnel_ctx,
1039                      const struct GNUNET_PeerIdentity *sender,
1040                      const struct GNUNET_MessageHeader *message,
1041                      const struct GNUNET_ATS_Information*atsi)
1042 {
1043   struct GNUNET_STREAM_Socket *socket = cls;
1044
1045   return GNUNET_OK;
1046 }
1047
1048
1049 /**
1050  * Common message handler for handling TRANSMIT_CLOSE messages
1051  *
1052  * @param socket the socket through which the ack was received
1053  * @param tunnel connection to the other end
1054  * @param sender who sent the message
1055  * @param msg the transmit close message
1056  * @param atsi performance data for the connection
1057  * @return GNUNET_OK to keep the connection open,
1058  *         GNUNET_SYSERR to close it (signal serious error)
1059  */
1060 static int
1061 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1062                        struct GNUNET_MESH_Tunnel *tunnel,
1063                        const struct GNUNET_PeerIdentity *sender,
1064                        const struct GNUNET_STREAM_MessageHeader *msg,
1065                        const struct GNUNET_ATS_Information*atsi)
1066 {
1067   struct GNUNET_STREAM_MessageHeader *reply;
1068
1069   switch (socket->state)
1070     {
1071     case STATE_ESTABLISHED:
1072       socket->state = STATE_RECEIVE_CLOSED;
1073
1074       /* Send TRANSMIT_CLOSE_ACK */
1075       reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1076       reply->header.type = 
1077         htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1078       reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1079       queue_message (socket, reply, NULL, NULL);
1080       break;
1081
1082     default:
1083       /* FIXME: Call statistics? */
1084       break;
1085     }
1086   return GNUNET_YES;
1087 }
1088
1089
1090 /**
1091  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1092  *
1093  * @param cls the socket (set from GNUNET_MESH_connect)
1094  * @param tunnel connection to the other end
1095  * @param tunnel_ctx this is NULL
1096  * @param sender who sent the message
1097  * @param message the actual message
1098  * @param atsi performance data for the connection
1099  * @return GNUNET_OK to keep the connection open,
1100  *         GNUNET_SYSERR to close it (signal serious error)
1101  */
1102 static int
1103 client_handle_transmit_close (void *cls,
1104                               struct GNUNET_MESH_Tunnel *tunnel,
1105                               void **tunnel_ctx,
1106                               const struct GNUNET_PeerIdentity *sender,
1107                               const struct GNUNET_MessageHeader *message,
1108                               const struct GNUNET_ATS_Information*atsi)
1109 {
1110   struct GNUNET_STREAM_Socket *socket = cls;
1111   
1112   return handle_transmit_close (socket,
1113                                 tunnel,
1114                                 sender,
1115                                 (struct GNUNET_STREAM_MessageHeader *)message,
1116                                 atsi);
1117 }
1118
1119
1120 /**
1121  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1122  *
1123  * @param cls the socket (set from GNUNET_MESH_connect)
1124  * @param tunnel connection to the other end
1125  * @param tunnel_ctx this is NULL
1126  * @param sender who sent the message
1127  * @param message the actual message
1128  * @param atsi performance data for the connection
1129  * @return GNUNET_OK to keep the connection open,
1130  *         GNUNET_SYSERR to close it (signal serious error)
1131  */
1132 static int
1133 client_handle_transmit_close_ack (void *cls,
1134                                   struct GNUNET_MESH_Tunnel *tunnel,
1135                                   void **tunnel_ctx,
1136                                   const struct GNUNET_PeerIdentity *sender,
1137                                   const struct GNUNET_MessageHeader *message,
1138                                   const struct GNUNET_ATS_Information*atsi)
1139 {
1140   struct GNUNET_STREAM_Socket *socket = cls;
1141
1142   return GNUNET_OK;
1143 }
1144
1145
1146 /**
1147  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1148  *
1149  * @param cls the socket (set from GNUNET_MESH_connect)
1150  * @param tunnel connection to the other end
1151  * @param tunnel_ctx this is NULL
1152  * @param sender who sent the message
1153  * @param message the actual message
1154  * @param atsi performance data for the connection
1155  * @return GNUNET_OK to keep the connection open,
1156  *         GNUNET_SYSERR to close it (signal serious error)
1157  */
1158 static int
1159 client_handle_receive_close (void *cls,
1160                              struct GNUNET_MESH_Tunnel *tunnel,
1161                              void **tunnel_ctx,
1162                              const struct GNUNET_PeerIdentity *sender,
1163                              const struct GNUNET_MessageHeader *message,
1164                              const struct GNUNET_ATS_Information*atsi)
1165 {
1166   struct GNUNET_STREAM_Socket *socket = cls;
1167
1168   return GNUNET_OK;
1169 }
1170
1171
1172 /**
1173  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1174  *
1175  * @param cls the socket (set from GNUNET_MESH_connect)
1176  * @param tunnel connection to the other end
1177  * @param tunnel_ctx this is NULL
1178  * @param sender who sent the message
1179  * @param message the actual message
1180  * @param atsi performance data for the connection
1181  * @return GNUNET_OK to keep the connection open,
1182  *         GNUNET_SYSERR to close it (signal serious error)
1183  */
1184 static int
1185 client_handle_receive_close_ack (void *cls,
1186                                  struct GNUNET_MESH_Tunnel *tunnel,
1187                                  void **tunnel_ctx,
1188                                  const struct GNUNET_PeerIdentity *sender,
1189                                  const struct GNUNET_MessageHeader *message,
1190                                  const struct GNUNET_ATS_Information*atsi)
1191 {
1192   struct GNUNET_STREAM_Socket *socket = cls;
1193
1194   return GNUNET_OK;
1195 }
1196
1197
1198 /**
1199  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1200  *
1201  * @param cls the socket (set from GNUNET_MESH_connect)
1202  * @param tunnel connection to the other end
1203  * @param tunnel_ctx this is NULL
1204  * @param sender who sent the message
1205  * @param message the actual message
1206  * @param atsi performance data for the connection
1207  * @return GNUNET_OK to keep the connection open,
1208  *         GNUNET_SYSERR to close it (signal serious error)
1209  */
1210 static int
1211 client_handle_close (void *cls,
1212                      struct GNUNET_MESH_Tunnel *tunnel,
1213                      void **tunnel_ctx,
1214                      const struct GNUNET_PeerIdentity *sender,
1215                      const struct GNUNET_MessageHeader *message,
1216                      const struct GNUNET_ATS_Information*atsi)
1217 {
1218   struct GNUNET_STREAM_Socket *socket = cls;
1219
1220   return GNUNET_OK;
1221 }
1222
1223
1224 /**
1225  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1226  *
1227  * @param cls the socket (set from GNUNET_MESH_connect)
1228  * @param tunnel connection to the other end
1229  * @param tunnel_ctx this is NULL
1230  * @param sender who sent the message
1231  * @param message the actual message
1232  * @param atsi performance data for the connection
1233  * @return GNUNET_OK to keep the connection open,
1234  *         GNUNET_SYSERR to close it (signal serious error)
1235  */
1236 static int
1237 client_handle_close_ack (void *cls,
1238                          struct GNUNET_MESH_Tunnel *tunnel,
1239                          void **tunnel_ctx,
1240                          const struct GNUNET_PeerIdentity *sender,
1241                          const struct GNUNET_MessageHeader *message,
1242                          const struct GNUNET_ATS_Information*atsi)
1243 {
1244   struct GNUNET_STREAM_Socket *socket = cls;
1245
1246   return GNUNET_OK;
1247 }
1248
1249 /*****************************/
1250 /* Server's Message Handlers */
1251 /*****************************/
1252
1253 /**
1254  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1255  *
1256  * @param cls the closure
1257  * @param tunnel connection to the other end
1258  * @param tunnel_ctx the socket
1259  * @param sender who sent the message
1260  * @param message the actual message
1261  * @param atsi performance data for the connection
1262  * @return GNUNET_OK to keep the connection open,
1263  *         GNUNET_SYSERR to close it (signal serious error)
1264  */
1265 static int
1266 server_handle_data (void *cls,
1267                     struct GNUNET_MESH_Tunnel *tunnel,
1268                     void **tunnel_ctx,
1269                     const struct GNUNET_PeerIdentity *sender,
1270                     const struct GNUNET_MessageHeader *message,
1271                     const struct GNUNET_ATS_Information*atsi)
1272 {
1273   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1274
1275   return handle_data (socket,
1276                       tunnel,
1277                       sender,
1278                       (const struct GNUNET_STREAM_DataMessage *)message,
1279                       atsi);
1280 }
1281
1282
1283 /**
1284  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
1285  *
1286  * @param cls the closure
1287  * @param tunnel connection to the other end
1288  * @param tunnel_ctx the socket
1289  * @param sender who sent the message
1290  * @param message the actual message
1291  * @param atsi performance data for the connection
1292  * @return GNUNET_OK to keep the connection open,
1293  *         GNUNET_SYSERR to close it (signal serious error)
1294  */
1295 static int
1296 server_handle_hello (void *cls,
1297                      struct GNUNET_MESH_Tunnel *tunnel,
1298                      void **tunnel_ctx,
1299                      const struct GNUNET_PeerIdentity *sender,
1300                      const struct GNUNET_MessageHeader *message,
1301                      const struct GNUNET_ATS_Information*atsi)
1302 {
1303   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1304   struct GNUNET_STREAM_HelloAckMessage *reply;
1305
1306   GNUNET_assert (socket->tunnel == tunnel);
1307   if (STATE_INIT == socket->state)
1308     {
1309       /* Get the random sequence number */
1310       socket->write_sequence_number = 
1311         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1312       reply = 
1313         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1314       reply->header.header.size = 
1315         htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1316       reply->header.header.type = 
1317         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1318       reply->sequence_number = htonl (socket->write_sequence_number);
1319       queue_message (socket, 
1320                      &reply->header,
1321                      &set_state_hello_wait, 
1322                      NULL);
1323     }
1324   else
1325     {
1326       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1327                   "Client sent HELLO when in state %d\n", socket->state);
1328       /* FIXME: Send RESET? */
1329       
1330     }
1331   return GNUNET_OK;
1332 }
1333
1334
1335 /**
1336  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1337  *
1338  * @param cls the closure
1339  * @param tunnel connection to the other end
1340  * @param tunnel_ctx the socket
1341  * @param sender who sent the message
1342  * @param message the actual message
1343  * @param atsi performance data for the connection
1344  * @return GNUNET_OK to keep the connection open,
1345  *         GNUNET_SYSERR to close it (signal serious error)
1346  */
1347 static int
1348 server_handle_hello_ack (void *cls,
1349                          struct GNUNET_MESH_Tunnel *tunnel,
1350                          void **tunnel_ctx,
1351                          const struct GNUNET_PeerIdentity *sender,
1352                          const struct GNUNET_MessageHeader *message,
1353                          const struct GNUNET_ATS_Information*atsi)
1354 {
1355   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1356   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
1357
1358   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
1359   GNUNET_assert (socket->tunnel == tunnel);
1360   if (STATE_HELLO_WAIT == socket->state)
1361     {
1362       socket->read_sequence_number = ntohl (ack_message->sequence_number);
1363       socket->receive_window_available = 
1364         ntohl (ack_message->receive_window_size);
1365       /* Attain ESTABLISHED state */
1366       set_state_established (NULL, socket);
1367     }
1368   else
1369     {
1370       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1371                   "Client sent HELLO_ACK when in state %d\n", socket->state);
1372       /* FIXME: Send RESET? */
1373       
1374     }
1375   return GNUNET_OK;
1376 }
1377
1378
1379 /**
1380  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1381  *
1382  * @param cls the closure
1383  * @param tunnel connection to the other end
1384  * @param tunnel_ctx the socket
1385  * @param sender who sent the message
1386  * @param message the actual message
1387  * @param atsi performance data for the connection
1388  * @return GNUNET_OK to keep the connection open,
1389  *         GNUNET_SYSERR to close it (signal serious error)
1390  */
1391 static int
1392 server_handle_reset (void *cls,
1393                      struct GNUNET_MESH_Tunnel *tunnel,
1394                      void **tunnel_ctx,
1395                      const struct GNUNET_PeerIdentity *sender,
1396                      const struct GNUNET_MessageHeader *message,
1397                      const struct GNUNET_ATS_Information*atsi)
1398 {
1399   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1400
1401   return GNUNET_OK;
1402 }
1403
1404
1405 /**
1406  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1407  *
1408  * @param cls the closure
1409  * @param tunnel connection to the other end
1410  * @param tunnel_ctx the socket
1411  * @param sender who sent the message
1412  * @param message the actual message
1413  * @param atsi performance data for the connection
1414  * @return GNUNET_OK to keep the connection open,
1415  *         GNUNET_SYSERR to close it (signal serious error)
1416  */
1417 static int
1418 server_handle_transmit_close (void *cls,
1419                               struct GNUNET_MESH_Tunnel *tunnel,
1420                               void **tunnel_ctx,
1421                               const struct GNUNET_PeerIdentity *sender,
1422                               const struct GNUNET_MessageHeader *message,
1423                               const struct GNUNET_ATS_Information*atsi)
1424 {
1425   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1426
1427   return handle_transmit_close (socket,
1428                                 tunnel,
1429                                 sender,
1430                                 (struct GNUNET_STREAM_MessageHeader *)message,
1431                                 atsi);
1432 }
1433
1434
1435 /**
1436  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1437  *
1438  * @param cls the closure
1439  * @param tunnel connection to the other end
1440  * @param tunnel_ctx the socket
1441  * @param sender who sent the message
1442  * @param message the actual message
1443  * @param atsi performance data for the connection
1444  * @return GNUNET_OK to keep the connection open,
1445  *         GNUNET_SYSERR to close it (signal serious error)
1446  */
1447 static int
1448 server_handle_transmit_close_ack (void *cls,
1449                                   struct GNUNET_MESH_Tunnel *tunnel,
1450                                   void **tunnel_ctx,
1451                                   const struct GNUNET_PeerIdentity *sender,
1452                                   const struct GNUNET_MessageHeader *message,
1453                                   const struct GNUNET_ATS_Information*atsi)
1454 {
1455   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1456
1457   return GNUNET_OK;
1458 }
1459
1460
1461 /**
1462  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1463  *
1464  * @param cls the closure
1465  * @param tunnel connection to the other end
1466  * @param tunnel_ctx the socket
1467  * @param sender who sent the message
1468  * @param message the actual message
1469  * @param atsi performance data for the connection
1470  * @return GNUNET_OK to keep the connection open,
1471  *         GNUNET_SYSERR to close it (signal serious error)
1472  */
1473 static int
1474 server_handle_receive_close (void *cls,
1475                              struct GNUNET_MESH_Tunnel *tunnel,
1476                              void **tunnel_ctx,
1477                              const struct GNUNET_PeerIdentity *sender,
1478                              const struct GNUNET_MessageHeader *message,
1479                              const struct GNUNET_ATS_Information*atsi)
1480 {
1481   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1482
1483   return GNUNET_OK;
1484 }
1485
1486
1487 /**
1488  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1489  *
1490  * @param cls the closure
1491  * @param tunnel connection to the other end
1492  * @param tunnel_ctx the socket
1493  * @param sender who sent the message
1494  * @param message the actual message
1495  * @param atsi performance data for the connection
1496  * @return GNUNET_OK to keep the connection open,
1497  *         GNUNET_SYSERR to close it (signal serious error)
1498  */
1499 static int
1500 server_handle_receive_close_ack (void *cls,
1501                                  struct GNUNET_MESH_Tunnel *tunnel,
1502                                  void **tunnel_ctx,
1503                                  const struct GNUNET_PeerIdentity *sender,
1504                                  const struct GNUNET_MessageHeader *message,
1505                                  const struct GNUNET_ATS_Information*atsi)
1506 {
1507   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1508
1509   return GNUNET_OK;
1510 }
1511
1512
1513 /**
1514  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1515  *
1516  * @param cls the closure
1517  * @param tunnel connection to the other end
1518  * @param tunnel_ctx the socket
1519  * @param sender who sent the message
1520  * @param message the actual message
1521  * @param atsi performance data for the connection
1522  * @return GNUNET_OK to keep the connection open,
1523  *         GNUNET_SYSERR to close it (signal serious error)
1524  */
1525 static int
1526 server_handle_close (void *cls,
1527                      struct GNUNET_MESH_Tunnel *tunnel,
1528                      void **tunnel_ctx,
1529                      const struct GNUNET_PeerIdentity *sender,
1530                      const struct GNUNET_MessageHeader *message,
1531                      const struct GNUNET_ATS_Information*atsi)
1532 {
1533   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1534
1535   return GNUNET_OK;
1536 }
1537
1538
1539 /**
1540  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1541  *
1542  * @param cls the closure
1543  * @param tunnel connection to the other end
1544  * @param tunnel_ctx the socket
1545  * @param sender who sent the message
1546  * @param message the actual message
1547  * @param atsi performance data for the connection
1548  * @return GNUNET_OK to keep the connection open,
1549  *         GNUNET_SYSERR to close it (signal serious error)
1550  */
1551 static int
1552 server_handle_close_ack (void *cls,
1553                          struct GNUNET_MESH_Tunnel *tunnel,
1554                          void **tunnel_ctx,
1555                          const struct GNUNET_PeerIdentity *sender,
1556                          const struct GNUNET_MessageHeader *message,
1557                          const struct GNUNET_ATS_Information*atsi)
1558 {
1559   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1560
1561   return GNUNET_OK;
1562 }
1563
1564
1565 /**
1566  * Message Handler for mesh
1567  *
1568  * @param socket the socket through which the ack was received
1569  * @param tunnel connection to the other end
1570  * @param sender who sent the message
1571  * @param ack the acknowledgment message
1572  * @param atsi performance data for the connection
1573  * @return GNUNET_OK to keep the connection open,
1574  *         GNUNET_SYSERR to close it (signal serious error)
1575  */
1576 static int
1577 handle_ack (struct GNUNET_STREAM_Socket *socket,
1578             struct GNUNET_MESH_Tunnel *tunnel,
1579             const struct GNUNET_PeerIdentity *sender,
1580             const struct GNUNET_STREAM_AckMessage *ack,
1581             const struct GNUNET_ATS_Information*atsi)
1582 {
1583   switch (socket->state)
1584     {
1585     case (STATE_ESTABLISHED):
1586       if (NULL == socket->write_handle)
1587         {
1588           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1589                       "Received DATA ACK when write_handle is NULL\n");
1590           return GNUNET_OK;
1591         }
1592
1593       socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap);
1594       socket->receive_window_available = 
1595         ntohl (ack->receive_window_remaining);
1596       write_data (socket);
1597       break;
1598     default:
1599       break;
1600     }
1601   return GNUNET_OK;
1602 }
1603
1604
1605 /**
1606  * Message Handler for mesh
1607  *
1608  * @param cls the 'struct GNUNET_STREAM_Socket'
1609  * @param tunnel connection to the other end
1610  * @param tunnel_ctx unused
1611  * @param sender who sent the message
1612  * @param message the actual message
1613  * @param atsi performance data for the connection
1614  * @return GNUNET_OK to keep the connection open,
1615  *         GNUNET_SYSERR to close it (signal serious error)
1616  */
1617 static int
1618 client_handle_ack (void *cls,
1619                    struct GNUNET_MESH_Tunnel *tunnel,
1620                    void **tunnel_ctx,
1621                    const struct GNUNET_PeerIdentity *sender,
1622                    const struct GNUNET_MessageHeader *message,
1623                    const struct GNUNET_ATS_Information*atsi)
1624 {
1625   struct GNUNET_STREAM_Socket *socket = cls;
1626   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1627  
1628   return handle_ack (socket, tunnel, sender, ack, atsi);
1629 }
1630
1631
1632 /**
1633  * Message Handler for mesh
1634  *
1635  * @param cls the server's listen socket
1636  * @param tunnel connection to the other end
1637  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
1638  * @param sender who sent the message
1639  * @param message the actual message
1640  * @param atsi performance data for the connection
1641  * @return GNUNET_OK to keep the connection open,
1642  *         GNUNET_SYSERR to close it (signal serious error)
1643  */
1644 static int
1645 server_handle_ack (void *cls,
1646                    struct GNUNET_MESH_Tunnel *tunnel,
1647                    void **tunnel_ctx,
1648                    const struct GNUNET_PeerIdentity *sender,
1649                    const struct GNUNET_MessageHeader *message,
1650                    const struct GNUNET_ATS_Information*atsi)
1651 {
1652   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1653   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1654  
1655   return handle_ack (socket, tunnel, sender, ack, atsi);
1656 }
1657
1658
1659 /**
1660  * For client message handlers, the stream socket is in the
1661  * closure argument.
1662  */
1663 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
1664   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1665   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
1666    sizeof (struct GNUNET_STREAM_AckMessage) },
1667   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1668    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1669   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1670    sizeof (struct GNUNET_STREAM_MessageHeader)},
1671   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1672    sizeof (struct GNUNET_STREAM_MessageHeader)},
1673   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1674    sizeof (struct GNUNET_STREAM_MessageHeader)},
1675   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1676    sizeof (struct GNUNET_STREAM_MessageHeader)},
1677   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1678    sizeof (struct GNUNET_STREAM_MessageHeader)},
1679   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1680    sizeof (struct GNUNET_STREAM_MessageHeader)},
1681   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1682    sizeof (struct GNUNET_STREAM_MessageHeader)},
1683   {NULL, 0, 0}
1684 };
1685
1686
1687 /**
1688  * For server message handlers, the stream socket is in the
1689  * tunnel context, and the listen socket in the closure argument.
1690  */
1691 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
1692   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1693   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
1694    sizeof (struct GNUNET_STREAM_AckMessage) },
1695   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
1696    sizeof (struct GNUNET_STREAM_MessageHeader)},
1697   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1698    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1699   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1700    sizeof (struct GNUNET_STREAM_MessageHeader)},
1701   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1702    sizeof (struct GNUNET_STREAM_MessageHeader)},
1703   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1704    sizeof (struct GNUNET_STREAM_MessageHeader)},
1705   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1706    sizeof (struct GNUNET_STREAM_MessageHeader)},
1707   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1708    sizeof (struct GNUNET_STREAM_MessageHeader)},
1709   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1710    sizeof (struct GNUNET_STREAM_MessageHeader)},
1711   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1712    sizeof (struct GNUNET_STREAM_MessageHeader)},
1713   {NULL, 0, 0}
1714 };
1715
1716
1717 /**
1718  * Function called when our target peer is connected to our tunnel
1719  *
1720  * @param cls the socket for which this tunnel is created
1721  * @param peer the peer identity of the target
1722  * @param atsi performance data for the connection
1723  */
1724 static void
1725 mesh_peer_connect_callback (void *cls,
1726                             const struct GNUNET_PeerIdentity *peer,
1727                             const struct GNUNET_ATS_Information * atsi)
1728 {
1729   struct GNUNET_STREAM_Socket *socket = cls;
1730   struct GNUNET_STREAM_MessageHeader *message;
1731
1732   if (0 != memcmp (&socket->other_peer, 
1733                    peer, 
1734                    sizeof (struct GNUNET_PeerIdentity)))
1735     {
1736       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1737                   "A peer (%s) which is not our target has connected to our tunnel", 
1738                   GNUNET_i2s (peer));
1739       return;
1740     }
1741   
1742   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1743               "Target peer %s connected\n", GNUNET_i2s (peer));
1744   
1745   /* Set state to INIT */
1746   socket->state = STATE_INIT;
1747
1748   /* Send HELLO message */
1749   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1750   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1751   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1752   queue_message (socket,
1753                  message,
1754                  &set_state_hello_wait,
1755                  NULL);
1756
1757   /* Call open callback */
1758   if (NULL == socket->open_cb)
1759     {
1760       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1761                   "STREAM_open callback is NULL\n");
1762     }
1763   else
1764     {
1765       socket->open_cb (socket->open_cls, socket);
1766     }
1767 }
1768
1769
1770 /**
1771  * Function called when our target peer is disconnected from our tunnel
1772  *
1773  * @param cls the socket associated which this tunnel
1774  * @param peer the peer identity of the target
1775  */
1776 static void
1777 mesh_peer_disconnect_callback (void *cls,
1778                                const struct GNUNET_PeerIdentity *peer)
1779 {
1780
1781 }
1782
1783
1784 /*****************/
1785 /* API functions */
1786 /*****************/
1787
1788
1789 /**
1790  * Tries to open a stream to the target peer
1791  *
1792  * @param cfg configuration to use
1793  * @param target the target peer to which the stream has to be opened
1794  * @param app_port the application port number which uniquely identifies this
1795  *            stream
1796  * @param open_cb this function will be called after stream has be established 
1797  * @param open_cb_cls the closure for open_cb
1798  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
1799  * @return if successful it returns the stream socket; NULL if stream cannot be
1800  *         opened 
1801  */
1802 struct GNUNET_STREAM_Socket *
1803 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
1804                     const struct GNUNET_PeerIdentity *target,
1805                     GNUNET_MESH_ApplicationType app_port,
1806                     GNUNET_STREAM_OpenCallback open_cb,
1807                     void *open_cb_cls,
1808                     ...)
1809 {
1810   struct GNUNET_STREAM_Socket *socket;
1811   enum GNUNET_STREAM_Option option;
1812   va_list vargs;                /* Variable arguments */
1813
1814   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
1815   socket->other_peer = *target;
1816   socket->open_cb = open_cb;
1817   socket->open_cls = open_cb_cls;
1818
1819   /* Set defaults */
1820   socket->retransmit_timeout = 
1821     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
1822
1823   va_start (vargs, open_cb_cls); /* Parse variable args */
1824   do {
1825     option = va_arg (vargs, enum GNUNET_STREAM_Option);
1826     switch (option)
1827       {
1828       case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
1829         /* Expect struct GNUNET_TIME_Relative */
1830         socket->retransmit_timeout = va_arg (vargs,
1831                                              struct GNUNET_TIME_Relative);
1832         break;
1833       case GNUNET_STREAM_OPTION_END:
1834         break;
1835       }
1836   } while (GNUNET_STREAM_OPTION_END != option);
1837   va_end (vargs);               /* End of variable args parsing */
1838
1839   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
1840                                       1,  /* QUEUE size as parameter? */
1841                                       socket, /* cls */
1842                                       NULL, /* No inbound tunnel handler */
1843                                       NULL, /* No inbound tunnel cleaner */
1844                                       client_message_handlers,
1845                                       NULL); /* We don't get inbound tunnels */
1846   // FIXME: if (NULL == socket->mesh) ...
1847
1848   /* Now create the mesh tunnel to target */
1849   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
1850                                               NULL, /* Tunnel context */
1851                                               &mesh_peer_connect_callback,
1852                                               &mesh_peer_disconnect_callback,
1853                                               socket);
1854   // FIXME: if (NULL == socket->tunnel) ...
1855
1856   return socket;
1857 }
1858
1859
1860 /**
1861  * Closes the stream
1862  *
1863  * @param socket the stream socket
1864  */
1865 void
1866 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
1867 {
1868   struct MessageQueue *head;
1869
1870   /* Clear Transmit handles */
1871   if (NULL != socket->transmit_handle)
1872     {
1873       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
1874       socket->transmit_handle = NULL;
1875     }
1876
1877   /* Clear existing message queue */
1878   while (NULL != (head = socket->queue_head)) {
1879     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
1880                                  socket->queue_tail,
1881                                  head);
1882     GNUNET_free (head->message);
1883     GNUNET_free (head);
1884   }
1885
1886   /* Close associated tunnel */
1887   if (NULL != socket->tunnel)
1888     {
1889       GNUNET_MESH_tunnel_destroy (socket->tunnel);
1890       socket->tunnel = NULL;
1891     }
1892
1893   /* Close mesh connection */
1894   if (NULL != socket->mesh)
1895     {
1896       GNUNET_MESH_disconnect (socket->mesh);
1897       socket->mesh = NULL;
1898     }
1899   
1900   /* Release receive buffer */
1901   if (NULL != socket->receive_buffer)
1902     {
1903       GNUNET_free (socket->receive_buffer);
1904     }
1905
1906   GNUNET_free (socket);
1907 }
1908
1909
1910 /**
1911  * Method called whenever a peer creates a tunnel to us
1912  *
1913  * @param cls closure
1914  * @param tunnel new handle to the tunnel
1915  * @param initiator peer that started the tunnel
1916  * @param atsi performance information for the tunnel
1917  * @return initial tunnel context for the tunnel
1918  *         (can be NULL -- that's not an error)
1919  */
1920 static void *
1921 new_tunnel_notify (void *cls,
1922                    struct GNUNET_MESH_Tunnel *tunnel,
1923                    const struct GNUNET_PeerIdentity *initiator,
1924                    const struct GNUNET_ATS_Information *atsi)
1925 {
1926   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
1927   struct GNUNET_STREAM_Socket *socket;
1928
1929   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
1930   socket->tunnel = tunnel;
1931   socket->session_id = 0;       /* FIXME */
1932   socket->other_peer = *initiator;
1933   socket->state = STATE_INIT;
1934
1935   if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
1936                                            socket,
1937                                            &socket->other_peer))
1938     {
1939       socket->state = STATE_CLOSED;
1940       /* FIXME: Send CLOSE message and then free */
1941       GNUNET_free (socket);
1942       GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
1943     }
1944   return socket;
1945 }
1946
1947
1948 /**
1949  * Function called whenever an inbound tunnel is destroyed.  Should clean up
1950  * any associated state.  This function is NOT called if the client has
1951  * explicitly asked for the tunnel to be destroyed using
1952  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
1953  * the tunnel.
1954  *
1955  * @param cls closure (set from GNUNET_MESH_connect)
1956  * @param tunnel connection to the other end (henceforth invalid)
1957  * @param tunnel_ctx place where local state associated
1958  *                   with the tunnel is stored
1959  */
1960 static void 
1961 tunnel_cleaner (void *cls,
1962                 const struct GNUNET_MESH_Tunnel *tunnel,
1963                 void *tunnel_ctx)
1964 {
1965   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
1966   
1967   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1968               "Peer %s has terminated connection abruptly\n",
1969               GNUNET_i2s (&socket->other_peer));
1970
1971   socket->status = GNUNET_STREAM_SHUTDOWN;
1972   /* Clear Transmit handles */
1973   if (NULL != socket->transmit_handle)
1974     {
1975       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
1976       socket->transmit_handle = NULL;
1977     }
1978   socket->tunnel = NULL;
1979 }
1980
1981
1982 /**
1983  * Listens for stream connections for a specific application ports
1984  *
1985  * @param cfg the configuration to use
1986  * @param app_port the application port for which new streams will be accepted
1987  * @param listen_cb this function will be called when a peer tries to establish
1988  *            a stream with us
1989  * @param listen_cb_cls closure for listen_cb
1990  * @return listen socket, NULL for any error
1991  */
1992 struct GNUNET_STREAM_ListenSocket *
1993 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
1994                       GNUNET_MESH_ApplicationType app_port,
1995                       GNUNET_STREAM_ListenCallback listen_cb,
1996                       void *listen_cb_cls)
1997 {
1998   /* FIXME: Add variable args for passing configration options? */
1999   struct GNUNET_STREAM_ListenSocket *lsocket;
2000   GNUNET_MESH_ApplicationType app_types[2];
2001
2002   app_types[0] = app_port;
2003   app_types[1] = 0;
2004   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
2005   lsocket->port = app_port;
2006   lsocket->listen_cb = listen_cb;
2007   lsocket->listen_cb_cls = listen_cb_cls;
2008   lsocket->mesh = GNUNET_MESH_connect (cfg,
2009                                        10, /* FIXME: QUEUE size as parameter? */
2010                                        lsocket, /* Closure */
2011                                        &new_tunnel_notify,
2012                                        &tunnel_cleaner,
2013                                        server_message_handlers,
2014                                        app_types);
2015   return lsocket;
2016 }
2017
2018
2019 /**
2020  * Closes the listen socket
2021  *
2022  * @param lsocket the listen socket
2023  */
2024 void
2025 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
2026 {
2027   /* Close MESH connection */
2028   GNUNET_MESH_disconnect (lsocket->mesh);
2029   
2030   GNUNET_free (lsocket);
2031 }
2032
2033
2034 /**
2035  * Tries to write the given data to the stream
2036  *
2037  * @param socket the socket representing a stream
2038  * @param data the data buffer from where the data is written into the stream
2039  * @param size the number of bytes to be written from the data buffer
2040  * @param timeout the timeout period
2041  * @param write_cont the function to call upon writing some bytes into the stream
2042  * @param write_cont_cls the closure
2043  * @return handle to cancel the operation
2044  */
2045 struct GNUNET_STREAM_IOWriteHandle *
2046 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
2047                      const void *data,
2048                      size_t size,
2049                      struct GNUNET_TIME_Relative timeout,
2050                      GNUNET_STREAM_CompletionContinuation write_cont,
2051                      void *write_cont_cls)
2052 {
2053   unsigned int num_needed_packets;
2054   unsigned int packet;
2055   struct GNUNET_STREAM_IOWriteHandle *io_handle;
2056   uint32_t packet_size;
2057   uint32_t payload_size;
2058   struct GNUNET_STREAM_DataMessage *data_msg;
2059   const void *sweep;
2060
2061   /* Return NULL if there is already a write request pending */
2062   if (NULL != socket->write_handle)
2063   {
2064     GNUNET_break (0);
2065     return NULL;
2066   }
2067   if (!((STATE_ESTABLISHED == socket->state)
2068         || (STATE_RECEIVE_CLOSE_WAIT == socket->state)
2069         || (STATE_RECEIVE_CLOSED == socket->state)))
2070     {
2071       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2072                   "Attempting to write on a closed (OR) not-yet-established"
2073                   "stream\n"); 
2074       return NULL;
2075     } 
2076   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
2077     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
2078   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
2079   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
2080   sweep = data;
2081   /* Divide the given buffer into packets for sending */
2082   for (packet=0; packet < num_needed_packets; packet++)
2083     {
2084       if ((packet + 1) * max_payload_size < size) 
2085         {
2086           payload_size = max_payload_size;
2087           packet_size = MAX_PACKET_SIZE;
2088         }
2089       else 
2090         {
2091           payload_size = size - packet * max_payload_size;
2092           packet_size =  payload_size + sizeof (struct
2093                                                 GNUNET_STREAM_DataMessage); 
2094         }
2095       io_handle->messages[packet] = GNUNET_malloc (packet_size);
2096       io_handle->messages[packet]->header.header.size = htons (packet_size);
2097       io_handle->messages[packet]->header.header.type =
2098         htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
2099       io_handle->messages[packet]->sequence_number =
2100         htons (socket->write_sequence_number++);
2101       io_handle->messages[packet]->offset = htons (socket->write_offset);
2102
2103       /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2104          determined from RTT */
2105       io_handle->messages[packet]->ack_deadline = 
2106         GNUNET_TIME_relative_hton (GNUNET_TIME_relative_multiply 
2107                                    (GNUNET_TIME_UNIT_SECONDS, 5));
2108       data_msg = io_handle->messages[packet];
2109       /* Copy data from given buffer to the packet */
2110       memcpy (&data_msg[1],
2111               sweep,
2112               payload_size);
2113       sweep += payload_size;
2114       socket->write_offset += payload_size;
2115     }
2116   socket->write_handle = io_handle;
2117   write_data (socket);
2118
2119   return io_handle;
2120 }
2121
2122
2123 /**
2124  * Tries to read data from the stream
2125  *
2126  * @param socket the socket representing a stream
2127  * @param timeout the timeout period
2128  * @param proc function to call with data (once only)
2129  * @param proc_cls the closure for proc
2130  * @return handle to cancel the operation
2131  */
2132 struct GNUNET_STREAM_IOReadHandle *
2133 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
2134                     struct GNUNET_TIME_Relative timeout,
2135                     GNUNET_STREAM_DataProcessor proc,
2136                     void *proc_cls)
2137 {
2138   struct GNUNET_STREAM_IOReadHandle *read_handle;
2139   
2140   /* Return NULL if there is already a read handle; the user has to cancel that
2141   first before continuing or has to wait until it is completed */
2142   if (NULL != socket->read_handle) return NULL;
2143
2144   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
2145   read_handle->proc = proc;
2146   socket->read_handle = read_handle;
2147
2148   /* if previous copy buffer is still not read call the data processor on it */
2149   if (NULL != socket->copy_buffer)
2150     GNUNET_SCHEDULER_add_continuation (&call_read_processor_task,
2151                                        socket,
2152                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2153   else
2154     prepare_buffer_for_read (socket);
2155
2156   return read_handle;
2157 }