-fix
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file stream/stream_api.c
23  * @brief Implementation of the stream library
24  * @author Sree Harsha Totakura
25  */
26 #include "platform.h"
27 #include "gnunet_common.h"
28 #include "gnunet_crypto_lib.h"
29 #include "gnunet_stream_lib.h"
30 #include "stream_protocol.h"
31
32
33 /**
34  * The maximum packet size of a stream packet
35  */
36 #define MAX_PACKET_SIZE 64000
37
38 /**
39  * The maximum payload a data message packet can carry
40  */
41 static size_t max_payload_size = 
42   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
43
44 /**
45  * Receive buffer
46  */
47 #define RECEIVE_BUFFER_SIZE 4096000
48
49 /**
50  * states in the Protocol
51  */
52 enum State
53   {
54     /**
55      * Client initialization state
56      */
57     STATE_INIT,
58
59     /**
60      * Listener initialization state 
61      */
62     STATE_LISTEN,
63
64     /**
65      * Pre-connection establishment state
66      */
67     STATE_HELLO_WAIT,
68
69     /**
70      * State where a connection has been established
71      */
72     STATE_ESTABLISHED,
73
74     /**
75      * State where the socket is closed on our side and waiting to be ACK'ed
76      */
77     STATE_RECEIVE_CLOSE_WAIT,
78
79     /**
80      * State where the socket is closed for reading
81      */
82     STATE_RECEIVE_CLOSED,
83
84     /**
85      * State where the socket is closed on our side and waiting to be ACK'ed
86      */
87     STATE_TRANSMIT_CLOSE_WAIT,
88
89     /**
90      * State where the socket is closed for writing
91      */
92     STATE_TRANSMIT_CLOSED,
93
94     /**
95      * State where the socket is closed on our side and waiting to be ACK'ed
96      */
97     STATE_CLOSE_WAIT,
98
99     /**
100      * State where the socket is closed
101      */
102     STATE_CLOSED 
103   };
104
105
106 /**
107  * Functions of this type are called when a message is written
108  *
109  * @param cls the closure from queue_message
110  * @param socket the socket the written message was bound to
111  */
112 typedef void (*SendFinishCallback) (void *cls,
113                                     struct GNUNET_STREAM_Socket *socket);
114
115
116 /**
117  * The send message queue
118  */
119 struct MessageQueue
120 {
121   /**
122    * The message
123    */
124   struct GNUNET_STREAM_MessageHeader *message;
125
126   /**
127    * Callback to be called when the message is sent
128    */
129   SendFinishCallback finish_cb;
130
131   /**
132    * The closure for finish_cb
133    */
134   void *finish_cb_cls;
135
136   /**
137    * The next message in queue. Should be NULL in the last message
138    */
139   struct MessageQueue *next;
140
141   /**
142    * The next message in queue. Should be NULL in the first message
143    */
144   struct MessageQueue *prev;
145 };
146
147
148 /**
149  * The STREAM Socket Handler
150  */
151 struct GNUNET_STREAM_Socket
152 {
153
154   /**
155    * The peer identity of the peer at the other end of the stream
156    */
157   struct GNUNET_PeerIdentity other_peer;
158
159   /**
160    * Retransmission timeout
161    */
162   struct GNUNET_TIME_Relative retransmit_timeout;
163
164   /**
165    * The Acknowledgement Bitmap
166    */
167   GNUNET_STREAM_AckBitmap ack_bitmap;
168
169   /**
170    * Time when the Acknowledgement was queued
171    */
172   struct GNUNET_TIME_Absolute ack_time_registered;
173
174   /**
175    * Queued Acknowledgement deadline
176    */
177   struct GNUNET_TIME_Relative ack_time_deadline;
178
179   /**
180    * The task for sending timely Acks
181    */
182   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
183
184   /**
185    * Task scheduled to continue a read operation.
186    */
187   GNUNET_SCHEDULER_TaskIdentifier read_task;
188
189   /**
190    * The mesh handle
191    */
192   struct GNUNET_MESH_Handle *mesh;
193
194   /**
195    * The mesh tunnel handle
196    */
197   struct GNUNET_MESH_Tunnel *tunnel;
198
199   /**
200    * Stream open closure
201    */
202   void *open_cls;
203
204   /**
205    * Stream open callback
206    */
207   GNUNET_STREAM_OpenCallback open_cb;
208
209   /**
210    * The current transmit handle (if a pending transmit request exists)
211    */
212   struct GNUNET_MESH_TransmitHandle *transmit_handle;
213
214   /**
215    * The current message associated with the transmit handle
216    */
217   struct MessageQueue *queue_head;
218
219   /**
220    * The queue tail, should always point to the last message in queue
221    */
222   struct MessageQueue *queue_tail;
223
224   /**
225    * The write IO_handle associated with this socket
226    */
227   struct GNUNET_STREAM_IOWriteHandle *write_handle;
228
229   /**
230    * The read IO_handle associated with this socket
231    */
232   struct GNUNET_STREAM_IOReadHandle *read_handle;
233
234   /**
235    * Buffer for storing received messages
236    */
237   void *receive_buffer;
238
239   /**
240    * Copy buffer pointer; Used during read operations
241    */
242   void *copy_buffer;
243
244   /**
245    * Task identifier for the read io timeout task
246    */
247   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task;
248
249   /**
250    * The state of the protocol associated with this socket
251    */
252   enum State state;
253
254   /**
255    * The status of the socket
256    */
257   enum GNUNET_STREAM_Status status;
258
259   /**
260    * The number of previous timeouts; FIXME: currently not used
261    */
262   unsigned int retries;
263
264   /**
265    * The session id associated with this stream connection
266    * FIXME: Not used currently, may be removed
267    */
268   uint32_t session_id;
269
270   /**
271    * Write sequence number. Set to random when sending HELLO(client) and
272    * HELLO_ACK(server) 
273    */
274   uint32_t write_sequence_number;
275
276   /**
277    * Read sequence number. This number's value is determined during handshake
278    */
279   uint32_t read_sequence_number;
280
281   /**
282    * The receiver buffer size
283    */
284   uint32_t receive_buffer_size;
285
286   /**
287    * The receiver buffer boundaries
288    */
289   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
290
291   /**
292    * receiver's available buffer after the last acknowledged packet
293    */
294   uint32_t receive_window_available;
295
296   /**
297    * The offset pointer used during write operation
298    */
299   uint32_t write_offset;
300
301   /**
302    * The offset after which we are expecting data
303    */
304   uint32_t read_offset;
305
306   /**
307    * The size of the copy buffer
308    */
309   uint32_t copy_buffer_size;
310   
311   /**
312    * The read offset of copy buffer
313    */
314   uint32_t copy_buffer_read_offset;
315 };
316
317
318 /**
319  * A socket for listening
320  */
321 struct GNUNET_STREAM_ListenSocket
322 {
323
324   /**
325    * The mesh handle
326    */
327   struct GNUNET_MESH_Handle *mesh;
328
329   /**
330    * The callback function which is called after successful opening socket
331    */
332   GNUNET_STREAM_ListenCallback listen_cb;
333
334   /**
335    * The call back closure
336    */
337   void *listen_cb_cls;
338
339   /**
340    * The service port
341    */
342   GNUNET_MESH_ApplicationType port;
343 };
344
345
346 /**
347  * The IO Write Handle
348  */
349 struct GNUNET_STREAM_IOWriteHandle
350 {
351   /**
352    * The packet_buffers associated with this Handle
353    */
354   struct GNUNET_STREAM_DataMessage *messages[64];
355
356   /**
357    * The bitmap of this IOHandle; Corresponding bit for a message is set when
358    * it has been acknowledged by the receiver
359    */
360   GNUNET_STREAM_AckBitmap ack_bitmap;
361
362   /**
363    * Number of packets sent before waiting for an ack
364    *
365    * FIXME: Do we need this?
366    */
367   unsigned int sent_packets;
368 };
369
370
371 /**
372  * The IO Read Handle
373  */
374 struct GNUNET_STREAM_IOReadHandle
375 {
376   /**
377    * Callback for the read processor
378    */
379   GNUNET_STREAM_DataProcessor proc;
380
381   /**
382    * The closure pointer for the read processor callback
383    */
384   void *proc_cls;
385 };
386
387
388 /**
389  * Default value in seconds for various timeouts
390  */
391 static unsigned int default_timeout = 300;
392
393
394 /**
395  * Callback function for sending hello message
396  *
397  * @param cls closure the socket
398  * @param size number of bytes available in buf
399  * @param buf where the callee should write the message
400  * @return number of bytes written to buf
401  */
402 static size_t
403 send_message_notify (void *cls, size_t size, void *buf)
404 {
405   struct GNUNET_STREAM_Socket *socket = cls;
406   struct MessageQueue *head;
407   size_t ret;
408
409   socket->transmit_handle = NULL; /* Remove the transmit handle */
410   head = socket->queue_head;
411   if (NULL == head)
412     return 0; /* just to be safe */
413   if (0 == size)                /* request timed out */
414     {
415       socket->retries++;
416       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
417                   "Message sending timed out. Retry %d \n",
418                   socket->retries);
419       socket->transmit_handle = 
420         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
421                                            0, /* Corking */
422                                            1, /* Priority */
423                                            /* FIXME: exponential backoff */
424                                            socket->retransmit_timeout,
425                                            &socket->other_peer,
426                                            ntohs (head->message->header.size),
427                                            &send_message_notify,
428                                            socket);
429       return 0;
430     }
431
432   ret = ntohs (head->message->header.size);
433   GNUNET_assert (size >= ret);
434   memcpy (buf, head->message, ret);
435   if (NULL != head->finish_cb)
436     {
437       head->finish_cb (socket, head->finish_cb_cls);
438     }
439   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
440                                socket->queue_tail,
441                                head);
442   GNUNET_free (head->message);
443   GNUNET_free (head);
444   head = socket->queue_head;
445   if (NULL != head)    /* more pending messages to send */
446     {
447       socket->retries = 0;
448       socket->transmit_handle = 
449         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
450                                            0, /* Corking */
451                                            1, /* Priority */
452                                            /* FIXME: exponential backoff */
453                                            socket->retransmit_timeout,
454                                            &socket->other_peer,
455                                            ntohs (head->message->header.size),
456                                            &send_message_notify,
457                                            socket);
458     }
459   return ret;
460 }
461
462
463 /**
464  * Queues a message for sending using the mesh connection of a socket
465  *
466  * @param socket the socket whose mesh connection is used
467  * @param message the message to be sent
468  * @param finish_cb the callback to be called when the message is sent
469  * @param finish_cb_cls the closure for the callback
470  */
471 static void
472 queue_message (struct GNUNET_STREAM_Socket *socket,
473                struct GNUNET_STREAM_MessageHeader *message,
474                SendFinishCallback finish_cb,
475                void *finish_cb_cls)
476 {
477   struct MessageQueue *queue_entity;
478
479   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
480   queue_entity->message = message;
481   queue_entity->finish_cb = finish_cb;
482   queue_entity->finish_cb_cls = finish_cb_cls;
483   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
484                                     socket->queue_tail,
485                                     queue_entity);
486   if (NULL == socket->transmit_handle)
487   {
488     socket->retries = 0;
489     socket->transmit_handle = 
490       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
491                                          0, /* Corking */
492                                          1, /* Priority */
493                                          socket->retransmit_timeout,
494                                          &socket->other_peer,
495                                          ntohs (message->header.size),
496                                          &send_message_notify,
497                                          socket);
498   }
499 }
500
501
502 /**
503  * Callback function for sending ack message
504  *
505  * @param cls closure the ACK message created in ack_task
506  * @param size number of bytes available in buffer
507  * @param buf where the callee should write the message
508  * @return number of bytes written to buf
509  */
510 static size_t
511 send_ack_notify (void *cls, size_t size, void *buf)
512 {
513   struct GNUNET_STREAM_AckMessage *ack_msg = cls;
514
515   if (0 == size)
516     {
517       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
518                   "%s called with size 0\n", __func__);
519       return 0;
520     }
521   GNUNET_assert (ack_msg->header.header.size <= size);
522   
523   size = ack_msg->header.header.size;
524   memcpy (buf, ack_msg, size);
525   return size;
526 }
527
528
529 /**
530  * Task for sending ACK message
531  *
532  * @param cls the socket
533  * @param tc the Task context
534  */
535 static void
536 ack_task (void *cls,
537           const struct GNUNET_SCHEDULER_TaskContext *tc)
538 {
539   struct GNUNET_STREAM_Socket *socket = cls;
540   struct GNUNET_STREAM_AckMessage *ack_msg;
541
542   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
543     {
544       return;
545     }
546
547   socket->ack_task_id = 0;
548
549   /* Create the ACK Message */
550   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
551   ack_msg->header.header.size = htons (sizeof (struct 
552                                                GNUNET_STREAM_AckMessage));
553   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
554   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
555   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
556   ack_msg->receive_window_remaining = 
557     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
558
559   /* Request MESH for sending ACK */
560   GNUNET_MESH_notify_transmit_ready (socket->tunnel,
561                                      0, /* Corking */
562                                      1, /* Priority */
563                                      socket->retransmit_timeout,
564                                      &socket->other_peer,
565                                      ntohs (ack_msg->header.header.size),
566                                      &send_ack_notify,
567                                      ack_msg);
568
569   
570 }
571
572
573 /**
574  * Function to modify a bit in GNUNET_STREAM_AckBitmap
575  *
576  * @param bitmap the bitmap to modify
577  * @param bit the bit number to modify
578  * @param value GNUNET_YES to on, GNUNET_NO to off
579  */
580 static void
581 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
582                       unsigned int bit, 
583                       int value)
584 {
585   GNUNET_assert (bit < 64);
586   if (GNUNET_YES == value)
587     *bitmap |= (1LL << bit);
588   else
589     *bitmap &= ~(1LL << bit);
590 }
591
592
593 /**
594  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
595  *
596  * @param bitmap address of the bitmap that has to be checked
597  * @param bit the bit number to check
598  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
599  */
600 static uint8_t
601 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
602                       unsigned int bit)
603 {
604   GNUNET_assert (bit < 64);
605   return 0 != (*bitmap & (1LL << bit));
606 }
607
608
609
610 /**
611  * Function called when Data Message is sent
612  *
613  * @param cls the io_handle corresponding to the Data Message
614  * @param socket the socket which was used
615  */
616 static void
617 write_data_finish_cb (void *cls,
618                       struct GNUNET_STREAM_Socket *socket)
619 {
620   struct GNUNET_STREAM_IOWriteHandle *io_handle = cls;
621
622   io_handle->sent_packets++;
623 }
624
625
626 /**
627  * Writes data using the given socket. The amount of data written is limited by
628  * the receive_window_size
629  *
630  * @param socket the socket to use
631  */
632 static void 
633 write_data (struct GNUNET_STREAM_Socket *socket)
634 {
635   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
636   unsigned int packet;
637   int ack_packet;
638
639   ack_packet = -1;
640   /* Find the last acknowledged packet */
641   for (packet=0; packet < 64; packet++)
642     {      
643       if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
644                                               packet))
645         ack_packet = packet;        
646       else if (NULL == io_handle->messages[packet])
647         break;
648     }
649   /* Resend packets which weren't ack'ed */
650   for (packet=0; packet < ack_packet; packet++)
651     {
652       if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
653                                              packet))
654         {
655           queue_message (socket,
656                          &io_handle->messages[packet]->header,
657                          NULL,
658                          NULL);
659         }
660     }
661   packet = ack_packet + 1;
662   /* Now send new packets if there is enough buffer space */
663   while ( (NULL != io_handle->messages[packet]) &&
664           (socket->receive_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) )
665     {
666       socket->receive_window_available -= ntohs (io_handle->messages[packet]->header.header.size);
667       queue_message (socket,
668                      &io_handle->messages[packet]->header,
669                      &write_data_finish_cb,
670                      io_handle);
671       packet++;
672     }
673 }
674
675
676 /**
677  * Task for calling the read processor
678  *
679  * @param cls the socket
680  * @param tc the task context
681  */
682 static void
683 call_read_processor_task (void *cls,
684                           const struct GNUNET_SCHEDULER_TaskContext *tc)
685 {
686   struct GNUNET_STREAM_Socket *socket = cls;
687   size_t read_size;
688   size_t valid_read_size;
689
690   socket->read_task = GNUNET_SCHEDULER_NO_TASK;
691   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
692     return;
693
694   GNUNET_assert (NULL != socket->read_handle);
695   GNUNET_assert (NULL != socket->read_handle->proc);
696   GNUNET_assert (NULL != socket->copy_buffer);
697   GNUNET_assert (0 != socket->copy_buffer_size);
698
699   valid_read_size = socket->copy_buffer_size - socket->copy_buffer_read_offset;
700   GNUNET_assert (0 != valid_read_size);
701
702   /* Cancel the read_io_timeout_task */
703   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task);
704   /* Call the data processor */
705   read_size = socket->read_handle->proc (socket->read_handle->proc_cls,
706                                          socket->status,
707                                          socket->copy_buffer 
708                                          + socket->copy_buffer_read_offset,
709                                          valid_read_size);
710
711   GNUNET_assert (read_size <= valid_read_size);
712   socket->copy_buffer_read_offset += read_size;
713
714   /* Free the copy buffer once it has been read entirely */
715   if (socket->copy_buffer_read_offset == socket->copy_buffer_size)
716     {
717       GNUNET_free (socket->copy_buffer);
718       socket->copy_buffer = NULL;
719       socket->copy_buffer_size = 0;
720       socket->copy_buffer_read_offset = 0;
721     }
722
723   /* Free the read handle */
724   GNUNET_free (socket->read_handle);
725   socket->read_handle = NULL;
726 }
727
728
729 /**
730  * Prepares the receive buffer for possible reads; Should only be called when
731  * there is a valid READ io request pending and socket->copy_buffer is empty
732  *
733  * @param socket the socket pointer
734  */
735 static void 
736 prepare_buffer_for_read (struct GNUNET_STREAM_Socket *socket)
737 {
738   unsigned int packet;
739   uint32_t offset_increase;
740   uint32_t sequence_increase;
741
742   GNUNET_assert (NULL == socket->copy_buffer);
743   GNUNET_assert (NULL != socket->read_handle);
744
745   /* Check the bitmap for any holes */
746   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
747     {
748       if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
749                                              packet))
750         break;
751     }
752
753   sequence_increase = packet;
754
755   if (0 == sequence_increase)              /* The first packet is still missing */
756     {
757       return;
758     }
759   
760   /* Copy data to copy buffer */
761   GNUNET_assert (0 < socket->receive_buffer_boundaries[sequence_increase-1]);
762   socket->copy_buffer = 
763     GNUNET_malloc (socket->receive_buffer_boundaries[sequence_increase-1]);
764   memcpy (socket->copy_buffer, 
765           socket->receive_buffer,
766           socket->receive_buffer_boundaries[sequence_increase-1]);
767   
768   /* Shift the data in the receive buffer */
769   memmove (socket->receive_buffer,
770            socket->receive_buffer 
771            + socket->receive_buffer_boundaries[sequence_increase-1],
772            socket->receive_buffer_size - socket->receive_buffer_boundaries[sequence_increase-1]);
773   
774   /* Shift the bitmap */
775   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
776   
777   /* Set read_sequence_number */
778   socket->read_sequence_number += sequence_increase;
779   
780   /* Set read_offset */
781   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
782   socket->read_offset += offset_increase;
783   
784   /* Fix relative boundaries */
785   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
786     {
787       if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
788         {
789           socket->receive_buffer_boundaries[packet] = 
790             socket->receive_buffer_boundaries[packet + sequence_increase] 
791             - offset_increase;
792         }
793       else
794         socket->receive_buffer_boundaries[packet] = 0;
795     }
796   
797   socket->read_task = GNUNET_SCHEDULER_add_now (&call_read_processor_task,
798                                                 socket);
799 }
800
801
802
803 /**
804  * Cancels the existing read io handle
805  *
806  * @param cls the closure from the SCHEDULER call
807  * @param tc the task context
808  */
809 static void
810 cancel_read_io (void *cls, 
811                 const struct GNUNET_SCHEDULER_TaskContext *tc)
812 {
813   struct GNUNET_STREAM_Socket *socket = cls;
814
815   if (socket->read_task != GNUNET_SCHEDULER_NO_TASK)
816   {
817     GNUNET_SCHEDULER_cancel (socket->read_task);
818     socket->read_task = GNUNET_SCHEDULER_NO_TASK;
819   }
820   GNUNET_assert (NULL != socket->read_handle);
821   
822   GNUNET_free (socket->read_handle);
823   socket->read_handle = NULL;
824 }
825
826
827 /**
828  * Handler for DATA messages; Same for both client and server
829  *
830  * @param socket the socket through which the ack was received
831  * @param tunnel connection to the other end
832  * @param sender who sent the message
833  * @param msg the data message
834  * @param atsi performance data for the connection
835  * @return GNUNET_OK to keep the connection open,
836  *         GNUNET_SYSERR to close it (signal serious error)
837  */
838 static int
839 handle_data (struct GNUNET_STREAM_Socket *socket,
840              struct GNUNET_MESH_Tunnel *tunnel,
841              const struct GNUNET_PeerIdentity *sender,
842              const struct GNUNET_STREAM_DataMessage *msg,
843              const struct GNUNET_ATS_Information*atsi)
844 {
845   const void *payload;
846   uint32_t bytes_needed;
847   uint32_t relative_offset;
848   uint32_t relative_sequence_number;
849   uint16_t size;
850
851   size = htons (msg->header.header.size);
852   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
853     {
854       GNUNET_break_op (0);
855       return GNUNET_SYSERR;
856     }
857
858   switch (socket->state)
859     {
860     case STATE_ESTABLISHED:
861     case STATE_TRANSMIT_CLOSED:
862     case STATE_TRANSMIT_CLOSE_WAIT:
863
864       /* check if the message's sequence number is in the range we are
865          expecting */
866       relative_sequence_number = 
867         ntohl (msg->sequence_number) - socket->read_sequence_number;
868       if ( relative_sequence_number > 64)
869         {
870           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
871                       "Ignoring received message with sequence number %d",
872                       ntohl (msg->sequence_number));
873           return GNUNET_YES;
874         }
875
876       /* Check if we have to allocate the buffer */
877       size -= sizeof (struct GNUNET_STREAM_DataMessage);
878       relative_offset = ntohl (msg->offset) - socket->read_offset;
879       bytes_needed = relative_offset + size;
880       
881       if (bytes_needed > socket->receive_buffer_size)
882         {
883           if (bytes_needed <= RECEIVE_BUFFER_SIZE)
884             {
885               socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
886                                                        bytes_needed);
887               socket->receive_buffer_size = bytes_needed;
888             }
889           else
890             {
891               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
892                           "Cannot accommodate packet %d as buffer is full\n",
893                           ntohl (msg->sequence_number));
894               return GNUNET_YES;
895             }
896         }
897       
898       /* Copy Data to buffer */
899       payload = &msg[1];
900       GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
901       memcpy (socket->receive_buffer + relative_offset,
902               payload,
903               size);
904       socket->receive_buffer_boundaries[relative_sequence_number] = 
905         relative_offset + size;
906       
907       /* Modify the ACK bitmap */
908       ackbitmap_modify_bit (&socket->ack_bitmap,
909                             relative_sequence_number,
910                             GNUNET_YES);
911
912       /* Start ACK sending task if one is not already present */
913       if (0 == socket->ack_task_id)
914        {
915          socket->ack_task_id = 
916            GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
917                                          (msg->ack_deadline),
918                                          &ack_task,
919                                          socket);
920        }
921
922       if ((NULL != socket->read_handle) /* A read handle is waiting */
923           && (NULL == socket->copy_buffer)) /* And the copy buffer is empty */
924         {
925           prepare_buffer_for_read (socket);
926         }
927       
928       break;
929
930     default:
931       /* FIXME: call statistics */
932       break;
933     }
934   return GNUNET_YES;
935 }
936
937 /**
938  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
939  *
940  * @param cls the socket (set from GNUNET_MESH_connect)
941  * @param tunnel connection to the other end
942  * @param tunnel_ctx place to store local state associated with the tunnel
943  * @param sender who sent the message
944  * @param message the actual message
945  * @param atsi performance data for the connection
946  * @return GNUNET_OK to keep the connection open,
947  *         GNUNET_SYSERR to close it (signal serious error)
948  */
949 static int
950 client_handle_data (void *cls,
951              struct GNUNET_MESH_Tunnel *tunnel,
952              void **tunnel_ctx,
953              const struct GNUNET_PeerIdentity *sender,
954              const struct GNUNET_MessageHeader *message,
955              const struct GNUNET_ATS_Information*atsi)
956 {
957   struct GNUNET_STREAM_Socket *socket = cls;
958
959   return handle_data (socket, 
960                       tunnel, 
961                       sender, 
962                       (const struct GNUNET_STREAM_DataMessage *) message, 
963                       atsi);
964 }
965
966
967 /**
968  * Callback to set state to ESTABLISHED
969  *
970  * @param cls the closure from queue_message FIXME: document
971  * @param socket the socket to requiring state change
972  */
973 static void
974 set_state_established (void *cls,
975                        struct GNUNET_STREAM_Socket *socket)
976 {
977   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining ESTABLISHED state\n");
978   socket->write_offset = 0;
979   socket->read_offset = 0;
980   socket->state = STATE_ESTABLISHED;
981 }
982
983
984 /**
985  * Callback to set state to HELLO_WAIT
986  *
987  * @param cls the closure from queue_message
988  * @param socket the socket to requiring state change
989  */
990 static void
991 set_state_hello_wait (void *cls,
992                       struct GNUNET_STREAM_Socket *socket)
993 {
994   GNUNET_assert (STATE_INIT == socket->state);
995   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining HELLO_WAIT state\n");
996   socket->state = STATE_HELLO_WAIT;
997 }
998
999
1000 /**
1001  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1002  *
1003  * @param cls the socket (set from GNUNET_MESH_connect)
1004  * @param tunnel connection to the other end
1005  * @param tunnel_ctx this is NULL
1006  * @param sender who sent the message
1007  * @param message the actual message
1008  * @param atsi performance data for the connection
1009  * @return GNUNET_OK to keep the connection open,
1010  *         GNUNET_SYSERR to close it (signal serious error)
1011  */
1012 static int
1013 client_handle_hello_ack (void *cls,
1014                          struct GNUNET_MESH_Tunnel *tunnel,
1015                          void **tunnel_ctx,
1016                          const struct GNUNET_PeerIdentity *sender,
1017                          const struct GNUNET_MessageHeader *message,
1018                          const struct GNUNET_ATS_Information*atsi)
1019 {
1020   struct GNUNET_STREAM_Socket *socket = cls;
1021   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1022   struct GNUNET_STREAM_HelloAckMessage *reply;
1023
1024   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1025   GNUNET_assert (socket->tunnel == tunnel);
1026   switch (socket->state)
1027   {
1028   case STATE_HELLO_WAIT:
1029       socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1030       socket->receive_window_available = ntohl (ack_msg->receive_window_size);
1031       /* Get the random sequence number */
1032       socket->write_sequence_number = 
1033         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1034       reply = 
1035         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1036       reply->header.header.size = 
1037         htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1038       reply->header.header.type = 
1039         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1040       reply->sequence_number = htonl (socket->write_sequence_number);
1041       reply->receive_window_size = htonl (RECEIVE_BUFFER_SIZE);
1042       queue_message (socket, 
1043                      &reply->header, 
1044                      &set_state_established, 
1045                      NULL);      
1046       return GNUNET_OK;
1047   case STATE_ESTABLISHED:
1048   case STATE_RECEIVE_CLOSE_WAIT:
1049     // call statistics (# ACKs ignored++)
1050     return GNUNET_OK;
1051   case STATE_INIT:
1052   default:
1053     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1054                 "Server sent HELLO_ACK when in state %d\n", socket->state);
1055     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1056     return GNUNET_SYSERR;
1057   }
1058
1059 }
1060
1061
1062 /**
1063  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1064  *
1065  * @param cls the socket (set from GNUNET_MESH_connect)
1066  * @param tunnel connection to the other end
1067  * @param tunnel_ctx this is NULL
1068  * @param sender who sent the message
1069  * @param message the actual message
1070  * @param atsi performance data for the connection
1071  * @return GNUNET_OK to keep the connection open,
1072  *         GNUNET_SYSERR to close it (signal serious error)
1073  */
1074 static int
1075 client_handle_reset (void *cls,
1076                      struct GNUNET_MESH_Tunnel *tunnel,
1077                      void **tunnel_ctx,
1078                      const struct GNUNET_PeerIdentity *sender,
1079                      const struct GNUNET_MessageHeader *message,
1080                      const struct GNUNET_ATS_Information*atsi)
1081 {
1082   struct GNUNET_STREAM_Socket *socket = cls;
1083
1084   return GNUNET_OK;
1085 }
1086
1087
1088 /**
1089  * Common message handler for handling TRANSMIT_CLOSE messages
1090  *
1091  * @param socket the socket through which the ack was received
1092  * @param tunnel connection to the other end
1093  * @param sender who sent the message
1094  * @param msg the transmit close message
1095  * @param atsi performance data for the connection
1096  * @return GNUNET_OK to keep the connection open,
1097  *         GNUNET_SYSERR to close it (signal serious error)
1098  */
1099 static int
1100 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1101                        struct GNUNET_MESH_Tunnel *tunnel,
1102                        const struct GNUNET_PeerIdentity *sender,
1103                        const struct GNUNET_STREAM_MessageHeader *msg,
1104                        const struct GNUNET_ATS_Information*atsi)
1105 {
1106   struct GNUNET_STREAM_MessageHeader *reply;
1107
1108   switch (socket->state)
1109     {
1110     case STATE_ESTABLISHED:
1111       socket->state = STATE_RECEIVE_CLOSED;
1112
1113       /* Send TRANSMIT_CLOSE_ACK */
1114       reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1115       reply->header.type = 
1116         htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1117       reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1118       queue_message (socket, reply, NULL, NULL);
1119       break;
1120
1121     default:
1122       /* FIXME: Call statistics? */
1123       break;
1124     }
1125   return GNUNET_YES;
1126 }
1127
1128
1129 /**
1130  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1131  *
1132  * @param cls the socket (set from GNUNET_MESH_connect)
1133  * @param tunnel connection to the other end
1134  * @param tunnel_ctx this is NULL
1135  * @param sender who sent the message
1136  * @param message the actual message
1137  * @param atsi performance data for the connection
1138  * @return GNUNET_OK to keep the connection open,
1139  *         GNUNET_SYSERR to close it (signal serious error)
1140  */
1141 static int
1142 client_handle_transmit_close (void *cls,
1143                               struct GNUNET_MESH_Tunnel *tunnel,
1144                               void **tunnel_ctx,
1145                               const struct GNUNET_PeerIdentity *sender,
1146                               const struct GNUNET_MessageHeader *message,
1147                               const struct GNUNET_ATS_Information*atsi)
1148 {
1149   struct GNUNET_STREAM_Socket *socket = cls;
1150   
1151   return handle_transmit_close (socket,
1152                                 tunnel,
1153                                 sender,
1154                                 (struct GNUNET_STREAM_MessageHeader *)message,
1155                                 atsi);
1156 }
1157
1158
1159 /**
1160  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1161  *
1162  * @param cls the socket (set from GNUNET_MESH_connect)
1163  * @param tunnel connection to the other end
1164  * @param tunnel_ctx this is NULL
1165  * @param sender who sent the message
1166  * @param message the actual message
1167  * @param atsi performance data for the connection
1168  * @return GNUNET_OK to keep the connection open,
1169  *         GNUNET_SYSERR to close it (signal serious error)
1170  */
1171 static int
1172 client_handle_transmit_close_ack (void *cls,
1173                                   struct GNUNET_MESH_Tunnel *tunnel,
1174                                   void **tunnel_ctx,
1175                                   const struct GNUNET_PeerIdentity *sender,
1176                                   const struct GNUNET_MessageHeader *message,
1177                                   const struct GNUNET_ATS_Information*atsi)
1178 {
1179   struct GNUNET_STREAM_Socket *socket = cls;
1180
1181   return GNUNET_OK;
1182 }
1183
1184
1185 /**
1186  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1187  *
1188  * @param cls the socket (set from GNUNET_MESH_connect)
1189  * @param tunnel connection to the other end
1190  * @param tunnel_ctx this is NULL
1191  * @param sender who sent the message
1192  * @param message the actual message
1193  * @param atsi performance data for the connection
1194  * @return GNUNET_OK to keep the connection open,
1195  *         GNUNET_SYSERR to close it (signal serious error)
1196  */
1197 static int
1198 client_handle_receive_close (void *cls,
1199                              struct GNUNET_MESH_Tunnel *tunnel,
1200                              void **tunnel_ctx,
1201                              const struct GNUNET_PeerIdentity *sender,
1202                              const struct GNUNET_MessageHeader *message,
1203                              const struct GNUNET_ATS_Information*atsi)
1204 {
1205   struct GNUNET_STREAM_Socket *socket = cls;
1206
1207   return GNUNET_OK;
1208 }
1209
1210
1211 /**
1212  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1213  *
1214  * @param cls the socket (set from GNUNET_MESH_connect)
1215  * @param tunnel connection to the other end
1216  * @param tunnel_ctx this is NULL
1217  * @param sender who sent the message
1218  * @param message the actual message
1219  * @param atsi performance data for the connection
1220  * @return GNUNET_OK to keep the connection open,
1221  *         GNUNET_SYSERR to close it (signal serious error)
1222  */
1223 static int
1224 client_handle_receive_close_ack (void *cls,
1225                                  struct GNUNET_MESH_Tunnel *tunnel,
1226                                  void **tunnel_ctx,
1227                                  const struct GNUNET_PeerIdentity *sender,
1228                                  const struct GNUNET_MessageHeader *message,
1229                                  const struct GNUNET_ATS_Information*atsi)
1230 {
1231   struct GNUNET_STREAM_Socket *socket = cls;
1232
1233   return GNUNET_OK;
1234 }
1235
1236
1237 /**
1238  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1239  *
1240  * @param cls the socket (set from GNUNET_MESH_connect)
1241  * @param tunnel connection to the other end
1242  * @param tunnel_ctx this is NULL
1243  * @param sender who sent the message
1244  * @param message the actual message
1245  * @param atsi performance data for the connection
1246  * @return GNUNET_OK to keep the connection open,
1247  *         GNUNET_SYSERR to close it (signal serious error)
1248  */
1249 static int
1250 client_handle_close (void *cls,
1251                      struct GNUNET_MESH_Tunnel *tunnel,
1252                      void **tunnel_ctx,
1253                      const struct GNUNET_PeerIdentity *sender,
1254                      const struct GNUNET_MessageHeader *message,
1255                      const struct GNUNET_ATS_Information*atsi)
1256 {
1257   struct GNUNET_STREAM_Socket *socket = cls;
1258
1259   return GNUNET_OK;
1260 }
1261
1262
1263 /**
1264  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1265  *
1266  * @param cls the socket (set from GNUNET_MESH_connect)
1267  * @param tunnel connection to the other end
1268  * @param tunnel_ctx this is NULL
1269  * @param sender who sent the message
1270  * @param message the actual message
1271  * @param atsi performance data for the connection
1272  * @return GNUNET_OK to keep the connection open,
1273  *         GNUNET_SYSERR to close it (signal serious error)
1274  */
1275 static int
1276 client_handle_close_ack (void *cls,
1277                          struct GNUNET_MESH_Tunnel *tunnel,
1278                          void **tunnel_ctx,
1279                          const struct GNUNET_PeerIdentity *sender,
1280                          const struct GNUNET_MessageHeader *message,
1281                          const struct GNUNET_ATS_Information*atsi)
1282 {
1283   struct GNUNET_STREAM_Socket *socket = cls;
1284
1285   return GNUNET_OK;
1286 }
1287
1288 /*****************************/
1289 /* Server's Message Handlers */
1290 /*****************************/
1291
1292 /**
1293  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1294  *
1295  * @param cls the closure
1296  * @param tunnel connection to the other end
1297  * @param tunnel_ctx the socket
1298  * @param sender who sent the message
1299  * @param message the actual message
1300  * @param atsi performance data for the connection
1301  * @return GNUNET_OK to keep the connection open,
1302  *         GNUNET_SYSERR to close it (signal serious error)
1303  */
1304 static int
1305 server_handle_data (void *cls,
1306                     struct GNUNET_MESH_Tunnel *tunnel,
1307                     void **tunnel_ctx,
1308                     const struct GNUNET_PeerIdentity *sender,
1309                     const struct GNUNET_MessageHeader *message,
1310                     const struct GNUNET_ATS_Information*atsi)
1311 {
1312   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1313
1314   return handle_data (socket,
1315                       tunnel,
1316                       sender,
1317                       (const struct GNUNET_STREAM_DataMessage *)message,
1318                       atsi);
1319 }
1320
1321
1322 /**
1323  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
1324  *
1325  * @param cls the closure
1326  * @param tunnel connection to the other end
1327  * @param tunnel_ctx the socket
1328  * @param sender who sent the message
1329  * @param message the actual message
1330  * @param atsi performance data for the connection
1331  * @return GNUNET_OK to keep the connection open,
1332  *         GNUNET_SYSERR to close it (signal serious error)
1333  */
1334 static int
1335 server_handle_hello (void *cls,
1336                      struct GNUNET_MESH_Tunnel *tunnel,
1337                      void **tunnel_ctx,
1338                      const struct GNUNET_PeerIdentity *sender,
1339                      const struct GNUNET_MessageHeader *message,
1340                      const struct GNUNET_ATS_Information*atsi)
1341 {
1342   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1343   struct GNUNET_STREAM_HelloAckMessage *reply;
1344
1345   GNUNET_assert (socket->tunnel == tunnel);
1346   if (STATE_INIT == socket->state)
1347     {
1348       /* Get the random sequence number */
1349       socket->write_sequence_number = 
1350         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1351       reply = 
1352         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1353       reply->header.header.size = 
1354         htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1355       reply->header.header.type = 
1356         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1357       reply->sequence_number = htonl (socket->write_sequence_number);
1358       queue_message (socket, 
1359                      &reply->header,
1360                      &set_state_hello_wait, 
1361                      NULL);
1362     }
1363   else
1364     {
1365       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1366                   "Client sent HELLO when in state %d\n", socket->state);
1367       /* FIXME: Send RESET? */
1368       
1369     }
1370   return GNUNET_OK;
1371 }
1372
1373
1374 /**
1375  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1376  *
1377  * @param cls the closure
1378  * @param tunnel connection to the other end
1379  * @param tunnel_ctx the socket
1380  * @param sender who sent the message
1381  * @param message the actual message
1382  * @param atsi performance data for the connection
1383  * @return GNUNET_OK to keep the connection open,
1384  *         GNUNET_SYSERR to close it (signal serious error)
1385  */
1386 static int
1387 server_handle_hello_ack (void *cls,
1388                          struct GNUNET_MESH_Tunnel *tunnel,
1389                          void **tunnel_ctx,
1390                          const struct GNUNET_PeerIdentity *sender,
1391                          const struct GNUNET_MessageHeader *message,
1392                          const struct GNUNET_ATS_Information*atsi)
1393 {
1394   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1395   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
1396
1397   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
1398   GNUNET_assert (socket->tunnel == tunnel);
1399   if (STATE_HELLO_WAIT == socket->state)
1400     {
1401       socket->read_sequence_number = ntohl (ack_message->sequence_number);
1402       socket->receive_window_available = 
1403         ntohl (ack_message->receive_window_size);
1404       /* Attain ESTABLISHED state */
1405       set_state_established (NULL, socket);
1406     }
1407   else
1408     {
1409       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1410                   "Client sent HELLO_ACK when in state %d\n", socket->state);
1411       /* FIXME: Send RESET? */
1412       
1413     }
1414   return GNUNET_OK;
1415 }
1416
1417
1418 /**
1419  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1420  *
1421  * @param cls the closure
1422  * @param tunnel connection to the other end
1423  * @param tunnel_ctx the socket
1424  * @param sender who sent the message
1425  * @param message the actual message
1426  * @param atsi performance data for the connection
1427  * @return GNUNET_OK to keep the connection open,
1428  *         GNUNET_SYSERR to close it (signal serious error)
1429  */
1430 static int
1431 server_handle_reset (void *cls,
1432                      struct GNUNET_MESH_Tunnel *tunnel,
1433                      void **tunnel_ctx,
1434                      const struct GNUNET_PeerIdentity *sender,
1435                      const struct GNUNET_MessageHeader *message,
1436                      const struct GNUNET_ATS_Information*atsi)
1437 {
1438   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1439
1440   return GNUNET_OK;
1441 }
1442
1443
1444 /**
1445  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1446  *
1447  * @param cls the closure
1448  * @param tunnel connection to the other end
1449  * @param tunnel_ctx the socket
1450  * @param sender who sent the message
1451  * @param message the actual message
1452  * @param atsi performance data for the connection
1453  * @return GNUNET_OK to keep the connection open,
1454  *         GNUNET_SYSERR to close it (signal serious error)
1455  */
1456 static int
1457 server_handle_transmit_close (void *cls,
1458                               struct GNUNET_MESH_Tunnel *tunnel,
1459                               void **tunnel_ctx,
1460                               const struct GNUNET_PeerIdentity *sender,
1461                               const struct GNUNET_MessageHeader *message,
1462                               const struct GNUNET_ATS_Information*atsi)
1463 {
1464   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1465
1466   return handle_transmit_close (socket,
1467                                 tunnel,
1468                                 sender,
1469                                 (struct GNUNET_STREAM_MessageHeader *)message,
1470                                 atsi);
1471 }
1472
1473
1474 /**
1475  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1476  *
1477  * @param cls the closure
1478  * @param tunnel connection to the other end
1479  * @param tunnel_ctx the socket
1480  * @param sender who sent the message
1481  * @param message the actual message
1482  * @param atsi performance data for the connection
1483  * @return GNUNET_OK to keep the connection open,
1484  *         GNUNET_SYSERR to close it (signal serious error)
1485  */
1486 static int
1487 server_handle_transmit_close_ack (void *cls,
1488                                   struct GNUNET_MESH_Tunnel *tunnel,
1489                                   void **tunnel_ctx,
1490                                   const struct GNUNET_PeerIdentity *sender,
1491                                   const struct GNUNET_MessageHeader *message,
1492                                   const struct GNUNET_ATS_Information*atsi)
1493 {
1494   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1495
1496   return GNUNET_OK;
1497 }
1498
1499
1500 /**
1501  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1502  *
1503  * @param cls the closure
1504  * @param tunnel connection to the other end
1505  * @param tunnel_ctx the socket
1506  * @param sender who sent the message
1507  * @param message the actual message
1508  * @param atsi performance data for the connection
1509  * @return GNUNET_OK to keep the connection open,
1510  *         GNUNET_SYSERR to close it (signal serious error)
1511  */
1512 static int
1513 server_handle_receive_close (void *cls,
1514                              struct GNUNET_MESH_Tunnel *tunnel,
1515                              void **tunnel_ctx,
1516                              const struct GNUNET_PeerIdentity *sender,
1517                              const struct GNUNET_MessageHeader *message,
1518                              const struct GNUNET_ATS_Information*atsi)
1519 {
1520   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1521
1522   return GNUNET_OK;
1523 }
1524
1525
1526 /**
1527  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1528  *
1529  * @param cls the closure
1530  * @param tunnel connection to the other end
1531  * @param tunnel_ctx the socket
1532  * @param sender who sent the message
1533  * @param message the actual message
1534  * @param atsi performance data for the connection
1535  * @return GNUNET_OK to keep the connection open,
1536  *         GNUNET_SYSERR to close it (signal serious error)
1537  */
1538 static int
1539 server_handle_receive_close_ack (void *cls,
1540                                  struct GNUNET_MESH_Tunnel *tunnel,
1541                                  void **tunnel_ctx,
1542                                  const struct GNUNET_PeerIdentity *sender,
1543                                  const struct GNUNET_MessageHeader *message,
1544                                  const struct GNUNET_ATS_Information*atsi)
1545 {
1546   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1547
1548   return GNUNET_OK;
1549 }
1550
1551
1552 /**
1553  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1554  *
1555  * @param cls the closure
1556  * @param tunnel connection to the other end
1557  * @param tunnel_ctx the socket
1558  * @param sender who sent the message
1559  * @param message the actual message
1560  * @param atsi performance data for the connection
1561  * @return GNUNET_OK to keep the connection open,
1562  *         GNUNET_SYSERR to close it (signal serious error)
1563  */
1564 static int
1565 server_handle_close (void *cls,
1566                      struct GNUNET_MESH_Tunnel *tunnel,
1567                      void **tunnel_ctx,
1568                      const struct GNUNET_PeerIdentity *sender,
1569                      const struct GNUNET_MessageHeader *message,
1570                      const struct GNUNET_ATS_Information*atsi)
1571 {
1572   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1573
1574   return GNUNET_OK;
1575 }
1576
1577
1578 /**
1579  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1580  *
1581  * @param cls the closure
1582  * @param tunnel connection to the other end
1583  * @param tunnel_ctx the socket
1584  * @param sender who sent the message
1585  * @param message the actual message
1586  * @param atsi performance data for the connection
1587  * @return GNUNET_OK to keep the connection open,
1588  *         GNUNET_SYSERR to close it (signal serious error)
1589  */
1590 static int
1591 server_handle_close_ack (void *cls,
1592                          struct GNUNET_MESH_Tunnel *tunnel,
1593                          void **tunnel_ctx,
1594                          const struct GNUNET_PeerIdentity *sender,
1595                          const struct GNUNET_MessageHeader *message,
1596                          const struct GNUNET_ATS_Information*atsi)
1597 {
1598   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1599
1600   return GNUNET_OK;
1601 }
1602
1603
1604 /**
1605  * Message Handler for mesh
1606  *
1607  * @param socket the socket through which the ack was received
1608  * @param tunnel connection to the other end
1609  * @param sender who sent the message
1610  * @param ack the acknowledgment message
1611  * @param atsi performance data for the connection
1612  * @return GNUNET_OK to keep the connection open,
1613  *         GNUNET_SYSERR to close it (signal serious error)
1614  */
1615 static int
1616 handle_ack (struct GNUNET_STREAM_Socket *socket,
1617             struct GNUNET_MESH_Tunnel *tunnel,
1618             const struct GNUNET_PeerIdentity *sender,
1619             const struct GNUNET_STREAM_AckMessage *ack,
1620             const struct GNUNET_ATS_Information*atsi)
1621 {
1622   switch (socket->state)
1623     {
1624     case (STATE_ESTABLISHED):
1625       if (NULL == socket->write_handle)
1626         {
1627           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1628                       "Received DATA ACK when write_handle is NULL\n");
1629           return GNUNET_OK;
1630         }
1631
1632       socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap);
1633       socket->receive_window_available = 
1634         ntohl (ack->receive_window_remaining);
1635       write_data (socket);
1636       break;
1637     default:
1638       break;
1639     }
1640   return GNUNET_OK;
1641 }
1642
1643
1644 /**
1645  * Message Handler for mesh
1646  *
1647  * @param cls the 'struct GNUNET_STREAM_Socket'
1648  * @param tunnel connection to the other end
1649  * @param tunnel_ctx unused
1650  * @param sender who sent the message
1651  * @param message the actual message
1652  * @param atsi performance data for the connection
1653  * @return GNUNET_OK to keep the connection open,
1654  *         GNUNET_SYSERR to close it (signal serious error)
1655  */
1656 static int
1657 client_handle_ack (void *cls,
1658                    struct GNUNET_MESH_Tunnel *tunnel,
1659                    void **tunnel_ctx,
1660                    const struct GNUNET_PeerIdentity *sender,
1661                    const struct GNUNET_MessageHeader *message,
1662                    const struct GNUNET_ATS_Information*atsi)
1663 {
1664   struct GNUNET_STREAM_Socket *socket = cls;
1665   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1666  
1667   return handle_ack (socket, tunnel, sender, ack, atsi);
1668 }
1669
1670
1671 /**
1672  * Message Handler for mesh
1673  *
1674  * @param cls the server's listen socket
1675  * @param tunnel connection to the other end
1676  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
1677  * @param sender who sent the message
1678  * @param message the actual message
1679  * @param atsi performance data for the connection
1680  * @return GNUNET_OK to keep the connection open,
1681  *         GNUNET_SYSERR to close it (signal serious error)
1682  */
1683 static int
1684 server_handle_ack (void *cls,
1685                    struct GNUNET_MESH_Tunnel *tunnel,
1686                    void **tunnel_ctx,
1687                    const struct GNUNET_PeerIdentity *sender,
1688                    const struct GNUNET_MessageHeader *message,
1689                    const struct GNUNET_ATS_Information*atsi)
1690 {
1691   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1692   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1693  
1694   return handle_ack (socket, tunnel, sender, ack, atsi);
1695 }
1696
1697
1698 /**
1699  * For client message handlers, the stream socket is in the
1700  * closure argument.
1701  */
1702 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
1703   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1704   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
1705    sizeof (struct GNUNET_STREAM_AckMessage) },
1706   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1707    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1708   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1709    sizeof (struct GNUNET_STREAM_MessageHeader)},
1710   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1711    sizeof (struct GNUNET_STREAM_MessageHeader)},
1712   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1713    sizeof (struct GNUNET_STREAM_MessageHeader)},
1714   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1715    sizeof (struct GNUNET_STREAM_MessageHeader)},
1716   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1717    sizeof (struct GNUNET_STREAM_MessageHeader)},
1718   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1719    sizeof (struct GNUNET_STREAM_MessageHeader)},
1720   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1721    sizeof (struct GNUNET_STREAM_MessageHeader)},
1722   {NULL, 0, 0}
1723 };
1724
1725
1726 /**
1727  * For server message handlers, the stream socket is in the
1728  * tunnel context, and the listen socket in the closure argument.
1729  */
1730 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
1731   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1732   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
1733    sizeof (struct GNUNET_STREAM_AckMessage) },
1734   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
1735    sizeof (struct GNUNET_STREAM_MessageHeader)},
1736   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1737    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1738   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1739    sizeof (struct GNUNET_STREAM_MessageHeader)},
1740   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1741    sizeof (struct GNUNET_STREAM_MessageHeader)},
1742   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1743    sizeof (struct GNUNET_STREAM_MessageHeader)},
1744   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1745    sizeof (struct GNUNET_STREAM_MessageHeader)},
1746   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1747    sizeof (struct GNUNET_STREAM_MessageHeader)},
1748   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1749    sizeof (struct GNUNET_STREAM_MessageHeader)},
1750   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1751    sizeof (struct GNUNET_STREAM_MessageHeader)},
1752   {NULL, 0, 0}
1753 };
1754
1755
1756 /**
1757  * Function called when our target peer is connected to our tunnel
1758  *
1759  * @param cls the socket for which this tunnel is created
1760  * @param peer the peer identity of the target
1761  * @param atsi performance data for the connection
1762  */
1763 static void
1764 mesh_peer_connect_callback (void *cls,
1765                             const struct GNUNET_PeerIdentity *peer,
1766                             const struct GNUNET_ATS_Information * atsi)
1767 {
1768   struct GNUNET_STREAM_Socket *socket = cls;
1769   struct GNUNET_STREAM_MessageHeader *message;
1770
1771   if (0 != memcmp (&socket->other_peer, 
1772                    peer, 
1773                    sizeof (struct GNUNET_PeerIdentity)))
1774     {
1775       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1776                   "A peer (%s) which is not our target has connected to our tunnel", 
1777                   GNUNET_i2s (peer));
1778       return;
1779     }
1780   
1781   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1782               "Target peer %s connected\n", GNUNET_i2s (peer));
1783   
1784   /* Set state to INIT */
1785   socket->state = STATE_INIT;
1786
1787   /* Send HELLO message */
1788   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1789   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1790   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1791   queue_message (socket,
1792                  message,
1793                  &set_state_hello_wait,
1794                  NULL);
1795
1796   /* Call open callback */
1797   if (NULL == socket->open_cb)
1798     {
1799       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1800                   "STREAM_open callback is NULL\n");
1801     }
1802   else
1803     {
1804       socket->open_cb (socket->open_cls, socket);
1805     }
1806 }
1807
1808
1809 /**
1810  * Function called when our target peer is disconnected from our tunnel
1811  *
1812  * @param cls the socket associated which this tunnel
1813  * @param peer the peer identity of the target
1814  */
1815 static void
1816 mesh_peer_disconnect_callback (void *cls,
1817                                const struct GNUNET_PeerIdentity *peer)
1818 {
1819
1820 }
1821
1822
1823 /*****************/
1824 /* API functions */
1825 /*****************/
1826
1827
1828 /**
1829  * Tries to open a stream to the target peer
1830  *
1831  * @param cfg configuration to use
1832  * @param target the target peer to which the stream has to be opened
1833  * @param app_port the application port number which uniquely identifies this
1834  *            stream
1835  * @param open_cb this function will be called after stream has be established 
1836  * @param open_cb_cls the closure for open_cb
1837  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
1838  * @return if successful it returns the stream socket; NULL if stream cannot be
1839  *         opened 
1840  */
1841 struct GNUNET_STREAM_Socket *
1842 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
1843                     const struct GNUNET_PeerIdentity *target,
1844                     GNUNET_MESH_ApplicationType app_port,
1845                     GNUNET_STREAM_OpenCallback open_cb,
1846                     void *open_cb_cls,
1847                     ...)
1848 {
1849   struct GNUNET_STREAM_Socket *socket;
1850   enum GNUNET_STREAM_Option option;
1851   va_list vargs;                /* Variable arguments */
1852
1853   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
1854   socket->other_peer = *target;
1855   socket->open_cb = open_cb;
1856   socket->open_cls = open_cb_cls;
1857
1858   /* Set defaults */
1859   socket->retransmit_timeout = 
1860     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
1861
1862   va_start (vargs, open_cb_cls); /* Parse variable args */
1863   do {
1864     option = va_arg (vargs, enum GNUNET_STREAM_Option);
1865     switch (option)
1866       {
1867       case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
1868         /* Expect struct GNUNET_TIME_Relative */
1869         socket->retransmit_timeout = va_arg (vargs,
1870                                              struct GNUNET_TIME_Relative);
1871         break;
1872       case GNUNET_STREAM_OPTION_END:
1873         break;
1874       }
1875   } while (GNUNET_STREAM_OPTION_END != option);
1876   va_end (vargs);               /* End of variable args parsing */
1877
1878   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
1879                                       1,  /* QUEUE size as parameter? */
1880                                       socket, /* cls */
1881                                       NULL, /* No inbound tunnel handler */
1882                                       NULL, /* No inbound tunnel cleaner */
1883                                       client_message_handlers,
1884                                       NULL); /* We don't get inbound tunnels */
1885   // FIXME: if (NULL == socket->mesh) ...
1886
1887   /* Now create the mesh tunnel to target */
1888   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
1889                                               NULL, /* Tunnel context */
1890                                               &mesh_peer_connect_callback,
1891                                               &mesh_peer_disconnect_callback,
1892                                               socket);
1893   // FIXME: if (NULL == socket->tunnel) ...
1894
1895   return socket;
1896 }
1897
1898
1899 /**
1900  * Closes the stream
1901  *
1902  * @param socket the stream socket
1903  */
1904 void
1905 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
1906 {
1907   struct MessageQueue *head;
1908
1909   if (socket->read_task != GNUNET_SCHEDULER_NO_TASK)
1910   {
1911     /* socket closed with read task pending!? */
1912     GNUNET_break (0);
1913     GNUNET_SCHEDULER_cancel (socket->read_task);
1914     socket->read_task = GNUNET_SCHEDULER_NO_TASK;
1915   }
1916
1917   /* Clear Transmit handles */
1918   if (NULL != socket->transmit_handle)
1919     {
1920       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
1921       socket->transmit_handle = NULL;
1922     }
1923
1924   /* Clear existing message queue */
1925   while (NULL != (head = socket->queue_head)) {
1926     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
1927                                  socket->queue_tail,
1928                                  head);
1929     GNUNET_free (head->message);
1930     GNUNET_free (head);
1931   }
1932
1933   /* Close associated tunnel */
1934   if (NULL != socket->tunnel)
1935     {
1936       GNUNET_MESH_tunnel_destroy (socket->tunnel);
1937       socket->tunnel = NULL;
1938     }
1939
1940   /* Close mesh connection */
1941   if (NULL != socket->mesh)
1942     {
1943       GNUNET_MESH_disconnect (socket->mesh);
1944       socket->mesh = NULL;
1945     }
1946   
1947   /* Release receive buffer */
1948   if (NULL != socket->receive_buffer)
1949     {
1950       GNUNET_free (socket->receive_buffer);
1951     }
1952
1953   GNUNET_free (socket);
1954 }
1955
1956
1957 /**
1958  * Method called whenever a peer creates a tunnel to us
1959  *
1960  * @param cls closure
1961  * @param tunnel new handle to the tunnel
1962  * @param initiator peer that started the tunnel
1963  * @param atsi performance information for the tunnel
1964  * @return initial tunnel context for the tunnel
1965  *         (can be NULL -- that's not an error)
1966  */
1967 static void *
1968 new_tunnel_notify (void *cls,
1969                    struct GNUNET_MESH_Tunnel *tunnel,
1970                    const struct GNUNET_PeerIdentity *initiator,
1971                    const struct GNUNET_ATS_Information *atsi)
1972 {
1973   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
1974   struct GNUNET_STREAM_Socket *socket;
1975
1976   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
1977   socket->tunnel = tunnel;
1978   socket->session_id = 0;       /* FIXME */
1979   socket->other_peer = *initiator;
1980   socket->state = STATE_INIT;
1981
1982   if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
1983                                            socket,
1984                                            &socket->other_peer))
1985     {
1986       socket->state = STATE_CLOSED;
1987       /* FIXME: Send CLOSE message and then free */
1988       GNUNET_free (socket);
1989       GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
1990     }
1991   return socket;
1992 }
1993
1994
1995 /**
1996  * Function called whenever an inbound tunnel is destroyed.  Should clean up
1997  * any associated state.  This function is NOT called if the client has
1998  * explicitly asked for the tunnel to be destroyed using
1999  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2000  * the tunnel.
2001  *
2002  * @param cls closure (set from GNUNET_MESH_connect)
2003  * @param tunnel connection to the other end (henceforth invalid)
2004  * @param tunnel_ctx place where local state associated
2005  *                   with the tunnel is stored
2006  */
2007 static void 
2008 tunnel_cleaner (void *cls,
2009                 const struct GNUNET_MESH_Tunnel *tunnel,
2010                 void *tunnel_ctx)
2011 {
2012   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2013   
2014   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2015               "Peer %s has terminated connection abruptly\n",
2016               GNUNET_i2s (&socket->other_peer));
2017
2018   socket->status = GNUNET_STREAM_SHUTDOWN;
2019   /* Clear Transmit handles */
2020   if (NULL != socket->transmit_handle)
2021     {
2022       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2023       socket->transmit_handle = NULL;
2024     }
2025   socket->tunnel = NULL;
2026 }
2027
2028
2029 /**
2030  * Listens for stream connections for a specific application ports
2031  *
2032  * @param cfg the configuration to use
2033  * @param app_port the application port for which new streams will be accepted
2034  * @param listen_cb this function will be called when a peer tries to establish
2035  *            a stream with us
2036  * @param listen_cb_cls closure for listen_cb
2037  * @return listen socket, NULL for any error
2038  */
2039 struct GNUNET_STREAM_ListenSocket *
2040 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
2041                       GNUNET_MESH_ApplicationType app_port,
2042                       GNUNET_STREAM_ListenCallback listen_cb,
2043                       void *listen_cb_cls)
2044 {
2045   /* FIXME: Add variable args for passing configration options? */
2046   struct GNUNET_STREAM_ListenSocket *lsocket;
2047   GNUNET_MESH_ApplicationType app_types[2];
2048
2049   app_types[0] = app_port;
2050   app_types[1] = 0;
2051   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
2052   lsocket->port = app_port;
2053   lsocket->listen_cb = listen_cb;
2054   lsocket->listen_cb_cls = listen_cb_cls;
2055   lsocket->mesh = GNUNET_MESH_connect (cfg,
2056                                        10, /* FIXME: QUEUE size as parameter? */
2057                                        lsocket, /* Closure */
2058                                        &new_tunnel_notify,
2059                                        &tunnel_cleaner,
2060                                        server_message_handlers,
2061                                        app_types);
2062   return lsocket;
2063 }
2064
2065
2066 /**
2067  * Closes the listen socket
2068  *
2069  * @param lsocket the listen socket
2070  */
2071 void
2072 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
2073 {
2074   /* Close MESH connection */
2075   GNUNET_MESH_disconnect (lsocket->mesh);
2076   
2077   GNUNET_free (lsocket);
2078 }
2079
2080
2081 /**
2082  * Tries to write the given data to the stream
2083  *
2084  * @param socket the socket representing a stream
2085  * @param data the data buffer from where the data is written into the stream
2086  * @param size the number of bytes to be written from the data buffer
2087  * @param timeout the timeout period
2088  * @param write_cont the function to call upon writing some bytes into the stream
2089  * @param write_cont_cls the closure
2090  * @return handle to cancel the operation
2091  */
2092 struct GNUNET_STREAM_IOWriteHandle *
2093 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
2094                      const void *data,
2095                      size_t size,
2096                      struct GNUNET_TIME_Relative timeout,
2097                      GNUNET_STREAM_CompletionContinuation write_cont,
2098                      void *write_cont_cls)
2099 {
2100   unsigned int num_needed_packets;
2101   unsigned int packet;
2102   struct GNUNET_STREAM_IOWriteHandle *io_handle;
2103   uint32_t packet_size;
2104   uint32_t payload_size;
2105   struct GNUNET_STREAM_DataMessage *data_msg;
2106   const void *sweep;
2107
2108   /* Return NULL if there is already a write request pending */
2109   if (NULL != socket->write_handle)
2110   {
2111     GNUNET_break (0);
2112     return NULL;
2113   }
2114   if (!((STATE_ESTABLISHED == socket->state)
2115         || (STATE_RECEIVE_CLOSE_WAIT == socket->state)
2116         || (STATE_RECEIVE_CLOSED == socket->state)))
2117     {
2118       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2119                   "Attempting to write on a closed (OR) not-yet-established"
2120                   "stream\n"); 
2121       return NULL;
2122     } 
2123   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
2124     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
2125   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
2126   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
2127   sweep = data;
2128   /* Divide the given buffer into packets for sending */
2129   for (packet=0; packet < num_needed_packets; packet++)
2130     {
2131       if ((packet + 1) * max_payload_size < size) 
2132         {
2133           payload_size = max_payload_size;
2134           packet_size = MAX_PACKET_SIZE;
2135         }
2136       else 
2137         {
2138           payload_size = size - packet * max_payload_size;
2139           packet_size =  payload_size + sizeof (struct
2140                                                 GNUNET_STREAM_DataMessage); 
2141         }
2142       io_handle->messages[packet] = GNUNET_malloc (packet_size);
2143       io_handle->messages[packet]->header.header.size = htons (packet_size);
2144       io_handle->messages[packet]->header.header.type =
2145         htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
2146       io_handle->messages[packet]->sequence_number =
2147         htons (socket->write_sequence_number++);
2148       io_handle->messages[packet]->offset = htons (socket->write_offset);
2149
2150       /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2151          determined from RTT */
2152       io_handle->messages[packet]->ack_deadline = 
2153         GNUNET_TIME_relative_hton (GNUNET_TIME_relative_multiply 
2154                                    (GNUNET_TIME_UNIT_SECONDS, 5));
2155       data_msg = io_handle->messages[packet];
2156       /* Copy data from given buffer to the packet */
2157       memcpy (&data_msg[1],
2158               sweep,
2159               payload_size);
2160       sweep += payload_size;
2161       socket->write_offset += payload_size;
2162     }
2163   socket->write_handle = io_handle;
2164   write_data (socket);
2165
2166   return io_handle;
2167 }
2168
2169
2170 /**
2171  * Tries to read data from the stream
2172  *
2173  * @param socket the socket representing a stream
2174  * @param timeout the timeout period
2175  * @param proc function to call with data (once only)
2176  * @param proc_cls the closure for proc
2177  * @return handle to cancel the operation
2178  */
2179 struct GNUNET_STREAM_IOReadHandle *
2180 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
2181                     struct GNUNET_TIME_Relative timeout,
2182                     GNUNET_STREAM_DataProcessor proc,
2183                     void *proc_cls)
2184 {
2185   struct GNUNET_STREAM_IOReadHandle *read_handle;
2186   
2187   /* Return NULL if there is already a read handle; the user has to cancel that
2188   first before continuing or has to wait until it is completed */
2189   if (NULL != socket->read_handle) return NULL;
2190
2191   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
2192   read_handle->proc = proc;
2193   socket->read_handle = read_handle;
2194
2195   /* if previous copy buffer is still not read call the data processor on it */
2196   if (NULL != socket->copy_buffer)
2197     socket->read_task = GNUNET_SCHEDULER_add_now (&call_read_processor_task,
2198                                                   socket);
2199   else
2200     prepare_buffer_for_read (socket);
2201
2202   socket->read_io_timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
2203                                                                &cancel_read_io,
2204                                                                socket);
2205
2206   return read_handle;
2207 }