Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support providing configuration from YAML files #123

Merged
merged 9 commits into from
Jul 2, 2024
3 changes: 2 additions & 1 deletion proxy/launch.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ func runSignalListener(cancelFunc context.CancelFunc) {
}

func launchProxy(profilingSupported bool) {
conf, err := config.New().ParseEnvVars()
conf, err := config.New().LoadConfig()

if err != nil {
log.Errorf("Error loading configuration: %v. Aborting startup.", err)
os.Exit(-1)
Expand Down
174 changes: 105 additions & 69 deletions proxy/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"github.com/datastax/zdm-proxy/proxy/pkg/common"
"github.com/kelseyhightower/envconfig"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"
"net"
"os"
"strconv"
"strings"
)
Expand All @@ -16,111 +18,111 @@ type Config struct {

// Global bucket

PrimaryCluster string `default:"ORIGIN" split_words:"true"`
ReadMode string `default:"PRIMARY_ONLY" split_words:"true"`
ReplaceCqlFunctions bool `default:"false" split_words:"true"`
AsyncHandshakeTimeoutMs int `default:"4000" split_words:"true"`
LogLevel string `default:"INFO" split_words:"true"`
PrimaryCluster string `default:"ORIGIN" split_words:"true" yaml:"primary_cluster"`
ReadMode string `default:"PRIMARY_ONLY" split_words:"true" yaml:"read_mode"`
ReplaceCqlFunctions bool `default:"false" split_words:"true" yaml:"replace_cql_functions"`
AsyncHandshakeTimeoutMs int `default:"4000" split_words:"true" yaml:"async_handshake_timeout_ms"`
LogLevel string `default:"INFO" split_words:"true" yaml:"log_level"`

// Proxy Topology (also known as system.peers "virtualization") bucket

ProxyTopologyIndex int `default:"0" split_words:"true"`
ProxyTopologyAddresses string `split_words:"true"`
ProxyTopologyNumTokens int `default:"8" split_words:"true"`
ProxyTopologyIndex int `default:"0" split_words:"true" yaml:"proxy_topology_index"`
ProxyTopologyAddresses string `split_words:"true" yaml:"proxy_topology_addresses"`
ProxyTopologyNumTokens int `default:"8" split_words:"true" yaml:"proxy_topology_num_tokens"`

// Origin bucket

OriginContactPoints string `split_words:"true"`
OriginPort int `default:"9042" split_words:"true"`
OriginSecureConnectBundlePath string `split_words:"true"`
OriginLocalDatacenter string `split_words:"true"`
OriginUsername string `required:"true" split_words:"true"`
OriginPassword string `required:"true" split_words:"true" json:"-"`
OriginConnectionTimeoutMs int `default:"30000" split_words:"true"`
OriginContactPoints string `split_words:"true" yaml:"origin_contact_points"`
OriginPort int `default:"9042" split_words:"true" yaml:"origin_port"`
OriginSecureConnectBundlePath string `split_words:"true" yaml:"origin_secure_connect_bundle_path"`
OriginLocalDatacenter string `split_words:"true" yaml:"origin_local_datacenter"`
OriginUsername string `split_words:"true" yaml:"origin_username"`
OriginPassword string `split_words:"true" json:"-" yaml:"origin_password"`
OriginConnectionTimeoutMs int `default:"30000" split_words:"true" yaml:"origin_connection_timeout_ms"`

OriginTlsServerCaPath string `split_words:"true"`
OriginTlsClientCertPath string `split_words:"true"`
OriginTlsClientKeyPath string `split_words:"true"`
OriginTlsServerCaPath string `split_words:"true" yaml:"origin_tls_server_ca_path"`
OriginTlsClientCertPath string `split_words:"true" yaml:"origin_tls_client_cert_path"`
OriginTlsClientKeyPath string `split_words:"true" yaml:"origin_tls_client_key_path"`

// Target bucket

TargetContactPoints string `split_words:"true"`
TargetPort int `default:"9042" split_words:"true"`
TargetSecureConnectBundlePath string `split_words:"true"`
TargetLocalDatacenter string `split_words:"true"`
TargetUsername string `required:"true" split_words:"true"`
TargetPassword string `required:"true" split_words:"true" json:"-"`
TargetConnectionTimeoutMs int `default:"30000" split_words:"true"`
TargetContactPoints string `split_words:"true" yaml:"target_contact_points"`
TargetPort int `default:"9042" split_words:"true" yaml:"target_port"`
TargetSecureConnectBundlePath string `split_words:"true" yaml:"target_secure_connect_bundle_path"`
TargetLocalDatacenter string `split_words:"true" yaml:"target_local_datacenter"`
TargetUsername string `split_words:"true" yaml:"target_username"`
TargetPassword string `split_words:"true" json:"-" yaml:"target_password"`
TargetConnectionTimeoutMs int `default:"30000" split_words:"true" yaml:"target_connection_timeout_ms"`

TargetTlsServerCaPath string `split_words:"true"`
TargetTlsClientCertPath string `split_words:"true"`
TargetTlsClientKeyPath string `split_words:"true"`
TargetTlsServerCaPath string `split_words:"true" yaml:"target_tls_server_ca_path"`
TargetTlsClientCertPath string `split_words:"true" yaml:"target_tls_client_cert_path"`
TargetTlsClientKeyPath string `split_words:"true" yaml:"target_tls_client_key_path"`

// Proxy bucket

ProxyListenAddress string `default:"localhost" split_words:"true"`
ProxyListenPort int `default:"14002" split_words:"true"`
ProxyRequestTimeoutMs int `default:"10000" split_words:"true"`
ProxyMaxClientConnections int `default:"1000" split_words:"true"`
ProxyMaxStreamIds int `default:"2048" split_words:"true"`
ProxyListenAddress string `default:"localhost" split_words:"true" yaml:"proxy_listen_address"`
ProxyListenPort int `default:"14002" split_words:"true" yaml:"proxy_listen_port"`
ProxyRequestTimeoutMs int `default:"10000" split_words:"true" yaml:"proxy_request_timeout_ms"`
ProxyMaxClientConnections int `default:"1000" split_words:"true" yaml:"proxy_max_client_connections"`
ProxyMaxStreamIds int `default:"2048" split_words:"true" yaml:"proxy_max_stream_ids"`

ProxyTlsCaPath string `split_words:"true"`
ProxyTlsCertPath string `split_words:"true"`
ProxyTlsKeyPath string `split_words:"true"`
ProxyTlsRequireClientAuth bool `split_words:"true"`
ProxyTlsCaPath string `split_words:"true" yaml:"proxy_tls_ca_path"`
ProxyTlsCertPath string `split_words:"true" yaml:"proxy_tls_cert_path"`
ProxyTlsKeyPath string `split_words:"true" yaml:"proxy_tls_key_path"`
ProxyTlsRequireClientAuth bool `split_words:"true" yaml:"proxy_tls_require_client_auth"`

// Metrics bucket

MetricsEnabled bool `default:"true" split_words:"true"`
MetricsAddress string `default:"localhost" split_words:"true"`
MetricsPort int `default:"14001" split_words:"true"`
MetricsPrefix string `default:"zdm" split_words:"true"`
MetricsEnabled bool `default:"true" split_words:"true" yaml:"metrics_enabled"`
MetricsAddress string `default:"localhost" split_words:"true" yaml:"metrics_address"`
MetricsPort int `default:"14001" split_words:"true" yaml:"metrics_port"`
MetricsPrefix string `default:"zdm" split_words:"true" yaml:"metrics_prefix"`

MetricsOriginLatencyBucketsMs string `default:"1, 4, 7, 10, 25, 40, 60, 80, 100, 150, 250, 500, 1000, 2500, 5000, 10000, 15000" split_words:"true"`
MetricsTargetLatencyBucketsMs string `default:"1, 4, 7, 10, 25, 40, 60, 80, 100, 150, 250, 500, 1000, 2500, 5000, 10000, 15000" split_words:"true"`
MetricsAsyncReadLatencyBucketsMs string `default:"1, 4, 7, 10, 25, 40, 60, 80, 100, 150, 250, 500, 1000, 2500, 5000, 10000, 15000" split_words:"true"`
MetricsOriginLatencyBucketsMs string `default:"1, 4, 7, 10, 25, 40, 60, 80, 100, 150, 250, 500, 1000, 2500, 5000, 10000, 15000" split_words:"true" yaml:"metrics_origin_latency_buckets_ms"`
MetricsTargetLatencyBucketsMs string `default:"1, 4, 7, 10, 25, 40, 60, 80, 100, 150, 250, 500, 1000, 2500, 5000, 10000, 15000" split_words:"true" yaml:"metrics_target_latency_buckets_ms"`
MetricsAsyncReadLatencyBucketsMs string `default:"1, 4, 7, 10, 25, 40, 60, 80, 100, 150, 250, 500, 1000, 2500, 5000, 10000, 15000" split_words:"true" yaml:"metrics_async_read_latency_buckets_ms"`

// Heartbeat bucket

HeartbeatIntervalMs int `default:"30000" split_words:"true"`
HeartbeatIntervalMs int `default:"30000" split_words:"true" yaml:"heartbeat_interval_ms"`

HeartbeatRetryIntervalMinMs int `default:"250" split_words:"true"`
HeartbeatRetryIntervalMaxMs int `default:"30000" split_words:"true"`
HeartbeatRetryBackoffFactor float64 `default:"2" split_words:"true"`
HeartbeatFailureThreshold int `default:"1" split_words:"true"`
HeartbeatRetryIntervalMinMs int `default:"250" split_words:"true" yaml:"heartbeat_retry_interval_min_ms"`
HeartbeatRetryIntervalMaxMs int `default:"30000" split_words:"true" yaml:"heartbeat_retry_interval_max_ms"`
HeartbeatRetryBackoffFactor float64 `default:"2" split_words:"true" yaml:"heartbeat_retry_backoff_factor"`
HeartbeatFailureThreshold int `default:"1" split_words:"true" yaml:"heartbeat_failure_threshold"`

//////////////////////////////////////////////////////////////////////
/// THE SETTINGS BELOW AREN'T SUPPORTED AND MAY CHANGE AT ANY TIME ///
//////////////////////////////////////////////////////////////////////

SystemQueriesMode string `default:"ORIGIN" split_words:"true"`
SystemQueriesMode string `default:"ORIGIN" split_words:"true" yaml:"system_queries_mode"`

ForwardClientCredentialsToOrigin bool `default:"false" split_words:"true"` // only takes effect if both clusters have auth enabled
ForwardClientCredentialsToOrigin bool `default:"false" split_words:"true" yaml:"forward_client_credentials_to_origin"` // only takes effect if both clusters have auth enabled

OriginEnableHostAssignment bool `default:"true" split_words:"true"`
TargetEnableHostAssignment bool `default:"true" split_words:"true"`
OriginEnableHostAssignment bool `default:"true" split_words:"true" yaml:"origin_enable_host_assignment"`
TargetEnableHostAssignment bool `default:"true" split_words:"true" yaml:"target_enable_host_assignment"`

//////////////////////////////////////////////////////////////////////////////////////////////////////////
/// THE SETTINGS BELOW ARE FOR PERFORMANCE TUNING; THEY AREN'T SUPPORTED AND MAY CHANGE AT ANY TIME //////
//////////////////////////////////////////////////////////////////////////////////////////////////////////

RequestWriteQueueSizeFrames int `default:"128" split_words:"true"`
RequestWriteBufferSizeBytes int `default:"4096" split_words:"true"`
RequestReadBufferSizeBytes int `default:"32768" split_words:"true"`
RequestWriteQueueSizeFrames int `default:"128" split_words:"true" yaml:"request_write_queue_size_frames"`
RequestWriteBufferSizeBytes int `default:"4096" split_words:"true" yaml:"request_write_buffer_size_bytes"`
RequestReadBufferSizeBytes int `default:"32768" split_words:"true" yaml:"request_read_buffer_size_bytes"`

ResponseWriteQueueSizeFrames int `default:"128" split_words:"true"`
ResponseWriteBufferSizeBytes int `default:"8192" split_words:"true"`
ResponseReadBufferSizeBytes int `default:"32768" split_words:"true"`
ResponseWriteQueueSizeFrames int `default:"128" split_words:"true" yaml:"response_write_queue_size_frames"`
ResponseWriteBufferSizeBytes int `default:"8192" split_words:"true" yaml:"response_write_buffer_size_bytes"`
ResponseReadBufferSizeBytes int `default:"32768" split_words:"true" yaml:"response_read_buffer_size_bytes"`

RequestResponseMaxWorkers int `default:"-1" split_words:"true"`
WriteMaxWorkers int `default:"-1" split_words:"true"`
ReadMaxWorkers int `default:"-1" split_words:"true"`
ListenerMaxWorkers int `default:"-1" split_words:"true"`
RequestResponseMaxWorkers int `default:"-1" split_words:"true" yaml:"request_response_max_workers"`
WriteMaxWorkers int `default:"-1" split_words:"true" yaml:"write_max_workers"`
ReadMaxWorkers int `default:"-1" split_words:"true" yaml:"read_max_workers"`
ListenerMaxWorkers int `default:"-1" split_words:"true" yaml:"listener_max_workers"`

EventQueueSizeFrames int `default:"12" split_words:"true"`
EventQueueSizeFrames int `default:"12" split_words:"true" yaml:"event_queue_size_frames"`

AsyncConnectorWriteQueueSizeFrames int `default:"2048" split_words:"true"`
AsyncConnectorWriteBufferSizeBytes int `default:"4096" split_words:"true"`
AsyncConnectorWriteQueueSizeFrames int `default:"2048" split_words:"true" yaml:"async_connector_write_queue_size_frames"`
AsyncConnectorWriteBufferSizeBytes int `default:"4096" split_words:"true" yaml:"async_connector_write_buffer_size_bytes"`
}

func (c *Config) String() string {
Expand All @@ -133,12 +135,46 @@ func New() *Config {
return &Config{}
}

func (c *Config) loadFromFiles() error {
paths := os.Getenv("ZDM_CONFIG_FILES")
for _, path := range strings.Split(paths, ",") {
if len(path) == 0 {
continue
}
file, err := os.Open(path)
if err != nil {
return fmt.Errorf("could not read configuration file %v: %w", path, err)
}
defer file.Close()

dec := yaml.NewDecoder(file)
if err = dec.Decode(c); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens if the same config settings are defined in multiple files? As it stands it seems that they will be overwritten which is not a very clear behavior. We should probably return an error and fail the launch if that happens.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was intentional to allow to override. At the end we already print all applied configuration variables, so there should be no confusion.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I heavily disagree with this, I don't see any benefit to it and it might just lead to user error, users never check the configuration that is printed in the logs, it's mostly for us devs to troubleshoot issues.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hadn't thought about the possibility of the same value being set in multiple files, but it is a good point and definitely a risk.

I would see the value in supporting multiple files so that users can have one with stricter governance and another with easier access. For example, they may want to have one file containing credentials / SCB path / contact points, with restricted access, and another with things like read mode, primary cluster, log level etc etc which can be more easily accessed.

This could also be managed by having clearly structured and named files, from which we extract only the expected properties. This would not leave it up to the user to define any property in any file, and would ensure that each property is defined and considered exactly once. The downside is that we need to parse each file specifically for the expected content, but the configuration properties themselves do not tend to change, so I wouldn't see it as a big problem.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we are overengeneering this a bit... In general most tools just allow you to provide a config file and that's it. In the case of DSE, DSE allows you to set a config setting that is a path to a key file so that DSE can decrypt the credentials.

Can we take a step back and actually think if we need multiple config files? I feel like the DSE approach is a simple one but effective no?

return fmt.Errorf("could not parse yaml file %v: %w", path, err)
}
}
return nil
}

// ParseEnvVars fills out the fields of the Config struct according to envconfig rules
// See: Usage @ https://github.com/kelseyhightower/envconfig
func (c *Config) ParseEnvVars() (*Config, error) {
func (c *Config) parseEnvVars() error {
err := envconfig.Process("ZDM", c)
if err != nil {
return nil, fmt.Errorf("could not load environment variables: %w", err)
return fmt.Errorf("could not load environment variables: %w", err)
}

return nil
}

func (c *Config) LoadConfig() (*Config, error) {
err := c.parseEnvVars()
if err != nil {
return nil, err
}

err = c.loadFromFiles()
if err != nil {
return nil, err
joao-r-reis marked this conversation as resolved.
Show resolved Hide resolved
}

err = c.Validate()
Expand All @@ -148,7 +184,7 @@ func (c *Config) ParseEnvVars() (*Config, error) {

log.Infof("Parsed configuration: %v", c)

return c, nil
return c, err
lukasz-antoniak marked this conversation as resolved.
Show resolved Hide resolved
}

func lookupFirstIp4(host string) (net.IP, error) {
Expand Down
2 changes: 1 addition & 1 deletion proxy/pkg/config/config_dual_reads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestConfig_ParseReadMode(t *testing.T) {
setOriginContactPointsAndPortEnvVars()
setTargetContactPointsAndPortEnvVars()

conf, err := New().ParseEnvVars()
conf, err := New().LoadConfig()
if err != nil {
if tt.errExpected {
require.Equal(t, tt.errMsg, err.Error())
Expand Down
92 changes: 87 additions & 5 deletions proxy/pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestTargetConfig_WithBundleOnly(t *testing.T) {
// test-specific setup
setEnvVar("ZDM_TARGET_SECURE_CONNECT_BUNDLE_PATH", "/path/to/target/bundle")

conf, err := New().ParseEnvVars()
conf, err := New().LoadConfig()
require.Nil(t, err)
require.Equal(t, conf.TargetSecureConnectBundlePath, "/path/to/target/bundle")
require.Empty(t, conf.TargetContactPoints)
Expand All @@ -36,7 +36,7 @@ func TestTargetConfig_WithHostnameAndPortOnly(t *testing.T) {
// test-specific setup
setTargetContactPointsAndPortEnvVars()

conf, err := New().ParseEnvVars()
conf, err := New().LoadConfig()
require.Nil(t, err)
require.Equal(t, conf.TargetContactPoints, "target.hostname.com")
require.Equal(t, conf.TargetPort, 5647)
Expand All @@ -56,7 +56,7 @@ func TestTargetConfig_WithBundleAndHostname(t *testing.T) {
setTargetContactPointsAndPortEnvVars()
setTargetSecureConnectBundleEnvVar()

_, err := New().ParseEnvVars()
_, err := New().LoadConfig()
require.Error(t, err, "TargetSecureConnectBundlePath and TargetContactPoints are "+
"mutually exclusive. Please specify only one of them.")
}
Expand All @@ -72,7 +72,7 @@ func TestTargetConfig_WithoutBundleAndHostname(t *testing.T) {

// no test-specific setup in this case

_, err := New().ParseEnvVars()
_, err := New().LoadConfig()
require.Error(t, err, "Both TargetSecureConnectBundlePath and TargetContactPoints are "+
"empty. Please specify either one of them.")
}
Expand All @@ -89,7 +89,89 @@ func TestTargetConfig_WithHostnameButWithoutPort(t *testing.T) {
//test-specific setup
setEnvVar("ZDM_TARGET_CONTACT_POINTS", "target.hostname.com")

c, err := New().ParseEnvVars()
c, err := New().LoadConfig()
require.Nil(t, err)
require.Equal(t, 9042, c.TargetPort)
}

func TestConfig_LoadNotExistingFile(t *testing.T) {
defer clearAllEnvVars()
clearAllEnvVars()

setConfigFilesEnvVar("/not/existing/file")

_, err := New().LoadConfig()
require.NotNil(t, err)
require.Contains(t, err.Error(), "could not read configuration file /not/existing/file")
}

func TestConfig_LoadConfigFromFile(t *testing.T) {
defer clearAllEnvVars()
clearAllEnvVars()

f, err := createConfigFile(`
primary_cluster: ORIGIN

origin_username: foo1
origin_password: bar1
target_username: foo2
target_password: bar2

origin_contact_points: 192.168.100.101
origin_port: 19042
target_contact_points: 192.168.100.102
target_port: 29042
proxy_listen_port: 39042
`)
defer removeConfigFile(f)
require.Nil(t, err)
setConfigFilesEnvVar(f.Name())

c, err := New().LoadConfig()
require.Nil(t, err)
require.Equal(t, "ORIGIN", c.PrimaryCluster)
require.Equal(t, "foo1", c.OriginUsername)
require.Equal(t, "bar1", c.OriginPassword)
require.Equal(t, "foo2", c.TargetUsername)
require.Equal(t, "bar2", c.TargetPassword)
require.Equal(t, "192.168.100.101", c.OriginContactPoints)
require.Equal(t, 19042, c.OriginPort)
require.Equal(t, "192.168.100.102", c.TargetContactPoints)
require.Equal(t, 29042, c.TargetPort)
require.Equal(t, 39042, c.ProxyListenPort)
}

func TestConfig_LoadConfigFromFileAndEnvVars(t *testing.T) {
defer clearAllEnvVars()
clearAllEnvVars()

// publicly available information stored as environment variables
setOriginContactPointsAndPortEnvVars()
setTargetContactPointsAndPortEnvVars()

// sensitive username and passwords stored inside two files
f1, err := createConfigFile(`
origin_username: foo1
origin_password: bar1
`)
defer removeConfigFile(f1)
require.Nil(t, err)
f2, err := createConfigFile(`
target_username: foo2
target_password: bar2
target_contact_points: secret.hostname.com
`)
defer removeConfigFile(f2)
require.Nil(t, err)

setConfigFilesEnvVar(f1.Name(), f2.Name())

c, err := New().LoadConfig()
require.Nil(t, err)
require.Equal(t, "foo1", c.OriginUsername)
require.Equal(t, "bar1", c.OriginPassword)
require.Equal(t, "foo2", c.TargetUsername)
require.Equal(t, "bar2", c.TargetPassword)
require.Equal(t, "origin.hostname.com", c.OriginContactPoints)
require.Equal(t, "secret.hostname.com", c.TargetContactPoints) // file value overrides env var
}
Loading
Loading