-misc stream hxing
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file stream/stream_api.c
23  * @brief Implementation of the stream library
24  * @author Sree Harsha Totakura
25  */
26 #include "platform.h"
27 #include "gnunet_common.h"
28 #include "gnunet_stream_lib.h"
29
30 /**
31  * states in the Protocol
32  */
33 enum State
34   {
35     /**
36      * Client initialization state
37      */
38     STATE_INIT,
39
40     /**
41      * Listener initialization state 
42      */
43     STATE_LISTEN,
44
45     /**
46      * Pre-connection establishment state
47      */
48     STATE_HELLO_WAIT,
49
50     /**
51      * State where a connection has been established
52      */
53     STATE_ESTABLISHED,
54
55     /**
56      * State where the socket is closed on our side and waiting to be ACK'ed
57      */
58     STATE_RECEIVE_CLOSE_WAIT,
59
60     /**
61      * State where the socket is closed for reading
62      */
63     STATE_RECEIVE_CLOSED,
64
65     /**
66      * State where the socket is closed on our side and waiting to be ACK'ed
67      */
68     STATE_TRANSMIT_CLOSE_WAIT,
69
70     /**
71      * State where the socket is closed for writing
72      */
73     STATE_TRANSMIT_CLOSED,
74
75     /**
76      * State where the socket is closed on our side and waiting to be ACK'ed
77      */
78     STATE_CLOSE_WAIT,
79
80     /**
81      * State where the socket is closed
82      */
83     STATE_CLOSED 
84   };
85
86
87 /**
88  * The STREAM Socket Handler
89  */
90 struct GNUNET_STREAM_Socket
91 {
92   /**
93    * The mesh handle
94    */
95   struct GNUNET_MESH_Handle *mesh;
96
97   /**
98    * The mesh tunnel handle
99    */
100   struct GNUNET_MESH_Tunnel *tunnel;
101
102   /**
103    * The session id associated with this stream connection
104    */
105   uint32_t session_id;
106
107   /**
108    * The peer identity of the peer at the other end of the stream
109    */
110   GNUNET_PeerIdentity other_peer;
111
112   /**
113    * Stream open closure
114    */
115   void *open_cls;
116
117   /**
118    * Stream open callback
119    */
120   GNUNET_STREAM_OpenCallback open_cb;
121
122   /**
123    * Retransmission timeout
124    */
125   struct GNUNET_TIME_Relative retransmit_timeout;
126
127   /**
128    * The state of the protocol associated with this socket
129    */
130   enum State state;
131
132   /**
133    * The status of the socket
134    */
135   enum GNUNET_STREAM_Status status;
136
137   /**
138    * The current transmit handle (if a pending transmit request exists)
139    */
140   struct GNUNET_MESH_TransmitHandle *transmit_handle;
141
142   /**
143    * The current message associated with the transmit handle
144    */
145   struct GNUNET_MessageHeader *message;
146 };
147
148
149 /**
150  * A socket for listening
151  */
152 struct GNUNET_STREAM_ListenSocket
153 {
154
155   /**
156    * The mesh handle
157    */
158   struct GNUNET_MESH_Handle *mesh;
159
160   /**
161    * The service port
162    */
163   GNUNET_MESH_ApplicationType port;
164
165   /**
166    * The callback function which is called after successful opening socket
167    */
168   GNUNET_STREAM_ListenCallback listen_cb;
169
170   /**
171    * The call back closure
172    */
173   void *listen_cb_cls;
174
175 };
176
177 /**
178  * Default value in seconds for various timeouts
179  */
180 static unsigned int default_timeout = 300;
181
182
183 /**
184  * Callback function from send_message
185  *
186  * @param cls closure the socket on which the send message was called
187  * @param size number of bytes available in buf
188  * @param buf where the callee should write the message
189  * @return number of bytes written to buf
190  */
191 static size_t
192 send_message_notify (void *cls, size_t size, void *buf)
193 {
194   struct GNUNET_STREAM_Socket *socket = cls;
195   size_t ret;
196
197   socket->transmit_handle = NULL; /* Remove the transmit handle */
198   if (0 == size)                /* Socket closed? */
199     {
200       // statistics ("message timeout")
201       
202       
203       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
204                   "Message not sent as tunnel was closed \n");
205       ret = 0;
206     }
207   else                          /* Size is more or equal to what was requested */
208     {
209       ret = ntohs (socket->message->size);
210       GNUNET_assert (size >= ret);
211       memcpy (buf, socket->message, ret);
212     }
213   GNUNET_free (socket->message); /* Free the message memory */
214   socket->message = NULL;
215   return ret;
216 }
217
218
219 /**
220  * Sends a message using the mesh connection of a socket
221  *
222  * @param socket the socket whose mesh connection is used
223  * @param message the message to be sent
224  */
225 static void
226 send_message (struct GNUNET_STREAM_Socket *socket,
227               struct GNUNET_MessageHeader *message)
228 {
229   socket->message = message;
230   socket->transmit_handle = 
231     GNUNET_MESH_notify_transmit_ready (socket->tunnel,
232                                        0, /* Corking */
233                                        timeout, /* FIXME: Maxdelay */
234                                        socket->other_peer,
235                                        ntohs (message->size),
236                                        &send_message_notify,
237                                        socket);
238 }
239
240 /**
241  * Makes state transition dependending on the given state
242  *
243  * @param socket the socket whose state has to be transitioned
244  */
245 static void
246 make_state_transition (struct GNUNET_STREAM_Socket *socket)
247 {
248
249 }
250
251
252 /**
253  * Message Handler for mesh
254  *
255  * @param cls closure (set from GNUNET_MESH_connect)
256  * @param tunnel connection to the other end
257  * @param tunnel_ctx place to store local state associated with the tunnel
258  * @param sender who sent the message
259  * @param message the actual message
260  * @param atsi performance data for the connection
261  * @return GNUNET_OK to keep the connection open,
262  *         GNUNET_SYSERR to close it (signal serious error)
263  */
264 static int
265 handle_data (void *cls,
266              struct GNUNET_MESH_Tunnel *tunnel,
267              void **tunnel_ctx,
268              const struct GNUNET_PeerIdentity *sender,
269              const struct GNUNET_MessageHeader *message,
270              const struct GNUNET_ATS_Information*atsi)
271 {
272   struct GNUNET_STREAM_Socket *socket = cls;
273   uint16_t size;
274   const struct GNUNET_STREAM_DataMessage *data_msg;
275   const void *payload;
276
277   size = ntohs (message->size);
278   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
279   {
280     GNUNET_break_op (0);
281     return GNUNET_SYSERR;
282   }
283   data_msg = (const struct GNUNET_STREAM_DataMessage *) message;
284   size -= sizeof (Struct GNUNET_STREAM_DataMessage);
285   payload = &data_msg[1];
286   /* ... */
287   
288   return GNUNET_OK;
289 }
290
291
292 /**
293  * Message Handler for mesh
294  *
295  * @param cls closure (set from GNUNET_MESH_connect)
296  * @param tunnel connection to the other end
297  * @param tunnel_ctx place to store local state associated with the tunnel
298  * @param sender who sent the message
299  * @param message the actual message
300  * @param atsi performance data for the connection
301  * @return GNUNET_OK to keep the connection open,
302  *         GNUNET_SYSERR to close it (signal serious error)
303  */
304 static int
305 handle_ack (void *cls,
306             struct GNUNET_MESH_Tunnel *tunnel,
307             void **tunnel_ctx,
308             const struct GNUNET_PeerIdentity *sender,
309             const struct GNUNET_MessageHeader *message,
310             const struct GNUNET_ATS_Information*atsi)
311 {
312   struct GNUNET_STREAM_Socket *socket = cls;
313   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
314
315 }
316
317
318 static struct GNUNET_MESH_MessageHandler message_handlers[] = {
319   {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
320   {&handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, sizeof (struct GNUNET_STREAM_AckMessage) },
321   {&handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 0},
322   {&handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK, 0},
323   {&handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET, 0},
324   {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE, 0},
325   {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK, 0},
326   {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE, 0},
327   {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK, 0},
328   {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE, 0},
329   {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_CLOSE, 0},
330   {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK, 0},
331   {NULL, 0, 0}
332 };
333
334
335 /**
336  * Function called when our target peer is connected to our tunnel
337  *
338  * @param peer the peer identity of the target
339  * @param atsi performance data for the connection
340  */
341 static void
342 mesh_peer_connect_callback (void *cls,
343                             const struct GNUNET_PeerIdentity *peer,
344                             const struct GNUNET_ATS_Information * atsi)
345 {
346   const struct GNUNET_STREAM_Socket *socket = cls;
347
348   if (0 != memcmp (socket->other_peer, 
349                    peer, 
350                    sizeof (struct GNUNET_PeerIdentity)))
351     {
352       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
353                   "A peer (%s) which is not our target has\
354   connected to our tunnel", GNUNET_i2s (peer));
355       return;
356     }
357   
358   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
359               "Target peer %s connected\n", GNUNET_i2s (peer));
360   
361   /* Set state to INIT */
362   socket->state = STATE_INIT;
363
364   /* Try to achieve ESTABLISHED state */
365   make_state_transition (socket);
366
367   /* Call open callback */
368   if (NULL == open_cls)
369     {
370       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
371                   "STREAM_open callback is NULL\n");
372     }
373   if (NULL != socket->open_cb)
374     {
375       socket->open_cb (socket->open_cls, socket);
376     }
377 }
378
379
380 /**
381  * Function called when our target peer is disconnected from our tunnel
382  *
383  * @param peer the peer identity of the target
384  */
385 static void
386 mesh_peer_disconnect_callback (void *cls,
387                                const struct GNUNET_PeerIdentity *peer)
388 {
389
390 }
391
392
393 /**
394  * Function to find the mapped socket of a tunnel
395  *
396  * @param tunnel the tunnel whose associated socket has to be retrieved
397  * @return the socket corresponding to the tunnel
398  */
399 static struct GNUNET_STREAM_Socket *
400 find_socket (const struct GNUNET_MESH_Tunnel *tunnel)
401 {
402   /* Search tunnel in a list or hashtable and retrieve the socket */
403 }
404
405 /*****************/
406 /* API functions */
407 /*****************/
408
409
410 /**
411  * Tries to open a stream to the target peer
412  *
413  * @param cfg configuration to use
414  * @param target the target peer to which the stream has to be opened
415  * @param app_port the application port number which uniquely identifies this
416  *            stream
417  * @param open_cb this function will be called after stream has be established 
418  * @param open_cb_cls the closure for open_cb
419  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
420  * @return if successful it returns the stream socket; NULL if stream cannot be
421  *         opened 
422  */
423 struct GNUNET_STREAM_Socket *
424 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
425                     const struct GNUNET_PeerIdentity *target,
426                     GNUNET_MESH_ApplicationType app_port,
427                     GNUNET_STREAM_OpenCallback open_cb,
428                     void *open_cb_cls,
429                     ...)
430 {
431   struct GNUNET_STREAM_Socket *socket;
432   enum GNUNET_STREAM_Option option;
433   va_list vargs;                /* Variable arguments */
434
435   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
436   socket->other_peer = *target;
437   socket->open_cb = open_cb;
438   socket->open_cls = open_cb_cls;
439
440   /* Set defaults */
441   socket->retransmit_timeout = 
442     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
443
444   va_start (vargs, open_cb_cls); /* Parse variable args */
445   do {
446     option = va_arg (vargs, enum GNUNET_STREAM_Option);
447     switch (option)
448       {
449       case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
450         /* Expect struct GNUNET_TIME_Relative */
451         socket->retransmit_timeout = va_arg (vargs,
452                                              struct GNUNET_TIME_Relative);
453         break;
454       case GNUNET_STREAM_OPTION_END:
455         break;
456       }
457
458   } while (0 != option);
459   va_end (vargs);               /* End of variable args parsing */
460
461   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
462                                       1,  /* QUEUE size as parameter? */
463                                       socket, /* cls */
464                                       NULL, /* No inbound tunnel handler */
465                                       NULL, /* No inbound tunnel cleaner */
466                                       message_handlers,
467                                       NULL); /* We don't get inbound tunnels */
468   // FIXME: if (NULL == socket->mesh) ...
469
470   /* Now create the mesh tunnel to target */
471   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
472                                               NULL, /* Tunnel context */
473                                               &mesh_peer_connect_callback,
474                                               &mesh_peer_disconnect_callback,
475                                               (void *) socket);
476   // FIXME: if (NULL == socket->tunnel) ...
477
478   return socket;
479 }
480
481
482 /**
483  * Closes the stream
484  *
485  * @param socket the stream socket
486  */
487 void
488 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
489 {
490   /* Clear Transmit handles */
491   if (NULL != socket->transmit_handle)
492     {
493       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
494     }
495   /* Clear existing message queue message */
496   if (NULL != socket->message)
497     {
498       GNUNET_free (socket->message);
499     }
500   /* Close associated tunnel */
501   if (NULL != socket->tunnel)
502     {
503       GNUNET_MESH_tunnel_destroy (socket->tunnel);
504     }
505   /* Close mesh connection */
506   if (NULL != socket->mesh)
507     {
508       GNUNET_MESH_disconnect (socket->mesh);
509     }
510   GNUNET_free (socket);
511 }
512
513
514 /**
515  * Method called whenever a peer creates a tunnel to us
516  *
517  * @param cls closure
518  * @param tunnel new handle to the tunnel
519  * @param initiator peer that started the tunnel
520  * @param atsi performance information for the tunnel
521  * @return initial tunnel context for the tunnel
522  *         (can be NULL -- that's not an error)
523  */
524 static void 
525 new_tunnel_notify (void *cls,
526                    struct GNUNET_MESH_Tunnel *tunnel,
527                    const struct GNUNET_PeerIdentity *initiator,
528                    const struct GNUNET_ATS_Information *atsi)
529 {
530   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
531   struct GNUNET_STREAM_Socket *socket;
532
533   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
534   socket->tunnel = tunnel;
535   socket->session_id = 0;       /* FIXME */
536   socket->other_peer = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity));
537   memcpy (socket->other_peer, initiator, sizeof (struct GNUNET_PeerIdentity));
538   socket->state = STATE_LISTEN;
539
540   if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
541                                            socket,
542                                            socket->other_peer))
543     {
544       socket->state = STATE_CLOSED;
545       make_state_transition (socket);
546       GNUNET_free (socket->other_peer);
547       GNUNET_free (socket);
548       GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
549     }
550   else
551     {
552       make_state_transition (socket);
553     }
554 }
555
556
557 /**
558  * Function called whenever an inbound tunnel is destroyed.  Should clean up
559  * any associated state.  This function is NOT called if the client has
560  * explicitly asked for the tunnel to be destroyed using
561  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
562  * the tunnel.
563  *
564  * @param cls closure (set from GNUNET_MESH_connect)
565  * @param tunnel connection to the other end (henceforth invalid)
566  * @param tunnel_ctx place where local state associated
567  *                   with the tunnel is stored
568  */
569 static void 
570 tunnel_cleaner (void *cls,
571                 const struct GNUNET_MESH_Tunnel *tunnel,
572                 void *tunnel_ctx)
573 {
574   struct GNUNET_STREAM_Socket *socket;
575
576   socket = find_socket (tunnel);
577   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
578               "Peer %s has terminated connection abruptly\n",
579               GNUNET_i2s (socket->other_peer));
580
581   socket->status = GNUNET_STREAM_SHUTDOWN;
582   /* Clear Transmit handles */
583   if (NULL != socket->transmit_handle)
584     {
585       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
586       socket->transmit_handle = NULL;
587     }
588    
589   /* Clear existing message queue message */
590   if (NULL != socket->message)
591     {
592       GNUNET_free (socket->message);
593       socket->message = NULL;
594     }
595 }
596
597
598 /**
599  * Listens for stream connections for a specific application ports
600  *
601  * @param cfg the configuration to use
602  * @param app_port the application port for which new streams will be accepted
603  * @param listen_cb this function will be called when a peer tries to establish
604  *            a stream with us
605  * @param listen_cb_cls closure for listen_cb
606  * @return listen socket, NULL for any error
607  */
608 struct GNUNET_STREAM_ListenSocket *
609 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
610                       GNUNET_MESH_ApplicationType app_port,
611                       GNUNET_STREAM_ListenCallback listen_cb,
612                       void *listen_cb_cls)
613 {
614   /* FIXME: Add variable args for passing configration options? */
615   struct GNUNET_STREAM_ListenSocket *lsocket;
616
617   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
618   lsocket->port = app_port;
619   lsocket->listen_cb = listen_cb;
620   lsocket->listen_cb_cls = listen_cb_cls;
621   lsocket->mesh = GNUNET_MESH_connect (cfg,
622                                        10, /* FIXME: QUEUE size as parameter? */
623                                        lsocket, /* Closure */
624                                        &new_tunnel_notify,
625                                        &tunnel_cleaner,
626                                        message_handlers,
627                                        {app_port, NULL});
628   return lsocket;
629 }
630
631
632 /**
633  * Closes the listen socket
634  *
635  * @param socket the listen socket
636  */
637 void
638 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
639 {
640   /* Do house keeping */
641
642   /* Close MESH connection */
643   GNUNET_MESH_disconnect (lsocket->mesh);
644   
645   GNUNET_free (lsocket);
646 }