Skip to content

Commit

Permalink
NO-SNOW Add better logging to TestPutGetWithAutoCompressFalse
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-pfus committed Nov 22, 2024
1 parent 11cccfd commit bac6b37
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 34 deletions.
12 changes: 12 additions & 0 deletions assert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func assertStringContainsF(t *testing.T, actual string, expectedToContain string
fatalOnNonEmpty(t, validateStringContains(actual, expectedToContain, descriptions...))
}

func assertStringDoesNotContainE(t *testing.T, actual string, expectedNotToContain string, descriptions ...string) {
errorOnNonEmpty(t, validateStringDoesNotContain(actual, expectedNotToContain, descriptions...))
}

func assertEmptyStringE(t *testing.T, actual string, descriptions ...string) {
errorOnNonEmpty(t, validateEmptyString(actual, descriptions...))
}
Expand Down Expand Up @@ -180,6 +184,14 @@ func validateStringContains(actual string, expectedToContain string, description
return fmt.Sprintf("expected \"%s\" to contain \"%s\" but did not. %s", actual, expectedToContain, desc)
}

func validateStringDoesNotContain(actual string, expectedToNotContain string, descriptions ...string) string {
if !strings.Contains(actual, expectedToNotContain) {
return ""
}
desc := joinDescriptions(descriptions...)
return fmt.Sprintf("expected \"%s\" not to contain \"%s\" but it did. %s", actual, expectedToNotContain, desc)
}

func validateEmptyString(actual string, descriptions ...string) string {
if actual == "" {
return ""
Expand Down
6 changes: 6 additions & 0 deletions azure_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ func (util *snowflakeAzureClient) getFileHeader(meta *fileMetadata, filename str
if err != nil {
var se *azcore.ResponseError
if errors.As(err, &se) {
print("azure getFileHeader: ")
println(se.Error())
if se.ErrorCode == string(bloberror.BlobNotFound) {
meta.resStatus = notFoundFile
return nil, fmt.Errorf("could not find file")
Expand All @@ -100,6 +102,7 @@ func (util *snowflakeAzureClient) getFileHeader(meta *fileMetadata, filename str
return nil, fmt.Errorf("received 403, attempting to renew")
}
}
println("unknown azure error")
meta.resStatus = errStatus
return nil, err
}
Expand Down Expand Up @@ -233,6 +236,8 @@ func (util *snowflakeAzureClient) uploadFile(
if err != nil {
var se *azcore.ResponseError
if errors.As(err, &se) {
print("azure upload file: ")
println(se.Error())
if se.StatusCode == 403 && util.detectAzureTokenExpireError(se.RawResponse) {
meta.resStatus = renewToken
} else {
Expand All @@ -241,6 +246,7 @@ func (util *snowflakeAzureClient) uploadFile(
}
return err
}
println("Unknown azure error")
meta.resStatus = errStatus
return err
}
Expand Down
2 changes: 1 addition & 1 deletion driver_ocsp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,7 @@ func TestExpiredCertificate(t *testing.T) {
// Go 1.20 throws tls CertificateVerification error
errString := urlErr.Err.Error()
// badssl sometimes times out
if !strings.Contains(errString, "certificate has expired or is not yet valid") && !strings.Contains(errString, "timeout") {
if !strings.Contains(errString, "certificate has expired or is not yet valid") && !strings.Contains(errString, "timeout") && !strings.Contains(errString, "established connection failed because connected host has failed to respond") {
t.Fatalf("failed to extract error Certificate error: %v", err)
}
}
Expand Down
4 changes: 4 additions & 0 deletions function_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,12 @@ func TestGoWrapper(t *testing.T) {
GoroutineWrapper = closeGoWrapperCalledChannel

ctx := WithAsyncMode(context.Background())
println("Staring SELECT 1 query")
rows := dbt.mustQueryContext(ctx, "SELECT 1")
assertTrueE(t, rows.Next())
defer rows.Close()
var i int
assertNilF(t, rows.Scan(&i))

assertTrueF(t, getGoWrapperCalled(), "channel should be closed, indicating our wrapper worked")
})
Expand Down
5 changes: 5 additions & 0 deletions gcs_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ func (util *snowflakeGcsClient) getFileHeader(meta *fileMetadata, filename strin
if err != nil {
return nil, err
}
print("GCS getFileHeader: ")
println(resp.StatusCode)
if resp.StatusCode != http.StatusOK {
meta.lastError = fmt.Errorf("%v", resp.Status)
meta.resStatus = errStatus
Expand Down Expand Up @@ -224,6 +226,9 @@ func (util *snowflakeGcsClient) uploadFile(
if err != nil {
return err
}
print("GCS uploadFile: ")
println(err.Error())
println(resp.StatusCode)
if resp.StatusCode != http.StatusOK {
if resp.StatusCode == 403 || resp.StatusCode == 408 || resp.StatusCode == 429 || resp.StatusCode == 500 || resp.StatusCode == 503 {
meta.lastError = fmt.Errorf("%v", resp.Status)
Expand Down
61 changes: 30 additions & 31 deletions put_get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ func TestPutError(t *testing.T) {
t.Error(err)
}
defer func() {
assertNilF(t, f.Close())
assertNilF(t, f.Close())
}()
_, err = f.WriteString("test1")
assertNilF(t, err)
assertNilF(t, os.Chmod(file1, 0000))
defer func() {
assertNilF(t, os.Chmod(file1, 0644))
assertNilF(t, os.Chmod(file1, 0644))
}()

data := &execResponseData{
Expand Down Expand Up @@ -217,7 +217,7 @@ func TestPutLocalFile(t *testing.T) {
var s0, s1, s2, s3, s4, s5, s6, s7, s8, s9 string
rows := dbt.mustQuery("copy into gotest_putget_t1")
defer func() {
assertNilF(t, rows.Close())
assertNilF(t, rows.Close())
}()
for rows.Next() {
assertNilF(t, rows.Scan(&s0, &s1, &s2, &s3, &s4, &s5, &s6, &s7, &s8, &s9))
Expand All @@ -228,7 +228,7 @@ func TestPutLocalFile(t *testing.T) {

rows2 := dbt.mustQuery("select count(*) from gotest_putget_t1")
defer func() {
assertNilF(t, rows2.Close())
assertNilF(t, rows2.Close())
}()
var i int
if rows2.Next() {
Expand All @@ -240,7 +240,7 @@ func TestPutLocalFile(t *testing.T) {

rows3 := dbt.mustQuery(`select STATUS from information_schema .load_history where table_name='gotest_putget_t1'`)
defer func() {
assertNilF(t, rows3.Close())
assertNilF(t, rows3.Close())
}()
if rows3.Next() {
assertNilF(t, rows3.Scan(&s0, &s1, &s2, &s3, &s4, &s5, &s6, &s7, &s8, &s9))
Expand All @@ -263,7 +263,7 @@ func TestPutGetWithAutoCompressFalse(t *testing.T) {
assertNilF(t, err)
assertNilF(t, f.Sync())
defer func() {
assertNilF(t, f.Close())
assertNilF(t, f.Close())
}()

runDBTest(t, func(dbt *DBTest) {
Expand All @@ -276,15 +276,15 @@ func TestPutGetWithAutoCompressFalse(t *testing.T) {
defer dbt.mustExec("rm @~/test_put_uncompress_file")
rows := dbt.mustQuery("ls @~/test_put_uncompress_file")
defer func() {
assertNilF(t, rows.Close())
assertNilF(t, rows.Close())
}()
var file, s1, s2, s3 string
if rows.Next() {
err = rows.Scan(&file, &s1, &s2, &s3)
assertNilE(t, err)
}
assertTrueF(t, strings.Contains(file, "test_put_uncompress_file/data.txt"), fmt.Sprintf("should contain file. got: %v", file))
assertFalseF(t, strings.Contains(file, "data.txt.gz"), fmt.Sprintf("should not contain file. got: %v", file))
assertStringContainsE(t, file, "test_put_uncompress_file/data.txt")
assertStringDoesNotContainE(t, file, "data.txt.gz")

// GET test
var streamBuf bytes.Buffer
Expand All @@ -294,7 +294,7 @@ func TestPutGetWithAutoCompressFalse(t *testing.T) {
sqlText = strings.ReplaceAll(sql, "\\", "\\\\")
rows2 := dbt.mustQueryContext(ctx, sqlText)
defer func() {
assertNilF(t, rows2.Close())
assertNilF(t, rows2.Close())
}()
for rows2.Next() {
err = rows2.Scan(&file, &s1, &s2, &s3)
Expand Down Expand Up @@ -452,7 +452,7 @@ func testPutGet(t *testing.T, isStream bool) {
t.Error(err)
}
defer func() {
assertNilF(t, fileStream.Close())
assertNilF(t, fileStream.Close())
}()

var sqlText string
Expand All @@ -469,7 +469,7 @@ func testPutGet(t *testing.T, isStream bool) {
rows = dbt.mustQuery(sqlText)
}
defer func() {
assertNilF(t, rows.Close())
assertNilF(t, rows.Close())
}()

var s0, s1, s2, s3, s4, s5, s6, s7 string
Expand Down Expand Up @@ -499,7 +499,7 @@ func testPutGet(t *testing.T, isStream bool) {
sqlText = strings.ReplaceAll(sql, "\\", "\\\\")
rows2 := dbt.mustQueryContext(ctx, sqlText)
defer func() {
assertNilF(t, rows2.Close())
assertNilF(t, rows2.Close())
}()
for rows2.Next() {
if err = rows2.Scan(&s0, &s1, &s2, &s3); err != nil {
Expand All @@ -524,7 +524,7 @@ func testPutGet(t *testing.T, isStream bool) {
gz, err := gzip.NewReader(&streamBuf)
assertNilE(t, err)
defer func() {
assertNilF(t, gz.Close())
assertNilF(t, gz.Close())
}()
for {
c := make([]byte, defaultChunkBufferSize)
Expand All @@ -547,13 +547,13 @@ func testPutGet(t *testing.T, isStream bool) {
f, err := os.Open(fileName)
assertNilE(t, err)
defer func() {
assertNilF(t, f.Close())
assertNilF(t, f.Close())
}()

gz, err := gzip.NewReader(f)
assertNilE(t, err)
defer func() {
assertNilF(t, gz.Close())
assertNilF(t, gz.Close())
}()

for {
Expand Down Expand Up @@ -582,7 +582,7 @@ func TestPutGetGcsDownscopedCredential(t *testing.T) {
t.Error(err)
}
defer func() {
assertNilF(t, os.RemoveAll(tmpDir))
assertNilF(t, os.RemoveAll(tmpDir))
}()
fname := filepath.Join(tmpDir, "test_put_get.txt.gz")
originalContents := "123,test1\n456,test2\n"
Expand Down Expand Up @@ -619,7 +619,7 @@ func TestPutGetGcsDownscopedCredential(t *testing.T) {
sql, strings.ReplaceAll(fname, "\\", "\\\\"), tableName)
rows = dbt.mustQuery(sqlText)
defer func() {
assertNilF(t, rows.Close())
assertNilF(t, rows.Close())
}()

var s0, s1, s2, s3, s4, s5, s6, s7 string
Expand All @@ -645,7 +645,7 @@ func TestPutGetGcsDownscopedCredential(t *testing.T) {
sqlText = strings.ReplaceAll(sql, "\\", "\\\\")
rows2 := dbt.mustQuery(sqlText)
defer func() {
assertNilF(t, rows2.Close())
assertNilF(t, rows2.Close())
}()
for rows2.Next() {
if err = rows2.Scan(&s0, &s1, &s2, &s3); err != nil {
Expand Down Expand Up @@ -713,13 +713,12 @@ func TestPutGetLargeFile(t *testing.T) {
defer dbt.mustExec("rm @~/test_put_largefile")
rows := dbt.mustQuery("ls @~/test_put_largefile")
defer func() {
assertNilF(t, rows.Close())
assertNilF(t, rows.Close())
}()
var file, s1, s2, s3 string
if rows.Next() {
err = rows.Scan(&file, &s1, &s2, &s3)
assertNilF(t, err)
}
assertTrueF(t, rows.Next())
err = rows.Scan(&file, &s1, &s2, &s3)
assertNilF(t, err)

if !strings.Contains(file, "largefile.txt.gz") {
t.Fatalf("should contain file. got: %v", file)
Expand All @@ -733,7 +732,7 @@ func TestPutGetLargeFile(t *testing.T) {
sqlText = strings.ReplaceAll(sql, "\\", "\\\\")
rows2 := dbt.mustQueryContext(ctx, sqlText)
defer func() {
assertNilF(t, rows2.Close())
assertNilF(t, rows2.Close())
}()
for rows2.Next() {
err = rows2.Scan(&file, &s1, &s2, &s3)
Expand All @@ -751,7 +750,7 @@ func TestPutGetLargeFile(t *testing.T) {
gz, err := gzip.NewReader(&streamBuf)
assertNilE(t, err)
defer func() {
assertNilF(t, gz.Close())
assertNilF(t, gz.Close())
}()
for {
c := make([]byte, defaultChunkBufferSize)
Expand Down Expand Up @@ -809,7 +808,7 @@ func TestPutGetMaxLOBSize(t *testing.T) {
fileStream, err := os.Open(fname)
assertNilF(t, err)
defer func() {
assertNilF(t, fileStream.Close())
assertNilF(t, fileStream.Close())
}()

// test PUT command
Expand All @@ -820,7 +819,7 @@ func TestPutGetMaxLOBSize(t *testing.T) {
sql, strings.ReplaceAll(fname, "\\", "\\\\"), tableName)
rows = dbt.mustQuery(sqlText)
defer func() {
assertNilF(t, rows.Close())
assertNilF(t, rows.Close())
}()

var s0, s1, s2, s3, s4, s5, s6, s7 string
Expand All @@ -845,7 +844,7 @@ func TestPutGetMaxLOBSize(t *testing.T) {
sqlText = strings.ReplaceAll(sql, "\\", "\\\\")
rows2 := dbt.mustQuery(sqlText)
defer func() {
assertNilF(t, rows2.Close())
assertNilF(t, rows2.Close())
}()
for rows2.Next() {
err = rows2.Scan(&s0, &s1, &s2, &s3)
Expand All @@ -864,13 +863,13 @@ func TestPutGetMaxLOBSize(t *testing.T) {
assertNilE(t, err)

defer func() {
assertNilF(t, f.Close())
assertNilF(t, f.Close())
}()
gz, err := gzip.NewReader(f)
assertNilE(t, err)

defer func() {
assertNilF(t, gz.Close())
assertNilF(t, gz.Close())
}()
var contents string
for {
Expand Down
4 changes: 4 additions & 0 deletions s3_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ func (util *snowflakeS3Client) getFileHeader(meta *fileMetadata, filename string
}
out, err := s3Cli.HeadObject(context.Background(), headObjInput)
if err != nil {
print("err in S3 getFileHeader: ")
println(err.Error())
var ae smithy.APIError
if errors.As(err, &ae) {
if ae.ErrorCode() == notFound {
Expand Down Expand Up @@ -217,6 +219,8 @@ func (util *snowflakeS3Client) uploadFile(
}

if err != nil {
print("err in S3 uploadFile: ")
println(err.Error())
var ae smithy.APIError
if errors.As(err, &ae) {
if ae.ErrorCode() == expiredToken {
Expand Down
5 changes: 3 additions & 2 deletions storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,12 @@ func (rsu *remoteStorageUtil) uploadOneFile(meta *fileMetadata) error {
if !meta.overwrite {
header, err := utilClass.getFileHeader(meta, meta.dstFileName)
if meta.resStatus == notFoundFile {
err := utilClass.uploadFile(dataFile, meta, encryptMeta, maxConcurrency, meta.options.MultiPartThreshold)
if err != nil {
err2 := utilClass.uploadFile(dataFile, meta, encryptMeta, maxConcurrency, meta.options.MultiPartThreshold)
if err2 != nil {
logger.Warnf("Error uploading %v. err: %v", dataFile, err)
}
} else if err != nil {
println(err)
return err
}
if header != nil && meta.resStatus == uploaded {
Expand Down

0 comments on commit bac6b37

Please sign in to comment.