Skip to content

Commit

Permalink
support RollbackReason (#49) (#54)
Browse files Browse the repository at this point in the history
* support RollbackReason (#49)

* chore: update dtm version

* fix: null rollback reason
  • Loading branch information
catcherwong authored Aug 29, 2022
1 parent 883c327 commit 6b3602f
Show file tree
Hide file tree
Showing 10 changed files with 137 additions and 22 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/build_and_it.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: Build_And_IntegrationTests
name: Build_And_GRPC_IntegrationTests

on:
push:
Expand Down Expand Up @@ -48,8 +48,8 @@ jobs:
mysql -h127.0.0.1 -uroot -p123456 < /home/runner/work/client-csharp/client-csharp/sqls/busi.mysql.sql
- name: Setup DTM server
run: |
wget https://github.com/dtm-labs/dtm/releases/download/v1.12.1/dtm_1.12.1_linux_amd64.tar.gz
tar -xvf dtm_1.12.1_linux_amd64.tar.gz
wget https://github.com/dtm-labs/dtm/releases/download/v1.16.1/dtm_1.16.1_linux_amd64.tar.gz
tar -xvf dtm_1.16.1_linux_amd64.tar.gz
pwd
mkdir /home/runner/work/client-csharp/client-csharp/logs
nohup ./dtm > /home/runner/work/client-csharp/client-csharp/logs/dtm.log 2>&1 &
Expand Down
6 changes: 6 additions & 0 deletions src/DtmCommon/Imp/TransBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ public class TransBase
[JsonPropertyName("query_prepared")]
public string QueryPrepared { get; set; }

[JsonPropertyName("protocol")]
public string Protocol { get; set; }

[JsonPropertyName("rollback_reason")]
public string RollbackReason { get; set; }

[JsonIgnore]
public string Dtm { get; set; }

Expand Down
1 change: 1 addition & 0 deletions src/Dtmcli/Tcc/TccGlobalTransaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public async Task<string> Excecute(string gid, Action<Tcc> custom, Func<Tcc, Tas
}
catch (Exception ex)
{
tcc.GetTransBase().RollbackReason = ex.Message.Substring(0, ex.Message.Length > 1023 ? 1023 : ex.Message.Length);
logger.LogError(ex, "prepare or submitting global transaction error");
await dtmClient.TransCallDtm(tcc.GetTransBase(), tcc.GetTransBase(), Constant.Request.OPERATION_ABORT, cancellationToken);
return string.Empty;
Expand Down
1 change: 1 addition & 0 deletions src/Dtmgrpc/DtmgRPCClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ private dtmgpb.DtmRequest BuildDtmRequest(TransBase transBase)
QueryPrepared = transBase.QueryPrepared ?? string.Empty,
CustomedData = transBase.CustomData ?? string.Empty,
Steps = transBase.Steps == null ? string.Empty : Utils.ToJsonString(transBase.Steps),
RollbackReason = transBase.RollbackReason ?? string.Empty,
};

foreach (var item in transBase.BinPayloads ?? new List<byte[]>())
Expand Down
1 change: 1 addition & 0 deletions src/Dtmgrpc/Tcc/TccGlobalTransaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public async Task<string> Excecute(string gid, Action<TccGrpc> custom, Func<TccG
}
catch (Exception ex)
{
tcc.GetTransBase().RollbackReason = ex.Message.Substring(0, ex.Message.Length > 1023 ? 1023 : ex.Message.Length);
_logger.LogError(ex, "submitting or abort global transaction error");
await _dtmClient.DtmGrpcCall(tcc.GetTransBase(), Constant.Op.Abort);
return string.Empty;
Expand Down
2 changes: 2 additions & 0 deletions src/Dtmgrpc/dtmgpb/dtmgimp.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ message DtmRequest {
repeated bytes BinPayloads = 5; // for MSG/SAGA branch payloads
string QueryPrepared = 6; // for MSG
string Steps = 7;
map<string, string> ReqExtra = 8;
string RollbackReason = 9;
}

message DtmGidReply {
Expand Down
71 changes: 60 additions & 11 deletions tests/Dtmcli.Tests/TccTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public async void Execute_Should_Submit()
TestHelper.MockTransCallDtm(dtmClient, Constant.Request.OPERATION_PREPARE, false);
TestHelper.MockTransRegisterBranch(dtmClient, Constant.Request.OPERATION_REGISTERBRANCH, false);
TestHelper.MockTransRequestBranch(dtmClient, System.Net.HttpStatusCode.OK);

var globalTrans = new TccGlobalTransaction(dtmClient.Object, NullLoggerFactory.Instance);
var res = await globalTrans.Excecute(async (tcc) =>
{
Expand Down Expand Up @@ -101,31 +101,80 @@ public async void Set_TransOptions_Should_Succeed()

var gid = "tcc_gid";
var globalTrans = new TccGlobalTransaction(dtmClient.Object, NullLoggerFactory.Instance);
var res = await globalTrans.Excecute(gid, tcc =>
var res = await globalTrans.Excecute(gid, tcc =>
{
tcc.EnableWaitResult();
tcc.SetRetryInterval(10);
tcc.SetTimeoutToFail(100);
tcc.SetBranchHeaders(new Dictionary<string, string>
tcc.SetBranchHeaders(new Dictionary<string, string>
{
{ "bh1", "123" },
{ "bh2", "456" },
});
}, async (tcc) =>
}, async (tcc) =>
{
var res1 = await tcc.CallBranch(new { }, "http://localhost:9999/TransOutTry", "http://localhost:9999/TransOutConfirm", "http://localhost:9999/TransOutCancel", default);
var res2 = await tcc.CallBranch(new { }, "http://localhost:9999/TransInTry", "http://localhost:9999/TransInConfirm", "http://localhost:9999/TransInCancel", default);
var transBase = tcc.GetTransBase();
Assert.True(transBase.WaitResult);
Assert.Equal(10, transBase.RetryInterval);
Assert.Equal(100, transBase.TimeoutToFail);
Assert.Contains("bh1", transBase.BranchHeaders.Keys);
Assert.Contains("bh2", transBase.BranchHeaders.Keys);
});

Assert.Equal(gid, res);
}

[Fact]
public async void Execute_Should_Abort_With_RollbackReason_When_Occure_Exception()
{
var dtmClient = new Mock<IDtmClient>();
TestHelper.MockTransCallDtm(dtmClient, Constant.Request.OPERATION_PREPARE, false);
TestHelper.MockTransCallDtm(dtmClient, Constant.Request.OPERATION_ABORT, false);
TestHelper.MockTransRegisterBranch(dtmClient, Constant.Request.OPERATION_REGISTERBRANCH, true, "123123123");
TestHelper.MockTransRequestBranch(dtmClient, System.Net.HttpStatusCode.BadRequest);

var gid = "tcc_gid";
var globalTrans = new TccGlobalTransaction(dtmClient.Object, NullLoggerFactory.Instance);
var res = await globalTrans.Excecute(gid, async (tcc) =>
{
var res1 = await tcc.CallBranch(new { }, "http://localhost:9999/TransOutTry", "http://localhost:9999/TransOutConfirm", "http://localhost:9999/TransOutCancel", default);
var res2 = await tcc.CallBranch(new { }, "http://localhost:9999/TransInTry", "http://localhost:9999/TransInConfirm", "http://localhost:9999/TransInCancel", default);
});

var transBase = tcc.GetTransBase();
Assert.Empty(res);
dtmClient.Verify(x => x.TransCallDtm(It.Is<TransBase>(x => x.RollbackReason.Equals("123123123")), It.IsAny<object>(), Constant.Request.OPERATION_ABORT, It.IsAny<CancellationToken>()), Times.Once);
}

[Fact]
public async void Execute_Should_Abort_With_Big_RollbackReason_When_Occure_Exception()
{
var builder = new System.Text.StringBuilder(2048);
for (int i = 0; i < 1100; i++)
{
builder.Append("r");
}
var ex = builder.ToString();

var dtmClient = new Mock<IDtmClient>();
TestHelper.MockTransCallDtm(dtmClient, Constant.Request.OPERATION_PREPARE, false);
TestHelper.MockTransCallDtm(dtmClient, Constant.Request.OPERATION_ABORT, false);
TestHelper.MockTransRegisterBranch(dtmClient, Constant.Request.OPERATION_REGISTERBRANCH, true, ex);
TestHelper.MockTransRequestBranch(dtmClient, System.Net.HttpStatusCode.BadRequest);

Assert.True(transBase.WaitResult);
Assert.Equal(10, transBase.RetryInterval);
Assert.Equal(100, transBase.TimeoutToFail);
Assert.Contains("bh1", transBase.BranchHeaders.Keys);
Assert.Contains("bh2", transBase.BranchHeaders.Keys);
var gid = "tcc_gid";
var globalTrans = new TccGlobalTransaction(dtmClient.Object, NullLoggerFactory.Instance);
var res = await globalTrans.Excecute(gid, async (tcc) =>
{
var res1 = await tcc.CallBranch(new { }, "http://localhost:9999/TransOutTry", "http://localhost:9999/TransOutConfirm", "http://localhost:9999/TransOutCancel", default);
var res2 = await tcc.CallBranch(new { }, "http://localhost:9999/TransInTry", "http://localhost:9999/TransInConfirm", "http://localhost:9999/TransInCancel", default);
});

Assert.Equal(gid, res);
Assert.Empty(res);
dtmClient.Verify(x => x.TransCallDtm(It.Is<TransBase>(x => x.RollbackReason.Equals(ex.Substring(0, 1023))), It.IsAny<object>(), Constant.Request.OPERATION_ABORT, It.IsAny<CancellationToken>()), Times.Once);
}
}
}
8 changes: 4 additions & 4 deletions tests/Dtmcli.Tests/TestHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,29 @@ namespace Dtmcli.Tests
{
public class TestHelper
{
public static void MockTransCallDtm(Mock<IDtmClient> mock, string op, bool isEx)
public static void MockTransCallDtm(Mock<IDtmClient> mock, string op, bool isEx, string ex = "")
{
var setup = mock
.Setup(x => x.TransCallDtm(It.IsAny<TransBase>(), It.IsAny<object>(), op, It.IsAny<CancellationToken>()));

if (isEx)
{
setup.Throws(new Exception(""));
setup.Throws(new Exception(ex));
}
else
{
setup.Returns(Task.CompletedTask);
}
}

public static void MockTransRegisterBranch(Mock<IDtmClient> mock, string op, bool isEx)
public static void MockTransRegisterBranch(Mock<IDtmClient> mock, string op, bool isEx, string ex = "")
{
var setup = mock
.Setup(x => x.TransRegisterBranch(It.IsAny<TransBase>(), It.IsAny<Dictionary<string, string>>(), op, It.IsAny<CancellationToken>()));

if (isEx)
{
setup.Throws(new Exception(""));
setup.Throws(new Exception(ex));
}
else
{
Expand Down
55 changes: 55 additions & 0 deletions tests/Dtmgrpc.Tests/TccTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,5 +97,60 @@ public async void Set_TransOptions_Should_Succeed()

Assert.Equal(gid, res);
}

[Fact]
public async void Execute_Should_Abort_With_RollbackReason_When_Occure_Exception()
{
var dtmClient = new Mock<IDtmgRPCClient>();
TransMockHelper.MockTransCallDtm(dtmClient, Constant.Op.Prepare, false);
TransMockHelper.MockRegisterBranch(dtmClient, true, "123123123");
TransMockHelper.MockTransRequestBranch(dtmClient, false);

var gid = "tcc_gid";

var transFactory = new Mock<IDtmTransFactory>();
transFactory.Setup(x => x.NewTccGrpc(It.IsAny<string>())).Returns(new TccGrpc(dtmClient.Object, TransBase.NewTransBase(gid, "tcc", "", "")));

var globalTrans = new TccGlobalTransaction(dtmClient.Object, NullLoggerFactory.Instance, transFactory.Object);
var res = await globalTrans.Excecute(gid, async (tcc) =>
{
await tcc.CallBranch<Empty, Empty>(new Empty(), "localhost:9999/svc/TransOutTry", "localhost:9999/svc/TransOutConfirm", "localhost:9999/svc/TransOutCancel");
await tcc.CallBranch<Empty, Empty>(new Empty(), "localhost:9999/svc/TransInTry", "localhost:9999/svc/TransInConfirm", "localhost:9999/svc/TransInCancel");
});

Assert.Empty(res);
dtmClient.Verify(x => x.DtmGrpcCall(It.Is<TransBase>(x => x.RollbackReason.Equals("123123123")), Constant.Op.Abort), Times.Once);
}

[Fact]
public async void Execute_Should_Abort_With_Big_RollbackReason_When_Occure_Exception()
{
var builder = new System.Text.StringBuilder(2048);
for (int i = 0; i < 1100; i++)
{
builder.Append("r");
}
var ex = builder.ToString();

var dtmClient = new Mock<IDtmgRPCClient>();
TransMockHelper.MockTransCallDtm(dtmClient, Constant.Op.Prepare, false);
TransMockHelper.MockRegisterBranch(dtmClient, true, ex);
TransMockHelper.MockTransRequestBranch(dtmClient, false);

var gid = "tcc_gid";

var transFactory = new Mock<IDtmTransFactory>();
transFactory.Setup(x => x.NewTccGrpc(It.IsAny<string>())).Returns(new TccGrpc(dtmClient.Object, TransBase.NewTransBase(gid, "tcc", "", "")));

var globalTrans = new TccGlobalTransaction(dtmClient.Object, NullLoggerFactory.Instance, transFactory.Object);
var res = await globalTrans.Excecute(gid, async (tcc) =>
{
await tcc.CallBranch<Empty, Empty>(new Empty(), "localhost:9999/svc/TransOutTry", "localhost:9999/svc/TransOutConfirm", "localhost:9999/svc/TransOutCancel");
await tcc.CallBranch<Empty, Empty>(new Empty(), "localhost:9999/svc/TransInTry", "localhost:9999/svc/TransInConfirm", "localhost:9999/svc/TransInCancel");
});

Assert.Empty(res);
dtmClient.Verify(x => x.DtmGrpcCall(It.Is<TransBase>(x => x.RollbackReason.Equals(ex.Substring(0, 1023))), Constant.Op.Abort), Times.Once);
}
}
}
8 changes: 4 additions & 4 deletions tests/Dtmgrpc.Tests/TransMockHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,22 @@ namespace Dtmgrpc.Tests
{
public class TransMockHelper
{
public static void MockTransCallDtm(Mock<IDtmgRPCClient> mock, string op, bool isEx)
public static void MockTransCallDtm(Mock<IDtmgRPCClient> mock, string op, bool isEx, string ex = "")
{
var setup = mock
.Setup(x => x.DtmGrpcCall(It.IsAny<TransBase>(), op));

if (isEx)
{
setup.Throws(new Exception(""));
setup.Throws(new Exception(ex));
}
else
{
setup.Returns(Task.CompletedTask);
}
}

public static void MockRegisterBranch(Mock<IDtmgRPCClient> mock, bool isEx)
public static void MockRegisterBranch(Mock<IDtmgRPCClient> mock, bool isEx, string ex = "")
{
var setup = mock
.Setup(x => x.RegisterBranch(
Expand All @@ -38,7 +38,7 @@ public static void MockRegisterBranch(Mock<IDtmgRPCClient> mock, bool isEx)

if (isEx)
{
setup.Throws(new Exception(""));
setup.Throws(new Exception(ex));
}
else
{
Expand Down

0 comments on commit 6b3602f

Please sign in to comment.