Skip to content

Commit

Permalink
*(ticdc): do not print password in cdc log (#9691) (#9727)
Browse files Browse the repository at this point in the history
close #9690
  • Loading branch information
ti-chi-bot authored Sep 14, 2023
1 parent d2bd195 commit b9dcc64
Show file tree
Hide file tree
Showing 13 changed files with 152 additions and 27 deletions.
11 changes: 3 additions & 8 deletions cdc/api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,13 +299,6 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) {
_ = c.Error(err)
return
}

infoStr, err := info.Marshal()
if err != nil {
_ = c.Error(err)
return
}

upstreamInfo := &model.UpstreamInfo{
ID: up.ID,
PDEndpoints: strings.Join(up.PdEndpoints, ","),
Expand All @@ -322,7 +315,9 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) {
return
}

log.Info("Create changefeed successfully!", zap.String("id", changefeedConfig.ID), zap.String("changefeed", infoStr))
log.Info("Create changefeed successfully!",
zap.String("id", changefeedConfig.ID),
zap.String("changefeed", info.String()))
c.Status(http.StatusAccepted)
}

Expand Down
8 changes: 1 addition & 7 deletions cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,6 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
CAPath: cfg.CAPath,
CertAllowedCN: cfg.CertAllowedCN,
}
infoStr, err := info.Marshal()
if err != nil {
needRemoveGCSafePoint = true
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
return
}

// cannot create changefeed if there are running lightning/restore tasks
tlsCfg, err := credential.ToTLSConfig()
Expand Down Expand Up @@ -168,7 +162,7 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {

log.Info("Create changefeed successfully!",
zap.String("id", info.ID),
zap.String("changefeed", infoStr))
zap.String("changefeed", info.String()))
c.JSON(http.StatusOK, toAPIModel(info,
info.StartTs, info.StartTs,
nil, true))
Expand Down
17 changes: 11 additions & 6 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ type ChangeFeedID struct {
ID string
}

// String implements fmt.Stringer interface
func (c ChangeFeedID) String() string {
return c.Namespace + "/" + c.ID
}

// DefaultChangeFeedID returns `ChangeFeedID` with default namespace
func DefaultChangeFeedID(id string) ChangeFeedID {
return ChangeFeedID{
Expand Down Expand Up @@ -216,9 +221,9 @@ func (info *ChangeFeedInfo) String() (str string) {
return
}

clone.SinkURI, err = util.MaskSinkURI(clone.SinkURI)
if err != nil {
log.Error("failed to marshal changefeed info", zap.Error(err))
clone.SinkURI = util.MaskSensitiveDataInURI(clone.SinkURI)
if clone.Config != nil {
clone.Config.MaskSensitiveData()
}

str, err = clone.Marshal()
Expand Down Expand Up @@ -471,11 +476,11 @@ func (info *ChangeFeedInfo) fixMQSinkProtocol() {
}

func (info *ChangeFeedInfo) updateSinkURIAndConfigProtocol(uri *url.URL, newProtocol string, newQuery url.Values) {
oldRawQuery := uri.RawQuery
newRawQuery := newQuery.Encode()
maskedURI, _ := util.MaskSinkURI(uri.String())
log.Info("handle incompatible protocol from sink URI",
zap.String("oldUriQuery", oldRawQuery),
zap.String("fixedUriQuery", newQuery.Encode()))
zap.String("oldURI", maskedURI),
zap.String("newProtocol", newProtocol))

uri.RawQuery = newRawQuery
fixedSinkURI := uri.String()
Expand Down
4 changes: 2 additions & 2 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ LOOP2:
zap.Uint64("changefeedEpoch", epoch),
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("resolvedTs", c.resolvedTs),
zap.Stringer("info", c.state.Info))
zap.String("info", c.state.Info.String()))

return nil
}
Expand Down Expand Up @@ -720,7 +720,7 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) {
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID),
zap.Any("status", c.state.Status),
zap.Stringer("info", c.state.Info),
zap.String("info", c.state.Info.String()),
zap.Bool("isRemoved", c.isRemoved))
}

Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/ddlsink/mq/kafka_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ func NewKafkaDDLSink(
}

start := time.Now()
log.Info("Try to create a DDL sink producer", zap.Any("options", options))

log.Info("Try to create a DDL sink producer", zap.Any("options", options))
syncProducer, err := factory.SyncProducer(ctx)
if err != nil {
return nil, errors.Trace(err)
Expand Down
4 changes: 2 additions & 2 deletions cdc/syncpointstore/mysql_syncpoint_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func newMySQLSyncPointStore(
return nil, cerror.ErrMySQLConnectionError.Wrap(err).GenWithStack("fail to open MySQL connection")
}

log.Info("Start mysql syncpoint sink")
log.Info("Start mysql syncpoint sink", zap.String("changefeed", id.String()))

return &mysqlSyncPointStore{
db: syncDB,
Expand Down Expand Up @@ -146,7 +146,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context,
var secondaryTs string
err = row.Scan(&secondaryTs)
if err != nil {
log.Info("sync table: get tidb_current_ts err")
log.Info("sync table: get tidb_current_ts err", zap.String("changefeed", id.String()))
err2 := tx.Rollback()
if err2 != nil {
log.Error("failed to write syncpoint table", zap.Error(err))
Expand Down
6 changes: 6 additions & 0 deletions pkg/config/consistent.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/redo"
"github.com/pingcap/tiflow/pkg/util"
)

// ConsistentConfig represents replication consistency config for a changefeed.
Expand Down Expand Up @@ -56,3 +57,8 @@ func (c *ConsistentConfig) ValidateAndAdjust() error {
}
return redo.ValidateStorage(uri)
}

// MaskSensitiveData masks sensitive data in ConsistentConfig
func (c *ConsistentConfig) MaskSensitiveData() {
c.Storage = util.MaskSensitiveDataInURI(c.Storage)
}
10 changes: 10 additions & 0 deletions pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,3 +322,13 @@ func (c *ReplicaConfig) AdjustEnableOldValueAndVerifyForceReplicate(sinkURI *url

return nil
}

// MaskSensitiveData masks sensitive data in ReplicaConfig
func (c *ReplicaConfig) MaskSensitiveData() {
if c.Sink != nil {
c.Sink.MaskSensitiveData()
}
if c.Consistent != nil {
c.Consistent.MaskSensitiveData()
}
}
30 changes: 30 additions & 0 deletions pkg/config/replica_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,3 +383,33 @@ func TestAdjustEnableOldValueAndVerifyForceReplicate(t *testing.T) {
require.NoError(t, err)
require.False(t, config.EnableOldValue)
}

func TestMaskSensitiveData(t *testing.T) {
config := ReplicaConfig{
Sink: nil,
Consistent: nil,
}
config.MaskSensitiveData()
require.Nil(t, config.Sink)
require.Nil(t, config.Consistent)
config.Sink = &SinkConfig{}
config.Sink.KafkaConfig = &KafkaConfig{
SASLOAuthTokenURL: aws.String("http://abc.com?password=bacd"),
SASLOAuthClientSecret: aws.String("bacd"),
SASLPassword: aws.String("bacd"),
SASLGssAPIPassword: aws.String("bacd"),
Key: aws.String("bacd"),
}
config.Sink.SchemaRegistry = "http://abc.com?password=bacd"
config.Consistent = &ConsistentConfig{
Storage: "http://abc.com?password=bacd",
}
config.MaskSensitiveData()
require.Equal(t, "http://abc.com?password=xxxxx", config.Sink.SchemaRegistry)
require.Equal(t, "http://abc.com?password=xxxxx", config.Consistent.Storage)
require.Equal(t, "http://abc.com?password=xxxxx", *config.Sink.KafkaConfig.SASLOAuthTokenURL)
require.Equal(t, "******", *config.Sink.KafkaConfig.SASLOAuthClientSecret)
require.Equal(t, "******", *config.Sink.KafkaConfig.Key)
require.Equal(t, "******", *config.Sink.KafkaConfig.SASLPassword)
require.Equal(t, "******", *config.Sink.KafkaConfig.SASLGssAPIPassword)
}
22 changes: 22 additions & 0 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"net/url"
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/pingcap/errors"
"github.com/pingcap/log"
cerror "github.com/pingcap/tiflow/pkg/errors"
Expand Down Expand Up @@ -149,6 +150,16 @@ type SinkConfig struct {
AdvanceTimeoutInSec *uint `toml:"advance-timeout-in-sec" json:"advance-timeout-in-sec,omitempty"`
}

// MaskSensitiveData masks sensitive data in SinkConfig
func (s *SinkConfig) MaskSensitiveData() {
if s.SchemaRegistry != "" {
s.SchemaRegistry = util.MaskSensitiveDataInURI(s.SchemaRegistry)
}
if s.KafkaConfig != nil {
s.KafkaConfig.MaskSensitiveData()
}
}

// CSVConfig defines a series of configuration items for csv codec.
type CSVConfig struct {
// delimiter between fields
Expand Down Expand Up @@ -315,6 +326,17 @@ type KafkaConfig struct {
CodecConfig *CodecConfig `toml:"codec-config" json:"codec-config,omitempty"`
}

// MaskSensitiveData masks sensitive data in KafkaConfig
func (k *KafkaConfig) MaskSensitiveData() {
k.SASLPassword = aws.String("******")
k.SASLGssAPIPassword = aws.String("******")
k.SASLOAuthClientSecret = aws.String("******")
k.Key = aws.String("******")
if k.SASLOAuthTokenURL != nil {
k.SASLOAuthTokenURL = aws.String(util.MaskSensitiveDataInURI(*k.SASLOAuthTokenURL))
}
}

// MySQLConfig represents a MySQL sink configuration
type MySQLConfig struct {
WorkerCount *int `toml:"worker-count" json:"worker-count,omitempty"`
Expand Down
2 changes: 1 addition & 1 deletion pkg/sink/codec/avro/schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func (m *schemaManager) ClearRegistry(ctx context.Context, topicName string) err
)
req, err := http.NewRequestWithContext(ctx, "DELETE", uri, nil)
if err != nil {
log.Error("Could not construct request for clearRegistry", zap.String("uri", uri))
log.Error("Could not construct request for clearRegistry", zap.Error(err))
return cerror.WrapError(cerror.ErrAvroSchemaAPIError, err)
}
req.Header.Add(
Expand Down
29 changes: 29 additions & 0 deletions pkg/util/uri.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,32 @@ func MaskSinkURI(uri string) (string, error) {
}
return uriParsed.Redacted(), nil
}

var sensitiveQueryParameterNames = []string{
"password",
"sasl-password",
"access-key",
"secret-access-key",
"access_token",
"token",
"secret",
"passwd",
"pwd",
}

// MaskSensitiveDataInURI returns an uri that sensitive infos has been masked.
func MaskSensitiveDataInURI(uri string) string {
uriParsed, err := url.Parse(uri)
if err != nil {
log.Error("failed to parse sink URI", zap.Error(err))
return ""
}
queries := uriParsed.Query()
for _, secretKey := range sensitiveQueryParameterNames {
if queries.Has(secretKey) {
queries.Set(secretKey, "xxxxx")
}
}
uriParsed.RawQuery = queries.Encode()
return uriParsed.Redacted()
}
34 changes: 34 additions & 0 deletions pkg/util/uri_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,37 @@ func TestMaskSinkURI(t *testing.T) {
require.Equal(t, tt.masked, maskedURI)
}
}

func TestMaskSensitiveDataInURI(t *testing.T) {
tests := []struct {
uri string
masked string
}{
{
"mysql://root:[email protected]:3306/?time-zone=c",
"mysql://root:[email protected]:3306/?time-zone=c",
},
{
"",
"",
},
{
"abc",
"abc",
},
}
for _, q := range sensitiveQueryParameterNames {
tests = append(tests, struct {
uri string
masked string
}{
"kafka://127.0.0.1:9093/cdc?" + q + "=verysecure",
"kafka://127.0.0.1:9093/cdc?" + q + "=xxxxx",
})
}

for _, tt := range tests {
maskedURI := MaskSensitiveDataInURI(tt.uri)
require.Equal(t, tt.masked, maskedURI)
}
}

0 comments on commit b9dcc64

Please sign in to comment.