diff --git a/nats-logger/main.go b/nats-logger/main.go index 47817c3..89ad22c 100644 --- a/nats-logger/main.go +++ b/nats-logger/main.go @@ -17,11 +17,11 @@ limitations under the License. package main import ( - "context" "encoding/json" "log" "os" "strings" + "time" "github.com/hpcloud/tail" "github.com/nats-io/nats.go" @@ -78,23 +78,16 @@ func main() { } func publishClusterProvisioningLogs(source, subject, id string, nc *nats.Conn) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() for { - select { - case <-ctx.Done(): - return - default: - if err := publishFile(source, subject, id, nc, cancel); err != nil { - log.Printf("Could not publish file: %s", err) - } - - // time.Sleep(500 * time.Millisecond) + if err := publishFile(source, subject, id, nc); err != nil { + log.Printf("Could not publish file: %s", err) } + + time.Sleep(500 * time.Millisecond) } } -func publishFile(source, subject, id string, nc *nats.Conn, cancel context.CancelFunc) error { +func publishFile(source, subject, id string, nc *nats.Conn) error { t, err := tail.TailFile(source, tail.Config{Follow: true}) if err != nil { return err @@ -102,17 +95,15 @@ func publishFile(source, subject, id string, nc *nats.Conn, cancel context.Cance log.Printf("Publishing lines from %s to %s", source, subject) + status := TaskStatus(TaskStatusRunning) for line := range t.Lines { - status := generateTaskStatus(line.Text) + if status == TaskStatusRunning { + status = generateTaskStatus(line.Text) + } msg := newResponse(status, id, "", line.Text) - if err := nc.Publish(subject, msg); err != nil { + if err = nc.Publish(subject, msg); err != nil { klog.ErrorS(err, "failed to publish log") } - - if status != TaskStatusRunning { - cancel() - return nil - } } return nil