2 This file is part of GNUnet.
3 (C) 2012 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file stream/stream_api.c
23 * @brief Implementation of the stream library
24 * @author Sree Harsha Totakura
27 #include "gnunet_common.h"
28 #include "gnunet_stream_lib.h"
/**
 * States of the stream protocol.
 *
 * NOTE(review): the enumerators STATE_HELLO_WAIT, STATE_ESTABLISHED,
 * STATE_RECEIVE_CLOSED and STATE_CLOSE_WAIT were not visible in the
 * extracted source; their names follow the pattern of the visible
 * enumerators and the surrounding comments -- confirm.
 */
enum State
{
  /**
   * Client initialization state
   */
  STATE_INIT,

  /**
   * Listener initialization state
   */
  STATE_LISTEN,

  /**
   * Pre-connection establishment state
   */
  STATE_HELLO_WAIT,

  /**
   * State where a connection has been established
   */
  STATE_ESTABLISHED,

  /**
   * State where receiving has been closed on our side and the close is
   * waiting to be ACK'ed
   */
  STATE_RECEIVE_CLOSE_WAIT,

  /**
   * State where the socket is closed for reading
   */
  STATE_RECEIVE_CLOSED,

  /**
   * State where transmitting has been closed on our side and the close is
   * waiting to be ACK'ed
   */
  STATE_TRANSMIT_CLOSE_WAIT,

  /**
   * State where the socket is closed for writing
   */
  STATE_TRANSMIT_CLOSED,

  /**
   * State where the whole socket has been closed on our side and the close
   * is waiting to be ACK'ed
   */
  STATE_CLOSE_WAIT,

  /**
   * State where the socket is closed
   */
  STATE_CLOSED
};
88 * The STREAM Socket Handler
90 struct GNUNET_STREAM_Socket
95 struct GNUNET_MESH_Handle *mesh;
98 * The mesh tunnel handle
100 struct GNUNET_MESH_Tunnel *tunnel;
103 * The session id associated with this stream connection
108 * The peer identity of the peer at the other end of the stream
110 GNUNET_PeerIdentity other_peer;
113 * Stream open closure
118 * Stream open callback
120 GNUNET_STREAM_OpenCallback open_cb;
123 * Retransmission timeout
125 struct GNUNET_TIME_Relative retransmit_timeout;
128 * The state of the protocol associated with this socket
133 * The status of the socket
135 enum GNUNET_STREAM_Status status;
138 * The current transmit handle (if a pending transmit request exists)
140 struct GNUNET_MESH_TransmitHandle *transmit_handle;
143 * The current message associated with the transmit handle
145 struct GNUNET_MessageHeader *message;
150 * A socket for listening
152 struct GNUNET_STREAM_ListenSocket
158 struct GNUNET_MESH_Handle *mesh;
163 GNUNET_MESH_ApplicationType port;
166 * The callback function which is called after successful opening socket
168 GNUNET_STREAM_ListenCallback listen_cb;
171 * The call back closure
/**
 * Default value, in seconds, used for the various timeouts (e.g. the
 * initial retransmission timeout of a freshly opened socket)
 */
static unsigned int default_timeout = 300;
184 * Callback function from send_message
186 * @param cls closure the socket on which the send message was called
187 * @param size number of bytes available in buf
188 * @param buf where the callee should write the message
189 * @return number of bytes written to buf
192 send_message_notify (void *cls, size_t size, void *buf)
194 struct GNUNET_STREAM_Socket *socket = cls;
197 socket->transmit_handle = NULL; /* Remove the transmit handle */
198 if (0 == size) /* Socket closed? */
200 // statistics ("message timeout")
203 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
204 "Message not sent as tunnel was closed \n");
207 else /* Size is more or equal to what was requested */
209 ret = ntohs (socket->message->size);
210 GNUNET_assert (size >= ret);
211 memcpy (buf, socket->message, ret);
213 GNUNET_free (socket->message); /* Free the message memory */
214 socket->message = NULL;
220 * Sends a message using the mesh connection of a socket
222 * @param socket the socket whose mesh connection is used
223 * @param message the message to be sent
226 send_message (struct GNUNET_STREAM_Socket *socket,
227 struct GNUNET_MessageHeader *message)
229 socket->message = message;
230 socket->transmit_handle =
231 GNUNET_MESH_notify_transmit_ready (socket->tunnel,
233 timeout, /* FIXME: Maxdelay */
235 ntohs (message->size),
236 &send_message_notify,
/**
 * Makes a state transition depending on the socket's current state
 *
 * NOTE(review): no statements of this function's body were visible in the
 * extracted source; it is kept as an empty stub -- confirm against the
 * full file.
 *
 * @param socket the socket whose state has to be transitioned
 */
static void
make_state_transition (struct GNUNET_STREAM_Socket *socket)
{
}
253 * Message Handler for mesh
255 * @param cls closure (set from GNUNET_MESH_connect)
256 * @param tunnel connection to the other end
257 * @param tunnel_ctx place to store local state associated with the tunnel
258 * @param sender who sent the message
259 * @param message the actual message
260 * @param atsi performance data for the connection
261 * @return GNUNET_OK to keep the connection open,
262 * GNUNET_SYSERR to close it (signal serious error)
265 handle_data (void *cls,
266 struct GNUNET_MESH_Tunnel *tunnel,
268 const struct GNUNET_PeerIdentity *sender,
269 const struct GNUNET_MessageHeader *message,
270 const struct GNUNET_ATS_Information*atsi)
272 struct GNUNET_STREAM_Socket *socket = cls;
274 const struct GNUNET_STREAM_DataMessage *data_msg;
277 size = ntohs (message->size);
278 if (size < sizeof (struct GNUNET_STREAM_DataMessage))
281 return GNUNET_SYSERR;
283 data_msg = (const struct GNUNET_STREAM_DataMessage *) message;
284 size -= sizeof (Struct GNUNET_STREAM_DataMessage);
285 payload = &data_msg[1];
293 * Message Handler for mesh
295 * @param cls closure (set from GNUNET_MESH_connect)
296 * @param tunnel connection to the other end
297 * @param tunnel_ctx place to store local state associated with the tunnel
298 * @param sender who sent the message
299 * @param message the actual message
300 * @param atsi performance data for the connection
301 * @return GNUNET_OK to keep the connection open,
302 * GNUNET_SYSERR to close it (signal serious error)
305 handle_ack (void *cls,
306 struct GNUNET_MESH_Tunnel *tunnel,
308 const struct GNUNET_PeerIdentity *sender,
309 const struct GNUNET_MessageHeader *message,
310 const struct GNUNET_ATS_Information*atsi)
312 struct GNUNET_STREAM_Socket *socket = cls;
313 const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
318 static struct GNUNET_MESH_MessageHandler message_handlers[] = {
319 {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
320 {&handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, sizeof (struct GNUNET_STREAM_AckMessage) },
321 {&handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 0},
322 {&handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK, 0},
323 {&handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET, 0},
324 {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE, 0},
325 {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK, 0},
326 {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE, 0},
327 {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK, 0},
328 {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE, 0},
329 {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_CLOSE, 0},
330 {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK, 0},
336 * Function called when our target peer is connected to our tunnel
338 * @param peer the peer identity of the target
339 * @param atsi performance data for the connection
342 mesh_peer_connect_callback (void *cls,
343 const struct GNUNET_PeerIdentity *peer,
344 const struct GNUNET_ATS_Information * atsi)
346 const struct GNUNET_STREAM_Socket *socket = cls;
348 if (0 != memcmp (socket->other_peer,
350 sizeof (struct GNUNET_PeerIdentity)))
352 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
353 "A peer (%s) which is not our target has\
354 connected to our tunnel", GNUNET_i2s (peer));
358 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
359 "Target peer %s connected\n", GNUNET_i2s (peer));
361 /* Set state to INIT */
362 socket->state = STATE_INIT;
364 /* Try to achieve ESTABLISHED state */
365 make_state_transition (socket);
367 /* Call open callback */
368 if (NULL == open_cls)
370 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
371 "STREAM_open callback is NULL\n");
373 if (NULL != socket->open_cb)
375 socket->open_cb (socket->open_cls, socket);
/**
 * Function called when our target peer is disconnected from our tunnel
 *
 * NOTE(review): no statements of this function's body were visible in the
 * extracted source; it is kept as an empty stub -- confirm against the
 * full file.
 *
 * @param cls closure supplied when the tunnel was created
 * @param peer the peer identity of the target
 */
static void
mesh_peer_disconnect_callback (void *cls,
                               const struct GNUNET_PeerIdentity *peer)
{
}
/**
 * Function to find the mapped socket of a tunnel
 *
 * The lookup itself is not implemented in the visible code, so no socket
 * can be found yet and NULL is returned; callers must check the result.
 *
 * @param tunnel the tunnel whose associated socket has to be retrieved
 * @return the socket corresponding to the tunnel, NULL if there is none
 */
static struct GNUNET_STREAM_Socket *
find_socket (const struct GNUNET_MESH_Tunnel *tunnel)
{
  /* Search tunnel in a list or hashtable and retrieve the socket */
  /* NOTE(review): no return statement was visible in the extracted source;
     returning NULL keeps the result well-defined until the lookup is
     implemented -- confirm. */
  return NULL;
}
411 * Tries to open a stream to the target peer
413 * @param cfg configuration to use
414 * @param target the target peer to which the stream has to be opened
415 * @param app_port the application port number which uniquely identifies this
417 * @param open_cb this function will be called after stream has be established
418 * @param open_cb_cls the closure for open_cb
419 * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
420 * @return if successful it returns the stream socket; NULL if stream cannot be
423 struct GNUNET_STREAM_Socket *
424 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
425 const struct GNUNET_PeerIdentity *target,
426 GNUNET_MESH_ApplicationType app_port,
427 GNUNET_STREAM_OpenCallback open_cb,
431 struct GNUNET_STREAM_Socket *socket;
432 enum GNUNET_STREAM_Option option;
433 va_list vargs; /* Variable arguments */
435 socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
436 socket->other_peer = *target;
437 socket->open_cb = open_cb;
438 socket->open_cls = open_cb_cls;
441 socket->retransmit_timeout =
442 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
444 va_start (vargs, open_cb_cls); /* Parse variable args */
446 option = va_arg (vargs, enum GNUNET_STREAM_Option);
449 case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
450 /* Expect struct GNUNET_TIME_Relative */
451 socket->retransmit_timeout = va_arg (vargs,
452 struct GNUNET_TIME_Relative);
454 case GNUNET_STREAM_OPTION_END:
458 } while (0 != option);
459 va_end (vargs); /* End of variable args parsing */
461 socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
462 1, /* QUEUE size as parameter? */
464 NULL, /* No inbound tunnel handler */
465 NULL, /* No inbound tunnel cleaner */
467 NULL); /* We don't get inbound tunnels */
468 // FIXME: if (NULL == socket->mesh) ...
470 /* Now create the mesh tunnel to target */
471 socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
472 NULL, /* Tunnel context */
473 &mesh_peer_connect_callback,
474 &mesh_peer_disconnect_callback,
476 // FIXME: if (NULL == socket->tunnel) ...
485 * @param socket the stream socket
488 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
490 /* Clear Transmit handles */
491 if (NULL != socket->transmit_handle)
493 GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
495 /* Clear existing message queue message */
496 if (NULL != socket->message)
498 GNUNET_free (socket->message);
500 /* Close associated tunnel */
501 if (NULL != socket->tunnel)
503 GNUNET_MESH_tunnel_destroy (socket->tunnel);
505 /* Close mesh connection */
506 if (NULL != socket->mesh)
508 GNUNET_MESH_disconnect (socket->mesh);
510 GNUNET_free (socket);
515 * Method called whenever a peer creates a tunnel to us
518 * @param tunnel new handle to the tunnel
519 * @param initiator peer that started the tunnel
520 * @param atsi performance information for the tunnel
521 * @return initial tunnel context for the tunnel
522 * (can be NULL -- that's not an error)
525 new_tunnel_notify (void *cls,
526 struct GNUNET_MESH_Tunnel *tunnel,
527 const struct GNUNET_PeerIdentity *initiator,
528 const struct GNUNET_ATS_Information *atsi)
530 struct GNUNET_STREAM_ListenSocket *lsocket = cls;
531 struct GNUNET_STREAM_Socket *socket;
533 socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
534 socket->tunnel = tunnel;
535 socket->session_id = 0; /* FIXME */
536 socket->other_peer = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity));
537 memcpy (socket->other_peer, initiator, sizeof (struct GNUNET_PeerIdentity));
538 socket->state = STATE_LISTEN;
540 if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
544 socket->state = STATE_CLOSED;
545 make_state_transition (socket);
546 GNUNET_free (socket->other_peer);
547 GNUNET_free (socket);
548 GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
552 make_state_transition (socket);
558 * Function called whenever an inbound tunnel is destroyed. Should clean up
559 * any associated state. This function is NOT called if the client has
560 * explicitly asked for the tunnel to be destroyed using
561 * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
564 * @param cls closure (set from GNUNET_MESH_connect)
565 * @param tunnel connection to the other end (henceforth invalid)
566 * @param tunnel_ctx place where local state associated
567 * with the tunnel is stored
570 tunnel_cleaner (void *cls,
571 const struct GNUNET_MESH_Tunnel *tunnel,
574 struct GNUNET_STREAM_Socket *socket;
576 socket = find_socket (tunnel);
577 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
578 "Peer %s has terminated connection abruptly\n",
579 GNUNET_i2s (socket->other_peer));
581 socket->status = GNUNET_STREAM_SHUTDOWN;
582 /* Clear Transmit handles */
583 if (NULL != socket->transmit_handle)
585 GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
586 socket->transmit_handle = NULL;
589 /* Clear existing message queue message */
590 if (NULL != socket->message)
592 GNUNET_free (socket->message);
593 socket->message = NULL;
599 * Listens for stream connections for a specific application ports
601 * @param cfg the configuration to use
602 * @param app_port the application port for which new streams will be accepted
603 * @param listen_cb this function will be called when a peer tries to establish
605 * @param listen_cb_cls closure for listen_cb
606 * @return listen socket, NULL for any error
608 struct GNUNET_STREAM_ListenSocket *
609 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
610 GNUNET_MESH_ApplicationType app_port,
611 GNUNET_STREAM_ListenCallback listen_cb,
614 /* FIXME: Add variable args for passing configration options? */
615 struct GNUNET_STREAM_ListenSocket *lsocket;
617 lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
618 lsocket->port = app_port;
619 lsocket->listen_cb = listen_cb;
620 lsocket->listen_cb_cls = listen_cb_cls;
621 lsocket->mesh = GNUNET_MESH_connect (cfg,
622 10, /* FIXME: QUEUE size as parameter? */
623 lsocket, /* Closure */
633 * Closes the listen socket
635 * @param socket the listen socket
638 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
640 /* Do house keeping */
642 /* Close MESH connection */
643 GNUNET_MESH_disconnect (lsocket->mesh);
645 GNUNET_free (lsocket);