diff --git a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java index e94b5b9260..cb82a82e13 100644 --- a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java +++ b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java @@ -145,8 +145,7 @@ public void participate(List ambryStatsReports, AccountStatsSt StateMachineEngine stateMachineEngine = manager.getStateMachineEngine(); stateMachineEngine.registerStateModelFactory(clusterMapConfig.clustermapStateModelDefinition, new AmbryStateModelFactory(clusterMapConfig, this, clusterManager)); - registerStatsReportAggregationTasks(stateMachineEngine, ambryStatsReports, accountStatsStore, callback); - registerPropertyStoreCleanUpTask(stateMachineEngine); + registerTasks(stateMachineEngine, ambryStatsReports, accountStatsStore, callback); try { // register server as a participant manager.connect(); @@ -799,15 +798,17 @@ private void awaitDisablingPartition() throws InterruptedException { } /** - * Register aggregation tasks for appropriate {@link AmbryStatsReport}s. + * This method will register Helix Tasks + * Register aggregation tasks for appropriate {@link AmbryStatsReport}s and {@link PropertyStoreCleanUpTask}. * @param engine the {@link StateMachineEngine} to register the task state model. * @param statsReports the {@link List} of {@link AmbryStatsReport}s that may require the registration of * corresponding {@link MySqlReportAggregatorTask}s. * @param accountStatsStore the {@link AccountStatsStore} to retrieve and store container stats. * @param callback a callback which will be invoked when the aggregation report has been generated successfully. */ - private void registerStatsReportAggregationTasks(StateMachineEngine engine, List statsReports, + private void registerTasks(StateMachineEngine engine, List statsReports, AccountStatsStore accountStatsStore, Callback callback) { + //Register MySqlReportAggregatorTask Map taskFactoryMap = new HashMap<>(); for (final AmbryStatsReport statsReport : statsReports) { taskFactoryMap.put( @@ -815,25 +816,18 @@ private void registerStatsReportAggregationTasks(StateMachineEngine engine, List context -> new MySqlReportAggregatorTask(context.getManager(), statsReport.getAggregateIntervalInMinutes(), statsReport.getStatsReportType(), accountStatsStore, callback, clusterMapConfig, metricRegistry)); } - if (!taskFactoryMap.isEmpty()) { - engine.registerStateModelFactory(TaskConstants.STATE_MODEL_NAME, - new TaskStateModelFactory(manager, taskFactoryMap)); - } - } - /** - * Register Property Store Clean Up Task for cleaning up expired {@link DataNodeConfig} in Property Store. - * @param engine the {@link StateMachineEngine} to register the task state model. - */ - private void registerPropertyStoreCleanUpTask(StateMachineEngine engine){ + //Register PropertyStoreTask if(clusterMapConfig.clustermapEnablePropertyStoreCleanUpTask) { logger.info("Registering PropertyStoreCleanUpTask"); - Map taskFactoryMap = new HashMap<>(); taskFactoryMap.put(PropertyStoreCleanUpTask.COMMAND, context -> new PropertyStoreCleanUpTask(context.getManager(), dataNodeConfigSource, clusterMapConfig, metricRegistry)); + } + + if (!taskFactoryMap.isEmpty()) { engine.registerStateModelFactory(TaskConstants.STATE_MODEL_NAME, - new TaskStateModelFactory(manager, taskFactoryMap), PropertyStoreCleanUpTask.COMMAND); + new TaskStateModelFactory(manager, taskFactoryMap)); } } diff --git a/ambry-clustermap/src/test/java/com/github/ambry/clustermap/HelixParticipantTest.java b/ambry-clustermap/src/test/java/com/github/ambry/clustermap/HelixParticipantTest.java index 4e365b19fd..878cbc4c13 100644 --- a/ambry-clustermap/src/test/java/com/github/ambry/clustermap/HelixParticipantTest.java +++ b/ambry-clustermap/src/test/java/com/github/ambry/clustermap/HelixParticipantTest.java @@ -890,7 +890,7 @@ public void testParticipateMethodWithHelixPropertyStoreTaskEnabled() throws Exce helixParticipant.participate( Collections.emptyList(), null, null); assertNotNull("PropertyStoreCleanUpTask should be registered", helixParticipant.getHelixManager().getStateMachineEngine() - .getStateModelFactory(TaskConstants.STATE_MODEL_NAME, PropertyStoreCleanUpTask.COMMAND)); + .getStateModelFactory(TaskConstants.STATE_MODEL_NAME)); helixParticipant.close(); } @@ -910,7 +910,7 @@ public void testParticipateMethodWithHelixPropertyStoreTaskDisabled() throws Exc helixParticipant.participate(Collections.emptyList(), null, null); assertNull("PropertyStoreCleanUpTask should not be registered", helixParticipant.getHelixManager().getStateMachineEngine() - .getStateModelFactory(TaskConstants.STATE_MODEL_NAME, PropertyStoreCleanUpTask.COMMAND)); + .getStateModelFactory(TaskConstants.STATE_MODEL_NAME)); helixParticipant.close(); }