Feidao's Blog

Integrating Seata with Spring Cloud (AT two-phase mode, scenario: placing an order deducts stock)


0: Prerequisite (set up the Seata server)

The server setup steps are covered in an earlier post:

https://blog.csdn.net/Abraxs/article/details/128425499?spm=1001.2014.3001.5502

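1: Overview

There are two microservices:
an order service that places orders, and a stock service that manages inventory.
Placing an order also calls the stock service to deduct stock.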
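
The overall flow this scenario relies on can be sketched without Seata at all. The toy model below is only an illustration under stated assumptions: `GlobalTxSketch`, `runBranch` and `placeOrder` are hypothetical names, the "databases" are a plain map, and the real coordination between the TC and the RMs is far more involved. It shows the idea that each branch applies its local change immediately and keeps enough information to undo it if a later step fails.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy model of the order/stock scenario; this is not Seata's real API.
class GlobalTxSketch {
    // Each branch remembers how to undo its already-applied local change.
    private final Deque<Runnable> undoActions = new ArrayDeque<>();

    // Phase 1: apply the local change right away and register its compensation.
    void runBranch(Runnable localChange, Runnable undo) {
        localChange.run();
        undoActions.push(undo);
    }

    // Phase 2 on failure: undo the branches in reverse order.
    void rollback() {
        while (!undoActions.isEmpty()) {
            undoActions.pop().run();
        }
    }

    // Phase 2 on success: nothing to undo, so the undo records are dropped.
    void commit() {
        undoActions.clear();
    }

    // Places one order; when stockFails is true the stock step throws after its update.
    static Map<String, Integer> placeOrder(boolean stockFails) {
        Map<String, Integer> db = new HashMap<>();
        db.put("orders", 0);
        db.put("stock", 10);
        GlobalTxSketch tx = new GlobalTxSketch();
        try {
            tx.runBranch(() -> db.merge("orders", 1, Integer::sum),
                         () -> db.merge("orders", -1, Integer::sum));
            tx.runBranch(() -> db.merge("stock", -1, Integer::sum),
                         () -> db.merge("stock", 1, Integer::sum));
            if (stockFails) {
                throw new IllegalStateException("stock service failed");
            }
            tx.commit();
        } catch (IllegalStateException e) {
            tx.rollback();
        }
        return db;
    }

    public static void main(String[] args) {
        System.out.println(placeOrder(false)); // one order, stock reduced by one
        System.out.println(placeOrder(true));  // both changes undone
    }
}
```

In the failing case both the inserted order and the stock deduction are reverted, which is the behaviour the rest of this post verifies against a real Seata setup.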

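2: Add the undo_log table

Add the transaction log table to every business database, so that each service can record the rollback data for its own branch transactions.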

CREATE TABLE `undo_log` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `branch_id` bigint NOT NULL,
  `xid` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `context` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE,
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb3 ROW_FORMAT=DYNAMIC;

3: Add the Seata dependencies

3.1: Shared Seata POM dependencies

The Seata dependencies are extracted into a shared module that the other services depend on.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>europa-platform</artifactId>
        <groupId>com.europa</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>commons-global-tx</artifactId>

    <dependencies>
        <dependency>
            <groupId>com.alibaba.nacos</groupId>
            <artifactId>nacos-client</artifactId>
<!--            <version>1.2.0</version>-->
        </dependency>


        <!-- Distributed transaction support -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>io.seata</groupId>
                    <artifactId>seata-spring-boot-starter</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-spring-boot-starter</artifactId>
            <version>1.4.2</version>
        </dependency>

    </dependencies>
</project>

 

3.2: Add the shared Seata module to the business service

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>modules</artifactId>
        <groupId>com.europa</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>europa-tx</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-tx</artifactId>
            <version>5.3.3</version>
        </dependency>

    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
<!--    <dependency>-->
<!--        <groupId>org.springframework.boot</groupId>-->
<!--        <artifactId>spring-boot-starter-amqp</artifactId>-->
<!--    </dependency>-->
        <dependency>
            <groupId>com.europa</groupId>
            <artifactId>commons-db</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.europa</groupId>
            <artifactId>commons-support-api</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.europa</groupId>
            <artifactId>commons-global-tx</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
<!--        <dependency>-->
<!--            <groupId>com.alibaba.cloud</groupId>-->
<!--            <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>-->
<!--        </dependency>-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.2.11</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-test</artifactId>
            <version>2.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>5.3.21</version>
        </dependency>
    </dependencies>
</project>

 

3.3: Configure the Seata client and registry in the YAML file

seata:
  enabled: true
  application-id: ${spring.application.name}
  # Transaction group
  tx-service-group: my_test_tx_group
  # Automatic data source proxying
  enable-auto-data-source-proxy: true
  # Data source proxy mode (the distributed transaction mode)
  data-source-proxy-mode: AT
  # Must match the vgroup mapping configured in Nacos
  service:
    vgroup-mapping:
      my_test_tx_group: default
  # Nacos configuration center
  config:
    type: nacos
    nacos:
      server-addr: 192..101:8848
      group: SEATA_GROUP
      namespace: d4874eb0-1917-45fd-8dc1-34f6f3f5265a
      data-id: seataServer.properties
      username: nacos
      password: nacos
  # Nacos registry
  registry:
    type: nacos
    nacos:
      server-addr: 192..101:8848  # Nacos address where the Seata server is registered
      application: seata-server    # default name is seata-server; can be omitted if unchanged
      group: SEATA_GROUP # default group is SEATA_GROUP; can be omitted if unchanged
      username: nacos
      password: nacos
      namespace: d4874eb0-1917-45fd-8dc1-34f6f3f5265a
 

The complete YAML file:

server:
  port: 8092
spring:
  application:
    name: order-server
  rabbitmq:
    host: 192.101
    port: 5672
    username: root
    password: 123456
  cloud:
    nacos:
      config:
        server-addr: 101.62:8848
        file-extension: yaml
      discovery:
        server-addr: 101.62:8848
#    sentinel:
#      transport:
#        dashboard: 192.168.56.104:8887
#      config:
#        # Whether to enable the configuration center (default: true)
#        enabled: true
#        server-addr: 101.62:8848  # address of the Nacos service registry
#        # Configuration file extension
#        file-extension: yml
#        # Group the configuration belongs to
#        group: SEATA_GROUP
#        # Nacos user name
#        username: nacos
#        # Nacos password
#        password: nacos
#        # Supports multiple shared Data Ids, with lower priority than extension-configs. The property is a list of Config POJOs, each with three fields: dataId, group and refresh
#        shared-configs[0]:
#          data-id: seata-client.yaml # configuration file name (Data Id)
#          group: SEATA_GROUP   # defaults to DEFAULT_GROUP
#          refresh: false   # whether to refresh dynamically, defaults to false
#        shared-configs:
#          - data-id: seata-client.yaml
#            group: SEATA_GROUP
#            refresh: true
  datasource: # data source connection
    type: com.alibaba.druid.pool.DruidDataSource
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://192.:3306/chain-order?useUnicode=true&characterEncoding=UTF-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2b8
    username: root
    password: 123456

# Distributed transactions
seata:
  enabled: true
  application-id: ${spring.application.name}
  # The client and the server must use the same transaction group
  tx-service-group: my_test_tx_group
  # Automatic data source proxying
  enable-auto-data-source-proxy: true
  # Data source proxy mode (the distributed transaction mode)
  data-source-proxy-mode: AT
  # Transaction group mapping: the value is the TC cluster name and must match the server side
  service:
    vgroup-mapping:
      my_test_tx_group: default
  # Nacos configuration center
  config:
    type: nacos
    nacos:
      server-addr: 192..101:8848
      group: SEATA_GROUP
      namespace: d4874eb0-1917-45fd-8dc1-34f6f3f5265a
      data-id: seataServer.properties
      # optional
      username: nacos
      # optional
      password: nacos
  # Nacos registry
  registry:
    type: nacos
    nacos:
      server-addr: 192..101:8848  # Nacos address where the Seata server is registered
      application: seata-server    # default name is seata-server; can be omitted if unchanged
      group: SEATA_GROUP # default group is SEATA_GROUP; can be omitted if unchanged
      username: nacos
      password: nacos
      namespace: d4874eb0-1917-45fd-8dc1-34f6f3f5265a

 

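4: Add the OpenFeign dependencies for service-to-service calls

4.1: OpenFeign POM dependencies

The OpenFeign dependencies are likewise extracted into a shared module that the other services depend on.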

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>commons</artifactId>
        <groupId>com.europa</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>commons-support-api</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>
        <dependency>
            <groupId>io.github.openfeign</groupId>
            <artifactId>feign-okhttp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-loadbalancer</artifactId>
        </dependency>
    </dependencies>
</project>

 

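4.2: OpenFeign client interface for the stock service

Scenario: a request enters the order service, which then calls the stock service, so the stock endpoint is declared in the shared OpenFeign module.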

package com.europa.support.provider;

import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;

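// Feign client for the stock service. Because a fixed url is given,
// requests go straight to that address.
@Component
@FeignClient(value = "stock-server", url = "http://localhost:8093/")
public interface IStockProvider {

    // Calls POST /reduceAmont on the stock service to deduct stock for the product.
    @PostMapping("/reduceAmont")
    String reduceAmont(@RequestParam("productId") Integer productId);

    // Fallback implementation. It is not referenced through @FeignClient(fallback = ...),
    // so as written it is never used when the remote call fails.
    @Component
    class IStockProviderFallback implements IStockProvider {

        @Override
        public String reduceAmont(Integer productId) {
            return null;
        }
    }

}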


 

4.3: Use the OpenFeign module in the order service

4.3.1: Add the OpenFeign module to the POM

<dependency>
    <groupId>com.europa</groupId>
    <artifactId>commons-support-api</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>

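5: Start the order and stock services (the stock service is set up the same way as the order service; only the database and the service name differ)

5.1: Startup class annotation: @EnableFeignClients

This annotation makes Spring scan the package that contains the OpenFeign interfaces at startup. The cross-service interfaces have no concrete implementation classes; their implementations are generated at runtime as dynamic proxies.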

package com.europa.tx;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;

@EnableDiscoveryClient
@SpringBootApplication
@EnableFeignClients(basePackages = "com.europa.support.provider")
@MapperScan(basePackages = {"com.europa.tx.mapper"})
public class EurtxApplication {

    public static void main(String[] args) {
        SpringApplication app = new SpringApplication(EurtxApplication.class);
        app.run(args);
    }

}
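
To make the "interface without an implementation class" idea concrete, here is a minimal sketch that uses a plain JDK dynamic proxy. It is an illustration only: `FeignProxySketch`, `StockApi` and `createClient` are hypothetical names, and instead of sending an HTTP request the handler just returns a description of the call it would make. The real OpenFeign machinery adds contracts, encoders, load balancing and much more.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

class FeignProxySketch {
    // Stand-in for a Feign client interface; there is no implementing class.
    interface StockApi {
        String reduceAmont(Integer productId);
    }

    // Builds a JDK dynamic proxy; a real client would send an HTTP request in the handler.
    static StockApi createClient(String baseUrl) {
        InvocationHandler handler = (proxy, method, methodArgs) ->
                "POST " + baseUrl + "/" + method.getName() + "?productId=" + methodArgs[0];
        return (StockApi) Proxy.newProxyInstance(
                StockApi.class.getClassLoader(),
                new Class<?>[] {StockApi.class},
                handler);
    }

    public static void main(String[] args) {
        StockApi client = createClient("http://localhost:8093");
        // prints "POST http://localhost:8093/reduceAmont?productId=1"
        System.out.println(client.reduceAmont(1));
    }
}
```

The caller only ever sees the interface, which is why the order service can inject `IStockProvider` even though no class in the project implements the remote call.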

 

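5.2: Start the services and check the log output

The log shows that the resource manager (RM) registered successfully and that the configured transaction group is reported.

5.2.1: Order service log: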

2022-12-28 01:48:13.257  INFO 8488 --- [           main] i.s.c.rpc.netty.RmNettyRemotingClient    : RM will register :jdbc:mysql://192..104:3306/chain-order
2022-12-28 01:48:13.260  INFO 8488 --- [           main] i.s.core.rpc.netty.NettyPoolableFactory  : NettyPool create channel to transactionRole:RMROLE,address:192.:8091,msg:< RegisterRMRequest{resourceIds='jdbc:mysql://192..104:3306/chain-order', applicationId='order-server', transactionServiceGroup='my_test_tx_group'} >

2022-12-28 01:49:10.975  INFO 8488 --- [eoutChecker_1_1] i.s.core.rpc.netty.NettyPoolableFactory  : NettyPool create channel to transactionRole:TMROLE,address:192..102:8091,msg:< RegisterTMRequest{applicationId='order-server', transactionServiceGroup='my_test_tx_group'} >
2022-12-28 01:49:10.987  INFO 8488 --- [eoutChecker_1_1] i.s.c.rpc.netty.TmNettyRemotingClient    : register TM success. client version:1.3.0, server version:1.4.2,channel:[id: 0x318597e3, L:/192..1:52260 - R:/192..102:8091]
2022-12-28 01:49:10.987  INFO 8488 --- [eoutChecker_1_1] i.s.core.rpc.netty.NettyPoolableFactory  : register success, cost 5 ms, version:1.4.2,role:TMROLE,channel:[id: 0x318597e3, L:/192.1:52260 - R:/192.102:8091]

5.2.2: Stock service log:

6: Send a request to the order service and run a cross-service Seata distributed transaction

6.1: Call the order service

http://localhost:8092/syncOrder

6.1.1: OrderController.class

package com.europa.tx.controller;

//import org.springframework.amqp.core.Message;
//import org.springframework.amqp.core.MessageProperties;
//import org.springframework.amqp.rabbit.core.RabbitTemplate;
import com.europa.tx.entity.BizOrder;
import com.europa.tx.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;


@RestController
public class OrderController {
   

//    @Autowired
//    RabbitTemplate rabbitTemplate;

    @Autowired
    private OrderService orderService;

    @RequestMapping("/syncOrder")
    public String syncOrder() {
//        System.out.println("Seata global transaction id ====================> " + RootContext.getXID());
        BizOrder order = new BizOrder();
        order.setProductId(1);
        order.setStatus("0");
        order.setTotalAmount(100);
        try {
            orderService.saveLock(order);
        } catch (Exception e) {
            // saveLock threw, so the global transaction was not committed;
            // report the failure instead of claiming success.
            e.printStackTrace();
            return "Operation failed";
        }
        return "Operation succeeded";
    }

    @RequestMapping("/delayedSend")
    public String delayedSend(){
   

//        Random random = new Random();
//        int i = random.nextInt(10);
//        String millTimes = String.valueOf(i * 1000);
//
//        MessageProperties messageProperties = new MessageProperties();
//
//        String msg = " I am plugins - delay";
//        messageProperties.setHeader("x-delay",millTimes);// delay before the message is delivered
//        Message message = new Message(msg.getBytes(), messageProperties);
//        rabbitTemplate.convertAndSend("delayed-exchange","delay",message);
//        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//        System.out.println("Message sent [" + sdf.format(new Date()) + "]");
        return "";

    }
}

 

6.1.2: OrderServiceImpl.class

package com.europa.tx.service.impl;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.europa.support.provider.IStockProvider;
import com.europa.tx.entity.BizOrder;
import com.europa.tx.mapper.OrderMapper;
import com.europa.tx.service.OrderService;
import io.seata.core.context.RootContext;
import io.seata.spring.annotation.GlobalTransactional;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;


import javax.annotation.Resource;
import java.util.UUID;


@Service
public class OrderServiceImpl extends ServiceImpl<OrderMapper, BizOrder> implements OrderService {
   

    @Resource
    private OrderMapper orderMapper;

    @Autowired
    private IStockProvider iStockProvider;

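    @Override
    // "name" is the name of the global transaction; the transaction group name is reused for it here.
    @GlobalTransactional(name = "my_test_tx_group", rollbackFor = Exception.class)
    // @Transactional(rollbackFor = Exception.class)
    public boolean saveLock(BizOrder order) throws Exception {
        System.out.println("Seata global transaction id (XID) ====================> " + RootContext.getXID());

        // Simulate a change to an existing order (database operation 1: UPDATE)
        BizOrder existOrder = orderMapper.selectById("048b2d2836b641aa93b16d410f682db0");
        if (ObjectUtils.isNotEmpty(existOrder)) {
            existOrder.setTotalAmount(50);
            baseMapper.updateById(existOrder);
        }
        order.setId(UUID.randomUUID().toString().replace("-", ""));
        order.setProductId(1);
        // Marker text, roughly: "if you can see this change, the rollback did not take effect"
        order.setStatus("看到改变了,说明失效了哦");
        // Place the order (database operation 2: INSERT)
        orderMapper.insert(order);
        // Remote call to the stock service; an exception thrown here propagates and triggers the global rollback
        iStockProvider.reduceAmont(order.getProductId());
        return true;
    }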

}

 
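6.1.2.1: Inspect the Seata data produced by the order request
6.1.2.1.1: undo_log table in the order database

Note: the order service implementation performs two database operations, an update and an insert. Their undo_log records are examined below.
baseMapper.updateById(existOrder);
orderMapper.insert(order);

Query the rollback_info column:
SELECT CONVERT(t.rollback_info USING utf8mb4),t.* FROM undo_log t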

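Two branch records (two branch_id values) were generated, each holding the rollback data for one database operation: one of type UPDATE and one of type INSERT. Each record carries a beforeImage and an afterImage, that is, the row values before and after the SQL statement ran. If an exception is thrown later, the transaction coordinator (TC) notifies the resource manager (RM), which uses these images to roll the change back.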

{
  "@class": "io.seata.rm.datasource.undo.BranchUndoLog",
  "xid": "192..102:8091:18358011811149491",
  "branchId": 18358011811149494,
  "sqlUndoLogs": ["java.util.ArrayList", [{
    "@class": "io.seata.rm.datasource.undo.SQLUndoLog",
    "sqlType": "UPDATE",
    "tableName": "biz_order",
    "beforeImage": {
      "@class": "io.seata.rm.datasource.sql.struct.TableRecords",
      "tableName": "biz_order",
      "rows": ["java.util.ArrayList", [{
        "@class": "io.seata.rm.datasource.sql.struct.Row",
        "fields": ["java.util.ArrayList", [
          {"@class": "io.seata.rm.datasource.sql.struct.Field", "name": "id", "keyType": "PRIMARY_KEY", "type": 12, "value": "048b2d2836b641aa93b16d410f682db0"},
          {"@class": "io.seata.rm.datasource.sql.struct.Field", "name": "product_id", "keyType": "NULL", "type": -5, "value": ["java.lang.Long", 1]},
          {"@class": "io.seata.rm.datasource.sql.struct.Field", "name": "total_amount", "keyType": "NULL", "type": 4, "value": 100},
          {"@class": "io.seata.rm.datasource.sql.struct.Field", "name": "status", "keyType": "NULL", "type": 12, "value": "看到改变了,说明失效了哦"}
        ]]
      }]]
    },
    "afterImage": {
      "@class": "io.seata.rm.datasource.sql.struct.TableRecords",
      "tableName": "biz_order",
      "rows": ["java.util.ArrayList", [{
        "@class": "io.seata.rm.datasource.sql.struct.Row",
        "fields": ["java.util.ArrayList", [
          {"@class": "io.seata.rm.datasource.sql.struct.Field", "name": "id", "keyType": "PRIMARY_KEY", "type": 12, "value": "048b2d2836b641aa93b16d410f682db0"},
          {"@class": "io.seata.rm.datasource.sql.struct.Field", "name": "product_id", "keyType": "NULL", "type": -5, "value": ["java.lang.Long", 1]},
          {"@class": "io.seata.rm.datasource.sql.struct.Field", "name": "total_amount", "keyType": "NULL", "type": 4, "value": 50},
          {"@class": "io.seata.rm.datasource.sql.struct.Field", "name": "status", "keyType": "NULL", "type": 12, "value": "看到改变了,说明失效了哦"}
        ]]
      }]]
    }
  }]]
}

BEFOREIMAGE

AFTERIMAGE
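
How a record like the one above could drive a rollback can be sketched in a few lines. This is a simplified illustration, not Seata's implementation: `UndoImageSketch`, `UndoRecord`, `updateWithUndo` and `rollback` are hypothetical names, and a row is modelled as a plain map. The sketch assumes the usual AT-mode idea that the row is restored from the beforeImage only if its current content still equals the afterImage, so that a change made by someone else in the meantime is not silently overwritten.

```java
import java.util.HashMap;
import java.util.Map;

class UndoImageSketch {
    // One undo record for a single row: its column values before and after the SQL ran.
    static final class UndoRecord {
        final Map<String, Object> beforeImage;
        final Map<String, Object> afterImage;

        UndoRecord(Map<String, Object> before, Map<String, Object> after) {
            this.beforeImage = new HashMap<>(before);
            this.afterImage = new HashMap<>(after);
        }
    }

    // Applies an update to the row and captures both images around it.
    static UndoRecord updateWithUndo(Map<String, Object> row, String column, Object newValue) {
        Map<String, Object> before = new HashMap<>(row);
        row.put(column, newValue);
        return new UndoRecord(before, row);
    }

    // Restores the beforeImage, but only if the row still matches the afterImage.
    static boolean rollback(Map<String, Object> row, UndoRecord undo) {
        if (!row.equals(undo.afterImage)) {
            return false; // the row changed after the branch ran; do not overwrite it blindly
        }
        row.clear();
        row.putAll(undo.beforeImage);
        return true;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        row.put("id", "048b2d2836b641aa93b16d410f682db0");
        row.put("total_amount", 100);
        UndoRecord undo = updateWithUndo(row, "total_amount", 50);
        System.out.println(rollback(row, undo));     // prints "true"
        System.out.println(row.get("total_amount")); // prints "100"
    }
}
```

The values 100 and 50 mirror the total_amount field in the beforeImage and afterImage of the UPDATE record shown above.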

6.1.2.1.2: branch_table, global_table and lock_table in the seata-config database
branch_table

global_table

lock_table

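6.2: From the order service into the stock service

6.2.1: StockServiceImpl.class

Set a breakpoint on the line in the stock service that throws the exception, that is, after its SQL statement has run. The breakpoint in the order service above was placed before the SQL statements, so the contents of undo_log and of the seata-config tables can be compared between the two points.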

package com.europa.tx.stock.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.europa.tx.stock.entity.Stock;
import com.europa.tx.stock.mapper.StockMapper;
import com.europa.tx.stock.service.StockService;
import io.seata.core.context.RootContext;
import io.seata.spring.annotation.GlobalTransactional;
import org.apache.commons.lang3.RandomUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.Random;

@Service
public class StockServiceImpl extends ServiceImpl<StockMapper, Stock> implements StockService {
   

    @GlobalTransactional(name = "my_test_tx_group", rollbackFor = Exception.class)
    @Override
    public void reduceAmont(Integer productId) throws Exception {
        System.out.println("Seata global transaction id (XID) ====================> " + RootContext.getXID());
        LambdaQueryWrapper<Stock> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(Stock::getProductId, productId);
        Stock result = this.getOne(queryWrapper);
        result.setCount(result.getCount() - 1);
        this.updateById(result);
        // Set the breakpoint here when debugging; the exception is thrown on purpose to trigger the rollback
        throw new Exception("");
    }
}
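
Both services print the XID, and for the rollback to span both of them the stock service has to see the same XID as the order service. A minimal sketch of how such an id can travel with a request is shown below. It is an illustration under stated assumptions, not Seata's code: `XidPropagationSketch` and its methods are hypothetical, the thread-local holder merely stands in for a context such as `RootContext`, the header name `TX_XID` is what I believe Seata uses but is treated as an assumption here, and the XID value is made up.

```java
import java.util.HashMap;
import java.util.Map;

class XidPropagationSketch {
    // Assumed header name for carrying the global transaction id between services.
    static final String XID_HEADER = "TX_XID";

    // Per-thread holder, standing in for a transaction context such as RootContext.
    private static final ThreadLocal<String> CURRENT_XID = new ThreadLocal<>();

    static void bind(String xid) { CURRENT_XID.set(xid); }
    static String currentXid() { return CURRENT_XID.get(); }
    static void unbind() { CURRENT_XID.remove(); }

    // Caller side: copy the bound XID into the outgoing request headers.
    static Map<String, String> outgoingHeaders() {
        Map<String, String> headers = new HashMap<>();
        String xid = currentXid();
        if (xid != null) {
            headers.put(XID_HEADER, xid);
        }
        return headers;
    }

    // Callee side: bind the XID from the incoming headers, run the handler, then clean up.
    static String handleIncoming(Map<String, String> headers) {
        String xid = headers.get(XID_HEADER);
        if (xid != null) {
            bind(xid);
        }
        try {
            return currentXid(); // what the callee would print as its XID
        } finally {
            unbind();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        bind("tc-host:8091:1"); // hypothetical XID
        Map<String, String> headers = outgoingHeaders();
        unbind();

        // Simulate the stock service handling the request on another thread.
        String[] seen = new String[1];
        Thread callee = new Thread(() -> seen[0] = handleIncoming(headers));
        callee.start();
        callee.join();
        System.out.println(seen[0]); // prints "tc-host:8091:1"
    }
}
```

When the XID printed by the stock service matches the one printed by the order service, both local transactions belong to the same global transaction, which is what allows the exception thrown in `reduceAmont` to undo the order changes as well.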

 

chain-stock.undo_log

branch_table

lock_table

6.2.2: The stock service throws an exception and the transaction rolls back

Stock service rollback log

Order service rollback log


Reposted from: https://blog.csdn.net/Abraxs/article/details/128462916