栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

利用Zookeeper的结点实现分布式锁解决高并发访问

利用Zookeeper的结点实现分布式锁解决高并发访问

项目结构(pay是一个springboot项目)

描述:模拟高并发下单数据不安全的场景并用zk的结点解决

application.yaml、数据库、pom.xml准备
server:
  port: 9999
  servlet:
    context-path: /pay
spring:
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    username: root
    password: chickenkang.
    url: jdbc:mysql://localhost:3306/interview
  jpa:
    database: mysql
    properties:
      hibernate:
        enable_lazy_load_no_trans: true
DROP TABLE IF EXISTS `items`;
CREATE TABLE `items` (
  `id` varchar(255) NOT NULL,
  `counts` int(11) DEFAULT NULL,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of items
-- ----------------------------
INSERT INTO `items` VALUES ('1', '10', 'Spark');
INSERT INTO `items` VALUES ('2', '6', 'Hadoop');
INSERT INTO `items` VALUES ('3', '2', 'Flink');

-- ----------------------------
-- Table structure for orders
-- ----------------------------
DROP TABLE IF EXISTS `orders`;
CREATE TABLE `orders` (
  `id` varchar(255) NOT NULL,
  `item_id` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

        
        
            org.springframework.boot
            spring-boot-starter-web
        

        
            org.springframework.boot
            spring-boot-starter-test
            test
        
        
        
            org.springframework.boot
            spring-boot-starter-data-jpa
        
        
        
            mysql
            mysql-connector-java
            5.1.47
        
        
            org.projectlombok
            lombok
            1.18.20
        
        
            junit
            junit
            4.12
            test
        
        
        
            org.apache.commons
            commons-lang3
            3.8.1
        
        
        
            org.apache.zookeeper
            zookeeper
            3.4.10
        
        
        
            org.apache.curator
            curator-framework
            2.13.0
        
        
            org.apache.curator
            curator-recipes
            2.13.0
        
    
一、domain层实体构建 1. Items .java
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity//加上这两个注解后会自动在数据库生成两张表
@Table
public class Items {
    @Id
    private String id;
    private String name;
    private Integer counts;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getCounts() {
        return counts;
    }

    public void setCounts(Integer counts) {
        this.counts = counts;
    }
2. Orders.java
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table
public class Orders {

    @Id//指定该属性是主键
    private String id;
    private String itemId;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getItemId() {
        return itemId;
    }

    public void setItemId(String itemId) {
        this.itemId = itemId;
    }
}

创建完这两个实体后就可以运行一下这个sb项目,会自动在mysql数据库里创建两张表,但是数据还是需要自己来插入

二、dao层和service层构建 1. dao层 ItemsDAO.java和OrdersDAO.java
import com.interview.domain.Items;
import org.springframework.data.jpa.repository.JpaRepository;

public interface ItemsDAO extends JpaRepository {

}


import com.interview.domain.Orders;
import org.springframework.data.jpa.repository.JpaRepository;

public interface OrdersDAO extends JpaRepository {//第一个是泛型,对应的是数据库里的实体,第二个是表对应的主键的类型
}
2.service层 2.1 ItemsService.java
import com.interview.dao.ItemsDAO;
import com.interview.domain.Items;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

@Service//service层要加这个注解
public class ItemsService {

    @Autowired
    private ItemsDAO itemsDAO;

    public Items getItem(String itemId){
        return itemsDAO.getById(itemId);
    }

    public void save(Items items){
        items.setId(UUID.randomUUID().toString());
        itemsDAO.save(items);
    }

    //根据itemId获取库存量
    public int getItemCounts(String itemId){
        return itemsDAO.getById(itemId).getCounts();
    }

    //调整库存
    public void reduceCount(String itemId,int count){
        Items items = getItem(itemId);
        items.setCounts(items.getCounts() - count);
        itemsDAO.save(items);
    }
}
2.2 OrdersService.java
import com.interview.dao.OrdersDAO;
import com.interview.domain.Orders;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

@Service
@Slf4j
public class OrdersService {

    @Autowired
    private OrdersDAO ordersDAO;

    public boolean save(String itemId){
        try{
            Orders orders = new Orders();
            orders.setId(UUID.randomUUID().toString());
            orders.setItemId(itemId);
            ordersDAO.save(orders);
            log.info("订单创建成功.................");
            return true;
        }catch (Exception e){
            e.printStackTrace();
            return false;
        }
    }
}
2.3 PayService.java

说明:这个类调用了上面两个ItemsService和OrdersService.并且该类下面有两个方法,一个是原始的不安全的buy方法,一个是加上锁逻辑的buyWithDisLock方法,逻辑是完全一样的,只是有没有锁的区别.

import com.interview.utils.DistributedLock;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class PayService {

    @Autowired
    private OrdersService ordersService;
    @Autowired
    private ItemsService itemsService;
    
    @Autowired
    private DistributedLock distributedLock;

    public boolean buy(String itemId){
        //假设每次购买9个
        int buyCount=9;

        //第一步:判断库存数量
        int counts = itemsService.getItemCounts(itemId);
        if(counts 
三、controller层的构建及不安全场景的模拟 
1. BuyController.java 
import com.interview.service.PayService;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class BuyController {
    @Autowired
    private PayService payService;

    @GetMapping("/buy1")//路径
    @ResponseBody
    public String buyA(String itemId){
        if(StringUtils.isNotBlank(itemId)){
            if(payService.buyWithDisLock(itemId)){
                return "订单创建成功";
            }else {
                return "订单创建失败";
            }
        }else {
            return "itemId不能为空";
        }
    }

    @GetMapping("/buy2")//路径
    @ResponseBody
    public String buyB(String itemId){
        if(StringUtils.isNotBlank(itemId)){
            if(payService.buyWithDisLock(itemId)){
                return "订单创建成功";
            }else {
                return "订单创建失败";
            }
        }else {
            return "itemId不能为空";
        }
    }
}
2. 启动PayApplication.java并在浏览器内输入网址模拟高并发购买场景
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class PayApplication {
    //在浏览器同时输入
    // http://localhost:9999/pay/buy1?itemId=1
    // http://localhost:9999/pay/buy2?itemId=1
    //结果发现都会创建订单成功,但是库存只有10个,同时满足两个订单一共18个商品是不可能的,即数据库商品剩下1个库存,一个真成功,一个假成功
    //同样的一个业务经过两个不同的请求进来就出现了高并发安全场景
    //我们怎么解决?接下来使用zookeeper的分布式锁
    //详见utils包下的DistributedLock
    public static void main(String[] args) {
        SpringApplication.run(PayApplication.class, args);
    }
    //在PayService里的buyWithDisLock方法用分布式锁改进了原来的buy方法,然后在BuyController使用上这个buyWithDisLock方法就行
}

四、解决(要开启zookeeper服务) 1. DistributedLock.java
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.CuratorframeworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.CountDownLatch;


@Configuration//因为我们要spring管理上这个类就加上这个Configuration注解,因为PayService里需要调用这个类
@Slf4j
public class DistributedLock {
    private Curatorframework client = null;
    private static final String ZK_LOCK = "pk-zk-locks";//分布式锁的总结点存在哪里
    private static final String DISTRIBUTED_LOCK = "pk-distributed-lock";//临时结点
    private CountDownLatch countDownLatch=new CountDownLatch(1);

    public DistributedLock() {
        client = CuratorframeworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .sessionTimeoutMs(10000)//session的超时时间
                .retryPolicy(new ExponentialBackoffRetry(1000, 5))//重试的策略
                .namespace("zk-namespace")//命名空间
                .build();//进入zk客户端ls /      会有zk-namespace结点出现

        client.start();//启动zk客户端
    }

    @Bean
    public Curatorframework getClient() {
        client = client.usingNamespace("zk-namespace");//从这个命名空间拿到这个client
        try {
            if (client.checkExists().forPath("/" + ZK_LOCK) == null) {//判断这个结点是否已经创建,没有就创建,有就算了
                client.create()
                        .creatingParentContainersIfNeeded()
                        .withMode(CreateMode.PERSISTENT)
                        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE);
            }
            //监听一下这个结点下的变化情况
            listenToNode("/"+ZK_LOCK);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return client;
    }

    private void listenToNode(String path) throws Exception {//监听某个结点的方法
        PathChildrenCache cache = new PathChildrenCache(client, path, true);
        cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        cache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(Curatorframework client, PathChildrenCacheEvent event) throws Exception {
                if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){//如果结点有移除了我们要拿到这个监听
                    String path = event.getData().getPath();
                    //如果移除的结点里面有我们定义的临时结点pk-distributed-lock,就把等待的这个状态去掉
                    if(path.contains(DISTRIBUTED_LOCK)){//意思就是这个锁结点已经移除掉了,可以不用等待了
                        //这里我们又引入了一个等待的概念
                        countDownLatch.countDown();//我们用countDownLatch来实现这个功能
                    }
                }
            }
        });
    }

    public void getLock() {//获得分布式锁,就是去创建一个临时结点
        while (true){//通过客户端去创建一个临时结点
            try {
                client.create()
                        .creatingParentContainersIfNeeded()
                        .withMode(CreateMode.EPHEMERAL)
                        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                        .forPath("/"+ZK_LOCK+"/"+DISTRIBUTED_LOCK);

                log.info("分布式锁获得成功.....................");
            } catch (Exception e) {
                e.printStackTrace();

                if(countDownLatch.getCount()<=0){
                    countDownLatch = new CountDownLatch(1);
                }
                try {
                    countDownLatch.await();
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
            return;
        }
    }

    //释放分布式锁:订单创建成功或者异常的时候释放锁
    public boolean releaseLock() {//就是把zk里的结点删掉
        try {
            if(client.checkExists().forPath("/"+ZK_LOCK+"/"+DISTRIBUTED_LOCK)!=null){//如果该结点不为空直接删掉
                client.delete().forPath("/"+ZK_LOCK+"/"+DISTRIBUTED_LOCK);//删除该临时结点
            }
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        log.info("分布式锁释放成功.....................");
        return true;
    }
}
2. 编写完后修改一下

3. 测试时记得要将数据库里的数量重新修改为10


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/746346.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号