Skip to content

Commit

Permalink
Refactor JobStatusTraceEvent
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Oct 26, 2023
1 parent 125f06f commit 38a2a41
Show file tree
Hide file tree
Showing 9 changed files with 25 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import org.apache.shardingsphere.elasticjob.infra.context.ExecutionType;

import java.util.Date;
import java.util.UUID;
Expand All @@ -44,7 +45,7 @@ public final class JobStatusTraceEvent implements JobEvent {

private final String slaveId;

private final String executionType;
private final ExecutionType executionType;

private final String shardingItems;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.infra.context.ExecutionType;
import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent;
import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent;
import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent.State;
Expand Down Expand Up @@ -343,7 +344,7 @@ public boolean addJobStatusTraceEvent(final JobStatusTraceEvent jobStatusTraceEv
preparedStatement.setString(3, originalTaskId);
preparedStatement.setString(4, jobStatusTraceEvent.getTaskId());
preparedStatement.setString(5, jobStatusTraceEvent.getSlaveId());
preparedStatement.setString(6, jobStatusTraceEvent.getExecutionType());
preparedStatement.setString(6, jobStatusTraceEvent.getExecutionType().name());
preparedStatement.setString(7, jobStatusTraceEvent.getShardingItems());
preparedStatement.setString(8, jobStatusTraceEvent.getState().toString());
preparedStatement.setString(9, truncateString(jobStatusTraceEvent.getMessage()));
Expand Down Expand Up @@ -388,7 +389,7 @@ List<JobStatusTraceEvent> getJobStatusTraceEvents(final String taskId) {
try (ResultSet resultSet = preparedStatement.executeQuery()) {
while (resultSet.next()) {
JobStatusTraceEvent jobStatusTraceEvent = new JobStatusTraceEvent(resultSet.getString(1), resultSet.getString(2), resultSet.getString(3), resultSet.getString(4),
resultSet.getString(5), resultSet.getString(6), resultSet.getString(7),
resultSet.getString(5), ExecutionType.valueOf(resultSet.getString(6)), resultSet.getString(7),
State.valueOf(resultSet.getString(8)), resultSet.getString(9), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(resultSet.getString(10)));
result.add(jobStatusTraceEvent);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import lombok.SneakyThrows;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.shardingsphere.elasticjob.infra.context.ExecutionType;
import org.apache.shardingsphere.elasticjob.tracing.JobTracingEventBus;
import org.apache.shardingsphere.elasticjob.tracing.api.TracingConfiguration;
import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent;
Expand Down Expand Up @@ -76,7 +77,7 @@ void assertPostJobExecutionEvent() {

@Test
void assertPostJobStatusTraceEvent() {
JobStatusTraceEvent jobStatusTraceEvent = new JobStatusTraceEvent(JOB_NAME, "fake_task_id", "fake_slave_id", "READY", "0", State.TASK_RUNNING, "message is empty.");
JobStatusTraceEvent jobStatusTraceEvent = new JobStatusTraceEvent(JOB_NAME, "fake_task_id", "fake_slave_id", ExecutionType.READY, "0", State.TASK_RUNNING, "message is empty.");
jobTracingEventBus.post(jobStatusTraceEvent);
verify(repository, atMost(1)).addJobStatusTraceEvent(jobStatusTraceEvent);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.elasticjob.tracing.rdb.storage;

import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.shardingsphere.elasticjob.infra.context.ExecutionType;
import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent;
import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent;
import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent.State;
Expand Down Expand Up @@ -64,13 +65,13 @@ void assertAddJobExecutionEvent() {
@Test
void assertAddJobStatusTraceEvent() {
assertTrue(storage.addJobStatusTraceEvent(
new JobStatusTraceEvent("test_job", "fake_task_id", "fake_slave_id", "READY", "0", State.TASK_RUNNING, "message is empty.")));
new JobStatusTraceEvent("test_job", "fake_task_id", "fake_slave_id", ExecutionType.READY, "0", State.TASK_RUNNING, "message is empty.")));
}

@Test
void assertAddJobStatusTraceEventWhenFailoverWithTaskStagingState() {
JobStatusTraceEvent jobStatusTraceEvent = new JobStatusTraceEvent(
"test_job", "fake_failover_task_id", "fake_slave_id", "FAILOVER", "0", State.TASK_STAGING, "message is empty.");
"test_job", "fake_failover_task_id", "fake_slave_id", ExecutionType.FAILOVER, "0", State.TASK_STAGING, "message is empty.");
jobStatusTraceEvent.setOriginalTaskId("original_fake_failover_task_id");
assertThat(storage.getJobStatusTraceEvents("fake_failover_task_id").size(), is(0));
storage.addJobStatusTraceEvent(jobStatusTraceEvent);
Expand All @@ -80,11 +81,11 @@ void assertAddJobStatusTraceEventWhenFailoverWithTaskStagingState() {
@Test
void assertAddJobStatusTraceEventWhenFailoverWithTaskFailedState() {
JobStatusTraceEvent stagingJobStatusTraceEvent = new JobStatusTraceEvent(
"test_job", "fake_failed_failover_task_id", "fake_slave_id", "FAILOVER", "0", State.TASK_STAGING, "message is empty.");
"test_job", "fake_failed_failover_task_id", "fake_slave_id", ExecutionType.FAILOVER, "0", State.TASK_STAGING, "message is empty.");
stagingJobStatusTraceEvent.setOriginalTaskId("original_fake_failed_failover_task_id");
storage.addJobStatusTraceEvent(stagingJobStatusTraceEvent);
JobStatusTraceEvent failedJobStatusTraceEvent = new JobStatusTraceEvent(
"test_job", "fake_failed_failover_task_id", "fake_slave_id", "FAILOVER", "0", State.TASK_FAILED, "message is empty.");
"test_job", "fake_failed_failover_task_id", "fake_slave_id", ExecutionType.FAILOVER, "0", State.TASK_FAILED, "message is empty.");
storage.addJobStatusTraceEvent(failedJobStatusTraceEvent);
List<JobStatusTraceEvent> jobStatusTraceEvents = storage.getJobStatusTraceEvents("fake_failed_failover_task_id");
assertThat(jobStatusTraceEvents.size(), is(2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.shardingsphere.elasticjob.kernel.internal.context;
package org.apache.shardingsphere.elasticjob.infra.context;

/**
* Execution type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.shardingsphere.elasticjob.kernel.internal.context;
package org.apache.shardingsphere.elasticjob.infra.context;

import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
* limitations under the License.
*/

package org.apache.shardingsphere.elasticjob.kernel.internal.context;
package org.apache.shardingsphere.elasticjob.infra.context;

import org.apache.shardingsphere.elasticjob.kernel.internal.context.TaskContext.MetaInfo;
import org.apache.shardingsphere.elasticjob.kernel.internal.context.fixture.TaskNode;
import org.apache.shardingsphere.elasticjob.infra.context.TaskContext.MetaInfo;
import org.apache.shardingsphere.elasticjob.infra.context.fixture.TaskNode;
import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
* limitations under the License.
*/

package org.apache.shardingsphere.elasticjob.kernel.internal.context.fixture;
package org.apache.shardingsphere.elasticjob.infra.context.fixture;

import lombok.Builder;
import org.apache.shardingsphere.elasticjob.kernel.internal.context.ExecutionType;
import org.apache.shardingsphere.elasticjob.infra.context.ExecutionType;

@Builder
public final class TaskNode {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@
import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.executor.JobFacade;
import org.apache.shardingsphere.elasticjob.infra.context.TaskContext;
import org.apache.shardingsphere.elasticjob.infra.exception.JobExecutionEnvironmentException;
import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts;
import org.apache.shardingsphere.elasticjob.kernel.internal.config.ConfigurationService;
import org.apache.shardingsphere.elasticjob.kernel.internal.failover.FailoverService;
import org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ExecutionContextService;
import org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ExecutionService;
import org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ShardingService;
import org.apache.shardingsphere.elasticjob.executor.JobFacade;
import org.apache.shardingsphere.elasticjob.kernel.internal.context.TaskContext;
import org.apache.shardingsphere.elasticjob.infra.exception.JobExecutionEnvironmentException;
import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.tracing.JobTracingEventBus;
import org.apache.shardingsphere.elasticjob.tracing.api.TracingConfiguration;
Expand Down Expand Up @@ -163,7 +163,7 @@ public void postJobExecutionEvent(final JobExecutionEvent jobExecutionEvent) {
public void postJobStatusTraceEvent(final String taskId, final State state, final String message) {
TaskContext taskContext = TaskContext.from(taskId);
jobTracingEventBus.post(new JobStatusTraceEvent(taskContext.getMetaInfo().getJobName(), taskContext.getId(),
taskContext.getSlaveId(), taskContext.getType().name(), taskContext.getMetaInfo().getShardingItems().toString(), state, message));
taskContext.getSlaveId(), taskContext.getType(), taskContext.getMetaInfo().getShardingItems().toString(), state, message));
if (!Strings.isNullOrEmpty(message)) {
log.trace(message);
}
Expand Down

0 comments on commit 38a2a41

Please sign in to comment.