Skip to content

Commit

Permalink
Refactor metacat-connector-druid http client to support custom extens…
Browse files Browse the repository at this point in the history
…ions (#622)
  • Loading branch information
jtuglu-netflix authored Dec 6, 2024
1 parent 60f8e27 commit d579ca9
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
/**
* Druid Config Constants.
*
* @author zhenl
* @author zhenl jtuglu
* @since 1.2.0
*/
public final class DruidConfigConstants {
/**
* DRUID_COORDINATOR_URI.
* DRUID_ROUTER_URI .
*/
public static final String DRUID_COORDINATOR_URI = "druid.uri";
public static final String DRUID_ROUTER_URI = "druid.uri";

//Http client
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,30 @@
import com.netflix.metacat.common.server.connectors.ConnectorTableService;
import com.netflix.metacat.common.server.connectors.SpringConnectorFactory;
import com.netflix.metacat.connector.druid.configs.DruidConnectorConfig;
import com.netflix.metacat.connector.druid.configs.DruidHttpClientConfig;
import com.netflix.metacat.connector.druid.configs.BaseDruidHttpClientConfig;
import com.netflix.metacat.connector.druid.converter.DruidConnectorInfoConverter;

/**
* Druid Connector Factory.
*
* @author zhenl
* @author zhenl jtuglu
* @since 1.2.0
*/
public class DruidConnectorFactory extends SpringConnectorFactory {
/**
* Constructor.
*
* @param connectorContext connector config
* @param druidConnectorInfoConverter connector info converter
* @param connectorContext connector context
* @param clientConfigClass config class to register
*/
DruidConnectorFactory(
public DruidConnectorFactory(
final DruidConnectorInfoConverter druidConnectorInfoConverter,
final ConnectorContext connectorContext
final ConnectorContext connectorContext,
final Class<? extends BaseDruidHttpClientConfig> clientConfigClass
) {
super(druidConnectorInfoConverter, connectorContext);
super.registerClazz(DruidConnectorConfig.class, DruidHttpClientConfig.class);
super.registerClazz(DruidConnectorConfig.class, clientConfigClass);
super.refresh();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@
import com.netflix.metacat.common.server.connectors.ConnectorFactory;
import com.netflix.metacat.common.server.connectors.ConnectorPlugin;
import com.netflix.metacat.common.server.connectors.ConnectorTypeConverter;
import com.netflix.metacat.connector.druid.configs.DruidHttpClientConfig;
import com.netflix.metacat.connector.druid.converter.DruidConnectorInfoConverter;

/**
* Druid Connector Plugin.
*
* @author zhenl
* @author zhenl jtuglu
* @since 1.2.0
*/
public class DruidConnectorPlugin implements ConnectorPlugin {
Expand All @@ -45,7 +46,10 @@ public String getType() {
@Override
public ConnectorFactory create(final ConnectorContext connectorContext) {
return new DruidConnectorFactory(
new DruidConnectorInfoConverter(connectorContext.getCatalogName()), connectorContext);
new DruidConnectorInfoConverter(connectorContext.getCatalogName()),
connectorContext,
DruidHttpClientConfig.class
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@
/**
* DruidHttpClientImpl.
*
* @author zhenl
* @author zhenl jtuglu
* @since 1.2.0
*/
@Slf4j
public class DruidHttpClientImpl implements MetacatDruidClient {
private String druidURI;
private final RestTemplate restTemplate;
protected String druidURI;
protected final RestTemplate restTemplate;
private final MetacatJsonLocator jsonLocator = new MetacatJsonLocator();

/**
Expand All @@ -56,16 +56,16 @@ public DruidHttpClientImpl(final ConnectorContext connectorContext,
final RestTemplate restTemplate) {
this.restTemplate = restTemplate;
final Map<String, String> config = connectorContext.getConfiguration();
final String coordinatorUri = config.get(DruidConfigConstants.DRUID_COORDINATOR_URI);
if (coordinatorUri == null) {
final String routerUri = config.get(DruidConfigConstants.DRUID_ROUTER_URI);
if (routerUri == null) {
throw new MetacatException("Druid cluster ending point not provided.");
}
try {
new URI(coordinatorUri);
new URI(routerUri);
} catch (URISyntaxException exception) {
throw new MetacatException("Druid ending point invalid");
}
this.druidURI = coordinatorUri;
this.druidURI = routerUri;
log.info("druid server uri={}", this.druidURI);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ private DruidHttpClientUtil() {
* get Latest Segment.
*
* @param input segments strings
* @return lastest segment id
* @return latest segment id
*/
public static String getLatestSegment(final String input) {
final String[] segments = input.substring(1, input.length() - 1).split(",");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package com.netflix.metacat.connector.druid.configs;

import com.netflix.metacat.common.server.connectors.ConnectorContext;
import com.netflix.metacat.common.server.connectors.util.TimeUtil;
import com.netflix.metacat.connector.druid.DruidConfigConstants;
import com.netflix.metacat.connector.druid.MetacatDruidClient;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;

import java.util.concurrent.TimeUnit;

/**
* BaseDruidHttpClientConfig.
*
* @author zhenl jtuglu
* @since 1.2.0
*/
public abstract class BaseDruidHttpClientConfig {
/**
* Druid client instance.
*
* @param connectorContext connector context
* @param restTemplate rest template
* @return MetacatDruidClient
*/
@Bean
public abstract MetacatDruidClient createMetacatDruidClient(
final ConnectorContext connectorContext,
final RestTemplate restTemplate
);

/**
* Rest template.
*
* @param connectorContext connector context
* @return RestTemplate
*/
@Bean
public RestTemplate restTemplate(final ConnectorContext connectorContext) {
return new RestTemplate(new HttpComponentsClientHttpRequestFactory(httpClient(connectorContext)));
}

/**
* Http client.
*
* @param connectorContext connector context
* @return HttpClient
*/
@Bean
public HttpClient httpClient(final ConnectorContext connectorContext) {
final int timeout = (int) TimeUtil.toTime(
connectorContext.getConfiguration().getOrDefault(DruidConfigConstants.HTTP_TIMEOUT, "5s"),
TimeUnit.SECONDS,
TimeUnit.MILLISECONDS
);
final int poolSize = Integer.parseInt(connectorContext.getConfiguration()
.getOrDefault(DruidConfigConstants.POOL_SIZE, "10"));
final RequestConfig config = RequestConfig.custom()
.setConnectTimeout(timeout)
.setConnectionRequestTimeout(timeout)
.setSocketTimeout(timeout)
.setMaxRedirects(3)
.build();
final PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
connectionManager.setMaxTotal(poolSize);
return HttpClientBuilder
.create()
.setDefaultRequestConfig(config)
.setConnectionManager(connectionManager)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,83 +17,30 @@
package com.netflix.metacat.connector.druid.configs;

import com.netflix.metacat.common.server.connectors.ConnectorContext;
import com.netflix.metacat.common.server.connectors.util.TimeUtil;
import com.netflix.metacat.connector.druid.DruidConfigConstants;
import com.netflix.metacat.connector.druid.MetacatDruidClient;
import com.netflix.metacat.connector.druid.client.DruidHttpClientImpl;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;

import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;

/**
* DruidHttpClientConfig.
*
* @author zhenl
* @author zhenl jtuglu
* @since 1.2.0
*/
@Configuration
public class DruidHttpClientConfig {
public class DruidHttpClientConfig extends BaseDruidHttpClientConfig {
/**
* Druid client instance.
*
* @param connectorContext connector context
* @param restTemplate rest template
* @return MetacatDruidClient
* @throws UnknownHostException exception for unknownhost
*/
@Bean
@Override
public MetacatDruidClient createMetacatDruidClient(
final ConnectorContext connectorContext,
final RestTemplate restTemplate) throws UnknownHostException {
final RestTemplate restTemplate) {
return new DruidHttpClientImpl(connectorContext, restTemplate);
}

/**
* Rest template.
*
* @param connectorContext connector context
* @return RestTemplate
*/
@Bean
public RestTemplate restTemplate(final ConnectorContext connectorContext) {
return new RestTemplate(new HttpComponentsClientHttpRequestFactory(httpClient(connectorContext)));
}

/**
* Http client.
*
* @param connectorContext connector context
* @return HttpClient
*/
@Bean
public HttpClient httpClient(final ConnectorContext connectorContext) {
final int timeout = (int) TimeUtil.toTime(
connectorContext.getConfiguration().getOrDefault(DruidConfigConstants.HTTP_TIMEOUT, "5s"),
TimeUnit.SECONDS,
TimeUnit.MILLISECONDS
);
final int poolsize = Integer.parseInt(connectorContext.getConfiguration()
.getOrDefault(DruidConfigConstants.POOL_SIZE, "10"));
final RequestConfig config = RequestConfig.custom()
.setConnectTimeout(timeout)
.setConnectionRequestTimeout(timeout)
.setSocketTimeout(timeout)
.setMaxRedirects(3)
.build();
final PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
connectionManager.setMaxTotal(poolsize);
return HttpClientBuilder
.create()
.setDefaultRequestConfig(config)
.setConnectionManager(connectionManager)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import spock.lang.Specification

/**
* DruidHttpClientUtilSpec.
* @author zhenl
* @author zhenl jtuglu
* @since 1.2.0
*/
class DruidHttpClientUtilSpec extends Specification{
class DruidHttpClientUtilSpec extends Specification {

def "Test for getLatestDataByName"() {

when:
Expand Down

0 comments on commit d579ca9

Please sign in to comment.