Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Refactor structure of tracing module
Browse files Browse the repository at this point in the history
terrymanu committed Oct 31, 2023
1 parent 235b9d7 commit 8ae2d17
Showing 4 changed files with 95 additions and 93 deletions.
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@
import org.apache.shardingsphere.elasticjob.kernel.tracing.event.JobExecutionEvent;
import org.apache.shardingsphere.elasticjob.kernel.tracing.event.JobStatusTraceEvent;
import org.apache.shardingsphere.elasticjob.kernel.tracing.listener.TracingListener;
import org.apache.shardingsphere.elasticjob.tracing.rdb.storage.repository.RDBJobEventRepository;

import javax.sql.DataSource;
import java.sql.SQLException;
@@ -29,10 +30,10 @@
*/
public final class RDBTracingListener implements TracingListener {

private final RDBJobEventStorage repository;
private final RDBJobEventRepository repository;

public RDBTracingListener(final DataSource dataSource) throws SQLException {
repository = RDBJobEventStorage.getInstance(dataSource);
repository = RDBJobEventRepository.getInstance(dataSource);
}

@Override
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.shardingsphere.elasticjob.tracing.rdb.listener;
package org.apache.shardingsphere.elasticjob.tracing.rdb.storage.repository;

import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
@@ -42,26 +42,26 @@
import java.util.function.Supplier;

/**
* RDB job event storage.
* RDB job event repository.
*/
@Slf4j
public final class RDBJobEventStorage {
public final class RDBJobEventRepository {

private static final String TABLE_JOB_EXECUTION_LOG = "JOB_EXECUTION_LOG";

private static final String TABLE_JOB_STATUS_TRACE_LOG = "JOB_STATUS_TRACE_LOG";

private static final String TASK_ID_STATE_INDEX = "TASK_ID_STATE_INDEX";

private static final Map<DataSource, RDBJobEventStorage> STORAGE_MAP = new ConcurrentHashMap<>();
private static final Map<DataSource, RDBJobEventRepository> STORAGE_MAP = new ConcurrentHashMap<>();

private final DataSource dataSource;

private final TracingStorageDatabaseType tracingStorageDatabaseType;

private final RDBStorageSQLMapper sqlMapper;

private RDBJobEventStorage(final DataSource dataSource) throws SQLException {
private RDBJobEventRepository(final DataSource dataSource) throws SQLException {
this.dataSource = dataSource;
tracingStorageDatabaseType = getTracingStorageDatabaseType(dataSource);
sqlMapper = new RDBStorageSQLMapper(SQLPropertiesFactory.getProperties(tracingStorageDatabaseType));
@@ -75,17 +75,17 @@ private RDBJobEventStorage(final DataSource dataSource) throws SQLException {
* @return RDBJobEventStorage instance
* @throws SQLException SQLException
*/
public static RDBJobEventStorage getInstance(final DataSource dataSource) throws SQLException {
public static RDBJobEventRepository getInstance(final DataSource dataSource) throws SQLException {
return wrapException(() -> STORAGE_MAP.computeIfAbsent(dataSource, ds -> {
try {
return new RDBJobEventStorage(ds);
return new RDBJobEventRepository(ds);
} catch (final SQLException ex) {
throw new WrapException(ex);
}
}));
}

private static RDBJobEventStorage wrapException(final Supplier<RDBJobEventStorage> supplier) throws SQLException {
private static RDBJobEventRepository wrapException(final Supplier<RDBJobEventRepository> supplier) throws SQLException {
try {
return supplier.get();
} catch (final WrapException ex) {
@@ -177,35 +177,35 @@ private void createTaskIdAndStateIndex(final Connection connection) throws SQLEx
/**
* Add job execution event.
*
* @param jobExecutionEvent job execution event
* @param event job execution event
* @return add success or not
*/
public boolean addJobExecutionEvent(final JobExecutionEvent jobExecutionEvent) {
if (null == jobExecutionEvent.getCompleteTime()) {
return insertJobExecutionEvent(jobExecutionEvent);
public boolean addJobExecutionEvent(final JobExecutionEvent event) {
if (null == event.getCompleteTime()) {
return insertJobExecutionEvent(event);
} else {
if (jobExecutionEvent.isSuccess()) {
return updateJobExecutionEventWhenSuccess(jobExecutionEvent);
if (event.isSuccess()) {
return updateJobExecutionEventWhenSuccess(event);
} else {
return updateJobExecutionEventFailure(jobExecutionEvent);
return updateJobExecutionEventFailure(event);
}
}
}

private boolean insertJobExecutionEvent(final JobExecutionEvent jobExecutionEvent) {
private boolean insertJobExecutionEvent(final JobExecutionEvent event) {
boolean result = false;
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sqlMapper.getInsertForJobExecutionLog())) {
preparedStatement.setString(1, jobExecutionEvent.getId());
preparedStatement.setString(2, jobExecutionEvent.getJobName());
preparedStatement.setString(3, jobExecutionEvent.getTaskId());
preparedStatement.setString(4, jobExecutionEvent.getHostname());
preparedStatement.setString(5, jobExecutionEvent.getIp());
preparedStatement.setInt(6, jobExecutionEvent.getShardingItem());
preparedStatement.setString(7, jobExecutionEvent.getSource().toString());
preparedStatement.setBoolean(8, jobExecutionEvent.isSuccess());
preparedStatement.setTimestamp(9, new Timestamp(jobExecutionEvent.getStartTime().getTime()));
preparedStatement.setString(1, event.getId());
preparedStatement.setString(2, event.getJobName());
preparedStatement.setString(3, event.getTaskId());
preparedStatement.setString(4, event.getHostname());
preparedStatement.setString(5, event.getIp());
preparedStatement.setInt(6, event.getShardingItem());
preparedStatement.setString(7, event.getSource().toString());
preparedStatement.setBoolean(8, event.isSuccess());
preparedStatement.setTimestamp(9, new Timestamp(event.getStartTime().getTime()));
preparedStatement.execute();
result = true;
} catch (final SQLException ex) {
@@ -217,16 +217,16 @@ private boolean insertJobExecutionEvent(final JobExecutionEvent jobExecutionEven
return result;
}

private boolean updateJobExecutionEventWhenSuccess(final JobExecutionEvent jobExecutionEvent) {
private boolean updateJobExecutionEventWhenSuccess(final JobExecutionEvent event) {
boolean result = false;
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sqlMapper.getUpdateForJobExecutionLog())) {
preparedStatement.setBoolean(1, jobExecutionEvent.isSuccess());
preparedStatement.setTimestamp(2, new Timestamp(jobExecutionEvent.getCompleteTime().getTime()));
preparedStatement.setString(3, jobExecutionEvent.getId());
preparedStatement.setBoolean(1, event.isSuccess());
preparedStatement.setTimestamp(2, new Timestamp(event.getCompleteTime().getTime()));
preparedStatement.setString(3, event.getId());
if (0 == preparedStatement.executeUpdate()) {
return insertJobExecutionEventWhenSuccess(jobExecutionEvent);
return insertJobExecutionEventWhenSuccess(event);
}
result = true;
} catch (final SQLException ex) {
@@ -236,44 +236,44 @@ private boolean updateJobExecutionEventWhenSuccess(final JobExecutionEvent jobEx
return result;
}

private boolean insertJobExecutionEventWhenSuccess(final JobExecutionEvent jobExecutionEvent) {
private boolean insertJobExecutionEventWhenSuccess(final JobExecutionEvent event) {
boolean result = false;
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sqlMapper.getInsertForJobExecutionLogForComplete())) {
preparedStatement.setString(1, jobExecutionEvent.getId());
preparedStatement.setString(2, jobExecutionEvent.getJobName());
preparedStatement.setString(3, jobExecutionEvent.getTaskId());
preparedStatement.setString(4, jobExecutionEvent.getHostname());
preparedStatement.setString(5, jobExecutionEvent.getIp());
preparedStatement.setInt(6, jobExecutionEvent.getShardingItem());
preparedStatement.setString(7, jobExecutionEvent.getSource().toString());
preparedStatement.setBoolean(8, jobExecutionEvent.isSuccess());
preparedStatement.setTimestamp(9, new Timestamp(jobExecutionEvent.getStartTime().getTime()));
preparedStatement.setTimestamp(10, new Timestamp(jobExecutionEvent.getCompleteTime().getTime()));
preparedStatement.setString(1, event.getId());
preparedStatement.setString(2, event.getJobName());
preparedStatement.setString(3, event.getTaskId());
preparedStatement.setString(4, event.getHostname());
preparedStatement.setString(5, event.getIp());
preparedStatement.setInt(6, event.getShardingItem());
preparedStatement.setString(7, event.getSource().toString());
preparedStatement.setBoolean(8, event.isSuccess());
preparedStatement.setTimestamp(9, new Timestamp(event.getStartTime().getTime()));
preparedStatement.setTimestamp(10, new Timestamp(event.getCompleteTime().getTime()));
preparedStatement.execute();
result = true;
} catch (final SQLException ex) {
if (isDuplicateRecord(ex)) {
return updateJobExecutionEventWhenSuccess(jobExecutionEvent);
return updateJobExecutionEventWhenSuccess(event);
}
// TODO log failure directly to output log, consider to be configurable in the future
log.error(ex.getMessage());
}
return result;
}

private boolean updateJobExecutionEventFailure(final JobExecutionEvent jobExecutionEvent) {
private boolean updateJobExecutionEventFailure(final JobExecutionEvent event) {
boolean result = false;
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sqlMapper.getUpdateForJobExecutionLogForFailure())) {
preparedStatement.setBoolean(1, jobExecutionEvent.isSuccess());
preparedStatement.setTimestamp(2, new Timestamp(jobExecutionEvent.getCompleteTime().getTime()));
preparedStatement.setString(3, truncateString(jobExecutionEvent.getFailureCause()));
preparedStatement.setString(4, jobExecutionEvent.getId());
preparedStatement.setBoolean(1, event.isSuccess());
preparedStatement.setTimestamp(2, new Timestamp(event.getCompleteTime().getTime()));
preparedStatement.setString(3, truncateString(event.getFailureCause()));
preparedStatement.setString(4, event.getId());
if (0 == preparedStatement.executeUpdate()) {
return insertJobExecutionEventWhenFailure(jobExecutionEvent);
return insertJobExecutionEventWhenFailure(event);
}
result = true;
} catch (final SQLException ex) {
@@ -283,26 +283,26 @@ private boolean updateJobExecutionEventFailure(final JobExecutionEvent jobExecut
return result;
}

private boolean insertJobExecutionEventWhenFailure(final JobExecutionEvent jobExecutionEvent) {
private boolean insertJobExecutionEventWhenFailure(final JobExecutionEvent event) {
boolean result = false;
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sqlMapper.getInsertForJobExecutionLogForFailure())) {
preparedStatement.setString(1, jobExecutionEvent.getId());
preparedStatement.setString(2, jobExecutionEvent.getJobName());
preparedStatement.setString(3, jobExecutionEvent.getTaskId());
preparedStatement.setString(4, jobExecutionEvent.getHostname());
preparedStatement.setString(5, jobExecutionEvent.getIp());
preparedStatement.setInt(6, jobExecutionEvent.getShardingItem());
preparedStatement.setString(7, jobExecutionEvent.getSource().toString());
preparedStatement.setString(8, truncateString(jobExecutionEvent.getFailureCause()));
preparedStatement.setBoolean(9, jobExecutionEvent.isSuccess());
preparedStatement.setTimestamp(10, new Timestamp(jobExecutionEvent.getStartTime().getTime()));
preparedStatement.setString(1, event.getId());
preparedStatement.setString(2, event.getJobName());
preparedStatement.setString(3, event.getTaskId());
preparedStatement.setString(4, event.getHostname());
preparedStatement.setString(5, event.getIp());
preparedStatement.setInt(6, event.getShardingItem());
preparedStatement.setString(7, event.getSource().toString());
preparedStatement.setString(8, truncateString(event.getFailureCause()));
preparedStatement.setBoolean(9, event.isSuccess());
preparedStatement.setTimestamp(10, new Timestamp(event.getStartTime().getTime()));
preparedStatement.execute();
result = true;
} catch (final SQLException ex) {
if (isDuplicateRecord(ex)) {
return updateJobExecutionEventFailure(jobExecutionEvent);
return updateJobExecutionEventFailure(event);
}
// TODO log failure directly to output log, consider to be configurable in the future
log.error(ex.getMessage());
@@ -317,28 +317,28 @@ private boolean isDuplicateRecord(final SQLException ex) {
/**
* Add job status trace event.
*
* @param jobStatusTraceEvent job status trace event
* @param event job status trace event
* @return add success or not
*/
public boolean addJobStatusTraceEvent(final JobStatusTraceEvent jobStatusTraceEvent) {
String originalTaskId = jobStatusTraceEvent.getOriginalTaskId();
if (State.TASK_STAGING != jobStatusTraceEvent.getState()) {
originalTaskId = getOriginalTaskId(jobStatusTraceEvent.getTaskId());
public boolean addJobStatusTraceEvent(final JobStatusTraceEvent event) {
String originalTaskId = event.getOriginalTaskId();
if (State.TASK_STAGING != event.getState()) {
originalTaskId = getOriginalTaskId(event.getTaskId());
}
boolean result = false;
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sqlMapper.getInsertForJobStatusTraceLog())) {
preparedStatement.setString(1, UUID.randomUUID().toString());
preparedStatement.setString(2, jobStatusTraceEvent.getJobName());
preparedStatement.setString(2, event.getJobName());
preparedStatement.setString(3, originalTaskId);
preparedStatement.setString(4, jobStatusTraceEvent.getTaskId());
preparedStatement.setString(5, jobStatusTraceEvent.getSlaveId());
preparedStatement.setString(6, jobStatusTraceEvent.getExecutionType().name());
preparedStatement.setString(7, jobStatusTraceEvent.getShardingItems());
preparedStatement.setString(8, jobStatusTraceEvent.getState().toString());
preparedStatement.setString(9, truncateString(jobStatusTraceEvent.getMessage()));
preparedStatement.setTimestamp(10, new Timestamp(jobStatusTraceEvent.getCreationTime().getTime()));
preparedStatement.setString(4, event.getTaskId());
preparedStatement.setString(5, event.getSlaveId());
preparedStatement.setString(6, event.getExecutionType().name());
preparedStatement.setString(7, event.getShardingItems());
preparedStatement.setString(8, event.getState().toString());
preparedStatement.setString(9, truncateString(event.getMessage()));
preparedStatement.setTimestamp(10, new Timestamp(event.getCreationTime().getTime()));
preparedStatement.execute();
result = true;
} catch (final SQLException ex) {
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@
import org.apache.shardingsphere.elasticjob.kernel.tracing.event.JobStatusTraceEvent;
import org.apache.shardingsphere.elasticjob.kernel.tracing.event.JobStatusTraceEvent.State;
import org.apache.shardingsphere.elasticjob.test.util.ReflectionUtils;
import org.apache.shardingsphere.elasticjob.tracing.rdb.storage.repository.RDBJobEventRepository;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -43,7 +44,7 @@ class RDBTracingListenerTest {
private static final String JOB_NAME = "test_rdb_event_listener";

@Mock
private RDBJobEventStorage repository;
private RDBJobEventRepository repository;

private JobTracingEventBus jobTracingEventBus;

Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.shardingsphere.elasticjob.tracing.rdb.listener;
package org.apache.shardingsphere.elasticjob.tracing.rdb.storage.repository;

import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.shardingsphere.elasticjob.kernel.executor.ExecutionType;
@@ -35,9 +35,9 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

class RDBJobEventStorageTest {
class RDBJobEventRepositoryTest {

private RDBJobEventStorage storage;
private RDBJobEventRepository repository;

private BasicDataSource dataSource;

@@ -48,7 +48,7 @@ void setup() throws SQLException {
dataSource.setUrl("jdbc:h2:mem:job_event_storage");
dataSource.setUsername("sa");
dataSource.setPassword("");
storage = RDBJobEventStorage.getInstance(dataSource);
repository = RDBJobEventRepository.getInstance(dataSource);
}

@AfterEach
@@ -58,29 +58,29 @@ void teardown() throws SQLException {

@Test
void assertAddJobExecutionEvent() {
assertTrue(storage.addJobExecutionEvent(new JobExecutionEvent("localhost", "127.0.0.1", "fake_task_id", "test_job", JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER, 0)));
assertTrue(repository.addJobExecutionEvent(new JobExecutionEvent("localhost", "127.0.0.1", "fake_task_id", "test_job", JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER, 0)));
}

@Test
void assertAddJobStatusTraceEvent() {
assertTrue(storage.addJobStatusTraceEvent(
assertTrue(repository.addJobStatusTraceEvent(
new JobStatusTraceEvent("test_job", "fake_task_id", "fake_slave_id", ExecutionType.READY, "0", State.TASK_RUNNING, "message is empty.")));
}

@Test
void assertUpdateJobExecutionEventWhenSuccess() {
JobExecutionEvent startEvent = new JobExecutionEvent("localhost", "127.0.0.1", "fake_task_id", "test_job", JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER, 0);
assertTrue(storage.addJobExecutionEvent(startEvent));
assertTrue(repository.addJobExecutionEvent(startEvent));
JobExecutionEvent successEvent = startEvent.executionSuccess();
assertTrue(storage.addJobExecutionEvent(successEvent));
assertTrue(repository.addJobExecutionEvent(successEvent));
}

@Test
void assertUpdateJobExecutionEventWhenFailure() {
JobExecutionEvent startEvent = new JobExecutionEvent("localhost", "127.0.0.1", "fake_task_id", "test_job", JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER, 0);
assertTrue(storage.addJobExecutionEvent(startEvent));
assertTrue(repository.addJobExecutionEvent(startEvent));
JobExecutionEvent failureEvent = startEvent.executionFailure("java.lang.RuntimeException: failure");
assertTrue(storage.addJobExecutionEvent(failureEvent));
assertTrue(repository.addJobExecutionEvent(failureEvent));
assertThat(failureEvent.getFailureCause(), is("java.lang.RuntimeException: failure"));
assertNotNull(failureEvent.getCompleteTime());
}
@@ -89,34 +89,34 @@ void assertUpdateJobExecutionEventWhenFailure() {
void assertUpdateJobExecutionEventWhenSuccessAndConflict() {
JobExecutionEvent startEvent = new JobExecutionEvent("localhost", "127.0.0.1", "fake_task_id", "test_job", JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER, 0);
JobExecutionEvent successEvent = startEvent.executionSuccess();
assertTrue(storage.addJobExecutionEvent(successEvent));
assertFalse(storage.addJobExecutionEvent(startEvent));
assertTrue(repository.addJobExecutionEvent(successEvent));
assertFalse(repository.addJobExecutionEvent(startEvent));
}

@Test
void assertUpdateJobExecutionEventWhenFailureAndConflict() {
JobExecutionEvent startEvent = new JobExecutionEvent("localhost", "127.0.0.1", "fake_task_id", "test_job", JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER, 0);
JobExecutionEvent failureEvent = startEvent.executionFailure("java.lang.RuntimeException: failure");
assertTrue(storage.addJobExecutionEvent(failureEvent));
assertTrue(repository.addJobExecutionEvent(failureEvent));
assertThat(failureEvent.getFailureCause(), is("java.lang.RuntimeException: failure"));
assertFalse(storage.addJobExecutionEvent(startEvent));
assertFalse(repository.addJobExecutionEvent(startEvent));
}

@Test
void assertUpdateJobExecutionEventWhenFailureAndMessageExceed() {
JobExecutionEvent startEvent = new JobExecutionEvent("localhost", "127.0.0.1", "fake_task_id", "test_job", JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER, 0);
assertTrue(storage.addJobExecutionEvent(startEvent));
assertTrue(repository.addJobExecutionEvent(startEvent));
StringBuilder failureMsg = new StringBuilder();
for (int i = 0; i < 600; i++) {
failureMsg.append(i);
}
JobExecutionEvent failEvent = startEvent.executionFailure("java.lang.RuntimeException: failure" + failureMsg);
assertTrue(storage.addJobExecutionEvent(failEvent));
assertTrue(repository.addJobExecutionEvent(failEvent));
assertThat(failEvent.getFailureCause(), startsWith("java.lang.RuntimeException: failure"));
}

@Test
void assertFindJobExecutionEvent() {
storage.addJobExecutionEvent(new JobExecutionEvent("localhost", "127.0.0.1", "fake_task_id", "test_job", JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER, 0));
repository.addJobExecutionEvent(new JobExecutionEvent("localhost", "127.0.0.1", "fake_task_id", "test_job", JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER, 0));
}
}

0 comments on commit 8ae2d17

Please sign in to comment.