栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

微服务-seata分布式事务

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

微服务-seata分布式事务

1. 分布式事务问题

在单机单库的时候, 有本地事务可以保证数据的一致性; 但在分布式架构中, 原来的单体应用被拆分成多个微服务应用且都有各自独立的数据源, 一个业务场景的完成可能需要不同微服务模块和不同的库来完成. 此时每个微服务模块内部的数据一致性由本地事务来保证, 但是全局的数据一致性问题没有办法协调保证.
比如: 用户购买商品的业务逻辑, 需要由3个微服务提供支持.

仓储服务: 对给定的商品扣除仓储数量.订单服务: 根据采购需求创建订单.账户服务: 从用户账户中扣除余额.

架构图:

一句话: 一次业务操作需要跨多个数据源或者跨多个系统进行远程调用, 就会产生分布式事务问题.

2. Seata简介 2.1 是什么

Seata是一款开源的分布式事务解决方案, 致力于在微服务架构下提供高性能和简单易用的分布式事务服务.
→Seata官网

2.2 能干什么

一个典型的分布式事务过程: 全局事务ID + 三组件模型

Transaction ID, 简写XID. 是分布式中全局唯一的事务ID.Transaction Coordinator , 简写TC. 事务协调器, 维护全局事务的运行状态, 负责协调并驱动全局事务的提交或回滚.Transaction Manager, 简写TM. 事务管理器, 控制全局事务的边界, 负责开启一个全局事务, 并最终发起全局提交或回滚的决议.Resource Manager, 简写RM. 资源管理器, 控制分支事务, 负责分支注册, 状态汇报, 并接收事务协调器的指令, 驱动分支(本地)事务的提交和回滚.

模型图:

处理过程:

TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的 XID;XID 在微服务调用链路的上下文中传播;RM 向 TC 注册分支事务,将其纳入 XID 对应全局事务的管辖;TM 向 TC 发起针对 XID 的全局提交或回滚决议;TC 调度 XID 下管辖的全部分支事务完成提交或回滚请求。 2.3 下载

→Seata官网下载
→Seata在GitHub的发布说明
→GitHub下载Seata

2.4 怎么用

以前本地事务我们用Spring提供的注解 @Transactionnal 开启.
Seata也给我们提供了开启全局事务的注解 @GlobalTransactional.

我们只需要在业务方法上使用一个@GlobalTransactional注解.

3. Seata-Server安装

我下载的版本是 seata-server-0.9.0 , 大家可以选择更新的版本.
将seata-server-0.9.0.zip解压到指定目录并修改conf目录下的file.conf配置文件.(先备份file.conf文件后再修改)

修改内容: 自定义事务组名称 + 事务日志存储模式为db + 数据库连接信息.
在service模块中修改自定义事务组名称

service {
  #vgroup->rgroup
  # 自定义事务组名称 fsp_tx_group, 默认default
  vgroup_mapping.my_test_tx_group = "fsp_tx_group"
  #only support single node
  default.grouplist = "127.0.0.1:8091"
  #degrade current not support
  enableDegrade = false
  #disable
  disable = false
  #unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
  max.commit.retry.timeout = "-1"
  max.rollback.retry.timeout = "-1"
}

store模块修改事务日志存储模式为db, 并修改数据库连接信息.

store {
  ## store mode: file、db
  # 事务日志存储模式
  mode = "db"

  ## file store
  file {
    dir = "sessionStore"

    # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
    max-branch-session-size = 16384
    # globe session size , if exceeded throws exceptions
    max-global-session-size = 512
    # file buffer size , if exceeded allocate new buffer
    file-write-buffer-cache-size = 16384
    # when recover batch read size
    session.reload.read_size = 100
    # async, sync
    flush-disk-mode = async
  }

  ## database store 数据库连接信息
  db {
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
    datasource = "dbcp"
    ## mysql/oracle/h2/oceanbase etc.
    db-type = "mysql"
    driver-class-name = "com.mysql.jdbc.Driver"
    url = "jdbc:mysql://127.0.0.1:3306/seata"
    user = "root"
    password = "root"
    min-conn = 1
    max-conn = 3
    global.table = "global_table"
    branch.table = "branch_table"
    lock-table = "lock_table"
    query-limit = 100
  }
}

在mysql中创建seata库, 然后分别创建全局事务表global_table, 分支事务表branch_table和事务锁表lock_table. 这三张表不用自己设计, 在下载的seata包中的conf目录下有一个db_store.sql文件, 到mysql客户端中执行即可.

-- the table to store GlobalSession data
drop table if exists `global_table`;
create table `global_table` (
  `xid` varchar(128)  not null,
  `transaction_id` bigint,
  `status` tinyint not null,
  `application_id` varchar(32),
  `transaction_service_group` varchar(32),
  `transaction_name` varchar(128),
  `timeout` int,
  `begin_time` bigint,
  `application_data` varchar(2000),
  `gmt_create` datetime,
  `gmt_modified` datetime,
  primary key (`xid`),
  key `idx_gmt_modified_status` (`gmt_modified`, `status`),
  key `idx_transaction_id` (`transaction_id`)
);

-- the table to store BranchSession data
drop table if exists `branch_table`;
create table `branch_table` (
  `branch_id` bigint not null,
  `xid` varchar(128) not null,
  `transaction_id` bigint ,
  `resource_group_id` varchar(32),
  `resource_id` varchar(256) ,
  `lock_key` varchar(128) ,
  `branch_type` varchar(8) ,
  `status` tinyint,
  `client_id` varchar(64),
  `application_data` varchar(2000),
  `gmt_create` datetime,
  `gmt_modified` datetime,
  primary key (`branch_id`),
  key `idx_xid` (`xid`)
);

-- the table to store lock data
drop table if exists `lock_table`;
create table `lock_table` (
  `row_key` varchar(128) not null,
  `xid` varchar(96),
  `transaction_id` long ,
  `branch_id` long,
  `resource_id` varchar(256) ,
  `table_name` varchar(32) ,
  `pk` varchar(36) ,
  `gmt_create` datetime ,
  `gmt_modified` datetime,
  primary key(`row_key`)
);


修改conf目录下的registry.conf配置文件(先备份), 修改注册中心配置. 注册中心支持nacos 、eureka、redis、zk、consuld等.

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  #  注册中心修改为nacos
  type = "nacos"

  nacos {
    serverAddr = "localhost:8848"
    namespace = ""
    cluster = "default"
  }
  ............ 此处省略完整配置
}

先启动nacos, 再启动seata-server, 否则报 no available server to connect.

/nacos/bin/startup.cmd
/seata-0.9.0/bin/seata-server.bat

4. 订单/库存/账户业务数据库准备

分布式事务说明, 我们这里需要创建三个服务, 订单服务, 库存服务, 账户服务.
当用户下订单时, 订单服务会创建一个订单, 然后通过远程调用库存服务来扣减下单商品的库存, 再通过远程调用账户服务来扣减账户余额, 最后在订单服务中修改订单状态为已完成. 该操作跨三个数据库, 有两次远程调用, 很明显会有分布式事务问题.

场景: 下订单 -> 扣减库存 -> 扣减账户余额
三个数据库: 订单数据库(seata_order), 库存数据库(seata_storage), 账户数据库(seata_account).

业务数据库SQL

CREATE DATAbase seata_order;
CREATE DATAbase seata_storage;
CREATE DATAbase seata_account;

在seata_order库中创建订单表 t_order.

 CREATE TABLE t_order (
  `id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
  `user_id` BIGINT(11) DEFAULT NULL COMMENT '用户id',
  `product_id` BIGINT(11) DEFAULT NULL COMMENT '产品id',
  `count` INT(11) DEFAULT NULL COMMENT '数量',
  `money` DECIMAL(11,0) DEFAULT NULL COMMENT '金额',
  `status` INT(1) DEFAULT NULL COMMENT '订单状态:0:创建中;1:已完结' 
) ENGINE=INNODB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;

在seata_storage库中创建库存表 t_storage

CREATE TABLE t_storage (
 `id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
 `product_id` BIGINT(11) DEFAULT NULL COMMENT '产品id',
 `total` INT(11) DEFAULT NULL COMMENT '总库存',
 `used` INT(11) DEFAULT NULL COMMENT '已用库存',
 `residue` INT(11) DEFAULT NULL COMMENT '剩余库存'
) ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
 
-- 初始库存 
INSERT INTO seata_storage.t_storage(`id`, `product_id`, `total`, `used`, `residue`)
VALUES ('1', '1', '100', '0', '100');

在seata_account库中创建账户表 t_account

CREATE TABLE t_account (
  `id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY COMMENT 'id',
  `user_id` BIGINT(11) DEFAULT NULL COMMENT '用户id',
  `total` DECIMAL(10,0) DEFAULT NULL COMMENT '总额度',
  `used` DECIMAL(10,0) DEFAULT NULL COMMENT '已用余额',
  `residue` DECIMAL(10,0) DEFAULT '0' COMMENT '剩余可用额度'
) ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;

-- 初始化账户信息 
INSERT INTO seata_account.t_account(`id`, `user_id`, `total`, `used`, `residue`)  VALUES ('1', '1', '1000', '0', '1000');

在上面三个业务数据库seata_order, seata_storage, seata_account中分别创建各自的事务回滚日志表. 执行seata包中conf目录下的db_undo_log.sql文件.

-- the table to store seata xid data
-- 0.7.0+ add context
-- you must to init this sql for you business databese. the seata server not need it.
-- 此脚本必须初始化在你当前的业务数据库中,用于AT 模式XID记录。与server端无关(注:业务数据库)
-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
drop table `undo_log`;
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

各模块的数据库表准备完成后, 看一下结构.

5. 订单/库存/账户/业务微服务准备

业务需求, 下订单 -> 减库存 -> 扣余额 -> 改订单状态

5.1 新建订单模块

创建订单模块 seata-order-service-2001, pom依赖文件.



    
        atguigu-cloud-2020
        com.atguigu.springcloud
        1.0-SNAPSHOT
    
    4.0.0

    seata-order-service-2001

    
    	
        
            com.atguigu.springcloud
            cloud-api-commons
            ${project.version}
        
        
        
            com.alibaba.cloud
            spring-cloud-starter-alibaba-nacos-discovery
        
        
        
            com.alibaba.cloud
            spring-cloud-starter-alibaba-seata
            
                
                    seata-all
                    io.seata
                
            
        
        
            io.seata
            seata-all
            0.9.0
        
        
        
            org.springframework.cloud
            spring-cloud-starter-openfeign
        
        
        
            org.springframework.boot
            spring-boot-starter-web
        
        
            org.springframework.boot
            spring-boot-starter-actuator
        
        
        
            mysql
            mysql-connector-java
            5.1.37
        
        
            com.alibaba
            druid-spring-boot-starter
            1.1.10
        
        
            org.mybatis.spring.boot
            mybatis-spring-boot-starter
            2.0.0
        
        
            org.springframework.boot
            spring-boot-starter-test
            test
        
        
            org.springframework.boot
            spring-boot-devtools
            runtime
            true
        
        
            org.projectlombok
            lombok
            true
        
    

yml配置

server:
  port: 2001

spring:
  application:
    name: seata-order-service
  cloud:
    alibaba:
      seata:
        #自定义事务组名称需要与seata-server中的对应
        tx-service-group: fsp_tx_group
    nacos:
      discovery:
        server-addr: localhost:8848
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/seata_order?serverTimezone=Asia/Shanghai
    username: root
    password: root
feign:
  hystrix:
    enabled: false
logging:
  level:
    io:
      seata: info
# 自定义属性      
mybatis:
  mapperLocations: classpath:mapper
@NoArgsConstructor
@AllArgsConstructor
@Data
@Builder
public class Order {

    private Long id;

    private Long userId;

    private Long productId;

    private Integer count;

    private BigDecimal money;
    
    private Integer status;
}

创建OrderDao接口, 使用@Mapper注解标注.

@Mapper
public interface OrderDao {
    
    void create(Order order);
    
    void update(@Param("userId") Long userId, @Param("status") Integer status);
}

创建OrderDao接口的映射文件, OrderMapper.xml.






    
        
        
        
        
        
        
    

    
        INSERT INTO `t_order` (`id`, `user_id`, `product_id`, `count`, `money`, `status`)
        VALUES (NULL, #{userId}, #{productId}, #{count}, #{money}, 0);
    

    
        UPDATE `t_order`
        SET status = 1
        WHERe user_id = #{userId} AND status = #{status};
    

创建Service及实现

public interface OrderService {
    
    void create(Order order);
}
@FeignClient(value = "seata-storage-service")
public interface StorageService {
    @PostMapping(value = "/storage/decrease")
    CommonResult decrease(@RequestParam("productId") Long productId, @RequestParam("count") Integer count);
}
@FeignClient(value = "seata-account-service")
public interface AccountService {
    @PostMapping(value = "/account/decrease")
    CommonResult decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money);
}
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {

    @Resource
    private OrderDao orderDao;

    @Resource
    private StorageService storageService;

    @Resource
    private AccountService accountService;

    
    @Override
   // @GlobalTransactional(name = "fsp-create-order", rollbackFor = Exception.class)
    public void create(Order order) {
    
        log.info("------->下单开始");
        //本应用创建订单
        orderDao.create(order);
        //远程调用库存服务扣减库存
        log.info("------->order-service中扣减库存开始");
        storageService.decrease(order.getProductId(), order.getCount());
        log.info("------->order-service中扣减库存结束");

        //远程调用账户服务扣减余额
        log.info("------->order-service中扣减余额开始");
        accountService.decrease(order.getUserId(), order.getMoney());
        log.info("------->order-service中扣减余额结束");

        //修改订单状态为已完成
        log.info("------->order-service中修改订单状态开始");
        orderDao.update(order.getProductId(), 0);
        log.info("------->order-service中修改订单状态结束");
        log.info("------->下单结束");
    }
}

编写controller

@RestController
public class OrderController {
    @Autowired
    private OrderService orderService;

    
    @GetMapping("/order/create")
    public CommonResult create(Order order) {
        orderService.create(order);
        return new CommonResult(200, "订单创建成功!");
    }
}

MybatisConfig配置扫描mapper接口的包.

@Configuration
@MapperScan(value = "com.atguigu.springcloud.dao")
public class MyBatisConfig {
}

配置代理数据源

@Configuration
public class DataSourceProxyConfig {
    @Value("${mybatis.mapper-locations}")
    private String mapperLocations;

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource druidDataSource() {
        return new DruidDataSource();
    }

    @Bean
    public DataSourceProxy dataSourceProxy(DataSource dataSource) {
        return new DataSourceProxy(dataSource);
    }

    @Bean
    public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dataSourceProxy);
        sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations));
        sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
        return sqlSessionFactoryBean.getObject();
    }
}

创建主启动类

// 排除自动配置数据源,使用自定义的数据源
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class) 
@EnableDiscoveryClient
@EnableFeignClients
public class SeataOrderMainApp2001 {
    public static void main(String[] args) {
        SpringApplication.run(SeataOrderMainApp2001.class);
    }
}
5.2 新建库存模块 5.3 新建账户模块 6. 分布式事务测试 7. 补充 个人博客

欢迎访问个人博客: https://www.crystalblog.xyz/

备用地址: https://wang-qz.gitee.io/crystal-blog/

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/704158.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号