Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reject TwoPC calls if semi-sync is not enabled #16608

Merged
29 changes: 29 additions & 0 deletions go/test/endtoend/transaction/twopc/twopc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1022,3 +1022,32 @@ func TestReadingUnresolvedTransactions(t *testing.T) {
})
}
}

// TestSemiSyncRequiredWithTwoPC tests that semi-sync is required when using two-phase commit.
func TestSemiSyncRequiredWithTwoPC(t *testing.T) {
// cleanup all the old data.
conn, closer := start(t)
defer closer()

out, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=none")
require.NoError(t, err, out)
defer clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=semi_sync")

// After changing the durability policy for the given keyspace to none, we run PRS.
shard := clusterInstance.Keyspaces[0].Shards[2]
newPrimary := shard.Vttablets[1]
_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(
"PlannedReparentShard",
fmt.Sprintf("%s/%s", keyspaceName, shard.Name),
"--new-primary", newPrimary.Alias)
require.NoError(t, err)

// A new distributed transaction should fail.
utils.Exec(t, conn, "begin")
utils.Exec(t, conn, "insert into twopc_t1(id, col) values(4, 4)")
utils.Exec(t, conn, "insert into twopc_t1(id, col) values(6, 4)")
utils.Exec(t, conn, "insert into twopc_t1(id, col) values(9, 4)")
_, err = utils.ExecAllowError(t, conn, "commit")
require.Error(t, err)
require.ErrorContains(t, err, "two-pc is enabled, but semi-sync is not")
}
7 changes: 4 additions & 3 deletions go/test/endtoend/transaction/tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,10 @@ func TestMain(m *testing.M) {

// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceName,
SchemaSQL: SchemaSQL,
VSchema: VSchema,
Name: keyspaceName,
SchemaSQL: SchemaSQL,
VSchema: VSchema,
DurabilityPolicy: "semi_sync",
}
if err := clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, false); err != nil {
return 1, err
Expand Down
29 changes: 20 additions & 9 deletions go/vt/vttablet/tabletmanager/rpc_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ import (
"strings"
"time"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/protoutil"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/proto/vtrpc"
Expand Down Expand Up @@ -348,6 +347,15 @@ func (tm *TabletManager) InitPrimary(ctx context.Context, semiSync bool) (string
}
defer tm.unlock()

semiSyncAction, err := tm.convertBoolToSemiSyncAction(ctx, semiSync)
if err != nil {
return "", err
}

// If semi-sync is enabled, we need to set two pc to be allowed.
// Otherwise, we block all Prepared calls because atomic transactions require semi-sync for correctness..
tm.QueryServiceControl.SetTwoPCAllowed(semiSyncAction == SemiSyncActionSet)

// Setting super_read_only `OFF` so that we can run the DDL commands
if _, err := tm.MysqlDaemon.SetSuperReadOnly(ctx, false); err != nil {
if sqlErr, ok := err.(*sqlerror.SQLError); ok && sqlErr.Number() == sqlerror.ERUnknownSystemVariable {
Expand All @@ -369,11 +377,6 @@ func (tm *TabletManager) InitPrimary(ctx context.Context, semiSync bool) (string
return "", err
}

semiSyncAction, err := tm.convertBoolToSemiSyncAction(ctx, semiSync)
if err != nil {
return "", err
}

// Set the server read-write, from now on we can accept real
// client writes. Note that if semi-sync replication is enabled,
// we'll still need some replicas to be able to commit transactions.
Expand Down Expand Up @@ -595,6 +598,10 @@ func (tm *TabletManager) UndoDemotePrimary(ctx context.Context, semiSync bool) e
return err
}

// If semi-sync is enabled, we need to set two pc to be allowed.
// Otherwise, we block all Prepared calls because atomic transactions require semi-sync for correctness..
tm.QueryServiceControl.SetTwoPCAllowed(semiSyncAction == SemiSyncActionSet)

// If using semi-sync, we need to enable source-side.
if err := tm.fixSemiSync(ctx, topodatapb.TabletType_PRIMARY, semiSyncAction); err != nil {
return err
Expand Down Expand Up @@ -911,12 +918,16 @@ func (tm *TabletManager) PromoteReplica(ctx context.Context, semiSync bool) (str
}
defer tm.unlock()

pos, err := tm.MysqlDaemon.Promote(ctx, tm.hookExtraEnv())
semiSyncAction, err := tm.convertBoolToSemiSyncAction(ctx, semiSync)
if err != nil {
return "", err
}

semiSyncAction, err := tm.convertBoolToSemiSyncAction(ctx, semiSync)
// If semi-sync is enabled, we need to set two pc to be allowed.
// Otherwise, we block all Prepared calls because atomic transactions require semi-sync for correctness..
tm.QueryServiceControl.SetTwoPCAllowed(semiSyncAction == SemiSyncActionSet)

pos, err := tm.MysqlDaemon.Promote(ctx, tm.hookExtraEnv())
if err != nil {
return "", err
}
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vttablet/tabletserver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ type Controller interface {
GetThrottlerStatus(ctx context.Context) *throttle.ThrottlerStatus

RedoPreparedTransactions()

// SetTwoPCAllowed sets whether TwoPC is allowed or not.
SetTwoPCAllowed(bool)
}

// Ensure TabletServer satisfies Controller interface.
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vttablet/tabletserver/dt_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ func (dte *DTExecutor) Prepare(transactionID int64, dtid string) error {
if !dte.te.twopcEnabled {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
if !dte.te.twopcAllowed {
return vterrors.VT10002("two-pc is enabled, but semi-sync is not")
}
defer dte.te.env.Stats().QueryTimings.Record("PREPARE", time.Now())
dte.logStats.TransactionID = transactionID

Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1692,6 +1692,11 @@ func (tsv *TabletServer) RedoPreparedTransactions() {
tsv.te.RedoPreparedTransactions()
}

// SetTwoPCAllowed sets whether TwoPC is allowed or not.
func (tsv *TabletServer) SetTwoPCAllowed(allowed bool) {
tsv.te.twopcAllowed = allowed
}

// HandlePanic is part of the queryservice.QueryService interface
func (tsv *TabletServer) HandlePanic(err *error) {
if x := recover(); x != nil {
Expand Down
9 changes: 8 additions & 1 deletion go/vt/vttablet/tabletserver/tx_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@ type TxEngine struct {
// transition while creating new transactions
beginRequests sync.WaitGroup

twopcEnabled bool
// twopcEnabled is the flag value of whether the user has enabled twopc or not.
twopcEnabled bool
// twopcAllowed is wether it is safe to allow two pc transactions or not.
// If the primary tablet doesn't run with semi-sync we set this to false, and disallow any prepared calls.
twopcAllowed bool
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
shutdownGracePeriod time.Duration
coordinatorAddress string
abandonAge time.Duration
Expand All @@ -100,6 +104,9 @@ func NewTxEngine(env tabletenv.Env, dxNotifier func()) *TxEngine {
}
limiter := txlimiter.New(env)
te.txPool = NewTxPool(env, limiter)
// We initially allow twoPC (handles vttablet restarts).
// We will disallow them, when a new tablet is promoted if semi-sync is turned off.
te.twopcAllowed = true
te.twopcEnabled = config.TwoPCEnable
if te.twopcEnabled {
if config.TwoPCAbandonAge <= 0 {
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletservermock/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,10 @@ func (tqsc *Controller) GetThrottlerStatus(ctx context.Context) *throttle.Thrott
// RedoPreparedTransactions is part of the tabletserver.Controller interface
func (tqsc *Controller) RedoPreparedTransactions() {}

// SetTwoPCAllowed sets whether TwoPC is allowed or not.
func (tqsc *Controller) SetTwoPCAllowed(bool) {
}

// EnterLameduck implements tabletserver.Controller.
func (tqsc *Controller) EnterLameduck() {
tqsc.mu.Lock()
Expand Down
Loading