Skip to content

Commit

Permalink
sink(ticdc): use multi part s3 uploader in storage sink (#9954) (#10179)
Browse files Browse the repository at this point in the history
close #10098, close #10172
  • Loading branch information
ti-chi-bot authored Dec 8, 2023
1 parent 4895558 commit 7099e8b
Show file tree
Hide file tree
Showing 15 changed files with 201 additions and 70 deletions.
3 changes: 3 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec,
FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency,
}
}

Expand Down Expand Up @@ -613,6 +614,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec,
FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency,
}
}

Expand Down Expand Up @@ -1053,6 +1055,7 @@ type CloudStorageConfig struct {
OutputColumnID *bool `json:"output_column_id,omitempty"`
FileExpirationDays *int `json:"file_expiration_days,omitempty"`
FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"`
FlushConcurrency *int `json:"flush_concurrency,omitempty"`
}

// ChangefeedStatus holds common information of a changefeed in cdc
Expand Down
31 changes: 27 additions & 4 deletions cdc/sink/dmlsink/cloudstorage/dml_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/metrics"
mcloudstorage "github.com/pingcap/tiflow/cdc/sink/metrics/cloudstorage"
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -238,9 +238,32 @@ func (d *dmlWorker) writeDataFile(ctx context.Context, path string, task *single
}

if err := d.statistics.RecordBatchExecution(func() (int, error) {
err := d.storage.WriteFile(ctx, path, buf.Bytes())
if err != nil {
return 0, err
if d.config.FlushConcurrency <= 1 {
return rowsCnt, d.storage.WriteFile(ctx, path, buf.Bytes())
}

writer, inErr := d.storage.Create(ctx, path, &storage.WriterOption{
Concurrency: d.config.FlushConcurrency,
})
if inErr != nil {
return 0, inErr
}

defer func() {
closeErr := writer.Close(ctx)
if inErr != nil {
log.Error("failed to close writer", zap.Error(closeErr),
zap.Int("workerID", d.id),
zap.Any("table", task.tableInfo.TableName),
zap.String("namespace", d.changeFeedID.Namespace),
zap.String("changefeed", d.changeFeedID.ID))
if inErr == nil {
inErr = closeErr
}
}
}()
if _, inErr = writer.Write(ctx, buf.Bytes()); inErr != nil {
return 0, inErr
}
return rowsCnt, nil
}); err != nil {
Expand Down
17 changes: 16 additions & 1 deletion dm/checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/pingcap/tiflow/dm/pkg/terror"
onlineddl "github.com/pingcap/tiflow/dm/syncer/online-ddl-tools"
"github.com/pingcap/tiflow/dm/unit"
pd "github.com/tikv/pd/client"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -444,7 +445,20 @@ func (c *Checker) Init(ctx context.Context) (err error) {
if err != nil {
return err
}
targetInfoGetter, err := importer.NewTargetInfoGetterImpl(lCfg, targetDB)

pdClient, err := pd.NewClientWithContext(
ctx, []string{lCfg.TiDB.PdAddr}, pd.SecurityOption{
CAPath: lCfg.Security.CAPath,
CertPath: lCfg.Security.CertPath,
KeyPath: lCfg.Security.KeyPath,
SSLCABytes: lCfg.Security.CABytes,
SSLCertBytes: lCfg.Security.CertBytes,
SSLKEYBytes: lCfg.Security.KeyBytes,
})
if err != nil {
return err
}
targetInfoGetter, err := importer.NewTargetInfoGetterImpl(lCfg, targetDB, pdClient)
if err != nil {
return err
}
Expand All @@ -471,6 +485,7 @@ func (c *Checker) Init(ctx context.Context) (err error) {
dbMetas,
newLightningPrecheckAdaptor(targetInfoGetter, info),
cpdb,
pdClient,
)

if _, ok := c.checkingItems[config.LightningFreeSpaceChecking]; ok {
Expand Down
2 changes: 1 addition & 1 deletion engine/jobmaster/example/worker_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (w *exampleWorker) Tick(ctx context.Context) error {
return err
}

file, err := storage.BrExternalStorage().Create(ctx, strconv.Itoa(count)+".txt")
file, err := storage.BrExternalStorage().Create(ctx, strconv.Itoa(count)+".txt", nil)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions engine/pkg/externalresource/broker/broker_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func createS3ResourceForWorker(
defer cancel()
for index, fileName := range newTestFiles {
filePath := fmt.Sprintf("%s/%s/%s", creator, resName, fileName)
f, err := resStorage.Create(ctx, fileName)
f, err := resStorage.Create(ctx, fileName, nil)
require.NoError(t, err)
content := filePath + fmt.Sprintf("_index-%d", index)
// FIXME(CharlesCheung): If nothing is written, f.Close will report an error.
Expand Down Expand Up @@ -252,7 +252,7 @@ func TestIntegrationBrokerOpenNewS3Storage(t *testing.T) {
cli.AssertExpectations(t)
cli.ExpectedCalls = nil

f, err := hdl.BrExternalStorage().Create(context.Background(), "1.txt")
f, err := hdl.BrExternalStorage().Create(context.Background(), "1.txt", nil)
require.NoError(t, err)

err = f.Close(context.Background())
Expand Down
4 changes: 2 additions & 2 deletions engine/pkg/externalresource/broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestBrokerOpenNewStorage(t *testing.T) {
cli.AssertExpectations(t)
cli.ExpectedCalls = nil

f, err := hdl.BrExternalStorage().Create(context.Background(), "1.txt")
f, err := hdl.BrExternalStorage().Create(context.Background(), "1.txt", nil)
require.NoError(t, err)

err = f.Close(context.Background())
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestBrokerOpenExistingStorage(t *testing.T) {

cli.AssertExpectations(t)

f, err := hdl.BrExternalStorage().Create(context.Background(), "1.txt")
f, err := hdl.BrExternalStorage().Create(context.Background(), "1.txt", nil)
require.NoError(t, err)

err = f.Close(context.Background())
Expand Down
4 changes: 2 additions & 2 deletions engine/pkg/externalresource/integration_test/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestLocalFileTriggeredByJobRemoval(t *testing.T) {
resID)
require.NoError(t, err)

_, err = handle.BrExternalStorage().Create(context.Background(), "1.txt")
_, err = handle.BrExternalStorage().Create(context.Background(), "1.txt", nil)
require.NoError(t, err)
err = handle.Persist(ctx)
require.NoError(t, err)
Expand Down Expand Up @@ -99,7 +99,7 @@ func TestLocalFileRecordRemovedTriggeredByExecutorOffline(t *testing.T) {
"job-1",
"/local/resource-1")
require.NoError(t, err)
_, err = handle.BrExternalStorage().Create(context.Background(), "1.txt")
_, err = handle.BrExternalStorage().Create(context.Background(), "1.txt", nil)
require.NoError(t, err)
err = handle.Persist(ctx)
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ func createPlaceholderFile(ctx context.Context, storage brStorage.ExternalStorag
return errors.ErrExternalStorageAPI.GenWithStackByArgs("resource already exists")
}

writer, err := storage.Create(ctx, placeholderFileName)
writer, err := storage.Create(ctx, placeholderFileName, nil)
if err != nil {
return errors.ErrExternalStorageAPI.Wrap(err).GenWithStackByArgs("creating placeholder file")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestFileManagerBasics(t *testing.T) {

storage, err := newBrStorageForLocalFile(res1.AbsolutePath())
require.NoError(t, err)
fwriter, err := storage.Create(context.Background(), "1.txt")
fwriter, err := storage.Create(context.Background(), "1.txt", nil)
require.NoError(t, err)
err = fwriter.Close(context.Background())
require.NoError(t, err)
Expand All @@ -100,7 +100,7 @@ func TestFileManagerBasics(t *testing.T) {

storage, err = newBrStorageForLocalFile(res2.AbsolutePath())
require.NoError(t, err)
fwriter, err = storage.Create(context.Background(), "1.txt")
fwriter, err = storage.Create(context.Background(), "1.txt", nil)
require.NoError(t, err)
err = fwriter.Close(context.Background())
require.NoError(t, err)
Expand Down Expand Up @@ -146,7 +146,7 @@ func TestFileManagerManyWorkers(t *testing.T) {

storage, err := newBrStorageForLocalFile(res1.AbsolutePath())
require.NoError(t, err)
fwriter, err := storage.Create(context.Background(), "1.txt")
fwriter, err := storage.Create(context.Background(), "1.txt", nil)
require.NoError(t, err)
err = fwriter.Close(context.Background())
require.NoError(t, err)
Expand All @@ -166,7 +166,7 @@ func TestFileManagerManyWorkers(t *testing.T) {

storage, err = newBrStorageForLocalFile(res2.AbsolutePath())
require.NoError(t, err)
fwriter, err = storage.Create(context.Background(), "1.txt")
fwriter, err = storage.Create(context.Background(), "1.txt", nil)
require.NoError(t, err)
err = fwriter.Close(context.Background())
require.NoError(t, err)
Expand Down
51 changes: 33 additions & 18 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/KimMachineGun/automemlimit v0.2.4
github.com/Shopify/sarama v1.38.1
github.com/VividCortex/mysqlerr v1.0.0
github.com/aws/aws-sdk-go v1.44.48
github.com/aws/aws-sdk-go v1.44.259
github.com/benbjohnson/clock v1.3.0
github.com/bradleyjkemp/grpc-tools v0.2.5
github.com/cenkalti/backoff/v4 v4.0.2
Expand All @@ -30,7 +30,7 @@ require (
github.com/glebarez/sqlite v1.4.6
github.com/go-mysql-org/go-mysql v1.6.1-0.20221223014230-81966e15b9c5
github.com/go-ozzo/ozzo-validation/v4 v4.3.0
github.com/go-sql-driver/mysql v1.7.0
github.com/go-sql-driver/mysql v1.7.1
github.com/goccy/go-json v0.9.11
github.com/gogo/gateway v1.1.0
github.com/gogo/protobuf v1.3.2
Expand Down Expand Up @@ -62,12 +62,12 @@ require (
github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0
github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c
github.com/pingcap/kvproto v0.0.0-20230419072653-dc3cd8784a19
github.com/pingcap/kvproto v0.0.0-20231011074246-fa00d2b03372
github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22
github.com/pingcap/tidb v1.1.0-beta.0.20230420065519-eb77d3928398
github.com/pingcap/tidb v1.1.0-beta.0.20231207141451-a53a963a4a50
github.com/pingcap/tidb-tools v7.0.0+incompatible
github.com/pingcap/tidb/parser v0.0.0-20230420065519-eb77d3928398
github.com/prometheus/client_golang v1.15.0
github.com/pingcap/tidb/parser v0.0.0-20231207141451-a53a963a4a50
github.com/prometheus/client_golang v1.15.1
github.com/prometheus/client_model v0.3.0
github.com/r3labs/diff v1.1.0
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
Expand All @@ -83,9 +83,9 @@ require (
github.com/swaggo/gin-swagger v1.2.0
github.com/swaggo/swag v1.8.3
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954
github.com/tikv/client-go/v2 v2.0.8-0.20230419123920-35c1ee47c4f9
github.com/tikv/client-go/v2 v2.0.8-0.20231018094816-44449c0526f2
github.com/tikv/pd v1.1.0-beta.0.20230203015356-248b3f0be132
github.com/tikv/pd/client v0.0.0-20230419153320-f1d1a80feb95
github.com/tikv/pd/client v0.0.0-20230905092614-113cdedbebb6
github.com/tinylib/msgp v1.1.6
github.com/uber-go/atomic v1.4.0
github.com/vmihailenco/msgpack/v5 v5.3.5
Expand All @@ -97,7 +97,7 @@ require (
go.etcd.io/etcd/raft/v3 v3.5.2
go.etcd.io/etcd/server/v3 v3.5.2
go.etcd.io/etcd/tests/v3 v3.5.2
go.uber.org/atomic v1.10.0
go.uber.org/atomic v1.11.0
go.uber.org/dig v1.13.0
go.uber.org/goleak v1.2.1
go.uber.org/multierr v1.11.0
Expand All @@ -106,7 +106,7 @@ require (
golang.org/x/exp v0.0.0-20221023144134-a1e5550cf13e
golang.org/x/net v0.10.0
golang.org/x/oauth2 v0.8.0
golang.org/x/sync v0.1.0
golang.org/x/sync v0.2.0
golang.org/x/sys v0.8.0
golang.org/x/text v0.9.0
golang.org/x/time v0.3.0
Expand All @@ -127,6 +127,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.20.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.12.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.1 // indirect
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect
github.com/DataDog/zstd v1.4.6-0.20210211175136-c6db21d202f4 // indirect
github.com/KyleBanks/depth v1.2.1 // indirect
github.com/Masterminds/semver v1.5.0 // indirect
Expand All @@ -136,7 +137,7 @@ require (
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 // indirect
github.com/apache/thrift v0.13.1-0.20201008052519-daf620915714 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blacktear23/go-proxyprotocol v1.0.5 // indirect
github.com/blacktear23/go-proxyprotocol v1.0.6 // indirect
github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5 // indirect
github.com/carlmjohnson/flagext v0.21.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
Expand Down Expand Up @@ -167,18 +168,22 @@ require (
github.com/form3tech-oss/jwt-go v3.2.5+incompatible // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-asn1-ber/asn1-ber v1.5.4 // indirect
github.com/go-ldap/ldap/v3 v3.4.4 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.20.0 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.1 // indirect
github.com/go-openapi/spec v0.20.6 // indirect
github.com/go-openapi/swag v0.21.1 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/go-playground/locales v0.14.0 // indirect
github.com/go-playground/universal-translator v0.18.0 // indirect
github.com/go-playground/validator/v10 v10.10.0 // indirect
github.com/godbus/dbus/v5 v5.0.4 // indirect
github.com/golang/glog v1.0.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/google/pprof v0.0.0-20211122183932-1daafda22083 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
github.com/googleapis/gax-go/v2 v2.7.1 // indirect
Expand All @@ -189,7 +194,8 @@ require (
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/iancoleman/strcase v0.2.0 // indirect
github.com/improbable-eng/grpc-web v0.12.0 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/influxdata/tdigest v0.0.1 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
Expand Down Expand Up @@ -243,7 +249,7 @@ require (
github.com/prometheus/procfs v0.9.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20220927061507-ef77025ab5aa // indirect
github.com/rivo/uniseg v0.4.4 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
github.com/rs/cors v1.7.0 // indirect
github.com/sasha-s/go-deadlock v0.2.0 // indirect
github.com/shoenig/go-m1cpu v0.1.5 // indirect
Expand Down Expand Up @@ -288,17 +294,24 @@ require (
go.opentelemetry.io/proto/otlp v0.7.0 // indirect
golang.org/x/crypto v0.8.0 // indirect
golang.org/x/term v0.8.0 // indirect
golang.org/x/tools v0.8.0 // indirect
golang.org/x/tools v0.9.1 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/api v0.114.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.27.2 // indirect
k8s.io/apimachinery v0.27.2 // indirect
k8s.io/klog/v2 v2.90.1 // indirect
k8s.io/utils v0.0.0-20230209194617-a36077c30491 // indirect
modernc.org/libc v1.16.8 // indirect
modernc.org/mathutil v1.4.1 // indirect
modernc.org/mathutil v1.5.0 // indirect
modernc.org/memory v1.1.1 // indirect
modernc.org/sqlite v1.17.3 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0 // indirect
sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67 // indirect
Expand All @@ -312,3 +325,5 @@ replace github.com/benbjohnson/clock v1.3.0 => github.com/benbjohnson/clock v1.1

// copy from TiDB
replace go.opencensus.io => go.opencensus.io v0.23.1-0.20220331163232-052120675fac

replace github.com/go-ldap/ldap/v3 => github.com/YangKeao/ldap/v3 v3.4.5-0.20230421065457-369a3bab1117
Loading

0 comments on commit 7099e8b

Please sign in to comment.