-fixed relative boundaries in stream read
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file stream/stream_api.c
23  * @brief Implementation of the stream library
24  * @author Sree Harsha Totakura
25  */
26 #include "platform.h"
27 #include "gnunet_common.h"
28 #include "gnunet_crypto_lib.h"
29 #include "gnunet_stream_lib.h"
30 #include "stream_protocol.h"
31
32
33 /**
34  * The maximum packet size of a stream packet
35  */
36 #define MAX_PACKET_SIZE 64000
37
38 /**
39  * The maximum payload a data message packet can carry
40  */
41 static size_t max_payload_size = 
42   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
43
44 /**
45  * Receive buffer
46  */
47 #define RECEIVE_BUFFER_SIZE 4096000
48
49 /**
50  * states in the Protocol
51  */
52 enum State
53   {
54     /**
55      * Client initialization state
56      */
57     STATE_INIT,
58
59     /**
60      * Listener initialization state 
61      */
62     STATE_LISTEN,
63
64     /**
65      * Pre-connection establishment state
66      */
67     STATE_HELLO_WAIT,
68
69     /**
70      * State where a connection has been established
71      */
72     STATE_ESTABLISHED,
73
74     /**
75      * State where the socket is closed on our side and waiting to be ACK'ed
76      */
77     STATE_RECEIVE_CLOSE_WAIT,
78
79     /**
80      * State where the socket is closed for reading
81      */
82     STATE_RECEIVE_CLOSED,
83
84     /**
85      * State where the socket is closed on our side and waiting to be ACK'ed
86      */
87     STATE_TRANSMIT_CLOSE_WAIT,
88
89     /**
90      * State where the socket is closed for writing
91      */
92     STATE_TRANSMIT_CLOSED,
93
94     /**
95      * State where the socket is closed on our side and waiting to be ACK'ed
96      */
97     STATE_CLOSE_WAIT,
98
99     /**
100      * State where the socket is closed
101      */
102     STATE_CLOSED 
103   };
104
105
106 /**
107  * Functions of this type are called when a message is written
108  *
109  * @param cls the closure from queue_message
110  * @param socket the socket the written message was bound to
111  */
112 typedef void (*SendFinishCallback) (void *cls,
113                                     struct GNUNET_STREAM_Socket *socket);
114
115
116 /**
117  * The send message queue
118  */
119 struct MessageQueue
120 {
121   /**
122    * The message
123    */
124   struct GNUNET_STREAM_MessageHeader *message;
125
126   /**
127    * Callback to be called when the message is sent
128    */
129   SendFinishCallback finish_cb;
130
131   /**
132    * The closure for finish_cb
133    */
134   void *finish_cb_cls;
135
136   /**
137    * The next message in queue. Should be NULL in the last message
138    */
139   struct MessageQueue *next;
140
141   /**
142    * The next message in queue. Should be NULL in the first message
143    */
144   struct MessageQueue *prev;
145 };
146
147
148 /**
149  * The STREAM Socket Handler
150  */
151 struct GNUNET_STREAM_Socket
152 {
153
154   /**
155    * The peer identity of the peer at the other end of the stream
156    */
157   struct GNUNET_PeerIdentity other_peer;
158
159   /**
160    * Retransmission timeout
161    */
162   struct GNUNET_TIME_Relative retransmit_timeout;
163
164   /**
165    * The Acknowledgement Bitmap
166    */
167   GNUNET_STREAM_AckBitmap ack_bitmap;
168
169   /**
170    * Time when the Acknowledgement was queued
171    */
172   struct GNUNET_TIME_Absolute ack_time_registered;
173
174   /**
175    * Queued Acknowledgement deadline
176    */
177   struct GNUNET_TIME_Relative ack_time_deadline;
178
179   /**
180    * The task for sending timely Acks
181    */
182   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
183
184   /**
185    * The mesh handle
186    */
187   struct GNUNET_MESH_Handle *mesh;
188
189   /**
190    * The mesh tunnel handle
191    */
192   struct GNUNET_MESH_Tunnel *tunnel;
193
194   /**
195    * Stream open closure
196    */
197   void *open_cls;
198
199   /**
200    * Stream open callback
201    */
202   GNUNET_STREAM_OpenCallback open_cb;
203
204   /**
205    * The current transmit handle (if a pending transmit request exists)
206    */
207   struct GNUNET_MESH_TransmitHandle *transmit_handle;
208
209   /**
210    * The current message associated with the transmit handle
211    */
212   struct MessageQueue *queue_head;
213
214   /**
215    * The queue tail, should always point to the last message in queue
216    */
217   struct MessageQueue *queue_tail;
218
219   /**
220    * The write IO_handle associated with this socket
221    */
222   struct GNUNET_STREAM_IOWriteHandle *write_handle;
223
224   /**
225    * The read IO_handle associated with this socket
226    */
227   struct GNUNET_STREAM_IOReadHandle *read_handle;
228
229   /**
230    * Buffer for storing received messages
231    */
232   void *receive_buffer;
233
234   /**
235    * Copy buffer pointer; Used during read operations
236    */
237   void *copy_buffer;
238
239   /**
240    * The state of the protocol associated with this socket
241    */
242   enum State state;
243
244   /**
245    * The status of the socket
246    */
247   enum GNUNET_STREAM_Status status;
248
249   /**
250    * The number of previous timeouts; FIXME: currently not used
251    */
252   unsigned int retries;
253
254   /**
255    * The session id associated with this stream connection
256    * FIXME: Not used currently, may be removed
257    */
258   uint32_t session_id;
259
260   /**
261    * Write sequence number. Set to random when sending HELLO(client) and
262    * HELLO_ACK(server) 
263    */
264   uint32_t write_sequence_number;
265
266   /**
267    * Read sequence number. This number's value is determined during handshake
268    */
269   uint32_t read_sequence_number;
270
271   /**
272    * The receiver buffer size
273    */
274   uint32_t receive_buffer_size;
275
276   /**
277    * The receiver buffer boundaries
278    */
279   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
280
281   /**
282    * receiver's available buffer after the last acknowledged packet
283    */
284   uint32_t receive_window_available;
285
286   /**
287    * The offset pointer used during write operation
288    */
289   uint32_t write_offset;
290
291   /**
292    * The offset after which we are expecting data
293    */
294   uint32_t read_offset;
295
296   /**
297    * The size of the copy buffer
298    */
299   uint32_t copy_buffer_size;
300   
301   /**
302    * The read offset of copy buffer
303    */
304   uint32_t copy_buffer_read_offset;
305 };
306
307
308 /**
309  * A socket for listening
310  */
311 struct GNUNET_STREAM_ListenSocket
312 {
313
314   /**
315    * The mesh handle
316    */
317   struct GNUNET_MESH_Handle *mesh;
318
319   /**
320    * The callback function which is called after successful opening socket
321    */
322   GNUNET_STREAM_ListenCallback listen_cb;
323
324   /**
325    * The call back closure
326    */
327   void *listen_cb_cls;
328
329   /**
330    * The service port
331    */
332   GNUNET_MESH_ApplicationType port;
333 };
334
335
336 /**
337  * The IO Write Handle
338  */
339 struct GNUNET_STREAM_IOWriteHandle
340 {
341   /**
342    * The packet_buffers associated with this Handle
343    */
344   struct GNUNET_STREAM_DataMessage *messages[64];
345
346   /**
347    * The bitmap of this IOHandle; Corresponding bit for a message is set when
348    * it has been acknowledged by the receiver
349    */
350   GNUNET_STREAM_AckBitmap ack_bitmap;
351
352   /**
353    * Number of packets sent before waiting for an ack
354    *
355    * FIXME: Do we need this?
356    */
357   unsigned int sent_packets;
358 };
359
360
361 /**
362  * The IO Read Handle
363  */
364 struct GNUNET_STREAM_IOReadHandle
365 {
366   /**
367    * Callback for the read processor
368    */
369   GNUNET_STREAM_DataProcessor proc;
370
371   /**
372    * The closure pointer for the read processor callback
373    */
374   void *proc_cls;
375 };
376
377
378 /**
379  * Default value in seconds for various timeouts
380  */
381 static unsigned int default_timeout = 300;
382
383
384 /**
385  * Callback function for sending hello message
386  *
387  * @param cls closure the socket
388  * @param size number of bytes available in buf
389  * @param buf where the callee should write the message
390  * @return number of bytes written to buf
391  */
392 static size_t
393 send_message_notify (void *cls, size_t size, void *buf)
394 {
395   struct GNUNET_STREAM_Socket *socket = cls;
396   struct MessageQueue *head;
397   size_t ret;
398
399   socket->transmit_handle = NULL; /* Remove the transmit handle */
400   head = socket->queue_head;
401   if (NULL == head)
402     return 0; /* just to be safe */
403   if (0 == size)                /* request timed out */
404     {
405       socket->retries++;
406       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
407                   "Message sending timed out. Retry %d \n",
408                   socket->retries);
409       socket->transmit_handle = 
410         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
411                                            0, /* Corking */
412                                            1, /* Priority */
413                                            /* FIXME: exponential backoff */
414                                            socket->retransmit_timeout,
415                                            &socket->other_peer,
416                                            ntohs (head->message->header.size),
417                                            &send_message_notify,
418                                            socket);
419       return 0;
420     }
421
422   ret = ntohs (head->message->header.size);
423   GNUNET_assert (size >= ret);
424   memcpy (buf, head->message, ret);
425   if (NULL != head->finish_cb)
426     {
427       head->finish_cb (socket, head->finish_cb_cls);
428     }
429   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
430                                socket->queue_tail,
431                                head);
432   GNUNET_free (head->message);
433   GNUNET_free (head);
434   head = socket->queue_head;
435   if (NULL != head)    /* more pending messages to send */
436     {
437       socket->retries = 0;
438       socket->transmit_handle = 
439         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
440                                            0, /* Corking */
441                                            1, /* Priority */
442                                            /* FIXME: exponential backoff */
443                                            socket->retransmit_timeout,
444                                            &socket->other_peer,
445                                            ntohs (head->message->header.size),
446                                            &send_message_notify,
447                                            socket);
448     }
449   return ret;
450 }
451
452
453 /**
454  * Queues a message for sending using the mesh connection of a socket
455  *
456  * @param socket the socket whose mesh connection is used
457  * @param message the message to be sent
458  * @param finish_cb the callback to be called when the message is sent
459  * @param finish_cb_cls the closure for the callback
460  */
461 static void
462 queue_message (struct GNUNET_STREAM_Socket *socket,
463                struct GNUNET_STREAM_MessageHeader *message,
464                SendFinishCallback finish_cb,
465                void *finish_cb_cls)
466 {
467   struct MessageQueue *queue_entity;
468
469   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
470   queue_entity->message = message;
471   queue_entity->finish_cb = finish_cb;
472   queue_entity->finish_cb_cls = finish_cb_cls;
473   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
474                                     socket->queue_tail,
475                                     queue_entity);
476   if (NULL == socket->transmit_handle)
477   {
478     socket->retries = 0;
479     socket->transmit_handle = 
480       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
481                                          0, /* Corking */
482                                          1, /* Priority */
483                                          socket->retransmit_timeout,
484                                          &socket->other_peer,
485                                          ntohs (message->header.size),
486                                          &send_message_notify,
487                                          socket);
488   }
489 }
490
491
492 /**
493  * Callback function for sending ack message
494  *
495  * @param cls closure the ACK message created in ack_task
496  * @param size number of bytes available in buffer
497  * @param buf where the callee should write the message
498  * @return number of bytes written to buf
499  */
500 static size_t
501 send_ack_notify (void *cls, size_t size, void *buf)
502 {
503   struct GNUNET_STREAM_AckMessage *ack_msg = cls;
504
505   if (0 == size)
506     {
507       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
508                   "%s called with size 0\n", __func__);
509       return 0;
510     }
511   GNUNET_assert (ack_msg->header.header.size <= size);
512   
513   size = ack_msg->header.header.size;
514   memcpy (buf, ack_msg, size);
515   return size;
516 }
517
518
519 /**
520  * Task for sending ACK message
521  *
522  * @param cls the socket
523  * @param tc the Task context
524  */
525 static void
526 ack_task (void *cls,
527           const struct GNUNET_SCHEDULER_TaskContext *tc)
528 {
529   struct GNUNET_STREAM_Socket *socket = cls;
530   struct GNUNET_STREAM_AckMessage *ack_msg;
531
532   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
533     {
534       return;
535     }
536
537   socket->ack_task_id = 0;
538
539   /* Create the ACK Message */
540   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
541   ack_msg->header.header.size = htons (sizeof (struct 
542                                                GNUNET_STREAM_AckMessage));
543   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
544   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
545   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
546   ack_msg->receive_window_remaining = 
547     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
548
549   /* Request MESH for sending ACK */
550   GNUNET_MESH_notify_transmit_ready (socket->tunnel,
551                                      0, /* Corking */
552                                      1, /* Priority */
553                                      socket->retransmit_timeout,
554                                      &socket->other_peer,
555                                      ntohs (ack_msg->header.header.size),
556                                      &send_ack_notify,
557                                      ack_msg);
558
559   
560 }
561
562
563 /**
564  * Function to modify a bit in GNUNET_STREAM_AckBitmap
565  *
566  * @param bitmap the bitmap to modify
567  * @param bit the bit number to modify
568  * @param value GNUNET_YES to on, GNUNET_NO to off
569  */
570 static void
571 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
572                       unsigned int bit, 
573                       int value)
574 {
575   GNUNET_assert (bit < 64);
576   if (GNUNET_YES == value)
577     *bitmap |= (1LL << bit);
578   else
579     *bitmap &= ~(1LL << bit);
580 }
581
582
583 /**
584  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
585  *
586  * @param bitmap address of the bitmap that has to be checked
587  * @param bit the bit number to check
588  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
589  */
590 static uint8_t
591 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
592                       unsigned int bit)
593 {
594   GNUNET_assert (bit < 64);
595   return 0 != (*bitmap & (1LL << bit));
596 }
597
598
599
600 /**
601  * Function called when Data Message is sent
602  *
603  * @param cls the io_handle corresponding to the Data Message
604  * @param socket the socket which was used
605  */
606 static void
607 write_data_finish_cb (void *cls,
608                       struct GNUNET_STREAM_Socket *socket)
609 {
610   struct GNUNET_STREAM_IOWriteHandle *io_handle = cls;
611
612   io_handle->sent_packets++;
613 }
614
615
616 /**
617  * Writes data using the given socket. The amount of data written is limited by
618  * the receive_window_size
619  *
620  * @param socket the socket to use
621  */
622 static void 
623 write_data (struct GNUNET_STREAM_Socket *socket)
624 {
625   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
626   unsigned int packet;
627   int ack_packet;
628
629   ack_packet = -1;
630   /* Find the last acknowledged packet */
631   for (packet=0; packet < 64; packet++)
632     {      
633       if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
634                                               packet))
635         ack_packet = packet;        
636       else if (NULL == io_handle->messages[packet])
637         break;
638     }
639   /* Resend packets which weren't ack'ed */
640   for (packet=0; packet < ack_packet; packet++)
641     {
642       if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
643                                              packet))
644         {
645           queue_message (socket,
646                          &io_handle->messages[packet]->header,
647                          NULL,
648                          NULL);
649         }
650     }
651   packet = ack_packet + 1;
652   /* Now send new packets if there is enough buffer space */
653   while ( (NULL != io_handle->messages[packet]) &&
654           (socket->receive_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) )
655     {
656       socket->receive_window_available -= ntohs (io_handle->messages[packet]->header.header.size);
657       queue_message (socket,
658                      &io_handle->messages[packet]->header,
659                      &write_data_finish_cb,
660                      io_handle);
661       packet++;
662     }
663 }
664
665
666 /**
667  * Handler for DATA messages; Same for both client and server
668  *
669  * @param socket the socket through which the ack was received
670  * @param tunnel connection to the other end
671  * @param sender who sent the message
672  * @param msg the data message
673  * @param atsi performance data for the connection
674  * @return GNUNET_OK to keep the connection open,
675  *         GNUNET_SYSERR to close it (signal serious error)
676  */
677 static int
678 handle_data (struct GNUNET_STREAM_Socket *socket,
679              struct GNUNET_MESH_Tunnel *tunnel,
680              const struct GNUNET_PeerIdentity *sender,
681              const struct GNUNET_STREAM_DataMessage *msg,
682              const struct GNUNET_ATS_Information*atsi)
683 {
684   const void *payload;
685   uint32_t bytes_needed;
686   uint32_t relative_offset;
687   uint32_t relative_sequence_number;
688   uint16_t size;
689
690   size = htons (msg->header.header.size);
691   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
692     {
693       GNUNET_break_op (0);
694       return GNUNET_SYSERR;
695     }
696
697   switch (socket->state)
698     {
699     case STATE_ESTABLISHED:
700     case STATE_TRANSMIT_CLOSED:
701     case STATE_TRANSMIT_CLOSE_WAIT:
702
703       /* check if the message's sequence number is in the range we are
704          expecting */
705       relative_sequence_number = 
706         ntohl (msg->sequence_number) - socket->read_sequence_number;
707       if ( relative_sequence_number > 64)
708         {
709           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
710                       "Ignoring received message with sequence number %d",
711                       ntohl (msg->sequence_number));
712           return GNUNET_YES;
713         }
714
715       /* Check if we have to allocate the buffer */
716       size -= sizeof (struct GNUNET_STREAM_DataMessage);
717       relative_offset = ntohl (msg->offset) - socket->read_offset;
718       bytes_needed = relative_offset + size;
719       
720       if (bytes_needed > socket->receive_buffer_size)
721         {
722           if (bytes_needed <= RECEIVE_BUFFER_SIZE)
723             {
724               socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
725                                                        bytes_needed);
726               socket->receive_buffer_size = bytes_needed;
727             }
728           else
729             {
730               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
731                           "Cannot accommodate packet %d as buffer is full\n",
732                           ntohl (msg->sequence_number));
733               return GNUNET_YES;
734             }
735         }
736       
737       /* Copy Data to buffer */
738       payload = &msg[1];
739       GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
740       memcpy (socket->receive_buffer + relative_offset,
741               payload,
742               size);
743       socket->receive_buffer_boundaries[relative_sequence_number] = 
744         relative_offset + size;
745       
746       /* Modify the ACK bitmap */
747       ackbitmap_modify_bit (&socket->ack_bitmap,
748                             relative_sequence_number,
749                             GNUNET_YES);
750
751       /* Start ACK sending task if one is not already present */
752       if (0 == socket->ack_task_id)
753        {
754          socket->ack_task_id = 
755            GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
756                                          (msg->ack_deadline),
757                                          &ack_task,
758                                          socket);
759        }
760       
761       break;
762
763     default:
764       /* FIXME: call statistics */
765       break;
766     }
767   return GNUNET_YES;
768 }
769
770 /**
771  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
772  *
773  * @param cls the socket (set from GNUNET_MESH_connect)
774  * @param tunnel connection to the other end
775  * @param tunnel_ctx place to store local state associated with the tunnel
776  * @param sender who sent the message
777  * @param message the actual message
778  * @param atsi performance data for the connection
779  * @return GNUNET_OK to keep the connection open,
780  *         GNUNET_SYSERR to close it (signal serious error)
781  */
782 static int
783 client_handle_data (void *cls,
784              struct GNUNET_MESH_Tunnel *tunnel,
785              void **tunnel_ctx,
786              const struct GNUNET_PeerIdentity *sender,
787              const struct GNUNET_MessageHeader *message,
788              const struct GNUNET_ATS_Information*atsi)
789 {
790   struct GNUNET_STREAM_Socket *socket = cls;
791
792   return handle_data (socket, 
793                       tunnel, 
794                       sender, 
795                       (const struct GNUNET_STREAM_DataMessage *) message, 
796                       atsi);
797 }
798
799
800 /**
801  * Callback to set state to ESTABLISHED
802  *
803  * @param cls the closure from queue_message FIXME: document
804  * @param socket the socket to requiring state change
805  */
806 static void
807 set_state_established (void *cls,
808                        struct GNUNET_STREAM_Socket *socket)
809 {
810   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining ESTABLISHED state\n");
811   socket->write_offset = 0;
812   socket->read_offset = 0;
813   socket->state = STATE_ESTABLISHED;
814 }
815
816
817 /**
818  * Callback to set state to HELLO_WAIT
819  *
820  * @param cls the closure from queue_message
821  * @param socket the socket to requiring state change
822  */
823 static void
824 set_state_hello_wait (void *cls,
825                       struct GNUNET_STREAM_Socket *socket)
826 {
827   GNUNET_assert (STATE_INIT == socket->state);
828   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining HELLO_WAIT state\n");
829   socket->state = STATE_HELLO_WAIT;
830 }
831
832
833 /**
834  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
835  *
836  * @param cls the socket (set from GNUNET_MESH_connect)
837  * @param tunnel connection to the other end
838  * @param tunnel_ctx this is NULL
839  * @param sender who sent the message
840  * @param message the actual message
841  * @param atsi performance data for the connection
842  * @return GNUNET_OK to keep the connection open,
843  *         GNUNET_SYSERR to close it (signal serious error)
844  */
845 static int
846 client_handle_hello_ack (void *cls,
847                          struct GNUNET_MESH_Tunnel *tunnel,
848                          void **tunnel_ctx,
849                          const struct GNUNET_PeerIdentity *sender,
850                          const struct GNUNET_MessageHeader *message,
851                          const struct GNUNET_ATS_Information*atsi)
852 {
853   struct GNUNET_STREAM_Socket *socket = cls;
854   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
855   struct GNUNET_STREAM_HelloAckMessage *reply;
856
857   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
858   GNUNET_assert (socket->tunnel == tunnel);
859   switch (socket->state)
860   {
861   case STATE_HELLO_WAIT:
862       socket->read_sequence_number = ntohl (ack_msg->sequence_number);
863       socket->receive_window_available = ntohl (ack_msg->receive_window_size);
864       /* Get the random sequence number */
865       socket->write_sequence_number = 
866         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
867       reply = 
868         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
869       reply->header.header.size = 
870         htons (sizeof (struct GNUNET_STREAM_MessageHeader));
871       reply->header.header.type = 
872         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
873       reply->sequence_number = htonl (socket->write_sequence_number);
874       reply->receive_window_size = htonl (RECEIVE_BUFFER_SIZE);
875       queue_message (socket, 
876                      &reply->header, 
877                      &set_state_established, 
878                      NULL);      
879       return GNUNET_OK;
880   case STATE_ESTABLISHED:
881   case STATE_RECEIVE_CLOSE_WAIT:
882     // call statistics (# ACKs ignored++)
883     return GNUNET_OK;
884   case STATE_INIT:
885   default:
886     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
887                 "Server sent HELLO_ACK when in state %d\n", socket->state);
888     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
889     return GNUNET_SYSERR;
890   }
891
892 }
893
894
895 /**
896  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
897  *
898  * @param cls the socket (set from GNUNET_MESH_connect)
899  * @param tunnel connection to the other end
900  * @param tunnel_ctx this is NULL
901  * @param sender who sent the message
902  * @param message the actual message
903  * @param atsi performance data for the connection
904  * @return GNUNET_OK to keep the connection open,
905  *         GNUNET_SYSERR to close it (signal serious error)
906  */
907 static int
908 client_handle_reset (void *cls,
909                      struct GNUNET_MESH_Tunnel *tunnel,
910                      void **tunnel_ctx,
911                      const struct GNUNET_PeerIdentity *sender,
912                      const struct GNUNET_MessageHeader *message,
913                      const struct GNUNET_ATS_Information*atsi)
914 {
915   struct GNUNET_STREAM_Socket *socket = cls;
916
917   return GNUNET_OK;
918 }
919
920
921 /**
922  * Common message handler for handling TRANSMIT_CLOSE messages
923  *
924  * @param socket the socket through which the ack was received
925  * @param tunnel connection to the other end
926  * @param sender who sent the message
927  * @param msg the transmit close message
928  * @param atsi performance data for the connection
929  * @return GNUNET_OK to keep the connection open,
930  *         GNUNET_SYSERR to close it (signal serious error)
931  */
932 static int
933 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
934                        struct GNUNET_MESH_Tunnel *tunnel,
935                        const struct GNUNET_PeerIdentity *sender,
936                        const struct GNUNET_STREAM_MessageHeader *msg,
937                        const struct GNUNET_ATS_Information*atsi)
938 {
939   struct GNUNET_STREAM_MessageHeader *reply;
940
941   switch (socket->state)
942     {
943     case STATE_ESTABLISHED:
944       socket->state = STATE_RECEIVE_CLOSED;
945
946       /* Send TRANSMIT_CLOSE_ACK */
947       reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
948       reply->header.type = 
949         htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
950       reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
951       queue_message (socket, reply, NULL, NULL);
952       break;
953
954     default:
955       /* FIXME: Call statistics? */
956       break;
957     }
958   return GNUNET_YES;
959 }
960
961
962 /**
963  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
964  *
965  * @param cls the socket (set from GNUNET_MESH_connect)
966  * @param tunnel connection to the other end
967  * @param tunnel_ctx this is NULL
968  * @param sender who sent the message
969  * @param message the actual message
970  * @param atsi performance data for the connection
971  * @return GNUNET_OK to keep the connection open,
972  *         GNUNET_SYSERR to close it (signal serious error)
973  */
974 static int
975 client_handle_transmit_close (void *cls,
976                               struct GNUNET_MESH_Tunnel *tunnel,
977                               void **tunnel_ctx,
978                               const struct GNUNET_PeerIdentity *sender,
979                               const struct GNUNET_MessageHeader *message,
980                               const struct GNUNET_ATS_Information*atsi)
981 {
982   struct GNUNET_STREAM_Socket *socket = cls;
983   
984   return handle_transmit_close (socket,
985                                 tunnel,
986                                 sender,
987                                 (struct GNUNET_STREAM_MessageHeader *)message,
988                                 atsi);
989 }
990
991
992 /**
993  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
994  *
995  * @param cls the socket (set from GNUNET_MESH_connect)
996  * @param tunnel connection to the other end
997  * @param tunnel_ctx this is NULL
998  * @param sender who sent the message
999  * @param message the actual message
1000  * @param atsi performance data for the connection
1001  * @return GNUNET_OK to keep the connection open,
1002  *         GNUNET_SYSERR to close it (signal serious error)
1003  */
1004 static int
1005 client_handle_transmit_close_ack (void *cls,
1006                                   struct GNUNET_MESH_Tunnel *tunnel,
1007                                   void **tunnel_ctx,
1008                                   const struct GNUNET_PeerIdentity *sender,
1009                                   const struct GNUNET_MessageHeader *message,
1010                                   const struct GNUNET_ATS_Information*atsi)
1011 {
1012   struct GNUNET_STREAM_Socket *socket = cls;
1013
1014   return GNUNET_OK;
1015 }
1016
1017
1018 /**
1019  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1020  *
1021  * @param cls the socket (set from GNUNET_MESH_connect)
1022  * @param tunnel connection to the other end
1023  * @param tunnel_ctx this is NULL
1024  * @param sender who sent the message
1025  * @param message the actual message
1026  * @param atsi performance data for the connection
1027  * @return GNUNET_OK to keep the connection open,
1028  *         GNUNET_SYSERR to close it (signal serious error)
1029  */
1030 static int
1031 client_handle_receive_close (void *cls,
1032                              struct GNUNET_MESH_Tunnel *tunnel,
1033                              void **tunnel_ctx,
1034                              const struct GNUNET_PeerIdentity *sender,
1035                              const struct GNUNET_MessageHeader *message,
1036                              const struct GNUNET_ATS_Information*atsi)
1037 {
1038   struct GNUNET_STREAM_Socket *socket = cls;
1039
1040   return GNUNET_OK;
1041 }
1042
1043
1044 /**
1045  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1046  *
1047  * @param cls the socket (set from GNUNET_MESH_connect)
1048  * @param tunnel connection to the other end
1049  * @param tunnel_ctx this is NULL
1050  * @param sender who sent the message
1051  * @param message the actual message
1052  * @param atsi performance data for the connection
1053  * @return GNUNET_OK to keep the connection open,
1054  *         GNUNET_SYSERR to close it (signal serious error)
1055  */
1056 static int
1057 client_handle_receive_close_ack (void *cls,
1058                                  struct GNUNET_MESH_Tunnel *tunnel,
1059                                  void **tunnel_ctx,
1060                                  const struct GNUNET_PeerIdentity *sender,
1061                                  const struct GNUNET_MessageHeader *message,
1062                                  const struct GNUNET_ATS_Information*atsi)
1063 {
1064   struct GNUNET_STREAM_Socket *socket = cls;
1065
1066   return GNUNET_OK;
1067 }
1068
1069
1070 /**
1071  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1072  *
1073  * @param cls the socket (set from GNUNET_MESH_connect)
1074  * @param tunnel connection to the other end
1075  * @param tunnel_ctx this is NULL
1076  * @param sender who sent the message
1077  * @param message the actual message
1078  * @param atsi performance data for the connection
1079  * @return GNUNET_OK to keep the connection open,
1080  *         GNUNET_SYSERR to close it (signal serious error)
1081  */
1082 static int
1083 client_handle_close (void *cls,
1084                      struct GNUNET_MESH_Tunnel *tunnel,
1085                      void **tunnel_ctx,
1086                      const struct GNUNET_PeerIdentity *sender,
1087                      const struct GNUNET_MessageHeader *message,
1088                      const struct GNUNET_ATS_Information*atsi)
1089 {
1090   struct GNUNET_STREAM_Socket *socket = cls;
1091
1092   return GNUNET_OK;
1093 }
1094
1095
1096 /**
1097  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1098  *
1099  * @param cls the socket (set from GNUNET_MESH_connect)
1100  * @param tunnel connection to the other end
1101  * @param tunnel_ctx this is NULL
1102  * @param sender who sent the message
1103  * @param message the actual message
1104  * @param atsi performance data for the connection
1105  * @return GNUNET_OK to keep the connection open,
1106  *         GNUNET_SYSERR to close it (signal serious error)
1107  */
1108 static int
1109 client_handle_close_ack (void *cls,
1110                          struct GNUNET_MESH_Tunnel *tunnel,
1111                          void **tunnel_ctx,
1112                          const struct GNUNET_PeerIdentity *sender,
1113                          const struct GNUNET_MessageHeader *message,
1114                          const struct GNUNET_ATS_Information*atsi)
1115 {
1116   struct GNUNET_STREAM_Socket *socket = cls;
1117
1118   return GNUNET_OK;
1119 }
1120
1121 /*****************************/
1122 /* Server's Message Handlers */
1123 /*****************************/
1124
1125 /**
1126  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1127  *
1128  * @param cls the closure
1129  * @param tunnel connection to the other end
1130  * @param tunnel_ctx the socket
1131  * @param sender who sent the message
1132  * @param message the actual message
1133  * @param atsi performance data for the connection
1134  * @return GNUNET_OK to keep the connection open,
1135  *         GNUNET_SYSERR to close it (signal serious error)
1136  */
1137 static int
1138 server_handle_data (void *cls,
1139                     struct GNUNET_MESH_Tunnel *tunnel,
1140                     void **tunnel_ctx,
1141                     const struct GNUNET_PeerIdentity *sender,
1142                     const struct GNUNET_MessageHeader *message,
1143                     const struct GNUNET_ATS_Information*atsi)
1144 {
1145   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1146
1147   return handle_data (socket,
1148                       tunnel,
1149                       sender,
1150                       (const struct GNUNET_STREAM_DataMessage *)message,
1151                       atsi);
1152 }
1153
1154
1155 /**
1156  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
1157  *
1158  * @param cls the closure
1159  * @param tunnel connection to the other end
1160  * @param tunnel_ctx the socket
1161  * @param sender who sent the message
1162  * @param message the actual message
1163  * @param atsi performance data for the connection
1164  * @return GNUNET_OK to keep the connection open,
1165  *         GNUNET_SYSERR to close it (signal serious error)
1166  */
1167 static int
1168 server_handle_hello (void *cls,
1169                      struct GNUNET_MESH_Tunnel *tunnel,
1170                      void **tunnel_ctx,
1171                      const struct GNUNET_PeerIdentity *sender,
1172                      const struct GNUNET_MessageHeader *message,
1173                      const struct GNUNET_ATS_Information*atsi)
1174 {
1175   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1176   struct GNUNET_STREAM_HelloAckMessage *reply;
1177
1178   GNUNET_assert (socket->tunnel == tunnel);
1179   if (STATE_INIT == socket->state)
1180     {
1181       /* Get the random sequence number */
1182       socket->write_sequence_number = 
1183         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1184       reply = 
1185         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1186       reply->header.header.size = 
1187         htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1188       reply->header.header.type = 
1189         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1190       reply->sequence_number = htonl (socket->write_sequence_number);
1191       queue_message (socket, 
1192                      &reply->header,
1193                      &set_state_hello_wait, 
1194                      NULL);
1195     }
1196   else
1197     {
1198       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1199                   "Client sent HELLO when in state %d\n", socket->state);
1200       /* FIXME: Send RESET? */
1201       
1202     }
1203   return GNUNET_OK;
1204 }
1205
1206
1207 /**
1208  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1209  *
1210  * @param cls the closure
1211  * @param tunnel connection to the other end
1212  * @param tunnel_ctx the socket
1213  * @param sender who sent the message
1214  * @param message the actual message
1215  * @param atsi performance data for the connection
1216  * @return GNUNET_OK to keep the connection open,
1217  *         GNUNET_SYSERR to close it (signal serious error)
1218  */
1219 static int
1220 server_handle_hello_ack (void *cls,
1221                          struct GNUNET_MESH_Tunnel *tunnel,
1222                          void **tunnel_ctx,
1223                          const struct GNUNET_PeerIdentity *sender,
1224                          const struct GNUNET_MessageHeader *message,
1225                          const struct GNUNET_ATS_Information*atsi)
1226 {
1227   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1228   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
1229
1230   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
1231   GNUNET_assert (socket->tunnel == tunnel);
1232   if (STATE_HELLO_WAIT == socket->state)
1233     {
1234       socket->read_sequence_number = ntohl (ack_message->sequence_number);
1235       socket->receive_window_available = 
1236         ntohl (ack_message->receive_window_size);
1237       /* Attain ESTABLISHED state */
1238       set_state_established (NULL, socket);
1239     }
1240   else
1241     {
1242       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1243                   "Client sent HELLO_ACK when in state %d\n", socket->state);
1244       /* FIXME: Send RESET? */
1245       
1246     }
1247   return GNUNET_OK;
1248 }
1249
1250
1251 /**
1252  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1253  *
1254  * @param cls the closure
1255  * @param tunnel connection to the other end
1256  * @param tunnel_ctx the socket
1257  * @param sender who sent the message
1258  * @param message the actual message
1259  * @param atsi performance data for the connection
1260  * @return GNUNET_OK to keep the connection open,
1261  *         GNUNET_SYSERR to close it (signal serious error)
1262  */
1263 static int
1264 server_handle_reset (void *cls,
1265                      struct GNUNET_MESH_Tunnel *tunnel,
1266                      void **tunnel_ctx,
1267                      const struct GNUNET_PeerIdentity *sender,
1268                      const struct GNUNET_MessageHeader *message,
1269                      const struct GNUNET_ATS_Information*atsi)
1270 {
1271   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1272
1273   return GNUNET_OK;
1274 }
1275
1276
1277 /**
1278  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1279  *
1280  * @param cls the closure
1281  * @param tunnel connection to the other end
1282  * @param tunnel_ctx the socket
1283  * @param sender who sent the message
1284  * @param message the actual message
1285  * @param atsi performance data for the connection
1286  * @return GNUNET_OK to keep the connection open,
1287  *         GNUNET_SYSERR to close it (signal serious error)
1288  */
1289 static int
1290 server_handle_transmit_close (void *cls,
1291                               struct GNUNET_MESH_Tunnel *tunnel,
1292                               void **tunnel_ctx,
1293                               const struct GNUNET_PeerIdentity *sender,
1294                               const struct GNUNET_MessageHeader *message,
1295                               const struct GNUNET_ATS_Information*atsi)
1296 {
1297   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1298
1299   return handle_transmit_close (socket,
1300                                 tunnel,
1301                                 sender,
1302                                 (struct GNUNET_STREAM_MessageHeader *)message,
1303                                 atsi);
1304 }
1305
1306
1307 /**
1308  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1309  *
1310  * @param cls the closure
1311  * @param tunnel connection to the other end
1312  * @param tunnel_ctx the socket
1313  * @param sender who sent the message
1314  * @param message the actual message
1315  * @param atsi performance data for the connection
1316  * @return GNUNET_OK to keep the connection open,
1317  *         GNUNET_SYSERR to close it (signal serious error)
1318  */
1319 static int
1320 server_handle_transmit_close_ack (void *cls,
1321                                   struct GNUNET_MESH_Tunnel *tunnel,
1322                                   void **tunnel_ctx,
1323                                   const struct GNUNET_PeerIdentity *sender,
1324                                   const struct GNUNET_MessageHeader *message,
1325                                   const struct GNUNET_ATS_Information*atsi)
1326 {
1327   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1328
1329   return GNUNET_OK;
1330 }
1331
1332
1333 /**
1334  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1335  *
1336  * @param cls the closure
1337  * @param tunnel connection to the other end
1338  * @param tunnel_ctx the socket
1339  * @param sender who sent the message
1340  * @param message the actual message
1341  * @param atsi performance data for the connection
1342  * @return GNUNET_OK to keep the connection open,
1343  *         GNUNET_SYSERR to close it (signal serious error)
1344  */
1345 static int
1346 server_handle_receive_close (void *cls,
1347                              struct GNUNET_MESH_Tunnel *tunnel,
1348                              void **tunnel_ctx,
1349                              const struct GNUNET_PeerIdentity *sender,
1350                              const struct GNUNET_MessageHeader *message,
1351                              const struct GNUNET_ATS_Information*atsi)
1352 {
1353   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1354
1355   return GNUNET_OK;
1356 }
1357
1358
1359 /**
1360  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1361  *
1362  * @param cls the closure
1363  * @param tunnel connection to the other end
1364  * @param tunnel_ctx the socket
1365  * @param sender who sent the message
1366  * @param message the actual message
1367  * @param atsi performance data for the connection
1368  * @return GNUNET_OK to keep the connection open,
1369  *         GNUNET_SYSERR to close it (signal serious error)
1370  */
1371 static int
1372 server_handle_receive_close_ack (void *cls,
1373                                  struct GNUNET_MESH_Tunnel *tunnel,
1374                                  void **tunnel_ctx,
1375                                  const struct GNUNET_PeerIdentity *sender,
1376                                  const struct GNUNET_MessageHeader *message,
1377                                  const struct GNUNET_ATS_Information*atsi)
1378 {
1379   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1380
1381   return GNUNET_OK;
1382 }
1383
1384
1385 /**
1386  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1387  *
1388  * @param cls the closure
1389  * @param tunnel connection to the other end
1390  * @param tunnel_ctx the socket
1391  * @param sender who sent the message
1392  * @param message the actual message
1393  * @param atsi performance data for the connection
1394  * @return GNUNET_OK to keep the connection open,
1395  *         GNUNET_SYSERR to close it (signal serious error)
1396  */
1397 static int
1398 server_handle_close (void *cls,
1399                      struct GNUNET_MESH_Tunnel *tunnel,
1400                      void **tunnel_ctx,
1401                      const struct GNUNET_PeerIdentity *sender,
1402                      const struct GNUNET_MessageHeader *message,
1403                      const struct GNUNET_ATS_Information*atsi)
1404 {
1405   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1406
1407   return GNUNET_OK;
1408 }
1409
1410
1411 /**
1412  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1413  *
1414  * @param cls the closure
1415  * @param tunnel connection to the other end
1416  * @param tunnel_ctx the socket
1417  * @param sender who sent the message
1418  * @param message the actual message
1419  * @param atsi performance data for the connection
1420  * @return GNUNET_OK to keep the connection open,
1421  *         GNUNET_SYSERR to close it (signal serious error)
1422  */
1423 static int
1424 server_handle_close_ack (void *cls,
1425                          struct GNUNET_MESH_Tunnel *tunnel,
1426                          void **tunnel_ctx,
1427                          const struct GNUNET_PeerIdentity *sender,
1428                          const struct GNUNET_MessageHeader *message,
1429                          const struct GNUNET_ATS_Information*atsi)
1430 {
1431   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1432
1433   return GNUNET_OK;
1434 }
1435
1436
1437 /**
1438  * Message Handler for mesh
1439  *
1440  * @param socket the socket through which the ack was received
1441  * @param tunnel connection to the other end
1442  * @param sender who sent the message
1443  * @param ack the acknowledgment message
1444  * @param atsi performance data for the connection
1445  * @return GNUNET_OK to keep the connection open,
1446  *         GNUNET_SYSERR to close it (signal serious error)
1447  */
1448 static int
1449 handle_ack (struct GNUNET_STREAM_Socket *socket,
1450             struct GNUNET_MESH_Tunnel *tunnel,
1451             const struct GNUNET_PeerIdentity *sender,
1452             const struct GNUNET_STREAM_AckMessage *ack,
1453             const struct GNUNET_ATS_Information*atsi)
1454 {
1455   switch (socket->state)
1456     {
1457     case (STATE_ESTABLISHED):
1458       if (NULL == socket->write_handle)
1459         {
1460           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1461                       "Received DATA ACK when write_handle is NULL\n");
1462           return GNUNET_OK;
1463         }
1464
1465       socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap);
1466       socket->receive_window_available = 
1467         ntohl (ack->receive_window_remaining);
1468       write_data (socket);
1469       break;
1470     default:
1471       break;
1472     }
1473   return GNUNET_OK;
1474 }
1475
1476
1477 /**
1478  * Message Handler for mesh
1479  *
1480  * @param cls the 'struct GNUNET_STREAM_Socket'
1481  * @param tunnel connection to the other end
1482  * @param tunnel_ctx unused
1483  * @param sender who sent the message
1484  * @param message the actual message
1485  * @param atsi performance data for the connection
1486  * @return GNUNET_OK to keep the connection open,
1487  *         GNUNET_SYSERR to close it (signal serious error)
1488  */
1489 static int
1490 client_handle_ack (void *cls,
1491                    struct GNUNET_MESH_Tunnel *tunnel,
1492                    void **tunnel_ctx,
1493                    const struct GNUNET_PeerIdentity *sender,
1494                    const struct GNUNET_MessageHeader *message,
1495                    const struct GNUNET_ATS_Information*atsi)
1496 {
1497   struct GNUNET_STREAM_Socket *socket = cls;
1498   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1499  
1500   return handle_ack (socket, tunnel, sender, ack, atsi);
1501 }
1502
1503
1504 /**
1505  * Message Handler for mesh
1506  *
1507  * @param cls the server's listen socket
1508  * @param tunnel connection to the other end
1509  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
1510  * @param sender who sent the message
1511  * @param message the actual message
1512  * @param atsi performance data for the connection
1513  * @return GNUNET_OK to keep the connection open,
1514  *         GNUNET_SYSERR to close it (signal serious error)
1515  */
1516 static int
1517 server_handle_ack (void *cls,
1518                    struct GNUNET_MESH_Tunnel *tunnel,
1519                    void **tunnel_ctx,
1520                    const struct GNUNET_PeerIdentity *sender,
1521                    const struct GNUNET_MessageHeader *message,
1522                    const struct GNUNET_ATS_Information*atsi)
1523 {
1524   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1525   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1526  
1527   return handle_ack (socket, tunnel, sender, ack, atsi);
1528 }
1529
1530
1531 /**
1532  * For client message handlers, the stream socket is in the
1533  * closure argument.
1534  */
1535 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
1536   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1537   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
1538    sizeof (struct GNUNET_STREAM_AckMessage) },
1539   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1540    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1541   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1542    sizeof (struct GNUNET_STREAM_MessageHeader)},
1543   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1544    sizeof (struct GNUNET_STREAM_MessageHeader)},
1545   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1546    sizeof (struct GNUNET_STREAM_MessageHeader)},
1547   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1548    sizeof (struct GNUNET_STREAM_MessageHeader)},
1549   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1550    sizeof (struct GNUNET_STREAM_MessageHeader)},
1551   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1552    sizeof (struct GNUNET_STREAM_MessageHeader)},
1553   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1554    sizeof (struct GNUNET_STREAM_MessageHeader)},
1555   {NULL, 0, 0}
1556 };
1557
1558
1559 /**
1560  * For server message handlers, the stream socket is in the
1561  * tunnel context, and the listen socket in the closure argument.
1562  */
1563 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
1564   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1565   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
1566    sizeof (struct GNUNET_STREAM_AckMessage) },
1567   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
1568    sizeof (struct GNUNET_STREAM_MessageHeader)},
1569   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1570    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1571   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1572    sizeof (struct GNUNET_STREAM_MessageHeader)},
1573   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1574    sizeof (struct GNUNET_STREAM_MessageHeader)},
1575   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1576    sizeof (struct GNUNET_STREAM_MessageHeader)},
1577   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1578    sizeof (struct GNUNET_STREAM_MessageHeader)},
1579   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1580    sizeof (struct GNUNET_STREAM_MessageHeader)},
1581   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1582    sizeof (struct GNUNET_STREAM_MessageHeader)},
1583   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1584    sizeof (struct GNUNET_STREAM_MessageHeader)},
1585   {NULL, 0, 0}
1586 };
1587
1588
1589 /**
1590  * Function called when our target peer is connected to our tunnel
1591  *
1592  * @param cls the socket for which this tunnel is created
1593  * @param peer the peer identity of the target
1594  * @param atsi performance data for the connection
1595  */
1596 static void
1597 mesh_peer_connect_callback (void *cls,
1598                             const struct GNUNET_PeerIdentity *peer,
1599                             const struct GNUNET_ATS_Information * atsi)
1600 {
1601   struct GNUNET_STREAM_Socket *socket = cls;
1602   struct GNUNET_STREAM_MessageHeader *message;
1603
1604   if (0 != memcmp (&socket->other_peer, 
1605                    peer, 
1606                    sizeof (struct GNUNET_PeerIdentity)))
1607     {
1608       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1609                   "A peer (%s) which is not our target has connected to our tunnel", 
1610                   GNUNET_i2s (peer));
1611       return;
1612     }
1613   
1614   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1615               "Target peer %s connected\n", GNUNET_i2s (peer));
1616   
1617   /* Set state to INIT */
1618   socket->state = STATE_INIT;
1619
1620   /* Send HELLO message */
1621   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1622   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1623   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1624   queue_message (socket,
1625                  message,
1626                  &set_state_hello_wait,
1627                  NULL);
1628
1629   /* Call open callback */
1630   if (NULL == socket->open_cb)
1631     {
1632       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1633                   "STREAM_open callback is NULL\n");
1634     }
1635   else
1636     {
1637       socket->open_cb (socket->open_cls, socket);
1638     }
1639 }
1640
1641
1642 /**
1643  * Function called when our target peer is disconnected from our tunnel
1644  *
1645  * @param cls the socket associated which this tunnel
1646  * @param peer the peer identity of the target
1647  */
1648 static void
1649 mesh_peer_disconnect_callback (void *cls,
1650                                const struct GNUNET_PeerIdentity *peer)
1651 {
1652
1653 }
1654
1655
1656 /**
1657  * Task for calling the read processor
1658  *
1659  * @param cls the socket
1660  */
1661 static void
1662 call_read_processor_task (void *cls,
1663                           const struct GNUNET_SCHEDULER_TaskContext *tc)
1664 {
1665   struct GNUNET_STREAM_Socket *socket = cls;
1666   size_t read_size;
1667   size_t valid_read_size;
1668
1669   if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN) return;
1670
1671   GNUNET_assert (NULL != socket->read_handle);
1672   GNUNET_assert (NULL != socket->read_handle->proc);
1673   GNUNET_assert (NULL != socket->copy_buffer);
1674   GNUNET_assert (0 != socket->copy_buffer_size);
1675
1676   valid_read_size = socket->copy_buffer_size - socket->copy_buffer_read_offset;
1677   GNUNET_assert (0 != valid_read_size);
1678
1679   read_size = socket->read_handle->proc (socket->read_handle->proc_cls,
1680                                          socket->status,
1681                                          socket->copy_buffer 
1682                                          + socket->copy_buffer_read_offset,
1683                                          valid_read_size);
1684
1685   GNUNET_assert (read_size <= valid_read_size);
1686   socket->copy_buffer_read_offset += read_size;
1687
1688   /* Free the copy buffer once it has been read entirely */
1689   if (socket->copy_buffer_read_offset == socket->copy_buffer_size)
1690     {
1691       GNUNET_free (socket->copy_buffer);
1692       socket->copy_buffer = NULL;
1693       socket->copy_buffer_size = 0;
1694       socket->copy_buffer_read_offset = 0;
1695     }
1696
1697   /* Free the read handle */
1698   GNUNET_free (socket->read_handle);
1699   socket->read_handle = NULL;
1700 }
1701
1702
1703 /*****************/
1704 /* API functions */
1705 /*****************/
1706
1707
1708 /**
1709  * Tries to open a stream to the target peer
1710  *
1711  * @param cfg configuration to use
1712  * @param target the target peer to which the stream has to be opened
1713  * @param app_port the application port number which uniquely identifies this
1714  *            stream
1715  * @param open_cb this function will be called after stream has be established 
1716  * @param open_cb_cls the closure for open_cb
1717  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
1718  * @return if successful it returns the stream socket; NULL if stream cannot be
1719  *         opened 
1720  */
1721 struct GNUNET_STREAM_Socket *
1722 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
1723                     const struct GNUNET_PeerIdentity *target,
1724                     GNUNET_MESH_ApplicationType app_port,
1725                     GNUNET_STREAM_OpenCallback open_cb,
1726                     void *open_cb_cls,
1727                     ...)
1728 {
1729   struct GNUNET_STREAM_Socket *socket;
1730   enum GNUNET_STREAM_Option option;
1731   va_list vargs;                /* Variable arguments */
1732
1733   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
1734   socket->other_peer = *target;
1735   socket->open_cb = open_cb;
1736   socket->open_cls = open_cb_cls;
1737
1738   /* Set defaults */
1739   socket->retransmit_timeout = 
1740     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
1741
1742   va_start (vargs, open_cb_cls); /* Parse variable args */
1743   do {
1744     option = va_arg (vargs, enum GNUNET_STREAM_Option);
1745     switch (option)
1746       {
1747       case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
1748         /* Expect struct GNUNET_TIME_Relative */
1749         socket->retransmit_timeout = va_arg (vargs,
1750                                              struct GNUNET_TIME_Relative);
1751         break;
1752       case GNUNET_STREAM_OPTION_END:
1753         break;
1754       }
1755   } while (GNUNET_STREAM_OPTION_END != option);
1756   va_end (vargs);               /* End of variable args parsing */
1757
1758   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
1759                                       1,  /* QUEUE size as parameter? */
1760                                       socket, /* cls */
1761                                       NULL, /* No inbound tunnel handler */
1762                                       NULL, /* No inbound tunnel cleaner */
1763                                       client_message_handlers,
1764                                       NULL); /* We don't get inbound tunnels */
1765   // FIXME: if (NULL == socket->mesh) ...
1766
1767   /* Now create the mesh tunnel to target */
1768   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
1769                                               NULL, /* Tunnel context */
1770                                               &mesh_peer_connect_callback,
1771                                               &mesh_peer_disconnect_callback,
1772                                               socket);
1773   // FIXME: if (NULL == socket->tunnel) ...
1774
1775   return socket;
1776 }
1777
1778
1779 /**
1780  * Closes the stream
1781  *
1782  * @param socket the stream socket
1783  */
1784 void
1785 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
1786 {
1787   struct MessageQueue *head;
1788
1789   /* Clear Transmit handles */
1790   if (NULL != socket->transmit_handle)
1791     {
1792       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
1793       socket->transmit_handle = NULL;
1794     }
1795
1796   /* Clear existing message queue */
1797   while (NULL != (head = socket->queue_head)) {
1798     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
1799                                  socket->queue_tail,
1800                                  head);
1801     GNUNET_free (head->message);
1802     GNUNET_free (head);
1803   }
1804
1805   /* Close associated tunnel */
1806   if (NULL != socket->tunnel)
1807     {
1808       GNUNET_MESH_tunnel_destroy (socket->tunnel);
1809       socket->tunnel = NULL;
1810     }
1811
1812   /* Close mesh connection */
1813   if (NULL != socket->mesh)
1814     {
1815       GNUNET_MESH_disconnect (socket->mesh);
1816       socket->mesh = NULL;
1817     }
1818   
1819   /* Release receive buffer */
1820   if (NULL != socket->receive_buffer)
1821     {
1822       GNUNET_free (socket->receive_buffer);
1823     }
1824
1825   GNUNET_free (socket);
1826 }
1827
1828
1829 /**
1830  * Method called whenever a peer creates a tunnel to us
1831  *
1832  * @param cls closure
1833  * @param tunnel new handle to the tunnel
1834  * @param initiator peer that started the tunnel
1835  * @param atsi performance information for the tunnel
1836  * @return initial tunnel context for the tunnel
1837  *         (can be NULL -- that's not an error)
1838  */
1839 static void *
1840 new_tunnel_notify (void *cls,
1841                    struct GNUNET_MESH_Tunnel *tunnel,
1842                    const struct GNUNET_PeerIdentity *initiator,
1843                    const struct GNUNET_ATS_Information *atsi)
1844 {
1845   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
1846   struct GNUNET_STREAM_Socket *socket;
1847
1848   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
1849   socket->tunnel = tunnel;
1850   socket->session_id = 0;       /* FIXME */
1851   socket->other_peer = *initiator;
1852   socket->state = STATE_INIT;
1853
1854   if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
1855                                            socket,
1856                                            &socket->other_peer))
1857     {
1858       socket->state = STATE_CLOSED;
1859       /* FIXME: Send CLOSE message and then free */
1860       GNUNET_free (socket);
1861       GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
1862     }
1863   return socket;
1864 }
1865
1866
1867 /**
1868  * Function called whenever an inbound tunnel is destroyed.  Should clean up
1869  * any associated state.  This function is NOT called if the client has
1870  * explicitly asked for the tunnel to be destroyed using
1871  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
1872  * the tunnel.
1873  *
1874  * @param cls closure (set from GNUNET_MESH_connect)
1875  * @param tunnel connection to the other end (henceforth invalid)
1876  * @param tunnel_ctx place where local state associated
1877  *                   with the tunnel is stored
1878  */
1879 static void 
1880 tunnel_cleaner (void *cls,
1881                 const struct GNUNET_MESH_Tunnel *tunnel,
1882                 void *tunnel_ctx)
1883 {
1884   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
1885   
1886   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1887               "Peer %s has terminated connection abruptly\n",
1888               GNUNET_i2s (&socket->other_peer));
1889
1890   socket->status = GNUNET_STREAM_SHUTDOWN;
1891   /* Clear Transmit handles */
1892   if (NULL != socket->transmit_handle)
1893     {
1894       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
1895       socket->transmit_handle = NULL;
1896     }
1897   socket->tunnel = NULL;
1898 }
1899
1900
1901 /**
1902  * Listens for stream connections for a specific application ports
1903  *
1904  * @param cfg the configuration to use
1905  * @param app_port the application port for which new streams will be accepted
1906  * @param listen_cb this function will be called when a peer tries to establish
1907  *            a stream with us
1908  * @param listen_cb_cls closure for listen_cb
1909  * @return listen socket, NULL for any error
1910  */
1911 struct GNUNET_STREAM_ListenSocket *
1912 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
1913                       GNUNET_MESH_ApplicationType app_port,
1914                       GNUNET_STREAM_ListenCallback listen_cb,
1915                       void *listen_cb_cls)
1916 {
1917   /* FIXME: Add variable args for passing configration options? */
1918   struct GNUNET_STREAM_ListenSocket *lsocket;
1919   GNUNET_MESH_ApplicationType app_types[2];
1920
1921   app_types[0] = app_port;
1922   app_types[1] = 0;
1923   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
1924   lsocket->port = app_port;
1925   lsocket->listen_cb = listen_cb;
1926   lsocket->listen_cb_cls = listen_cb_cls;
1927   lsocket->mesh = GNUNET_MESH_connect (cfg,
1928                                        10, /* FIXME: QUEUE size as parameter? */
1929                                        lsocket, /* Closure */
1930                                        &new_tunnel_notify,
1931                                        &tunnel_cleaner,
1932                                        server_message_handlers,
1933                                        app_types);
1934   return lsocket;
1935 }
1936
1937
1938 /**
1939  * Closes the listen socket
1940  *
1941  * @param lsocket the listen socket
1942  */
1943 void
1944 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
1945 {
1946   /* Close MESH connection */
1947   GNUNET_MESH_disconnect (lsocket->mesh);
1948   
1949   GNUNET_free (lsocket);
1950 }
1951
1952
1953 /**
1954  * Tries to write the given data to the stream
1955  *
1956  * @param socket the socket representing a stream
1957  * @param data the data buffer from where the data is written into the stream
1958  * @param size the number of bytes to be written from the data buffer
1959  * @param timeout the timeout period
1960  * @param write_cont the function to call upon writing some bytes into the stream
1961  * @param write_cont_cls the closure
1962  * @return handle to cancel the operation
1963  */
1964 struct GNUNET_STREAM_IOWriteHandle *
1965 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
1966                      const void *data,
1967                      size_t size,
1968                      struct GNUNET_TIME_Relative timeout,
1969                      GNUNET_STREAM_CompletionContinuation write_cont,
1970                      void *write_cont_cls)
1971 {
1972   unsigned int num_needed_packets;
1973   unsigned int packet;
1974   struct GNUNET_STREAM_IOWriteHandle *io_handle;
1975   uint32_t packet_size;
1976   uint32_t payload_size;
1977   struct GNUNET_STREAM_DataMessage *data_msg;
1978   const void *sweep;
1979
1980   /* Return NULL if there is already a write request pending */
1981   if (NULL != socket->write_handle)
1982   {
1983     GNUNET_break (0);
1984     return NULL;
1985   }
1986   if (!((STATE_ESTABLISHED == socket->state)
1987         || (STATE_RECEIVE_CLOSE_WAIT == socket->state)
1988         || (STATE_RECEIVE_CLOSED == socket->state)))
1989     {
1990       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1991                   "Attempting to write on a closed (OR) not-yet-established"
1992                   "stream\n"); 
1993       return NULL;
1994     } 
1995   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
1996     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
1997   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
1998   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
1999   sweep = data;
2000   /* Divide the given buffer into packets for sending */
2001   for (packet=0; packet < num_needed_packets; packet++)
2002     {
2003       if ((packet + 1) * max_payload_size < size) 
2004         {
2005           payload_size = max_payload_size;
2006           packet_size = MAX_PACKET_SIZE;
2007         }
2008       else 
2009         {
2010           payload_size = size - packet * max_payload_size;
2011           packet_size =  payload_size + sizeof (struct
2012                                                 GNUNET_STREAM_DataMessage); 
2013         }
2014       io_handle->messages[packet] = GNUNET_malloc (packet_size);
2015       io_handle->messages[packet]->header.header.size = htons (packet_size);
2016       io_handle->messages[packet]->header.header.type =
2017         htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
2018       io_handle->messages[packet]->sequence_number =
2019         htons (socket->write_sequence_number++);
2020       io_handle->messages[packet]->offset = htons (socket->write_offset);
2021
2022       /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2023          determined from RTT */
2024       io_handle->messages[packet]->ack_deadline = 
2025         GNUNET_TIME_relative_hton (GNUNET_TIME_relative_multiply 
2026                                    (GNUNET_TIME_UNIT_SECONDS, 5));
2027       data_msg = io_handle->messages[packet];
2028       /* Copy data from given buffer to the packet */
2029       memcpy (&data_msg[1],
2030               sweep,
2031               payload_size);
2032       sweep += payload_size;
2033       socket->write_offset += payload_size;
2034     }
2035   socket->write_handle = io_handle;
2036   write_data (socket);
2037
2038   return io_handle;
2039 }
2040
2041
2042 /**
2043  * Tries to read data from the stream
2044  *
2045  * @param socket the socket representing a stream
2046  * @param timeout the timeout period
2047  * @param proc function to call with data (once only)
2048  * @param proc_cls the closure for proc
2049  * @return handle to cancel the operation
2050  */
2051 struct GNUNET_STREAM_IOReadHandle *
2052 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
2053                     struct GNUNET_TIME_Relative timeout,
2054                     GNUNET_STREAM_DataProcessor proc,
2055                     void *proc_cls)
2056 {
2057   unsigned int packet;
2058   struct GNUNET_STREAM_IOReadHandle *read_handle;
2059   uint32_t offset_increase;
2060   uint32_t sequence_increase;
2061   
2062   /* Return NULL if there is already a read handle; the user has to cancel that
2063   first before continuing or has to wait until it is completed */
2064   if (NULL != socket->read_handle) return NULL;
2065
2066   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
2067   read_handle->proc = proc;
2068   socket->read_handle = read_handle;
2069
2070   /* if previous copy buffer is still not read call the data processor on it */
2071   if (NULL != socket->copy_buffer)
2072     {
2073       GNUNET_SCHEDULER_add_now (&call_read_processor_task,
2074                                 socket);
2075     }
2076
2077   /* Check the bitmap for any holes */
2078   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2079     {
2080       if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
2081                                              packet))
2082         break;
2083     }
2084
2085   sequence_increase = packet;
2086
2087   if (0 == sequence_increase)              /* The first packet is still missing */
2088     {
2089       /* We can't do anything until it arrives */
2090     }
2091   else
2092     {
2093       /* Copy data to copy buffer */
2094       GNUNET_assert (0 < socket->receive_buffer_boundaries[sequence_increase-1]);
2095       socket->copy_buffer = 
2096         GNUNET_malloc (socket->receive_buffer_boundaries[sequence_increase-1]);
2097
2098       /* Shift the data in the receive buffer */
2099       memmove (socket->receive_buffer,
2100                socket->receive_buffer 
2101                + socket->receive_buffer_boundaries[sequence_increase-1],
2102                socket->receive_buffer_size - socket->receive_buffer_boundaries[sequence_increase-1]);
2103       
2104       /* Shift the bitmap */
2105       socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
2106
2107       /* Set read_sequence_number */
2108       socket->read_sequence_number += sequence_increase;
2109
2110       /* Set read_offset */
2111       offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
2112       socket->read_offset += offset_increase;
2113       
2114       /* Fix relative boundaries */
2115       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2116         {
2117           if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
2118             {
2119               socket->receive_buffer_boundaries[packet] = 
2120                 socket->receive_buffer_boundaries[packet + sequence_increase] 
2121                 - offset_increase;
2122             }
2123           else
2124             socket->receive_buffer_boundaries[packet] = 0;
2125         }
2126     }
2127
2128   return read_handle;
2129 }