Skip to content

Commit

Permalink
Simplify the ValidateAndUpload logic (#52)
Browse files Browse the repository at this point in the history
* Simplify ValidateAndUpload logic
* Make validate a local function
* Add test case for no upload
  • Loading branch information
stephen-soltesz authored Nov 8, 2023
1 parent 4f3c9f3 commit c44b5f6
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 43 deletions.
15 changes: 1 addition & 14 deletions cmd/jostler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,20 +105,7 @@ func daemonMode() error {
// ones are a superset of the previous table.
for _, datatype := range datatypes {
dtSchemaFile := schema.PathForDatatype(datatype, dtSchemaFiles)
// TODO(soltesz): simplify the supporting logic for the validate & upload cases.
if uploadSchema {
// For autoload/v1 conventions and authoritative autoload/v2 configurations.
err = schema.ValidateAndUpload(stClient, bucket, experiment, datatype, dtSchemaFile)
} else {
// For autoload/v2 conventions without local schema uploads.
xerr := schema.Validate(stClient, bucket, experiment, datatype, dtSchemaFile)
// Allow backward compatible local schemas. NOTE: local schemas that are new will cause an error.
if errors.Is(xerr, schema.ErrOnlyInOld) || errors.Is(xerr, schema.ErrSchemaMatch) {
err = nil
} else {
err = xerr
}
}
err := schema.ValidateAndUpload(stClient, bucket, experiment, datatype, dtSchemaFile, uploadSchema)
if err != nil {
mainCancel()
return fmt.Errorf("%v: %w", datatype, err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/jostler/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func TestCLI(t *testing.T) {
},
},
{
"daemon: scenario 2", false, schema.ErrSchemaMatch.Error(),
"daemon: scenario 2", false, "",
[]string{
"-gcs-bucket", "newclient,download",
"-mlab-node-name", testNode,
Expand Down
44 changes: 18 additions & 26 deletions internal/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,36 +116,28 @@ func CreateTableSchemaJSON(datatype, dtSchemaFile string) ([]byte, error) {
// schema for the given datatype and returns an error if they are not
// compatibale. If the new table schema is a superset of the previous one, it
// will be uploaded to GCS.
func ValidateAndUpload(gcsClient DownloaderUploader, bucket, experiment, datatype, dtSchemaFile string) error {
err := Validate(gcsClient, bucket, experiment, datatype, dtSchemaFile)

switch {
// Upload.
case errors.Is(err, ErrSchemaNotFound):
fallthrough
case errors.Is(err, ErrNewFields):
return uploadTableSchema(gcsClient, bucket, experiment, datatype, dtSchemaFile)

// Do not upload.
case errors.Is(err, ErrOnlyInOld):
fallthrough
case errors.Is(err, ErrInvalidSchema):
fallthrough
case errors.Is(err, ErrCompare):
fallthrough
case errors.Is(err, ErrTypeMismatch):
fallthrough
case errors.Is(err, ErrSchemaMatch):
return err
default:
panic(fmt.Sprintf("unknown schema status: %v", err))
func ValidateAndUpload(gcsClient DownloaderUploader, bucket, experiment, datatype, dtSchemaFile string, uploadSchema bool) error {
err := validate(gcsClient, bucket, experiment, datatype, dtSchemaFile)
if uploadSchema && (errors.Is(err, ErrSchemaNotFound) || errors.Is(err, ErrNewFields)) {
// For autoload/v1 conventions and authoritative autoload/v2 configurations.
// Upload when the schema is not found or there are new local fields in the schema.
err = uploadTableSchema(gcsClient, bucket, experiment, datatype, dtSchemaFile)
} else if !uploadSchema && errors.Is(err, ErrOnlyInOld) {
// For autoload/v2 conventions without local schema uploads.
// Allow backward compatible local schemas.
err = nil
}
// In all cases, allow matching schemas.
if errors.Is(err, ErrSchemaMatch) {
err = nil
}
return err
}

// Validate checks the given table schema against the previous table schema for
// validate checks the given table schema against the previous table schema for
// various differences and returns a SchemaStatus corresponding to the
// difference.
func Validate(gcsClient DownloaderUploader, bucket, experiment, datatype, dtSchemaFile string) error {
func validate(gcsClient DownloaderUploader, bucket, experiment, datatype, dtSchemaFile string) error {
if err := ValidateSchemaFile(dtSchemaFile); err != nil {
return fmt.Errorf("%v: %w", err, ErrInvalidSchema)
}
Expand Down Expand Up @@ -234,7 +226,7 @@ func tblSchemaPath(experiment, datatype string) string {
}

// uploadTableSchema creates a table schema for the given datatype schema
// and uploads it to GCS.
// and uploads it to GCS. uploadTableSchema does not validate the schema.
func uploadTableSchema(gcsClient DownloaderUploader, bucket, experiment, datatype, dtSchemaFile string) error {
ctx := context.Background()
tblSchemaJSON, err := CreateTableSchemaJSON(datatype, dtSchemaFile)
Expand Down
26 changes: 24 additions & 2 deletions internal/schema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func TestValidateAndUpload(t *testing.T) {
experiment string
datatype string
dtSchemaFile string
uploadSchema bool

wantErr error
}{
Expand All @@ -95,6 +96,7 @@ func TestValidateAndUpload(t *testing.T) {
experiment: testExperiment,
datatype: testDatatype,
dtSchemaFile: "testdata/datatypes/non-existent.json", // this file doesn't exist
uploadSchema: true,
wantErr: schema.ErrReadSchema,
},
{
Expand All @@ -105,6 +107,7 @@ func TestValidateAndUpload(t *testing.T) {
experiment: testExperiment,
datatype: testDatatype,
dtSchemaFile: "testdata/datatypes/foo1-invalid.json", // this file doesn't exist
uploadSchema: true,
wantErr: schema.ErrUnmarshal,
},
{
Expand All @@ -115,6 +118,7 @@ func TestValidateAndUpload(t *testing.T) {
experiment: testExperiment,
datatype: testDatatype,
dtSchemaFile: "testdata/datatypes/foo1-valid.json",
uploadSchema: true,
wantErr: schema.ErrStorageClient,
},
{
Expand All @@ -125,6 +129,7 @@ func TestValidateAndUpload(t *testing.T) {
experiment: testExperiment,
datatype: testDatatype,
dtSchemaFile: "testdata/datatypes/foo1-valid.json",
uploadSchema: true,
wantErr: schema.ErrUpload,
},
{
Expand All @@ -135,6 +140,7 @@ func TestValidateAndUpload(t *testing.T) {
experiment: testExperiment,
datatype: testDatatype,
dtSchemaFile: "testdata/datatypes/foo1-valid.json",
uploadSchema: true,
wantErr: nil,
},
{
Expand All @@ -145,6 +151,7 @@ func TestValidateAndUpload(t *testing.T) {
experiment: testExperiment,
datatype: testDatatype,
dtSchemaFile: "testdata/datatypes/foo1-valid.json",
uploadSchema: true,
wantErr: schema.ErrDownload,
},
{
Expand All @@ -155,7 +162,8 @@ func TestValidateAndUpload(t *testing.T) {
experiment: testExperiment,
datatype: testDatatype,
dtSchemaFile: "testdata/datatypes/foo1-valid.json",
wantErr: schema.ErrSchemaMatch,
uploadSchema: true,
wantErr: nil,
},
{
name: "scenario 3 - old exists, new is a superset, should upload",
Expand All @@ -165,6 +173,7 @@ func TestValidateAndUpload(t *testing.T) {
experiment: testExperiment,
datatype: testDatatype,
dtSchemaFile: "testdata/datatypes/foo1-valid-superset.json",
uploadSchema: true,
wantErr: nil,
},
{
Expand All @@ -175,6 +184,7 @@ func TestValidateAndUpload(t *testing.T) {
experiment: testExperiment,
datatype: testDatatype,
dtSchemaFile: "testdata/datatypes/foo1-valid.json",
uploadSchema: true,
wantErr: schema.ErrOnlyInOld,
},
{
Expand All @@ -185,8 +195,20 @@ func TestValidateAndUpload(t *testing.T) {
experiment: testExperiment,
datatype: testDatatype,
dtSchemaFile: "testdata/datatypes/foo1-incompatible.json",
uploadSchema: true,
wantErr: schema.ErrTypeMismatch,
},
{
name: "scenario 5 no-upload - old exists, new is backward-compatible - should succeed",
tblSchemaFile: "autoload/v1/tables/jostler/foo1.table.json",
rmTblSchemaFile: false,
bucket: "newclient,download",
experiment: testExperiment,
datatype: testDatatype,
dtSchemaFile: "testdata/datatypes/foo1-valid.json",
uploadSchema: false,
wantErr: nil,
},
}

defer func() {
Expand All @@ -212,7 +234,7 @@ func TestValidateAndUpload(t *testing.T) {
}
t.Fatalf("testhelper.NewClient() = %v, wanted nil", err)
}
gotErr := schema.ValidateAndUpload(stClient, test.bucket, test.experiment, test.datatype, test.dtSchemaFile)
gotErr := schema.ValidateAndUpload(stClient, test.bucket, test.experiment, test.datatype, test.dtSchemaFile, test.uploadSchema)
t.Logf("%s>>> gotErr=%v%s\n\n", testhelper.ANSIPurple, gotErr, testhelper.ANSIEnd)
if gotErr == nil && test.wantErr == nil {
continue
Expand Down

0 comments on commit c44b5f6

Please sign in to comment.