1c0eec878ccc841a9d1b5d5c8a9c37657c3d7006
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file stream/stream_api.c
23  * @brief Implementation of the stream library
24  * @author Sree Harsha Totakura
25  */
26 #include "platform.h"
27 #include "gnunet_common.h"
28 #include "gnunet_crypto_lib.h"
29 #include "gnunet_stream_lib.h"
30 #include "stream_protocol.h"
31
32
33 /**
34  * The maximum packet size of a stream packet
35  */
36 #define MAX_PACKET_SIZE 64000
37
38 /**
39  * The maximum payload a data message packet can carry
40  */
41 static size_t max_payload_size = 
42   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
43
44 /**
45  * Receive buffer
46  */
47 #define RECEIVE_BUFFER_SIZE 4096000
48
49 /**
50  * states in the Protocol
51  */
52 enum State
53   {
54     /**
55      * Client initialization state
56      */
57     STATE_INIT,
58
59     /**
60      * Listener initialization state 
61      */
62     STATE_LISTEN,
63
64     /**
65      * Pre-connection establishment state
66      */
67     STATE_HELLO_WAIT,
68
69     /**
70      * State where a connection has been established
71      */
72     STATE_ESTABLISHED,
73
74     /**
75      * State where the socket is closed on our side and waiting to be ACK'ed
76      */
77     STATE_RECEIVE_CLOSE_WAIT,
78
79     /**
80      * State where the socket is closed for reading
81      */
82     STATE_RECEIVE_CLOSED,
83
84     /**
85      * State where the socket is closed on our side and waiting to be ACK'ed
86      */
87     STATE_TRANSMIT_CLOSE_WAIT,
88
89     /**
90      * State where the socket is closed for writing
91      */
92     STATE_TRANSMIT_CLOSED,
93
94     /**
95      * State where the socket is closed on our side and waiting to be ACK'ed
96      */
97     STATE_CLOSE_WAIT,
98
99     /**
100      * State where the socket is closed
101      */
102     STATE_CLOSED 
103   };
104
105
106 /**
107  * Functions of this type are called when a message is written
108  *
109  * @param socket the socket the written message was bound to
110  */
111 typedef void (*SendFinishCallback) (void *cls,
112                                     struct GNUNET_STREAM_Socket *socket);
113
114
115 /**
116  * The send message queue
117  */
118 struct MessageQueue
119 {
120   /**
121    * The message
122    */
123   struct GNUNET_STREAM_MessageHeader *message;
124
125   /**
126    * Callback to be called when the message is sent
127    */
128   SendFinishCallback finish_cb;
129
130   /**
131    * The closure for finish_cb
132    */
133   void *finish_cb_cls;
134
135   /**
136    * The next message in queue. Should be NULL in the last message
137    */
138   struct MessageQueue *next;
139
140   /**
141    * The next message in queue. Should be NULL in the last message
142    */
143   struct MessageQueue *prev;
144 };
145
146
147 /**
148  * The STREAM Socket Handler
149  */
150 struct GNUNET_STREAM_Socket
151 {
152
153   /**
154    * The peer identity of the peer at the other end of the stream
155    */
156   struct GNUNET_PeerIdentity other_peer;
157
158   /**
159    * Retransmission timeout
160    */
161   struct GNUNET_TIME_Relative retransmit_timeout;
162
163   /**
164    * The Acknowledgement Bitmap
165    */
166   GNUNET_STREAM_AckBitmap ack_bitmap;
167
168   /**
169    * Time when the Acknowledgement was queued
170    */
171   struct GNUNET_TIME_Absolute ack_time_registered;
172
173   /**
174    * Queued Acknowledgement deadline
175    */
176   struct GNUNET_TIME_Relative ack_time_deadline;
177
178   /**
179    * The task for sending timely Acks
180    */
181   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
182
183   /**
184    * The mesh handle
185    */
186   struct GNUNET_MESH_Handle *mesh;
187
188   /**
189    * The mesh tunnel handle
190    */
191   struct GNUNET_MESH_Tunnel *tunnel;
192
193   /**
194    * Stream open closure
195    */
196   void *open_cls;
197
198   /**
199    * Stream open callback
200    */
201   GNUNET_STREAM_OpenCallback open_cb;
202
203   /**
204    * The current transmit handle (if a pending transmit request exists)
205    */
206   struct GNUNET_MESH_TransmitHandle *transmit_handle;
207
208   /**
209    * The current message associated with the transmit handle
210    */
211   struct MessageQueue *queue_head;
212
213   /**
214    * The queue tail, should always point to the last message in queue
215    */
216   struct MessageQueue *queue_tail;
217
218   /**
219    * The write IO_handle associated with this socket
220    */
221   struct GNUNET_STREAM_IOHandle *write_handle;
222
223   /**
224    * The read IO_handle associated with this socket
225    */
226   struct GNUNET_STREAM_IOHandle *read_handle;
227
228   /**
229    * Buffer for storing received messages
230    */
231   void *receive_buffer;
232
233   /**
234    * The state of the protocol associated with this socket
235    */
236   enum State state;
237
238   /**
239    * The status of the socket
240    */
241   enum GNUNET_STREAM_Status status;
242
243   /**
244    * The number of previous timeouts; FIXME: currently not used
245    */
246   unsigned int retries;
247
248   /**
249    * The session id associated with this stream connection
250    * FIXME: Not used currently, may be removed
251    */
252   uint32_t session_id;
253
254   /**
255    * Write sequence number. Set to random when sending HELLO(client) and
256    * HELLO_ACK(server) 
257    */
258   uint32_t write_sequence_number;
259
260   /**
261    * Read sequence number. This number's value is determined during handshake
262    */
263   uint32_t read_sequence_number;
264
265   /**
266    * receiver's available buffer after the last acknowledged packet
267    */
268   uint32_t receive_window_available;
269 };
270
271
272 /**
273  * A socket for listening
274  */
275 struct GNUNET_STREAM_ListenSocket
276 {
277
278   /**
279    * The mesh handle
280    */
281   struct GNUNET_MESH_Handle *mesh;
282
283   /**
284    * The callback function which is called after successful opening socket
285    */
286   GNUNET_STREAM_ListenCallback listen_cb;
287
288   /**
289    * The call back closure
290    */
291   void *listen_cb_cls;
292
293   /**
294    * The service port
295    */
296   GNUNET_MESH_ApplicationType port;
297 };
298
299
300
301
302 /**
303  * The IO Handle
304  */
305 struct GNUNET_STREAM_IOHandle
306 {
307   /**
308    * The packet_buffers associated with this Handle
309    */
310   struct GNUNET_STREAM_DataMessage *messages[64];
311
312   /**
313    * The bitmap of this IOHandle; Corresponding bit for a message is set when
314    * it has been acknowledged by the receiver
315    */
316   GNUNET_STREAM_AckBitmap ack_bitmap;
317
318   /**
319    * receiver's available buffer
320    */
321   uint32_t receive_window_available;
322
323   /**
324    * Number of packets sent before waiting for an ack
325    *
326    * FIXME: Do we need this?
327    */
328   unsigned int sent_packets;
329 };
330
331
332 /**
333  * Default value in seconds for various timeouts
334  */
335 static unsigned int default_timeout = 300;
336
337
338 /**
339  * Callback function for sending hello message
340  *
341  * @param cls closure the socket
342  * @param size number of bytes available in buf
343  * @param buf where the callee should write the message
344  * @return number of bytes written to buf
345  */
346 static size_t
347 send_message_notify (void *cls, size_t size, void *buf)
348 {
349   struct GNUNET_STREAM_Socket *socket = cls;
350   struct MessageQueue *head;
351   size_t ret;
352
353   socket->transmit_handle = NULL; /* Remove the transmit handle */
354   head = socket->queue_head;
355   if (NULL == head)
356     return 0; /* just to be safe */
357   if (0 == size)                /* request timed out */
358     {
359       socket->retries++;
360       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
361                   "Message sending timed out. Retry %d \n",
362                   socket->retries);
363       socket->transmit_handle = 
364         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
365                                            0, /* Corking */
366                                            1, /* Priority */
367                                            /* FIXME: exponential backoff */
368                                            socket->retransmit_timeout,
369                                            &socket->other_peer,
370                                            ntohs (head->message->header.size),
371                                            &send_message_notify,
372                                            socket);
373       return 0;
374     }
375
376   ret = ntohs (head->message->header.size);
377   GNUNET_assert (size >= ret);
378   memcpy (buf, head->message, ret);
379   if (NULL != head->finish_cb)
380     {
381       head->finish_cb (socket, head->finish_cb_cls);
382     }
383   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
384                                socket->queue_tail,
385                                head);
386   GNUNET_free (head->message);
387   GNUNET_free (head);
388   head = socket->queue_head;
389   if (NULL != head)    /* more pending messages to send */
390     {
391       socket->retries = 0;
392       socket->transmit_handle = 
393         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
394                                            0, /* Corking */
395                                            1, /* Priority */
396                                            /* FIXME: exponential backoff */
397                                            socket->retransmit_timeout,
398                                            &socket->other_peer,
399                                            ntohs (head->message->header.size),
400                                            &send_message_notify,
401                                            socket);
402     }
403   return ret;
404 }
405
406
407 /**
408  * Queues a message for sending using the mesh connection of a socket
409  *
410  * @param socket the socket whose mesh connection is used
411  * @param message the message to be sent
412  * @param finish_cb the callback to be called when the message is sent
413  * @param finish_cb_cls the closure for the callback
414  */
415 static void
416 queue_message (struct GNUNET_STREAM_Socket *socket,
417                struct GNUNET_STREAM_MessageHeader *message,
418                SendFinishCallback finish_cb,
419                void *finish_cb_cls)
420 {
421   struct MessageQueue *queue_entity;
422
423   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
424   queue_entity->message = message;
425   queue_entity->finish_cb = finish_cb;
426   queue_entity->finish_cb_cls = finish_cb_cls;
427   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
428                                     socket->queue_tail,
429                                     queue_entity);
430   if (NULL == socket->transmit_handle)
431   {
432     socket->retries = 0;
433     socket->transmit_handle = 
434       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
435                                          0, /* Corking */
436                                          1, /* Priority */
437                                          socket->retransmit_timeout,
438                                          &socket->other_peer,
439                                          ntohs (message->header.size),
440                                          &send_message_notify,
441                                          socket);
442   }
443 }
444
445
446 /**
447  * Callback function for sending ack message
448  *
449  * @param cls closure the ACK message created in ack_task
450  * @param size number of bytes available in buffer
451  * @param buf where the callee should write the message
452  * @return number of bytes written to buf
453  */
454 static size_t
455 send_ack_notify (void *cls, size_t size, void *buf)
456 {
457   struct GNUNET_STREAM_AckMessage *ack_msg = cls;
458
459   if (0 == size)
460     {
461       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
462                   "%s called with size 0\n", __func__);
463       return 0;
464     }
465   GNUNET_assert (ack_msg->header.header.size <= size);
466   
467   size = ack_msg->header.header.size;
468   memcpy (buf, ack_msg, size);
469   return size;
470 }
471
472
473 /**
474  * Task for sending ACK message
475  *
476  * @param cls the socket
477  * @param tc the Task context
478  */
479 static void
480 ack_task (void *cls,
481           const struct GNUNET_SCHEDULER_TaskContext *tc)
482 {
483   struct GNUNET_STREAM_Socket *socket = cls;
484   struct GNUNET_STREAM_AckMessage *ack_msg;
485
486   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
487     {
488       return;
489     }
490
491   socket->ack_task_id = 0;
492
493   /* Create the ACK Message */
494   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
495   ack_msg->header.header.size = htons (sizeof (struct 
496                                                GNUNET_STREAM_AckMessage));
497   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
498   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
499   ack_msg->base_sequence_number = htonl (socket->write_sequence_number);
500   ack_msg->receive_window_remaining = htonl (socket->receive_window_available);
501
502   /* Request MESH for sending ACK */
503   GNUNET_MESH_notify_transmit_ready (socket->tunnel,
504                                      0, /* Corking */
505                                      1, /* Priority */
506                                      socket->retransmit_timeout,
507                                      &socket->other_peer,
508                                      ntohs (ack_msg->header.header.size),
509                                      &send_ack_notify,
510                                      ack_msg);
511
512   
513 }
514
515
516 /**
517  * Function to modify a bit in GNUNET_STREAM_AckBitmap
518  *
519  * @param bitmap the bitmap to modify
520  * @param bit the bit number to modify
521  * @param value GNUNET_YES to on, GNUNET_NO to off
522  */
523 static void
524 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
525                       unsigned int bit, 
526                       int value)
527 {
528   GNUNET_assert (bit < 64);
529   if (GNUNET_YES == value)
530     *bitmap |= (1LL << bit);
531   else
532     *bitmap &= ~(1LL << bit);
533 }
534
535
536 /**
537  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
538  *
539  * @param bitmap address of the bitmap that has to be checked
540  * @param bit the bit number to check
541  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
542  */
543 static uint8_t
544 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
545                       unsigned int bit)
546 {
547   GNUNET_assert (bit < 64);
548   return 0 != (*bitmap & (1LL << bit));
549 }
550
551
552
553 /**
554  * Function called when Data Message is sent
555  *
556  * @param cls the io_handle corresponding to the Data Message
557  * @param socket the socket which was used
558  */
559 static void
560 write_data_finish_cb (void *cls,
561                       struct GNUNET_STREAM_Socket *socket)
562 {
563   struct GNUNET_STREAM_IOHandle *io_handle = cls;
564
565   io_handle->sent_packets++;
566 }
567
568
569 /**
570  * Writes data using the given socket. The amount of data written is limited by
571  * the receive_window_size
572  *
573  * @param socket the socket to use
574  */
575 static void 
576 write_data (struct GNUNET_STREAM_Socket *socket)
577 {
578   struct GNUNET_STREAM_IOHandle *io_handle = socket->write_handle;
579   unsigned int packet;
580   int ack_packet;
581
582   ack_packet = -1;
583   /* Find the last acknowledged packet */
584   for (packet=0; packet < 64; packet++)
585     {      
586       if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
587                                               packet))
588         ack_packet = packet;        
589       else if (NULL == io_handle->messages[packet])
590         break;
591     }
592   /* Resend packets which weren't ack'ed */
593   for (packet=0; packet < ack_packet; packet++)
594     {
595       if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
596                                              packet))
597         {
598           queue_message (socket,
599                          &io_handle->messages[packet]->header,
600                          NULL,
601                          NULL);
602         }
603     }
604   packet = ack_packet + 1;
605   /* Now send new packets if there is enough buffer space */
606   while ( (NULL != io_handle->messages[packet]) &&
607           (io_handle->receive_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) )
608     {
609       io_handle->receive_window_available -= ntohs (io_handle->messages[packet]->header.header.size);
610       queue_message (socket,
611                      &io_handle->messages[packet]->header,
612                      &write_data_finish_cb,
613                      io_handle);
614       packet++;
615     }
616 }
617
618
619 /**
620  * Handler for DATA messages; Same for both client and server
621  *
622  * @param socket the socket through which the ack was received
623  * @param tunnel connection to the other end
624  * @param sender who sent the message
625  * @param ack the acknowledgment message
626  * @param atsi performance data for the connection
627  * @return GNUNET_OK to keep the connection open,
628  *         GNUNET_SYSERR to close it (signal serious error)
629  */
630 static int
631 handle_data (struct GNUNET_STREAM_Socket *socket,
632              struct GNUNET_MESH_Tunnel *tunnel,
633              const struct GNUNET_PeerIdentity *sender,
634              const struct GNUNET_STREAM_DataMessage *msg,
635              const struct GNUNET_ATS_Information*atsi)
636 {
637   uint16_t size;
638   const void *payload;
639
640   size = msg->header.header.size;
641   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
642     {
643       GNUNET_break_op (0);
644       return GNUNET_SYSERR;
645     }
646
647   switch (socket->state)
648     {
649     case STATE_ESTABLISHED:
650     case STATE_TRANSMIT_CLOSED:
651     case STATE_TRANSMIT_CLOSE_WAIT:
652       GNUNET_assert (NULL != socket->receive_buffer);
653       /* check if the message's sequence number is greater than the one we are
654          expecting */
655       if (ntohl (msg->sequence_number) - socket->read_sequence_number < 64)
656         {
657
658         }
659       else                      /* We are receiving a retransmitted message */
660         {
661           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
662                       "Message with sequence number %d retransmitted\n",
663                       ntohl (socket->read_sequence_number));
664           return GNUNET_YES;
665         }
666       /* Copy Data to buffer and send acknowledgements */
667       size -= sizeof (struct GNUNET_STREAM_DataMessage);
668       payload = &msg[1];
669       memcpy (socket->receive_buffer 
670               + (ntohl (msg->sequence_number) - socket->read_sequence_number)
671               * MAX_PACKET_SIZE,
672               payload,
673               size);
674       
675       /* Modify the ACK bitmap */
676       ackbitmap_modify_bit (&socket->ack_bitmap,
677                             ntohl (msg->sequence_number) -
678                             socket->read_sequence_number,
679                             GNUNET_YES);
680
681       /* Start ACK sending task if one is not already present */
682       if (0 == socket->ack_task_id)
683        {
684          socket->ack_task_id = 
685            GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
686                                          (msg->ack_deadline),
687                                          &ack_task,
688                                          socket);
689        }
690       
691       break;
692
693     default:
694       /* FIXME: call statistics */
695       break;
696     }
697   return GNUNET_YES;
698 }
699
700 /**
701  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
702  *
703  * @param cls the socket (set from GNUNET_MESH_connect)
704  * @param tunnel connection to the other end
705  * @param tunnel_ctx place to store local state associated with the tunnel
706  * @param sender who sent the message
707  * @param message the actual message
708  * @param atsi performance data for the connection
709  * @return GNUNET_OK to keep the connection open,
710  *         GNUNET_SYSERR to close it (signal serious error)
711  */
712 static int
713 client_handle_data (void *cls,
714              struct GNUNET_MESH_Tunnel *tunnel,
715              void **tunnel_ctx,
716              const struct GNUNET_PeerIdentity *sender,
717              const struct GNUNET_MessageHeader *message,
718              const struct GNUNET_ATS_Information*atsi)
719 {
720   struct GNUNET_STREAM_Socket *socket = cls;
721
722   return handle_data (socket, 
723                       tunnel, 
724                       sender, 
725                       (const struct GNUNET_STREAM_DataMessage *) message, 
726                       atsi);
727 }
728
729
730 /**
731  * Callback to set state to ESTABLISHED
732  *
733  * @param cls the closure from queue_message FIXME: document
734  * @param socket the socket to requiring state change
735  */
736 static void
737 set_state_established (void *cls,
738                        struct GNUNET_STREAM_Socket *socket)
739 {
740   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining ESTABLISHED state\n");
741   /* Initialize the receive buffer */
742   socket->receive_buffer = GNUNET_malloc (RECEIVE_BUFFER_SIZE);
743   socket->state = STATE_ESTABLISHED;
744 }
745
746
747 /**
748  * Callback to set state to HELLO_WAIT
749  *
750  * @param cls the closure from queue_message
751  * @param socket the socket to requiring state change
752  */
753 static void
754 set_state_hello_wait (void *cls,
755                       struct GNUNET_STREAM_Socket *socket)
756 {
757   GNUNET_assert (STATE_INIT == socket->state);
758   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining HELLO_WAIT state\n");
759   socket->state = STATE_HELLO_WAIT;
760 }
761
762
763 /**
764  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
765  *
766  * @param cls the socket (set from GNUNET_MESH_connect)
767  * @param tunnel connection to the other end
768  * @param tunnel_ctx this is NULL
769  * @param sender who sent the message
770  * @param message the actual message
771  * @param atsi performance data for the connection
772  * @return GNUNET_OK to keep the connection open,
773  *         GNUNET_SYSERR to close it (signal serious error)
774  */
775 static int
776 client_handle_hello_ack (void *cls,
777                          struct GNUNET_MESH_Tunnel *tunnel,
778                          void **tunnel_ctx,
779                          const struct GNUNET_PeerIdentity *sender,
780                          const struct GNUNET_MessageHeader *message,
781                          const struct GNUNET_ATS_Information*atsi)
782 {
783   struct GNUNET_STREAM_Socket *socket = cls;
784   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
785   struct GNUNET_STREAM_HelloAckMessage *reply;
786
787   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
788   GNUNET_assert (socket->tunnel == tunnel);
789   switch (socket->state)
790   {
791   case STATE_HELLO_WAIT:
792       socket->read_sequence_number = ntohl (ack_msg->sequence_number);
793       socket->receive_window_available = ntohl (ack_msg->receive_window_size);
794       /* Get the random sequence number */
795       socket->write_sequence_number = 
796         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
797       reply = 
798         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
799       reply->header.header.size = 
800         htons (sizeof (struct GNUNET_STREAM_MessageHeader));
801       reply->header.header.type = 
802         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
803       reply->sequence_number = htonl (socket->write_sequence_number);
804       reply->receive_window_size = htonl (RECEIVE_BUFFER_SIZE);
805       queue_message (socket, 
806                      &reply->header, 
807                      &set_state_established, 
808                      NULL);      
809       return GNUNET_OK;
810   case STATE_ESTABLISHED:
811   case STATE_RECEIVE_CLOSE_WAIT:
812     // call statistics (# ACKs ignored++)
813     return GNUNET_OK;
814   case STATE_INIT:
815   default:
816     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
817                 "Server sent HELLO_ACK when in state %d\n", socket->state);
818     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
819     return GNUNET_SYSERR;
820   }
821
822 }
823
824
825 /**
826  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
827  *
828  * @param cls the socket (set from GNUNET_MESH_connect)
829  * @param tunnel connection to the other end
830  * @param tunnel_ctx this is NULL
831  * @param sender who sent the message
832  * @param message the actual message
833  * @param atsi performance data for the connection
834  * @return GNUNET_OK to keep the connection open,
835  *         GNUNET_SYSERR to close it (signal serious error)
836  */
837 static int
838 client_handle_reset (void *cls,
839                      struct GNUNET_MESH_Tunnel *tunnel,
840                      void **tunnel_ctx,
841                      const struct GNUNET_PeerIdentity *sender,
842                      const struct GNUNET_MessageHeader *message,
843                      const struct GNUNET_ATS_Information*atsi)
844 {
845   struct GNUNET_STREAM_Socket *socket = cls;
846
847   return GNUNET_OK;
848 }
849
850
851 /**
852  * Common message handler for handling TRANSMIT_CLOSE messages
853  *
854  * @param socket the socket through which the ack was received
855  * @param tunnel connection to the other end
856  * @param sender who sent the message
857  * @param ack the acknowledgment message
858  * @param atsi performance data for the connection
859  * @return GNUNET_OK to keep the connection open,
860  *         GNUNET_SYSERR to close it (signal serious error)
861  */
862 static int
863 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
864                        struct GNUNET_MESH_Tunnel *tunnel,
865                        const struct GNUNET_PeerIdentity *sender,
866                        const struct GNUNET_STREAM_MessageHeader *msg,
867                        const struct GNUNET_ATS_Information*atsi)
868 {
869   struct GNUNET_STREAM_MessageHeader *reply;
870
871   switch (socket->state)
872     {
873     case STATE_ESTABLISHED:
874       socket->state = STATE_RECEIVE_CLOSED;
875
876       /* Send TRANSMIT_CLOSE_ACK */
877       reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
878       reply->header.type = 
879         htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
880       reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
881       queue_message (socket, reply, NULL, NULL);
882       break;
883
884     default:
885       /* FIXME: Call statistics? */
886       break;
887     }
888   return GNUNET_YES;
889 }
890
891
892 /**
893  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
894  *
895  * @param cls the socket (set from GNUNET_MESH_connect)
896  * @param tunnel connection to the other end
897  * @param tunnel_ctx this is NULL
898  * @param sender who sent the message
899  * @param message the actual message
900  * @param atsi performance data for the connection
901  * @return GNUNET_OK to keep the connection open,
902  *         GNUNET_SYSERR to close it (signal serious error)
903  */
904 static int
905 client_handle_transmit_close (void *cls,
906                               struct GNUNET_MESH_Tunnel *tunnel,
907                               void **tunnel_ctx,
908                               const struct GNUNET_PeerIdentity *sender,
909                               const struct GNUNET_MessageHeader *message,
910                               const struct GNUNET_ATS_Information*atsi)
911 {
912   struct GNUNET_STREAM_Socket *socket = cls;
913   
914   return handle_transmit_close (socket,
915                                 tunnel,
916                                 sender,
917                                 (struct GNUNET_STREAM_MessageHeader *)message,
918                                 atsi);
919 }
920
921
922 /**
923  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
924  *
925  * @param cls the socket (set from GNUNET_MESH_connect)
926  * @param tunnel connection to the other end
927  * @param tunnel_ctx this is NULL
928  * @param sender who sent the message
929  * @param message the actual message
930  * @param atsi performance data for the connection
931  * @return GNUNET_OK to keep the connection open,
932  *         GNUNET_SYSERR to close it (signal serious error)
933  */
934 static int
935 client_handle_transmit_close_ack (void *cls,
936                                   struct GNUNET_MESH_Tunnel *tunnel,
937                                   void **tunnel_ctx,
938                                   const struct GNUNET_PeerIdentity *sender,
939                                   const struct GNUNET_MessageHeader *message,
940                                   const struct GNUNET_ATS_Information*atsi)
941 {
942   struct GNUNET_STREAM_Socket *socket = cls;
943
944   return GNUNET_OK;
945 }
946
947
948 /**
949  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
950  *
951  * @param cls the socket (set from GNUNET_MESH_connect)
952  * @param tunnel connection to the other end
953  * @param tunnel_ctx this is NULL
954  * @param sender who sent the message
955  * @param message the actual message
956  * @param atsi performance data for the connection
957  * @return GNUNET_OK to keep the connection open,
958  *         GNUNET_SYSERR to close it (signal serious error)
959  */
960 static int
961 client_handle_receive_close (void *cls,
962                              struct GNUNET_MESH_Tunnel *tunnel,
963                              void **tunnel_ctx,
964                              const struct GNUNET_PeerIdentity *sender,
965                              const struct GNUNET_MessageHeader *message,
966                              const struct GNUNET_ATS_Information*atsi)
967 {
968   struct GNUNET_STREAM_Socket *socket = cls;
969
970   return GNUNET_OK;
971 }
972
973
974 /**
975  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
976  *
977  * @param cls the socket (set from GNUNET_MESH_connect)
978  * @param tunnel connection to the other end
979  * @param tunnel_ctx this is NULL
980  * @param sender who sent the message
981  * @param message the actual message
982  * @param atsi performance data for the connection
983  * @return GNUNET_OK to keep the connection open,
984  *         GNUNET_SYSERR to close it (signal serious error)
985  */
986 static int
987 client_handle_receive_close_ack (void *cls,
988                                  struct GNUNET_MESH_Tunnel *tunnel,
989                                  void **tunnel_ctx,
990                                  const struct GNUNET_PeerIdentity *sender,
991                                  const struct GNUNET_MessageHeader *message,
992                                  const struct GNUNET_ATS_Information*atsi)
993 {
994   struct GNUNET_STREAM_Socket *socket = cls;
995
996   return GNUNET_OK;
997 }
998
999
1000 /**
1001  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1002  *
1003  * @param cls the socket (set from GNUNET_MESH_connect)
1004  * @param tunnel connection to the other end
1005  * @param tunnel_ctx this is NULL
1006  * @param sender who sent the message
1007  * @param message the actual message
1008  * @param atsi performance data for the connection
1009  * @return GNUNET_OK to keep the connection open,
1010  *         GNUNET_SYSERR to close it (signal serious error)
1011  */
1012 static int
1013 client_handle_close (void *cls,
1014                      struct GNUNET_MESH_Tunnel *tunnel,
1015                      void **tunnel_ctx,
1016                      const struct GNUNET_PeerIdentity *sender,
1017                      const struct GNUNET_MessageHeader *message,
1018                      const struct GNUNET_ATS_Information*atsi)
1019 {
1020   struct GNUNET_STREAM_Socket *socket = cls;
1021
1022   return GNUNET_OK;
1023 }
1024
1025
1026 /**
1027  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1028  *
1029  * @param cls the socket (set from GNUNET_MESH_connect)
1030  * @param tunnel connection to the other end
1031  * @param tunnel_ctx this is NULL
1032  * @param sender who sent the message
1033  * @param message the actual message
1034  * @param atsi performance data for the connection
1035  * @return GNUNET_OK to keep the connection open,
1036  *         GNUNET_SYSERR to close it (signal serious error)
1037  */
1038 static int
1039 client_handle_close_ack (void *cls,
1040                          struct GNUNET_MESH_Tunnel *tunnel,
1041                          void **tunnel_ctx,
1042                          const struct GNUNET_PeerIdentity *sender,
1043                          const struct GNUNET_MessageHeader *message,
1044                          const struct GNUNET_ATS_Information*atsi)
1045 {
1046   struct GNUNET_STREAM_Socket *socket = cls;
1047
1048   return GNUNET_OK;
1049 }
1050
1051 /*****************************/
1052 /* Server's Message Handlers */
1053 /*****************************/
1054
1055 /**
1056  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1057  *
1058  * @param cls the closure
1059  * @param tunnel connection to the other end
1060  * @param tunnel_ctx the socket
1061  * @param sender who sent the message
1062  * @param message the actual message
1063  * @param atsi performance data for the connection
1064  * @return GNUNET_OK to keep the connection open,
1065  *         GNUNET_SYSERR to close it (signal serious error)
1066  */
1067 static int
1068 server_handle_data (void *cls,
1069                     struct GNUNET_MESH_Tunnel *tunnel,
1070                     void **tunnel_ctx,
1071                     const struct GNUNET_PeerIdentity *sender,
1072                     const struct GNUNET_MessageHeader *message,
1073                     const struct GNUNET_ATS_Information*atsi)
1074 {
1075   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1076
1077   return GNUNET_OK;
1078 }
1079
1080
1081 /**
1082  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
1083  *
1084  * @param cls the closure
1085  * @param tunnel connection to the other end
1086  * @param tunnel_ctx the socket
1087  * @param sender who sent the message
1088  * @param message the actual message
1089  * @param atsi performance data for the connection
1090  * @return GNUNET_OK to keep the connection open,
1091  *         GNUNET_SYSERR to close it (signal serious error)
1092  */
1093 static int
1094 server_handle_hello (void *cls,
1095                      struct GNUNET_MESH_Tunnel *tunnel,
1096                      void **tunnel_ctx,
1097                      const struct GNUNET_PeerIdentity *sender,
1098                      const struct GNUNET_MessageHeader *message,
1099                      const struct GNUNET_ATS_Information*atsi)
1100 {
1101   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1102   struct GNUNET_STREAM_HelloAckMessage *reply;
1103
1104   GNUNET_assert (socket->tunnel == tunnel);
1105   if (STATE_INIT == socket->state)
1106     {
1107       /* Get the random sequence number */
1108       socket->write_sequence_number = 
1109         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1110       reply = 
1111         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1112       reply->header.header.size = 
1113         htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1114       reply->header.header.type = 
1115         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1116       reply->sequence_number = htonl (socket->write_sequence_number);
1117       queue_message (socket, 
1118                      &reply->header,
1119                      &set_state_hello_wait, 
1120                      NULL);
1121     }
1122   else
1123     {
1124       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1125                   "Client sent HELLO when in state %d\n", socket->state);
1126       /* FIXME: Send RESET? */
1127       
1128     }
1129   return GNUNET_OK;
1130 }
1131
1132
1133 /**
1134  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1135  *
1136  * @param cls the closure
1137  * @param tunnel connection to the other end
1138  * @param tunnel_ctx the socket
1139  * @param sender who sent the message
1140  * @param message the actual message
1141  * @param atsi performance data for the connection
1142  * @return GNUNET_OK to keep the connection open,
1143  *         GNUNET_SYSERR to close it (signal serious error)
1144  */
1145 static int
1146 server_handle_hello_ack (void *cls,
1147                          struct GNUNET_MESH_Tunnel *tunnel,
1148                          void **tunnel_ctx,
1149                          const struct GNUNET_PeerIdentity *sender,
1150                          const struct GNUNET_MessageHeader *message,
1151                          const struct GNUNET_ATS_Information*atsi)
1152 {
1153   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1154   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
1155
1156   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
1157   GNUNET_assert (socket->tunnel == tunnel);
1158   if (STATE_HELLO_WAIT == socket->state)
1159     {
1160       socket->read_sequence_number = ntohl (ack_message->sequence_number);
1161       socket->receive_window_available = 
1162         ntohl (ack_message->receive_window_size);
1163       socket->state = STATE_ESTABLISHED;
1164     }
1165   else
1166     {
1167       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1168                   "Client sent HELLO_ACK when in state %d\n", socket->state);
1169       /* FIXME: Send RESET? */
1170       
1171     }
1172   return GNUNET_OK;
1173 }
1174
1175
1176 /**
1177  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1178  *
1179  * @param cls the closure
1180  * @param tunnel connection to the other end
1181  * @param tunnel_ctx the socket
1182  * @param sender who sent the message
1183  * @param message the actual message
1184  * @param atsi performance data for the connection
1185  * @return GNUNET_OK to keep the connection open,
1186  *         GNUNET_SYSERR to close it (signal serious error)
1187  */
1188 static int
1189 server_handle_reset (void *cls,
1190                      struct GNUNET_MESH_Tunnel *tunnel,
1191                      void **tunnel_ctx,
1192                      const struct GNUNET_PeerIdentity *sender,
1193                      const struct GNUNET_MessageHeader *message,
1194                      const struct GNUNET_ATS_Information*atsi)
1195 {
1196   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1197
1198   return GNUNET_OK;
1199 }
1200
1201
1202 /**
1203  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1204  *
1205  * @param cls the closure
1206  * @param tunnel connection to the other end
1207  * @param tunnel_ctx the socket
1208  * @param sender who sent the message
1209  * @param message the actual message
1210  * @param atsi performance data for the connection
1211  * @return GNUNET_OK to keep the connection open,
1212  *         GNUNET_SYSERR to close it (signal serious error)
1213  */
1214 static int
1215 server_handle_transmit_close (void *cls,
1216                               struct GNUNET_MESH_Tunnel *tunnel,
1217                               void **tunnel_ctx,
1218                               const struct GNUNET_PeerIdentity *sender,
1219                               const struct GNUNET_MessageHeader *message,
1220                               const struct GNUNET_ATS_Information*atsi)
1221 {
1222   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1223
1224   return handle_transmit_close (socket,
1225                                 tunnel,
1226                                 sender,
1227                                 (struct GNUNET_STREAM_MessageHeader *)message,
1228                                 atsi);
1229 }
1230
1231
1232 /**
1233  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1234  *
1235  * @param cls the closure
1236  * @param tunnel connection to the other end
1237  * @param tunnel_ctx the socket
1238  * @param sender who sent the message
1239  * @param message the actual message
1240  * @param atsi performance data for the connection
1241  * @return GNUNET_OK to keep the connection open,
1242  *         GNUNET_SYSERR to close it (signal serious error)
1243  */
1244 static int
1245 server_handle_transmit_close_ack (void *cls,
1246                                   struct GNUNET_MESH_Tunnel *tunnel,
1247                                   void **tunnel_ctx,
1248                                   const struct GNUNET_PeerIdentity *sender,
1249                                   const struct GNUNET_MessageHeader *message,
1250                                   const struct GNUNET_ATS_Information*atsi)
1251 {
1252   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1253
1254   return GNUNET_OK;
1255 }
1256
1257
1258 /**
1259  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1260  *
1261  * @param cls the closure
1262  * @param tunnel connection to the other end
1263  * @param tunnel_ctx the socket
1264  * @param sender who sent the message
1265  * @param message the actual message
1266  * @param atsi performance data for the connection
1267  * @return GNUNET_OK to keep the connection open,
1268  *         GNUNET_SYSERR to close it (signal serious error)
1269  */
1270 static int
1271 server_handle_receive_close (void *cls,
1272                              struct GNUNET_MESH_Tunnel *tunnel,
1273                              void **tunnel_ctx,
1274                              const struct GNUNET_PeerIdentity *sender,
1275                              const struct GNUNET_MessageHeader *message,
1276                              const struct GNUNET_ATS_Information*atsi)
1277 {
1278   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1279
1280   return GNUNET_OK;
1281 }
1282
1283
1284 /**
1285  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1286  *
1287  * @param cls the closure
1288  * @param tunnel connection to the other end
1289  * @param tunnel_ctx the socket
1290  * @param sender who sent the message
1291  * @param message the actual message
1292  * @param atsi performance data for the connection
1293  * @return GNUNET_OK to keep the connection open,
1294  *         GNUNET_SYSERR to close it (signal serious error)
1295  */
1296 static int
1297 server_handle_receive_close_ack (void *cls,
1298                                  struct GNUNET_MESH_Tunnel *tunnel,
1299                                  void **tunnel_ctx,
1300                                  const struct GNUNET_PeerIdentity *sender,
1301                                  const struct GNUNET_MessageHeader *message,
1302                                  const struct GNUNET_ATS_Information*atsi)
1303 {
1304   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1305
1306   return GNUNET_OK;
1307 }
1308
1309
1310 /**
1311  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1312  *
1313  * @param cls the closure
1314  * @param tunnel connection to the other end
1315  * @param tunnel_ctx the socket
1316  * @param sender who sent the message
1317  * @param message the actual message
1318  * @param atsi performance data for the connection
1319  * @return GNUNET_OK to keep the connection open,
1320  *         GNUNET_SYSERR to close it (signal serious error)
1321  */
1322 static int
1323 server_handle_close (void *cls,
1324                      struct GNUNET_MESH_Tunnel *tunnel,
1325                      void **tunnel_ctx,
1326                      const struct GNUNET_PeerIdentity *sender,
1327                      const struct GNUNET_MessageHeader *message,
1328                      const struct GNUNET_ATS_Information*atsi)
1329 {
1330   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1331
1332   return GNUNET_OK;
1333 }
1334
1335
1336 /**
1337  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1338  *
1339  * @param cls the closure
1340  * @param tunnel connection to the other end
1341  * @param tunnel_ctx the socket
1342  * @param sender who sent the message
1343  * @param message the actual message
1344  * @param atsi performance data for the connection
1345  * @return GNUNET_OK to keep the connection open,
1346  *         GNUNET_SYSERR to close it (signal serious error)
1347  */
1348 static int
1349 server_handle_close_ack (void *cls,
1350                          struct GNUNET_MESH_Tunnel *tunnel,
1351                          void **tunnel_ctx,
1352                          const struct GNUNET_PeerIdentity *sender,
1353                          const struct GNUNET_MessageHeader *message,
1354                          const struct GNUNET_ATS_Information*atsi)
1355 {
1356   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1357
1358   return GNUNET_OK;
1359 }
1360
1361
1362 /**
1363  * Message Handler for mesh
1364  *
1365  * @param socket the socket through which the ack was received
1366  * @param tunnel connection to the other end
1367  * @param sender who sent the message
1368  * @param ack the acknowledgment message
1369  * @param atsi performance data for the connection
1370  * @return GNUNET_OK to keep the connection open,
1371  *         GNUNET_SYSERR to close it (signal serious error)
1372  */
1373 static int
1374 handle_ack (struct GNUNET_STREAM_Socket *socket,
1375             struct GNUNET_MESH_Tunnel *tunnel,
1376             const struct GNUNET_PeerIdentity *sender,
1377             const struct GNUNET_STREAM_AckMessage *ack,
1378             const struct GNUNET_ATS_Information*atsi)
1379 {
1380   switch (socket->state)
1381     {
1382     case (STATE_ESTABLISHED):
1383       if (NULL == socket->write_handle)
1384         {
1385           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1386                       "Received DATA ACK when write_handle is NULL\n");
1387           return GNUNET_OK;
1388         }
1389
1390       socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap);
1391       socket->write_handle->receive_window_available = 
1392         ntohl (ack->receive_window_remaining);
1393       write_data (socket);
1394       break;
1395     default:
1396       break;
1397     }
1398   return GNUNET_OK;
1399 }
1400
1401
1402 /**
1403  * Message Handler for mesh
1404  *
1405  * @param cls the 'struct GNUNET_STREAM_Socket'
1406  * @param tunnel connection to the other end
1407  * @param tunnel_ctx unused
1408  * @param sender who sent the message
1409  * @param message the actual message
1410  * @param atsi performance data for the connection
1411  * @return GNUNET_OK to keep the connection open,
1412  *         GNUNET_SYSERR to close it (signal serious error)
1413  */
1414 static int
1415 client_handle_ack (void *cls,
1416                    struct GNUNET_MESH_Tunnel *tunnel,
1417                    void **tunnel_ctx,
1418                    const struct GNUNET_PeerIdentity *sender,
1419                    const struct GNUNET_MessageHeader *message,
1420                    const struct GNUNET_ATS_Information*atsi)
1421 {
1422   struct GNUNET_STREAM_Socket *socket = cls;
1423   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1424  
1425   return handle_ack (socket, tunnel, sender, ack, atsi);
1426 }
1427
1428
1429 /**
1430  * Message Handler for mesh
1431  *
1432  * @param cls the server's listen socket
1433  * @param tunnel connection to the other end
1434  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
1435  * @param sender who sent the message
1436  * @param message the actual message
1437  * @param atsi performance data for the connection
1438  * @return GNUNET_OK to keep the connection open,
1439  *         GNUNET_SYSERR to close it (signal serious error)
1440  */
1441 static int
1442 server_handle_ack (void *cls,
1443                    struct GNUNET_MESH_Tunnel *tunnel,
1444                    void **tunnel_ctx,
1445                    const struct GNUNET_PeerIdentity *sender,
1446                    const struct GNUNET_MessageHeader *message,
1447                    const struct GNUNET_ATS_Information*atsi)
1448 {
1449   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1450   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1451  
1452   return handle_ack (socket, tunnel, sender, ack, atsi);
1453 }
1454
1455
1456 /**
1457  * For client message handlers, the stream socket is in the
1458  * closure argument.
1459  */
1460 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
1461   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1462   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
1463    sizeof (struct GNUNET_STREAM_AckMessage) },
1464   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1465    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1466   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1467    sizeof (struct GNUNET_STREAM_MessageHeader)},
1468   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1469    sizeof (struct GNUNET_STREAM_MessageHeader)},
1470   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1471    sizeof (struct GNUNET_STREAM_MessageHeader)},
1472   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1473    sizeof (struct GNUNET_STREAM_MessageHeader)},
1474   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1475    sizeof (struct GNUNET_STREAM_MessageHeader)},
1476   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1477    sizeof (struct GNUNET_STREAM_MessageHeader)},
1478   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1479    sizeof (struct GNUNET_STREAM_MessageHeader)},
1480   {NULL, 0, 0}
1481 };
1482
1483
1484 /**
1485  * For server message handlers, the stream socket is in the
1486  * tunnel context, and the listen socket in the closure argument.
1487  */
1488 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
1489   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1490   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
1491    sizeof (struct GNUNET_STREAM_AckMessage) },
1492   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
1493    sizeof (struct GNUNET_STREAM_MessageHeader)},
1494   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1495    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1496   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1497    sizeof (struct GNUNET_STREAM_MessageHeader)},
1498   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1499    sizeof (struct GNUNET_STREAM_MessageHeader)},
1500   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1501    sizeof (struct GNUNET_STREAM_MessageHeader)},
1502   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1503    sizeof (struct GNUNET_STREAM_MessageHeader)},
1504   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1505    sizeof (struct GNUNET_STREAM_MessageHeader)},
1506   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1507    sizeof (struct GNUNET_STREAM_MessageHeader)},
1508   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1509    sizeof (struct GNUNET_STREAM_MessageHeader)},
1510   {NULL, 0, 0}
1511 };
1512
1513
1514 /**
1515  * Function called when our target peer is connected to our tunnel
1516  *
1517  * @param cls the socket for which this tunnel is created
1518  * @param peer the peer identity of the target
1519  * @param atsi performance data for the connection
1520  */
1521 static void
1522 mesh_peer_connect_callback (void *cls,
1523                             const struct GNUNET_PeerIdentity *peer,
1524                             const struct GNUNET_ATS_Information * atsi)
1525 {
1526   struct GNUNET_STREAM_Socket *socket = cls;
1527   struct GNUNET_STREAM_MessageHeader *message;
1528
1529   if (0 != memcmp (&socket->other_peer, 
1530                    peer, 
1531                    sizeof (struct GNUNET_PeerIdentity)))
1532     {
1533       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1534                   "A peer (%s) which is not our target has connected to our tunnel", 
1535                   GNUNET_i2s (peer));
1536       return;
1537     }
1538   
1539   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1540               "Target peer %s connected\n", GNUNET_i2s (peer));
1541   
1542   /* Set state to INIT */
1543   socket->state = STATE_INIT;
1544
1545   /* Send HELLO message */
1546   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1547   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1548   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1549   queue_message (socket,
1550                  message,
1551                  &set_state_hello_wait,
1552                  NULL);
1553
1554   /* Call open callback */
1555   if (NULL == socket->open_cb)
1556     {
1557       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1558                   "STREAM_open callback is NULL\n");
1559     }
1560   else
1561     {
1562       socket->open_cb (socket->open_cls, socket);
1563     }
1564 }
1565
1566
1567 /**
1568  * Function called when our target peer is disconnected from our tunnel
1569  *
1570  * @param cls the socket associated which this tunnel
1571  * @param peer the peer identity of the target
1572  */
1573 static void
1574 mesh_peer_disconnect_callback (void *cls,
1575                                const struct GNUNET_PeerIdentity *peer)
1576 {
1577
1578 }
1579
1580
1581 /*****************/
1582 /* API functions */
1583 /*****************/
1584
1585
1586 /**
1587  * Tries to open a stream to the target peer
1588  *
1589  * @param cfg configuration to use
1590  * @param target the target peer to which the stream has to be opened
1591  * @param app_port the application port number which uniquely identifies this
1592  *            stream
1593  * @param open_cb this function will be called after stream has be established 
1594  * @param open_cb_cls the closure for open_cb
1595  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
1596  * @return if successful it returns the stream socket; NULL if stream cannot be
1597  *         opened 
1598  */
1599 struct GNUNET_STREAM_Socket *
1600 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
1601                     const struct GNUNET_PeerIdentity *target,
1602                     GNUNET_MESH_ApplicationType app_port,
1603                     GNUNET_STREAM_OpenCallback open_cb,
1604                     void *open_cb_cls,
1605                     ...)
1606 {
1607   struct GNUNET_STREAM_Socket *socket;
1608   enum GNUNET_STREAM_Option option;
1609   va_list vargs;                /* Variable arguments */
1610
1611   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
1612   socket->other_peer = *target;
1613   socket->open_cb = open_cb;
1614   socket->open_cls = open_cb_cls;
1615
1616   /* Set defaults */
1617   socket->retransmit_timeout = 
1618     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
1619
1620   va_start (vargs, open_cb_cls); /* Parse variable args */
1621   do {
1622     option = va_arg (vargs, enum GNUNET_STREAM_Option);
1623     switch (option)
1624       {
1625       case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
1626         /* Expect struct GNUNET_TIME_Relative */
1627         socket->retransmit_timeout = va_arg (vargs,
1628                                              struct GNUNET_TIME_Relative);
1629         break;
1630       case GNUNET_STREAM_OPTION_END:
1631         break;
1632       }
1633   } while (GNUNET_STREAM_OPTION_END != option);
1634   va_end (vargs);               /* End of variable args parsing */
1635
1636   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
1637                                       1,  /* QUEUE size as parameter? */
1638                                       socket, /* cls */
1639                                       NULL, /* No inbound tunnel handler */
1640                                       NULL, /* No inbound tunnel cleaner */
1641                                       client_message_handlers,
1642                                       NULL); /* We don't get inbound tunnels */
1643   // FIXME: if (NULL == socket->mesh) ...
1644
1645   /* Now create the mesh tunnel to target */
1646   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
1647                                               NULL, /* Tunnel context */
1648                                               &mesh_peer_connect_callback,
1649                                               &mesh_peer_disconnect_callback,
1650                                               socket);
1651   // FIXME: if (NULL == socket->tunnel) ...
1652
1653   return socket;
1654 }
1655
1656
1657 /**
1658  * Closes the stream
1659  *
1660  * @param socket the stream socket
1661  */
1662 void
1663 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
1664 {
1665   struct MessageQueue *head;
1666
1667   /* Clear Transmit handles */
1668   if (NULL != socket->transmit_handle)
1669     {
1670       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
1671       socket->transmit_handle = NULL;
1672     }
1673
1674   /* Clear existing message queue */
1675   while (NULL != (head = socket->queue_head)) {
1676     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
1677                                  socket->queue_tail,
1678                                  head);
1679     GNUNET_free (head->message);
1680     GNUNET_free (head);
1681   }
1682
1683   /* Close associated tunnel */
1684   if (NULL != socket->tunnel)
1685     {
1686       GNUNET_MESH_tunnel_destroy (socket->tunnel);
1687       socket->tunnel = NULL;
1688     }
1689
1690   /* Close mesh connection */
1691   if (NULL != socket->mesh)
1692     {
1693       GNUNET_MESH_disconnect (socket->mesh);
1694       socket->mesh = NULL;
1695     }
1696   
1697   /* Release receive buffer */
1698   if (NULL != socket->receive_buffer)
1699     {
1700       GNUNET_free (socket->receive_buffer);
1701     }
1702
1703   GNUNET_free (socket);
1704 }
1705
1706
1707 /**
1708  * Method called whenever a peer creates a tunnel to us
1709  *
1710  * @param cls closure
1711  * @param tunnel new handle to the tunnel
1712  * @param initiator peer that started the tunnel
1713  * @param atsi performance information for the tunnel
1714  * @return initial tunnel context for the tunnel
1715  *         (can be NULL -- that's not an error)
1716  */
1717 static void *
1718 new_tunnel_notify (void *cls,
1719                    struct GNUNET_MESH_Tunnel *tunnel,
1720                    const struct GNUNET_PeerIdentity *initiator,
1721                    const struct GNUNET_ATS_Information *atsi)
1722 {
1723   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
1724   struct GNUNET_STREAM_Socket *socket;
1725
1726   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
1727   socket->tunnel = tunnel;
1728   socket->session_id = 0;       /* FIXME */
1729   socket->other_peer = *initiator;
1730   socket->state = STATE_INIT;
1731
1732   if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
1733                                            socket,
1734                                            &socket->other_peer))
1735     {
1736       socket->state = STATE_CLOSED;
1737       /* FIXME: Send CLOSE message and then free */
1738       GNUNET_free (socket);
1739       GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
1740     }
1741   return socket;
1742 }
1743
1744
1745 /**
1746  * Function called whenever an inbound tunnel is destroyed.  Should clean up
1747  * any associated state.  This function is NOT called if the client has
1748  * explicitly asked for the tunnel to be destroyed using
1749  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
1750  * the tunnel.
1751  *
1752  * @param cls closure (set from GNUNET_MESH_connect)
1753  * @param tunnel connection to the other end (henceforth invalid)
1754  * @param tunnel_ctx place where local state associated
1755  *                   with the tunnel is stored
1756  */
1757 static void 
1758 tunnel_cleaner (void *cls,
1759                 const struct GNUNET_MESH_Tunnel *tunnel,
1760                 void *tunnel_ctx)
1761 {
1762   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
1763   
1764   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1765               "Peer %s has terminated connection abruptly\n",
1766               GNUNET_i2s (&socket->other_peer));
1767
1768   socket->status = GNUNET_STREAM_SHUTDOWN;
1769   /* Clear Transmit handles */
1770   if (NULL != socket->transmit_handle)
1771     {
1772       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
1773       socket->transmit_handle = NULL;
1774     }
1775   socket->tunnel = NULL;
1776 }
1777
1778
1779 /**
1780  * Listens for stream connections for a specific application ports
1781  *
1782  * @param cfg the configuration to use
1783  * @param app_port the application port for which new streams will be accepted
1784  * @param listen_cb this function will be called when a peer tries to establish
1785  *            a stream with us
1786  * @param listen_cb_cls closure for listen_cb
1787  * @return listen socket, NULL for any error
1788  */
1789 struct GNUNET_STREAM_ListenSocket *
1790 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
1791                       GNUNET_MESH_ApplicationType app_port,
1792                       GNUNET_STREAM_ListenCallback listen_cb,
1793                       void *listen_cb_cls)
1794 {
1795   /* FIXME: Add variable args for passing configration options? */
1796   struct GNUNET_STREAM_ListenSocket *lsocket;
1797   GNUNET_MESH_ApplicationType app_types[2];
1798
1799   app_types[0] = app_port;
1800   app_types[1] = 0;
1801   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
1802   lsocket->port = app_port;
1803   lsocket->listen_cb = listen_cb;
1804   lsocket->listen_cb_cls = listen_cb_cls;
1805   lsocket->mesh = GNUNET_MESH_connect (cfg,
1806                                        10, /* FIXME: QUEUE size as parameter? */
1807                                        lsocket, /* Closure */
1808                                        &new_tunnel_notify,
1809                                        &tunnel_cleaner,
1810                                        server_message_handlers,
1811                                        app_types);
1812   return lsocket;
1813 }
1814
1815
1816 /**
1817  * Closes the listen socket
1818  *
1819  * @param lsocket the listen socket
1820  */
1821 void
1822 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
1823 {
1824   /* Close MESH connection */
1825   GNUNET_MESH_disconnect (lsocket->mesh);
1826   
1827   GNUNET_free (lsocket);
1828 }
1829
1830
1831 /**
1832  * Tries to write the given data to the stream
1833  *
1834  * @param socket the socket representing a stream
1835  * @param data the data buffer from where the data is written into the stream
1836  * @param size the number of bytes to be written from the data buffer
1837  * @param timeout the timeout period
1838  * @param write_cont the function to call upon writing some bytes into the stream
1839  * @param write_cont_cls the closure
1840  * @return handle to cancel the operation
1841  */
1842 struct GNUNET_STREAM_IOHandle *
1843 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
1844                      const void *data,
1845                      size_t size,
1846                      struct GNUNET_TIME_Relative timeout,
1847                      GNUNET_STREAM_CompletionContinuation write_cont,
1848                      void *write_cont_cls)
1849 {
1850   unsigned int num_needed_packets;
1851   unsigned int packet;
1852   struct GNUNET_STREAM_IOHandle *io_handle;
1853   size_t packet_size;
1854   struct GNUNET_STREAM_DataMessage *data_msg;
1855   const void *sweep;
1856
1857   /* There is already a write request pending */
1858   if (NULL != socket->write_handle)
1859   {
1860     GNUNET_break (0);
1861     return NULL;
1862   }
1863   if (!((STATE_ESTABLISHED == socket->state)
1864         || (STATE_RECEIVE_CLOSE_WAIT == socket->state)
1865         || (STATE_RECEIVE_CLOSED == socket->state)))
1866     {
1867       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1868                   "Attempting to write on a closed (OR) not-yet-established"
1869                   "stream\n"); 
1870       return NULL;
1871     } 
1872   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
1873     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
1874   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
1875   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOHandle));
1876   io_handle->receive_window_available = socket->receive_window_available;
1877   sweep = data;
1878   /* Divide the given buffer into packets for sending */
1879   for (packet=0; packet < num_needed_packets; packet++)
1880     {
1881       if ((packet + 1) * max_payload_size < size) 
1882         {
1883           packet_size = MAX_PACKET_SIZE;
1884         }
1885       else 
1886         {
1887           packet_size = size - packet * max_payload_size
1888             + sizeof (struct GNUNET_STREAM_DataMessage);
1889         }
1890       io_handle->messages[packet] = GNUNET_malloc (packet_size);
1891       io_handle->messages[packet]->header.header.size = htons (packet_size);
1892       io_handle->messages[packet]->header.header.type =
1893         htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
1894       io_handle->messages[packet]->sequence_number =
1895         htons (socket->write_sequence_number++);
1896       data_msg = io_handle->messages[packet];
1897       memcpy (&data_msg[1],
1898               sweep,
1899               packet_size - sizeof (struct GNUNET_STREAM_DataMessage));
1900       sweep += packet_size - sizeof (struct GNUNET_STREAM_DataMessage);
1901     }
1902
1903   write_data (socket);
1904
1905   return io_handle;
1906 }