fixed listen callback to happen after reaching ESTABLISHED state
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Decrement PEER intern count during socket close and listen close to free the
27  * memory used for PEER interning
28  **/
29
30 /**
31  * @file stream/stream_api.c
32  * @brief Implementation of the stream library
33  * @author Sree Harsha Totakura
34  */
35 #include "platform.h"
36 #include "gnunet_common.h"
37 #include "gnunet_crypto_lib.h"
38 #include "gnunet_stream_lib.h"
39 #include "gnunet_testing_lib.h"
40 #include "stream_protocol.h"
41
42
43 /**
44  * The maximum packet size of a stream packet
45  */
46 #define MAX_PACKET_SIZE 64000
47
48 /**
49  * The maximum payload a data message packet can carry
50  */
51 static size_t max_payload_size = 
52   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
53
54 /**
55  * Receive buffer
56  */
57 #define RECEIVE_BUFFER_SIZE 4096000
58
59 /**
60  * states in the Protocol
61  */
62 enum State
63   {
64     /**
65      * Client initialization state
66      */
67     STATE_INIT,
68
69     /**
70      * Listener initialization state 
71      */
72     STATE_LISTEN,
73
74     /**
75      * Pre-connection establishment state
76      */
77     STATE_HELLO_WAIT,
78
79     /**
80      * State where a connection has been established
81      */
82     STATE_ESTABLISHED,
83
84     /**
85      * State where the socket is closed on our side and waiting to be ACK'ed
86      */
87     STATE_RECEIVE_CLOSE_WAIT,
88
89     /**
90      * State where the socket is closed for reading
91      */
92     STATE_RECEIVE_CLOSED,
93
94     /**
95      * State where the socket is closed on our side and waiting to be ACK'ed
96      */
97     STATE_TRANSMIT_CLOSE_WAIT,
98
99     /**
100      * State where the socket is closed for writing
101      */
102     STATE_TRANSMIT_CLOSED,
103
104     /**
105      * State where the socket is closed on our side and waiting to be ACK'ed
106      */
107     STATE_CLOSE_WAIT,
108
109     /**
110      * State where the socket is closed
111      */
112     STATE_CLOSED 
113   };
114
115
116 /**
117  * Functions of this type are called when a message is written
118  *
119  * @param cls the closure from queue_message
120  * @param socket the socket the written message was bound to
121  */
122 typedef void (*SendFinishCallback) (void *cls,
123                                     struct GNUNET_STREAM_Socket *socket);
124
125
126 /**
127  * The send message queue
128  */
129 struct MessageQueue
130 {
131   /**
132    * The message
133    */
134   struct GNUNET_STREAM_MessageHeader *message;
135
136   /**
137    * Callback to be called when the message is sent
138    */
139   SendFinishCallback finish_cb;
140
141   /**
142    * The closure for finish_cb
143    */
144   void *finish_cb_cls;
145
146   /**
147    * The next message in queue. Should be NULL in the last message
148    */
149   struct MessageQueue *next;
150
151   /**
152    * The next message in queue. Should be NULL in the first message
153    */
154   struct MessageQueue *prev;
155 };
156
157
158 /**
159  * The STREAM Socket Handler
160  */
161 struct GNUNET_STREAM_Socket
162 {
163   /**
164    * Retransmission timeout
165    */
166   struct GNUNET_TIME_Relative retransmit_timeout;
167
168   /**
169    * The Acknowledgement Bitmap
170    */
171   GNUNET_STREAM_AckBitmap ack_bitmap;
172
173   /**
174    * Time when the Acknowledgement was queued
175    */
176   struct GNUNET_TIME_Absolute ack_time_registered;
177
178   /**
179    * Queued Acknowledgement deadline
180    */
181   struct GNUNET_TIME_Relative ack_time_deadline;
182
183   /**
184    * The mesh handle
185    */
186   struct GNUNET_MESH_Handle *mesh;
187
188   /**
189    * The mesh tunnel handle
190    */
191   struct GNUNET_MESH_Tunnel *tunnel;
192
193   /**
194    * Stream open closure
195    */
196   void *open_cls;
197
198   /**
199    * Stream open callback
200    */
201   GNUNET_STREAM_OpenCallback open_cb;
202
203   /**
204    * The current transmit handle (if a pending transmit request exists)
205    */
206   struct GNUNET_MESH_TransmitHandle *transmit_handle;
207
208   /**
209    * The current message associated with the transmit handle
210    */
211   struct MessageQueue *queue_head;
212
213   /**
214    * The queue tail, should always point to the last message in queue
215    */
216   struct MessageQueue *queue_tail;
217
218   /**
219    * The write IO_handle associated with this socket
220    */
221   struct GNUNET_STREAM_IOWriteHandle *write_handle;
222
223   /**
224    * The read IO_handle associated with this socket
225    */
226   struct GNUNET_STREAM_IOReadHandle *read_handle;
227
228   /**
229    * Buffer for storing received messages
230    */
231   void *receive_buffer;
232
233   /**
234    * The listen socket from which this socket is derived. Should be NULL if it
235    * is not a derived socket
236    */
237   struct GNUNET_STREAM_ListenSocket *lsocket;
238
239   /**
240    * Task identifier for the read io timeout task
241    */
242   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task;
243
244   /**
245    * Task identifier for retransmission task after timeout
246    */
247   GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
248
249   /**
250    * The task for sending timely Acks
251    */
252   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
253
254   /**
255    * Task scheduled to continue a read operation.
256    */
257   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
258
259   /**
260    * The state of the protocol associated with this socket
261    */
262   enum State state;
263
264   /**
265    * The status of the socket
266    */
267   enum GNUNET_STREAM_Status status;
268
269   /**
270    * The number of previous timeouts; FIXME: currently not used
271    */
272   unsigned int retries;
273
274   /**
275    * The peer identity of the peer at the other end of the stream
276    */
277   GNUNET_PEER_Id other_peer;
278
279   /**
280    * Our Peer Identity (for debugging)
281    */
282   GNUNET_PEER_Id our_id;
283
284   /**
285    * The application port number (type: uint32_t)
286    */
287   GNUNET_MESH_ApplicationType app_port;
288
289   /**
290    * The session id associated with this stream connection
291    * FIXME: Not used currently, may be removed
292    */
293   uint32_t session_id;
294
295   /**
296    * Write sequence number. Set to random when sending HELLO(client) and
297    * HELLO_ACK(server) 
298    */
299   uint32_t write_sequence_number;
300
301   /**
302    * Read sequence number. This number's value is determined during handshake
303    */
304   uint32_t read_sequence_number;
305
306   /**
307    * The receiver buffer size
308    */
309   uint32_t receive_buffer_size;
310
311   /**
312    * The receiver buffer boundaries
313    */
314   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
315
316   /**
317    * receiver's available buffer after the last acknowledged packet
318    */
319   uint32_t receiver_window_available;
320
321   /**
322    * The offset pointer used during write operation
323    */
324   uint32_t write_offset;
325
326   /**
327    * The offset after which we are expecting data
328    */
329   uint32_t read_offset;
330
331   /**
332    * The offset upto which user has read from the received buffer
333    */
334   uint32_t copy_offset;
335 };
336
337
338 /**
339  * A socket for listening
340  */
341 struct GNUNET_STREAM_ListenSocket
342 {
343   /**
344    * The mesh handle
345    */
346   struct GNUNET_MESH_Handle *mesh;
347
348   /**
349    * The callback function which is called after successful opening socket
350    */
351   GNUNET_STREAM_ListenCallback listen_cb;
352
353   /**
354    * The call back closure
355    */
356   void *listen_cb_cls;
357
358   /**
359    * Our interned Peer's identity
360    */
361   GNUNET_PEER_Id our_id;
362
363   /**
364    * The service port
365    * FIXME: Remove if not required!
366    */
367   GNUNET_MESH_ApplicationType port;
368 };
369
370
371 /**
372  * The IO Write Handle
373  */
374 struct GNUNET_STREAM_IOWriteHandle
375 {
376   /**
377    * The packet_buffers associated with this Handle
378    */
379   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
380
381   /**
382    * The write continuation callback
383    */
384   GNUNET_STREAM_CompletionContinuation write_cont;
385
386   /**
387    * Write continuation closure
388    */
389   void *write_cont_cls;
390
391   /**
392    * The bitmap of this IOHandle; Corresponding bit for a message is set when
393    * it has been acknowledged by the receiver
394    */
395   GNUNET_STREAM_AckBitmap ack_bitmap;
396
397   /**
398    * Number of bytes in this write handle
399    */
400   size_t size;
401
402   /**
403    * Number of packets sent before waiting for an ack
404    *
405    * FIXME: Do we need this?
406    */
407   unsigned int sent_packets;
408 };
409
410
411 /**
412  * The IO Read Handle
413  */
414 struct GNUNET_STREAM_IOReadHandle
415 {
416   /**
417    * Callback for the read processor
418    */
419   GNUNET_STREAM_DataProcessor proc;
420
421   /**
422    * The closure pointer for the read processor callback
423    */
424   void *proc_cls;
425 };
426
427
428 /**
429  * Default value in seconds for various timeouts
430  */
431 static unsigned int default_timeout = 10;
432
433 /**
434  * Callback function for sending hello message
435  *
436  * @param cls closure the socket
437  * @param size number of bytes available in buf
438  * @param buf where the callee should write the message
439  * @return number of bytes written to buf
440  */
441 static size_t
442 send_message_notify (void *cls, size_t size, void *buf)
443 {
444   struct GNUNET_STREAM_Socket *socket = cls;
445   struct GNUNET_PeerIdentity target;
446   struct MessageQueue *head;
447   size_t ret;
448
449   socket->transmit_handle = NULL; /* Remove the transmit handle */
450   head = socket->queue_head;
451   if (NULL == head)
452     return 0; /* just to be safe */
453   GNUNET_PEER_resolve (socket->other_peer, &target);
454   if (0 == size)                /* request timed out */
455     {
456       socket->retries++;
457       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
458                   "Message sending timed out. Retry %d \n",
459                   socket->retries);
460       socket->transmit_handle = 
461         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
462                                            0, /* Corking */
463                                            1, /* Priority */
464                                            /* FIXME: exponential backoff */
465                                            socket->retransmit_timeout,
466                                            &target,
467                                            ntohs (head->message->header.size),
468                                            &send_message_notify,
469                                            socket);
470       return 0;
471     }
472
473   ret = ntohs (head->message->header.size);
474   GNUNET_assert (size >= ret);
475   memcpy (buf, head->message, ret);
476   if (NULL != head->finish_cb)
477     {
478       head->finish_cb (head->finish_cb_cls, socket);
479     }
480   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
481                                socket->queue_tail,
482                                head);
483   GNUNET_free (head->message);
484   GNUNET_free (head);
485   head = socket->queue_head;
486   if (NULL != head)    /* more pending messages to send */
487     {
488       socket->retries = 0;
489       socket->transmit_handle = 
490         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
491                                            0, /* Corking */
492                                            1, /* Priority */
493                                            /* FIXME: exponential backoff */
494                                            socket->retransmit_timeout,
495                                            &target,
496                                            ntohs (head->message->header.size),
497                                            &send_message_notify,
498                                            socket);
499     }
500   return ret;
501 }
502
503
504 /**
505  * Queues a message for sending using the mesh connection of a socket
506  *
507  * @param socket the socket whose mesh connection is used
508  * @param message the message to be sent
509  * @param finish_cb the callback to be called when the message is sent
510  * @param finish_cb_cls the closure for the callback
511  */
512 static void
513 queue_message (struct GNUNET_STREAM_Socket *socket,
514                struct GNUNET_STREAM_MessageHeader *message,
515                SendFinishCallback finish_cb,
516                void *finish_cb_cls)
517 {
518   struct MessageQueue *queue_entity;
519   struct GNUNET_PeerIdentity target;
520
521   GNUNET_assert 
522     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
523      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
524
525   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
526               "%x: Queueing message of type %d and size %d\n",
527               socket->our_id,
528               ntohs (message->header.type),
529               ntohs (message->header.size));
530   GNUNET_assert (NULL != message);
531   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
532   queue_entity->message = message;
533   queue_entity->finish_cb = finish_cb;
534   queue_entity->finish_cb_cls = finish_cb_cls;
535   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
536                                     socket->queue_tail,
537                                     queue_entity);
538   if (NULL == socket->transmit_handle)
539   {
540     socket->retries = 0;
541     GNUNET_PEER_resolve (socket->other_peer, &target);
542     socket->transmit_handle = 
543       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
544                                          0, /* Corking */
545                                          1, /* Priority */
546                                          socket->retransmit_timeout,
547                                          &target,
548                                          ntohs (message->header.size),
549                                          &send_message_notify,
550                                          socket);
551   }
552 }
553
554
555 /**
556  * Copies a message and queues it for sending using the mesh connection of
557  * given socket 
558  *
559  * @param socket the socket whose mesh connection is used
560  * @param message the message to be sent
561  * @param finish_cb the callback to be called when the message is sent
562  * @param finish_cb_cls the closure for the callback
563  */
564 static void
565 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
566                         const struct GNUNET_STREAM_MessageHeader *message,
567                         SendFinishCallback finish_cb,
568                         void *finish_cb_cls)
569 {
570   struct GNUNET_STREAM_MessageHeader *msg_copy;
571   uint16_t size;
572   
573   size = ntohs (message->header.size);
574   msg_copy = GNUNET_malloc (size);
575   memcpy (msg_copy, message, size);
576   queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
577 }
578
579
580 /**
581  * Callback function for sending ack message
582  *
583  * @param cls closure the ACK message created in ack_task
584  * @param size number of bytes available in buffer
585  * @param buf where the callee should write the message
586  * @return number of bytes written to buf
587  */
588 static size_t
589 send_ack_notify (void *cls, size_t size, void *buf)
590 {
591   struct GNUNET_STREAM_AckMessage *ack_msg = cls;
592
593   if (0 == size)
594     {
595       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
596                   "%s called with size 0\n", __func__);
597       return 0;
598     }
599   GNUNET_assert (ntohs (ack_msg->header.header.size) <= size);
600   
601   size = ntohs (ack_msg->header.header.size);
602   memcpy (buf, ack_msg, size);
603   return size;
604 }
605
606 /**
607  * Writes data using the given socket. The amount of data written is limited by
608  * the receiver_window_size
609  *
610  * @param socket the socket to use
611  */
612 static void 
613 write_data (struct GNUNET_STREAM_Socket *socket);
614
615 /**
616  * Task for retransmitting data messages if they aren't ACK before their ack
617  * deadline 
618  *
619  * @param cls the socket
620  * @param tc the Task context
621  */
622 static void
623 retransmission_timeout_task (void *cls,
624                              const struct GNUNET_SCHEDULER_TaskContext *tc)
625 {
626   struct GNUNET_STREAM_Socket *socket = cls;
627   
628   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
629     return;
630
631   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
632               "%x: Retransmitting DATA...\n", socket->our_id);
633   socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
634   write_data (socket);
635 }
636
637
638 /**
639  * Task for sending ACK message
640  *
641  * @param cls the socket
642  * @param tc the Task context
643  */
644 static void
645 ack_task (void *cls,
646           const struct GNUNET_SCHEDULER_TaskContext *tc)
647 {
648   struct GNUNET_STREAM_Socket *socket = cls;
649   struct GNUNET_STREAM_AckMessage *ack_msg;
650   struct GNUNET_PeerIdentity target;
651
652   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
653     {
654       return;
655     }
656
657   socket->ack_task_id = 0;
658
659   /* Create the ACK Message */
660   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
661   ack_msg->header.header.size = htons (sizeof (struct 
662                                                GNUNET_STREAM_AckMessage));
663   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
664   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
665   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
666   ack_msg->receive_window_remaining = 
667     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
668
669   GNUNET_PEER_resolve (socket->other_peer, &target);
670   /* Request MESH for sending ACK */
671   GNUNET_MESH_notify_transmit_ready (socket->tunnel,
672                                      0, /* Corking */
673                                      1, /* Priority */
674                                      socket->retransmit_timeout,
675                                      &target,
676                                      ntohs (ack_msg->header.header.size),
677                                      &send_ack_notify,
678                                      ack_msg);
679
680   
681 }
682
683
684 /**
685  * Function to modify a bit in GNUNET_STREAM_AckBitmap
686  *
687  * @param bitmap the bitmap to modify
688  * @param bit the bit number to modify
689  * @param value GNUNET_YES to on, GNUNET_NO to off
690  */
691 static void
692 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
693                       unsigned int bit, 
694                       int value)
695 {
696   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
697   if (GNUNET_YES == value)
698     *bitmap |= (1LL << bit);
699   else
700     *bitmap &= ~(1LL << bit);
701 }
702
703
704 /**
705  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
706  *
707  * @param bitmap address of the bitmap that has to be checked
708  * @param bit the bit number to check
709  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
710  */
711 static uint8_t
712 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
713                       unsigned int bit)
714 {
715   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
716   return 0 != (*bitmap & (1LL << bit));
717 }
718
719
720
721 /**
722  * Function called when Data Message is sent
723  *
724  * @param cls the io_handle corresponding to the Data Message
725  * @param socket the socket which was used
726  */
727 static void
728 write_data_finish_cb (void *cls,
729                       struct GNUNET_STREAM_Socket *socket)
730 {
731   struct GNUNET_STREAM_IOWriteHandle *io_handle = cls;
732
733   io_handle->sent_packets++;
734 }
735
736
737 /**
738  * Writes data using the given socket. The amount of data written is limited by
739  * the receiver_window_size
740  *
741  * @param socket the socket to use
742  */
743 static void 
744 write_data (struct GNUNET_STREAM_Socket *socket)
745 {
746   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
747   int packet;                   /* Although an int, should never be negative */
748   int ack_packet;
749
750   ack_packet = -1;
751   /* Find the last acknowledged packet */
752   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
753     {      
754       if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
755                                               packet))
756         ack_packet = packet;        
757       else if (NULL == io_handle->messages[packet])
758         break;
759     }
760   /* Resend packets which weren't ack'ed */
761   for (packet=0; packet < ack_packet; packet++)
762     {
763       if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
764                                              packet))
765         {
766           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
767                       "%x: Placing DATA message with sequence %u in send queue\n",
768                       socket->our_id,
769                       ntohl (io_handle->messages[packet]->sequence_number));
770
771           copy_and_queue_message (socket,
772                                   &io_handle->messages[packet]->header,
773                                   NULL,
774                                   NULL);
775         }
776     }
777   packet = ack_packet + 1;
778   /* Now send new packets if there is enough buffer space */
779   while ( (NULL != io_handle->messages[packet]) &&
780           (socket->receiver_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) )
781     {
782       socket->receiver_window_available -= 
783         ntohs (io_handle->messages[packet]->header.header.size);
784       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
785                   "%x: Placing DATA message with sequence %u in send queue\n",
786                   socket->our_id,
787                   ntohl (io_handle->messages[packet]->sequence_number));
788       copy_and_queue_message (socket,
789                               &io_handle->messages[packet]->header,
790                               &write_data_finish_cb,
791                               io_handle);
792       packet++;
793     }
794
795   if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
796     socket->retransmission_timeout_task_id = 
797       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
798                                     (GNUNET_TIME_UNIT_SECONDS, 5),
799                                     &retransmission_timeout_task,
800                                     socket);
801 }
802
803
804 /**
805  * Task for calling the read processor
806  *
807  * @param cls the socket
808  * @param tc the task context
809  */
810 static void
811 call_read_processor (void *cls,
812                           const struct GNUNET_SCHEDULER_TaskContext *tc)
813 {
814   struct GNUNET_STREAM_Socket *socket = cls;
815   size_t read_size;
816   size_t valid_read_size;
817   unsigned int packet;
818   uint32_t sequence_increase;
819   uint32_t offset_increase;
820
821   socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
822   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
823     return;
824
825   GNUNET_assert (NULL != socket->read_handle);
826   GNUNET_assert (NULL != socket->read_handle->proc);
827
828   /* Check the bitmap for any holes */
829   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
830     {
831       if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
832                                              packet))
833         break;
834     }
835   /* We only call read processor if we have the first packet */
836   GNUNET_assert (0 < packet);
837
838   valid_read_size = 
839     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
840
841   GNUNET_assert (0 != valid_read_size);
842
843   /* Cancel the read_io_timeout_task */
844   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task);
845   socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
846
847   /* Call the data processor */
848   read_size = 
849     socket->read_handle->proc (socket->read_handle->proc_cls,
850                                socket->status,
851                                socket->receive_buffer + socket->copy_offset,
852                                valid_read_size);
853   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
854               "%x: Read processor completed successfully\n",
855               socket->our_id);
856
857   /* Free the read handle */
858   GNUNET_free (socket->read_handle);
859   socket->read_handle = NULL;
860
861   GNUNET_assert (read_size <= valid_read_size);
862   socket->copy_offset += read_size;
863
864   /* Determine upto which packet we can remove from the buffer */
865   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
866     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
867       break;
868
869   /* If no packets can be removed we can't move the buffer */
870   if (0 == packet) return;
871
872   sequence_increase = packet;
873
874   /* Shift the data in the receive buffer */
875   memmove (socket->receive_buffer,
876            socket->receive_buffer 
877            + socket->receive_buffer_boundaries[sequence_increase-1],
878            socket->receive_buffer_size - socket->receive_buffer_boundaries[sequence_increase-1]);
879   
880   /* Shift the bitmap */
881   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
882   
883   /* Set read_sequence_number */
884   socket->read_sequence_number += sequence_increase;
885   
886   /* Set read_offset */
887   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
888   socket->read_offset += offset_increase;
889
890   /* Fix copy_offset */
891   GNUNET_assert (offset_increase <= socket->copy_offset);
892   socket->copy_offset -= offset_increase;
893   
894   /* Fix relative boundaries */
895   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
896     {
897       if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
898         {
899           socket->receive_buffer_boundaries[packet] = 
900             socket->receive_buffer_boundaries[packet + sequence_increase] 
901             - offset_increase;
902         }
903       else
904         socket->receive_buffer_boundaries[packet] = 0;
905     }
906 }
907
908
909 /**
910  * Cancels the existing read io handle
911  *
912  * @param cls the closure from the SCHEDULER call
913  * @param tc the task context
914  */
915 static void
916 read_io_timeout (void *cls, 
917                 const struct GNUNET_SCHEDULER_TaskContext *tc)
918 {
919   struct GNUNET_STREAM_Socket *socket = cls;
920
921   socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
922   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
923   {
924     GNUNET_SCHEDULER_cancel (socket->read_task_id);
925     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
926   }
927   GNUNET_assert (NULL != socket->read_handle);
928   
929   GNUNET_free (socket->read_handle);
930   socket->read_handle = NULL;
931 }
932
933
934 /**
935  * Handler for DATA messages; Same for both client and server
936  *
937  * @param socket the socket through which the ack was received
938  * @param tunnel connection to the other end
939  * @param sender who sent the message
940  * @param msg the data message
941  * @param atsi performance data for the connection
942  * @return GNUNET_OK to keep the connection open,
943  *         GNUNET_SYSERR to close it (signal serious error)
944  */
945 static int
946 handle_data (struct GNUNET_STREAM_Socket *socket,
947              struct GNUNET_MESH_Tunnel *tunnel,
948              const struct GNUNET_PeerIdentity *sender,
949              const struct GNUNET_STREAM_DataMessage *msg,
950              const struct GNUNET_ATS_Information*atsi)
951 {
952   const void *payload;
953   uint32_t bytes_needed;
954   uint32_t relative_offset;
955   uint32_t relative_sequence_number;
956   uint16_t size;
957
958   size = htons (msg->header.header.size);
959   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
960     {
961       GNUNET_break_op (0);
962       return GNUNET_SYSERR;
963     }
964
965   if (GNUNET_PEER_search (sender) != socket->other_peer)
966     {
967       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
968                   "%x: Received DATA from non-confirming peer\n",
969                   socket->our_id);
970       return GNUNET_YES;
971     }
972
973   switch (socket->state)
974     {
975     case STATE_ESTABLISHED:
976     case STATE_TRANSMIT_CLOSED:
977     case STATE_TRANSMIT_CLOSE_WAIT:
978       
979       /* check if the message's sequence number is in the range we are
980          expecting */
981       relative_sequence_number = 
982         ntohl (msg->sequence_number) - socket->read_sequence_number;
983       if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
984         {
985           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
986                       "%x: Ignoring received message with sequence number %u\n",
987                       socket->our_id,
988                       ntohl (msg->sequence_number));
989           return GNUNET_YES;
990         }
991
992       /* Check if we have already seen this message */
993       if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
994                                               relative_sequence_number))
995         {
996           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
997                       "%x: Ignoring already received message with sequence "
998                       "number %u\n",
999                       socket->our_id,
1000                       ntohl (msg->sequence_number));
1001           return GNUNET_YES;
1002         }
1003
1004       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1005                   "%x: Receiving DATA with sequence number: %u and size: %d "
1006                   "from %x\n",
1007                   socket->our_id,
1008                   ntohl (msg->sequence_number),
1009                   ntohs (msg->header.header.size),
1010                   socket->other_peer);
1011
1012       /* Check if we have to allocate the buffer */
1013       size -= sizeof (struct GNUNET_STREAM_DataMessage);
1014       relative_offset = ntohl (msg->offset) - socket->read_offset;
1015       bytes_needed = relative_offset + size;
1016       if (bytes_needed > socket->receive_buffer_size)
1017         {
1018           if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1019             {
1020               socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1021                                                        bytes_needed);
1022               socket->receive_buffer_size = bytes_needed;
1023             }
1024           else
1025             {
1026               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1027                           "%x: Cannot accommodate packet %d as buffer is",
1028                           "full\n",
1029                           socket->our_id,
1030                           ntohl (msg->sequence_number));
1031               return GNUNET_YES;
1032             }
1033         }
1034       
1035       /* Copy Data to buffer */
1036       payload = &msg[1];
1037       GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1038       memcpy (socket->receive_buffer + relative_offset,
1039               payload,
1040               size);
1041       socket->receive_buffer_boundaries[relative_sequence_number] = 
1042         relative_offset + size;
1043       
1044       /* Modify the ACK bitmap */
1045       ackbitmap_modify_bit (&socket->ack_bitmap,
1046                             relative_sequence_number,
1047                             GNUNET_YES);
1048
1049       /* Start ACK sending task if one is not already present */
1050       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1051        {
1052          socket->ack_task_id = 
1053            GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1054                                          (msg->ack_deadline),
1055                                          &ack_task,
1056                                          socket);
1057        }
1058
1059       if ((NULL != socket->read_handle) /* A read handle is waiting */
1060           /* There is no current read task */
1061           && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1062           /* We have the first packet */
1063           && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1064                                                  0)))
1065         {
1066           socket->read_task_id = 
1067             GNUNET_SCHEDULER_add_now (&call_read_processor,
1068                                       socket);
1069         }
1070       
1071       break;
1072
1073     default:
1074       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1075                   "%x: Received data message when it cannot be handled\n",
1076                   socket->our_id);
1077       break;
1078     }
1079   return GNUNET_YES;
1080 }
1081
1082 /**
1083  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1084  *
1085  * @param cls the socket (set from GNUNET_MESH_connect)
1086  * @param tunnel connection to the other end
1087  * @param tunnel_ctx place to store local state associated with the tunnel
1088  * @param sender who sent the message
1089  * @param message the actual message
1090  * @param atsi performance data for the connection
1091  * @return GNUNET_OK to keep the connection open,
1092  *         GNUNET_SYSERR to close it (signal serious error)
1093  */
1094 static int
1095 client_handle_data (void *cls,
1096              struct GNUNET_MESH_Tunnel *tunnel,
1097              void **tunnel_ctx,
1098              const struct GNUNET_PeerIdentity *sender,
1099              const struct GNUNET_MessageHeader *message,
1100              const struct GNUNET_ATS_Information*atsi)
1101 {
1102   struct GNUNET_STREAM_Socket *socket = cls;
1103
1104   return handle_data (socket, 
1105                       tunnel, 
1106                       sender, 
1107                       (const struct GNUNET_STREAM_DataMessage *) message, 
1108                       atsi);
1109 }
1110
1111
1112 /**
1113  * Callback to set state to ESTABLISHED
1114  *
1115  * @param cls the closure from queue_message FIXME: document
1116  * @param socket the socket to requiring state change
1117  */
1118 static void
1119 set_state_established (void *cls,
1120                        struct GNUNET_STREAM_Socket *socket)
1121 {
1122   struct GNUNET_PeerIdentity initiator_pid;
1123
1124   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1125               "%x: Attaining ESTABLISHED state\n",
1126               socket->our_id);
1127   socket->write_offset = 0;
1128   socket->read_offset = 0;
1129   socket->state = STATE_ESTABLISHED;
1130   /* FIXME: What if listen_cb is NULL */
1131   if (NULL != socket->lsocket)
1132     {
1133       GNUNET_PEER_resolve (socket->other_peer, &initiator_pid);
1134       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1135                   "%x: Calling listen callback\n",
1136                   socket->our_id);
1137       if (GNUNET_SYSERR == 
1138           socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1139                                       socket,
1140                                       &initiator_pid))
1141         {
1142           socket->state = STATE_CLOSED;
1143           /* FIXME: We should close in a decent way */
1144           GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1145           GNUNET_free (socket);
1146         }
1147     }
1148   else if (socket->open_cb)
1149     socket->open_cb (socket->open_cls, socket);
1150 }
1151
1152
1153 /**
1154  * Callback to set state to HELLO_WAIT
1155  *
1156  * @param cls the closure from queue_message
1157  * @param socket the socket to requiring state change
1158  */
1159 static void
1160 set_state_hello_wait (void *cls,
1161                       struct GNUNET_STREAM_Socket *socket)
1162 {
1163   GNUNET_assert (STATE_INIT == socket->state);
1164   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1165               "%x: Attaining HELLO_WAIT state\n",
1166               socket->our_id);
1167   socket->state = STATE_HELLO_WAIT;
1168 }
1169
1170
1171 /**
1172  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1173  *
1174  * @param cls the socket (set from GNUNET_MESH_connect)
1175  * @param tunnel connection to the other end
1176  * @param tunnel_ctx this is NULL
1177  * @param sender who sent the message
1178  * @param message the actual message
1179  * @param atsi performance data for the connection
1180  * @return GNUNET_OK to keep the connection open,
1181  *         GNUNET_SYSERR to close it (signal serious error)
1182  */
1183 static int
1184 client_handle_hello_ack (void *cls,
1185                          struct GNUNET_MESH_Tunnel *tunnel,
1186                          void **tunnel_ctx,
1187                          const struct GNUNET_PeerIdentity *sender,
1188                          const struct GNUNET_MessageHeader *message,
1189                          const struct GNUNET_ATS_Information*atsi)
1190 {
1191   struct GNUNET_STREAM_Socket *socket = cls;
1192   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1193   struct GNUNET_STREAM_HelloAckMessage *reply;
1194
1195   if (GNUNET_PEER_search (sender) != socket->other_peer)
1196     {
1197       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1198                   "%x: Received HELLO_ACK from non-confirming peer\n",
1199                   socket->our_id);
1200       return GNUNET_YES;
1201     }
1202   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1203   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1204               "%x: Received HELLO_ACK from %x\n",
1205               socket->our_id,
1206               socket->other_peer);
1207
1208   GNUNET_assert (socket->tunnel == tunnel);
1209   switch (socket->state)
1210   {
1211   case STATE_HELLO_WAIT:
1212     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1213     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1214                 "%x: Read sequence number %u\n",
1215                 socket->our_id,
1216                 (unsigned int) socket->read_sequence_number);
1217     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1218     /* Get the random sequence number */
1219     socket->write_sequence_number = 
1220       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1221       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1222                   "%x: Generated write sequence number %u\n",
1223                   socket->our_id,
1224                   (unsigned int) socket->write_sequence_number);
1225     reply = 
1226       GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1227     reply->header.header.size = 
1228       htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1229     reply->header.header.type = 
1230       htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1231     reply->sequence_number = htonl (socket->write_sequence_number);
1232     reply->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1233     queue_message (socket, 
1234                    &reply->header, 
1235                    &set_state_established, 
1236                    NULL);      
1237     return GNUNET_OK;
1238   case STATE_ESTABLISHED:
1239   case STATE_RECEIVE_CLOSE_WAIT:
1240     // call statistics (# ACKs ignored++)
1241     return GNUNET_OK;
1242   case STATE_INIT:
1243   default:
1244     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1245                 "%x: Server %x sent HELLO_ACK when in state %d\n", 
1246                 socket->our_id,
1247                 socket->other_peer,
1248                 socket->state);
1249     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1250     return GNUNET_SYSERR;
1251   }
1252
1253 }
1254
1255
1256 /**
1257  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1258  *
1259  * @param cls the socket (set from GNUNET_MESH_connect)
1260  * @param tunnel connection to the other end
1261  * @param tunnel_ctx this is NULL
1262  * @param sender who sent the message
1263  * @param message the actual message
1264  * @param atsi performance data for the connection
1265  * @return GNUNET_OK to keep the connection open,
1266  *         GNUNET_SYSERR to close it (signal serious error)
1267  */
1268 static int
1269 client_handle_reset (void *cls,
1270                      struct GNUNET_MESH_Tunnel *tunnel,
1271                      void **tunnel_ctx,
1272                      const struct GNUNET_PeerIdentity *sender,
1273                      const struct GNUNET_MessageHeader *message,
1274                      const struct GNUNET_ATS_Information*atsi)
1275 {
1276   struct GNUNET_STREAM_Socket *socket = cls;
1277
1278   return GNUNET_OK;
1279 }
1280
1281
1282 /**
1283  * Common message handler for handling TRANSMIT_CLOSE messages
1284  *
1285  * @param socket the socket through which the ack was received
1286  * @param tunnel connection to the other end
1287  * @param sender who sent the message
1288  * @param msg the transmit close message
1289  * @param atsi performance data for the connection
1290  * @return GNUNET_OK to keep the connection open,
1291  *         GNUNET_SYSERR to close it (signal serious error)
1292  */
1293 static int
1294 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1295                        struct GNUNET_MESH_Tunnel *tunnel,
1296                        const struct GNUNET_PeerIdentity *sender,
1297                        const struct GNUNET_STREAM_MessageHeader *msg,
1298                        const struct GNUNET_ATS_Information*atsi)
1299 {
1300   struct GNUNET_STREAM_MessageHeader *reply;
1301
1302   switch (socket->state)
1303     {
1304     case STATE_ESTABLISHED:
1305       socket->state = STATE_RECEIVE_CLOSED;
1306
1307       /* Send TRANSMIT_CLOSE_ACK */
1308       reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1309       reply->header.type = 
1310         htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1311       reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1312       queue_message (socket, reply, NULL, NULL);
1313       break;
1314
1315     default:
1316       /* FIXME: Call statistics? */
1317       break;
1318     }
1319   return GNUNET_YES;
1320 }
1321
1322
1323 /**
1324  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1325  *
1326  * @param cls the socket (set from GNUNET_MESH_connect)
1327  * @param tunnel connection to the other end
1328  * @param tunnel_ctx this is NULL
1329  * @param sender who sent the message
1330  * @param message the actual message
1331  * @param atsi performance data for the connection
1332  * @return GNUNET_OK to keep the connection open,
1333  *         GNUNET_SYSERR to close it (signal serious error)
1334  */
1335 static int
1336 client_handle_transmit_close (void *cls,
1337                               struct GNUNET_MESH_Tunnel *tunnel,
1338                               void **tunnel_ctx,
1339                               const struct GNUNET_PeerIdentity *sender,
1340                               const struct GNUNET_MessageHeader *message,
1341                               const struct GNUNET_ATS_Information*atsi)
1342 {
1343   struct GNUNET_STREAM_Socket *socket = cls;
1344   
1345   return handle_transmit_close (socket,
1346                                 tunnel,
1347                                 sender,
1348                                 (struct GNUNET_STREAM_MessageHeader *)message,
1349                                 atsi);
1350 }
1351
1352
1353 /**
1354  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1355  *
1356  * @param cls the socket (set from GNUNET_MESH_connect)
1357  * @param tunnel connection to the other end
1358  * @param tunnel_ctx this is NULL
1359  * @param sender who sent the message
1360  * @param message the actual message
1361  * @param atsi performance data for the connection
1362  * @return GNUNET_OK to keep the connection open,
1363  *         GNUNET_SYSERR to close it (signal serious error)
1364  */
1365 static int
1366 client_handle_transmit_close_ack (void *cls,
1367                                   struct GNUNET_MESH_Tunnel *tunnel,
1368                                   void **tunnel_ctx,
1369                                   const struct GNUNET_PeerIdentity *sender,
1370                                   const struct GNUNET_MessageHeader *message,
1371                                   const struct GNUNET_ATS_Information*atsi)
1372 {
1373   struct GNUNET_STREAM_Socket *socket = cls;
1374
1375   return GNUNET_OK;
1376 }
1377
1378
1379 /**
1380  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1381  *
1382  * @param cls the socket (set from GNUNET_MESH_connect)
1383  * @param tunnel connection to the other end
1384  * @param tunnel_ctx this is NULL
1385  * @param sender who sent the message
1386  * @param message the actual message
1387  * @param atsi performance data for the connection
1388  * @return GNUNET_OK to keep the connection open,
1389  *         GNUNET_SYSERR to close it (signal serious error)
1390  */
1391 static int
1392 client_handle_receive_close (void *cls,
1393                              struct GNUNET_MESH_Tunnel *tunnel,
1394                              void **tunnel_ctx,
1395                              const struct GNUNET_PeerIdentity *sender,
1396                              const struct GNUNET_MessageHeader *message,
1397                              const struct GNUNET_ATS_Information*atsi)
1398 {
1399   struct GNUNET_STREAM_Socket *socket = cls;
1400
1401   return GNUNET_OK;
1402 }
1403
1404
1405 /**
1406  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1407  *
1408  * @param cls the socket (set from GNUNET_MESH_connect)
1409  * @param tunnel connection to the other end
1410  * @param tunnel_ctx this is NULL
1411  * @param sender who sent the message
1412  * @param message the actual message
1413  * @param atsi performance data for the connection
1414  * @return GNUNET_OK to keep the connection open,
1415  *         GNUNET_SYSERR to close it (signal serious error)
1416  */
1417 static int
1418 client_handle_receive_close_ack (void *cls,
1419                                  struct GNUNET_MESH_Tunnel *tunnel,
1420                                  void **tunnel_ctx,
1421                                  const struct GNUNET_PeerIdentity *sender,
1422                                  const struct GNUNET_MessageHeader *message,
1423                                  const struct GNUNET_ATS_Information*atsi)
1424 {
1425   struct GNUNET_STREAM_Socket *socket = cls;
1426
1427   return GNUNET_OK;
1428 }
1429
1430
1431 /**
1432  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1433  *
1434  * @param cls the socket (set from GNUNET_MESH_connect)
1435  * @param tunnel connection to the other end
1436  * @param tunnel_ctx this is NULL
1437  * @param sender who sent the message
1438  * @param message the actual message
1439  * @param atsi performance data for the connection
1440  * @return GNUNET_OK to keep the connection open,
1441  *         GNUNET_SYSERR to close it (signal serious error)
1442  */
1443 static int
1444 client_handle_close (void *cls,
1445                      struct GNUNET_MESH_Tunnel *tunnel,
1446                      void **tunnel_ctx,
1447                      const struct GNUNET_PeerIdentity *sender,
1448                      const struct GNUNET_MessageHeader *message,
1449                      const struct GNUNET_ATS_Information*atsi)
1450 {
1451   struct GNUNET_STREAM_Socket *socket = cls;
1452
1453   return GNUNET_OK;
1454 }
1455
1456
1457 /**
1458  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1459  *
1460  * @param cls the socket (set from GNUNET_MESH_connect)
1461  * @param tunnel connection to the other end
1462  * @param tunnel_ctx this is NULL
1463  * @param sender who sent the message
1464  * @param message the actual message
1465  * @param atsi performance data for the connection
1466  * @return GNUNET_OK to keep the connection open,
1467  *         GNUNET_SYSERR to close it (signal serious error)
1468  */
1469 static int
1470 client_handle_close_ack (void *cls,
1471                          struct GNUNET_MESH_Tunnel *tunnel,
1472                          void **tunnel_ctx,
1473                          const struct GNUNET_PeerIdentity *sender,
1474                          const struct GNUNET_MessageHeader *message,
1475                          const struct GNUNET_ATS_Information*atsi)
1476 {
1477   struct GNUNET_STREAM_Socket *socket = cls;
1478
1479   return GNUNET_OK;
1480 }
1481
1482 /*****************************/
1483 /* Server's Message Handlers */
1484 /*****************************/
1485
1486 /**
1487  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1488  *
1489  * @param cls the closure
1490  * @param tunnel connection to the other end
1491  * @param tunnel_ctx the socket
1492  * @param sender who sent the message
1493  * @param message the actual message
1494  * @param atsi performance data for the connection
1495  * @return GNUNET_OK to keep the connection open,
1496  *         GNUNET_SYSERR to close it (signal serious error)
1497  */
1498 static int
1499 server_handle_data (void *cls,
1500                     struct GNUNET_MESH_Tunnel *tunnel,
1501                     void **tunnel_ctx,
1502                     const struct GNUNET_PeerIdentity *sender,
1503                     const struct GNUNET_MessageHeader *message,
1504                     const struct GNUNET_ATS_Information*atsi)
1505 {
1506   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1507
1508   return handle_data (socket,
1509                       tunnel,
1510                       sender,
1511                       (const struct GNUNET_STREAM_DataMessage *)message,
1512                       atsi);
1513 }
1514
1515
1516 /**
1517  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
1518  *
1519  * @param cls the closure
1520  * @param tunnel connection to the other end
1521  * @param tunnel_ctx the socket
1522  * @param sender who sent the message
1523  * @param message the actual message
1524  * @param atsi performance data for the connection
1525  * @return GNUNET_OK to keep the connection open,
1526  *         GNUNET_SYSERR to close it (signal serious error)
1527  */
1528 static int
1529 server_handle_hello (void *cls,
1530                      struct GNUNET_MESH_Tunnel *tunnel,
1531                      void **tunnel_ctx,
1532                      const struct GNUNET_PeerIdentity *sender,
1533                      const struct GNUNET_MessageHeader *message,
1534                      const struct GNUNET_ATS_Information*atsi)
1535 {
1536   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1537   struct GNUNET_STREAM_HelloAckMessage *reply;
1538
1539   if (GNUNET_PEER_search (sender) != socket->other_peer)
1540     {
1541       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1542                   "%x: Received HELLO from non-confirming peer\n",
1543                   socket->our_id);
1544       return GNUNET_YES;
1545     }
1546
1547   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == 
1548                  ntohs (message->type));
1549   GNUNET_assert (socket->tunnel == tunnel);
1550   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1551               "%x: Received HELLO from %x\n", 
1552               socket->our_id,
1553               socket->other_peer);
1554
1555   if (STATE_INIT == socket->state)
1556     {
1557       /* Get the random sequence number */
1558       socket->write_sequence_number = 
1559         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1560       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1561                   "%x: Generated write sequence number %u\n",
1562                   socket->our_id,
1563                   (unsigned int) socket->write_sequence_number);
1564       reply = 
1565         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1566       reply->header.header.size = 
1567         htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1568       reply->header.header.type = 
1569         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1570       reply->sequence_number = htonl (socket->write_sequence_number);
1571       reply->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1572       queue_message (socket, 
1573                      &reply->header,
1574                      &set_state_hello_wait, 
1575                      NULL);
1576     }
1577   else
1578     {
1579       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1580                   "Client sent HELLO when in state %d\n", socket->state);
1581       /* FIXME: Send RESET? */
1582       
1583     }
1584   return GNUNET_OK;
1585 }
1586
1587
1588 /**
1589  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1590  *
1591  * @param cls the closure
1592  * @param tunnel connection to the other end
1593  * @param tunnel_ctx the socket
1594  * @param sender who sent the message
1595  * @param message the actual message
1596  * @param atsi performance data for the connection
1597  * @return GNUNET_OK to keep the connection open,
1598  *         GNUNET_SYSERR to close it (signal serious error)
1599  */
1600 static int
1601 server_handle_hello_ack (void *cls,
1602                          struct GNUNET_MESH_Tunnel *tunnel,
1603                          void **tunnel_ctx,
1604                          const struct GNUNET_PeerIdentity *sender,
1605                          const struct GNUNET_MessageHeader *message,
1606                          const struct GNUNET_ATS_Information*atsi)
1607 {
1608   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1609   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
1610
1611   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
1612                  ntohs (message->type));
1613   GNUNET_assert (socket->tunnel == tunnel);
1614   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
1615   if (STATE_HELLO_WAIT == socket->state)
1616     {
1617       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1618                   "%x: Received HELLO_ACK from %x\n",
1619                   socket->our_id,
1620                   socket->other_peer);
1621       socket->read_sequence_number = ntohl (ack_message->sequence_number);
1622       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1623                   "%x: Read sequence number %u\n",
1624                   socket->our_id,
1625                   (unsigned int) socket->read_sequence_number);
1626       socket->receiver_window_available = 
1627         ntohl (ack_message->receiver_window_size);
1628       /* Attain ESTABLISHED state */
1629       set_state_established (NULL, socket);
1630     }
1631   else
1632     {
1633       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1634                   "Client sent HELLO_ACK when in state %d\n", socket->state);
1635       /* FIXME: Send RESET? */
1636       
1637     }
1638   return GNUNET_OK;
1639 }
1640
1641
1642 /**
1643  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1644  *
1645  * @param cls the closure
1646  * @param tunnel connection to the other end
1647  * @param tunnel_ctx the socket
1648  * @param sender who sent the message
1649  * @param message the actual message
1650  * @param atsi performance data for the connection
1651  * @return GNUNET_OK to keep the connection open,
1652  *         GNUNET_SYSERR to close it (signal serious error)
1653  */
1654 static int
1655 server_handle_reset (void *cls,
1656                      struct GNUNET_MESH_Tunnel *tunnel,
1657                      void **tunnel_ctx,
1658                      const struct GNUNET_PeerIdentity *sender,
1659                      const struct GNUNET_MessageHeader *message,
1660                      const struct GNUNET_ATS_Information*atsi)
1661 {
1662   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1663
1664   return GNUNET_OK;
1665 }
1666
1667
1668 /**
1669  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1670  *
1671  * @param cls the closure
1672  * @param tunnel connection to the other end
1673  * @param tunnel_ctx the socket
1674  * @param sender who sent the message
1675  * @param message the actual message
1676  * @param atsi performance data for the connection
1677  * @return GNUNET_OK to keep the connection open,
1678  *         GNUNET_SYSERR to close it (signal serious error)
1679  */
1680 static int
1681 server_handle_transmit_close (void *cls,
1682                               struct GNUNET_MESH_Tunnel *tunnel,
1683                               void **tunnel_ctx,
1684                               const struct GNUNET_PeerIdentity *sender,
1685                               const struct GNUNET_MessageHeader *message,
1686                               const struct GNUNET_ATS_Information*atsi)
1687 {
1688   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1689
1690   return handle_transmit_close (socket,
1691                                 tunnel,
1692                                 sender,
1693                                 (struct GNUNET_STREAM_MessageHeader *)message,
1694                                 atsi);
1695 }
1696
1697
1698 /**
1699  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1700  *
1701  * @param cls the closure
1702  * @param tunnel connection to the other end
1703  * @param tunnel_ctx the socket
1704  * @param sender who sent the message
1705  * @param message the actual message
1706  * @param atsi performance data for the connection
1707  * @return GNUNET_OK to keep the connection open,
1708  *         GNUNET_SYSERR to close it (signal serious error)
1709  */
1710 static int
1711 server_handle_transmit_close_ack (void *cls,
1712                                   struct GNUNET_MESH_Tunnel *tunnel,
1713                                   void **tunnel_ctx,
1714                                   const struct GNUNET_PeerIdentity *sender,
1715                                   const struct GNUNET_MessageHeader *message,
1716                                   const struct GNUNET_ATS_Information*atsi)
1717 {
1718   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1719
1720   return GNUNET_OK;
1721 }
1722
1723
1724 /**
1725  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1726  *
1727  * @param cls the closure
1728  * @param tunnel connection to the other end
1729  * @param tunnel_ctx the socket
1730  * @param sender who sent the message
1731  * @param message the actual message
1732  * @param atsi performance data for the connection
1733  * @return GNUNET_OK to keep the connection open,
1734  *         GNUNET_SYSERR to close it (signal serious error)
1735  */
1736 static int
1737 server_handle_receive_close (void *cls,
1738                              struct GNUNET_MESH_Tunnel *tunnel,
1739                              void **tunnel_ctx,
1740                              const struct GNUNET_PeerIdentity *sender,
1741                              const struct GNUNET_MessageHeader *message,
1742                              const struct GNUNET_ATS_Information*atsi)
1743 {
1744   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1745
1746   return GNUNET_OK;
1747 }
1748
1749
1750 /**
1751  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1752  *
1753  * @param cls the closure
1754  * @param tunnel connection to the other end
1755  * @param tunnel_ctx the socket
1756  * @param sender who sent the message
1757  * @param message the actual message
1758  * @param atsi performance data for the connection
1759  * @return GNUNET_OK to keep the connection open,
1760  *         GNUNET_SYSERR to close it (signal serious error)
1761  */
1762 static int
1763 server_handle_receive_close_ack (void *cls,
1764                                  struct GNUNET_MESH_Tunnel *tunnel,
1765                                  void **tunnel_ctx,
1766                                  const struct GNUNET_PeerIdentity *sender,
1767                                  const struct GNUNET_MessageHeader *message,
1768                                  const struct GNUNET_ATS_Information*atsi)
1769 {
1770   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1771
1772   return GNUNET_OK;
1773 }
1774
1775
1776 /**
1777  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1778  *
1779  * @param cls the closure
1780  * @param tunnel connection to the other end
1781  * @param tunnel_ctx the socket
1782  * @param sender who sent the message
1783  * @param message the actual message
1784  * @param atsi performance data for the connection
1785  * @return GNUNET_OK to keep the connection open,
1786  *         GNUNET_SYSERR to close it (signal serious error)
1787  */
1788 static int
1789 server_handle_close (void *cls,
1790                      struct GNUNET_MESH_Tunnel *tunnel,
1791                      void **tunnel_ctx,
1792                      const struct GNUNET_PeerIdentity *sender,
1793                      const struct GNUNET_MessageHeader *message,
1794                      const struct GNUNET_ATS_Information*atsi)
1795 {
1796   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1797
1798   return GNUNET_OK;
1799 }
1800
1801
1802 /**
1803  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1804  *
1805  * @param cls the closure
1806  * @param tunnel connection to the other end
1807  * @param tunnel_ctx the socket
1808  * @param sender who sent the message
1809  * @param message the actual message
1810  * @param atsi performance data for the connection
1811  * @return GNUNET_OK to keep the connection open,
1812  *         GNUNET_SYSERR to close it (signal serious error)
1813  */
1814 static int
1815 server_handle_close_ack (void *cls,
1816                          struct GNUNET_MESH_Tunnel *tunnel,
1817                          void **tunnel_ctx,
1818                          const struct GNUNET_PeerIdentity *sender,
1819                          const struct GNUNET_MessageHeader *message,
1820                          const struct GNUNET_ATS_Information*atsi)
1821 {
1822   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1823
1824   return GNUNET_OK;
1825 }
1826
1827
1828 /**
1829  * Message Handler for mesh
1830  *
1831  * @param socket the socket through which the ack was received
1832  * @param tunnel connection to the other end
1833  * @param sender who sent the message
1834  * @param ack the acknowledgment message
1835  * @param atsi performance data for the connection
1836  * @return GNUNET_OK to keep the connection open,
1837  *         GNUNET_SYSERR to close it (signal serious error)
1838  */
1839 static int
1840 handle_ack (struct GNUNET_STREAM_Socket *socket,
1841             struct GNUNET_MESH_Tunnel *tunnel,
1842             const struct GNUNET_PeerIdentity *sender,
1843             const struct GNUNET_STREAM_AckMessage *ack,
1844             const struct GNUNET_ATS_Information*atsi)
1845 {
1846   unsigned int packet;
1847   int need_retransmission;
1848
1849   if (GNUNET_PEER_search (sender) != socket->other_peer)
1850     {
1851       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1852                   "%x: Received ACK from non-confirming peer\n",
1853                   socket->our_id);
1854       return GNUNET_YES;
1855     }
1856
1857   switch (socket->state)
1858     {
1859     case (STATE_ESTABLISHED):
1860       if (NULL == socket->write_handle)
1861         {
1862           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1863                       "%x: Received DATA_ACK when write_handle is NULL\n",
1864                       socket->our_id);
1865           return GNUNET_OK;
1866         }
1867       
1868       if (!((socket->write_sequence_number 
1869              - htonl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
1870         {
1871           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1872                       "%x: Received DATA_ACK with unexpected base sequence",
1873                       "number\n",
1874                       socket->our_id);
1875           return GNUNET_OK;
1876         }
1877       /* FIXME: include the case when write_handle is cancelled - ignore the 
1878          acks */
1879
1880       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1881                   "%x: Received DATA_ACK from %x\n",
1882                   socket->our_id,
1883                   socket->other_peer);
1884       
1885       /* Cancel the retransmission task */
1886       if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
1887         {
1888           GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
1889           socket->retransmission_timeout_task_id = 
1890             GNUNET_SCHEDULER_NO_TASK;
1891         }
1892
1893       /* FIXME: Bits in the ack_bitmap are only to be set; Once set they cannot
1894          be unset */
1895       socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap);
1896       socket->receiver_window_available = 
1897         ntohl (ack->receive_window_remaining);
1898
1899       /* Check if we have received all acknowledgements */
1900       need_retransmission = GNUNET_NO;
1901       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1902         {
1903           if (NULL == socket->write_handle->messages[packet]) break;
1904           if (GNUNET_YES != ackbitmap_is_bit_set 
1905               (&socket->write_handle->ack_bitmap,packet))
1906             {
1907               need_retransmission = GNUNET_YES;
1908               break;
1909             }
1910         }
1911       if (GNUNET_YES == need_retransmission)
1912         {
1913           write_data (socket);
1914         }
1915       else      /* We have to call the write continuation callback now */
1916         {
1917           /* Free the packets */
1918           for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1919             {
1920               GNUNET_free_non_null (socket->write_handle->messages[packet]);
1921             }
1922           if (NULL != socket->write_handle->write_cont)
1923             socket->write_handle->write_cont
1924               (socket->write_handle->write_cont_cls,
1925                socket->status,
1926                socket->write_handle->size);
1927           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1928                       "%x: Write completion callback completed\n",
1929                       socket->our_id);
1930           /* We are done with the write handle - Freeing it */
1931           GNUNET_free (socket->write_handle);
1932           socket->write_handle = NULL;
1933         }
1934       break;
1935     default:
1936       break;
1937     }
1938   return GNUNET_OK;
1939 }
1940
1941
1942 /**
1943  * Message Handler for mesh
1944  *
1945  * @param cls the 'struct GNUNET_STREAM_Socket'
1946  * @param tunnel connection to the other end
1947  * @param tunnel_ctx unused
1948  * @param sender who sent the message
1949  * @param message the actual message
1950  * @param atsi performance data for the connection
1951  * @return GNUNET_OK to keep the connection open,
1952  *         GNUNET_SYSERR to close it (signal serious error)
1953  */
1954 static int
1955 client_handle_ack (void *cls,
1956                    struct GNUNET_MESH_Tunnel *tunnel,
1957                    void **tunnel_ctx,
1958                    const struct GNUNET_PeerIdentity *sender,
1959                    const struct GNUNET_MessageHeader *message,
1960                    const struct GNUNET_ATS_Information*atsi)
1961 {
1962   struct GNUNET_STREAM_Socket *socket = cls;
1963   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1964  
1965   return handle_ack (socket, tunnel, sender, ack, atsi);
1966 }
1967
1968
1969 /**
1970  * Message Handler for mesh
1971  *
1972  * @param cls the server's listen socket
1973  * @param tunnel connection to the other end
1974  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
1975  * @param sender who sent the message
1976  * @param message the actual message
1977  * @param atsi performance data for the connection
1978  * @return GNUNET_OK to keep the connection open,
1979  *         GNUNET_SYSERR to close it (signal serious error)
1980  */
1981 static int
1982 server_handle_ack (void *cls,
1983                    struct GNUNET_MESH_Tunnel *tunnel,
1984                    void **tunnel_ctx,
1985                    const struct GNUNET_PeerIdentity *sender,
1986                    const struct GNUNET_MessageHeader *message,
1987                    const struct GNUNET_ATS_Information*atsi)
1988 {
1989   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1990   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1991  
1992   return handle_ack (socket, tunnel, sender, ack, atsi);
1993 }
1994
1995
1996 /**
1997  * For client message handlers, the stream socket is in the
1998  * closure argument.
1999  */
2000 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2001   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2002   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2003    sizeof (struct GNUNET_STREAM_AckMessage) },
2004   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2005    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2006   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2007    sizeof (struct GNUNET_STREAM_MessageHeader)},
2008   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2009    sizeof (struct GNUNET_STREAM_MessageHeader)},
2010   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2011    sizeof (struct GNUNET_STREAM_MessageHeader)},
2012   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2013    sizeof (struct GNUNET_STREAM_MessageHeader)},
2014   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2015    sizeof (struct GNUNET_STREAM_MessageHeader)},
2016   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2017    sizeof (struct GNUNET_STREAM_MessageHeader)},
2018   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2019    sizeof (struct GNUNET_STREAM_MessageHeader)},
2020   {NULL, 0, 0}
2021 };
2022
2023
2024 /**
2025  * For server message handlers, the stream socket is in the
2026  * tunnel context, and the listen socket in the closure argument.
2027  */
2028 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2029   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2030   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2031    sizeof (struct GNUNET_STREAM_AckMessage) },
2032   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2033    sizeof (struct GNUNET_STREAM_MessageHeader)},
2034   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2035    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2036   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2037    sizeof (struct GNUNET_STREAM_MessageHeader)},
2038   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2039    sizeof (struct GNUNET_STREAM_MessageHeader)},
2040   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2041    sizeof (struct GNUNET_STREAM_MessageHeader)},
2042   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2043    sizeof (struct GNUNET_STREAM_MessageHeader)},
2044   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2045    sizeof (struct GNUNET_STREAM_MessageHeader)},
2046   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2047    sizeof (struct GNUNET_STREAM_MessageHeader)},
2048   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2049    sizeof (struct GNUNET_STREAM_MessageHeader)},
2050   {NULL, 0, 0}
2051 };
2052
2053
2054 /**
2055  * Function called when our target peer is connected to our tunnel
2056  *
2057  * @param cls the socket for which this tunnel is created
2058  * @param peer the peer identity of the target
2059  * @param atsi performance data for the connection
2060  */
2061 static void
2062 mesh_peer_connect_callback (void *cls,
2063                             const struct GNUNET_PeerIdentity *peer,
2064                             const struct GNUNET_ATS_Information * atsi)
2065 {
2066   struct GNUNET_STREAM_Socket *socket = cls;
2067   struct GNUNET_STREAM_MessageHeader *message;
2068   GNUNET_PEER_Id connected_peer;
2069
2070   connected_peer = GNUNET_PEER_search (peer);
2071   
2072   if (connected_peer != socket->other_peer)
2073     {
2074       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2075                   "%x: A peer which is not our target has connected",
2076                   "to our tunnel\n",
2077                   socket->our_id);
2078       return;
2079     }
2080   
2081   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2082               "%x: Target peer %x connected\n", 
2083               socket->our_id,
2084               connected_peer);
2085   
2086   /* Set state to INIT */
2087   socket->state = STATE_INIT;
2088
2089   /* Send HELLO message */
2090   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2091   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
2092   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2093   queue_message (socket,
2094                  message,
2095                  &set_state_hello_wait,
2096                  NULL);
2097
2098   /* Call open callback */
2099   if (NULL == socket->open_cb)
2100     {
2101       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2102                   "STREAM_open callback is NULL\n");
2103     }
2104 }
2105
2106
2107 /**
2108  * Function called when our target peer is disconnected from our tunnel
2109  *
2110  * @param cls the socket associated which this tunnel
2111  * @param peer the peer identity of the target
2112  */
2113 static void
2114 mesh_peer_disconnect_callback (void *cls,
2115                                const struct GNUNET_PeerIdentity *peer)
2116 {
2117
2118 }
2119
2120
2121 /**
2122  * Method called whenever a peer creates a tunnel to us
2123  *
2124  * @param cls closure
2125  * @param tunnel new handle to the tunnel
2126  * @param initiator peer that started the tunnel
2127  * @param atsi performance information for the tunnel
2128  * @return initial tunnel context for the tunnel
2129  *         (can be NULL -- that's not an error)
2130  */
2131 static void *
2132 new_tunnel_notify (void *cls,
2133                    struct GNUNET_MESH_Tunnel *tunnel,
2134                    const struct GNUNET_PeerIdentity *initiator,
2135                    const struct GNUNET_ATS_Information *atsi)
2136 {
2137   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2138   struct GNUNET_STREAM_Socket *socket;
2139
2140   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2141      from the same peer again until the socket is closed */
2142
2143   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2144   socket->other_peer = GNUNET_PEER_intern (initiator);
2145   socket->tunnel = tunnel;
2146   socket->session_id = 0;       /* FIXME */
2147   socket->state = STATE_INIT;
2148   socket->lsocket = lsocket;
2149   socket->our_id = lsocket->our_id;
2150   
2151   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2152               "%x: Peer %x initiated tunnel to us\n", 
2153               socket->our_id,
2154               socket->other_peer);
2155   
2156   /* FIXME: Copy MESH handle from lsocket to socket */
2157   
2158   return socket;
2159 }
2160
2161
2162 /**
2163  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2164  * any associated state.  This function is NOT called if the client has
2165  * explicitly asked for the tunnel to be destroyed using
2166  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2167  * the tunnel.
2168  *
2169  * @param cls closure (set from GNUNET_MESH_connect)
2170  * @param tunnel connection to the other end (henceforth invalid)
2171  * @param tunnel_ctx place where local state associated
2172  *                   with the tunnel is stored
2173  */
2174 static void 
2175 tunnel_cleaner (void *cls,
2176                 const struct GNUNET_MESH_Tunnel *tunnel,
2177                 void *tunnel_ctx)
2178 {
2179   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2180   
2181   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2182               "%x: Peer %x has terminated connection abruptly\n",
2183               socket->our_id,
2184               socket->other_peer);
2185
2186   socket->status = GNUNET_STREAM_SHUTDOWN;
2187
2188   /* Clear Transmit handles */
2189   if (NULL != socket->transmit_handle)
2190     {
2191       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2192       socket->transmit_handle = NULL;
2193     }
2194   socket->tunnel = NULL;
2195 }
2196
2197
2198 /*****************/
2199 /* API functions */
2200 /*****************/
2201
2202
2203 /**
2204  * Tries to open a stream to the target peer
2205  *
2206  * @param cfg configuration to use
2207  * @param target the target peer to which the stream has to be opened
2208  * @param app_port the application port number which uniquely identifies this
2209  *            stream
2210  * @param open_cb this function will be called after stream has be established 
2211  * @param open_cb_cls the closure for open_cb
2212  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2213  * @return if successful it returns the stream socket; NULL if stream cannot be
2214  *         opened 
2215  */
2216 struct GNUNET_STREAM_Socket *
2217 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2218                     const struct GNUNET_PeerIdentity *target,
2219                     GNUNET_MESH_ApplicationType app_port,
2220                     GNUNET_STREAM_OpenCallback open_cb,
2221                     void *open_cb_cls,
2222                     ...)
2223 {
2224   struct GNUNET_STREAM_Socket *socket;
2225   struct GNUNET_PeerIdentity own_peer_id;
2226   enum GNUNET_STREAM_Option option;
2227   va_list vargs;                /* Variable arguments */
2228
2229   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2230               "%s\n", __func__);
2231
2232   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2233   socket->other_peer = GNUNET_PEER_intern (target);
2234   socket->open_cb = open_cb;
2235   socket->open_cls = open_cb_cls;
2236   GNUNET_TESTING_get_peer_identity (cfg, &own_peer_id);
2237   socket->our_id = GNUNET_PEER_intern (&own_peer_id);
2238   
2239   /* Set defaults */
2240   socket->retransmit_timeout = 
2241     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
2242
2243   va_start (vargs, open_cb_cls); /* Parse variable args */
2244   do {
2245     option = va_arg (vargs, enum GNUNET_STREAM_Option);
2246     switch (option)
2247       {
2248       case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2249         /* Expect struct GNUNET_TIME_Relative */
2250         socket->retransmit_timeout = va_arg (vargs,
2251                                              struct GNUNET_TIME_Relative);
2252         break;
2253       case GNUNET_STREAM_OPTION_END:
2254         break;
2255       }
2256   } while (GNUNET_STREAM_OPTION_END != option);
2257   va_end (vargs);               /* End of variable args parsing */
2258   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2259                                       10,  /* QUEUE size as parameter? */
2260                                       socket, /* cls */
2261                                       NULL, /* No inbound tunnel handler */
2262                                       &tunnel_cleaner, /* FIXME: not required? */
2263                                       client_message_handlers,
2264                                       &app_port); /* We don't get inbound tunnels */
2265   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
2266     {
2267       GNUNET_free (socket);
2268       return NULL;
2269     }
2270
2271   /* Now create the mesh tunnel to target */
2272   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2273               "Creating MESH Tunnel\n");
2274   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2275                                               NULL, /* Tunnel context */
2276                                               &mesh_peer_connect_callback,
2277                                               &mesh_peer_disconnect_callback,
2278                                               socket);
2279   GNUNET_assert (NULL != socket->tunnel);
2280   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2281                                         target);
2282   
2283   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2284               "%s() END\n", __func__);
2285   return socket;
2286 }
2287
2288
2289 /**
2290  * Shutdown the stream for reading or writing (man 2 shutdown).
2291  *
2292  * @param socket the stream socket
2293  * @param how SHUT_RD, SHUT_WR or SHUT_RDWR 
2294  */
2295 void
2296 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2297                         int how)
2298 {
2299   return;
2300 }
2301
2302
2303 /**
2304  * Closes the stream
2305  *
2306  * @param socket the stream socket
2307  */
2308 void
2309 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
2310 {
2311   struct MessageQueue *head;
2312
2313   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
2314   {
2315     /* socket closed with read task pending!? */
2316     GNUNET_break (0);
2317     GNUNET_SCHEDULER_cancel (socket->read_task_id);
2318     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
2319   }
2320
2321   /* Clear Transmit handles */
2322   if (NULL != socket->transmit_handle)
2323     {
2324       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2325       socket->transmit_handle = NULL;
2326     }
2327
2328   /* Clear existing message queue */
2329   while (NULL != (head = socket->queue_head)) {
2330     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2331                                  socket->queue_tail,
2332                                  head);
2333     GNUNET_free (head->message);
2334     GNUNET_free (head);
2335   }
2336
2337   /* Close associated tunnel */
2338   if (NULL != socket->tunnel)
2339     {
2340       GNUNET_MESH_tunnel_destroy (socket->tunnel);
2341       socket->tunnel = NULL;
2342     }
2343
2344   /* Close mesh connection */
2345   if (NULL != socket->mesh && NULL == socket->lsocket)
2346     {
2347       GNUNET_MESH_disconnect (socket->mesh);
2348       socket->mesh = NULL;
2349     }
2350   
2351   /* Release receive buffer */
2352   if (NULL != socket->receive_buffer)
2353     {
2354       GNUNET_free (socket->receive_buffer);
2355     }
2356
2357   GNUNET_free (socket);
2358 }
2359
2360
2361 /**
2362  * Listens for stream connections for a specific application ports
2363  *
2364  * @param cfg the configuration to use
2365  * @param app_port the application port for which new streams will be accepted
2366  * @param listen_cb this function will be called when a peer tries to establish
2367  *            a stream with us
2368  * @param listen_cb_cls closure for listen_cb
2369  * @return listen socket, NULL for any error
2370  */
2371 struct GNUNET_STREAM_ListenSocket *
2372 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
2373                       GNUNET_MESH_ApplicationType app_port,
2374                       GNUNET_STREAM_ListenCallback listen_cb,
2375                       void *listen_cb_cls)
2376 {
2377   /* FIXME: Add variable args for passing configration options? */
2378   struct GNUNET_STREAM_ListenSocket *lsocket;
2379   struct GNUNET_PeerIdentity our_peer_id;
2380
2381   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
2382   lsocket->port = app_port;
2383   lsocket->listen_cb = listen_cb;
2384   lsocket->listen_cb_cls = listen_cb_cls;
2385   GNUNET_TESTING_get_peer_identity (cfg, &our_peer_id);
2386   lsocket->our_id = GNUNET_PEER_intern (&our_peer_id);
2387   lsocket->mesh = GNUNET_MESH_connect (cfg,
2388                                        10, /* FIXME: QUEUE size as parameter? */
2389                                        lsocket, /* Closure */
2390                                        &new_tunnel_notify,
2391                                        &tunnel_cleaner,
2392                                        server_message_handlers,
2393                                        &app_port);
2394   GNUNET_assert (NULL != lsocket->mesh);
2395   return lsocket;
2396 }
2397
2398
2399 /**
2400  * Closes the listen socket
2401  *
2402  * @param lsocket the listen socket
2403  */
2404 void
2405 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
2406 {
2407   /* Close MESH connection */
2408   GNUNET_assert (NULL != lsocket->mesh);
2409   GNUNET_MESH_disconnect (lsocket->mesh);
2410   
2411   GNUNET_free (lsocket);
2412 }
2413
2414
2415 /**
2416  * Tries to write the given data to the stream
2417  *
2418  * @param socket the socket representing a stream
2419  * @param data the data buffer from where the data is written into the stream
2420  * @param size the number of bytes to be written from the data buffer
2421  * @param timeout the timeout period
2422  * @param write_cont the function to call upon writing some bytes into the stream
2423  * @param write_cont_cls the closure
2424  * @return handle to cancel the operation
2425  */
2426 struct GNUNET_STREAM_IOWriteHandle *
2427 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
2428                      const void *data,
2429                      size_t size,
2430                      struct GNUNET_TIME_Relative timeout,
2431                      GNUNET_STREAM_CompletionContinuation write_cont,
2432                      void *write_cont_cls)
2433 {
2434   unsigned int num_needed_packets;
2435   unsigned int packet;
2436   struct GNUNET_STREAM_IOWriteHandle *io_handle;
2437   uint32_t packet_size;
2438   uint32_t payload_size;
2439   struct GNUNET_STREAM_DataMessage *data_msg;
2440   const void *sweep;
2441   struct GNUNET_TIME_Relative ack_deadline;
2442
2443   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2444               "%s\n", __func__);
2445
2446   /* Return NULL if there is already a write request pending */
2447   if (NULL != socket->write_handle)
2448   {
2449     GNUNET_break (0);
2450     return NULL;
2451   }
2452   if (!((STATE_ESTABLISHED == socket->state)
2453         || (STATE_RECEIVE_CLOSE_WAIT == socket->state)
2454         || (STATE_RECEIVE_CLOSED == socket->state)))
2455     {
2456       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2457                   "%x: Attempting to write on a closed (OR) not-yet-established"
2458                   "stream\n",
2459                   socket->our_id);
2460       return NULL;
2461     } 
2462   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
2463     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
2464   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
2465   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
2466   io_handle->write_cont = write_cont;
2467   io_handle->write_cont_cls = write_cont_cls;
2468   io_handle->size = size;
2469   sweep = data;
2470   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2471      determined from RTT */
2472   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
2473   /* Divide the given buffer into packets for sending */
2474   for (packet=0; packet < num_needed_packets; packet++)
2475     {
2476       if ((packet + 1) * max_payload_size < size) 
2477         {
2478           payload_size = max_payload_size;
2479           packet_size = MAX_PACKET_SIZE;
2480         }
2481       else 
2482         {
2483           payload_size = size - packet * max_payload_size;
2484           packet_size =  payload_size + sizeof (struct
2485                                                 GNUNET_STREAM_DataMessage); 
2486         }
2487       io_handle->messages[packet] = GNUNET_malloc (packet_size);
2488       io_handle->messages[packet]->header.header.size = htons (packet_size);
2489       io_handle->messages[packet]->header.header.type =
2490         htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
2491       io_handle->messages[packet]->sequence_number =
2492         htonl (socket->write_sequence_number++);
2493       io_handle->messages[packet]->offset = htonl (socket->write_offset);
2494
2495       /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2496          determined from RTT */
2497       io_handle->messages[packet]->ack_deadline =
2498         GNUNET_TIME_relative_hton (ack_deadline);
2499       data_msg = io_handle->messages[packet];
2500       /* Copy data from given buffer to the packet */
2501       memcpy (&data_msg[1],
2502               sweep,
2503               payload_size);
2504       sweep += payload_size;
2505       socket->write_offset += payload_size;
2506     }
2507   socket->write_handle = io_handle;
2508   write_data (socket);
2509
2510   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2511               "%s() END\n", __func__);
2512
2513   return io_handle;
2514 }
2515
2516
2517 /**
2518  * Tries to read data from the stream
2519  *
2520  * @param socket the socket representing a stream
2521  * @param timeout the timeout period
2522  * @param proc function to call with data (once only)
2523  * @param proc_cls the closure for proc
2524  * @return handle to cancel the operation
2525  */
2526 struct GNUNET_STREAM_IOReadHandle *
2527 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
2528                     struct GNUNET_TIME_Relative timeout,
2529                     GNUNET_STREAM_DataProcessor proc,
2530                     void *proc_cls)
2531 {
2532   struct GNUNET_STREAM_IOReadHandle *read_handle;
2533   
2534   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2535               "%s()\n", __func__);
2536
2537   /* Return NULL if there is already a read handle; the user has to cancel that
2538   first before continuing or has to wait until it is completed */
2539   if (NULL != socket->read_handle) return NULL;
2540
2541   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
2542   read_handle->proc = proc;
2543   socket->read_handle = read_handle;
2544
2545   /* Check if we have a packet at bitmap 0 */
2546   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
2547                                           0))
2548     {
2549       socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
2550                                                        socket);
2551    
2552     }
2553   
2554   /* Setup the read timeout task */
2555   socket->read_io_timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
2556                                                                &read_io_timeout,
2557                                                                socket);
2558   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2559               "%s() END\n", __func__);
2560   return read_handle;
2561 }
2562
2563
2564 /**
2565  * Cancel pending write operation.
2566  *
2567  * @param ioh handle to operation to cancel
2568  */
2569 void
2570 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
2571 {
2572   return;
2573 }
2574
2575
2576 /**
2577  * Cancel pending read operation.
2578  *
2579  * @param ioh handle to operation to cancel
2580  */
2581 void
2582 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
2583 {
2584   return;
2585 }