d1b4dc29918d52041c291235c0e3e7f0da07789e
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file stream/stream_api.c
23  * @brief Implementation of the stream library
24  * @author Sree Harsha Totakura
25  */
26 #include "platform.h"
27 #include "gnunet_common.h"
28 #include "gnunet_stream_lib.h"
29 #include "stream_protocol.h"
30
31 /**
32  * states in the Protocol
33  */
34 enum State
35   {
36     /**
37      * Client initialization state
38      */
39     STATE_INIT,
40
41     /**
42      * Listener initialization state 
43      */
44     STATE_LISTEN,
45
46     /**
47      * Pre-connection establishment state
48      */
49     STATE_HELLO_WAIT,
50
51     /**
52      * State where a connection has been established
53      */
54     STATE_ESTABLISHED,
55
56     /**
57      * State where the socket is closed on our side and waiting to be ACK'ed
58      */
59     STATE_RECEIVE_CLOSE_WAIT,
60
61     /**
62      * State where the socket is closed for reading
63      */
64     STATE_RECEIVE_CLOSED,
65
66     /**
67      * State where the socket is closed on our side and waiting to be ACK'ed
68      */
69     STATE_TRANSMIT_CLOSE_WAIT,
70
71     /**
72      * State where the socket is closed for writing
73      */
74     STATE_TRANSMIT_CLOSED,
75
76     /**
77      * State where the socket is closed on our side and waiting to be ACK'ed
78      */
79     STATE_CLOSE_WAIT,
80
81     /**
82      * State where the socket is closed
83      */
84     STATE_CLOSED 
85   };
86
87
88 /**
89  * Functions of this type are called when a message is written
90  *
91  * @param socket the socket the written message was bound to
92  */
93 typedef void (*SendFinishCallback) (void *cls,
94                                     struct GNUNET_STREAM_Socket *socket);
95
96
97 /**
98  * The send message queue
99  */
100 struct MessageQueue
101 {
102   /**
103    * The message
104    */
105   struct GNUNET_STREAM_MessageHeader *message;
106
107   /**
108    * Callback to be called when the message is sent
109    */
110   SendFinishCallback finish_cb;
111
112   /**
113    * The closure for finish_cb
114    */
115   void *finish_cb_cls;
116
117   /**
118    * The next message in queue. Should be NULL in the last message
119    */
120   struct MessageQueue *next;
121 };
122
123
124 /**
125  * The STREAM Socket Handler
126  */
127 struct GNUNET_STREAM_Socket
128 {
129   /**
130    * The mesh handle
131    */
132   struct GNUNET_MESH_Handle *mesh;
133
134   /**
135    * The mesh tunnel handle
136    */
137   struct GNUNET_MESH_Tunnel *tunnel;
138
139   /**
140    * The session id associated with this stream connection
141    */
142   uint32_t session_id;
143
144   /**
145    * The peer identity of the peer at the other end of the stream
146    */
147   struct GNUNET_PeerIdentity other_peer;
148
149   /**
150    * Stream open closure
151    */
152   void *open_cls;
153
154   /**
155    * Stream open callback
156    */
157   GNUNET_STREAM_OpenCallback open_cb;
158
159   /**
160    * Retransmission timeout
161    */
162   struct GNUNET_TIME_Relative retransmit_timeout;
163
164   /**
165    * The state of the protocol associated with this socket
166    */
167   enum State state;
168
169   /**
170    * The status of the socket
171    */
172   enum GNUNET_STREAM_Status status;
173
174   /**
175    * The current transmit handle (if a pending transmit request exists)
176    */
177   struct GNUNET_MESH_TransmitHandle *transmit_handle;
178
179   /**
180    * The current message associated with the transmit handle
181    */
182   struct MessageQueue *queue;
183
184   /**
185    * The queue tail, should always point to the last message in queue
186    */
187   struct MessageQueue *queue_tail;
188
189   /**
190    * The number of previous timeouts
191    */
192   unsigned int retries;
193 };
194
195
196 /**
197  * A socket for listening
198  */
199 struct GNUNET_STREAM_ListenSocket
200 {
201
202   /**
203    * The mesh handle
204    */
205   struct GNUNET_MESH_Handle *mesh;
206
207   /**
208    * The service port
209    */
210   GNUNET_MESH_ApplicationType port;
211
212   /**
213    * The callback function which is called after successful opening socket
214    */
215   GNUNET_STREAM_ListenCallback listen_cb;
216
217   /**
218    * The call back closure
219    */
220   void *listen_cb_cls;
221
222 };
223
224
225 /**
226  * Default value in seconds for various timeouts
227  */
228 static unsigned int default_timeout = 300;
229
230
231 /**
232  * Callback function for sending hello message
233  *
234  * @param cls closure the socket
235  * @param size number of bytes available in buf
236  * @param buf where the callee should write the message
237  * @return number of bytes written to buf
238  */
239 static size_t
240 send_message_notify (void *cls, size_t size, void *buf)
241 {
242   struct GNUNET_STREAM_Socket *socket = cls;
243   struct MessageQueue *head;
244   size_t ret;
245
246   head = socket->queue;
247   socket->transmit_handle = NULL; /* Remove the transmit handle */
248   if (0 == size)                /* request timed out */
249     {
250       socket->retries++;
251       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
252                   "Message sending timed out. Retry %d \n",
253                   socket->retries);
254       socket->transmit_handle = 
255         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
256                                            0, /* Corking */
257                                            1, /* Priority */
258                                            /* FIXME: exponential backoff */
259                                            socket->retransmit_timeout,
260                                            &socket->other_peer,
261                                            ntohs (head->message->header.size),
262                                            &send_message_notify,
263                                            socket);
264       return 0;
265     }
266
267   ret = ntohs (head->message->header.size);
268   GNUNET_assert (size >= ret);
269   memcpy (buf, head->message, ret);
270   if (NULL != head->finish_cb)
271     {
272       head->finish_cb (socket, head->finish_cb_cls);
273     }
274
275   socket->queue = head->next;   /* Will be NULL if the queue is empty */
276   GNUNET_free (head->message);
277   GNUNET_free (head);
278   head = socket->queue;
279   if (NULL != head)    /* more pending messages to send */
280     {
281       socket->retries = 0;
282       socket->transmit_handle = 
283         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
284                                            0, /* Corking */
285                                            1, /* Priority */
286                                            /* FIXME: exponential backoff */
287                                            socket->retransmit_timeout,
288                                            &socket->other_peer,
289                                            ntohs (head->message->header.size),
290                                            &send_message_notify,
291                                            socket);
292     }
293   return ret;
294 }
295
296
297 /**
298  * Queues a message for sending using the mesh connection of a socket
299  *
300  * @param socket the socket whose mesh connection is used
301  * @param message the message to be sent
302  * @param finish_cb the callback to be called when the message is sent
303  * @param finish_cb_cls the closure for the callback
304  */
305 static void
306 queue_message (struct GNUNET_STREAM_Socket *socket,
307                struct GNUNET_STREAM_MessageHeader *message,
308                SendFinishCallback finish_cb,
309                void *finish_cb_cls)
310 {
311   struct MessageQueue *msg_info;
312
313   msg_info = GNUNET_malloc (sizeof (struct MessageQueue));
314   msg_info->message = message;
315   msg_info->finish_cb = finish_cb;
316   msg_info->finish_cb_cls = finish_cb_cls;
317   msg_info->next = NULL;
318
319
320   if (NULL == socket->queue)
321     {
322       socket->queue = msg_info;
323       socket->queue_tail = msg_info;
324       socket->retries = 0;
325       socket->transmit_handle = 
326         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
327                                            0, /* Corking */
328                                            1, /* Priority */
329                                            socket->retransmit_timeout,
330                                            &socket->other_peer,
331                                            ntohs (message->header.size),
332                                            &send_message_notify,
333                                            socket);
334     }
335   else                          /* There is a pending message in queue */
336     {
337       socket->queue_tail->next = msg_info; /* Add to tail */
338       socket->queue_tail = msg_info;
339     }
340 }
341
342
343
344
345 /**
346  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
347  *
348  * @param cls the socket (set from GNUNET_MESH_connect)
349  * @param tunnel connection to the other end
350  * @param tunnel_ctx place to store local state associated with the tunnel
351  * @param sender who sent the message
352  * @param message the actual message
353  * @param atsi performance data for the connection
354  * @return GNUNET_OK to keep the connection open,
355  *         GNUNET_SYSERR to close it (signal serious error)
356  */
357 static int
358 client_handle_data (void *cls,
359              struct GNUNET_MESH_Tunnel *tunnel,
360              void **tunnel_ctx,
361              const struct GNUNET_PeerIdentity *sender,
362              const struct GNUNET_MessageHeader *message,
363              const struct GNUNET_ATS_Information*atsi)
364 {
365   struct GNUNET_STREAM_Socket *socket = cls;
366   uint16_t size;
367   const struct GNUNET_STREAM_DataMessage *data_msg;
368   const void *payload;
369
370   size = ntohs (message->size);
371   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
372   {
373     GNUNET_break_op (0);
374     return GNUNET_SYSERR;
375   }
376   data_msg = (const struct GNUNET_STREAM_DataMessage *) message;
377   size -= sizeof (struct GNUNET_STREAM_DataMessage);
378   payload = &data_msg[1];
379   /* ... */
380   
381   return GNUNET_OK;
382 }
383
384
385 /**
386  * Callback to set state to ESTABLISHED
387  *
388  * @param cls the closure from queue_message
389  * @param socket the socket to requiring state change
390  */
391 static void
392 set_state_established (void *cls,
393                        struct GNUNET_STREAM_Socket *socket)
394 {
395   socket->state = STATE_ESTABLISHED;
396 }
397
398
399 /**
400  * Callback to set state to HELLO_WAIT
401  *
402  * @param cls the closure from queue_message
403  * @param socket the socket to requiring state change
404  */
405 static void
406 set_state_hello_wait (void *cls,
407                       struct GNUNET_STREAM_Socket *socket)
408 {
409   GNUNET_assert (STATE_INIT == socket->state);
410   socket->state = STATE_HELLO_WAIT;
411 }
412
413
414 /**
415  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
416  *
417  * @param cls the socket (set from GNUNET_MESH_connect)
418  * @param tunnel connection to the other end
419  * @param tunnel_ctx this is NULL
420  * @param sender who sent the message
421  * @param message the actual message
422  * @param atsi performance data for the connection
423  * @return GNUNET_OK to keep the connection open,
424  *         GNUNET_SYSERR to close it (signal serious error)
425  */
426 static int
427 client_handle_hello_ack (void *cls,
428                          struct GNUNET_MESH_Tunnel *tunnel,
429                          void **tunnel_ctx,
430                          const struct GNUNET_PeerIdentity *sender,
431                          const struct GNUNET_MessageHeader *message,
432                          const struct GNUNET_ATS_Information*atsi)
433 {
434   struct GNUNET_STREAM_Socket *socket = cls;
435   struct GNUNET_STREAM_MessageHeader *reply;
436
437   GNUNET_assert (socket->tunnel == tunnel);
438   if (STATE_HELLO_WAIT == socket->state)
439     {
440       reply = 
441         GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
442       reply->header.size = 
443         htons (sizeof (struct GNUNET_STREAM_MessageHeader));
444       reply->header.type = 
445         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
446       queue_message (socket, 
447                      reply, 
448                      &set_state_established, 
449                      NULL);
450     }
451
452   return GNUNET_OK;
453 }
454
455
456 /**
457  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
458  *
459  * @param cls the socket (set from GNUNET_MESH_connect)
460  * @param tunnel connection to the other end
461  * @param tunnel_ctx this is NULL
462  * @param sender who sent the message
463  * @param message the actual message
464  * @param atsi performance data for the connection
465  * @return GNUNET_OK to keep the connection open,
466  *         GNUNET_SYSERR to close it (signal serious error)
467  */
468 static int
469 client_handle_reset (void *cls,
470                      struct GNUNET_MESH_Tunnel *tunnel,
471                      void **tunnel_ctx,
472                      const struct GNUNET_PeerIdentity *sender,
473                      const struct GNUNET_MessageHeader *message,
474                      const struct GNUNET_ATS_Information*atsi)
475 {
476   struct GNUNET_STREAM_Socket *socket = cls;
477
478   return GNUNET_OK;
479 }
480
481
482 /**
483  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
484  *
485  * @param cls the socket (set from GNUNET_MESH_connect)
486  * @param tunnel connection to the other end
487  * @param tunnel_ctx this is NULL
488  * @param sender who sent the message
489  * @param message the actual message
490  * @param atsi performance data for the connection
491  * @return GNUNET_OK to keep the connection open,
492  *         GNUNET_SYSERR to close it (signal serious error)
493  */
494 static int
495 client_handle_transmit_close (void *cls,
496                               struct GNUNET_MESH_Tunnel *tunnel,
497                               void **tunnel_ctx,
498                               const struct GNUNET_PeerIdentity *sender,
499                               const struct GNUNET_MessageHeader *message,
500                               const struct GNUNET_ATS_Information*atsi)
501 {
502   struct GNUNET_STREAM_Socket *socket = cls;
503
504   return GNUNET_OK;
505 }
506
507
508 /**
509  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
510  *
511  * @param cls the socket (set from GNUNET_MESH_connect)
512  * @param tunnel connection to the other end
513  * @param tunnel_ctx this is NULL
514  * @param sender who sent the message
515  * @param message the actual message
516  * @param atsi performance data for the connection
517  * @return GNUNET_OK to keep the connection open,
518  *         GNUNET_SYSERR to close it (signal serious error)
519  */
520 static int
521 client_handle_transmit_close_ack (void *cls,
522                                   struct GNUNET_MESH_Tunnel *tunnel,
523                                   void **tunnel_ctx,
524                                   const struct GNUNET_PeerIdentity *sender,
525                                   const struct GNUNET_MessageHeader *message,
526                                   const struct GNUNET_ATS_Information*atsi)
527 {
528   struct GNUNET_STREAM_Socket *socket = cls;
529
530   return GNUNET_OK;
531 }
532
533
534 /**
535  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
536  *
537  * @param cls the socket (set from GNUNET_MESH_connect)
538  * @param tunnel connection to the other end
539  * @param tunnel_ctx this is NULL
540  * @param sender who sent the message
541  * @param message the actual message
542  * @param atsi performance data for the connection
543  * @return GNUNET_OK to keep the connection open,
544  *         GNUNET_SYSERR to close it (signal serious error)
545  */
546 static int
547 client_handle_receive_close (void *cls,
548                              struct GNUNET_MESH_Tunnel *tunnel,
549                              void **tunnel_ctx,
550                              const struct GNUNET_PeerIdentity *sender,
551                              const struct GNUNET_MessageHeader *message,
552                              const struct GNUNET_ATS_Information*atsi)
553 {
554   struct GNUNET_STREAM_Socket *socket = cls;
555
556   return GNUNET_OK;
557 }
558
559
560 /**
561  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
562  *
563  * @param cls the socket (set from GNUNET_MESH_connect)
564  * @param tunnel connection to the other end
565  * @param tunnel_ctx this is NULL
566  * @param sender who sent the message
567  * @param message the actual message
568  * @param atsi performance data for the connection
569  * @return GNUNET_OK to keep the connection open,
570  *         GNUNET_SYSERR to close it (signal serious error)
571  */
572 static int
573 client_handle_receive_close_ack (void *cls,
574                                  struct GNUNET_MESH_Tunnel *tunnel,
575                                  void **tunnel_ctx,
576                                  const struct GNUNET_PeerIdentity *sender,
577                                  const struct GNUNET_MessageHeader *message,
578                                  const struct GNUNET_ATS_Information*atsi)
579 {
580   struct GNUNET_STREAM_Socket *socket = cls;
581
582   return GNUNET_OK;
583 }
584
585
586 /**
587  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
588  *
589  * @param cls the socket (set from GNUNET_MESH_connect)
590  * @param tunnel connection to the other end
591  * @param tunnel_ctx this is NULL
592  * @param sender who sent the message
593  * @param message the actual message
594  * @param atsi performance data for the connection
595  * @return GNUNET_OK to keep the connection open,
596  *         GNUNET_SYSERR to close it (signal serious error)
597  */
598 static int
599 client_handle_close (void *cls,
600                      struct GNUNET_MESH_Tunnel *tunnel,
601                      void **tunnel_ctx,
602                      const struct GNUNET_PeerIdentity *sender,
603                      const struct GNUNET_MessageHeader *message,
604                      const struct GNUNET_ATS_Information*atsi)
605 {
606   struct GNUNET_STREAM_Socket *socket = cls;
607
608   return GNUNET_OK;
609 }
610
611
612 /**
613  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
614  *
615  * @param cls the socket (set from GNUNET_MESH_connect)
616  * @param tunnel connection to the other end
617  * @param tunnel_ctx this is NULL
618  * @param sender who sent the message
619  * @param message the actual message
620  * @param atsi performance data for the connection
621  * @return GNUNET_OK to keep the connection open,
622  *         GNUNET_SYSERR to close it (signal serious error)
623  */
624 static int
625 client_handle_close_ack (void *cls,
626                          struct GNUNET_MESH_Tunnel *tunnel,
627                          void **tunnel_ctx,
628                          const struct GNUNET_PeerIdentity *sender,
629                          const struct GNUNET_MessageHeader *message,
630                          const struct GNUNET_ATS_Information*atsi)
631 {
632   struct GNUNET_STREAM_Socket *socket = cls;
633
634   return GNUNET_OK;
635 }
636
637 /*****************************/
638 /* Server's Message Handlers */
639 /*****************************/
640
641 /**
642  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
643  *
644  * @param cls the closure
645  * @param tunnel connection to the other end
646  * @param tunnel_ctx the socket
647  * @param sender who sent the message
648  * @param message the actual message
649  * @param atsi performance data for the connection
650  * @return GNUNET_OK to keep the connection open,
651  *         GNUNET_SYSERR to close it (signal serious error)
652  */
653 static int
654 server_handle_data (void *cls,
655                     struct GNUNET_MESH_Tunnel *tunnel,
656                     void **tunnel_ctx,
657                     const struct GNUNET_PeerIdentity *sender,
658                     const struct GNUNET_MessageHeader *message,
659                     const struct GNUNET_ATS_Information*atsi)
660 {
661   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
662
663   return GNUNET_OK;
664 }
665
666
667 /**
668  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
669  *
670  * @param cls the closure
671  * @param tunnel connection to the other end
672  * @param tunnel_ctx the socket
673  * @param sender who sent the message
674  * @param message the actual message
675  * @param atsi performance data for the connection
676  * @return GNUNET_OK to keep the connection open,
677  *         GNUNET_SYSERR to close it (signal serious error)
678  */
679 static int
680 server_handle_hello (void *cls,
681                      struct GNUNET_MESH_Tunnel *tunnel,
682                      void **tunnel_ctx,
683                      const struct GNUNET_PeerIdentity *sender,
684                      const struct GNUNET_MessageHeader *message,
685                      const struct GNUNET_ATS_Information*atsi)
686 {
687   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
688
689   return GNUNET_OK;
690 }
691
692
693 /**
694  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
695  *
696  * @param cls the closure
697  * @param tunnel connection to the other end
698  * @param tunnel_ctx the socket
699  * @param sender who sent the message
700  * @param message the actual message
701  * @param atsi performance data for the connection
702  * @return GNUNET_OK to keep the connection open,
703  *         GNUNET_SYSERR to close it (signal serious error)
704  */
705 static int
706 server_handle_hello_ack (void *cls,
707                          struct GNUNET_MESH_Tunnel *tunnel,
708                          void **tunnel_ctx,
709                          const struct GNUNET_PeerIdentity *sender,
710                          const struct GNUNET_MessageHeader *message,
711                          const struct GNUNET_ATS_Information*atsi)
712 {
713   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
714
715   return GNUNET_OK;
716 }
717
718
719 /**
720  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
721  *
722  * @param cls the closure
723  * @param tunnel connection to the other end
724  * @param tunnel_ctx the socket
725  * @param sender who sent the message
726  * @param message the actual message
727  * @param atsi performance data for the connection
728  * @return GNUNET_OK to keep the connection open,
729  *         GNUNET_SYSERR to close it (signal serious error)
730  */
731 static int
732 server_handle_reset (void *cls,
733                      struct GNUNET_MESH_Tunnel *tunnel,
734                      void **tunnel_ctx,
735                      const struct GNUNET_PeerIdentity *sender,
736                      const struct GNUNET_MessageHeader *message,
737                      const struct GNUNET_ATS_Information*atsi)
738 {
739   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
740
741   return GNUNET_OK;
742 }
743
744
745 /**
746  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
747  *
748  * @param cls the closure
749  * @param tunnel connection to the other end
750  * @param tunnel_ctx the socket
751  * @param sender who sent the message
752  * @param message the actual message
753  * @param atsi performance data for the connection
754  * @return GNUNET_OK to keep the connection open,
755  *         GNUNET_SYSERR to close it (signal serious error)
756  */
757 static int
758 server_handle_transmit_close (void *cls,
759                               struct GNUNET_MESH_Tunnel *tunnel,
760                               void **tunnel_ctx,
761                               const struct GNUNET_PeerIdentity *sender,
762                               const struct GNUNET_MessageHeader *message,
763                               const struct GNUNET_ATS_Information*atsi)
764 {
765   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
766
767   return GNUNET_OK;
768 }
769
770
771 /**
772  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
773  *
774  * @param cls the closure
775  * @param tunnel connection to the other end
776  * @param tunnel_ctx the socket
777  * @param sender who sent the message
778  * @param message the actual message
779  * @param atsi performance data for the connection
780  * @return GNUNET_OK to keep the connection open,
781  *         GNUNET_SYSERR to close it (signal serious error)
782  */
783 static int
784 server_handle_transmit_close_ack (void *cls,
785                                   struct GNUNET_MESH_Tunnel *tunnel,
786                                   void **tunnel_ctx,
787                                   const struct GNUNET_PeerIdentity *sender,
788                                   const struct GNUNET_MessageHeader *message,
789                                   const struct GNUNET_ATS_Information*atsi)
790 {
791   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
792
793   return GNUNET_OK;
794 }
795
796
797 /**
798  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
799  *
800  * @param cls the closure
801  * @param tunnel connection to the other end
802  * @param tunnel_ctx the socket
803  * @param sender who sent the message
804  * @param message the actual message
805  * @param atsi performance data for the connection
806  * @return GNUNET_OK to keep the connection open,
807  *         GNUNET_SYSERR to close it (signal serious error)
808  */
809 static int
810 server_handle_receive_close (void *cls,
811                              struct GNUNET_MESH_Tunnel *tunnel,
812                              void **tunnel_ctx,
813                              const struct GNUNET_PeerIdentity *sender,
814                              const struct GNUNET_MessageHeader *message,
815                              const struct GNUNET_ATS_Information*atsi)
816 {
817   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
818
819   return GNUNET_OK;
820 }
821
822
823 /**
824  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
825  *
826  * @param cls the closure
827  * @param tunnel connection to the other end
828  * @param tunnel_ctx the socket
829  * @param sender who sent the message
830  * @param message the actual message
831  * @param atsi performance data for the connection
832  * @return GNUNET_OK to keep the connection open,
833  *         GNUNET_SYSERR to close it (signal serious error)
834  */
835 static int
836 server_handle_receive_close_ack (void *cls,
837                                  struct GNUNET_MESH_Tunnel *tunnel,
838                                  void **tunnel_ctx,
839                                  const struct GNUNET_PeerIdentity *sender,
840                                  const struct GNUNET_MessageHeader *message,
841                                  const struct GNUNET_ATS_Information*atsi)
842 {
843   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
844
845   return GNUNET_OK;
846 }
847
848
849 /**
850  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
851  *
852  * @param cls the closure
853  * @param tunnel connection to the other end
854  * @param tunnel_ctx the socket
855  * @param sender who sent the message
856  * @param message the actual message
857  * @param atsi performance data for the connection
858  * @return GNUNET_OK to keep the connection open,
859  *         GNUNET_SYSERR to close it (signal serious error)
860  */
861 static int
862 server_handle_close (void *cls,
863                      struct GNUNET_MESH_Tunnel *tunnel,
864                      void **tunnel_ctx,
865                      const struct GNUNET_PeerIdentity *sender,
866                      const struct GNUNET_MessageHeader *message,
867                      const struct GNUNET_ATS_Information*atsi)
868 {
869   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
870
871   return GNUNET_OK;
872 }
873
874
875 /**
876  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
877  *
878  * @param cls the closure
879  * @param tunnel connection to the other end
880  * @param tunnel_ctx the socket
881  * @param sender who sent the message
882  * @param message the actual message
883  * @param atsi performance data for the connection
884  * @return GNUNET_OK to keep the connection open,
885  *         GNUNET_SYSERR to close it (signal serious error)
886  */
887 static int
888 server_handle_close_ack (void *cls,
889                          struct GNUNET_MESH_Tunnel *tunnel,
890                          void **tunnel_ctx,
891                          const struct GNUNET_PeerIdentity *sender,
892                          const struct GNUNET_MessageHeader *message,
893                          const struct GNUNET_ATS_Information*atsi)
894 {
895   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
896
897   return GNUNET_OK;
898 }
899
900
901 /**
902  * Message Handler for mesh
903  *
904  * @param cls closure (set from GNUNET_MESH_connect)
905  * @param tunnel connection to the other end
906  * @param tunnel_ctx place to store local state associated with the tunnel
907  * @param sender who sent the message
908  * @param ack the actual message
909  * @param atsi performance data for the connection
910  * @return GNUNET_OK to keep the connection open,
911  *         GNUNET_SYSERR to close it (signal serious error)
912  */
913 static int
914 handle_ack (struct GNUNET_STREAM_Socket *socket,
915             struct GNUNET_MESH_Tunnel *tunnel,
916             const struct GNUNET_PeerIdentity *sender,
917             const struct GNUNET_STREAM_AckMessage *ack,
918             const struct GNUNET_ATS_Information*atsi)
919 {
920   return GNUNET_OK;
921 }
922
923
924 /**
925  * Message Handler for mesh
926  *
927  * @param cls the 'struct GNUNET_STREAM_Socket'
928  * @param tunnel connection to the other end
929  * @param tunnel_ctx unused
930  * @param sender who sent the message
931  * @param message the actual message
932  * @param atsi performance data for the connection
933  * @return GNUNET_OK to keep the connection open,
934  *         GNUNET_SYSERR to close it (signal serious error)
935  */
936 static int
937 client_handle_ack (void *cls,
938                    struct GNUNET_MESH_Tunnel *tunnel,
939                    void **tunnel_ctx,
940                    const struct GNUNET_PeerIdentity *sender,
941                    const struct GNUNET_MessageHeader *message,
942                    const struct GNUNET_ATS_Information*atsi)
943 {
944   struct GNUNET_STREAM_Socket *socket = cls;
945   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
946  
947   return handle_ack (socket, tunnel, sender, ack, atsi);
948 }
949
950
951 /**
952  * Message Handler for mesh
953  *
954  * @param cls the server's listen socket
955  * @param tunnel connection to the other end
956  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
957  * @param sender who sent the message
958  * @param message the actual message
959  * @param atsi performance data for the connection
960  * @return GNUNET_OK to keep the connection open,
961  *         GNUNET_SYSERR to close it (signal serious error)
962  */
963 static int
964 server_handle_ack (void *cls,
965                    struct GNUNET_MESH_Tunnel *tunnel,
966                    void **tunnel_ctx,
967                    const struct GNUNET_PeerIdentity *sender,
968                    const struct GNUNET_MessageHeader *message,
969                    const struct GNUNET_ATS_Information*atsi)
970 {
971   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
972   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
973  
974   return handle_ack (socket, tunnel, sender, ack, atsi);
975 }
976
977
978 /**
979  * For client message handlers, the stream socket is in the
980  * closure argument.
981  */
982 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
983   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
984   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
985    sizeof (struct GNUNET_STREAM_AckMessage) },
986   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
987    sizeof (struct GNUNET_STREAM_MessageHeader)},
988   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
989    sizeof (struct GNUNET_STREAM_MessageHeader)},
990   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
991    sizeof (struct GNUNET_STREAM_MessageHeader)},
992   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
993    sizeof (struct GNUNET_STREAM_MessageHeader)},
994   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
995    sizeof (struct GNUNET_STREAM_MessageHeader)},
996   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
997    sizeof (struct GNUNET_STREAM_MessageHeader)},
998   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
999    sizeof (struct GNUNET_STREAM_MessageHeader)},
1000   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1001    sizeof (struct GNUNET_STREAM_MessageHeader)},
1002   {NULL, 0, 0}
1003 };
1004
1005
1006 /**
1007  * For server message handlers, the stream socket is in the
1008  * tunnel context, and the listen socket in the closure argument.
1009  */
1010 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
1011   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1012   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
1013    sizeof (struct GNUNET_STREAM_AckMessage) },
1014   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
1015    sizeof (struct GNUNET_STREAM_MessageHeader)},
1016   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1017    sizeof (struct GNUNET_STREAM_MessageHeader)},
1018   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1019    sizeof (struct GNUNET_STREAM_MessageHeader)},
1020   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1021    sizeof (struct GNUNET_STREAM_MessageHeader)},
1022   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1023    sizeof (struct GNUNET_STREAM_MessageHeader)},
1024   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1025    sizeof (struct GNUNET_STREAM_MessageHeader)},
1026   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1027    sizeof (struct GNUNET_STREAM_MessageHeader)},
1028   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1029    sizeof (struct GNUNET_STREAM_MessageHeader)},
1030   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1031    sizeof (struct GNUNET_STREAM_MessageHeader)},
1032   {NULL, 0, 0}
1033 };
1034
1035
1036 /**
1037  * Function called when our target peer is connected to our tunnel
1038  *
1039  * @param peer the peer identity of the target
1040  * @param atsi performance data for the connection
1041  */
1042 static void
1043 mesh_peer_connect_callback (void *cls,
1044                             const struct GNUNET_PeerIdentity *peer,
1045                             const struct GNUNET_ATS_Information * atsi)
1046 {
1047   struct GNUNET_STREAM_Socket *socket = cls;
1048   struct GNUNET_STREAM_MessageHeader *message;
1049
1050   if (0 != memcmp (&socket->other_peer, 
1051                    peer, 
1052                    sizeof (struct GNUNET_PeerIdentity)))
1053     {
1054       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1055                   "A peer (%s) which is not our target has\
1056   connected to our tunnel", GNUNET_i2s (peer));
1057       return;
1058     }
1059   
1060   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1061               "Target peer %s connected\n", GNUNET_i2s (peer));
1062   
1063   /* Set state to INIT */
1064   socket->state = STATE_INIT;
1065
1066   /* Send HELLO message */
1067   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1068   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1069   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1070   queue_message (socket,
1071                  message,
1072                  &set_state_hello_wait,
1073                  NULL);
1074
1075   /* Call open callback */
1076   if (NULL == socket->open_cls)
1077     {
1078       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1079                   "STREAM_open callback is NULL\n");
1080     }
1081   if (NULL != socket->open_cb)
1082     {
1083       socket->open_cb (socket->open_cls, socket);
1084     }
1085 }
1086
1087
1088 /**
1089  * Function called when our target peer is disconnected from our tunnel
1090  *
1091  * @param peer the peer identity of the target
1092  */
1093 static void
1094 mesh_peer_disconnect_callback (void *cls,
1095                                const struct GNUNET_PeerIdentity *peer)
1096 {
1097
1098 }
1099
1100
1101 /*****************/
1102 /* API functions */
1103 /*****************/
1104
1105
1106 /**
1107  * Tries to open a stream to the target peer
1108  *
1109  * @param cfg configuration to use
1110  * @param target the target peer to which the stream has to be opened
1111  * @param app_port the application port number which uniquely identifies this
1112  *            stream
1113  * @param open_cb this function will be called after stream has be established 
1114  * @param open_cb_cls the closure for open_cb
1115  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
1116  * @return if successful it returns the stream socket; NULL if stream cannot be
1117  *         opened 
1118  */
1119 struct GNUNET_STREAM_Socket *
1120 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
1121                     const struct GNUNET_PeerIdentity *target,
1122                     GNUNET_MESH_ApplicationType app_port,
1123                     GNUNET_STREAM_OpenCallback open_cb,
1124                     void *open_cb_cls,
1125                     ...)
1126 {
1127   struct GNUNET_STREAM_Socket *socket;
1128   enum GNUNET_STREAM_Option option;
1129   va_list vargs;                /* Variable arguments */
1130
1131   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
1132   socket->other_peer = *target;
1133   socket->open_cb = open_cb;
1134   socket->open_cls = open_cb_cls;
1135
1136   /* Set defaults */
1137   socket->retransmit_timeout = 
1138     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
1139
1140   va_start (vargs, open_cb_cls); /* Parse variable args */
1141   do {
1142     option = va_arg (vargs, enum GNUNET_STREAM_Option);
1143     switch (option)
1144       {
1145       case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
1146         /* Expect struct GNUNET_TIME_Relative */
1147         socket->retransmit_timeout = va_arg (vargs,
1148                                              struct GNUNET_TIME_Relative);
1149         break;
1150       case GNUNET_STREAM_OPTION_END:
1151         break;
1152       }
1153
1154   } while (0 != option);
1155   va_end (vargs);               /* End of variable args parsing */
1156
1157   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
1158                                       1,  /* QUEUE size as parameter? */
1159                                       socket, /* cls */
1160                                       NULL, /* No inbound tunnel handler */
1161                                       NULL, /* No inbound tunnel cleaner */
1162                                       client_message_handlers,
1163                                       NULL); /* We don't get inbound tunnels */
1164   // FIXME: if (NULL == socket->mesh) ...
1165
1166   /* Now create the mesh tunnel to target */
1167   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
1168                                               NULL, /* Tunnel context */
1169                                               &mesh_peer_connect_callback,
1170                                               &mesh_peer_disconnect_callback,
1171                                               (void *) socket);
1172   // FIXME: if (NULL == socket->tunnel) ...
1173
1174   return socket;
1175 }
1176
1177
1178 /**
1179  * Closes the stream
1180  *
1181  * @param socket the stream socket
1182  */
1183 void
1184 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
1185 {
1186   /* Clear Transmit handles */
1187   if (NULL != socket->transmit_handle)
1188     {
1189       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
1190     }
1191   /* FIXME: Clear message queue */
1192   /* Close associated tunnel */
1193   if (NULL != socket->tunnel)
1194     {
1195       GNUNET_MESH_tunnel_destroy (socket->tunnel);
1196     }
1197   /* Close mesh connection */
1198   if (NULL != socket->mesh)
1199     {
1200       GNUNET_MESH_disconnect (socket->mesh);
1201     }
1202   GNUNET_free (socket);
1203 }
1204
1205
1206 /**
1207  * Method called whenever a peer creates a tunnel to us
1208  *
1209  * @param cls closure
1210  * @param tunnel new handle to the tunnel
1211  * @param initiator peer that started the tunnel
1212  * @param atsi performance information for the tunnel
1213  * @return initial tunnel context for the tunnel
1214  *         (can be NULL -- that's not an error)
1215  */
1216 static void *
1217 new_tunnel_notify (void *cls,
1218                    struct GNUNET_MESH_Tunnel *tunnel,
1219                    const struct GNUNET_PeerIdentity *initiator,
1220                    const struct GNUNET_ATS_Information *atsi)
1221 {
1222   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
1223   struct GNUNET_STREAM_Socket *socket;
1224
1225   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
1226   socket->tunnel = tunnel;
1227   socket->session_id = 0;       /* FIXME */
1228   socket->other_peer = *initiator;
1229   socket->state = STATE_INIT;
1230
1231   if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
1232                                            socket,
1233                                            &socket->other_peer))
1234     {
1235       socket->state = STATE_CLOSED;
1236       /* FIXME: Send CLOSE message and then free */
1237       GNUNET_free (socket);
1238       GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
1239     }
1240   return socket;
1241 }
1242
1243
1244 /**
1245  * Function called whenever an inbound tunnel is destroyed.  Should clean up
1246  * any associated state.  This function is NOT called if the client has
1247  * explicitly asked for the tunnel to be destroyed using
1248  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
1249  * the tunnel.
1250  *
1251  * @param cls closure (set from GNUNET_MESH_connect)
1252  * @param tunnel connection to the other end (henceforth invalid)
1253  * @param tunnel_ctx place where local state associated
1254  *                   with the tunnel is stored
1255  */
1256 static void 
1257 tunnel_cleaner (void *cls,
1258                 const struct GNUNET_MESH_Tunnel *tunnel,
1259                 void *tunnel_ctx)
1260 {
1261   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
1262   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
1263   struct MessageQueue *head;
1264
1265   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1266               "Peer %s has terminated connection abruptly\n",
1267               GNUNET_i2s (&socket->other_peer));
1268
1269   socket->status = GNUNET_STREAM_SHUTDOWN;
1270   /* Clear Transmit handles */
1271   if (NULL != socket->transmit_handle)
1272     {
1273       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
1274       socket->transmit_handle = NULL;
1275     }
1276    
1277   /* Clear existing message queue */
1278   while (NULL != socket->queue) {
1279     head = socket->queue;
1280     socket->queue = head->next;
1281     GNUNET_free (head->message);
1282     GNUNET_free (head);
1283   }
1284 }
1285
1286
1287 /**
1288  * Listens for stream connections for a specific application ports
1289  *
1290  * @param cfg the configuration to use
1291  * @param app_port the application port for which new streams will be accepted
1292  * @param listen_cb this function will be called when a peer tries to establish
1293  *            a stream with us
1294  * @param listen_cb_cls closure for listen_cb
1295  * @return listen socket, NULL for any error
1296  */
1297 struct GNUNET_STREAM_ListenSocket *
1298 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
1299                       GNUNET_MESH_ApplicationType app_port,
1300                       GNUNET_STREAM_ListenCallback listen_cb,
1301                       void *listen_cb_cls)
1302 {
1303   /* FIXME: Add variable args for passing configration options? */
1304   struct GNUNET_STREAM_ListenSocket *lsocket;
1305   GNUNET_MESH_ApplicationType app_types[2];
1306
1307   app_types[0] = app_port;
1308   app_types[1] = 0;
1309   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
1310   lsocket->port = app_port;
1311   lsocket->listen_cb = listen_cb;
1312   lsocket->listen_cb_cls = listen_cb_cls;
1313   lsocket->mesh = GNUNET_MESH_connect (cfg,
1314                                        10, /* FIXME: QUEUE size as parameter? */
1315                                        lsocket, /* Closure */
1316                                        &new_tunnel_notify,
1317                                        &tunnel_cleaner,
1318                                        server_message_handlers,
1319                                        app_types);
1320   return lsocket;
1321 }
1322
1323
1324 /**
1325  * Closes the listen socket
1326  *
1327  * @param socket the listen socket
1328  */
1329 void
1330 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
1331 {
1332   /* Do house keeping */
1333
1334   /* Close MESH connection */
1335   GNUNET_MESH_disconnect (lsocket->mesh);
1336   
1337   GNUNET_free (lsocket);
1338 }