535850de2a02cf1d49462cca150eb6af03d1c412
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Decrement PEER intern count during socket close and listen close to free the
27  * memory used for PEER interning
28  *
29  * Add code for write io timeout
30  *
31  * Include retransmission for control messages
32  **/
33
34 /**
35  * @file stream/stream_api.c
36  * @brief Implementation of the stream library
37  * @author Sree Harsha Totakura
38  */
39
40
41 #include "platform.h"
42 #include "gnunet_common.h"
43 #include "gnunet_crypto_lib.h"
44 #include "gnunet_stream_lib.h"
45 #include "gnunet_testing_lib.h"
46 #include "stream_protocol.h"
47
48
49 /**
50  * The maximum packet size of a stream packet
51  */
52 #define MAX_PACKET_SIZE 64000
53
54 /**
55  * Receive buffer
56  */
57 #define RECEIVE_BUFFER_SIZE 4096000
58
59 /**
60  * The maximum payload a data message packet can carry
61  */
62 static size_t max_payload_size = 
63   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
64
65 /**
66  * states in the Protocol
67  */
68 enum State
69   {
70     /**
71      * Client initialization state
72      */
73     STATE_INIT,
74
75     /**
76      * Listener initialization state 
77      */
78     STATE_LISTEN,
79
80     /**
81      * Pre-connection establishment state
82      */
83     STATE_HELLO_WAIT,
84
85     /**
86      * State where a connection has been established
87      */
88     STATE_ESTABLISHED,
89
90     /**
91      * State where the socket is closed on our side and waiting to be ACK'ed
92      */
93     STATE_RECEIVE_CLOSE_WAIT,
94
95     /**
96      * State where the socket is closed for reading
97      */
98     STATE_RECEIVE_CLOSED,
99
100     /**
101      * State where the socket is closed on our side and waiting to be ACK'ed
102      */
103     STATE_TRANSMIT_CLOSE_WAIT,
104
105     /**
106      * State where the socket is closed for writing
107      */
108     STATE_TRANSMIT_CLOSED,
109
110     /**
111      * State where the socket is closed on our side and waiting to be ACK'ed
112      */
113     STATE_CLOSE_WAIT,
114
115     /**
116      * State where the socket is closed
117      */
118     STATE_CLOSED 
119   };
120
121
122 /**
123  * Functions of this type are called when a message is written
124  *
125  * @param cls the closure from queue_message
126  * @param socket the socket the written message was bound to
127  */
128 typedef void (*SendFinishCallback) (void *cls,
129                                     struct GNUNET_STREAM_Socket *socket);
130
131
132 /**
133  * The send message queue
134  */
135 struct MessageQueue
136 {
137   /**
138    * The message
139    */
140   struct GNUNET_STREAM_MessageHeader *message;
141
142   /**
143    * Callback to be called when the message is sent
144    */
145   SendFinishCallback finish_cb;
146
147   /**
148    * The closure for finish_cb
149    */
150   void *finish_cb_cls;
151
152   /**
153    * The next message in queue. Should be NULL in the last message
154    */
155   struct MessageQueue *next;
156
157   /**
158    * The next message in queue. Should be NULL in the first message
159    */
160   struct MessageQueue *prev;
161 };
162
163
164 /**
165  * The STREAM Socket Handler
166  */
167 struct GNUNET_STREAM_Socket
168 {
169   /**
170    * Retransmission timeout
171    */
172   struct GNUNET_TIME_Relative retransmit_timeout;
173
174   /**
175    * The Acknowledgement Bitmap
176    */
177   GNUNET_STREAM_AckBitmap ack_bitmap;
178
179   /**
180    * Time when the Acknowledgement was queued
181    */
182   struct GNUNET_TIME_Absolute ack_time_registered;
183
184   /**
185    * Queued Acknowledgement deadline
186    */
187   struct GNUNET_TIME_Relative ack_time_deadline;
188
189   /**
190    * The mesh handle
191    */
192   struct GNUNET_MESH_Handle *mesh;
193
194   /**
195    * The mesh tunnel handle
196    */
197   struct GNUNET_MESH_Tunnel *tunnel;
198
199   /**
200    * Stream open closure
201    */
202   void *open_cls;
203
204   /**
205    * Stream open callback
206    */
207   GNUNET_STREAM_OpenCallback open_cb;
208
209   /**
210    * The current transmit handle (if a pending transmit request exists)
211    */
212   struct GNUNET_MESH_TransmitHandle *transmit_handle;
213
214   /**
215    * The current message associated with the transmit handle
216    */
217   struct MessageQueue *queue_head;
218
219   /**
220    * The queue tail, should always point to the last message in queue
221    */
222   struct MessageQueue *queue_tail;
223
224   /**
225    * The write IO_handle associated with this socket
226    */
227   struct GNUNET_STREAM_IOWriteHandle *write_handle;
228
229   /**
230    * The read IO_handle associated with this socket
231    */
232   struct GNUNET_STREAM_IOReadHandle *read_handle;
233
234   /**
235    * The shutdown handle associated with this socket
236    */
237   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
238
239   /**
240    * Buffer for storing received messages
241    */
242   void *receive_buffer;
243
244   /**
245    * The listen socket from which this socket is derived. Should be NULL if it
246    * is not a derived socket
247    */
248   struct GNUNET_STREAM_ListenSocket *lsocket;
249
250   /**
251    * Task identifier for the read io timeout task
252    */
253   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
254
255   /**
256    * Task identifier for retransmission task after timeout
257    */
258   GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
259
260   /**
261    * The task for sending timely Acks
262    */
263   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
264
265   /**
266    * Task scheduled to continue a read operation.
267    */
268   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
269
270   /**
271    * The state of the protocol associated with this socket
272    */
273   enum State state;
274
275   /**
276    * The status of the socket
277    */
278   enum GNUNET_STREAM_Status status;
279
280   /**
281    * The number of previous timeouts; FIXME: currently not used
282    */
283   unsigned int retries;
284
285   /**
286    * The peer identity of the peer at the other end of the stream
287    */
288   GNUNET_PEER_Id other_peer;
289
290   /**
291    * Our Peer Identity (for debugging)
292    */
293   GNUNET_PEER_Id our_id;
294
295   /**
296    * The application port number (type: uint32_t)
297    */
298   GNUNET_MESH_ApplicationType app_port;
299
300   /**
301    * The session id associated with this stream connection
302    * FIXME: Not used currently, may be removed
303    */
304   uint32_t session_id;
305
306   /**
307    * Write sequence number. Set to random when sending HELLO(client) and
308    * HELLO_ACK(server) 
309    */
310   uint32_t write_sequence_number;
311
312   /**
313    * Read sequence number. This number's value is determined during handshake
314    */
315   uint32_t read_sequence_number;
316
317   /**
318    * The receiver buffer size
319    */
320   uint32_t receive_buffer_size;
321
322   /**
323    * The receiver buffer boundaries
324    */
325   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
326
327   /**
328    * receiver's available buffer after the last acknowledged packet
329    */
330   uint32_t receiver_window_available;
331
332   /**
333    * The offset pointer used during write operation
334    */
335   uint32_t write_offset;
336
337   /**
338    * The offset after which we are expecting data
339    */
340   uint32_t read_offset;
341
342   /**
343    * The offset upto which user has read from the received buffer
344    */
345   uint32_t copy_offset;
346 };
347
348
349 /**
350  * A socket for listening
351  */
352 struct GNUNET_STREAM_ListenSocket
353 {
354   /**
355    * The mesh handle
356    */
357   struct GNUNET_MESH_Handle *mesh;
358
359   /**
360    * The callback function which is called after successful opening socket
361    */
362   GNUNET_STREAM_ListenCallback listen_cb;
363
364   /**
365    * The call back closure
366    */
367   void *listen_cb_cls;
368
369   /**
370    * Our interned Peer's identity
371    */
372   GNUNET_PEER_Id our_id;
373
374   /**
375    * The service port
376    * FIXME: Remove if not required!
377    */
378   GNUNET_MESH_ApplicationType port;
379 };
380
381
382 /**
383  * The IO Write Handle
384  */
385 struct GNUNET_STREAM_IOWriteHandle
386 {
387   /**
388    * The socket to which this write handle is associated
389    */
390   struct GNUNET_STREAM_Socket *socket;
391
392   /**
393    * The packet_buffers associated with this Handle
394    */
395   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
396
397   /**
398    * The write continuation callback
399    */
400   GNUNET_STREAM_CompletionContinuation write_cont;
401
402   /**
403    * Write continuation closure
404    */
405   void *write_cont_cls;
406
407   /**
408    * The bitmap of this IOHandle; Corresponding bit for a message is set when
409    * it has been acknowledged by the receiver
410    */
411   GNUNET_STREAM_AckBitmap ack_bitmap;
412
413   /**
414    * Number of bytes in this write handle
415    */
416   size_t size;
417 };
418
419
420 /**
421  * The IO Read Handle
422  */
423 struct GNUNET_STREAM_IOReadHandle
424 {
425   /**
426    * Callback for the read processor
427    */
428   GNUNET_STREAM_DataProcessor proc;
429
430   /**
431    * The closure pointer for the read processor callback
432    */
433   void *proc_cls;
434 };
435
436
437 /**
438  * Handle for Shutdown
439  */
440 struct GNUNET_STREAM_ShutdownHandle
441 {
442   /**
443    * The socket associated with this shutdown handle
444    */
445   struct GNUNET_STREAM_Socket *socket;
446
447   /**
448    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
449    */
450   int operation;
451
452   /**
453    * Shutdown completion callback
454    */
455   GNUNET_STREAM_ShutdownCompletion completion_cb;
456
457   /**
458    * Closure for completion callback
459    */
460   void *completion_cls;
461 };
462
463
464 /**
465  * Default value in seconds for various timeouts
466  */
467 static unsigned int default_timeout = 10;
468
469
470 /**
471  * Callback function for sending queued message
472  *
473  * @param cls closure the socket
474  * @param size number of bytes available in buf
475  * @param buf where the callee should write the message
476  * @return number of bytes written to buf
477  */
478 static size_t
479 send_message_notify (void *cls, size_t size, void *buf)
480 {
481   struct GNUNET_STREAM_Socket *socket = cls;
482   struct GNUNET_PeerIdentity target;
483   struct MessageQueue *head;
484   size_t ret;
485
486   socket->transmit_handle = NULL; /* Remove the transmit handle */
487   head = socket->queue_head;
488   if (NULL == head)
489     return 0; /* just to be safe */
490   GNUNET_PEER_resolve (socket->other_peer, &target);
491   if (0 == size)                /* request timed out */
492     {
493       socket->retries++;
494       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
495                   "Message sending timed out. Retry %d \n",
496                   socket->retries);
497       socket->transmit_handle = 
498         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
499                                            0, /* Corking */
500                                            1, /* Priority */
501                                            /* FIXME: exponential backoff */
502                                            socket->retransmit_timeout,
503                                            &target,
504                                            ntohs (head->message->header.size),
505                                            &send_message_notify,
506                                            socket);
507       return 0;
508     }
509
510   ret = ntohs (head->message->header.size);
511   GNUNET_assert (size >= ret);
512   memcpy (buf, head->message, ret);
513   if (NULL != head->finish_cb)
514     {
515       head->finish_cb (head->finish_cb_cls, socket);
516     }
517   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
518                                socket->queue_tail,
519                                head);
520   GNUNET_free (head->message);
521   GNUNET_free (head);
522   head = socket->queue_head;
523   if (NULL != head)    /* more pending messages to send */
524     {
525       socket->retries = 0;
526       socket->transmit_handle = 
527         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
528                                            0, /* Corking */
529                                            1, /* Priority */
530                                            /* FIXME: exponential backoff */
531                                            socket->retransmit_timeout,
532                                            &target,
533                                            ntohs (head->message->header.size),
534                                            &send_message_notify,
535                                            socket);
536     }
537   return ret;
538 }
539
540
541 /**
542  * Queues a message for sending using the mesh connection of a socket
543  *
544  * @param socket the socket whose mesh connection is used
545  * @param message the message to be sent
546  * @param finish_cb the callback to be called when the message is sent
547  * @param finish_cb_cls the closure for the callback
548  */
549 static void
550 queue_message (struct GNUNET_STREAM_Socket *socket,
551                struct GNUNET_STREAM_MessageHeader *message,
552                SendFinishCallback finish_cb,
553                void *finish_cb_cls)
554 {
555   struct MessageQueue *queue_entity;
556   struct GNUNET_PeerIdentity target;
557
558   GNUNET_assert 
559     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
560      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
561
562   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
563               "%x: Queueing message of type %d and size %d\n",
564               socket->our_id,
565               ntohs (message->header.type),
566               ntohs (message->header.size));
567   GNUNET_assert (NULL != message);
568   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
569   queue_entity->message = message;
570   queue_entity->finish_cb = finish_cb;
571   queue_entity->finish_cb_cls = finish_cb_cls;
572   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
573                                     socket->queue_tail,
574                                     queue_entity);
575   if (NULL == socket->transmit_handle)
576   {
577     socket->retries = 0;
578     GNUNET_PEER_resolve (socket->other_peer, &target);
579     socket->transmit_handle = 
580       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
581                                          0, /* Corking */
582                                          1, /* Priority */
583                                          socket->retransmit_timeout,
584                                          &target,
585                                          ntohs (message->header.size),
586                                          &send_message_notify,
587                                          socket);
588   }
589 }
590
591
592 /**
593  * Copies a message and queues it for sending using the mesh connection of
594  * given socket 
595  *
596  * @param socket the socket whose mesh connection is used
597  * @param message the message to be sent
598  * @param finish_cb the callback to be called when the message is sent
599  * @param finish_cb_cls the closure for the callback
600  */
601 static void
602 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
603                         const struct GNUNET_STREAM_MessageHeader *message,
604                         SendFinishCallback finish_cb,
605                         void *finish_cb_cls)
606 {
607   struct GNUNET_STREAM_MessageHeader *msg_copy;
608   uint16_t size;
609   
610   size = ntohs (message->header.size);
611   msg_copy = GNUNET_malloc (size);
612   memcpy (msg_copy, message, size);
613   queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
614 }
615
616
617 /**
618  * Callback function for sending ack message
619  *
620  * @param cls closure the ACK message created in ack_task
621  * @param size number of bytes available in buffer
622  * @param buf where the callee should write the message
623  * @return number of bytes written to buf
624  */
625 static size_t
626 send_ack_notify (void *cls, size_t size, void *buf)
627 {
628   struct GNUNET_STREAM_AckMessage *ack_msg = cls;
629
630   if (0 == size)
631     {
632       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
633                   "%s called with size 0\n", __func__);
634       return 0;
635     }
636   GNUNET_assert (ntohs (ack_msg->header.header.size) <= size);
637   
638   size = ntohs (ack_msg->header.header.size);
639   memcpy (buf, ack_msg, size);
640   return size;
641 }
642
643 /**
644  * Writes data using the given socket. The amount of data written is limited by
645  * the receiver_window_size
646  *
647  * @param socket the socket to use
648  */
649 static void 
650 write_data (struct GNUNET_STREAM_Socket *socket);
651
652 /**
653  * Task for retransmitting data messages if they aren't ACK before their ack
654  * deadline 
655  *
656  * @param cls the socket
657  * @param tc the Task context
658  */
659 static void
660 retransmission_timeout_task (void *cls,
661                              const struct GNUNET_SCHEDULER_TaskContext *tc)
662 {
663   struct GNUNET_STREAM_Socket *socket = cls;
664   
665   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
666     return;
667
668   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
669               "%x: Retransmitting DATA...\n", socket->our_id);
670   socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
671   write_data (socket);
672 }
673
674
675 /**
676  * Task for sending ACK message
677  *
678  * @param cls the socket
679  * @param tc the Task context
680  */
681 static void
682 ack_task (void *cls,
683           const struct GNUNET_SCHEDULER_TaskContext *tc)
684 {
685   struct GNUNET_STREAM_Socket *socket = cls;
686   struct GNUNET_STREAM_AckMessage *ack_msg;
687   struct GNUNET_PeerIdentity target;
688
689   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
690     {
691       return;
692     }
693
694   socket->ack_task_id = 0;
695
696   /* Create the ACK Message */
697   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
698   ack_msg->header.header.size = htons (sizeof (struct 
699                                                GNUNET_STREAM_AckMessage));
700   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
701   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
702   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
703   ack_msg->receive_window_remaining = 
704     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
705
706   GNUNET_PEER_resolve (socket->other_peer, &target);
707   /* Request MESH for sending ACK */
708   GNUNET_MESH_notify_transmit_ready (socket->tunnel,
709                                      0, /* Corking */
710                                      1, /* Priority */
711                                      socket->retransmit_timeout,
712                                      &target,
713                                      ntohs (ack_msg->header.header.size),
714                                      &send_ack_notify,
715                                      ack_msg);
716
717   
718 }
719
720
721 /**
722  * Function to modify a bit in GNUNET_STREAM_AckBitmap
723  *
724  * @param bitmap the bitmap to modify
725  * @param bit the bit number to modify
726  * @param value GNUNET_YES to on, GNUNET_NO to off
727  */
728 static void
729 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
730                       unsigned int bit, 
731                       int value)
732 {
733   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
734   if (GNUNET_YES == value)
735     *bitmap |= (1LL << bit);
736   else
737     *bitmap &= ~(1LL << bit);
738 }
739
740
741 /**
742  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
743  *
744  * @param bitmap address of the bitmap that has to be checked
745  * @param bit the bit number to check
746  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
747  */
748 static uint8_t
749 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
750                       unsigned int bit)
751 {
752   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
753   return 0 != (*bitmap & (1LL << bit));
754 }
755
756
757 /**
758  * Writes data using the given socket. The amount of data written is limited by
759  * the receiver_window_size
760  *
761  * @param socket the socket to use
762  */
763 static void 
764 write_data (struct GNUNET_STREAM_Socket *socket)
765 {
766   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
767   int packet;                   /* Although an int, should never be negative */
768   int ack_packet;
769
770   ack_packet = -1;
771   /* Find the last acknowledged packet */
772   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
773     {      
774       if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
775                                               packet))
776         ack_packet = packet;        
777       else if (NULL == io_handle->messages[packet])
778         break;
779     }
780   /* Resend packets which weren't ack'ed */
781   for (packet=0; packet < ack_packet; packet++)
782     {
783       if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
784                                              packet))
785         {
786           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
787                       "%x: Placing DATA message with sequence %u in send queue\n",
788                       socket->our_id,
789                       ntohl (io_handle->messages[packet]->sequence_number));
790
791           copy_and_queue_message (socket,
792                                   &io_handle->messages[packet]->header,
793                                   NULL,
794                                   NULL);
795         }
796     }
797   packet = ack_packet + 1;
798   /* Now send new packets if there is enough buffer space */
799   while ( (NULL != io_handle->messages[packet]) &&
800           (socket->receiver_window_available 
801            >= ntohs (io_handle->messages[packet]->header.header.size)) )
802     {
803       socket->receiver_window_available -= 
804         ntohs (io_handle->messages[packet]->header.header.size);
805       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
806                   "%x: Placing DATA message with sequence %u in send queue\n",
807                   socket->our_id,
808                   ntohl (io_handle->messages[packet]->sequence_number));
809       copy_and_queue_message (socket,
810                               &io_handle->messages[packet]->header,
811                               NULL,
812                               NULL);
813       packet++;
814     }
815
816   if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
817     socket->retransmission_timeout_task_id = 
818       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
819                                     (GNUNET_TIME_UNIT_SECONDS, 8),
820                                     &retransmission_timeout_task,
821                                     socket);
822 }
823
824
825 /**
826  * Task for calling the read processor
827  *
828  * @param cls the socket
829  * @param tc the task context
830  */
831 static void
832 call_read_processor (void *cls,
833                      const struct GNUNET_SCHEDULER_TaskContext *tc)
834 {
835   struct GNUNET_STREAM_Socket *socket = cls;
836   size_t read_size;
837   size_t valid_read_size;
838   unsigned int packet;
839   uint32_t sequence_increase;
840   uint32_t offset_increase;
841
842   socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
843   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
844     return;
845
846   if (NULL == socket->receive_buffer) 
847     return;
848
849   GNUNET_assert (NULL != socket->read_handle);
850   GNUNET_assert (NULL != socket->read_handle->proc);
851
852   /* Check the bitmap for any holes */
853   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
854     {
855       if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
856                                              packet))
857         break;
858     }
859   /* We only call read processor if we have the first packet */
860   GNUNET_assert (0 < packet);
861
862   valid_read_size = 
863     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
864
865   GNUNET_assert (0 != valid_read_size);
866
867   /* Cancel the read_io_timeout_task */
868   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
869   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
870
871   /* Call the data processor */
872   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
873               "%x: Calling read processor\n",
874               socket->our_id);
875   read_size = 
876     socket->read_handle->proc (socket->read_handle->proc_cls,
877                                socket->status,
878                                socket->receive_buffer + socket->copy_offset,
879                                valid_read_size);
880   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
881               "%x: Read processor read %d bytes\n",
882               socket->our_id,
883               read_size);
884   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
885               "%x: Read processor completed successfully\n",
886               socket->our_id);
887
888   /* Free the read handle */
889   GNUNET_free (socket->read_handle);
890   socket->read_handle = NULL;
891
892   GNUNET_assert (read_size <= valid_read_size);
893   socket->copy_offset += read_size;
894
895   /* Determine upto which packet we can remove from the buffer */
896   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
897     {
898       if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
899         { packet++; break; }
900       if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
901         break;
902     }
903
904   /* If no packets can be removed we can't move the buffer */
905   if (0 == packet) return;
906
907   sequence_increase = packet;
908   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
909               "%x: Sequence increase after read processor completion: %u\n",
910               socket->our_id,
911               sequence_increase);
912
913   /* Shift the data in the receive buffer */
914   memmove (socket->receive_buffer,
915            socket->receive_buffer 
916            + socket->receive_buffer_boundaries[sequence_increase-1],
917            socket->receive_buffer_size
918            - socket->receive_buffer_boundaries[sequence_increase-1]);
919   
920   /* Shift the bitmap */
921   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
922   
923   /* Set read_sequence_number */
924   socket->read_sequence_number += sequence_increase;
925   
926   /* Set read_offset */
927   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
928   socket->read_offset += offset_increase;
929
930   /* Fix copy_offset */
931   GNUNET_assert (offset_increase <= socket->copy_offset);
932   socket->copy_offset -= offset_increase;
933   
934   /* Fix relative boundaries */
935   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
936     {
937       if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
938         {
939           socket->receive_buffer_boundaries[packet] = 
940             socket->receive_buffer_boundaries[packet + sequence_increase] 
941             - offset_increase;
942         }
943       else
944         socket->receive_buffer_boundaries[packet] = 0;
945     }
946 }
947
948
949 /**
950  * Cancels the existing read io handle
951  *
952  * @param cls the closure from the SCHEDULER call
953  * @param tc the task context
954  */
955 static void
956 read_io_timeout (void *cls, 
957                 const struct GNUNET_SCHEDULER_TaskContext *tc)
958 {
959   struct GNUNET_STREAM_Socket *socket = cls;
960   GNUNET_STREAM_DataProcessor proc;
961   void *proc_cls;
962
963   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
964   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
965   {
966     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
967                 "%x: Read task timedout - Cancelling it\n",
968                 socket->our_id);
969     GNUNET_SCHEDULER_cancel (socket->read_task_id);
970     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
971   }
972   GNUNET_assert (NULL != socket->read_handle);
973   proc = socket->read_handle->proc;
974   proc_cls = socket->read_handle->proc_cls;
975
976   GNUNET_free (socket->read_handle);
977   socket->read_handle = NULL;
978   /* Call the read processor to signal timeout */
979   proc (proc_cls,
980         GNUNET_STREAM_TIMEOUT,
981         NULL,
982         0);
983 }
984
985
986 /**
987  * Handler for DATA messages; Same for both client and server
988  *
989  * @param socket the socket through which the ack was received
990  * @param tunnel connection to the other end
991  * @param sender who sent the message
992  * @param msg the data message
993  * @param atsi performance data for the connection
994  * @return GNUNET_OK to keep the connection open,
995  *         GNUNET_SYSERR to close it (signal serious error)
996  */
997 static int
998 handle_data (struct GNUNET_STREAM_Socket *socket,
999              struct GNUNET_MESH_Tunnel *tunnel,
1000              const struct GNUNET_PeerIdentity *sender,
1001              const struct GNUNET_STREAM_DataMessage *msg,
1002              const struct GNUNET_ATS_Information*atsi)
1003 {
1004   const void *payload;
1005   uint32_t bytes_needed;
1006   uint32_t relative_offset;
1007   uint32_t relative_sequence_number;
1008   uint16_t size;
1009
1010   size = htons (msg->header.header.size);
1011   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1012     {
1013       GNUNET_break_op (0);
1014       return GNUNET_SYSERR;
1015     }
1016
1017   if (GNUNET_PEER_search (sender) != socket->other_peer)
1018     {
1019       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1020                   "%x: Received DATA from non-confirming peer\n",
1021                   socket->our_id);
1022       return GNUNET_YES;
1023     }
1024
1025   switch (socket->state)
1026     {
1027     case STATE_ESTABLISHED:
1028     case STATE_TRANSMIT_CLOSED:
1029     case STATE_TRANSMIT_CLOSE_WAIT:
1030       
1031       /* check if the message's sequence number is in the range we are
1032          expecting */
1033       relative_sequence_number = 
1034         ntohl (msg->sequence_number) - socket->read_sequence_number;
1035       if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1036         {
1037           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1038                       "%x: Ignoring received message with sequence number %u\n",
1039                       socket->our_id,
1040                       ntohl (msg->sequence_number));
1041           /* Start ACK sending task if one is not already present */
1042           if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1043             {
1044               socket->ack_task_id = 
1045                 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1046                                               (msg->ack_deadline),
1047                                               &ack_task,
1048                                               socket);
1049             }
1050           return GNUNET_YES;
1051         }
1052       
1053       /* Check if we have already seen this message */
1054       if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1055                                               relative_sequence_number))
1056         {
1057           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1058                       "%x: Ignoring already received message with sequence "
1059                       "number %u\n",
1060                       socket->our_id,
1061                       ntohl (msg->sequence_number));
1062           /* Start ACK sending task if one is not already present */
1063           if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1064             {
1065               socket->ack_task_id = 
1066                 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1067                                               (msg->ack_deadline),
1068                                               &ack_task,
1069                                               socket);
1070             }
1071           return GNUNET_YES;
1072         }
1073
1074       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1075                   "%x: Receiving DATA with sequence number: %u and size: %d "
1076                   "from %x\n",
1077                   socket->our_id,
1078                   ntohl (msg->sequence_number),
1079                   ntohs (msg->header.header.size),
1080                   socket->other_peer);
1081
1082       /* Check if we have to allocate the buffer */
1083       size -= sizeof (struct GNUNET_STREAM_DataMessage);
1084       relative_offset = ntohl (msg->offset) - socket->read_offset;
1085       bytes_needed = relative_offset + size;
1086       if (bytes_needed > socket->receive_buffer_size)
1087         {
1088           if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1089             {
1090               socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1091                                                        bytes_needed);
1092               socket->receive_buffer_size = bytes_needed;
1093             }
1094           else
1095             {
1096               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1097                           "%x: Cannot accommodate packet %d as buffer is",
1098                           "full\n",
1099                           socket->our_id,
1100                           ntohl (msg->sequence_number));
1101               return GNUNET_YES;
1102             }
1103         }
1104       
1105       /* Copy Data to buffer */
1106       payload = &msg[1];
1107       GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1108       memcpy (socket->receive_buffer + relative_offset,
1109               payload,
1110               size);
1111       socket->receive_buffer_boundaries[relative_sequence_number] = 
1112         relative_offset + size;
1113       
1114       /* Modify the ACK bitmap */
1115       ackbitmap_modify_bit (&socket->ack_bitmap,
1116                             relative_sequence_number,
1117                             GNUNET_YES);
1118
1119       /* Start ACK sending task if one is not already present */
1120       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1121        {
1122          socket->ack_task_id = 
1123            GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1124                                          (msg->ack_deadline),
1125                                          &ack_task,
1126                                          socket);
1127        }
1128
1129       if ((NULL != socket->read_handle) /* A read handle is waiting */
1130           /* There is no current read task */
1131           && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1132           /* We have the first packet */
1133           && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1134                                                  0)))
1135         {
1136           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1137                       "%x: Scheduling read processor\n",
1138                       socket->our_id);
1139
1140           socket->read_task_id = 
1141             GNUNET_SCHEDULER_add_now (&call_read_processor,
1142                                       socket);
1143         }
1144       
1145       break;
1146
1147     default:
1148       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1149                   "%x: Received data message when it cannot be handled\n",
1150                   socket->our_id);
1151       break;
1152     }
1153   return GNUNET_YES;
1154 }
1155
1156
1157 /**
1158  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1159  *
1160  * @param cls the socket (set from GNUNET_MESH_connect)
1161  * @param tunnel connection to the other end
1162  * @param tunnel_ctx place to store local state associated with the tunnel
1163  * @param sender who sent the message
1164  * @param message the actual message
1165  * @param atsi performance data for the connection
1166  * @return GNUNET_OK to keep the connection open,
1167  *         GNUNET_SYSERR to close it (signal serious error)
1168  */
1169 static int
1170 client_handle_data (void *cls,
1171              struct GNUNET_MESH_Tunnel *tunnel,
1172              void **tunnel_ctx,
1173              const struct GNUNET_PeerIdentity *sender,
1174              const struct GNUNET_MessageHeader *message,
1175              const struct GNUNET_ATS_Information*atsi)
1176 {
1177   struct GNUNET_STREAM_Socket *socket = cls;
1178
1179   return handle_data (socket, 
1180                       tunnel, 
1181                       sender, 
1182                       (const struct GNUNET_STREAM_DataMessage *) message, 
1183                       atsi);
1184 }
1185
1186
1187 /**
1188  * Callback to set state to ESTABLISHED
1189  *
1190  * @param cls the closure from queue_message FIXME: document
1191  * @param socket the socket to requiring state change
1192  */
1193 static void
1194 set_state_established (void *cls,
1195                        struct GNUNET_STREAM_Socket *socket)
1196 {
1197   struct GNUNET_PeerIdentity initiator_pid;
1198
1199   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1200               "%x: Attaining ESTABLISHED state\n",
1201               socket->our_id);
1202   socket->write_offset = 0;
1203   socket->read_offset = 0;
1204   socket->state = STATE_ESTABLISHED;
1205   /* FIXME: What if listen_cb is NULL */
1206   if (NULL != socket->lsocket)
1207     {
1208       GNUNET_PEER_resolve (socket->other_peer, &initiator_pid);
1209       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1210                   "%x: Calling listen callback\n",
1211                   socket->our_id);
1212       if (GNUNET_SYSERR == 
1213           socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1214                                       socket,
1215                                       &initiator_pid))
1216         {
1217           socket->state = STATE_CLOSED;
1218           /* FIXME: We should close in a decent way */
1219           GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1220           GNUNET_free (socket);
1221         }
1222     }
1223   else if (socket->open_cb)
1224     socket->open_cb (socket->open_cls, socket);
1225 }
1226
1227
1228 /**
1229  * Callback to set state to HELLO_WAIT
1230  *
1231  * @param cls the closure from queue_message
1232  * @param socket the socket to requiring state change
1233  */
1234 static void
1235 set_state_hello_wait (void *cls,
1236                       struct GNUNET_STREAM_Socket *socket)
1237 {
1238   GNUNET_assert (STATE_INIT == socket->state);
1239   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1240               "%x: Attaining HELLO_WAIT state\n",
1241               socket->our_id);
1242   socket->state = STATE_HELLO_WAIT;
1243 }
1244
1245
1246 /**
1247  * Callback to set state to CLOSE_WAIT
1248  *
1249  * @param cls the closure from queue_message
1250  * @param socket the socket requiring state change
1251  */
1252 static void
1253 set_state_close_wait (void *cls,
1254                       struct GNUNET_STREAM_Socket *socket)
1255 {
1256   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1257               "%x: Attaing CLOSE_WAIT state\n",
1258               socket->our_id);
1259   socket->state = STATE_CLOSE_WAIT;
1260   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1261   socket->receive_buffer = NULL;
1262   socket->receive_buffer_size = 0;
1263 }
1264
1265
1266 /**
1267  * Callback to set state to CLOSED
1268  *
1269  * @param cls the closure from queue_message
1270  * @param socket the socket requiring state change
1271  */
1272 static void
1273 set_state_closed (void *cls,
1274                   struct GNUNET_STREAM_Socket *socket)
1275 {
1276   socket->state = STATE_CLOSED;
1277 }
1278
1279 /**
1280  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1281  * socket
1282  *
1283  * @param socket the socket for which this HelloAckMessage has to be generated
1284  * @return the HelloAckMessage
1285  */
1286 static struct GNUNET_STREAM_HelloAckMessage *
1287 generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket)
1288 {
1289   struct GNUNET_STREAM_HelloAckMessage *msg;
1290
1291   /* Get the random sequence number */
1292   socket->write_sequence_number = 
1293     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1294   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1295               "%x: Generated write sequence number %u\n",
1296               socket->our_id,
1297               (unsigned int) socket->write_sequence_number);
1298   
1299   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1300   msg->header.header.size = 
1301     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1302   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1303   msg->sequence_number = htonl (socket->write_sequence_number);
1304   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1305
1306   return msg;
1307 }
1308
1309
1310 /**
1311  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1312  *
1313  * @param cls the socket (set from GNUNET_MESH_connect)
1314  * @param tunnel connection to the other end
1315  * @param tunnel_ctx this is NULL
1316  * @param sender who sent the message
1317  * @param message the actual message
1318  * @param atsi performance data for the connection
1319  * @return GNUNET_OK to keep the connection open,
1320  *         GNUNET_SYSERR to close it (signal serious error)
1321  */
1322 static int
1323 client_handle_hello_ack (void *cls,
1324                          struct GNUNET_MESH_Tunnel *tunnel,
1325                          void **tunnel_ctx,
1326                          const struct GNUNET_PeerIdentity *sender,
1327                          const struct GNUNET_MessageHeader *message,
1328                          const struct GNUNET_ATS_Information*atsi)
1329 {
1330   struct GNUNET_STREAM_Socket *socket = cls;
1331   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1332   struct GNUNET_STREAM_HelloAckMessage *reply;
1333
1334   if (GNUNET_PEER_search (sender) != socket->other_peer)
1335     {
1336       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1337                   "%x: Received HELLO_ACK from non-confirming peer\n",
1338                   socket->our_id);
1339       return GNUNET_YES;
1340     }
1341   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1342   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1343               "%x: Received HELLO_ACK from %x\n",
1344               socket->our_id,
1345               socket->other_peer);
1346
1347   GNUNET_assert (socket->tunnel == tunnel);
1348   switch (socket->state)
1349   {
1350   case STATE_HELLO_WAIT:
1351     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1352     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1353                 "%x: Read sequence number %u\n",
1354                 socket->our_id,
1355                 (unsigned int) socket->read_sequence_number);
1356     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1357     reply = generate_hello_ack_msg (socket);
1358     queue_message (socket,
1359                    &reply->header, 
1360                    &set_state_established, 
1361                    NULL);      
1362     return GNUNET_OK;
1363   case STATE_ESTABLISHED:
1364   case STATE_RECEIVE_CLOSE_WAIT:
1365     // call statistics (# ACKs ignored++)
1366     return GNUNET_OK;
1367   case STATE_INIT:
1368   default:
1369     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1370                 "%x: Server %x sent HELLO_ACK when in state %d\n", 
1371                 socket->our_id,
1372                 socket->other_peer,
1373                 socket->state);
1374     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1375     return GNUNET_SYSERR;
1376   }
1377
1378 }
1379
1380
1381 /**
1382  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1383  *
1384  * @param cls the socket (set from GNUNET_MESH_connect)
1385  * @param tunnel connection to the other end
1386  * @param tunnel_ctx this is NULL
1387  * @param sender who sent the message
1388  * @param message the actual message
1389  * @param atsi performance data for the connection
1390  * @return GNUNET_OK to keep the connection open,
1391  *         GNUNET_SYSERR to close it (signal serious error)
1392  */
1393 static int
1394 client_handle_reset (void *cls,
1395                      struct GNUNET_MESH_Tunnel *tunnel,
1396                      void **tunnel_ctx,
1397                      const struct GNUNET_PeerIdentity *sender,
1398                      const struct GNUNET_MessageHeader *message,
1399                      const struct GNUNET_ATS_Information*atsi)
1400 {
1401   struct GNUNET_STREAM_Socket *socket = cls;
1402
1403   return GNUNET_OK;
1404 }
1405
1406
1407 /**
1408  * Common message handler for handling TRANSMIT_CLOSE messages
1409  *
1410  * @param socket the socket through which the ack was received
1411  * @param tunnel connection to the other end
1412  * @param sender who sent the message
1413  * @param msg the transmit close message
1414  * @param atsi performance data for the connection
1415  * @return GNUNET_OK to keep the connection open,
1416  *         GNUNET_SYSERR to close it (signal serious error)
1417  */
1418 static int
1419 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1420                        struct GNUNET_MESH_Tunnel *tunnel,
1421                        const struct GNUNET_PeerIdentity *sender,
1422                        const struct GNUNET_STREAM_MessageHeader *msg,
1423                        const struct GNUNET_ATS_Information*atsi)
1424 {
1425   struct GNUNET_STREAM_MessageHeader *reply;
1426
1427   switch (socket->state)
1428     {
1429     case STATE_ESTABLISHED:
1430       socket->state = STATE_RECEIVE_CLOSED;
1431
1432       /* Send TRANSMIT_CLOSE_ACK */
1433       reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1434       reply->header.type = 
1435         htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1436       reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1437       queue_message (socket, reply, NULL, NULL);
1438       break;
1439
1440     default:
1441       /* FIXME: Call statistics? */
1442       break;
1443     }
1444   return GNUNET_YES;
1445 }
1446
1447
1448 /**
1449  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1450  *
1451  * @param cls the socket (set from GNUNET_MESH_connect)
1452  * @param tunnel connection to the other end
1453  * @param tunnel_ctx this is NULL
1454  * @param sender who sent the message
1455  * @param message the actual message
1456  * @param atsi performance data for the connection
1457  * @return GNUNET_OK to keep the connection open,
1458  *         GNUNET_SYSERR to close it (signal serious error)
1459  */
1460 static int
1461 client_handle_transmit_close (void *cls,
1462                               struct GNUNET_MESH_Tunnel *tunnel,
1463                               void **tunnel_ctx,
1464                               const struct GNUNET_PeerIdentity *sender,
1465                               const struct GNUNET_MessageHeader *message,
1466                               const struct GNUNET_ATS_Information*atsi)
1467 {
1468   struct GNUNET_STREAM_Socket *socket = cls;
1469   
1470   return handle_transmit_close (socket,
1471                                 tunnel,
1472                                 sender,
1473                                 (struct GNUNET_STREAM_MessageHeader *)message,
1474                                 atsi);
1475 }
1476
1477
1478 /**
1479  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1480  *
1481  * @param cls the socket (set from GNUNET_MESH_connect)
1482  * @param tunnel connection to the other end
1483  * @param tunnel_ctx this is NULL
1484  * @param sender who sent the message
1485  * @param message the actual message
1486  * @param atsi performance data for the connection
1487  * @return GNUNET_OK to keep the connection open,
1488  *         GNUNET_SYSERR to close it (signal serious error)
1489  */
1490 static int
1491 client_handle_transmit_close_ack (void *cls,
1492                                   struct GNUNET_MESH_Tunnel *tunnel,
1493                                   void **tunnel_ctx,
1494                                   const struct GNUNET_PeerIdentity *sender,
1495                                   const struct GNUNET_MessageHeader *message,
1496                                   const struct GNUNET_ATS_Information*atsi)
1497 {
1498   struct GNUNET_STREAM_Socket *socket = cls;
1499
1500   return GNUNET_OK;
1501 }
1502
1503
1504 /**
1505  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1506  *
1507  * @param cls the socket (set from GNUNET_MESH_connect)
1508  * @param tunnel connection to the other end
1509  * @param tunnel_ctx this is NULL
1510  * @param sender who sent the message
1511  * @param message the actual message
1512  * @param atsi performance data for the connection
1513  * @return GNUNET_OK to keep the connection open,
1514  *         GNUNET_SYSERR to close it (signal serious error)
1515  */
1516 static int
1517 client_handle_receive_close (void *cls,
1518                              struct GNUNET_MESH_Tunnel *tunnel,
1519                              void **tunnel_ctx,
1520                              const struct GNUNET_PeerIdentity *sender,
1521                              const struct GNUNET_MessageHeader *message,
1522                              const struct GNUNET_ATS_Information*atsi)
1523 {
1524   struct GNUNET_STREAM_Socket *socket = cls;
1525
1526   return GNUNET_OK;
1527 }
1528
1529
1530 /**
1531  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1532  *
1533  * @param cls the socket (set from GNUNET_MESH_connect)
1534  * @param tunnel connection to the other end
1535  * @param tunnel_ctx this is NULL
1536  * @param sender who sent the message
1537  * @param message the actual message
1538  * @param atsi performance data for the connection
1539  * @return GNUNET_OK to keep the connection open,
1540  *         GNUNET_SYSERR to close it (signal serious error)
1541  */
1542 static int
1543 client_handle_receive_close_ack (void *cls,
1544                                  struct GNUNET_MESH_Tunnel *tunnel,
1545                                  void **tunnel_ctx,
1546                                  const struct GNUNET_PeerIdentity *sender,
1547                                  const struct GNUNET_MessageHeader *message,
1548                                  const struct GNUNET_ATS_Information*atsi)
1549 {
1550   struct GNUNET_STREAM_Socket *socket = cls;
1551
1552   return GNUNET_OK;
1553 }
1554
1555
1556 /**
1557  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1558  *
1559  * @param socket the socket
1560  * @param tunnel connection to the other end
1561  * @param tunnel_ctx this is NULL
1562  * @param sender who sent the message
1563  * @param message the actual message
1564  * @param atsi performance data for the connection
1565  * @return GNUNET_OK to keep the connection open,
1566  *         GNUNET_SYSERR to close it (signal serious error)
1567  */
1568 static int
1569 handle_close (struct GNUNET_STREAM_Socket *socket,
1570               struct GNUNET_MESH_Tunnel *tunnel,
1571               const struct GNUNET_PeerIdentity *sender,
1572               const struct GNUNET_STREAM_MessageHeader *message,
1573               const struct GNUNET_ATS_Information*atsi)
1574 {
1575   struct GNUNET_STREAM_MessageHeader *close_ack;
1576
1577   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1578               "%x: Received CLOSE from %x\n",
1579               socket->our_id,
1580               socket->other_peer);
1581   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1582   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1583   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
1584   queue_message (socket,
1585                  close_ack,
1586                  &set_state_closed,
1587                  NULL);
1588   if (socket->state == STATE_CLOSED)
1589     return GNUNET_OK;
1590
1591   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1592   socket->receive_buffer = NULL;
1593   socket->receive_buffer_size = 0;
1594   return GNUNET_OK;
1595 }
1596
1597
1598 /**
1599  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1600  *
1601  * @param cls the socket (set from GNUNET_MESH_connect)
1602  * @param tunnel connection to the other end
1603  * @param tunnel_ctx this is NULL
1604  * @param sender who sent the message
1605  * @param message the actual message
1606  * @param atsi performance data for the connection
1607  * @return GNUNET_OK to keep the connection open,
1608  *         GNUNET_SYSERR to close it (signal serious error)
1609  */
1610 static int
1611 client_handle_close (void *cls,
1612                      struct GNUNET_MESH_Tunnel *tunnel,
1613                      void **tunnel_ctx,
1614                      const struct GNUNET_PeerIdentity *sender,
1615                      const struct GNUNET_MessageHeader *message,
1616                      const struct GNUNET_ATS_Information*atsi)
1617 {
1618   struct GNUNET_STREAM_Socket *socket = cls;
1619
1620   return handle_close (socket,
1621                        tunnel,
1622                        sender,
1623                        (const struct GNUNET_STREAM_MessageHeader *) message,
1624                        atsi);
1625 }
1626
1627
1628 /**
1629  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1630  *
1631  * @param socket the socket
1632  * @param tunnel connection to the other end
1633  * @param tunnel_ctx this is NULL
1634  * @param sender who sent the message
1635  * @param message the actual message
1636  * @param atsi performance data for the connection
1637  * @return GNUNET_OK to keep the connection open,
1638  *         GNUNET_SYSERR to close it (signal serious error)
1639  */
1640 static int
1641 handle_close_ack (struct GNUNET_STREAM_Socket *socket,
1642                   struct GNUNET_MESH_Tunnel *tunnel,
1643                   const struct GNUNET_PeerIdentity *sender,
1644                   const struct GNUNET_STREAM_MessageHeader *message,
1645                   const struct GNUNET_ATS_Information*atsi)
1646 {
1647   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1648
1649   shutdown_handle = socket->shutdown_handle;
1650   switch (socket->state)
1651     {
1652     case STATE_CLOSE_WAIT:
1653       socket->state = STATE_CLOSED;
1654       if ( (NULL == shutdown_handle) ||
1655            (SHUT_RDWR != shutdown_handle->operation) )
1656         {
1657           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1658                       "%x: Received CLOSE_ACK when shutdown handle is NULL or "
1659                       "not for SHUT_RDWR\n",
1660                       socket->our_id);
1661           return GNUNET_OK;
1662         }
1663       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1664                   "%x: Received CLOSE_ACK from %x\n",
1665                   socket->our_id,
1666                   socket->other_peer);
1667       if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
1668         shutdown_handle->completion_cb(shutdown_handle->completion_cls,
1669                                        SHUT_RDWR);
1670       GNUNET_free (shutdown_handle); /* Free shutdown handle */
1671       break;
1672     default:
1673       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1674                   "%x: Received CLOSE_ACK when in it not expected\n",
1675                   socket->our_id);
1676       break;
1677     }
1678   return GNUNET_OK;
1679 }
1680
1681
1682 /**
1683  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1684  *
1685  * @param cls the socket (set from GNUNET_MESH_connect)
1686  * @param tunnel connection to the other end
1687  * @param tunnel_ctx this is NULL
1688  * @param sender who sent the message
1689  * @param message the actual message
1690  * @param atsi performance data for the connection
1691  * @return GNUNET_OK to keep the connection open,
1692  *         GNUNET_SYSERR to close it (signal serious error)
1693  */
1694 static int
1695 client_handle_close_ack (void *cls,
1696                          struct GNUNET_MESH_Tunnel *tunnel,
1697                          void **tunnel_ctx,
1698                          const struct GNUNET_PeerIdentity *sender,
1699                          const struct GNUNET_MessageHeader *message,
1700                          const struct GNUNET_ATS_Information*atsi)
1701 {
1702   struct GNUNET_STREAM_Socket *socket = cls;
1703
1704   return handle_close_ack (socket,
1705                            tunnel,
1706                            sender,
1707                            (const struct GNUNET_STREAM_MessageHeader *) 
1708                            message,
1709                            atsi);
1710 }
1711
1712 /*****************************/
1713 /* Server's Message Handlers */
1714 /*****************************/
1715
1716 /**
1717  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1718  *
1719  * @param cls the closure
1720  * @param tunnel connection to the other end
1721  * @param tunnel_ctx the socket
1722  * @param sender who sent the message
1723  * @param message the actual message
1724  * @param atsi performance data for the connection
1725  * @return GNUNET_OK to keep the connection open,
1726  *         GNUNET_SYSERR to close it (signal serious error)
1727  */
1728 static int
1729 server_handle_data (void *cls,
1730                     struct GNUNET_MESH_Tunnel *tunnel,
1731                     void **tunnel_ctx,
1732                     const struct GNUNET_PeerIdentity *sender,
1733                     const struct GNUNET_MessageHeader *message,
1734                     const struct GNUNET_ATS_Information*atsi)
1735 {
1736   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1737
1738   return handle_data (socket,
1739                       tunnel,
1740                       sender,
1741                       (const struct GNUNET_STREAM_DataMessage *)message,
1742                       atsi);
1743 }
1744
1745
1746 /**
1747  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
1748  *
1749  * @param cls the closure
1750  * @param tunnel connection to the other end
1751  * @param tunnel_ctx the socket
1752  * @param sender who sent the message
1753  * @param message the actual message
1754  * @param atsi performance data for the connection
1755  * @return GNUNET_OK to keep the connection open,
1756  *         GNUNET_SYSERR to close it (signal serious error)
1757  */
1758 static int
1759 server_handle_hello (void *cls,
1760                      struct GNUNET_MESH_Tunnel *tunnel,
1761                      void **tunnel_ctx,
1762                      const struct GNUNET_PeerIdentity *sender,
1763                      const struct GNUNET_MessageHeader *message,
1764                      const struct GNUNET_ATS_Information*atsi)
1765 {
1766   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1767   struct GNUNET_STREAM_HelloAckMessage *reply;
1768
1769   if (GNUNET_PEER_search (sender) != socket->other_peer)
1770     {
1771       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1772                   "%x: Received HELLO from non-confirming peer\n",
1773                   socket->our_id);
1774       return GNUNET_YES;
1775     }
1776
1777   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == 
1778                  ntohs (message->type));
1779   GNUNET_assert (socket->tunnel == tunnel);
1780   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1781               "%x: Received HELLO from %x\n", 
1782               socket->our_id,
1783               socket->other_peer);
1784
1785   if (STATE_INIT == socket->state)
1786     {
1787       reply = generate_hello_ack_msg (socket);
1788       queue_message (socket, 
1789                      &reply->header,
1790                      &set_state_hello_wait, 
1791                      NULL);
1792     }
1793   else
1794     {
1795       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1796                   "Client sent HELLO when in state %d\n", socket->state);
1797       /* FIXME: Send RESET? */
1798       
1799     }
1800   return GNUNET_OK;
1801 }
1802
1803
1804 /**
1805  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1806  *
1807  * @param cls the closure
1808  * @param tunnel connection to the other end
1809  * @param tunnel_ctx the socket
1810  * @param sender who sent the message
1811  * @param message the actual message
1812  * @param atsi performance data for the connection
1813  * @return GNUNET_OK to keep the connection open,
1814  *         GNUNET_SYSERR to close it (signal serious error)
1815  */
1816 static int
1817 server_handle_hello_ack (void *cls,
1818                          struct GNUNET_MESH_Tunnel *tunnel,
1819                          void **tunnel_ctx,
1820                          const struct GNUNET_PeerIdentity *sender,
1821                          const struct GNUNET_MessageHeader *message,
1822                          const struct GNUNET_ATS_Information*atsi)
1823 {
1824   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1825   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
1826
1827   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
1828                  ntohs (message->type));
1829   GNUNET_assert (socket->tunnel == tunnel);
1830   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
1831   if (STATE_HELLO_WAIT == socket->state)
1832     {
1833       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1834                   "%x: Received HELLO_ACK from %x\n",
1835                   socket->our_id,
1836                   socket->other_peer);
1837       socket->read_sequence_number = ntohl (ack_message->sequence_number);
1838       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1839                   "%x: Read sequence number %u\n",
1840                   socket->our_id,
1841                   (unsigned int) socket->read_sequence_number);
1842       socket->receiver_window_available = 
1843         ntohl (ack_message->receiver_window_size);
1844       /* Attain ESTABLISHED state */
1845       set_state_established (NULL, socket);
1846     }
1847   else
1848     {
1849       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1850                   "Client sent HELLO_ACK when in state %d\n", socket->state);
1851       /* FIXME: Send RESET? */
1852       
1853     }
1854   return GNUNET_OK;
1855 }
1856
1857
1858 /**
1859  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1860  *
1861  * @param cls the closure
1862  * @param tunnel connection to the other end
1863  * @param tunnel_ctx the socket
1864  * @param sender who sent the message
1865  * @param message the actual message
1866  * @param atsi performance data for the connection
1867  * @return GNUNET_OK to keep the connection open,
1868  *         GNUNET_SYSERR to close it (signal serious error)
1869  */
1870 static int
1871 server_handle_reset (void *cls,
1872                      struct GNUNET_MESH_Tunnel *tunnel,
1873                      void **tunnel_ctx,
1874                      const struct GNUNET_PeerIdentity *sender,
1875                      const struct GNUNET_MessageHeader *message,
1876                      const struct GNUNET_ATS_Information*atsi)
1877 {
1878   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1879
1880   return GNUNET_OK;
1881 }
1882
1883
1884 /**
1885  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1886  *
1887  * @param cls the closure
1888  * @param tunnel connection to the other end
1889  * @param tunnel_ctx the socket
1890  * @param sender who sent the message
1891  * @param message the actual message
1892  * @param atsi performance data for the connection
1893  * @return GNUNET_OK to keep the connection open,
1894  *         GNUNET_SYSERR to close it (signal serious error)
1895  */
1896 static int
1897 server_handle_transmit_close (void *cls,
1898                               struct GNUNET_MESH_Tunnel *tunnel,
1899                               void **tunnel_ctx,
1900                               const struct GNUNET_PeerIdentity *sender,
1901                               const struct GNUNET_MessageHeader *message,
1902                               const struct GNUNET_ATS_Information*atsi)
1903 {
1904   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1905
1906   return handle_transmit_close (socket,
1907                                 tunnel,
1908                                 sender,
1909                                 (struct GNUNET_STREAM_MessageHeader *)message,
1910                                 atsi);
1911 }
1912
1913
1914 /**
1915  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1916  *
1917  * @param cls the closure
1918  * @param tunnel connection to the other end
1919  * @param tunnel_ctx the socket
1920  * @param sender who sent the message
1921  * @param message the actual message
1922  * @param atsi performance data for the connection
1923  * @return GNUNET_OK to keep the connection open,
1924  *         GNUNET_SYSERR to close it (signal serious error)
1925  */
1926 static int
1927 server_handle_transmit_close_ack (void *cls,
1928                                   struct GNUNET_MESH_Tunnel *tunnel,
1929                                   void **tunnel_ctx,
1930                                   const struct GNUNET_PeerIdentity *sender,
1931                                   const struct GNUNET_MessageHeader *message,
1932                                   const struct GNUNET_ATS_Information*atsi)
1933 {
1934   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1935
1936   return GNUNET_OK;
1937 }
1938
1939
1940 /**
1941  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1942  *
1943  * @param cls the closure
1944  * @param tunnel connection to the other end
1945  * @param tunnel_ctx the socket
1946  * @param sender who sent the message
1947  * @param message the actual message
1948  * @param atsi performance data for the connection
1949  * @return GNUNET_OK to keep the connection open,
1950  *         GNUNET_SYSERR to close it (signal serious error)
1951  */
1952 static int
1953 server_handle_receive_close (void *cls,
1954                              struct GNUNET_MESH_Tunnel *tunnel,
1955                              void **tunnel_ctx,
1956                              const struct GNUNET_PeerIdentity *sender,
1957                              const struct GNUNET_MessageHeader *message,
1958                              const struct GNUNET_ATS_Information*atsi)
1959 {
1960   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1961
1962   return GNUNET_OK;
1963 }
1964
1965
1966 /**
1967  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1968  *
1969  * @param cls the closure
1970  * @param tunnel connection to the other end
1971  * @param tunnel_ctx the socket
1972  * @param sender who sent the message
1973  * @param message the actual message
1974  * @param atsi performance data for the connection
1975  * @return GNUNET_OK to keep the connection open,
1976  *         GNUNET_SYSERR to close it (signal serious error)
1977  */
1978 static int
1979 server_handle_receive_close_ack (void *cls,
1980                                  struct GNUNET_MESH_Tunnel *tunnel,
1981                                  void **tunnel_ctx,
1982                                  const struct GNUNET_PeerIdentity *sender,
1983                                  const struct GNUNET_MessageHeader *message,
1984                                  const struct GNUNET_ATS_Information*atsi)
1985 {
1986   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1987
1988   return GNUNET_OK;
1989 }
1990
1991
1992 /**
1993  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1994  *
1995  * @param cls the listen socket (from GNUNET_MESH_connect in
1996  *          GNUNET_STREAM_listen) 
1997  * @param tunnel connection to the other end
1998  * @param tunnel_ctx the socket
1999  * @param sender who sent the message
2000  * @param message the actual message
2001  * @param atsi performance data for the connection
2002  * @return GNUNET_OK to keep the connection open,
2003  *         GNUNET_SYSERR to close it (signal serious error)
2004  */
2005 static int
2006 server_handle_close (void *cls,
2007                      struct GNUNET_MESH_Tunnel *tunnel,
2008                      void **tunnel_ctx,
2009                      const struct GNUNET_PeerIdentity *sender,
2010                      const struct GNUNET_MessageHeader *message,
2011                      const struct GNUNET_ATS_Information*atsi)
2012 {
2013   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2014   
2015   return handle_close (socket,
2016                        tunnel,
2017                        sender,
2018                        (const struct GNUNET_STREAM_MessageHeader *) message,
2019                        atsi);
2020 }
2021
2022
2023 /**
2024  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2025  *
2026  * @param cls the closure
2027  * @param tunnel connection to the other end
2028  * @param tunnel_ctx the socket
2029  * @param sender who sent the message
2030  * @param message the actual message
2031  * @param atsi performance data for the connection
2032  * @return GNUNET_OK to keep the connection open,
2033  *         GNUNET_SYSERR to close it (signal serious error)
2034  */
2035 static int
2036 server_handle_close_ack (void *cls,
2037                          struct GNUNET_MESH_Tunnel *tunnel,
2038                          void **tunnel_ctx,
2039                          const struct GNUNET_PeerIdentity *sender,
2040                          const struct GNUNET_MessageHeader *message,
2041                          const struct GNUNET_ATS_Information*atsi)
2042 {
2043   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2044
2045   return handle_close_ack (socket,
2046                            tunnel,
2047                            sender,
2048                            (const struct GNUNET_STREAM_MessageHeader *) message,
2049                            atsi);
2050 }
2051
2052
2053 /**
2054  * Message Handler for mesh
2055  *
2056  * @param socket the socket through which the ack was received
2057  * @param tunnel connection to the other end
2058  * @param sender who sent the message
2059  * @param ack the acknowledgment message
2060  * @param atsi performance data for the connection
2061  * @return GNUNET_OK to keep the connection open,
2062  *         GNUNET_SYSERR to close it (signal serious error)
2063  */
2064 static int
2065 handle_ack (struct GNUNET_STREAM_Socket *socket,
2066             struct GNUNET_MESH_Tunnel *tunnel,
2067             const struct GNUNET_PeerIdentity *sender,
2068             const struct GNUNET_STREAM_AckMessage *ack,
2069             const struct GNUNET_ATS_Information*atsi)
2070 {
2071   unsigned int packet;
2072   int need_retransmission;
2073   
2074
2075   if (GNUNET_PEER_search (sender) != socket->other_peer)
2076     {
2077       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2078                   "%x: Received ACK from non-confirming peer\n",
2079                   socket->our_id);
2080       return GNUNET_YES;
2081     }
2082
2083   switch (socket->state)
2084     {
2085     case (STATE_ESTABLISHED):
2086       if (NULL == socket->write_handle)
2087         {
2088           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2089                       "%x: Received DATA_ACK when write_handle is NULL\n",
2090                       socket->our_id);
2091           return GNUNET_OK;
2092         }
2093       /* FIXME: increment in the base sequence number is breaking current flow
2094        */
2095       if (!((socket->write_sequence_number 
2096              - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2097         {
2098           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2099                       "%x: Received DATA_ACK with unexpected base sequence "
2100                       "number\n",
2101                       socket->our_id);
2102           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2103                       "%x: Current write sequence: %u; Ack's base sequence: %u\n",
2104                       socket->our_id,
2105                       socket->write_sequence_number,
2106                       ntohl (ack->base_sequence_number));
2107           return GNUNET_OK;
2108         }
2109       /* FIXME: include the case when write_handle is cancelled - ignore the 
2110          acks */
2111
2112       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2113                   "%x: Received DATA_ACK from %x\n",
2114                   socket->our_id,
2115                   socket->other_peer);
2116       
2117       /* Cancel the retransmission task */
2118       if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2119         {
2120           GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2121           socket->retransmission_timeout_task_id = 
2122             GNUNET_SCHEDULER_NO_TASK;
2123         }
2124
2125       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2126         {
2127           if (NULL == socket->write_handle->messages[packet]) break;
2128           if (ntohl (ack->base_sequence_number)
2129               >= ntohl (socket->write_handle->messages[packet]->sequence_number))
2130             ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2131                                   packet,
2132                                   GNUNET_YES);
2133           else
2134             if (GNUNET_YES == 
2135                 ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
2136                                       ntohl (socket->write_handle->messages[packet]->sequence_number)
2137                                       - ntohl (ack->base_sequence_number)))
2138               ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2139                                     packet,
2140                                     GNUNET_YES);
2141         }
2142
2143       /* Update the receive window remaining
2144        FIXME : Should update with the value from a data ack with greater
2145        sequence number */
2146       socket->receiver_window_available = 
2147         ntohl (ack->receive_window_remaining);
2148
2149       /* Check if we have received all acknowledgements */
2150       need_retransmission = GNUNET_NO;
2151       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2152         {
2153           if (NULL == socket->write_handle->messages[packet]) break;
2154           if (GNUNET_YES != ackbitmap_is_bit_set 
2155               (&socket->write_handle->ack_bitmap,packet))
2156             {
2157               need_retransmission = GNUNET_YES;
2158               break;
2159             }
2160         }
2161       if (GNUNET_YES == need_retransmission)
2162         {
2163           write_data (socket);
2164         }
2165       else      /* We have to call the write continuation callback now */
2166         {
2167           /* Free the packets */
2168           for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2169             {
2170               GNUNET_free_non_null (socket->write_handle->messages[packet]);
2171             }
2172           if (NULL != socket->write_handle->write_cont)
2173             socket->write_handle->write_cont
2174               (socket->write_handle->write_cont_cls,
2175                socket->status,
2176                socket->write_handle->size);
2177           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2178                       "%x: Write completion callback completed\n",
2179                       socket->our_id);
2180           /* We are done with the write handle - Freeing it */
2181           GNUNET_free (socket->write_handle);
2182           socket->write_handle = NULL;
2183         }
2184       break;
2185     default:
2186       break;
2187     }
2188   return GNUNET_OK;
2189 }
2190
2191
2192 /**
2193  * Message Handler for mesh
2194  *
2195  * @param cls the 'struct GNUNET_STREAM_Socket'
2196  * @param tunnel connection to the other end
2197  * @param tunnel_ctx unused
2198  * @param sender who sent the message
2199  * @param message the actual message
2200  * @param atsi performance data for the connection
2201  * @return GNUNET_OK to keep the connection open,
2202  *         GNUNET_SYSERR to close it (signal serious error)
2203  */
2204 static int
2205 client_handle_ack (void *cls,
2206                    struct GNUNET_MESH_Tunnel *tunnel,
2207                    void **tunnel_ctx,
2208                    const struct GNUNET_PeerIdentity *sender,
2209                    const struct GNUNET_MessageHeader *message,
2210                    const struct GNUNET_ATS_Information*atsi)
2211 {
2212   struct GNUNET_STREAM_Socket *socket = cls;
2213   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2214  
2215   return handle_ack (socket, tunnel, sender, ack, atsi);
2216 }
2217
2218
2219 /**
2220  * Message Handler for mesh
2221  *
2222  * @param cls the server's listen socket
2223  * @param tunnel connection to the other end
2224  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2225  * @param sender who sent the message
2226  * @param message the actual message
2227  * @param atsi performance data for the connection
2228  * @return GNUNET_OK to keep the connection open,
2229  *         GNUNET_SYSERR to close it (signal serious error)
2230  */
2231 static int
2232 server_handle_ack (void *cls,
2233                    struct GNUNET_MESH_Tunnel *tunnel,
2234                    void **tunnel_ctx,
2235                    const struct GNUNET_PeerIdentity *sender,
2236                    const struct GNUNET_MessageHeader *message,
2237                    const struct GNUNET_ATS_Information*atsi)
2238 {
2239   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2240   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2241  
2242   return handle_ack (socket, tunnel, sender, ack, atsi);
2243 }
2244
2245
2246 /**
2247  * For client message handlers, the stream socket is in the
2248  * closure argument.
2249  */
2250 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2251   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2252   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2253    sizeof (struct GNUNET_STREAM_AckMessage) },
2254   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2255    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2256   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2257    sizeof (struct GNUNET_STREAM_MessageHeader)},
2258   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2259    sizeof (struct GNUNET_STREAM_MessageHeader)},
2260   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2261    sizeof (struct GNUNET_STREAM_MessageHeader)},
2262   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2263    sizeof (struct GNUNET_STREAM_MessageHeader)},
2264   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2265    sizeof (struct GNUNET_STREAM_MessageHeader)},
2266   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2267    sizeof (struct GNUNET_STREAM_MessageHeader)},
2268   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2269    sizeof (struct GNUNET_STREAM_MessageHeader)},
2270   {NULL, 0, 0}
2271 };
2272
2273
2274 /**
2275  * For server message handlers, the stream socket is in the
2276  * tunnel context, and the listen socket in the closure argument.
2277  */
2278 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2279   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2280   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2281    sizeof (struct GNUNET_STREAM_AckMessage) },
2282   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2283    sizeof (struct GNUNET_STREAM_MessageHeader)},
2284   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2285    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2286   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2287    sizeof (struct GNUNET_STREAM_MessageHeader)},
2288   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2289    sizeof (struct GNUNET_STREAM_MessageHeader)},
2290   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2291    sizeof (struct GNUNET_STREAM_MessageHeader)},
2292   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2293    sizeof (struct GNUNET_STREAM_MessageHeader)},
2294   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2295    sizeof (struct GNUNET_STREAM_MessageHeader)},
2296   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2297    sizeof (struct GNUNET_STREAM_MessageHeader)},
2298   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2299    sizeof (struct GNUNET_STREAM_MessageHeader)},
2300   {NULL, 0, 0}
2301 };
2302
2303
2304 /**
2305  * Function called when our target peer is connected to our tunnel
2306  *
2307  * @param cls the socket for which this tunnel is created
2308  * @param peer the peer identity of the target
2309  * @param atsi performance data for the connection
2310  */
2311 static void
2312 mesh_peer_connect_callback (void *cls,
2313                             const struct GNUNET_PeerIdentity *peer,
2314                             const struct GNUNET_ATS_Information * atsi)
2315 {
2316   struct GNUNET_STREAM_Socket *socket = cls;
2317   struct GNUNET_STREAM_MessageHeader *message;
2318   GNUNET_PEER_Id connected_peer;
2319
2320   connected_peer = GNUNET_PEER_search (peer);
2321   
2322   if (connected_peer != socket->other_peer)
2323     {
2324       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2325                   "%x: A peer which is not our target has connected",
2326                   "to our tunnel\n",
2327                   socket->our_id);
2328       return;
2329     }
2330   
2331   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2332               "%x: Target peer %x connected\n", 
2333               socket->our_id,
2334               connected_peer);
2335   
2336   /* Set state to INIT */
2337   socket->state = STATE_INIT;
2338
2339   /* Send HELLO message */
2340   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2341   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
2342   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2343   queue_message (socket,
2344                  message,
2345                  &set_state_hello_wait,
2346                  NULL);
2347
2348   /* Call open callback */
2349   if (NULL == socket->open_cb)
2350     {
2351       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2352                   "STREAM_open callback is NULL\n");
2353     }
2354 }
2355
2356
2357 /**
2358  * Function called when our target peer is disconnected from our tunnel
2359  *
2360  * @param cls the socket associated which this tunnel
2361  * @param peer the peer identity of the target
2362  */
2363 static void
2364 mesh_peer_disconnect_callback (void *cls,
2365                                const struct GNUNET_PeerIdentity *peer)
2366 {
2367
2368 }
2369
2370
2371 /**
2372  * Method called whenever a peer creates a tunnel to us
2373  *
2374  * @param cls closure
2375  * @param tunnel new handle to the tunnel
2376  * @param initiator peer that started the tunnel
2377  * @param atsi performance information for the tunnel
2378  * @return initial tunnel context for the tunnel
2379  *         (can be NULL -- that's not an error)
2380  */
2381 static void *
2382 new_tunnel_notify (void *cls,
2383                    struct GNUNET_MESH_Tunnel *tunnel,
2384                    const struct GNUNET_PeerIdentity *initiator,
2385                    const struct GNUNET_ATS_Information *atsi)
2386 {
2387   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2388   struct GNUNET_STREAM_Socket *socket;
2389
2390   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2391      from the same peer again until the socket is closed */
2392
2393   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2394   socket->other_peer = GNUNET_PEER_intern (initiator);
2395   socket->tunnel = tunnel;
2396   socket->session_id = 0;       /* FIXME */
2397   socket->state = STATE_INIT;
2398   socket->lsocket = lsocket;
2399   socket->our_id = lsocket->our_id;
2400   
2401   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2402               "%x: Peer %x initiated tunnel to us\n", 
2403               socket->our_id,
2404               socket->other_peer);
2405   
2406   /* FIXME: Copy MESH handle from lsocket to socket */
2407   
2408   return socket;
2409 }
2410
2411
2412 /**
2413  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2414  * any associated state.  This function is NOT called if the client has
2415  * explicitly asked for the tunnel to be destroyed using
2416  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2417  * the tunnel.
2418  *
2419  * @param cls closure (set from GNUNET_MESH_connect)
2420  * @param tunnel connection to the other end (henceforth invalid)
2421  * @param tunnel_ctx place where local state associated
2422  *                   with the tunnel is stored
2423  */
2424 static void 
2425 tunnel_cleaner (void *cls,
2426                 const struct GNUNET_MESH_Tunnel *tunnel,
2427                 void *tunnel_ctx)
2428 {
2429   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2430   
2431   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2432               "%x: Peer %x has terminated connection abruptly\n",
2433               socket->our_id,
2434               socket->other_peer);
2435
2436   socket->status = GNUNET_STREAM_SHUTDOWN;
2437
2438   /* Clear Transmit handles */
2439   if (NULL != socket->transmit_handle)
2440     {
2441       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2442       socket->transmit_handle = NULL;
2443     }
2444   socket->tunnel = NULL;
2445 }
2446
2447
2448 /*****************/
2449 /* API functions */
2450 /*****************/
2451
2452
2453 /**
2454  * Tries to open a stream to the target peer
2455  *
2456  * @param cfg configuration to use
2457  * @param target the target peer to which the stream has to be opened
2458  * @param app_port the application port number which uniquely identifies this
2459  *            stream
2460  * @param open_cb this function will be called after stream has be established 
2461  * @param open_cb_cls the closure for open_cb
2462  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2463  * @return if successful it returns the stream socket; NULL if stream cannot be
2464  *         opened 
2465  */
2466 struct GNUNET_STREAM_Socket *
2467 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2468                     const struct GNUNET_PeerIdentity *target,
2469                     GNUNET_MESH_ApplicationType app_port,
2470                     GNUNET_STREAM_OpenCallback open_cb,
2471                     void *open_cb_cls,
2472                     ...)
2473 {
2474   struct GNUNET_STREAM_Socket *socket;
2475   struct GNUNET_PeerIdentity own_peer_id;
2476   enum GNUNET_STREAM_Option option;
2477   va_list vargs;                /* Variable arguments */
2478
2479   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2480               "%s\n", __func__);
2481
2482   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2483   socket->other_peer = GNUNET_PEER_intern (target);
2484   socket->open_cb = open_cb;
2485   socket->open_cls = open_cb_cls;
2486   GNUNET_TESTING_get_peer_identity (cfg, &own_peer_id);
2487   socket->our_id = GNUNET_PEER_intern (&own_peer_id);
2488   
2489   /* Set defaults */
2490   socket->retransmit_timeout = 
2491     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
2492
2493   va_start (vargs, open_cb_cls); /* Parse variable args */
2494   do {
2495     option = va_arg (vargs, enum GNUNET_STREAM_Option);
2496     switch (option)
2497       {
2498       case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2499         /* Expect struct GNUNET_TIME_Relative */
2500         socket->retransmit_timeout = va_arg (vargs,
2501                                              struct GNUNET_TIME_Relative);
2502         break;
2503       case GNUNET_STREAM_OPTION_END:
2504         break;
2505       }
2506   } while (GNUNET_STREAM_OPTION_END != option);
2507   va_end (vargs);               /* End of variable args parsing */
2508   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2509                                       10,  /* QUEUE size as parameter? */
2510                                       socket, /* cls */
2511                                       NULL, /* No inbound tunnel handler */
2512                                       &tunnel_cleaner, /* FIXME: not required? */
2513                                       client_message_handlers,
2514                                       &app_port); /* We don't get inbound tunnels */
2515   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
2516     {
2517       GNUNET_free (socket);
2518       return NULL;
2519     }
2520
2521   /* Now create the mesh tunnel to target */
2522   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2523               "Creating MESH Tunnel\n");
2524   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2525                                               NULL, /* Tunnel context */
2526                                               &mesh_peer_connect_callback,
2527                                               &mesh_peer_disconnect_callback,
2528                                               socket);
2529   GNUNET_assert (NULL != socket->tunnel);
2530   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2531                                         target);
2532   
2533   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2534               "%s() END\n", __func__);
2535   return socket;
2536 }
2537
2538
2539 /**
2540  * Shutdown the stream for reading or writing (similar to man 2 shutdown).
2541  *
2542  * @param socket the stream socket
2543  * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
2544  * @param completion_cb the callback that will be called upon successful
2545  *          shutdown of given operation
2546  * @param completion_cls the closure for the completion callback
2547  * @return the shutdown handle
2548  */
2549 struct GNUNET_STREAM_ShutdownHandle *
2550 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2551                         int operation,
2552                         GNUNET_STREAM_ShutdownCompletion completion_cb,
2553                         void *completion_cls)
2554 {
2555   struct GNUNET_STREAM_ShutdownHandle *handle;
2556   struct GNUNET_STREAM_MessageHeader *msg;
2557   
2558   GNUNET_assert (NULL == socket->shutdown_handle);
2559
2560   handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
2561   handle->socket = socket;
2562   handle->completion_cb = completion_cb;
2563   handle->completion_cls = completion_cls;
2564   socket->shutdown_handle = handle;
2565
2566   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2567   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2568   switch (operation)
2569     {
2570     case SHUT_RD:
2571       handle->operation = SHUT_RD;
2572       if (NULL != socket->read_handle)
2573         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2574                     "Existing read handle should be cancelled before shutting"
2575                     " down reading\n");
2576       break;
2577     case SHUT_WR:
2578       handle->operation = SHUT_WR;
2579       
2580       break;
2581     case SHUT_RDWR:
2582       handle->operation = SHUT_RDWR;
2583       if (NULL != socket->write_handle)
2584         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2585                     "Existing write handle should be cancelled before shutting"
2586                     " down writing\n");
2587       if (NULL != socket->read_handle)
2588         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2589                     "Existing read handle should be cancelled before shutting"
2590                     " down reading\n");
2591       msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
2592       queue_message (socket,
2593                      msg,
2594                      &set_state_close_wait,
2595                      NULL);
2596       break;
2597     default:
2598       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2599                   "GNUNET_STREAM_shutdown called with invalid value for "
2600                   "parameter operation -- Ignoring\n");
2601       GNUNET_free (handle);
2602       return NULL;
2603     }
2604   return handle;
2605 }
2606
2607
2608 /**
2609  * Cancels a pending shutdown
2610  *
2611  * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
2612  */
2613 void
2614 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
2615 {
2616   return;
2617 }
2618
2619
2620 /**
2621  * Closes the stream
2622  *
2623  * @param socket the stream socket
2624  */
2625 void
2626 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
2627 {
2628   struct MessageQueue *head;
2629
2630   GNUNET_break (NULL == socket->read_handle);
2631   GNUNET_break (NULL == socket->write_handle);
2632
2633   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
2634     {
2635       /* socket closed with read task pending!? */
2636       GNUNET_break (0);
2637       GNUNET_SCHEDULER_cancel (socket->read_task_id);
2638       socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
2639     }
2640   
2641   /* Terminate the ack'ing tasks if they are still present */
2642   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
2643     {
2644       GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2645       socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2646     }
2647
2648   /* Clear Transmit handles */
2649   if (NULL != socket->transmit_handle)
2650     {
2651       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2652       socket->transmit_handle = NULL;
2653     }
2654
2655   /* Clear existing message queue */
2656   while (NULL != (head = socket->queue_head)) {
2657     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2658                                  socket->queue_tail,
2659                                  head);
2660     GNUNET_free (head->message);
2661     GNUNET_free (head);
2662   }
2663
2664   /* Close associated tunnel */
2665   if (NULL != socket->tunnel)
2666     {
2667       GNUNET_MESH_tunnel_destroy (socket->tunnel);
2668       socket->tunnel = NULL;
2669     }
2670
2671   /* Close mesh connection */
2672   if (NULL != socket->mesh && NULL == socket->lsocket)
2673     {
2674       GNUNET_MESH_disconnect (socket->mesh);
2675       socket->mesh = NULL;
2676     }
2677   
2678   /* Release receive buffer */
2679   if (NULL != socket->receive_buffer)
2680     {
2681       GNUNET_free (socket->receive_buffer);
2682     }
2683
2684   GNUNET_free (socket);
2685 }
2686
2687
2688 /**
2689  * Listens for stream connections for a specific application ports
2690  *
2691  * @param cfg the configuration to use
2692  * @param app_port the application port for which new streams will be accepted
2693  * @param listen_cb this function will be called when a peer tries to establish
2694  *            a stream with us
2695  * @param listen_cb_cls closure for listen_cb
2696  * @return listen socket, NULL for any error
2697  */
2698 struct GNUNET_STREAM_ListenSocket *
2699 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
2700                       GNUNET_MESH_ApplicationType app_port,
2701                       GNUNET_STREAM_ListenCallback listen_cb,
2702                       void *listen_cb_cls)
2703 {
2704   /* FIXME: Add variable args for passing configration options? */
2705   struct GNUNET_STREAM_ListenSocket *lsocket;
2706   struct GNUNET_PeerIdentity our_peer_id;
2707
2708   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
2709   lsocket->port = app_port;
2710   lsocket->listen_cb = listen_cb;
2711   lsocket->listen_cb_cls = listen_cb_cls;
2712   GNUNET_TESTING_get_peer_identity (cfg, &our_peer_id);
2713   lsocket->our_id = GNUNET_PEER_intern (&our_peer_id);
2714   lsocket->mesh = GNUNET_MESH_connect (cfg,
2715                                        10, /* FIXME: QUEUE size as parameter? */
2716                                        lsocket, /* Closure */
2717                                        &new_tunnel_notify,
2718                                        &tunnel_cleaner,
2719                                        server_message_handlers,
2720                                        &app_port);
2721   GNUNET_assert (NULL != lsocket->mesh);
2722   return lsocket;
2723 }
2724
2725
2726 /**
2727  * Closes the listen socket
2728  *
2729  * @param lsocket the listen socket
2730  */
2731 void
2732 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
2733 {
2734   /* Close MESH connection */
2735   GNUNET_assert (NULL != lsocket->mesh);
2736   GNUNET_MESH_disconnect (lsocket->mesh);
2737   
2738   GNUNET_free (lsocket);
2739 }
2740
2741
2742 /**
2743  * Tries to write the given data to the stream
2744  *
2745  * @param socket the socket representing a stream
2746  * @param data the data buffer from where the data is written into the stream
2747  * @param size the number of bytes to be written from the data buffer
2748  * @param timeout the timeout period
2749  * @param write_cont the function to call upon writing some bytes into the stream
2750  * @param write_cont_cls the closure
2751  * @return handle to cancel the operation
2752  */
2753 struct GNUNET_STREAM_IOWriteHandle *
2754 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
2755                      const void *data,
2756                      size_t size,
2757                      struct GNUNET_TIME_Relative timeout,
2758                      GNUNET_STREAM_CompletionContinuation write_cont,
2759                      void *write_cont_cls)
2760 {
2761   unsigned int num_needed_packets;
2762   unsigned int packet;
2763   struct GNUNET_STREAM_IOWriteHandle *io_handle;
2764   uint32_t packet_size;
2765   uint32_t payload_size;
2766   struct GNUNET_STREAM_DataMessage *data_msg;
2767   const void *sweep;
2768   struct GNUNET_TIME_Relative ack_deadline;
2769
2770   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2771               "%s\n", __func__);
2772
2773   /* Return NULL if there is already a write request pending */
2774   if (NULL != socket->write_handle)
2775   {
2776     GNUNET_break (0);
2777     return NULL;
2778   }
2779   if (!((STATE_ESTABLISHED == socket->state)
2780         || (STATE_RECEIVE_CLOSE_WAIT == socket->state)
2781         || (STATE_RECEIVE_CLOSED == socket->state)))
2782     {
2783       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2784                   "%x: Attempting to write on a closed (OR) not-yet-established"
2785                   "stream\n",
2786                   socket->our_id);
2787       return NULL;
2788     } 
2789   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
2790     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
2791   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
2792   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
2793   io_handle->socket = socket;
2794   io_handle->write_cont = write_cont;
2795   io_handle->write_cont_cls = write_cont_cls;
2796   io_handle->size = size;
2797   sweep = data;
2798   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2799      determined from RTT */
2800   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
2801   /* Divide the given buffer into packets for sending */
2802   for (packet=0; packet < num_needed_packets; packet++)
2803     {
2804       if ((packet + 1) * max_payload_size < size) 
2805         {
2806           payload_size = max_payload_size;
2807           packet_size = MAX_PACKET_SIZE;
2808         }
2809       else 
2810         {
2811           payload_size = size - packet * max_payload_size;
2812           packet_size =  payload_size + sizeof (struct
2813                                                 GNUNET_STREAM_DataMessage); 
2814         }
2815       io_handle->messages[packet] = GNUNET_malloc (packet_size);
2816       io_handle->messages[packet]->header.header.size = htons (packet_size);
2817       io_handle->messages[packet]->header.header.type =
2818         htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
2819       io_handle->messages[packet]->sequence_number =
2820         htonl (socket->write_sequence_number++);
2821       io_handle->messages[packet]->offset = htonl (socket->write_offset);
2822
2823       /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2824          determined from RTT */
2825       io_handle->messages[packet]->ack_deadline =
2826         GNUNET_TIME_relative_hton (ack_deadline);
2827       data_msg = io_handle->messages[packet];
2828       /* Copy data from given buffer to the packet */
2829       memcpy (&data_msg[1],
2830               sweep,
2831               payload_size);
2832       sweep += payload_size;
2833       socket->write_offset += payload_size;
2834     }
2835   socket->write_handle = io_handle;
2836   write_data (socket);
2837
2838   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2839               "%s() END\n", __func__);
2840
2841   return io_handle;
2842 }
2843
2844
2845 /**
2846  * Tries to read data from the stream
2847  *
2848  * @param socket the socket representing a stream
2849  * @param timeout the timeout period
2850  * @param proc function to call with data (once only)
2851  * @param proc_cls the closure for proc
2852  * @return handle to cancel the operation
2853  */
2854 struct GNUNET_STREAM_IOReadHandle *
2855 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
2856                     struct GNUNET_TIME_Relative timeout,
2857                     GNUNET_STREAM_DataProcessor proc,
2858                     void *proc_cls)
2859 {
2860   struct GNUNET_STREAM_IOReadHandle *read_handle;
2861   
2862   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2863               "%x: %s()\n", 
2864               socket->our_id,
2865               __func__);
2866
2867   /* Return NULL if there is already a read handle; the user has to cancel that
2868   first before continuing or has to wait until it is completed */
2869   if (NULL != socket->read_handle) return NULL;
2870
2871   GNUNET_assert (NULL != proc);
2872
2873   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
2874   read_handle->proc = proc;
2875   read_handle->proc_cls = proc_cls;
2876   socket->read_handle = read_handle;
2877
2878   /* Check if we have a packet at bitmap 0 */
2879   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
2880                                           0))
2881     {
2882       socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
2883                                                        socket);
2884    
2885     }
2886   
2887   /* Setup the read timeout task */
2888   socket->read_io_timeout_task_id =
2889     GNUNET_SCHEDULER_add_delayed (timeout,
2890                                   &read_io_timeout,
2891                                   socket);
2892   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2893               "%x: %s() END\n",
2894               socket->our_id,
2895               __func__);
2896   return read_handle;
2897 }
2898
2899
2900 /**
2901  * Cancel pending write operation.
2902  *
2903  * @param ioh handle to operation to cancel
2904  */
2905 void
2906 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
2907 {
2908   struct GNUNET_STREAM_Socket *socket = ioh->socket;
2909   unsigned int packet;
2910
2911   GNUNET_assert (NULL != socket->write_handle);
2912   GNUNET_assert (socket->write_handle == ioh);
2913
2914   if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2915     {
2916       GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2917       socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
2918     }
2919
2920   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2921     {
2922       if (NULL == ioh->messages[packet]) break;
2923       GNUNET_free (ioh->messages[packet]);
2924     }
2925       
2926   GNUNET_free (socket->write_handle);
2927   socket->write_handle = NULL;
2928   return;
2929 }
2930
2931
2932 /**
2933  * Cancel pending read operation.
2934  *
2935  * @param ioh handle to operation to cancel
2936  */
2937 void
2938 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
2939 {
2940   return;
2941 }