From 4bb21b188c15d1076de1d4acf5812a3b4287b820 Mon Sep 17 00:00:00 2001 From: Dag Wullt Date: Sat, 1 Apr 2023 08:58:40 +0200 Subject: [PATCH] Add wait for indices --- monstache.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/monstache.go b/monstache.go index 5d51b8a..f544128 100644 --- a/monstache.go +++ b/monstache.go @@ -325,6 +325,7 @@ type configOptions struct { ElasticHealth1 int `toml:"elasticsearch-healthcheck-timeout"` ElasticPKIAuth elasticPKIAuth `toml:"elasticsearch-pki-auth"` ElasticAPIKey string `toml:"elasticsearch-api-key"` + WaitForIndices stringargs `toml:"wait-for-indices"` ResumeName string `toml:"resume-name"` NsRegex string `toml:"namespace-regex"` NsDropRegex string `toml:"namespace-drop-regex"` @@ -1774,6 +1775,7 @@ func (config *configOptions) parseCommandLineFlags() *configOptions { flag.Var(&config.FileNamespaces, "file-namespace", "A list of file namespaces") flag.Var(&config.PatchNamespaces, "patch-namespace", "A list of patch namespaces") flag.Var(&config.Workers, "workers", "A list of worker names") + flag.Var(&config.WaitForIndices, "wait-for-indices", "A list of index names to wait for before starting synchronisation") flag.BoolVar(&config.EnableHTTPServer, "enable-http-server", false, "True to enable an internal http server") flag.StringVar(&config.HTTPServerAddr, "http-server-addr", "", "The address the internal http server listens on") flag.BoolVar(&config.PruneInvalidJSON, "prune-invalid-json", false, "True to omit values which do not serialize to JSON such as +Inf and -Inf and thus cause errors") @@ -4361,6 +4363,7 @@ func (ic *indexClient) run() { ic.startDownload() ic.startPostProcess() ic.clusterWait() + ic.waitForIndices() ic.startListen() ic.startReadWait() ic.startExpireCreds() @@ -4825,6 +4828,25 @@ func (ic *indexClient) clusterWait() { } } +func (ic *indexClient) waitForIndices() { + if len(ic.config.WaitForIndices) > 0 { + for { + exists, err := ic.client.IndexExists(ic.config.WaitForIndices...).Do(context.Background()) + if err != nil { + errorLog.Printf("Error waiting for indices %v", ic.config.WaitForIndices) + time.Sleep(5 * time.Second) + continue + } + if !exists { + infoLog.Printf("Waiting for indices %v", ic.config.WaitForIndices) + time.Sleep(5 * time.Second) + continue + } + break + } + } +} + func (ic *indexClient) hasNewEvents() bool { if ic.lastTs.T > ic.lastTsSaved.T || (ic.lastTs.T == ic.lastTsSaved.T && ic.lastTs.I > ic.lastTsSaved.I) {