#define INTERVAL_QUERY_PATHETIC 60
#define INTERVAL_QUERY_AGRESSIVE 5
-#define TRUSTLEVEL_BADPEER 6
+#define TRUSTLEVEL_BADPEER 6 /* bad if *less than* TRUSTLEVEL_BADPEER */
#define TRUSTLEVEL_PATHETIC 2
#define TRUSTLEVEL_AGRESSIVE 8
#define TRUSTLEVEL_MAX 10
}
static double
-gettime(void)
+gettime_fp(void)
{
struct timeval tv;
gettimeofday(&tv, NULL); /* never fails */
- return (tv.tv_sec + JAN_1970 + 1.0e-6 * tv.tv_usec);
+ return (tv.tv_sec + 1.0e-6 * tv.tv_usec + JAN_1970);
}
-
static void
d_to_tv(double d, struct timeval *tv)
{
errno = 0;
if (!from) {
- ret = sendto(fd, msg, len, 0, to, addrlen);
+ ret = sendto(fd, msg, len, MSG_DONTWAIT, to, addrlen);
} else {
- ret = send_to_from(fd, msg, len, 0, to, from, addrlen);
+ ret = send_to_from(fd, msg, len, MSG_DONTWAIT, to, from, addrlen);
}
if (ret != len) {
bb_perror_msg("send failed");
}
static int
-client_query(ntp_peer_t *p)
+send_query_to_peer(ntp_peer_t *p)
{
if (p->query.fd == -1) {
p->query.fd = xsocket(p->lsa->u.sa.sa_family, SOCK_DGRAM, 0);
p->query.msg.xmttime.int_partl = random();
p->query.msg.xmttime.fractionl = random();
- p->query.xmttime = gettime();
+ p->query.xmttime = gettime_fp();
if (sendmsg_wrap(p->query.fd, /*from:*/ NULL, /*to:*/ &p->lsa->u.sa, /*addrlen:*/ p->lsa->len,
&p->query.msg, NTP_MSGSIZE_NOAUTH) == -1) {
{
ntp_peer_t *p;
unsigned offset_cnt;
+ unsigned middle;
int i = 0;
ntp_peer_t **peers;
double offset_median;
offset_cnt++;
}
- peers = xzalloc(sizeof(ntp_peer_t *) * offset_cnt);
+ if (offset_cnt == 0)
+ goto clear_good;
+
+ peers = xzalloc(sizeof(peers[0]) * offset_cnt);
for (item = G.ntp_peers; item != NULL; item = item->link) {
p = (ntp_peer_t *) item->data;
if (p->trustlevel < TRUSTLEVEL_BADPEER)
peers[i++] = p;
}
- qsort(peers, offset_cnt, sizeof(ntp_peer_t *), offset_compare);
-
- if (offset_cnt != 0) {
- if ((offset_cnt & 1) == 0) {
-//TODO: try offset_cnt /= 2...
- offset_median =
- (peers[offset_cnt / 2 - 1]->update.offset +
- peers[offset_cnt / 2]->update.offset) / 2;
- G.status.rootdelay =
- (peers[offset_cnt / 2 - 1]->update.delay +
- peers[offset_cnt / 2]->update.delay) / 2;
- G.status.stratum = MAX(
- peers[offset_cnt / 2 - 1]->update.status.stratum,
- peers[offset_cnt / 2]->update.status.stratum);
- } else {
- offset_median = peers[offset_cnt / 2]->update.offset;
- G.status.rootdelay = peers[offset_cnt / 2]->update.delay;
- G.status.stratum = peers[offset_cnt / 2]->update.status.stratum;
- }
- G.status.leap = peers[offset_cnt / 2]->update.status.leap;
+ qsort(peers, offset_cnt, sizeof(peers[0]), offset_compare);
- bb_info_msg("adjusting local clock by %fs", offset_median);
+ middle = offset_cnt / 2;
+ if ((offset_cnt & 1) == 0) {
+ offset_median = (peers[middle-1]->update.offset + peers[middle]->update.offset) / 2;
+ G.status.rootdelay = (peers[middle-1]->update.delay + peers[middle]->update.delay) / 2;
+ G.status.stratum = MAX(peers[middle-1]->update.status.stratum, peers[middle]->update.status.stratum);
+ } else {
+ offset_median = peers[middle]->update.offset;
+ G.status.rootdelay = peers[middle]->update.delay;
+ G.status.stratum = peers[middle]->update.status.stratum;
+ }
+ G.status.leap = peers[middle]->update.status.leap;
- d_to_tv(offset_median, &tv);
- if (adjtime(&tv, &olddelta) == -1)
- bb_error_msg("adjtime failed");
- else if (!G.firstadj
- && olddelta.tv_sec == 0
- && olddelta.tv_usec == 0
- && !G.status.synced
- ) {
- bb_info_msg("clock synced");
- G.status.synced = 1;
- } else if (G.status.synced) {
- bb_info_msg("clock unsynced");
- G.status.synced = 0;
- }
+ bb_info_msg("adjusting local clock by %fs", offset_median);
- G.firstadj = 0;
- G.status.reftime = gettime();
- G.status.stratum++; /* one more than selected peer */
- G.scale = updated_scale(offset_median);
+ d_to_tv(offset_median, &tv);
+ if (adjtime(&tv, &olddelta) == -1)
+ bb_error_msg("adjtime failed");
+ else if (!G.firstadj
+ && olddelta.tv_sec == 0
+ && olddelta.tv_usec == 0
+ && !G.status.synced
+ ) {
+ bb_info_msg("clock synced");
+ G.status.synced = 1;
+ } else if (G.status.synced) {
+ bb_info_msg("clock unsynced");
+ G.status.synced = 0;
+ }
+
+ G.firstadj = 0;
+ G.status.reftime = gettime_fp();
+ G.status.stratum++; /* one more than selected peer */
+ G.scale = updated_scale(offset_median);
- G.status.refid4 = peers[offset_cnt / 2]->update.status.refid4;
+ G.status.refid4 = peers[middle]->update.status.refid4;
- lsa = peers[offset_cnt / 2]->lsa;
- G.status.refid =
+ lsa = peers[middle]->lsa;
+ G.status.refid =
#if ENABLE_FEATURE_IPV6
- lsa->u.sa.sa_family != AF_INET ?
- G.status.refid4 :
+ lsa->u.sa.sa_family != AF_INET ?
+ G.status.refid4 :
#endif
- lsa->u.sin.sin_addr.s_addr;
- }
+ lsa->u.sin.sin_addr.s_addr;
free(peers);
+ clear_good:
for (item = G.ntp_peers; item != NULL; item = item->link) {
p = (ntp_peer_t *) item->data;
p->update.good = 0;
}
static void
-client_update(ntp_peer_t *p)
+update_peer_data(ntp_peer_t *p)
{
int i, best = 0, good = 0;
}
static void
-client_dispatch(ntp_peer_t *p)
+recv_and_process_peer_pkt(ntp_peer_t *p)
{
char *addr;
ssize_t size;
addr = xmalloc_sockaddr2dotted_noport(&p->lsa->u.sa);
-//TODO: use MSG_DONTWAIT flag?
- size = recv(p->query.fd, &msg, sizeof(msg), 0);
+ size = recv(p->query.fd, &msg, sizeof(msg), MSG_DONTWAIT);
if (size == -1) {
bb_perror_msg("recv(%s) error", addr);
if (errno == EHOSTUNREACH || errno == EHOSTDOWN
|| errno == ENETUNREACH || errno == ENETDOWN
|| errno == ECONNREFUSED || errno == EADDRNOTAVAIL
+ || errno == EAGAIN
) {
//TODO: always do this?
set_next(p, error_interval());
xfunc_die();
}
- T4 = gettime();
+ T4 = gettime_fp();
if (size != NTP_MSGSIZE_NOAUTH && size != NTP_MSGSIZE) {
bb_error_msg("malformed packet received from %s", addr);
goto bail;
}
offset->error = (T2 - T1) - (T3 - T4);
+// Can we use (T4 - JAN_1970) instead of time(NULL)?
offset->rcvd = time(NULL);
offset->good = 1;
/* every received reply which we do not discard increases trust */
if (p->trustlevel < TRUSTLEVEL_MAX) {
- if (p->trustlevel < TRUSTLEVEL_BADPEER
- && p->trustlevel + 1 >= TRUSTLEVEL_BADPEER
- ) {
- bb_info_msg("peer %s now valid", addr);
- }
p->trustlevel++;
+ if (p->trustlevel == TRUSTLEVEL_BADPEER)
+ bb_info_msg("peer %s now valid", addr);
}
bb_info_msg("reply from %s: offset %f delay %f, next query %ds", addr,
offset->offset, offset->delay, (int) interval);
- client_update(p);
+ update_peer_data(p);
settime(offset->offset);
- if (++p->shift >= OFFSET_ARRAY_SIZE)
+ p->shift++;
+ if (p->shift >= OFFSET_ARRAY_SIZE)
p->shift = 0;
bail:
#if ENABLE_FEATURE_NTPD_SERVER
static void
-server_dispatch(int fd)
+recv_and_process_client_pkt(void /*int fd*/)
{
ssize_t size;
uint8_t version;
to = get_sock_lsa(G.listen_fd);
from = xzalloc(to->len);
-//TODO: use MGS_DONTWAIT flag?
- size = recv_from_to(fd, &msg, sizeof(msg), 0, from, &to->u.sa, to->len);
+ size = recv_from_to(G.listen_fd, &msg, sizeof(msg), MSG_DONTWAIT, from, &to->u.sa, to->len);
if (size != NTP_MSGSIZE_NOAUTH && size != NTP_MSGSIZE) {
char *addr;
- if (size < 0)
- bb_error_msg_and_die("recv_from_to");
+ if (size < 0) {
+ if (errno == EAGAIN)
+ goto bail;
+ bb_perror_msg_and_die("recv_from_to");
+ }
addr = xmalloc_sockaddr2dotted_noport(from);
bb_error_msg("malformed packet received from %s", addr);
free(addr);
msg.stratum = G.status.stratum;
msg.ppoll = query_ppoll;
msg.precision = G.status.precision;
- rectime = gettime();
+ rectime = gettime_fp();
msg.xmttime = msg.rectime = d_to_lfp(rectime);
msg.reftime = d_to_lfp(G.status.reftime);
- //msg.xmttime = d_to_lfp(gettime()); // = msg.rectime
+ //msg.xmttime = d_to_lfp(gettime_fp()); // = msg.rectime
msg.orgtime = query_xmttime;
msg.rootdelay = d_to_sfp(G.status.rootdelay);
version = (query_status & VERSION_MASK); /* ... >> VERSION_SHIFT - done below instead */
msg.refid = (version > (3 << VERSION_SHIFT)) ? G.status.refid4 : G.status.refid;
- /* We reply from the address packet was sent to,
+ /* We reply from the local address packet was sent to,
* this makes to/from look swapped here: */
- sendmsg_wrap(fd,
+ sendmsg_wrap(G.listen_fd,
/*from:*/ &to->u.sa, /*to:*/ from, /*addrlen:*/ to->len,
&msg, size);
ntp_init(argv);
{
- unsigned new_cnt = g.peer_cnt;
- idx2peer = xzalloc(sizeof(void *) * new_cnt);
+ unsigned cnt = g.peer_cnt;
/* if ENABLE_FEATURE_NTPD_SERVER, + 1 for listen_fd: */
- pfd = xzalloc(sizeof(pfd[0]) * (new_cnt + ENABLE_FEATURE_NTPD_SERVER));
+ idx2peer = xzalloc(sizeof(void *) * (cnt + ENABLE_FEATURE_NTPD_SERVER));
+ pfd = xzalloc(sizeof(pfd[0]) * (cnt + ENABLE_FEATURE_NTPD_SERVER));
}
while (!bb_got_signal) {
llist_t *item;
- unsigned i, j, first_peer_idx;
+ unsigned i, j;
unsigned sent_cnt, trial_cnt;
int nfds, timeout;
- time_t nextaction;
+ time_t cur_time, nextaction;
- nextaction = time(NULL) + 3600;
+ /* Nothing between here and poll() blocks for any significant time */
+
+ cur_time = time(NULL);
+ nextaction = cur_time + 3600;
i = 0;
#if ENABLE_FEATURE_NTPD_SERVER
i++;
}
#endif
- first_peer_idx = i;
+ /* Pass over peer list, send requests, time out on receives */
sent_cnt = trial_cnt = 0;
for (item = g.ntp_peers; item != NULL; item = item->link) {
ntp_peer_t *p = (ntp_peer_t *) item->data;
- if (p->next > 0 && p->next <= time(NULL)) {
+ if (p->next != 0 && p->next <= cur_time) {
+ /* Time to send new req */
trial_cnt++;
- if (client_query(p) == 0)
+ if (send_query_to_peer(p) == 0)
sent_cnt++;
}
- if (p->next > 0 && p->next < nextaction)
- nextaction = p->next;
- if (p->deadline > 0 && p->deadline < nextaction)
- nextaction = p->deadline;
-
- if (p->deadline > 0 && p->deadline <= time(NULL)) {
+ if (p->deadline != 0 && p->deadline <= cur_time) {
+ /* Timed out waiting for reply */
char *addr = xmalloc_sockaddr2dotted_noport(&p->lsa->u.sa);
timeout = error_interval();
set_next(p, timeout);
}
+ if (p->next != 0 && p->next < nextaction)
+ nextaction = p->next;
+ if (p->deadline != 0 && p->deadline < nextaction)
+ nextaction = p->deadline;
+
if (p->state == STATE_QUERY_SENT) {
+ /* Wait for reply from this peer */
pfd[i].fd = p->query.fd;
pfd[i].events = POLLIN;
- idx2peer[i - first_peer_idx] = p;
+ idx2peer[i] = p;
i++;
}
}
if ((trial_cnt > 0 && sent_cnt == 0) || g.peer_cnt == 0)
- settime(0); /* no good peers, don't wait */
+ settime(0); /* no good peers, don't wait */
- timeout = nextaction - time(NULL);
+ timeout = nextaction - cur_time;
if (timeout < 0)
timeout = 0;
+ /* Here we may block */
if (g.verbose)
bb_error_msg("entering poll %u secs", timeout);
nfds = poll(pfd, i, timeout * 1000);
+ if (nfds <= 0)
+ continue;
+ /* Process any received packets */
j = 0;
#if ENABLE_FEATURE_NTPD_SERVER
-//TODO: simplify. There is only one server fd!
- for (; nfds > 0 && j < first_peer_idx; j++) {
- if (pfd[j].revents & (POLLIN|POLLERR)) {
+ if (g.listen_fd != -1) {
+ if (pfd[0].revents /* & (POLLIN|POLLERR)*/) {
nfds--;
- server_dispatch(pfd[j].fd);
+ recv_and_process_client_pkt(/*g.listen_fd*/);
}
+ j = 1;
}
#endif
- for (; nfds > 0 && j < i; j++) {
- if (pfd[j].revents & (POLLIN|POLLERR)) {
+ for (; nfds != 0 && j < i; j++) {
+ if (pfd[j].revents /* & (POLLIN|POLLERR)*/) {
nfds--;
- client_dispatch(idx2peer[j - first_peer_idx]);
+ recv_and_process_peer_pkt(idx2peer[j]);
}
}
} /* while (!bb_got_signal) */