Skip to content

Commit

Permalink
fix: use insert-select for update-stock
Browse files Browse the repository at this point in the history
  • Loading branch information
hishidama committed Aug 8, 2024
1 parent adabf2f commit ea6a3b0
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ private void closeConnectionAll() {
}

if (re != null) {
LOG.error("{} all session close error. message={}", purpose, re.getMessage());
throw re;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,15 +330,18 @@ private final <P> int[] executeAndGetCount(TsurugiSqlPreparedStatement<P> ps, Co
var rcList = new ArrayList<TsurugiStatementResult>(parameterList.size());
try {
var transaction = getTransaction();
int i = 0;
for (var parameter : parameterList) {
debugExplain(ps, () -> ps.explain(parameter));
try {
var rc = transaction.executeStatement(ps, parameter);
rcList.add(rc);
} catch (IOException e) {
LOG.error("executeAndGetCount error. i={}/{}", i, parameterList.size(), e);
re = new UncheckedIOException(e.getMessage(), e);
throw re;
} catch (InterruptedException e) {
LOG.error("executeAndGetCount error. i={}/{}", i, parameterList.size(), e);
re = new RuntimeException(e);
throw re;
} catch (TsurugiTransactionException e) {
Expand All @@ -351,38 +354,59 @@ private final <P> int[] executeAndGetCount(TsurugiSqlPreparedStatement<P> ps, Co
String message = MessageFormat.format("sql={0}, parameter={1}", ps.getSql(), parameter);
re.addSuppressed(new ExceptionInfo(message));
}
LOG.error("executeAndGetCount error. i={}/{}", i, parameterList.size(), e);
throw re;
}
i++;
}
} finally {
var set = new HashSet<String>();
int i = 0;
for (var rc : rcList) {
try {
rc.close();
try (rc) {
rc.checkLowResult();
} catch (IOException e) {
if (!set.contains(e.getMessage())) {
set.add(e.getMessage());
LOG.error("executeAndGetCount.close() error. i={}/{}", i, rcList.size(), e);
}
if (re != null) {
re.addSuppressed(e);
} else {
re = new UncheckedIOException(e.getMessage(), e);
}
} catch (TsurugiTransactionException e) {
if (!set.contains(e.getMessage())) {
set.add(e.getMessage());
LOG.error("executeAndGetCount.close() error. i={}/{}", i, rcList.size(), e);
}
if (re != null) {
re.addSuppressed(e);
} else {
re = new TsurugiTransactionRuntimeException(e);
}
} catch (RuntimeException e) {
if (!set.contains(e.getMessage())) {
set.add(e.getMessage());
LOG.error("executeAndGetCount.close() error. i={}/{}", i, rcList.size(), e);
}
if (re != null) {
re.addSuppressed(e);
} else {
re = e;
}
} catch (Exception e) {
if (!set.contains(e.getMessage())) {
set.add(e.getMessage());
LOG.error("executeAndGetCount.close() error. i={}/{}", i, rcList.size(), e);
}
if (re != null) {
re.addSuppressed(e);
} else {
re = new RuntimeException(e);
}
}
i++;
}
if (re != null) {
throw re;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.List;
import java.util.function.Consumer;

import com.tsurugidb.benchmark.costaccounting.db.dao.CostMasterDao;
import com.tsurugidb.benchmark.costaccounting.db.dao.StockHistoryDao;
import com.tsurugidb.benchmark.costaccounting.db.entity.StockHistory;
import com.tsurugidb.benchmark.costaccounting.db.entity.StockHistoryDateTime;
Expand Down Expand Up @@ -132,14 +133,40 @@ protected void initialize() {

@Override
public void insertSelectFromCostMaster(LocalDate date, LocalTime time) {
throw new UnsupportedOperationException("not yet impl");
var ps = insertSelectFromCostMasterCache.get();
var parameter = TgBindParameters.of(vDate.bind(date), vTime.bind(time));
executeAndGetCount(ps, parameter);
}

private final CachePreparedStatement<TgBindParameters> insertSelectFromCostMasterCache = new CachePreparedStatement<>() {
@Override
protected void initialize() {
var names = getColumnNames();
this.sql = insert + " into " + TABLE_NAME //
+ "(" + names + ")" //
+ " select " + vDate + ", " + vTime + ", c_f_id, c_i_id, c_stock_unit, c_stock_quantity, c_stock_amount from " + CostMasterDao.TABLE_NAME;
this.parameterMapping = TgParameterMapping.of(vDate, vTime);
}
};

@Override
public void insertSelectFromCostMaster(LocalDate date, LocalTime time, int factoryId) {
throw new UnsupportedOperationException("not yet impl");
var ps = insertSelectFromCostMasterByFactoryCache.get();
var parameter = TgBindParameters.of(vDate.bind(date), vTime.bind(time), vFactory.bind(factoryId));
executeAndGetCount(ps, parameter);
}

private final CachePreparedStatement<TgBindParameters> insertSelectFromCostMasterByFactoryCache = new CachePreparedStatement<>() {
@Override
protected void initialize() {
var names = getColumnNames();
this.sql = insert + " into " + TABLE_NAME //
+ "(" + names + ")" //
+ " select " + vDate + ", " + vTime + ", c_f_id, c_i_id, c_stock_unit, c_stock_quantity, c_stock_amount from " + CostMasterDao.TABLE_NAME + " where c_f_id=" + vFactory;
this.parameterMapping = TgParameterMapping.of(vDate, vTime, vFactory);
}
};

@Override
public void forEach(Consumer<StockHistory> entityConsumer) {
doForEach(entityConsumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

import org.slf4j.Logger;
Expand All @@ -24,6 +25,7 @@
import com.tsurugidb.benchmark.costaccounting.init.InitialData;
import com.tsurugidb.benchmark.costaccounting.online.OnlineConfig;
import com.tsurugidb.benchmark.costaccounting.util.BenchConst;
import com.tsurugidb.benchmark.costaccounting.util.BenchConst.InsertSelectType;
import com.tsurugidb.benchmark.costaccounting.util.BenchConst.SqlDistinct;
import com.tsurugidb.benchmark.costaccounting.util.ThreadUtil;
import com.tsurugidb.iceaxe.transaction.manager.TgTmSetting;
Expand All @@ -37,6 +39,7 @@ public class BenchPeriodicUpdateStockTask extends BenchPeriodicTask {

public static final String TASK_NAME = "update-stock";
private static final SqlDistinct SQL_DISTINCT = BenchConst.sqlDistinct();
private static final InsertSelectType INSERT_SELECT_TYPE = BenchConst.periodicInsertSelectType(TASK_NAME);

private TgTmSetting settingPre;
private TgTmSetting settingMain;
Expand All @@ -54,6 +57,7 @@ public BenchPeriodicUpdateStockTask(int taskId) {
String threadName = String.format("%s.%d.thread-", TASK_NAME, taskId);
this.service = ThreadUtil.newFixedThreadPool(threadName, threadSize);
}
LOG.info("insert_select_type={}", INSERT_SELECT_TYPE);
this.keepSize = BenchConst.periodicKeepSize(TASK_NAME);
LOG.info("keep.size={}", keepSize);
}
Expand Down Expand Up @@ -145,7 +149,7 @@ private boolean executeAll(StockHistoryDateTime deleteDateTime) {
private void executeAllInTransaction() {
var now = LocalDateTime.now();

if (BenchConst.WORKAROUND) {
if (INSERT_SELECT_TYPE == InsertSelectType.SELECT_AND_INSERT) {
var costMasterDao = dbManager.getCostMasterDao();
try (var stream = costMasterDao.selectAll()) {
streamInsert(stream, now);
Expand All @@ -158,6 +162,7 @@ private void executeAllInTransaction() {
}

private void streamInsert(Stream<CostMaster> stream, LocalDateTime now) {
var count = new AtomicInteger(0);
var dao = dbManager.getStockHistoryDao();
final int batchSize = 10000;
var list = new ArrayList<StockHistory>(batchSize);
Expand All @@ -173,11 +178,15 @@ private void streamInsert(Stream<CostMaster> stream, LocalDateTime now) {
list.add(entity);
if (list.size() >= batchSize) {
dao.insertBatch(list);
count.addAndGet(list.size());
LOG.info("streamInsert() inserted {}", count.get());
list.clear();
}
});
if (!list.isEmpty()) {
dao.insertBatch(list);
count.addAndGet(list.size());
LOG.info("streamInsert() inserted {}", count.get());
}
}

Expand Down Expand Up @@ -262,7 +271,7 @@ public Void call() throws Exception {
}

private void executeInTransaction() {
if (BenchConst.WORKAROUND) {
if (INSERT_SELECT_TYPE == InsertSelectType.SELECT_AND_INSERT) {
var costMasterDao = dbManager.getCostMasterDao();
try (var stream = costMasterDao.selectByFactory(factoryId)) {
streamInsert(stream, now);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,23 @@ public static int periodicSplitSize(String taskName) {
return getPropertyInt("periodic.schedule." + taskName + ".split.size", 1);
}

public enum InsertSelectType {
/** select * from src &amp; insert into dst */
SELECT_AND_INSERT,
/** insert into dst select * from src */
INSERT_SELECT,
}

public static InsertSelectType periodicInsertSelectType(String taskName) {
String key = "periodic.schedule." + taskName + ".insert_select.type";
String s = getProperty(key, InsertSelectType.INSERT_SELECT.name());
try {
return InsertSelectType.valueOf(s.toUpperCase());
} catch (Exception e) {
throw new RuntimeException("invalid InsertSelectType. " + key + "=" + s, e);
}
}

public static int periodicKeepSize(String taskName) {
return getPropertyInt("periodic.schedule." + taskName + ".keep.size", -1);
}
Expand Down

0 comments on commit ea6a3b0

Please sign in to comment.