From dbe41a2407f15bdfccc7b70542e103c855dd9a62 Mon Sep 17 00:00:00 2001 From: zeroflag Date: Thu, 28 Sep 2023 14:32:04 +0200 Subject: [PATCH] KNOX-2959 - Auto discovery to support scaling scenarios --- ...ouderaManagerServiceDiscoveryMessages.java | 11 ++ .../monitor/PollingConfigurationAnalyzer.java | 126 +++++++++++++++--- .../discovery/cm/monitor/RoleHostCache.java | 38 ++++++ .../PollingConfigurationAnalyzerTest.java | 34 ++++- .../cm/monitor/RoleHostCacheTest.java | 53 ++++++++ 5 files changed, 244 insertions(+), 18 deletions(-) create mode 100644 gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/RoleHostCache.java create mode 100644 gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/RoleHostCacheTest.java diff --git a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java index 4c1842fa2f..0960e79a3e 100644 --- a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java +++ b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java @@ -16,6 +16,8 @@ */ package org.apache.knox.gateway.topology.discovery.cm; +import java.util.Set; + import com.cloudera.api.swagger.client.ApiException; import org.apache.knox.gateway.i18n.messages.Message; import org.apache.knox.gateway.i18n.messages.MessageLevel; @@ -199,6 +201,9 @@ void queryingConfigActivationEventsFromCluster(String clusterName, @Message(level = MessageLevel.DEBUG, text = "Activation event relevance: {0} = {1} ({2} / {3} / {4} / {5})") void activationEventRelevance(String eventId, String relevance, String command, String status, String serviceType, boolean serviceModelGeneratorExists); + @Message(level = MessageLevel.DEBUG, text = "Deactivation event relevance: {0} = {1} ({2} / {3} / {4})") + void deactivationEventRelevance(String eventId, String relevance, String eventCode, String serviceType, boolean serviceModelGeneratorExists); + @Message(level = MessageLevel.DEBUG, text = "Activation event - {0} - has already been processed, skipping ...") void activationEventAlreadyProcessed(String eventId); @@ -264,4 +269,10 @@ void roleConfigurationPropertyHasChanged(String propertyName, @Message(level = MessageLevel.WARN, text = "The configured maximum retry attempts of {0} may overlap with the configured polling interval settings; using {1} retry attempts") void updateMaxRetryAttempts(int configured, int actual); + + @Message(level = MessageLevel.DEBUG, text = "Found upscale event for role: {0} hosts: {1}") + void foundUpScaleEvent(String role, Set hosts); + + @Message(level = MessageLevel.DEBUG, text = "Found downscale event for role: {0} hosts: {1}") + void foundDownScaleEvent(String role, Set hosts); } diff --git a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java index 1c0374d814..98c2afcaba 100644 --- a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java +++ b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java @@ -61,6 +61,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; @@ -91,10 +92,16 @@ public class PollingConfigurationAnalyzer implements Runnable { static final String CM_SERVICE_TYPE = "ManagerServer"; static final String CM_SERVICE = "ClouderaManager"; + public static final String EVENT_CODE_ROLE_DELETED = "EV_ROLE_DELETED"; + public static final String EVENT_CODE_ROLE_CREATED = "EV_ROLE_CREATED"; + // Collection of those commands which represent the potential activation of service configuration changes private static final Collection ACTIVATION_COMMANDS = Arrays.asList(START_COMMAND, RESTART_COMMAND, ROLLING_RESTART_COMMAND, RESTART_WAITING_FOR_STALENESS_SUCCESS_COMMAND); + private static final Collection DEACTIVATION_EVENT_CODES = Arrays.asList(EVENT_CODE_ROLE_DELETED); + private static final Collection ACTIVATION_EVENT_CODES = Arrays.asList(EVENT_CODE_ROLE_CREATED); + // The format of the filter employed when start events are queried from ClouderaManager private static final String EVENTS_QUERY_FORMAT = "category==" + ApiEventCategory.AUDIT_EVENT.getValue() + @@ -117,6 +124,8 @@ public class PollingConfigurationAnalyzer implements Runnable { private ClusterConfigurationCache configCache; + private final RoleHostCache hostCache = new RoleHostCache(); + // Single listener for configuration change events private ConfigurationChangeListener changeListener; @@ -222,14 +231,17 @@ public void run() { // Configuration changes don't mean anything without corresponding service start/restarts. Therefore, monitor // start events, and check the configuration only of the restarted service(s) to identify changes // that should trigger re-discovery. - final List relevantEvents = getRelevantEvents(address, clusterName); + final List relevantEvents = getRelevantEvents(address, clusterName); // If there are no recent start events, then nothing to do now if (!relevantEvents.isEmpty()) { // If a change has occurred, notify the listeners - if (hasConfigChanged(address, clusterName, relevantEvents)) { + if (hasConfigChanged(address, clusterName, relevantEvents) || hasScaleEvent(relevantEvents)) { notifyChangeListener(address, clusterName); } + // these events should not be processed again even if the next CM query result contains them + relevantEvents.forEach(re -> processedEvents.put(re.auditEvent.getId(), 1L)); + updateHostCache(relevantEvents); } } } @@ -250,7 +262,42 @@ public void run() { log.stoppedClouderaManagerConfigMonitor(); } - private boolean hasConfigChanged(String address, String clusterName, List relevantEvents) { + private boolean hasScaleEvent(List relevantEvents) { + boolean found = false; + for (RelevantEvent event: relevantEvents) { + if (processedEvents.getIfPresent(event.auditEvent.getId()) != null) { + log.activationEventAlreadyProcessed(event.auditEvent.getId()); + continue; + } + if (event.getRole() != null) { + if (!hostCache.hosts(event.getRole()).containsAll(event.getHosts())) { + log.foundUpScaleEvent(event.getRole(), event.getHosts()); + found = true; + break; + } + if (event.isDeactivationEvent()) { + log.foundDownScaleEvent(event.getRole(), event.getHosts()); + found = true; + break; + } + } + } + return found; + } + + private void updateHostCache(List relevantEvents) { + for (RelevantEvent event: relevantEvents) { + if (event.getRole() != null) { + if (event.isDeactivationEvent()) { + hostCache.removeHosts(event.getRole(), event.getHosts()); + } else { + hostCache.addHosts(event.getRole(), event.getHosts()); + } + } + } + } + + private boolean hasConfigChanged(String address, String clusterName, List relevantEvents) { // If there are start events, then check the previously-recorded properties for the same service to // identify if the configuration has changed final Map serviceConfigurations = @@ -260,12 +307,16 @@ private boolean hasConfigChanged(String address, String clusterName, List handledServiceTypes = new ArrayList<>(); boolean configHasChanged = false; - for (StartEvent re : relevantEvents) { + for (RelevantEvent re : relevantEvents) { if (processedEvents.getIfPresent(re.auditEvent.getId()) != null) { log.activationEventAlreadyProcessed(re.auditEvent.getId()); continue; } + if (re.isDeactivationEvent()) { + continue; + } + String serviceType = re.getServiceType(); if (CM_SERVICE_TYPE.equals(serviceType)) { @@ -309,9 +360,6 @@ private boolean hasConfigChanged(String address, String clusterName, List processedEvents.put(re.auditEvent.getId(), 1L)); - return configHasChanged; } @@ -420,8 +468,8 @@ private DiscoveryApiClient getApiClient(final ServiceDiscoveryConfig discoveryCo * * @return A List of StartEvent objects for service start events since the last time they were queried. */ - private List getRelevantEvents(final String address, final String clusterName) { - List relevantEvents = new ArrayList<>(); + private List getRelevantEvents(final String address, final String clusterName) { + List relevantEvents = new ArrayList<>(); // Get the last event query timestamp Instant lastTimestamp = getEventQueryTimestamp(address, clusterName); @@ -446,8 +494,8 @@ private List getRelevantEvents(final String address, final String cl log.noActivationEventFound(); } else { for (ApiEvent event : events) { - if(isRelevantEvent(event)) { - relevantEvents.add(new StartEvent(event)); + if(isActivationEvent(event) || isDeactivationEvent(event)) { + relevantEvents.add(new RelevantEvent(event)); } } } @@ -456,17 +504,30 @@ private List getRelevantEvents(final String address, final String cl } @SuppressWarnings("unchecked") - private boolean isRelevantEvent(ApiEvent event) { + private boolean isActivationEvent(ApiEvent event) { final Map attributeMap = getAttributeMap(event.getAttributes()); final String command = getAttribute(attributeMap, COMMAND); final String status = getAttribute(attributeMap, COMMAND_STATUS); - final String serviceType = getAttribute(attributeMap, StartEvent.ATTR_SERVICE_TYPE); + final String serviceType = getAttribute(attributeMap, RelevantEvent.ATTR_SERVICE_TYPE); + final String eventCode = getAttribute(attributeMap, RelevantEvent.ATTR_EVENT_CODE); final boolean serviceModelGeneratorExists = serviceModelGeneratorsHolder.getServiceModelGenerators(serviceType) != null; - final boolean relevant = ACTIVATION_COMMANDS.contains(command) && SUCCEEDED_STATUS.equals(status) && serviceModelGeneratorExists; + final boolean relevant = (ACTIVATION_EVENT_CODES.contains(eventCode) + || (ACTIVATION_COMMANDS.contains(command) && SUCCEEDED_STATUS.equals(status))) + && serviceModelGeneratorExists; log.activationEventRelevance(event.getId(), String.valueOf(relevant), command, status, serviceType, serviceModelGeneratorExists); return relevant; } + private boolean isDeactivationEvent(ApiEvent event) { + final Map attributeMap = getAttributeMap(event.getAttributes()); + final String serviceType = getAttribute(attributeMap, RelevantEvent.ATTR_SERVICE_TYPE); + final String eventCode = getAttribute(attributeMap, RelevantEvent.ATTR_EVENT_CODE); + final boolean serviceModelGeneratorExists = serviceModelGeneratorsHolder.getServiceModelGenerators(serviceType) != null; + final boolean relevant = DEACTIVATION_EVENT_CODES.contains(eventCode) && serviceModelGeneratorExists; + log.deactivationEventRelevance(event.getId(), String.valueOf(relevant), eventCode, serviceType, relevant); + return relevant; + } + @SuppressWarnings("unchecked") private String getAttribute( Map attributeMap, String attributeName) { return attributeMap.containsKey(attributeName) ? ((List) attributeMap.get(attributeName)).get(0) : ""; @@ -605,11 +666,14 @@ private boolean hasConfigurationChanged(final ServiceConfigurationModel previous /** * Internal representation of a ClouderaManager service start event */ - static final class StartEvent { + static final class RelevantEvent { private static final String ATTR_CLUSTER = "CLUSTER"; private static final String ATTR_SERVICE_TYPE = "SERVICE_TYPE"; private static final String ATTR_SERVICE = "SERVICE"; + private static final String ATTR_ROLE = "ROLE_TYPE"; + private static final String ATTR_HOST = "HOSTS"; + private static final String ATTR_EVENT_CODE = "EVENTCODE"; private static List attrsOfInterest = new ArrayList<>(); @@ -617,14 +681,20 @@ static final class StartEvent { attrsOfInterest.add(ATTR_CLUSTER); attrsOfInterest.add(ATTR_SERVICE_TYPE); attrsOfInterest.add(ATTR_SERVICE); + attrsOfInterest.add(ATTR_ROLE); + attrsOfInterest.add(ATTR_HOST); + attrsOfInterest.add(ATTR_EVENT_CODE); } private ApiEvent auditEvent; private String clusterName; private String serviceType; private String service; + private String role; + private String eventCode; + private Set hosts = new HashSet<>(); - StartEvent(final ApiEvent auditEvent) { + RelevantEvent(final ApiEvent auditEvent) { if (ApiEventCategory.AUDIT_EVENT != auditEvent.getCategory()) { throw new IllegalArgumentException("Invalid event category " + auditEvent.getCategory().getValue()); } @@ -652,6 +722,18 @@ String getService() { return service; } + Set getHosts() { + return hosts; + } + + String getRole() { + return role; + } + + boolean isDeactivationEvent() { + return EVENT_CODE_ROLE_DELETED.equals(eventCode); + } + private void setPropertyFromAttribute(final ApiEventAttribute attribute) { switch (attribute.getName()) { case ATTR_CLUSTER: @@ -663,9 +745,19 @@ private void setPropertyFromAttribute(final ApiEventAttribute attribute) { case ATTR_SERVICE: service = attribute.getValues().get(0); break; + case ATTR_HOST: + if (attribute.getValues() != null && !attribute.getValues().isEmpty()) { + hosts.addAll(attribute.getValues()); + } + break; + case ATTR_ROLE: + role = attribute.getValues().get(0); + break; + case ATTR_EVENT_CODE: + eventCode = attribute.getValues().get(0); + break; default: } } } - } diff --git a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/RoleHostCache.java b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/RoleHostCache.java new file mode 100644 index 0000000000..085e142ef4 --- /dev/null +++ b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/RoleHostCache.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.knox.gateway.topology.discovery.cm.monitor; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +public class RoleHostCache { + private final Map> hosts = new ConcurrentHashMap<>(); + + public void addHosts(String role, Set hosts) { + hosts(role).addAll(hosts); + } + + public void removeHosts(String role, Set hosts) { + hosts(role).removeAll(hosts); + } + + public Set hosts(String role) { + return hosts.computeIfAbsent(role, k -> new HashSet<>()); + } +} diff --git a/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java b/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java index 33f5e8f935..d6dc186f48 100644 --- a/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java +++ b/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java @@ -381,6 +381,38 @@ public void testClusterConfigMonitorTerminationForNoLongerReferencedClusters() { } } + @Test + public void testNotificationSentAfterDownScaleEvent() { + final String clusterName = "Cluster T"; + + final List revisionEventAttrs = new ArrayList<>(); + revisionEventAttrs.add(createEventAttribute("CLUSTER", clusterName)); + revisionEventAttrs.add(createEventAttribute("SERVICE_TYPE", HiveOnTezServiceModelGenerator.SERVICE_TYPE)); + revisionEventAttrs.add(createEventAttribute("SERVICE", HiveOnTezServiceModelGenerator.SERVICE)); + revisionEventAttrs.add(createEventAttribute("ROLE_TYPE", HiveOnTezServiceModelGenerator.ROLE_TYPE)); + revisionEventAttrs.add(createEventAttribute("REVISION", "215")); + revisionEventAttrs.add(createEventAttribute("EVENTCODE", PollingConfigurationAnalyzer.EVENT_CODE_ROLE_DELETED)); + final ApiEvent revisionEvent = createApiEvent(ApiEventCategory.AUDIT_EVENT, revisionEventAttrs, null); + + doTestEventWithConfigChange(revisionEvent, clusterName); + } + + @Test + public void testNotificationSentAfterUpScaleEvent() { + final String clusterName = "Cluster T"; + + final List revisionEventAttrs = new ArrayList<>(); + revisionEventAttrs.add(createEventAttribute("CLUSTER", clusterName)); + revisionEventAttrs.add(createEventAttribute("SERVICE_TYPE", HiveOnTezServiceModelGenerator.SERVICE_TYPE)); + revisionEventAttrs.add(createEventAttribute("SERVICE", HiveOnTezServiceModelGenerator.SERVICE)); + revisionEventAttrs.add(createEventAttribute("ROLE_TYPE", HiveOnTezServiceModelGenerator.ROLE_TYPE)); + revisionEventAttrs.add(createEventAttribute("REVISION", "215")); + revisionEventAttrs.add(createEventAttribute("EVENTCODE", PollingConfigurationAnalyzer.EVENT_CODE_ROLE_CREATED)); + final ApiEvent revisionEvent = createApiEvent(ApiEventCategory.AUDIT_EVENT, revisionEventAttrs, null); + + doTestEventWithConfigChange(revisionEvent, clusterName); + } + private void doTestStartEvent(final ApiEventCategory category) { final String clusterName = "My Cluster"; final String serviceType = NameNodeServiceModelGenerator.SERVICE_TYPE; @@ -392,7 +424,7 @@ private void doTestStartEvent(final ApiEventCategory category) { apiEventAttrs.add(createEventAttribute("SERVICE", service)); ApiEvent apiEvent = createApiEvent(category, apiEventAttrs, null); - PollingConfigurationAnalyzer.StartEvent restartEvent = new PollingConfigurationAnalyzer.StartEvent(apiEvent); + PollingConfigurationAnalyzer.RelevantEvent restartEvent = new PollingConfigurationAnalyzer.RelevantEvent(apiEvent); assertNotNull(restartEvent); assertEquals(clusterName, restartEvent.getClusterName()); assertEquals(serviceType, restartEvent.getServiceType()); diff --git a/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/RoleHostCacheTest.java b/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/RoleHostCacheTest.java new file mode 100644 index 0000000000..76ba85e611 --- /dev/null +++ b/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/RoleHostCacheTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.knox.gateway.topology.discovery.cm.monitor; + +import java.util.Arrays; +import java.util.HashSet; + +import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class RoleHostCacheTest { + private static final String ROLE_1 = "role1"; + private static final String ROLE_2 = "role2"; + private static final String HOST_1 = "host1"; + private static final String HOST_2 = "host2"; + private final RoleHostCache cache = new RoleHostCache(); + + @Test + public void testInitiallyEmpty() { + assertTrue(cache.hosts(ROLE_1).isEmpty()); + assertTrue(cache.hosts(ROLE_2).isEmpty()); + } + + @Test + public void testAddingNewHost() { + cache.addHosts(ROLE_1, new HashSet<>(Arrays.asList(HOST_1, HOST_2))); + assertEquals(cache.hosts(ROLE_1), new HashSet<>(Arrays.asList(HOST_1, HOST_2))); + assertTrue(cache.hosts(ROLE_2).isEmpty()); + } + + @Test + public void testRemovingHost() { + cache.addHosts(ROLE_2, new HashSet<>(Arrays.asList(HOST_1, HOST_2))); + assertEquals(cache.hosts(ROLE_2), new HashSet<>(Arrays.asList(HOST_1, HOST_2))); + cache.removeHosts(ROLE_2, new HashSet<>(Arrays.asList(HOST_2))); + assertEquals(cache.hosts(ROLE_2), new HashSet<>(Arrays.asList(HOST_1))); + } +} \ No newline at end of file