栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

分布式事务三——seata实战

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

分布式事务三——seata实战

文章目录

一、Seata介绍

1.1 支持的事务模型 二、AT模式实战

2.1 准备数据2.2 工程2.3 准备TC服务

2.3.1 配置2.3.2 启动 三、改造微服务

3.1 引入依赖3.2 添加配置文件3.3 代理DataSource3.4 添加事务注解3.5.改造Storage、Account服务 四、测试

一、Seata介绍

Seata(Simple Extensible Autonomous Transaction Architecture,简单可扩展自治事务框架)是 2019 年 1 月份蚂蚁金服和阿里巴巴共同开源的分布式事务解决方案。Seata 开源半年左右,就已经有接近一万 star,社区非常活跃。我们热忱欢迎大家参与到 Seata 社区建设中,一同将 Seata 打造成开源分布式事务标杆产品。

Seata:https://github.com/seata/seata

1.1 支持的事务模型

Seata 会有 4 种分布式事务解决方案,分别是 AT 模式、TCC 模式、Saga 模式和 XA 模式。

二、AT模式实战

Seata中比较常用的是AT模式,这里我们拿AT模式来做演示,看看如何在SpringCloud微服务中集成Seata.

我们假定一个用户购买商品的业务逻辑。整个业务逻辑由3个微服务提供支持:

仓储服务:对给定的商品扣除仓储数量。订单服务:根据采购需求创建订单。帐户服务:从用户帐户中扣除余额。

流程图

订单服务在下单时,同时调用库存服务和用户服务,此时就会发生跨服务和跨数据源的分布式事务问题。

2.1 准备数据
# 订单
CREATE TABLE `order_tbl` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` varchar(255) DEFAULT NULL COMMENT '用户id',
  `commodity_code` varchar(255) DEFAULT NULL COMMENT '商品码',
  `count` int(11) unsigned DEFAULT '0' COMMENT '购买数量',
  `money` int(11) unsigned DEFAULT '0' COMMENT '总金额',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT;
# 库存
CREATE TABLE `storage_tbl` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `commodity_code` varchar(255) DEFAULT NULL COMMENT '商品码',
  `count` int(11) unsigned DEFAULT '0' COMMENT '商品库存',
  PRIMARY KEY (`id`) USING BTREE,
  UNIQUE KEY `commodity_code` (`commodity_code`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT;
INSERT INTO `storage_tbl` VALUES (1, '100202003032041', 10);
# 账户
CREATE TABLE `account_tbl` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` varchar(255) DEFAULT NULL COMMENT '用户id',
  `money` int(11) unsigned DEFAULT '0' COMMENT '用户余额',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT;
INSERT INTO `account_tbl` VALUES (1, 'user202003032042012', 1000);
# 用来记录Seata中的事务日志表undo_log,其中会包含`after_image`和`before_image`数据,用于数据回滚
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE,
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT;
2.2 工程

先下载相关资料
资料文件
使用Idea打开资料中提供的 seata-demo项目
找到项目所在目录,选中并打开:
项目结构如下:

结构说明:

account-service:用户服务,提供操作用户账号余额的功能,端口8083eureka-server:注册中心,端口8761order-service:订单服务,提供根据数据创建订单的功能,端口8082storage-service:仓储服务,提供扣减商品库存功能,端口8081 2.3 准备TC服务

在之前讲解Seata原理的时候,我们就聊过,其中包含重要的3个角色:

TC:事务协调器TM:事务管理器RM:资源管理器
其中,TC是一个独立的服务,负责协调各个分支事务,而TM和RM通过jar包的方式,集成在各个事务参与者中。
因此,首先我们需要搭建一个独立的TC服务。

在资料中提供有1.1.0版本的安装包,解压即可,其目录结构如下:

2.3.1 配置

Seata的核心配置主要是两部分:

注册中心的配置:在${seata_home}/conf/目录中,一般是registry.conf文件当前服务的配置,两种配置方式:

通过分布式服务的统一配置中心,例如Zookeeper通过本地文件

我们先看registry.conf,内容是JSON风格

registry {
  # 指定注册中心类型,这里使用eureka类型
  type = "eureka"
  # 各种注册中心的配置。。这里省略,只保留了eureka和Zookeeper
  eureka {
    serviceUrl = "http://localhost:8761/eureka"
    application = "seata_tc_server"
    weight = "1"
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
  }
}

config {
  # 配置文件方式,可以支持 file、nacos 、apollo、zk、consul、etcd3
  type = "file"
  nacos {
    serverAddr = "localhost"
    namespace = ""
    group = "SEATA_GROUP"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
  }
  file {
    name = "file.conf"
  }
}

这个文件主要配置两个内容:

注册中心的类型及地址,本例我们选择eureka做注册中心

eureka.serviceUrl:是eureka的地址,例如http://localhost:8761/eurekaapplication:是TC注册到eureka时的服务名称,例如seata_tc_server 配置中心的类型及地址,本例我们选择本地文件做配置,就是当前目录的file.conf文件

再来看file.conf文件:

## transaction log store, only used in seata-server
store {
  ## store mode: file、db
  mode = "file"
  ## file store property
  file {
    ## store location dir
    dir = "sessionStore"
    # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
    maxBranchSessionSize = 16384
    # globe session size , if exceeded throws exceptions
    maxGlobalSessionSize = 512
    # file buffer size , if exceeded allocate new buffer
    fileWriteBufferCacheSize = 16384
    # when recover batch read size
    sessionReloadReadSize = 100
    # async, sync
    flushDiskMode = async
  }

  ## database store property
  db {
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
    datasource = "dbcp"
    ## mysql/oracle/h2/oceanbase etc.
    dbType = "mysql"
    driverClassName = "com.mysql.jdbc.Driver"
    url = "jdbc:mysql://192.168.136.133:3308/seata_demo"
    user = "root"
    password = "root"
    minConn = 1
    maxConn = 10
    globalTable = "global_table"
    branchTable = "branch_table"
    lockTable = "lock_table"
    queryLimit = 100
  }
}

关键配置:

store:TC的服务端数据存储配置

mode:数据存储方式,支持两种:file和db

file:将数据存储在本地文件中,性能比较好,但不支持水平扩展db:将数据保存在指定的数据库中,需要指定数据库连接信息

如果用文件作为存储介质,不需要其它配置了,直接运行即可。
如果使用db作为存储介质,还需要在数据库中创建3张表:

CREATE TABLE IF NOT EXISTS `global_table`
(
    `xid`                       VARCHAR(128) NOT NULL,
    `transaction_id`            BIGINT,
    `status`                    TINYINT      NOT NULL,
    `application_id`            VARCHAR(32),
    `transaction_service_group` VARCHAR(32),
    `transaction_name`          VARCHAR(128),
    `timeout`                   INT,
    `begin_time`                BIGINT,
    `application_data`          VARCHAR(2000),
    `gmt_create`                DATETIME,
    `gmt_modified`              DATETIME,
    PRIMARY KEY (`xid`),
    KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
    KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
    `branch_id`         BIGINT       NOT NULL,
    `xid`               VARCHAR(128) NOT NULL,
    `transaction_id`    BIGINT,
    `resource_group_id` VARCHAR(32),
    `resource_id`       VARCHAR(256),
    `branch_type`       VARCHAR(8),
    `status`            TINYINT,
    `client_id`         VARCHAR(64),
    `application_data`  VARCHAR(2000),
    `gmt_create`        DATETIME,
    `gmt_modified`      DATETIME,
    PRIMARY KEY (`branch_id`),
    KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
    `row_key`        VARCHAR(128) NOT NULL,
    `xid`            VARCHAR(96),
    `transaction_id` BIGINT,
    `branch_id`      BIGINT       NOT NULL,
    `resource_id`    VARCHAR(256),
    `table_name`     VARCHAR(32),
    `pk`             VARCHAR(36),
    `gmt_create`     DATETIME,
    `gmt_modified`   DATETIME,
    PRIMARY KEY (`row_key`),
    KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;
2.3.2 启动


先启动工程里的四个微服务

将这两个文件内容改成和上面的一样,双击bin目录下的seata-server.bat,查看Eureka注册中心的信息

TC服务已经注册进去

三、改造微服务

接下来是微服务的改造,不管是哪一个微服务,只要是事务的参与者,步骤基本一致。

3.1 引入依赖

我们在父工程seata-demo中已经对依赖做了管理:

2.1.0.RELEASE
1.1.0

因此,我们在项目order-service的pom文件中,引入依赖坐标即可:


    com.alibaba.cloud
    spring-cloud-alibaba-seata
    ${alibaba.seata.version}
    
        
            seata-all
            io.seata
        
    


    seata-all
    io.seata
    ${seata.version}

3.2 添加配置文件

首先在application.yml中添加一行配置:

spring:
  cloud:
    alibaba:
      seata:
        tx-service-group: test_tx_group # 定义事务组的名称

事务组的名称,接下来会用到。
然后是在resources目录下放两个配置文件:file.conf和registry.conf其中,registry.conf与TC服务端的一样,此处不再讲解。我们来看下file.conf

transport {
  # tcp udt unix-domain-socket
  type = "TCP"
  #NIO NATIVE
  server = "NIO"
  #enable heartbeat
  heartbeat = true
  # the client batch send request enable
  enableClientBatchSendRequest = true
  #thread factory for netty
  threadFactory {
    bossThreadPrefix = "NettyBoss"
    workerThreadPrefix = "NettyServerNIOWorker"
    serverExecutorThread-prefix = "NettyServerBizHandler"
    shareBossWorker = false
    clientSelectorThreadPrefix = "NettyClientSelector"
    clientSelectorThreadSize = 1
    clientWorkerThreadPrefix = "NettyClientWorkerThread"
    # netty boss thread size,will not be used for UDT
    bossThreadSize = 1
    #auto default pin or 8
    workerThreadSize = "default"
  }
  shutdown {
    # when destroy server, wait seconds
    wait = 3
  }
  serialization = "seata"
  compressor = "none"
}
service {
  vgroupMapping.test_tx_group = "seata_tc_server"
  #only support when registry.type=file, please don't set multiple addresses
  seata_tc_server.grouplist = "127.0.0.1:8091"
  #degrade, current not support
  enableDegrade = false
  #disable seata
  disableGlobalTransaction = false
}

client {
  rm {
    asyncCommitBufferLimit = 10000
    lock {
      retryInterval = 10
      retryTimes = 30
      retryPolicyBranchRollbackonConflict = true
    }
    reportRetryCount = 5
    tablemetaCheckEnable = false
    reportSuccessEnable = false
  }
  tm {
    commitRetryCount = 5
    rollbackRetryCount = 5
  }
  undo {
    dataValidation = true
    logSerialization = "jackson"
    logTable = "undo_log"
  }
  log {
    exceptionRate = 100
  }
}

配置解读:

transport:与TC交互的一些配置

heartbeat:client和server通信心跳检测开关enableClientBatchSendRequest:客户端事务消息请求是否批量合并发送 service:TC的地址配置,用于获取TC的地址

vgroupMapping.test_tx_group = "seata_tc_server":

test_tx_group:是事务组名称,要与application.yml中配置一致,seata_tc_server:是TC服务端在注册中心的id,将来通过注册中心获取TC地址enableDegrade:服务降级开关,默认关闭。如果开启,当业务重试多次失败后会放弃全局事务disableGlobalTransaction:全局事务开关,默认false。false为开启,true为关闭 default.grouplist:这个当注册中心为file的时候,才用到 client:客户端配置

rm:资源管理器配

asynCommitBufferLimit:二阶段提交默认是异步执行,这里指定异步队列的大小lock:全局锁配置

retryInterval:校验或占用全局锁重试间隔,默认10,单位毫秒retryTimes:校验或占用全局锁重试次数,默认30次retryPolicyBranchRollbackOnConflict:分支事务与其它全局回滚事务冲突时锁策略,默认true,优先释放本地锁让回滚成功 reportRetryCount:一阶段结果上报TC失败后重试次数,默认5次 tm:事务管理器配置

commitRetryCount:一阶段全局提交结果上报TC重试次数,默认1rollbackRetryCount:一阶段全局回滚结果上报TC重试次数,默认1 undo:undo_log的配置

dataValidation:是否开启二阶段回滚镜像校验,默认truelogSerialization:undo序列化方式,默认JacksonlogTable:自定义undo表名,默认是undo_log log:日志配置

exceptionRate:出现回滚异常时的日志记录频率,默认100,百分之一概率。回滚失败基本是脏数据,无需输出堆栈占用硬盘空间 3.3 代理DataSource

Seata的二阶段执行是通过拦截sql语句,分析语义来指定回滚策略,因此需要对DataSource做代理。我们在项目的cn.itcast.order.config包中,添加一个配置类:

package cn.itcast.order.config;

import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
import io.seata.rm.datasource.DataSourceProxy;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;

@Configuration
public class DataSourceProxyConfig {

    @Bean
    public SqlSessionFactory sqlSessionFactoryBean(DataSource dataSource) throws Exception {
        // 订单服务中引入了mybatis-plus,所以要使用特殊的SqlSessionFactoryBean
        MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
        // 代理数据源
        sqlSessionFactoryBean.setDataSource(new DataSourceProxy(dataSource));
        // 生成SqlSessionFactory
        return sqlSessionFactoryBean.getObject();
    }
}

注意,这里因为订单服务使用了mybatis-plus这个框架(这是一个mybatis集成框架,自动生成单表Sql),因此我们需要用mybatis-plus的MybatisSqlSessionFactoryBean代替SqlSessionFactoryBean
如果用的是原生的mybatis,请使用SqlSessionFactoryBean

3.4 添加事务注解

给事务发起者order_service的OrderServiceImpl中的createOrder()方法添加@GlobalTransactional注解,开启全局事务,重新启动即可。

3.5.改造Storage、Account服务

与OrderService类似,这里也要经过下面的步骤:

引入依赖:与order-service一致,略

添加配置文件:与order-service一致,略

代理DataSource:与order-service一致,略

事务注解可以使用@Transactionnal,而不是@GlobalTransactional,事务发起者才需要添加@GlobalTransactional。

四、测试

重启所有微服务后再测试。
目前数据情况:用户余额1000,库存为10。
我们试试扣款1200元,那么扣款失败,理论上来说所有数据都会回滚。

查看库存发现成功回滚,说明分布式事务生效了!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/770326.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号