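- Gray release
- A/B test in practice
- Logical architecture diagram
- That covers external access to internal services; how do internal services access the outside?
- Implementation demo
- Analysis
- Key changes
- Practice
- Possible optimizations
- ReactiveLoadBalancerClientFilter source code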
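Gray release comes in several forms: blue-green, A/B testing, and canary.
Purpose of gray release: to reduce the impact of the gray version on the production environment.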
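| Strategy | Description | Drawback |
|---|---|---|
| Blue-green | Two environments are kept. After the gray version is deployed, traffic is switched from green to blue and then verified. | Wasteful of environments, and production traffic is routed straight to the gray version. |
| A/B test | Different users are routed differently at the gateway layer. Only specific users are switched to the gray version, so production users are not affected. | More complex than traffic weighting, because users have to be told apart. |
| Canary | Part of the traffic is switched over and the gray version is verified. If there are no problems, all traffic is switched. | Again, production users end up testing the gray version directly. |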
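The difference between the A/B test and canary rows comes down to what decides whether a request goes to the gray version: who the user is, or a share of all traffic. A minimal sketch in plain Java, where the class name, the user IDs, and the hash-based bucketing are all assumptions made for illustration:

```java
import java.util.Set;

final class GrayStrategySketch {

    // A/B test: only explicitly listed users are sent to the gray version.
    static boolean abTestRoutesToGray(String userId, Set<String> grayUsers) {
        return userId != null && grayUsers.contains(userId);
    }

    // Canary: a fixed percentage of all traffic goes to the gray version,
    // no matter who the user is. The bucket is derived from the user ID
    // so that the same user consistently lands on the same side.
    static boolean canaryRoutesToGray(String userId, int grayPercent) {
        int bucket = Math.floorMod(userId.hashCode(), 100); // 0..99
        return bucket < grayPercent;
    }

    public static void main(String[] args) {
        Set<String> grayUsers = Set.of("tester-1", "tester-2"); // hypothetical IDs
        System.out.println(abTestRoutesToGray("tester-1", grayUsers));
        System.out.println(abTestRoutesToGray("customer-9", grayUsers));
        System.out.println(canaryRoutesToGray("customer-9", 10));
    }
}
```

With the A/B variant an ordinary production user never reaches the gray version unless listed, which is the property the table attributes to it.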
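This is the first version: we implement a custom load balancer. A filter reads the relevant request header, or the request is tagged passively by arriving on a different domain name, and routing is decided on that basis.
When traffic is routed, the metadata tag on the service instances registered in Nacos decides which instance receives the request.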
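The metadata-based decision can be illustrated without any Spring or Nacos dependency. In the sketch below, `Instance` is a hypothetical stand-in for a registered service instance, and the `version` key and addresses are assumptions chosen for the example:

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class MetadataRoutingSketch {

    // Hypothetical stand-in for an instance registered in the service registry.
    static final class Instance {
        final String address;
        final Map<String, String> metadata;

        Instance(String address, Map<String, String> metadata) {
            this.address = address;
            this.metadata = metadata;
        }
    }

    // Returns the first instance whose metadata value for `key` equals `wanted`.
    static Optional<Instance> pick(List<Instance> instances, String key, String wanted) {
        if (wanted == null) {
            return Optional.empty();
        }
        for (Instance instance : instances) {
            if (wanted.equals(instance.metadata.get(key))) {
                return Optional.of(instance);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<Instance> instances = List.of(
                new Instance("10.0.0.1:8080", Map.of("version", "1.0")),
                new Instance("10.0.0.2:8080", Map.of("version", "2.0", "gray", "true")));
        System.out.println(pick(instances, "version", "2.0").map(i -> i.address).orElse("none"));
    }
}
```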
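Once gray testing is finished, the production pods are updated with a rolling update.
For Feign or plain HTTP requests, the gray version of a service is reached through its domain name.
Dubbo requests have their own load balancer, which needs to obtain the corresponding tag, such as a version number, to balance on.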
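Second version (advanced)
In the first version we defined a custom load balancer and a route marker. The gateway is normally configured with lb:xxx. To see why it starts with lb, look at the ReactiveLoadBalancerClientFilter source code; what we need is to define a different marker of our own.
That raises a question: once gray testing is done, how is the load balancing switched back to the normal lb?
This comes down to dynamic configuration of gateway routes, which only requires a publishEvent call (not covered in this chapter).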
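How a route marker selects the load-balancing path can be shown with `java.net.URI` alone. The sketch below assumes the custom marker is `grayLb`, as used by the filter in this article; the service name and the returned labels are made up for illustration:

```java
import java.net.URI;

final class SchemeCheckSketch {

    // Decides which load-balancing path a route URI would take, based on its scheme.
    static String handlerFor(URI routeUri) {
        String scheme = routeUri.getScheme();
        if ("lb".equals(scheme)) {
            return "default";   // handled by the stock load-balancer filter
        }
        if ("grayLb".equals(scheme)) {
            return "gray";      // handled by the custom gray filter
        }
        return "none";          // not load balanced at all
    }

    public static void main(String[] args) {
        System.out.println(handlerFor(URI.create("lb://user-service")));
        System.out.println(handlerFor(URI.create("grayLb://user-service")));
        System.out.println(handlerFor(URI.create("http://10.0.0.1:8080")));
    }
}
```

Switching a route back to normal load balancing after gray testing then amounts to changing its URI from `grayLb://...` to `lb://...` and refreshing the routes.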
For the implementation demo, see another article:
- Gateway gray release
GrayGatewayReactiveLoadBalancerClientAutoConfiguration
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.cloud.client.loadbalancer.LoadBalancerProperties;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class GrayGatewayReactiveLoadBalancerClientAutoConfiguration {

    public GrayGatewayReactiveLoadBalancerClientAutoConfiguration() {
    }

    // Registers the gray filter unless the application already defines one.
    @Bean
    @ConditionalOnMissingBean({GrayReactiveLoadBalancerClientFilter.class})
    public GrayReactiveLoadBalancerClientFilter grayReactiveLoadBalancerClientFilter(LoadBalancerClientFactory clientFactory, LoadBalancerProperties properties) {
        return new GrayReactiveLoadBalancerClientFilter(clientFactory, properties);
    }
}
GrayLoadBalancer
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.Request;
import org.springframework.cloud.client.loadbalancer.Response;
import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.http.HttpHeaders;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.*;
public class GrayLoadBalancer implements ReactorServiceInstanceLoadBalancer {

    private static final Log log = LogFactory.getLog(GrayLoadBalancer.class);

    private final ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
    private final String serviceId;

    public GrayLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId) {
        this.serviceId = serviceId;
        this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
    }

    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {
        // The gray filter passes the request headers as the load-balancer request context.
        HttpHeaders headers = (HttpHeaders) request.getContext();
        if (this.serviceInstanceListSupplierProvider != null) {
            ServiceInstanceListSupplier supplier = this.serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);
            return supplier.get().next().map(list -> getInstanceResponse(list, headers));
        }
        // Return an empty response instead of null so callers never receive a null Mono.
        return Mono.just(getServiceInstanceEmptyResponse());
    }

    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances, HttpHeaders headers) {
        if (instances.isEmpty()) {
            return getServiceInstanceEmptyResponse();
        } else {
            // Weighted selection is used here; getServiceInstanceResponseByVersion(instances, headers)
            // is the alternative that matches on the "version" request header.
            return getServiceInstanceResponseWithWeight(instances);
        }
    }

    // Picks the first instance whose metadata contains version=<value of the "version" header>.
    private Response<ServiceInstance> getServiceInstanceResponseByVersion(List<ServiceInstance> instances, HttpHeaders headers) {
        String versionNo = headers.getFirst("version");
        Map<String, String> versionMap = new HashMap<>();
        versionMap.put("version", versionNo);
        final Set<Map.Entry<String, String>> attributes =
                Collections.unmodifiableSet(versionMap.entrySet());
        ServiceInstance serviceInstance = null;
        for (ServiceInstance instance : instances) {
            Map<String, String> metadata = instance.getMetadata();
            if (metadata.entrySet().containsAll(attributes)) {
                serviceInstance = instance;
                break;
            }
        }
        if (ObjectUtils.isEmpty(serviceInstance)) {
            return getServiceInstanceEmptyResponse();
        }
        return new DefaultResponse(serviceInstance);
    }

    // Weighted random selection: instances tagged "gray" in their metadata get weight 1000, all others weight 1.
    private Response<ServiceInstance> getServiceInstanceResponseWithWeight(List<ServiceInstance> instances) {
        Map<ServiceInstance, Integer> weightMap = new HashMap<>();
        for (ServiceInstance instance : instances) {
            Map<String, String> metadata = instance.getMetadata();
            if (metadata.containsKey("gray")) {
                weightMap.put(instance, 1000);
            } else {
                weightMap.put(instance, 1);
            }
        }
        WeightMeta<ServiceInstance> weightMeta = WeightRandomUtils.buildWeightMeta(weightMap);
        if (ObjectUtils.isEmpty(weightMeta)) {
            return getServiceInstanceEmptyResponse();
        }
        ServiceInstance serviceInstance = weightMeta.random();
        if (ObjectUtils.isEmpty(serviceInstance)) {
            return getServiceInstanceEmptyResponse();
        }
        return new DefaultResponse(serviceInstance);
    }

    private Response<ServiceInstance> getServiceInstanceEmptyResponse() {
        log.warn("No servers available for service: " + this.serviceId);
        return new EmptyResponse();
    }
}
GrayReactiveLoadBalancerClientFilter
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.*;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.support.DelegatingServiceInstance;
import org.springframework.cloud.gateway.support.NotFoundException;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.core.Ordered;
import org.springframework.http.HttpHeaders;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import java.net.URI;
public class GrayReactiveLoadBalancerClientFilter implements GlobalFilter, Ordered {

    private static final Log log = LogFactory.getLog(GrayReactiveLoadBalancerClientFilter.class);
    private static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10150;

    private final LoadBalancerClientFactory clientFactory;
    private final LoadBalancerProperties properties;

    public GrayReactiveLoadBalancerClientFilter(LoadBalancerClientFactory clientFactory, LoadBalancerProperties properties) {
        this.clientFactory = clientFactory;
        this.properties = properties;
    }

    @Override
    public int getOrder() {
        return LOAD_BALANCER_CLIENT_FILTER_ORDER;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI url = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        String schemePrefix = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR);
        // Only routes whose URI uses the custom "grayLb" marker are handled here.
        if (url != null && ("grayLb".equals(url.getScheme()) || "grayLb".equals(schemePrefix))) {
            ServerWebExchangeUtils.addOriginalRequestUrl(exchange, url);
            if (log.isTraceEnabled()) {
                log.trace(GrayReactiveLoadBalancerClientFilter.class.getSimpleName() + " url before: " + url);
            }
            return this.choose(exchange).doOnNext((response) -> {
                if (!response.hasServer()) {
                    throw NotFoundException.create(true, "Unable to find instance for " + url.getHost());
                } else {
                    URI uri = exchange.getRequest().getURI();
                    String overrideScheme = null;
                    if (schemePrefix != null) {
                        overrideScheme = url.getScheme();
                    }
                    DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(response.getServer(), overrideScheme);
                    // Replace the service name in the URI with the chosen instance's host and port.
                    URI requestUrl = this.reconstructURI(serviceInstance, uri);
                    if (log.isTraceEnabled()) {
                        log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
                    }
                    exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, requestUrl);
                }
            }).then(chain.filter(exchange));
        } else {
            return chain.filter(exchange);
        }
    }

    protected URI reconstructURI(ServiceInstance serviceInstance, URI original) {
        return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
    }

    private Mono<Response<ServiceInstance>> choose(ServerWebExchange exchange) {
        URI uri = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        // A GrayLoadBalancer is created per request from the lazily resolved instance-list supplier of the target service.
        GrayLoadBalancer loadBalancer = new GrayLoadBalancer(clientFactory.getLazyProvider(uri.getHost(), ServiceInstanceListSupplier.class), uri.getHost());
        return loadBalancer.choose(this.createRequest(exchange));
    }

    // Wraps the request headers so that GrayLoadBalancer can read them from the request context.
    private Request<HttpHeaders> createRequest(ServerWebExchange exchange) {
        HttpHeaders headers = exchange.getRequest().getHeaders();
        return new DefaultRequest<>(headers);
    }
}
WeightMeta
import java.util.Arrays;
import java.util.Random;
public class WeightMeta<T> {

    private final Random ran = new Random();
    private final T[] nodes;
    // Cumulative weights: weights[i] is the sum of the weights of nodes[0..i].
    private final int[] weights;
    private final int maxW;

    public WeightMeta(T[] nodes, int[] weights) {
        this.nodes = nodes;
        this.weights = weights;
        this.maxW = weights[weights.length - 1];
    }

    // Draws a number in 1..maxW and returns the node whose cumulative range contains it.
    public T random() {
        int index = Arrays.binarySearch(weights, ran.nextInt(maxW) + 1);
        if (index < 0) {
            index = -1 - index;
        }
        return nodes[index];
    }

    public T random(int ranInt) {
        if (ranInt > maxW) {
            ranInt = maxW;
        } else if (ranInt < 0) {
            ranInt = 1;
        } else {
            ranInt++;
        }
        int index = Arrays.binarySearch(weights, ranInt);
        if (index < 0) {
            index = -1 - index;
        }
        return nodes[index];
    }

    @Override
    public String toString() {
        StringBuilder l1 = new StringBuilder();
        StringBuilder l2 = new StringBuilder("[random]\t");
        StringBuilder l3 = new StringBuilder("[node]\t\t");
        l1.append(this.getClass().getName()).append(":").append(this.hashCode()).append(":\n").append("[index]\t\t");
        for (int i = 0; i < weights.length; i++) {
            l1.append(i).append("\t");
            l2.append(weights[i]).append("\t");
            l3.append(nodes[i]).append("\t");
        }
        l1.append("\n");
        l2.append("\n");
        l3.append("\n");
        return l1.append(l2).append(l3).toString();
    }
}
WeightRandomUtils
import java.util.HashMap;
import java.util.Map;
public class WeightRandomUtils {

    // Builds a WeightMeta whose weights array holds the running (cumulative) sum of the given weights.
    @SuppressWarnings("unchecked")
    public static <T> WeightMeta<T> buildWeightMeta(final Map<T, Integer> weightMap) {
        if (weightMap.isEmpty()) {
            return null;
        }
        final int size = weightMap.size();
        Object[] nodes = new Object[size];
        int[] weights = new int[size];
        int index = 0;
        int weightAdder = 0;
        for (Map.Entry<T, Integer> each : weightMap.entrySet()) {
            nodes[index] = each.getKey();
            weights[index++] = (weightAdder = weightAdder + each.getValue());
        }
        return new WeightMeta<>((T[]) nodes, weights);
    }

    public static void main(String[] args) {
        Map<String, Integer> map = new HashMap<>();
        map.put("v1", 1);
        map.put("v2", 2);
        WeightMeta<String> nodes = WeightRandomUtils.buildWeightMeta(map);
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                System.out.println(nodes.random());
            }).start();
        }
    }
}
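To see why the cumulative array plus binary search gives a weighted pick, it helps to trace the weights that GrayLoadBalancer assigns (1000 for a gray instance, 1 for any other). The following is a standalone sketch of the same idea, not the classes above; the class and method names are chosen for this example:

```java
import java.util.Arrays;

final class CumulativeWeightSketch {

    // Turns per-node weights into running sums, e.g. {1000, 1} -> {1000, 1001}.
    static int[] cumulative(int[] weights) {
        int[] sums = new int[weights.length];
        int total = 0;
        for (int i = 0; i < weights.length; i++) {
            total += weights[i];
            sums[i] = total;
        }
        return sums;
    }

    // Maps a draw in 1..total to the index of the node whose range contains it.
    static int indexFor(int[] cumulative, int draw) {
        int index = Arrays.binarySearch(cumulative, draw);
        // A negative result encodes the insertion point as -(point) - 1.
        return index < 0 ? -1 - index : index;
    }

    public static void main(String[] args) {
        int[] sums = cumulative(new int[] {1000, 1});
        System.out.println(Arrays.toString(sums));
        System.out.println(indexFor(sums, 1));
        System.out.println(indexFor(sums, 1000));
        System.out.println(indexFor(sums, 1001));
    }
}
```

Draws 1 through 1000 map to index 0 and only draw 1001 maps to index 1, so with these weights the gray instance is chosen in 1000 out of 1001 cases.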
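Analysis
See ReactiveLoadBalancerClientFilter and RoundRobinLoadBalancer for reference.
Internally, the choose method is called to perform the load balancing.
GrayLoadBalancer
Start it up; it is triggered based on the user's request header and the host domain being accessed.
In Nacos, register two instances with different versions, distinguished by their metadata.
In Postman, add the specific marker to the request; calling the API then shows the request landing on the gray version.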
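The same check can be scripted instead of using Postman. The sketch below only builds the request with the JDK's `java.net.http` API (Java 11 or later) and does not send it. The header name `version` is the one read by getServiceInstanceResponseByVersion; the host and path are placeholders, not values from this article:

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class GrayRequestSketch {

    // Builds a GET request carrying the gray marker header.
    static HttpRequest build(String baseUrl, String version) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/api/hello")) // placeholder path
                .header("version", version)
                .GET()
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = build("http://gray.example.com", "2.0"); // placeholder host
        System.out.println(request.uri());
        System.out.println(request.headers().firstValue("version").orElse("<missing>"));
    }
}
```

Sending it with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())` against the gateway would then exercise the gray route.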
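After gray testing, leaving the load balancer unchanged also works, but it is better to switch back to the normal one.
That is why the gateway routes should be configured dynamically, and there are many ways to implement that.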
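ReactiveLoadBalancerClientFilter source code
- ReactiveLoadBalancerClientFilter implements GlobalFilter and Ordered. Anyone who has worked with the gateway knows what these two are: a filter plus its ordering.
- Why does the gateway configuration start with lb? Because the filter only handles a request whose URL scheme (or scheme prefix) is lb, which is the same check that GrayReactiveLoadBalancerClientFilter performs with grayLb.
- The key point is the choose method: the ReactorLoadBalancer algorithm decides which machine the request is routed to.
- ReactorLoadBalancer source code (the RoundRobinLoadBalancer implementation)
- Routing algorithm (this is why we implemented GrayLoadBalancer and WeightRandomUtils earlier: they imitate the existing RoundRobinLoadBalancer)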
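For comparison with the weighted picker, round-robin selection can be sketched in a few lines. This is a simplified illustration of the general technique, written for this article and not copied from the Spring Cloud source; the class name is an assumption:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinSketch<T> {

    // Shared counter; each call takes the next position in turn.
    private final AtomicInteger position = new AtomicInteger(0);

    T next(List<T> instances) {
        if (instances.isEmpty()) {
            throw new IllegalStateException("no instances available");
        }
        // Masking with Integer.MAX_VALUE keeps the value non-negative after overflow.
        int pos = position.getAndIncrement() & Integer.MAX_VALUE;
        return instances.get(pos % instances.size());
    }

    public static void main(String[] args) {
        RoundRobinSketch<String> roundRobin = new RoundRobinSketch<>();
        List<String> instances = List.of("a", "b", "c");
        for (int i = 0; i < 4; i++) {
            System.out.println(roundRobin.next(instances));
        }
    }
}
```

The gray load balancer keeps the same shape (take the instance list, return one instance) and only swaps the selection rule for a weighted random draw.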



