飞道的博客

Seata分布式事务实践(纯干货附完整代码)

391人阅读  评论(0)

一 分布式事务解决方案

1.1 XA协议

常见的分布式解决方案如下:

  • 两阶段提交型
  • 三阶段提交型
  • TCC补偿机制
  • 异步确保性
  • 最大努力通知型

这几种解决方案中,例如两阶段与三阶段提交都是基于一个XA的协议:

XA协议由Tuxedeo首先提出的,并交给X/Open阻止,作为资源管理器(数据库)与事务管理器的接口标准,目前,Oracle、Informix、DB2、Sybase等各大数据库厂商都提供对XA的支持,XA协议采用两阶段提交方式来管理分布式事务,XA接口提供资源管理器与事务管理器之间进行通信的标准接口。

对应JAVA的实现为JTA/JTS,JTA可以说是定义了一套实现XA协议的接口规范,而JTS就是JTA的具体实现。

1.2 两阶段提交(2PC)与三阶段提交(3PC)

2PC:

2PC即两阶段提交协议,是将整个事务流程分为两个阶段,准备阶段( Prepare phase).提交阶段( commit phase )

在第一阶段(准备阶段),事务管理器TM(协调者)先向事务参与者(资源RM)们发送准备请求,大家都返回OK状态,那么就进入第二阶段,提交事务,如果在第一阶段有任何一个参与者没有OK,那么事务协调器通知其他所有事务参与者(资源RM)回滚事务。

二阶段能保证分布式事务的原子性,但是也有一些明显的缺陷:

  • 在第一阶段,如果参与者迟迟不回复协调者,就会造成事务的阻塞,性能不好。

  • 单节点故障,如果协调器挂了,参与者会阻塞,比如在第二阶段,如果事务协调器宕机,参与者没办法回复信息,长时间处于事务资源锁定,造成阻塞(事务操作是要加锁的)。

  • 在第二阶段,如果在事务协调器发出"commit"执行后宕机,一部和参与者收到了消息提交了事务,而 一部分没有消息没法做出事务提交操作,这样就出现了数据不一致。

  • 在第二阶段,如果事务事务协调器发出“commit”指令后宕机,收到“commmit”指令的参与者也宕机了, 那么事务最终变成了什么效果,提交了还是没提交?没有谁知道。

3PC:

三阶段提交协议主要是为了解决两阶段提交协议的阻塞问题,2PC存在的问题是当协作者崩溃时,参与者不能做出最后的选择。因此参与者可能在协作者恢复之前保持阻塞。三阶段提交(Three-phase commit),是二阶段提交(2PC)的改进版本。

与两阶段提交不同的是,三阶段提交有两个改动点。

  1. 引入超时机制。同时在协调者和参与者中都引入超时机制。
  2. 在第一阶段和第二阶段中插入一个准备阶段。保证了在最后提交阶段之前各参与节点的状态是一致的。

也就是说,除了引入超时机制之外,3PC把2PC的准备阶段再次一分为二,这样三阶段提交就有CanCommit、PreCommit、DoCommit三个阶段。

  • CanCommit: 这里是资源准备阶段(这个时候不会上锁)
  • PreCommit: 这里是资源确认阶段(上锁)
  • DoCommit: 这跟2PC的二阶段差不多

1.3 TCC补偿机制

TCC 其实就是采用补偿机制,其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作。它分为三个阶段:

  • Try 阶段主要是对业务系统做检测及资源预留

  • Confirm 阶段主要是对业务系统做确认提交,Try阶段执行成功并开始执行 Confirm阶段时,默认 Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功。

  • Cancel 阶段主要是在业务执行错误,需要回滚的状态下执行的业务取消,预留资源释放。

TCC实际上把数据库层的二阶段提交上提到了应用层来实现对于数据库来说是一阶段提交,避免了数据库层的2PC性能低下的问题,而且上升到业务层之后再事务处理中可以不仅仅是针对数据库。但是TCC操作需要业务实现,开发成本较高。

1.4 MQ异步确保型策略

MQ异步确保型是指通过消息的一致性来处理事务。当事务发起方执行完成本地事务后并发出一条消息,事务参与方(消息消费者)一定能够接收消息并处理事务成功,此方案强调的是只要消息发给事务参与方最终事务要达到一致。

最终一致性要解决三个问题:本地事务和消息发送的原子性,接收消息的可靠性,消息重复消费问题

  • 本地事务和消息发送原子性

    本地事务与消息发送的原子性问题即:事务发起方在本地事务执行成功后消息必须发出去,否则就丢弃消息。即实现本地事务和消息发送的原子性,要么都成功,要么都失败。本地事务与消息发送的原子性问题是实现可靠消息最终一致性方案的关键问题。

  • 事务参与方接收消息可靠性 ack机制

    务参与方必须能够从消息队列接收到消息,如果接收消息失败可以重复接收消息

  • 消息重复消息问题 幂等性校验

    由于网络2的存在,若某一个消费节点超时但是消费成功,此时消息中间件会重复投递此消息,就导致了消息的重复消费。要解决消息重复消费的问题就要实现事务参与方的方法幂等性。

我们可以以RocketMQ为例子:来看一下它的事务消息的流程设计:

1.5 最大努力通知型

最大努力通知服务表示在不影响主业务的情况下,尽可能地确保数据的一致性。它需要开发人员根据业务来指定通知规则,在满足通知规则的前提下,尽可能的确保数据的一致,以达到最大努力的目的。

目标:发起通知方(接口提供方)通过一定的机制最大努力将业务处理结果通知到接收方(接口调用方)。

具体包括:

  • 有一定的消息重复通知机制。

    因为接收通知方可能没有接收到通知,此时要有一定的机制对消息重复通知。 10s 1min 10min 1h 5h 1d

  • 消息校对机制。

    如果尽最大努力也没有通知到接收方,或者接收方消费消息后要再次消费,此时可由接收方主动向通知 方查询消息信息来满足需求。

  • 消息处理方需要保证幂等性

最大努力通知与MQ异步确保有什么不同?

  • 可靠消息最终一致性

    系统A本地事务执行成功,通知系统B处理任务,通常通过MQ实现。一般适用于平台内部,对一 致性要求相对较高(微服务的2个子系统之间)。

  • 最大努力通知

    所谓最大努力通知就是系统A用最大努力通知系统B,能不能成功,不做完全保证,如果没通知到位,系统B可以主动来调用系统A的接口查询结果状态。一般适用于跨平台业务,或对接了上方平台的业 务场景(支付结果通知)。

1.6 各种方案选型

在学习各种分布式事务的解决方案后,我们了解到各种方案的优缺点:

  • 2PC最大的诟病是一个阻塞协议。RM在执行分支事务后需要等待TM的决定,此时服务会阻塞并锁定资源。由于其阻塞机制和最差时间复杂度高,因此,这种设计不能适应随着事务涉及的服务数量增加而扩展的需要,很难用于并发较高以及子事务生命周期较长(long-running transactions)的分布式服务中。
  • 如果拿TCC事务的处理流程与2PC两阶段提交做比较, 2PC通常都是在跨库的DB层面,而TCC则在应用层面的处理,需要通过业务逻辑来实现。这种分布式事务的实现方式的优势在于,可以让应用自己定义数据操作的粒度,使得降低锁冲突、提高吞吐量成为可能。而不足之处则在于对应用的侵入性非常强,业务逻辑的每个分支都需要实现try、confirm、 cancel三个操作。 此外,其实现难度也比较大,需要按照网络状态、系统故障等不同的失败原因实现不同的回滚策略。
  • 可靠消息最终一致性事务适合执行周期长且实时性要求不高的场景。引入消息机制后,同步的事务操作变为基于消息执行的异步操作,避免了分布式事务中的同步阻塞操作的影响,并实现了两个服务的解耦。典型的使用场景:注册送积分,登录送优惠券等。
  • 最大努力通知是分布式事务中要求最低的一种,适用于一些最终一致性时间敏感度低的业务 ;允许发起通知方处理业务失败,在接收通知方收到通知后积极进行失败处理,无论发起通知方如何处理结果都会不影响到接收通知方的后续处理;发起通知方需提供查询执行情况接口,用于接收通知方校对结果。典型的使用 场景:银行通知、支付结果通知等。

二 Seata基础

2.1 什么是Seata

Seata 是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。在 Seata 开源之前,Seata 对应的内部版本在阿里经济体内部一直扮演着分布式一致性中间件的角色,帮助经济体平稳的度过历年的双11,对各BU业务进行了有力的支撑。经过多年沉淀与积累,商业化产品先后在阿里云、金融云进行售卖。2019.1 为了打造更加完善的技术生态和普惠技术成果,Seata 正式宣布对外开源,未来 Seata 将以社区共建的形式帮助其技术更加可靠与完备。

2.2 Seata的工作原理

Seata主要用的是基于二阶段提交思想的AT模式,通过注解实现非业务侵入。

  • TC(Transaction Coordinator),事务协调者,在源码中Seata Server充当事务协调者身份,维护全局锁状态,协调全局事务的提交与回滚。

  • TM(Transaction Manager),事务管理者,业务代码中使用了全局事务注解的服务属于事务管理者,控制全局事务的范围,执行全局事务的提交与回滚。

  • RM(Resource Manager),资源管理者,业务代码中被远程调用的部分,负责执行本地事务,和提交与回滚本地事务。

第一阶段如上图所有的RM执行自己的本地事务。在执行本地事务时,用大白话讲其实就是jdbc执行sql时,seata使用了数据源代理,在执行sql前,对sql进行解析,生成前置镜像sql,后置镜像sql,同时向undo log插入一条数据,方便后期万一出现异常做回滚,然后向TC注册分支事务,提交本地事务,最后向TC提交它的分支事务状态。

二阶段流程分两种情况

  1. 所有RM本地事务执行成功,此时TM会向TC发起全局事务提交,TC会立马释放全局锁然后异步驱动所有RM做分支事务的提交。

  2. 存在一个RM本地事务不成功,此时TM会向TC发起全局事务回滚,TC会驱动所有的RM做回滚操作,等待所有的RM回滚成功后然后再释放全局锁。

这个阶段所有RM提交分支事务,其实就是删除Undo Log表里的记录,如果提交分支事务失败,并不会影响业务数据,可以手动的做Undo Log删除。

这个阶段,所有的RM执行分支事务回滚,此时是去Undo Log表中查找数据,然后通过第一阶段生成的后置镜像sql,与数据进行校验,通过前置镜像sql做回滚,然后删除Undo Log日志。

三 Seata安装

3.1 Seata的下载

打开官方网站的下载页:http://seata.io/zh-cn/blog/download.html

最新版本是1.4.0.点击 binary 进行下载,这里我们可以把0.9.0的包也下载下来,因为有些初始配置(包括数据库建表语句)在新包里面没有,需要到github上面去找,但是0.9.0这里里面有需要的那些东西。

下载下来之后是压缩包,解压就可以:

3.2 Seata的配置

解压出来之后,我们需要进入到conf目录进行配置:

这里我们需要对两个配置文件进行设置,首先打开registry.conf:

#这里主要是配置服务的注册中心,Seata的服务器也是一个微服务
registry {
   
  # 注册中心可以是多种形式: file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  # type的值就是需要选择的形式,我们选择用当前项目使用的nacos  
  type = "nacos"
  loadBalance = "RandomLoadBalance"
  loadBalanceVirtualNodes = 10

  #选择nacos后相关的配置  
  nacos {
   
    application = "seata-server"
    serverAddr = "localhost:8848"
    group = "SEATA_GROUP"
    namespace = "public"
    cluster = "DEFAULT"
    username = "nacos"
    password = "nacos"
  }
  
  #后面是选择各个不同的注册中心需要的配置,我们不需要去做改动
  eureka {
   
    serviceUrl = "http://localhost:8761/eureka"
    application = "default"
    weight = "1"
  }
  redis {
   
    serverAddr = "localhost:6379"
    db = 0
    password = ""
    cluster = "default"
    timeout = 0
  }
  zk {
   
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  consul {
   
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
  }
  etcd3 {
   
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }
  sofa {
   
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
   
    name = "file.conf"
  }
}

#这里是服务端的相关参数配置
config {
   
  #参数配置也可以选择不同的形式: file、nacos 、apollo、zk、consul、etcd3
  #type的值就是选择的具体配置形式,我们选择的是文件配置  
  type = "file"

  #这些是其他配置方式的各个配置,可以不用改动
  nacos {
   
    serverAddr = "127.0.0.1:8848"
    namespace = ""
    group = "SEATA_GROUP"
    username = ""
    password = ""
  }
  consul {
   
    serverAddr = "127.0.0.1:8500"
  }
  apollo {
   
    appId = "seata-server"
    apolloMeta = "http://192.168.1.204:8801"
    namespace = "application"
    apolloAccesskeySecret = ""
  }
  zk {
   
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  etcd3 {
   
    serverAddr = "http://localhost:2379"
  }

  #文件配置指出文件是谁
  file {
   
    name = "file.conf"
  }
}

这样registry就配置好了,我们现在需要打开第二个文件就是参数配置文件(file.conf)进行相关配置:

#参数配置里面最主要的是数据源的配置,因为Seata在处理事务的过程中会存储一些临时数据,所以需要数据源
store {
   
  # 配置数据源的类型,可以是 db(数据库)、file、redis
  # 这里我们选择用数据库  
  mode = "db"

  ## 这是用文件存储的配置 不用改动
  file {
   
    dir = "sessionStore"
    maxBranchSessionSize = 16384
    maxGlobalSessionSize = 512
    fileWriteBufferCacheSize = 16384
    sessionReloadReadSize = 100
    flushDiskMode = async
  }

  ## 这是配置选择数据库时候的连接配置
  db {
   
    ## 连接池配置,可以是druid、dbcp、hikari
    datasource = "druid"
    ## 数据库的类型,我们选择mysql
    dbType = "mysql"
    ## 下面是数据库的常规配置
    driverClassName = "com.mysql.jdbc.Driver"
    url = "jdbc:mysql://127.0.0.1:3306/seata"
    user = "root"
    password = "123321"
    minConn = 5
    maxConn = 100
    ## 下面这三个配置是数据库需要用到的表示谁?数据库存储会涉及到表,这里有三张表
    ## 这三张表是Seata提供的,我们只需要导入DDL就可以
    globalTable = "global_table"
    branchTable = "branch_table"
    lockTable = "lock_table"
    queryLimit = 100
    maxWait = 5000
  }

  ## 这是选择用redis作为数据源的配置,不用改动
  redis {
   
    host = "127.0.0.1"
    port = "6379"
    password = ""
    database = "0"
    minConn = 1
    maxConn = 10
    maxTotal = 100
    queryLimit = 100
  }

}

3.3 数据库建表

在前面file的配置里面数据库需要三张表,这三张表在1.4.0这个版本下面它没有提供,所以我们去0.9.0的conf目录下面:

可以看到这里面有两个sql文件,db_store.sql就是这三张表的DDL,我们直接在导入mysql数据库就可以:

-- 每当有一个全局事务发起后,就会在该表中记录全局事务的ID。
drop table if exists `global_table`;
create table `global_table` (
  `xid` varchar(128)  not null,
  `transaction_id` bigint,
  `status` tinyint not null,
  `application_id` varchar(32),
  `transaction_service_group` varchar(32),
  `transaction_name` varchar(128),
  `timeout` int,
  `begin_time` bigint,
  `application_data` varchar(2000),
  `gmt_create` datetime,
  `gmt_modified` datetime,
  primary key (`xid`),
  key `idx_gmt_modified_status` (`gmt_modified`, `status`),
  key `idx_transaction_id` (`transaction_id`)
);

-- 记录每一个分支事务的ID,分支事务操作的哪个数据库等信息
drop table if exists `branch_table`;
create table `branch_table` (
  `branch_id` bigint not null,
  `xid` varchar(128) not null,
  `transaction_id` bigint ,
  `resource_group_id` varchar(32),
  `resource_id` varchar(256) ,
  `lock_key` varchar(128) ,
  `branch_type` varchar(8) ,
  `status` tinyint,
  `client_id` varchar(64),
  `application_data` varchar(2000),
  `gmt_create` datetime,
  `gmt_modified` datetime,
  primary key (`branch_id`),
  key `idx_xid` (`xid`)
);

-- 用于生成全局锁
drop table if exists `lock_table`;
create table `lock_table` (
  `row_key` varchar(128) not null,
  `xid` varchar(96),
  `transaction_id` long ,
  `branch_id` long,
  `resource_id` varchar(256) ,
  `table_name` varchar(32) ,
  `pk` varchar(36) ,
  `gmt_create` datetime ,
  `gmt_modified` datetime,
  primary key(`row_key`)
);

在上面的目录结构里面还有一个sql文件db_undo_log.sql,这个也是在执行事务的过程中需要用到的表,但并不是Seata服务器直接操作的表,这是我们自己的业务服务里面需要操作的表,所以在我么项目自己的每个数据库里面也需要插入这个表:

-- 此脚本必须初始化在你当前的业务数据库中,用于AT 模式XID记录。与server端无关(注:业务数据库)
-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
drop table `undo_log`;
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

3.4 启动服务

上面三个步骤配置好之后我们就可以启动Seata服务了,启动之前,我们需要先开启nacos,然后通过Seata的bin目录下面的命令启动Seata:

看到这里就启动成功,nacos上面也可以看到该服务:

四 Seata的AT模式

4.1 AT模式的原理

Seata在实现分布式事务上有几种不同的方式,最主要的就是AT模式与TCC模式,我们首先来看一下AT模式的实现原理。

Seata的AT模式建立在关系型数据库的本地事务特性的基础之上,通过数据源代理类拦截并解析数据库执行的SQL,记录自定义的回滚日志,如需回滚,则重放这些自定义的回滚日志即可。AT模式虽然是根据XA事务模型(2PC)演进而来的,但是AT打破了XA协议的阻塞性制约,在一致性、和性能上取得了平衡。

在Seata in AT mode下,上体描述的全局事务执行流程为:

  1. service2向Seata注册全局事务,并产生一个全局事务标识XID

  2. service1.DB1、service2.DB2、service3.DB3向Seata注册分支事务,并将其纳入该XID对应的全局事务范围

  3. service1.DB1、service2.DB2、service3.DB3向Seata汇报本地事务的准备状态

  4. Seata汇总所有的DB的本地事务的准备状态,决定全局事务是该提交还是回滚

  5. Seata通知service1.DB1、service2.DB2、service3.DB3提交/回滚本地事务

AT模式是基于XA事务模型演进而来的,可以看出它的整体机制也是一个改进版本的两阶段提交协议。AT模式的两个基本阶段是:

  1. 第一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源;

  2. 第二阶段:提交异步化,这个过程很快就能完成。若需要回滚,则通过第一阶段的回滚日志进行反向补偿。

AT模式的两个阶段到底干了什么,我们来看一下:

  • 第一阶段:

    1. 开启本地事务

    2. 对响应的业务表进行写操作

    3. 将写操作的记录快照组成一个回滚日志插入undo_log表(第二阶段如果Seata需要回滚回访该快照即可)

    4. 事务提交之前向seata注册分支事务,申请全局锁,避免其他事务干扰

    5. 提交本地事务,包括业务数据的修改、undo_log表的插入

    6. 向Seata server报告本地事务的提交结果

  • 第二阶段:

    第一阶段结束之后,Seata会接收到所有分支事务的提交状态,然后决定是提交全局事务还是回滚全局事务。

    1. 若所有分支事务本地提交均成功,则Seata决定全局提交。Seata将分支提交的消息发送给各个分支事务,各个分支事务收到分支提交消息后,会将消息放入一个缓冲队列,然后直接向Seata返回提交成功。之后,每个本地事务会慢慢处理分支提交消息,处理的方式为:删除相应分支事务的undo_log记录。之所以只需删除分支事务的undo_log记录,而不需要再做其他提交操作,是因为提交操作已经在第一阶段完成了(这也是AT和XA不同的地方)。
    2. 若任一分支事务本地提交失败,则Seata决定全局回滚,将分支事务回滚消息发送给各个分支事务,由于在第一阶段各个服务的数据库上记录了undo_log记录,分支事务回滚操作只需根据undo_log记录进行补偿即可。

4.2 AT模式应用

AT模式的实现其实非常简单,我们只需要一个全局事务的注解(@GlobalTransactional(rollbackFor = Exception.class))添加到业务方法上即可。

这是一个模拟订单的创界业务,创建订单order之后会发送请求调用库存服务的接口来修改库存,库存服务跟订单服务是两个独立的数据库:

/**
 * 订单操作业务类
 */
@Service
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements OrderService{
   
	
    //库存服务的Feign接口
	@Autowired
	private ProductClient productClient;
	
    //创建订单,加上了全局事务管理
	@GlobalTransactional(rollbackFor = Exception.class)
	@Override
	public void createOrder() {
   
		Order order = new Order();
		order.setNum("WONIU202103100067");
		save(order);
		productClient.updateCont();
	}

}

我们来看一下两个数据库表:

订单服务里面的订单表,就两个测试字段,现在里面没有数据

库存服务里面的商品库存表,有一个商品数据 库存为10:

看一下库存服务里面的这个接口逻辑执行productClient.updateCont():

/*
 * 就是库存减一
 */
@Override
public void updateProductCont() {
   
	Product product = getById(1);
	product.setCont(product.getCont() - 1);
	product.setId(1);
	updateById(product);
}

现在我们去掉Seata的全局事务管理并且抛一个异常看一下:

//@GlobalTransactional(rollbackFor = Exception.class)
@Override
public void createOrder() {
   
	Order order = new Order();
	order.setNum("WONIU202103100067");
	save(order);
	int i = 1/0;
	productClient.updateCont();
}

执行这个业务,观察结果如下:

我们可以看到,订单数据被插入成功,但是库存数据并没有减少,所以现在是处在无事务下运行,我们把全局事务加上再次执行:

@GlobalTransactional(rollbackFor = Exception.class)
@Override
public void createOrder() {
   
	Order order = new Order();
	order.setNum("WONIU202103100067");
	save(order);
	int i = 1/0;
	productClient.updateCont();
}

跟前面的数据一样,没有新的订单,库存也没有变化,这次的数据就有了事务管理,订单数据被正常回滚。


转载:https://blog.csdn.net/waverlyc/article/details/114819614
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场