WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Affero General Public License for more details.
-
+
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
SPDX-License-Identifier: AGPL3.0-or-later
-*/
+ */
/**
* @file src/fragmentation/fragmentation.c
* @brief library to help fragment messages
/**
* Absolute minimum delay we impose between sending and expecting ACK to arrive.
*/
-#define MIN_ACK_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 1)
+#define MIN_ACK_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 1)
/**
* Fragmentation context.
*/
-struct GNUNET_FRAGMENT_Context
-{
+struct GNUNET_FRAGMENT_Context {
/**
* Statistics to use.
*/
* Target fragment size.
*/
uint16_t mtu;
-
};
* @return ack in human-readable format
*/
const char *
-GNUNET_FRAGMENT_print_ack (const struct GNUNET_MessageHeader *ack)
+GNUNET_FRAGMENT_print_ack(const struct GNUNET_MessageHeader *ack)
{
static char buf[128];
const struct FragmentAcknowledgement *fa;
- if (sizeof (struct FragmentAcknowledgement) !=
- htons (ack->size))
+ if (sizeof(struct FragmentAcknowledgement) !=
+ htons(ack->size))
return "<malformed ack>";
- fa = (const struct FragmentAcknowledgement *) ack;
- GNUNET_snprintf (buf,
- sizeof (buf),
- "%u-%llX",
- ntohl (fa->fragment_id),
- GNUNET_ntohll (fa->bits));
+ fa = (const struct FragmentAcknowledgement *)ack;
+ GNUNET_snprintf(buf,
+ sizeof(buf),
+ "%u-%llX",
+ ntohl(fa->fragment_id),
+ GNUNET_ntohll(fa->bits));
return buf;
}
* @param cls the `struct GNUNET_FRAGMENT_Context`
*/
static void
-transmit_next (void *cls)
+transmit_next(void *cls)
{
struct GNUNET_FRAGMENT_Context *fc = cls;
char msg[fc->mtu];
int wrap;
fc->task = NULL;
- GNUNET_assert (GNUNET_NO == fc->proc_busy);
+ GNUNET_assert(GNUNET_NO == fc->proc_busy);
if (0 == fc->acks)
return; /* all done */
/* calculate delay */
wrap = 0;
while (0 == (fc->acks & (1LLU << fc->next_transmission)))
- {
- fc->next_transmission = (fc->next_transmission + 1) % 64;
- wrap |= (0 == fc->next_transmission);
- }
+ {
+ fc->next_transmission = (fc->next_transmission + 1) % 64;
+ wrap |= (0 == fc->next_transmission);
+ }
bit = fc->next_transmission;
- size = ntohs (fc->msg->size);
- if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
+ size = ntohs(fc->msg->size);
+ if (bit == size / (fc->mtu - sizeof(struct FragmentHeader)))
fsize =
- (size % (fc->mtu - sizeof (struct FragmentHeader))) +
- sizeof (struct FragmentHeader);
+ (size % (fc->mtu - sizeof(struct FragmentHeader))) +
+ sizeof(struct FragmentHeader);
else
fsize = fc->mtu;
if (NULL != fc->tracker)
- delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
- fsize);
+ delay = GNUNET_BANDWIDTH_tracker_get_delay(fc->tracker,
+ fsize);
else
delay = GNUNET_TIME_UNIT_ZERO;
if (delay.rel_value_us > 0)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Fragmentation logic delays transmission of next fragment by %s\n",
- GNUNET_STRINGS_relative_time_to_string (delay,
+ {
+ GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
+ "Fragmentation logic delays transmission of next fragment by %s\n",
+ GNUNET_STRINGS_relative_time_to_string(delay,
GNUNET_YES));
- fc->task = GNUNET_SCHEDULER_add_delayed (delay,
- &transmit_next,
- fc);
- return;
- }
+ fc->task = GNUNET_SCHEDULER_add_delayed(delay,
+ &transmit_next,
+ fc);
+ return;
+ }
fc->next_transmission = (fc->next_transmission + 1) % 64;
wrap |= (0 == fc->next_transmission);
while (0 == (fc->acks & (1LLU << fc->next_transmission)))
- {
- fc->next_transmission = (fc->next_transmission + 1) % 64;
- wrap |= (0 == fc->next_transmission);
- }
+ {
+ fc->next_transmission = (fc->next_transmission + 1) % 64;
+ wrap |= (0 == fc->next_transmission);
+ }
/* assemble fragmentation message */
- mbuf = (const char *) &fc[1];
- fh = (struct FragmentHeader *) msg;
- fh->header.size = htons (fsize);
- fh->header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT);
- fh->fragment_id = htonl (fc->fragment_id);
+ mbuf = (const char *)&fc[1];
+ fh = (struct FragmentHeader *)msg;
+ fh->header.size = htons(fsize);
+ fh->header.type = htons(GNUNET_MESSAGE_TYPE_FRAGMENT);
+ fh->fragment_id = htonl(fc->fragment_id);
fh->total_size = fc->msg->size; /* already in big-endian */
- fh->offset = htons ((fc->mtu - sizeof (struct FragmentHeader)) * bit);
- GNUNET_memcpy (&fh[1], &mbuf[bit * (fc->mtu - sizeof (struct FragmentHeader))],
- fsize - sizeof (struct FragmentHeader));
+ fh->offset = htons((fc->mtu - sizeof(struct FragmentHeader)) * bit);
+ GNUNET_memcpy(&fh[1], &mbuf[bit * (fc->mtu - sizeof(struct FragmentHeader))],
+ fsize - sizeof(struct FragmentHeader));
if (NULL != fc->tracker)
- GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize);
- GNUNET_STATISTICS_update (fc->stats,
- _("# fragments transmitted"),
- 1,
- GNUNET_NO);
+ GNUNET_BANDWIDTH_tracker_consume(fc->tracker, fsize);
+ GNUNET_STATISTICS_update(fc->stats,
+ _("# fragments transmitted"),
+ 1,
+ GNUNET_NO);
if (0 != fc->last_round.abs_value_us)
- GNUNET_STATISTICS_update (fc->stats,
- _("# fragments retransmitted"),
- 1,
- GNUNET_NO);
+ GNUNET_STATISTICS_update(fc->stats,
+ _("# fragments retransmitted"),
+ 1,
+ GNUNET_NO);
/* select next message to calculate delay */
bit = fc->next_transmission;
- size = ntohs (fc->msg->size);
- if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
- fsize = size % (fc->mtu - sizeof (struct FragmentHeader));
+ size = ntohs(fc->msg->size);
+ if (bit == size / (fc->mtu - sizeof(struct FragmentHeader)))
+ fsize = size % (fc->mtu - sizeof(struct FragmentHeader));
else
fsize = fc->mtu;
if (NULL != fc->tracker)
- delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
- fsize);
+ delay = GNUNET_BANDWIDTH_tracker_get_delay(fc->tracker,
+ fsize);
else
delay = GNUNET_TIME_UNIT_ZERO;
if (fc->num_rounds < 64)
- delay = GNUNET_TIME_relative_max (delay,
- GNUNET_TIME_relative_saturating_multiply
- (fc->msg_delay,
+ delay = GNUNET_TIME_relative_max(delay,
+ GNUNET_TIME_relative_saturating_multiply
+ (fc->msg_delay,
(1ULL << fc->num_rounds)));
else
delay = GNUNET_TIME_UNIT_FOREVER_REL;
if (wrap)
- {
- /* full round transmitted wait 2x delay for ACK before going again */
- fc->num_rounds++;
- delay = GNUNET_TIME_relative_saturating_multiply (fc->ack_delay, 2);
- /* never use zero, need some time for ACK always */
- delay = GNUNET_TIME_relative_max (MIN_ACK_DELAY, delay);
- fc->wack = GNUNET_YES;
- fc->last_round = GNUNET_TIME_absolute_get ();
- GNUNET_STATISTICS_update (fc->stats,
- _("# fragments wrap arounds"),
- 1,
- GNUNET_NO);
- }
+ {
+ /* full round transmitted wait 2x delay for ACK before going again */
+ fc->num_rounds++;
+ delay = GNUNET_TIME_relative_saturating_multiply(fc->ack_delay, 2);
+ /* never use zero, need some time for ACK always */
+ delay = GNUNET_TIME_relative_max(MIN_ACK_DELAY, delay);
+ fc->wack = GNUNET_YES;
+ fc->last_round = GNUNET_TIME_absolute_get();
+ GNUNET_STATISTICS_update(fc->stats,
+ _("# fragments wrap arounds"),
+ 1,
+ GNUNET_NO);
+ }
fc->proc_busy = GNUNET_YES;
- fc->delay_until = GNUNET_TIME_relative_to_absolute (delay);
+ fc->delay_until = GNUNET_TIME_relative_to_absolute(delay);
fc->num_transmissions++;
- fc->proc (fc->proc_cls,
- &fh->header);
+ fc->proc(fc->proc_cls,
+ &fh->header);
}
* @return the fragmentation context
*/
struct GNUNET_FRAGMENT_Context *
-GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
- uint16_t mtu,
- struct GNUNET_BANDWIDTH_Tracker *tracker,
- struct GNUNET_TIME_Relative msg_delay,
- struct GNUNET_TIME_Relative ack_delay,
- const struct GNUNET_MessageHeader *msg,
- GNUNET_FRAGMENT_MessageProcessor proc,
- void *proc_cls)
+GNUNET_FRAGMENT_context_create(struct GNUNET_STATISTICS_Handle *stats,
+ uint16_t mtu,
+ struct GNUNET_BANDWIDTH_Tracker *tracker,
+ struct GNUNET_TIME_Relative msg_delay,
+ struct GNUNET_TIME_Relative ack_delay,
+ const struct GNUNET_MessageHeader *msg,
+ GNUNET_FRAGMENT_MessageProcessor proc,
+ void *proc_cls)
{
struct GNUNET_FRAGMENT_Context *fc;
size_t size;
uint64_t bits;
- GNUNET_STATISTICS_update (stats,
- _("# messages fragmented"),
- 1,
- GNUNET_NO);
- GNUNET_assert (mtu >= 1024 + sizeof (struct FragmentHeader));
- size = ntohs (msg->size);
- GNUNET_STATISTICS_update (stats,
- _("# total size of fragmented messages"),
- size, GNUNET_NO);
- GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
- fc = GNUNET_malloc (sizeof (struct GNUNET_FRAGMENT_Context) + size);
+ GNUNET_STATISTICS_update(stats,
+ _("# messages fragmented"),
+ 1,
+ GNUNET_NO);
+ GNUNET_assert(mtu >= 1024 + sizeof(struct FragmentHeader));
+ size = ntohs(msg->size);
+ GNUNET_STATISTICS_update(stats,
+ _("# total size of fragmented messages"),
+ size, GNUNET_NO);
+ GNUNET_assert(size >= sizeof(struct GNUNET_MessageHeader));
+ fc = GNUNET_malloc(sizeof(struct GNUNET_FRAGMENT_Context) + size);
fc->stats = stats;
fc->mtu = mtu;
fc->tracker = tracker;
fc->ack_delay = ack_delay;
fc->msg_delay = msg_delay;
- fc->msg = (const struct GNUNET_MessageHeader *) &fc[1];
+ fc->msg = (const struct GNUNET_MessageHeader *)&fc[1];
fc->proc = proc;
fc->proc_cls = proc_cls;
fc->fragment_id =
- GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
- UINT32_MAX);
- GNUNET_memcpy (&fc[1], msg, size);
+ GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK,
+ UINT32_MAX);
+ GNUNET_memcpy(&fc[1], msg, size);
bits =
- (size + mtu - sizeof (struct FragmentHeader) - 1) / (mtu -
- sizeof (struct
- FragmentHeader));
- GNUNET_assert (bits <= 64);
+ (size + mtu - sizeof(struct FragmentHeader) - 1) / (mtu -
+ sizeof(struct
+ FragmentHeader));
+ GNUNET_assert(bits <= 64);
if (bits == 64)
fc->acks_mask = UINT64_MAX; /* set all 64 bit */
else
fc->acks_mask = (1LLU << bits) - 1; /* set lowest 'bits' bit */
fc->acks = fc->acks_mask;
- fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, fc);
+ fc->task = GNUNET_SCHEDULER_add_now(&transmit_next, fc);
return fc;
}
* @param fc fragmentation context
*/
void
-GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc)
+GNUNET_FRAGMENT_context_transmission_done(struct GNUNET_FRAGMENT_Context *fc)
{
- GNUNET_assert (fc->proc_busy == GNUNET_YES);
+ GNUNET_assert(fc->proc_busy == GNUNET_YES);
fc->proc_busy = GNUNET_NO;
- GNUNET_assert (fc->task == NULL);
+ GNUNET_assert(fc->task == NULL);
fc->task =
- GNUNET_SCHEDULER_add_at (fc->delay_until,
- &transmit_next,
- fc);
+ GNUNET_SCHEDULER_add_at(fc->delay_until,
+ &transmit_next,
+ fc);
}
* #GNUNET_SYSERR if this ack is not valid for this fc
*/
int
-GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc,
- const struct GNUNET_MessageHeader *msg)
+GNUNET_FRAGMENT_process_ack(struct GNUNET_FRAGMENT_Context *fc,
+ const struct GNUNET_MessageHeader *msg)
{
const struct FragmentAcknowledgement *fa;
uint64_t abits;
unsigned int snd_cnt;
unsigned int i;
- if (sizeof (struct FragmentAcknowledgement) != ntohs (msg->size))
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- fa = (const struct FragmentAcknowledgement *) msg;
- if (ntohl (fa->fragment_id) != fc->fragment_id)
- return GNUNET_SYSERR; /* not our ACK */
- abits = GNUNET_ntohll (fa->bits);
- if ( (GNUNET_YES == fc->wack) &&
- (0 != fc->num_transmissions) )
- {
- /* normal ACK, can update running average of delay... */
- fc->wack = GNUNET_NO;
- ndelay = GNUNET_TIME_absolute_get_duration (fc->last_round);
- fc->ack_delay.rel_value_us =
- (ndelay.rel_value_us / fc->num_transmissions + 3 * fc->ack_delay.rel_value_us) / 4;
- /* calculate ratio msg sent vs. msg acked */
- ack_cnt = 0;
- snd_cnt = 0;
- for (i=0;i<64;i++)
- {
- if (1 == (fc->acks_mask & (1ULL << i)))
- {
- snd_cnt++;
- if (0 == (abits & (1ULL << i)))
- ack_cnt++;
- }
- }
- if (0 == ack_cnt)
+ if (sizeof(struct FragmentAcknowledgement) != ntohs(msg->size))
{
- /* complete loss */
- fc->msg_delay = GNUNET_TIME_relative_saturating_multiply (fc->msg_delay,
- snd_cnt);
+ GNUNET_break_op(0);
+ return GNUNET_SYSERR;
}
- else if (snd_cnt > ack_cnt)
+ fa = (const struct FragmentAcknowledgement *)msg;
+ if (ntohl(fa->fragment_id) != fc->fragment_id)
+ return GNUNET_SYSERR; /* not our ACK */
+ abits = GNUNET_ntohll(fa->bits);
+ if ((GNUNET_YES == fc->wack) &&
+ (0 != fc->num_transmissions))
{
- /* some loss, slow down proportionally */
- fc->msg_delay.rel_value_us = ((fc->msg_delay.rel_value_us * ack_cnt) / snd_cnt);
+ /* normal ACK, can update running average of delay... */
+ fc->wack = GNUNET_NO;
+ ndelay = GNUNET_TIME_absolute_get_duration(fc->last_round);
+ fc->ack_delay.rel_value_us =
+ (ndelay.rel_value_us / fc->num_transmissions + 3 * fc->ack_delay.rel_value_us) / 4;
+ /* calculate ratio msg sent vs. msg acked */
+ ack_cnt = 0;
+ snd_cnt = 0;
+ for (i = 0; i < 64; i++)
+ {
+ if (1 == (fc->acks_mask & (1ULL << i)))
+ {
+ snd_cnt++;
+ if (0 == (abits & (1ULL << i)))
+ ack_cnt++;
+ }
+ }
+ if (0 == ack_cnt)
+ {
+ /* complete loss */
+ fc->msg_delay = GNUNET_TIME_relative_saturating_multiply(fc->msg_delay,
+ snd_cnt);
+ }
+ else if (snd_cnt > ack_cnt)
+ {
+ /* some loss, slow down proportionally */
+ fc->msg_delay.rel_value_us = ((fc->msg_delay.rel_value_us * ack_cnt) / snd_cnt);
+ }
+ else if (snd_cnt == ack_cnt)
+ {
+ fc->msg_delay.rel_value_us =
+ (ndelay.rel_value_us / fc->num_transmissions + 3 * fc->msg_delay.rel_value_us) / 5;
+ }
+ fc->num_transmissions = 0;
+ fc->msg_delay = GNUNET_TIME_relative_min(fc->msg_delay,
+ GNUNET_TIME_UNIT_SECONDS);
+ fc->ack_delay = GNUNET_TIME_relative_min(fc->ack_delay,
+ GNUNET_TIME_UNIT_SECONDS);
}
- else if (snd_cnt == ack_cnt)
+ GNUNET_STATISTICS_update(fc->stats,
+ _("# fragment acknowledgements received"),
+ 1,
+ GNUNET_NO);
+ if (abits != (fc->acks & abits))
{
- fc->msg_delay.rel_value_us =
- (ndelay.rel_value_us / fc->num_transmissions + 3 * fc->msg_delay.rel_value_us) / 5;
+ /* ID collission or message reordering, count! This should be rare! */
+ GNUNET_STATISTICS_update(fc->stats,
+ _("# bits removed from fragmentation ACKs"), 1,
+ GNUNET_NO);
}
- fc->num_transmissions = 0;
- fc->msg_delay = GNUNET_TIME_relative_min (fc->msg_delay,
- GNUNET_TIME_UNIT_SECONDS);
- fc->ack_delay = GNUNET_TIME_relative_min (fc->ack_delay,
- GNUNET_TIME_UNIT_SECONDS);
- }
- GNUNET_STATISTICS_update (fc->stats,
- _("# fragment acknowledgements received"),
- 1,
- GNUNET_NO);
- if (abits != (fc->acks & abits))
- {
- /* ID collission or message reordering, count! This should be rare! */
- GNUNET_STATISTICS_update (fc->stats,
- _("# bits removed from fragmentation ACKs"), 1,
- GNUNET_NO);
- }
fc->acks = abits & fc->acks_mask;
if (0 != fc->acks)
- {
- /* more to transmit, do so right now (if tracker permits...) */
- if (fc->task != NULL)
- {
- /* schedule next transmission now, no point in waiting... */
- GNUNET_SCHEDULER_cancel (fc->task);
- fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, fc);
- }
- else
{
- /* only case where there is no task should be if we're waiting
- * for the right to transmit again (proc_busy set to YES) */
- GNUNET_assert (GNUNET_YES == fc->proc_busy);
+ /* more to transmit, do so right now (if tracker permits...) */
+ if (fc->task != NULL)
+ {
+ /* schedule next transmission now, no point in waiting... */
+ GNUNET_SCHEDULER_cancel(fc->task);
+ fc->task = GNUNET_SCHEDULER_add_now(&transmit_next, fc);
+ }
+ else
+ {
+ /* only case where there is no task should be if we're waiting
+ * for the right to transmit again (proc_busy set to YES) */
+ GNUNET_assert(GNUNET_YES == fc->proc_busy);
+ }
+ return GNUNET_NO;
}
- return GNUNET_NO;
- }
/* all done */
- GNUNET_STATISTICS_update (fc->stats,
- _("# fragmentation transmissions completed"),
- 1,
- GNUNET_NO);
+ GNUNET_STATISTICS_update(fc->stats,
+ _("# fragmentation transmissions completed"),
+ 1,
+ GNUNET_NO);
if (NULL != fc->task)
- {
- GNUNET_SCHEDULER_cancel (fc->task);
- fc->task = NULL;
- }
+ {
+ GNUNET_SCHEDULER_cancel(fc->task);
+ fc->task = NULL;
+ }
return GNUNET_OK;
}
* last message, set to FOREVER if the message was not fully transmitted (OUT only)
*/
void
-GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *fc,
- struct GNUNET_TIME_Relative *msg_delay,
- struct GNUNET_TIME_Relative *ack_delay)
+GNUNET_FRAGMENT_context_destroy(struct GNUNET_FRAGMENT_Context *fc,
+ struct GNUNET_TIME_Relative *msg_delay,
+ struct GNUNET_TIME_Relative *ack_delay)
{
if (fc->task != NULL)
- GNUNET_SCHEDULER_cancel (fc->task);
+ GNUNET_SCHEDULER_cancel(fc->task);
if (NULL != ack_delay)
*ack_delay = fc->ack_delay;
if (NULL != msg_delay)
- *msg_delay = GNUNET_TIME_relative_saturating_multiply (fc->msg_delay,
- fc->num_rounds);
- GNUNET_free (fc);
+ *msg_delay = GNUNET_TIME_relative_saturating_multiply(fc->msg_delay,
+ fc->num_rounds);
+ GNUNET_free(fc);
}