AT 模式
要求
- 基于ACID关系数据库
- Java应用,基于JDBC
机制
两阶段提交
- 一阶段: 业务数据和回滚日志记录在一个本地事务提交,释放本地锁和连接资源
- 二阶段:
- 成功: 提交异步化,快速完成
- 失败: 通过一阶段回滚日志进行反向补偿
写隔离
- 一阶段本地事务提交前,确保拿到全局锁
- 拿不到全局锁,不能提交本地事务
- 拿全局锁尝试限制一定范围内,超出范围将放弃,并释放本地锁
成功案例
tx1 先开始,开启本地事务,拿到本地锁,更新操作 m = 1000 - 100 = 900。本地事务提交前,先拿到该记录的 全局锁 ,本地提交释放本地锁。 tx2 后开始,开启本地事务,拿到本地锁,更新操作 m = 900 - 100 = 800。本地事务提交前,尝试拿该记录的 全局锁 ,tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待 全局锁 。
tx1 二阶段全局提交,释放 全局锁 。tx2 拿到 全局锁 提交本地事务。
回滚案例
如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。
此时,如果 tx2 仍在等待该数据的 全局锁,同时持有本地锁,则 tx1 的分支回滚会失败。分支的回滚会一直重试,直到 tx2 的 全局锁 等锁超时,放弃 全局锁 并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。
因为整个过程 全局锁 在 tx1 结束前一直是被 tx1 持有的,所以不会发生 脏写 的问题。
读隔离
AT模式默认全局级别是读未提交(Read Uncommitted)
特点场景下,必须要求全局的 读已提交 , 目前通过 SELECT FOR UPDATE
语句代理。
SELECT FOR UPDATE 语句的执行会申请 全局锁 ,如果 全局锁 被其他事务持有,则释放本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试。这个过程中,查询是被 block 住的,直到 全局锁 拿到,即读取的相关数据是 已提交 的,才返回。
出于总体性能上的考虑,Seata 目前的方案并没有对所有 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句。
工作机制
TC TM 向 TC 注册
TC 发起 全局事务 ,
通知RM 执行,RM会执行并向TC汇报结果
TC根据结果 全局提交或回滚
一阶段
解析SQL,获得SQL类型条件
查询前镜像:根据解析得到的条件信息,生成查询语句,定位数据
执行业务SQL
查询后镜像
插入回滚日志,将前后镜像及业务SQL组成一条回滚日志,插入
UNDO_LOG
表
{
"branchId": 641789253,
"undoItems": [{
"afterImage": {
"rows": [{
"fields": [{
"name": "id",
"type": 4,
"value": 1
}, {
"name": "name",
"type": 12,
"value": "GTS"
}, {
"name": "since",
"type": 12,
"value": "2014"
}]
}],
"tableName": "product"
},
"beforeImage": {
"rows": [{
"fields": [{
"name": "id",
"type": 4,
"value": 1
}, {
"name": "name",
"type": 12,
"value": "TXC"
}, {
"name": "since",
"type": 12,
"value": "2014"
}]
}],
"tableName": "product"
},
"sqlType": "UPDATE"
}],
"xid": "xid:xxx"
}
- 提交前,向 TC 注册分支,申请
product
表中,主键值等于 1 的记录的 全局锁 。 - 本地事务提交 ,业务数据和
UNDO_LOG
一并提交 - 将本地事务提交结果上报给 TC
二阶段
提交
- 收到 TC 的分支提交请求,把请求放入一个异步任务的队列中,马上返回提交成功的结果给 TC。
- 异步任务阶段的分支提交请求将异步和批量地删除相应 UNDO LOG 记录。
回滚
- 收到 TC 的分支回滚请求,开启一个本地事务,执行如下操作。
- 通过 XID 和 Branch ID 查找到相应的 UNDO LOG 记录。
- 数据校验:拿 UNDO LOG 中的后镜与当前数据进行比较,如果有不同,说明数据被当前全局事务之外的动作做了修改。这种情况,需要根据配置策略来做处理,详细的说明在另外的文档中介绍。
- 根据 UNDO LOG 中的前镜像和业务 SQL 的相关信息生成并执行回滚的语句:
update product set name = 'TXC' where id = 1;
- 提交本地事务。并把本地事务的执行结果(即分支事务回滚的结果)上报给 TC。
Coding
基于 Spring Cloud
微服务
建表
TM
和RM
业务数据库添加undo_log
表
-- 注意此处0.7.0+ 增加字段 context
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
依赖
引入 Spring Cloud
和 seata
依赖
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<!-- 使用ribbon时才用的上 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
</dependency>
<!-- 使用feign时才用的上 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.4</version>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-seata</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>
<dependency>
<groupId>io.himcs</groupId>
<artifactId>common</artifactId>
<version>0.0.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
添加文件
项目 resources
添加以下两个文件
registry.conf
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "eureka"
nacos {
serverAddr = "localhost"
namespace = ""
cluster = "default"
}
eureka {
serviceUrl = "http://localhost:8761/eureka"
application = "default"
weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = "0"
password = ""
cluster = "default"
timeout = "0"
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
username = ""
password = ""
}
consul {
cluster = "default"
serverAddr = "127.0.0.1:8500"
}
etcd3 {
cluster = "default"
serverAddr = "http://localhost:2379"
}
sofa {
serverAddr = "127.0.0.1:9603"
application = "default"
region = "DEFAULT_ZONE"
datacenter = "DefaultDataCenter"
cluster = "default"
group = "SEATA_GROUP"
addressWaitTime = "3000"
}
file {
name = "file.conf"
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
type = "file"
nacos {
serverAddr = "localhost"
namespace = ""
group = "SEATA_GROUP"
}
consul {
serverAddr = "127.0.0.1:8500"
}
apollo {
app.id = "seata-server"
apollo.meta = "http://192.168.1.204:8801"
namespace = "application"
}
zk {
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
username = ""
password = ""
}
etcd3 {
serverAddr = "http://localhost:2379"
}
file {
name = "file.conf"
}
}
file.conf
transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
# the client batch send request enable
enableClientBatchSendRequest = true
#thread factory for netty
threadFactory {
bossThreadPrefix = "NettyBoss"
workerThreadPrefix = "NettyServerNIOWorker"
serverExecutorThread-prefix = "NettyServerBizHandler"
shareBossWorker = false
clientSelectorThreadPrefix = "NettyClientSelector"
clientSelectorThreadSize = 1
clientWorkerThreadPrefix = "NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
bossThreadSize = 1
#auto default pin or 8
workerThreadSize = "default"
}
shutdown {
# when destroy server, wait seconds
wait = 3
}
serialization = "seata"
compressor = "none"
}
service {
#vgroup->rgroup
vgroup_mapping.my_test_tx_group = "default"
#only support when registry.type=file, please don't set multiple addresses
default.grouplist = "127.0.0.1:8091"
#degrade, current not support
enableDegrade = false
#disable seata
disableGlobalTransaction = false
}
client {
rm {
asyncCommitBufferLimit = 10000
lock {
retryInterval = 10
retryTimes = 30
retryPolicyBranchRollbackOnConflict = true
}
reportRetryCount = 5
tableMetaCheckEnable = false
reportSuccessEnable = false
}
tm {
commitRetryCount = 5
rollbackRetryCount = 5
}
undo {
dataValidation = true
logSerialization = "jackson"
logTable = "undo_log"
}
log {
exceptionRate = 100
}
}
配置
配置 eureka
和 datasource
和seata
,其中 tx-service-group
要和file.conf
的vgroup_mapping
一致
application.yml
MYSQL_HOST: himcs.io
server:
port: 8811
spring:
application:
name: business-service
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://${MYSQL_HOST:localhost}:3306/seata-business?useUnicode=true&characterEncoding=UTF8&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai
password: root
username: root
cloud:
alibaba:
seata:
tx-service-group: my_test_tx_group # 事务分组
eureka:
instance:
hostname: localhost
prefer-ip-address: true
client:
serviceUrl:
defaultZone: http://${eureka.instance.hostname}:8761/eureka/
数据源代理
配置 Seata
数据源代理
@Configuration
public class DataSourceConfiguration {
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DruidDataSource druidDataSource() {
return new DruidDataSource();
}
@Primary
@Bean
public DataSourceProxy dataSource(DruidDataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
}
TM 端
主类添加 SpringCloud 相关依赖
@SpringBootApplication
@EnableEurekaClient
@EnableDiscoveryClient
@EnableFeignClients
@EnableJpaRepositories
@Slf4j
public class BusinessServerApplication {
public static void main(String[] args) {
SpringApplication.run(BusinessServerApplication.class, args);
}
}
添加相关FeignClient
@FeignClient("${services.order-service}")
public interface OrderService {
@PostMapping("/create")
OrderDTO create(@RequestBody OrderDTO order);
}
全局事务
在方法上添加 @GlobalTransactional(name = "fsp-sale", timeoutMills = 20000, rollbackFor = Exception.class)
@Service
public class BusinessServiceImpl implements BusinessService {
@Resource
private PointsService pointsService;
@Resource
private StorageService storageService;
@Resource
private OrderService orderService;
/**
* 商品销售
*
* @param goodsCode 商品编码
* @param quantity 销售数量
* @param username 用户名
* @param points 增加积分
* @param amount 订单金额
* @return
*/
// TM 向 TC发起全局事务 生成 XID(全局锁)
//
@GlobalTransactional(name = "fsp-sale", timeoutMills = 20000, rollbackFor = Exception.class)
@Override
public OrderDTO sale(String goodsCode, Integer quantity, String username, Integer points, Integer amount) {
PointsDTO pointsDTO = PointsDTO.builder().username(username).points(points).build();
StorageDTO storageDTO = StorageDTO.builder().goodsCode(goodsCode).quantity(quantity).build();
OrderDTO request = OrderDTO.builder().username(username).points(points).goodsCode(goodsCode).quantity(quantity).amount(amount).build();
//写表 UNDO_LOG 记录回滚日志(BranchID),通知TC操作结果
pointsService.increase(pointsDTO);
//写表 UNDO_LOG 记录回滚日志(BranchID),通知TC操作结果
storageService.decrease(storageDTO);
//写表 UNDO_LOG 记录回滚日志(BranchID),通知TC操作结果
OrderDTO order = orderService.create(request);
// 分支 a 执行成功 TM 通知 TC 全局提交
// 分支 a TC 通知所有RM 提交成功, 删除 UNDO_LOG 回滚日志
// 分支 b 执行失败 TM 通知 TC 全局 Rollback
// 分支 b TC 通知所有RM 进行回滚, 根据 UNDO_LOG 反向操作 还原数据 删除 UNDO_LOG
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return order;
}
}
RM 端
按照配置依次配置依赖,添加文件,配置代理源即可