Skip to content

Commit

Permalink
fix trx log
Browse files Browse the repository at this point in the history
  • Loading branch information
liuyuecai committed Mar 23, 2024
1 parent c31cf14 commit c93ad45
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 51 deletions.
27 changes: 21 additions & 6 deletions pkg/runtime/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package context

import (
"context"
"github.com/arana-db/arana/pkg/runtime"
)

import (
Expand All @@ -33,6 +32,22 @@ const (
_flagWrite
)

// TxState Transaction status
type TxState int32

const (
_ TxState = iota
TrxStarted // CompositeTx Default state
TrxPreparing // All SQL statements are executed, and before the Commit statement executes
TrxPrepared // All SQL statements are executed, and before the Commit statement executes
TrxCommitting // After preparing is completed, ready to start execution
TrxCommitted // Officially complete the Commit action
TrxRolledBacking
TrxRolledBacked
TrxAborted
TrxUnknown // Unknown transaction
)

type (
keyFlag struct{}
keyNodeLabel struct{}
Expand Down Expand Up @@ -118,7 +133,7 @@ func TransactionID(ctx context.Context) string {
return getString(ctx, keyTransactionID{})
}

func TransactionStatus(ctx context.Context) runtime.TxState {
func TransactionStatus(ctx context.Context) TxState {
return getTxStatus(ctx, keyTransactionStatus{})
}

Expand Down Expand Up @@ -157,11 +172,11 @@ func getString(ctx context.Context, v any) string {
return ""
}

func getTxStatus(ctx context.Context, v any) runtime.TxState {
func getTxStatus(ctx context.Context, v any) TxState {
if data, ok := ctx.Value(v).(int32); ok {
if data >= int32(runtime.TrxStarted) && data <= int32(runtime.TrxAborted) {
return runtime.TxState(data)
if data >= int32(TrxStarted) && data <= int32(TrxAborted) {
return TxState(data)
}
}
return runtime.TrxUnknown
return TrxUnknown
}
3 changes: 1 addition & 2 deletions pkg/runtime/transaction/xa.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"context"
"errors"
"fmt"
"github.com/arana-db/arana/pkg/runtime"
)

import (
Expand All @@ -39,7 +38,7 @@ func StartXA(ctx context.Context, bc *mysql.BackendConnection) (proto.Result, er
if len(txId) == 0 {
return nil, ErrorInvalidTxId
}
if rcontext.TransactionStatus(ctx) != runtime.TrxStarted {
if rcontext.TransactionStatus(ctx) != rcontext.TrxStarted {
return nil, ErrorInvalidTxStatus
}
return bc.ExecuteWithWarningCount(fmt.Sprintf("XA START '%s'", txId), false)
Expand Down
70 changes: 27 additions & 43 deletions pkg/runtime/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,22 +57,6 @@ var (
_ proto.VersionSupport = (*compositeTx)(nil)
)

// TxState Transaction status
type TxState int32

const (
_ TxState = iota
TrxStarted // CompositeTx Default state
TrxPreparing // All SQL statements are executed, and before the Commit statement executes
TrxPrepared // All SQL statements are executed, and before the Commit statement executes
TrxCommitting // After preparing is completed, ready to start execution
TrxCommitted // Officially complete the Commit action
TrxRolledBacking
TrxRolledBacked
TrxAborted
TrxUnknown // Unknown transaction
)

// CompositeTx distribute transaction
type (
// CompositeTx distribute transaction
Expand All @@ -82,7 +66,7 @@ type (
// GetTenant get cur tx owner tenant
GetTenant() string
// GetTxState get cur tx state
GetTxState() TxState
GetTxState() rcontext.TxState
// GetExpectedEndTime
GetStartTime() time.Time
// GetExpectedEndTime get cur tx expected end time
Expand All @@ -108,7 +92,7 @@ type (
// GetConn gets mysql connection
GetConn() *mysql.BackendConnection
// GetTxState get cur tx state
GetTxState() TxState
GetTxState() rcontext.TxState
// Commit commit tx
Commit(ctx context.Context) (res proto.Result, warn uint16, err error)
// Rollback rollback tx
Expand All @@ -118,7 +102,7 @@ type (
// TxHook transaction hook
TxHook interface {
// OnTxStateChange Fired when CompositeTx TrxState change
OnTxStateChange(ctx context.Context, state TxState, tx CompositeTx) error
OnTxStateChange(ctx context.Context, state rcontext.TxState, tx CompositeTx) error
// OnCreateBranchTx Fired when BranchTx create
OnCreateBranchTx(ctx context.Context, tx BranchTx)
}
Expand Down Expand Up @@ -150,7 +134,7 @@ func newCompositeTx(ctx context.Context, pi *defaultRuntime, hooks ...TxHook) *c
},
}

tx.setTxState(ctx, TrxStarted)
tx.setTxState(ctx, rcontext.TrxStarted)
return tx
}

Expand All @@ -165,7 +149,7 @@ type compositeTx struct {
endTime time.Time

isoLevel sql.IsolationLevel
txState TxState
txState rcontext.TxState

beginFunc dbFunc

Expand Down Expand Up @@ -334,7 +318,7 @@ func (tx *compositeTx) Commit(ctx context.Context) (proto.Result, uint16, error)
}

func (tx *compositeTx) doPrepareCommit(ctx context.Context) error {
tx.setTxState(ctx, TrxPreparing)
tx.setTxState(ctx, rcontext.TrxPreparing)

var g errgroup.Group
for k, v := range tx.txs {
Expand All @@ -348,16 +332,16 @@ func (tx *compositeTx) doPrepareCommit(ctx context.Context) error {
})
}
if err := g.Wait(); err != nil {
tx.setTxState(ctx, TrxAborted)
tx.setTxState(ctx, rcontext.TrxAborted)
return err
}

tx.setTxState(ctx, TrxPrepared)
tx.setTxState(ctx, rcontext.TrxPrepared)
return nil
}

func (tx *compositeTx) doCommit(ctx context.Context) error {
tx.setTxState(ctx, TrxCommitting)
tx.setTxState(ctx, rcontext.TrxCommitting)

var g errgroup.Group
for k, v := range tx.txs {
Expand All @@ -375,7 +359,7 @@ func (tx *compositeTx) doCommit(ctx context.Context) error {
return err
}

tx.setTxState(ctx, TrxCommitted)
tx.setTxState(ctx, rcontext.TrxCommitted)
return nil
}

Expand Down Expand Up @@ -403,7 +387,7 @@ func (tx *compositeTx) Rollback(ctx context.Context) (proto.Result, uint16, erro
}

func (tx *compositeTx) doPrepareRollback(ctx context.Context) error {
tx.setTxState(ctx, TrxPreparing)
tx.setTxState(ctx, rcontext.TrxPreparing)

var g errgroup.Group
for k, v := range tx.txs {
Expand All @@ -418,15 +402,15 @@ func (tx *compositeTx) doPrepareRollback(ctx context.Context) error {
}

if err := g.Wait(); err != nil {
tx.setTxState(ctx, TrxAborted)
tx.setTxState(ctx, rcontext.TrxAborted)
return err
}
tx.setTxState(ctx, TrxPrepared)
tx.setTxState(ctx, rcontext.TrxPrepared)
return nil
}

func (tx *compositeTx) doRollback(ctx context.Context) error {
tx.setTxState(ctx, TrxRolledBacking)
tx.setTxState(ctx, rcontext.TrxRolledBacking)

var g errgroup.Group
for k, v := range tx.txs {
Expand All @@ -443,7 +427,7 @@ func (tx *compositeTx) doRollback(ctx context.Context) error {
if err := g.Wait(); err != nil {
return err
}
tx.setTxState(ctx, TrxRolledBacked)
tx.setTxState(ctx, rcontext.TrxRolledBacked)
return nil
}

Expand All @@ -454,11 +438,11 @@ func (tx *compositeTx) Range(f func(tx BranchTx)) {
}
}

func (tx *compositeTx) GetTxState() TxState {
func (tx *compositeTx) GetTxState() rcontext.TxState {
return tx.txState
}

func (tx *compositeTx) setTxState(ctx context.Context, state TxState) {
func (tx *compositeTx) setTxState(ctx context.Context, state rcontext.TxState) {
tx.txState = state
for i := range tx.hooks {
if err := tx.hooks[i].OnTxStateChange(ctx, state, tx); err != nil {
Expand All @@ -473,7 +457,7 @@ type branchTx struct {
closed atomic.Bool
parent *AtomDB

state TxState
state rcontext.TxState

prepare dbFunc
commit dbFunc
Expand All @@ -498,20 +482,20 @@ func newBranchTx(parent *AtomDB, bc *mysql.BackendConnection) *branchTx {
}

// GetTxState get cur tx state
func (tx *branchTx) GetTxState() TxState {
func (tx *branchTx) GetTxState() rcontext.TxState {
return tx.state
}

func (tx *branchTx) Commit(ctx context.Context) (res proto.Result, warn uint16, err error) {
tx.state = TrxCommitting
tx.state = rcontext.TrxCommitting
_ = ctx
if !tx.closed.CAS(false, true) {
err = errTxClosed
return
}
defer tx.dispose()
if res, err = tx.commit(ctx, tx.bc); err != nil {
tx.state = TrxAborted
tx.state = rcontext.TrxAborted
return
}

Expand All @@ -525,14 +509,14 @@ func (tx *branchTx) Commit(ctx context.Context) (res proto.Result, warn uint16,
}

res = resultx.New(resultx.WithRowsAffected(affected), resultx.WithLastInsertID(lastInsertId))
tx.state = TrxCommitted
tx.state = rcontext.TrxCommitted
return
}

func (tx *branchTx) Prepare(ctx context.Context) error {
tx.state = TrxPreparing
tx.state = rcontext.TrxPreparing
_, err := tx.prepare(ctx, tx.bc)
tx.state = TrxPrepared
tx.state = rcontext.TrxPrepared
return err
}

Expand All @@ -542,9 +526,9 @@ func (tx *branchTx) Rollback(ctx context.Context) (res proto.Result, warn uint16
return
}
defer tx.dispose()
tx.state = TrxRolledBacking
tx.state = rcontext.TrxRolledBacking
res, err = tx.rollback(ctx, tx.bc)
tx.state = TrxRolledBacked
tx.state = rcontext.TrxRolledBacked
return
}

Expand Down Expand Up @@ -598,7 +582,7 @@ func (tx *branchTx) GetConn() *mysql.BackendConnection {
return tx.bc
}

func NumOfStateBranchTx(state TxState, tx CompositeTx) int32 {
func NumOfStateBranchTx(state rcontext.TxState, tx CompositeTx) int32 {
cnt := int32(0)
tx.Range(func(bTx BranchTx) {
if bTx.GetTxState() == state {
Expand Down

0 comments on commit c93ad45

Please sign in to comment.