Skip to content

Commit

Permalink
Add support for autoload/v2 conventions to jostler (#51)
Browse files Browse the repository at this point in the history
* Add organization flag

* Add tests for organization options

* Separate schema validation from uploads

* Update organization flag help and validation

* Place backward compatible check last

* Add -upload-schema flag

* Update comments

* Add new test cases for invalid orgs and upload

* Update coverage flags

* Atomic coverage mode

* Clarify flag help text

* Simplify

* Use predefined error

* Count

* Remove old v2 table also

* Use distinct errors for Validate cases

* Add new backward compatible test case

* Use port :0 to prevent bind errors

* Add TODO for simplifying support logic
  • Loading branch information
stephen-soltesz authored Oct 20, 2023
1 parent 83ea852 commit 37352d7
Show file tree
Hide file tree
Showing 6 changed files with 222 additions and 35 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ script:
- go vet ./...
- CGO_ENABLED=0 go build ./...
- go test ./... -race
- go test ./... -v -coverprofile=_coverage.cov
- go test ./... -v -covermode=count -coverprofile=_coverage.cov

after_success:
# Coveralls
Expand Down
38 changes: 29 additions & 9 deletions cmd/jostler/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"flag"
"fmt"
"regexp"
"strings"
"time"

Expand All @@ -22,6 +23,8 @@ var (
bucket string
gcsDataDir string
mlabNodeName string
organization string
uploadSchema bool = true

// Flags related to bundles.
dtSchemaFiles flagx.StringArray
Expand All @@ -43,22 +46,30 @@ var (
testInterval time.Duration

// Errors related to command line parsing and validation.
errExtraArgs = errors.New("extra arguments on the command line")
errNoNode = errors.New("must specify mlab-node-name")
errNoBucket = errors.New("must specify GCS bucket")
errNoExperiment = errors.New("must specify experiment")
errNoDatatype = errors.New("must specify at least one datatype")
errSchemaNums = errors.New("more schema files than datatypes")
errSchemaNoMatch = errors.New("does not match any specified datatypes")
errSchemaFilename = errors.New("is not in <datatype>:<pathname> format")
errValidate = errors.New("failed to validate")
errExtraArgs = errors.New("extra arguments on the command line")
errNoNode = errors.New("must specify mlab-node-name")
errNoBucket = errors.New("must specify GCS bucket")
errNoExperiment = errors.New("must specify experiment")
errNoDatatype = errors.New("must specify at least one datatype")
errSchemaNums = errors.New("more schema files than datatypes")
errSchemaNoMatch = errors.New("does not match any specified datatypes")
errSchemaFilename = errors.New("is not in <datatype>:<pathname> format")
errValidate = errors.New("failed to validate")
errAutoloadOrgRequired = errors.New("organization is required if not using autoload/v1 conventions")
errAutoloadOrgInvalid = errors.New("organization is not valid for autoload/v1 conventions")
errOrgName = errors.New("organization name must only contain lower case letters and numbers")

// orgNameRegex matches valid organization names (a-z0-9, no spaces or capitals).
orgNameRegex = regexp.MustCompile(`^[a-z0-9]+$`)
)

func initFlags() {
// Flags related to GCS.
flag.StringVar(&bucket, "gcs-bucket", "", "required - GCS bucket name")
flag.StringVar(&gcsDataDir, "gcs-data-dir", "autoload/v1", "home directory in GCS bucket under which bundles will be uploaded")
flag.StringVar(&mlabNodeName, "mlab-node-name", "", "required - node name specified directly or via MLAB_NODE_NAME env variable")
flag.StringVar(&organization, "organization", "", "the organization name; required for autoload/v2 conventions")
flag.BoolVar(&uploadSchema, "upload-schema", true, "upload the local table schema if necessary")

// Flags related to bundles.
dtSchemaFiles = flagx.StringArray{}
Expand Down Expand Up @@ -142,6 +153,15 @@ func parseAndValidateCLI() error {
if err := validateSchemaFlags(); err != nil {
return err
}
if !strings.Contains(gcsDataDir, "autoload/v1") && organization == "" {
return errAutoloadOrgRequired
}
if strings.Contains(gcsDataDir, "autoload/v1") && organization != "" {
return errAutoloadOrgInvalid
}
if organization != "" && !orgNameRegex.MatchString(organization) {
return errOrgName
}
return validateSchemaFiles()
}

Expand Down
20 changes: 17 additions & 3 deletions cmd/jostler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,21 @@ func daemonMode() error {
// ones are a superset of the previous table.
for _, datatype := range datatypes {
dtSchemaFile := schema.PathForDatatype(datatype, dtSchemaFiles)
if err = schema.ValidateAndUpload(stClient, bucket, experiment, datatype, dtSchemaFile); err != nil {
// TODO(soltesz): simplify the supporting logic for the validate & upload cases.
if uploadSchema {
// For autoload/v1 conventions and authoritative autoload/v2 configurations.
err = schema.ValidateAndUpload(stClient, bucket, experiment, datatype, dtSchemaFile)
} else {
// For autoload/v2 conventions without local schema uploads.
xerr := schema.Validate(stClient, bucket, experiment, datatype, dtSchemaFile)
// Allow backward compatible local schemas. NOTE: local schemas that are new will cause an error.
if errors.Is(xerr, schema.ErrOnlyInOld) || errors.Is(xerr, schema.ErrSchemaMatch) {
err = nil
} else {
err = xerr
}
}
if err != nil {
mainCancel()
return fmt.Errorf("%v: %w", datatype, err)
}
Expand Down Expand Up @@ -191,8 +205,8 @@ func startUploader(mainCtx context.Context, mainCancel context.CancelFunc, statu
gcsConf := uploadbundle.GCSConfig{
GCSClient: stClient,
Bucket: bucket,
DataDir: filepath.Join(gcsDataDir, experiment, datatype),
IndexDir: filepath.Join(gcsDataDir, experiment, "index1"),
DataDir: filepath.Join(gcsDataDir, organization, experiment, datatype),
IndexDir: filepath.Join(gcsDataDir, organization, experiment, "index1"),
BaseID: fmt.Sprintf("%s-%s-%s-%s", datatype, nameParts.Machine, nameParts.Site, experiment),
}
bundleConf := uploadbundle.BundleConfig{
Expand Down
124 changes: 122 additions & 2 deletions cmd/jostler/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"
"testing"

"github.com/m-lab/go/prometheusx"
"github.com/m-lab/jostler/internal/schema"
"github.com/m-lab/jostler/internal/testhelper"
)
Expand All @@ -30,6 +31,9 @@ var (

// TestCLI tests non-interactive CLI invocations.
func TestCLI(t *testing.T) {
// Prevent "bind: address already in use" errors during tests.
addr := ":0"
prometheusx.ListenAddress = &addr
tests := []struct {
name string // name of the test
rmTblSchemaFile bool // if true, remove table schema file before running the test
Expand Down Expand Up @@ -142,17 +146,19 @@ func TestCLI(t *testing.T) {
"-experiment", testExperiment,
"-datatype", "foo1",
"-datatype-schema-file", "foo1:testdata/datatypes/foo1-valid.json",
"-gcs-data-dir=testdata/autoload/v1",
},
},
{
"daemon: scenario 2", false, "",
"daemon: scenario 2", false, schema.ErrSchemaMatch.Error(),
[]string{
"-gcs-bucket", "newclient,download",
"-mlab-node-name", testNode,
"-local-data-dir", testLocalDataDir,
"-experiment", testExperiment,
"-datatype", "foo1",
"-datatype-schema-file", "foo1:testdata/datatypes/foo1-valid.json",
"-gcs-data-dir=testdata/autoload/v1",
},
},
{
Expand All @@ -164,6 +170,7 @@ func TestCLI(t *testing.T) {
"-experiment", testExperiment,
"-datatype", "foo1",
"-datatype-schema-file", "foo1:testdata/datatypes/foo1-valid-superset.json",
"-gcs-data-dir=testdata/autoload/v1",
},
},
{
Expand All @@ -175,6 +182,117 @@ func TestCLI(t *testing.T) {
"-experiment", testExperiment,
"-datatype", "foo1",
"-datatype-schema-file", "foo1:testdata/datatypes/foo1-valid.json",
"-gcs-data-dir=testdata/autoload/v1",
},
},
// autoload/v2 flag and configuration testing.
{
"invalid: scenario 1", false, errAutoloadOrgInvalid.Error(),
[]string{
"-gcs-bucket", "newclient,download",
"-mlab-node-name", testNode,
"-local-data-dir", testLocalDataDir,
"-experiment", testExperiment,
"-datatype", "foo1",
"-datatype-schema-file", "foo1:testdata/datatypes/foo1-valid.json",
"-gcs-data-dir=testdata/autoload/v1",
"-organization=bar1", // Should not specify organization for an autoload/v1 run.
},
},
{
"invalid: scenario 2", true, errAutoloadOrgRequired.Error(),
[]string{
"-gcs-bucket", "newclient,download",
"-mlab-node-name", testNode,
"-local-data-dir", testLocalDataDir,
"-experiment", testExperiment,
"-datatype", "foo1",
"-datatype-schema-file", "foo1:testdata/datatypes/foo1-valid.json",
"-gcs-data-dir=testdata/autoload/v2", // any value other than autoload/v1.
"-organization=", // Organization is required.
},
},
{
"invalid: scenario 3", true, errOrgName.Error(),
[]string{
"-gcs-bucket", "newclient,download",
"-mlab-node-name", testNode,
"-local-data-dir", testLocalDataDir,
"-experiment", testExperiment,
"-datatype", "foo1",
"-datatype-schema-file", "foo1:testdata/datatypes/foo1-valid.json",
"-gcs-data-dir=testdata/autoload/v2", // any value other than autoload/v1.
"-organization=INVALIDNAME", // Organization is invalid.
},
},
{
"valid: scenario 4 - upload authoritative new schema", true, "",
[]string{
"-gcs-bucket", "newclient,download,upload",
"-mlab-node-name", testNode,
"-local-data-dir", testLocalDataDir,
"-experiment", testExperiment,
"-datatype", "foo1",
"-datatype-schema-file", "foo1:testdata/datatypes/foo1-valid.json",
"-gcs-data-dir=testdata/autoload/v2",
"-organization=foo1org",
"-upload-schema=true", // allow uploads.
},
},
{
"valid: scenario 4 - allow matching schema without upload", false, "",
[]string{
"-gcs-bucket", "newclient,download,upload",
"-mlab-node-name", testNode,
"-local-data-dir", testLocalDataDir,
"-experiment", testExperiment,
"-datatype", "foo1",
"-datatype-schema-file", "foo1:testdata/datatypes/foo1-valid.json",
"-gcs-data-dir=testdata/autoload/v2",
"-organization=foo1org",
"-upload-schema=false",
},
},
{
"invalid: scenario 4 - cannot upload new v2 schema", false, schema.ErrNewFields.Error(),
[]string{
"-gcs-bucket", "newclient,download",
"-mlab-node-name", testNode,
"-local-data-dir", testLocalDataDir,
"-experiment", testExperiment,
"-datatype", "foo1",
"-datatype-schema-file", "foo1:testdata/datatypes/foo1-valid-superset.json", // superset schema.
"-gcs-data-dir=testdata/autoload/v2",
"-organization=foo1org",
"-upload-schema=false",
},
},
{
"valid: scenario 5 - upload newer authoritative new schema", true, "",
[]string{
"-gcs-bucket", "newclient,download,upload",
"-mlab-node-name", testNode,
"-local-data-dir", testLocalDataDir,
"-experiment", testExperiment,
"-datatype", "foo1",
"-datatype-schema-file", "foo1:testdata/datatypes/foo1-valid-superset.json",
"-gcs-data-dir=testdata/autoload/v2",
"-organization=foo1org",
"-upload-schema=true", // allow uploads.
},
},
{
"valid: scenario 5 - allow backward compatible schema", false, "",
[]string{
"-gcs-bucket", "newclient,download,upload",
"-mlab-node-name", testNode,
"-local-data-dir", testLocalDataDir,
"-experiment", testExperiment,
"-datatype", "foo1",
"-datatype-schema-file", "foo1:testdata/datatypes/foo1-valid.json",
"-gcs-data-dir=testdata/autoload/v2",
"-organization=foo1org",
"-upload-schema=false",
},
},
}
Expand All @@ -183,8 +301,10 @@ func TestCLI(t *testing.T) {
os.RemoveAll("testdata/autoload")
}()
for i, test := range tests {
t.Logf("name: %s", test.name)
if test.rmTblSchemaFile {
os.RemoveAll("testdata/autoload/v1/tables/jostler/foo1.table.json")
os.RemoveAll("testdata/autoload/v2/tables/jostler/foo1.table.json")
}
var s string
if test.wantErrStr == "" {
Expand All @@ -196,7 +316,7 @@ func TestCLI(t *testing.T) {
args := test.args
// Use a local disk storage implementation that mimics downloads
// from and uploads to GCS.
args = append(args, []string{"-gcs-local-disk", "-gcs-data-dir", "testdata/autoload/v1"}...)
args = append(args, []string{"-gcs-local-disk"}...)
if testing.Verbose() {
args = append(args, "-verbose")
}
Expand Down
Loading

0 comments on commit 37352d7

Please sign in to comment.