GTS internals WIP

GTS

是阿里的分布式事务解决方案,是在TXC基础上改进的,主要原因是TXC是TCC模式,对业务侵入大,而GTS对业务代码侵入很小。

实现方式:2PC,其中txc server是TM,通过paxos解决事务协调器的单点问题,对全局事务和本地事务进行驱动,undo log用于回滚时的数据恢复。

它支持的isolation有2种

  • read uncommitted(默认)
    实际上是没有隔离
  • read committed(吞吐量严重下降)
    select for update

Usage

client通过dubbo rpc调用OrderService和StockService,要保证这2个RPC的原子性。

Client

1
2
3
4
5
6
7
<-- TxcTransactionScaner扫描所有@TxcTransaction注解的方法进行全局事务拦截 -->
<bean class="com.taobao.txc.client.aop.TxcTransactionScaner">
<constructor-arg value="myapp"/> <!-- appName -->
<constructor-arg value="txc_test_public.1129361738553704.QD"/> <!-- txcServerGroup client/OrderService/StockService的定义要完全相同 -->
<constructor-arg value="1" /> <!-- mode -->
<constructor-arg value="https://test-cs-gts.aliyuncs.com" /> <!-- GTS server address -->
</bean>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Client {
@TxcTransaction(timeout = 60000 * 3) // 3m
public void Bussiness(OrderService orderService, StockService stockService, String userId) {
// TxcTransactionScaner启动的时候,已经对@TxcTransaction注解的方法进行了拦截
// 拦截开始的时候,REST call配置中心找到txc server,后者会自动生成xid,并存放到TxcContext ThreadLocal
String xid = TxcContext.getCurrentXid(); // txc server已经把xid存放到context里了
// 通过dubbo的RpcContext把全局事务xid传递给相关的所有dubbo service provider
RpcContext.getContext().setAttachment("xid", xid);
OrderDO orderDO = new OrderDO(userId, productId, productNumber, new Timestamp(new Date().getTime()));
orderService.createOrder(orderDO);
RpcContext.getContext().setAttachment("xid",xid);
stockService.updateStock(orderDO);
}
}

OrderService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<!-- 这个dataSource会对每个SQL进行解析、拦截 -->
<bean id="txcDataSource" class="com.taobao.txc.datasource.cobar.TxcDataSource">
<property name="url" value="jdbc:mysql://127.0.0.1:3306/db1" />
<property name="username" value="root" />
<property name="password" value="xxx" />
<property name="driverClassName" value="com.mysql.jdbc.Driver" />
</bean>
<bean class="com.taobao.txc.client.aop.TxcTransactionScaner">
<constructor-arg value="order"/>
<constructor-arg value="txc_test_public.1129361738553704.QD"/> <!-- 与client的相同 -->
<constructor-arg value="1" />
<constructor-arg value="https://test-cs-gts.aliyuncs.com" />
</bean>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class OrderServiceImpl implements OrderService {
public int createOrder(OrderDO orderDO) {
// 从dubbo context获取本全局事务的id
String xid = RpcContext.getContext().getAttachment("xid");
// 通过ThreadLocal把xid传递给下层的本地事务 resourcemanager
TxcContext.bind(xid,null);
// 执行本地事务
String sql = "insert into orders(user_id,product_id,number,gmt_create) values(?, ?, ?, ?)";
int ret = jdbcTemplate.update(sql, new Object[]{orderDO.getUserId(), orderDO.getProductId(), orderDO.getNumber(), orderDO.getGmtCreate()});
// 从ThreadLocal里删除xid
TxcContext.unbind();
return ret;
}
}

StockService

与OrderServiceImpl的执行流程一样

Internals

TxcTransactionScaner

  • 扫码所有TxcTransaction注解的方法,进行拦截
  • 通过gts配置(注册)中心寻找txc server list,client side load balance(round robin/weight)
  • 与txc server进行RPC交互
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
在执行TxcTransaction注解的方法前,获取xid和txc server地址
从diamond配置中心找到group对应的txc server list
TxcContext.BEGIN_COUNT=1
RPC -> BeginMessage(timeout=TxcTransaction.timeout, appname=TxcTransaction.appName, txcInst=Client.Bussiness)
RPC <- BeginResultMessage(xid) txc server生成xid
然后TxcContext.bind
TXC_XID=xid
TXC_XID_OWNER=TXC
TXC_NEXT_SVR_ADDR=
执行被拦截方法的proceed
TxcTransaction注解的方法执行后
TxcContext.COMMIT_COUNT++
if TxcContext.COMMIT_COUNT >= TxcContext.BEGIN_COUNT {
// 该commit全局事务了,如果失败重试5次,中间休息3s
RPC -> GlobalCommitMessage(xid)
RPC <- GlobalCommitResultMessage(resultCode)
}

TxcDataSource

  • 限制

    • multi pks not support yet.
      必须是single pk,而且必须有pk
    • 不支持事务的嵌套
    • mysql builtin functions不知道它的SQL Parser处理怎样
  • 拦截SQL语句并操作undo log

    • 持有ResourceManager

      • 定期做undo log的gc
      • executeUpdate
      • executeQuery
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        if 在全局事务中 { // 根据TxcContext
        switch config.get("txc.globallock") {
        case 'read uncommitted':
        com.taobao.txc.parser.visitor.tddl.TxcVisitorHelper.getTddlParserVisitor
        解析SQL
        case 'read committed':
        case 'repeatable read', 'serializable':
        异常:不支持
        }
        }
    • commit

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      if TxcRuntimeContext.branchId < 1 {
      RPC -> registTrxBranch(TxcRuntimeContext)
      RPC <- branch id,赋值给TxcRuntimeContext.branchId
      }
      // 这就取得了branchId,也是txc server生成的
      执行本地java.sql.Connection.commit()
      if 本地事务成功提交 {
      RPC -> reportBranchStatus(jdbcUrl, ok)
      // 如果RPC失败,那么会当做本地事务失败处理: 抛异常,进入rollback
      } else {
      本地事务rollback
      RPC -> reportBranchStatus(jdbcUrl, fail)
      }
    • 内部包含了DruidDataSource,实现了SQL解析

    • 本地事务(branch tran)管理
    • 拦截的方法
      • createStatement

AT/MT resource manager(default)
RT resource manager, TXC_RETRY_BRANCH not null

@MtBranch
com.taobao.txc.resourcemanager.mt.MtBranch

RtResourceManager unsupport queryReadLocks

MtRmRpcClient 注册RM
RmRpcClient

TxcRuntimeContext

  • xid
  • writeKeys
  • branchId
  • status
  • serverAddr
  • info[]
    • TxcTable a, b
      • schemaName
      • tableName
      • TxcLine[]
        • TxcField[]
          • name
          • type
          • value
    • sqlType
    • selectSql
    • sql
    • where

undo log

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var8 = new StringBuilder("INSERT INTO ");
var8.append(var2);
var8.append("(id, xid, branch_id, rollback_info, ");
var8.append("gmt_create, gmt_modified, status, server)");
var8.append(" VALUES(");
var8.append("?,"); // id=branch_id
var8.append("?,"); // xid
var8.append("?,"); // branch_id
var8.append("?,"); // TxcRuntimeContext.encode
var8.append("now(),"); // gmt_create
var8.append("now(),"); // gmt_modified
var8.append(TxcRuntimeContext.status); // status
var8.append(",?)"); // TxcRuntimeContext.server
var9 = null;

createStatement

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
try {
获取branchId registTrxBranch
RegisterMessage ->
<- 里面携带gts生成的branchId
存放到TxcRuntimeContext
setAutoCommit(false)
解析SQL,生成TxcTable
createStatement
executeQuery().getMetaData()
commit()
} catch Throwable {
rollback
throw new SQLException
} finally {
setAutoCommit(true)
}

commit

1
2
branch commit
report branch status (最多重试3次)

TxcDataSource是对DruidDataSource的封装:
可以解析SQL,通过resourcemanager

TxcRuntimeContext
id
xid
branchId
string writeKeys

ResourceManager模式

  • at 自动 每秒1次 每天0-3点执行
  • mt 手动 USR2 signal
  • rt 实时

Message通过netty实现
为每个xid建立一个Channel ConcurrentHashMap
xid = server address : xid
建立channel com.taobao.txc.a.b.g,之后进行dauth握手

RegisterMessage(key, branchId, commitMode, bizKey)
ReportStatusMessage(xid)
GlobalCommitMessage(xid)
GlobalRollbackMessage(xid)

hack

  • com.taobao.txc.client.a
  • com.taobao.txc.common.c.F

Q & A

  • xid怎么生成的

    txc_server_addr:id

    @TxcTransaction方法拦截一开始,就请求txc server获取xid,并存放的TxcContext里

  • 为什么OrderService也要配置TxcTransactionScaner

    它除了拦截@TxcTransaction方法,还与配置中心打交道,定时刷新本全局事务的配置信息和txc server addrs

References

Share Comments