我们会在项目中创建三个服务,仓储服务,订单服务,帐户服务。当用户下单时,会在订单服务中创建一个订单,然后通过远程调用仓储服务来扣减下单商品的库存,再通过远程调用账户服务来扣减用户账户里面的余额。该操作跨越三个数据库,有两次远程调用,很明显会有分布式事务问题。
架构图1、我们需要新增三个数据库 seata_order、seata_account、seata_store 。
2、在三个库中新建表:
在seata_order库中新建order 表::
DROP TABLE IF EXISTS `order`; CREATE TABLE `order` ( `id` int(11) NOT NULL AUTO_INCREMENT, `user_id` varchar(255) DEFAULT NULL, `commodity_code` varchar(255) DEFAULT NULL, `count` int(11) DEFAULT 0, `money` int(11) DEFAULT 0, `status` INT(1) DEFAULT NULL COMMENT '订单状态:0:创建中;1:已完结' , PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
在seata_account库中新建account表:
DROP TABLE IF EXISTS `account`; CREATE TABLE `account` ( `id` int(11) NOT NULL AUTO_INCREMENT, `user_id` varchar(255) DEFAULT NULL, `money` int(11) DEFAULT 0, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; INSERT INTO `seata_account`.`account`(`id`, `user_id`, `money`) VALUES (1, '1', 1000);
在seata_store库中新增 storage表
DROP TABLE IF EXISTS `storage`; CREATE TABLE `storage` ( `id` int(11) NOT NULL AUTO_INCREMENT, `commodity_code` varchar(255) DEFAULT NULL, `count` int(11) DEFAULT 0, PRIMARY KEY (`id`), UNIQUE KEY (`commodity_code`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; INSERT INTO `seata_store`.`storage`(`id`, `commodity_code`, `count`) VALUES (1, '1', 100);
因为SEATA AT 模式需要 UNDO_LOG 表,这个需要在每一个业务库中新建表:
DROP TABLE IF EXISTS `undo_log`;
-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(
`branch_id` BIGINT NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(128) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LonGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';
新建Module pcloud-alibaba-seata-order-service9001
修改pom文件:
pcloud-alibaba com.younger.pcloud.alibaba 1.0-SNAPSHOT 4.0.0 pcloud-alibaba-seata-order-service9001 pcloud-alibaba-seata-order-service9001 com.alibaba.cloud spring-cloud-starter-alibaba-nacos-discovery com.alibaba.cloud spring-cloud-starter-alibaba-seata seata-all io.seata io.seata seata-all 1.4.2 org.springframework.cloud spring-cloud-starter-openfeign org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-actuator mysql mysql-connector-java 5.1.37 com.alibaba druid-spring-boot-starter 1.1.10 org.mybatis.spring.boot mybatis-spring-boot-starter 2.0.0 org.springframework.boot spring-boot-starter-test test org.projectlombok lombok true
新增 application.yml文件:
server:
port: 9001
spring:
application:
name: seata-order-service
cloud:
alibaba:
seata:
#自定义事务组名称需要与seata-server中的对应
tx-service-group: my_test_tx_group
nacos:
discovery:
server-addr: localhost:8848
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://localhost:3306/seata_order
username: root
password: mysql
feign:
hystrix:
enabled: false
logging:
level:
io:
seata: info
mybatis:
mapperLocations: classpath:mapper
private Integer status;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
public String getCommodityCode() {
return commodityCode;
}
public void setCommodityCode(String commodityCode) {
this.commodityCode = commodityCode;
}
public Integer getCount() {
return count;
}
public void setCount(Integer count) {
this.count = count;
}
public Integer getMoney() {
return money;
}
public void setMoney(Integer money) {
this.money = money;
}
public Integer getStatus() {
return status;
}
public void setStatus(Integer status) {
this.status = status;
}
}
新增 Result类:
package com.younger.pcloud.alibaba.domain; public class Result{ private Integer code; private String message; private T data; public Result() { } public Result(Integer code, String message, T data) { this.code = code; this.message = message; this.data = data; } public Result(Integer code, String message) { this(code,message,null); } public Integer getCode() { return code; } public void setCode(Integer code) { this.code = code; } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } public T getData() { return data; } public void setData(T data) { this.data = data; } }
新增config 包:
新增 MyBatisConfig类:
package com.younger.pcloud.alibaba.config;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
@Configuration
@ComponentScan({"com.younger.pcloud.alibaba.dao"})
public class MyBatisConfig {
}
新增dao 包:
新增 OrderDao 类:
package com.younger.pcloud.alibaba.dao;
import com.younger.pcloud.alibaba.domain.Order;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
@Mapper
public interface OrderDao {
void createOrder(Order order);
void update(@Param("userId") String userId, @Param("status") Integer status);
}
新增 mapper 包:
新增 OrderMapper.xml文件:
INSERT INTO `order` (`id`, `user_id`, `commodity_code`, `count`, `money`,`status`) VALUES (NULL, #{userId}, #{commodityCode}, #{count}, #{money},0); UPDATE `order` SET status = 1 WHERe user_id = #{userId} AND status = #{status};
新增service包:
新增 OrderService类:
package com.younger.pcloud.alibaba.service;
import com.younger.pcloud.alibaba.domain.Order;
public interface OrderService {
void createOrder(Order order);
}
新增fegin远程调用包:
新增 AccountService类:
package com.younger.pcloud.alibaba.service.fegin;
import com.younger.pcloud.alibaba.domain.Result;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
@FeignClient(value = "seata-account-service")
public interface AccountService {
@PostMapping("/account/decrease")
Result decrease(@RequestParam("userId") String userId, @RequestParam("money") Integer money);
}
新增StorageService类:
package com.younger.pcloud.alibaba.service.fegin;
import com.younger.pcloud.alibaba.domain.Result;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
@FeignClient(value = "seata-store-service")
public interface StorageService {
@PostMapping(value = "/storage/decrease")
Result decrease(@RequestParam("commodityCode") String commodityCode, @RequestParam("count") Integer count);
}
新增OrderService 实现类OrderServiceImpl :
package com.younger.pcloud.alibaba.service.impl;
import com.younger.pcloud.alibaba.dao.OrderDao;
import com.younger.pcloud.alibaba.domain.Order;
import com.younger.pcloud.alibaba.service.OrderService;
import com.younger.pcloud.alibaba.service.fegin.AccountService;
import com.younger.pcloud.alibaba.service.fegin.StorageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
@Resource
private OrderDao orderDao;
@Resource
private StorageService storageService;
@Resource
private AccountService accountService;
@Override
public void createOrder(Order order) {
log.info("开始创建订单....");
//本应用创建订单
orderDao.createOrder(order);
//远程调用库存服务扣减库存
log.info("order-service------扣减库存开始");
storageService.decrease(order.getCommodityCode(),order.getCount());
log.info("order-service------扣减库存结束");
//远程调用账户服务扣减余额
log.info("order-service-------中扣减用户余额开始");
accountService.decrease(order.getUserId(),order.getMoney());
log.info("order-service-------中扣减余额结束");
//修改订单状态为已完成
log.info("order-service-------中修改订单状态开始");
orderDao.update(order.getUserId(),0);
log.info("order-service-------中修改订单状态结束");
log.info("订单完成!!!!!");
}
}
新增Controller 包:
新增 OrderController 类:
package com.younger.pcloud.alibaba.controller;
import com.younger.pcloud.alibaba.domain.Order;
import com.younger.pcloud.alibaba.domain.Result;
import com.younger.pcloud.alibaba.service.OrderService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
public class OrderController {
@Resource
private OrderService orderService;
@PostMapping("/order/create")
public Result createOrder(@RequestBody Order order) {
orderService.createOrder(order);
return new Result(200,"订单创建成功");
}
}
新建modul pcloud-alibaba-seata-store-service9002
目录结构:
修改pom文件:
pcloud-alibaba com.younger.pcloud.alibaba 1.0-SNAPSHOT 4.0.0 pcloud-alibaba-seata-store-service9002 com.alibaba.cloud spring-cloud-starter-alibaba-nacos-discovery com.alibaba.cloud spring-cloud-starter-alibaba-seata seata-all io.seata io.seata seata-all 1.4.2 org.springframework.cloud spring-cloud-starter-openfeign org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-test test org.mybatis.spring.boot mybatis-spring-boot-starter 2.0.0 mysql mysql-connector-java 5.1.37 com.alibaba druid-spring-boot-starter 1.1.10 org.projectlombok lombok true
新增application.yml文件:
server:
port: 9002
spring:
application:
name: seata-store-service
cloud:
alibaba:
seata:
#自定义事务组名称需要与seata-server中的对应
tx-service-group: my_test_tx_group
nacos:
discovery:
server-addr: localhost:8848
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://localhost:3306/seata_store
username: root
password: mysql
feign:
hystrix:
enabled: false
logging:
level:
io:
seata: info
mybatis:
mapperLocations: classpath:mapper
void decrease(@Param("commodityCode") String commodityCode, @Param("count") Integer count);
}
新增 StoreMapper.xml 文件:
UPDATE storage SET count = count - #{count} WHERe commodity_code = #{commodityCode}
新增 service 包:
新增 StoreService类:
package com.younger.pcloud.alibaba.service;
public interface StoreService {
void decrease(String commodityCode, Integer count);
}
新增 StoreService实现类 StoreServiceImpl类:
package com.younger.pcloud.alibaba.service.impl;
import com.younger.pcloud.alibaba.dao.StoreDao;
import com.younger.pcloud.alibaba.service.StoreService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
@Slf4j
public class StoreServiceImpl implements StoreService {
@Resource
private StoreDao storeDao;
@Override
public void decrease(String commodityCode, Integer count) {
log.info("storage-service-------中扣减库存开始");
storeDao.decrease(commodityCode,count);
log.info("storage-service--------中扣减库存结束");
}
}
新增controller 包:
新增 StoreController 类:
package com.younger.pcloud.alibaba.controller;
import com.younger.pcloud.alibaba.domain.Result;
import com.younger.pcloud.alibaba.service.StoreService;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
public class StoreController {
@Resource
private StoreService storeService;
@RequestMapping("/storage/decrease")
public Result decrease(@RequestParam("commodityCode") String commodityCode, @RequestParam("count") Integer count) {
storeService.decrease(commodityCode, count);
return new Result(200,"库存扣减成功!");
}
}
新建modul pcloud-alibaba-seata-account-service9003
修改pom文件:
pcloud-alibaba com.younger.pcloud.alibaba 1.0-SNAPSHOT 4.0.0 pcloud-alibaba-seata-account-service9003 pcloud-alibaba-seata-account-service9003 com.alibaba.cloud spring-cloud-starter-alibaba-nacos-discovery com.alibaba.cloud spring-cloud-starter-alibaba-seata seata-all io.seata io.seata seata-all 1.4.2 org.springframework.cloud spring-cloud-starter-openfeign org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-test test org.mybatis.spring.boot mybatis-spring-boot-starter 2.0.0 mysql mysql-connector-java 5.1.37 com.alibaba druid-spring-boot-starter 1.1.10 org.projectlombok lombok true
新增 application.yml 文件:
server:
port: 9003
spring:
application:
name: seata-account-service
cloud:
alibaba:
seata:
#自定义事务组名称需要与seata-server中的对应
tx-service-group: my_test_tx_group
nacos:
discovery:
server-addr: localhost:8848
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://localhost:3306/seata_account
username: root
password: mysql
feign:
hystrix:
enabled: false
logging:
level:
io:
seata: info
mybatis:
mapperLocations: classpath:mapper
void decrease(@Param("userId") String userId, @Param("money") Integer money);
}
新增 AccountMapper.xml 文件:
UPDATE account SET money = money - #{money} WHERe user_id = #{userId};
新增service 包:
新增 AccountService 类:
package com.younger.pcloud.alibaba.service;
import org.springframework.web.bind.annotation.RequestParam;
public interface AccountService {
void decrease(@RequestParam("userId") String userId, @RequestParam("money") Integer money);
}
新增 AccountService 的实现类AccountServiceImpl:
package com.younger.pcloud.alibaba.service.impl;
import com.younger.pcloud.alibaba.dao.AccountDao;
import com.younger.pcloud.alibaba.service.AccountService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
@Slf4j
public class AccountServiceImpl implements AccountService {
@Resource
AccountDao accountDao;
@Override
public void decrease(String userId, Integer money) {
log.info("account-service--------中扣减账户余额开始");
accountDao.decrease(userId,money);
log.info("account-service-------中扣减账户余额结束");
}
}
新增 controller包:
新增 AccountController 类:
package com.younger.pcloud.alibaba.controller;
import com.younger.pcloud.alibaba.domain.Result;
import com.younger.pcloud.alibaba.service.AccountService;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
public class AccountController {
@Resource
private AccountService accountService;
@RequestMapping("/account/decrease")
public Result decrease(@RequestParam("userId") String userId, @RequestParam("money") Integer money){
accountService.decrease(userId,money);
return new Result(200,"账户余额扣减成功!");
}
}
启动项目测试:
通过日志,我们可以知道我们的事务已经注册成功了。
创建订单: http://localhost:9001/order/create
参数请求:
{
"id":1,
"userId":"1",
"commodityCode":"1",
"count":1,
"money":55
}
我们服务之间的调用已经成功了。
我们看下表的数据:
表里面的数据和我们预想的结果是一样的。
我们现在没有在我们的业务上面添加事务,我们测试一下超时会怎么样?
我们修改我们的AccountServiceImpl类:
可以看到我们在类上面添加了超时异常。
我们重启下项目测试:
测试:
此刻访问已经超时了。
我们看下数据库:
可以看到,我们在已经创建了订单,并且库存已经抵扣了,但是用户没有支付成功。这个就是我们常见的分布式事务的问题。
我们在我们的业务类上面添加我们的分布式注解:@GlobalTransactional
我们修改 OrderServiceImpl类:
只需要在我们的业务类上添加一个 @GlobalTransactional注解即可,这个name是我们自己定义的,但是需要全局唯一。
我们重新启动项目看下:
测试:
执行也是超时的,我们在看下数据库:
我们看到订单和库存都没有记录。说明我们的分布式事务已经成功了。
总结一下分布式事务的执行流程:
1、TM 开启分布式事务(TM 向 TC 注册全局事务记录)。
2、RM 向 TC 汇报资源准备状态。
3、TM 通知 TC 提交/回滚分布式事务。
4、TC 汇总事务信息,决定分布式事务是提交还是回滚。
5、TC 通知所有 RM 提交/回滚 资源,事务二阶段结束。



