#define LOG(kind,...) GNUNET_log_from (kind, "transport-udp", __VA_ARGS__)
+ * Number of messages we can defragment in parallel. We only really
+ * defragment 1 message at a time, but if messages get re-ordered, we
+ * may want to keep knowledge about the previous message to avoid
+ * discarding the current message in favor of a single fragment of a
+ * previous message. 3 should be good since we don't expect massive
+ * message reorderings with UDP.
+ */
+ * We keep a defragmentation queue per sender address. How many
+ * sender addresses do we support at the same time? Memory consumption
+ * is roughly a factor of 32k * UDP_MAX_MESSAGES_IN_DEFRAG times this
+ * value. (So 128 corresponds to 12 MB and should suffice for
+ * connecting to roughly 128 peers via UDP).
+ */
* Closure for 'append_port'.
* Desired delay for next sending we received from other peer
struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
+ struct FragmentationContext * head;
+ struct FragmentationContext * tail;
+ * Closure for 'find_receive_context'.
+ */
+struct FindReceiveContext
+ /**
+ * Where to store the result.
+ */
+ struct ReceiveContext *rc;
+ /**
+ * Address to find.
+ */
+ const struct sockaddr *addr;
+ /**
+ * Number of bytes in 'addr'.
+ */
+ socklen_t addr_len;
+ struct Session *session;
+ * Data structure to track defragmentation contexts based
+ * on the source of the UDP traffic.
+ */
+struct ReceiveContext
+ /**
+ * Defragmentation context.
+ */
+ struct GNUNET_DEFRAGMENT_Context *defrag;
+ /**
+ * Source address this receive context is for (allocated at the
+ * end of the struct).
+ */
+ const struct sockaddr *src_addr;
+ /**
+ * Reference to master plugin struct.
+ */
+ struct Plugin *plugin;
+ /**
+ * Node in the defrag heap.
+ */
+ struct GNUNET_CONTAINER_HeapNode *hnode;
+ /**
+ * Length of 'src_addr'
+ */
+ size_t addr_len;
+ struct GNUNET_PeerIdentity id;
+ * Closure for 'process_inbound_tokenized_messages'
+ */
+struct FragmentationContext
+ struct FragmentationContext * next;
+ struct FragmentationContext * prev;
+ struct Plugin * plugin;
+ struct GNUNET_FRAGMENT_Context * frag;
+ struct Session * session;
+ /**
+ * Function to call upon completion of the transmission.
+ */
+ GNUNET_TRANSPORT_TransmitContinuation cont;
+ /**
+ * Closure for 'cont'.
+ */
+ void *cont_cls;
+ size_t bytes_to_send;
+struct UDPMessageWrapper
+ struct Session *session;
+ struct UDPMessageWrapper *prev;
+ struct UDPMessageWrapper *next;
+ struct UDPMessage *udp;
+ size_t msg_size;
+ /**
+ * Function to call upon completion of the transmission.
+ */
+ GNUNET_TRANSPORT_TransmitContinuation cont;
+ /**
+ * Closure for 'cont'.
+ */
+ void *cont_cls;
+ struct FragmentationContext *frag;
+ * UDP ACK Message-Packet header (after defragmentation).
+ */
+struct UDP_ACK_Message
+ /**
+ * Message header.
+ */
+ struct GNUNET_MessageHeader header;
+ /**
+ * Desired delay for flow control
+ */
+ uint32_t delay;
+ /**
+ * What is the identity of the sender
+ */
+ struct GNUNET_PeerIdentity sender;
* Function called for a quick conversion of the binary address to
* a numeric address. Note that the caller must not free the
GNUNET_i2s (&s->target),
GNUNET_a2s (s->sock_addr, s->addrlen));
+ struct FragmentationContext *fctx = s->head;
+ while (fctx != NULL)
+ {
+ GNUNET_FRAGMENT_context_destroy(fctx->frag);
+ GNUNET_CONTAINER_DLL_remove(s->head, s->tail, fctx);
+ GNUNET_free (fctx);
+ fctx = s->head;
+ }
plugin->env->session_end (plugin->env->cls, &s->target, s);
return s;
+ * Function that is called with messages created by the fragmentation
+ * module. In the case of the 'proc' callback of the
+ * GNUNET_FRAGMENT_context_create function, this function must
+ * eventually call 'GNUNET_FRAGMENT_context_transmission_done'.
+ *
+ * @param cls closure, the 'struct FragmentationContext'
+ * @param msg the message that was created
+ */
+static void
+enqueue_fragment (void *cls, const struct GNUNET_MessageHeader *msg)
+ struct FragmentationContext *frag_ctx = cls;
+ struct Plugin *plugin = frag_ctx->plugin;
+ struct UDPMessageWrapper * udpw;
+ size_t msg_len = ntohs (msg->size);
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Enqueueing fragment with %u bytes %u\n", msg_len , sizeof (struct UDPMessageWrapper));
+ udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msg_len);
+ udpw->session = frag_ctx->session;
+ udpw->udp = (struct UDPMessage *) &udpw[1];
+ udpw->msg_size = msg_len;
+ udpw->cont = frag_ctx->cont;
+ udpw->cont_cls = frag_ctx->cont_cls;
+ udpw->frag = frag_ctx;
+ memcpy (udpw->udp, msg, msg_len);
+ GNUNET_CONTAINER_DLL_insert(plugin->msg_head, plugin->msg_tail, udpw);
* Function that can be used by the transport service to transmit
GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
struct Plugin *plugin = cls;
- size_t mlen = msgbuf_size + sizeof (struct UDPMessage);;
+ size_t mlen = msgbuf_size + sizeof (struct UDPMessage);
struct GNUNET_TIME_Relative delta;
struct UDPMessageWrapper * udpw;
struct UDPMessage *udp;
+ char mbuf[mlen];
GNUNET_assert (plugin != NULL);
GNUNET_assert (s != NULL);
- "UDP transmits %u-byte message to `%s' using address `%s'\n",
- msgbuf_size,
- GNUNET_i2s (&s->target),
- GNUNET_a2s(s->sock_addr, s->addrlen));
if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_contains_value(plugin->sessions, &s->target.hashPubKey, s))
GNUNET_break (0);
- udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + sizeof (struct UDPMessage) + msgbuf_size);
- udpw->session = s;
- udp = (struct UDPMessage *) &udpw[1];
- udpw->udp = udp;
- udpw->msg_size = mlen;
- udpw->cont = cont;
- udpw->cont_cls = cont_cls;
+ "UDP transmits %u-byte message to `%s' using address `%s'\n",
+ msgbuf_size,
+ GNUNET_i2s (&s->target),
+ GNUNET_a2s(s->sock_addr, s->addrlen));
+ /* Message */
+ udp = (struct UDPMessage *) mbuf;
udp->header.size = htons (mlen);
udp->reserved = htonl (0);
udp->sender = *plugin->env->my_identity;
- memcpy (&udp[1], msgbuf, msgbuf_size);
- GNUNET_CONTAINER_DLL_insert(plugin->msg_head, plugin->msg_tail, udpw);
+ if (mlen <= UDP_MTU)
+ {
+ udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + mlen);
+ udpw->session = s;
+ udpw->udp = (struct UDPMessage *) &udpw[1];
+ udpw->msg_size = mlen;
+ udpw->cont = cont;
+ udpw->cont_cls = cont_cls;
+ udpw->frag = NULL;
+ memcpy (udpw->udp, udp, sizeof (struct UDPMessage));
+ memcpy (&udpw->udp[1], msgbuf, msgbuf_size);
+ GNUNET_CONTAINER_DLL_insert(plugin->msg_head, plugin->msg_tail, udpw);
+ }
+ else
+ {
+ "UDP has to fragment message \n");
+ memcpy (&udp[1], msgbuf, msgbuf_size);
+ struct FragmentationContext * frag_ctx = GNUNET_malloc(sizeof (struct FragmentationContext));
+ frag_ctx->plugin = plugin;
+ frag_ctx->session = s;
+ frag_ctx->cont = cont;
+ frag_ctx->cont_cls = cont_cls;
+ frag_ctx->bytes_to_send = mlen;
+ frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
+ &plugin->tracker,
+ plugin->last_expected_delay,
+ &udp->header,
+ &enqueue_fragment,
+ frag_ctx);
+ GNUNET_CONTAINER_DLL_insert(s->head, s->tail, frag_ctx);
+ }
delta = GNUNET_TIME_absolute_get_remaining (s->flow_delay_from_other_peer);
return mlen;
+ * Scan the heap for a receive context with the given address.
+ *
+ * @param cls the 'struct FindReceiveContext'
+ * @param node internal node of the heap
+ * @param element value stored at the node (a 'struct ReceiveContext')
+ * @param cost cost associated with the node
+ * @return GNUNET_YES if we should continue to iterate,
+ * GNUNET_NO if not.
+ */
+static int
+find_receive_context (void *cls, struct GNUNET_CONTAINER_HeapNode *node,
+ void *element, GNUNET_CONTAINER_HeapCostType cost)
+ struct FindReceiveContext *frc = cls;
+ struct ReceiveContext *e = element;
+ if ((frc->addr_len == e->addr_len) &&
+ (0 == memcmp (frc->addr, e->src_addr, frc->addr_len)))
+ {
+ frc->rc = e;
+ return GNUNET_NO;
+ }
+ return GNUNET_YES;
+ * Process a defragmented message.
+ *
+ * @param cls the 'struct ReceiveContext'
+ * @param msg the message
+ */
+static void
+fragment_msg_proc (void *cls, const struct GNUNET_MessageHeader *msg)
+ struct ReceiveContext *rc = cls;
+ {
+ GNUNET_break (0);
+ return;
+ }
+ if (ntohs (msg->size) < sizeof (struct UDPMessage))
+ {
+ GNUNET_break (0);
+ return;
+ }
+ process_udp_message (rc->plugin, (const struct UDPMessage *) msg,
+ rc->src_addr, rc->addr_len);
+ * Transmit an acknowledgement.
+ *
+ * @param cls the 'struct ReceiveContext'
+ * @param id message ID (unused)
+ * @param msg ack to transmit
+ */
+static void
+ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg)
+#if 0
+ struct ReceiveContext *rc = cls;
+ size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size);
+ char buf[msize];
+ struct UDP_ACK_Message *udp_ack;
+ uint32_t delay = 0;
+ struct Session *s;
+ s = find_inbound_session_by_addr (rc->plugin, rc->src_addr, rc->addr_len);
+ if (s != NULL)
+ {
+ if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX)
+ delay = s->flow_delay_for_other_peer.rel_value;
+ else
+ delay = UINT32_MAX;
+ }
+ "Sending ACK to `%s' including delay of %u ms\n",
+ GNUNET_a2s (rc->src_addr,
+ (rc->src_addr->sa_family ==
+ AF_INET) ? sizeof (struct sockaddr_in) : sizeof (struct
+ sockaddr_in6)),
+ delay);
+ udp_ack = (struct UDP_ACK_Message *) buf;
+ udp_ack->header.size = htons ((uint16_t) msize);
+ udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
+ udp_ack->delay = htonl (delay);
+ udp_ack->sender = *rc->plugin->env->my_identity;
+ memcpy (&udp_ack[1], msg, ntohs (msg->size));
+ (void) udp_send (rc->plugin, rc->src_addr, &udp_ack->header);
* Read and process a message from the given socket.
//const struct GNUNET_MessageHeader *ack;
//struct Session *peer_session;
//const struct UDP_ACK_Message *udp_ack;
- //struct ReceiveContext *rc;
- //struct GNUNET_TIME_Absolute now;
- //struct FindReceiveContext frc;
+ struct ReceiveContext *rc;
+ struct GNUNET_TIME_Absolute now;
+ struct FindReceiveContext frc;
//struct Session *s = NULL;
//struct GNUNET_TIME_Relative flow_delay;
//struct GNUNET_ATS_Information ats;
GNUNET_break_op (0);
process_udp_message (plugin, (const struct UDPMessage *) msg,
(const struct sockaddr *) addr, fromlen);
+ return;
+ if (ntohs (msg->size) <
+ sizeof (struct UDP_ACK_Message) + sizeof (struct GNUNET_MessageHeader))
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+ /* TODO */
+ GNUNET_break_op (0);
+ return;
+ frc.rc = NULL;
+ frc.addr = (const struct sockaddr *) addr;
+ frc.addr_len = fromlen;
+ GNUNET_CONTAINER_heap_iterate (plugin->defrags,
+ &find_receive_context,
+ &frc);
+ now = GNUNET_TIME_absolute_get ();
+ rc = frc.rc;
+ if (rc == NULL)
+ {
+ /* need to create a new RC */
+ rc = GNUNET_malloc (sizeof (struct ReceiveContext) + fromlen);
+ memcpy (&rc[1], addr, fromlen);
+ rc->src_addr = (const struct sockaddr *) &rc[1];
+ rc->addr_len = fromlen;
+ rc->plugin = plugin;
+ rc->defrag =
+ GNUNET_DEFRAGMENT_context_create (plugin->env->stats, UDP_MTU,
+ &fragment_msg_proc, &ack_proc);
+ rc->hnode =
+ GNUNET_CONTAINER_heap_insert (plugin->defrags, rc,
+ now.abs_value);
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "UDP processes %u-byte fragment from `%s'\n",
+ (unsigned int) ntohs (msg->size),
+ GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
+ if (GNUNET_OK == GNUNET_DEFRAGMENT_process_fragment (rc->defrag, msg))
+ {
+ /* keep this 'rc' from expiring */
+ GNUNET_CONTAINER_heap_update_cost (plugin->defrags, rc->hnode,
+ now.abs_value);
+ }
+ if (GNUNET_CONTAINER_heap_get_size (plugin->defrags) >
+ {
+ /* remove 'rc' that was inactive the longest */
+ rc = GNUNET_CONTAINER_heap_remove_root (plugin->defrags);
+ GNUNET_assert (NULL != rc);
+ GNUNET_DEFRAGMENT_context_destroy (rc->defrag);
+ GNUNET_free (rc);
+ }
(unsigned int) ntohs (udpw->msg_size), GNUNET_a2s (sa, slen), (int) sent,
(sent < 0) ? STRERROR (errno) : "ok");
- if (udpw->cont != NULL)
- udpw->cont (udpw->cont_cls, &udpw->session->target, GNUNET_OK);
+ /* This was just a message fragment */
+ if (udpw->frag != NULL)
+ {
+ GNUNET_FRAGMENT_context_transmission_done (udpw->frag->frag);
+ }
+ /* This was a complete message*/
+ else
+ {
+ if (udpw->cont != NULL)
+ udpw->cont (udpw->cont_cls, &udpw->session->target, GNUNET_OK);
+ }
GNUNET_CONTAINER_DLL_remove(plugin->msg_head, plugin->msg_tail, udpw);
GNUNET_free (udpw);
plugin->sessions = GNUNET_CONTAINER_multihashmap_create (10);
plugin->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, plugin);
plugin->port = port;
plugin->aport = aport;
GNUNET_NETWORK_fdset_destroy (plugin->ws);
GNUNET_NAT_unregister (plugin->nat);
+ if (plugin->defrags != NULL)
+ {
+ GNUNET_CONTAINER_heap_destroy(plugin->defrags);
+ plugin->defrags = NULL;
+ }
if (plugin->mst != NULL)