const struct sockaddr *sock_addr;
size_t addrlen;
+
+ /**
+ * Desired delay for next sending we received from other peer
+ */
+ struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
};
};
+/**
+ * Closure for 'process_inbound_tokenized_messages'
+ */
+struct SourceInformation
+{
+ /**
+ * Sender identity.
+ */
+ struct GNUNET_PeerIdentity sender;
+
+ /**
+ * Source address.
+ */
+ const void *arg;
+
+ /**
+ * Number of bytes in source address.
+ */
+ size_t args;
+
+ struct Session *session;
+};
+
/**
* Function called for a quick conversion of the binary address to
struct SessionCompareContext * cctx = cls;
const struct GNUNET_HELLO_Address *address = cctx->addr;
struct Session *s = value;
- struct Session *r = cctx->res;
- struct IPv4UdpAddress * u4 = NULL;
- struct IPv6UdpAddress * u6 = NULL;
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "AAAAAAAAAAAAAAAAAAa\n");
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Looking for existing session for address %s\n", udp_address_to_string (NULL, (void *) address->address, address->address_length));
- if (s->addrlen == address->address_length)
+ socklen_t s_addrlen = s->addrlen;
+
+#if VERBOSE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Comparing address %s <-> %s\n",
+ udp_address_to_string (NULL, (void *) address->address, address->address_length),
+ GNUNET_a2s (s->sock_addr, s->addrlen));
+#endif
+
+ if ((address->address_length == sizeof (struct IPv4UdpAddress)) &&
+ (s_addrlen == sizeof (struct sockaddr_in)))
{
- if (address->address_length == sizeof (struct IPv4UdpAddress))
+ struct IPv4UdpAddress * u4 = NULL;
+ u4 = (struct IPv4UdpAddress *) address->address;
+ const struct sockaddr_in *s4 = (const struct sockaddr_in *) s->sock_addr;
+ if ((0 == memcmp ((const void *) &u4->ipv4_addr,(const void *) &s4->sin_addr, sizeof (struct in_addr))) &&
+ (u4->u4_port == s4->sin_port))
{
- u4 = (struct IPv4UdpAddress * ) address->address;
- struct sockaddr_in *sai = (struct sockaddr_in *) s->sock_addr;
- if ((u4->ipv4_addr == sai->sin_addr.s_addr) &&
- (u4->u4_port == sai->sin_port))
- {
- r = s;
- return GNUNET_NO;
- }
+ cctx->res = s;
+ return GNUNET_NO;
}
- else if (address->address_length == sizeof (struct IPv6UdpAddress))
- {
- u6 = (struct IPv6UdpAddress * ) address->address;
- struct sockaddr_in6 *sai = (struct sockaddr_in6 *) s->sock_addr;
- if ((0 == memcmp (&u6->ipv6_addr, &sai->sin6_addr, sizeof (struct in6_addr))) &&
- (u6->u6_port == sai->sin6_port))
- {
- r = s;
- return GNUNET_NO;
- }
- }
- else
+ }
+ if ((address->address_length == sizeof (struct IPv6UdpAddress)) &&
+ (s_addrlen == sizeof (struct sockaddr_in6)))
+ {
+ struct IPv6UdpAddress * u6 = NULL;
+ u6 = (struct IPv6UdpAddress *) address->address;
+ const struct sockaddr_in6 *s6 = (const struct sockaddr_in6 *) s->sock_addr;
+ if ((0 == memcmp (&u6->ipv6_addr, &s6->sin6_addr, sizeof (struct in6_addr))) &&
+ (u6->u6_port == s6->sin6_port))
{
- GNUNET_break (0);
- return GNUNET_YES;
+ cctx->res = s;
+ return GNUNET_NO;
}
}
+
+
return GNUNET_YES;
}
}
/* check if session already exists */
- if (NULL != NULL)
- {
struct SessionCompareContext cctx;
cctx.addr = address;
cctx.res = NULL;
#if DEBUG_UDP
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Looking for existing session for peer `%s' `%s' \n", GNUNET_i2s (&address->peer), udp_address_to_string(NULL, address->address, address->address_length));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking for existing session for peer `%s' `%s' \n", GNUNET_i2s (&address->peer), udp_address_to_string(NULL, address->address, address->address_length));
#endif
GNUNET_CONTAINER_multihashmap_get_multiple(plugin->sessions, &address->peer.hashPubKey, session_cmp_it, &cctx);
if (cctx.res != NULL)
{
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Found existing session\n");
+#if DEBUG_UDP
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found existing session %p\n", cctx.res);
+#endif
return cctx.res;
}
- }
+
/* otherwise create new */
s = create_session (plugin,
&address->peer,
address->address,
address->address_length,
NULL, NULL);
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Creating new session %p\n", s);
+#if DEBUG_UDP
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Creating new session %p for peer `%s' address `%s'\n",
+ s,
+ GNUNET_i2s(&address->peer),
+ udp_address_to_string(NULL,address->address,address->address_length));
+#endif
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multihashmap_put (plugin->sessions,
&s->target.hashPubKey,
s,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
return s;
}
*/
static ssize_t
udp_plugin_send (void *cls,
- struct Session *session,
+ struct Session *s,
const char *msgbuf, size_t msgbuf_size,
unsigned int priority,
struct GNUNET_TIME_Relative to,
GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
{
+ struct Plugin *plugin = cls;
+ size_t mlen = msgbuf_size + sizeof (struct UDPMessage);;
+
+ struct GNUNET_TIME_Relative delta;
+ struct UDPMessageWrapper * udpw;
+ struct UDPMessage *udp;
+
+ GNUNET_assert (plugin != NULL);
+ GNUNET_assert (s != NULL);
+
+ if (mlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "UDP transmits %u-byte message to `%s' using address `%s'\n",
+ msgbuf_size,
+ GNUNET_i2s (&s->target),
+ GNUNET_a2s(s->sock_addr, s->addrlen));
+
+ if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_contains_value(plugin->sessions, &s->target.hashPubKey, s))
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+
+ udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + sizeof (struct UDPMessage) + msgbuf_size);
+ udpw->session = s;
+ udp = (struct UDPMessage *) &udpw[1];
+ udpw->udp = udp;
+ udpw->msg_size = mlen;
+ udpw->cont = cont;
+ udpw->cont_cls = cont_cls;
- return 0;
+ udp->header.size = htons (mlen);
+ udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE);
+ udp->reserved = htonl (0);
+ udp->sender = *plugin->env->my_identity;
+ memcpy (&udp[1], msgbuf, msgbuf_size);
+ GNUNET_CONTAINER_DLL_insert(plugin->msg_head, plugin->msg_tail, udpw);
+
+ delta = GNUNET_TIME_absolute_get_remaining (s->flow_delay_from_other_peer);
+ return mlen;
}
static ssize_t udp_plugin_send_wrapper (void *cls,
}
+
+/**
+ * Message tokenizer has broken up an incomming message. Pass it on
+ * to the service.
+ *
+ * @param cls the 'struct Plugin'
+ * @param client the 'struct SourceInformation'
+ * @param hdr the actual message
+ */
+static void
+process_inbound_tokenized_messages (void *cls, void *client,
+ const struct GNUNET_MessageHeader *hdr)
+{
+ struct Plugin *plugin = cls;
+ struct SourceInformation *si = client;
+ struct GNUNET_ATS_Information ats[2];
+ struct GNUNET_TIME_Relative delay;
+
+ /* setup ATS */
+ ats[0].type = htonl (GNUNET_ATS_QUALITY_NET_DISTANCE);
+ ats[0].value = htonl (1);
+ ats[1].type = htonl (GNUNET_ATS_NETWORK_TYPE);
+ ats[1].value = htonl (GNUNET_ATS_COST_WAN);
+ //GNUNET_break (ntohl(si->session->ats_address_network_type) != GNUNET_ATS_NET_UNSPECIFIED);
+ delay = plugin->env->receive (plugin->env->cls,
+ &si->sender,
+ hdr,
+ (const struct GNUNET_ATS_Information *) &ats, 2,
+ NULL,
+ si->arg,
+ si->args);
+ //si->session->flow_delay_for_other_peer = delay;
+}
+
+
+/**
+ * We've received a UDP Message. Process it (pass contents to main service).
+ *
+ * @param plugin plugin context
+ * @param msg the message
+ * @param sender_addr sender address
+ * @param sender_addr_len number of bytes in sender_addr
+ */
+static void
+process_udp_message (struct Plugin *plugin, const struct UDPMessage *msg,
+ const struct sockaddr *sender_addr,
+ socklen_t sender_addr_len)
+{
+ struct SourceInformation si;
+ struct IPv4UdpAddress u4;
+ struct IPv6UdpAddress u6;
+ struct GNUNET_ATS_Information ats;
+ const void *arg;
+ size_t args;
+
+ if (0 != ntohl (msg->reserved))
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+ if (ntohs (msg->header.size) <
+ sizeof (struct GNUNET_MessageHeader) + sizeof (struct UDPMessage))
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+
+ ats.type = htonl (GNUNET_ATS_NETWORK_TYPE);
+ ats.value = htonl (GNUNET_ATS_NET_UNSPECIFIED);
+ /* convert address */
+ switch (sender_addr->sa_family)
+ {
+ case AF_INET:
+ GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in));
+ u4.ipv4_addr = ((struct sockaddr_in *) sender_addr)->sin_addr.s_addr;
+ u4.u4_port = ((struct sockaddr_in *) sender_addr)->sin_port;
+ arg = &u4;
+ args = sizeof (u4);
+ break;
+ case AF_INET6:
+ GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in6));
+ u6.ipv6_addr = ((struct sockaddr_in6 *) sender_addr)->sin6_addr;
+ u6.u6_port = ((struct sockaddr_in6 *) sender_addr)->sin6_port;
+ arg = &u6;
+ args = sizeof (u6);
+ break;
+ default:
+ GNUNET_break (0);
+ return;
+ }
+#if DEBUG_UDP
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received message with %u bytes from peer `%s' at `%s'\n",
+ (unsigned int) ntohs (msg->header.size), GNUNET_i2s (&msg->sender),
+ GNUNET_a2s (sender_addr, sender_addr_len));
+#endif
+
+ /* iterate over all embedded messages */
+ si.sender = msg->sender;
+ si.arg = arg;
+ si.args = args;
+
+ GNUNET_SERVER_mst_receive (plugin->mst, &si, (const char *) &msg[1],
+ ntohs (msg->header.size) -
+ sizeof (struct UDPMessage), GNUNET_YES, GNUNET_NO);
+}
+
+
/**
* Read and process a message from the given socket.
*
* @param rsock socket to read from
*/
static void
-udp_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock)
+udp_select_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock)
{
socklen_t fromlen;
char addr[32];
GNUNET_break_op (0);
return;
}
- /*
+
process_udp_message (plugin, (const struct UDPMessage *) msg,
(const struct sockaddr *) addr, fromlen);
- */
+
return;
default:
GNUNET_break_op (0);
}
}
+size_t
+udp_select_send (struct Plugin *plugin)
+{
+ ssize_t sent;
+ size_t slen;
+
+ struct UDPMessageWrapper *udpw = plugin->msg_head;
+ const struct sockaddr * sa = udpw->session->sock_addr;
+
+ switch (sa->sa_family)
+ {
+ case AF_INET:
+ if (NULL == plugin->sockv4)
+ return 0;
+ sent =
+ GNUNET_NETWORK_socket_sendto (plugin->sockv4, udpw->udp, udpw->msg_size,
+ sa, slen = sizeof (struct sockaddr_in));
+ break;
+ case AF_INET6:
+ if (NULL == plugin->sockv6)
+ return 0;
+ sent =
+ GNUNET_NETWORK_socket_sendto (plugin->sockv6, udpw->udp, udpw->msg_size,
+ sa, slen = sizeof (struct sockaddr_in6));
+ break;
+ default:
+ GNUNET_break (0);
+ return 0;
+ }
+ if (GNUNET_SYSERR == sent)
+ {
+ GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "sendto");
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "UDP transmitted %u-byte message to %s (%d: %s)\n",
+ (unsigned int) ntohs (udpw->msg_size), GNUNET_a2s (sa, slen), (int) sent,
+ (sent < 0) ? STRERROR (errno) : "ok");
+ if (udpw->cont != NULL)
+ udpw->cont (udpw->cont_cls, &udpw->session->target, GNUNET_SYSERR);
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "UDP transmitted %u-byte message to %s (%d: %s)\n",
+ (unsigned int) ntohs (udpw->msg_size), GNUNET_a2s (sa, slen), (int) sent,
+ (sent < 0) ? STRERROR (errno) : "ok");
+
+ if (udpw->cont != NULL)
+ udpw->cont (udpw->cont_cls, &udpw->session->target, GNUNET_OK);
+
+ GNUNET_CONTAINER_DLL_remove(plugin->msg_head, plugin->msg_tail, udpw);
+ GNUNET_free (udpw);
+
+ return sent;
+}
+
/**
* We have been notified that our readset has something to read. We don't
* know which socket needs to be read, so we have to check each one
{
if ((NULL != plugin->sockv4) &&
(GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv4)))
- udp_read (plugin, plugin->sockv4);
+ udp_select_read (plugin, plugin->sockv4);
if ((NULL != plugin->sockv6) &&
(GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv6)))
- udp_read (plugin, plugin->sockv6);
+ udp_select_read (plugin, plugin->sockv6);
}
if ((tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY) != 0)
{
- /* TODO */
+ if (plugin->msg_head != NULL)
+ udp_select_send (plugin);
}
plugin->select_task = GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
plugin->sessions = GNUNET_CONTAINER_multihashmap_create (10);
+ plugin->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, plugin);
plugin->port = port;
plugin->aport = aport;
plugin->last_expected_delay = GNUNET_TIME_UNIT_SECONDS;
api->send = &udp_plugin_send_wrapper;
api->send_with_session = &udp_plugin_send;
- LOG (GNUNET_ERROR_TYPE_ERROR, "Setting up sockets\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Setting up sockets\n");
res = setup_sockets (plugin, &serverAddrv6, &serverAddrv4);
if ((res == 0) || ((plugin->sockv4 == NULL) && (plugin->sockv6 == NULL)))
{
return NULL;
}
- LOG (GNUNET_ERROR_TYPE_ERROR, "Starting broadcasting\n");
- setup_broadcast (plugin, &serverAddrv6, &serverAddrv4);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting broadcasting\n");
+ if (broadcast == GNUNET_YES)
+ setup_broadcast (plugin, &serverAddrv6, &serverAddrv4);
GNUNET_free_non_null (bind4_address);
GNUNET_NETWORK_fdset_destroy (plugin->ws);
GNUNET_NAT_unregister (plugin->nat);
+ if (plugin->mst != NULL)
+ {
+ GNUNET_SERVER_mst_destroy(plugin->mst);
+ plugin->mst = NULL;
+ }
+
+ /* Clean up leftover messages */
+ struct UDPMessageWrapper *udpw = plugin->msg_head;
+ while (udpw != NULL)
+ {
+ struct UDPMessageWrapper *tmp = udpw->next;
+ GNUNET_CONTAINER_DLL_remove(plugin->msg_head, plugin->msg_tail, udpw);
+ if (udpw->cont != NULL)
+ udpw->cont (udpw->cont_cls, &udpw->session->target, GNUNET_SYSERR);
+ GNUNET_free (udpw);
+ udpw = tmp;
+ }
+
/* Clean up sessions */
#if DEBUG_UDP
LOG (GNUNET_ERROR_TYPE_DEBUG,