Skip to content

Commit

Permalink
add: 添加resilience4j熔断器
Browse files Browse the repository at this point in the history
  • Loading branch information
yukdawn committed Apr 10, 2022
1 parent 2bde477 commit 14fd4d2
Show file tree
Hide file tree
Showing 13 changed files with 184 additions and 146 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@
<!-- https://mvnrepository.com/artifact/io.github.resilience4j/resilience4j-spring -->
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-spring</artifactId>
<artifactId>resilience4j-spring-boot2</artifactId>
<version>${resilience4j-spring.version}</version>
<scope>provided</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import com.alibaba.csp.sentinel.SphU;
import com.github.lianjiatech.retrofit.spring.boot.degrade.DegradeRuleRegister;
import com.github.lianjiatech.retrofit.spring.boot.degrade.Resilience4jDegradeRuleRegister;
import com.github.lianjiatech.retrofit.spring.boot.degrade.SentinelDegradeRuleRegister;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
Expand All @@ -16,6 +20,7 @@
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ApplicationContext;
Expand Down Expand Up @@ -132,9 +137,18 @@ public RetrofitConfigBean retrofitConfigBean(ObjectProvider<DegradeRuleRegister>

@Bean
@ConditionalOnMissingBean
@ConditionalOnClass(com.alibaba.csp.sentinel.SphU.class)
@ConditionalOnClass(SphU.class)
@ConditionalOnProperty(name = "retrofit.degrade.degrade-type", havingValue = "sentinel")
public DegradeRuleRegister sentinelDegradeRuleRegister(){
return new SentinelDegradeRuleRegister(retrofitProperties.getDegrade());
return new SentinelDegradeRuleRegister();
}

@Bean
@ConditionalOnMissingBean
@ConditionalOnClass(CircuitBreaker.class)
@ConditionalOnProperty(name = "retrofit.degrade.degrade-type", havingValue = "resilience4j")
public DegradeRuleRegister resilience4JDegradeRuleRegister(CircuitBreakerRegistry circuitBreakerRegistry){
return new Resilience4jDegradeRuleRegister(circuitBreakerRegistry);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,30 +122,16 @@ private void loadDegradeRules() {
return;
}
Assert.notNull(degradeRuleRegister, "[DegradeRuleRegister] not found bean instance");
DegradeType degradeType = degradeProperty.getDegradeType();
switch (degradeType) {
case SENTINEL: {
Method[] methods = retrofitInterface.getMethods();
List<RetrofitDegradeRule> retrofitDegradeRuleList =
Arrays.stream(methods).map(this::convertSentinelRule).filter(Objects::nonNull).collect(Collectors.toList());
degradeRuleRegister.batchRegister(retrofitDegradeRuleList);
break;
}
case RESILIENCE4J: {
break;
}
default: {
throw new IllegalArgumentException("Not currently supported! degradeType=" + degradeType);
}

}


Method[] methods = retrofitInterface.getMethods();
List<RetrofitDegradeRule> retrofitDegradeRuleList = Arrays.stream(methods)
.map(this::convertSentinelRule)
.filter(Objects::nonNull)
.collect(Collectors.toList());
degradeRuleRegister.batchRegister(retrofitDegradeRuleList);
}

/**
* TODO 可以优化 和{@link com.github.lianjiatech.retrofit.spring.boot.degrade.SentinelDegradeRuleRegister#convert(com.github.lianjiatech.retrofit.spring.boot.degrade.RetrofitDegradeRule)}放到一起
* Sentinel 规则转换器
* 提取熔断规则,优先级为方法>类>默认
* @param method method
* @return RetrofitDegradeRule
*/
Expand All @@ -164,19 +150,11 @@ private RetrofitDegradeRule convertSentinelRule(Method method) {
} else {
degrade = retrofitInterface.getAnnotation(Degrade.class);
}

if (degrade == null) {
return null;
}

DegradeStrategy degradeStrategy = degrade.degradeStrategy();
BaseResourceNameParser resourceNameParser = retrofitConfigBean.getResourceNameParser();
String resourceName = resourceNameParser.parseResourceName(method, environment);

RetrofitDegradeRule degradeRule = new RetrofitDegradeRule();
degradeRule.setCount(degrade.count());
degradeRule.setDegradeStrategy(degradeStrategy);
degradeRule.setTimeWindow(degrade.timeWindow());
degradeRule.setCount(Optional.ofNullable(degrade).map(Degrade::count).orElse(null));
degradeRule.setTimeWindow(Optional.ofNullable(degrade).map(Degrade::timeWindow).orElse(null));
degradeRule.setResourceName(resourceName);
return degradeRule;
}
Expand Down Expand Up @@ -303,25 +281,11 @@ private synchronized OkHttpClient getOkHttpClient(Class<?> retrofitClientInterfa
// TODO 这里稍微有点问题,开启熔断则所有实例都会加熔断拦截器,但是拦截器升不生效则取决于是否配置了@Degrade注解,可不可以把这里做成有一个全局默认配置,有@Degrade则走单独配置?
DegradeProperty degradeProperty = retrofitProperties.getDegrade();
if (degradeProperty.isEnable()) {
DegradeType degradeType = degradeProperty.getDegradeType();
switch (degradeType) {
case SENTINEL: {
try {
Class.forName("com.alibaba.csp.sentinel.SphU");
DegradeInterceptor degradeInterceptor = new DegradeInterceptor();
degradeInterceptor.setEnvironment(environment);
degradeInterceptor.setResourceNameParser(retrofitConfigBean.getResourceNameParser());
degradeInterceptor.setDegradeRuleRegister(retrofitConfigBean.getDegradeRuleRegister());
okHttpClientBuilder.addInterceptor(degradeInterceptor);
} catch (ClassNotFoundException e) {
logger.warn("com.alibaba.csp.sentinel not found! No SentinelDegradeInterceptor is set.");
}
break;
}
default: {
throw new IllegalArgumentException("Not currently supported! degradeType=" + degradeType);
}
}
DegradeInterceptor degradeInterceptor = new DegradeInterceptor();
degradeInterceptor.setEnvironment(environment);
degradeInterceptor.setResourceNameParser(retrofitConfigBean.getResourceNameParser());
degradeInterceptor.setDegradeRuleRegister(retrofitConfigBean.getDegradeRuleRegister());
okHttpClientBuilder.addInterceptor(degradeInterceptor);
}

// add ServiceInstanceChooserInterceptor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,21 @@
import java.lang.annotation.*;

/**
* @author 陈添明
* 应仅采用异常比例模式来控制熔断,超时导致的报错应在okhttp这一层做
* @author 陈添明 [email protected]
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
@Documented
public @interface Degrade {

/**
* RT threshold or exception ratio threshold count.
* 异常比例
*/
double count();
float count();

/**
* Degrade recover timeout (in seconds) when degradation occurs.
* 时间窗口size,单位:秒
*/
int timeWindow() default 5;

/**
* Degrade strategy (0: average RT, 1: exception ratio).
*/
DegradeStrategy degradeStrategy() default DegradeStrategy.AVERAGE_RT;
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,27 @@
import java.io.IOException;
import java.util.List;

import okhttp3.Response;

/**
* @author [email protected] 2022/4/5 23:14
*/
public interface DegradeRuleRegister {

void register(RetrofitDegradeRule retrofitDegradeRule);

/**
* 批量注册规则
* @param retrofitDegradeRuleList 规则描述对象集合
*/
void batchRegister(List<RetrofitDegradeRule> retrofitDegradeRuleList);

<T> T exec(String resourceName, DegradeProxyMethod<T> func) throws IOException;
/**
* 使用规则代理执行目标方法
* @param resourceName 资源名称
* @param func 目标方法
* @return okhttp响应
* @throws IOException IOException
*/
Response exec(String resourceName, DegradeProxyMethod<Response> func) throws IOException;

@FunctionalInterface
interface DegradeProxyMethod<R>{
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package com.github.lianjiatech.retrofit.spring.boot.degrade;

import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import io.github.resilience4j.core.StopWatch;
import okhttp3.Response;
import org.springframework.util.CollectionUtils;

/**
* @author [email protected] 2022/4/5 23:15
*/
public class Resilience4jDegradeRuleRegister implements DegradeRuleRegister{

private final CircuitBreakerRegistry circuitBreakerRegistry;

public Resilience4jDegradeRuleRegister(CircuitBreakerRegistry circuitBreakerRegistry) {
this.circuitBreakerRegistry = circuitBreakerRegistry;
}

@Override
public void batchRegister(List<RetrofitDegradeRule> ruleList) {
if (CollectionUtils.isEmpty(ruleList)){
return;
}
for (RetrofitDegradeRule rule : ruleList) {
circuitBreakerRegistry.circuitBreaker(rule.getResourceName(), this.convert(rule));
}

}

@Override
public Response exec(String resourceName, DegradeProxyMethod<Response> func) throws IOException {
CircuitBreaker circuitBreaker = circuitBreakerRegistry.circuitBreaker(resourceName);
System.out.println("当前断路器状态: "+ circuitBreaker.getState());

final StopWatch stopWatch = StopWatch.start();
try {
circuitBreaker.acquirePermission();
final Response response = func.get();
circuitBreaker.onResult(stopWatch.stop().toNanos(), TimeUnit.NANOSECONDS, response);
return response;
}catch (CallNotPermittedException e){
throw new RetrofitBlockException(e);
} catch (Exception exception) {
circuitBreaker.onError(stopWatch.stop().toNanos(), TimeUnit.NANOSECONDS, exception);
throw exception;
}
}

private CircuitBreakerConfig convert(RetrofitDegradeRule rule){
// add degrade rule
CircuitBreakerConfig.Builder circuitBreakerBuilder = CircuitBreakerConfig.from(newInstanceByDefault());
if (Objects.nonNull(rule.getCount())){
circuitBreakerBuilder.failureRateThreshold(rule.getCount());
}
if (Objects.nonNull(rule.getTimeWindow())){
circuitBreakerBuilder.slidingWindowSize(rule.getTimeWindow());
}
return circuitBreakerBuilder.build();
}

public CircuitBreakerConfig newInstanceByDefault(){
// 断路器配置
return CircuitBreakerConfig.custom()
// 滑动窗口的类型为时间窗口
.slidingWindowType(CircuitBreakerConfig.SlidingWindowType.TIME_BASED)
// 时间窗口的大小为60秒
.slidingWindowSize(60)
// 在单位时间窗口内最少需要10次调用才能开始进行统计计算
.minimumNumberOfCalls(10)
// 在单位时间窗口内调用失败率达到60%后会启动断路器
.failureRateThreshold(60)
// 允许断路器自动由打开状态转换为半开状态
.enableAutomaticTransitionFromOpenToHalfOpen()
// 在半开状态下允许进行正常调用的次数
.permittedNumberOfCallsInHalfOpenState(5)
// 断路器打开状态转换为半开状态需要等待60秒
.waitDurationInOpenState(Duration.ofSeconds(60))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@ public class RetrofitDegradeRule {

private String resourceName;

private double count;
private Float count;

private int timeWindow;

private DegradeStrategy degradeStrategy;
private Integer timeWindow;

public String getResourceName() {
return resourceName;
Expand All @@ -21,27 +19,20 @@ public void setResourceName(String resourceName) {
this.resourceName = resourceName;
}

public double getCount() {
public Float getCount() {
return count;
}

public void setCount(double count) {
public void setCount(Float count) {
this.count = count;
}

public int getTimeWindow() {
public Integer getTimeWindow() {
return timeWindow;
}

public void setTimeWindow(int timeWindow) {
public void setTimeWindow(Integer timeWindow) {
this.timeWindow = timeWindow;
}

public DegradeStrategy getDegradeStrategy() {
return degradeStrategy;
}

public void setDegradeStrategy(DegradeStrategy degradeStrategy) {
this.degradeStrategy = degradeStrategy;
}
}
Loading

0 comments on commit 14fd4d2

Please sign in to comment.