Skip to content

Commit

Permalink
Refactor structure of tracing module
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Oct 31, 2023
1 parent c0e99f9 commit 235b9d7
Show file tree
Hide file tree
Showing 50 changed files with 89 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.shardingsphere.elasticjob.tracing.rdb.datasource;
package org.apache.shardingsphere.elasticjob.tracing.rdb.config;

import com.google.common.base.CaseFormat;
import com.google.common.base.Joiner;
Expand All @@ -24,7 +24,9 @@
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.shardingsphere.elasticjob.kernel.tracing.api.TracingStorageConfiguration;
import org.apache.shardingsphere.elasticjob.kernel.tracing.config.TracingStorageConfiguration;
import org.apache.shardingsphere.elasticjob.tracing.rdb.storage.datasource.DataSourceRegistry;
import org.apache.shardingsphere.elasticjob.tracing.rdb.storage.datasource.JDBCParameterDecorator;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;

import javax.sql.DataSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,18 @@
* limitations under the License.
*/

package org.apache.shardingsphere.elasticjob.tracing.rdb.storage;
package org.apache.shardingsphere.elasticjob.tracing.rdb.listener;

import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.kernel.executor.ExecutionType;
import org.apache.shardingsphere.elasticjob.kernel.tracing.event.JobExecutionEvent;
import org.apache.shardingsphere.elasticjob.kernel.tracing.event.JobStatusTraceEvent;
import org.apache.shardingsphere.elasticjob.kernel.tracing.event.JobStatusTraceEvent.State;
import org.apache.shardingsphere.elasticjob.kernel.tracing.exception.WrapException;
import org.apache.shardingsphere.elasticjob.tracing.rdb.type.TracingStorageDatabaseType;
import org.apache.shardingsphere.elasticjob.tracing.rdb.type.impl.DefaultTracingStorageDatabaseType;
import org.apache.shardingsphere.elasticjob.tracing.rdb.storage.sql.RDBStorageSQLMapper;
import org.apache.shardingsphere.elasticjob.tracing.rdb.storage.sql.SQLPropertiesFactory;
import org.apache.shardingsphere.elasticjob.tracing.rdb.storage.type.TracingStorageDatabaseType;
import org.apache.shardingsphere.elasticjob.tracing.rdb.storage.type.impl.DefaultTracingStorageDatabaseType;
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;

import javax.sql.DataSource;
Expand All @@ -35,10 +36,6 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -88,14 +85,7 @@ public static RDBJobEventStorage getInstance(final DataSource dataSource) throws
}));
}

/**
* WrapException util method.
*
* @param supplier supplier
* @return RDBJobEventStorage
* @throws SQLException SQLException
*/
public static RDBJobEventStorage wrapException(final Supplier<RDBJobEventStorage> supplier) throws SQLException {
private static RDBJobEventStorage wrapException(final Supplier<RDBJobEventStorage> supplier) throws SQLException {
try {
return supplier.get();
} catch (final WrapException ex) {
Expand Down Expand Up @@ -379,25 +369,4 @@ private String getOriginalTaskId(final String taskId) {
private String truncateString(final String str) {
return !Strings.isNullOrEmpty(str) && str.length() > 4000 ? str.substring(0, 4000) : str;
}

List<JobStatusTraceEvent> getJobStatusTraceEvents(final String taskId) {
List<JobStatusTraceEvent> result = new ArrayList<>();
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sqlMapper.getSelectForJobStatusTraceLog())) {
preparedStatement.setString(1, taskId);
try (ResultSet resultSet = preparedStatement.executeQuery()) {
while (resultSet.next()) {
JobStatusTraceEvent jobStatusTraceEvent = new JobStatusTraceEvent(resultSet.getString(1), resultSet.getString(2), resultSet.getString(3), resultSet.getString(4),
resultSet.getString(5), ExecutionType.valueOf(resultSet.getString(6)), resultSet.getString(7),
State.valueOf(resultSet.getString(8)), resultSet.getString(9), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(resultSet.getString(10)));
result.add(jobStatusTraceEvent);
}
}
} catch (final SQLException | ParseException ex) {
// TODO log failure directly to output log, consider to be configurable in the future
log.error(ex.getMessage());
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.shardingsphere.elasticjob.kernel.tracing.event.JobExecutionEvent;
import org.apache.shardingsphere.elasticjob.kernel.tracing.event.JobStatusTraceEvent;
import org.apache.shardingsphere.elasticjob.kernel.tracing.listener.TracingListener;
import org.apache.shardingsphere.elasticjob.tracing.rdb.storage.RDBJobEventStorage;

import javax.sql.DataSource;
import java.sql.SQLException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
* limitations under the License.
*/

package org.apache.shardingsphere.elasticjob.tracing.rdb.datasource;
package org.apache.shardingsphere.elasticjob.tracing.rdb.storage.converter;

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.kernel.tracing.api.TracingStorageConfiguration;
import org.apache.shardingsphere.elasticjob.kernel.tracing.config.TracingStorageConfiguration;
import org.apache.shardingsphere.elasticjob.kernel.tracing.exception.TracingStorageUnavailableException;
import org.apache.shardingsphere.elasticjob.kernel.tracing.storage.TracingStorageConverter;
import org.apache.shardingsphere.elasticjob.tracing.rdb.config.RDBTracingStorageConfiguration;
import org.apache.shardingsphere.elasticjob.tracing.rdb.storage.datasource.DataSourceRegistry;

import javax.sql.DataSource;
import java.sql.Connection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
* limitations under the License.
*/

package org.apache.shardingsphere.elasticjob.tracing.rdb.datasource;
package org.apache.shardingsphere.elasticjob.tracing.rdb.storage.datasource;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.elasticjob.tracing.rdb.config.RDBTracingStorageConfiguration;

import javax.sql.DataSource;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -50,7 +51,13 @@ public static DataSourceRegistry getInstance() {
return instance;
}

void registerDataSource(final RDBTracingStorageConfiguration dataSourceConfig, final DataSource dataSource) {
/**
* Register data source.
*
* @param dataSourceConfig data source configuration
* @param dataSource data source
*/
public void registerDataSource(final RDBTracingStorageConfiguration dataSourceConfig, final DataSource dataSource) {
dataSources.putIfAbsent(dataSourceConfig, dataSource);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.shardingsphere.elasticjob.tracing.rdb.datasource;
package org.apache.shardingsphere.elasticjob.tracing.rdb.storage.datasource;

import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.shardingsphere.elasticjob.tracing.rdb.storage;
package org.apache.shardingsphere.elasticjob.tracing.rdb.storage.sql;

import lombok.Getter;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
* limitations under the License.
*/

package org.apache.shardingsphere.elasticjob.tracing.rdb.storage;
package org.apache.shardingsphere.elasticjob.tracing.rdb.storage.sql;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
import org.apache.shardingsphere.elasticjob.tracing.rdb.type.TracingStorageDatabaseType;
import org.apache.shardingsphere.elasticjob.tracing.rdb.storage.type.TracingStorageDatabaseType;

import java.io.IOException;
import java.io.InputStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.shardingsphere.elasticjob.tracing.rdb.type;
package org.apache.shardingsphere.elasticjob.tracing.rdb.storage.type;

import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
* limitations under the License.
*/

package org.apache.shardingsphere.elasticjob.tracing.rdb.type.impl;
package org.apache.shardingsphere.elasticjob.tracing.rdb.storage.type.impl;

import org.apache.shardingsphere.elasticjob.tracing.rdb.type.TracingStorageDatabaseType;
import org.apache.shardingsphere.elasticjob.tracing.rdb.storage.type.TracingStorageDatabaseType;

/**
* Tracing storage database type for DB2.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
* limitations under the License.
*/

package org.apache.shardingsphere.elasticjob.tracing.rdb.type.impl;
package org.apache.shardingsphere.elasticjob.tracing.rdb.storage.type.impl;

import org.apache.shardingsphere.elasticjob.tracing.rdb.type.TracingStorageDatabaseType;
import org.apache.shardingsphere.elasticjob.tracing.rdb.storage.type.TracingStorageDatabaseType;

/**
* Default tracing storage database type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
* limitations under the License.
*/

package org.apache.shardingsphere.elasticjob.tracing.rdb.type.impl;
package org.apache.shardingsphere.elasticjob.tracing.rdb.storage.type.impl;

import org.apache.shardingsphere.elasticjob.tracing.rdb.type.TracingStorageDatabaseType;
import org.apache.shardingsphere.elasticjob.tracing.rdb.storage.type.TracingStorageDatabaseType;

/**
* Tracing storage database type for H2.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
* limitations under the License.
*/

package org.apache.shardingsphere.elasticjob.tracing.rdb.type.impl;
package org.apache.shardingsphere.elasticjob.tracing.rdb.storage.type.impl;

import org.apache.shardingsphere.elasticjob.tracing.rdb.type.TracingStorageDatabaseType;
import org.apache.shardingsphere.elasticjob.tracing.rdb.storage.type.TracingStorageDatabaseType;

/**
* Tracing storage database type for MySQL.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
* limitations under the License.
*/

package org.apache.shardingsphere.elasticjob.tracing.rdb.type.impl;
package org.apache.shardingsphere.elasticjob.tracing.rdb.storage.type.impl;

import org.apache.shardingsphere.elasticjob.tracing.rdb.type.TracingStorageDatabaseType;
import org.apache.shardingsphere.elasticjob.tracing.rdb.storage.type.TracingStorageDatabaseType;

/**
* Tracing storage database type for Oracle.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
* limitations under the License.
*/

package org.apache.shardingsphere.elasticjob.tracing.rdb.type.impl;
package org.apache.shardingsphere.elasticjob.tracing.rdb.storage.type.impl;

import org.apache.shardingsphere.elasticjob.tracing.rdb.type.TracingStorageDatabaseType;
import org.apache.shardingsphere.elasticjob.tracing.rdb.storage.type.TracingStorageDatabaseType;

/**
* Tracing storage database type for PostgreSQL.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
* limitations under the License.
*/

package org.apache.shardingsphere.elasticjob.tracing.rdb.type.impl;
package org.apache.shardingsphere.elasticjob.tracing.rdb.storage.type.impl;

import org.apache.shardingsphere.elasticjob.tracing.rdb.type.TracingStorageDatabaseType;
import org.apache.shardingsphere.elasticjob.tracing.rdb.storage.type.TracingStorageDatabaseType;

/**
* Tracing storage database type for SQLServer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

import lombok.Getter;
import lombok.Setter;
import org.apache.shardingsphere.elasticjob.kernel.tracing.api.TracingStorageConfiguration;
import org.apache.shardingsphere.elasticjob.kernel.tracing.config.TracingStorageConfiguration;
import org.apache.shardingsphere.elasticjob.kernel.tracing.yaml.YamlTracingStorageConfiguration;
import org.apache.shardingsphere.elasticjob.tracing.rdb.datasource.RDBTracingStorageConfiguration;
import org.apache.shardingsphere.elasticjob.tracing.rdb.config.RDBTracingStorageConfiguration;

import javax.sql.DataSource;
import java.util.LinkedHashMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package org.apache.shardingsphere.elasticjob.tracing.rdb.yaml;

import org.apache.shardingsphere.elasticjob.kernel.infra.yaml.config.YamlConfigurationConverter;
import org.apache.shardingsphere.elasticjob.kernel.tracing.api.TracingStorageConfiguration;
import org.apache.shardingsphere.elasticjob.kernel.tracing.config.TracingStorageConfiguration;
import org.apache.shardingsphere.elasticjob.kernel.tracing.yaml.YamlTracingStorageConfiguration;
import org.apache.shardingsphere.elasticjob.tracing.rdb.datasource.RDBTracingStorageConfiguration;
import org.apache.shardingsphere.elasticjob.tracing.rdb.config.RDBTracingStorageConfiguration;

import javax.sql.DataSource;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@
# limitations under the License.
#

org.apache.shardingsphere.elasticjob.tracing.rdb.datasource.RDBTracingStorageConverter
org.apache.shardingsphere.elasticjob.tracing.rdb.storage.converter.RDBTracingStorageConverter
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
# limitations under the License.
#

org.apache.shardingsphere.elasticjob.tracing.rdb.type.impl.MySQLTracingStorageDatabaseType
org.apache.shardingsphere.elasticjob.tracing.rdb.type.impl.PostgreSQLTracingStorageDatabaseType
org.apache.shardingsphere.elasticjob.tracing.rdb.type.impl.OracleTracingStorageDatabaseType
org.apache.shardingsphere.elasticjob.tracing.rdb.type.impl.SQLServerTracingStorageDatabaseType
org.apache.shardingsphere.elasticjob.tracing.rdb.type.impl.DB2TracingStorageDatabaseType
org.apache.shardingsphere.elasticjob.tracing.rdb.type.impl.H2TracingStorageDatabaseType
org.apache.shardingsphere.elasticjob.tracing.rdb.storage.type.impl.MySQLTracingStorageDatabaseType
org.apache.shardingsphere.elasticjob.tracing.rdb.storage.type.impl.PostgreSQLTracingStorageDatabaseType
org.apache.shardingsphere.elasticjob.tracing.rdb.storage.type.impl.OracleTracingStorageDatabaseType
org.apache.shardingsphere.elasticjob.tracing.rdb.storage.type.impl.SQLServerTracingStorageDatabaseType
org.apache.shardingsphere.elasticjob.tracing.rdb.storage.type.impl.DB2TracingStorageDatabaseType
org.apache.shardingsphere.elasticjob.tracing.rdb.storage.type.impl.H2TracingStorageDatabaseType
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.shardingsphere.elasticjob.tracing.rdb.datasource;
package org.apache.shardingsphere.elasticjob.tracing.rdb.config;

import com.zaxxer.hikari.HikariDataSource;
import org.apache.commons.dbcp2.BasicDataSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.shardingsphere.elasticjob.tracing.rdb.storage;
package org.apache.shardingsphere.elasticjob.tracing.rdb.listener;

import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.shardingsphere.elasticjob.kernel.executor.ExecutionType;
Expand All @@ -27,7 +27,6 @@
import org.junit.jupiter.api.Test;

import java.sql.SQLException;
import java.util.List;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.startsWith;
Expand Down Expand Up @@ -68,32 +67,6 @@ void assertAddJobStatusTraceEvent() {
new JobStatusTraceEvent("test_job", "fake_task_id", "fake_slave_id", ExecutionType.READY, "0", State.TASK_RUNNING, "message is empty.")));
}

@Test
void assertAddJobStatusTraceEventWhenFailoverWithTaskStagingState() {
JobStatusTraceEvent jobStatusTraceEvent = new JobStatusTraceEvent(
"test_job", "fake_failover_task_id", "fake_slave_id", ExecutionType.FAILOVER, "0", State.TASK_STAGING, "message is empty.");
jobStatusTraceEvent.setOriginalTaskId("original_fake_failover_task_id");
assertThat(storage.getJobStatusTraceEvents("fake_failover_task_id").size(), is(0));
storage.addJobStatusTraceEvent(jobStatusTraceEvent);
assertThat(storage.getJobStatusTraceEvents("fake_failover_task_id").size(), is(1));
}

@Test
void assertAddJobStatusTraceEventWhenFailoverWithTaskFailedState() {
JobStatusTraceEvent stagingJobStatusTraceEvent = new JobStatusTraceEvent(
"test_job", "fake_failed_failover_task_id", "fake_slave_id", ExecutionType.FAILOVER, "0", State.TASK_STAGING, "message is empty.");
stagingJobStatusTraceEvent.setOriginalTaskId("original_fake_failed_failover_task_id");
storage.addJobStatusTraceEvent(stagingJobStatusTraceEvent);
JobStatusTraceEvent failedJobStatusTraceEvent = new JobStatusTraceEvent(
"test_job", "fake_failed_failover_task_id", "fake_slave_id", ExecutionType.FAILOVER, "0", State.TASK_FAILED, "message is empty.");
storage.addJobStatusTraceEvent(failedJobStatusTraceEvent);
List<JobStatusTraceEvent> jobStatusTraceEvents = storage.getJobStatusTraceEvents("fake_failed_failover_task_id");
assertThat(jobStatusTraceEvents.size(), is(2));
for (JobStatusTraceEvent jobStatusTraceEvent : jobStatusTraceEvents) {
assertThat(jobStatusTraceEvent.getOriginalTaskId(), is("original_fake_failed_failover_task_id"));
}
}

@Test
void assertUpdateJobExecutionEventWhenSuccess() {
JobExecutionEvent startEvent = new JobExecutionEvent("localhost", "127.0.0.1", "fake_task_id", "test_job", JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@

import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.shardingsphere.elasticjob.kernel.executor.ExecutionType;
import org.apache.shardingsphere.elasticjob.kernel.tracing.JobTracingEventBus;
import org.apache.shardingsphere.elasticjob.kernel.tracing.api.TracingConfiguration;
import org.apache.shardingsphere.elasticjob.kernel.tracing.event.JobTracingEventBus;
import org.apache.shardingsphere.elasticjob.kernel.tracing.config.TracingConfiguration;
import org.apache.shardingsphere.elasticjob.kernel.tracing.event.JobExecutionEvent;
import org.apache.shardingsphere.elasticjob.kernel.tracing.event.JobStatusTraceEvent;
import org.apache.shardingsphere.elasticjob.kernel.tracing.event.JobStatusTraceEvent.State;
import org.apache.shardingsphere.elasticjob.test.util.ReflectionUtils;
import org.apache.shardingsphere.elasticjob.tracing.rdb.storage.RDBJobEventStorage;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down
Loading

0 comments on commit 235b9d7

Please sign in to comment.