Skip to content

Commit

Permalink
sink(ticdc): use multi part s3 uploader in storage sink (#9954) (#10178)
Browse files Browse the repository at this point in the history
close #10098, close #10172
  • Loading branch information
ti-chi-bot authored Nov 30, 2023
1 parent 6c1515c commit 873a2ff
Show file tree
Hide file tree
Showing 11 changed files with 158 additions and 49 deletions.
33 changes: 28 additions & 5 deletions cdc/sinkv2/eventsink/cloudstorage/dml_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/model"
Expand All @@ -28,6 +27,7 @@ import (
mcloudstorage "github.com/pingcap/tiflow/cdc/sinkv2/metrics/cloudstorage"
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
Expand Down Expand Up @@ -237,10 +237,33 @@ func (d *dmlWorker) writeDataFile(ctx context.Context, path string, task *single
callbacks = append(callbacks, msg.Callback)
}

if err := d.statistics.RecordBatchExecution(func() (int, error) {
err := d.storage.WriteFile(ctx, path, buf.Bytes())
if err != nil {
return 0, err
if err := d.statistics.RecordBatchExecution(func() (_ int, inErr error) {
if d.config.FlushConcurrency <= 1 {
return rowsCnt, d.storage.WriteFile(ctx, path, buf.Bytes())
}

writer, inErr := d.storage.Create(ctx, path, &storage.WriterOption{
Concurrency: d.config.FlushConcurrency,
})
if inErr != nil {
return 0, inErr
}

// Always close the writer, and surface a close failure to the caller.
// The check must be on closeErr (not inErr): previously the guard tested
// inErr, so a close failure after a successful Write was silently dropped
// and the nested `inErr == nil` branch could never run.
defer func() {
	closeErr := writer.Close(ctx)
	if closeErr == nil {
		return
	}
	log.Error("failed to close writer", zap.Error(closeErr),
		zap.Int("workerID", d.id),
		zap.Any("table", task.tableInfo.TableName),
		zap.String("namespace", d.changeFeedID.Namespace),
		zap.String("changefeed", d.changeFeedID.ID))
	// Keep an earlier Write error if there is one; otherwise report the
	// close error through the named result so the batch is not counted
	// as successfully flushed.
	if inErr == nil {
		inErr = closeErr
	}
}()
if _, inErr = writer.Write(ctx, buf.Bytes()); inErr != nil {
return 0, inErr
}
return rowsCnt, nil
}); err != nil {
Expand Down
16 changes: 15 additions & 1 deletion dm/checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/pingcap/tiflow/dm/pkg/utils"
onlineddl "github.com/pingcap/tiflow/dm/syncer/online-ddl-tools"
"github.com/pingcap/tiflow/dm/unit"
pd "github.com/tikv/pd/client"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -350,7 +351,20 @@ func (c *Checker) Init(ctx context.Context) (err error) {
return err
}

builder, err := restore.NewPrecheckItemBuilderFromConfig(c.tctx.Context(), lCfg)
pdClient, err := pd.NewClientWithContext(
ctx, []string{lCfg.TiDB.PdAddr}, pd.SecurityOption{
CAPath: lCfg.Security.CAPath,
CertPath: lCfg.Security.CertPath,
KeyPath: lCfg.Security.KeyPath,
SSLCABytes: lCfg.Security.CABytes,
SSLCertBytes: lCfg.Security.CertBytes,
SSLKEYBytes: lCfg.Security.KeyBytes,
})
if err != nil {
return err
}

builder, err := restore.NewPrecheckItemBuilderFromConfig(c.tctx.Context(), lCfg, pdClient)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion engine/jobmaster/example/worker_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (w *exampleWorker) Tick(ctx context.Context) error {
return err
}

file, err := storage.BrExternalStorage().Create(ctx, strconv.Itoa(count)+".txt")
file, err := storage.BrExternalStorage().Create(ctx, strconv.Itoa(count)+".txt", nil)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions engine/pkg/externalresource/broker/broker_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func createS3ResourceForWorker(
defer cancel()
for index, fileName := range newTestFiles {
filePath := fmt.Sprintf("%s/%s/%s", creator, resName, fileName)
f, err := resStorage.Create(ctx, fileName)
f, err := resStorage.Create(ctx, fileName, nil)
require.NoError(t, err)
content := filePath + fmt.Sprintf("_index-%d", index)
// FIXME(CharlesCheung): If nothing is written, f.Close will report an error.
Expand Down Expand Up @@ -252,7 +252,7 @@ func TestIntegrationBrokerOpenNewS3Storage(t *testing.T) {
cli.AssertExpectations(t)
cli.ExpectedCalls = nil

f, err := hdl.BrExternalStorage().Create(context.Background(), "1.txt")
f, err := hdl.BrExternalStorage().Create(context.Background(), "1.txt", nil)
require.NoError(t, err)

err = f.Close(context.Background())
Expand Down
4 changes: 2 additions & 2 deletions engine/pkg/externalresource/broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestBrokerOpenNewStorage(t *testing.T) {
cli.AssertExpectations(t)
cli.ExpectedCalls = nil

f, err := hdl.BrExternalStorage().Create(context.Background(), "1.txt")
f, err := hdl.BrExternalStorage().Create(context.Background(), "1.txt", nil)
require.NoError(t, err)

err = f.Close(context.Background())
Expand Down Expand Up @@ -163,7 +163,7 @@ func TestBrokerOpenExistingStorage(t *testing.T) {

cli.AssertExpectations(t)

f, err := hdl.BrExternalStorage().Create(context.Background(), "1.txt")
f, err := hdl.BrExternalStorage().Create(context.Background(), "1.txt", nil)
require.NoError(t, err)

err = f.Close(context.Background())
Expand Down
4 changes: 2 additions & 2 deletions engine/pkg/externalresource/integration_test/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestLocalFileTriggeredByJobRemoval(t *testing.T) {
resID)
require.NoError(t, err)

_, err = handle.BrExternalStorage().Create(context.Background(), "1.txt")
_, err = handle.BrExternalStorage().Create(context.Background(), "1.txt", nil)
require.NoError(t, err)
err = handle.Persist(ctx)
require.NoError(t, err)
Expand Down Expand Up @@ -99,7 +99,7 @@ func TestLocalFileRecordRemovedTriggeredByExecutorOffline(t *testing.T) {
"job-1",
"/local/resource-1")
require.NoError(t, err)
_, err = handle.BrExternalStorage().Create(context.Background(), "1.txt")
_, err = handle.BrExternalStorage().Create(context.Background(), "1.txt", nil)
require.NoError(t, err)
err = handle.Persist(ctx)
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestFileManagerBasics(t *testing.T) {

storage, err := newBrStorageForLocalFile(res1.AbsolutePath())
require.NoError(t, err)
fwriter, err := storage.Create(context.Background(), "1.txt")
fwriter, err := storage.Create(context.Background(), "1.txt", nil)
require.NoError(t, err)
err = fwriter.Close(context.Background())
require.NoError(t, err)
Expand All @@ -100,7 +100,7 @@ func TestFileManagerBasics(t *testing.T) {

storage, err = newBrStorageForLocalFile(res2.AbsolutePath())
require.NoError(t, err)
fwriter, err = storage.Create(context.Background(), "1.txt")
fwriter, err = storage.Create(context.Background(), "1.txt", nil)
require.NoError(t, err)
err = fwriter.Close(context.Background())
require.NoError(t, err)
Expand Down Expand Up @@ -146,7 +146,7 @@ func TestFileManagerManyWorkers(t *testing.T) {

storage, err := newBrStorageForLocalFile(res1.AbsolutePath())
require.NoError(t, err)
fwriter, err := storage.Create(context.Background(), "1.txt")
fwriter, err := storage.Create(context.Background(), "1.txt", nil)
require.NoError(t, err)
err = fwriter.Close(context.Background())
require.NoError(t, err)
Expand All @@ -166,7 +166,7 @@ func TestFileManagerManyWorkers(t *testing.T) {

storage, err = newBrStorageForLocalFile(res2.AbsolutePath())
require.NoError(t, err)
fwriter, err = storage.Create(context.Background(), "1.txt")
fwriter, err = storage.Create(context.Background(), "1.txt", nil)
require.NoError(t, err)
err = fwriter.Close(context.Background())
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion engine/pkg/externalresource/internal/s3/file_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func createPlaceholderFile(ctx context.Context, storage brStorage.ExternalStorag
return errors.ErrExternalStorageAPI.GenWithStackByArgs("resource already exists")
}

writer, err := storage.Create(ctx, placeholderFileName)
writer, err := storage.Create(ctx, placeholderFileName, nil)
if err != nil {
return errors.ErrExternalStorageAPI.Wrap(err).GenWithStackByArgs("creating placeholder file")
}
Expand Down
29 changes: 20 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/KimMachineGun/automemlimit v0.2.4
github.com/Shopify/sarama v1.38.1
github.com/VividCortex/mysqlerr v1.0.0
github.com/aws/aws-sdk-go v1.44.48
github.com/aws/aws-sdk-go v1.44.259
github.com/benbjohnson/clock v1.3.0
github.com/bradleyjkemp/grpc-tools v0.2.5
github.com/cenkalti/backoff/v4 v4.0.2
Expand Down Expand Up @@ -58,11 +58,11 @@ require (
github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0
github.com/pingcap/errors v0.11.5-0.20220729040631-518f63d66278
github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3
github.com/pingcap/kvproto v0.0.0-20221130022225-6c56ac56fe5f
github.com/pingcap/kvproto v0.0.0-20230928035022-1bdcc25ed63c
github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c
github.com/pingcap/tidb v1.1.0-beta.0.20230412065101-c24a1cda6d26
github.com/pingcap/tidb v1.1.0-beta.0.20231124055343-a06279f27e4c
github.com/pingcap/tidb-tools v7.0.1-0.20230410100500-0bbe5f0cd5c9+incompatible
github.com/pingcap/tidb/parser v0.0.0-20230412065101-c24a1cda6d26
github.com/pingcap/tidb/parser v0.0.0-20231124055343-a06279f27e4c
github.com/prometheus/client_golang v1.13.0
github.com/prometheus/client_model v0.2.0
github.com/r3labs/diff v1.1.0
Expand All @@ -78,9 +78,9 @@ require (
github.com/swaggo/gin-swagger v1.2.0
github.com/swaggo/swag v1.6.6-0.20200529100950-7c765ddd0476
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954
github.com/tikv/client-go/v2 v2.0.4-0.20230131081004-cd83d1507d70
github.com/tikv/client-go/v2 v2.0.4-0.20231121073938-194639470f84
github.com/tikv/pd v1.1.0-beta.0.20220303060546-3695d8164800
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07
github.com/tikv/pd/client v0.0.0-20230904040343-947701a32c05
github.com/tinylib/msgp v1.1.6
github.com/uber-go/atomic v1.4.0
github.com/vmihailenco/msgpack/v5 v5.3.5
Expand All @@ -105,8 +105,8 @@ require (
golang.org/x/sys v0.13.0
golang.org/x/text v0.13.0
golang.org/x/time v0.2.0
google.golang.org/genproto v0.0.0-20221201164419-0e50fba7f41c
google.golang.org/grpc v1.50.1
google.golang.org/genproto v0.0.0-20221202195650-67e5cbc046fd
google.golang.org/grpc v1.51.0
google.golang.org/protobuf v1.30.0
gopkg.in/yaml.v2 v2.4.0
gorm.io/driver/mysql v1.3.3
Expand All @@ -131,7 +131,7 @@ require (
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 // indirect
github.com/apache/thrift v0.13.1-0.20201008052519-daf620915714 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blacktear23/go-proxyprotocol v1.0.5 // indirect
github.com/blacktear23/go-proxyprotocol v1.0.6 // indirect
github.com/bytedance/sonic v1.9.1 // indirect
github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5 // indirect
github.com/carlmjohnson/flagext v0.21.0 // indirect
Expand Down Expand Up @@ -162,6 +162,7 @@ require (
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-logr/logr v1.2.0 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.20.0 // indirect
Expand All @@ -174,6 +175,7 @@ require (
github.com/golang/glog v1.0.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/google/pprof v0.0.0-20211122183932-1daafda22083 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.0 // indirect
github.com/googleapis/gax-go/v2 v2.7.0 // indirect
Expand All @@ -185,6 +187,7 @@ require (
github.com/iancoleman/strcase v0.2.0 // indirect
github.com/improbable-eng/grpc-web v0.12.0 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
github.com/influxdata/tdigest v0.0.1 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
Expand Down Expand Up @@ -249,6 +252,7 @@ require (
github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 // indirect
github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/spkg/bom v1.0.0 // indirect
github.com/stathat/consistent v1.0.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 // indirect
Expand Down Expand Up @@ -289,13 +293,20 @@ require (
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/api v0.103.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.24.0 // indirect
k8s.io/apimachinery v0.24.0 // indirect
k8s.io/klog/v2 v2.60.1 // indirect
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9 // indirect
modernc.org/libc v1.16.8 // indirect
modernc.org/mathutil v1.4.1 // indirect
modernc.org/memory v1.1.1 // indirect
modernc.org/sqlite v1.17.3 // indirect
sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0 // indirect
sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67 // indirect
Expand Down
Loading

0 comments on commit 873a2ff

Please sign in to comment.