事物
严格意义上的事务实现应该是具备原子性、一致性、隔离性和持久性,简称 ACID。
- 原子性(Atomicity),可以理解为一个事务内的所有操作要么都执行,要么都不执行。
- 一致性(Consistency),可以理解为数据是满足完整性约束的,也就是不会存在中间状态的数据,比如你账上有400,我账上有100,你给我打200块,此时你账上的钱应该是200,我账上的钱应该是300,不会存在我账上钱加了,你账上钱没扣的中间状态
- 隔离性(Isolation),指的是多个事务并发执行的时候不会互相干扰,即一个事务内部的数据对于其他事务来说是隔离的
- 持久性(Durability),指的是一个事务完成了之后数据就被永远保存下来,之后的其他操作或故障都不会对事务的结果产生影响
通俗意义上事务就是为了使得一些更新操作要么都成功,要么都失败。
分布式系统
分布式系统满足CAP原则。CAP原则又称CAP定理,指的是在一个分布式系统中,一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)。 CAP 原则指的是,这三个要素最多只能同时实现两点,不可能三者兼顾。
- 一致性:在分布式系统中的所有数据备份,在同一时刻是否同样的值。(等同于所有节点访问同一份最新的数据副本)
- 可用性:保证每个请求不管成功或者失败都有响应
- 分区容错性:系统中任意信息的丢失或失败不会影响系统的继续运作。
BASE理论
BASE是Basically Available(基本可用)、Soft state(软状态)和Eventually consistent(最终一致性)三个短语的简写。
-
基本可用 假设系统,出现了不可预知的故障,但还是能用,相比较正常的系统而言。 响应时间上的损失:正常情况下的搜索引擎0.5秒即返回给用户结果,而基本可用的搜索引擎可以在2秒作用返回结果 功能上的损失:在一个电商网站上,正常情况下,用户可以顺利完成每一笔订单。但是到了大促期间,为了保护购物系统的稳定性,部分消费者可能会被引导到一个降级页面
-
软状态
什么是软状态呢?相对于原子性而言,要求多个节点的数据副本都是一致的,这是一种“硬状态”。 软状态指的是:允许系统中的数据存在中间状态,并认为该状态不影响系统的整体可用性,即允许系统在多个不同节点的数据副本存在数据延时。 -
最终一致性 上面说软状态,然后不可能一直是软状态,必须有个时间期限。在期限过后,应当保证所有副本保持数据一致性,从而达到数据的最终一致性。 这个时间期限取决于网络延时、系统负载、数据复制方案设计等等因素
BASE是对CAP中一致性和可用性权衡的结果,其来源于对大规模互联网系统分布式实践的结论,是基于CAP定理逐步演化而来的, 其核心思想是即使无法做到强一致性(Strong consistency),但每个应用都可以根据自身的业务特点,采用适当的方式来使系统达到最终一致性(Eventual consistency)。
分布式事物
分布式事务顾名思义就是要在分布式系统中实现事务,它其实是由多个本地事务组合而成。
对于分布式事务而言几乎满足不了 ACID,其实对于单机事务而言大部分情况下也没有满足 ACID, 不然怎么会有四种隔离级别呢(未提交读,提交读,可重复读,序列化)?所以更别说分布在不同数据库或者不同应用上的分布式事务了。
分布式事物实现
2PC
2PC(Two-phase commit protocol),中文叫二阶段提交。 二阶段提交是一种强一致性设计,2PC 引入一个事务协调者的角色来 协调管理各参与者(也可称之为各本地资源)的提交和回滚,二阶段分别指的是准备(投票)和提交两个阶段。
2PC 是一种尽量保证强一致性的分布式事务,因此它是同步阻塞的,而同步阻塞就导致长久的资源锁定问题,总体而言效率低,并且存在单点故障问题, 在极端条件下存在数据不一致的风险。2PC 适用于数据库层面的分布式事务场景。 像 Java 中的 JTA 只能解决一个应用下多数据库的分布式事务问题,跨服务了就不能用了。 简单说下 Java 中 JTA,它是基于XA规范实现的事务接口,这里的 XA 你可以简单理解为基于数据库的 XA 规范来实现的 2PC
TCC
2PC 和 3PC 都是数据库层面的,而 TCC 是业务层面的分布式事务,就像我前面说的分布式事务不仅仅包括数据库的操作,还包括发送短信等,这时候 TCC 就派上用场了!
TCC 指的是Try - Confirm - Cancel。
Try 指的是预留,即资源的预留和锁定,注意是预留。 Confirm 指的是确认操作,这一步其实就是真正的执行了。 Cancel 指的是撤销操作,可以理解为把预留阶段的动作撤销了。 其实从思想上看和 2PC 差不多,都是先试探性的执行,如果都可以那就真正的执行,如果不行就回滚。
比如说一个事务要执行A、B、C三个操作,那么先对三个操作执行预留动作。如果都预留成功了那么就执行确认操作,如果有一个预留失败那就都执行撤销动作。
本地消息表
本地消息表其实就是利用了 各系统本地的事务来实现分布式事务。 本地消息表顾名思义就是会有一张存放本地消息的表,一般都是放在数据库中,然后在执行业务的时候 将业务的执行和将消息放入消息表中的操作放在同一个事务中,这样就能保证消息放入本地表中业务肯定是执行成功的。 然后再去调用下一个操作,如果下一个操作调用成功了好说,消息表的消息状态可以直接改成已成功。 如果调用失败也没事,会有 后台任务定时去读取本地消息表,筛选出还未成功的消息再调用对应的服务,服务更新成功了再变更消息的状态。 这时候有可能消息对应的操作不成功,因此也需要重试,重试就得保证对应服务的方法是幂等的,而且一般重试会有最大次数,超过最大次数可以记录下报警让人工处理。 可以看到本地消息表其实实现的是最终一致性,容忍了数据暂时不一致的情况。
消息事务
RocketMQ 就很好的支持了消息事务,让我们来看一下如何通过消息实现事务。 第一步先给 Broker 发送事务消息即半消息,半消息不是说一半消息,而是这个消息对消费者来说不可见,然后发送成功后发送方再执行本地事务。 再根据本地事务的结果向 Broker 发送 Commit 或者 RollBack 命令。 并且 RocketMQ 的发送方会提供一个反查事务状态接口,如果一段时间内半消息没有收到任何操作请求,那么 Broker 会通过反查接口得知发送方事务是否执行成功,然后执行 Commit 或者 RollBack 命令。 如果是 Commit 那么订阅方就能收到这条消息,然后再做对应的操作,做完了之后再消费这条消息即可。 如果是 RollBack 那么订阅方收不到这条消息,等于事务就没执行过。 可以看到通过 RocketMQ 还是比较容易实现的,RocketMQ 提供了事务消息的功能,我们只需要定义好事务反查接口即可。
seata AT模式 + mybatis-plus + consul实战
服务端
获取seata server代码,配置服务对应consul地址
客户端
- 引入包
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-seata</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>1.4.0</version>
</dependency>
- application.yml配置
spring:
datasource:
url: jdbc:mysql://xxx?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useAffectedRows=true&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true
driver-class-name: com.mysql.jdbc.Driver
username: xxx
password: xxxx
type: com.zaxxer.hikari.HikariDataSource
hikari:
minimum-idle: 5
maximum-pool-size: 15
auto-commit: true
idle-timeout: 30000
max-lifetime: 180000
connection-timeout: 30000
connection-test-query: SELECT 1
jdbc-url: jdbc:mysql://xxx?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useAffectedRows=true&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true
cloud:
alibaba:
seata:
tx-service-group: my_test_tx_group
- 配置datasource
@Configuration public class DataSourceConfiguration { @Bean @Primary public DataSourceProxy dataSourceProxy(DataSourceProperties properties){ HikariDataSource dataSource = properties.initializeDataSourceBuilder().type(HikariDataSource.class).build(); return new DataSourceProxy(dataSource); } }
- 新建配置文件file.conf、register.conf
file.conf
transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
# the client batch send request enable
enableClientBatchSendRequest = true
#thread factory for netty
threadFactory {
bossThreadPrefix = "NettyBoss"
workerThreadPrefix = "NettyServerNIOWorker"
serverExecutorThread-prefix = "NettyServerBizHandler"
shareBossWorker = false
clientSelectorThreadPrefix = "NettyClientSelector"
clientSelectorThreadSize = 1
clientWorkerThreadPrefix = "NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
bossThreadSize = 1
#auto default pin or 8
workerThreadSize = "default"
}
shutdown {
# when destroy server, wait seconds
wait = 3
}
serialization = "seata"
compressor = "none"
}
service {
#transaction service group mapping
vgroupMapping.my_test_tx_group = "default"
#only support when registry.type=file, please don't set multiple addresses
default.grouplist = "127.0.0.1:8091"
#degrade, current not support
enableDegrade = false
#disable seata
disableGlobalTransaction = false
}
client {
rm {
asyncCommitBufferLimit = 10000
lock {
retryInterval = 10
retryTimes = 30
retryPolicyBranchRollbackOnConflict = true
}
reportRetryCount = 5
tableMetaCheckEnable = false
reportSuccessEnable = false
}
tm {
commitRetryCount = 5
rollbackRetryCount = 5
}
undo {
dataValidation = true
logSerialization = "jackson"
logTable = "undo_log"
}
log {
exceptionRate = 100
}
}
register.conf
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "consul"
nacos {
application = "seata-server"
serverAddr = "localhost"
namespace = ""
username = ""
password = ""
}
eureka {
serviceUrl = "http://localhost:8761/eureka"
weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = "0"
password = ""
timeout = "0"
}
zk {
serverAddr = "127.0.0.1:2181"
sessionTimeout = 6000
connectTimeout = 2000
username = ""
password = ""
}
consul {
serverAddr = "xxxx"
}
etcd3 {
serverAddr = "http://localhost:2379"
}
sofa {
serverAddr = "127.0.0.1:9603"
region = "DEFAULT_ZONE"
datacenter = "DefaultDataCenter"
group = "SEATA_GROUP"
addressWaitTime = "3000"
}
file {
name = "file.conf"
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
type = "file"
nacos {
serverAddr = "localhost"
namespace = ""
group = "SEATA_GROUP"
username = ""
password = ""
}
consul {
serverAddr = "127.0.0.1:8500"
}
apollo {
appId = "seata-server"
apolloMeta = "http://192.168.1.204:8801"
namespace = "application"
}
zk {
serverAddr = "127.0.0.1:2181"
sessionTimeout = 6000
connectTimeout = 2000
username = ""
password = ""
}
etcd3 {
serverAddr = "http://localhost:2379"
}
file {
name = "file.conf"
}
}
- 业务数据库新增表undo_log
create table undo_log
(
id bigint auto_increment
primary key,
branch_id bigint not null,
xid varchar(100) not null,
context varchar(128) not null,
rollback_info longblob not null,
log_status int not null,
log_created datetime not null,
log_modified datetime not null,
constraint ux_undo_log
unique (xid, branch_id)
)
charset = utf8;
- 使用
使用@GlobalTransactional配置