栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Netty + ZooKeeper 实现简单的服务注册与发现

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Netty + ZooKeeper 实现简单的服务注册与发现

一. 背景

最近的一个项目:我们的系统接收到上游系统的派单任务后,会推送到指定的门店的相关设备,并进行相应的业务处理。

二. Netty 的使用

在接收到派单任务之后,通过 Netty 推送到指定门店相关的设备。在我们的系统中 Netty 实现了消息推送、长连接以及心跳机制。

2.1 Netty Server 端:

每个 Netty 服务端通过 ConcurrentHashMap 保存了客户端的 clientId 以及它连接的 SocketChannel。

服务器端向客户端发送消息时,只要获取 clientId 对应的 SocketChannel,往 SocketChannel 里写入相应的 message 即可。

 EventLoopGroup boss = new NioEventLoopGroup(1);
 EventLoopGroup worker = new NioEventLoopGroup();
 ServerBootstrap bootstrap = new ServerBootstrap();
 bootstrap.group(boss, worker)
  .channel(NioServerSocketChannel.class)
  .option(ChannelOption.SO_BACKLOG, 128)
  .option(ChannelOption.TCP_NODELAY, true)
  .childOption(ChannelOption.SO_KEEPALIVE, true)
  .childHandler(new ChannelInitializer() {
      @Override
      protected void initChannel(Channel channel) throws Exception {
   ChannelPipeline p = channel.pipeline();
   p.addLast(new MessageEncoder());
   p.addLast(new MessageDecoder());
   p.addLast(new PushServerHandler());
      }
  });

 ChannelFuture future = bootstrap.bind(host,port).sync();
 if (future.isSuccess()) {
     logger.info("server start...");
 }
2.2 Netty Client 端:

客户端用于接收服务端的消息,随即进行业务处理。客户端还有心跳机制,它通过 IdleEvent 事件定时向服务端放送 Ping 消息以此来检测 SocketChannel 是否中断。

    public PushClientBootstrap(String host, int port) throws InterruptedException {

 this.host = host;
 this.port = port;

 start(host,port);
    }

    private void start(String host, int port) throws InterruptedException {

 bootstrap = new Bootstrap();
 bootstrap.channel(NioSocketChannel.class)
  .option(ChannelOption.SO_KEEPALIVE, true)
  .group(workGroup)
  .remoteAddress(host, port)
  .handler(new ChannelInitializer(){

      @Override
      protected void initChannel(Channel channel) throws Exception {
   ChannelPipeline p = channel.pipeline();
   p.addLast(new IdleStateHandler(20, 10, 0));  // IdleStateHandler 用于检测心跳
   p.addLast(new MessageDecoder());
   p.addLast(new MessageEncoder());
   p.addLast(new PushClientHandler());
      }
  });
 doConnect(port, host);
    }

    
    private void doConnect(int port, String host) throws InterruptedException {

 if (socketChannel != null && socketChannel.isActive()) {
     return;
 }

 final int portConnect = port;
 final String hostConnect = host;

 ChannelFuture future = bootstrap.connect(host, port);

 future.addListener(new ChannelFutureListener() {

     @Override
     public void operationComplete(ChannelFuture futureListener) throws Exception {
  if (futureListener.isSuccess()) {
      socketChannel = (SocketChannel) futureListener.channel();
      logger.info("Connect to server successfully!");
  } else {
      logger.info("Failed to connect to server, try connect after 10s");

      futureListener.channel().eventLoop().schedule(new Runnable() {
   @Override
   public void run() {
try {
    doConnect(portConnect, hostConnect);
} catch (InterruptedException e) {
    e.printStackTrace();
}
   }
      }, 10, TimeUnit.SECONDS);
  }
     }
 }).sync();
    }
三. 借助 ZooKeeper 实现简单的服务注册与发现 3.1 服务注册

服务注册本质上是为了解耦服务提供者和服务消费者。服务注册是一个高可用强一致性的服务发现存储仓库,主要用来存储服务的api和地址对应关系。为了高可用,服务注册中心一般为一个集群,并且能够保证分布式一致性。目前常用的有 ZooKeeper、Etcd 等等。

在我们项目中采用了 ZooKeeper 实现服务注册。

public class ServiceRegistry {

    private static final Logger logger = LoggerFactory.getLogger(ServiceRegistry.class);

    private CountDownLatch latch = new CountDownLatch(1);

    private String registryAddress;

    public ServiceRegistry(String registryAddress) {
 this.registryAddress = registryAddress;
    }

    public void register(String data) {
 if (data != null) {
     ZooKeeper zk = connectServer();
     if (zk != null) {
  createNode(zk, data);
     }
 }
    }

    
    private ZooKeeper connectServer() {
 ZooKeeper zk = null;
 try {
     zk = new ZooKeeper(registryAddress, Constants.ZK_SESSION_TIMEOUT, new Watcher() {
  @Override
  public void process(WatchedEvent event) {
      if (event.getState() == Event.KeeperState.SyncConnected) {
   latch.countDown();
      }
  }
     });
     latch.await();
 } catch (IOException | InterruptedException e) {
     logger.error("", e);
 }
 return zk;
    }

    
    private void createNode(ZooKeeper zk, String data) {
 try {
     byte[] bytes = data.getBytes();
     String path = zk.create(Constants.ZK_DATA_PATH, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
     logger.debug("create zookeeper node ({} => {})", path, data);
 } catch (KeeperException | InterruptedException e) {
     logger.error("", e);
 }
    }
}

有了服务注册,在 Netty 服务端启动之后,将 Netty 服务端的 ip 和 port 注册到 ZooKeeper。

 EventLoopGroup boss = new NioEventLoopGroup(1);
 EventLoopGroup worker = new NioEventLoopGroup();
 ServerBootstrap bootstrap = new ServerBootstrap();
 bootstrap.group(boss, worker)
  .channel(NioServerSocketChannel.class)
  .option(ChannelOption.SO_BACKLOG, 128)
  .option(ChannelOption.TCP_NODELAY, true)
  .childOption(ChannelOption.SO_KEEPALIVE, true)
  .childHandler(new ChannelInitializer() {
      @Override
      protected void initChannel(Channel channel) throws Exception {
   ChannelPipeline p = channel.pipeline();
   p.addLast(new MessageEncoder());
   p.addLast(new MessageDecoder());
   p.addLast(new PushServerHandler());
      }
  });

 ChannelFuture future = bootstrap.bind(host,port).sync();
 if (future.isSuccess()) {
     logger.info("server start...");
 }

 if (serviceRegistry != null) {
     serviceRegistry.register(host + ":" + port);
 }
3.2 服务发现

这里我们采用的是客户端的服务发现,即服务发现机制由客户端实现。

客户端在和服务端建立连接之前,通过查询注册中心的方式来获取服务端的地址。如果存在有多个 Netty 服务端的话,可以做服务的负载均衡。在我们的项目中只采用了简单的随机法进行负载。

public class ServiceDiscovery {

    private static final Logger logger = LoggerFactory.getLogger(ServiceDiscovery.class);

    private CountDownLatch latch = new CountDownLatch(1);

    private volatile List serviceAddressList = new ArrayList<>();

    private String registryAddress; // 注册中心的地址

    public ServiceDiscovery(String registryAddress) {
 this.registryAddress = registryAddress;

 ZooKeeper zk = connectServer();
 if (zk != null) {
     watchNode(zk);
 }
    }

    
    public String discover() {
 String data = null;
 int size = serviceAddressList.size();
 if (size > 0) {
     if (size == 1) {  //只有一个服务提供方
  data = serviceAddressList.get(0);
  logger.info("unique service address : {}", data);
     } else {   //使用随机分配法。简单的负载均衡法
  data = serviceAddressList.get(ThreadLocalRandom.current().nextInt(size));
  logger.info("choose an address : {}", data);
     }
 }
 return data;
    }

    
    private ZooKeeper connectServer() {

 ZooKeeper zk = null;
 try {
     zk = new ZooKeeper(registryAddress, Constants.ZK_SESSION_TIMEOUT, new Watcher() {
  @Override
  public void process(WatchedEvent event) {
      if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
   latch.countDown();
      }
  }
     });
     latch.await();
 } catch (IOException | InterruptedException e) {
     logger.error("", e);
 }
 return zk;
    }

    
    private void watchNode(final ZooKeeper zk) {

 try {
     //获取子节点列表
     List nodeList = zk.getChildren(Constants.ZK_REGISTRY_PATH, new Watcher() {
  @Override
  public void process(WatchedEvent event) {
      if (event.getType() == Event.EventType.NodeChildrenChanged) {
   //发生子节点变化时再次调用此方法更新服务地址
   watchNode(zk);
      }
  }
     });
     List dataList = new ArrayList<>();
     for (String node : nodeList) {
  byte[] bytes = zk.getData(Constants.ZK_REGISTRY_PATH + "/" + node, false, null);
  dataList.add(new String(bytes));
     }
     logger.debug("node data: {}", dataList);
     this.serviceAddressList = dataList;
 } catch (KeeperException | InterruptedException e) {
     logger.error("", e);
 }
    }
}

Netty 客户端启动之后,通过服务发现获取 Netty 服务端的 ip 和 port。

    
    public PushClientBootstrap(String discoveryAddress) throws InterruptedException {

 serviceDiscovery = new ServiceDiscovery(discoveryAddress);
 serverAddress = serviceDiscovery.discover();

 if (serverAddress!=null) {
     String[] array = serverAddress.split(":");
     if (array!=null && array.length==2) {

  String host = array[0];
  int port = Integer.parseInt(array[1]);

  start(host,port);
     }
 }
    }
四. 总结

服务注册和发现一直是分布式的核心组件。本文介绍了借助 ZooKeeper 做注册中心,如何实现一个简单的服务注册和发现。其实,注册中心的选择有很多,例如 Etcd、Eureka 等等。选择符合我们业务需求的才是最重要的。

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号