From 0fba8b2d79aa3f263b0e95efb6715e6949e63b9c Mon Sep 17 00:00:00 2001 From: Pravin Bhat Date: Thu, 31 Oct 2024 12:15:04 -0400 Subject: [PATCH 1/2] Made Partition into its own class & refactored stuff to make that work --- .../TargetUpsertRunDetailsStatement.java | 13 +++--- .../com/datastax/cdm/feature/TrackRun.java | 8 ++-- .../datastax/cdm/job/AbstractJobSession.java | 4 +- .../com/datastax/cdm/job/CopyJobSession.java | 2 +- .../cdm/job/CopyJobSessionFactory.java | 4 +- .../cdm/job/DiffJobSessionFactory.java | 4 +- .../cdm/job/GuardrailCheckJobSession.java | 2 +- .../job/GuardrailCheckJobSessionFactory.java | 4 +- .../java/com/datastax/cdm/job/Partition.java | 43 +++++++++++++++++++ .../com/datastax/cdm/job/SplitPartitions.java | 24 ----------- .../datastax/cdm/job/BasePartitionJob.scala | 4 +- .../TargetUpsertRunDetailsStatementTest.java | 5 +-- .../datastax/cdm/feature/TrackRunTest.java | 4 +- .../datastax/cdm/job/SplitPartitionsTest.java | 8 ++-- 14 files changed, 73 insertions(+), 56 deletions(-) create mode 100644 src/main/java/com/datastax/cdm/job/Partition.java diff --git a/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java b/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java index 54fa44ab..65ce295f 100644 --- a/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java +++ b/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java @@ -26,9 +26,8 @@ import com.datastax.cdm.feature.TrackRun; import com.datastax.cdm.feature.TrackRun.RUN_TYPE; +import com.datastax.cdm.job.Partition; import com.datastax.cdm.job.RunNotStartedException; -import com.datastax.cdm.job.SplitPartitions; -import com.datastax.cdm.job.SplitPartitions.Partition; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.ResultSet; @@ -88,7 +87,7 @@ public TargetUpsertRunDetailsStatement(CqlSession session, String keyspaceTable) + " WHERE table_name = ? AND run_id = ? AND status = ? ALLOW FILTERING"); } - public Collection getPendingPartitions(long prevRunId) throws RunNotStartedException { + public Collection getPendingPartitions(long prevRunId) throws RunNotStartedException { if (prevRunId == 0) { return Collections.emptyList(); } @@ -105,7 +104,7 @@ public Collection getPendingPartitions(long prevRunId } } - final Collection pendingParts = new ArrayList(); + final Collection pendingParts = new ArrayList(); pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.NOT_STARTED.toString())); pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.STARTED.toString())); pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.FAIL.toString())); @@ -114,11 +113,11 @@ public Collection getPendingPartitions(long prevRunId return pendingParts; } - protected Collection getPartitionsByStatus(long prevRunId, String status) { + protected Collection getPartitionsByStatus(long prevRunId, String status) { ResultSet rs = session.execute(boundSelectStatement.setString("table_name", tableName) .setLong("run_id", prevRunId).setString("status", status)); - final Collection pendingParts = new ArrayList(); + final Collection pendingParts = new ArrayList(); rs.forEach(row -> { Partition part = new Partition(BigInteger.valueOf(row.getLong("token_min")), BigInteger.valueOf(row.getLong("token_max"))); @@ -127,7 +126,7 @@ protected Collection getPartitionsByStatus(long prevR return pendingParts; } - public void initCdmRun(long runId, long prevRunId, Collection parts, RUN_TYPE runType) { + public void initCdmRun(long runId, long prevRunId, Collection parts, RUN_TYPE runType) { ResultSet rsInfo = session .execute(boundSelectInfoStatement.setString("table_name", tableName).setLong("run_id", runId)); if (null != rsInfo.one()) { diff --git a/src/main/java/com/datastax/cdm/feature/TrackRun.java b/src/main/java/com/datastax/cdm/feature/TrackRun.java index 92b1784f..b1e7bab2 100644 --- a/src/main/java/com/datastax/cdm/feature/TrackRun.java +++ b/src/main/java/com/datastax/cdm/feature/TrackRun.java @@ -22,8 +22,8 @@ import org.slf4j.LoggerFactory; import com.datastax.cdm.cql.statement.TargetUpsertRunDetailsStatement; +import com.datastax.cdm.job.Partition; import com.datastax.cdm.job.RunNotStartedException; -import com.datastax.cdm.job.SplitPartitions; import com.datastax.oss.driver.api.core.CqlSession; public class TrackRun { @@ -42,14 +42,14 @@ public TrackRun(CqlSession session, String keyspaceTable) { this.runStatement = new TargetUpsertRunDetailsStatement(session, keyspaceTable); } - public Collection getPendingPartitions(long prevRunId) throws RunNotStartedException { - Collection pendingParts = runStatement.getPendingPartitions(prevRunId); + public Collection getPendingPartitions(long prevRunId) throws RunNotStartedException { + Collection pendingParts = runStatement.getPendingPartitions(prevRunId); logger.info("###################### {} partitions pending from previous run id {} ######################", pendingParts.size(), prevRunId); return pendingParts; } - public void initCdmRun(long runId, long prevRunId, Collection parts, RUN_TYPE runType) { + public void initCdmRun(long runId, long prevRunId, Collection parts, RUN_TYPE runType) { runStatement.initCdmRun(runId, prevRunId, parts, runType); logger.info("###################### Run Id for this job is: {} ######################", runId); } diff --git a/src/main/java/com/datastax/cdm/job/AbstractJobSession.java b/src/main/java/com/datastax/cdm/job/AbstractJobSession.java index 69853ca6..4671d25b 100644 --- a/src/main/java/com/datastax/cdm/job/AbstractJobSession.java +++ b/src/main/java/com/datastax/cdm/job/AbstractJobSession.java @@ -110,7 +110,7 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, } } - public void processSlice(SplitPartitions.Partition slice, TrackRun trackRunFeature, long runId) { + public void processSlice(Partition slice, TrackRun trackRunFeature, long runId) { this.trackRunFeature = trackRunFeature; this.runId = runId; this.processSlice(slice.getMin(), slice.getMax()); @@ -118,7 +118,7 @@ public void processSlice(SplitPartitions.Partition slice, TrackRun trackRunFeatu protected abstract void processSlice(BigInteger min, BigInteger max); - public synchronized void initCdmRun(long runId, long prevRunId, Collection parts, + public synchronized void initCdmRun(long runId, long prevRunId, Collection parts, TrackRun trackRunFeature, TrackRun.RUN_TYPE runType) { this.runId = runId; this.trackRunFeature = trackRunFeature; diff --git a/src/main/java/com/datastax/cdm/job/CopyJobSession.java b/src/main/java/com/datastax/cdm/job/CopyJobSession.java index 514f14a8..0604203b 100644 --- a/src/main/java/com/datastax/cdm/job/CopyJobSession.java +++ b/src/main/java/com/datastax/cdm/job/CopyJobSession.java @@ -39,7 +39,7 @@ import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Row; -public class CopyJobSession extends AbstractJobSession { +public class CopyJobSession extends AbstractJobSession { private final PKFactory pkFactory; private final boolean isCounterTable; diff --git a/src/main/java/com/datastax/cdm/job/CopyJobSessionFactory.java b/src/main/java/com/datastax/cdm/job/CopyJobSessionFactory.java index 7cb29182..70a1e3ff 100644 --- a/src/main/java/com/datastax/cdm/job/CopyJobSessionFactory.java +++ b/src/main/java/com/datastax/cdm/job/CopyJobSessionFactory.java @@ -20,11 +20,11 @@ import com.datastax.cdm.properties.PropertyHelper; import com.datastax.oss.driver.api.core.CqlSession; -public class CopyJobSessionFactory implements IJobSessionFactory, Serializable { +public class CopyJobSessionFactory implements IJobSessionFactory, Serializable { private static final long serialVersionUID = 5255029377029801421L; private static CopyJobSession jobSession = null; - public AbstractJobSession getInstance(CqlSession originSession, CqlSession targetSession, + public AbstractJobSession getInstance(CqlSession originSession, CqlSession targetSession, PropertyHelper propHelper) { if (jobSession == null) { synchronized (CopyJobSession.class) { diff --git a/src/main/java/com/datastax/cdm/job/DiffJobSessionFactory.java b/src/main/java/com/datastax/cdm/job/DiffJobSessionFactory.java index 3b09b7c1..04f12a38 100644 --- a/src/main/java/com/datastax/cdm/job/DiffJobSessionFactory.java +++ b/src/main/java/com/datastax/cdm/job/DiffJobSessionFactory.java @@ -20,11 +20,11 @@ import com.datastax.cdm.properties.PropertyHelper; import com.datastax.oss.driver.api.core.CqlSession; -public class DiffJobSessionFactory implements IJobSessionFactory, Serializable { +public class DiffJobSessionFactory implements IJobSessionFactory, Serializable { private static final long serialVersionUID = -3543616512495020278L; private static DiffJobSession jobSession = null; - public AbstractJobSession getInstance(CqlSession originSession, CqlSession targetSession, + public AbstractJobSession getInstance(CqlSession originSession, CqlSession targetSession, PropertyHelper propHelper) { if (jobSession == null) { synchronized (DiffJobSession.class) { diff --git a/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSession.java b/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSession.java index bdcb74f3..1225b4e5 100644 --- a/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSession.java +++ b/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSession.java @@ -27,7 +27,7 @@ import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Row; -public class GuardrailCheckJobSession extends AbstractJobSession { +public class GuardrailCheckJobSession extends AbstractJobSession { public Logger logger = LoggerFactory.getLogger(this.getClass().getName()); diff --git a/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSessionFactory.java b/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSessionFactory.java index 6ebdd993..55c1bc9b 100644 --- a/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSessionFactory.java +++ b/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSessionFactory.java @@ -20,11 +20,11 @@ import com.datastax.cdm.properties.PropertyHelper; import com.datastax.oss.driver.api.core.CqlSession; -public class GuardrailCheckJobSessionFactory implements IJobSessionFactory, Serializable { +public class GuardrailCheckJobSessionFactory implements IJobSessionFactory, Serializable { private static final long serialVersionUID = -4673384128807660843L; private static GuardrailCheckJobSession jobSession = null; - public AbstractJobSession getInstance(CqlSession originSession, CqlSession targetSession, + public AbstractJobSession getInstance(CqlSession originSession, CqlSession targetSession, PropertyHelper propHelper) { if (jobSession == null) { synchronized (GuardrailCheckJobSession.class) { diff --git a/src/main/java/com/datastax/cdm/job/Partition.java b/src/main/java/com/datastax/cdm/job/Partition.java new file mode 100644 index 00000000..d36e9673 --- /dev/null +++ b/src/main/java/com/datastax/cdm/job/Partition.java @@ -0,0 +1,43 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.cdm.job; + +import java.io.Serializable; +import java.math.BigInteger; + +public class Partition implements Serializable { + private static final long serialVersionUID = 1L; + + private final BigInteger min; + private final BigInteger max; + + public Partition(BigInteger min, BigInteger max) { + this.min = min; + this.max = max; + } + + public BigInteger getMin() { + return min; + } + + public BigInteger getMax() { + return max; + } + + public String toString() { + return "Processing partition for token range " + min + " to " + max; + } +} \ No newline at end of file diff --git a/src/main/java/com/datastax/cdm/job/SplitPartitions.java b/src/main/java/com/datastax/cdm/job/SplitPartitions.java index eec89d62..ae884902 100644 --- a/src/main/java/com/datastax/cdm/job/SplitPartitions.java +++ b/src/main/java/com/datastax/cdm/job/SplitPartitions.java @@ -15,7 +15,6 @@ */ package com.datastax.cdm.job; -import java.io.Serializable; import java.math.BigInteger; import java.util.ArrayList; import java.util.Collections; @@ -76,27 +75,4 @@ private static List getSubPartitions(int numSplits, BigInteger min, B return partitions; } - public static class Partition implements Serializable { - private static final long serialVersionUID = 1L; - - private final BigInteger min; - private final BigInteger max; - - public Partition(BigInteger min, BigInteger max) { - this.min = min; - this.max = max; - } - - public BigInteger getMin() { - return min; - } - - public BigInteger getMax() { - return max; - } - - public String toString() { - return "Processing partition for token range " + min + " to " + max; - } - } } \ No newline at end of file diff --git a/src/main/scala/com/datastax/cdm/job/BasePartitionJob.scala b/src/main/scala/com/datastax/cdm/job/BasePartitionJob.scala index 2f380866..83c99613 100644 --- a/src/main/scala/com/datastax/cdm/job/BasePartitionJob.scala +++ b/src/main/scala/com/datastax/cdm/job/BasePartitionJob.scala @@ -20,11 +20,11 @@ import scala.reflect.io.File import com.datastax.cdm.feature.TrackRun import com.datastax.cdm.properties.KnownProperties -abstract class BasePartitionJob extends BaseJob[SplitPartitions.Partition] { +abstract class BasePartitionJob extends BaseJob[Partition] { var trackRunFeature: TrackRun = _ var keyspaceTableValue: String = _ - override def getParts(pieces: Int): util.Collection[SplitPartitions.Partition] = { + override def getParts(pieces: Int): util.Collection[Partition] = { var keyspaceTable: Option[String] = Option(propertyHelper.getString(KnownProperties.TARGET_KEYSPACE_TABLE)) .filter(_.nonEmpty) .orElse(Option(propertyHelper.getString(KnownProperties.ORIGIN_KEYSPACE_TABLE))) diff --git a/src/test/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatementTest.java b/src/test/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatementTest.java index 4186abaa..0d358520 100644 --- a/src/test/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatementTest.java +++ b/src/test/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatementTest.java @@ -32,8 +32,8 @@ import org.mockito.Mockito; import com.datastax.cdm.cql.CommonMocks; +import com.datastax.cdm.job.Partition; import com.datastax.cdm.job.RunNotStartedException; -import com.datastax.cdm.job.SplitPartitions; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.PreparedStatement; @@ -98,8 +98,7 @@ public void getPartitionsByStatus() { when(mockIterator.next()).thenReturn(row3); targetUpsertRunDetailsStatement = new TargetUpsertRunDetailsStatement(cqlSession, "ks.table1"); - Collection parts = targetUpsertRunDetailsStatement.getPartitionsByStatus(123l, - "RUNNING"); + Collection parts = targetUpsertRunDetailsStatement.getPartitionsByStatus(123l, "RUNNING"); // This test is incorrect, but needs to be troubleshot & fixed. The actual code works, but the test does not assertEquals(0, parts.size()); diff --git a/src/test/java/com/datastax/cdm/feature/TrackRunTest.java b/src/test/java/com/datastax/cdm/feature/TrackRunTest.java index bc154a14..4944d405 100644 --- a/src/test/java/com/datastax/cdm/feature/TrackRunTest.java +++ b/src/test/java/com/datastax/cdm/feature/TrackRunTest.java @@ -27,8 +27,8 @@ import org.mockito.Mock; import com.datastax.cdm.cql.CommonMocks; +import com.datastax.cdm.job.Partition; import com.datastax.cdm.job.RunNotStartedException; -import com.datastax.cdm.job.SplitPartitions; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.BoundStatement; @@ -60,7 +60,7 @@ void countTypesAndStatus() { @Test void init() throws RunNotStartedException { TrackRun trackRun = new TrackRun(cqlSession, "keyspace.table"); - Collection parts = trackRun.getPendingPartitions(0); + Collection parts = trackRun.getPendingPartitions(0); assertEquals(0, parts.size()); } diff --git a/src/test/java/com/datastax/cdm/job/SplitPartitionsTest.java b/src/test/java/com/datastax/cdm/job/SplitPartitionsTest.java index 8b172f82..4351850a 100644 --- a/src/test/java/com/datastax/cdm/job/SplitPartitionsTest.java +++ b/src/test/java/com/datastax/cdm/job/SplitPartitionsTest.java @@ -32,8 +32,8 @@ void tearDown() { @Test void getRandomSubPartitionsTest() { - List partitions = SplitPartitions.getRandomSubPartitions(10, BigInteger.ONE, - BigInteger.valueOf(100), 100); + List partitions = SplitPartitions.getRandomSubPartitions(10, BigInteger.ONE, BigInteger.valueOf(100), + 100); assertEquals(10, partitions.size()); partitions.forEach(p -> { assertEquals(9, p.getMax().longValue() - p.getMin().longValue()); @@ -42,8 +42,8 @@ void getRandomSubPartitionsTest() { @Test void getRandomSubPartitionsTestOver100() { - List partitions = SplitPartitions.getRandomSubPartitions(8, BigInteger.ONE, - BigInteger.valueOf(44), 200); + List partitions = SplitPartitions.getRandomSubPartitions(8, BigInteger.ONE, BigInteger.valueOf(44), + 200); assertEquals(8, partitions.size()); } From 24570bfc1339daa9519022674308a011893a9973 Mon Sep 17 00:00:00 2001 From: Pravin Bhat Date: Thu, 31 Oct 2024 12:22:32 -0400 Subject: [PATCH 2/2] Made CounterUnit its own class & refactored JobCounter to work with it. --- .../com/datastax/cdm/job/CounterUnit.java | 50 +++++++++++++++++++ .../java/com/datastax/cdm/job/JobCounter.java | 36 ++----------- 2 files changed, 54 insertions(+), 32 deletions(-) create mode 100644 src/main/java/com/datastax/cdm/job/CounterUnit.java diff --git a/src/main/java/com/datastax/cdm/job/CounterUnit.java b/src/main/java/com/datastax/cdm/job/CounterUnit.java new file mode 100644 index 00000000..d8a04bee --- /dev/null +++ b/src/main/java/com/datastax/cdm/job/CounterUnit.java @@ -0,0 +1,50 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.cdm.job; + +import java.io.Serializable; +import java.util.concurrent.atomic.AtomicLong; + +public class CounterUnit implements Serializable { + + private static final long serialVersionUID = 2194336948011681878L; + private final AtomicLong globalCounter = new AtomicLong(0); + private final transient ThreadLocal threadLocalCounter = ThreadLocal.withInitial(() -> 0L); + + public void incrementThreadCounter(long incrementBy) { + threadLocalCounter.set(threadLocalCounter.get() + incrementBy); + } + + public long getThreadCounter() { + return threadLocalCounter.get(); + } + + public void resetThreadCounter() { + threadLocalCounter.set(0L); + } + + public void setGlobalCounter(long value) { + globalCounter.set(value); + } + + public void addThreadToGlobalCounter() { + globalCounter.addAndGet(threadLocalCounter.get()); + } + + public long getGlobalCounter() { + return globalCounter.get(); + } +} diff --git a/src/main/java/com/datastax/cdm/job/JobCounter.java b/src/main/java/com/datastax/cdm/job/JobCounter.java index 71811eb6..db67d719 100644 --- a/src/main/java/com/datastax/cdm/job/JobCounter.java +++ b/src/main/java/com/datastax/cdm/job/JobCounter.java @@ -15,15 +15,17 @@ */ package com.datastax.cdm.job; +import java.io.Serializable; import java.util.HashMap; -import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.datastax.cdm.feature.TrackRun; -public class JobCounter { +public class JobCounter implements Serializable { + + private static final long serialVersionUID = 7016816604237020549L; // Enumeration for counter types public enum CounterType { @@ -33,36 +35,6 @@ public enum CounterType { // Logger instance private final Logger logger = LoggerFactory.getLogger(this.getClass().getName()); - // Internal class to handle atomic counting operations - private static class CounterUnit { - private final AtomicLong globalCounter = new AtomicLong(0); - private final ThreadLocal threadLocalCounter = ThreadLocal.withInitial(() -> 0L); - - public void incrementThreadCounter(long incrementBy) { - threadLocalCounter.set(threadLocalCounter.get() + incrementBy); - } - - public long getThreadCounter() { - return threadLocalCounter.get(); - } - - public void resetThreadCounter() { - threadLocalCounter.set(0L); - } - - public void setGlobalCounter(long value) { - globalCounter.set(value); - } - - public void addThreadToGlobalCounter() { - globalCounter.addAndGet(threadLocalCounter.get()); - } - - public long getGlobalCounter() { - return globalCounter.get(); - } - } - // Declare individual counters for different operations private final HashMap counterMap = new HashMap<>();