* @author Christian Grothoff
*
* TODO:
- * - Need to defer forwarding messages until after CONNECT message
* - CHECK that 'address' being NULL in 'struct ForeignAddressList' is
* tolerated in the code everywhere (could not happen before)
*
*/
struct MessageQueue *messages_tail;
+ /**
+ * Buffer for at most one payload message used when we receive
+ * payload data before our PING-PONG has succeeded. We then
+ * store such messages in this intermediary buffer until the
+ * connection is fully up.
+ */
+ struct GNUNET_MessageHeader *pre_connect_message_buffer;
+
/**
* Context for peerinfo iteration.
* NULL after we are done processing peerinfo's information.
}
+
+
+/**
+ * Function that will be called if we receive some payload
+ * from another peer.
+ *
+ * @param message the payload
+ * @param n peer who claimed to be the sender
+ */
+static void
+handle_payload_message (const struct GNUNET_MessageHeader *message,
+ struct NeighbourList *n)
+{
+ struct InboundMessage *im;
+ struct TransportClient *cpos;
+ uint16_t msize;
+
+ msize = ntohs (message->size);
+ if (n->received_pong == GNUNET_NO)
+ {
+ GNUNET_free_non_null (n->pre_connect_message_buffer);
+ n->pre_connect_message_buffer = GNUNET_malloc (msize);
+ memcpy (n->pre_connect_message_buffer, message, msize);
+ return;
+ }
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received message of type %u from `%4s', sending to all clients.\n",
+ ntohs (message->type),
+ GNUNET_i2s (&n->id));
+#endif
+ if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker,
+ (ssize_t) msize))
+ {
+ n->quota_violation_count++;
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
+ n->in_tracker.available_bytes_per_s__,
+ n->quota_violation_count);
+#endif
+ /* Discount 32k per violation */
+ GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker,
+ - 32 * 1024);
+ }
+ else
+ {
+ if (n->quota_violation_count > 0)
+ {
+ /* try to add 32k back */
+ GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker,
+ 32 * 1024);
+ n->quota_violation_count--;
+ }
+ }
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# payload received from other peers"),
+ msize,
+ GNUNET_NO);
+ /* transmit message to all clients */
+ im = GNUNET_malloc (sizeof (struct InboundMessage) + msize);
+ im->header.size = htons (sizeof (struct InboundMessage) + msize);
+ im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
+ im->latency = GNUNET_TIME_relative_hton (n->latency);
+ im->peer = n->id;
+ memcpy (&im[1], message, msize);
+ cpos = clients;
+ while (cpos != NULL)
+ {
+ transmit_to_client (cpos, &im->header, GNUNET_YES);
+ cpos = cpos->next;
+ }
+ GNUNET_free (im);
+}
+
+
/**
* Iterator over hash map entries. Checks if the given validation
* entry is for the same challenge as what is given in the PONG.
struct GNUNET_PeerIdentity target;
struct NeighbourList *n;
struct ForeignAddressList *fal;
+ struct GNUNET_MessageHeader *prem;
if (ve->challenge != challenge)
return GNUNET_YES;
{
n->received_pong = GNUNET_YES;
notify_clients_connect (&target, n->latency, n->distance);
+ if (NULL != (prem = n->pre_connect_message_buffer))
+ {
+ n->pre_connect_message_buffer = NULL;
+ handle_payload_message (prem, n);
+ GNUNET_free (prem);
+ }
}
if (n->retry_task != GNUNET_SCHEDULER_NO_TASK)
{
gettext_noop ("# active neighbours"),
-1,
GNUNET_NO);
+ GNUNET_free_non_null (n->pre_connect_message_buffer);
GNUNET_free (n);
}
{
struct TransportPlugin *plugin = cls;
struct ReadyList *service_context;
- struct TransportClient *cpos;
- struct InboundMessage *im;
struct ForeignAddressList *peer_address;
uint16_t msize;
struct NeighbourList *n;
handle_pong (plugin, message, peer, sender_address, sender_address_len);
break;
default:
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received message of type %u from `%4s', sending to all clients.\n",
- ntohs (message->type), GNUNET_i2s (peer));
-#endif
- if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker,
- (ssize_t) msize))
- {
- n->quota_violation_count++;
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
- n->in_tracker.available_bytes_per_s__,
- n->quota_violation_count);
-#endif
- /* Discount 32k per violation */
- GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker,
- - 32 * 1024);
- }
- else
- {
- if (n->quota_violation_count > 0)
- {
- /* try to add 32k back */
- GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker,
- 32 * 1024);
- n->quota_violation_count--;
- }
- }
- GNUNET_STATISTICS_update (stats,
- gettext_noop ("# payload received from other peers"),
- msize,
- GNUNET_NO);
- /* transmit message to all clients */
- im = GNUNET_malloc (sizeof (struct InboundMessage) + msize);
- im->header.size = htons (sizeof (struct InboundMessage) + msize);
- im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
- im->latency = GNUNET_TIME_relative_hton (n->latency);
- im->peer = *peer;
- memcpy (&im[1], message, msize);
- cpos = clients;
- while (cpos != NULL)
- {
- transmit_to_client (cpos, &im->header, GNUNET_YES);
- cpos = cpos->next;
- }
- GNUNET_free (im);
+ handle_payload_message (message, n);
+ break;
}
}
ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 0);