- using de/serialization functionality
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file stream/stream_api.c
23  * @brief Implementation of the stream library
24  * @author Sree Harsha Totakura
25  */
26 #include "platform.h"
27 #include "gnunet_common.h"
28 #include "gnunet_crypto_lib.h"
29 #include "gnunet_stream_lib.h"
30 #include "stream_protocol.h"
31
32
33 /**
34  * The maximum packet size of a stream packet
35  */
36 #define MAX_PACKET_SIZE 64000
37
38 /**
39  * The maximum payload a data message packet can carry
40  */
41 static size_t max_payload_size = 
42   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
43
44 /**
45  * Receive buffer
46  */
47 #define RECEIVE_BUFFER_SIZE 4096000
48
49 /**
50  * states in the Protocol
51  */
52 enum State
53   {
54     /**
55      * Client initialization state
56      */
57     STATE_INIT,
58
59     /**
60      * Listener initialization state 
61      */
62     STATE_LISTEN,
63
64     /**
65      * Pre-connection establishment state
66      */
67     STATE_HELLO_WAIT,
68
69     /**
70      * State where a connection has been established
71      */
72     STATE_ESTABLISHED,
73
74     /**
75      * State where the socket is closed on our side and waiting to be ACK'ed
76      */
77     STATE_RECEIVE_CLOSE_WAIT,
78
79     /**
80      * State where the socket is closed for reading
81      */
82     STATE_RECEIVE_CLOSED,
83
84     /**
85      * State where the socket is closed on our side and waiting to be ACK'ed
86      */
87     STATE_TRANSMIT_CLOSE_WAIT,
88
89     /**
90      * State where the socket is closed for writing
91      */
92     STATE_TRANSMIT_CLOSED,
93
94     /**
95      * State where the socket is closed on our side and waiting to be ACK'ed
96      */
97     STATE_CLOSE_WAIT,
98
99     /**
100      * State where the socket is closed
101      */
102     STATE_CLOSED 
103   };
104
105
106 /**
107  * Functions of this type are called when a message is written
108  *
109  * @param cls the closure from queue_message
110  * @param socket the socket the written message was bound to
111  */
112 typedef void (*SendFinishCallback) (void *cls,
113                                     struct GNUNET_STREAM_Socket *socket);
114
115
116 /**
117  * The send message queue
118  */
119 struct MessageQueue
120 {
121   /**
122    * The message
123    */
124   struct GNUNET_STREAM_MessageHeader *message;
125
126   /**
127    * Callback to be called when the message is sent
128    */
129   SendFinishCallback finish_cb;
130
131   /**
132    * The closure for finish_cb
133    */
134   void *finish_cb_cls;
135
136   /**
137    * The next message in queue. Should be NULL in the last message
138    */
139   struct MessageQueue *next;
140
141   /**
142    * The next message in queue. Should be NULL in the first message
143    */
144   struct MessageQueue *prev;
145 };
146
147
148 /**
149  * The STREAM Socket Handler
150  */
151 struct GNUNET_STREAM_Socket
152 {
153
154   /**
155    * The peer identity of the peer at the other end of the stream
156    */
157   struct GNUNET_PeerIdentity other_peer;
158
159   /**
160    * Retransmission timeout
161    */
162   struct GNUNET_TIME_Relative retransmit_timeout;
163
164   /**
165    * The Acknowledgement Bitmap
166    */
167   GNUNET_STREAM_AckBitmap ack_bitmap;
168
169   /**
170    * Time when the Acknowledgement was queued
171    */
172   struct GNUNET_TIME_Absolute ack_time_registered;
173
174   /**
175    * Queued Acknowledgement deadline
176    */
177   struct GNUNET_TIME_Relative ack_time_deadline;
178
179   /**
180    * The task for sending timely Acks
181    */
182   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
183
184   /**
185    * Task scheduled to continue a read operation.
186    */
187   GNUNET_SCHEDULER_TaskIdentifier read_task;
188
189   /**
190    * The mesh handle
191    */
192   struct GNUNET_MESH_Handle *mesh;
193
194   /**
195    * The mesh tunnel handle
196    */
197   struct GNUNET_MESH_Tunnel *tunnel;
198
199   /**
200    * Stream open closure
201    */
202   void *open_cls;
203
204   /**
205    * Stream open callback
206    */
207   GNUNET_STREAM_OpenCallback open_cb;
208
209   /**
210    * The current transmit handle (if a pending transmit request exists)
211    */
212   struct GNUNET_MESH_TransmitHandle *transmit_handle;
213
214   /**
215    * The current message associated with the transmit handle
216    */
217   struct MessageQueue *queue_head;
218
219   /**
220    * The queue tail, should always point to the last message in queue
221    */
222   struct MessageQueue *queue_tail;
223
224   /**
225    * The write IO_handle associated with this socket
226    */
227   struct GNUNET_STREAM_IOWriteHandle *write_handle;
228
229   /**
230    * The read IO_handle associated with this socket
231    */
232   struct GNUNET_STREAM_IOReadHandle *read_handle;
233
234   /**
235    * Buffer for storing received messages
236    */
237   void *receive_buffer;
238
239   /**
240    * Task identifier for the read io timeout task
241    */
242   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task;
243
244   /**
245    * The state of the protocol associated with this socket
246    */
247   enum State state;
248
249   /**
250    * The status of the socket
251    */
252   enum GNUNET_STREAM_Status status;
253
254   /**
255    * The number of previous timeouts; FIXME: currently not used
256    */
257   unsigned int retries;
258
259   /**
260    * The session id associated with this stream connection
261    * FIXME: Not used currently, may be removed
262    */
263   uint32_t session_id;
264
265   /**
266    * Write sequence number. Set to random when sending HELLO(client) and
267    * HELLO_ACK(server) 
268    */
269   uint32_t write_sequence_number;
270
271   /**
272    * Read sequence number. This number's value is determined during handshake
273    */
274   uint32_t read_sequence_number;
275
276   /**
277    * The receiver buffer size
278    */
279   uint32_t receive_buffer_size;
280
281   /**
282    * The receiver buffer boundaries
283    */
284   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
285
286   /**
287    * receiver's available buffer after the last acknowledged packet
288    */
289   uint32_t receive_window_available;
290
291   /**
292    * The offset pointer used during write operation
293    */
294   uint32_t write_offset;
295
296   /**
297    * The offset after which we are expecting data
298    */
299   uint32_t read_offset;
300
301   /**
302    * The offset upto which user has read from the received buffer
303    */
304   uint32_t copy_offset;
305 };
306
307
308 /**
309  * A socket for listening
310  */
311 struct GNUNET_STREAM_ListenSocket
312 {
313
314   /**
315    * The mesh handle
316    */
317   struct GNUNET_MESH_Handle *mesh;
318
319   /**
320    * The callback function which is called after successful opening socket
321    */
322   GNUNET_STREAM_ListenCallback listen_cb;
323
324   /**
325    * The call back closure
326    */
327   void *listen_cb_cls;
328
329   /**
330    * The service port
331    */
332   GNUNET_MESH_ApplicationType port;
333 };
334
335
336 /**
337  * The IO Write Handle
338  */
339 struct GNUNET_STREAM_IOWriteHandle
340 {
341   /**
342    * The packet_buffers associated with this Handle
343    */
344   struct GNUNET_STREAM_DataMessage *messages[64];
345
346   /**
347    * The bitmap of this IOHandle; Corresponding bit for a message is set when
348    * it has been acknowledged by the receiver
349    */
350   GNUNET_STREAM_AckBitmap ack_bitmap;
351
352   /**
353    * Number of packets sent before waiting for an ack
354    *
355    * FIXME: Do we need this?
356    */
357   unsigned int sent_packets;
358 };
359
360
361 /**
362  * The IO Read Handle
363  */
364 struct GNUNET_STREAM_IOReadHandle
365 {
366   /**
367    * Callback for the read processor
368    */
369   GNUNET_STREAM_DataProcessor proc;
370
371   /**
372    * The closure pointer for the read processor callback
373    */
374   void *proc_cls;
375 };
376
377
378 /**
379  * Default value in seconds for various timeouts
380  */
381 static unsigned int default_timeout = 300;
382
383
384 /**
385  * Callback function for sending hello message
386  *
387  * @param cls closure the socket
388  * @param size number of bytes available in buf
389  * @param buf where the callee should write the message
390  * @return number of bytes written to buf
391  */
392 static size_t
393 send_message_notify (void *cls, size_t size, void *buf)
394 {
395   struct GNUNET_STREAM_Socket *socket = cls;
396   struct MessageQueue *head;
397   size_t ret;
398
399   socket->transmit_handle = NULL; /* Remove the transmit handle */
400   head = socket->queue_head;
401   if (NULL == head)
402     return 0; /* just to be safe */
403   if (0 == size)                /* request timed out */
404     {
405       socket->retries++;
406       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
407                   "Message sending timed out. Retry %d \n",
408                   socket->retries);
409       socket->transmit_handle = 
410         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
411                                            0, /* Corking */
412                                            1, /* Priority */
413                                            /* FIXME: exponential backoff */
414                                            socket->retransmit_timeout,
415                                            &socket->other_peer,
416                                            ntohs (head->message->header.size),
417                                            &send_message_notify,
418                                            socket);
419       return 0;
420     }
421
422   ret = ntohs (head->message->header.size);
423   GNUNET_assert (size >= ret);
424   memcpy (buf, head->message, ret);
425   if (NULL != head->finish_cb)
426     {
427       head->finish_cb (socket, head->finish_cb_cls);
428     }
429   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
430                                socket->queue_tail,
431                                head);
432   GNUNET_free (head->message);
433   GNUNET_free (head);
434   head = socket->queue_head;
435   if (NULL != head)    /* more pending messages to send */
436     {
437       socket->retries = 0;
438       socket->transmit_handle = 
439         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
440                                            0, /* Corking */
441                                            1, /* Priority */
442                                            /* FIXME: exponential backoff */
443                                            socket->retransmit_timeout,
444                                            &socket->other_peer,
445                                            ntohs (head->message->header.size),
446                                            &send_message_notify,
447                                            socket);
448     }
449   return ret;
450 }
451
452
453 /**
454  * Queues a message for sending using the mesh connection of a socket
455  *
456  * @param socket the socket whose mesh connection is used
457  * @param message the message to be sent
458  * @param finish_cb the callback to be called when the message is sent
459  * @param finish_cb_cls the closure for the callback
460  */
461 static void
462 queue_message (struct GNUNET_STREAM_Socket *socket,
463                struct GNUNET_STREAM_MessageHeader *message,
464                SendFinishCallback finish_cb,
465                void *finish_cb_cls)
466 {
467   struct MessageQueue *queue_entity;
468
469   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
470   queue_entity->message = message;
471   queue_entity->finish_cb = finish_cb;
472   queue_entity->finish_cb_cls = finish_cb_cls;
473   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
474                                     socket->queue_tail,
475                                     queue_entity);
476   if (NULL == socket->transmit_handle)
477   {
478     socket->retries = 0;
479     socket->transmit_handle = 
480       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
481                                          0, /* Corking */
482                                          1, /* Priority */
483                                          socket->retransmit_timeout,
484                                          &socket->other_peer,
485                                          ntohs (message->header.size),
486                                          &send_message_notify,
487                                          socket);
488   }
489 }
490
491
492 /**
493  * Callback function for sending ack message
494  *
495  * @param cls closure the ACK message created in ack_task
496  * @param size number of bytes available in buffer
497  * @param buf where the callee should write the message
498  * @return number of bytes written to buf
499  */
500 static size_t
501 send_ack_notify (void *cls, size_t size, void *buf)
502 {
503   struct GNUNET_STREAM_AckMessage *ack_msg = cls;
504
505   if (0 == size)
506     {
507       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
508                   "%s called with size 0\n", __func__);
509       return 0;
510     }
511   GNUNET_assert (ack_msg->header.header.size <= size);
512   
513   size = ack_msg->header.header.size;
514   memcpy (buf, ack_msg, size);
515   return size;
516 }
517
518
519 /**
520  * Task for sending ACK message
521  *
522  * @param cls the socket
523  * @param tc the Task context
524  */
525 static void
526 ack_task (void *cls,
527           const struct GNUNET_SCHEDULER_TaskContext *tc)
528 {
529   struct GNUNET_STREAM_Socket *socket = cls;
530   struct GNUNET_STREAM_AckMessage *ack_msg;
531
532   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
533     {
534       return;
535     }
536
537   socket->ack_task_id = 0;
538
539   /* Create the ACK Message */
540   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
541   ack_msg->header.header.size = htons (sizeof (struct 
542                                                GNUNET_STREAM_AckMessage));
543   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
544   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
545   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
546   ack_msg->receive_window_remaining = 
547     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
548
549   /* Request MESH for sending ACK */
550   GNUNET_MESH_notify_transmit_ready (socket->tunnel,
551                                      0, /* Corking */
552                                      1, /* Priority */
553                                      socket->retransmit_timeout,
554                                      &socket->other_peer,
555                                      ntohs (ack_msg->header.header.size),
556                                      &send_ack_notify,
557                                      ack_msg);
558
559   
560 }
561
562
563 /**
564  * Function to modify a bit in GNUNET_STREAM_AckBitmap
565  *
566  * @param bitmap the bitmap to modify
567  * @param bit the bit number to modify
568  * @param value GNUNET_YES to on, GNUNET_NO to off
569  */
570 static void
571 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
572                       unsigned int bit, 
573                       int value)
574 {
575   GNUNET_assert (bit < 64);
576   if (GNUNET_YES == value)
577     *bitmap |= (1LL << bit);
578   else
579     *bitmap &= ~(1LL << bit);
580 }
581
582
583 /**
584  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
585  *
586  * @param bitmap address of the bitmap that has to be checked
587  * @param bit the bit number to check
588  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
589  */
590 static uint8_t
591 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
592                       unsigned int bit)
593 {
594   GNUNET_assert (bit < 64);
595   return 0 != (*bitmap & (1LL << bit));
596 }
597
598
599
600 /**
601  * Function called when Data Message is sent
602  *
603  * @param cls the io_handle corresponding to the Data Message
604  * @param socket the socket which was used
605  */
606 static void
607 write_data_finish_cb (void *cls,
608                       struct GNUNET_STREAM_Socket *socket)
609 {
610   struct GNUNET_STREAM_IOWriteHandle *io_handle = cls;
611
612   io_handle->sent_packets++;
613 }
614
615
616 /**
617  * Writes data using the given socket. The amount of data written is limited by
618  * the receive_window_size
619  *
620  * @param socket the socket to use
621  */
622 static void 
623 write_data (struct GNUNET_STREAM_Socket *socket)
624 {
625   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
626   unsigned int packet;
627   int ack_packet;
628
629   ack_packet = -1;
630   /* Find the last acknowledged packet */
631   for (packet=0; packet < 64; packet++)
632     {      
633       if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
634                                               packet))
635         ack_packet = packet;        
636       else if (NULL == io_handle->messages[packet])
637         break;
638     }
639   /* Resend packets which weren't ack'ed */
640   for (packet=0; packet < ack_packet; packet++)
641     {
642       if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
643                                              packet))
644         {
645           queue_message (socket,
646                          &io_handle->messages[packet]->header,
647                          NULL,
648                          NULL);
649         }
650     }
651   packet = ack_packet + 1;
652   /* Now send new packets if there is enough buffer space */
653   while ( (NULL != io_handle->messages[packet]) &&
654           (socket->receive_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) )
655     {
656       socket->receive_window_available -= ntohs (io_handle->messages[packet]->header.header.size);
657       queue_message (socket,
658                      &io_handle->messages[packet]->header,
659                      &write_data_finish_cb,
660                      io_handle);
661       packet++;
662     }
663 }
664
665
666 /**
667  * Task for calling the read processor
668  *
669  * @param cls the socket
670  * @param tc the task context
671  */
672 static void
673 call_read_processor (void *cls,
674                           const struct GNUNET_SCHEDULER_TaskContext *tc)
675 {
676   struct GNUNET_STREAM_Socket *socket = cls;
677   size_t read_size;
678   size_t valid_read_size;
679   unsigned int packet;
680   uint32_t sequence_increase;
681   uint32_t offset_increase;
682
683   socket->read_task = GNUNET_SCHEDULER_NO_TASK;
684   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
685     return;
686
687   GNUNET_assert (NULL != socket->read_handle);
688   GNUNET_assert (NULL != socket->read_handle->proc);
689
690   /* Check the bitmap for any holes */
691   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
692     {
693       if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
694                                              packet))
695         break;
696     }
697   /* We only call read processor if we have the first packet */
698   GNUNET_assert (0 < packet);
699
700   valid_read_size = 
701     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
702
703   GNUNET_assert (0 != valid_read_size);
704
705   /* Cancel the read_io_timeout_task */
706   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task);
707   socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
708
709   /* Call the data processor */
710   read_size = 
711     socket->read_handle->proc (socket->read_handle->proc_cls,
712                                socket->status,
713                                socket->receive_buffer + socket->copy_offset,
714                                valid_read_size);
715   /* Free the read handle */
716   GNUNET_free (socket->read_handle);
717   socket->read_handle = NULL;
718
719   GNUNET_assert (read_size <= valid_read_size);
720   socket->copy_offset += read_size;
721
722   /* Determine upto which packet we can remove from the buffer */
723   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
724     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
725       break;
726
727   /* If no packets can be removed we can't move the buffer */
728   if (0 == packet) return;
729
730   sequence_increase = packet;
731
732   /* Shift the data in the receive buffer */
733   memmove (socket->receive_buffer,
734            socket->receive_buffer 
735            + socket->receive_buffer_boundaries[sequence_increase-1],
736            socket->receive_buffer_size - socket->receive_buffer_boundaries[sequence_increase-1]);
737   
738   /* Shift the bitmap */
739   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
740   
741   /* Set read_sequence_number */
742   socket->read_sequence_number += sequence_increase;
743   
744   /* Set read_offset */
745   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
746   socket->read_offset += offset_increase;
747
748   /* Fix copy_offset */
749   GNUNET_assert (offset_increase <= socket->copy_offset);
750   socket->copy_offset -= offset_increase;
751   
752   /* Fix relative boundaries */
753   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
754     {
755       if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
756         {
757           socket->receive_buffer_boundaries[packet] = 
758             socket->receive_buffer_boundaries[packet + sequence_increase] 
759             - offset_increase;
760         }
761       else
762         socket->receive_buffer_boundaries[packet] = 0;
763     }
764 }
765
766
767 /**
768  * Cancels the existing read io handle
769  *
770  * @param cls the closure from the SCHEDULER call
771  * @param tc the task context
772  */
773 static void
774 read_io_timeout (void *cls, 
775                 const struct GNUNET_SCHEDULER_TaskContext *tc)
776 {
777   struct GNUNET_STREAM_Socket *socket = cls;
778
779   socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
780   if (socket->read_task != GNUNET_SCHEDULER_NO_TASK)
781   {
782     GNUNET_SCHEDULER_cancel (socket->read_task);
783     socket->read_task = GNUNET_SCHEDULER_NO_TASK;
784   }
785   GNUNET_assert (NULL != socket->read_handle);
786   
787   GNUNET_free (socket->read_handle);
788   socket->read_handle = NULL;
789 }
790
791
792 /**
793  * Handler for DATA messages; Same for both client and server
794  *
795  * @param socket the socket through which the ack was received
796  * @param tunnel connection to the other end
797  * @param sender who sent the message
798  * @param msg the data message
799  * @param atsi performance data for the connection
800  * @return GNUNET_OK to keep the connection open,
801  *         GNUNET_SYSERR to close it (signal serious error)
802  */
803 static int
804 handle_data (struct GNUNET_STREAM_Socket *socket,
805              struct GNUNET_MESH_Tunnel *tunnel,
806              const struct GNUNET_PeerIdentity *sender,
807              const struct GNUNET_STREAM_DataMessage *msg,
808              const struct GNUNET_ATS_Information*atsi)
809 {
810   const void *payload;
811   uint32_t bytes_needed;
812   uint32_t relative_offset;
813   uint32_t relative_sequence_number;
814   uint16_t size;
815
816   size = htons (msg->header.header.size);
817   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
818     {
819       GNUNET_break_op (0);
820       return GNUNET_SYSERR;
821     }
822
823   switch (socket->state)
824     {
825     case STATE_ESTABLISHED:
826     case STATE_TRANSMIT_CLOSED:
827     case STATE_TRANSMIT_CLOSE_WAIT:
828
829       /* check if the message's sequence number is in the range we are
830          expecting */
831       relative_sequence_number = 
832         ntohl (msg->sequence_number) - socket->read_sequence_number;
833       if ( relative_sequence_number > 64)
834         {
835           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
836                       "Ignoring received message with sequence number %d",
837                       ntohl (msg->sequence_number));
838           return GNUNET_YES;
839         }
840
841       /* Check if we have to allocate the buffer */
842       size -= sizeof (struct GNUNET_STREAM_DataMessage);
843       relative_offset = ntohl (msg->offset) - socket->read_offset;
844       bytes_needed = relative_offset + size;
845       
846       if (bytes_needed > socket->receive_buffer_size)
847         {
848           if (bytes_needed <= RECEIVE_BUFFER_SIZE)
849             {
850               socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
851                                                        bytes_needed);
852               socket->receive_buffer_size = bytes_needed;
853             }
854           else
855             {
856               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
857                           "Cannot accommodate packet %d as buffer is full\n",
858                           ntohl (msg->sequence_number));
859               return GNUNET_YES;
860             }
861         }
862       
863       /* Copy Data to buffer */
864       payload = &msg[1];
865       GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
866       memcpy (socket->receive_buffer + relative_offset,
867               payload,
868               size);
869       socket->receive_buffer_boundaries[relative_sequence_number] = 
870         relative_offset + size;
871       
872       /* Modify the ACK bitmap */
873       ackbitmap_modify_bit (&socket->ack_bitmap,
874                             relative_sequence_number,
875                             GNUNET_YES);
876
877       /* Start ACK sending task if one is not already present */
878       if (0 == socket->ack_task_id)
879        {
880          socket->ack_task_id = 
881            GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
882                                          (msg->ack_deadline),
883                                          &ack_task,
884                                          socket);
885        }
886
887       if ((NULL != socket->read_handle) /* A read handle is waiting */
888           /* There is no current read task */
889           && (GNUNET_SCHEDULER_NO_TASK == socket->read_task)
890           /* We have the first packet */
891           && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
892                                                  0)))
893         {
894           socket->read_task = 
895             GNUNET_SCHEDULER_add_now (&call_read_processor,
896                                       socket);
897         }
898       
899       break;
900
901     default:
902       /* FIXME: call statistics */
903       break;
904     }
905   return GNUNET_YES;
906 }
907
908 /**
909  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
910  *
911  * @param cls the socket (set from GNUNET_MESH_connect)
912  * @param tunnel connection to the other end
913  * @param tunnel_ctx place to store local state associated with the tunnel
914  * @param sender who sent the message
915  * @param message the actual message
916  * @param atsi performance data for the connection
917  * @return GNUNET_OK to keep the connection open,
918  *         GNUNET_SYSERR to close it (signal serious error)
919  */
920 static int
921 client_handle_data (void *cls,
922              struct GNUNET_MESH_Tunnel *tunnel,
923              void **tunnel_ctx,
924              const struct GNUNET_PeerIdentity *sender,
925              const struct GNUNET_MessageHeader *message,
926              const struct GNUNET_ATS_Information*atsi)
927 {
928   struct GNUNET_STREAM_Socket *socket = cls;
929
930   return handle_data (socket, 
931                       tunnel, 
932                       sender, 
933                       (const struct GNUNET_STREAM_DataMessage *) message, 
934                       atsi);
935 }
936
937
938 /**
939  * Callback to set state to ESTABLISHED
940  *
941  * @param cls the closure from queue_message FIXME: document
942  * @param socket the socket to requiring state change
943  */
944 static void
945 set_state_established (void *cls,
946                        struct GNUNET_STREAM_Socket *socket)
947 {
948   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining ESTABLISHED state\n");
949   socket->write_offset = 0;
950   socket->read_offset = 0;
951   socket->state = STATE_ESTABLISHED;
952 }
953
954
955 /**
956  * Callback to set state to HELLO_WAIT
957  *
958  * @param cls the closure from queue_message
959  * @param socket the socket to requiring state change
960  */
961 static void
962 set_state_hello_wait (void *cls,
963                       struct GNUNET_STREAM_Socket *socket)
964 {
965   GNUNET_assert (STATE_INIT == socket->state);
966   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining HELLO_WAIT state\n");
967   socket->state = STATE_HELLO_WAIT;
968 }
969
970
971 /**
972  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
973  *
974  * @param cls the socket (set from GNUNET_MESH_connect)
975  * @param tunnel connection to the other end
976  * @param tunnel_ctx this is NULL
977  * @param sender who sent the message
978  * @param message the actual message
979  * @param atsi performance data for the connection
980  * @return GNUNET_OK to keep the connection open,
981  *         GNUNET_SYSERR to close it (signal serious error)
982  */
983 static int
984 client_handle_hello_ack (void *cls,
985                          struct GNUNET_MESH_Tunnel *tunnel,
986                          void **tunnel_ctx,
987                          const struct GNUNET_PeerIdentity *sender,
988                          const struct GNUNET_MessageHeader *message,
989                          const struct GNUNET_ATS_Information*atsi)
990 {
991   struct GNUNET_STREAM_Socket *socket = cls;
992   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
993   struct GNUNET_STREAM_HelloAckMessage *reply;
994
995   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
996   GNUNET_assert (socket->tunnel == tunnel);
997   switch (socket->state)
998   {
999   case STATE_HELLO_WAIT:
1000       socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1001       socket->receive_window_available = ntohl (ack_msg->receive_window_size);
1002       /* Get the random sequence number */
1003       socket->write_sequence_number = 
1004         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1005       reply = 
1006         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1007       reply->header.header.size = 
1008         htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1009       reply->header.header.type = 
1010         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1011       reply->sequence_number = htonl (socket->write_sequence_number);
1012       reply->receive_window_size = htonl (RECEIVE_BUFFER_SIZE);
1013       queue_message (socket, 
1014                      &reply->header, 
1015                      &set_state_established, 
1016                      NULL);      
1017       return GNUNET_OK;
1018   case STATE_ESTABLISHED:
1019   case STATE_RECEIVE_CLOSE_WAIT:
1020     // call statistics (# ACKs ignored++)
1021     return GNUNET_OK;
1022   case STATE_INIT:
1023   default:
1024     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1025                 "Server sent HELLO_ACK when in state %d\n", socket->state);
1026     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1027     return GNUNET_SYSERR;
1028   }
1029
1030 }
1031
1032
1033 /**
1034  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1035  *
1036  * @param cls the socket (set from GNUNET_MESH_connect)
1037  * @param tunnel connection to the other end
1038  * @param tunnel_ctx this is NULL
1039  * @param sender who sent the message
1040  * @param message the actual message
1041  * @param atsi performance data for the connection
1042  * @return GNUNET_OK to keep the connection open,
1043  *         GNUNET_SYSERR to close it (signal serious error)
1044  */
1045 static int
1046 client_handle_reset (void *cls,
1047                      struct GNUNET_MESH_Tunnel *tunnel,
1048                      void **tunnel_ctx,
1049                      const struct GNUNET_PeerIdentity *sender,
1050                      const struct GNUNET_MessageHeader *message,
1051                      const struct GNUNET_ATS_Information*atsi)
1052 {
1053   struct GNUNET_STREAM_Socket *socket = cls;
1054
1055   return GNUNET_OK;
1056 }
1057
1058
1059 /**
1060  * Common message handler for handling TRANSMIT_CLOSE messages
1061  *
1062  * @param socket the socket through which the ack was received
1063  * @param tunnel connection to the other end
1064  * @param sender who sent the message
1065  * @param msg the transmit close message
1066  * @param atsi performance data for the connection
1067  * @return GNUNET_OK to keep the connection open,
1068  *         GNUNET_SYSERR to close it (signal serious error)
1069  */
1070 static int
1071 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1072                        struct GNUNET_MESH_Tunnel *tunnel,
1073                        const struct GNUNET_PeerIdentity *sender,
1074                        const struct GNUNET_STREAM_MessageHeader *msg,
1075                        const struct GNUNET_ATS_Information*atsi)
1076 {
1077   struct GNUNET_STREAM_MessageHeader *reply;
1078
1079   switch (socket->state)
1080     {
1081     case STATE_ESTABLISHED:
1082       socket->state = STATE_RECEIVE_CLOSED;
1083
1084       /* Send TRANSMIT_CLOSE_ACK */
1085       reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1086       reply->header.type = 
1087         htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1088       reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1089       queue_message (socket, reply, NULL, NULL);
1090       break;
1091
1092     default:
1093       /* FIXME: Call statistics? */
1094       break;
1095     }
1096   return GNUNET_YES;
1097 }
1098
1099
1100 /**
1101  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1102  *
1103  * @param cls the socket (set from GNUNET_MESH_connect)
1104  * @param tunnel connection to the other end
1105  * @param tunnel_ctx this is NULL
1106  * @param sender who sent the message
1107  * @param message the actual message
1108  * @param atsi performance data for the connection
1109  * @return GNUNET_OK to keep the connection open,
1110  *         GNUNET_SYSERR to close it (signal serious error)
1111  */
1112 static int
1113 client_handle_transmit_close (void *cls,
1114                               struct GNUNET_MESH_Tunnel *tunnel,
1115                               void **tunnel_ctx,
1116                               const struct GNUNET_PeerIdentity *sender,
1117                               const struct GNUNET_MessageHeader *message,
1118                               const struct GNUNET_ATS_Information*atsi)
1119 {
1120   struct GNUNET_STREAM_Socket *socket = cls;
1121   
1122   return handle_transmit_close (socket,
1123                                 tunnel,
1124                                 sender,
1125                                 (struct GNUNET_STREAM_MessageHeader *)message,
1126                                 atsi);
1127 }
1128
1129
1130 /**
1131  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1132  *
1133  * @param cls the socket (set from GNUNET_MESH_connect)
1134  * @param tunnel connection to the other end
1135  * @param tunnel_ctx this is NULL
1136  * @param sender who sent the message
1137  * @param message the actual message
1138  * @param atsi performance data for the connection
1139  * @return GNUNET_OK to keep the connection open,
1140  *         GNUNET_SYSERR to close it (signal serious error)
1141  */
1142 static int
1143 client_handle_transmit_close_ack (void *cls,
1144                                   struct GNUNET_MESH_Tunnel *tunnel,
1145                                   void **tunnel_ctx,
1146                                   const struct GNUNET_PeerIdentity *sender,
1147                                   const struct GNUNET_MessageHeader *message,
1148                                   const struct GNUNET_ATS_Information*atsi)
1149 {
1150   struct GNUNET_STREAM_Socket *socket = cls;
1151
1152   return GNUNET_OK;
1153 }
1154
1155
1156 /**
1157  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1158  *
1159  * @param cls the socket (set from GNUNET_MESH_connect)
1160  * @param tunnel connection to the other end
1161  * @param tunnel_ctx this is NULL
1162  * @param sender who sent the message
1163  * @param message the actual message
1164  * @param atsi performance data for the connection
1165  * @return GNUNET_OK to keep the connection open,
1166  *         GNUNET_SYSERR to close it (signal serious error)
1167  */
1168 static int
1169 client_handle_receive_close (void *cls,
1170                              struct GNUNET_MESH_Tunnel *tunnel,
1171                              void **tunnel_ctx,
1172                              const struct GNUNET_PeerIdentity *sender,
1173                              const struct GNUNET_MessageHeader *message,
1174                              const struct GNUNET_ATS_Information*atsi)
1175 {
1176   struct GNUNET_STREAM_Socket *socket = cls;
1177
1178   return GNUNET_OK;
1179 }
1180
1181
1182 /**
1183  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1184  *
1185  * @param cls the socket (set from GNUNET_MESH_connect)
1186  * @param tunnel connection to the other end
1187  * @param tunnel_ctx this is NULL
1188  * @param sender who sent the message
1189  * @param message the actual message
1190  * @param atsi performance data for the connection
1191  * @return GNUNET_OK to keep the connection open,
1192  *         GNUNET_SYSERR to close it (signal serious error)
1193  */
1194 static int
1195 client_handle_receive_close_ack (void *cls,
1196                                  struct GNUNET_MESH_Tunnel *tunnel,
1197                                  void **tunnel_ctx,
1198                                  const struct GNUNET_PeerIdentity *sender,
1199                                  const struct GNUNET_MessageHeader *message,
1200                                  const struct GNUNET_ATS_Information*atsi)
1201 {
1202   struct GNUNET_STREAM_Socket *socket = cls;
1203
1204   return GNUNET_OK;
1205 }
1206
1207
1208 /**
1209  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1210  *
1211  * @param cls the socket (set from GNUNET_MESH_connect)
1212  * @param tunnel connection to the other end
1213  * @param tunnel_ctx this is NULL
1214  * @param sender who sent the message
1215  * @param message the actual message
1216  * @param atsi performance data for the connection
1217  * @return GNUNET_OK to keep the connection open,
1218  *         GNUNET_SYSERR to close it (signal serious error)
1219  */
1220 static int
1221 client_handle_close (void *cls,
1222                      struct GNUNET_MESH_Tunnel *tunnel,
1223                      void **tunnel_ctx,
1224                      const struct GNUNET_PeerIdentity *sender,
1225                      const struct GNUNET_MessageHeader *message,
1226                      const struct GNUNET_ATS_Information*atsi)
1227 {
1228   struct GNUNET_STREAM_Socket *socket = cls;
1229
1230   return GNUNET_OK;
1231 }
1232
1233
1234 /**
1235  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1236  *
1237  * @param cls the socket (set from GNUNET_MESH_connect)
1238  * @param tunnel connection to the other end
1239  * @param tunnel_ctx this is NULL
1240  * @param sender who sent the message
1241  * @param message the actual message
1242  * @param atsi performance data for the connection
1243  * @return GNUNET_OK to keep the connection open,
1244  *         GNUNET_SYSERR to close it (signal serious error)
1245  */
1246 static int
1247 client_handle_close_ack (void *cls,
1248                          struct GNUNET_MESH_Tunnel *tunnel,
1249                          void **tunnel_ctx,
1250                          const struct GNUNET_PeerIdentity *sender,
1251                          const struct GNUNET_MessageHeader *message,
1252                          const struct GNUNET_ATS_Information*atsi)
1253 {
1254   struct GNUNET_STREAM_Socket *socket = cls;
1255
1256   return GNUNET_OK;
1257 }
1258
1259 /*****************************/
1260 /* Server's Message Handlers */
1261 /*****************************/
1262
1263 /**
1264  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1265  *
1266  * @param cls the closure
1267  * @param tunnel connection to the other end
1268  * @param tunnel_ctx the socket
1269  * @param sender who sent the message
1270  * @param message the actual message
1271  * @param atsi performance data for the connection
1272  * @return GNUNET_OK to keep the connection open,
1273  *         GNUNET_SYSERR to close it (signal serious error)
1274  */
1275 static int
1276 server_handle_data (void *cls,
1277                     struct GNUNET_MESH_Tunnel *tunnel,
1278                     void **tunnel_ctx,
1279                     const struct GNUNET_PeerIdentity *sender,
1280                     const struct GNUNET_MessageHeader *message,
1281                     const struct GNUNET_ATS_Information*atsi)
1282 {
1283   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1284
1285   return handle_data (socket,
1286                       tunnel,
1287                       sender,
1288                       (const struct GNUNET_STREAM_DataMessage *)message,
1289                       atsi);
1290 }
1291
1292
1293 /**
1294  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
1295  *
1296  * @param cls the closure
1297  * @param tunnel connection to the other end
1298  * @param tunnel_ctx the socket
1299  * @param sender who sent the message
1300  * @param message the actual message
1301  * @param atsi performance data for the connection
1302  * @return GNUNET_OK to keep the connection open,
1303  *         GNUNET_SYSERR to close it (signal serious error)
1304  */
1305 static int
1306 server_handle_hello (void *cls,
1307                      struct GNUNET_MESH_Tunnel *tunnel,
1308                      void **tunnel_ctx,
1309                      const struct GNUNET_PeerIdentity *sender,
1310                      const struct GNUNET_MessageHeader *message,
1311                      const struct GNUNET_ATS_Information*atsi)
1312 {
1313   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1314   struct GNUNET_STREAM_HelloAckMessage *reply;
1315
1316   GNUNET_assert (socket->tunnel == tunnel);
1317   if (STATE_INIT == socket->state)
1318     {
1319       /* Get the random sequence number */
1320       socket->write_sequence_number = 
1321         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1322       reply = 
1323         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1324       reply->header.header.size = 
1325         htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1326       reply->header.header.type = 
1327         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1328       reply->sequence_number = htonl (socket->write_sequence_number);
1329       queue_message (socket, 
1330                      &reply->header,
1331                      &set_state_hello_wait, 
1332                      NULL);
1333     }
1334   else
1335     {
1336       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1337                   "Client sent HELLO when in state %d\n", socket->state);
1338       /* FIXME: Send RESET? */
1339       
1340     }
1341   return GNUNET_OK;
1342 }
1343
1344
1345 /**
1346  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1347  *
1348  * @param cls the closure
1349  * @param tunnel connection to the other end
1350  * @param tunnel_ctx the socket
1351  * @param sender who sent the message
1352  * @param message the actual message
1353  * @param atsi performance data for the connection
1354  * @return GNUNET_OK to keep the connection open,
1355  *         GNUNET_SYSERR to close it (signal serious error)
1356  */
1357 static int
1358 server_handle_hello_ack (void *cls,
1359                          struct GNUNET_MESH_Tunnel *tunnel,
1360                          void **tunnel_ctx,
1361                          const struct GNUNET_PeerIdentity *sender,
1362                          const struct GNUNET_MessageHeader *message,
1363                          const struct GNUNET_ATS_Information*atsi)
1364 {
1365   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1366   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
1367
1368   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
1369   GNUNET_assert (socket->tunnel == tunnel);
1370   if (STATE_HELLO_WAIT == socket->state)
1371     {
1372       socket->read_sequence_number = ntohl (ack_message->sequence_number);
1373       socket->receive_window_available = 
1374         ntohl (ack_message->receive_window_size);
1375       /* Attain ESTABLISHED state */
1376       set_state_established (NULL, socket);
1377     }
1378   else
1379     {
1380       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1381                   "Client sent HELLO_ACK when in state %d\n", socket->state);
1382       /* FIXME: Send RESET? */
1383       
1384     }
1385   return GNUNET_OK;
1386 }
1387
1388
1389 /**
1390  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1391  *
1392  * @param cls the closure
1393  * @param tunnel connection to the other end
1394  * @param tunnel_ctx the socket
1395  * @param sender who sent the message
1396  * @param message the actual message
1397  * @param atsi performance data for the connection
1398  * @return GNUNET_OK to keep the connection open,
1399  *         GNUNET_SYSERR to close it (signal serious error)
1400  */
1401 static int
1402 server_handle_reset (void *cls,
1403                      struct GNUNET_MESH_Tunnel *tunnel,
1404                      void **tunnel_ctx,
1405                      const struct GNUNET_PeerIdentity *sender,
1406                      const struct GNUNET_MessageHeader *message,
1407                      const struct GNUNET_ATS_Information*atsi)
1408 {
1409   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1410
1411   return GNUNET_OK;
1412 }
1413
1414
1415 /**
1416  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1417  *
1418  * @param cls the closure
1419  * @param tunnel connection to the other end
1420  * @param tunnel_ctx the socket
1421  * @param sender who sent the message
1422  * @param message the actual message
1423  * @param atsi performance data for the connection
1424  * @return GNUNET_OK to keep the connection open,
1425  *         GNUNET_SYSERR to close it (signal serious error)
1426  */
1427 static int
1428 server_handle_transmit_close (void *cls,
1429                               struct GNUNET_MESH_Tunnel *tunnel,
1430                               void **tunnel_ctx,
1431                               const struct GNUNET_PeerIdentity *sender,
1432                               const struct GNUNET_MessageHeader *message,
1433                               const struct GNUNET_ATS_Information*atsi)
1434 {
1435   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1436
1437   return handle_transmit_close (socket,
1438                                 tunnel,
1439                                 sender,
1440                                 (struct GNUNET_STREAM_MessageHeader *)message,
1441                                 atsi);
1442 }
1443
1444
1445 /**
1446  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1447  *
1448  * @param cls the closure
1449  * @param tunnel connection to the other end
1450  * @param tunnel_ctx the socket
1451  * @param sender who sent the message
1452  * @param message the actual message
1453  * @param atsi performance data for the connection
1454  * @return GNUNET_OK to keep the connection open,
1455  *         GNUNET_SYSERR to close it (signal serious error)
1456  */
1457 static int
1458 server_handle_transmit_close_ack (void *cls,
1459                                   struct GNUNET_MESH_Tunnel *tunnel,
1460                                   void **tunnel_ctx,
1461                                   const struct GNUNET_PeerIdentity *sender,
1462                                   const struct GNUNET_MessageHeader *message,
1463                                   const struct GNUNET_ATS_Information*atsi)
1464 {
1465   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1466
1467   return GNUNET_OK;
1468 }
1469
1470
1471 /**
1472  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1473  *
1474  * @param cls the closure
1475  * @param tunnel connection to the other end
1476  * @param tunnel_ctx the socket
1477  * @param sender who sent the message
1478  * @param message the actual message
1479  * @param atsi performance data for the connection
1480  * @return GNUNET_OK to keep the connection open,
1481  *         GNUNET_SYSERR to close it (signal serious error)
1482  */
1483 static int
1484 server_handle_receive_close (void *cls,
1485                              struct GNUNET_MESH_Tunnel *tunnel,
1486                              void **tunnel_ctx,
1487                              const struct GNUNET_PeerIdentity *sender,
1488                              const struct GNUNET_MessageHeader *message,
1489                              const struct GNUNET_ATS_Information*atsi)
1490 {
1491   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1492
1493   return GNUNET_OK;
1494 }
1495
1496
1497 /**
1498  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1499  *
1500  * @param cls the closure
1501  * @param tunnel connection to the other end
1502  * @param tunnel_ctx the socket
1503  * @param sender who sent the message
1504  * @param message the actual message
1505  * @param atsi performance data for the connection
1506  * @return GNUNET_OK to keep the connection open,
1507  *         GNUNET_SYSERR to close it (signal serious error)
1508  */
1509 static int
1510 server_handle_receive_close_ack (void *cls,
1511                                  struct GNUNET_MESH_Tunnel *tunnel,
1512                                  void **tunnel_ctx,
1513                                  const struct GNUNET_PeerIdentity *sender,
1514                                  const struct GNUNET_MessageHeader *message,
1515                                  const struct GNUNET_ATS_Information*atsi)
1516 {
1517   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1518
1519   return GNUNET_OK;
1520 }
1521
1522
1523 /**
1524  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1525  *
1526  * @param cls the closure
1527  * @param tunnel connection to the other end
1528  * @param tunnel_ctx the socket
1529  * @param sender who sent the message
1530  * @param message the actual message
1531  * @param atsi performance data for the connection
1532  * @return GNUNET_OK to keep the connection open,
1533  *         GNUNET_SYSERR to close it (signal serious error)
1534  */
1535 static int
1536 server_handle_close (void *cls,
1537                      struct GNUNET_MESH_Tunnel *tunnel,
1538                      void **tunnel_ctx,
1539                      const struct GNUNET_PeerIdentity *sender,
1540                      const struct GNUNET_MessageHeader *message,
1541                      const struct GNUNET_ATS_Information*atsi)
1542 {
1543   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1544
1545   return GNUNET_OK;
1546 }
1547
1548
1549 /**
1550  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1551  *
1552  * @param cls the closure
1553  * @param tunnel connection to the other end
1554  * @param tunnel_ctx the socket
1555  * @param sender who sent the message
1556  * @param message the actual message
1557  * @param atsi performance data for the connection
1558  * @return GNUNET_OK to keep the connection open,
1559  *         GNUNET_SYSERR to close it (signal serious error)
1560  */
1561 static int
1562 server_handle_close_ack (void *cls,
1563                          struct GNUNET_MESH_Tunnel *tunnel,
1564                          void **tunnel_ctx,
1565                          const struct GNUNET_PeerIdentity *sender,
1566                          const struct GNUNET_MessageHeader *message,
1567                          const struct GNUNET_ATS_Information*atsi)
1568 {
1569   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1570
1571   return GNUNET_OK;
1572 }
1573
1574
1575 /**
1576  * Message Handler for mesh
1577  *
1578  * @param socket the socket through which the ack was received
1579  * @param tunnel connection to the other end
1580  * @param sender who sent the message
1581  * @param ack the acknowledgment message
1582  * @param atsi performance data for the connection
1583  * @return GNUNET_OK to keep the connection open,
1584  *         GNUNET_SYSERR to close it (signal serious error)
1585  */
1586 static int
1587 handle_ack (struct GNUNET_STREAM_Socket *socket,
1588             struct GNUNET_MESH_Tunnel *tunnel,
1589             const struct GNUNET_PeerIdentity *sender,
1590             const struct GNUNET_STREAM_AckMessage *ack,
1591             const struct GNUNET_ATS_Information*atsi)
1592 {
1593   switch (socket->state)
1594     {
1595     case (STATE_ESTABLISHED):
1596       if (NULL == socket->write_handle)
1597         {
1598           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1599                       "Received DATA ACK when write_handle is NULL\n");
1600           return GNUNET_OK;
1601         }
1602
1603       socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap);
1604       socket->receive_window_available = 
1605         ntohl (ack->receive_window_remaining);
1606       write_data (socket);
1607       break;
1608     default:
1609       break;
1610     }
1611   return GNUNET_OK;
1612 }
1613
1614
1615 /**
1616  * Message Handler for mesh
1617  *
1618  * @param cls the 'struct GNUNET_STREAM_Socket'
1619  * @param tunnel connection to the other end
1620  * @param tunnel_ctx unused
1621  * @param sender who sent the message
1622  * @param message the actual message
1623  * @param atsi performance data for the connection
1624  * @return GNUNET_OK to keep the connection open,
1625  *         GNUNET_SYSERR to close it (signal serious error)
1626  */
1627 static int
1628 client_handle_ack (void *cls,
1629                    struct GNUNET_MESH_Tunnel *tunnel,
1630                    void **tunnel_ctx,
1631                    const struct GNUNET_PeerIdentity *sender,
1632                    const struct GNUNET_MessageHeader *message,
1633                    const struct GNUNET_ATS_Information*atsi)
1634 {
1635   struct GNUNET_STREAM_Socket *socket = cls;
1636   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1637  
1638   return handle_ack (socket, tunnel, sender, ack, atsi);
1639 }
1640
1641
1642 /**
1643  * Message Handler for mesh
1644  *
1645  * @param cls the server's listen socket
1646  * @param tunnel connection to the other end
1647  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
1648  * @param sender who sent the message
1649  * @param message the actual message
1650  * @param atsi performance data for the connection
1651  * @return GNUNET_OK to keep the connection open,
1652  *         GNUNET_SYSERR to close it (signal serious error)
1653  */
1654 static int
1655 server_handle_ack (void *cls,
1656                    struct GNUNET_MESH_Tunnel *tunnel,
1657                    void **tunnel_ctx,
1658                    const struct GNUNET_PeerIdentity *sender,
1659                    const struct GNUNET_MessageHeader *message,
1660                    const struct GNUNET_ATS_Information*atsi)
1661 {
1662   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1663   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1664  
1665   return handle_ack (socket, tunnel, sender, ack, atsi);
1666 }
1667
1668
1669 /**
1670  * For client message handlers, the stream socket is in the
1671  * closure argument.
1672  */
1673 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
1674   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1675   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
1676    sizeof (struct GNUNET_STREAM_AckMessage) },
1677   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1678    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1679   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1680    sizeof (struct GNUNET_STREAM_MessageHeader)},
1681   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1682    sizeof (struct GNUNET_STREAM_MessageHeader)},
1683   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1684    sizeof (struct GNUNET_STREAM_MessageHeader)},
1685   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1686    sizeof (struct GNUNET_STREAM_MessageHeader)},
1687   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1688    sizeof (struct GNUNET_STREAM_MessageHeader)},
1689   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1690    sizeof (struct GNUNET_STREAM_MessageHeader)},
1691   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1692    sizeof (struct GNUNET_STREAM_MessageHeader)},
1693   {NULL, 0, 0}
1694 };
1695
1696
1697 /**
1698  * For server message handlers, the stream socket is in the
1699  * tunnel context, and the listen socket in the closure argument.
1700  */
1701 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
1702   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1703   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
1704    sizeof (struct GNUNET_STREAM_AckMessage) },
1705   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
1706    sizeof (struct GNUNET_STREAM_MessageHeader)},
1707   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1708    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1709   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1710    sizeof (struct GNUNET_STREAM_MessageHeader)},
1711   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1712    sizeof (struct GNUNET_STREAM_MessageHeader)},
1713   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1714    sizeof (struct GNUNET_STREAM_MessageHeader)},
1715   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1716    sizeof (struct GNUNET_STREAM_MessageHeader)},
1717   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1718    sizeof (struct GNUNET_STREAM_MessageHeader)},
1719   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1720    sizeof (struct GNUNET_STREAM_MessageHeader)},
1721   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1722    sizeof (struct GNUNET_STREAM_MessageHeader)},
1723   {NULL, 0, 0}
1724 };
1725
1726
1727 /**
1728  * Function called when our target peer is connected to our tunnel
1729  *
1730  * @param cls the socket for which this tunnel is created
1731  * @param peer the peer identity of the target
1732  * @param atsi performance data for the connection
1733  */
1734 static void
1735 mesh_peer_connect_callback (void *cls,
1736                             const struct GNUNET_PeerIdentity *peer,
1737                             const struct GNUNET_ATS_Information * atsi)
1738 {
1739   struct GNUNET_STREAM_Socket *socket = cls;
1740   struct GNUNET_STREAM_MessageHeader *message;
1741
1742   if (0 != memcmp (&socket->other_peer, 
1743                    peer, 
1744                    sizeof (struct GNUNET_PeerIdentity)))
1745     {
1746       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1747                   "A peer (%s) which is not our target has connected to our tunnel", 
1748                   GNUNET_i2s (peer));
1749       return;
1750     }
1751   
1752   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1753               "Target peer %s connected\n", GNUNET_i2s (peer));
1754   
1755   /* Set state to INIT */
1756   socket->state = STATE_INIT;
1757
1758   /* Send HELLO message */
1759   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1760   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1761   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1762   queue_message (socket,
1763                  message,
1764                  &set_state_hello_wait,
1765                  NULL);
1766
1767   /* Call open callback */
1768   if (NULL == socket->open_cb)
1769     {
1770       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1771                   "STREAM_open callback is NULL\n");
1772     }
1773   else
1774     {
1775       socket->open_cb (socket->open_cls, socket);
1776     }
1777 }
1778
1779
1780 /**
1781  * Function called when our target peer is disconnected from our tunnel
1782  *
1783  * @param cls the socket associated which this tunnel
1784  * @param peer the peer identity of the target
1785  */
1786 static void
1787 mesh_peer_disconnect_callback (void *cls,
1788                                const struct GNUNET_PeerIdentity *peer)
1789 {
1790
1791 }
1792
1793
1794 /*****************/
1795 /* API functions */
1796 /*****************/
1797
1798
1799 /**
1800  * Tries to open a stream to the target peer
1801  *
1802  * @param cfg configuration to use
1803  * @param target the target peer to which the stream has to be opened
1804  * @param app_port the application port number which uniquely identifies this
1805  *            stream
1806  * @param open_cb this function will be called after stream has be established 
1807  * @param open_cb_cls the closure for open_cb
1808  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
1809  * @return if successful it returns the stream socket; NULL if stream cannot be
1810  *         opened 
1811  */
1812 struct GNUNET_STREAM_Socket *
1813 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
1814                     const struct GNUNET_PeerIdentity *target,
1815                     GNUNET_MESH_ApplicationType app_port,
1816                     GNUNET_STREAM_OpenCallback open_cb,
1817                     void *open_cb_cls,
1818                     ...)
1819 {
1820   struct GNUNET_STREAM_Socket *socket;
1821   enum GNUNET_STREAM_Option option;
1822   va_list vargs;                /* Variable arguments */
1823
1824   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
1825   socket->other_peer = *target;
1826   socket->open_cb = open_cb;
1827   socket->open_cls = open_cb_cls;
1828
1829   /* Set defaults */
1830   socket->retransmit_timeout = 
1831     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
1832
1833   va_start (vargs, open_cb_cls); /* Parse variable args */
1834   do {
1835     option = va_arg (vargs, enum GNUNET_STREAM_Option);
1836     switch (option)
1837       {
1838       case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
1839         /* Expect struct GNUNET_TIME_Relative */
1840         socket->retransmit_timeout = va_arg (vargs,
1841                                              struct GNUNET_TIME_Relative);
1842         break;
1843       case GNUNET_STREAM_OPTION_END:
1844         break;
1845       }
1846   } while (GNUNET_STREAM_OPTION_END != option);
1847   va_end (vargs);               /* End of variable args parsing */
1848
1849   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
1850                                       1,  /* QUEUE size as parameter? */
1851                                       socket, /* cls */
1852                                       NULL, /* No inbound tunnel handler */
1853                                       NULL, /* No inbound tunnel cleaner */
1854                                       client_message_handlers,
1855                                       NULL); /* We don't get inbound tunnels */
1856   // FIXME: if (NULL == socket->mesh) ...
1857
1858   /* Now create the mesh tunnel to target */
1859   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
1860                                               NULL, /* Tunnel context */
1861                                               &mesh_peer_connect_callback,
1862                                               &mesh_peer_disconnect_callback,
1863                                               socket);
1864   // FIXME: if (NULL == socket->tunnel) ...
1865
1866   return socket;
1867 }
1868
1869
1870 /**
1871  * Closes the stream
1872  *
1873  * @param socket the stream socket
1874  */
1875 void
1876 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
1877 {
1878   struct MessageQueue *head;
1879
1880   if (socket->read_task != GNUNET_SCHEDULER_NO_TASK)
1881   {
1882     /* socket closed with read task pending!? */
1883     GNUNET_break (0);
1884     GNUNET_SCHEDULER_cancel (socket->read_task);
1885     socket->read_task = GNUNET_SCHEDULER_NO_TASK;
1886   }
1887
1888   /* Clear Transmit handles */
1889   if (NULL != socket->transmit_handle)
1890     {
1891       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
1892       socket->transmit_handle = NULL;
1893     }
1894
1895   /* Clear existing message queue */
1896   while (NULL != (head = socket->queue_head)) {
1897     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
1898                                  socket->queue_tail,
1899                                  head);
1900     GNUNET_free (head->message);
1901     GNUNET_free (head);
1902   }
1903
1904   /* Close associated tunnel */
1905   if (NULL != socket->tunnel)
1906     {
1907       GNUNET_MESH_tunnel_destroy (socket->tunnel);
1908       socket->tunnel = NULL;
1909     }
1910
1911   /* Close mesh connection */
1912   if (NULL != socket->mesh)
1913     {
1914       GNUNET_MESH_disconnect (socket->mesh);
1915       socket->mesh = NULL;
1916     }
1917   
1918   /* Release receive buffer */
1919   if (NULL != socket->receive_buffer)
1920     {
1921       GNUNET_free (socket->receive_buffer);
1922     }
1923
1924   GNUNET_free (socket);
1925 }
1926
1927
1928 /**
1929  * Method called whenever a peer creates a tunnel to us
1930  *
1931  * @param cls closure
1932  * @param tunnel new handle to the tunnel
1933  * @param initiator peer that started the tunnel
1934  * @param atsi performance information for the tunnel
1935  * @return initial tunnel context for the tunnel
1936  *         (can be NULL -- that's not an error)
1937  */
1938 static void *
1939 new_tunnel_notify (void *cls,
1940                    struct GNUNET_MESH_Tunnel *tunnel,
1941                    const struct GNUNET_PeerIdentity *initiator,
1942                    const struct GNUNET_ATS_Information *atsi)
1943 {
1944   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
1945   struct GNUNET_STREAM_Socket *socket;
1946
1947   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
1948   socket->tunnel = tunnel;
1949   socket->session_id = 0;       /* FIXME */
1950   socket->other_peer = *initiator;
1951   socket->state = STATE_INIT;
1952
1953   if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
1954                                            socket,
1955                                            &socket->other_peer))
1956     {
1957       socket->state = STATE_CLOSED;
1958       /* FIXME: Send CLOSE message and then free */
1959       GNUNET_free (socket);
1960       GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
1961     }
1962   return socket;
1963 }
1964
1965
1966 /**
1967  * Function called whenever an inbound tunnel is destroyed.  Should clean up
1968  * any associated state.  This function is NOT called if the client has
1969  * explicitly asked for the tunnel to be destroyed using
1970  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
1971  * the tunnel.
1972  *
1973  * @param cls closure (set from GNUNET_MESH_connect)
1974  * @param tunnel connection to the other end (henceforth invalid)
1975  * @param tunnel_ctx place where local state associated
1976  *                   with the tunnel is stored
1977  */
1978 static void 
1979 tunnel_cleaner (void *cls,
1980                 const struct GNUNET_MESH_Tunnel *tunnel,
1981                 void *tunnel_ctx)
1982 {
1983   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
1984   
1985   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1986               "Peer %s has terminated connection abruptly\n",
1987               GNUNET_i2s (&socket->other_peer));
1988
1989   socket->status = GNUNET_STREAM_SHUTDOWN;
1990   /* Clear Transmit handles */
1991   if (NULL != socket->transmit_handle)
1992     {
1993       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
1994       socket->transmit_handle = NULL;
1995     }
1996   socket->tunnel = NULL;
1997 }
1998
1999
2000 /**
2001  * Listens for stream connections for a specific application ports
2002  *
2003  * @param cfg the configuration to use
2004  * @param app_port the application port for which new streams will be accepted
2005  * @param listen_cb this function will be called when a peer tries to establish
2006  *            a stream with us
2007  * @param listen_cb_cls closure for listen_cb
2008  * @return listen socket, NULL for any error
2009  */
2010 struct GNUNET_STREAM_ListenSocket *
2011 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
2012                       GNUNET_MESH_ApplicationType app_port,
2013                       GNUNET_STREAM_ListenCallback listen_cb,
2014                       void *listen_cb_cls)
2015 {
2016   /* FIXME: Add variable args for passing configration options? */
2017   struct GNUNET_STREAM_ListenSocket *lsocket;
2018   GNUNET_MESH_ApplicationType app_types[2];
2019
2020   app_types[0] = app_port;
2021   app_types[1] = 0;
2022   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
2023   lsocket->port = app_port;
2024   lsocket->listen_cb = listen_cb;
2025   lsocket->listen_cb_cls = listen_cb_cls;
2026   lsocket->mesh = GNUNET_MESH_connect (cfg,
2027                                        10, /* FIXME: QUEUE size as parameter? */
2028                                        lsocket, /* Closure */
2029                                        &new_tunnel_notify,
2030                                        &tunnel_cleaner,
2031                                        server_message_handlers,
2032                                        app_types);
2033   return lsocket;
2034 }
2035
2036
2037 /**
2038  * Closes the listen socket
2039  *
2040  * @param lsocket the listen socket
2041  */
2042 void
2043 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
2044 {
2045   /* Close MESH connection */
2046   GNUNET_MESH_disconnect (lsocket->mesh);
2047   
2048   GNUNET_free (lsocket);
2049 }
2050
2051
2052 /**
2053  * Tries to write the given data to the stream
2054  *
2055  * @param socket the socket representing a stream
2056  * @param data the data buffer from where the data is written into the stream
2057  * @param size the number of bytes to be written from the data buffer
2058  * @param timeout the timeout period
2059  * @param write_cont the function to call upon writing some bytes into the stream
2060  * @param write_cont_cls the closure
2061  * @return handle to cancel the operation
2062  */
2063 struct GNUNET_STREAM_IOWriteHandle *
2064 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
2065                      const void *data,
2066                      size_t size,
2067                      struct GNUNET_TIME_Relative timeout,
2068                      GNUNET_STREAM_CompletionContinuation write_cont,
2069                      void *write_cont_cls)
2070 {
2071   unsigned int num_needed_packets;
2072   unsigned int packet;
2073   struct GNUNET_STREAM_IOWriteHandle *io_handle;
2074   uint32_t packet_size;
2075   uint32_t payload_size;
2076   struct GNUNET_STREAM_DataMessage *data_msg;
2077   const void *sweep;
2078
2079   /* Return NULL if there is already a write request pending */
2080   if (NULL != socket->write_handle)
2081   {
2082     GNUNET_break (0);
2083     return NULL;
2084   }
2085   if (!((STATE_ESTABLISHED == socket->state)
2086         || (STATE_RECEIVE_CLOSE_WAIT == socket->state)
2087         || (STATE_RECEIVE_CLOSED == socket->state)))
2088     {
2089       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2090                   "Attempting to write on a closed (OR) not-yet-established"
2091                   "stream\n"); 
2092       return NULL;
2093     } 
2094   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
2095     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
2096   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
2097   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
2098   sweep = data;
2099   /* Divide the given buffer into packets for sending */
2100   for (packet=0; packet < num_needed_packets; packet++)
2101     {
2102       if ((packet + 1) * max_payload_size < size) 
2103         {
2104           payload_size = max_payload_size;
2105           packet_size = MAX_PACKET_SIZE;
2106         }
2107       else 
2108         {
2109           payload_size = size - packet * max_payload_size;
2110           packet_size =  payload_size + sizeof (struct
2111                                                 GNUNET_STREAM_DataMessage); 
2112         }
2113       io_handle->messages[packet] = GNUNET_malloc (packet_size);
2114       io_handle->messages[packet]->header.header.size = htons (packet_size);
2115       io_handle->messages[packet]->header.header.type =
2116         htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
2117       io_handle->messages[packet]->sequence_number =
2118         htons (socket->write_sequence_number++);
2119       io_handle->messages[packet]->offset = htons (socket->write_offset);
2120
2121       /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2122          determined from RTT */
2123       io_handle->messages[packet]->ack_deadline = 
2124         GNUNET_TIME_relative_hton (GNUNET_TIME_relative_multiply 
2125                                    (GNUNET_TIME_UNIT_SECONDS, 5));
2126       data_msg = io_handle->messages[packet];
2127       /* Copy data from given buffer to the packet */
2128       memcpy (&data_msg[1],
2129               sweep,
2130               payload_size);
2131       sweep += payload_size;
2132       socket->write_offset += payload_size;
2133     }
2134   socket->write_handle = io_handle;
2135   write_data (socket);
2136
2137   return io_handle;
2138 }
2139
2140
2141 /**
2142  * Tries to read data from the stream
2143  *
2144  * @param socket the socket representing a stream
2145  * @param timeout the timeout period
2146  * @param proc function to call with data (once only)
2147  * @param proc_cls the closure for proc
2148  * @return handle to cancel the operation
2149  */
2150 struct GNUNET_STREAM_IOReadHandle *
2151 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
2152                     struct GNUNET_TIME_Relative timeout,
2153                     GNUNET_STREAM_DataProcessor proc,
2154                     void *proc_cls)
2155 {
2156   struct GNUNET_STREAM_IOReadHandle *read_handle;
2157   
2158   /* Return NULL if there is already a read handle; the user has to cancel that
2159   first before continuing or has to wait until it is completed */
2160   if (NULL != socket->read_handle) return NULL;
2161
2162   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
2163   read_handle->proc = proc;
2164   socket->read_handle = read_handle;
2165
2166   /* Check if we have a packet at bitmap 0 */
2167   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
2168                                           0))
2169     {
2170       socket->read_task = GNUNET_SCHEDULER_add_now (&call_read_processor,
2171                                                     socket);
2172    
2173     }
2174   
2175   /* Setup the read timeout task */
2176   socket->read_io_timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
2177                                                                &read_io_timeout,
2178                                                                socket);
2179   return read_handle;
2180 }