2 This file is part of GNUnet.
3 (C) 2012 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file stream/stream_api.c
23 * @brief Implementation of the stream library
24 * @author Sree Harsha Totakura
27 #include "gnunet_common.h"
28 #include "gnunet_stream_lib.h"
32 * states in the Protocol
37 * Client initialization state
42 * Listener initialization state
47 * Pre-connection establishment state
52 * State where a connection has been established
57 * State where the socket is closed on our side and waiting to be ACK'ed
59 STATE_RECEIVE_CLOSE_WAIT,
62 * State where the socket is closed for reading
67 * State where the socket is closed on our side and waiting to be ACK'ed
69 STATE_TRANSMIT_CLOSE_WAIT,
72 * State where the socket is closed for writing
74 STATE_TRANSMIT_CLOSED,
77 * State where the socket is closed on our side and waiting to be ACK'ed
82 * State where the socket is closed
89 * The STREAM Socket Handler
91 struct GNUNET_STREAM_Socket
96 struct GNUNET_MESH_Handle *mesh;
99 * The mesh tunnel handle
101 struct GNUNET_MESH_Tunnel *tunnel;
104 * The session id associated with this stream connection
106 unint32_t session_id;
109 * The peer identity of the peer at the other end of the stream
111 GNUNET_PeerIdentity *other_peer;
114 * Stream open closure
119 * Stream open callback
121 GNUNET_STREAM_OpenCallback open_cb;
124 * Retransmission timeout
126 struct GNUNET_TIME_Relative retransmit_timeout;
129 * The state of the protocol associated with this socket
134 * The status of the socket
136 enum GNUNET_STREAM_Status status;
139 * The current transmit handle (if a pending transmit request exists)
141 struct GNUNET_MESH_TransmitHandle *transmit_handle;
144 * The current message associated with the transmit handle
146 struct GNUNET_MessageHeader *message;
151 * A socket for listening
153 struct GNUNET_STREAM_ListenSocket
159 struct GNUNET_MESH_Handle *mesh;
164 GNUNET_MESH_ApplicationType port;
167 * The callback function which is called after successful opening socket
169 GNUNET_STREAM_ListenCallback listen_cb;
172 * The call back closure
/**
 * Default value in seconds for various timeouts.
 *
 * GNUNET_STREAM_open multiplies it with GNUNET_TIME_UNIT_SECONDS to seed a
 * socket's retransmission timeout before the caller's options are applied.
 */
static unsigned int default_timeout = 300;
185 * Converts message fields from host byte order to network byte order
187 * @param msg the message to convert
190 GNUNET_STREAM_convert_message_h2n (struct GNUNET_STREAM_MessageHeader *msg)
192 /* Add type specific message conversion here */
194 msg->size = htons (msg->size);
195 msg->type = htons (msg->type);
200 * Converts message fields from network byte order to host byte order
202 * @param msg the messeage to convert
205 GNUNET_STREAM_convert_message_n2h (struct GNUNET_STREAM_MessageHeader *msg)
207 msg->size = ntohs (msg->size);
208 msg->type = ntohs (msg->type);
210 /* Add type specific message conversion here */
214 * Callback function from send_message
216 * @param cls closure the socket on which the send message was called
217 * @param size number of bytes available in buf
218 * @param buf where the callee should write the message
219 * @return number of bytes written to buf
222 send_message_notify (void *cls, size_t size, void *buf)
224 struct GNUNET_STREAM_Socket *socket;
226 socket = (struct GNUNET_STREAM_Socket *) cls;
227 socket->transmit_handle = NULL; /* Remove the transmit handle */
228 if (0 == size) /* Socket closed? */
230 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
231 "Message not sent as tunnel was closed \n");
233 else /* Size is more or equal to what was requested */
235 size = socket->message->size;
236 GNUNET_STREAM_convert_message_h2n (socket->message) /* Convert h2n */
237 memcpy (buf, socket->message, size);
239 GNUNET_free (socket->message); /* Free the message memory */
240 socket->message = NULL;
246 * Sends a message using the mesh connection of a socket
248 * @param socket the socket whose mesh connection is used
249 * @param message the message to be sent
252 send_message (struct GNUNET_STREAM_Socket *socket,
253 struct GNUNET_MessageHeader *message)
255 socket->message = message;
256 socket->transmit_handle =
257 GNUNET_MESH_notify_transmit_ready (socket->tunnel,
259 timeout, /* FIXME: Maxdelay */
/**
 * Makes state transition depending on the given state
 *
 * NOTE(review): no body lines for this function were visible in the reviewed
 * copy, so it is kept as a no-op placeholder; "static void" is assumed
 * because no return value is documented.
 *
 * @param socket the socket whose state has to be transitioned
 */
static void
make_state_transition (struct GNUNET_STREAM_Socket *socket)
{
  /* Transition logic not implemented in the reviewed copy */
}
279 * Message Handler for mesh
281 * @param cls closure (set from GNUNET_MESH_connect)
282 * @param tunnel connection to the other end
283 * @param tunnel_ctx place to store local state associated with the tunnel
284 * @param sender who sent the message
285 * @param message the actual message
286 * @param atsi performance data for the connection
287 * @return GNUNET_OK to keep the connection open,
288 * GNUNET_SYSERR to close it (signal serious error)
291 handle_data (void *cls,
292 struct GNUNET_MESH_Tunnel *tunnel,
294 const struct GNUNET_PeerIdentity *sender,
295 const struct GNUNET_MessageHeader *message,
296 const struct GNUNET_ATS_Information*atsi)
299 struct GNUNET_STREAM_MessageHeader *message_copy;
301 size = ntohs (message->size);
302 message_copy = GNUNET_malloc (size);
303 memcpy (message_copy, message, size);
304 GNUNET_STREAM_convert_message_n2h (message_copy);
306 route_message (message_copy);
310 static struct GNUNET_MESH_MessageHandler message_handlers[] = {
311 {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
312 {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_ACK, 0},
313 {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 0},
314 {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK, 0},
315 {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_RESET, 0},
316 {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE, 0},
317 {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK, 0},
318 {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE, 0},
319 {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK, 0},
320 {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE, 0},
321 {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_CLOSE, 0},
322 {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK, 0},
328 * Function called when our target peer is connected to our tunnel
330 * @param peer the peer identity of the target
331 * @param atsi performance data for the connection
334 mesh_peer_connect_callback (void *cls,
335 const struct GNUNET_PeerIdentity *peer,
336 const struct GNUNET_ATS_Information * atsi)
338 const struct GNUNET_STREAM_Socket *socket;
340 socket = (const struct GNUNET_STREAM_Socket *) cls;
341 if (0 != memcmp (socket->other_peer,
343 sizeof (struct GNUNET_PeerIdentity)))
345 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
346 "A peer (%s) which is not our target has\
347 connected to our tunnel", GNUNET_i2s (peer));
351 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
352 "Target peer %s connected\n", GNUNET_i2s (peer));
354 /* Set state to INIT */
355 socket->state = STATE_INIT;
357 /* Try to achieve ESTABLISHED state */
358 make_state_transition (socket);
360 /* Call open callback */
361 if (NULL == open_cls)
363 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
364 "STREAM_open callback is NULL\n");
366 if (NULL != socket->open_cb)
368 socket->open_cb (socket->open_cls, socket);
/**
 * Function called when our target peer is disconnected from our tunnel
 *
 * NOTE(review): no body lines for this function were visible in the reviewed
 * copy, so it is kept as a no-op placeholder; "static void" is assumed
 * because no return value is documented.
 *
 * @param cls closure
 * @param peer the peer identity of the target
 */
static void
mesh_peer_disconnect_callback (void *cls,
                               const struct GNUNET_PeerIdentity *peer)
{
  /* Disconnect handling not implemented in the reviewed copy */
}
/**
 * Function to find the mapped socket of a tunnel
 *
 * The lookup itself is not implemented yet; until it is, the function
 * reports "not found" instead of falling off the end of a non-void function.
 *
 * @param tunnel the tunnel whose associated socket has to be retrieved
 * @return the socket corresponding to the tunnel; currently always NULL
 */
static struct GNUNET_STREAM_Socket *
find_socket (const struct GNUNET_MESH_Tunnel *tunnel)
{
  /* Search tunnel in a list or hashtable and retrieve the socket */
  return NULL;
}
404 * Tries to open a stream to the target peer
406 * @param cfg configuration to use
407 * @param target the target peer to which the stream has to be opened
408 * @param app_port the application port number which uniquely identifies this
410 * @param open_cb this function will be called after stream has be established
411 * @param open_cb_cls the closure for open_cb
412 * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
413 * @return if successful it returns the stream socket; NULL if stream cannot be
416 struct GNUNET_STREAM_Socket *
417 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
418 const struct GNUNET_PeerIdentity *target,
419 GNUNET_MESH_ApplicationType app_port,
420 GNUNET_STREAM_OpenCallback open_cb,
424 struct GNUNET_STREAM_Socket *socket;
425 enum GNUNET_STREAM_Option option;
426 va_list vargs; /* Variable arguments */
428 socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
431 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
432 "Unable to allocate memory\n");
435 socket->other_peer = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity));
436 if (NULL == socket->other_peer)
438 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
439 "Unable to allocate memory \n");
444 socket->retransmit_timeout =
445 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
447 va_start (vargs, open_cb_cls); /* Parse variable args */
449 option = va_arg (vargs, enum GNUNET_STREAM_Option);
452 case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
453 /* Expect struct GNUNET_TIME_Relative */
454 socket->retransmit_timeout = va_arg (vargs,
455 struct GNUNET_TIME_Relative);
457 case GNUNET_STREAM_OPTION_END:
461 } while (0 != option);
462 va_end (vargs); /* End of variable args parsing */
464 socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
465 10, /* QUEUE size as parameter? */
467 NULL, /* No inbound tunnel handler */
468 NULL, /* No inbound tunnel cleaner */
470 NULL); /* We don't get inbound tunnels */
472 memcpy (socket->other_peer, target, sizeof (struct GNUNET_PeerIdentity));
473 socket->open_cb = open_cb;
474 socket->open_cls = open_cb_cls;
476 /* Now create the mesh tunnel to target */
477 socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
478 NULL, /* Tunnel context */
479 &mesh_peer_connect_callback,
480 &mesh_peer_disconnect_callback,
490 * @param socket the stream socket
493 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
495 /* Clear Transmit handles */
496 if (NULL != socket->transmit_handle)
498 GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
500 /* Clear existing message queue message */
501 if (NULL != socket->message)
503 GNUNET_free (socket->message);
505 /* Clear memory allocated for other peer's PeerIdentity */
506 GNUNET_Free (socket->other_peer);
507 /* Close associated tunnel */
508 if (NULL != socket->tunnel)
510 GNUNET_MESH_tunnel_destroy (socket->tunnel);
512 /* Close mesh connection */
513 if (NULL != socket->mesh)
515 GNUNET_MESH_disconnect (socket->mesh);
517 GNUNET_free (socket);
522 * Method called whenever a peer creates a tunnel to us
525 * @param tunnel new handle to the tunnel
526 * @param initiator peer that started the tunnel
527 * @param atsi performance information for the tunnel
528 * @return initial tunnel context for the tunnel
529 * (can be NULL -- that's not an error)
532 new_tunnel_notify (void *cls,
533 struct GNUNET_MESH_Tunnel *tunnel,
534 const struct GNUNET_PeerIdentity *initiator,
535 const struct GNUNET_ATS_Information *atsi)
537 struct GNUNET_STREAM_ListenSocket *lsocket;
538 struct GNUNET_STREAM_Socket *socket;
540 lsocket = (struct GNUNET_STREAM_ListenSocket *) cls;
541 socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
542 socket->tunnel = tunnel;
543 socket->session_id = 0; /* FIXME */
544 socket->other_peer = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity));
545 memcpy (socket->other_peer, initiator, sizeof (struct GNUNET_PeerIdentity));
546 socket->state = STATE_LISTEN;
548 if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
552 socket->state = STATE_CLOSED;
553 make_state_transition (socket);
554 GNUNET_free (socket->other_peer);
555 GNUNET_free (socket);
556 GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
560 make_state_transition (socket);
566 * Function called whenever an inbound tunnel is destroyed. Should clean up
567 * any associated state. This function is NOT called if the client has
568 * explicitly asked for the tunnel to be destroyed using
569 * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
572 * @param cls closure (set from GNUNET_MESH_connect)
573 * @param tunnel connection to the other end (henceforth invalid)
574 * @param tunnel_ctx place where local state associated
575 * with the tunnel is stored
578 tunnel_cleaner (void *cls,
579 const struct GNUNET_MESH_Tunnel *tunnel,
582 struct GNUNET_STREAM_Socket *socket;
584 socket = find_socket (tunnel);
585 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
586 "Peer %s has terminated connection abruptly\n",
587 GNUNET_i2s (socket->other_peer));
589 socket->status = GNUNET_STREAM_SHUTDOWN;
590 /* Clear Transmit handles */
591 if (NULL != socket->transmit_handle)
593 GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
594 socket->transmit_handle = NULL;
597 /* Clear existing message queue message */
598 if (NULL != socket->message)
600 GNUNET_free (socket->message);
601 socket->message = NULL;
607 * Listens for stream connections for a specific application ports
609 * @param cfg the configuration to use
610 * @param app_port the application port for which new streams will be accepted
611 * @param listen_cb this function will be called when a peer tries to establish
613 * @param listen_cb_cls closure for listen_cb
614 * @return listen socket, NULL for any error
616 struct GNUNET_STREAM_ListenSocket *
617 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
618 GNUNET_MESH_ApplicationType app_port,
619 GNUNET_STREAM_ListenCallback listen_cb,
622 /* FIXME: Add variable args for passing configration options? */
623 struct GNUNET_STREAM_ListenSocket *lsocket;
625 lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
628 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
629 "Unable to allocate memory\n");
632 lsocket->port = app_port;
633 lsocket->listen_cb = listen_cb;
634 lsocket->listen_cb_cls = listen_cb_cls;
635 lsocket->mesh = GNUNET_MESH_connect (cfg,
636 10, /* FIXME: QUEUE size as parameter? */
637 lsocket, /* Closure */
647 * Closes the listen socket
649 * @param socket the listen socket
652 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
654 /* Do house keeping */
656 /* Close MESH connection */
657 GNUNET_MESH_disconnect (lsocket->mesh);
659 GNUNET_free (lsocket);