Skip to content

Commit

Permalink
add EtcdConfigurationSource
Browse files Browse the repository at this point in the history
  • Loading branch information
spoon16 committed Jun 12, 2015
1 parent f1c43f3 commit f7edacc
Show file tree
Hide file tree
Showing 2 changed files with 298 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package com.netflix.config.source;

import com.google.common.base.Objects;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.netflix.config.WatchedConfigurationSource;
import com.netflix.config.WatchedUpdateListener;
import com.netflix.config.WatchedUpdateResult;
import org.boon.core.Handler;
import org.boon.etcd.Etcd;
import org.boon.etcd.Node;
import org.boon.etcd.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;


/**
* Implementation of the dynamic {@link WatchedConfigurationSource} for Etcd
*
* This implementation requires the path to the Etcd directory that contains
* nodes that represent each managed configuration property.
*
* An example Etcd configuration path is /<my-app>/config
* An example Etcd property node path is /<my-app>/config/com.fluxcapacitor.my.property
*
* When a property is mutated via Etcd a callback will be notified and the value managed
* by EtcdConfigurationSource will be updated. Similar to other dynamic configuration
* source (ie. DynamoDB, etc.)
*
* @author spoon16
*/
public class EtcdConfigurationSource implements WatchedConfigurationSource {
private static final Logger logger = LoggerFactory.getLogger(EtcdConfigurationSource.class);
private static final Splitter keySplitter = Splitter.on('/');

private final Map<String, Object> valueCache = Maps.newConcurrentMap();
private final List<WatchedUpdateListener> listeners = new CopyOnWriteArrayList<WatchedUpdateListener>();

private final Etcd etcd;
private final String configPath;

private Handler<Response> updateHandler = new Handler<Response>() {
@Override
public void handle(Response updateResponse) {
if (updateResponse.wasError()) {
logger.error("Etcd failed with an error response: %s", updateResponse);
}

final Map<String, Object> create = Maps.newHashMap();
final Map<String, Object> set = Maps.newHashMap();
final Map<String, Object> delete = Maps.newHashMap();

final String action = updateResponse.action().toLowerCase();
final Node node = updateResponse.node();

if (node != null ) {
final String etcdKey = node.key();
final String sourceKey = Iterables.getLast(keySplitter.split(etcdKey));
final String value = node.getValue();
valueCache.put(sourceKey, value);

switch (action) {
case "create":
create.put(sourceKey, value);
break;

case "set":
set.put(sourceKey, value);
break;

case "delete":
delete.put(sourceKey, value);
break;

default:
logger.warn("unrecognized action, response: %s", updateResponse);
break;
}

final WatchedUpdateResult result = WatchedUpdateResult.createIncremental(create, set, delete);
updateConfiguration(result);
}

etcd.waitRecursive(updateHandler, configPath);
}
};

/**
* Initialize EtcdConfigurationSource with property values @ configPath
*
* @param Etcd etcd
*/
public EtcdConfigurationSource(Etcd etcd, String configPath) {
this.etcd = etcd;
this.configPath = Objects.firstNonNull(configPath, "").replaceAll("^/+","");
init();
}

private void init() {
final Response listResponse = etcd.list(configPath);
cacheValues(listResponse.node());
etcd.waitRecursive(updateHandler, configPath);
}

private void cacheValues(Node configNode) {
for (Node valueNode : configNode.getNodes()) {
final String etcdKey = valueNode.key();
final String sourceKey = Iterables.getLast(keySplitter.split(etcdKey));
final String value = valueNode.getValue();
valueCache.put(sourceKey, value);
}
}

@Override
public Map<String, Object> getCurrentData() throws Exception {
return valueCache;
}

@Override
public void addUpdateListener(WatchedUpdateListener l) {
if (l != null) {
listeners.add(l);
}
}

@Override
public void removeUpdateListener(WatchedUpdateListener l) {
if (l != null) {
listeners.remove(l);
}
}

private void updateConfiguration(WatchedUpdateResult result) {
for (WatchedUpdateListener l : listeners) {
try {
l.updateConfiguration(result);
} catch (Throwable ex) {
logger.error("Error in invoking WatchedUpdateListener", ex);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package com.netflix.config.source;

import com.google.common.collect.Lists;
import com.netflix.config.*;
import org.boon.core.Handler;
import org.boon.etcd.ClientBuilder;
import org.boon.etcd.Etcd;
import org.boon.etcd.Node;
import org.boon.etcd.Response;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.*;

/**
* Tests the implementation of {@link EtcdConfigurationSource}.
*
* @author spoon16
*/
public class EtcdConfigurationSourceTest {
private static final Logger logger = LoggerFactory.getLogger(EtcdConfigurationSourceTest.class);

private static final Etcd ETCD = mock(Etcd.class);

// uncomment to use local/vagrant CoreOS VM running Etcd
// private static final Etcd ETCD = ClientBuilder.builder().hosts(URI.create("http://172.17.8.101:4001")).createClient();

private static final String CONFIG_PATH = "config";
private static final Response ETCD_LIST_RESPONSE = new Response("get", 200,
new Node("/config", null, 1378, 1378, 0, true, Lists.newArrayList(
new Node("/config/test.key1", "test.value1-etcd", 19311, 19311, 0, false, null),
new Node("/config/test.key4", "test.value4-etcd", 1388, 1388, 0, false, null),
new Node("/config/test.key6", "test.value6-etcd", 1232, 1232, 0, false, null)
)));
private static Handler<Response> ETCD_UPDATE_HANDLER;
private static final Answer WITH_ETCD_UPDATE_HANDLER = new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
ETCD_UPDATE_HANDLER = (Handler<Response>) invocation.getArguments()[0];
return null;
}
};
private static EtcdConfigurationSource ETCD_CONFIGURATION_SOURCE;
private static DynamicWatchedConfiguration ETCD_CONFIGURATION;
private static final ConcurrentMapConfiguration MAP_CONFIGURATION = new ConcurrentMapConfiguration();
private static final ConcurrentMapConfiguration SYSTEM_CONFIGURATION = new ConcurrentMapConfiguration();

@BeforeClass
public static void before() throws Exception {
final ConcurrentCompositeConfiguration compositeConfig = new ConcurrentCompositeConfiguration();

doReturn(ETCD_LIST_RESPONSE).when(ETCD).list(anyString());
doAnswer(WITH_ETCD_UPDATE_HANDLER).when(ETCD).waitRecursive(any(Handler.class), anyString());
ETCD_CONFIGURATION_SOURCE = new EtcdConfigurationSource(ETCD, CONFIG_PATH);
ETCD_CONFIGURATION = new DynamicWatchedConfiguration(ETCD_CONFIGURATION_SOURCE);

compositeConfig.addConfiguration(ETCD_CONFIGURATION, "etcd dynamic override configuration");

MAP_CONFIGURATION.addProperty("test.key1", "test.value1-map");
MAP_CONFIGURATION.addProperty("test.key2", "test.value2-map");
MAP_CONFIGURATION.addProperty("test.key3", "test.value3-map");
MAP_CONFIGURATION.addProperty("test.key4", "test.value4-map");
compositeConfig.addConfiguration(MAP_CONFIGURATION, "map configuration");

System.setProperty("test.key4", "test.value4-system");
System.setProperty("test.key5", "test.value5-system");
SYSTEM_CONFIGURATION.loadProperties(System.getProperties());
compositeConfig.addConfiguration(SYSTEM_CONFIGURATION, "system configuration");

ConfigurationManager.install(compositeConfig);
}

/**
* should return value from EtcdConfigurationSource when EtcdConfigurationSource provides key
*/
@Test
public void testEtcdPropertyOverride() throws Exception {
// there is a etcd value for this key
assertEquals("test.value1-etcd", DynamicPropertyFactory.getInstance().getStringProperty("test.key1", "default").get());
}

/**
* should return map configuration source value when EtcdConfigurationSource does not provide key
*/
@Test
public void testNoEtcdPropertyOverride() throws Exception {
// there is not etcd value for this key but there is a configuration source that provides this key
assertEquals("test.value2-map", DynamicPropertyFactory.getInstance().getStringProperty("test.key2", "default").get());
}

/**
* should return default value when no configuration source provides key
*/
@Test
public void testDefault() throws Exception {
// no configuration source for key
assertEquals("default", DynamicPropertyFactory.getInstance().getStringProperty("test.key99", "default").get());
}

/**
* should select lower priority configuration sources selected when EtcdConfigurationSource does not provide key
*/
@Test
public void testSystemPropertyOverride() throws Exception {
// system configuration provides key, etcd configuration provides key, source = etcd configuration
assertEquals("test.value4-etcd", DynamicPropertyFactory.getInstance().getStringProperty("test.key4", "default").get());

// system configuration provides key, etcd configuration does not provide key, source = system configuration
assertEquals("test.value5-system", DynamicPropertyFactory.getInstance().getStringProperty("test.key5", "default").get());
}

/**
* should not override EtcdConfigurationSource when lower priority configuration source is updated
*/
@Test
public void testUpdateOverriddenProperty() throws Exception {
final String updateProperty = "test.key1";

// update the map config's property and assert that the value is still the overridden value
MAP_CONFIGURATION.setProperty(updateProperty, "prop1");
assertEquals("test.value1-etcd", DynamicPropertyFactory.getInstance().getStringProperty(updateProperty, "default").get());
}

/**
* should update EtcdConfigurationSource when Etcd client handles writes
*/
@Test
public void testUpdateEtcdProperty() throws Exception {
final String updateProperty = "test.key6";
final String updateKey = CONFIG_PATH + "/" + updateProperty;
final String updateValue = "test.value6-etcd-override";
final String initialValue = "test.value6-etcd";

assertEquals(initialValue, DynamicPropertyFactory.getInstance().getStringProperty("test.key6", "default").get());

ETCD_UPDATE_HANDLER.handle(new Response("set", 200, new Node(updateKey, updateValue, 19444, 19444, 0, false, null)));
assertEquals(updateValue, DynamicPropertyFactory.getInstance().getStringProperty("test.key6", "default").get());
}
}

0 comments on commit f7edacc

Please sign in to comment.