栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

springboot 网关实时刷新服务列表实现

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

springboot 网关实时刷新服务列表实现

2021-11-04 18:46:02
实现原理:

	每个微服务spring会启动单独的AnnotatioinApplicationContext,该context会自动初始ILoadBalancer等类

	其中ILoadBalancer包含
	1.ServerList: 
		ServerList可用于获取实际的eureka中的服务列表
	2.ServerListUpdater:
		用于定时调用ServerList的刷新方法获取最新的服务列表更新ILoadBalancer中的Server
		
	这里通过 LoadBalancerConfiguration 配置类,在postCons
	
	@Configuration
	@Slf4j
	public class LoadBalancerConfiguration {  
		
		@Autowired
		private SpringClientFactory clientFactory;
		@Value("${ribbon.eureka.approximateZoneFromHostname:false}")
		private boolean approximateZoneFromHostname = false;
		
		
		@PostConstruct
		public void postCons() {
			this.clientFactory.setConfigurations(
					Arrays.asList(
							new RibbonClientSpecification(
									"default.ILoadBalancer.visiter", 
									new Class[] {LoadBalancerConfiguration.class})));
		}
	
		private Optional flectReadEurekaHttpClient(
				EurekaClient eurekaClient) {
			if (AopUtils.isAopProxy(eurekaClient)) {
				try {
					eurekaClient = (EurekaClient) AopTargetUtils.getTarget(eurekaClient);
				} catch (Exception e) {
					log.error("error get target from aop proxy:{}", e);
				}
			}
			EurekaHttpClient queryClient = null;
			if (eurekaClient instanceof DiscoveryClient) {
				DiscoveryClient discoveryClient = (DiscoveryClient) eurekaClient;
				try {
					Object eurekaTransport = 
							FieldUtils.readField(
									discoveryClient,
									"eurekaTransport",
									true);
					queryClient =
							(EurekaHttpClient) FieldUtils.readField(
									eurekaTransport, 
									"queryClient",
									true);
					log.info("success flect read EurekaHttpClient:{}", queryClient);
				} catch (Exception e) {
					log.error("error flect read EurekaHttpClient:{}", e);
				}
			}
			return Optional.ofNullable(queryClient);
		}
		
		@Bean
		@Lazy
		public ServerList ribbonServerList(
				IClientConfig config,
				Provider eurekaClientProvider) {
			EurekaClient eurekaClient = eurekaClientProvider.get();
			Optional flectReadEurekaHttpClient = 
					this.flectReadEurekaHttpClient(eurekaClient);
			if (!flectReadEurekaHttpClient.isPresent()) {
				log.warn("can not read EurekaHttpClient from this kind eurekaClient:{}", eurekaClient);
				return null;
			}
			CustomerDiscoveryEnabledNIWSServerList discoveryServerList = 
					new CustomerDiscoveryEnabledNIWSServerList(
							config, 
							eurekaClientProvider);
			EurekaHttpClient eurekaHttpClient = flectReadEurekaHttpClient.get();
			CustomerDomainExtractingServerList serverList = 
					new CustomerDomainExtractingServerList(
							discoveryServerList, 
							eurekaHttpClient,
							config,
							this.approximateZoneFromHostname);
			return serverList;
		}
	
	
	}
	
	这个方法里,向clientFactory(SpringClientFactory)中设置了默认配置类,用于自定义初始化ServerList的ribbonServerList方法
	
	自定义ServerList的目的有:
	
	1.获取到EurekaHttpClient,用于将EurekaClient的InstanceInfo的instanceId转回注册EureKaServer成功的InstanceInfo
	2.通过DiscoveryEnabledNIWSServerList的createServer方法,把EurekaServer获取到的InstanceInfo类型数据转为ILoadBalancer使用的Server
	
	所以流程为:
	
	1.任一系统启动的时候,监听 spring 启动后的 WebServerInitializedEvent 事件,该事件发布后,当前实例已经注册到 eureka server
	2.在WebServerInitializedEvent事件的处理方法中,获取ApplicationInfoManager中标识当前eureka客户端信息的InstanceInfo
		
			@Autowired
			private ApplicationInfoManager manager;
		
			@Override
			public String serverId() {
				InstanceInfo info = this.manager.getInfo();
				return info.getInstanceId();
			}
			获取到 instanceId
			
	3.通过事件广播途径,告知集群当前系统启动完成,实例id为 instanceId
		如:ServerOnlineEvent:
			private String serviceName;
			private String instanceId;
	
	4.网关服务监听集群实例变更事件ServerOnlineEvent
		获取变更服务(serviceName)当前仍然在线的服务列表(instanceId[]),发布ServiceServerListChangedEvent
	
		@Data
		@NoArgsConstructor
		@AllArgsConstructor
		public class ServiceServerListChangedEvent {
			
			private String serviceName;
			private List instanceServerInfoIds;
		
		}
	
	5.ServiceInstanceInfoChangedListener在网关服务内部监听ServiceServerListChangedEvent
		通过SpringClientFactory获取对应 serviceName 对应的ILoadBalancer负载器
		构建LoadBalancerVisitor
		
		LoadBalancerVisitor 进行的操作有:
		
		1.停用ILoadBalancer默认的ServerListUpdater,完全切换到事件监听上
		2.调用CustomerDomainExtractingServerList的createServer方法,将 instanceId 转为 具体注册
			在Eureka Server中的InstanceInfo,并继续由CustomerDiscoveryEnabledNIWSServerList转为
			负载器内部使用的DiscoveryEnabledServer
		3.调用ILoadBalancer的setServersList更新服务列表
		
	完
	
	weike-gateway当前使用akka集群监控服务列表的变更事件
		
附录:

1.
	import javax.inject.Provider;

	import com.netflix.appinfo.InstanceInfo;
	import com.netflix.client.config.CommonClientConfigKey;
	import com.netflix.client.config.DefaultClientConfigImpl;
	import com.netflix.client.config.IClientConfig;
	import com.netflix.discovery.EurekaClient;
	import com.netflix.niws.loadbalancer.DiscoveryEnabledNIWSServerList;
	import com.netflix.niws.loadbalancer.DiscoveryEnabledServer;
	
	
	public class CustomerDiscoveryEnabledNIWSServerList extends DiscoveryEnabledNIWSServerList {
	
		private boolean isSecure;
		private boolean shouldUseIpAddr;
	
		@SuppressWarnings("deprecation")
		public CustomerDiscoveryEnabledNIWSServerList(
				IClientConfig clientConfig,
				Provider eurekaClientProvider) {
			super(clientConfig, eurekaClientProvider);
			this.isSecure = 
					Boolean.parseBoolean("" + clientConfig.getProperty(CommonClientConfigKey.IsSecure, "false"));
			this.shouldUseIpAddr = 
					clientConfig.getPropertyAsBoolean(
							CommonClientConfigKey.UseIPAddrForServer,
							DefaultClientConfigImpl.DEFAULT_USEIPADDRESS_FOR_SERVER);
		}
	
		DiscoveryEnabledServer createServer(
				InstanceInfo instanceInfo) {
			return super.createServer(
					instanceInfo, 
					isSecure, 
					shouldUseIpAddr);
		}
	
	}
		
2.
	import java.util.concurrent.TimeUnit;

	import org.springframework.cloud.netflix.ribbon.eureka.DomainExtractingServerList;
	
	import com.netflix.appinfo.InstanceInfo;
	import com.netflix.client.config.IClientConfig;
	import com.netflix.discovery.shared.transport.EurekaHttpClient;
	import com.netflix.niws.loadbalancer.DiscoveryEnabledServer;
	
	import lombok.extern.slf4j.Slf4j;
	
	
	@Slf4j
	public class CustomerDomainExtractingServerList extends DomainExtractingServerList {
		
		private CustomerDiscoveryEnabledNIWSServerList serverList;
		private EurekaHttpClient queryClient;
	
		public CustomerDomainExtractingServerList(
				CustomerDiscoveryEnabledNIWSServerList list, 
				EurekaHttpClient registrationClient,
				IClientConfig clientConfig,
				boolean approximateZoneFromHostname) {
			super(list, clientConfig, approximateZoneFromHostname);
			this.serverList = list;
			this.queryClient = registrationClient;
		}
		
		DiscoveryEnabledServer createServer(
				String instanceId) {
			if (queryClient == null) {
				return null;
			}
			try {
				log.info("try getInstance by instanceId:{}", instanceId);
				InstanceInfo instanceInfo = 
						this.instanceInfo(instanceId, 0);
				log.info("for instanceId:{} and instanceInfo:{}", instanceId, instanceInfo);
				return this.serverList.createServer(instanceInfo);
			} catch (Exception e) {
				log.error("error createServer:{}", e);
			}
			return null;
		}
		
		private InstanceInfo instanceInfo(
				String instanceId,
				int curTimes) {
			if (curTimes >= 3) {
				return null;
			}
			try {
				return this.queryClient.getInstance(instanceId)
						.getEntity();
			} catch (Exception e) {
				log.error("error getInstance:{}", e);
			}
			try {
				TimeUnit.SECONDS.sleep(3);
			} catch (InterruptedException e) {
				log.warn("InterruptedException:{}", e);
			}
			return instanceInfo(instanceId, curTimes + 1);
		}
	}
	
3.
	import java.util.ArrayList;
	import java.util.HashSet;
	import java.util.List;
	import java.util.Set;
	import java.util.function.Function;
	import java.util.stream.Collectors;
	
	import com.netflix.loadbalancer.ServerListFilter;
	import com.netflix.loadbalancer.ZoneAwareLoadBalancer;
	import com.netflix.niws.loadbalancer.DiscoveryEnabledServer;
	import com.zmeng.weike.gateway.infa.serverRealTimeUpdate.vo.ApplicationInstanceInfoServerId;
	
	import lombok.Getter;
	import lombok.extern.slf4j.Slf4j;
	
	
	@Slf4j
	public class LoadBalancerVisitor {
		
		@Getter
		private String serviceId;
		private ZoneAwareLoadBalancer balancer;
		private Function serverApply;
		
		private Set allonlineInstanceServer = new HashSet<>();
		
		public LoadBalancerVisitor(
				ZoneAwareLoadBalancer balancer) {
			balancer.setFilter(
					buildCustomerVisitableFilter(
							balancer.getName(),
							balancer.getFilter()));
			// 停用刷新
			balancer.stopServerListRefreshing();
			this.serviceId = balancer.getName();
			this.balancer = balancer;
			CustomerDomainExtractingServerList serverList = 
					(CustomerDomainExtractingServerList) this.balancer.getServerListImpl();
			this.serverApply = 
					serverId -> serverList.createServer(serverId.getInstanceInfoServerId());
			log.info(
					"visitor configed for service name:{}",
					balancer.getName());
		}
		
		public synchronized void serverListChanged(
				List onlineServerList) {
			this.allonlineInstanceServer = 
					onlineServerList.stream()
					.map(this.serverApply::apply)
					.collect(Collectors.toSet());
			
			this.balancer.setServersList(
					new ArrayList<>(this.allOnlineInstanceServer));
			log.info(
					"service:{} online server list:{}",
					this.serviceId, 
					allOnlineInstanceServer);
		}
		
		private ServerListFilter buildCustomerVisitableFilter(
				String clientName,
				ServerListFilter filter) {
			return new ServerListFilter() {
				@Override
				public List getFilteredListOfServers(
						List servers) {
					log.info("ServerListFilter clientName:{}-{}", clientName, allOnlineInstanceServer);
					return filter.getFilteredListOfServers(new ArrayList<>(allOnlineInstanceServer));
				}
			};
		} 
	}
	
4.
	import java.util.List;
	import java.util.Map;
	import java.util.Optional;
	import java.util.concurrent.ConcurrentHashMap;
	
	import org.springframework.beans.factory.annotation.Autowired;
	import org.springframework.cloud.netflix.ribbon.SpringClientFactory;
	import org.springframework.context.event.EventListener;
	import org.springframework.stereotype.Component;
	
	import com.netflix.loadbalancer.ZoneAwareLoadBalancer;
	import com.netflix.niws.loadbalancer.DiscoveryEnabledServer;
	import com.zmeng.weike.gateway.infa.serverRealTimeUpdate.vo.ApplicationInstanceInfoServerId;
	import com.zmeng.weike.gateway.infa.serverRealTimeUpdate.vo.ServiceServerListChangedEvent;
	
	import lombok.extern.slf4j.Slf4j;
	
	
	@Component
	@Slf4j
	public class ServiceInstanceInfoChangedListener {
		
		@Autowired
		private SpringClientFactory clientFactory;
		
		private final Map visitorMap = new ConcurrentHashMap<>();
		
		@SuppressWarnings("unchecked")
		private void configLoadBalancer(
				Object bean) {
			if (bean instanceof ZoneAwareLoadBalancer) {
				ZoneAwareLoadBalancer tmp = 
						(ZoneAwareLoadBalancer) bean;
				if (this.visitorMap.containsKey(tmp.getName())) {
					return;
				}
				LoadBalancerVisitor visitor = 
						new LoadBalancerVisitor(tmp);
				this.visitorMap.put(visitor.getServiceId(), visitor);
			}
		}
		
		public synchronized Optional getBalancer(
				String serverId) {
			if (!this.visitorMap.containsKey(serverId)) {
				this.configLoadBalancer(
						this.clientFactory.getLoadBalancer(serverId));
			}
			return Optional.ofNullable(
					this.visitorMap.get(serverId));
		}
		
		@EventListener
		public void onChanged(
				ServiceServerListChangedEvent event) {
			log.info("ServiceServerListChangedEvent:{}", event);
			String serviceId = event.getServiceName();
			List onlineServerList = 
					event.getInstanceServerInfoIds();
			this.getBalancer(serviceId)
			.ifPresent(visitor -> {
				visitor.serverListChanged(onlineServerList);
			});
		}
	
	}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/440056.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号