struct GNUNET_ATS_Information ats;
- struct FragmentationContext * head;
- struct FragmentationContext * tail;
+ struct FragmentationContext * frag_ctx;
+
+// struct FragmentationContext * head;
+// struct FragmentationContext * tail;
};
* Length of 'src_addr'
*/
size_t addr_len;
-
- struct GNUNET_PeerIdentity id;
-
};
*/
void *cont_cls;
- struct FragmentationContext *frag;
+ struct FragmentationContext *frag_ctx;
};
{
struct Plugin *plugin = cls;
struct Session *s = value;
+ struct UDPMessageWrapper *udpw;
#if DEBUG_UDP
LOG (GNUNET_ERROR_TYPE_DEBUG,
GNUNET_i2s (&s->target),
GNUNET_a2s (s->sock_addr, s->addrlen));
#endif
- struct FragmentationContext *fctx = s->head;
- while (fctx != NULL)
+ plugin->env->session_end (plugin->env->cls, &s->target, s);
+
+ while (s->frag_ctx != NULL)
{
- GNUNET_FRAGMENT_context_destroy(fctx->frag);
- GNUNET_CONTAINER_DLL_remove(s->head, s->tail, fctx);
- GNUNET_free (fctx);
- fctx = s->head;
+ GNUNET_FRAGMENT_context_destroy(s->frag_ctx->frag);
+ GNUNET_free (s->frag_ctx);
+ s->frag_ctx = NULL;
}
- plugin->env->session_end (plugin->env->cls, &s->target, s);
+ udpw = plugin->msg_head;
+ while (udpw != NULL)
+ {
+ if (udpw->session == s)
+ {
+ GNUNET_CONTAINER_DLL_remove(plugin->msg_head, plugin->msg_tail, udpw);
+
+ if (udpw->cont != NULL)
+ udpw->cont (udpw->cont_cls, &s->target, GNUNET_SYSERR);
+ GNUNET_free (udpw);
+ }
+ udpw = plugin->msg_head;
+ }
GNUNET_assert (GNUNET_YES ==
GNUNET_CONTAINER_multihashmap_remove (plugin->sessions,
&s->target.hashPubKey,
s));
+
GNUNET_free (s);
return GNUNET_OK;
}
/* Clean up sessions */
GNUNET_CONTAINER_multihashmap_get_multiple (plugin->sessions, &target->hashPubKey, &disconnect_and_free_it, plugin);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "FREEED SESSIONS from peer `%s'\n", GNUNET_i2s (target));
}
static struct Session *
socklen_t s_addrlen = s->addrlen;
-#if VERBOSE
+#if VERBOSE_UDP
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Comparing address %s <-> %s\n",
udp_address_to_string (NULL, (void *) address->address, address->address_length),
GNUNET_a2s (s->sock_addr, s->addrlen));
struct SessionCompareContext cctx;
cctx.addr = address;
cctx.res = NULL;
-#if DEBUG_UDP
+#if VERBOSE_UDP
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking for existing session for peer `%s' `%s' \n", GNUNET_i2s (&address->peer), udp_address_to_string(NULL, address->address, address->address_length));
#endif
GNUNET_CONTAINER_multihashmap_get_multiple(plugin->sessions, &address->peer.hashPubKey, session_cmp_it, &cctx);
if (cctx.res != NULL)
{
-#if DEBUG_UDP
+#if VERBOSE_UDP
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found existing session %p\n", cctx.res);
#endif
return cctx.res;
address->address,
address->address_length,
NULL, NULL);
-#if DEBUG_UDP
+#if VERBOSE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Creating new session %p for peer `%s' address `%s'\n",
s,
size_t msg_len = ntohs (msg->size);
-#if DEBUG_UDP
+#if VERBOSE_UDP
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Enqueuing fragment with %u bytes %u\n", msg_len , sizeof (struct UDPMessageWrapper));
#endif
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Enqueueing fragment with %u bytes %u\n", msg_len , sizeof (struct UDPMessageWrapper));
-
udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msg_len);
udpw->session = frag_ctx->session;
udpw->cont = frag_ctx->cont;
udpw->cont_cls = frag_ctx->cont_cls;
udpw->timeout = frag_ctx->timeout;
- udpw->frag = frag_ctx;
+ udpw->frag_ctx = frag_ctx;
memcpy (udpw->udp, msg, msg_len);
GNUNET_CONTAINER_DLL_insert(plugin->msg_head, plugin->msg_tail, udpw);
return GNUNET_SYSERR;
}
- LOG (GNUNET_ERROR_TYPE_ERROR,
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
"UDP transmits %u-byte message to `%s' using address `%s'\n",
msgbuf_size,
GNUNET_i2s (&s->target),
udpw->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to);
udpw->cont = cont;
udpw->cont_cls = cont_cls;
- udpw->frag = NULL;
+ udpw->frag_ctx = NULL;
memcpy (udpw->udp, udp, sizeof (struct UDPMessage));
memcpy (&udpw->udp[sizeof (struct UDPMessage)], msgbuf, msgbuf_size);
}
else
{
- LOG (GNUNET_ERROR_TYPE_ERROR,
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
"UDP has to fragment message \n");
+ if (s->frag_ctx != NULL)
+ return GNUNET_SYSERR;
memcpy (&udp[1], msgbuf, msgbuf_size);
struct FragmentationContext * frag_ctx = GNUNET_malloc(sizeof (struct FragmentationContext));
&enqueue_fragment,
frag_ctx);
- GNUNET_CONTAINER_DLL_insert(s->head, s->tail, frag_ctx);
+ s->frag_ctx = frag_ctx;
}
&si->sender,
hdr,
(const struct GNUNET_ATS_Information *) &ats, 2,
- si->session,
+ NULL,
si->arg,
si->args);
si->session->flow_delay_for_other_peer = delay;
GNUNET_break (0);
return;
}
+ LOG (GNUNET_ERROR_TYPE_ERROR, "Sending fragment_msg_proc ms\n");
process_udp_message (rc->plugin, (const struct UDPMessage *) msg,
rc->src_addr, rc->addr_len);
}
struct LookupContext
{
- struct DefragContext *rc;
+ const struct sockaddr * addr;
+ size_t addrlen;
+
struct Session *res;
};
struct LookupContext *l_ctx = cls;
struct Session * s = value;
- if ((s->addrlen == l_ctx->rc->addr_len) &&
- (0 == memcmp (s->sock_addr, l_ctx->rc->src_addr, s->addrlen)))
+ if ((s->addrlen == l_ctx->addrlen) &&
+ (0 == memcmp (s->sock_addr, l_ctx->addr, s->addrlen)))
{
- LOG (GNUNET_ERROR_TYPE_ERROR,
- "YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY \n");
l_ctx->res = s;
return GNUNET_NO;
}
ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg)
{
struct DefragContext *rc = cls;
+ LOG (GNUNET_ERROR_TYPE_ERROR, "Sending ACK ms\n");
size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size);
struct UDP_ACK_Message *udp_ack;
struct Session *s;
struct LookupContext l_ctx;
- l_ctx.rc = rc;
+ l_ctx.addr = rc->src_addr;
+ l_ctx.addrlen = rc->addr_len;
l_ctx.res = NULL;
- GNUNET_CONTAINER_multihashmap_get_multiple(rc->plugin->sessions,
- &rc->id.hashPubKey,
+ GNUNET_CONTAINER_multihashmap_iterate (rc->plugin->sessions,
&lookup_session_by_addr_it,
&l_ctx);
s = l_ctx.res;
- if (s != NULL)
- {
- if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX)
- delay = s->flow_delay_for_other_peer.rel_value;
- else
- delay = UINT32_MAX;
- }
+ GNUNET_assert (s != NULL);
+
+ if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX)
+ delay = s->flow_delay_for_other_peer.rel_value;
#if DEBUG_UDP
LOG (GNUNET_ERROR_TYPE_DEBUG,
udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msize);
udpw->cont = NULL;
udpw->cont_cls = NULL;
- udpw->frag = NULL;
+ udpw->frag_ctx = NULL;
udpw->msg_size = msize;
udpw->session = s;
udpw->timeout = GNUNET_TIME_absolute_get_forever();
return;
}
-static void read_process_ack ()
+static void read_process_ack (struct Plugin *plugin,
+ const struct GNUNET_MessageHeader *msg,
+ char *addr,
+ socklen_t fromlen)
{
- //const struct GNUNET_MessageHeader *ack;
- //struct Session *peer_session;
- //const struct UDP_ACK_Message *udp_ack;
- //struct Session *s = NULL;
- //struct GNUNET_TIME_Relative flow_delay;
- //struct GNUNET_ATS_Information ats;
- GNUNET_break_op (0);
+ const struct GNUNET_MessageHeader *ack;
+ const struct UDP_ACK_Message *udp_ack;
+ struct LookupContext l_ctx;
+ struct Session *s = NULL;
+ struct GNUNET_TIME_Relative flow_delay;
+
+ if (ntohs (msg->size) <
+ sizeof (struct UDP_ACK_Message) + sizeof (struct GNUNET_MessageHeader))
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+
+ udp_ack = (const struct UDP_ACK_Message *) msg;
+
+ l_ctx.addr = (const struct sockaddr *) addr;
+ l_ctx.addrlen = fromlen;
+ l_ctx.res = NULL;
+ GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions,
+ &lookup_session_by_addr_it,
+ &l_ctx);
+ s = l_ctx.res;
+ GNUNET_assert (s != NULL);
+
+ if (s != NULL)
+ {
+ flow_delay.rel_value = (uint64_t) ntohl (udp_ack->delay);
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "We received a sending delay of %llu\n",
+ flow_delay.rel_value);
+
+ s->flow_delay_from_other_peer =
+ GNUNET_TIME_relative_to_absolute (flow_delay);
+ }
+
+ ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
+ if (ntohs (ack->size) !=
+ ntohs (msg->size) - sizeof (struct UDP_ACK_Message))
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+
+ if (GNUNET_OK != GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag, ack))
+ {
+#if DEBUG_UDP
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
+ (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
+ GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
+#endif
+ return;
+ }
+
+#if DEBUG_UDP
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "FULL MESSAGE ACKed\n",
+ (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
+ GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
+#endif
+ plugin->last_expected_delay = GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag);
+
+ if (s->frag_ctx->cont != NULL)
+ s->frag_ctx->cont
+ (s->frag_ctx->cont_cls, &udp_ack->sender, GNUNET_OK);
+ GNUNET_free (s->frag_ctx);
+ s->frag_ctx = NULL;
+ return;
}
static void read_process_fragment (struct Plugin *plugin,
GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs, d_ctx,
(GNUNET_CONTAINER_HeapCostType)
now.abs_value);
+#if DEBUG_UDP
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Created new defragmentation context for %u-byte fragment from `%s'\n",
+ (unsigned int) ntohs (msg->size),
+ GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
+#endif
+ }
+ else
+ {
+#if DEBUG_UDP
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Found existing defragmentation context for %u-byte fragment from `%s'\n",
+ (unsigned int) ntohs (msg->size),
+ GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
+#endif
}
if (GNUNET_OK == GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg))
return;
case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
- read_process_ack ();
+ read_process_ack (plugin, msg, addr, fromlen);;
return;
case GNUNET_MESSAGE_TYPE_FRAGMENT:
if (udpw->cont != NULL)
udpw->cont (udpw->cont_cls, &udpw->session->target, GNUNET_SYSERR);
- if (udpw->frag != NULL)
+ if (udpw->frag_ctx != NULL)
{
#if DEBUG_UDP
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Fragmendted message for peer `%s' with size %u timed out\n",
- GNUNET_i2s(&udpw->session->target), udpw->frag->bytes_to_send);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Fragmented message for peer `%s' with size %u timed out\n",
+ GNUNET_i2s(&udpw->session->target), udpw->frag_ctx->bytes_to_send);
#endif
- GNUNET_FRAGMENT_context_destroy(udpw->frag->frag);
+ GNUNET_FRAGMENT_context_destroy(udpw->frag_ctx->frag);
}
else
{
if (GNUNET_SYSERR == sent)
{
GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "sendto");
- LOG (GNUNET_ERROR_TYPE_ERROR,
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
"UDP transmitted %u-byte message to %s (%d: %s)\n",
- (unsigned int) ntohs (udpw->msg_size), GNUNET_a2s (sa, slen), (int) sent,
+ (unsigned int) (udpw->msg_size), GNUNET_a2s (sa, slen), (int) sent,
(sent < 0) ? STRERROR (errno) : "ok");
if (udpw->cont != NULL)
udpw->cont (udpw->cont_cls, &udpw->session->target, GNUNET_SYSERR);
}
LOG (GNUNET_ERROR_TYPE_DEBUG,
"UDP transmitted %u-byte message to %s (%d: %s)\n",
- (unsigned int) ntohs (udpw->msg_size), GNUNET_a2s (sa, slen), (int) sent,
+ (unsigned int) (udpw->msg_size), GNUNET_a2s (sa, slen), (int) sent,
(sent < 0) ? STRERROR (errno) : "ok");
/* This was just a message fragment */
- if (udpw->frag != NULL)
+ if (udpw->frag_ctx != NULL)
{
- GNUNET_FRAGMENT_context_transmission_done (udpw->frag->frag);
+ GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);
}
/* This was a complete message*/
else
{
struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
struct Plugin *plugin = api->cls;
-
+GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "libgnunet_plugin_transport_udp_done\n ");
stop_broadcast (plugin);
if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)