/**
* Where to store the result.
*/
- struct ReceiveContext *rc;
+ struct DefragContext *rc;
/**
* Address to find.
* Data structure to track defragmentation contexts based
* on the source of the UDP traffic.
*/
-struct ReceiveContext
+struct DefragContext
{
/**
void *element, GNUNET_CONTAINER_HeapCostType cost)
{
struct FindReceiveContext *frc = cls;
- struct ReceiveContext *e = element;
+ struct DefragContext *e = element;
if ((frc->addr_len == e->addr_len) &&
(0 == memcmp (frc->addr, e->src_addr, frc->addr_len)))
static void
fragment_msg_proc (void *cls, const struct GNUNET_MessageHeader *msg)
{
- struct ReceiveContext *rc = cls;
+ struct DefragContext *rc = cls;
if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
{
ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg)
{
#if 0
- struct ReceiveContext *rc = cls;
+ struct DefragContext *rc = cls;
size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size);
char buf[msize];
}
+static void read_process_msg (struct Plugin *plugin,
+ const struct GNUNET_MessageHeader *msg,
+ char *addr,
+ socklen_t fromlen)
+{
+ if (ntohs (msg->size) < sizeof (struct UDPMessage))
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+ process_udp_message (plugin, (const struct UDPMessage *) msg,
+ (const struct sockaddr *) addr, fromlen);
+ return;
+}
+
+static void read_process_ack ()
+{
+ //const struct GNUNET_MessageHeader *ack;
+ //struct Session *peer_session;
+ //const struct UDP_ACK_Message *udp_ack;
+ //struct Session *s = NULL;
+ //struct GNUNET_TIME_Relative flow_delay;
+ //struct GNUNET_ATS_Information ats;
+ GNUNET_break_op (0);
+}
+
+static void read_process_fragment (struct Plugin *plugin,
+ const struct GNUNET_MessageHeader *msg,
+ char *addr,
+ socklen_t fromlen)
+{
+ struct DefragContext *d_ctx;
+ struct GNUNET_TIME_Absolute now;
+ struct FindReceiveContext frc;
+
+
+ frc.rc = NULL;
+ frc.addr = (const struct sockaddr *) addr;
+ frc.addr_len = fromlen;
+
+#if DEBUG_UDP
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "UDP processes %u-byte fragment from `%s'\n",
+ (unsigned int) ntohs (msg->size),
+ GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
+#endif
+
+ /* Lookup existing receive context for this address */
+ GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
+ &find_receive_context,
+ &frc);
+ now = GNUNET_TIME_absolute_get ();
+ d_ctx = frc.rc;
+
+ if (d_ctx == NULL)
+ {
+ /* Create a new defragmentation context */
+ d_ctx = GNUNET_malloc (sizeof (struct DefragContext) + fromlen);
+ memcpy (&d_ctx[1], addr, fromlen);
+ d_ctx->src_addr = (const struct sockaddr *) &d_ctx[1];
+ d_ctx->addr_len = fromlen;
+ d_ctx->plugin = plugin;
+ d_ctx->defrag =
+ GNUNET_DEFRAGMENT_context_create (plugin->env->stats, UDP_MTU,
+ UDP_MAX_MESSAGES_IN_DEFRAG, d_ctx,
+ &fragment_msg_proc, &ack_proc);
+ d_ctx->hnode =
+ GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs, d_ctx,
+ (GNUNET_CONTAINER_HeapCostType)
+ now.abs_value);
+ }
+
+ if (GNUNET_OK == GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg))
+ {
+ /* keep this 'rc' from expiring */
+ GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs, d_ctx->hnode,
+ (GNUNET_CONTAINER_HeapCostType)
+ now.abs_value);
+ }
+ if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
+ UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
+ {
+ /* remove 'rc' that was inactive the longest */
+ d_ctx = GNUNET_CONTAINER_heap_remove_root (plugin->defrag_ctxs);
+ GNUNET_assert (NULL != d_ctx);
+ GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
+ GNUNET_free (d_ctx);
+ }
+}
+
/**
* Read and process a message from the given socket.
*
char buf[65536];
ssize_t size;
const struct GNUNET_MessageHeader *msg;
- //const struct GNUNET_MessageHeader *ack;
- //struct Session *peer_session;
- //const struct UDP_ACK_Message *udp_ack;
- struct ReceiveContext *rc;
- struct GNUNET_TIME_Absolute now;
- struct FindReceiveContext frc;
- //struct Session *s = NULL;
- //struct GNUNET_TIME_Relative flow_delay;
- //struct GNUNET_ATS_Information ats;
fromlen = sizeof (addr);
memset (&addr, 0, sizeof (addr));
size = GNUNET_NETWORK_socket_recvfrom (rsock, buf, sizeof (buf),
(struct sockaddr *) &addr, &fromlen);
+
if (size < sizeof (struct GNUNET_MessageHeader))
{
GNUNET_break_op (0);
GNUNET_break_op (0);
return;
}
+
switch (ntohs (msg->type))
{
case GNUNET_MESSAGE_TYPE_TRANSPORT_BROADCAST_BEACON:
- {
- udp_broadcast_receive(plugin, &buf, size, addr, fromlen);
+ udp_broadcast_receive (plugin, &buf, size, addr, fromlen);
return;
- }
+
case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE:
- if (ntohs (msg->size) < sizeof (struct UDPMessage))
- {
- GNUNET_break_op (0);
- return;
- }
- process_udp_message (plugin, (const struct UDPMessage *) msg,
- (const struct sockaddr *) addr, fromlen);
+ read_process_msg (plugin, msg, addr, fromlen);
return;
+
case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
- if (ntohs (msg->size) <
- sizeof (struct UDP_ACK_Message) + sizeof (struct GNUNET_MessageHeader))
- {
- GNUNET_break_op (0);
- return;
- }
- /* TODO */
- GNUNET_break_op (0);
+ read_process_ack ();
return;
- case GNUNET_MESSAGE_TYPE_FRAGMENT:
- frc.rc = NULL;
- frc.addr = (const struct sockaddr *) addr;
- frc.addr_len = fromlen;
- GNUNET_CONTAINER_heap_iterate (plugin->defrags,
- &find_receive_context,
- &frc);
- now = GNUNET_TIME_absolute_get ();
- rc = frc.rc;
- if (rc == NULL)
- {
- /* need to create a new RC */
- rc = GNUNET_malloc (sizeof (struct ReceiveContext) + fromlen);
- memcpy (&rc[1], addr, fromlen);
- rc->src_addr = (const struct sockaddr *) &rc[1];
- rc->addr_len = fromlen;
- rc->plugin = plugin;
- rc->defrag =
- GNUNET_DEFRAGMENT_context_create (plugin->env->stats, UDP_MTU,
- UDP_MAX_MESSAGES_IN_DEFRAG, rc,
- &fragment_msg_proc, &ack_proc);
- rc->hnode =
- GNUNET_CONTAINER_heap_insert (plugin->defrags, rc,
- (GNUNET_CONTAINER_HeapCostType)
- now.abs_value);
- }
-#if DEBUG_UDP
- LOG (GNUNET_ERROR_TYPE_DEBUG, "UDP processes %u-byte fragment from `%s'\n",
- (unsigned int) ntohs (msg->size),
- GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
-#endif
-
- if (GNUNET_OK == GNUNET_DEFRAGMENT_process_fragment (rc->defrag, msg))
- {
- /* keep this 'rc' from expiring */
- GNUNET_CONTAINER_heap_update_cost (plugin->defrags, rc->hnode,
- (GNUNET_CONTAINER_HeapCostType)
- now.abs_value);
- }
- if (GNUNET_CONTAINER_heap_get_size (plugin->defrags) >
- UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
- {
- /* remove 'rc' that was inactive the longest */
- rc = GNUNET_CONTAINER_heap_remove_root (plugin->defrags);
- GNUNET_assert (NULL != rc);
- GNUNET_DEFRAGMENT_context_destroy (rc->defrag);
- GNUNET_free (rc);
- }
+ case GNUNET_MESSAGE_TYPE_FRAGMENT:
+ read_process_fragment (plugin, msg, addr, fromlen);
return;
+
default:
GNUNET_break_op (0);
return;
plugin->sessions = GNUNET_CONTAINER_multihashmap_create (10);
- plugin->defrags = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
+ plugin->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
plugin->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, plugin);
plugin->port = port;
plugin->aport = aport;
GNUNET_NETWORK_fdset_destroy (plugin->ws);
GNUNET_NAT_unregister (plugin->nat);
- if (plugin->defrags != NULL)
+ if (plugin->defrag_ctxs != NULL)
{
- GNUNET_CONTAINER_heap_destroy(plugin->defrags);
- plugin->defrags = NULL;
+ GNUNET_CONTAINER_heap_destroy(plugin->defrag_ctxs);
+ plugin->defrag_ctxs = NULL;
}
if (plugin->mst != NULL)
{