栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Spring Cloud Alibaba:本地事务与分布式事务演示 & Seata使用

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Spring Cloud Alibaba:本地事务与分布式事务演示 & Seata使用

事务的相关概念就不介绍了,本篇博客先给大家演示本地事务与分布式事务,然后介绍Seata的基本使用,以后会介绍Seata的集群、注册中心、配置中心以及各种事务模式。

  • 版本说明
搭建工程

一个父module和一个子module,父module用于管理依赖版本,子module用于演示本地事务与分布式事务(通过一个简易版的用户购买商品案例来演示),最后还需要集成Seata分布式事务解决方案。

父module的pom.xml:



    4.0.0

    com.kaven
    alibaba
    pom
    1.0-SNAPSHOT

    Spring Cloud Alibaba
    
        seata
    

    
        8
        8
        Hoxton.SR9
        2.2.6.RELEASE
    

    
        org.springframework.boot
        spring-boot-starter-parent
        2.3.2.RELEASE
    

    
        
            
                org.springframework.cloud
                spring-cloud-dependencies
                ${spring-cloud-version}
                pom
                import
            
            
                com.alibaba.cloud
                spring-cloud-alibaba-dependencies
                ${spring-cloud-alibaba-version}
                pom
                import
            
        
    

seata module

seata module通过一个简易版的用户购买商品案例来演示本地事务与分布式事务,seata module的结构如下图所示:

pom.xml:



    
        com.kaven
        alibaba
        1.0-SNAPSHOT
    
    4.0.0

    seata

    
        8
        8
    

    
        
            org.springframework.boot
            spring-boot-starter-web
        
        
            org.springframework.boot
            spring-boot-starter-data-jpa
        
        
            mysql
            mysql-connector-java
            8.0.26
        
        
            org.projectlombok
            lombok
        
    

application.yml:

server:
  port: 9000

spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    username: root
    password: ITkaven@666
    url: jdbc:mysql://localhost:3306/seata?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai
  jpa:
    show-sql: true
Entity

Order实体类:

package com.kaven.alibaba.entity;

import lombok.Data;

import javax.persistence.*;

@Entity
@Table(name = "`order`") // 不能将`order`改成order(Mysql数据库关键字),不然与数据库交互时会出错
@Data
public class Order {
    // 订单id
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    public Integer id;

    // 用户id 
    public Integer userId;
    
    // 商品id
    public Integer productId;

    // 商品购买数量
    public Integer count;

    // 订单金额
    public Integer money;
}

Storage实体类:

package com.kaven.alibaba.entity;

import lombok.Data;

import javax.persistence.*;

@Entity
@Table(name = "storage")
@Data
public class Storage {
    
    // 商品id
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    public Integer id;

    // 库存
    public Integer count;
}

User实体类:

package com.kaven.alibaba.entity;

import lombok.Data;

import javax.persistence.*;

@Entity
@Table(name = "user")
@Data
public class User {

    // 用户id 
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    public Integer id;

    // 用户余额
    public Integer money;
}
Repository

OrderRepository:

package com.kaven.alibaba.repository;

import com.kaven.alibaba.entity.Order;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface OrderRepository extends JpaRepository {}

StorageRepository:

package com.kaven.alibaba.repository;

import com.kaven.alibaba.entity.Storage;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface StorageRepository extends JpaRepository {}

UserRepository:

package com.kaven.alibaba.repository;

import com.kaven.alibaba.entity.User;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface UserRepository extends JpaRepository {}
Service

IOrderService:

package com.kaven.alibaba.service;

public interface IOrderService {
    void create(int userId, int productId, int count, int money);
}

OrderServiceImpl:

package com.kaven.alibaba.service.impl;

import com.kaven.alibaba.entity.Order;
import com.kaven.alibaba.repository.OrderRepository;
import com.kaven.alibaba.service.IOrderService;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

@Service
public class OrderServiceImpl implements IOrderService {

    @Resource
    private OrderRepository orderRepository;

    @Override
    public void create(int userId, int productId, int count, int money) {
        // 生成订单
        Order order = new Order();
        order.setUserId(userId);
        order.setProductId(productId);
        order.setCount(count);
        order.setMoney(money);
        orderRepository.save(order);
    }
}

IStorageService:

package com.kaven.alibaba.service;

public interface IStorageService {
    void deduct(int productId, int count);
}

StorageServiceImpl:

package com.kaven.alibaba.service.impl;

import com.kaven.alibaba.entity.Storage;
import com.kaven.alibaba.repository.StorageRepository;
import com.kaven.alibaba.service.IStorageService;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.Optional;

@Service
public class StorageServiceImpl implements IStorageService {

    @Resource
    private StorageRepository storageRepository;

    @Override
    public void deduct(int productId, int count) {
        Optional byId = storageRepository.findById(productId);
        if(byId.isPresent()) {
            Storage storage = byId.get();
            if(storage.getCount() >= count) {
                // 减库存
                storage.setCount(storage.getCount() - count);
                storageRepository.save(storage);
            }
            else {
                throw new RuntimeException("该商品库存不足!");
            }
        }
        else {
            throw new RuntimeException("该商品不存在!");
        }
    }
}

IUserService:

package com.kaven.alibaba.service;

public interface IUserService {
    void debit(int userId, int money);
}

UserServiceImpl:

package com.kaven.alibaba.service.impl;

import com.kaven.alibaba.entity.User;
import com.kaven.alibaba.repository.UserRepository;
import com.kaven.alibaba.service.IUserService;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.Optional;

@Service
public class UserServiceImpl implements IUserService {

    @Resource
    private UserRepository userRepository;

    @Override
    public void debit(int userId, int money) {
        Optional byId = userRepository.findById(userId);
        if(byId.isPresent()) {
            User user = byId.get();
            if(user.getMoney() >= money) {
                // 减余额
                user.setMoney(user.getMoney() - money);
                userRepository.save(user);
            }
            else {
                throw new RuntimeException("该用户余额不足!");
            }
        }
        else {
            throw new RuntimeException("没有该用户!");
        }
    }
}

IBuyService:

package com.kaven.alibaba.service;

public interface IBuyService {
    void buy(int userId, int productId, int count);
}
package com.kaven.alibaba.service.impl;

import com.kaven.alibaba.service.IBuyService;
import com.kaven.alibaba.service.IOrderService;
import com.kaven.alibaba.service.IStorageService;
import com.kaven.alibaba.service.IUserService;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

@Service
public class BuyServiceImpl implements IBuyService {

    @Resource
    private IOrderService orderService;

    @Resource
    private IStorageService storageService;

    @Resource
    private IUserService userService;

    @Override
    public void buy(int userId, int productId, int count) {
        int money = count;
        // 生成订单
        orderService.create(userId, productId, count, money);
        // 减库存
        storageService.deduct(productId, count);
        // 减余额
        userService.debit(userId, money);
    }
}
Controller

BuyController:

package com.kaven.alibaba.controller;

import com.kaven.alibaba.service.IBuyService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
public class BuyController {

    @Resource
    private IBuyService buyService;

    @PostMapping("/buy")
    public String buy(@RequestParam("userId") Integer userId,
                      @RequestParam("productId") Integer productId,
                      @RequestParam("count") Integer count) {
        buyService.buy(userId, productId, count);
        return "success";
    }
}

启动类:

package com.kaven.alibaba;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }
}
数据库

进入数据库中,执行下面的sql(为了简单,没有创建相关索引)。

CREATE DATAbase seata;

USE seata;

DROp TABLE IF EXISTS `storage`;
CREATE TABLE `storage` (
                           `id` int(11) NOT NULL AUTO_INCREMENT,
                           `count` int(11) DEFAULT 0,
                           PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


DROP TABLE IF EXISTS `order`;
CREATE TABLE `order` (
                         `id` int(11) NOT NULL AUTO_INCREMENT,
                         `user_id` int(11) DEFAULT NULL,
                         `product_id` int(11) DEFAULT NULL,
                         `count` int(11) DEFAULT 0,
                         `money` int(11) DEFAULT 0,
                         PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
                        `id` int(11) NOT NULL AUTO_INCREMENT,
                        `money` int(11) DEFAULT 0,
                        PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

插入几条数据,如下图所示:

启动应用。

本地事务

使用Postman向应用发送请求。

很显然用户的金额不够。

商品的库存也被扣减了。

订单已经创建了。

减用户余额的操作由于没有被执行,因此用户余额不变,这就导致了数据不一致的问题。


这种只在一个服务下的事务比较好解决,可以使用Spring的@Transactional注解。

package com.kaven.alibaba.service.impl;

import com.kaven.alibaba.service.IBuyService;
import com.kaven.alibaba.service.IOrderService;
import com.kaven.alibaba.service.IStorageService;
import com.kaven.alibaba.service.IUserService;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;

@Service
public class BuyServiceImpl implements IBuyService {

    @Resource
    private IOrderService orderService;

    @Resource
    private IStorageService storageService;

    @Resource
    private IUserService userService;

    @Override
    @Transactional
    public void buy(int userId, int productId, int count) {
        int money = count;
        // 生成订单
        orderService.create(userId, productId, count, money);
        // 减库存
        storageService.deduct(productId, count);
        // 减余额
        userService.debit(userId, money);
    }
}

重启应用,将数据库中的数据手动回滚。再使用Postman向应用发送请求。

异常链信息和之前不一样了,其实就是@Transactional注解的作用,该注解的原理博主以后会在Spring源码阅读系列中进行介绍。


此时数据是一致的。


分布式事务

现在将每个操作变成一个接口来模拟分布式环境。

Controller

OrderController:

package com.kaven.alibaba.controller;

import com.kaven.alibaba.service.IOrderService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
@RequestMapping("/order")
public class OrderController {

    @Resource
    private IOrderService orderService;

    @PostMapping("/create")
    public void create(@RequestParam("userId") Integer userId,
                       @RequestParam("productId") Integer productId,
                       @RequestParam("count") Integer count,
                       @RequestParam("money") Integer money) {
        orderService.create(userId, productId, count, money);
    }
}

StorageController:

package com.kaven.alibaba.controller;

import com.kaven.alibaba.service.IStorageService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
@RequestMapping("/storage")
public class StorageController {

    @Resource
    private IStorageService storageService;

    @PostMapping("/deduct")
    public void deduct(@RequestParam("productId") Integer productId,
                       @RequestParam("count") Integer count) {
        storageService.deduct(productId, count);
    }
}

UserController:

package com.kaven.alibaba.controller;

import com.kaven.alibaba.service.IUserService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
@RequestMapping("/user")
public class UserController {

    @Resource
    private IUserService userService;

    @PostMapping("/debit")
    public void debit(@RequestParam("userId") Integer userId,
                       @RequestParam("money") Integer money) {
        userService.debit(userId, money);
    }
}

修改BuyController:

package com.kaven.alibaba.controller;

import org.springframework.util.linkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponentsBuilder;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;

@RestController
public class BuyController {

    @Resource
    private RestTemplate restTemplate;

    @PostMapping("/buy")
    @Transactional
    public String buy(@RequestParam("userId") Integer userId,
                      @RequestParam("productId") Integer productId,
                      @RequestParam("count") Integer count) {

        // 请求参数
        MultiValueMap queryParams = new linkedMultiValueMap<>();
        queryParams.add("userId", userId.toString());
        queryParams.add("productId", productId.toString());
        queryParams.add("count", count.toString());
        queryParams.add("money", count.toString());

        // 构造请求
        UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl("http://localhost:9000/order/create").queryParams(queryParams);
        restTemplate.postForObject(builder.toUriString(), null, Void.class);

        // 构造请求
        builder = UriComponentsBuilder.fromHttpUrl("http://localhost:9000/storage/deduct").queryParams(queryParams);
        restTemplate.postForObject(builder.toUriString(), null, Void.class);

        // 构造请求
        builder = UriComponentsBuilder.fromHttpUrl("http://localhost:9000/user/debit").queryParams(queryParams);
        restTemplate.postForObject(builder.toUriString(), null, Void.class);
        return "success";
    }
}

重启应用,再使用Postman向应用发送请求。

数据不一致。



分布式环境下使用@Transactional注解就不会起作用了(为了简单,这里只是模拟分布式环境下的用户购买商品案例),接下来就需要使用Seata来解决该问题。

Seata

Seata是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata将为用户提供了 AT、TCC、SAGA和XA事务模式,为用户打造一站式的分布式解决方案(这些关于Seata的介绍来自官网)。

  • TC (Transaction Coordinator,事务协调者):维护全局和分支事务的状态,驱动全局事务提交或回滚。
  • TM (Transaction Manager, 事务管理器):定义全局事务的范围,开始全局事务、提交或回滚全局事务。
  • RM (Resource Manager,资源管理器):管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

很显然TM和RM是在业务代码中的,而TC是一个独立的应用,由Seata提供。

  • 下载地址

博主下载的是1.3.0版本(要和Spring Cloud Alibaba版本兼容)。


简单使用不需要修改配置(默认是基于本地file的形式,为了简单就使用默认形式,分布式环境下需要使用其他形式,其他形式博主以后会介绍),启动Seata:

C:UsersDell>f:

F:>cd F:toolsseata-server-1.3.0seatabin

F:toolsseata-server-1.3.0seatabin>dir
 驱动器 F 中的卷是 WorkSpace
 卷的序列号是 D671-D29B

 F:toolsseata-server-1.3.0seatabin 的目录

2021/12/29  10:50              .
2020/07/16  00:35              ..
2020/07/16  00:35             3,648 seata-server.bat
2020/07/16  00:35             4,175 seata-server.sh
2021/12/29  10:51              sessionStore
               2 个文件          7,823 字节
               3 个目录 241,159,860,224 可用字节

F:toolsseata-server-1.3.0seatabin>seata-server.bat -p 8080


修改配置文件:

server:
  port: 9000

spring:
  application:
    name: seata
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    username: root
    password: ITkaven@666
    url: jdbc:mysql://localhost:3306/seata?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai
  jpa:
    show-sql: true

seata:
  application-id: buy
  tx-service-group: kaven_seata_tx_group
  service:
    vgroup-mapping:
      kaven_seata_tx_group: default
    grouplist:
      - "127.0.0.1:8080"


file.conf(这里的service相关配置要和application.yml配置文件中的一致):

transport {
  # tcp udt unix-domain-socket
  type = "TCP"
  #NIO NATIVE
  server = "NIO"
  #enable heartbeat
  heartbeat = true
  # the client batch send request enable
  enableClientBatchSendRequest = true
  #thread factory for netty
  threadFactory {
    bossThreadPrefix = "NettyBoss"
    workerThreadPrefix = "NettyServerNIOWorker"
    serverExecutorThread-prefix = "NettyServerBizHandler"
    shareBossWorker = false
    clientSelectorThreadPrefix = "NettyClientSelector"
    clientSelectorThreadSize = 1
    clientWorkerThreadPrefix = "NettyClientWorkerThread"
    # netty boss thread size,will not be used for UDT
    bossThreadSize = 1
    #auto default pin or 8
    workerThreadSize = "default"
  }
  shutdown {
    # when destroy server, wait seconds
    wait = 3
  }
  serialization = "seata"
  compressor = "none"
}
service {
  #transaction service group mapping
  vgroupMapping.kaven_seata_tx_group = "default"
  #only support when registry.type=file, please don't set multiple addresses
  default.grouplist = "127.0.0.1:8080"
  #degrade, current not support
  enableDegrade = false
  #disable seata
  disableGlobalTransaction = false
}

client {
  rm {
    asyncCommitBufferLimit = 10000
    lock {
      retryInterval = 10
      retryTimes = 30
      retryPolicyBranchRollbackonConflict = true
    }
    reportRetryCount = 5
    tablemetaCheckEnable = false
    reportSuccessEnable = false
  }
  tm {
    commitRetryCount = 5
    rollbackRetryCount = 5
  }
  undo {
    dataValidation = true
    logSerialization = "jackson"
    logTable = "undo_log"
  }
  log {
    exceptionRate = 100
  }
}

registry.conf:

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "file"
  file {
    name = "file.conf"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "file"

  file {
    name = "file.conf"
  }
}

给buy接口加上@GlobalTransactional注解:

    @PostMapping("/buy")
    @GlobalTransactional
    public String buy(@RequestParam("userId") Integer userId,
                      @RequestParam("productId") Integer productId,
                      @RequestParam("count") Integer count) {

        // 请求参数
        MultiValueMap queryParams = new linkedMultiValueMap<>();
        queryParams.add("userId", userId.toString());
        queryParams.add("productId", productId.toString());
        queryParams.add("count", count.toString());
        queryParams.add("money", count.toString());

        // 构造请求
        UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl("http://localhost:9000/order/create").queryParams(queryParams);
        restTemplate.postForObject(builder.toUriString(), null, Void.class);

        // 构造请求
        builder = UriComponentsBuilder.fromHttpUrl("http://localhost:9000/storage/deduct").queryParams(queryParams);
        restTemplate.postForObject(builder.toUriString(), null, Void.class);

        // 构造请求
        builder = UriComponentsBuilder.fromHttpUrl("http://localhost:9000/user/debit").queryParams(queryParams);
        restTemplate.postForObject(builder.toUriString(), null, Void.class);
        return "success";
    }

在数据库中创建undo_log表。

USE seata;
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
                            `id` bigint(20) NOT NULL AUTO_INCREMENT,
                            `branch_id` bigint(20) NOT NULL,
                            `xid` varchar(100) NOT NULL,
                            `context` varchar(128) NOT NULL,
                            `rollback_info` longblob NOT NULL,
                            `log_status` int(11) NOT NULL,
                            `log_created` datetime NOT NULL,
                            `log_modified` datetime NOT NULL,
                            `ext` varchar(100) DEFAULT NULL,
                            PRIMARY KEY (`id`),
                            UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

启动应用。

手动回滚数据库中的数据,再使用Postman进行测试。

客户端的日志:


TC的日志:

数据库中的数据也是一致的。


通过这些日志信息也能看出来Seata分布式事务解决方案应用成功了。本地事务与分布式事务演示以及Seata的使用就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/684115.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号