From 0fba8b2d79aa3f263b0e95efb6715e6949e63b9c Mon Sep 17 00:00:00 2001
From: Pravin Bhat
Date: Thu, 31 Oct 2024 12:15:04 -0400
Subject: [PATCH 1/3] Made Partition into its own class & refactored stuff to
 make that work

---
 .../TargetUpsertRunDetailsStatement.java      | 13 +++---
 .../com/datastax/cdm/feature/TrackRun.java    |  8 ++--
 .../datastax/cdm/job/AbstractJobSession.java  |  4 +-
 .../com/datastax/cdm/job/CopyJobSession.java  |  2 +-
 .../cdm/job/CopyJobSessionFactory.java        |  4 +-
 .../cdm/job/DiffJobSessionFactory.java        |  4 +-
 .../cdm/job/GuardrailCheckJobSession.java     |  2 +-
 .../job/GuardrailCheckJobSessionFactory.java  |  4 +-
 .../java/com/datastax/cdm/job/Partition.java  | 43 +++++++++++++++++++
 .../com/datastax/cdm/job/SplitPartitions.java | 24 -----------
 .../datastax/cdm/job/BasePartitionJob.scala   |  4 +-
 .../TargetUpsertRunDetailsStatementTest.java  |  5 +--
 .../datastax/cdm/feature/TrackRunTest.java    |  4 +-
 .../datastax/cdm/job/SplitPartitionsTest.java |  8 ++--
 14 files changed, 73 insertions(+), 56 deletions(-)
 create mode 100644 src/main/java/com/datastax/cdm/job/Partition.java

diff --git a/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java b/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java
index 54fa44ab..65ce295f 100644
--- a/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java
+++ b/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java
@@ -26,9 +26,8 @@
 import com.datastax.cdm.feature.TrackRun;
 import com.datastax.cdm.feature.TrackRun.RUN_TYPE;
+import com.datastax.cdm.job.Partition;
 import com.datastax.cdm.job.RunNotStartedException;
-import com.datastax.cdm.job.SplitPartitions;
-import com.datastax.cdm.job.SplitPartitions.Partition;
 import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.cql.BoundStatement;
 import com.datastax.oss.driver.api.core.cql.ResultSet;
@@ -88,7 +87,7 @@ public TargetUpsertRunDetailsStatement(CqlSession session, String keyspaceTable)
                 + " WHERE table_name = ? AND run_id = ? AND status = ?
ALLOW FILTERING"); } - public Collection getPendingPartitions(long prevRunId) throws RunNotStartedException { + public Collection getPendingPartitions(long prevRunId) throws RunNotStartedException { if (prevRunId == 0) { return Collections.emptyList(); } @@ -105,7 +104,7 @@ public Collection getPendingPartitions(long prevRunId } } - final Collection pendingParts = new ArrayList(); + final Collection pendingParts = new ArrayList(); pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.NOT_STARTED.toString())); pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.STARTED.toString())); pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.FAIL.toString())); @@ -114,11 +113,11 @@ public Collection getPendingPartitions(long prevRunId return pendingParts; } - protected Collection getPartitionsByStatus(long prevRunId, String status) { + protected Collection getPartitionsByStatus(long prevRunId, String status) { ResultSet rs = session.execute(boundSelectStatement.setString("table_name", tableName) .setLong("run_id", prevRunId).setString("status", status)); - final Collection pendingParts = new ArrayList(); + final Collection pendingParts = new ArrayList(); rs.forEach(row -> { Partition part = new Partition(BigInteger.valueOf(row.getLong("token_min")), BigInteger.valueOf(row.getLong("token_max"))); @@ -127,7 +126,7 @@ protected Collection getPartitionsByStatus(long prevR return pendingParts; } - public void initCdmRun(long runId, long prevRunId, Collection parts, RUN_TYPE runType) { + public void initCdmRun(long runId, long prevRunId, Collection parts, RUN_TYPE runType) { ResultSet rsInfo = session .execute(boundSelectInfoStatement.setString("table_name", tableName).setLong("run_id", runId)); if (null != rsInfo.one()) { diff --git a/src/main/java/com/datastax/cdm/feature/TrackRun.java b/src/main/java/com/datastax/cdm/feature/TrackRun.java index 92b1784f..b1e7bab2 100644 --- a/src/main/java/com/datastax/cdm/feature/TrackRun.java +++ b/src/main/java/com/datastax/cdm/feature/TrackRun.java @@ -22,8 +22,8 @@ import org.slf4j.LoggerFactory; import com.datastax.cdm.cql.statement.TargetUpsertRunDetailsStatement; +import com.datastax.cdm.job.Partition; import com.datastax.cdm.job.RunNotStartedException; -import com.datastax.cdm.job.SplitPartitions; import com.datastax.oss.driver.api.core.CqlSession; public class TrackRun { @@ -42,14 +42,14 @@ public TrackRun(CqlSession session, String keyspaceTable) { this.runStatement = new TargetUpsertRunDetailsStatement(session, keyspaceTable); } - public Collection getPendingPartitions(long prevRunId) throws RunNotStartedException { - Collection pendingParts = runStatement.getPendingPartitions(prevRunId); + public Collection getPendingPartitions(long prevRunId) throws RunNotStartedException { + Collection pendingParts = runStatement.getPendingPartitions(prevRunId); logger.info("###################### {} partitions pending from previous run id {} ######################", pendingParts.size(), prevRunId); return pendingParts; } - public void initCdmRun(long runId, long prevRunId, Collection parts, RUN_TYPE runType) { + public void initCdmRun(long runId, long prevRunId, Collection parts, RUN_TYPE runType) { runStatement.initCdmRun(runId, prevRunId, parts, runType); logger.info("###################### Run Id for this job is: {} ######################", runId); } diff --git a/src/main/java/com/datastax/cdm/job/AbstractJobSession.java b/src/main/java/com/datastax/cdm/job/AbstractJobSession.java index 69853ca6..4671d25b 
100644 --- a/src/main/java/com/datastax/cdm/job/AbstractJobSession.java +++ b/src/main/java/com/datastax/cdm/job/AbstractJobSession.java @@ -110,7 +110,7 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, } } - public void processSlice(SplitPartitions.Partition slice, TrackRun trackRunFeature, long runId) { + public void processSlice(Partition slice, TrackRun trackRunFeature, long runId) { this.trackRunFeature = trackRunFeature; this.runId = runId; this.processSlice(slice.getMin(), slice.getMax()); @@ -118,7 +118,7 @@ public void processSlice(SplitPartitions.Partition slice, TrackRun trackRunFeatu protected abstract void processSlice(BigInteger min, BigInteger max); - public synchronized void initCdmRun(long runId, long prevRunId, Collection parts, + public synchronized void initCdmRun(long runId, long prevRunId, Collection parts, TrackRun trackRunFeature, TrackRun.RUN_TYPE runType) { this.runId = runId; this.trackRunFeature = trackRunFeature; diff --git a/src/main/java/com/datastax/cdm/job/CopyJobSession.java b/src/main/java/com/datastax/cdm/job/CopyJobSession.java index 514f14a8..0604203b 100644 --- a/src/main/java/com/datastax/cdm/job/CopyJobSession.java +++ b/src/main/java/com/datastax/cdm/job/CopyJobSession.java @@ -39,7 +39,7 @@ import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Row; -public class CopyJobSession extends AbstractJobSession { +public class CopyJobSession extends AbstractJobSession { private final PKFactory pkFactory; private final boolean isCounterTable; diff --git a/src/main/java/com/datastax/cdm/job/CopyJobSessionFactory.java b/src/main/java/com/datastax/cdm/job/CopyJobSessionFactory.java index 7cb29182..70a1e3ff 100644 --- a/src/main/java/com/datastax/cdm/job/CopyJobSessionFactory.java +++ b/src/main/java/com/datastax/cdm/job/CopyJobSessionFactory.java @@ -20,11 +20,11 @@ import com.datastax.cdm.properties.PropertyHelper; import com.datastax.oss.driver.api.core.CqlSession; -public class CopyJobSessionFactory implements IJobSessionFactory, Serializable { +public class CopyJobSessionFactory implements IJobSessionFactory, Serializable { private static final long serialVersionUID = 5255029377029801421L; private static CopyJobSession jobSession = null; - public AbstractJobSession getInstance(CqlSession originSession, CqlSession targetSession, + public AbstractJobSession getInstance(CqlSession originSession, CqlSession targetSession, PropertyHelper propHelper) { if (jobSession == null) { synchronized (CopyJobSession.class) { diff --git a/src/main/java/com/datastax/cdm/job/DiffJobSessionFactory.java b/src/main/java/com/datastax/cdm/job/DiffJobSessionFactory.java index 3b09b7c1..04f12a38 100644 --- a/src/main/java/com/datastax/cdm/job/DiffJobSessionFactory.java +++ b/src/main/java/com/datastax/cdm/job/DiffJobSessionFactory.java @@ -20,11 +20,11 @@ import com.datastax.cdm.properties.PropertyHelper; import com.datastax.oss.driver.api.core.CqlSession; -public class DiffJobSessionFactory implements IJobSessionFactory, Serializable { +public class DiffJobSessionFactory implements IJobSessionFactory, Serializable { private static final long serialVersionUID = -3543616512495020278L; private static DiffJobSession jobSession = null; - public AbstractJobSession getInstance(CqlSession originSession, CqlSession targetSession, + public AbstractJobSession getInstance(CqlSession originSession, CqlSession targetSession, PropertyHelper propHelper) { if (jobSession == null) { synchronized (DiffJobSession.class) 
{ diff --git a/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSession.java b/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSession.java index bdcb74f3..1225b4e5 100644 --- a/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSession.java +++ b/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSession.java @@ -27,7 +27,7 @@ import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Row; -public class GuardrailCheckJobSession extends AbstractJobSession { +public class GuardrailCheckJobSession extends AbstractJobSession { public Logger logger = LoggerFactory.getLogger(this.getClass().getName()); diff --git a/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSessionFactory.java b/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSessionFactory.java index 6ebdd993..55c1bc9b 100644 --- a/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSessionFactory.java +++ b/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSessionFactory.java @@ -20,11 +20,11 @@ import com.datastax.cdm.properties.PropertyHelper; import com.datastax.oss.driver.api.core.CqlSession; -public class GuardrailCheckJobSessionFactory implements IJobSessionFactory, Serializable { +public class GuardrailCheckJobSessionFactory implements IJobSessionFactory, Serializable { private static final long serialVersionUID = -4673384128807660843L; private static GuardrailCheckJobSession jobSession = null; - public AbstractJobSession getInstance(CqlSession originSession, CqlSession targetSession, + public AbstractJobSession getInstance(CqlSession originSession, CqlSession targetSession, PropertyHelper propHelper) { if (jobSession == null) { synchronized (GuardrailCheckJobSession.class) { diff --git a/src/main/java/com/datastax/cdm/job/Partition.java b/src/main/java/com/datastax/cdm/job/Partition.java new file mode 100644 index 00000000..d36e9673 --- /dev/null +++ b/src/main/java/com/datastax/cdm/job/Partition.java @@ -0,0 +1,43 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.datastax.cdm.job; + +import java.io.Serializable; +import java.math.BigInteger; + +public class Partition implements Serializable { + private static final long serialVersionUID = 1L; + + private final BigInteger min; + private final BigInteger max; + + public Partition(BigInteger min, BigInteger max) { + this.min = min; + this.max = max; + } + + public BigInteger getMin() { + return min; + } + + public BigInteger getMax() { + return max; + } + + public String toString() { + return "Processing partition for token range " + min + " to " + max; + } +} \ No newline at end of file diff --git a/src/main/java/com/datastax/cdm/job/SplitPartitions.java b/src/main/java/com/datastax/cdm/job/SplitPartitions.java index eec89d62..ae884902 100644 --- a/src/main/java/com/datastax/cdm/job/SplitPartitions.java +++ b/src/main/java/com/datastax/cdm/job/SplitPartitions.java @@ -15,7 +15,6 @@ */ package com.datastax.cdm.job; -import java.io.Serializable; import java.math.BigInteger; import java.util.ArrayList; import java.util.Collections; @@ -76,27 +75,4 @@ private static List getSubPartitions(int numSplits, BigInteger min, B return partitions; } - public static class Partition implements Serializable { - private static final long serialVersionUID = 1L; - - private final BigInteger min; - private final BigInteger max; - - public Partition(BigInteger min, BigInteger max) { - this.min = min; - this.max = max; - } - - public BigInteger getMin() { - return min; - } - - public BigInteger getMax() { - return max; - } - - public String toString() { - return "Processing partition for token range " + min + " to " + max; - } - } } \ No newline at end of file diff --git a/src/main/scala/com/datastax/cdm/job/BasePartitionJob.scala b/src/main/scala/com/datastax/cdm/job/BasePartitionJob.scala index 2f380866..83c99613 100644 --- a/src/main/scala/com/datastax/cdm/job/BasePartitionJob.scala +++ b/src/main/scala/com/datastax/cdm/job/BasePartitionJob.scala @@ -20,11 +20,11 @@ import scala.reflect.io.File import com.datastax.cdm.feature.TrackRun import com.datastax.cdm.properties.KnownProperties -abstract class BasePartitionJob extends BaseJob[SplitPartitions.Partition] { +abstract class BasePartitionJob extends BaseJob[Partition] { var trackRunFeature: TrackRun = _ var keyspaceTableValue: String = _ - override def getParts(pieces: Int): util.Collection[SplitPartitions.Partition] = { + override def getParts(pieces: Int): util.Collection[Partition] = { var keyspaceTable: Option[String] = Option(propertyHelper.getString(KnownProperties.TARGET_KEYSPACE_TABLE)) .filter(_.nonEmpty) .orElse(Option(propertyHelper.getString(KnownProperties.ORIGIN_KEYSPACE_TABLE))) diff --git a/src/test/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatementTest.java b/src/test/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatementTest.java index 4186abaa..0d358520 100644 --- a/src/test/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatementTest.java +++ b/src/test/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatementTest.java @@ -32,8 +32,8 @@ import org.mockito.Mockito; import com.datastax.cdm.cql.CommonMocks; +import com.datastax.cdm.job.Partition; import com.datastax.cdm.job.RunNotStartedException; -import com.datastax.cdm.job.SplitPartitions; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.PreparedStatement; @@ -98,8 +98,7 @@ public void getPartitionsByStatus() { 
when(mockIterator.next()).thenReturn(row3); targetUpsertRunDetailsStatement = new TargetUpsertRunDetailsStatement(cqlSession, "ks.table1"); - Collection parts = targetUpsertRunDetailsStatement.getPartitionsByStatus(123l, - "RUNNING"); + Collection parts = targetUpsertRunDetailsStatement.getPartitionsByStatus(123l, "RUNNING"); // This test is incorrect, but needs to be troubleshot & fixed. The actual code works, but the test does not assertEquals(0, parts.size()); diff --git a/src/test/java/com/datastax/cdm/feature/TrackRunTest.java b/src/test/java/com/datastax/cdm/feature/TrackRunTest.java index bc154a14..4944d405 100644 --- a/src/test/java/com/datastax/cdm/feature/TrackRunTest.java +++ b/src/test/java/com/datastax/cdm/feature/TrackRunTest.java @@ -27,8 +27,8 @@ import org.mockito.Mock; import com.datastax.cdm.cql.CommonMocks; +import com.datastax.cdm.job.Partition; import com.datastax.cdm.job.RunNotStartedException; -import com.datastax.cdm.job.SplitPartitions; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.BoundStatement; @@ -60,7 +60,7 @@ void countTypesAndStatus() { @Test void init() throws RunNotStartedException { TrackRun trackRun = new TrackRun(cqlSession, "keyspace.table"); - Collection parts = trackRun.getPendingPartitions(0); + Collection parts = trackRun.getPendingPartitions(0); assertEquals(0, parts.size()); } diff --git a/src/test/java/com/datastax/cdm/job/SplitPartitionsTest.java b/src/test/java/com/datastax/cdm/job/SplitPartitionsTest.java index 8b172f82..4351850a 100644 --- a/src/test/java/com/datastax/cdm/job/SplitPartitionsTest.java +++ b/src/test/java/com/datastax/cdm/job/SplitPartitionsTest.java @@ -32,8 +32,8 @@ void tearDown() { @Test void getRandomSubPartitionsTest() { - List partitions = SplitPartitions.getRandomSubPartitions(10, BigInteger.ONE, - BigInteger.valueOf(100), 100); + List partitions = SplitPartitions.getRandomSubPartitions(10, BigInteger.ONE, BigInteger.valueOf(100), + 100); assertEquals(10, partitions.size()); partitions.forEach(p -> { assertEquals(9, p.getMax().longValue() - p.getMin().longValue()); @@ -42,8 +42,8 @@ void getRandomSubPartitionsTest() { @Test void getRandomSubPartitionsTestOver100() { - List partitions = SplitPartitions.getRandomSubPartitions(8, BigInteger.ONE, - BigInteger.valueOf(44), 200); + List partitions = SplitPartitions.getRandomSubPartitions(8, BigInteger.ONE, BigInteger.valueOf(44), + 200); assertEquals(8, partitions.size()); } From 24570bfc1339daa9519022674308a011893a9973 Mon Sep 17 00:00:00 2001 From: Pravin Bhat Date: Thu, 31 Oct 2024 12:22:32 -0400 Subject: [PATCH 2/3] Made CounterUnit its own class & refactored JobCounter to work with it. --- .../com/datastax/cdm/job/CounterUnit.java | 50 +++++++++++++++++++ .../java/com/datastax/cdm/job/JobCounter.java | 36 ++----------- 2 files changed, 54 insertions(+), 32 deletions(-) create mode 100644 src/main/java/com/datastax/cdm/job/CounterUnit.java diff --git a/src/main/java/com/datastax/cdm/job/CounterUnit.java b/src/main/java/com/datastax/cdm/job/CounterUnit.java new file mode 100644 index 00000000..d8a04bee --- /dev/null +++ b/src/main/java/com/datastax/cdm/job/CounterUnit.java @@ -0,0 +1,50 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.cdm.job;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class CounterUnit implements Serializable {
+
+    private static final long serialVersionUID = 2194336948011681878L;
+    private final AtomicLong globalCounter = new AtomicLong(0);
+    private final transient ThreadLocal<Long> threadLocalCounter = ThreadLocal.withInitial(() -> 0L);
+
+    public void incrementThreadCounter(long incrementBy) {
+        threadLocalCounter.set(threadLocalCounter.get() + incrementBy);
+    }
+
+    public long getThreadCounter() {
+        return threadLocalCounter.get();
+    }
+
+    public void resetThreadCounter() {
+        threadLocalCounter.set(0L);
+    }
+
+    public void setGlobalCounter(long value) {
+        globalCounter.set(value);
+    }
+
+    public void addThreadToGlobalCounter() {
+        globalCounter.addAndGet(threadLocalCounter.get());
+    }
+
+    public long getGlobalCounter() {
+        return globalCounter.get();
+    }
+}
diff --git a/src/main/java/com/datastax/cdm/job/JobCounter.java b/src/main/java/com/datastax/cdm/job/JobCounter.java
index 71811eb6..db67d719 100644
--- a/src/main/java/com/datastax/cdm/job/JobCounter.java
+++ b/src/main/java/com/datastax/cdm/job/JobCounter.java
@@ -15,15 +15,17 @@
  */
 package com.datastax.cdm.job;
 
+import java.io.Serializable;
 import java.util.HashMap;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.datastax.cdm.feature.TrackRun;
 
-public class JobCounter {
+public class JobCounter implements Serializable {
+
+    private static final long serialVersionUID = 7016816604237020549L;
 
     // Enumeration for counter types
     public enum CounterType {
@@ -33,36 +35,6 @@ public enum CounterType {
     // Logger instance
     private final Logger logger = LoggerFactory.getLogger(this.getClass().getName());
 
-    // Internal class to handle atomic counting operations
-    private static class CounterUnit {
-        private final AtomicLong globalCounter = new AtomicLong(0);
-        private final ThreadLocal<Long> threadLocalCounter = ThreadLocal.withInitial(() -> 0L);
-
-        public void incrementThreadCounter(long incrementBy) {
-            threadLocalCounter.set(threadLocalCounter.get() + incrementBy);
-        }
-
-        public long getThreadCounter() {
-            return threadLocalCounter.get();
-        }
-
-        public void resetThreadCounter() {
-            threadLocalCounter.set(0L);
-        }
-
-        public void setGlobalCounter(long value) {
-            globalCounter.set(value);
-        }
-
-        public void addThreadToGlobalCounter() {
-            globalCounter.addAndGet(threadLocalCounter.get());
-        }
-
-        public long getGlobalCounter() {
-            return globalCounter.get();
-        }
-    }
-
     // Declare individual counters for different operations
     private final HashMap<CounterType, CounterUnit> counterMap = new HashMap<>();

From 7d1d24167b4ff513c330b0155f2d4c5e42022159 Mon Sep 17 00:00:00 2001
From: Pravin Bhat
Date: Fri, 1 Nov 2024 13:54:14 -0400
Subject: [PATCH 3/3] Made JobType (Migrate, Validate & Guardrail) independent
 of track-run feature and renamed slices/partitions to PartitionRanges. Also
 provided actual jobs access to PartitionRange class.
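For reference, a minimal usage sketch (not part of this patch) of the renamed PartitionRange class
and the new IJobSessionFactory.JobType enum described above; the split count and token bounds are
illustrative values, not CDM defaults:

    import java.math.BigInteger;
    import java.util.List;

    import com.datastax.cdm.job.IJobSessionFactory.JobType;
    import com.datastax.cdm.job.PartitionRange;
    import com.datastax.cdm.job.SplitPartitions;

    public class PartitionRangeSketch {
        public static void main(String[] args) {
            // Split an illustrative Murmur3-style token range into 4 PartitionRange slices at 100% coverage.
            List<PartitionRange> ranges = SplitPartitions.getRandomSubPartitions(4,
                    BigInteger.valueOf(Long.MIN_VALUE), BigInteger.valueOf(Long.MAX_VALUE), 100);

            for (PartitionRange range : ranges) {
                // PartitionRange.toString() reports "Processing partition for token range <min> to <max>"
                System.out.println(range + " -> [" + range.getMin() + ", " + range.getMax() + "]");
            }

            // JobType now lives on IJobSessionFactory instead of TrackRun.RUN_TYPE.
            JobType jobType = JobType.VALIDATE; // MIGRATE, VALIDATE or GUARDRAIL
            System.out.println("Job type for this run: " + jobType);
        }
    }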
--- .../TargetUpsertRunDetailsStatement.java | 22 ++++----- .../com/datastax/cdm/feature/TrackRun.java | 15 +++--- .../datastax/cdm/job/AbstractJobSession.java | 14 +++--- .../com/datastax/cdm/job/CopyJobSession.java | 5 +- .../cdm/job/CopyJobSessionFactory.java | 8 +++- .../com/datastax/cdm/job/DiffJobSession.java | 3 +- .../cdm/job/DiffJobSessionFactory.java | 8 +++- .../cdm/job/GuardrailCheckJobSession.java | 5 +- .../job/GuardrailCheckJobSessionFactory.java | 8 +++- .../datastax/cdm/job/IJobSessionFactory.java | 6 +++ .../{Partition.java => PartitionRange.java} | 4 +- .../com/datastax/cdm/job/SplitPartitions.java | 10 ++-- .../datastax/cdm/job/BasePartitionJob.scala | 4 +- .../scala/com/datastax/cdm/job/DiffData.scala | 5 +- .../com/datastax/cdm/job/GuardrailCheck.scala | 2 +- .../scala/com/datastax/cdm/job/Migrate.scala | 5 +- .../cql/codec/BigInteger_BIGINTCodecTest.java | 1 - .../TargetUpsertRunDetailsStatementTest.java | 10 ++-- .../com/datastax/cdm/data/RecordTest.java | 47 +++++++++++++++++++ .../datastax/cdm/feature/TrackRunTest.java | 11 +++-- .../datastax/cdm/job/SplitPartitionsTest.java | 8 ++-- 21 files changed, 134 insertions(+), 67 deletions(-) rename src/main/java/com/datastax/cdm/job/{Partition.java => PartitionRange.java} (90%) create mode 100644 src/test/java/com/datastax/cdm/data/RecordTest.java diff --git a/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java b/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java index 65ce295f..dcbf9ebc 100644 --- a/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java +++ b/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java @@ -25,8 +25,8 @@ import org.slf4j.LoggerFactory; import com.datastax.cdm.feature.TrackRun; -import com.datastax.cdm.feature.TrackRun.RUN_TYPE; -import com.datastax.cdm.job.Partition; +import com.datastax.cdm.job.IJobSessionFactory.JobType; +import com.datastax.cdm.job.PartitionRange; import com.datastax.cdm.job.RunNotStartedException; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.BoundStatement; @@ -87,7 +87,7 @@ public TargetUpsertRunDetailsStatement(CqlSession session, String keyspaceTable) + " WHERE table_name = ? AND run_id = ? AND status = ? 
ALLOW FILTERING"); } - public Collection getPendingPartitions(long prevRunId) throws RunNotStartedException { + public Collection getPendingPartitions(long prevRunId) throws RunNotStartedException { if (prevRunId == 0) { return Collections.emptyList(); } @@ -104,7 +104,7 @@ public Collection getPendingPartitions(long prevRunId) throws RunNotS } } - final Collection pendingParts = new ArrayList(); + final Collection pendingParts = new ArrayList(); pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.NOT_STARTED.toString())); pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.STARTED.toString())); pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.FAIL.toString())); @@ -113,35 +113,35 @@ public Collection getPendingPartitions(long prevRunId) throws RunNotS return pendingParts; } - protected Collection getPartitionsByStatus(long prevRunId, String status) { + protected Collection getPartitionsByStatus(long prevRunId, String status) { ResultSet rs = session.execute(boundSelectStatement.setString("table_name", tableName) .setLong("run_id", prevRunId).setString("status", status)); - final Collection pendingParts = new ArrayList(); + final Collection pendingParts = new ArrayList(); rs.forEach(row -> { - Partition part = new Partition(BigInteger.valueOf(row.getLong("token_min")), + PartitionRange part = new PartitionRange(BigInteger.valueOf(row.getLong("token_min")), BigInteger.valueOf(row.getLong("token_max"))); pendingParts.add(part); }); return pendingParts; } - public void initCdmRun(long runId, long prevRunId, Collection parts, RUN_TYPE runType) { + public void initCdmRun(long runId, long prevRunId, Collection parts, JobType jobType) { ResultSet rsInfo = session .execute(boundSelectInfoStatement.setString("table_name", tableName).setLong("run_id", runId)); if (null != rsInfo.one()) { throw new RuntimeException("Run id " + runId + " already exists for table " + tableName); } session.execute(boundInitInfoStatement.setString("table_name", tableName).setLong("run_id", runId) - .setString("run_type", runType.toString()).setLong("prev_run_id", prevRunId) + .setString("run_type", jobType.toString()).setLong("prev_run_id", prevRunId) .setString("status", TrackRun.RUN_STATUS.NOT_STARTED.toString())); parts.forEach(part -> initCdmRun(runId, part)); session.execute(boundInitInfoStatement.setString("table_name", tableName).setLong("run_id", runId) - .setString("run_type", runType.toString()).setLong("prev_run_id", prevRunId) + .setString("run_type", jobType.toString()).setLong("prev_run_id", prevRunId) .setString("status", TrackRun.RUN_STATUS.STARTED.toString())); } - private void initCdmRun(long runId, Partition partition) { + private void initCdmRun(long runId, PartitionRange partition) { session.execute(boundInitStatement.setString("table_name", tableName).setLong("run_id", runId) .setLong("token_min", partition.getMin().longValue()) .setLong("token_max", partition.getMax().longValue()) diff --git a/src/main/java/com/datastax/cdm/feature/TrackRun.java b/src/main/java/com/datastax/cdm/feature/TrackRun.java index b1e7bab2..a95557b3 100644 --- a/src/main/java/com/datastax/cdm/feature/TrackRun.java +++ b/src/main/java/com/datastax/cdm/feature/TrackRun.java @@ -22,15 +22,12 @@ import org.slf4j.LoggerFactory; import com.datastax.cdm.cql.statement.TargetUpsertRunDetailsStatement; -import com.datastax.cdm.job.Partition; +import com.datastax.cdm.job.IJobSessionFactory.JobType; +import com.datastax.cdm.job.PartitionRange; import 
com.datastax.cdm.job.RunNotStartedException; import com.datastax.oss.driver.api.core.CqlSession; public class TrackRun { - public enum RUN_TYPE { - MIGRATE, DIFF_DATA - } - public enum RUN_STATUS { NOT_STARTED, STARTED, PASS, FAIL, DIFF, DIFF_CORRECTED, ENDED } @@ -42,15 +39,15 @@ public TrackRun(CqlSession session, String keyspaceTable) { this.runStatement = new TargetUpsertRunDetailsStatement(session, keyspaceTable); } - public Collection getPendingPartitions(long prevRunId) throws RunNotStartedException { - Collection pendingParts = runStatement.getPendingPartitions(prevRunId); + public Collection getPendingPartitions(long prevRunId) throws RunNotStartedException { + Collection pendingParts = runStatement.getPendingPartitions(prevRunId); logger.info("###################### {} partitions pending from previous run id {} ######################", pendingParts.size(), prevRunId); return pendingParts; } - public void initCdmRun(long runId, long prevRunId, Collection parts, RUN_TYPE runType) { - runStatement.initCdmRun(runId, prevRunId, parts, runType); + public void initCdmRun(long runId, long prevRunId, Collection parts, JobType jobType) { + runStatement.initCdmRun(runId, prevRunId, parts, jobType); logger.info("###################### Run Id for this job is: {} ######################", runId); } diff --git a/src/main/java/com/datastax/cdm/job/AbstractJobSession.java b/src/main/java/com/datastax/cdm/job/AbstractJobSession.java index 4671d25b..575eac7a 100644 --- a/src/main/java/com/datastax/cdm/job/AbstractJobSession.java +++ b/src/main/java/com/datastax/cdm/job/AbstractJobSession.java @@ -15,7 +15,6 @@ */ package com.datastax.cdm.job; -import java.math.BigInteger; import java.util.Collection; import org.slf4j.Logger; @@ -28,6 +27,7 @@ import com.datastax.cdm.feature.Featureset; import com.datastax.cdm.feature.Guardrail; import com.datastax.cdm.feature.TrackRun; +import com.datastax.cdm.job.IJobSessionFactory.JobType; import com.datastax.cdm.properties.KnownProperties; import com.datastax.cdm.properties.PropertyHelper; import com.datastax.cdm.schema.CqlTable; @@ -110,20 +110,20 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, } } - public void processSlice(Partition slice, TrackRun trackRunFeature, long runId) { + public void processPartitionRange(PartitionRange range, TrackRun trackRunFeature, long runId) { this.trackRunFeature = trackRunFeature; this.runId = runId; - this.processSlice(slice.getMin(), slice.getMax()); + this.processPartitionRange(range); } - protected abstract void processSlice(BigInteger min, BigInteger max); + protected abstract void processPartitionRange(PartitionRange range); - public synchronized void initCdmRun(long runId, long prevRunId, Collection parts, - TrackRun trackRunFeature, TrackRun.RUN_TYPE runType) { + public synchronized void initCdmRun(long runId, long prevRunId, Collection parts, + TrackRun trackRunFeature, JobType jobType) { this.runId = runId; this.trackRunFeature = trackRunFeature; if (null != trackRunFeature) - trackRunFeature.initCdmRun(runId, prevRunId, parts, runType); + trackRunFeature.initCdmRun(runId, prevRunId, parts, jobType); DataUtility.deleteGeneratedSCB(runId); } diff --git a/src/main/java/com/datastax/cdm/job/CopyJobSession.java b/src/main/java/com/datastax/cdm/job/CopyJobSession.java index 0604203b..0c265085 100644 --- a/src/main/java/com/datastax/cdm/job/CopyJobSession.java +++ b/src/main/java/com/datastax/cdm/job/CopyJobSession.java @@ -39,7 +39,7 @@ import 
com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Row; -public class CopyJobSession extends AbstractJobSession { +public class CopyJobSession extends AbstractJobSession { private final PKFactory pkFactory; private final boolean isCounterTable; @@ -64,7 +64,8 @@ protected CopyJobSession(CqlSession originSession, CqlSession targetSession, Pro logger.info("CQL -- target upsert: {}", this.targetSession.getTargetUpsertStatement().getCQL()); } - protected void processSlice(BigInteger min, BigInteger max) { + protected void processPartitionRange(PartitionRange range) { + BigInteger min = range.getMin(), max = range.getMax(); ThreadContext.put(THREAD_CONTEXT_LABEL, getThreadLabel(min, max)); logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max); if (null != trackRunFeature) diff --git a/src/main/java/com/datastax/cdm/job/CopyJobSessionFactory.java b/src/main/java/com/datastax/cdm/job/CopyJobSessionFactory.java index 70a1e3ff..1238d751 100644 --- a/src/main/java/com/datastax/cdm/job/CopyJobSessionFactory.java +++ b/src/main/java/com/datastax/cdm/job/CopyJobSessionFactory.java @@ -20,11 +20,11 @@ import com.datastax.cdm.properties.PropertyHelper; import com.datastax.oss.driver.api.core.CqlSession; -public class CopyJobSessionFactory implements IJobSessionFactory, Serializable { +public class CopyJobSessionFactory implements IJobSessionFactory, Serializable { private static final long serialVersionUID = 5255029377029801421L; private static CopyJobSession jobSession = null; - public AbstractJobSession getInstance(CqlSession originSession, CqlSession targetSession, + public AbstractJobSession getInstance(CqlSession originSession, CqlSession targetSession, PropertyHelper propHelper) { if (jobSession == null) { synchronized (CopyJobSession.class) { @@ -35,4 +35,8 @@ public AbstractJobSession getInstance(CqlSession originSession, CqlSe } return jobSession; } + + public JobType getJobType() { + return JobType.MIGRATE; + } } diff --git a/src/main/java/com/datastax/cdm/job/DiffJobSession.java b/src/main/java/com/datastax/cdm/job/DiffJobSession.java index 440d5517..38ad7439 100644 --- a/src/main/java/com/datastax/cdm/job/DiffJobSession.java +++ b/src/main/java/com/datastax/cdm/job/DiffJobSession.java @@ -117,7 +117,8 @@ public DiffJobSession(CqlSession originSession, CqlSession targetSession, Proper logger.info("CQL -- target upsert: {}", this.targetSession.getTargetUpsertStatement().getCQL()); } - protected void processSlice(BigInteger min, BigInteger max) { + protected void processPartitionRange(PartitionRange range) { + BigInteger min = range.getMin(), max = range.getMax(); ThreadContext.put(THREAD_CONTEXT_LABEL, getThreadLabel(min, max)); logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max); if (null != trackRunFeature) diff --git a/src/main/java/com/datastax/cdm/job/DiffJobSessionFactory.java b/src/main/java/com/datastax/cdm/job/DiffJobSessionFactory.java index 04f12a38..b018ba91 100644 --- a/src/main/java/com/datastax/cdm/job/DiffJobSessionFactory.java +++ b/src/main/java/com/datastax/cdm/job/DiffJobSessionFactory.java @@ -20,11 +20,11 @@ import com.datastax.cdm.properties.PropertyHelper; import com.datastax.oss.driver.api.core.CqlSession; -public class DiffJobSessionFactory implements IJobSessionFactory, Serializable { +public class DiffJobSessionFactory implements IJobSessionFactory, Serializable { private static final long serialVersionUID = -3543616512495020278L; private 
static DiffJobSession jobSession = null; - public AbstractJobSession getInstance(CqlSession originSession, CqlSession targetSession, + public AbstractJobSession getInstance(CqlSession originSession, CqlSession targetSession, PropertyHelper propHelper) { if (jobSession == null) { synchronized (DiffJobSession.class) { @@ -35,4 +35,8 @@ public AbstractJobSession getInstance(CqlSession originSession, CqlSe } return jobSession; } + + public JobType getJobType() { + return JobType.VALIDATE; + } } diff --git a/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSession.java b/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSession.java index 1225b4e5..8fb26bd2 100644 --- a/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSession.java +++ b/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSession.java @@ -27,7 +27,7 @@ import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Row; -public class GuardrailCheckJobSession extends AbstractJobSession { +public class GuardrailCheckJobSession extends AbstractJobSession { public Logger logger = LoggerFactory.getLogger(this.getClass().getName()); @@ -43,7 +43,8 @@ protected GuardrailCheckJobSession(CqlSession originSession, CqlSession targetSe logger.info("CQL -- origin select: {}", this.originSession.getOriginSelectByPartitionRangeStatement().getCQL()); } - protected void processSlice(BigInteger min, BigInteger max) { + protected void processPartitionRange(PartitionRange range) { + BigInteger min = range.getMin(), max = range.getMax(); ThreadContext.put(THREAD_CONTEXT_LABEL, getThreadLabel(min, max)); try { logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max); diff --git a/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSessionFactory.java b/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSessionFactory.java index 55c1bc9b..a294d538 100644 --- a/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSessionFactory.java +++ b/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSessionFactory.java @@ -20,11 +20,11 @@ import com.datastax.cdm.properties.PropertyHelper; import com.datastax.oss.driver.api.core.CqlSession; -public class GuardrailCheckJobSessionFactory implements IJobSessionFactory, Serializable { +public class GuardrailCheckJobSessionFactory implements IJobSessionFactory, Serializable { private static final long serialVersionUID = -4673384128807660843L; private static GuardrailCheckJobSession jobSession = null; - public AbstractJobSession getInstance(CqlSession originSession, CqlSession targetSession, + public AbstractJobSession getInstance(CqlSession originSession, CqlSession targetSession, PropertyHelper propHelper) { if (jobSession == null) { synchronized (GuardrailCheckJobSession.class) { @@ -35,4 +35,8 @@ public AbstractJobSession getInstance(CqlSession originSession, CqlSe } return jobSession; } + + public JobType getJobType() { + return JobType.MIGRATE; + } } diff --git a/src/main/java/com/datastax/cdm/job/IJobSessionFactory.java b/src/main/java/com/datastax/cdm/job/IJobSessionFactory.java index 2e643502..678bca96 100644 --- a/src/main/java/com/datastax/cdm/job/IJobSessionFactory.java +++ b/src/main/java/com/datastax/cdm/job/IJobSessionFactory.java @@ -19,5 +19,11 @@ import com.datastax.oss.driver.api.core.CqlSession; public interface IJobSessionFactory { + public enum JobType { + MIGRATE, VALIDATE, GUARDRAIL + } + AbstractJobSession getInstance(CqlSession originSession, CqlSession targetSession, PropertyHelper propHelper); + + public 
JobType getJobType(); } diff --git a/src/main/java/com/datastax/cdm/job/Partition.java b/src/main/java/com/datastax/cdm/job/PartitionRange.java similarity index 90% rename from src/main/java/com/datastax/cdm/job/Partition.java rename to src/main/java/com/datastax/cdm/job/PartitionRange.java index d36e9673..53134444 100644 --- a/src/main/java/com/datastax/cdm/job/Partition.java +++ b/src/main/java/com/datastax/cdm/job/PartitionRange.java @@ -18,13 +18,13 @@ import java.io.Serializable; import java.math.BigInteger; -public class Partition implements Serializable { +public class PartitionRange implements Serializable { private static final long serialVersionUID = 1L; private final BigInteger min; private final BigInteger max; - public Partition(BigInteger min, BigInteger max) { + public PartitionRange(BigInteger min, BigInteger max) { this.min = min; this.max = max; } diff --git a/src/main/java/com/datastax/cdm/job/SplitPartitions.java b/src/main/java/com/datastax/cdm/job/SplitPartitions.java index ae884902..3212a75c 100644 --- a/src/main/java/com/datastax/cdm/job/SplitPartitions.java +++ b/src/main/java/com/datastax/cdm/job/SplitPartitions.java @@ -27,10 +27,10 @@ public class SplitPartitions { public static Logger logger = LoggerFactory.getLogger(SplitPartitions.class.getName()); - public static List getRandomSubPartitions(int numSplits, BigInteger min, BigInteger max, + public static List getRandomSubPartitions(int numSplits, BigInteger min, BigInteger max, int coveragePercent) { logger.info("ThreadID: {} Splitting min: {} max: {}", Thread.currentThread().getId(), min, max); - List partitions = getSubPartitions(numSplits, min, max, coveragePercent); + List partitions = getSubPartitions(numSplits, min, max, coveragePercent); Collections.shuffle(partitions); Collections.shuffle(partitions); Collections.shuffle(partitions); @@ -38,14 +38,14 @@ public static List getRandomSubPartitions(int numSplits, BigInteger m return partitions; } - private static List getSubPartitions(int numSplits, BigInteger min, BigInteger max, + private static List getSubPartitions(int numSplits, BigInteger min, BigInteger max, int coveragePercent) { if (coveragePercent < 1 || coveragePercent > 100) { coveragePercent = 100; } BigInteger curMax = new BigInteger(min.toString()); BigInteger partitionSize = max.subtract(min).divide(BigInteger.valueOf(numSplits)); - List partitions = new ArrayList(); + List partitions = new ArrayList(); if (partitionSize.compareTo(new BigInteger("0")) == 0) { partitionSize = new BigInteger("100000"); } @@ -65,7 +65,7 @@ private static List getSubPartitions(int numSplits, BigInteger min, B BigInteger range = curMax.subtract(curMin); BigInteger curRange = range.multiply(BigInteger.valueOf(coveragePercent)).divide(BigInteger.valueOf(100)); - partitions.add(new Partition(curMin, curMin.add(curRange))); + partitions.add(new PartitionRange(curMin, curMin.add(curRange))); if (exausted) { break; } diff --git a/src/main/scala/com/datastax/cdm/job/BasePartitionJob.scala b/src/main/scala/com/datastax/cdm/job/BasePartitionJob.scala index 83c99613..23689dcb 100644 --- a/src/main/scala/com/datastax/cdm/job/BasePartitionJob.scala +++ b/src/main/scala/com/datastax/cdm/job/BasePartitionJob.scala @@ -20,11 +20,11 @@ import scala.reflect.io.File import com.datastax.cdm.feature.TrackRun import com.datastax.cdm.properties.KnownProperties -abstract class BasePartitionJob extends BaseJob[Partition] { +abstract class BasePartitionJob extends BaseJob[PartitionRange] { var trackRunFeature: TrackRun = _ var 
keyspaceTableValue: String = _ - override def getParts(pieces: Int): util.Collection[Partition] = { + override def getParts(pieces: Int): util.Collection[PartitionRange] = { var keyspaceTable: Option[String] = Option(propertyHelper.getString(KnownProperties.TARGET_KEYSPACE_TABLE)) .filter(_.nonEmpty) .orElse(Option(propertyHelper.getString(KnownProperties.ORIGIN_KEYSPACE_TABLE))) diff --git a/src/main/scala/com/datastax/cdm/job/DiffData.scala b/src/main/scala/com/datastax/cdm/job/DiffData.scala index 9c4f84eb..f65230a8 100644 --- a/src/main/scala/com/datastax/cdm/job/DiffData.scala +++ b/src/main/scala/com/datastax/cdm/job/DiffData.scala @@ -18,6 +18,7 @@ package com.datastax.cdm.job import com.datastax.cdm.feature.TrackRun import com.datastax.cdm.data.PKFactory.Side import com.datastax.cdm.properties.{KnownProperties, PropertyHelper} +import com.datastax.cdm.job.IJobSessionFactory.JobType object DiffData extends BasePartitionJob { setup("Data Validation Job", new DiffJobSessionFactory()) @@ -28,7 +29,7 @@ object DiffData extends BasePartitionJob { if (!parts.isEmpty()) { originConnection.withSessionDo(originSession => targetConnection.withSessionDo(targetSession => - jobFactory.getInstance(originSession, targetSession, propertyHelper).initCdmRun(runId, prevRunId, parts, trackRunFeature, TrackRun.RUN_TYPE.DIFF_DATA))); + jobFactory.getInstance(originSession, targetSession, propertyHelper).initCdmRun(runId, prevRunId, parts, trackRunFeature, JobType.VALIDATE))); val bcConnectionFetcher = sContext.broadcast(connectionFetcher) val bcPropHelper = sContext.broadcast(propertyHelper) val bcJobFactory = sContext.broadcast(jobFactory) @@ -44,7 +45,7 @@ object DiffData extends BasePartitionJob { originConnection.withSessionDo(originSession => targetConnection.withSessionDo(targetSession => bcJobFactory.value.getInstance(originSession, targetSession, bcPropHelper.value) - .processSlice(slice, trackRunFeature, bcRunId.value))) + .processPartitionRange(slice, trackRunFeature, bcRunId.value))) }) } } diff --git a/src/main/scala/com/datastax/cdm/job/GuardrailCheck.scala b/src/main/scala/com/datastax/cdm/job/GuardrailCheck.scala index 20eccf2a..a90c765c 100644 --- a/src/main/scala/com/datastax/cdm/job/GuardrailCheck.scala +++ b/src/main/scala/com/datastax/cdm/job/GuardrailCheck.scala @@ -37,7 +37,7 @@ object GuardrailCheck extends BasePartitionJob { } originConnection.withSessionDo(originSession => bcJobFactory.value.getInstance(originSession, null, bcPropHelper.value) - .processSlice(slice, null, 0)) + .processPartitionRange(slice, null, 0)) }) } } diff --git a/src/main/scala/com/datastax/cdm/job/Migrate.scala b/src/main/scala/com/datastax/cdm/job/Migrate.scala index b0002548..ad55bbf4 100644 --- a/src/main/scala/com/datastax/cdm/job/Migrate.scala +++ b/src/main/scala/com/datastax/cdm/job/Migrate.scala @@ -18,6 +18,7 @@ package com.datastax.cdm.job import com.datastax.cdm.feature.TrackRun import com.datastax.cdm.data.PKFactory.Side import com.datastax.cdm.properties.{KnownProperties, PropertyHelper} +import com.datastax.cdm.job.IJobSessionFactory.JobType object Migrate extends BasePartitionJob { setup("Migrate Job", new CopyJobSessionFactory()) @@ -28,7 +29,7 @@ object Migrate extends BasePartitionJob { if (!parts.isEmpty()) { originConnection.withSessionDo(originSession => targetConnection.withSessionDo(targetSession => - jobFactory.getInstance(originSession, targetSession, propertyHelper).initCdmRun(runId, prevRunId, parts, trackRunFeature, TrackRun.RUN_TYPE.MIGRATE))); + 
jobFactory.getInstance(originSession, targetSession, propertyHelper).initCdmRun(runId, prevRunId, parts, trackRunFeature, JobType.MIGRATE))); val bcConnectionFetcher = sContext.broadcast(connectionFetcher) val bcPropHelper = sContext.broadcast(propertyHelper) val bcJobFactory = sContext.broadcast(jobFactory) @@ -44,7 +45,7 @@ object Migrate extends BasePartitionJob { originConnection.withSessionDo(originSession => targetConnection.withSessionDo(targetSession => bcJobFactory.value.getInstance(originSession, targetSession, bcPropHelper.value) - .processSlice(slice, trackRunFeature, bcRunId.value))) + .processPartitionRange(slice, trackRunFeature, bcRunId.value))) }) } } diff --git a/src/test/java/com/datastax/cdm/cql/codec/BigInteger_BIGINTCodecTest.java b/src/test/java/com/datastax/cdm/cql/codec/BigInteger_BIGINTCodecTest.java index e6cf4fcf..a542304f 100644 --- a/src/test/java/com/datastax/cdm/cql/codec/BigInteger_BIGINTCodecTest.java +++ b/src/test/java/com/datastax/cdm/cql/codec/BigInteger_BIGINTCodecTest.java @@ -15,7 +15,6 @@ */ package com.datastax.cdm.cql.codec; -import java.math.BigInteger; import java.nio.ByteBuffer; import org.junit.jupiter.api.AfterEach; diff --git a/src/test/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatementTest.java b/src/test/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatementTest.java index 0d358520..ffc25a47 100644 --- a/src/test/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatementTest.java +++ b/src/test/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatementTest.java @@ -23,16 +23,16 @@ import static org.mockito.Mockito.when; import java.time.Duration; -import java.util.*; -import java.util.function.Consumer; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mock; -import org.mockito.Mockito; import com.datastax.cdm.cql.CommonMocks; -import com.datastax.cdm.job.Partition; +import com.datastax.cdm.job.PartitionRange; import com.datastax.cdm.job.RunNotStartedException; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.BoundStatement; @@ -98,7 +98,7 @@ public void getPartitionsByStatus() { when(mockIterator.next()).thenReturn(row3); targetUpsertRunDetailsStatement = new TargetUpsertRunDetailsStatement(cqlSession, "ks.table1"); - Collection parts = targetUpsertRunDetailsStatement.getPartitionsByStatus(123l, "RUNNING"); + Collection parts = targetUpsertRunDetailsStatement.getPartitionsByStatus(123l, "RUNNING"); // This test is incorrect, but needs to be troubleshot & fixed. The actual code works, but the test does not assertEquals(0, parts.size()); diff --git a/src/test/java/com/datastax/cdm/data/RecordTest.java b/src/test/java/com/datastax/cdm/data/RecordTest.java new file mode 100644 index 00000000..8d0e10d3 --- /dev/null +++ b/src/test/java/com/datastax/cdm/data/RecordTest.java @@ -0,0 +1,47 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.cdm.data; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.junit.jupiter.api.Test; + +public class RecordTest { + + @Test + public void enumValues() { + assertEquals(4, Record.Diff.values().length); + } + + @Test + public void recordException() { + Exception e = assertThrows(IllegalArgumentException.class, () -> { + new Record(null, null); + }); + assertTrue(e.getMessage().contains("pk and at least one row must be provided")); + } + + @Test + public void recordWithFutureRow() { + Exception e = assertThrows(IllegalArgumentException.class, () -> { + new Record(null, null, null); + }); + assertTrue(e.getMessage().contains("pk and at least one row must be provided")); + } + +} diff --git a/src/test/java/com/datastax/cdm/feature/TrackRunTest.java b/src/test/java/com/datastax/cdm/feature/TrackRunTest.java index 4944d405..97521251 100644 --- a/src/test/java/com/datastax/cdm/feature/TrackRunTest.java +++ b/src/test/java/com/datastax/cdm/feature/TrackRunTest.java @@ -27,7 +27,8 @@ import org.mockito.Mock; import com.datastax.cdm.cql.CommonMocks; -import com.datastax.cdm.job.Partition; +import com.datastax.cdm.job.IJobSessionFactory.JobType; +import com.datastax.cdm.job.PartitionRange; import com.datastax.cdm.job.RunNotStartedException; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.BoundStatement; @@ -50,17 +51,17 @@ public void setup() { @Test void countTypesAndStatus() { - assertEquals("MIGRATE", TrackRun.RUN_TYPE.MIGRATE.name()); - assertEquals("DIFF_DATA", TrackRun.RUN_TYPE.DIFF_DATA.name()); + assertEquals("MIGRATE", JobType.MIGRATE.name()); + assertEquals("VALIDATE", JobType.VALIDATE.name()); - assertEquals(2, TrackRun.RUN_TYPE.values().length); + assertEquals(3, JobType.values().length); assertEquals(7, TrackRun.RUN_STATUS.values().length); } @Test void init() throws RunNotStartedException { TrackRun trackRun = new TrackRun(cqlSession, "keyspace.table"); - Collection parts = trackRun.getPendingPartitions(0); + Collection parts = trackRun.getPendingPartitions(0); assertEquals(0, parts.size()); } diff --git a/src/test/java/com/datastax/cdm/job/SplitPartitionsTest.java b/src/test/java/com/datastax/cdm/job/SplitPartitionsTest.java index 4351850a..26d6f93d 100644 --- a/src/test/java/com/datastax/cdm/job/SplitPartitionsTest.java +++ b/src/test/java/com/datastax/cdm/job/SplitPartitionsTest.java @@ -32,8 +32,8 @@ void tearDown() { @Test void getRandomSubPartitionsTest() { - List partitions = SplitPartitions.getRandomSubPartitions(10, BigInteger.ONE, BigInteger.valueOf(100), - 100); + List partitions = SplitPartitions.getRandomSubPartitions(10, BigInteger.ONE, + BigInteger.valueOf(100), 100); assertEquals(10, partitions.size()); partitions.forEach(p -> { assertEquals(9, p.getMax().longValue() - p.getMin().longValue()); @@ -42,8 +42,8 @@ void getRandomSubPartitionsTest() { @Test void getRandomSubPartitionsTestOver100() { - List partitions = SplitPartitions.getRandomSubPartitions(8, BigInteger.ONE, BigInteger.valueOf(44), - 200); + List partitions = SplitPartitions.getRandomSubPartitions(8, BigInteger.ONE, + BigInteger.valueOf(44), 200); assertEquals(8, partitions.size()); }
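
For reference, a minimal sketch (not part of these patches) of how the CounterUnit class extracted in
PATCH 2/3 accumulates per-thread tallies and folds them into its shared global counter; the pool size
and iteration counts are illustrative:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    import com.datastax.cdm.job.CounterUnit;

    public class CounterUnitSketch {
        public static void main(String[] args) throws InterruptedException {
            CounterUnit written = new CounterUnit();

            // Each worker accumulates into its own thread-local tally, then folds the
            // tally into the shared global counter once its slice of work is done.
            ExecutorService pool = Executors.newFixedThreadPool(2);
            for (int worker = 0; worker < 2; worker++) {
                pool.submit(() -> {
                    for (int i = 0; i < 1000; i++) {
                        written.incrementThreadCounter(1);
                    }
                    written.addThreadToGlobalCounter();
                    written.resetThreadCounter();
                });
            }
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.MINUTES);

            // Two workers x 1000 increments each
            System.out.println("Global count: " + written.getGlobalCounter()); // 2000
        }
    }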