AT 模式

要求

  • 基于ACID关系数据库
  • Java应用,基于JDBC

机制

两阶段提交

  • 一阶段: 业务数据和回滚日志记录在一个本地事务提交,释放本地锁和连接资源
  • 二阶段:
    • 成功: 提交异步化,快速完成
    • 失败: 通过一阶段回滚日志进行反向补偿

写隔离

  • 一阶段本地事务提交前,确保拿到全局锁
  • 拿不到全局锁,不能提交本地事务
  • 拿全局锁尝试限制一定范围内,超出范围将放弃,并释放本地锁

成功案例

tx1 先开始,开启本地事务,拿到本地锁,更新操作 m = 1000 - 100 = 900。本地事务提交前,先拿到该记录的 全局锁 ,本地提交释放本地锁。 tx2 后开始,开启本地事务,拿到本地锁,更新操作 m = 900 - 100 = 800。本地事务提交前,尝试拿该记录的 全局锁 ,tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待 全局锁

tx1 二阶段全局提交,释放 全局锁 。tx2 拿到 全局锁 提交本地事务。

XA

回滚案例

如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。

此时,如果 tx2 仍在等待该数据的 全局锁,同时持有本地锁,则 tx1 的分支回滚会失败。分支的回滚会一直重试,直到 tx2 的 全局锁 等锁超时,放弃 全局锁 并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。

因为整个过程 全局锁 在 tx1 结束前一直是被 tx1 持有的,所以不会发生 脏写 的问题。

XA-rollback

读隔离

AT模式默认全局级别是读未提交(Read Uncommitted)

特点场景下,必须要求全局的 读已提交 , 目前通过 SELECT FOR UPDATE 语句代理。

Read Isolation: SELECT FOR UPDATE

SELECT FOR UPDATE 语句的执行会申请 全局锁 ,如果 全局锁 被其他事务持有,则释放本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试。这个过程中,查询是被 block 住的,直到 全局锁 拿到,即读取的相关数据是 已提交 的,才返回。

出于总体性能上的考虑,Seata 目前的方案并没有对所有 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句。

工作机制

TC TM 向 TC 注册

TC 发起 全局事务 ,

通知RM 执行,RM会执行并向TC汇报结果

TC根据结果 全局提交或回滚

一阶段

  1. 解析SQL,获得SQL类型条件

  2. 查询前镜像:根据解析得到的条件信息,生成查询语句,定位数据

  3. 执行业务SQL

  4. 查询后镜像

  5. 插入回滚日志,将前后镜像及业务SQL组成一条回滚日志,插入UNDO_LOG

{
	"branchId": 641789253,
	"undoItems": [{
		"afterImage": {
			"rows": [{
				"fields": [{
					"name": "id",
					"type": 4,
					"value": 1
				}, {
					"name": "name",
					"type": 12,
					"value": "GTS"
				}, {
					"name": "since",
					"type": 12,
					"value": "2014"
				}]
			}],
			"tableName": "product"
		},
		"beforeImage": {
			"rows": [{
				"fields": [{
					"name": "id",
					"type": 4,
					"value": 1
				}, {
					"name": "name",
					"type": 12,
					"value": "TXC"
				}, {
					"name": "since",
					"type": 12,
					"value": "2014"
				}]
			}],
			"tableName": "product"
		},
		"sqlType": "UPDATE"
	}],
	"xid": "xid:xxx"
}
  1. 提交前,向 TC 注册分支,申请 product 表中,主键值等于 1 的记录的 全局锁
  2. 本地事务提交 ,业务数据和 UNDO_LOG一并提交
  3. 将本地事务提交结果上报给 TC

二阶段

提交

  1. 收到 TC 的分支提交请求,把请求放入一个异步任务的队列中,马上返回提交成功的结果给 TC。
  2. 异步任务阶段的分支提交请求将异步和批量地删除相应 UNDO LOG 记录。

回滚

  1. 收到 TC 的分支回滚请求,开启一个本地事务,执行如下操作。
  2. 通过 XID 和 Branch ID 查找到相应的 UNDO LOG 记录。
  3. 数据校验:拿 UNDO LOG 中的后镜与当前数据进行比较,如果有不同,说明数据被当前全局事务之外的动作做了修改。这种情况,需要根据配置策略来做处理,详细的说明在另外的文档中介绍。
  4. 根据 UNDO LOG 中的前镜像和业务 SQL 的相关信息生成并执行回滚的语句:
update product set name = 'TXC' where id = 1;
  1. 提交本地事务。并把本地事务的执行结果(即分支事务回滚的结果)上报给 TC。

Coding

基于 Spring Cloud微服务

建表

TMRM业务数据库添加undo_log

-- 注意此处0.7.0+ 增加字段 context
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

依赖

引入 Spring Cloudseata 依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <!-- 使用ribbon时才用的上 -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
        </dependency>

        <!-- 使用feign时才用的上 -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.2.4</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-alibaba-seata</artifactId>
            <version>2.1.0.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>io.himcs</groupId>
            <artifactId>common</artifactId>
            <version>0.0.1</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

添加文件

项目 resources添加以下两个文件

registry.conf

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "eureka"

  nacos {
    serverAddr = "localhost"
    namespace = ""
    cluster = "default"
  }
  eureka {
    serviceUrl = "http://localhost:8761/eureka"
    application = "default"
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = "0"
    password = ""
    cluster = "default"
    timeout = "0"
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
    username = ""
    password = ""
  }
  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
  }
  etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
  type = "file"

  nacos {
    serverAddr = "localhost"
    namespace = ""
    group = "SEATA_GROUP"
  }
  consul {
    serverAddr = "127.0.0.1:8500"
  }
  apollo {
    app.id = "seata-server"
    apollo.meta = "http://192.168.1.204:8801"
    namespace = "application"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
    username = ""
    password = ""
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
    name = "file.conf"
  }
}

file.conf

transport {
  # tcp udt unix-domain-socket
  type = "TCP"
  #NIO NATIVE
  server = "NIO"
  #enable heartbeat
  heartbeat = true
  # the client batch send request enable
  enableClientBatchSendRequest = true
  #thread factory for netty
  threadFactory {
    bossThreadPrefix = "NettyBoss"
    workerThreadPrefix = "NettyServerNIOWorker"
    serverExecutorThread-prefix = "NettyServerBizHandler"
    shareBossWorker = false
    clientSelectorThreadPrefix = "NettyClientSelector"
    clientSelectorThreadSize = 1
    clientWorkerThreadPrefix = "NettyClientWorkerThread"
    # netty boss thread size,will not be used for UDT
    bossThreadSize = 1
    #auto default pin or 8
    workerThreadSize = "default"
  }
  shutdown {
    # when destroy server, wait seconds
    wait = 3
  }
  serialization = "seata"
  compressor = "none"
}
service {
 #vgroup->rgroup
  vgroup_mapping.my_test_tx_group = "default"
  #only support when registry.type=file, please don't set multiple addresses
  default.grouplist = "127.0.0.1:8091"
  #degrade, current not support
  enableDegrade = false
  #disable seata
  disableGlobalTransaction = false
}

client {
  rm {
    asyncCommitBufferLimit = 10000
    lock {
      retryInterval = 10
      retryTimes = 30
      retryPolicyBranchRollbackOnConflict = true
    }
    reportRetryCount = 5
    tableMetaCheckEnable = false
    reportSuccessEnable = false
  }
  tm {
    commitRetryCount = 5
    rollbackRetryCount = 5
  }
  undo {
    dataValidation = true
    logSerialization = "jackson"
    logTable = "undo_log"
  }
  log {
    exceptionRate = 100
  }
}

配置

配置 eurekadatasourceseata,其中 tx-service-group 要和file.confvgroup_mapping一致

application.yml

MYSQL_HOST: himcs.io
server:
  port: 8811
spring:
  application:
    name: business-service
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://${MYSQL_HOST:localhost}:3306/seata-business?useUnicode=true&characterEncoding=UTF8&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai
    password: root
    username: root
  cloud:
    alibaba:
      seata:
        tx-service-group: my_test_tx_group # 事务分组
eureka:
  instance:
    hostname: localhost
    prefer-ip-address: true
  client:
    serviceUrl:
      defaultZone: http://${eureka.instance.hostname}:8761/eureka/

数据源代理

配置 Seata数据源代理

@Configuration
public class DataSourceConfiguration {
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DruidDataSource druidDataSource() {
        return new DruidDataSource();
    }

    @Primary
    @Bean
    public DataSourceProxy dataSource(DruidDataSource druidDataSource) {
        return new DataSourceProxy(druidDataSource);
    }
}

TM 端

主类添加 SpringCloud 相关依赖

@SpringBootApplication
@EnableEurekaClient
@EnableDiscoveryClient
@EnableFeignClients
@EnableJpaRepositories
@Slf4j
public class BusinessServerApplication {
    public static void main(String[] args) {
        SpringApplication.run(BusinessServerApplication.class, args);
    }
}

添加相关FeignClient

@FeignClient("${services.order-service}")
public interface OrderService {
    @PostMapping("/create")
    OrderDTO create(@RequestBody OrderDTO order);
}

全局事务

在方法上添加 @GlobalTransactional(name = "fsp-sale", timeoutMills = 20000, rollbackFor = Exception.class)


@Service
public class BusinessServiceImpl implements BusinessService {

    @Resource
    private PointsService pointsService;
    @Resource
    private StorageService storageService;
    @Resource
    private OrderService orderService;

    /**
     * 商品销售
     *
     * @param goodsCode 商品编码
     * @param quantity  销售数量
     * @param username  用户名
     * @param points    增加积分
     * @param amount    订单金额
     * @return
     */
    // TM 向 TC发起全局事务 生成 XID(全局锁)
//
    @GlobalTransactional(name = "fsp-sale", timeoutMills = 20000, rollbackFor = Exception.class)
    @Override
    public OrderDTO sale(String goodsCode, Integer quantity, String username, Integer points, Integer amount) {
        PointsDTO pointsDTO = PointsDTO.builder().username(username).points(points).build();
        StorageDTO storageDTO = StorageDTO.builder().goodsCode(goodsCode).quantity(quantity).build();
        OrderDTO request = OrderDTO.builder().username(username).points(points).goodsCode(goodsCode).quantity(quantity).amount(amount).build();
        //写表 UNDO_LOG 记录回滚日志(BranchID),通知TC操作结果
        pointsService.increase(pointsDTO);
        //写表 UNDO_LOG 记录回滚日志(BranchID),通知TC操作结果
        storageService.decrease(storageDTO);
        //写表 UNDO_LOG 记录回滚日志(BranchID),通知TC操作结果
        OrderDTO order = orderService.create(request);
        // 分支 a 执行成功 TM 通知 TC 全局提交
        // 分支 a TC 通知所有RM 提交成功, 删除 UNDO_LOG 回滚日志

        // 分支 b 执行失败 TM 通知 TC 全局 Rollback
        // 分支 b TC 通知所有RM 进行回滚, 根据 UNDO_LOG 反向操作 还原数据 删除 UNDO_LOG

        try {
            Thread.sleep(100000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return order;
    }
}

RM 端

按照配置依次配置依赖,添加文件,配置代理源即可

Last Updated:
Contributors: himcs, himcs