事务的相关概念就不介绍了,本篇博客先给大家演示本地事务与分布式事务,然后介绍Seata的基本使用,以后会介绍Seata的集群、注册中心、配置中心以及各种事务模式。
- 版本说明
一个父module和一个子module,父module用于管理依赖版本,子module用于演示本地事务与分布式事务(通过一个简易版的用户购买商品案例来演示),最后还需要集成Seata分布式事务解决方案。
父module的pom.xml:
seata module4.0.0 com.kaven alibaba pom 1.0-SNAPSHOT Spring Cloud Alibaba seata 8 8 Hoxton.SR9 2.2.6.RELEASE org.springframework.boot spring-boot-starter-parent 2.3.2.RELEASE org.springframework.cloud spring-cloud-dependencies ${spring-cloud-version} pom import com.alibaba.cloud spring-cloud-alibaba-dependencies ${spring-cloud-alibaba-version} pom import
seata module通过一个简易版的用户购买商品案例来演示本地事务与分布式事务,seata module的结构如下图所示:
pom.xml:
com.kaven alibaba 1.0-SNAPSHOT 4.0.0 seata 8 8 org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-data-jpa mysql mysql-connector-java 8.0.26 org.projectlombok lombok
application.yml:
# Embedded web server settings.
server:
  port: 9000
spring:
  # JDBC connection to the local MySQL schema named "seata".
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    username: root
    password: ITkaven@666
    url: jdbc:mysql://localhost:3306/seata?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai
  jpa:
    # Print the SQL statements issued through JPA.
    show-sql: true
Entity
Order实体类:
package com.kaven.alibaba.entity;
import lombok.Data;
import javax.persistence.*;
/**
 * JPA entity mapped to the {@code order} table: one purchase of a product by a user.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}.
 * NOTE(review): the fields are declared {@code public} even though accessors exist;
 * confirm nothing relies on direct field access before narrowing them to {@code private}.
 */
@Data
@Entity
// Keep the backticks: "order" is a MySQL keyword, and an unquoted table name
// makes the statements sent to the database fail.
@Table(name = "`order`")
public class Order {

    /** Order id, generated by the database identity column. */
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    public Integer id;

    /** Id of the user who placed the order. */
    public Integer userId;

    /** Id of the purchased product. */
    public Integer productId;

    /** Quantity of the product purchased. */
    public Integer count;

    /** Order amount. */
    public Integer money;
}
Storage实体类:
package com.kaven.alibaba.entity;
import lombok.Data;
import javax.persistence.*;
/**
 * JPA entity mapped to the {@code storage} table: the stock level of one product.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}.
 * NOTE(review): the fields are declared {@code public} even though accessors exist;
 * confirm nothing relies on direct field access before narrowing them to {@code private}.
 */
@Data
@Entity
@Table(name = "storage")
public class Storage {

    /** Product id, generated by the database identity column. */
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    public Integer id;

    /** Units currently in stock. */
    public Integer count;
}
User实体类:
package com.kaven.alibaba.entity;
import lombok.Data;
import javax.persistence.*;
/**
 * JPA entity mapped to the {@code user} table: a user and their account balance.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}.
 * NOTE(review): the fields are declared {@code public} even though accessors exist;
 * confirm nothing relies on direct field access before narrowing them to {@code private}.
 */
@Data
@Entity
@Table(name = "user")
public class User {

    /** User id, generated by the database identity column. */
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    public Integer id;

    /** Account balance of the user. */
    public Integer money;
}
Repository
OrderRepository:
package com.kaven.alibaba.repository;

import com.kaven.alibaba.entity.Order;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

/**
 * Spring Data JPA repository for {@link Order} entities.
 *
 * <p>The type arguments are the entity class and the type of its {@code @Id} field
 * ({@code Integer}); without them the repository is a raw type and its methods lose
 * their entity typing.
 */
@Repository
public interface OrderRepository extends JpaRepository<Order, Integer> {
}
StorageRepository:
package com.kaven.alibaba.repository;

import com.kaven.alibaba.entity.Storage;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

/**
 * Spring Data JPA repository for {@link Storage} entities.
 *
 * <p>The type arguments are the entity class and the type of its {@code @Id} field
 * ({@code Integer}), so that {@code findById} yields an {@code Optional<Storage>}.
 */
@Repository
public interface StorageRepository extends JpaRepository<Storage, Integer> {
}
UserRepository:
package com.kaven.alibaba.repository;

import com.kaven.alibaba.entity.User;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

/**
 * Spring Data JPA repository for {@link User} entities.
 *
 * <p>The type arguments are the entity class and the type of its {@code @Id} field
 * ({@code Integer}), so that {@code findById} yields an {@code Optional<User>}.
 */
@Repository
public interface UserRepository extends JpaRepository<User, Integer> {
}
// Service  (article section heading that had been fused onto the end of this line)
IOrderService:
package com.kaven.alibaba.service;
/**
 * Order operations of the purchase demo.
 */
public interface IOrderService {

    /**
     * Creates an order.
     *
     * @param userId    id of the buying user
     * @param productId id of the purchased product
     * @param count     quantity purchased
     * @param money     order amount
     */
    void create(int userId, int productId, int count, int money);
}
OrderServiceImpl:
package com.kaven.alibaba.service.impl;
import com.kaven.alibaba.entity.Order;
import com.kaven.alibaba.repository.OrderRepository;
import com.kaven.alibaba.service.IOrderService;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
 * {@link IOrderService} implementation that stores orders through {@link OrderRepository}.
 */
@Service
public class OrderServiceImpl implements IOrderService {

    @Resource
    private OrderRepository orderRepository;

    /**
     * Builds a new {@link Order} from the given values and saves it.
     *
     * @param userId    id of the buying user
     * @param productId id of the purchased product
     * @param count     quantity purchased
     * @param money     order amount
     */
    @Override
    public void create(int userId, int productId, int count, int money) {
        // The id is left unset so the database assigns it on insert.
        final Order pending = new Order();
        pending.setMoney(money);
        pending.setCount(count);
        pending.setProductId(productId);
        pending.setUserId(userId);
        orderRepository.save(pending);
    }
}
IStorageService:
package com.kaven.alibaba.service;
/**
 * Stock operations of the purchase demo.
 */
public interface IStorageService {

    /**
     * Deducts stock from a product.
     *
     * @param productId id of the product
     * @param count     number of units to deduct
     */
    void deduct(int productId, int count);
}
StorageServiceImpl:
package com.kaven.alibaba.service.impl;
import com.kaven.alibaba.entity.Storage;
import com.kaven.alibaba.repository.StorageRepository;
import com.kaven.alibaba.service.IStorageService;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Optional;
/**
 * {@link IStorageService} implementation backed by {@link StorageRepository}.
 */
@Service
public class StorageServiceImpl implements IStorageService {

    @Resource
    private StorageRepository storageRepository;

    /**
     * Deducts {@code count} units from the stock of the given product and saves the result.
     *
     * @param productId id of the product
     * @param count     number of units to deduct
     * @throws RuntimeException if no such product exists, or its stock is lower than {@code count}
     */
    @Override
    public void deduct(int productId, int count) {
        // Typed Optional: with the raw type, get() returns Object and the
        // assignment to Storage below does not compile.
        Optional<Storage> byId = storageRepository.findById(productId);
        if (!byId.isPresent()) {
            throw new RuntimeException("该商品不存在!");
        }

        Storage storage = byId.get();
        if (storage.getCount() < count) {
            throw new RuntimeException("该商品库存不足!");
        }

        // Enough stock: deduct and persist.
        storage.setCount(storage.getCount() - count);
        storageRepository.save(storage);
    }
}
IUserService:
package com.kaven.alibaba.service;
/**
 * Account-balance operations of the purchase demo.
 */
public interface IUserService {

    /**
     * Debits a user's balance.
     *
     * @param userId id of the user
     * @param money  amount to debit
     */
    void debit(int userId, int money);
}
UserServiceImpl:
package com.kaven.alibaba.service.impl;
import com.kaven.alibaba.entity.User;
import com.kaven.alibaba.repository.UserRepository;
import com.kaven.alibaba.service.IUserService;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Optional;
/**
 * {@link IUserService} implementation backed by {@link UserRepository}.
 */
@Service
public class UserServiceImpl implements IUserService {

    @Resource
    private UserRepository userRepository;

    /**
     * Debits {@code money} from the balance of the given user and saves the result.
     *
     * @param userId id of the user
     * @param money  amount to debit
     * @throws RuntimeException if no such user exists, or the balance is lower than {@code money}
     */
    @Override
    public void debit(int userId, int money) {
        // Typed Optional: with the raw type, get() returns Object and the
        // assignment to User below does not compile.
        Optional<User> byId = userRepository.findById(userId);
        if (!byId.isPresent()) {
            throw new RuntimeException("没有该用户!");
        }

        User user = byId.get();
        if (user.getMoney() < money) {
            throw new RuntimeException("该用户余额不足!");
        }

        // Sufficient balance: debit and persist.
        user.setMoney(user.getMoney() - money);
        userRepository.save(user);
    }
}
IBuyService:
package com.kaven.alibaba.service;
/**
 * Entry point of the purchase use case.
 */
public interface IBuyService {

    /**
     * Performs a purchase.
     *
     * @param userId    id of the buying user
     * @param productId id of the product to buy
     * @param count     quantity to buy
     */
    void buy(int userId, int productId, int count);
}
package com.kaven.alibaba.service.impl;
import com.kaven.alibaba.service.IBuyService;
import com.kaven.alibaba.service.IOrderService;
import com.kaven.alibaba.service.IStorageService;
import com.kaven.alibaba.service.IUserService;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
 * {@link IBuyService} implementation that chains the order, storage and user services.
 *
 * <p>This version declares no transaction around the three steps.
 */
@Service
public class BuyServiceImpl implements IBuyService {

    @Resource
    private IOrderService orderService;

    @Resource
    private IStorageService storageService;

    @Resource
    private IUserService userService;

    /**
     * Creates the order, then deducts stock, then debits the user, in that order.
     * An exception thrown by a later step propagates to the caller.
     *
     * @param userId    id of the buying user
     * @param productId id of the product to buy
     * @param count     quantity to buy
     */
    @Override
    public void buy(int userId, int productId, int count) {
        // The amount charged equals the quantity bought.
        final int amount = count;

        // Step 1: create the order.
        orderService.create(userId, productId, count, amount);
        // Step 2: deduct stock.
        storageService.deduct(productId, count);
        // Step 3: debit the balance.
        userService.debit(userId, amount);
    }
}
Controller
BuyController:
package com.kaven.alibaba.controller;
import com.kaven.alibaba.service.IBuyService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
 * REST endpoint for purchases; delegates to {@link IBuyService}.
 */
@RestController
public class BuyController {

    @Resource
    private IBuyService buyService;

    /**
     * Handles {@code POST /buy}.
     *
     * @param userId    id of the buying user (request parameter {@code userId})
     * @param productId id of the product to buy (request parameter {@code productId})
     * @param count     quantity to buy (request parameter {@code count})
     * @return the literal {@code "success"} when the service call returns normally
     */
    @PostMapping("/buy")
    public String buy(
            @RequestParam("userId") Integer userId,
            @RequestParam("productId") Integer productId,
            @RequestParam("count") Integer count) {
        buyService.buy(userId, productId, count);
        return "success";
    }
}
启动类:
package com.kaven.alibaba;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Spring Boot entry point of the demo application.
 */
@SpringBootApplication
public class Application {

    /**
     * Starts the application.
     *
     * @param args command-line arguments, forwarded to Spring Boot so that options such as
     *             {@code --server.port=...} are applied
     */
    public static void main(String[] args) {
        // Forward args: without them, command-line property overrides are ignored.
        SpringApplication.run(Application.class, args);
    }
}
数据库
进入数据库中,执行下面的sql(为了简单,没有创建相关索引)。
-- Schema for the purchase demo. No secondary indexes are created, to keep it simple.
-- IF NOT EXISTS keeps the script re-runnable, matching the DROP TABLE IF EXISTS statements below.
CREATE DATABASE IF NOT EXISTS seata;
USE seata;

-- Stock per product.
DROP TABLE IF EXISTS `storage`;
CREATE TABLE `storage` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `count` int(11) DEFAULT 0,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- Orders; the table name is quoted because ORDER is a MySQL keyword.
DROP TABLE IF EXISTS `order`;
CREATE TABLE `order` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` int(11) DEFAULT NULL,
  `product_id` int(11) DEFAULT NULL,
  `count` int(11) DEFAULT 0,
  `money` int(11) DEFAULT 0,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- Users and their balances.
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `money` int(11) DEFAULT 0,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
插入几条数据,如下图所示:
启动应用。
使用Postman向应用发送请求。
很显然用户的金额不够。
商品的库存也被扣减了。
订单已经创建了。
减用户余额的操作由于没有被执行,因此用户余额不变,这就导致了数据不一致的问题。
这种只在一个服务下的事务比较好解决,可以使用Spring的@Transactional注解。
package com.kaven.alibaba.service.impl;
import com.kaven.alibaba.service.IBuyService;
import com.kaven.alibaba.service.IOrderService;
import com.kaven.alibaba.service.IStorageService;
import com.kaven.alibaba.service.IUserService;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
/**
 * {@link IBuyService} implementation that chains the order, storage and user services
 * inside one Spring-managed transaction.
 */
@Service
public class BuyServiceImpl implements IBuyService {

    @Resource
    private IOrderService orderService;

    @Resource
    private IStorageService storageService;

    @Resource
    private IUserService userService;

    /**
     * Creates the order, then deducts stock, then debits the user, in that order.
     * The method is annotated {@code @Transactional}, so the three steps share one
     * local transaction.
     *
     * @param userId    id of the buying user
     * @param productId id of the product to buy
     * @param count     quantity to buy
     */
    @Transactional
    @Override
    public void buy(int userId, int productId, int count) {
        // The amount charged equals the quantity bought.
        final int amount = count;

        // Step 1: create the order.
        orderService.create(userId, productId, count, amount);
        // Step 2: deduct stock.
        storageService.deduct(productId, count);
        // Step 3: debit the balance.
        userService.debit(userId, amount);
    }
}
重启应用,将数据库中的数据手动回滚。再使用Postman向应用发送请求。
异常链信息和之前不一样了,其实就是@Transactional注解的作用,该注解的原理博主以后会在Spring源码阅读系列中进行介绍。
此时数据是一致的。
现在将每个操作变成一个接口来模拟分布式环境。
Controller
OrderController:
package com.kaven.alibaba.controller;
import com.kaven.alibaba.service.IOrderService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
 * REST endpoints under {@code /order}; delegates to {@link IOrderService}.
 */
@RestController
@RequestMapping("/order")
public class OrderController {

    @Resource
    private IOrderService orderService;

    /**
     * Handles {@code POST /order/create}.
     *
     * @param userId    id of the buying user (request parameter {@code userId})
     * @param productId id of the purchased product (request parameter {@code productId})
     * @param count     quantity purchased (request parameter {@code count})
     * @param money     order amount (request parameter {@code money})
     */
    @PostMapping("/create")
    public void create(
            @RequestParam("userId") Integer userId,
            @RequestParam("productId") Integer productId,
            @RequestParam("count") Integer count,
            @RequestParam("money") Integer money) {
        orderService.create(userId, productId, count, money);
    }
}
StorageController:
package com.kaven.alibaba.controller;
import com.kaven.alibaba.service.IStorageService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
 * REST endpoints under {@code /storage}; delegates to {@link IStorageService}.
 */
@RestController
@RequestMapping("/storage")
public class StorageController {

    @Resource
    private IStorageService storageService;

    /**
     * Handles {@code POST /storage/deduct}.
     *
     * @param productId id of the product (request parameter {@code productId})
     * @param count     number of units to deduct (request parameter {@code count})
     */
    @PostMapping("/deduct")
    public void deduct(
            @RequestParam("productId") Integer productId,
            @RequestParam("count") Integer count) {
        storageService.deduct(productId, count);
    }
}
UserController:
package com.kaven.alibaba.controller;
import com.kaven.alibaba.service.IUserService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
 * REST endpoints under {@code /user}; delegates to {@link IUserService}.
 */
@RestController
@RequestMapping("/user")
public class UserController {

    @Resource
    private IUserService userService;

    /**
     * Handles {@code POST /user/debit}.
     *
     * @param userId id of the user (request parameter {@code userId})
     * @param money  amount to debit (request parameter {@code money})
     */
    @PostMapping("/debit")
    public void debit(
            @RequestParam("userId") Integer userId,
            @RequestParam("money") Integer money) {
        userService.debit(userId, money);
    }
}
修改BuyController:
package com.kaven.alibaba.controller;
import javax.annotation.Resource;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponentsBuilder;
/**
 * REST endpoint for purchases that reaches each step over HTTP instead of calling
 * the services in-process.
 *
 * <p>NOTE(review): no {@code RestTemplate} bean definition is visible in this file;
 * confirm one is registered elsewhere, otherwise the injection below fails.
 */
@RestController
public class BuyController {

    @Resource
    private RestTemplate restTemplate;

    /**
     * Handles {@code POST /buy} by posting to the order, storage and user endpoints
     * on {@code localhost:9000}, in that order.
     *
     * <p>{@code @Transactional} is kept from the original. The surrounding article reports
     * that data still ends up inconsistent when a later call fails.
     *
     * @param userId    id of the buying user (request parameter {@code userId})
     * @param productId id of the product to buy (request parameter {@code productId})
     * @param count     quantity to buy (request parameter {@code count})
     * @return the literal {@code "success"} when all three calls return normally
     */
    @PostMapping("/buy")
    @Transactional
    public String buy(@RequestParam("userId") Integer userId,
                      @RequestParam("productId") Integer productId,
                      @RequestParam("count") Integer count) {
        // One parameter set is sent to all three endpoints.
        // The class is LinkedMultiValueMap (capital L); the lower-case spelling does not compile.
        MultiValueMap<String, String> queryParams = new LinkedMultiValueMap<>();
        queryParams.add("userId", userId.toString());
        queryParams.add("productId", productId.toString());
        queryParams.add("count", count.toString());
        // The amount charged equals the quantity bought.
        queryParams.add("money", count.toString());

        post("http://localhost:9000/order/create", queryParams);
        post("http://localhost:9000/storage/deduct", queryParams);
        post("http://localhost:9000/user/debit", queryParams);
        return "success";
    }

    /** Posts an empty body to {@code url} with {@code queryParams} appended as the query string. */
    private void post(String url, MultiValueMap<String, String> queryParams) {
        UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(url).queryParams(queryParams);
        restTemplate.postForObject(builder.toUriString(), null, Void.class);
    }
}
重启应用,再使用Postman向应用发送请求。
数据不一致。
分布式环境下使用@Transactional注解就不会起作用了(为了简单,这里只是模拟分布式环境下的用户购买商品案例),接下来就需要使用Seata来解决该问题。
Seata是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata为用户提供了AT、TCC、SAGA和XA事务模式,为用户打造一站式的分布式解决方案(这些关于Seata的介绍来自官网)。
- TC (Transaction Coordinator,事务协调者):维护全局和分支事务的状态,驱动全局事务提交或回滚。
- TM (Transaction Manager, 事务管理器):定义全局事务的范围,开始全局事务、提交或回滚全局事务。
- RM (Resource Manager,资源管理器):管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
很显然TM和RM是在业务代码中的,而TC是一个独立的应用,由Seata提供。
- 下载地址
博主下载的是1.3.0版本(要和Spring Cloud Alibaba版本兼容)。
简单使用不需要修改配置(默认是基于本地file的形式,为了简单就使用默认形式,分布式环境下需要使用其他形式,其他形式博主以后会介绍),启动Seata:
C:\Users\Dell>f:
F:\>cd F:\tools\seata-server-1.3.0\seata\bin
F:\tools\seata-server-1.3.0\seata\bin>dir
 驱动器 F 中的卷是 WorkSpace
 卷的序列号是 D671-D29B
 F:\tools\seata-server-1.3.0\seata\bin 的目录
2021/12/29  10:50    <DIR>          .
2020/07/16  00:35    <DIR>          ..
2020/07/16  00:35             3,648 seata-server.bat
2020/07/16  00:35             4,175 seata-server.sh
2021/12/29  10:51    <DIR>          sessionStore
               2 个文件          7,823 字节
               3 个目录 241,159,860,224 可用字节
F:\tools\seata-server-1.3.0\seata\bin>seata-server.bat -p 8080
修改配置文件:
# Embedded web server settings.
server:
  port: 9000
spring:
  application:
    name: seata
  # JDBC connection to the local MySQL schema named "seata".
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    username: root
    password: ITkaven@666
    url: jdbc:mysql://localhost:3306/seata?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai
  jpa:
    # Print the SQL statements issued through JPA.
    show-sql: true
seata:
  application-id: buy
  # Must match the vgroup-mapping key below and vgroupMapping.* in file.conf.
  tx-service-group: kaven_seata_tx_group
  service:
    vgroup-mapping:
      kaven_seata_tx_group: default
    # Address of the Seata server (started with "-p 8080").
    # NOTE(review): Seata's documented form keys grouplist by cluster name
    # (e.g. "default: 127.0.0.1:8080"); confirm this list form binds as intended.
    grouplist:
      - "127.0.0.1:8080"
file.conf(这里的service相关配置要和application.yml配置文件中的一致):
transport {
  # tcp udt unix-domain-socket
  type = "TCP"
  #NIO NATIVE
  server = "NIO"
  #enable heartbeat
  heartbeat = true
  # the client batch send request enable
  enableClientBatchSendRequest = true
  #thread factory for netty
  threadFactory {
    bossThreadPrefix = "NettyBoss"
    workerThreadPrefix = "NettyServerNIOWorker"
    serverExecutorThread-prefix = "NettyServerBizHandler"
    shareBossWorker = false
    clientSelectorThreadPrefix = "NettyClientSelector"
    clientSelectorThreadSize = 1
    clientWorkerThreadPrefix = "NettyClientWorkerThread"
    # netty boss thread size,will not be used for UDT
    bossThreadSize = 1
    #auto default pin or 8
    workerThreadSize = "default"
  }
  shutdown {
    # when destroy server, wait seconds
    wait = 3
  }
  serialization = "seata"
  compressor = "none"
}
service {
  #transaction service group mapping
  vgroupMapping.kaven_seata_tx_group = "default"
  #only support when registry.type=file, please don't set multiple addresses
  default.grouplist = "127.0.0.1:8080"
  #degrade, current not support
  enableDegrade = false
  #disable seata
  disableGlobalTransaction = false
}
client {
  rm {
    asyncCommitBufferLimit = 10000
    lock {
      retryInterval = 10
      retryTimes = 30
      # Key spelling corrected ("...RollbackOnConflict"): with the lower-case "on" the
      # setting would not match the name Seata reads.
      retryPolicyBranchRollbackOnConflict = true
    }
    reportRetryCount = 5
    tablemetaCheckEnable = false
    reportSuccessEnable = false
  }
  tm {
    commitRetryCount = 5
    rollbackRetryCount = 5
  }
  undo {
    dataValidation = true
    logSerialization = "jackson"
    logTable = "undo_log"
  }
  log {
    exceptionRate = 100
  }
}
registry.conf:
# Where the client discovers the Seata server.
registry {
  # supported: file, nacos, eureka, redis, zk, consul, etcd3, sofa
  type = "file"

  file {
    name = "file.conf"
  }
}

# Where the client loads its Seata configuration from.
config {
  # supported: file, nacos, apollo, zk, consul, etcd3
  type = "file"

  file {
    name = "file.conf"
  }
}
给buy接口加上@GlobalTransactional注解:
/**
 * Handles {@code POST /buy} by posting to the order, storage and user endpoints
 * on {@code localhost:9000}, in that order, inside a Seata global transaction.
 *
 * <p>Needs {@code io.seata.spring.annotation.GlobalTransactional} imported in the
 * enclosing class.
 *
 * @param userId    id of the buying user (request parameter {@code userId})
 * @param productId id of the product to buy (request parameter {@code productId})
 * @param count     quantity to buy (request parameter {@code count})
 * @return the literal {@code "success"} when all three calls return normally
 */
@PostMapping("/buy")
@GlobalTransactional
public String buy(@RequestParam("userId") Integer userId,
                  @RequestParam("productId") Integer productId,
                  @RequestParam("count") Integer count) {
    // One parameter set is sent to all three endpoints.
    // The class is LinkedMultiValueMap (capital L); the lower-case spelling does not compile.
    MultiValueMap<String, String> queryParams = new LinkedMultiValueMap<>();
    queryParams.add("userId", userId.toString());
    queryParams.add("productId", productId.toString());
    queryParams.add("count", count.toString());
    // The amount charged equals the quantity bought.
    queryParams.add("money", count.toString());

    // Create the order.
    UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl("http://localhost:9000/order/create").queryParams(queryParams);
    restTemplate.postForObject(builder.toUriString(), null, Void.class);

    // Deduct stock.
    builder = UriComponentsBuilder.fromHttpUrl("http://localhost:9000/storage/deduct").queryParams(queryParams);
    restTemplate.postForObject(builder.toUriString(), null, Void.class);

    // Debit the user's balance.
    builder = UriComponentsBuilder.fromHttpUrl("http://localhost:9000/user/debit").queryParams(queryParams);
    restTemplate.postForObject(builder.toUriString(), null, Void.class);

    return "success";
}
在数据库中创建undo_log表。
-- Undo-log table in the business database; its name matches client.undo.logTable in file.conf.
USE seata;

DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
  `id`            bigint(20)   NOT NULL AUTO_INCREMENT,
  `branch_id`     bigint(20)   NOT NULL,
  `xid`           varchar(100) NOT NULL,
  `context`       varchar(128) NOT NULL,
  `rollback_info` longblob     NOT NULL,
  `log_status`    int(11)      NOT NULL,
  `log_created`   datetime     NOT NULL,
  `log_modified`  datetime     NOT NULL,
  `ext`           varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  -- At most one row per (xid, branch_id) pair.
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
启动应用。
手动回滚数据库中的数据,再使用Postman进行测试。
客户端的日志:
TC的日志:
数据库中的数据也是一致的。
通过这些日志信息也能看出来Seata分布式事务解决方案应用成功了。本地事务与分布式事务演示以及Seata的使用就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。



