diff --git a/config/ansible-dns-inventory.yaml b/config/ansible-dns-inventory.yaml index 0b91c3c..8359992 100644 --- a/config/ansible-dns-inventory.yaml +++ b/config/ansible-dns-inventory.yaml @@ -69,6 +69,12 @@ etcd: path: "" # PEM-formatted private key (YAML multiline). Environment variable: ADI_ETCD_TLS_KEY_PEM pem: "" + # Etcd datasource import mode configuration. + import: + # Clear all existing host records before importing records from file. Environment variable: ADI_ETCD_IMPORT_CLEAR + clear: true + # Batch size used when pushing host records to etcd. Should not exceed the maximum number of operations permitted in a etcd transaction (max-txn-ops). Environment variable: ADI_ETCD_IMPORT_BATCH + batch: 128 # Host record parsing configuration. txt: # Key/value pair parsing configuration. diff --git a/internal/config/config.go b/internal/config/config.go index df83ca0..dabbb03 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -42,6 +42,8 @@ func configKeys() []string { "etcd.tls.certificate.pem", "etcd.tls.key.path", "etcd.tls.key.pem", + "etcd.import.clear", + "etcd.import.batch", "txt.kv.separator", "txt.kv.equalsign", "txt.vars.enabled", diff --git a/pkg/inventory/dns.go b/pkg/inventory/dns.go index 7262a16..5aa94c0 100644 --- a/pkg/inventory/dns.go +++ b/pkg/inventory/dns.go @@ -175,7 +175,7 @@ func (d *DNSDatasource) GetHostRecords(host string) ([]*DatasourceRecord, error) zone, err := d.findZone(host) if err != nil { - return nil, errors.Wrap(err, "failed to determine zone from hostname") + return nil, errors.Wrapf(err, "%s: failed to find zone", host) } // Get no-transfer host records. diff --git a/pkg/inventory/etcd.go b/pkg/inventory/etcd.go index 6053c5c..df5f74c 100644 --- a/pkg/inventory/etcd.go +++ b/pkg/inventory/etcd.go @@ -102,20 +102,25 @@ func (e *EtcdDatasource) getPrefix(prefix string) ([]*mvccpb.KeyValue, error) { return resp.Kvs, nil } -// putRecord publishes a host record via the datasource. -func (e *EtcdDatasource) putRecord(record *DatasourceRecord, count int) error { +// execTxn executes etcd operations in a transaction. +func (e *EtcdDatasource) execTxn(ops []etcdv3.Op) error { cfg := e.Config - zone, err := e.findZone(record.Hostname) - if err != nil { - return errors.Wrap(err, "failed to determine zone from hostname") - } + var batch []etcdv3.Op + for len(ops) > 0 { + if len(ops) >= cfg.Etcd.Import.Batch { + batch, ops = ops[0:cfg.Etcd.Import.Batch:cfg.Etcd.Import.Batch], ops[cfg.Etcd.Import.Batch:] + } else { + batch = ops + ops = nil + } - ctx, cancel := context.WithTimeout(context.Background(), cfg.Etcd.Timeout) - _, err = e.Client.Put(ctx, fmt.Sprintf("%s/%s/%d", zone, record.Hostname, count), record.Attributes) - cancel() - if err != nil { - return errors.Wrap(err, "etcd request failure") + ctx, cancel := context.WithTimeout(context.Background(), cfg.Etcd.Timeout) + _, err := e.Client.Txn(ctx).Then(batch...).Commit() + cancel() + if err != nil { + return errors.Wrap(err, "etcd request failure") + } } return nil @@ -144,7 +149,7 @@ func (e *EtcdDatasource) GetAllRecords() ([]*DatasourceRecord, error) { func (e *EtcdDatasource) GetHostRecords(host string) ([]*DatasourceRecord, error) { zone, err := e.findZone(host) if err != nil { - return nil, errors.Wrap(err, "failed to determine zone from hostname") + return nil, errors.Wrapf(err, "%s: failed to find zone", host) } prefix := zone + "/" + host @@ -158,8 +163,17 @@ func (e *EtcdDatasource) GetHostRecords(host string) ([]*DatasourceRecord, error // PublishRecords writes host records to the datasource. func (e *EtcdDatasource) PublishRecords(records []*DatasourceRecord) error { - counts := map[string]int{} + cfg := e.Config + log := e.Logger + if cfg.Etcd.Import.Clear { + if err := e.execTxn([]etcdv3.Op{etcdv3.OpDelete("", etcdv3.WithPrefix())}); err != nil { + return err + } + } + + ops := []etcdv3.Op{} + counts := map[string]int{} for _, record := range records { if _, ok := counts[record.Hostname]; ok { counts[record.Hostname]++ @@ -167,9 +181,17 @@ func (e *EtcdDatasource) PublishRecords(records []*DatasourceRecord) error { counts[record.Hostname] = 0 } - if err := e.putRecord(record, counts[record.Hostname]); err != nil { - return errors.Wrap(err, "failed to publish a host record") + zone, err := e.findZone(record.Hostname) + if err != nil { + log.Warnf("[%s] skipping host record: %v", record.Hostname, err) + continue } + + ops = append(ops, etcdv3.OpPut(fmt.Sprintf("%s/%s/%d", zone, record.Hostname, counts[record.Hostname]), record.Attributes)) + } + + if err := e.execTxn(ops); err != nil { + return err } return nil diff --git a/pkg/inventory/types.go b/pkg/inventory/types.go index 69cc045..9a68b22 100644 --- a/pkg/inventory/types.go +++ b/pkg/inventory/types.go @@ -95,6 +95,14 @@ type ( PEM string `mapstructure:"pem" default:""` } `mapstructure:"key"` } `mapstructure:"tls"` + // Etcd datasource import mode configuration. + Import struct { + // Clear all existing host records before importing records from file. + Clear bool `mapstructure:"clear" default:"true"` + // Batch size used when pushing host records to etcd. + // Should not exceed the maximum number of operations permitted in a etcd transaction (max-txn-ops). + Batch int `mapstructure:"batch" default:"128"` + } `mapstructure:"import"` } `mapstructure:"etcd"` // Host records parsing configuration. Txt struct {