25b8763339e3538049c4c9a8625c2de848b1c28f
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file stream/stream_api.c
23  * @brief Implementation of the stream library
24  * @author Sree Harsha Totakura
25  */
26
27 #include "gnunet_common.h"
28 #include "gnunet_stream_lib.h"
29
30
31 /**
32  * states in the Protocol
33  */
34 enum State
35   {
36     /**
37      * Client initialization state
38      */
39     STATE_INIT,
40
41     /**
42      * Listener initialization state 
43      */
44     STATE_LISTEN,
45
46     /**
47      * Pre-connection establishment state
48      */
49     STATE_HELLO_WAIT,
50
51     /**
52      * State where a connection has been established
53      */
54     STATE_ESTABLISHED,
55
56     /**
57      * State where the socket is closed on our side and waiting to be ACK'ed
58      */
59     STATE_RECEIVE_CLOSE_WAIT,
60
61     /**
62      * State where the socket is closed for reading
63      */
64     STATE_RECEIVE_CLOSED,
65
66     /**
67      * State where the socket is closed on our side and waiting to be ACK'ed
68      */
69     STATE_TRANSMIT_CLOSE_WAIT,
70
71     /**
72      * State where the socket is closed for writing
73      */
74     STATE_TRANSMIT_CLOSED,
75
76     /**
77      * State where the socket is closed on our side and waiting to be ACK'ed
78      */
79     STATE_CLOSE_WAIT,
80
81     /**
82      * State where the socket is closed
83      */
84     STATE_CLOSED 
85   };
86
87
88 /**
89  * The STREAM Socket Handler
90  */
91 struct GNUNET_STREAM_Socket
92 {
93   /**
94    * The mesh handle
95    */
96   struct GNUNET_MESH_Handle *mesh;
97
98   /**
99    * The mesh tunnel handle
100    */
101   struct GNUNET_MESH_Tunnel *tunnel;
102
103   /**
104    * The session id associated with this stream connection
105    */
106   unint32_t session_id;
107
108   /**
109    * The peer identity of the peer at the other end of the stream
110    */
111   GNUNET_PeerIdentity *other_peer;
112
113   /**
114    * Stream open closure
115    */
116   void *open_cls;
117
118   /**
119    * Stream open callback
120    */
121   GNUNET_STREAM_OpenCallback open_cb;
122
123   /**
124    * Retransmission timeout
125    */
126   struct GNUNET_TIME_Relative retransmit_timeout;
127
128   /**
129    * The state of the protocol associated with this socket
130    */
131   enum State state;
132
133   /**
134    * The status of the socket
135    */
136   enum GNUNET_STREAM_Status status;
137
138   /**
139    * The current transmit handle (if a pending transmit request exists)
140    */
141   struct GNUNET_MESH_TransmitHandle *transmit_handle;
142
143   /**
144    * The current message associated with the transmit handle
145    */
146   struct GNUNET_MessageHeader *message;
147 };
148
149
150 /**
151  * A socket for listening
152  */
153 struct GNUNET_STREAM_ListenSocket
154 {
155
156   /**
157    * The mesh handle
158    */
159   struct GNUNET_MESH_Handle *mesh;
160
161   /**
162    * The service port
163    */
164   GNUNET_MESH_ApplicationType port;
165
166   /**
167    * The callback function which is called after successful opening socket
168    */
169   GNUNET_STREAM_ListenCallback listen_cb;
170
171   /**
172    * The call back closure
173    */
174   void *listen_cb_cls;
175
176 };
177
178 /**
179  * Default value in seconds for various timeouts
180  */
181 static unsigned int default_timeout = 300;
182
183
184 /**
185  * Converts message fields from host byte order to network byte order
186  *
187  * @param msg the message to convert
188  */
189 static void
190 GNUNET_STREAM_convert_message_h2n (struct GNUNET_STREAM_MessageHeader *msg)
191 {
192   /* Add type specific message conversion here  */
193
194   msg->size = htons (msg->size);
195   msg->type = htons (msg->type);
196 }
197
198
199 /**
200  * Converts message fields from network byte order to host byte order
201  *
202  * @param msg the messeage to convert
203  */
204 static void
205 GNUNET_STREAM_convert_message_n2h (struct GNUNET_STREAM_MessageHeader *msg)
206 {
207   msg->size = ntohs (msg->size);
208   msg->type = ntohs (msg->type);
209
210   /* Add type specific message conversion here  */
211 }
212
213 /**
214  * Callback function from send_message
215  *
216  * @param cls closure the socket on which the send message was called
217  * @param size number of bytes available in buf
218  * @param buf where the callee should write the message
219  * @return number of bytes written to buf
220  */
221 static size_t
222 send_message_notify (void *cls, size_t size, void *buf)
223 {
224   struct GNUNET_STREAM_Socket *socket;
225
226   socket = (struct GNUNET_STREAM_Socket *) cls;
227   socket->transmit_handle = NULL; /* Remove the transmit handle */
228   if (0 == size)                /* Socket closed? */
229     {
230       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
231                   "Message not sent as tunnel was closed \n");
232     }
233   else                          /* Size is more or equal to what was requested */
234     {
235       size = socket->message->size;
236       GNUNET_STREAM_convert_message_h2n (socket->message) /* Convert h2n */
237         memcpy (buf, socket->message, size);
238     }
239   GNUNET_free (socket->message); /* Free the message memory */
240   socket->message = NULL;
241   return size;
242 }
243
244
245 /**
246  * Sends a message using the mesh connection of a socket
247  *
248  * @param socket the socket whose mesh connection is used
249  * @param message the message to be sent
250  */
251 static void
252 send_message (struct GNUNET_STREAM_Socket *socket,
253               struct GNUNET_MessageHeader *message)
254 {
255   socket->message = message;
256   socket->transmit_handle = 
257     GNUNET_MESH_notify_transmit_ready (socket->tunnel,
258                                        0, /* Corking */
259                                        timeout, /* FIXME: Maxdelay */
260                                        socket->other_peer,
261                                        message->size,
262                                        send_message_notify,
263                                        socket);
264 }
265
266 /**
267  * Makes state transition dependending on the given state
268  *
269  * @param socket the socket whose state has to be transitioned
270  */
271 static void
272 make_state_transition (struct GNUNET_STREAM_Socket *socket)
273 {
274
275 }
276
277
278 /**
279  * Message Handler for mesh
280  *
281  * @param cls closure (set from GNUNET_MESH_connect)
282  * @param tunnel connection to the other end
283  * @param tunnel_ctx place to store local state associated with the tunnel
284  * @param sender who sent the message
285  * @param message the actual message
286  * @param atsi performance data for the connection
287  * @return GNUNET_OK to keep the connection open,
288  *         GNUNET_SYSERR to close it (signal serious error)
289  */
290 static int
291 handle_data (void *cls,
292              struct GNUNET_MESH_Tunnel *tunnel,
293              void **tunnel_ctx,
294              const struct GNUNET_PeerIdentity *sender,
295              const struct GNUNET_MessageHeader *message,
296              const struct GNUNET_ATS_Information*atsi)
297 {
298   uint16_t size;
299   struct GNUNET_STREAM_MessageHeader *message_copy;
300   
301   size = ntohs (message->size);
302   message_copy = GNUNET_malloc (size);
303   memcpy (message_copy, message, size);
304   GNUNET_STREAM_convert_message_n2h (message_copy);
305   
306   route_message (message_copy);
307 }
308
309
310 static struct GNUNET_MESH_MessageHandler message_handlers[] = {
311   {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
312   {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_ACK, 0},
313   {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 0},
314   {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK, 0},
315   {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_RESET, 0},
316   {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE, 0},
317   {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK, 0},
318   {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE, 0},
319   {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK, 0},
320   {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE, 0},
321   {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_CLOSE, 0},
322   {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK, 0},
323   {NULL, 0, 0}
324 };
325
326
327 /**
328  * Function called when our target peer is connected to our tunnel
329  *
330  * @param peer the peer identity of the target
331  * @param atsi performance data for the connection
332  */
333 static void
334 mesh_peer_connect_callback (void *cls,
335                             const struct GNUNET_PeerIdentity *peer,
336                             const struct GNUNET_ATS_Information * atsi)
337 {
338   const struct GNUNET_STREAM_Socket *socket;
339   
340   socket = (const struct GNUNET_STREAM_Socket *) cls;
341   if (0 != memcmp (socket->other_peer, 
342                    peer, 
343                    sizeof (struct GNUNET_PeerIdentity)))
344     {
345       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
346                   "A peer (%s) which is not our target has\
347   connected to our tunnel", GNUNET_i2s (peer));
348       return;
349     }
350   
351   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
352               "Target peer %s connected\n", GNUNET_i2s (peer));
353   
354   /* Set state to INIT */
355   socket->state = STATE_INIT;
356
357   /* Try to achieve ESTABLISHED state */
358   make_state_transition (socket);
359
360   /* Call open callback */
361   if (NULL == open_cls)
362     {
363       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
364                   "STREAM_open callback is NULL\n");
365     }
366   if (NULL != socket->open_cb)
367     {
368       socket->open_cb (socket->open_cls, socket);
369     }
370 }
371
372
373 /**
374  * Function called when our target peer is disconnected from our tunnel
375  *
376  * @param peer the peer identity of the target
377  */
378 static void
379 mesh_peer_disconnect_callback (void *cls,
380                                const struct GNUNET_PeerIdentity *peer)
381 {
382
383 }
384
385
386 /**
387  * Function to find the mapped socket of a tunnel
388  *
389  * @param tunnel the tunnel whose associated socket has to be retrieved
390  * @return the socket corresponding to the tunnel
391  */
392 static struct GNUNET_STREAM_Socket *
393 find_socket (const struct GNUNET_MESH_Tunnel *tunnel)
394 {
395   /* Search tunnel in a list or hashtable and retrieve the socket */
396 }
397
398 /*****************/
399 /* API functions */
400 /*****************/
401
402
403 /**
404  * Tries to open a stream to the target peer
405  *
406  * @param cfg configuration to use
407  * @param target the target peer to which the stream has to be opened
408  * @param app_port the application port number which uniquely identifies this
409  *            stream
410  * @param open_cb this function will be called after stream has be established 
411  * @param open_cb_cls the closure for open_cb
412  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
413  * @return if successful it returns the stream socket; NULL if stream cannot be
414  *         opened 
415  */
416 struct GNUNET_STREAM_Socket *
417 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
418                     const struct GNUNET_PeerIdentity *target,
419                     GNUNET_MESH_ApplicationType app_port,
420                     GNUNET_STREAM_OpenCallback open_cb,
421                     void *open_cb_cls,
422                     ...)
423 {
424   struct GNUNET_STREAM_Socket *socket;
425   enum GNUNET_STREAM_Option option;
426   va_list vargs;                /* Variable arguments */
427
428   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
429   if (NULL == socket)
430     {
431       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
432                   "Unable to allocate memory\n");
433       return NULL;
434     }
435   socket->other_peer = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity));
436   if (NULL == socket->other_peer)
437     {
438       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
439                   "Unable to allocate memory \n");
440       return NULL;
441     }
442
443   /* Set defaults */
444   socket->retransmit_timeout = 
445     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
446
447   va_start (vargs, open_cb_cls); /* Parse variable args */
448   do {
449     option = va_arg (vargs, enum GNUNET_STREAM_Option);
450     switch (option)
451       {
452       case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
453         /* Expect struct GNUNET_TIME_Relative */
454         socket->retransmit_timeout = va_arg (vargs,
455                                              struct GNUNET_TIME_Relative);
456         break;
457       case GNUNET_STREAM_OPTION_END:
458         break;
459       }
460
461   } while (0 != option);
462   va_end (vargs);               /* End of variable args parsing */
463
464   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
465                                       10,  /* QUEUE size as parameter? */
466                                       NULL, /* cls */
467                                       NULL, /* No inbound tunnel handler */
468                                       NULL, /* No inbound tunnel cleaner */
469                                       message_handlers,
470                                       NULL); /* We don't get inbound tunnels */
471
472   memcpy (socket->other_peer, target, sizeof (struct GNUNET_PeerIdentity));
473   socket->open_cb = open_cb;
474   socket->open_cls = open_cb_cls;
475
476   /* Now create the mesh tunnel to target */
477   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
478                                               NULL, /* Tunnel context */
479                                               &mesh_peer_connect_callback,
480                                               &mesh_peer_disconnect_callback,
481                                               (void *) socket);
482
483   return socket;
484 }
485
486
487 /**
488  * Closes the stream
489  *
490  * @param socket the stream socket
491  */
492 void
493 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
494 {
495   /* Clear Transmit handles */
496   if (NULL != socket->transmit_handle)
497     {
498       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
499     }
500   /* Clear existing message queue message */
501   if (NULL != socket->message)
502     {
503       GNUNET_free (socket->message);
504     }
505   /* Clear memory allocated for other peer's PeerIdentity */
506   GNUNET_Free (socket->other_peer);
507   /* Close associated tunnel */
508   if (NULL != socket->tunnel)
509     {
510       GNUNET_MESH_tunnel_destroy (socket->tunnel);
511     }
512   /* Close mesh connection */
513   if (NULL != socket->mesh)
514     {
515       GNUNET_MESH_disconnect (socket->mesh);
516     }
517   GNUNET_free (socket);
518 }
519
520
521 /**
522  * Method called whenever a peer creates a tunnel to us
523  *
524  * @param cls closure
525  * @param tunnel new handle to the tunnel
526  * @param initiator peer that started the tunnel
527  * @param atsi performance information for the tunnel
528  * @return initial tunnel context for the tunnel
529  *         (can be NULL -- that's not an error)
530  */
531 void 
532 new_tunnel_notify (void *cls,
533                    struct GNUNET_MESH_Tunnel *tunnel,
534                    const struct GNUNET_PeerIdentity *initiator,
535                    const struct GNUNET_ATS_Information *atsi)
536 {
537   struct GNUNET_STREAM_ListenSocket *lsocket;
538   struct GNUNET_STREAM_Socket *socket;
539
540   lsocket = (struct GNUNET_STREAM_ListenSocket *) cls;
541   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
542   socket->tunnel = tunnel;
543   socket->session_id = 0;       /* FIXME */
544   socket->other_peer = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity));
545   memcpy (socket->other_peer, initiator, sizeof (struct GNUNET_PeerIdentity));
546   socket->state = STATE_LISTEN;
547
548   if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
549                                            socket,
550                                            socket->other_peer))
551     {
552       socket->state = STATE_CLOSED;
553       make_state_transition (socket);
554       GNUNET_free (socket->other_peer);
555       GNUNET_free (socket);
556       GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
557     }
558   else
559     {
560       make_state_transition (socket);
561     }
562 }
563
564
565 /**
566  * Function called whenever an inbound tunnel is destroyed.  Should clean up
567  * any associated state.  This function is NOT called if the client has
568  * explicitly asked for the tunnel to be destroyed using
569  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
570  * the tunnel.
571  *
572  * @param cls closure (set from GNUNET_MESH_connect)
573  * @param tunnel connection to the other end (henceforth invalid)
574  * @param tunnel_ctx place where local state associated
575  *                   with the tunnel is stored
576  */
577 void 
578 tunnel_cleaner (void *cls,
579                 const struct GNUNET_MESH_Tunnel *tunnel,
580                 void *tunnel_ctx)
581 {
582   struct GNUNET_STREAM_Socket *socket;
583
584   socket = find_socket (tunnel);
585   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
586               "Peer %s has terminated connection abruptly\n",
587               GNUNET_i2s (socket->other_peer));
588
589   socket->status = GNUNET_STREAM_SHUTDOWN;
590   /* Clear Transmit handles */
591   if (NULL != socket->transmit_handle)
592     {
593       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
594       socket->transmit_handle = NULL;
595     }
596    
597   /* Clear existing message queue message */
598   if (NULL != socket->message)
599     {
600       GNUNET_free (socket->message);
601       socket->message = NULL;
602     }
603 }
604
605
606 /**
607  * Listens for stream connections for a specific application ports
608  *
609  * @param cfg the configuration to use
610  * @param app_port the application port for which new streams will be accepted
611  * @param listen_cb this function will be called when a peer tries to establish
612  *            a stream with us
613  * @param listen_cb_cls closure for listen_cb
614  * @return listen socket, NULL for any error
615  */
616 struct GNUNET_STREAM_ListenSocket *
617 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
618                       GNUNET_MESH_ApplicationType app_port,
619                       GNUNET_STREAM_ListenCallback listen_cb,
620                       void *listen_cb_cls)
621 {
622   /* FIXME: Add variable args for passing configration options? */
623   struct GNUNET_STREAM_ListenSocket *lsocket;
624
625   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
626   if (NULL == lsocket)
627     {
628       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
629                   "Unable to allocate memory\n");
630       return NULL;
631     }
632   lsocket->port = app_port;
633   lsocket->listen_cb = listen_cb;
634   lsocket->listen_cb_cls = listen_cb_cls;
635   lsocket->mesh = GNUNET_MESH_connect (cfg,
636                                        10, /* FIXME: QUEUE size as parameter? */
637                                        lsocket, /* Closure */
638                                        &new_tunnel_notify,
639                                        &tunnel_cleaner,
640                                        message_handlers,
641                                        {app_port, NULL});
642   return lsocket;
643 }
644
645
646 /**
647  * Closes the listen socket
648  *
649  * @param socket the listen socket
650  */
651 void
652 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
653 {
654   /* Do house keeping */
655
656   /* Close MESH connection */
657   GNUNET_MESH_disconnect (lsocket->mesh);
658   
659   GNUNET_free (lsocket);
660 }