当前位置:网站首页>Seata四大模式之TCC模式详解及代码实现
Seata四大模式之TCC模式详解及代码实现
2022-06-25 00:01:00 【一恍过去】
目录
1、实现机制
1.1 提交阶段
TCC模式是一种需要在业务代码中进行编码的分布式事务解决方案。
- 一阶段:Try,进行资源的检测和预留。
- 二阶段:
- 提交:Confirm,完成资源操作业务;要求 Try 成功 Confirm 一定要能成功。
- 回滚:Cancel,释放预留资源,可以理解为try的反向操作。
1.2 实现逻辑
一阶段:
try,尝试将资源进行锁定,比如需要扣减金额,并且记录一条扣减记录,执行业务(生成订单等操作)。
二阶-事务提交
将金额的扣减记录进行删除,表示完成整个事务过程。
二阶-事务回滚
获取扣减记录,从扣减中将金额进行恢复,表示数据回滚。
TCC中的空回滚和业务悬挂:
当某个分支事务在执行Try操作时,因为阻塞导致全局获取状态超时,从而执行Cancel操作,在未执行Try操作时执行了Cancel操作这就是空回滚。当执行空回滚的业务如果没有了阻塞并且继续执行Try操作,会导致无法执行后续的Confirm或者Cancel操作,这就是业务悬挂。
1.3 优缺点
优点:
- 一阶段完成直接提交事务,释放数据库资源,性能好。
- 相比AT模型,无需生成快照,无需使用全局锁,性能强。
- 不依赖数据库事务,而是依赖补偿操作,可以用于非事务型数据库。
缺点:
- 有代码侵入,需要人为编写try、Confirm和Cancel接口,比较麻烦。
- 软状态,事务是最终一致。
- 需要考虑Confirm和Cancel的失败情况,做好幂等处理。
2、代码实现
创建两个SpringBoot工程,分别为storage-service
与order-service
,模拟从在order-service
服务中新增订单,然后调用storage-service
服务新增库存扣减记录,TCC的是需要开发者通过设计代码自行实现回滚补偿机制;核心代码如下,完整代码参考文末github地址
:
2.1 建表语句
-- 数据库名称: seata-tcc-demo.sql
-- 订单表
CREATE TABLE `tb_order`
(
`id` int(11) NOT NULL COMMENT '主键',
`count` int(11) NULL DEFAULT 0 COMMENT '下单数量',
`money` int(11) NULL DEFAULT 0 COMMENT '金额',
`status` int(11) NULL DEFAULT 1 COMMENT '状态:1:预处理,2-完成',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = COMPACT;
-- 库存表
CREATE TABLE `tb_storage`
(
`id` int(11) NOT NULL COMMENT '主键',
`order_id` int(11) NOT NULL COMMENT '订单ID',
`count` int(11) NOT NULL DEFAULT 0 COMMENT '库存',
`status` int(11) NULL DEFAULT 1 COMMENT '状态:1:预处理,2-完成',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = COMPACT;
2.2 order-service服务
2.2.1 yaml配置
server:
port: 8082
spring:
application:
name: order-service
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3307/seata-at-demo?useUnicode=true&useSSL=false&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF-8&allowMultiQueries=true&serverTimezone=Asia/Shanghai
username: root
password: lhzlx
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
namespace: 64ed9ca7-d705-4655-b4e4-f824e420a12a
group: test
seata:
enabled: true
application-id: ${
spring.application.name}
# 事务组的名称,对应service.vgroupMapping.default_tx_group=xxx中配置的default_tx_group
tx-service-group: default_tx_group
# 配置事务组与集群的对应关系
service:
vgroup-mapping:
# default_tx_group为事务组的名称,default为集群名称
default_tx_group: default
disable-global-transaction: false
registry:
type: nacos
nacos:
application: seata-server
server-addr: 127.0.0.1:8848
group: SEATA_GROUP
namespace: 64ed9ca7-d705-4655-b4e4-f824e420a12a
username: nacos
password: nacos
cluster: default
config:
type: nacos
nacos:
server-addr: 162.14.115.18:8848
group: SEATA_GROUP
namespace: 64ed9ca7-d705-4655-b4e4-f824e420a12a
username: nacos
password: nacos
data-id: seataServer.properties
2.2.2 Service接口
在接口上使用@LocalTCC
注解表示开启TCC模式,否则seata会认为是AT模式;
@LocalTCC
public interface OrderService {
/** * 创建订单 * @TwoPhaseBusinessAction 描述⼆阶段提交 * name: 为 tcc⽅法的 bean 名称,需要全局唯⼀,⼀般写⽅法名即可 * commitMethod: Commit⽅法的⽅法名 * rollbackMethod:Rollback⽅法的⽅法名 * @BusinessActionContextParamete 该注解⽤来修饰 Try⽅法的⼊参, * 被修饰的⼊参可以在 Commit ⽅法和 Rollback ⽅法中通过BusinessActionContext 获取。 * @param order * @return */
@TwoPhaseBusinessAction(name = "createOrderPrepare", commitMethod = "createOrderCommit", rollbackMethod = "createOrderRollBack")
Order createOrderPrepare(@BusinessActionContextParameter(paramName = "order") Order order);
/** * 提交 * @param context * @return */
Boolean createOrderCommit(BusinessActionContext context);
/** * 回滚 * @param context * @return */
Boolean createOrderRollBack(BusinessActionContext context);
}
2.2.3 Service实现类
@Slf4j
@Service
public class OrderServiceImpl implements OrderService {
private static final Map<String, String> STATUS_MAP = new ConcurrentHashMap<>();
@Resource
private OrderMapper orderMapper;
/** * 创建订单 * * @param order * @return */
@Override
public Order createOrderPrepare(Order order) {
// 0.获取事务id
String xid = RootContext.getXID();
log.info("创建订单预处理,xid={}",xid );
// 设置为预处理状态
order.setStatus(1);
// 判断是否已经执行过了Cancel或者Confirm
if(STATUS_MAP.get(xid)!=null){
// 表示已经执行了Cancel或者Confirm实现业务悬挂
return null;
}
orderMapper.insert(order);
return order;
}
/** * 提交 * @param context * @return */
@Override
public Boolean createOrderCommit(BusinessActionContext context){
try {
String xid = context.getXid();
// 将订单的状态修改为完成
log.info("创建订单提交处理,xid={}",xid );
// 幂等处理
if(STATUS_MAP.get(xid)!=null){
return true;
}
STATUS_MAP.put(xid,"Confirm");
Object obj = context.getActionContext("order");
if(obj!=null) {
Order order = JSON.parseObject(obj.toString(), Order.class);
if (order != null) {
order.setStatus(2);
orderMapper.updateById(order);
}
}
}catch (Exception e){
log.error(e.getMessage());
}
return true;
}
/** * 回滚 * @param context * @return */
@Override
public Boolean createOrderRollBack(BusinessActionContext context){
try {
String xid = context.getXid();
log.info("创建订单回滚处理,xid={}",xid );
// 幂等处理
if(STATUS_MAP.get(xid)!=null){
return true;
}
STATUS_MAP.put(xid,"Cancel");
// 将订单的状态修改为完成
Object obj = context.getActionContext("order");
if(obj!=null) {
Order order = JSON.parseObject(obj.toString(), Order.class);
// 将订单进行删除,表示回滚
if (order != null) {
log.info("删除订单ID:"+order.getId());
orderMapper.deleteById(order.getId());
}
}
}catch (Exception e){
log.error(e.getMessage());
}
return true;
}
}
2.2.4 Controller
@RestController
@RequestMapping("order")
public class OrderController {
@Resource
private TccHandler tccHandler;
@PostMapping
public ResponseEntity<String> createOrder(@RequestBody Order order) {
long id = new Random().nextInt(999999999);
order.setId(id);
tccHandler.createOrderAndStorage(order);
return ResponseEntity.status(HttpStatus.OK).body("操作成功");
}
}
2.2.5 TCC处理器
@Component
@Slf4j
public class TccHandler {
@Resource
private OrderService orderService;
@Resource
private StorageClient storageClient;
/** * 创建订单和库存记录的TCC处理器 * 使用@GlobalTransactional开启全局事务 * @param order * @return */
@GlobalTransactional
public void createOrderAndStorage(Order order) {
// 记录订单数据
log.info("开始记录订单数据...");
Order orderPrepare = orderService.createOrderPrepare(order);
log.info("结束记录订单数据...");
// feign调用记录库存数据
log.info("开始记录库存数据...");
storageClient.deduct(orderPrepare.getId(),orderPrepare.getCount());
log.info("结束记录库存数据...");
// 模拟最后出现异常情况
int a=1/0;
}
}
2.2.6 StorageClient
@FeignClient("storage-service")
public interface StorageClient {
/** * 扣减库存 * * @param orderId * @param count */
@PostMapping("/storage")
void deduct(@RequestParam("orderId") Long orderId, @RequestParam("count") Integer count);
}
2.3 storage-service服务
2.3.1 yaml配置
server:
port: 8081
spring:
application:
name: storage-service
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3307/seata-at-demo?useUnicode=true&useSSL=false&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF-8&allowMultiQueries=true&serverTimezone=Asia/Shanghai
username: root
password: lhzlx
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
namespace: 64ed9ca7-d705-4655-b4e4-f824e420a12a
group: test
# 在dev环境进行debug时,可以将时间设置长一些
#heart-beat-interval: 1000 #心跳间隔。单位为毫秒,默认5*1000
heart-beat-timeout: 300000 #心跳暂停,收不到心跳,会将实例设为不健康。单位为毫秒,默认15*1000
ip-delete-timeout: 4000000 #Ip删除超时,收不到心跳,会将实例删除。单位为毫秒,默认30*1000
seata:
enabled: true
application-id: ${
spring.application.name}
# 事务组的名称,对应service.vgroupMapping.default_tx_group=xxx中配置的default_tx_group
tx-service-group: default_tx_group
# 配置事务组与集群的对应关系
service:
vgroup-mapping:
# default_tx_group为事务组的名称,default为集群名称
default_tx_group: default
disable-global-transaction: false
registry:
type: nacos
nacos:
application: seata-server
server-addr: 162.14.115.18:8848
group: SEATA_GROUP
namespace: 64ed9ca7-d705-4655-b4e4-f824e420a12a
username: nacos
password: nacos
cluster: default
config:
type: nacos
nacos:
server-addr: 162.14.115.18:8848
group: SEATA_GROUP
namespace: 64ed9ca7-d705-4655-b4e4-f824e420a12a
username: nacos
password: nacos
data-id: seataServer.properties
2.3.2 Service接口
在接口上使用@LocalTCC
注解表示开启TCC模式,否则seata会认为是AT模式;
@LocalTCC
public interface StorageService {
/** * 创建订单 * @TwoPhaseBusinessAction 描述⼆阶段提交 * name: 为 tcc⽅法的 bean 名称,需要全局唯⼀,⼀般写⽅法名即可 * commitMethod: Commit⽅法的⽅法名 * rollbackMethod:Rollback⽅法的⽅法名 * @BusinessActionContextParamete 该注解⽤来修饰 Try⽅法的⼊参, * 被修饰的⼊参可以在 Commit ⽅法和 Rollback ⽅法中通过BusinessActionContext 获取。 * * @param storage * @return */
@TwoPhaseBusinessAction(name = "createPrepare", commitMethod = "deductCommit", rollbackMethod = "deductRollBack")
void deductPrepare(@BusinessActionContextParameter(paramName = "storage") Storage storage);
/** * 提交 * @param context * @return */
Boolean deductCommit(BusinessActionContext context);
/** * 回滚 * @param context * @return */
Boolean deductRollBack(BusinessActionContext context);
}
2.3.3 Service实现类
@Slf4j
@Service
public class StorageServiceImpl implements StorageService {
private static final Map<String, String> STATUS_MAP = new ConcurrentHashMap<>();
@Resource
private StorageMapper storageMapper;
/** * 扣除存储数量 * */
@Override
public void deductPrepare( Storage storage) {
// 0.获取事务id
String xid = RootContext.getXID();
log.info("记录库存信息预处理,xid={}",xid );
try {
// 设置为预处理状态
storage.setStatus(1);
// 判断是否已经执行过了Cancel或者Confirm
if(STATUS_MAP.get(xid)!=null){
// 表示已经执行了Cancel或者Confirm实现业务悬挂
return ;
}
storageMapper.insert(storage);
// 下游服务抛出异常
// int a = 1 / 0;
} catch (Exception e) {
throw new RuntimeException("扣减库存失败,可能是库存不足!", e);
}
log.info("库存信息记录成功");
}
/** * 提交 * @param context * @return */
@Override
public Boolean deductCommit(BusinessActionContext context){
try {
String xid = context.getXid();
// 将状态修改为完成
log.info("记录库存信息提交处理,xid={}", xid);
// 幂等处理
if(STATUS_MAP.get(xid)!=null){
return true;
}
STATUS_MAP.put(xid,"Confirm");
Object obj = context.getActionContext("storage");
if (obj != null) {
Storage storage = JSON.parseObject(obj.toString(), Storage.class);
if (storage != null) {
storage.setStatus(2);
storageMapper.updateById(storage);
}
}
}catch (Exception e){
log.error(e.getMessage());
}
return true;
}
/** * 回滚 * @param context * @return */
@Override
public Boolean deductRollBack(BusinessActionContext context){
try {
String xid = context.getXid();
log.info("记录库存信息回滚处理,xid={}",xid );
// 幂等处理
if(STATUS_MAP.get(xid)!=null){
return true;
}
STATUS_MAP.put(xid,"Cancel");
// 将订单的状态修改为完成
Object obj = context.getActionContext("storage");
if(obj!=null) {
Storage storage = JSON.parseObject(obj.toString(), Storage.class);
if (storage != null) {
// 将记录进行删除,表示回滚
log.info("删除记录ID:"+storage.getId());
storageMapper.deleteById(storage.getId());
}
}
}catch (Exception e){
log.error(e.getMessage());
}
return true;
}
}
2.3.4 Controller
@RestController
@RequestMapping("storage")
public class StorageController {
@Resource
private StorageService storageService;
/** * 扣减库存 * * @param orderId 商品ID * @param count 要扣减的数量 * @return */
@PostMapping
public ResponseEntity<Void> deduct(@RequestParam("orderId") Long orderId, @RequestParam("count") Integer count) {
Storage storage = new Storage();
long id = new Random().nextInt(999999999);
storage.setId(id);
storage.setOrderId(orderId);
storage.setCount(count);
storageService.deductPrepare(storage);
return ResponseEntity.status(HttpStatus.OK).body(null);
}
}
注意: TCC就是通过手动编写自定义代码,实现事务的回滚与提交,而全局事务的控制还是由seata完成
3 测试
测试时没有做截图进行演示,只说明了结果,可以运行代码设置异常进行验证
3.1 下游服务异常
在order-service
服务中正常,在storage-service
服务的service中抛出异常,观察数据是否成功回滚;如果tb_order
与tb_storage
都不存在数据,则表示全局事务成功;
3.2 上游服务异常
order-service
服务的TccHandler
中在执行storageClient.deduct()
方法后抛出异常,在storage-service
服务中正常,观察数据是否成功回滚;如果tb_order
与tb_storage
都不存在数据,则表示全局事务成功;
3.3 数据最终一致性验证
我们可以在上游服务执行完storageClient.deduct()
后马上进入断点,测试去观察数据库会发现tb_order
、tb_storage
中存在数据,再放行断点使程序执行异常,再次观察数据库会发现tb_order
、tb_storage
中的数据已经被删除了;
4、源码地址
Seata值AT模式代码实现:《seata-tcc-demo》
边栏推荐
- Is it safe to open an account in the way of winning 100% of the new bonds
- 把 Oracle 数据库从 Windows 系统迁移到 Linux Oracle Rac 集群环境(1)——迁移数据到节点1
- Xiaomi routing R4A Gigabit version installation feed+openwrt tutorial (the full script does not need to be hard modified)
- Uncaught Error: [About] is not a <Route> component. All component children of <Routes> must be a <Ro
- 把 Oracle 数据库从 Windows 系统迁移到 Linux Oracle Rac 集群环境(4)—— 修改 oracle11g rac 集群的 scanIP
- How transformers Roberta adds tokens
- vie的刷新机制
- 1-6 build win7 virtual machine environment
- Modifying universal render data at runtime
- The Oracle 11g RAC cluster database cannot be started due to directory permission errors
猜你喜欢
Once beego failed to find bee after passing the go get command Exe's pit
Before the age of 36, Amazon transgender hackers were sentenced to 20 years' imprisonment for stealing data from more than 100million people!
Distributed transaction solutions and code implementation
Difference between left join on and join on
MySql安装教程
高速缓存Cache详解(西电考研向)
AI自己写代码让智能体进化!OpenAI的大模型有“人类思想”那味了
AI writes its own code to let agents evolve! The big model of openai has the flavor of "human thought"
好用的字典-defaultdict
The era of copilot free is over! Student party and defenders of popular open source projects can prostitute for nothing
随机推荐
对进程内存的实践和思考
Overview of AOSP ~ WiFi architecture
如何卸载cuda
Add in cmakelists_ Definitions() function
Uncaught Error: [About] is not a <Route> component. All component children of <Routes> must be a <Ro
Array - fast and slow pointer in one breath
自动化测试
doak-cms 文章管理系统 推荐
E - Average and Median(二分)
ACM. HJ70 矩阵乘法计算量估算 ●●
ACM. Hj70 matrix multiplication calculation amount estimation ●●
Is flush a regular platform? Is it safe for flush to open an account
小米路由R4A千兆版安装breed+OpenWRT教程(全脚本无需硬改)
[analysis of STL source code] functions and applications of six STL components (directory)
Enlightenment of using shadergraph to make edge fusion particle shader
ACL access control of squid proxy server
Getting started with unityshader Essentials - PBS physics based rendering
[i.mx6ul] u-boot migration (VI) network driver modification lan8720a
random list随机生成不重复数
Advanced mathematics | proficient in mean value theorem problem solving routines summary