From 37352d7ebf23e9cd6505e001f911d94b6731293d Mon Sep 17 00:00:00 2001 From: Stephen Soltesz Date: Fri, 20 Oct 2023 13:01:24 -0400 Subject: [PATCH] Add support for autoload/v2 conventions to jostler (#51) * Add organization flag * Add tests for organization options * Separate schema validation from uploads * Update organization flag help and validation * Place backward compatible check last * Add -upload-schema flag * Update comments * Add new test cases for invalid orgs and upload * Update coverage flags * Atomic coverage mode * Clarify flag help text * Simplify * Use predefined error * Count * Remove old v2 table also * Use distinct errors for Validate cases * Add new backward compatible test case * Use port :0 to prevent bind errors * Add TODO for simplifying support logic --- .travis.yml | 2 +- cmd/jostler/cli.go | 38 +++++++--- cmd/jostler/main.go | 20 +++++- cmd/jostler/main_test.go | 124 ++++++++++++++++++++++++++++++++- internal/schema/schema.go | 71 ++++++++++++++----- internal/schema/schema_test.go | 2 +- 6 files changed, 222 insertions(+), 35 deletions(-) diff --git a/.travis.yml b/.travis.yml index 6695f13..76bce70 100644 --- a/.travis.yml +++ b/.travis.yml @@ -14,7 +14,7 @@ script: - go vet ./... - CGO_ENABLED=0 go build ./... - go test ./... -race -- go test ./... -v -coverprofile=_coverage.cov +- go test ./... -v -covermode=count -coverprofile=_coverage.cov after_success: # Coveralls diff --git a/cmd/jostler/cli.go b/cmd/jostler/cli.go index bbaef7e..5823850 100644 --- a/cmd/jostler/cli.go +++ b/cmd/jostler/cli.go @@ -5,6 +5,7 @@ import ( "errors" "flag" "fmt" + "regexp" "strings" "time" @@ -22,6 +23,8 @@ var ( bucket string gcsDataDir string mlabNodeName string + organization string + uploadSchema bool = true // Flags related to bundles. dtSchemaFiles flagx.StringArray @@ -43,15 +46,21 @@ var ( testInterval time.Duration // Errors related to command line parsing and validation. 
- errExtraArgs = errors.New("extra arguments on the command line") - errNoNode = errors.New("must specify mlab-node-name") - errNoBucket = errors.New("must specify GCS bucket") - errNoExperiment = errors.New("must specify experiment") - errNoDatatype = errors.New("must specify at least one datatype") - errSchemaNums = errors.New("more schema files than datatypes") - errSchemaNoMatch = errors.New("does not match any specified datatypes") - errSchemaFilename = errors.New("is not in : format") - errValidate = errors.New("failed to validate") + errExtraArgs = errors.New("extra arguments on the command line") + errNoNode = errors.New("must specify mlab-node-name") + errNoBucket = errors.New("must specify GCS bucket") + errNoExperiment = errors.New("must specify experiment") + errNoDatatype = errors.New("must specify at least one datatype") + errSchemaNums = errors.New("more schema files than datatypes") + errSchemaNoMatch = errors.New("does not match any specified datatypes") + errSchemaFilename = errors.New("is not in : format") + errValidate = errors.New("failed to validate") + errAutoloadOrgRequired = errors.New("organization is required if not using autoload/v1 conventions") + errAutoloadOrgInvalid = errors.New("organization is not valid for autoload/v1 conventions") + errOrgName = errors.New("organization name must only contain lower case letters and numbers") + + // orgNameRegex matches valid organization names (a-z0-9, no spaces or capitals). 
+ orgNameRegex = regexp.MustCompile(`^[a-z0-9]+$`) ) func initFlags() { @@ -59,6 +68,8 @@ func initFlags() { flag.StringVar(&bucket, "gcs-bucket", "", "required - GCS bucket name") flag.StringVar(&gcsDataDir, "gcs-data-dir", "autoload/v1", "home directory in GCS bucket under which bundles will be uploaded") flag.StringVar(&mlabNodeName, "mlab-node-name", "", "required - node name specified directly or via MLAB_NODE_NAME env variable") + flag.StringVar(&organization, "organization", "", "the organization name; required for autoload/v2 conventions") + flag.BoolVar(&uploadSchema, "upload-schema", true, "upload the local table schema if necessary") // Flags related to bundles. dtSchemaFiles = flagx.StringArray{} @@ -142,6 +153,15 @@ func parseAndValidateCLI() error { if err := validateSchemaFlags(); err != nil { return err } + if !strings.Contains(gcsDataDir, "autoload/v1") && organization == "" { + return errAutoloadOrgRequired + } + if strings.Contains(gcsDataDir, "autoload/v1") && organization != "" { + return errAutoloadOrgInvalid + } + if organization != "" && !orgNameRegex.MatchString(organization) { + return errOrgName + } return validateSchemaFiles() } diff --git a/cmd/jostler/main.go b/cmd/jostler/main.go index 234a220..b00593c 100644 --- a/cmd/jostler/main.go +++ b/cmd/jostler/main.go @@ -105,7 +105,21 @@ func daemonMode() error { // ones are a superset of the previous table. for _, datatype := range datatypes { dtSchemaFile := schema.PathForDatatype(datatype, dtSchemaFiles) - if err = schema.ValidateAndUpload(stClient, bucket, experiment, datatype, dtSchemaFile); err != nil { + // TODO(soltesz): simplify the supporting logic for the validate & upload cases. + if uploadSchema { + // For autoload/v1 conventions and authoritative autoload/v2 configurations. + err = schema.ValidateAndUpload(stClient, bucket, experiment, datatype, dtSchemaFile) + } else { + // For autoload/v2 conventions without local schema uploads. 
+ xerr := schema.Validate(stClient, bucket, experiment, datatype, dtSchemaFile) + // Allow backward compatible local schemas. NOTE: local schemas that are new will cause an error. + if errors.Is(xerr, schema.ErrOnlyInOld) || errors.Is(xerr, schema.ErrSchemaMatch) { + err = nil + } else { + err = xerr + } + } + if err != nil { mainCancel() return fmt.Errorf("%v: %w", datatype, err) } @@ -191,8 +205,8 @@ func startUploader(mainCtx context.Context, mainCancel context.CancelFunc, statu gcsConf := uploadbundle.GCSConfig{ GCSClient: stClient, Bucket: bucket, - DataDir: filepath.Join(gcsDataDir, experiment, datatype), - IndexDir: filepath.Join(gcsDataDir, experiment, "index1"), + DataDir: filepath.Join(gcsDataDir, organization, experiment, datatype), + IndexDir: filepath.Join(gcsDataDir, organization, experiment, "index1"), BaseID: fmt.Sprintf("%s-%s-%s-%s", datatype, nameParts.Machine, nameParts.Site, experiment), } bundleConf := uploadbundle.BundleConfig{ diff --git a/cmd/jostler/main_test.go b/cmd/jostler/main_test.go index 4ac221b..4033a11 100644 --- a/cmd/jostler/main_test.go +++ b/cmd/jostler/main_test.go @@ -10,6 +10,7 @@ import ( "strings" "testing" + "github.com/m-lab/go/prometheusx" "github.com/m-lab/jostler/internal/schema" "github.com/m-lab/jostler/internal/testhelper" ) @@ -30,6 +31,9 @@ var ( // TestCLI tests non-interactive CLI invocations. func TestCLI(t *testing.T) { + // Prevent "bind: address already in use" errors during tests. 
+ addr := ":0" + prometheusx.ListenAddress = &addr tests := []struct { name string // name of the test rmTblSchemaFile bool // if true, remove table schema file before running the test @@ -142,10 +146,11 @@ func TestCLI(t *testing.T) { "-experiment", testExperiment, "-datatype", "foo1", "-datatype-schema-file", "foo1:testdata/datatypes/foo1-valid.json", + "-gcs-data-dir=testdata/autoload/v1", }, }, { - "daemon: scenario 2", false, "", + "daemon: scenario 2", false, schema.ErrSchemaMatch.Error(), []string{ "-gcs-bucket", "newclient,download", "-mlab-node-name", testNode, @@ -153,6 +158,7 @@ func TestCLI(t *testing.T) { "-experiment", testExperiment, "-datatype", "foo1", "-datatype-schema-file", "foo1:testdata/datatypes/foo1-valid.json", + "-gcs-data-dir=testdata/autoload/v1", }, }, { @@ -164,6 +170,7 @@ func TestCLI(t *testing.T) { "-experiment", testExperiment, "-datatype", "foo1", "-datatype-schema-file", "foo1:testdata/datatypes/foo1-valid-superset.json", + "-gcs-data-dir=testdata/autoload/v1", }, }, { @@ -175,6 +182,117 @@ func TestCLI(t *testing.T) { "-experiment", testExperiment, "-datatype", "foo1", "-datatype-schema-file", "foo1:testdata/datatypes/foo1-valid.json", + "-gcs-data-dir=testdata/autoload/v1", + }, + }, + // autoload/v2 flag and configuration testing. + { + "invalid: scenario 1", false, errAutoloadOrgInvalid.Error(), + []string{ + "-gcs-bucket", "newclient,download", + "-mlab-node-name", testNode, + "-local-data-dir", testLocalDataDir, + "-experiment", testExperiment, + "-datatype", "foo1", + "-datatype-schema-file", "foo1:testdata/datatypes/foo1-valid.json", + "-gcs-data-dir=testdata/autoload/v1", + "-organization=bar1", // Should not specify organization for an autoload/v1 run. 
+ }, + }, + { + "invalid: scenario 2", true, errAutoloadOrgRequired.Error(), + []string{ + "-gcs-bucket", "newclient,download", + "-mlab-node-name", testNode, + "-local-data-dir", testLocalDataDir, + "-experiment", testExperiment, + "-datatype", "foo1", + "-datatype-schema-file", "foo1:testdata/datatypes/foo1-valid.json", + "-gcs-data-dir=testdata/autoload/v2", // any value other than autoload/v1. + "-organization=", // Organization is required. + }, + }, + { + "invalid: scenario 3", true, errOrgName.Error(), + []string{ + "-gcs-bucket", "newclient,download", + "-mlab-node-name", testNode, + "-local-data-dir", testLocalDataDir, + "-experiment", testExperiment, + "-datatype", "foo1", + "-datatype-schema-file", "foo1:testdata/datatypes/foo1-valid.json", + "-gcs-data-dir=testdata/autoload/v2", // any value other than autoload/v1. + "-organization=INVALIDNAME", // Organization is invalid. + }, + }, + { + "valid: scenario 4 - upload authoritative new schema", true, "", + []string{ + "-gcs-bucket", "newclient,download,upload", + "-mlab-node-name", testNode, + "-local-data-dir", testLocalDataDir, + "-experiment", testExperiment, + "-datatype", "foo1", + "-datatype-schema-file", "foo1:testdata/datatypes/foo1-valid.json", + "-gcs-data-dir=testdata/autoload/v2", + "-organization=foo1org", + "-upload-schema=true", // allow uploads. 
+ }, + }, + { + "valid: scenario 4 - allow matching schema without upload", false, "", + []string{ + "-gcs-bucket", "newclient,download,upload", + "-mlab-node-name", testNode, + "-local-data-dir", testLocalDataDir, + "-experiment", testExperiment, + "-datatype", "foo1", + "-datatype-schema-file", "foo1:testdata/datatypes/foo1-valid.json", + "-gcs-data-dir=testdata/autoload/v2", + "-organization=foo1org", + "-upload-schema=false", + }, + }, + { + "invalid: scenario 4 - cannot upload new v2 schema", false, schema.ErrNewFields.Error(), + []string{ + "-gcs-bucket", "newclient,download", + "-mlab-node-name", testNode, + "-local-data-dir", testLocalDataDir, + "-experiment", testExperiment, + "-datatype", "foo1", + "-datatype-schema-file", "foo1:testdata/datatypes/foo1-valid-superset.json", // superset schema. + "-gcs-data-dir=testdata/autoload/v2", + "-organization=foo1org", + "-upload-schema=false", + }, + }, + { + "valid: scenario 5 - upload newer authoritative new schema", true, "", + []string{ + "-gcs-bucket", "newclient,download,upload", + "-mlab-node-name", testNode, + "-local-data-dir", testLocalDataDir, + "-experiment", testExperiment, + "-datatype", "foo1", + "-datatype-schema-file", "foo1:testdata/datatypes/foo1-valid-superset.json", + "-gcs-data-dir=testdata/autoload/v2", + "-organization=foo1org", + "-upload-schema=true", // allow uploads. 
+ }, + }, + { + "valid: scenario 5 - allow backward compatible schema", false, "", + []string{ + "-gcs-bucket", "newclient,download,upload", + "-mlab-node-name", testNode, + "-local-data-dir", testLocalDataDir, + "-experiment", testExperiment, + "-datatype", "foo1", + "-datatype-schema-file", "foo1:testdata/datatypes/foo1-valid.json", + "-gcs-data-dir=testdata/autoload/v2", + "-organization=foo1org", + "-upload-schema=false", }, }, } @@ -183,8 +301,10 @@ func TestCLI(t *testing.T) { os.RemoveAll("testdata/autoload") }() for i, test := range tests { + t.Logf("name: %s", test.name) if test.rmTblSchemaFile { os.RemoveAll("testdata/autoload/v1/tables/jostler/foo1.table.json") + os.RemoveAll("testdata/autoload/v2/tables/jostler/foo1.table.json") } var s string if test.wantErrStr == "" { @@ -196,7 +316,7 @@ func TestCLI(t *testing.T) { args := test.args // Use a local disk storage implementation that mimics downloads // from and uploads to GCS. - args = append(args, []string{"-gcs-local-disk", "-gcs-data-dir", "testdata/autoload/v1"}...) + args = append(args, []string{"-gcs-local-disk"}...) 
if testing.Verbose() { args = append(args, "-verbose") } diff --git a/internal/schema/schema.go b/internal/schema/schema.go index 91135b4..0691ae2 100644 --- a/internal/schema/schema.go +++ b/internal/schema/schema.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "os" + "path" "sort" "strings" @@ -36,6 +37,7 @@ var ( ErrStorageClient = errors.New("failed to create storage client") ErrReadSchema = errors.New("failed to read schema file") ErrEmptySchema = errors.New("empty schema file") + ErrInvalidSchema = errors.New("invalid schema file") ErrSchemaFromJSON = errors.New("failed to create schema from JSON") ErrMarshal = errors.New("failed to marshal schema") ErrUnmarshal = errors.New("failed to unmarshal schema") @@ -45,15 +47,17 @@ var ( ErrType = errors.New("unexpected type") ErrDownload = errors.New("failed to download schema") ErrUpload = errors.New("failed to upload schema") + ErrNewFields = errors.New("difference(s) in schema new fields") + ErrSchemaMatch = errors.New("old and new schemas match") + ErrSchemaNotFound = errors.New("schema not found") ) var ( // LocalDataDir is the root of the local directory. LocalDataDir = "/var/spool" // GCSDataDir is the left-most prefix ("root") of GCS objects. - GCSDataDir = "autoload/v1" - dtSchemaPathTemplate = "/datatypes/.json" - tblSchemaPathTemplate = "/tables//.table.json" + GCSDataDir = "autoload/v1" + dtSchemaPathTemplate = "/datatypes/.json" // Testing and debugging support. verbosef = func(fmt string, args ...interface{}) {} @@ -108,26 +112,50 @@ func CreateTableSchemaJSON(datatype, dtSchemaFile string) ([]byte, error) { return tblSchemaJSON, nil } -// ValidateAndUpload compares the current table schema against the -// previous table schema for the given datatype and returns an error -// if they are not compatibale. If the new table schema is a superset -// of the previous one, it will be uploaded to GCS. 
+// ValidateAndUpload compares the given table schema against the previous table +// schema for the given datatype and returns an error if they are not +// compatible. If the new table schema is a superset of the previous one, it +// will be uploaded to GCS. func ValidateAndUpload(gcsClient DownloaderUploader, bucket, experiment, datatype, dtSchemaFile string) error { - if err := ValidateSchemaFile(dtSchemaFile); err != nil { + err := Validate(gcsClient, bucket, experiment, datatype, dtSchemaFile) + + switch { + // Upload. + case errors.Is(err, ErrSchemaNotFound): + fallthrough + case errors.Is(err, ErrNewFields): + return uploadTableSchema(gcsClient, bucket, experiment, datatype, dtSchemaFile) + + // Do not upload. + case errors.Is(err, ErrOnlyInOld): + fallthrough + case errors.Is(err, ErrInvalidSchema): + fallthrough + case errors.Is(err, ErrCompare): + fallthrough + case errors.Is(err, ErrTypeMismatch): + fallthrough + case errors.Is(err, ErrSchemaMatch): return err + default: + panic(fmt.Sprintf("unknown schema status: %v", err)) + } +} + +// Validate checks the given table schema against the previous table schema for +// various differences and returns an error corresponding to the +// difference. +func Validate(gcsClient DownloaderUploader, bucket, experiment, datatype, dtSchemaFile string) error { + if err := ValidateSchemaFile(dtSchemaFile); err != nil { + return fmt.Errorf("%v: %w", err, ErrInvalidSchema) } diff, err := diffTableSchemas(gcsClient, bucket, experiment, datatype, dtSchemaFile) if err != nil { if !errors.Is(err, storage.ErrObjectNotExist) { - return fmt.Errorf("%v: %w", ErrCompare, err) + return fmt.Errorf("%v: %w", err, ErrCompare) } // Scenario 1: old doesn't exist, should upload new. - verbosef("no old table schema") - return uploadTableSchema(gcsClient, bucket, experiment, datatype, dtSchemaFile) - } - if diff.nInOld != 0 { - // Scenario 4 - new incompatible with old due to missing fields, should not upload. 
- return fmt.Errorf("incompatible schema: %2d %w", diff.nInOld, ErrOnlyInOld) + return ErrSchemaNotFound } if diff.nType != 0 { // Scenario 4 - new incompatible with old due to field type mismatch, should not upload. @@ -136,10 +164,15 @@ func ValidateAndUpload(gcsClient DownloaderUploader, bucket, experiment, datatyp if diff.nInNew != 0 { // Scenario 3 - new is a superset of old, should upload. verbosef("%2d field(s) only in new schema", diff.nInNew) - return uploadTableSchema(gcsClient, bucket, experiment, datatype, dtSchemaFile) + return fmt.Errorf("schema differences: %2d %w", diff.nInNew, ErrNewFields) + } + if diff.nInOld != 0 { + // Scenario 4 - new incompatible with old due to missing fields in new, should not upload. + // But, the new schema remains backward compatible with the old schema, because types match and there are no new fields. + return fmt.Errorf("backward compatible schema: %2d %w", diff.nInOld, ErrOnlyInOld) } // Scenario 2 - old exists and matches new, should not upload. - return nil + return ErrSchemaMatch } // diffTableSchemas builds a new table schema for the given datatype, @@ -196,8 +229,8 @@ func diffTableSchemas(gcsClient DownloaderUploader, bucket, experiment, datatype // tblSchemPath returns the GCS object name (aka path) for the given // experiment and datatype. 
func tblSchemaPath(experiment, datatype string) string { - objPath := strings.Replace(tblSchemaPathTemplate, "", experiment, 1) - return GCSDataDir + strings.Replace(objPath, "", datatype, 1) + schPath := path.Join("tables", experiment, datatype) + ".table.json" + return path.Join(GCSDataDir, schPath) } // uploadTableSchema creates a table schema for the given datatype schema diff --git a/internal/schema/schema_test.go b/internal/schema/schema_test.go index 1e03439..caa19a9 100644 --- a/internal/schema/schema_test.go +++ b/internal/schema/schema_test.go @@ -155,7 +155,7 @@ func TestValidateAndUpload(t *testing.T) { experiment: testExperiment, datatype: testDatatype, dtSchemaFile: "testdata/datatypes/foo1-valid.json", - wantErr: nil, + wantErr: schema.ErrSchemaMatch, }, { name: "scenario 3 - old exists, new is a superset, should upload",