Skip to content

Commit

Permalink
Use batching and transactions to push records to etcd
Browse files Browse the repository at this point in the history
Support removing existing records from etcd before importing them from file.
  • Loading branch information
NeonSludge committed Jun 21, 2024
1 parent a864cab commit 272ef4a
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 16 deletions.
6 changes: 6 additions & 0 deletions config/ansible-dns-inventory.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ etcd:
path: ""
# PEM-formatted private key (YAML multiline). Environment variable: ADI_ETCD_TLS_KEY_PEM
pem: ""
# Etcd datasource import mode configuration.
import:
# Clear all existing host records before importing records from file. Environment variable: ADI_ETCD_IMPORT_CLEAR
clear: true
# Batch size used when pushing host records to etcd. Should not exceed the maximum number of operations permitted in a etcd transaction (max-txn-ops). Environment variable: ADI_ETCD_IMPORT_BATCH
batch: 128
# Host record parsing configuration.
txt:
# Key/value pair parsing configuration.
Expand Down
2 changes: 2 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ func configKeys() []string {
"etcd.tls.certificate.pem",
"etcd.tls.key.path",
"etcd.tls.key.pem",
"etcd.import.clear",
"etcd.import.batch",
"txt.kv.separator",
"txt.kv.equalsign",
"txt.vars.enabled",
Expand Down
2 changes: 1 addition & 1 deletion pkg/inventory/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (d *DNSDatasource) GetHostRecords(host string) ([]*DatasourceRecord, error)

zone, err := d.findZone(host)
if err != nil {
return nil, errors.Wrap(err, "failed to determine zone from hostname")
return nil, errors.Wrapf(err, "%s: failed to find zone", host)
}

// Get no-transfer host records.
Expand Down
52 changes: 37 additions & 15 deletions pkg/inventory/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,20 +102,25 @@ func (e *EtcdDatasource) getPrefix(prefix string) ([]*mvccpb.KeyValue, error) {
return resp.Kvs, nil
}

// putRecord publishes a host record via the datasource.
func (e *EtcdDatasource) putRecord(record *DatasourceRecord, count int) error {
// execTxn executes etcd operations in a transaction.
func (e *EtcdDatasource) execTxn(ops []etcdv3.Op) error {
cfg := e.Config

zone, err := e.findZone(record.Hostname)
if err != nil {
return errors.Wrap(err, "failed to determine zone from hostname")
}
var batch []etcdv3.Op
for len(ops) > 0 {
if len(ops) >= cfg.Etcd.Import.Batch {
batch, ops = ops[0:cfg.Etcd.Import.Batch:cfg.Etcd.Import.Batch], ops[cfg.Etcd.Import.Batch:]
} else {
batch = ops
ops = nil
}

ctx, cancel := context.WithTimeout(context.Background(), cfg.Etcd.Timeout)
_, err = e.Client.Put(ctx, fmt.Sprintf("%s/%s/%d", zone, record.Hostname, count), record.Attributes)
cancel()
if err != nil {
return errors.Wrap(err, "etcd request failure")
ctx, cancel := context.WithTimeout(context.Background(), cfg.Etcd.Timeout)
_, err := e.Client.Txn(ctx).Then(batch...).Commit()
cancel()
if err != nil {
return errors.Wrap(err, "etcd request failure")
}
}

return nil
Expand Down Expand Up @@ -144,7 +149,7 @@ func (e *EtcdDatasource) GetAllRecords() ([]*DatasourceRecord, error) {
func (e *EtcdDatasource) GetHostRecords(host string) ([]*DatasourceRecord, error) {
zone, err := e.findZone(host)
if err != nil {
return nil, errors.Wrap(err, "failed to determine zone from hostname")
return nil, errors.Wrapf(err, "%s: failed to find zone", host)
}

prefix := zone + "/" + host
Expand All @@ -158,18 +163,35 @@ func (e *EtcdDatasource) GetHostRecords(host string) ([]*DatasourceRecord, error

// PublishRecords writes host records to the datasource.
func (e *EtcdDatasource) PublishRecords(records []*DatasourceRecord) error {
counts := map[string]int{}
cfg := e.Config
log := e.Logger

if cfg.Etcd.Import.Clear {
if err := e.execTxn([]etcdv3.Op{etcdv3.OpDelete("", etcdv3.WithPrefix())}); err != nil {
return err
}
}

ops := []etcdv3.Op{}
counts := map[string]int{}
for _, record := range records {
if _, ok := counts[record.Hostname]; ok {
counts[record.Hostname]++
} else {
counts[record.Hostname] = 0
}

if err := e.putRecord(record, counts[record.Hostname]); err != nil {
return errors.Wrap(err, "failed to publish a host record")
zone, err := e.findZone(record.Hostname)
if err != nil {
log.Warnf("[%s] skipping host record: %v", record.Hostname, err)
continue
}

ops = append(ops, etcdv3.OpPut(fmt.Sprintf("%s/%s/%d", zone, record.Hostname, counts[record.Hostname]), record.Attributes))
}

if err := e.execTxn(ops); err != nil {
return err
}

return nil
Expand Down
8 changes: 8 additions & 0 deletions pkg/inventory/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ type (
PEM string `mapstructure:"pem" default:""`
} `mapstructure:"key"`
} `mapstructure:"tls"`
// Etcd datasource import mode configuration.
Import struct {
// Clear all existing host records before importing records from file.
Clear bool `mapstructure:"clear" default:"true"`
// Batch size used when pushing host records to etcd.
// Should not exceed the maximum number of operations permitted in a etcd transaction (max-txn-ops).
Batch int `mapstructure:"batch" default:"128"`
} `mapstructure:"import"`
} `mapstructure:"etcd"`
// Host records parsing configuration.
Txt struct {
Expand Down

0 comments on commit 272ef4a

Please sign in to comment.