WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Affero General Public License for more details.
-
+
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+ SPDX-License-Identifier: AGPL3.0-or-later
*/
/**
#include "platform.h"
#include "gnunet_util_lib.h"
#include "gnunet_protocols.h"
+#include "gnunet_constants.h"
+#include "gnunet_nt_lib.h"
#include "gnunet_statistics_service.h"
#include "gnunet_transport_communication_service.h"
/**
- * How many messages do we keep at most in the queue to the
- * transport service before we start to drop (default,
+ * How many messages do we keep at most in the queue to the
+ * transport service before we start to drop (default,
* can be changed via the configuration file).
* Should be _below_ the level of the communicator API, as
* otherwise we may read messages just to have them dropped
#define DEFAULT_MAX_QUEUE_LENGTH 8
/**
- * Name of the communicator.
+ * Address prefix used by the communicator.
*/
-#define COMMUNICATOR_NAME "unix"
+#define COMMUNICATOR_ADDRESS_PREFIX "unix"
+/**
+ * Configuration section used by the communicator.
+ */
+#define COMMUNICATOR_CONFIG_SECTION "communicator-unix"
+
+/**
+ * Our MTU.
+ */
+#define UNIX_MTU UINT16_MAX
GNUNET_NETWORK_STRUCT_BEGIN
* if this queue is in the #queue_head DLL.
*/
const struct GNUNET_MessageHeader *msg;
-
+
/**
* Message queue we are providing for the #ch.
*/
struct GNUNET_MQ_Handle *mq;
-
+
/**
* handle for this queue with the #ch.
- */
+ */
struct GNUNET_TRANSPORT_QueueHandle *qh;
-
+
/**
* Number of bytes we currently have in our write queue.
*/
struct GNUNET_MQ_Handle *mq;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Disconnecting queue for peer `%s'\n",
+ "Disconnecting queue for peer `%s'\n",
GNUNET_i2s (&queue->target));
if (0 != queue->bytes_in_queue)
{
GNUNET_CONTAINER_DLL_remove (queue_head,
queue_tail,
queue);
- queue->bytes_in_queue = 0;
+ queue->bytes_in_queue = 0;
}
if (NULL != (mq = queue->mq))
{
&queue->target,
queue));
GNUNET_STATISTICS_set (stats,
- "# UNIX queues active",
+ "# queues active",
GNUNET_CONTAINER_multipeermap_size (queue_map),
GNUNET_NO);
if (NULL != queue->timeout_task)
queue->address_len);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"UNIX transmitted message to %s (%d/%u: %s)\n",
- GNUNET_i2s (&queue->target),
+ GNUNET_i2s (&queue->target),
(int) sent,
(unsigned int) msg_size,
(sent < 0) ? STRERROR (errno) : "ok");
/* We should retry later... */
GNUNET_log_strerror (GNUNET_ERROR_TYPE_DEBUG,
"send");
- return;
+ return;
case EMSGSIZE:
{
socklen_t size = 0;
GNUNET_assert (mq == queue->mq);
GNUNET_assert (NULL == queue->msg);
- queue->msg = msg;
+ queue->msg = msg;
GNUNET_CONTAINER_DLL_insert (queue_head,
queue_tail,
queue);
+ GNUNET_assert (NULL != unix_sock);
if (NULL == write_task)
write_task =
GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
* data to another peer.
*
* @param peer the target peer
+ * @param cs inbound or outbound queue
* @param un the address
* @param un_len number of bytes in @a un
* @return the queue or NULL of max connections exceeded
*/
static struct Queue *
setup_queue (const struct GNUNET_PeerIdentity *target,
- const struct sockaddr_un *un,
- socklen_t un_len)
+ enum GNUNET_TRANSPORT_ConnectionStatus cs,
+ const struct sockaddr_un *un,
+ socklen_t un_len)
{
struct Queue *queue;
queue = GNUNET_new (struct Queue);
queue->target = *target;
queue->address = GNUNET_memdup (un,
- un_len);
+ un_len);
queue->address_len = un_len;
(void) GNUNET_CONTAINER_multipeermap_put (queue_map,
- &queue->target,
- queue,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ &queue->target,
+ queue,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
GNUNET_STATISTICS_set (stats,
"# queues active",
GNUNET_CONTAINER_multipeermap_size (queue_map),
queue->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
queue->timeout_task
= GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
- &queue_timeout,
- queue);
+ &queue_timeout,
+ queue);
queue->mq
= GNUNET_MQ_queue_for_callbacks (&mq_send,
- &mq_destroy,
- &mq_cancel,
- queue,
- NULL,
- &mq_error,
- queue);
+ &mq_destroy,
+ &mq_cancel,
+ queue,
+ NULL,
+ &mq_error,
+ queue);
{
char *foreign_addr;
-
+
if ('\0' == un->sun_path[0])
GNUNET_asprintf (&foreign_addr,
- "%s-@%s",
- COMMUNICATOR_NAME,
- &un->sun_path[1]);
+ "%s-@%s",
+ COMMUNICATOR_ADDRESS_PREFIX,
+ &un->sun_path[1]);
else
GNUNET_asprintf (&foreign_addr,
- "%s-%s",
- COMMUNICATOR_NAME,
- un->sun_path);
+ "%s-%s",
+ COMMUNICATOR_ADDRESS_PREFIX,
+ un->sun_path);
queue->qh
= GNUNET_TRANSPORT_communicator_mq_add (ch,
- &queue->target,
- foreign_addr,
- GNUNET_ATS_NET_LOOPBACK,
- queue->mq);
+ &queue->target,
+ foreign_addr,
+ UNIX_MTU,
+ GNUNET_NT_LOOPBACK,
+ cs,
+ queue->mq);
GNUNET_free (foreign_addr);
}
return queue;
delivering_messages--;
if (GNUNET_OK != success)
GNUNET_STATISTICS_update (stats,
- "# transport transmission failures",
- 1,
- GNUNET_NO);
+ "# transport transmission failures",
+ 1,
+ GNUNET_NO);
+ GNUNET_assert (NULL != unix_sock);
if ( (NULL == read_task) &&
(delivering_messages < max_queue_length) )
read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
- unix_sock,
- &select_read_cb,
- NULL);
+ unix_sock,
+ &select_read_cb,
+ NULL);
}
ssize_t ret;
uint16_t msize;
+ GNUNET_assert (NULL != unix_sock);
read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
- unix_sock,
- &select_read_cb,
- NULL);
+ unix_sock,
+ &select_read_cb,
+ NULL);
addrlen = sizeof (un);
memset (&un,
0,
sizeof (un));
ret = GNUNET_NETWORK_socket_recvfrom (unix_sock,
buf,
- sizeof (buf),
+ sizeof (buf),
(struct sockaddr *) &un,
&addrlen);
if ( (-1 == ret) &&
return;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Read %d bytes from socket %s\n",
- (int) ret,
- un.sun_path);
+ "Read %d bytes from socket %s\n",
+ (int) ret,
+ un.sun_path);
GNUNET_assert (AF_UNIX == (un.sun_family));
msg = (struct UNIXMessage *) buf;
msize = ntohs (msg->header.size);
return;
}
queue = lookup_queue (&msg->sender,
- &un,
- addrlen);
+ &un,
+ addrlen);
if (NULL == queue)
queue = setup_queue (&msg->sender,
- &un,
- addrlen);
+ GNUNET_TRANSPORT_CS_INBOUND,
+ &un,
+ addrlen);
else
reschedule_queue_timeout (queue);
if (NULL == queue)
_("Maximum number of UNIX connections exceeded, dropping incoming message\n"));
return;
}
-
+
{
uint16_t offset = 0;
uint16_t tsize = msize - sizeof (struct UNIXMessage);
const char *msgbuf = (const char *) &msg[1];
-
+
while (offset + sizeof (struct GNUNET_MessageHeader) <= tsize)
{
const struct GNUNET_MessageHeader *currhdr;
sizeof (al_hdr));
csize = ntohs (al_hdr.size);
if ( (csize < sizeof (struct GNUNET_MessageHeader)) ||
- (csize > tsize - offset))
+ (csize > tsize - offset))
{
- GNUNET_break_op (0);
- break;
+ GNUNET_break_op (0);
+ break;
}
ret = GNUNET_TRANSPORT_communicator_receive (ch,
- &msg->sender,
- currhdr,
- &receive_complete_cb,
- NULL);
+ &msg->sender,
+ currhdr,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &receive_complete_cb,
+ NULL);
if (GNUNET_SYSERR == ret)
return; /* transport not up */
if (GNUNET_NO == ret)
const char *path;
struct sockaddr_un *un;
socklen_t un_len;
-
+
if (0 != strncmp (address,
- COMMUNICATOR_NAME "-",
- strlen (COMMUNICATOR_NAME "-")))
+ COMMUNICATOR_ADDRESS_PREFIX "-",
+ strlen (COMMUNICATOR_ADDRESS_PREFIX "-")))
{
GNUNET_break_op (0);
return GNUNET_SYSERR;
}
- path = &address[strlen (COMMUNICATOR_NAME "-")];
+ path = &address[strlen (COMMUNICATOR_ADDRESS_PREFIX "-")];
un = unix_address_to_sockaddr (path,
&un_len);
queue = lookup_queue (peer,
return GNUNET_OK;
}
queue = setup_queue (peer,
- un,
- un_len);
+ GNUNET_TRANSPORT_CS_OUTBOUND,
+ un,
+ un_len);
GNUNET_free (un);
if (NULL == queue)
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Failed to setup queue to %s at `%s'\n",
- GNUNET_i2s (peer),
- path);
+ "Failed to setup queue to %s at `%s'\n",
+ GNUNET_i2s (peer),
+ path);
return GNUNET_NO;
}
return GNUNET_OK;
}
+/**
+ * Function called when the transport service has received an
+ * acknowledgement for this communicator (!) via a different return
+ * path.
+ *
+ * Not applicable for UNIX.
+ *
+ * @param cls closure
+ * @param sender which peer sent the notification
+ * @param msg payload
+ */
+static void
+enc_notify_cb (void *cls,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *msg)
+{
+ (void) cls;
+ (void) sender;
+ (void) msg;
+ GNUNET_break_op (0);
+}
+
+
/**
* Setup communicator and launch network interactions.
- *
+ *
* @param cls NULL (always)
* @param args remaining command-line arguments
* @param cfgfile name of the configuration file used (for saving, can be NULL!)
socklen_t un_len;
char *my_addr;
(void) cls;
-
+
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_filename (cfg,
- "communicator-unix",
+ COMMUNICATOR_CONFIG_SECTION,
"UNIXPATH",
&unix_socket_path))
{
GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
- "communicator-unix",
+ COMMUNICATOR_CONFIG_SECTION,
"UNIXPATH");
return;
}
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_number (cfg,
- "communicator-unix",
+ COMMUNICATOR_CONFIG_SECTION,
"MAX_QUEUE_LENGTH",
&max_queue_length))
max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
-
+
un = unix_address_to_sockaddr (unix_socket_path,
&un_len);
if (NULL == un)
queue_map = GNUNET_CONTAINER_multipeermap_create (10,
GNUNET_NO);
ch = GNUNET_TRANSPORT_communicator_connect (cfg,
- COMMUNICATOR_NAME,
- 65535,
+ COMMUNICATOR_CONFIG_SECTION,
+ COMMUNICATOR_ADDRESS_PREFIX,
+ GNUNET_TRANSPORT_CC_RELIABLE,
&mq_init,
- NULL);
+ NULL,
+ &enc_notify_cb,
+ NULL);
if (NULL == ch)
{
GNUNET_break (0);
}
GNUNET_asprintf (&my_addr,
"%s-%s",
- COMMUNICATOR_NAME,
+ COMMUNICATOR_ADDRESS_PREFIX,
unix_socket_path);
+ GNUNET_free (unix_socket_path);
ai = GNUNET_TRANSPORT_communicator_address_add (ch,
my_addr,
- GNUNET_ATS_NET_LOOPBACK,
+ GNUNET_NT_LOOPBACK,
GNUNET_TIME_UNIT_FOREVER_REL);
GNUNET_free (my_addr);
- GNUNET_free (unix_socket_path);
- read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
- unix_sock,
- &select_read_cb,
- NULL);
}