架构师(四) 分布式事物

Posted by YaPi on December 30, 2020

事物

严格意义上的事务实现应该是具备原子性、一致性、隔离性和持久性,简称 ACID。

  • 原子性(Atomicity),可以理解为一个事务内的所有操作要么都执行,要么都不执行。
  • 一致性(Consistency),可以理解为数据是满足完整性约束的,也就是不会存在中间状态的数据,比如你账上有400,我账上有100,你给我打200块,此时你账上的钱应该是200,我账上的钱应该是300,不会存在我账上钱加了,你账上钱没扣的中间状态
  • 隔离性(Isolation),指的是多个事务并发执行的时候不会互相干扰,即一个事务内部的数据对于其他事务来说是隔离的
  • 持久性(Durability),指的是一个事务完成了之后数据就被永远保存下来,之后的其他操作或故障都不会对事务的结果产生影响

通俗意义上事务就是为了使得一些更新操作要么都成功,要么都失败。

分布式系统

分布式系统满足CAP原则。CAP原则又称CAP定理,指的是在一个分布式系统中,一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)。 CAP 原则指的是,这三个要素最多只能同时实现两点,不可能三者兼顾。

  • 一致性:在分布式系统中的所有数据备份,在同一时刻是否同样的值。(等同于所有节点访问同一份最新的数据副本)
  • 可用性:保证每个请求不管成功或者失败都有响应
  • 分区容错性:系统中任意信息的丢失或失败不会影响系统的继续运作。

BASE理论

BASE是Basically Available(基本可用)、Soft state(软状态)和Eventually consistent(最终一致性)三个短语的简写。

  1. 基本可用 假设系统,出现了不可预知的故障,但还是能用,相比较正常的系统而言。 响应时间上的损失:正常情况下的搜索引擎0.5秒即返回给用户结果,而基本可用的搜索引擎可以在2秒作用返回结果 功能上的损失:在一个电商网站上,正常情况下,用户可以顺利完成每一笔订单。但是到了大促期间,为了保护购物系统的稳定性,部分消费者可能会被引导到一个降级页面

  2. 软状态
    什么是软状态呢?相对于原子性而言,要求多个节点的数据副本都是一致的,这是一种“硬状态”。 软状态指的是:允许系统中的数据存在中间状态,并认为该状态不影响系统的整体可用性,即允许系统在多个不同节点的数据副本存在数据延时。

  3. 最终一致性 上面说软状态,然后不可能一直是软状态,必须有个时间期限。在期限过后,应当保证所有副本保持数据一致性,从而达到数据的最终一致性。 这个时间期限取决于网络延时、系统负载、数据复制方案设计等等因素

BASE是对CAP中一致性和可用性权衡的结果,其来源于对大规模互联网系统分布式实践的结论,是基于CAP定理逐步演化而来的, 其核心思想是即使无法做到强一致性(Strong consistency),但每个应用都可以根据自身的业务特点,采用适当的方式来使系统达到最终一致性(Eventual consistency)。

分布式事物

分布式事务顾名思义就是要在分布式系统中实现事务,它其实是由多个本地事务组合而成。

对于分布式事务而言几乎满足不了 ACID,其实对于单机事务而言大部分情况下也没有满足 ACID, 不然怎么会有四种隔离级别呢(未提交读,提交读,可重复读,序列化)?所以更别说分布在不同数据库或者不同应用上的分布式事务了。

分布式事物实现

2PC

2PC(Two-phase commit protocol),中文叫二阶段提交。 二阶段提交是一种强一致性设计,2PC 引入一个事务协调者的角色来 协调管理各参与者(也可称之为各本地资源)的提交和回滚,二阶段分别指的是准备(投票)和提交两个阶段。

2PC 是一种尽量保证强一致性的分布式事务,因此它是同步阻塞的,而同步阻塞就导致长久的资源锁定问题,总体而言效率低,并且存在单点故障问题, 在极端条件下存在数据不一致的风险。2PC 适用于数据库层面的分布式事务场景。 像 Java 中的 JTA 只能解决一个应用下多数据库的分布式事务问题,跨服务了就不能用了。 简单说下 Java 中 JTA,它是基于XA规范实现的事务接口,这里的 XA 你可以简单理解为基于数据库的 XA 规范来实现的 2PC

TCC

2PC 和 3PC 都是数据库层面的,而 TCC 是业务层面的分布式事务,就像我前面说的分布式事务不仅仅包括数据库的操作,还包括发送短信等,这时候 TCC 就派上用场了!

TCC 指的是Try - Confirm - Cancel。

Try 指的是预留,即资源的预留和锁定,注意是预留。 Confirm 指的是确认操作,这一步其实就是真正的执行了。 Cancel 指的是撤销操作,可以理解为把预留阶段的动作撤销了。 其实从思想上看和 2PC 差不多,都是先试探性的执行,如果都可以那就真正的执行,如果不行就回滚。

比如说一个事务要执行A、B、C三个操作,那么先对三个操作执行预留动作。如果都预留成功了那么就执行确认操作,如果有一个预留失败那就都执行撤销动作。

本地消息表

本地消息表其实就是利用了 各系统本地的事务来实现分布式事务。 本地消息表顾名思义就是会有一张存放本地消息的表,一般都是放在数据库中,然后在执行业务的时候 将业务的执行和将消息放入消息表中的操作放在同一个事务中,这样就能保证消息放入本地表中业务肯定是执行成功的。 然后再去调用下一个操作,如果下一个操作调用成功了好说,消息表的消息状态可以直接改成已成功。 如果调用失败也没事,会有 后台任务定时去读取本地消息表,筛选出还未成功的消息再调用对应的服务,服务更新成功了再变更消息的状态。 这时候有可能消息对应的操作不成功,因此也需要重试,重试就得保证对应服务的方法是幂等的,而且一般重试会有最大次数,超过最大次数可以记录下报警让人工处理。 可以看到本地消息表其实实现的是最终一致性,容忍了数据暂时不一致的情况。

消息事务

RocketMQ 就很好的支持了消息事务,让我们来看一下如何通过消息实现事务。 第一步先给 Broker 发送事务消息即半消息,半消息不是说一半消息,而是这个消息对消费者来说不可见,然后发送成功后发送方再执行本地事务。 再根据本地事务的结果向 Broker 发送 Commit 或者 RollBack 命令。 并且 RocketMQ 的发送方会提供一个反查事务状态接口,如果一段时间内半消息没有收到任何操作请求,那么 Broker 会通过反查接口得知发送方事务是否执行成功,然后执行 Commit 或者 RollBack 命令。 如果是 Commit 那么订阅方就能收到这条消息,然后再做对应的操作,做完了之后再消费这条消息即可。 如果是 RollBack 那么订阅方收不到这条消息,等于事务就没执行过。 可以看到通过 RocketMQ 还是比较容易实现的,RocketMQ 提供了事务消息的功能,我们只需要定义好事务反查接口即可。

seata AT模式 + mybatis-plus + consul实战

服务端

获取seata server代码,配置服务对应consul地址

客户端

  • 引入包
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-alibaba-seata</artifactId>
    <version>2.1.0.RELEASE</version>
</dependency>
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-all</artifactId>
    <version>1.4.0</version>
</dependency>
  • application.yml配置
spring:
    datasource:
        url: jdbc:mysql://xxx?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useAffectedRows=true&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true
        driver-class-name: com.mysql.jdbc.Driver
        username: xxx
        password: xxxx
        type: com.zaxxer.hikari.HikariDataSource
        hikari:
          minimum-idle: 5
          maximum-pool-size: 15
          auto-commit: true
          idle-timeout: 30000
          max-lifetime: 180000
          connection-timeout: 30000
          connection-test-query: SELECT 1
          jdbc-url: jdbc:mysql://xxx?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useAffectedRows=true&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true
    cloud:
        alibaba:
          seata:
            tx-service-group: my_test_tx_group
  • 配置datasource
    @Configuration
    public class DataSourceConfiguration {
    
      @Bean
      @Primary
      public DataSourceProxy dataSourceProxy(DataSourceProperties properties){
          HikariDataSource dataSource = properties.initializeDataSourceBuilder().type(HikariDataSource.class).build();
          return new DataSourceProxy(dataSource);
      }
    }
    
  • 新建配置文件file.conf、register.conf

file.conf

transport {
  # tcp udt unix-domain-socket
  type = "TCP"
  #NIO NATIVE
  server = "NIO"
  #enable heartbeat
  heartbeat = true
  # the client batch send request enable
  enableClientBatchSendRequest = true
  #thread factory for netty
  threadFactory {
    bossThreadPrefix = "NettyBoss"
    workerThreadPrefix = "NettyServerNIOWorker"
    serverExecutorThread-prefix = "NettyServerBizHandler"
    shareBossWorker = false
    clientSelectorThreadPrefix = "NettyClientSelector"
    clientSelectorThreadSize = 1
    clientWorkerThreadPrefix = "NettyClientWorkerThread"
    # netty boss thread size,will not be used for UDT
    bossThreadSize = 1
    #auto default pin or 8
    workerThreadSize = "default"
  }
  shutdown {
    # when destroy server, wait seconds
    wait = 3
  }
  serialization = "seata"
  compressor = "none"
}
service {
  #transaction service group mapping
  vgroupMapping.my_test_tx_group = "default"
  #only support when registry.type=file, please don't set multiple addresses
  default.grouplist = "127.0.0.1:8091"
  #degrade, current not support
  enableDegrade = false
  #disable seata
  disableGlobalTransaction = false
}

client {
  rm {
    asyncCommitBufferLimit = 10000
    lock {
      retryInterval = 10
      retryTimes = 30
      retryPolicyBranchRollbackOnConflict = true
    }
    reportRetryCount = 5
    tableMetaCheckEnable = false
    reportSuccessEnable = false
  }
  tm {
    commitRetryCount = 5
    rollbackRetryCount = 5
  }
  undo {
    dataValidation = true
    logSerialization = "jackson"
    logTable = "undo_log"
  }
  log {
    exceptionRate = 100
  }
}

register.conf

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "consul"

  nacos {
    application = "seata-server"
    serverAddr = "localhost"
    namespace = ""
    username = ""
    password = ""
  }
  eureka {
    serviceUrl = "http://localhost:8761/eureka"
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = "0"
    password = ""
    timeout = "0"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  consul {
    serverAddr = "xxxx"
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
  type = "file"

  nacos {
    serverAddr = "localhost"
    namespace = ""
    group = "SEATA_GROUP"
    username = ""
    password = ""
  }
  consul {
    serverAddr = "127.0.0.1:8500"
  }
  apollo {
    appId = "seata-server"
    apolloMeta = "http://192.168.1.204:8801"
    namespace = "application"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
    name = "file.conf"
  }
}

  • 业务数据库新增表undo_log
create table undo_log
(
    id            bigint auto_increment
        primary key,
    branch_id     bigint       not null,
    xid           varchar(100) not null,
    context       varchar(128) not null,
    rollback_info longblob     not null,
    log_status    int          not null,
    log_created   datetime     not null,
    log_modified  datetime     not null,
    constraint ux_undo_log
        unique (xid, branch_id)
)
    charset = utf8;
  • 使用

使用@GlobalTransactional配置