application.yaml、数据库、pom.xml准备描述:模拟高并发下单数据不安全的场景并用zk的结点解决
server:
port: 9999
servlet:
context-path: /pay
spring:
datasource:
driver-class-name: com.mysql.jdbc.Driver
username: root
password: chickenkang.
url: jdbc:mysql://localhost:3306/interview
jpa:
database: mysql
properties:
hibernate:
enable_lazy_load_no_trans: true
DROP TABLE IF EXISTS `items`;
CREATE TABLE `items` (
`id` varchar(255) NOT NULL,
`counts` int(11) DEFAULT NULL,
`name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Records of items
-- ----------------------------
INSERT INTO `items` VALUES ('1', '10', 'Spark');
INSERT INTO `items` VALUES ('2', '6', 'Hadoop');
INSERT INTO `items` VALUES ('3', '2', 'Flink');
-- ----------------------------
-- Table structure for orders
-- ----------------------------
DROP TABLE IF EXISTS `orders`;
CREATE TABLE `orders` (
`id` varchar(255) NOT NULL,
`item_id` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
一、domain层实体构建 1. Items .javaorg.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-test test org.springframework.boot spring-boot-starter-data-jpa mysql mysql-connector-java 5.1.47 org.projectlombok lombok 1.18.20 junit junit 4.12 test org.apache.commons commons-lang3 3.8.1 org.apache.zookeeper zookeeper 3.4.10 org.apache.curator curator-framework 2.13.0 org.apache.curator curator-recipes 2.13.0
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
@Entity//加上这两个注解后会自动在数据库生成两张表
@Table
public class Items {
@Id
private String id;
private String name;
private Integer counts;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getCounts() {
return counts;
}
public void setCounts(Integer counts) {
this.counts = counts;
}
2. Orders.java
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
@Entity
@Table
public class Orders {
@Id//指定该属性是主键
private String id;
private String itemId;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getItemId() {
return itemId;
}
public void setItemId(String itemId) {
this.itemId = itemId;
}
}
创建完这两个实体后就可以运行一下这个sb项目,会自动在mysql数据库里创建两张表,但是数据还是需要自己来插入
二、dao层和service层构建 1. dao层 ItemsDAO.java和OrdersDAO.javaimport com.interview.domain.Items; import org.springframework.data.jpa.repository.JpaRepository; public interface ItemsDAO extends JpaRepository2.service层 2.1 ItemsService.java{ } import com.interview.domain.Orders; import org.springframework.data.jpa.repository.JpaRepository; public interface OrdersDAO extends JpaRepository {//第一个是泛型,对应的是数据库里的实体,第二个是表对应的主键的类型 }
import com.interview.dao.ItemsDAO;
import com.interview.domain.Items;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;
@Service//service层要加这个注解
public class ItemsService {
@Autowired
private ItemsDAO itemsDAO;
public Items getItem(String itemId){
return itemsDAO.getById(itemId);
}
public void save(Items items){
items.setId(UUID.randomUUID().toString());
itemsDAO.save(items);
}
//根据itemId获取库存量
public int getItemCounts(String itemId){
return itemsDAO.getById(itemId).getCounts();
}
//调整库存
public void reduceCount(String itemId,int count){
Items items = getItem(itemId);
items.setCounts(items.getCounts() - count);
itemsDAO.save(items);
}
}
2.2 OrdersService.java
import com.interview.dao.OrdersDAO;
import com.interview.domain.Orders;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;
@Service
@Slf4j
public class OrdersService {
@Autowired
private OrdersDAO ordersDAO;
public boolean save(String itemId){
try{
Orders orders = new Orders();
orders.setId(UUID.randomUUID().toString());
orders.setItemId(itemId);
ordersDAO.save(orders);
log.info("订单创建成功.................");
return true;
}catch (Exception e){
e.printStackTrace();
return false;
}
}
}
2.3 PayService.java
说明:这个类调用了上面两个ItemsService和OrdersService.并且该类下面有两个方法,一个是原始的不安全的buy方法,一个是加上锁逻辑的buyWithDisLock方法,逻辑是完全一样的,只是有没有锁的区别.
import com.interview.utils.DistributedLock;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class PayService {
@Autowired
private OrdersService ordersService;
@Autowired
private ItemsService itemsService;
@Autowired
private DistributedLock distributedLock;
public boolean buy(String itemId){
//假设每次购买9个
int buyCount=9;
//第一步:判断库存数量
int counts = itemsService.getItemCounts(itemId);
if(counts
三、controller层的构建及不安全场景的模拟
1. BuyController.java
import com.interview.service.PayService;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class BuyController {
@Autowired
private PayService payService;
@GetMapping("/buy1")//路径
@ResponseBody
public String buyA(String itemId){
if(StringUtils.isNotBlank(itemId)){
if(payService.buyWithDisLock(itemId)){
return "订单创建成功";
}else {
return "订单创建失败";
}
}else {
return "itemId不能为空";
}
}
@GetMapping("/buy2")//路径
@ResponseBody
public String buyB(String itemId){
if(StringUtils.isNotBlank(itemId)){
if(payService.buyWithDisLock(itemId)){
return "订单创建成功";
}else {
return "订单创建失败";
}
}else {
return "itemId不能为空";
}
}
}
2. 启动PayApplication.java并在浏览器内输入网址模拟高并发购买场景
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class PayApplication {
//在浏览器同时输入
// http://localhost:9999/pay/buy1?itemId=1
// http://localhost:9999/pay/buy2?itemId=1
//结果发现都会创建订单成功,但是库存只有10个,同时满足两个订单一共18个商品是不可能的,即数据库商品剩下1个库存,一个真成功,一个假成功
//同样的一个业务经过两个不同的请求进来就出现了高并发安全场景
//我们怎么解决?接下来使用zookeeper的分布式锁
//详见utils包下的DistributedLock
public static void main(String[] args) {
SpringApplication.run(PayApplication.class, args);
}
//在PayService里的buyWithDisLock方法用分布式锁改进了原来的buy方法,然后在BuyController使用上这个buyWithDisLock方法就行
}
四、解决(要开启zookeeper服务)
1. DistributedLock.java
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.CuratorframeworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.CountDownLatch;
@Configuration//因为我们要spring管理上这个类就加上这个Configuration注解,因为PayService里需要调用这个类
@Slf4j
public class DistributedLock {
private Curatorframework client = null;
private static final String ZK_LOCK = "pk-zk-locks";//分布式锁的总结点存在哪里
private static final String DISTRIBUTED_LOCK = "pk-distributed-lock";//临时结点
private CountDownLatch countDownLatch=new CountDownLatch(1);
public DistributedLock() {
client = CuratorframeworkFactory.builder()
.connectString("127.0.0.1:2181")
.sessionTimeoutMs(10000)//session的超时时间
.retryPolicy(new ExponentialBackoffRetry(1000, 5))//重试的策略
.namespace("zk-namespace")//命名空间
.build();//进入zk客户端ls / 会有zk-namespace结点出现
client.start();//启动zk客户端
}
@Bean
public Curatorframework getClient() {
client = client.usingNamespace("zk-namespace");//从这个命名空间拿到这个client
try {
if (client.checkExists().forPath("/" + ZK_LOCK) == null) {//判断这个结点是否已经创建,没有就创建,有就算了
client.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE);
}
//监听一下这个结点下的变化情况
listenToNode("/"+ZK_LOCK);
} catch (Exception e) {
e.printStackTrace();
}
return client;
}
private void listenToNode(String path) throws Exception {//监听某个结点的方法
PathChildrenCache cache = new PathChildrenCache(client, path, true);
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
cache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(Curatorframework client, PathChildrenCacheEvent event) throws Exception {
if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){//如果结点有移除了我们要拿到这个监听
String path = event.getData().getPath();
//如果移除的结点里面有我们定义的临时结点pk-distributed-lock,就把等待的这个状态去掉
if(path.contains(DISTRIBUTED_LOCK)){//意思就是这个锁结点已经移除掉了,可以不用等待了
//这里我们又引入了一个等待的概念
countDownLatch.countDown();//我们用countDownLatch来实现这个功能
}
}
}
});
}
public void getLock() {//获得分布式锁,就是去创建一个临时结点
while (true){//通过客户端去创建一个临时结点
try {
client.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath("/"+ZK_LOCK+"/"+DISTRIBUTED_LOCK);
log.info("分布式锁获得成功.....................");
} catch (Exception e) {
e.printStackTrace();
if(countDownLatch.getCount()<=0){
countDownLatch = new CountDownLatch(1);
}
try {
countDownLatch.await();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
return;
}
}
//释放分布式锁:订单创建成功或者异常的时候释放锁
public boolean releaseLock() {//就是把zk里的结点删掉
try {
if(client.checkExists().forPath("/"+ZK_LOCK+"/"+DISTRIBUTED_LOCK)!=null){//如果该结点不为空直接删掉
client.delete().forPath("/"+ZK_LOCK+"/"+DISTRIBUTED_LOCK);//删除该临时结点
}
} catch (Exception e) {
e.printStackTrace();
return false;
}
log.info("分布式锁释放成功.....................");
return true;
}
}
2. 编写完后修改一下
3. 测试时记得要将数据库里的数量重新修改为10



