Skip to content

Commit

Permalink
fix: make talosctl time work with PTP time sync
Browse files Browse the repository at this point in the history
Fix query function used by CLI to match the time syncer behavior.

Signed-off-by: Dmitry Sharshakov <[email protected]>
  • Loading branch information
dsseng committed Dec 19, 2024
1 parent f756043 commit cee6c60
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 29 deletions.
30 changes: 22 additions & 8 deletions internal/app/machined/internal/server/v1alpha1/v1alpha1_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ import (
"fmt"
"time"

"github.com/beevik/ntp"
ntpclient "github.com/beevik/ntp"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/siderolabs/talos/internal/pkg/ntp"
timeapi "github.com/siderolabs/talos/pkg/machinery/api/time"
"github.com/siderolabs/talos/pkg/machinery/config"
"github.com/siderolabs/talos/pkg/machinery/constants"
Expand Down Expand Up @@ -51,21 +52,34 @@ func (r *TimeServer) Time(ctx context.Context, in *emptypb.Empty) (reply *timeap

// TimeCheck issues a query to the specified ntp server and displays the results.
func (r *TimeServer) TimeCheck(ctx context.Context, in *timeapi.TimeRequest) (reply *timeapi.TimeResponse, err error) {
rt, err := ntp.Query(in.Server)
if err != nil {
return nil, fmt.Errorf("error querying NTP server %q: %w", in.Server, err)
}
var t time.Time

if ntp.IsPTPDevice(in.Server) {
ts, err := ntp.QueryPTPDevice(in.Server)
if err != nil {
return nil, fmt.Errorf("error querying PTP device %q: %w", in.Server, err)
}

t = time.Unix(ts.Sec, ts.Nsec)
} else {
rt, err := ntpclient.Query(in.Server)
if err != nil {
return nil, fmt.Errorf("error querying NTP server %q: %w", in.Server, err)
}

if err = rt.Validate(); err != nil {
return nil, fmt.Errorf("error validating NTP response: %w", err)
}

if err = rt.Validate(); err != nil {
return nil, fmt.Errorf("error validating NTP response: %w", err)
t = rt.Time
}

return &timeapi.TimeResponse{
Messages: []*timeapi.Time{
{
Server: in.Server,
Localtime: timestamppb.New(time.Now()),
Remotetime: timestamppb.New(rt.Time),
Remotetime: timestamppb.New(t),
},
},
}, nil
Expand Down
53 changes: 32 additions & 21 deletions internal/pkg/ntp/ntp.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,15 +299,16 @@ func (syncer *Syncer) query(ctx context.Context) (lastSyncServer string, measure
return lastSyncServer, measurement, err
}

func (syncer *Syncer) isPTPDevice(server string) bool {
// IsPTPDevice checks if a given server string represents a PTP device.
func IsPTPDevice(server string) bool {
return strings.HasPrefix(server, "/dev/")
}

func (syncer *Syncer) resolveServers(ctx context.Context) ([]string, error) {
var serverList []string

for _, server := range syncer.getTimeServers() {
if syncer.isPTPDevice(server) {
if IsPTPDevice(server) {
serverList = append(serverList, server)
} else {
ips, err := net.LookupIP(server)
Expand All @@ -331,19 +332,43 @@ func (syncer *Syncer) resolveServers(ctx context.Context) ([]string, error) {
}

func (syncer *Syncer) queryServer(server string) (*Measurement, error) {
if syncer.isPTPDevice(server) {
if IsPTPDevice(server) {
return syncer.queryPTP(server)
}

return syncer.queryNTP(server)
}

func (syncer *Syncer) queryPTP(server string) (*Measurement, error) {
phc, err := os.Open(server)
func (syncer *Syncer) queryPTP(device string) (*Measurement, error) {
ts, err := QueryPTPDevice(device)
if err != nil {
return nil, err
}

offset := time.Until(time.Unix(ts.Sec, ts.Nsec))
syncer.logger.Debug("PTP clock",
zap.Duration("clock_offset", offset),
zap.Int64("sec", ts.Sec),
zap.Int64("nsec", ts.Nsec),
zap.String("device", device),
)

meas := &Measurement{
ClockOffset: offset,
Leap: 0,
Spike: false,
}

return meas, err
}

// QueryPTPDevice queries PTP device for current time.
func QueryPTPDevice(device string) (unix.Timespec, error) {
phc, err := os.Open(device)
if err != nil {
return unix.Timespec{}, err
}

defer phc.Close() //nolint:errcheck

// From clock_gettime(2):
Expand All @@ -360,24 +385,10 @@ func (syncer *Syncer) queryPTP(server string) (*Measurement, error) {

err = unix.ClockGettime(clockid, &ts)
if err != nil {
return nil, err
return unix.Timespec{}, err
}

offset := time.Until(time.Unix(ts.Sec, ts.Nsec))
syncer.logger.Debug("PTP clock",
zap.Duration("clock_offset", offset),
zap.Int64("sec", ts.Sec),
zap.Int64("nsec", ts.Nsec),
zap.String("device", server),
)

meas := &Measurement{
ClockOffset: offset,
Leap: 0,
Spike: false,
}

return meas, err
return ts, err
}

func (syncer *Syncer) queryNTP(server string) (*Measurement, error) {
Expand Down

0 comments on commit cee6c60

Please sign in to comment.