Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple changes on Distributed Transaction user apis #16788

Merged
merged 11 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 24 additions & 26 deletions go/cmd/vtctldclient/command/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,36 +22,44 @@ import (
"github.com/spf13/cobra"

"vitess.io/vitess/go/cmd/vtctldclient/cli"
querypb "vitess.io/vitess/go/vt/proto/query"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)

var (
DistributedTransaction = &cobra.Command{
Use: "DistributedTransaction <cmd>",
Use: "DistributedTransaction [command] [command-flags]",
Short: "Perform commands on distributed transaction",
Args: cobra.MinimumNArgs(2),
Args: cobra.ExactArgs(1),

DisableFlagsInUseLine: true,
}

unresolvedTransactionsOptions = struct {
Keyspace string
AbandonAge int64 // in seconds
}{}

// GetUnresolvedTransactions makes an GetUnresolvedTransactions gRPC call to a vtctld.
GetUnresolvedTransactions = &cobra.Command{
Use: "list <keyspace>",
Use: "unresolved-list --keyspace <keyspace> --abandon-age <abandon_time_seconds>",
Short: "Retrieves unresolved transactions for the given keyspace.",
Aliases: []string{"List"},
Args: cobra.ExactArgs(1),
Args: cobra.NoArgs,
RunE: commandGetUnresolvedTransactions,

DisableFlagsInUseLine: true,
}

concludeTransactionOptions = struct {
Dtid string
}{}

// ConcludeTransaction makes a ConcludeTransaction gRPC call to a vtctld.
ConcludeTransaction = &cobra.Command{
Use: "conclude <dtid> [<keyspace/shard> ...]",
Use: "conclude --dtid <dtid>",
Short: "Concludes the unresolved transaction by rolling back the prepared transaction on each participating shard and removing the transaction metadata record.",
Aliases: []string{"Conclude"},
Args: cobra.MinimumNArgs(1),
Args: cobra.NoArgs,
RunE: commandConcludeTransaction,

DisableFlagsInUseLine: true,
Expand All @@ -72,10 +80,10 @@ const (
func commandGetUnresolvedTransactions(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

keyspace := cmd.Flags().Arg(0)
resp, err := client.GetUnresolvedTransactions(commandCtx,
&vtctldatapb.GetUnresolvedTransactionsRequest{
Keyspace: keyspace,
Keyspace: unresolvedTransactionsOptions.Keyspace,
AbandonAge: unresolvedTransactionsOptions.AbandonAge,
})
if err != nil {
return err
Expand All @@ -89,31 +97,17 @@ func commandGetUnresolvedTransactions(cmd *cobra.Command, args []string) error {
return nil
}

func commandConcludeTransaction(cmd *cobra.Command, args []string) error {
allArgs := cmd.Flags().Args()
shards, err := cli.ParseKeyspaceShards(allArgs[1:])
if err != nil {
return err
}
func commandConcludeTransaction(cmd *cobra.Command, args []string) (err error) {
cli.FinishedParsing(cmd)

dtid := allArgs[0]
var participants []*querypb.Target
for _, shard := range shards {
participants = append(participants, &querypb.Target{
Keyspace: shard.Keyspace,
Shard: shard.Name,
})
}
output := ConcludeTransactionOutput{
Dtid: dtid,
Dtid: concludeTransactionOptions.Dtid,
Message: concludeSuccess,
}

_, err = client.ConcludeTransaction(commandCtx,
&vtctldatapb.ConcludeTransactionRequest{
Dtid: dtid,
Participants: participants,
Dtid: concludeTransactionOptions.Dtid,
})
if err != nil {
output.Message = concludeFailure
Expand All @@ -127,7 +121,11 @@ func commandConcludeTransaction(cmd *cobra.Command, args []string) error {
}

func init() {
GetUnresolvedTransactions.Flags().StringVarP(&unresolvedTransactionsOptions.Keyspace, "keyspace", "k", "", "unresolved transactions list for the given keyspace.")
GetUnresolvedTransactions.Flags().Int64VarP(&unresolvedTransactionsOptions.AbandonAge, "abandon-age", "a", 0, "unresolved transactions list which are older than the specified age(in seconds).")
DistributedTransaction.AddCommand(GetUnresolvedTransactions)

ConcludeTransaction.Flags().StringVarP(&concludeTransactionOptions.Dtid, "dtid", "d", "", "conclude transaction for the given distributed transaction ID.")
DistributedTransaction.AddCommand(ConcludeTransaction)

Root.AddCommand(DistributedTransaction)
Expand Down
17 changes: 10 additions & 7 deletions go/test/endtoend/tabletmanager/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,19 +77,22 @@ func TestTabletCommands(t *testing.T) {
})

t.Run("GetUnresolvedTransactions", func(t *testing.T) {
_, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("DistributedTransaction", "list", keyspaceName)
_, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("DistributedTransaction", "unresolved-list",
"--keyspace", keyspaceName)
require.NoError(t, err)
})
t.Run("ConcludeTransaction", func(t *testing.T) {
output, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("DistributedTransaction", "conclude", "ks:0:1234")
assert.NoError(t, err)
assert.Contains(t, output, "Successfully concluded the distributed transaction")
t.Run("GetUnresolvedTransactions with age threshold", func(t *testing.T) {
_, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("DistributedTransaction", "unresolved-list",
"--keyspace", keyspaceName,
"--abandon-age", "32")
require.NoError(t, err)
})
t.Run("ConcludeTransaction with participants", func(t *testing.T) {
output, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("DistributedTransaction", "conclude", "ks:0:1234", "ks/0")
t.Run("ConcludeTransaction", func(t *testing.T) {
output, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("DistributedTransaction", "conclude", "--dtid", "ks:0:1234")
assert.NoError(t, err)
assert.Contains(t, output, "Successfully concluded the distributed transaction")
})

// check Ping / RefreshState / RefreshStateByShard
err = clusterInstance.VtctldClientProcess.ExecuteCommand("PingTablet", primaryTablet.Alias)
require.Nil(t, err, "error should be Nil")
Expand Down
Loading
Loading