Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support providing configuration from YAML files #123

Merged
merged 9 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ require (
github.com/google/uuid v1.1.1
github.com/jpillora/backoff v1.0.0
github.com/kelseyhightower/envconfig v1.4.0
github.com/mcuadros/go-defaults v1.2.0
github.com/prometheus/client_golang v1.3.0
github.com/prometheus/client_model v0.1.0
github.com/rs/zerolog v1.20.0
github.com/sirupsen/logrus v1.6.0
github.com/stretchr/testify v1.8.0
gopkg.in/yaml.v3 v3.0.1
)

require (
Expand All @@ -33,5 +35,4 @@ require (
github.com/prometheus/procfs v0.0.8 // indirect
golang.org/x/sys v0.3.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
7 changes: 6 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,15 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/mcuadros/go-defaults v1.2.0 h1:FODb8WSf0uGaY8elWJAkoLL0Ri6AlZ1bFlenk56oZtc=
github.com/mcuadros/go-defaults v1.2.0/go.mod h1:WEZtHEVIGYVDqkKSWBdWKUVdRyKlMfulPaGDWIVeCWY=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/pierrec/lz4/v4 v4.0.3 h1:vNQKSVZNYUEAvRY9FaUXAF1XPbSOHJtDTiP41kzDz2E=
github.com/pierrec/lz4/v4 v4.0.3/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down Expand Up @@ -127,8 +131,9 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
Expand Down
5 changes: 3 additions & 2 deletions proxy/launch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ func runSignalListener(cancelFunc context.CancelFunc) {
}()
}

func launchProxy(profilingSupported bool) {
conf, err := config.New().ParseEnvVars()
func launchProxy(profilingSupported bool, configFile string) {
conf, err := config.New().LoadConfig(configFile)

if err != nil {
log.Errorf("Error loading configuration: %v. Aborting startup.", err)
os.Exit(-1)
Expand Down
5 changes: 3 additions & 2 deletions proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import (
// TODO: to be managed externally
const ZdmVersionString = "2.2.0"

var displayVersion = flag.Bool("version", false, "Display the ZDM proxy version and exit")
var displayVersion = flag.Bool("version", false, "display the ZDM proxy version and exit")
lukasz-antoniak marked this conversation as resolved.
Show resolved Hide resolved
var configFile = flag.String("config", "", "specify path to ZDM configuration file")

func main() {

Expand All @@ -29,5 +30,5 @@ func main() {
// Always record version information (very) early in the log
log.Infof("Starting ZDM proxy version %v", ZdmVersionString)

launchProxy(false)
launchProxy(false, *configFile)
}
3 changes: 2 additions & 1 deletion proxy/main_profiling.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

var cpuProfile = flag.String("cpu_profile", "", "write cpu profile to the specified file")
var memProfile = flag.String("mem_profile", "", "write memory profile to the specified file")
var configFile = flag.String("config", "", "specify path to ZDM configuration file")

func main() {

Expand Down Expand Up @@ -61,5 +62,5 @@ func main() {
}()
}

launchProxy(true)
launchProxy(true, configFile)
lukasz-antoniak marked this conversation as resolved.
Show resolved Hide resolved
}
171 changes: 102 additions & 69 deletions proxy/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ import (
"fmt"
"github.com/datastax/zdm-proxy/proxy/pkg/common"
"github.com/kelseyhightower/envconfig"
def "github.com/mcuadros/go-defaults"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"
"net"
"os"
"strconv"
"strings"
)
Expand All @@ -16,111 +19,111 @@ type Config struct {

// Global bucket

PrimaryCluster string `default:"ORIGIN" split_words:"true"`
ReadMode string `default:"PRIMARY_ONLY" split_words:"true"`
ReplaceCqlFunctions bool `default:"false" split_words:"true"`
AsyncHandshakeTimeoutMs int `default:"4000" split_words:"true"`
LogLevel string `default:"INFO" split_words:"true"`
PrimaryCluster string `default:"ORIGIN" split_words:"true" yaml:"primary_cluster"`
ReadMode string `default:"PRIMARY_ONLY" split_words:"true" yaml:"read_mode"`
ReplaceCqlFunctions bool `default:"false" split_words:"true" yaml:"replace_cql_functions"`
AsyncHandshakeTimeoutMs int `default:"4000" split_words:"true" yaml:"async_handshake_timeout_ms"`
LogLevel string `default:"INFO" split_words:"true" yaml:"log_level"`

// Proxy Topology (also known as system.peers "virtualization") bucket

ProxyTopologyIndex int `default:"0" split_words:"true"`
ProxyTopologyAddresses string `split_words:"true"`
ProxyTopologyNumTokens int `default:"8" split_words:"true"`
ProxyTopologyIndex int `default:"0" split_words:"true" yaml:"proxy_topology_index"`
ProxyTopologyAddresses string `split_words:"true" yaml:"proxy_topology_addresses"`
ProxyTopologyNumTokens int `default:"8" split_words:"true" yaml:"proxy_topology_num_tokens"`

// Origin bucket

OriginContactPoints string `split_words:"true"`
OriginPort int `default:"9042" split_words:"true"`
OriginSecureConnectBundlePath string `split_words:"true"`
OriginLocalDatacenter string `split_words:"true"`
OriginUsername string `required:"true" split_words:"true"`
OriginPassword string `required:"true" split_words:"true" json:"-"`
OriginConnectionTimeoutMs int `default:"30000" split_words:"true"`
OriginContactPoints string `split_words:"true" yaml:"origin_contact_points"`
OriginPort int `default:"9042" split_words:"true" yaml:"origin_port"`
OriginSecureConnectBundlePath string `split_words:"true" yaml:"origin_secure_connect_bundle_path"`
OriginLocalDatacenter string `split_words:"true" yaml:"origin_local_datacenter"`
OriginUsername string `required:"true" split_words:"true" yaml:"origin_username"`
OriginPassword string `required:"true" split_words:"true" json:"-" yaml:"origin_password"`
OriginConnectionTimeoutMs int `default:"30000" split_words:"true" yaml:"origin_connection_timeout_ms"`

OriginTlsServerCaPath string `split_words:"true"`
OriginTlsClientCertPath string `split_words:"true"`
OriginTlsClientKeyPath string `split_words:"true"`
OriginTlsServerCaPath string `split_words:"true" yaml:"origin_tls_server_ca_path"`
OriginTlsClientCertPath string `split_words:"true" yaml:"origin_tls_client_cert_path"`
OriginTlsClientKeyPath string `split_words:"true" yaml:"origin_tls_client_key_path"`

// Target bucket

TargetContactPoints string `split_words:"true"`
TargetPort int `default:"9042" split_words:"true"`
TargetSecureConnectBundlePath string `split_words:"true"`
TargetLocalDatacenter string `split_words:"true"`
TargetUsername string `required:"true" split_words:"true"`
TargetPassword string `required:"true" split_words:"true" json:"-"`
TargetConnectionTimeoutMs int `default:"30000" split_words:"true"`
TargetContactPoints string `split_words:"true" yaml:"target_contact_points"`
TargetPort int `default:"9042" split_words:"true" yaml:"target_port"`
TargetSecureConnectBundlePath string `split_words:"true" yaml:"target_secure_connect_bundle_path"`
TargetLocalDatacenter string `split_words:"true" yaml:"target_local_datacenter"`
TargetUsername string `required:"true" split_words:"true" yaml:"target_username"`
TargetPassword string `required:"true" split_words:"true" json:"-" yaml:"target_password"`
TargetConnectionTimeoutMs int `default:"30000" split_words:"true" yaml:"target_connection_timeout_ms"`

TargetTlsServerCaPath string `split_words:"true"`
TargetTlsClientCertPath string `split_words:"true"`
TargetTlsClientKeyPath string `split_words:"true"`
TargetTlsServerCaPath string `split_words:"true" yaml:"target_tls_server_ca_path"`
TargetTlsClientCertPath string `split_words:"true" yaml:"target_tls_client_cert_path"`
TargetTlsClientKeyPath string `split_words:"true" yaml:"target_tls_client_key_path"`

// Proxy bucket

ProxyListenAddress string `default:"localhost" split_words:"true"`
ProxyListenPort int `default:"14002" split_words:"true"`
ProxyRequestTimeoutMs int `default:"10000" split_words:"true"`
ProxyMaxClientConnections int `default:"1000" split_words:"true"`
ProxyMaxStreamIds int `default:"2048" split_words:"true"`
ProxyListenAddress string `default:"localhost" split_words:"true" yaml:"proxy_listen_address"`
ProxyListenPort int `default:"14002" split_words:"true" yaml:"proxy_listen_port"`
ProxyRequestTimeoutMs int `default:"10000" split_words:"true" yaml:"proxy_request_timeout_ms"`
ProxyMaxClientConnections int `default:"1000" split_words:"true" yaml:"proxy_max_client_connections"`
ProxyMaxStreamIds int `default:"2048" split_words:"true" yaml:"proxy_max_stream_ids"`

ProxyTlsCaPath string `split_words:"true"`
ProxyTlsCertPath string `split_words:"true"`
ProxyTlsKeyPath string `split_words:"true"`
ProxyTlsRequireClientAuth bool `split_words:"true"`
ProxyTlsCaPath string `split_words:"true" yaml:"proxy_tls_ca_path"`
ProxyTlsCertPath string `split_words:"true" yaml:"proxy_tls_cert_path"`
ProxyTlsKeyPath string `split_words:"true" yaml:"proxy_tls_key_path"`
ProxyTlsRequireClientAuth bool `split_words:"true" yaml:"proxy_tls_require_client_auth"`

// Metrics bucket

MetricsEnabled bool `default:"true" split_words:"true"`
MetricsAddress string `default:"localhost" split_words:"true"`
MetricsPort int `default:"14001" split_words:"true"`
MetricsPrefix string `default:"zdm" split_words:"true"`
MetricsEnabled bool `default:"true" split_words:"true" yaml:"metrics_enabled"`
MetricsAddress string `default:"localhost" split_words:"true" yaml:"metrics_address"`
MetricsPort int `default:"14001" split_words:"true" yaml:"metrics_port"`
MetricsPrefix string `default:"zdm" split_words:"true" yaml:"metrics_prefix"`

MetricsOriginLatencyBucketsMs string `default:"1, 4, 7, 10, 25, 40, 60, 80, 100, 150, 250, 500, 1000, 2500, 5000, 10000, 15000" split_words:"true"`
MetricsTargetLatencyBucketsMs string `default:"1, 4, 7, 10, 25, 40, 60, 80, 100, 150, 250, 500, 1000, 2500, 5000, 10000, 15000" split_words:"true"`
MetricsAsyncReadLatencyBucketsMs string `default:"1, 4, 7, 10, 25, 40, 60, 80, 100, 150, 250, 500, 1000, 2500, 5000, 10000, 15000" split_words:"true"`
MetricsOriginLatencyBucketsMs string `default:"1, 4, 7, 10, 25, 40, 60, 80, 100, 150, 250, 500, 1000, 2500, 5000, 10000, 15000" split_words:"true" yaml:"metrics_origin_latency_buckets_ms"`
MetricsTargetLatencyBucketsMs string `default:"1, 4, 7, 10, 25, 40, 60, 80, 100, 150, 250, 500, 1000, 2500, 5000, 10000, 15000" split_words:"true" yaml:"metrics_target_latency_buckets_ms"`
MetricsAsyncReadLatencyBucketsMs string `default:"1, 4, 7, 10, 25, 40, 60, 80, 100, 150, 250, 500, 1000, 2500, 5000, 10000, 15000" split_words:"true" yaml:"metrics_async_read_latency_buckets_ms"`

// Heartbeat bucket

HeartbeatIntervalMs int `default:"30000" split_words:"true"`
HeartbeatIntervalMs int `default:"30000" split_words:"true" yaml:"heartbeat_interval_ms"`

HeartbeatRetryIntervalMinMs int `default:"250" split_words:"true"`
HeartbeatRetryIntervalMaxMs int `default:"30000" split_words:"true"`
HeartbeatRetryBackoffFactor float64 `default:"2" split_words:"true"`
HeartbeatFailureThreshold int `default:"1" split_words:"true"`
HeartbeatRetryIntervalMinMs int `default:"250" split_words:"true" yaml:"heartbeat_retry_interval_min_ms"`
HeartbeatRetryIntervalMaxMs int `default:"30000" split_words:"true" yaml:"heartbeat_retry_interval_max_ms"`
HeartbeatRetryBackoffFactor float64 `default:"2" split_words:"true" yaml:"heartbeat_retry_backoff_factor"`
HeartbeatFailureThreshold int `default:"1" split_words:"true" yaml:"heartbeat_failure_threshold"`

//////////////////////////////////////////////////////////////////////
/// THE SETTINGS BELOW AREN'T SUPPORTED AND MAY CHANGE AT ANY TIME ///
//////////////////////////////////////////////////////////////////////

SystemQueriesMode string `default:"ORIGIN" split_words:"true"`
SystemQueriesMode string `default:"ORIGIN" split_words:"true" yaml:"system_queries_mode"`

ForwardClientCredentialsToOrigin bool `default:"false" split_words:"true"` // only takes effect if both clusters have auth enabled
ForwardClientCredentialsToOrigin bool `default:"false" split_words:"true" yaml:"forward_client_credentials_to_origin"` // only takes effect if both clusters have auth enabled

OriginEnableHostAssignment bool `default:"true" split_words:"true"`
TargetEnableHostAssignment bool `default:"true" split_words:"true"`
OriginEnableHostAssignment bool `default:"true" split_words:"true" yaml:"origin_enable_host_assignment"`
TargetEnableHostAssignment bool `default:"true" split_words:"true" yaml:"target_enable_host_assignment"`

//////////////////////////////////////////////////////////////////////////////////////////////////////////
/// THE SETTINGS BELOW ARE FOR PERFORMANCE TUNING; THEY AREN'T SUPPORTED AND MAY CHANGE AT ANY TIME //////
//////////////////////////////////////////////////////////////////////////////////////////////////////////

RequestWriteQueueSizeFrames int `default:"128" split_words:"true"`
RequestWriteBufferSizeBytes int `default:"4096" split_words:"true"`
RequestReadBufferSizeBytes int `default:"32768" split_words:"true"`
RequestWriteQueueSizeFrames int `default:"128" split_words:"true" yaml:"request_write_queue_size_frames"`
RequestWriteBufferSizeBytes int `default:"4096" split_words:"true" yaml:"request_write_buffer_size_bytes"`
RequestReadBufferSizeBytes int `default:"32768" split_words:"true" yaml:"request_read_buffer_size_bytes"`

ResponseWriteQueueSizeFrames int `default:"128" split_words:"true"`
ResponseWriteBufferSizeBytes int `default:"8192" split_words:"true"`
ResponseReadBufferSizeBytes int `default:"32768" split_words:"true"`
ResponseWriteQueueSizeFrames int `default:"128" split_words:"true" yaml:"response_write_queue_size_frames"`
ResponseWriteBufferSizeBytes int `default:"8192" split_words:"true" yaml:"response_write_buffer_size_bytes"`
ResponseReadBufferSizeBytes int `default:"32768" split_words:"true" yaml:"response_read_buffer_size_bytes"`

RequestResponseMaxWorkers int `default:"-1" split_words:"true"`
WriteMaxWorkers int `default:"-1" split_words:"true"`
ReadMaxWorkers int `default:"-1" split_words:"true"`
ListenerMaxWorkers int `default:"-1" split_words:"true"`
RequestResponseMaxWorkers int `default:"-1" split_words:"true" yaml:"request_response_max_workers"`
WriteMaxWorkers int `default:"-1" split_words:"true" yaml:"write_max_workers"`
ReadMaxWorkers int `default:"-1" split_words:"true" yaml:"read_max_workers"`
ListenerMaxWorkers int `default:"-1" split_words:"true" yaml:"listener_max_workers"`

EventQueueSizeFrames int `default:"12" split_words:"true"`
EventQueueSizeFrames int `default:"12" split_words:"true" yaml:"event_queue_size_frames"`

AsyncConnectorWriteQueueSizeFrames int `default:"2048" split_words:"true"`
AsyncConnectorWriteBufferSizeBytes int `default:"4096" split_words:"true"`
AsyncConnectorWriteQueueSizeFrames int `default:"2048" split_words:"true" yaml:"async_connector_write_queue_size_frames"`
AsyncConnectorWriteBufferSizeBytes int `default:"4096" split_words:"true" yaml:"async_connector_write_buffer_size_bytes"`
}

func (c *Config) String() string {
Expand All @@ -133,12 +136,42 @@ func New() *Config {
return &Config{}
}

func (c *Config) loadFromFile(configFile string) error {
file, err := os.Open(configFile)
if err != nil {
return fmt.Errorf("could not read configuration file %v: %w", configFile, err)
}
defer file.Close()

def.SetDefaults(c) // apply default tag, it is not supported by YAML decoder
lukasz-antoniak marked this conversation as resolved.
Show resolved Hide resolved
dec := yaml.NewDecoder(file)
if err = dec.Decode(c); err != nil {
return fmt.Errorf("could not parse yaml file %v: %w", configFile, err)
}
return nil
}

// ParseEnvVars fills out the fields of the Config struct according to envconfig rules
// See: Usage @ https://github.com/kelseyhightower/envconfig
func (c *Config) ParseEnvVars() (*Config, error) {
func (c *Config) parseEnvVars() error {
err := envconfig.Process("ZDM", c)
if err != nil {
return nil, fmt.Errorf("could not load environment variables: %w", err)
return fmt.Errorf("could not load environment variables: %w", err)
}

return nil
}

func (c *Config) LoadConfig(configFile string) (*Config, error) {
var err error

if configFile != "" {
err = c.loadFromFile(configFile)
} else {
err = c.parseEnvVars()
}
if err != nil {
return nil, err
}

err = c.Validate()
Expand All @@ -148,7 +181,7 @@ func (c *Config) ParseEnvVars() (*Config, error) {

log.Infof("Parsed configuration: %v", c)

return c, nil
return c, err
lukasz-antoniak marked this conversation as resolved.
Show resolved Hide resolved
}

func lookupFirstIp4(host string) (net.IP, error) {
Expand Down
2 changes: 1 addition & 1 deletion proxy/pkg/config/config_dual_reads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestConfig_ParseReadMode(t *testing.T) {
setOriginContactPointsAndPortEnvVars()
setTargetContactPointsAndPortEnvVars()

conf, err := New().ParseEnvVars()
conf, err := New().LoadConfig("")
if err != nil {
if tt.errExpected {
require.Equal(t, tt.errMsg, err.Error())
Expand Down
Loading
Loading