Skip to content

Commit

Permalink
Reject TwoPC calls if semi-sync is not enabled (#16608)
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 authored Aug 23, 2024
1 parent 1db282a commit 78a54ce
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 13 deletions.
29 changes: 29 additions & 0 deletions go/test/endtoend/transaction/twopc/twopc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1022,3 +1022,32 @@ func TestReadingUnresolvedTransactions(t *testing.T) {
})
}
}

// TestSemiSyncRequiredWithTwoPC tests that semi-sync is required when using two-phase commit.
func TestSemiSyncRequiredWithTwoPC(t *testing.T) {
// cleanup all the old data.
conn, closer := start(t)
defer closer()

out, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=none")
require.NoError(t, err, out)
defer clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=semi_sync")

// After changing the durability policy for the given keyspace to none, we run PRS.
shard := clusterInstance.Keyspaces[0].Shards[2]
newPrimary := shard.Vttablets[1]
_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(
"PlannedReparentShard",
fmt.Sprintf("%s/%s", keyspaceName, shard.Name),
"--new-primary", newPrimary.Alias)
require.NoError(t, err)

// A new distributed transaction should fail.
utils.Exec(t, conn, "begin")
utils.Exec(t, conn, "insert into twopc_t1(id, col) values(4, 4)")
utils.Exec(t, conn, "insert into twopc_t1(id, col) values(6, 4)")
utils.Exec(t, conn, "insert into twopc_t1(id, col) values(9, 4)")
_, err = utils.ExecAllowError(t, conn, "commit")
require.Error(t, err)
require.ErrorContains(t, err, "two-pc is enabled, but semi-sync is not")
}
7 changes: 4 additions & 3 deletions go/test/endtoend/transaction/tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,10 @@ func TestMain(m *testing.M) {

// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceName,
SchemaSQL: SchemaSQL,
VSchema: VSchema,
Name: keyspaceName,
SchemaSQL: SchemaSQL,
VSchema: VSchema,
DurabilityPolicy: "semi_sync",
}
if err := clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, false); err != nil {
return 1, err
Expand Down
29 changes: 20 additions & 9 deletions go/vt/vttablet/tabletmanager/rpc_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ import (
"strings"
"time"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/protoutil"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/proto/vtrpc"
Expand Down Expand Up @@ -348,6 +347,15 @@ func (tm *TabletManager) InitPrimary(ctx context.Context, semiSync bool) (string
}
defer tm.unlock()

semiSyncAction, err := tm.convertBoolToSemiSyncAction(ctx, semiSync)
if err != nil {
return "", err
}

// If semi-sync is enabled, we need to set two pc to be allowed.
// Otherwise, we block all Prepared calls because atomic transactions require semi-sync for correctness..
tm.QueryServiceControl.SetTwoPCAllowed(semiSyncAction == SemiSyncActionSet)

// Setting super_read_only `OFF` so that we can run the DDL commands
if _, err := tm.MysqlDaemon.SetSuperReadOnly(ctx, false); err != nil {
if sqlErr, ok := err.(*sqlerror.SQLError); ok && sqlErr.Number() == sqlerror.ERUnknownSystemVariable {
Expand All @@ -369,11 +377,6 @@ func (tm *TabletManager) InitPrimary(ctx context.Context, semiSync bool) (string
return "", err
}

semiSyncAction, err := tm.convertBoolToSemiSyncAction(ctx, semiSync)
if err != nil {
return "", err
}

// Set the server read-write, from now on we can accept real
// client writes. Note that if semi-sync replication is enabled,
// we'll still need some replicas to be able to commit transactions.
Expand Down Expand Up @@ -595,6 +598,10 @@ func (tm *TabletManager) UndoDemotePrimary(ctx context.Context, semiSync bool) e
return err
}

// If semi-sync is enabled, we need to set two pc to be allowed.
// Otherwise, we block all Prepared calls because atomic transactions require semi-sync for correctness..
tm.QueryServiceControl.SetTwoPCAllowed(semiSyncAction == SemiSyncActionSet)

// If using semi-sync, we need to enable source-side.
if err := tm.fixSemiSync(ctx, topodatapb.TabletType_PRIMARY, semiSyncAction); err != nil {
return err
Expand Down Expand Up @@ -911,12 +918,16 @@ func (tm *TabletManager) PromoteReplica(ctx context.Context, semiSync bool) (str
}
defer tm.unlock()

pos, err := tm.MysqlDaemon.Promote(ctx, tm.hookExtraEnv())
semiSyncAction, err := tm.convertBoolToSemiSyncAction(ctx, semiSync)
if err != nil {
return "", err
}

semiSyncAction, err := tm.convertBoolToSemiSyncAction(ctx, semiSync)
// If semi-sync is enabled, we need to set two pc to be allowed.
// Otherwise, we block all Prepared calls because atomic transactions require semi-sync for correctness..
tm.QueryServiceControl.SetTwoPCAllowed(semiSyncAction == SemiSyncActionSet)

pos, err := tm.MysqlDaemon.Promote(ctx, tm.hookExtraEnv())
if err != nil {
return "", err
}
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vttablet/tabletserver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ type Controller interface {
GetThrottlerStatus(ctx context.Context) *throttle.ThrottlerStatus

RedoPreparedTransactions()

// SetTwoPCAllowed sets whether TwoPC is allowed or not.
SetTwoPCAllowed(bool)
}

// Ensure TabletServer satisfies Controller interface.
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vttablet/tabletserver/dt_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ func (dte *DTExecutor) Prepare(transactionID int64, dtid string) error {
if !dte.te.twopcEnabled {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
if !dte.te.twopcAllowed {
return vterrors.VT10002("two-pc is enabled, but semi-sync is not")
}
defer dte.te.env.Stats().QueryTimings.Record("PREPARE", time.Now())
dte.logStats.TransactionID = transactionID

Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1692,6 +1692,11 @@ func (tsv *TabletServer) RedoPreparedTransactions() {
tsv.te.RedoPreparedTransactions()
}

// SetTwoPCAllowed sets whether TwoPC is allowed or not.
func (tsv *TabletServer) SetTwoPCAllowed(allowed bool) {
tsv.te.twopcAllowed = allowed
}

// HandlePanic is part of the queryservice.QueryService interface
func (tsv *TabletServer) HandlePanic(err *error) {
if x := recover(); x != nil {
Expand Down
9 changes: 8 additions & 1 deletion go/vt/vttablet/tabletserver/tx_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@ type TxEngine struct {
// transition while creating new transactions
beginRequests sync.WaitGroup

twopcEnabled bool
// twopcEnabled is the flag value of whether the user has enabled twopc or not.
twopcEnabled bool
// twopcAllowed is wether it is safe to allow two pc transactions or not.
// If the primary tablet doesn't run with semi-sync we set this to false, and disallow any prepared calls.
twopcAllowed bool
shutdownGracePeriod time.Duration
coordinatorAddress string
abandonAge time.Duration
Expand All @@ -100,6 +104,9 @@ func NewTxEngine(env tabletenv.Env, dxNotifier func()) *TxEngine {
}
limiter := txlimiter.New(env)
te.txPool = NewTxPool(env, limiter)
// We initially allow twoPC (handles vttablet restarts).
// We will disallow them, when a new tablet is promoted if semi-sync is turned off.
te.twopcAllowed = true
te.twopcEnabled = config.TwoPCEnable
if te.twopcEnabled {
if config.TwoPCAbandonAge <= 0 {
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletservermock/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,10 @@ func (tqsc *Controller) GetThrottlerStatus(ctx context.Context) *throttle.Thrott
// RedoPreparedTransactions is part of the tabletserver.Controller interface
func (tqsc *Controller) RedoPreparedTransactions() {}

// SetTwoPCAllowed sets whether TwoPC is allowed or not.
func (tqsc *Controller) SetTwoPCAllowed(bool) {
}

// EnterLameduck implements tabletserver.Controller.
func (tqsc *Controller) EnterLameduck() {
tqsc.mu.Lock()
Expand Down

0 comments on commit 78a54ce

Please sign in to comment.