faceac297623dd7308e2225be2fc796508d2932d
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file stream/stream_api.c
23  * @brief Implementation of the stream library
24  * @author Sree Harsha Totakura
25  */
26 #include "platform.h"
27 #include "gnunet_common.h"
28 #include "gnunet_crypto_lib.h"
29 #include "gnunet_stream_lib.h"
30 #include "stream_protocol.h"
31
32
33 #define MAX_PACKET_SIZE 64000
34
35
36 /**
37  * states in the Protocol
38  */
39 enum State
40   {
41     /**
42      * Client initialization state
43      */
44     STATE_INIT,
45
46     /**
47      * Listener initialization state 
48      */
49     STATE_LISTEN,
50
51     /**
52      * Pre-connection establishment state
53      */
54     STATE_HELLO_WAIT,
55
56     /**
57      * State where a connection has been established
58      */
59     STATE_ESTABLISHED,
60
61     /**
62      * State where the socket is closed on our side and waiting to be ACK'ed
63      */
64     STATE_RECEIVE_CLOSE_WAIT,
65
66     /**
67      * State where the socket is closed for reading
68      */
69     STATE_RECEIVE_CLOSED,
70
71     /**
72      * State where the socket is closed on our side and waiting to be ACK'ed
73      */
74     STATE_TRANSMIT_CLOSE_WAIT,
75
76     /**
77      * State where the socket is closed for writing
78      */
79     STATE_TRANSMIT_CLOSED,
80
81     /**
82      * State where the socket is closed on our side and waiting to be ACK'ed
83      */
84     STATE_CLOSE_WAIT,
85
86     /**
87      * State where the socket is closed
88      */
89     STATE_CLOSED 
90   };
91
92
93 /**
94  * Functions of this type are called when a message is written
95  *
96  * @param socket the socket the written message was bound to
97  */
98 typedef void (*SendFinishCallback) (void *cls,
99                                     struct GNUNET_STREAM_Socket *socket);
100
101
102 /**
103  * The send message queue
104  */
105 struct MessageQueue
106 {
107   /**
108    * The message
109    */
110   struct GNUNET_STREAM_MessageHeader *message;
111
112   /**
113    * Callback to be called when the message is sent
114    */
115   SendFinishCallback finish_cb;
116
117   /**
118    * The closure for finish_cb
119    */
120   void *finish_cb_cls;
121
122   /**
123    * The next message in queue. Should be NULL in the last message
124    */
125   struct MessageQueue *next;
126
127   /**
128    * The next message in queue. Should be NULL in the last message
129    */
130   struct MessageQueue *prev;
131 };
132
133
134 /**
135  * The STREAM Socket Handler
136  */
137 struct GNUNET_STREAM_Socket
138 {
139
140   /**
141    * The peer identity of the peer at the other end of the stream
142    */
143   struct GNUNET_PeerIdentity other_peer;
144
145   /**
146    * Retransmission timeout
147    */
148   struct GNUNET_TIME_Relative retransmit_timeout;
149
150   /**
151    * The mesh handle
152    */
153   struct GNUNET_MESH_Handle *mesh;
154
155   /**
156    * The mesh tunnel handle
157    */
158   struct GNUNET_MESH_Tunnel *tunnel;
159
160   /**
161    * Stream open closure
162    */
163   void *open_cls;
164
165   /**
166    * Stream open callback
167    */
168   GNUNET_STREAM_OpenCallback open_cb;
169
170   /**
171    * The current transmit handle (if a pending transmit request exists)
172    */
173   struct GNUNET_MESH_TransmitHandle *transmit_handle;
174
175   /**
176    * The current message associated with the transmit handle
177    */
178   struct MessageQueue *queue_head;
179
180   /**
181    * The queue tail, should always point to the last message in queue
182    */
183   struct MessageQueue *queue_tail;
184
185   /**
186    * The write IO_handle associated with this socket
187    */
188   struct GNUNET_STREAM_IOHandle *write_handle;
189
190   /**
191    * The read IO_handle associated with this socket
192    */
193   struct GNUNET_STREAM_IOHandle *read_handle;
194
195   /**
196    * The state of the protocol associated with this socket
197    */
198   enum State state;
199
200   /**
201    * The status of the socket
202    */
203   enum GNUNET_STREAM_Status status;
204
205   /**
206    * The number of previous timeouts; FIXME: currently not used
207    */
208   unsigned int retries;
209
210   /**
211    * The session id associated with this stream connection
212    */
213   uint32_t session_id;
214
215   /**
216    * Write sequence number. Start at random upon reaching ESTABLISHED state
217    */
218   uint32_t write_sequence_number;
219
220   /**
221    * Read sequence number. This number's value is determined during handshake
222    */
223   uint32_t read_sequence_number;
224 };
225
226
227 /**
228  * A socket for listening
229  */
230 struct GNUNET_STREAM_ListenSocket
231 {
232
233   /**
234    * The mesh handle
235    */
236   struct GNUNET_MESH_Handle *mesh;
237
238   /**
239    * The callback function which is called after successful opening socket
240    */
241   GNUNET_STREAM_ListenCallback listen_cb;
242
243   /**
244    * The call back closure
245    */
246   void *listen_cb_cls;
247
248   /**
249    * The service port
250    */
251   GNUNET_MESH_ApplicationType port;
252 };
253
254
255 /**
256  * The IO Handle
257  */
258 struct GNUNET_STREAM_IOHandle
259 {
260   /**
261    * The packet_buffers associated with this Handle
262    */
263   struct GNUNET_STREAM_DataMessage *messages[64];
264
265   /**
266    * The bitmap of this IOHandle; Corresponding bit for a message is set when
267    * it has been acknowledged by the receiver
268    */
269   GNUNET_STREAM_AckBitmap bitmap;
270 };
271
272
273 /**
274  * Default value in seconds for various timeouts
275  */
276 static unsigned int default_timeout = 300;
277
278
279 /**
280  * Callback function for sending hello message
281  *
282  * @param cls closure the socket
283  * @param size number of bytes available in buf
284  * @param buf where the callee should write the message
285  * @return number of bytes written to buf
286  */
287 static size_t
288 send_message_notify (void *cls, size_t size, void *buf)
289 {
290   struct GNUNET_STREAM_Socket *socket = cls;
291   struct MessageQueue *head;
292   size_t ret;
293
294   socket->transmit_handle = NULL; /* Remove the transmit handle */
295   head = socket->queue_head;
296   if (NULL == head)
297     return 0; /* just to be safe */
298   if (0 == size)                /* request timed out */
299     {
300       socket->retries++;
301       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
302                   "Message sending timed out. Retry %d \n",
303                   socket->retries);
304       socket->transmit_handle = 
305         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
306                                            0, /* Corking */
307                                            1, /* Priority */
308                                            /* FIXME: exponential backoff */
309                                            socket->retransmit_timeout,
310                                            &socket->other_peer,
311                                            ntohs (head->message->header.size),
312                                            &send_message_notify,
313                                            socket);
314       return 0;
315     }
316
317   ret = ntohs (head->message->header.size);
318   GNUNET_assert (size >= ret);
319   memcpy (buf, head->message, ret);
320   if (NULL != head->finish_cb)
321     {
322       head->finish_cb (socket, head->finish_cb_cls);
323     }
324   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
325                                socket->queue_tail,
326                                head);
327   GNUNET_free (head->message);
328   GNUNET_free (head);
329   head = socket->queue_head;
330   if (NULL != head)    /* more pending messages to send */
331     {
332       socket->retries = 0;
333       socket->transmit_handle = 
334         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
335                                            0, /* Corking */
336                                            1, /* Priority */
337                                            /* FIXME: exponential backoff */
338                                            socket->retransmit_timeout,
339                                            &socket->other_peer,
340                                            ntohs (head->message->header.size),
341                                            &send_message_notify,
342                                            socket);
343     }
344   return ret;
345 }
346
347
348 /**
349  * Queues a message for sending using the mesh connection of a socket
350  *
351  * @param socket the socket whose mesh connection is used
352  * @param message the message to be sent
353  * @param finish_cb the callback to be called when the message is sent
354  * @param finish_cb_cls the closure for the callback
355  */
356 static void
357 queue_message (struct GNUNET_STREAM_Socket *socket,
358                struct GNUNET_STREAM_MessageHeader *message,
359                SendFinishCallback finish_cb,
360                void *finish_cb_cls)
361 {
362   struct MessageQueue *queue_entity;
363
364   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
365   queue_entity->message = message;
366   queue_entity->finish_cb = finish_cb;
367   queue_entity->finish_cb_cls = finish_cb_cls;
368   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
369                                     socket->queue_tail,
370                                     queue_entity);
371   if (NULL == socket->transmit_handle)
372   {
373     socket->retries = 0;
374     socket->transmit_handle = 
375       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
376                                          0, /* Corking */
377                                          1, /* Priority */
378                                          socket->retransmit_timeout,
379                                          &socket->other_peer,
380                                          ntohs (message->header.size),
381                                          &send_message_notify,
382                                          socket);
383   }
384 }
385
386
387 /**
388  * Function to modify a bit in GNUNET_STREAM_AckBitmap
389  *
390  * @param bitmap the bitmap to modify
391  * @param bit the bit number to modify
392  * @param value GNUNET_YES to on, GNUNET_NO to off
393  */
394 static void
395 AckBitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
396                       unsigned int bit, 
397                       int value)
398 {
399   GNUNET_assert (bit < 64);
400   if (GNUNET_YES == value)
401     *bitmap |= (1LL << bit);
402   else
403     *bitmap &= ~(1LL << bit);
404 }
405
406
407 /**
408  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
409  *
410  * @param bit the bit number to check
411  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
412  */
413 static uint8_t
414 AckBitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
415                       unsigned int bit)
416 {
417   GNUNET_assert (bit < 64);
418   return 0 != (*bitmap & (1LL << bit));
419 }
420
421
422 /**
423  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
424  *
425  * @param cls the socket (set from GNUNET_MESH_connect)
426  * @param tunnel connection to the other end
427  * @param tunnel_ctx place to store local state associated with the tunnel
428  * @param sender who sent the message
429  * @param message the actual message
430  * @param atsi performance data for the connection
431  * @return GNUNET_OK to keep the connection open,
432  *         GNUNET_SYSERR to close it (signal serious error)
433  */
434 static int
435 client_handle_data (void *cls,
436              struct GNUNET_MESH_Tunnel *tunnel,
437              void **tunnel_ctx,
438              const struct GNUNET_PeerIdentity *sender,
439              const struct GNUNET_MessageHeader *message,
440              const struct GNUNET_ATS_Information*atsi)
441 {
442   struct GNUNET_STREAM_Socket *socket = cls;
443   uint16_t size;
444   const struct GNUNET_STREAM_DataMessage *data_msg;
445   const void *payload;
446
447   size = ntohs (message->size);
448   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
449   {
450     GNUNET_break_op (0);
451     return GNUNET_SYSERR;
452   }
453   data_msg = (const struct GNUNET_STREAM_DataMessage *) message;
454   size -= sizeof (struct GNUNET_STREAM_DataMessage);
455   payload = &data_msg[1];
456   /* ... */
457   
458   return GNUNET_OK;
459 }
460
461
462 /**
463  * Callback to set state to ESTABLISHED
464  *
465  * @param cls the closure from queue_message FIXME: document
466  * @param socket the socket to requiring state change
467  */
468 static void
469 set_state_established (void *cls,
470                        struct GNUNET_STREAM_Socket *socket)
471 {
472   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining ESTABLISHED state\n");
473   socket->state = STATE_ESTABLISHED;
474 }
475
476
477 /**
478  * Callback to set state to HELLO_WAIT
479  *
480  * @param cls the closure from queue_message
481  * @param socket the socket to requiring state change
482  */
483 static void
484 set_state_hello_wait (void *cls,
485                       struct GNUNET_STREAM_Socket *socket)
486 {
487   GNUNET_assert (STATE_INIT == socket->state);
488   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining HELLO_WAIT state\n");
489   socket->state = STATE_HELLO_WAIT;
490 }
491
492
493 /**
494  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
495  *
496  * @param cls the socket (set from GNUNET_MESH_connect)
497  * @param tunnel connection to the other end
498  * @param tunnel_ctx this is NULL
499  * @param sender who sent the message
500  * @param message the actual message
501  * @param atsi performance data for the connection
502  * @return GNUNET_OK to keep the connection open,
503  *         GNUNET_SYSERR to close it (signal serious error)
504  */
505 static int
506 client_handle_hello_ack (void *cls,
507                          struct GNUNET_MESH_Tunnel *tunnel,
508                          void **tunnel_ctx,
509                          const struct GNUNET_PeerIdentity *sender,
510                          const struct GNUNET_MessageHeader *message,
511                          const struct GNUNET_ATS_Information*atsi)
512 {
513   struct GNUNET_STREAM_Socket *socket = cls;
514   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
515   struct GNUNET_STREAM_HelloAckMessage *reply;
516
517   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
518   GNUNET_assert (socket->tunnel == tunnel);
519   switch (socket->state)
520   {
521   case STATE_HELLO_WAIT:
522       socket->read_sequence_number = ntohl (ack_msg->sequence_number);
523       /* Get the random sequence number */
524       socket->write_sequence_number = 
525         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
526       reply = 
527         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
528       reply->header.header.size = 
529         htons (sizeof (struct GNUNET_STREAM_MessageHeader));
530       reply->header.header.type = 
531         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
532       reply->sequence_number = htonl (socket->write_sequence_number);
533       queue_message (socket, 
534                      &reply->header, 
535                      &set_state_established, 
536                      NULL);      
537       return GNUNET_OK;
538   case STATE_ESTABLISHED:
539   case STATE_RECEIVE_CLOSE_WAIT:
540     // call statistics (# ACKs ignored++)
541     return GNUNET_OK;
542   case STATE_INIT:
543   default:
544     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
545                 "Server sent HELLO_ACK when in state %d\n", socket->state);
546     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
547     return GNUNET_SYSERR;
548   }
549
550 }
551
552
553 /**
554  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
555  *
556  * @param cls the socket (set from GNUNET_MESH_connect)
557  * @param tunnel connection to the other end
558  * @param tunnel_ctx this is NULL
559  * @param sender who sent the message
560  * @param message the actual message
561  * @param atsi performance data for the connection
562  * @return GNUNET_OK to keep the connection open,
563  *         GNUNET_SYSERR to close it (signal serious error)
564  */
565 static int
566 client_handle_reset (void *cls,
567                      struct GNUNET_MESH_Tunnel *tunnel,
568                      void **tunnel_ctx,
569                      const struct GNUNET_PeerIdentity *sender,
570                      const struct GNUNET_MessageHeader *message,
571                      const struct GNUNET_ATS_Information*atsi)
572 {
573   struct GNUNET_STREAM_Socket *socket = cls;
574
575   return GNUNET_OK;
576 }
577
578
579 /**
580  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
581  *
582  * @param cls the socket (set from GNUNET_MESH_connect)
583  * @param tunnel connection to the other end
584  * @param tunnel_ctx this is NULL
585  * @param sender who sent the message
586  * @param message the actual message
587  * @param atsi performance data for the connection
588  * @return GNUNET_OK to keep the connection open,
589  *         GNUNET_SYSERR to close it (signal serious error)
590  */
591 static int
592 client_handle_transmit_close (void *cls,
593                               struct GNUNET_MESH_Tunnel *tunnel,
594                               void **tunnel_ctx,
595                               const struct GNUNET_PeerIdentity *sender,
596                               const struct GNUNET_MessageHeader *message,
597                               const struct GNUNET_ATS_Information*atsi)
598 {
599   struct GNUNET_STREAM_Socket *socket = cls;
600
601   return GNUNET_OK;
602 }
603
604
605 /**
606  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
607  *
608  * @param cls the socket (set from GNUNET_MESH_connect)
609  * @param tunnel connection to the other end
610  * @param tunnel_ctx this is NULL
611  * @param sender who sent the message
612  * @param message the actual message
613  * @param atsi performance data for the connection
614  * @return GNUNET_OK to keep the connection open,
615  *         GNUNET_SYSERR to close it (signal serious error)
616  */
617 static int
618 client_handle_transmit_close_ack (void *cls,
619                                   struct GNUNET_MESH_Tunnel *tunnel,
620                                   void **tunnel_ctx,
621                                   const struct GNUNET_PeerIdentity *sender,
622                                   const struct GNUNET_MessageHeader *message,
623                                   const struct GNUNET_ATS_Information*atsi)
624 {
625   struct GNUNET_STREAM_Socket *socket = cls;
626
627   return GNUNET_OK;
628 }
629
630
631 /**
632  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
633  *
634  * @param cls the socket (set from GNUNET_MESH_connect)
635  * @param tunnel connection to the other end
636  * @param tunnel_ctx this is NULL
637  * @param sender who sent the message
638  * @param message the actual message
639  * @param atsi performance data for the connection
640  * @return GNUNET_OK to keep the connection open,
641  *         GNUNET_SYSERR to close it (signal serious error)
642  */
643 static int
644 client_handle_receive_close (void *cls,
645                              struct GNUNET_MESH_Tunnel *tunnel,
646                              void **tunnel_ctx,
647                              const struct GNUNET_PeerIdentity *sender,
648                              const struct GNUNET_MessageHeader *message,
649                              const struct GNUNET_ATS_Information*atsi)
650 {
651   struct GNUNET_STREAM_Socket *socket = cls;
652
653   return GNUNET_OK;
654 }
655
656
657 /**
658  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
659  *
660  * @param cls the socket (set from GNUNET_MESH_connect)
661  * @param tunnel connection to the other end
662  * @param tunnel_ctx this is NULL
663  * @param sender who sent the message
664  * @param message the actual message
665  * @param atsi performance data for the connection
666  * @return GNUNET_OK to keep the connection open,
667  *         GNUNET_SYSERR to close it (signal serious error)
668  */
669 static int
670 client_handle_receive_close_ack (void *cls,
671                                  struct GNUNET_MESH_Tunnel *tunnel,
672                                  void **tunnel_ctx,
673                                  const struct GNUNET_PeerIdentity *sender,
674                                  const struct GNUNET_MessageHeader *message,
675                                  const struct GNUNET_ATS_Information*atsi)
676 {
677   struct GNUNET_STREAM_Socket *socket = cls;
678
679   return GNUNET_OK;
680 }
681
682
683 /**
684  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
685  *
686  * @param cls the socket (set from GNUNET_MESH_connect)
687  * @param tunnel connection to the other end
688  * @param tunnel_ctx this is NULL
689  * @param sender who sent the message
690  * @param message the actual message
691  * @param atsi performance data for the connection
692  * @return GNUNET_OK to keep the connection open,
693  *         GNUNET_SYSERR to close it (signal serious error)
694  */
695 static int
696 client_handle_close (void *cls,
697                      struct GNUNET_MESH_Tunnel *tunnel,
698                      void **tunnel_ctx,
699                      const struct GNUNET_PeerIdentity *sender,
700                      const struct GNUNET_MessageHeader *message,
701                      const struct GNUNET_ATS_Information*atsi)
702 {
703   struct GNUNET_STREAM_Socket *socket = cls;
704
705   return GNUNET_OK;
706 }
707
708
709 /**
710  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
711  *
712  * @param cls the socket (set from GNUNET_MESH_connect)
713  * @param tunnel connection to the other end
714  * @param tunnel_ctx this is NULL
715  * @param sender who sent the message
716  * @param message the actual message
717  * @param atsi performance data for the connection
718  * @return GNUNET_OK to keep the connection open,
719  *         GNUNET_SYSERR to close it (signal serious error)
720  */
721 static int
722 client_handle_close_ack (void *cls,
723                          struct GNUNET_MESH_Tunnel *tunnel,
724                          void **tunnel_ctx,
725                          const struct GNUNET_PeerIdentity *sender,
726                          const struct GNUNET_MessageHeader *message,
727                          const struct GNUNET_ATS_Information*atsi)
728 {
729   struct GNUNET_STREAM_Socket *socket = cls;
730
731   return GNUNET_OK;
732 }
733
734 /*****************************/
735 /* Server's Message Handlers */
736 /*****************************/
737
738 /**
739  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
740  *
741  * @param cls the closure
742  * @param tunnel connection to the other end
743  * @param tunnel_ctx the socket
744  * @param sender who sent the message
745  * @param message the actual message
746  * @param atsi performance data for the connection
747  * @return GNUNET_OK to keep the connection open,
748  *         GNUNET_SYSERR to close it (signal serious error)
749  */
750 static int
751 server_handle_data (void *cls,
752                     struct GNUNET_MESH_Tunnel *tunnel,
753                     void **tunnel_ctx,
754                     const struct GNUNET_PeerIdentity *sender,
755                     const struct GNUNET_MessageHeader *message,
756                     const struct GNUNET_ATS_Information*atsi)
757 {
758   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
759
760   return GNUNET_OK;
761 }
762
763
764 /**
765  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
766  *
767  * @param cls the closure
768  * @param tunnel connection to the other end
769  * @param tunnel_ctx the socket
770  * @param sender who sent the message
771  * @param message the actual message
772  * @param atsi performance data for the connection
773  * @return GNUNET_OK to keep the connection open,
774  *         GNUNET_SYSERR to close it (signal serious error)
775  */
776 static int
777 server_handle_hello (void *cls,
778                      struct GNUNET_MESH_Tunnel *tunnel,
779                      void **tunnel_ctx,
780                      const struct GNUNET_PeerIdentity *sender,
781                      const struct GNUNET_MessageHeader *message,
782                      const struct GNUNET_ATS_Information*atsi)
783 {
784   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
785   struct GNUNET_STREAM_HelloAckMessage *reply;
786
787   GNUNET_assert (socket->tunnel == tunnel);
788   if (STATE_INIT == socket->state)
789     {
790       /* Get the random sequence number */
791       socket->write_sequence_number = 
792         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
793       reply = 
794         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
795       reply->header.header.size = 
796         htons (sizeof (struct GNUNET_STREAM_MessageHeader));
797       reply->header.header.type = 
798         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
799       reply->sequence_number = htonl (socket->write_sequence_number);
800       queue_message (socket, 
801                      &reply->header,
802                      &set_state_hello_wait, 
803                      NULL);
804     }
805   else
806     {
807       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
808                   "Client sent HELLO when in state %d\n", socket->state);
809       /* FIXME: Send RESET? */
810       
811     }
812   return GNUNET_OK;
813 }
814
815
816 /**
817  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
818  *
819  * @param cls the closure
820  * @param tunnel connection to the other end
821  * @param tunnel_ctx the socket
822  * @param sender who sent the message
823  * @param message the actual message
824  * @param atsi performance data for the connection
825  * @return GNUNET_OK to keep the connection open,
826  *         GNUNET_SYSERR to close it (signal serious error)
827  */
828 static int
829 server_handle_hello_ack (void *cls,
830                          struct GNUNET_MESH_Tunnel *tunnel,
831                          void **tunnel_ctx,
832                          const struct GNUNET_PeerIdentity *sender,
833                          const struct GNUNET_MessageHeader *message,
834                          const struct GNUNET_ATS_Information*atsi)
835 {
836   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
837   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
838
839   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
840   GNUNET_assert (socket->tunnel == tunnel);
841   if (STATE_HELLO_WAIT == socket->state)
842     {
843       socket->read_sequence_number = ntohs (ack_message->sequence_number);
844       socket->state = STATE_ESTABLISHED;
845     }
846   else
847     {
848       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
849                   "Client sent HELLO_ACK when in state %d\n", socket->state);
850       /* FIXME: Send RESET? */
851       
852     }
853   return GNUNET_OK;
854 }
855
856
857 /**
858  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
859  *
860  * @param cls the closure
861  * @param tunnel connection to the other end
862  * @param tunnel_ctx the socket
863  * @param sender who sent the message
864  * @param message the actual message
865  * @param atsi performance data for the connection
866  * @return GNUNET_OK to keep the connection open,
867  *         GNUNET_SYSERR to close it (signal serious error)
868  */
869 static int
870 server_handle_reset (void *cls,
871                      struct GNUNET_MESH_Tunnel *tunnel,
872                      void **tunnel_ctx,
873                      const struct GNUNET_PeerIdentity *sender,
874                      const struct GNUNET_MessageHeader *message,
875                      const struct GNUNET_ATS_Information*atsi)
876 {
877   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
878
879   return GNUNET_OK;
880 }
881
882
883 /**
884  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
885  *
886  * @param cls the closure
887  * @param tunnel connection to the other end
888  * @param tunnel_ctx the socket
889  * @param sender who sent the message
890  * @param message the actual message
891  * @param atsi performance data for the connection
892  * @return GNUNET_OK to keep the connection open,
893  *         GNUNET_SYSERR to close it (signal serious error)
894  */
895 static int
896 server_handle_transmit_close (void *cls,
897                               struct GNUNET_MESH_Tunnel *tunnel,
898                               void **tunnel_ctx,
899                               const struct GNUNET_PeerIdentity *sender,
900                               const struct GNUNET_MessageHeader *message,
901                               const struct GNUNET_ATS_Information*atsi)
902 {
903   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
904
905   return GNUNET_OK;
906 }
907
908
909 /**
910  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
911  *
912  * @param cls the closure
913  * @param tunnel connection to the other end
914  * @param tunnel_ctx the socket
915  * @param sender who sent the message
916  * @param message the actual message
917  * @param atsi performance data for the connection
918  * @return GNUNET_OK to keep the connection open,
919  *         GNUNET_SYSERR to close it (signal serious error)
920  */
921 static int
922 server_handle_transmit_close_ack (void *cls,
923                                   struct GNUNET_MESH_Tunnel *tunnel,
924                                   void **tunnel_ctx,
925                                   const struct GNUNET_PeerIdentity *sender,
926                                   const struct GNUNET_MessageHeader *message,
927                                   const struct GNUNET_ATS_Information*atsi)
928 {
929   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
930
931   return GNUNET_OK;
932 }
933
934
935 /**
936  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
937  *
938  * @param cls the closure
939  * @param tunnel connection to the other end
940  * @param tunnel_ctx the socket
941  * @param sender who sent the message
942  * @param message the actual message
943  * @param atsi performance data for the connection
944  * @return GNUNET_OK to keep the connection open,
945  *         GNUNET_SYSERR to close it (signal serious error)
946  */
947 static int
948 server_handle_receive_close (void *cls,
949                              struct GNUNET_MESH_Tunnel *tunnel,
950                              void **tunnel_ctx,
951                              const struct GNUNET_PeerIdentity *sender,
952                              const struct GNUNET_MessageHeader *message,
953                              const struct GNUNET_ATS_Information*atsi)
954 {
955   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
956
957   return GNUNET_OK;
958 }
959
960
961 /**
962  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
963  *
964  * @param cls the closure
965  * @param tunnel connection to the other end
966  * @param tunnel_ctx the socket
967  * @param sender who sent the message
968  * @param message the actual message
969  * @param atsi performance data for the connection
970  * @return GNUNET_OK to keep the connection open,
971  *         GNUNET_SYSERR to close it (signal serious error)
972  */
973 static int
974 server_handle_receive_close_ack (void *cls,
975                                  struct GNUNET_MESH_Tunnel *tunnel,
976                                  void **tunnel_ctx,
977                                  const struct GNUNET_PeerIdentity *sender,
978                                  const struct GNUNET_MessageHeader *message,
979                                  const struct GNUNET_ATS_Information*atsi)
980 {
981   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
982
983   return GNUNET_OK;
984 }
985
986
987 /**
988  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
989  *
990  * @param cls the closure
991  * @param tunnel connection to the other end
992  * @param tunnel_ctx the socket
993  * @param sender who sent the message
994  * @param message the actual message
995  * @param atsi performance data for the connection
996  * @return GNUNET_OK to keep the connection open,
997  *         GNUNET_SYSERR to close it (signal serious error)
998  */
999 static int
1000 server_handle_close (void *cls,
1001                      struct GNUNET_MESH_Tunnel *tunnel,
1002                      void **tunnel_ctx,
1003                      const struct GNUNET_PeerIdentity *sender,
1004                      const struct GNUNET_MessageHeader *message,
1005                      const struct GNUNET_ATS_Information*atsi)
1006 {
1007   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1008
1009   return GNUNET_OK;
1010 }
1011
1012
1013 /**
1014  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1015  *
1016  * @param cls the closure
1017  * @param tunnel connection to the other end
1018  * @param tunnel_ctx the socket
1019  * @param sender who sent the message
1020  * @param message the actual message
1021  * @param atsi performance data for the connection
1022  * @return GNUNET_OK to keep the connection open,
1023  *         GNUNET_SYSERR to close it (signal serious error)
1024  */
1025 static int
1026 server_handle_close_ack (void *cls,
1027                          struct GNUNET_MESH_Tunnel *tunnel,
1028                          void **tunnel_ctx,
1029                          const struct GNUNET_PeerIdentity *sender,
1030                          const struct GNUNET_MessageHeader *message,
1031                          const struct GNUNET_ATS_Information*atsi)
1032 {
1033   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1034
1035   return GNUNET_OK;
1036 }
1037
1038
1039 /**
1040  * Message Handler for mesh
1041  *
1042  * @param cls closure (set from GNUNET_MESH_connect)
1043  * @param tunnel connection to the other end
1044  * @param tunnel_ctx place to store local state associated with the tunnel
1045  * @param sender who sent the message
1046  * @param ack the actual message
1047  * @param atsi performance data for the connection
1048  * @return GNUNET_OK to keep the connection open,
1049  *         GNUNET_SYSERR to close it (signal serious error)
1050  */
1051 static int
1052 handle_ack (struct GNUNET_STREAM_Socket *socket,
1053             struct GNUNET_MESH_Tunnel *tunnel,
1054             const struct GNUNET_PeerIdentity *sender,
1055             const struct GNUNET_STREAM_AckMessage *ack,
1056             const struct GNUNET_ATS_Information*atsi)
1057 {
1058   return GNUNET_OK;
1059 }
1060
1061
1062 /**
1063  * Message Handler for mesh
1064  *
1065  * @param cls the 'struct GNUNET_STREAM_Socket'
1066  * @param tunnel connection to the other end
1067  * @param tunnel_ctx unused
1068  * @param sender who sent the message
1069  * @param message the actual message
1070  * @param atsi performance data for the connection
1071  * @return GNUNET_OK to keep the connection open,
1072  *         GNUNET_SYSERR to close it (signal serious error)
1073  */
1074 static int
1075 client_handle_ack (void *cls,
1076                    struct GNUNET_MESH_Tunnel *tunnel,
1077                    void **tunnel_ctx,
1078                    const struct GNUNET_PeerIdentity *sender,
1079                    const struct GNUNET_MessageHeader *message,
1080                    const struct GNUNET_ATS_Information*atsi)
1081 {
1082   struct GNUNET_STREAM_Socket *socket = cls;
1083   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1084  
1085   return handle_ack (socket, tunnel, sender, ack, atsi);
1086 }
1087
1088
1089 /**
1090  * Message Handler for mesh
1091  *
1092  * @param cls the server's listen socket
1093  * @param tunnel connection to the other end
1094  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
1095  * @param sender who sent the message
1096  * @param message the actual message
1097  * @param atsi performance data for the connection
1098  * @return GNUNET_OK to keep the connection open,
1099  *         GNUNET_SYSERR to close it (signal serious error)
1100  */
1101 static int
1102 server_handle_ack (void *cls,
1103                    struct GNUNET_MESH_Tunnel *tunnel,
1104                    void **tunnel_ctx,
1105                    const struct GNUNET_PeerIdentity *sender,
1106                    const struct GNUNET_MessageHeader *message,
1107                    const struct GNUNET_ATS_Information*atsi)
1108 {
1109   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1110   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1111  
1112   return handle_ack (socket, tunnel, sender, ack, atsi);
1113 }
1114
1115
1116 /**
1117  * For client message handlers, the stream socket is in the
1118  * closure argument.
1119  */
1120 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
1121   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1122   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
1123    sizeof (struct GNUNET_STREAM_AckMessage) },
1124   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1125    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1126   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1127    sizeof (struct GNUNET_STREAM_MessageHeader)},
1128   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1129    sizeof (struct GNUNET_STREAM_MessageHeader)},
1130   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1131    sizeof (struct GNUNET_STREAM_MessageHeader)},
1132   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1133    sizeof (struct GNUNET_STREAM_MessageHeader)},
1134   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1135    sizeof (struct GNUNET_STREAM_MessageHeader)},
1136   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1137    sizeof (struct GNUNET_STREAM_MessageHeader)},
1138   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1139    sizeof (struct GNUNET_STREAM_MessageHeader)},
1140   {NULL, 0, 0}
1141 };
1142
1143
1144 /**
1145  * For server message handlers, the stream socket is in the
1146  * tunnel context, and the listen socket in the closure argument.
1147  */
1148 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
1149   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1150   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
1151    sizeof (struct GNUNET_STREAM_AckMessage) },
1152   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
1153    sizeof (struct GNUNET_STREAM_MessageHeader)},
1154   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1155    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1156   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1157    sizeof (struct GNUNET_STREAM_MessageHeader)},
1158   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1159    sizeof (struct GNUNET_STREAM_MessageHeader)},
1160   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1161    sizeof (struct GNUNET_STREAM_MessageHeader)},
1162   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1163    sizeof (struct GNUNET_STREAM_MessageHeader)},
1164   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1165    sizeof (struct GNUNET_STREAM_MessageHeader)},
1166   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1167    sizeof (struct GNUNET_STREAM_MessageHeader)},
1168   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1169    sizeof (struct GNUNET_STREAM_MessageHeader)},
1170   {NULL, 0, 0}
1171 };
1172
1173
1174 /**
1175  * Function called when our target peer is connected to our tunnel
1176  *
1177  * @param peer the peer identity of the target
1178  * @param atsi performance data for the connection
1179  */
1180 static void
1181 mesh_peer_connect_callback (void *cls,
1182                             const struct GNUNET_PeerIdentity *peer,
1183                             const struct GNUNET_ATS_Information * atsi)
1184 {
1185   struct GNUNET_STREAM_Socket *socket = cls;
1186   struct GNUNET_STREAM_MessageHeader *message;
1187
1188   if (0 != memcmp (&socket->other_peer, 
1189                    peer, 
1190                    sizeof (struct GNUNET_PeerIdentity)))
1191     {
1192       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1193                   "A peer (%s) which is not our target has connected to our tunnel", 
1194                   GNUNET_i2s (peer));
1195       return;
1196     }
1197   
1198   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1199               "Target peer %s connected\n", GNUNET_i2s (peer));
1200   
1201   /* Set state to INIT */
1202   socket->state = STATE_INIT;
1203
1204   /* Send HELLO message */
1205   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1206   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1207   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1208   queue_message (socket,
1209                  message,
1210                  &set_state_hello_wait,
1211                  NULL);
1212
1213   /* Call open callback */
1214   if (NULL == socket->open_cb)
1215     {
1216       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1217                   "STREAM_open callback is NULL\n");
1218     }
1219   else
1220     {
1221       socket->open_cb (socket->open_cls, socket);
1222     }
1223 }
1224
1225
1226 /**
1227  * Function called when our target peer is disconnected from our tunnel
1228  *
1229  * @param peer the peer identity of the target
1230  */
1231 static void
1232 mesh_peer_disconnect_callback (void *cls,
1233                                const struct GNUNET_PeerIdentity *peer)
1234 {
1235
1236 }
1237
1238
1239 /*****************/
1240 /* API functions */
1241 /*****************/
1242
1243
1244 /**
1245  * Tries to open a stream to the target peer
1246  *
1247  * @param cfg configuration to use
1248  * @param target the target peer to which the stream has to be opened
1249  * @param app_port the application port number which uniquely identifies this
1250  *            stream
1251  * @param open_cb this function will be called after stream has be established 
1252  * @param open_cb_cls the closure for open_cb
1253  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
1254  * @return if successful it returns the stream socket; NULL if stream cannot be
1255  *         opened 
1256  */
1257 struct GNUNET_STREAM_Socket *
1258 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
1259                     const struct GNUNET_PeerIdentity *target,
1260                     GNUNET_MESH_ApplicationType app_port,
1261                     GNUNET_STREAM_OpenCallback open_cb,
1262                     void *open_cb_cls,
1263                     ...)
1264 {
1265   struct GNUNET_STREAM_Socket *socket;
1266   enum GNUNET_STREAM_Option option;
1267   va_list vargs;                /* Variable arguments */
1268
1269   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
1270   socket->other_peer = *target;
1271   socket->open_cb = open_cb;
1272   socket->open_cls = open_cb_cls;
1273
1274   /* Set defaults */
1275   socket->retransmit_timeout = 
1276     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
1277
1278   va_start (vargs, open_cb_cls); /* Parse variable args */
1279   do {
1280     option = va_arg (vargs, enum GNUNET_STREAM_Option);
1281     switch (option)
1282       {
1283       case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
1284         /* Expect struct GNUNET_TIME_Relative */
1285         socket->retransmit_timeout = va_arg (vargs,
1286                                              struct GNUNET_TIME_Relative);
1287         break;
1288       case GNUNET_STREAM_OPTION_END:
1289         break;
1290       }
1291   } while (GNUNET_STREAM_OPTION_END != option);
1292   va_end (vargs);               /* End of variable args parsing */
1293
1294   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
1295                                       1,  /* QUEUE size as parameter? */
1296                                       socket, /* cls */
1297                                       NULL, /* No inbound tunnel handler */
1298                                       NULL, /* No inbound tunnel cleaner */
1299                                       client_message_handlers,
1300                                       NULL); /* We don't get inbound tunnels */
1301   // FIXME: if (NULL == socket->mesh) ...
1302
1303   /* Now create the mesh tunnel to target */
1304   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
1305                                               NULL, /* Tunnel context */
1306                                               &mesh_peer_connect_callback,
1307                                               &mesh_peer_disconnect_callback,
1308                                               socket);
1309   // FIXME: if (NULL == socket->tunnel) ...
1310
1311   return socket;
1312 }
1313
1314
1315 /**
1316  * Closes the stream
1317  *
1318  * @param socket the stream socket
1319  */
1320 void
1321 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
1322 {
1323   struct MessageQueue *head;
1324
1325   /* Clear Transmit handles */
1326   if (NULL != socket->transmit_handle)
1327     {
1328       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
1329       socket->transmit_handle = NULL;
1330     }
1331
1332   /* Clear existing message queue */
1333   while (NULL != (head = socket->queue_head)) {
1334     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
1335                                  socket->queue_tail,
1336                                  head);
1337     GNUNET_free (head->message);
1338     GNUNET_free (head);
1339   }
1340
1341   /* Close associated tunnel */
1342   if (NULL != socket->tunnel)
1343     {
1344       GNUNET_MESH_tunnel_destroy (socket->tunnel);
1345       socket->tunnel = NULL;
1346     }
1347
1348   /* Close mesh connection */
1349   if (NULL != socket->mesh)
1350     {
1351       GNUNET_MESH_disconnect (socket->mesh);
1352       socket->mesh = NULL;
1353     }
1354   GNUNET_free (socket);
1355 }
1356
1357
1358 /**
1359  * Method called whenever a peer creates a tunnel to us
1360  *
1361  * @param cls closure
1362  * @param tunnel new handle to the tunnel
1363  * @param initiator peer that started the tunnel
1364  * @param atsi performance information for the tunnel
1365  * @return initial tunnel context for the tunnel
1366  *         (can be NULL -- that's not an error)
1367  */
1368 static void *
1369 new_tunnel_notify (void *cls,
1370                    struct GNUNET_MESH_Tunnel *tunnel,
1371                    const struct GNUNET_PeerIdentity *initiator,
1372                    const struct GNUNET_ATS_Information *atsi)
1373 {
1374   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
1375   struct GNUNET_STREAM_Socket *socket;
1376
1377   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
1378   socket->tunnel = tunnel;
1379   socket->session_id = 0;       /* FIXME */
1380   socket->other_peer = *initiator;
1381   socket->state = STATE_INIT;
1382
1383   if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
1384                                            socket,
1385                                            &socket->other_peer))
1386     {
1387       socket->state = STATE_CLOSED;
1388       /* FIXME: Send CLOSE message and then free */
1389       GNUNET_free (socket);
1390       GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
1391     }
1392   return socket;
1393 }
1394
1395
1396 /**
1397  * Function called whenever an inbound tunnel is destroyed.  Should clean up
1398  * any associated state.  This function is NOT called if the client has
1399  * explicitly asked for the tunnel to be destroyed using
1400  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
1401  * the tunnel.
1402  *
1403  * @param cls closure (set from GNUNET_MESH_connect)
1404  * @param tunnel connection to the other end (henceforth invalid)
1405  * @param tunnel_ctx place where local state associated
1406  *                   with the tunnel is stored
1407  */
1408 static void 
1409 tunnel_cleaner (void *cls,
1410                 const struct GNUNET_MESH_Tunnel *tunnel,
1411                 void *tunnel_ctx)
1412 {
1413   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
1414   
1415   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1416               "Peer %s has terminated connection abruptly\n",
1417               GNUNET_i2s (&socket->other_peer));
1418
1419   socket->status = GNUNET_STREAM_SHUTDOWN;
1420   /* Clear Transmit handles */
1421   if (NULL != socket->transmit_handle)
1422     {
1423       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
1424       socket->transmit_handle = NULL;
1425     }
1426   socket->tunnel = NULL;
1427 }
1428
1429
1430 /**
1431  * Listens for stream connections for a specific application ports
1432  *
1433  * @param cfg the configuration to use
1434  * @param app_port the application port for which new streams will be accepted
1435  * @param listen_cb this function will be called when a peer tries to establish
1436  *            a stream with us
1437  * @param listen_cb_cls closure for listen_cb
1438  * @return listen socket, NULL for any error
1439  */
1440 struct GNUNET_STREAM_ListenSocket *
1441 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
1442                       GNUNET_MESH_ApplicationType app_port,
1443                       GNUNET_STREAM_ListenCallback listen_cb,
1444                       void *listen_cb_cls)
1445 {
1446   /* FIXME: Add variable args for passing configration options? */
1447   struct GNUNET_STREAM_ListenSocket *lsocket;
1448   GNUNET_MESH_ApplicationType app_types[2];
1449
1450   app_types[0] = app_port;
1451   app_types[1] = 0;
1452   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
1453   lsocket->port = app_port;
1454   lsocket->listen_cb = listen_cb;
1455   lsocket->listen_cb_cls = listen_cb_cls;
1456   lsocket->mesh = GNUNET_MESH_connect (cfg,
1457                                        10, /* FIXME: QUEUE size as parameter? */
1458                                        lsocket, /* Closure */
1459                                        &new_tunnel_notify,
1460                                        &tunnel_cleaner,
1461                                        server_message_handlers,
1462                                        app_types);
1463   return lsocket;
1464 }
1465
1466
1467 /**
1468  * Closes the listen socket
1469  *
1470  * @param socket the listen socket
1471  */
1472 void
1473 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
1474 {
1475   /* Close MESH connection */
1476   GNUNET_MESH_disconnect (lsocket->mesh);
1477   
1478   GNUNET_free (lsocket);
1479 }
1480
1481
1482 /**
1483  * Tries to write the given data to the stream
1484  *
1485  * @param socket the socket representing a stream
1486  * @param data the data buffer from where the data is written into the stream
1487  * @param size the number of bytes to be written from the data buffer
1488  * @param timeout the timeout period
1489  * @param write_cont the function to call upon writing some bytes into the stream
1490  * @param write_cont_cls the closure
1491  * @return handle to cancel the operation
1492  */
1493 struct GNUNET_STREAM_IOHandle *
1494 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
1495                      const void *data,
1496                      size_t size,
1497                      struct GNUNET_TIME_Relative timeout,
1498                      GNUNET_STREAM_CompletionContinuation write_cont,
1499                      void *write_cont_cls)
1500 {
1501   unsigned int num_needed_packets;
1502   unsigned int packet;
1503   struct GNUNET_STREAM_IOHandle *io_handle;
1504   struct GNUNET_STREAM_DataMessage *data_msg;
1505   size_t max_payload_size;
1506   size_t packet_size;
1507   const void *sweep;
1508
1509   /* There is already a write request pending */
1510   if (NULL != socket->write_handle) return NULL;
1511   if (!(STATE_ESTABLISHED == socket->state 
1512         || STATE_RECEIVE_CLOSE_WAIT == socket->state
1513         || STATE_RECEIVE_CLOSED == socket->state))
1514     {
1515       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1516                   "Attempting to write on a closed/not-yet-established stream\n");
1517       return NULL;
1518     }
1519       
1520   max_payload_size = 
1521     MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
1522   num_needed_packets = ceil (size / max_payload_size);
1523   if (64 < num_needed_packets) 
1524     {
1525       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1526                   "Given buffer cannot be accommodated in 64 packets\n");
1527       num_needed_packets = 64;
1528     }
1529
1530   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOHandle));
1531   sweep = data;
1532   /* Divide the given area into packets for sending */
1533   for (packet=0; packet < num_needed_packets; packet++)
1534     {
1535       if ((packet + 1) * max_payload_size < size) 
1536         {
1537           packet_size = MAX_PACKET_SIZE;
1538         }
1539       else 
1540         {
1541           packet_size = size - packet * max_payload_size
1542             + sizeof (struct GNUNET_STREAM_DataMessage);
1543         }
1544       io_handle->messages[packet] = GNUNET_malloc (packet_size);
1545       io_handle->messages[packet]->header.header.size = htons (packet_size);
1546       io_handle->messages[packet]->header.header.type =
1547         htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
1548       data_msg = io_handle->messages[packet];
1549       memcpy (&data_msg[1],
1550               sweep,
1551               packet_size - sizeof (struct GNUNET_STREAM_DataMessage));
1552       sweep += packet_size - sizeof (struct GNUNET_STREAM_DataMessage);
1553     }
1554
1555   return io_handle;
1556 }