ClusterModule能力
集群注册发现实现 单机模式集群模式一zookeeper
原理图源码分析一ClusterModuleZookeeperProvider.prepare源码分析一ZookeeperCoordinator 总结
ClusterModule能力coreModule主要定义了服务注册与服务发现能力
public class ClusterModule extends ModuleDefine {
public static final String NAME = "cluster";
public ClusterModule() {
super(NAME);
}
服务注册与服务发现
@Override public Class[] services() {
return new Class[] {ClusterRegister.class, ClusterNodesQuery.class};
}
}
集群注册发现实现
支持单机支持zk,nacos,consul支持云原生场景下k8s管理
单机模式prepare阶段构建StandaloneManager单机管理器core模块启动时注册自身到StandaloneManager
public class ClusterModuleStandaloneProvider extends ModuleProvider {
@Override public void prepare() throws ServiceNotProvidedException {
单机管理器
StandaloneManager standaloneManager = new StandaloneManager();
this.registerServiceImplementation(ClusterRegister.class, standaloneManager);
this.registerServiceImplementation(ClusterNodesQuery.class, standaloneManager);
}
}
core模块启动时注册自己,因为是Standalone模式,信息维护在内存单机模式包装自己成list返回
public class StandaloneManager implements ClusterNodesQuery, ClusterRegister {
private volatile RemoteInstance remoteInstance;
@Override public void registerRemote(RemoteInstance remoteInstance) {
core模块启动时注册自己,因为是Standalone模式,信息维护在内存
this.remoteInstance = remoteInstance;
this.remoteInstance.getAddress().setSelf(true);
TelemetryRelatedContext.INSTANCE.setId("standalone");
}
@Override
public List queryRemoteNodes() {
if (remoteInstance == null) {
return new ArrayList(0);
}
ArrayList remoteList = new ArrayList(1);
remoteList.add(remoteInstance);
单机模式包装自己成list返回
return remoteList;
}
}
集群模式一zookeeper
原理图
curator-framework.jar提供zkclient通信封装curator-x-discovery.jar提供zk的服务注册发现封装ServiceCache监听并缓存zk数据ServiceDiscovery向zk写数据并触发ServiceCache监听变更ZookeeperCoordinator是集群模块对zk的封装,提供服务注册发现能力
源码分析一ClusterModuleZookeeperProvider.prepare
构建zk客户端构建CuratorframeworkFactory.Builder构建zk的服务发现类serviceDiscovery启动服务发现对外暴露ZookeeperCoordinator,封装zk服务发现工具[serviceDiscovery]ZookeeperCoordinator实现服务注册ClusterRegister和服务发现ClusterNodesQuery接口
public class ClusterModuleZookeeperProvider extends ModuleProvider {
private final ClusterModuleZookeeperConfig config;
private Curatorframework client;
private ServiceDiscovery serviceDiscovery;
@Override public void prepare() throws ServiceNotProvidedException, ModuleStartException {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(config.getbaseSleepTimeMs(), config.getMaxRetries());
构建zk CuratorframeworkFactory
CuratorframeworkFactory.Builder builder = CuratorframeworkFactory.builder()
.retryPolicy(retryPolicy)
.connectString(config.getHostPort());
访问控制安全相关
if (config.isEnableACL()) {
String authInfo = config.getexpression();
if ("digest".equals(config.getSchema())) {
try {
authInfo = DigestAuthenticationProvider.generateDigest(authInfo);
} catch (NoSuchAlgorithmException e) {
throw new ModuleStartException(e.getMessage(), e);
}
} else {
throw new ModuleStartException("Support digest schema only.");
}
final List acls = Lists.newArrayList();
acls.add(new ACL(ZooDefs.Perms.ALL, new Id(config.getSchema(), authInfo)));
acls.add(new ACL(ZooDefs.Perms.READ, ZooDefs.Ids.ANYONE_ID_UNSAFE));
ACLProvider provider = new ACLProvider() {
@Override
public List getDefaultAcl() {
return acls;
}
@Override
public List getAclForPath(String s) {
return acls;
}
};
builder.aclProvider(provider);
builder.authorization(config.getSchema(), config.getexpression().getBytes());
}
zkClient 构建
client = builder.build();
String path = base_PATH + (StringUtil.isEmpty(config.getNameSpace()) ? "" : "/" + config.getNameSpace());
zk curator-x-discovery模块,该模块支持封装了zk的服务注册发现能力
serviceDiscovery = ServiceDiscoveryBuilder.builder(RemoteInstance.class).client(client)
.basePath(path)
.watchInstances(true)
SWInstanceSerializer 将数据序列化成json
.serializer(new SWInstanceSerializer()).build();
提供zk节点注册和集群信息读取
ZookeeperCoordinator coordinator;
try {
client.start();
阻塞下 等待连接成功
client.blockUntilConnected();
serviceDiscovery.start();
构建服务注册发现对象封装zk 提供服务注册和服务发现能力
coordinator = new ZookeeperCoordinator(config, serviceDiscovery);
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new ModuleStartException(e.getMessage(), e);
}
通过coordinator对集群的信息进行注册,注册数据维护在zk
this.registerServiceImplementation(ClusterRegister.class, coordinator);
通过coordinator对集群的信息进行查询
this.registerServiceImplementation(ClusterNodesQuery.class, coordinator);
}
@Override public void start() {
无实现
}
@Override public void notifyAfterCompleted() {
无实现
}
}
源码分析一ZookeeperCoordinator
提供服务注册接口提供服务发现接口基于zk工具包curator-x-discovery提供serviceDiscovery进行服务注册基于zk工具包curator-x-discovery提供serviceCache进行服务发现serviceCache负责监听zk服务端数据并缓存在JVM进程
public class ZookeeperCoordinator implements ClusterRegister, ClusterNodesQuery {
private static final Logger logger = LoggerFactory.getLogger(ZookeeperCoordinator.class);
private static final String REMOTE_NAME_PATH = "remote";
private final ClusterModuleZookeeperConfig config;
zk工具包curator-x-discovery模块 服务发现工具
private final ServiceDiscovery serviceDiscovery;
zk工具包curator-x-discovery模块 服务发现结果缓存
private final ServiceCache serviceCache;
private volatile Address selfAddress;
ZookeeperCoordinator(ClusterModuleZookeeperConfig config, ServiceDiscovery serviceDiscovery) throws Exception {
this.config = config;
this.serviceDiscovery = serviceDiscovery;
this.serviceCache = serviceDiscovery.serviceCacheBuilder().name(REMOTE_NAME_PATH).build();
首次拉取zk上监听的数据 并开启listen机制监听zk数据
this.serviceCache.start();
}
服务注册
@Override public synchronized void registerRemote(RemoteInstance remoteInstance) throws ServiceRegisterException {
try {
if (needUsingInternalAddr()) {
remoteInstance = new RemoteInstance(new Address(config.getInternalComHost(), config.getInternalComPort(), true));
}
ServiceInstance thisInstance = ServiceInstance.builder()
.name(REMOTE_NAME_PATH)
.id(UUID.randomUUID().toString())
.address(remoteInstance.getAddress().getHost())
.port(remoteInstance.getAddress().getPort())
.payload(remoteInstance)
.build();
向zk注册数据
serviceDiscovery.registerService(thisInstance);
this.selfAddress = remoteInstance.getAddress();
TelemetryRelatedContext.INSTANCE.setId(selfAddress.toString());
} catch (Exception e) {
throw new ServiceRegisterException(e.getMessage());
}
}
服务发现
@Override public List queryRemoteNodes() {
List remoteInstanceDetails = new ArrayList<>(20);
从缓存中获取zk上注册的集群机器集合
List> serviceInstances = serviceCache.getInstances();
serviceInstances.forEach(serviceInstance -> {
RemoteInstance instance = serviceInstance.getPayload();
if (instance.getAddress().equals(selfAddress)) {
instance.getAddress().setSelf(true);
} else {
instance.getAddress().setSelf(false);
}
remoteInstanceDetails.add(instance);
});
return remoteInstanceDetails;
}
private boolean needUsingInternalAddr() {
return !Strings.isNullOrEmpty(config.getInternalComHost()) && config.getInternalComPort() > 0;
}
}
总结
skywalking提供了多种服务注册发现机制,包括k8s,nacos等等集群主要用于RemoteClientManager,agent上报的一条数据在处理的过程中,可能会经过多个集群节点,此时就需要服务注册发现机制进行节点数据传输



