Skip to content

Commit

Permalink
Made JobType (Migrate, Validate & Guardrail) independent of track-run…
Browse files Browse the repository at this point in the history
… feature and renamed slices/partitions to PartitionRanges. Also provided actual jobs access to PartitionRange class.
  • Loading branch information
pravinbhat committed Nov 1, 2024
1 parent 24570bf commit 7d1d241
Show file tree
Hide file tree
Showing 21 changed files with 134 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import org.slf4j.LoggerFactory;

import com.datastax.cdm.feature.TrackRun;
import com.datastax.cdm.feature.TrackRun.RUN_TYPE;
import com.datastax.cdm.job.Partition;
import com.datastax.cdm.job.IJobSessionFactory.JobType;
import com.datastax.cdm.job.PartitionRange;
import com.datastax.cdm.job.RunNotStartedException;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
Expand Down Expand Up @@ -87,7 +87,7 @@ public TargetUpsertRunDetailsStatement(CqlSession session, String keyspaceTable)
+ " WHERE table_name = ? AND run_id = ? AND status = ? ALLOW FILTERING");
}

public Collection<Partition> getPendingPartitions(long prevRunId) throws RunNotStartedException {
public Collection<PartitionRange> getPendingPartitions(long prevRunId) throws RunNotStartedException {
if (prevRunId == 0) {
return Collections.emptyList();
}
Expand All @@ -104,7 +104,7 @@ public Collection<Partition> getPendingPartitions(long prevRunId) throws RunNotS
}
}

final Collection<Partition> pendingParts = new ArrayList<Partition>();
final Collection<PartitionRange> pendingParts = new ArrayList<PartitionRange>();
pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.NOT_STARTED.toString()));
pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.STARTED.toString()));
pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.FAIL.toString()));
Expand All @@ -113,35 +113,35 @@ public Collection<Partition> getPendingPartitions(long prevRunId) throws RunNotS
return pendingParts;
}

protected Collection<Partition> getPartitionsByStatus(long prevRunId, String status) {
protected Collection<PartitionRange> getPartitionsByStatus(long prevRunId, String status) {
ResultSet rs = session.execute(boundSelectStatement.setString("table_name", tableName)
.setLong("run_id", prevRunId).setString("status", status));

final Collection<Partition> pendingParts = new ArrayList<Partition>();
final Collection<PartitionRange> pendingParts = new ArrayList<PartitionRange>();
rs.forEach(row -> {
Partition part = new Partition(BigInteger.valueOf(row.getLong("token_min")),
PartitionRange part = new PartitionRange(BigInteger.valueOf(row.getLong("token_min")),
BigInteger.valueOf(row.getLong("token_max")));
pendingParts.add(part);
});
return pendingParts;
}

public void initCdmRun(long runId, long prevRunId, Collection<Partition> parts, RUN_TYPE runType) {
public void initCdmRun(long runId, long prevRunId, Collection<PartitionRange> parts, JobType jobType) {
ResultSet rsInfo = session
.execute(boundSelectInfoStatement.setString("table_name", tableName).setLong("run_id", runId));
if (null != rsInfo.one()) {
throw new RuntimeException("Run id " + runId + " already exists for table " + tableName);
}
session.execute(boundInitInfoStatement.setString("table_name", tableName).setLong("run_id", runId)
.setString("run_type", runType.toString()).setLong("prev_run_id", prevRunId)
.setString("run_type", jobType.toString()).setLong("prev_run_id", prevRunId)
.setString("status", TrackRun.RUN_STATUS.NOT_STARTED.toString()));
parts.forEach(part -> initCdmRun(runId, part));
session.execute(boundInitInfoStatement.setString("table_name", tableName).setLong("run_id", runId)
.setString("run_type", runType.toString()).setLong("prev_run_id", prevRunId)
.setString("run_type", jobType.toString()).setLong("prev_run_id", prevRunId)
.setString("status", TrackRun.RUN_STATUS.STARTED.toString()));
}

private void initCdmRun(long runId, Partition partition) {
private void initCdmRun(long runId, PartitionRange partition) {
session.execute(boundInitStatement.setString("table_name", tableName).setLong("run_id", runId)
.setLong("token_min", partition.getMin().longValue())
.setLong("token_max", partition.getMax().longValue())
Expand Down
15 changes: 6 additions & 9 deletions src/main/java/com/datastax/cdm/feature/TrackRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,12 @@
import org.slf4j.LoggerFactory;

import com.datastax.cdm.cql.statement.TargetUpsertRunDetailsStatement;
import com.datastax.cdm.job.Partition;
import com.datastax.cdm.job.IJobSessionFactory.JobType;
import com.datastax.cdm.job.PartitionRange;
import com.datastax.cdm.job.RunNotStartedException;
import com.datastax.oss.driver.api.core.CqlSession;

public class TrackRun {
public enum RUN_TYPE {
MIGRATE, DIFF_DATA
}

public enum RUN_STATUS {
NOT_STARTED, STARTED, PASS, FAIL, DIFF, DIFF_CORRECTED, ENDED
}
Expand All @@ -42,15 +39,15 @@ public TrackRun(CqlSession session, String keyspaceTable) {
this.runStatement = new TargetUpsertRunDetailsStatement(session, keyspaceTable);
}

public Collection<Partition> getPendingPartitions(long prevRunId) throws RunNotStartedException {
Collection<Partition> pendingParts = runStatement.getPendingPartitions(prevRunId);
public Collection<PartitionRange> getPendingPartitions(long prevRunId) throws RunNotStartedException {
Collection<PartitionRange> pendingParts = runStatement.getPendingPartitions(prevRunId);
logger.info("###################### {} partitions pending from previous run id {} ######################",
pendingParts.size(), prevRunId);
return pendingParts;
}

public void initCdmRun(long runId, long prevRunId, Collection<Partition> parts, RUN_TYPE runType) {
runStatement.initCdmRun(runId, prevRunId, parts, runType);
public void initCdmRun(long runId, long prevRunId, Collection<PartitionRange> parts, JobType jobType) {
runStatement.initCdmRun(runId, prevRunId, parts, jobType);
logger.info("###################### Run Id for this job is: {} ######################", runId);
}

Expand Down
14 changes: 7 additions & 7 deletions src/main/java/com/datastax/cdm/job/AbstractJobSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package com.datastax.cdm.job;

import java.math.BigInteger;
import java.util.Collection;

import org.slf4j.Logger;
Expand All @@ -28,6 +27,7 @@
import com.datastax.cdm.feature.Featureset;
import com.datastax.cdm.feature.Guardrail;
import com.datastax.cdm.feature.TrackRun;
import com.datastax.cdm.job.IJobSessionFactory.JobType;
import com.datastax.cdm.properties.KnownProperties;
import com.datastax.cdm.properties.PropertyHelper;
import com.datastax.cdm.schema.CqlTable;
Expand Down Expand Up @@ -110,20 +110,20 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
}
}

public void processSlice(Partition slice, TrackRun trackRunFeature, long runId) {
public void processPartitionRange(PartitionRange range, TrackRun trackRunFeature, long runId) {
this.trackRunFeature = trackRunFeature;
this.runId = runId;
this.processSlice(slice.getMin(), slice.getMax());
this.processPartitionRange(range);
}

protected abstract void processSlice(BigInteger min, BigInteger max);
protected abstract void processPartitionRange(PartitionRange range);

public synchronized void initCdmRun(long runId, long prevRunId, Collection<Partition> parts,
TrackRun trackRunFeature, TrackRun.RUN_TYPE runType) {
public synchronized void initCdmRun(long runId, long prevRunId, Collection<PartitionRange> parts,
TrackRun trackRunFeature, JobType jobType) {
this.runId = runId;
this.trackRunFeature = trackRunFeature;
if (null != trackRunFeature)
trackRunFeature.initCdmRun(runId, prevRunId, parts, runType);
trackRunFeature.initCdmRun(runId, prevRunId, parts, jobType);
DataUtility.deleteGeneratedSCB(runId);
}

Expand Down
5 changes: 3 additions & 2 deletions src/main/java/com/datastax/cdm/job/CopyJobSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;

public class CopyJobSession extends AbstractJobSession<Partition> {
public class CopyJobSession extends AbstractJobSession<PartitionRange> {

private final PKFactory pkFactory;
private final boolean isCounterTable;
Expand All @@ -64,7 +64,8 @@ protected CopyJobSession(CqlSession originSession, CqlSession targetSession, Pro
logger.info("CQL -- target upsert: {}", this.targetSession.getTargetUpsertStatement().getCQL());
}

protected void processSlice(BigInteger min, BigInteger max) {
protected void processPartitionRange(PartitionRange range) {
BigInteger min = range.getMin(), max = range.getMax();
ThreadContext.put(THREAD_CONTEXT_LABEL, getThreadLabel(min, max));
logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max);
if (null != trackRunFeature)
Expand Down
8 changes: 6 additions & 2 deletions src/main/java/com/datastax/cdm/job/CopyJobSessionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
import com.datastax.cdm.properties.PropertyHelper;
import com.datastax.oss.driver.api.core.CqlSession;

public class CopyJobSessionFactory implements IJobSessionFactory<Partition>, Serializable {
public class CopyJobSessionFactory implements IJobSessionFactory<PartitionRange>, Serializable {
private static final long serialVersionUID = 5255029377029801421L;
private static CopyJobSession jobSession = null;

public AbstractJobSession<Partition> getInstance(CqlSession originSession, CqlSession targetSession,
public AbstractJobSession<PartitionRange> getInstance(CqlSession originSession, CqlSession targetSession,
PropertyHelper propHelper) {
if (jobSession == null) {
synchronized (CopyJobSession.class) {
Expand All @@ -35,4 +35,8 @@ public AbstractJobSession<Partition> getInstance(CqlSession originSession, CqlSe
}
return jobSession;
}

public JobType getJobType() {
return JobType.MIGRATE;
}
}
3 changes: 2 additions & 1 deletion src/main/java/com/datastax/cdm/job/DiffJobSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ public DiffJobSession(CqlSession originSession, CqlSession targetSession, Proper
logger.info("CQL -- target upsert: {}", this.targetSession.getTargetUpsertStatement().getCQL());
}

protected void processSlice(BigInteger min, BigInteger max) {
protected void processPartitionRange(PartitionRange range) {
BigInteger min = range.getMin(), max = range.getMax();
ThreadContext.put(THREAD_CONTEXT_LABEL, getThreadLabel(min, max));
logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max);
if (null != trackRunFeature)
Expand Down
8 changes: 6 additions & 2 deletions src/main/java/com/datastax/cdm/job/DiffJobSessionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
import com.datastax.cdm.properties.PropertyHelper;
import com.datastax.oss.driver.api.core.CqlSession;

public class DiffJobSessionFactory implements IJobSessionFactory<Partition>, Serializable {
public class DiffJobSessionFactory implements IJobSessionFactory<PartitionRange>, Serializable {
private static final long serialVersionUID = -3543616512495020278L;
private static DiffJobSession jobSession = null;

public AbstractJobSession<Partition> getInstance(CqlSession originSession, CqlSession targetSession,
public AbstractJobSession<PartitionRange> getInstance(CqlSession originSession, CqlSession targetSession,
PropertyHelper propHelper) {
if (jobSession == null) {
synchronized (DiffJobSession.class) {
Expand All @@ -35,4 +35,8 @@ public AbstractJobSession<Partition> getInstance(CqlSession originSession, CqlSe
}
return jobSession;
}

public JobType getJobType() {
return JobType.VALIDATE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;

public class GuardrailCheckJobSession extends AbstractJobSession<Partition> {
public class GuardrailCheckJobSession extends AbstractJobSession<PartitionRange> {

public Logger logger = LoggerFactory.getLogger(this.getClass().getName());

Expand All @@ -43,7 +43,8 @@ protected GuardrailCheckJobSession(CqlSession originSession, CqlSession targetSe
logger.info("CQL -- origin select: {}", this.originSession.getOriginSelectByPartitionRangeStatement().getCQL());
}

protected void processSlice(BigInteger min, BigInteger max) {
protected void processPartitionRange(PartitionRange range) {
BigInteger min = range.getMin(), max = range.getMax();
ThreadContext.put(THREAD_CONTEXT_LABEL, getThreadLabel(min, max));
try {
logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
import com.datastax.cdm.properties.PropertyHelper;
import com.datastax.oss.driver.api.core.CqlSession;

public class GuardrailCheckJobSessionFactory implements IJobSessionFactory<Partition>, Serializable {
public class GuardrailCheckJobSessionFactory implements IJobSessionFactory<PartitionRange>, Serializable {
private static final long serialVersionUID = -4673384128807660843L;
private static GuardrailCheckJobSession jobSession = null;

public AbstractJobSession<Partition> getInstance(CqlSession originSession, CqlSession targetSession,
public AbstractJobSession<PartitionRange> getInstance(CqlSession originSession, CqlSession targetSession,
PropertyHelper propHelper) {
if (jobSession == null) {
synchronized (GuardrailCheckJobSession.class) {
Expand All @@ -35,4 +35,8 @@ public AbstractJobSession<Partition> getInstance(CqlSession originSession, CqlSe
}
return jobSession;
}

public JobType getJobType() {
return JobType.MIGRATE;
}
}
6 changes: 6 additions & 0 deletions src/main/java/com/datastax/cdm/job/IJobSessionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,11 @@
import com.datastax.oss.driver.api.core.CqlSession;

public interface IJobSessionFactory<T> {
public enum JobType {
MIGRATE, VALIDATE, GUARDRAIL
}

AbstractJobSession<T> getInstance(CqlSession originSession, CqlSession targetSession, PropertyHelper propHelper);

public JobType getJobType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
import java.io.Serializable;
import java.math.BigInteger;

public class Partition implements Serializable {
public class PartitionRange implements Serializable {
private static final long serialVersionUID = 1L;

private final BigInteger min;
private final BigInteger max;

public Partition(BigInteger min, BigInteger max) {
public PartitionRange(BigInteger min, BigInteger max) {
this.min = min;
this.max = max;
}
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/com/datastax/cdm/job/SplitPartitions.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,25 @@ public class SplitPartitions {

public static Logger logger = LoggerFactory.getLogger(SplitPartitions.class.getName());

public static List<Partition> getRandomSubPartitions(int numSplits, BigInteger min, BigInteger max,
public static List<PartitionRange> getRandomSubPartitions(int numSplits, BigInteger min, BigInteger max,
int coveragePercent) {
logger.info("ThreadID: {} Splitting min: {} max: {}", Thread.currentThread().getId(), min, max);
List<Partition> partitions = getSubPartitions(numSplits, min, max, coveragePercent);
List<PartitionRange> partitions = getSubPartitions(numSplits, min, max, coveragePercent);
Collections.shuffle(partitions);
Collections.shuffle(partitions);
Collections.shuffle(partitions);
Collections.shuffle(partitions);
return partitions;
}

private static List<Partition> getSubPartitions(int numSplits, BigInteger min, BigInteger max,
private static List<PartitionRange> getSubPartitions(int numSplits, BigInteger min, BigInteger max,
int coveragePercent) {
if (coveragePercent < 1 || coveragePercent > 100) {
coveragePercent = 100;
}
BigInteger curMax = new BigInteger(min.toString());
BigInteger partitionSize = max.subtract(min).divide(BigInteger.valueOf(numSplits));
List<Partition> partitions = new ArrayList<Partition>();
List<PartitionRange> partitions = new ArrayList<PartitionRange>();
if (partitionSize.compareTo(new BigInteger("0")) == 0) {
partitionSize = new BigInteger("100000");
}
Expand All @@ -65,7 +65,7 @@ private static List<Partition> getSubPartitions(int numSplits, BigInteger min, B

BigInteger range = curMax.subtract(curMin);
BigInteger curRange = range.multiply(BigInteger.valueOf(coveragePercent)).divide(BigInteger.valueOf(100));
partitions.add(new Partition(curMin, curMin.add(curRange)));
partitions.add(new PartitionRange(curMin, curMin.add(curRange)));
if (exausted) {
break;
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/com/datastax/cdm/job/BasePartitionJob.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ import scala.reflect.io.File
import com.datastax.cdm.feature.TrackRun
import com.datastax.cdm.properties.KnownProperties

abstract class BasePartitionJob extends BaseJob[Partition] {
abstract class BasePartitionJob extends BaseJob[PartitionRange] {
var trackRunFeature: TrackRun = _
var keyspaceTableValue: String = _

override def getParts(pieces: Int): util.Collection[Partition] = {
override def getParts(pieces: Int): util.Collection[PartitionRange] = {
var keyspaceTable: Option[String] = Option(propertyHelper.getString(KnownProperties.TARGET_KEYSPACE_TABLE))
.filter(_.nonEmpty)
.orElse(Option(propertyHelper.getString(KnownProperties.ORIGIN_KEYSPACE_TABLE)))
Expand Down
5 changes: 3 additions & 2 deletions src/main/scala/com/datastax/cdm/job/DiffData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package com.datastax.cdm.job
import com.datastax.cdm.feature.TrackRun
import com.datastax.cdm.data.PKFactory.Side
import com.datastax.cdm.properties.{KnownProperties, PropertyHelper}
import com.datastax.cdm.job.IJobSessionFactory.JobType

object DiffData extends BasePartitionJob {
setup("Data Validation Job", new DiffJobSessionFactory())
Expand All @@ -28,7 +29,7 @@ object DiffData extends BasePartitionJob {
if (!parts.isEmpty()) {
originConnection.withSessionDo(originSession =>
targetConnection.withSessionDo(targetSession =>
jobFactory.getInstance(originSession, targetSession, propertyHelper).initCdmRun(runId, prevRunId, parts, trackRunFeature, TrackRun.RUN_TYPE.DIFF_DATA)));
jobFactory.getInstance(originSession, targetSession, propertyHelper).initCdmRun(runId, prevRunId, parts, trackRunFeature, JobType.VALIDATE)));
val bcConnectionFetcher = sContext.broadcast(connectionFetcher)
val bcPropHelper = sContext.broadcast(propertyHelper)
val bcJobFactory = sContext.broadcast(jobFactory)
Expand All @@ -44,7 +45,7 @@ object DiffData extends BasePartitionJob {
originConnection.withSessionDo(originSession =>
targetConnection.withSessionDo(targetSession =>
bcJobFactory.value.getInstance(originSession, targetSession, bcPropHelper.value)
.processSlice(slice, trackRunFeature, bcRunId.value)))
.processPartitionRange(slice, trackRunFeature, bcRunId.value)))
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/com/datastax/cdm/job/GuardrailCheck.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ object GuardrailCheck extends BasePartitionJob {
}
originConnection.withSessionDo(originSession =>
bcJobFactory.value.getInstance(originSession, null, bcPropHelper.value)
.processSlice(slice, null, 0))
.processPartitionRange(slice, null, 0))
})
}
}
Expand Down
Loading

0 comments on commit 7d1d241

Please sign in to comment.