+ if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
+ GNUNET_SCHEDULER_cancel(plugin->select_task);
+
+ plugin->select_task =
+ GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ plugin->rs_v4,
+ plugin->ws_v4,
+ &udp_plugin_select, plugin);
+ plugin->with_v4_ws = GNUNET_YES;
+ }
+ }
+ else if (s->addrlen == sizeof (struct sockaddr_in6))
+ {
+ if (plugin->with_v6_ws == GNUNET_NO)
+ {
+ if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
+ GNUNET_SCHEDULER_cancel(plugin->select_task_v6);
+
+ plugin->select_task_v6 =
+ GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ plugin->rs_v6,
+ plugin->ws_v6,
+ &udp_plugin_select_v6, plugin);
+ plugin->with_v6_ws = GNUNET_YES;
+ }
+ }
+
+ return mlen;
+}
+
+
+/**
+ * Our external IP address/port mapping has changed.
+ *
+ * @param cls closure, the 'struct LocalAddrList'
+ * @param add_remove GNUNET_YES to mean the new public IP address, GNUNET_NO to mean
+ * the previous (now invalid) one
+ * @param addr either the previous or the new public IP address
+ * @param addrlen actual lenght of the address
+ */
+static void
+udp_nat_port_map_callback (void *cls, int add_remove,
+ const struct sockaddr *addr, socklen_t addrlen)
+{
+ struct Plugin *plugin = cls;
+ struct IPv4UdpAddress u4;
+ struct IPv6UdpAddress u6;
+ void *arg;
+ size_t args;
+
+ /* convert 'addr' to our internal format */
+ switch (addr->sa_family)
+ {
+ case AF_INET:
+ GNUNET_assert (addrlen == sizeof (struct sockaddr_in));
+ u4.ipv4_addr = ((struct sockaddr_in *) addr)->sin_addr.s_addr;
+ u4.u4_port = ((struct sockaddr_in *) addr)->sin_port;
+ arg = &u4;
+ args = sizeof (u4);
+ break;
+ case AF_INET6:
+ GNUNET_assert (addrlen == sizeof (struct sockaddr_in6));
+ memcpy (&u6.ipv6_addr, &((struct sockaddr_in6 *) addr)->sin6_addr,
+ sizeof (struct in6_addr));
+ u6.u6_port = ((struct sockaddr_in6 *) addr)->sin6_port;
+ arg = &u6;
+ args = sizeof (u6);
+ break;
+ default:
+ GNUNET_break (0);
+ return;
+ }
+ /* modify our published address list */
+ plugin->env->notify_address (plugin->env->cls, add_remove, arg, args);
+}
+
+
+
+/**
+ * Message tokenizer has broken up an incomming message. Pass it on
+ * to the service.
+ *
+ * @param cls the 'struct Plugin'
+ * @param client the 'struct SourceInformation'
+ * @param hdr the actual message
+ */
+static int
+process_inbound_tokenized_messages (void *cls, void *client,
+ const struct GNUNET_MessageHeader *hdr)
+{
+ struct Plugin *plugin = cls;
+ struct SourceInformation *si = client;
+ struct GNUNET_ATS_Information ats[2];
+ struct GNUNET_TIME_Relative delay;
+
+ GNUNET_assert (si->session != NULL);
+ if (GNUNET_YES == si->session->in_destroy)
+ return GNUNET_OK;
+ /* setup ATS */
+ ats[0].type = htonl (GNUNET_ATS_QUALITY_NET_DISTANCE);
+ ats[0].value = htonl (1);
+ ats[1] = si->session->ats;
+ GNUNET_break (ntohl(ats[1].value) != GNUNET_ATS_NET_UNSPECIFIED);
+ delay = plugin->env->receive (plugin->env->cls,
+ &si->sender,
+ hdr,
+ (const struct GNUNET_ATS_Information *) &ats, 2,
+ si->session,
+ si->arg,
+ si->args);
+ si->session->flow_delay_for_other_peer = delay;
+ reschedule_session_timeout(si->session);
+ return GNUNET_OK;
+}
+
+
+/**
+ * We've received a UDP Message. Process it (pass contents to main service).
+ *
+ * @param plugin plugin context
+ * @param msg the message
+ * @param sender_addr sender address
+ * @param sender_addr_len number of bytes in sender_addr
+ */
+static void
+process_udp_message (struct Plugin *plugin, const struct UDPMessage *msg,
+ const struct sockaddr *sender_addr,
+ socklen_t sender_addr_len)
+{
+ struct SourceInformation si;
+ struct Session * s;
+ struct IPv4UdpAddress u4;
+ struct IPv6UdpAddress u6;
+ const void *arg;
+ size_t args;
+
+ if (0 != ntohl (msg->reserved))
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+ if (ntohs (msg->header.size) <
+ sizeof (struct GNUNET_MessageHeader) + sizeof (struct UDPMessage))
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+
+ /* convert address */
+ switch (sender_addr->sa_family)
+ {
+ case AF_INET:
+ GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in));
+ u4.ipv4_addr = ((struct sockaddr_in *) sender_addr)->sin_addr.s_addr;
+ u4.u4_port = ((struct sockaddr_in *) sender_addr)->sin_port;
+ arg = &u4;
+ args = sizeof (u4);
+ break;
+ case AF_INET6:
+ GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in6));
+ u6.ipv6_addr = ((struct sockaddr_in6 *) sender_addr)->sin6_addr;
+ u6.u6_port = ((struct sockaddr_in6 *) sender_addr)->sin6_port;
+ arg = &u6;
+ args = sizeof (u6);
+ break;
+ default:
+ GNUNET_break (0);
+ return;
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received message with %u bytes from peer `%s' at `%s'\n",
+ (unsigned int) ntohs (msg->header.size), GNUNET_i2s (&msg->sender),
+ GNUNET_a2s (sender_addr, sender_addr_len));
+
+ struct GNUNET_HELLO_Address * address = GNUNET_HELLO_address_allocate(&msg->sender, "udp", arg, args);
+ s = udp_plugin_get_session(plugin, address);
+ GNUNET_free (address);
+
+ /* iterate over all embedded messages */
+ si.session = s;
+ si.sender = msg->sender;
+ si.arg = arg;
+ si.args = args;
+ s->rc++;
+ GNUNET_SERVER_mst_receive (plugin->mst, &si, (const char *) &msg[1],
+ ntohs (msg->header.size) -
+ sizeof (struct UDPMessage), GNUNET_YES, GNUNET_NO);
+ s->rc--;
+ if ( (0 == s->rc) && (GNUNET_YES == s->in_destroy))
+ free_session (s);
+}
+
+
+/**
+ * Scan the heap for a receive context with the given address.
+ *
+ * @param cls the 'struct FindReceiveContext'
+ * @param node internal node of the heap
+ * @param element value stored at the node (a 'struct ReceiveContext')
+ * @param cost cost associated with the node
+ * @return GNUNET_YES if we should continue to iterate,
+ * GNUNET_NO if not.
+ */
+static int
+find_receive_context (void *cls, struct GNUNET_CONTAINER_HeapNode *node,
+ void *element, GNUNET_CONTAINER_HeapCostType cost)
+{
+ struct FindReceiveContext *frc = cls;
+ struct DefragContext *e = element;
+
+ if ((frc->addr_len == e->addr_len) &&
+ (0 == memcmp (frc->addr, e->src_addr, frc->addr_len)))
+ {
+ frc->rc = e;
+ return GNUNET_NO;
+ }
+ return GNUNET_YES;
+}
+
+
+/**
+ * Process a defragmented message.
+ *
+ * @param cls the 'struct ReceiveContext'
+ * @param msg the message
+ */
+static void
+fragment_msg_proc (void *cls, const struct GNUNET_MessageHeader *msg)
+{
+ struct DefragContext *rc = cls;
+
+ if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
+ {
+ GNUNET_break (0);
+ return;
+ }
+ if (ntohs (msg->size) < sizeof (struct UDPMessage))
+ {
+ GNUNET_break (0);
+ return;
+ }
+ process_udp_message (rc->plugin, (const struct UDPMessage *) msg,
+ rc->src_addr, rc->addr_len);
+}
+
+
+struct LookupContext
+{
+ const struct sockaddr * addr;
+
+ struct Session *res;
+
+ size_t addrlen;
+};
+
+
+static int
+lookup_session_by_addr_it (void *cls, const struct GNUNET_HashCode * key, void *value)
+{
+ struct LookupContext *l_ctx = cls;
+ struct Session * s = value;
+
+ if ((s->addrlen == l_ctx->addrlen) &&
+ (0 == memcmp (s->sock_addr, l_ctx->addr, s->addrlen)))
+ {
+ l_ctx->res = s;
+ return GNUNET_NO;
+ }
+ return GNUNET_YES;
+}
+
+
+/**
+ * Transmit an acknowledgement.
+ *
+ * @param cls the 'struct ReceiveContext'
+ * @param id message ID (unused)
+ * @param msg ack to transmit
+ */
+static void
+ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg)
+{
+ struct DefragContext *rc = cls;
+ size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size);
+ struct UDP_ACK_Message *udp_ack;
+ uint32_t delay = 0;
+ struct UDPMessageWrapper *udpw;
+ struct Session *s;
+
+ struct LookupContext l_ctx;
+ l_ctx.addr = rc->src_addr;
+ l_ctx.addrlen = rc->addr_len;
+ l_ctx.res = NULL;
+ GNUNET_CONTAINER_multihashmap_iterate (rc->plugin->sessions,
+ &lookup_session_by_addr_it,
+ &l_ctx);
+ s = l_ctx.res;
+
+ if (NULL == s)
+ return;
+
+ if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX)
+ delay = s->flow_delay_for_other_peer.rel_value;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending ACK to `%s' including delay of %u ms\n",
+ GNUNET_a2s (rc->src_addr,
+ (rc->src_addr->sa_family ==
+ AF_INET) ? sizeof (struct sockaddr_in) : sizeof (struct
+ sockaddr_in6)),
+ delay);
+ udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msize);
+ udpw->msg_size = msize;
+ udpw->session = s;
+ udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
+ udpw->udp = (char *)&udpw[1];
+ udp_ack = (struct UDP_ACK_Message *) udpw->udp;
+ udp_ack->header.size = htons ((uint16_t) msize);
+ udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
+ udp_ack->delay = htonl (delay);
+ udp_ack->sender = *rc->plugin->env->my_identity;
+ memcpy (&udp_ack[1], msg, ntohs (msg->size));
+
+ enqueue (rc->plugin, udpw);
+}
+
+
+static void
+read_process_msg (struct Plugin *plugin,
+ const struct GNUNET_MessageHeader *msg,
+ const char *addr,
+ socklen_t fromlen)
+{
+ if (ntohs (msg->size) < sizeof (struct UDPMessage))
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+ process_udp_message (plugin, (const struct UDPMessage *) msg,
+ (const struct sockaddr *) addr, fromlen);
+}
+
+
+static void
+read_process_ack (struct Plugin *plugin,
+ const struct GNUNET_MessageHeader *msg,
+ char *addr,
+ socklen_t fromlen)
+{
+ const struct GNUNET_MessageHeader *ack;
+ const struct UDP_ACK_Message *udp_ack;
+ struct LookupContext l_ctx;
+ struct Session *s;
+ struct GNUNET_TIME_Relative flow_delay;
+
+ if (ntohs (msg->size) <
+ sizeof (struct UDP_ACK_Message) + sizeof (struct GNUNET_MessageHeader))
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+ udp_ack = (const struct UDP_ACK_Message *) msg;
+ l_ctx.addr = (const struct sockaddr *) addr;
+ l_ctx.addrlen = fromlen;
+ l_ctx.res = NULL;
+ GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions,
+ &lookup_session_by_addr_it,
+ &l_ctx);
+ s = l_ctx.res;
+
+ if ((s == NULL) || (s->frag_ctx == NULL))
+ return;
+
+ flow_delay.rel_value = (uint64_t) ntohl (udp_ack->delay);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "We received a sending delay of %llu\n",
+ flow_delay.rel_value);
+ s->flow_delay_from_other_peer =
+ GNUNET_TIME_relative_to_absolute (flow_delay);
+
+ ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
+ if (ntohs (ack->size) !=
+ ntohs (msg->size) - sizeof (struct UDP_ACK_Message))
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+
+ if (GNUNET_OK != GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag, ack))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
+ (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
+ GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
+ return;
+ }
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "FULL MESSAGE ACKed\n",
+ (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
+ GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
+ s->last_expected_delay = GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag);
+
+ struct UDPMessageWrapper * udpw;
+ struct UDPMessageWrapper * tmp;
+ if (s->addrlen == sizeof (struct sockaddr_in6))
+ {
+ udpw = plugin->ipv6_queue_head;
+ while (NULL != udpw)
+ {
+ tmp = udpw->next;
+ if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
+ {
+ GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
+ GNUNET_free (udpw);
+ }
+ udpw = tmp;
+ }
+ }
+ if (s->addrlen == sizeof (struct sockaddr_in))
+ {
+ udpw = plugin->ipv4_queue_head;
+ while (udpw!= NULL)
+ {
+ tmp = udpw->next;
+ if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
+ {
+ GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
+ GNUNET_free (udpw);
+ }
+ udpw = tmp;
+ }
+ }
+
+ if (s->frag_ctx->cont != NULL)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Calling continuation for fragmented message to `%s' with result %s\n",
+ GNUNET_i2s (&s->target), "OK");
+ s->frag_ctx->cont (s->frag_ctx->cont_cls, &udp_ack->sender, GNUNET_OK);
+ }
+
+ GNUNET_free (s->frag_ctx);
+ s->frag_ctx = NULL;
+}
+
+
+static void
+read_process_fragment (struct Plugin *plugin,
+ const struct GNUNET_MessageHeader *msg,
+ char *addr,
+ socklen_t fromlen)
+{
+ struct DefragContext *d_ctx;
+ struct GNUNET_TIME_Absolute now;
+ struct FindReceiveContext frc;
+
+ frc.rc = NULL;
+ frc.addr = (const struct sockaddr *) addr;
+ frc.addr_len = fromlen;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "UDP processes %u-byte fragment from `%s'\n",
+ (unsigned int) ntohs (msg->size),
+ GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
+ /* Lookup existing receive context for this address */
+ GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
+ &find_receive_context,
+ &frc);
+ now = GNUNET_TIME_absolute_get ();
+ d_ctx = frc.rc;
+
+ if (d_ctx == NULL)
+ {
+ /* Create a new defragmentation context */
+ d_ctx = GNUNET_malloc (sizeof (struct DefragContext) + fromlen);
+ memcpy (&d_ctx[1], addr, fromlen);
+ d_ctx->src_addr = (const struct sockaddr *) &d_ctx[1];
+ d_ctx->addr_len = fromlen;
+ d_ctx->plugin = plugin;
+ d_ctx->defrag =
+ GNUNET_DEFRAGMENT_context_create (plugin->env->stats, UDP_MTU,
+ UDP_MAX_MESSAGES_IN_DEFRAG, d_ctx,
+ &fragment_msg_proc, &ack_proc);
+ d_ctx->hnode =
+ GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs, d_ctx,
+ (GNUNET_CONTAINER_HeapCostType)
+ now.abs_value);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Created new defragmentation context for %u-byte fragment from `%s'\n",
+ (unsigned int) ntohs (msg->size),
+ GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
+ }
+ else
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Found existing defragmentation context for %u-byte fragment from `%s'\n",
+ (unsigned int) ntohs (msg->size),
+ GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
+ }
+
+ if (GNUNET_OK == GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg))
+ {
+ /* keep this 'rc' from expiring */
+ GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs, d_ctx->hnode,
+ (GNUNET_CONTAINER_HeapCostType)
+ now.abs_value);
+ }
+ if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
+ UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
+ {
+ /* remove 'rc' that was inactive the longest */
+ d_ctx = GNUNET_CONTAINER_heap_remove_root (plugin->defrag_ctxs);
+ GNUNET_assert (NULL != d_ctx);
+ GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
+ GNUNET_free (d_ctx);
+ }
+}
+
+
+/**
+ * Read and process a message from the given socket.
+ *
+ * @param plugin the overall plugin
+ * @param rsock socket to read from
+ */
+static void
+udp_select_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock)
+{
+ socklen_t fromlen;
+ char addr[32];
+ char buf[65536] GNUNET_ALIGN;
+ ssize_t size;
+ const struct GNUNET_MessageHeader *msg;
+
+ fromlen = sizeof (addr);
+ memset (&addr, 0, sizeof (addr));
+ size = GNUNET_NETWORK_socket_recvfrom (rsock, buf, sizeof (buf),
+ (struct sockaddr *) &addr, &fromlen);
+#if MINGW
+ /* On SOCK_DGRAM UDP sockets recvfrom might fail with a
+ * WSAECONNRESET error to indicate that previous sendto() (???)
+ * on this socket has failed.
+ */
+ if ( (-1 == size) && (ECONNRESET == errno) )
+ return;