Skip to content

Commit

Permalink
change to use tpc-c transactions with try with resource
Browse files Browse the repository at this point in the history
  • Loading branch information
t-horikawa committed Aug 16, 2023
1 parent c891a60 commit b64ce15
Show file tree
Hide file tree
Showing 5 changed files with 707 additions and 732 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

public class Delivery {
SqlClient sqlClient;
Transaction transaction;
RandomGenerator randomGenerator;
Profile profile;

Expand Down Expand Up @@ -102,142 +101,138 @@ public long warehouseId() {
return paramsWid;
}

void rollback() throws IOException, ServerException, InterruptedException {
try {
transaction.rollback().get();
} finally {
transaction = null;
}
void rollback(Transaction transaction) throws IOException, ServerException, InterruptedException {
transaction.rollback().get();
}

@SuppressWarnings("checkstyle:methodlength")
public void transaction(AtomicBoolean stop) throws IOException, ServerException, InterruptedException {
while (!stop.get()) {
transaction = sqlClient.createTransaction().get();
profile.invocation.delivery++;
long dId;
for (dId = 1; dId <= Scale.DISTRICTS; dId++) {

// "SELECT no_o_id FROM NEW_ORDER WHERE no_d_id = :no_d_id AND no_w_id = :no_w_id ORDER BY no_o_id"
var future1 = transaction.executeQuery(prepared1,
Parameters.of("no_d_id", (long) dId),
Parameters.of("no_w_id", (long) paramsWid));
try (var resultSet1 = future1.get()) {
if (!resultSet1.nextRow()) {
continue; // noOid is exhausted, it's OK and continue this transaction
try (var transaction = sqlClient.createTransaction().get();) {
profile.invocation.delivery++;
long dId;
for (dId = 1; dId <= Scale.DISTRICTS; dId++) {

// "SELECT no_o_id FROM NEW_ORDER WHERE no_d_id = :no_d_id AND no_w_id = :no_w_id ORDER BY no_o_id"
var future1 = transaction.executeQuery(prepared1,
Parameters.of("no_d_id", (long) dId),
Parameters.of("no_w_id", (long) paramsWid));
try (var resultSet1 = future1.get()) {
if (!resultSet1.nextRow()) {
continue; // noOid is exhausted, it's OK and continue this transaction
}
resultSet1.nextColumn();
noOid = resultSet1.fetchInt8Value();
} catch (ServerException e) {
profile.ordersTable.delivery++;
break;
}
resultSet1.nextColumn();
noOid = resultSet1.fetchInt8Value();
} catch (ServerException e) {
profile.ordersTable.delivery++;
break;
}

try {
// "DELETE FROM NEW_ORDER WHERE no_d_id = :no_d_id AND no_w_id = :no_w_id AND no_o_id = :no_o_id"
var future2 = transaction.executeStatement(prepared2,
Parameters.of("no_d_id", (long) dId),
Parameters.of("no_w_id", (long) paramsWid),
Parameters.of("no_o_id", (long) noOid));
var result2 = future2.get();
} catch (ServerException e) {
profile.ordersTable.delivery++;
break;
}

// "SELECT o_c_id FROM ORDERS WHERE o_id = :o_id AND o_d_id = :o_d_id AND o_w_id = :o_w_id"
var future3 = transaction.executeQuery(prepared3,
Parameters.of("o_id", (long) noOid),
Parameters.of("o_d_id", (long) dId),
Parameters.of("o_w_id", (long) paramsWid));
try (var resultSet3 = future3.get()) {
if (!resultSet3.nextRow()) {
throw new IOException("no record");

try {
// "DELETE FROM NEW_ORDER WHERE no_d_id = :no_d_id AND no_w_id = :no_w_id AND no_o_id = :no_o_id"
var future2 = transaction.executeStatement(prepared2,
Parameters.of("no_d_id", (long) dId),
Parameters.of("no_w_id", (long) paramsWid),
Parameters.of("no_o_id", (long) noOid));
var result2 = future2.get();
} catch (ServerException e) {
profile.ordersTable.delivery++;
break;
}
resultSet3.nextColumn();
cId = resultSet3.fetchInt8Value();
if (resultSet3.nextRow()) {
throw new IOException("found multiple records");

// "SELECT o_c_id FROM ORDERS WHERE o_id = :o_id AND o_d_id = :o_d_id AND o_w_id = :o_w_id"
var future3 = transaction.executeQuery(prepared3,
Parameters.of("o_id", (long) noOid),
Parameters.of("o_d_id", (long) dId),
Parameters.of("o_w_id", (long) paramsWid));
try (var resultSet3 = future3.get()) {
if (!resultSet3.nextRow()) {
throw new IOException("no record");
}
resultSet3.nextColumn();
cId = resultSet3.fetchInt8Value();
if (resultSet3.nextRow()) {
throw new IOException("found multiple records");
}
} catch (ServerException e) {
profile.ordersTable.delivery++;
break;
}
} catch (ServerException e) {
profile.ordersTable.delivery++;
break;
}

try {
// "UPDATE ORDERS SET o_carrier_id = :o_carrier_id WHERE o_id = :o_id AND o_d_id = :o_d_id AND o_w_id = :o_w_id"
var future4 = transaction.executeStatement(prepared4,
Parameters.of("o_carrier_id", (long) paramsOcarrierId),
Parameters.of("o_id", (long) noOid),
Parameters.of("o_d_id", (long) dId),
Parameters.of("o_w_id", (long) paramsWid));
var result4 = future4.get();
} catch (ServerException e) {
profile.ordersTable.delivery++;
break;
}

try {
// "UPDATE ORDER_LINE SET ol_delivery_d = :ol_delivery_d WHERE ol_o_id = :ol_o_id AND ol_d_id = :ol_d_id AND ol_w_id = :ol_w_id"
var future5 = transaction.executeStatement(prepared5,
Parameters.of("ol_delivery_d", paramsOlDeliveryD),
Parameters.of("ol_o_id", (long) noOid),
Parameters.of("ol_d_id", (long) dId),
Parameters.of("ol_w_id", (long) paramsWid));
var result5 = future5.get();
} catch (ServerException e) {
profile.ordersTable.delivery++;
break;
}

// "SELECT o_c_id FROM ORDERS WHERE o_id = :o_id AND o_d_id = :o_d_id AND o_w_id = :o_w_id"
var future6 = transaction.executeQuery(prepared6,
Parameters.of("ol_o_id", (long) noOid),
Parameters.of("ol_d_id", (long) dId),
Parameters.of("ol_w_id", (long) paramsWid));
try (var resultSet6 = future6.get()) {
if (!resultSet6.nextRow()) {
continue;

try {
// "UPDATE ORDERS SET o_carrier_id = :o_carrier_id WHERE o_id = :o_id AND o_d_id = :o_d_id AND o_w_id = :o_w_id"
var future4 = transaction.executeStatement(prepared4,
Parameters.of("o_carrier_id", (long) paramsOcarrierId),
Parameters.of("o_id", (long) noOid),
Parameters.of("o_d_id", (long) dId),
Parameters.of("o_w_id", (long) paramsWid));
var result4 = future4.get();
} catch (ServerException e) {
profile.ordersTable.delivery++;
break;
}
resultSet6.nextColumn();
olTotal = resultSet6.fetchFloat8Value();
if (resultSet6.nextRow()) {
throw new IOException("found multiple records");

try {
// "UPDATE ORDER_LINE SET ol_delivery_d = :ol_delivery_d WHERE ol_o_id = :ol_o_id AND ol_d_id = :ol_d_id AND ol_w_id = :ol_w_id"
var future5 = transaction.executeStatement(prepared5,
Parameters.of("ol_delivery_d", paramsOlDeliveryD),
Parameters.of("ol_o_id", (long) noOid),
Parameters.of("ol_d_id", (long) dId),
Parameters.of("ol_w_id", (long) paramsWid));
var result5 = future5.get();
} catch (ServerException e) {
profile.ordersTable.delivery++;
break;
}

// "SELECT o_c_id FROM ORDERS WHERE o_id = :o_id AND o_d_id = :o_d_id AND o_w_id = :o_w_id"
var future6 = transaction.executeQuery(prepared6,
Parameters.of("ol_o_id", (long) noOid),
Parameters.of("ol_d_id", (long) dId),
Parameters.of("ol_w_id", (long) paramsWid));
try (var resultSet6 = future6.get()) {
if (!resultSet6.nextRow()) {
continue;
}
resultSet6.nextColumn();
olTotal = resultSet6.fetchFloat8Value();
if (resultSet6.nextRow()) {
throw new IOException("found multiple records");
}
} catch (ServerException e) {
profile.ordersTable.delivery++;
break;
}

try {
// "UPDATE CUSTOMER SET c_balance = c_balance + :ol_total WHERE c_id = :c_id AND c_d_id = :c_d_id AND c_w_id = :c_w_id"
var future7 = transaction.executeStatement(prepared7,
Parameters.of("ol_total", (double) olTotal),
Parameters.of("c_id", (long) cId),
Parameters.of("c_d_id", (long) dId),
Parameters.of("c_w_id", (long) paramsWid));
var result7 = future7.get();
} catch (ServerException e) {
profile.customerTable.delivery++;
break;
}
} catch (ServerException e) {
profile.ordersTable.delivery++;
break;
}

try {
// "UPDATE CUSTOMER SET c_balance = c_balance + :ol_total WHERE c_id = :c_id AND c_d_id = :c_d_id AND c_w_id = :c_w_id"
var future7 = transaction.executeStatement(prepared7,
Parameters.of("ol_total", (double) olTotal),
Parameters.of("c_id", (long) cId),
Parameters.of("c_d_id", (long) dId),
Parameters.of("c_w_id", (long) paramsWid));
var result7 = future7.get();
} catch (ServerException e) {
profile.customerTable.delivery++;
break;
}
}

if (dId > Scale.DISTRICTS) { // completed 'for (dId = 1; dId <= Scale.DISTRICTS; dId++) {'
try {
transaction.commit().get();
profile.completion.delivery++;
return;
} catch (ServerException e) {
profile.retryOnCommit.delivery++;
transaction = null;
continue;

if (dId > Scale.DISTRICTS) { // completed 'for (dId = 1; dId <= Scale.DISTRICTS; dId++) {'
try {
transaction.commit().get();
profile.completion.delivery++;
return;
} catch (ServerException e) {
profile.retryOnCommit.delivery++;
continue;
}
}

// break in 'for (dId = 1; dId <= Scale.DISTRICTS; dId++) {'
profile.retryOnStatement.delivery++;
rollback(transaction);
}

// break in 'for (dId = 1; dId <= Scale.DISTRICTS; dId++) {'
profile.retryOnStatement.delivery++;
rollback();
}
}
}
Loading

0 comments on commit b64ce15

Please sign in to comment.