Skip to content

Commit

Permalink
Add log-rotation-size option
Browse files Browse the repository at this point in the history
  • Loading branch information
yamatcha committed Nov 20, 2024
1 parent c3c26ee commit 6d7ad28
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 12 deletions.
18 changes: 17 additions & 1 deletion cmd/moco-agent/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ var config struct {
connIdleTime time.Duration
connectionTimeout time.Duration
logRotationSchedule string
logRotationSize int64
readTimeout time.Duration
maxDelayThreshold time.Duration
socketPath string
Expand Down Expand Up @@ -127,7 +128,7 @@ var rootCmd = &cobra.Command{
metrics.Init(registry, clusterName, index)

c := cron.New(cron.WithLogger(rLogger.WithName("cron")))
if _, err := c.AddFunc(config.logRotationSchedule, agent.RotateLog); err != nil {
if _, err := c.AddFunc(config.logRotationSchedule, func() { agent.RotateErrorLog(); agent.RotateSlowLog() }); err != nil {
rLogger.Error(err, "failed to parse the cron spec", "spec", config.logRotationSchedule)
return err
}
Expand All @@ -142,6 +143,20 @@ var rootCmd = &cobra.Command{
}
}()

// Rotate if the file size exceeds logRotationSize
ticker := time.NewTicker(time.Second)
if config.logRotationSize > 0 {
go func() {
select {
case <-ctx.Done():
return
case <-ticker.C:
agent.RotateLogIfSizeExceeded(config.logRotationSize)
}
}()
defer ticker.Stop()
}

reloader, err := cert.NewReloader(config.grpcCertDir, rLogger.WithName("cert-reloader"))
if err != nil {
return err
Expand Down Expand Up @@ -239,6 +254,7 @@ func init() {
fs.DurationVar(&config.connIdleTime, "max-idle-time", 30*time.Second, "The maximum amount of time a connection may be idle")
fs.DurationVar(&config.connectionTimeout, "connection-timeout", 5*time.Second, "Dial timeout")
fs.StringVar(&config.logRotationSchedule, "log-rotation-schedule", logRotationScheduleDefault, "Cron format schedule for MySQL log rotation")
fs.Int64Var(&config.logRotationSize, "log-rotation-size", 0, "Rotate MySQL log files when it exceeds the specified size in bytes.")
fs.DurationVar(&config.readTimeout, "read-timeout", 30*time.Second, "I/O read timeout")
fs.DurationVar(&config.maxDelayThreshold, "max-delay", time.Minute, "Acceptable max commit delay considering as ready; the zero value accepts any delay")
fs.StringVar(&config.socketPath, "socket-path", socketPathDefault, "Path of mysqld socket file.")
Expand Down
1 change: 1 addition & 0 deletions docs/moco-agent.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Flags:
--grpc-cert-dir string gRPC certificate directory (default "/grpc-cert")
-h, --help help for moco-agent
--log-rotation-schedule string Cron format schedule for MySQL log rotation (default "*/5 * * * *")
--log-rotation-size int Rotate MySQL log files when it exceeds the specified size in bytes.
--logfile string Log filename
--logformat string Log format [plain,logfmt,json]
--loglevel string Log level [critical,error,warning,info,debug]
Expand Down
48 changes: 43 additions & 5 deletions server/rotate.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
"github.com/cybozu-go/moco-agent/metrics"
)

// RotateLog rotates log files
func (a *Agent) RotateLog() {
// RotateLog rotates error log files
func (a *Agent) RotateErrorLog() {
ctx := context.Background()

metrics.LogRotationCount.Inc()
Expand All @@ -25,20 +25,58 @@ func (a *Agent) RotateLog() {
return
}

if _, err := a.db.ExecContext(ctx, "FLUSH LOCAL ERROR LOGS"); err != nil {
a.logger.Error(err, "failed to exec FLUSH LOCAL ERROR LOGS")
metrics.LogRotationFailureCount.Inc()
return
}

durationSeconds := time.Since(startTime).Seconds()
metrics.LogRotationDurationSeconds.Observe(durationSeconds)
}

// RotateLog rotates slow log files
func (a *Agent) RotateSlowLog() {
ctx := context.Background()

metrics.LogRotationCount.Inc()
startTime := time.Now()

slowFile := filepath.Join(a.logDir, mocoagent.MySQLSlowLogName)
err = os.Rename(slowFile, slowFile+".0")
err := os.Rename(slowFile, slowFile+".0")
if err != nil && !os.IsNotExist(err) {
a.logger.Error(err, "failed to rotate slow query log file")
metrics.LogRotationFailureCount.Inc()
return
}

if _, err := a.db.ExecContext(ctx, "FLUSH LOCAL ERROR LOGS, SLOW LOGS"); err != nil {
a.logger.Error(err, "failed to exec FLUSH LOCAL LOGS")
if _, err := a.db.ExecContext(ctx, "FLUSH LOCAL SLOW LOGS"); err != nil {
a.logger.Error(err, "failed to exec FLUSH LOCAL SLOW LOGS")
metrics.LogRotationFailureCount.Inc()
return
}

durationSeconds := time.Since(startTime).Seconds()
metrics.LogRotationDurationSeconds.Observe(durationSeconds)
}

// RotateLogIfSizeExceeded rotates log files if it exceeds rotationSize
func (a *Agent) RotateLogIfSizeExceeded(rotationSize int64) {
errFile := filepath.Join(a.logDir, mocoagent.MySQLErrorLogName)
errFileStat, err := os.Stat(errFile)
if err != nil {
a.logger.Error(err, "failed to get stat of error log file")
}
if errFileStat.Size() > rotationSize {
a.RotateErrorLog()
}

slowFile := filepath.Join(a.logDir, mocoagent.MySQLSlowLogName)
slowFileStat, err := os.Stat(slowFile)
if err != nil {
a.logger.Error(err, "failed to get stat of slow query log file")
}
if slowFileStat.Size() > rotationSize {
a.RotateSlowLog()
}
}
78 changes: 72 additions & 6 deletions server/rotate_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package server

import (
"bytes"
"os"
"path/filepath"
"time"
Expand All @@ -12,7 +13,7 @@ import (
"github.com/prometheus/client_golang/prometheus/testutil"
)

var _ = Describe("log rotation", func() {
var _ = Describe("log rotation", Ordered, func() {
It("should rotate logs", func() {
By("starting MySQLd")
StartMySQLD(replicaHost, replicaPort, replicaServerID)
Expand Down Expand Up @@ -45,13 +46,14 @@ var _ = Describe("log rotation", func() {
Expect(err).ShouldNot(HaveOccurred())
}

agent.RotateLog()
agent.RotateErrorLog()
agent.RotateSlowLog()

for _, file := range logFiles {
_, err := os.Stat(file + ".0")
Expect(err).ShouldNot(HaveOccurred())
}
Expect(testutil.ToFloat64(metrics.LogRotationCount)).To(BeNumerically("==", 1))
Expect(testutil.ToFloat64(metrics.LogRotationCount)).To(BeNumerically("==", 2))
Expect(testutil.ToFloat64(metrics.LogRotationFailureCount)).To(BeNumerically("==", 0))

By("creating the same name directory")
Expand All @@ -62,9 +64,73 @@ var _ = Describe("log rotation", func() {
Expect(err).ShouldNot(HaveOccurred())
}

agent.RotateLog()
agent.RotateErrorLog()
agent.RotateSlowLog()

Expect(testutil.ToFloat64(metrics.LogRotationCount)).To(BeNumerically("==", 2))
Expect(testutil.ToFloat64(metrics.LogRotationFailureCount)).To(BeNumerically("==", 1))
Expect(testutil.ToFloat64(metrics.LogRotationCount)).To(BeNumerically("==", 4))
Expect(testutil.ToFloat64(metrics.LogRotationFailureCount)).To(BeNumerically("==", 2))
})

It("should rotate logs by RotateLogIfSizeExceeded if size exceeds", func() {
By("starting MySQLd")
StartMySQLD(replicaHost, replicaPort, replicaServerID)
defer StopAndRemoveMySQLD(replicaHost)

sockFile := filepath.Join(socketDir(replicaHost), "mysqld.sock")
tmpDir, err := os.MkdirTemp("", "moco-test-agent-")
Expect(err).NotTo(HaveOccurred())
defer os.RemoveAll(tmpDir)

conf := MySQLAccessorConfig{
Host: "localhost",
Port: replicaPort,
Password: agentUserPassword,
ConnMaxIdleTime: 30 * time.Minute,
ConnectionTimeout: 3 * time.Second,
ReadTimeout: 30 * time.Second,
}
agent, err := New(conf, testClusterName, sockFile, tmpDir, maxDelayThreshold, time.Second, testLogger)
Expect(err).ShouldNot(HaveOccurred())
defer agent.CloseDB()

By("preparing log files for testing")
slowFile := filepath.Join(tmpDir, mocoagent.MySQLSlowLogName)
errFile := filepath.Join(tmpDir, mocoagent.MySQLErrorLogName)
logFiles := []string{slowFile, errFile}

logDataSize := 512
data := bytes.Repeat([]byte("a"), logDataSize)
for _, file := range logFiles {
f, err := os.Create(file)
Expect(err).ShouldNot(HaveOccurred())
f.Write(data)
}

agent.RotateLogIfSizeExceeded(int64(logDataSize) + 1)

Expect(testutil.ToFloat64(metrics.LogRotationCount)).To(BeNumerically("==", 4))
Expect(testutil.ToFloat64(metrics.LogRotationFailureCount)).To(BeNumerically("==", 2))

agent.RotateLogIfSizeExceeded(int64(logDataSize) - 1)

for _, file := range logFiles {
_, err := os.Stat(file + ".0")
Expect(err).ShouldNot(HaveOccurred())
}
Expect(testutil.ToFloat64(metrics.LogRotationCount)).To(BeNumerically("==", 6))
Expect(testutil.ToFloat64(metrics.LogRotationFailureCount)).To(BeNumerically("==", 2))

By("creating the same name directory")
for _, file := range logFiles {
err := os.Rename(file+".0", file)
Expect(err).ShouldNot(HaveOccurred())
err = os.Mkdir(file+".0", 0777)
Expect(err).ShouldNot(HaveOccurred())
}

agent.RotateLogIfSizeExceeded(int64(logDataSize) - 1)

Expect(testutil.ToFloat64(metrics.LogRotationCount)).To(BeNumerically("==", 8))
Expect(testutil.ToFloat64(metrics.LogRotationFailureCount)).To(BeNumerically("==", 4))
})
})

0 comments on commit 6d7ad28

Please sign in to comment.