From d2a9c5ad16b933f7114e04234e6761fbf452b230 Mon Sep 17 00:00:00 2001 From: Pravin Bhat Date: Wed, 4 Dec 2024 18:11:46 -0500 Subject: [PATCH] Fixed empty previous run issue (perform fresh run when previous run not found or not-started for any reason). --- RELEASE.md | 4 ++ .../TargetUpsertRunDetailsStatement.java | 6 ++- .../scala/com/datastax/cdm/job/BaseJob.scala | 2 +- .../datastax/cdm/job/BasePartitionJob.scala | 5 ++- .../TargetUpsertRunDetailsStatementTest.java | 37 +++++++++++++++++-- 5 files changed, 46 insertions(+), 8 deletions(-) diff --git a/RELEASE.md b/RELEASE.md index 3d90a804..fc598006 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -1,4 +1,8 @@ # Release Notes + +## [5.1.4] - 2024-12-04 +- Bug fix: Any run started with a `previousRunId` that is not found in the `cdm_run_info` table (for whatever reason), will be executed as a fresh new run instead of doing nothing. + ## [5.1.3] - 2024-11-27 - Bug fix: Fixed connection issue caused when using different types of origin and target clusters (e.g. Cassandra/DSE with host/port and Astra with SCB). diff --git a/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java b/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java index 6bd4c964..94b0d1ab 100644 --- a/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java +++ b/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java @@ -101,11 +101,13 @@ public Collection getPendingPartitions(long prevRunId, JobType j .execute(boundSelectInfoStatement.setString("table_name", tableName).setLong("run_id", prevRunId)); Row cdmRunStatus = rsInfo.one(); if (cdmRunStatus == null) { - return Collections.emptyList(); + throw new RunNotStartedException( + "###################### Run NOT FOUND for Previous RunId: " + prevRunId + ", starting new run!"); } else { String status = cdmRunStatus.getString("status"); if (TrackRun.RUN_STATUS.NOT_STARTED.toString().equals(status)) { - throw new RunNotStartedException("Run not started for run_id: " + prevRunId); + throw new RunNotStartedException("###################### Run NOT STARTED for Previous RunId: " + + prevRunId + ", starting new run!"); } } diff --git a/src/main/scala/com/datastax/cdm/job/BaseJob.scala b/src/main/scala/com/datastax/cdm/job/BaseJob.scala index 15c99784..9b3097f1 100644 --- a/src/main/scala/com/datastax/cdm/job/BaseJob.scala +++ b/src/main/scala/com/datastax/cdm/job/BaseJob.scala @@ -33,7 +33,7 @@ import scala.collection.JavaConverters._ abstract class BaseJob[T: ClassTag] extends App { - private val abstractLogger = LoggerFactory.getLogger(this.getClass.getName) + protected val abstractLogger = LoggerFactory.getLogger(this.getClass.getName) private var jobName: String = _ var jobFactory: IJobSessionFactory[T] = _ diff --git a/src/main/scala/com/datastax/cdm/job/BasePartitionJob.scala b/src/main/scala/com/datastax/cdm/job/BasePartitionJob.scala index c67b5927..1ae9164a 100644 --- a/src/main/scala/com/datastax/cdm/job/BasePartitionJob.scala +++ b/src/main/scala/com/datastax/cdm/job/BasePartitionJob.scala @@ -42,7 +42,10 @@ abstract class BasePartitionJob extends BaseJob[PartitionRange] { try { trackRunFeature.getPendingPartitions(prevRunId, jobType) } catch { - case e: RunNotStartedException => SplitPartitions.getRandomSubPartitions(pieces, minPartition, maxPartition, coveragePercent, jobType) + case e: RunNotStartedException => { + abstractLogger.warn(e.getMessage) + SplitPartitions.getRandomSubPartitions(pieces, minPartition, maxPartition, coveragePercent, jobType) + } } } else { SplitPartitions.getRandomSubPartitions(pieces, minPartition, maxPartition, coveragePercent, jobType) diff --git a/src/test/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatementTest.java b/src/test/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatementTest.java index 70347d38..6dbbc198 100644 --- a/src/test/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatementTest.java +++ b/src/test/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatementTest.java @@ -32,6 +32,7 @@ import org.mockito.Mock; import com.datastax.cdm.cql.CommonMocks; +import com.datastax.cdm.feature.TrackRun; import com.datastax.cdm.job.IJobSessionFactory.JobType; import com.datastax.cdm.job.PartitionRange; import com.datastax.cdm.job.RunNotStartedException; @@ -72,15 +73,43 @@ public void setup() { } @Test - public void getPendingPartitions_nothingPending() throws RunNotStartedException { + public void incorrectKsTable() { + assertThrows(RuntimeException.class, () -> new TargetUpsertRunDetailsStatement(cqlSession, "table1")); + } + + @Test + public void getPendingPartitions_noPrevRun() throws RunNotStartedException { targetUpsertRunDetailsStatement = new TargetUpsertRunDetailsStatement(cqlSession, "ks.table1"); assertEquals(Collections.emptyList(), targetUpsertRunDetailsStatement.getPendingPartitions(0, JobType.MIGRATE)); - assertEquals(Collections.emptyList(), targetUpsertRunDetailsStatement.getPendingPartitions(1, JobType.MIGRATE)); } @Test - public void incorrectKsTable() throws RunNotStartedException { - assertThrows(RuntimeException.class, () -> new TargetUpsertRunDetailsStatement(cqlSession, "table1")); + public void getPendingPartitions_noPrevRunFound() { + targetUpsertRunDetailsStatement = new TargetUpsertRunDetailsStatement(cqlSession, "ks.table1"); + assertThrows(RunNotStartedException.class, + () -> targetUpsertRunDetailsStatement.getPendingPartitions(1, JobType.MIGRATE)); + } + + @Test + public void getPendingPartitions_prevRunNotStarted() { + when(rs.one()).thenReturn(row1); + when(row1.getString("status")).thenReturn(TrackRun.RUN_STATUS.NOT_STARTED.toString()); + + targetUpsertRunDetailsStatement = new TargetUpsertRunDetailsStatement(cqlSession, "ks.table1"); + assertThrows(RunNotStartedException.class, + () -> targetUpsertRunDetailsStatement.getPendingPartitions(123, JobType.MIGRATE)); + } + + @Test + public void getPendingPartitions_prevRunNoPartsPending() throws RunNotStartedException { + when(rs.one()).thenReturn(row1); + when(row1.getString("status")).thenReturn(TrackRun.RUN_STATUS.ENDED.toString()); + Iterator mockIterator = mock(Iterator.class); + when(rs.iterator()).thenReturn(mockIterator); + when(mockIterator.hasNext()).thenReturn(false); + + targetUpsertRunDetailsStatement = new TargetUpsertRunDetailsStatement(cqlSession, "ks.table1"); + assertEquals(Collections.emptyList(), targetUpsertRunDetailsStatement.getPendingPartitions(123, JobType.MIGRATE)); } @Test