Skip to content

Commit

Permalink
KNOX-2959 - Auto discovery to support scaling scenarios
Browse files Browse the repository at this point in the history
  • Loading branch information
zeroflag committed Sep 28, 2023
1 parent 3af43b7 commit dbe41a2
Show file tree
Hide file tree
Showing 5 changed files with 244 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.knox.gateway.topology.discovery.cm;

import java.util.Set;

import com.cloudera.api.swagger.client.ApiException;
import org.apache.knox.gateway.i18n.messages.Message;
import org.apache.knox.gateway.i18n.messages.MessageLevel;
Expand Down Expand Up @@ -199,6 +201,9 @@ void queryingConfigActivationEventsFromCluster(String clusterName,
@Message(level = MessageLevel.DEBUG, text = "Activation event relevance: {0} = {1} ({2} / {3} / {4} / {5})")
void activationEventRelevance(String eventId, String relevance, String command, String status, String serviceType, boolean serviceModelGeneratorExists);

@Message(level = MessageLevel.DEBUG, text = "Deactivation event relevance: {0} = {1} ({2} / {3} / {4})")
void deactivationEventRelevance(String eventId, String relevance, String eventCode, String serviceType, boolean serviceModelGeneratorExists);

@Message(level = MessageLevel.DEBUG, text = "Activation event - {0} - has already been processed, skipping ...")
void activationEventAlreadyProcessed(String eventId);

Expand Down Expand Up @@ -264,4 +269,10 @@ void roleConfigurationPropertyHasChanged(String propertyName,

@Message(level = MessageLevel.WARN, text = "The configured maximum retry attempts of {0} may overlap with the configured polling interval settings; using {1} retry attempts")
void updateMaxRetryAttempts(int configured, int actual);

@Message(level = MessageLevel.DEBUG, text = "Found upscale event for role: {0} hosts: {1}")
void foundUpScaleEvent(String role, Set<String> hosts);

@Message(level = MessageLevel.DEBUG, text = "Found downscale event for role: {0} hosts: {1}")
void foundDownScaleEvent(String role, Set<String> hosts);
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -91,10 +92,16 @@ public class PollingConfigurationAnalyzer implements Runnable {
static final String CM_SERVICE_TYPE = "ManagerServer";
static final String CM_SERVICE = "ClouderaManager";

public static final String EVENT_CODE_ROLE_DELETED = "EV_ROLE_DELETED";
public static final String EVENT_CODE_ROLE_CREATED = "EV_ROLE_CREATED";

// Collection of those commands which represent the potential activation of service configuration changes
private static final Collection<String> ACTIVATION_COMMANDS = Arrays.asList(START_COMMAND, RESTART_COMMAND, ROLLING_RESTART_COMMAND,
RESTART_WAITING_FOR_STALENESS_SUCCESS_COMMAND);

private static final Collection<String> DEACTIVATION_EVENT_CODES = Arrays.asList(EVENT_CODE_ROLE_DELETED);
private static final Collection<String> ACTIVATION_EVENT_CODES = Arrays.asList(EVENT_CODE_ROLE_CREATED);

// The format of the filter employed when start events are queried from ClouderaManager
private static final String EVENTS_QUERY_FORMAT =
"category==" + ApiEventCategory.AUDIT_EVENT.getValue() +
Expand All @@ -117,6 +124,8 @@ public class PollingConfigurationAnalyzer implements Runnable {

private ClusterConfigurationCache configCache;

private final RoleHostCache hostCache = new RoleHostCache();

// Single listener for configuration change events
private ConfigurationChangeListener changeListener;

Expand Down Expand Up @@ -222,14 +231,17 @@ public void run() {
// Configuration changes don't mean anything without corresponding service start/restarts. Therefore, monitor
// start events, and check the configuration only of the restarted service(s) to identify changes
// that should trigger re-discovery.
final List<StartEvent> relevantEvents = getRelevantEvents(address, clusterName);
final List<RelevantEvent> relevantEvents = getRelevantEvents(address, clusterName);

// If there are no recent start events, then nothing to do now
if (!relevantEvents.isEmpty()) {
// If a change has occurred, notify the listeners
if (hasConfigChanged(address, clusterName, relevantEvents)) {
if (hasConfigChanged(address, clusterName, relevantEvents) || hasScaleEvent(relevantEvents)) {
notifyChangeListener(address, clusterName);
}
// these events should not be processed again even if the next CM query result contains them
relevantEvents.forEach(re -> processedEvents.put(re.auditEvent.getId(), 1L));
updateHostCache(relevantEvents);
}
}
}
Expand All @@ -250,7 +262,42 @@ public void run() {
log.stoppedClouderaManagerConfigMonitor();
}

private boolean hasConfigChanged(String address, String clusterName, List<StartEvent> relevantEvents) {
private boolean hasScaleEvent(List<RelevantEvent> relevantEvents) {
boolean found = false;
for (RelevantEvent event: relevantEvents) {
if (processedEvents.getIfPresent(event.auditEvent.getId()) != null) {
log.activationEventAlreadyProcessed(event.auditEvent.getId());
continue;
}
if (event.getRole() != null) {
if (!hostCache.hosts(event.getRole()).containsAll(event.getHosts())) {
log.foundUpScaleEvent(event.getRole(), event.getHosts());
found = true;
break;
}
if (event.isDeactivationEvent()) {
log.foundDownScaleEvent(event.getRole(), event.getHosts());
found = true;
break;
}
}
}
return found;
}

private void updateHostCache(List<RelevantEvent> relevantEvents) {
for (RelevantEvent event: relevantEvents) {
if (event.getRole() != null) {
if (event.isDeactivationEvent()) {
hostCache.removeHosts(event.getRole(), event.getHosts());
} else {
hostCache.addHosts(event.getRole(), event.getHosts());
}
}
}
}

private boolean hasConfigChanged(String address, String clusterName, List<RelevantEvent> relevantEvents) {
// If there are start events, then check the previously-recorded properties for the same service to
// identify if the configuration has changed
final Map<String, ServiceConfigurationModel> serviceConfigurations =
Expand All @@ -260,12 +307,16 @@ private boolean hasConfigChanged(String address, String clusterName, List<StartE
final List<String> handledServiceTypes = new ArrayList<>();

boolean configHasChanged = false;
for (StartEvent re : relevantEvents) {
for (RelevantEvent re : relevantEvents) {
if (processedEvents.getIfPresent(re.auditEvent.getId()) != null) {
log.activationEventAlreadyProcessed(re.auditEvent.getId());
continue;
}

if (re.isDeactivationEvent()) {
continue;
}

String serviceType = re.getServiceType();

if (CM_SERVICE_TYPE.equals(serviceType)) {
Expand Down Expand Up @@ -309,9 +360,6 @@ private boolean hasConfigChanged(String address, String clusterName, List<StartE
}
}

// these events should not be processed again even if the next CM query result contains them
relevantEvents.forEach(re -> processedEvents.put(re.auditEvent.getId(), 1L));

return configHasChanged;
}

Expand Down Expand Up @@ -420,8 +468,8 @@ private DiscoveryApiClient getApiClient(final ServiceDiscoveryConfig discoveryCo
*
* @return A List of StartEvent objects for service start events since the last time they were queried.
*/
private List<StartEvent> getRelevantEvents(final String address, final String clusterName) {
List<StartEvent> relevantEvents = new ArrayList<>();
private List<RelevantEvent> getRelevantEvents(final String address, final String clusterName) {
List<RelevantEvent> relevantEvents = new ArrayList<>();

// Get the last event query timestamp
Instant lastTimestamp = getEventQueryTimestamp(address, clusterName);
Expand All @@ -446,8 +494,8 @@ private List<StartEvent> getRelevantEvents(final String address, final String cl
log.noActivationEventFound();
} else {
for (ApiEvent event : events) {
if(isRelevantEvent(event)) {
relevantEvents.add(new StartEvent(event));
if(isActivationEvent(event) || isDeactivationEvent(event)) {
relevantEvents.add(new RelevantEvent(event));
}
}
}
Expand All @@ -456,17 +504,30 @@ private List<StartEvent> getRelevantEvents(final String address, final String cl
}

@SuppressWarnings("unchecked")
private boolean isRelevantEvent(ApiEvent event) {
private boolean isActivationEvent(ApiEvent event) {
final Map<String, Object> attributeMap = getAttributeMap(event.getAttributes());
final String command = getAttribute(attributeMap, COMMAND);
final String status = getAttribute(attributeMap, COMMAND_STATUS);
final String serviceType = getAttribute(attributeMap, StartEvent.ATTR_SERVICE_TYPE);
final String serviceType = getAttribute(attributeMap, RelevantEvent.ATTR_SERVICE_TYPE);
final String eventCode = getAttribute(attributeMap, RelevantEvent.ATTR_EVENT_CODE);
final boolean serviceModelGeneratorExists = serviceModelGeneratorsHolder.getServiceModelGenerators(serviceType) != null;
final boolean relevant = ACTIVATION_COMMANDS.contains(command) && SUCCEEDED_STATUS.equals(status) && serviceModelGeneratorExists;
final boolean relevant = (ACTIVATION_EVENT_CODES.contains(eventCode)
|| (ACTIVATION_COMMANDS.contains(command) && SUCCEEDED_STATUS.equals(status)))
&& serviceModelGeneratorExists;
log.activationEventRelevance(event.getId(), String.valueOf(relevant), command, status, serviceType, serviceModelGeneratorExists);
return relevant;
}

private boolean isDeactivationEvent(ApiEvent event) {
final Map<String, Object> attributeMap = getAttributeMap(event.getAttributes());
final String serviceType = getAttribute(attributeMap, RelevantEvent.ATTR_SERVICE_TYPE);
final String eventCode = getAttribute(attributeMap, RelevantEvent.ATTR_EVENT_CODE);
final boolean serviceModelGeneratorExists = serviceModelGeneratorsHolder.getServiceModelGenerators(serviceType) != null;
final boolean relevant = DEACTIVATION_EVENT_CODES.contains(eventCode) && serviceModelGeneratorExists;
log.deactivationEventRelevance(event.getId(), String.valueOf(relevant), eventCode, serviceType, relevant);
return relevant;
}

@SuppressWarnings("unchecked")
private String getAttribute( Map<String, Object> attributeMap, String attributeName) {
return attributeMap.containsKey(attributeName) ? ((List<String>) attributeMap.get(attributeName)).get(0) : "";
Expand Down Expand Up @@ -605,26 +666,35 @@ private boolean hasConfigurationChanged(final ServiceConfigurationModel previous
/**
* Internal representation of a ClouderaManager service start event
*/
static final class StartEvent {
static final class RelevantEvent {

private static final String ATTR_CLUSTER = "CLUSTER";
private static final String ATTR_SERVICE_TYPE = "SERVICE_TYPE";
private static final String ATTR_SERVICE = "SERVICE";
private static final String ATTR_ROLE = "ROLE_TYPE";
private static final String ATTR_HOST = "HOSTS";
private static final String ATTR_EVENT_CODE = "EVENTCODE";

private static List<String> attrsOfInterest = new ArrayList<>();

static {
attrsOfInterest.add(ATTR_CLUSTER);
attrsOfInterest.add(ATTR_SERVICE_TYPE);
attrsOfInterest.add(ATTR_SERVICE);
attrsOfInterest.add(ATTR_ROLE);
attrsOfInterest.add(ATTR_HOST);
attrsOfInterest.add(ATTR_EVENT_CODE);
}

private ApiEvent auditEvent;
private String clusterName;
private String serviceType;
private String service;
private String role;
private String eventCode;
private Set<String> hosts = new HashSet<>();

StartEvent(final ApiEvent auditEvent) {
RelevantEvent(final ApiEvent auditEvent) {
if (ApiEventCategory.AUDIT_EVENT != auditEvent.getCategory()) {
throw new IllegalArgumentException("Invalid event category " + auditEvent.getCategory().getValue());
}
Expand Down Expand Up @@ -652,6 +722,18 @@ String getService() {
return service;
}

Set<String> getHosts() {
return hosts;
}

String getRole() {
return role;
}

boolean isDeactivationEvent() {
return EVENT_CODE_ROLE_DELETED.equals(eventCode);
}

private void setPropertyFromAttribute(final ApiEventAttribute attribute) {
switch (attribute.getName()) {
case ATTR_CLUSTER:
Expand All @@ -663,9 +745,19 @@ private void setPropertyFromAttribute(final ApiEventAttribute attribute) {
case ATTR_SERVICE:
service = attribute.getValues().get(0);
break;
case ATTR_HOST:
if (attribute.getValues() != null && !attribute.getValues().isEmpty()) {
hosts.addAll(attribute.getValues());
}
break;
case ATTR_ROLE:
role = attribute.getValues().get(0);
break;
case ATTR_EVENT_CODE:
eventCode = attribute.getValues().get(0);
break;
default:
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.knox.gateway.topology.discovery.cm.monitor;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class RoleHostCache {
private final Map<String, Set<String>> hosts = new ConcurrentHashMap<>();

public void addHosts(String role, Set<String> hosts) {
hosts(role).addAll(hosts);
}

public void removeHosts(String role, Set<String> hosts) {
hosts(role).removeAll(hosts);
}

public Set<String> hosts(String role) {
return hosts.computeIfAbsent(role, k -> new HashSet<>());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,38 @@ public void testClusterConfigMonitorTerminationForNoLongerReferencedClusters() {
}
}

@Test
public void testNotificationSentAfterDownScaleEvent() {
final String clusterName = "Cluster T";

final List<ApiEventAttribute> revisionEventAttrs = new ArrayList<>();
revisionEventAttrs.add(createEventAttribute("CLUSTER", clusterName));
revisionEventAttrs.add(createEventAttribute("SERVICE_TYPE", HiveOnTezServiceModelGenerator.SERVICE_TYPE));
revisionEventAttrs.add(createEventAttribute("SERVICE", HiveOnTezServiceModelGenerator.SERVICE));
revisionEventAttrs.add(createEventAttribute("ROLE_TYPE", HiveOnTezServiceModelGenerator.ROLE_TYPE));
revisionEventAttrs.add(createEventAttribute("REVISION", "215"));
revisionEventAttrs.add(createEventAttribute("EVENTCODE", PollingConfigurationAnalyzer.EVENT_CODE_ROLE_DELETED));
final ApiEvent revisionEvent = createApiEvent(ApiEventCategory.AUDIT_EVENT, revisionEventAttrs, null);

doTestEventWithConfigChange(revisionEvent, clusterName);
}

@Test
public void testNotificationSentAfterUpScaleEvent() {
final String clusterName = "Cluster T";

final List<ApiEventAttribute> revisionEventAttrs = new ArrayList<>();
revisionEventAttrs.add(createEventAttribute("CLUSTER", clusterName));
revisionEventAttrs.add(createEventAttribute("SERVICE_TYPE", HiveOnTezServiceModelGenerator.SERVICE_TYPE));
revisionEventAttrs.add(createEventAttribute("SERVICE", HiveOnTezServiceModelGenerator.SERVICE));
revisionEventAttrs.add(createEventAttribute("ROLE_TYPE", HiveOnTezServiceModelGenerator.ROLE_TYPE));
revisionEventAttrs.add(createEventAttribute("REVISION", "215"));
revisionEventAttrs.add(createEventAttribute("EVENTCODE", PollingConfigurationAnalyzer.EVENT_CODE_ROLE_CREATED));
final ApiEvent revisionEvent = createApiEvent(ApiEventCategory.AUDIT_EVENT, revisionEventAttrs, null);

doTestEventWithConfigChange(revisionEvent, clusterName);
}

private void doTestStartEvent(final ApiEventCategory category) {
final String clusterName = "My Cluster";
final String serviceType = NameNodeServiceModelGenerator.SERVICE_TYPE;
Expand All @@ -392,7 +424,7 @@ private void doTestStartEvent(final ApiEventCategory category) {
apiEventAttrs.add(createEventAttribute("SERVICE", service));
ApiEvent apiEvent = createApiEvent(category, apiEventAttrs, null);

PollingConfigurationAnalyzer.StartEvent restartEvent = new PollingConfigurationAnalyzer.StartEvent(apiEvent);
PollingConfigurationAnalyzer.RelevantEvent restartEvent = new PollingConfigurationAnalyzer.RelevantEvent(apiEvent);
assertNotNull(restartEvent);
assertEquals(clusterName, restartEvent.getClusterName());
assertEquals(serviceType, restartEvent.getServiceType());
Expand Down
Loading

0 comments on commit dbe41a2

Please sign in to comment.