栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java > SpringBoot

实现一个分布式调度系统-服务注册

SpringBoot 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

实现一个分布式调度系统-服务注册

上一篇主要介绍了调度模块的核心Quartz:
https://www.imooc.com/article/272332
因为我们最终要实现一个分布式的调度,所以这次我们会介绍服务的注册
前提
本次我们选用zookeeper来实现服务的注册
引入相关的依赖
    2.12.0
    4.0.1

      
 org.apache.curator
 curator-framework
 ${curator.version}
      
      
 org.apache.curator
 curator-client
 ${curator.version}
      
      
 org.apache.curator
 curator-recipes
 ${curator.version}
      
      
 com.esotericsoftware
 kryo-shaded
 ${kryo-shaded.version}
      
原理
简单来说利用的是zk可以创建临时节点,临时节点的生命周期和客户端会话绑定,一旦客户端会话失效,那么这个客户端创建的所有临时节点都会被移除。
主要代码实现
1-客户端初始化
  public void init() {

 if (client != null) {
     return;
 }

 //启动zk客户端
 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);

 client = CuratorframeworkFactory.builder().connectString(zkProperties.getServer())
  .sessionTimeoutMs(10000).connectionTimeoutMs(10000).retryPolicy(retryPolicy).namespace("admin").build();

 client.start();

 try {
     // 判断在admin命名空间下是否有jobRegister节点  /job-register   后续注册操作在此下面
     if (client.checkExists().forPath(zkProperties.getPath()) == null) {
  
  client.create().creatingParentsIfNeeded()
   .withMode(CreateMode.PERSISTENT) // 节点类型:持久节点
   .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)     // acl:匿名权限
   .forPath(zkProperties.getPath());
     }
     log.info("zookeeper服务器状态:{}", client.getState());
     if(zkProperties.isOpen()){
  addChildWatch(zkProperties.getPath());
     }

     ShutDownHook.registerShutdownHook(this);//加入到hook事件
 } catch (Exception e) {
     log.error("zookeeper客户端连接、初始化错误...");
     e.printStackTrace();
 }
    }

2-监听事件
  
    private void addChildWatch(String registerPath) throws Exception {
 pcache = new PathChildrenCache(client, registerPath, true);
 pcache.start();
 pcache.getListenable().addListener(new PathChildrenCacheListener() {
     @Override
     public void childEvent(Curatorframework curatorframework, PathChildrenCacheEvent event) throws Exception {

  if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {

      String path = event.getData().getPath();
      log.info("{}注册新的执行器:{}", Constants.LOG_PREFIX, path);

      addChildsWatch(path);

  } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {

  } else if (event.getType().equals(PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED)) {
      log.info("{}重新启动zk", Constants.LOG_PREFIX);
  }

     }
 });
    }

    private PathChildrenCache cache = null;

    private void addChildsWatch(String registerPath) throws Exception {
 cache = new PathChildrenCache(client, registerPath, true);
 cache.start();
 cache.getListenable().addListener(new PathChildrenCacheListener() {
     @Override
     public void childEvent(Curatorframework curatorframework, PathChildrenCacheEvent event) throws Exception {

  String data = new String(event.getData().getData());
  String executorName=registerPath.substring(registerPath.lastIndexOf(Constants.JOIN_SYMBOL) + 1,registerPath.length());
  if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {

      log.info("{}执行器:{},有新服务加入:{}", Constants.LOG_PREFIX, executorName, data);
      ServerCache.addCache(executorName,data);
  } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {

      log.info("{}执行器:{},有服务退出", Constants.LOG_PREFIX, registerPath);
      ServerCache.removeCache(executorName,data);
  }
     }
 });
    }

3-创建持久节点
public void createPersistentNode(String path) {

 try {
     if (client.checkExists().forPath(path) == null) {
  client.create().creatingParentsIfNeeded()
   .withMode(CreateMode.PERSISTENT) // 节点类型:持久节点
   .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)     // acl:匿名权限
   .forPath(path);
  log.info("{}create createPersistentNode:{}", Constants.LOG_PREFIX, path);
     }
 } catch (Exception e) {
     e.printStackTrace();
     log.error("zookeeper创建持久节点失败...{}", path);
 }

    }

4-创建临时顺序节点(和持久节点不同的是 , 临时节点的生命周期和客户端会话绑定 。 也就是说 , 如果客户端会话失效 , 那么这个节点就会自动被清除掉)
public void createPhemeralEphemeralNode(String path, String address) {

 try {
     if (client.checkExists().forPath(path) == null) {
  client.create().creatingParentsIfNeeded()
   .withMode(CreateMode.EPHEMERAL_SEQUENTIAL) // 临时顺序节点
   .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)     // acl:匿名权限
   .forPath(path, address.getBytes());
  log.info("{}create createePhemeralEphemeralNode:{}", Constants.LOG_PREFIX, path);

     }
 } catch (Exception e) {
     e.printStackTrace();
     log.error("zookeeper创建临时顺序节点失败...{}", path);
 }

    }

5-获取值
public String getData(String path) {
 String dataPath = path;
 try {
     byte[] result = client.getData().forPath(dataPath);
     return new String(result);
 } catch (Exception e) {
     e.printStackTrace();
     return null;
 }
    }
和SpringBoot的结合
如果在SpringBoot中我们想把服务注册作为一个模块,当其他项目引入时候自动可以启动,
我们可以把其做成一个AutoConfiguration的类
类似:
@Configuration
@EnableConfigurationProperties(ZkProperties.class)
public class ServiceRegistryAutoConfiguration {

    @Autowired
    private ZkProperties zkProperties;

    @Bean
    @ConditionalOnMissingBean(name = "serviceRegistry")
    @ConditionalOnProperty(value = {"faya-job.register.server", "faya-job.register.path"})
    public ServiceRegistry serviceRegistry() {

 log.info("init default register");

 ZKCuratorClient zkCuratorClient = new ZKCuratorClient(zkProperties);
 zkCuratorClient.init();
 ZkServiceRegistry zkServiceRegistry = new ZkServiceRegistry(zkCuratorClient, zkProperties);
 return zkServiceRegistry;

    }
}

关于上面使用的注解,大家可以查阅一下,比如ConditionalOnProperty是指项目中配置了指定的配置参数,此类才会生效。
关于SpringBoot的autoconfig自动配置后面会单独介绍,比如实现一个springboot-redis模块。
总结
利用zk实现一个服务注册很方便。
完整代码:https://github.com/lizu18xz/faya-job
转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号