GTS
是阿里的分布式事务解决方案,是在TXC基础上改进的,主要原因是TXC是TCC模式,对业务侵入大,而GTS对业务代码侵入很小。
实现方式:2PC,其中txc server是TM,通过paxos解决事务协调器的单点问题,对全局事务和本地事务进行驱动,undo log用于回滚时的数据恢复。
它支持的isolation有2种
- read uncommitted(默认)
实际上是没有隔离
- read committed(吞吐量严重下降)
select for update
Usage
client通过dubbo rpc调用OrderService和StockService,要保证这2个RPC的原子性。
Client
1 2 3 4 5 6 7
| <-- TxcTransactionScaner扫描所有@TxcTransaction注解的方法进行全局事务拦截 --> <bean class="com.taobao.txc.client.aop.TxcTransactionScaner"> <constructor-arg value="myapp"/> <!-- appName --> <constructor-arg value="txc_test_public.1129361738553704.QD"/> <!-- txcServerGroup client/OrderService/StockService的定义要完全相同 --> <constructor-arg value="1" /> <!-- mode --> <constructor-arg value="https://test-cs-gts.aliyuncs.com" /> <!-- GTS server address --> </bean>
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class Client { @TxcTransaction(timeout = 60000 * 3) // 3m public void Bussiness(OrderService orderService, StockService stockService, String userId) { // TxcTransactionScaner启动的时候,已经对@TxcTransaction注解的方法进行了拦截 // 拦截开始的时候,REST call配置中心找到txc server,后者会自动生成xid,并存放到TxcContext ThreadLocal String xid = TxcContext.getCurrentXid(); // txc server已经把xid存放到context里了 // 通过dubbo的RpcContext把全局事务xid传递给相关的所有dubbo service provider RpcContext.getContext().setAttachment("xid", xid); OrderDO orderDO = new OrderDO(userId, productId, productNumber, new Timestamp(new Date().getTime())); orderService.createOrder(orderDO); RpcContext.getContext().setAttachment("xid",xid); stockService.updateStock(orderDO); } }
|
OrderService
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| <!-- 这个dataSource会对每个SQL进行解析、拦截 --> <bean id="txcDataSource" class="com.taobao.txc.datasource.cobar.TxcDataSource"> <property name="url" value="jdbc:mysql://127.0.0.1:3306/db1" /> <property name="username" value="root" /> <property name="password" value="xxx" /> <property name="driverClassName" value="com.mysql.jdbc.Driver" /> </bean> <bean class="com.taobao.txc.client.aop.TxcTransactionScaner"> <constructor-arg value="order"/> <constructor-arg value="txc_test_public.1129361738553704.QD"/> <!-- 与client的相同 --> <constructor-arg value="1" /> <constructor-arg value="https://test-cs-gts.aliyuncs.com" /> </bean>
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public class OrderServiceImpl implements OrderService { public int createOrder(OrderDO orderDO) { // 从dubbo context获取本全局事务的id String xid = RpcContext.getContext().getAttachment("xid"); // 通过ThreadLocal把xid传递给下层的本地事务 resourcemanager TxcContext.bind(xid,null); // 执行本地事务 String sql = "insert into orders(user_id,product_id,number,gmt_create) values(?, ?, ?, ?)"; int ret = jdbcTemplate.update(sql, new Object[]{orderDO.getUserId(), orderDO.getProductId(), orderDO.getNumber(), orderDO.getGmtCreate()}); // 从ThreadLocal里删除xid TxcContext.unbind(); return ret; } }
|
StockService
与OrderServiceImpl的执行流程一样
Internals
TxcTransactionScaner
- 扫码所有TxcTransaction注解的方法,进行拦截
- 通过gts配置(注册)中心寻找txc server list,client side load balance(round robin/weight)
- 与txc server进行RPC交互
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| 在执行TxcTransaction注解的方法前,获取xid和txc server地址 从diamond配置中心找到group对应的txc server list TxcContext.BEGIN_COUNT=1 RPC -> BeginMessage(timeout=TxcTransaction.timeout, appname=TxcTransaction.appName, txcInst=Client.Bussiness) RPC <- BeginResultMessage(xid) txc server生成xid 然后TxcContext.bind TXC_XID=xid TXC_XID_OWNER=TXC TXC_NEXT_SVR_ADDR= 执行被拦截方法的proceed TxcTransaction注解的方法执行后 TxcContext.COMMIT_COUNT++ if TxcContext.COMMIT_COUNT >= TxcContext.BEGIN_COUNT { // 该commit全局事务了,如果失败重试5次,中间休息3s RPC -> GlobalCommitMessage(xid) RPC <- GlobalCommitResultMessage(resultCode) }
|
TxcDataSource
限制
- multi pks not support yet.
必须是single pk,而且必须有pk
- 不支持事务的嵌套
- mysql builtin functions不知道它的SQL Parser处理怎样
拦截SQL语句并操作undo log
AT/MT resource manager(default)
RT resource manager, TXC_RETRY_BRANCH not null
@MtBranch
com.taobao.txc.resourcemanager.mt.MtBranch
RtResourceManager unsupport queryReadLocks
MtRmRpcClient 注册RM
RmRpcClient
TxcRuntimeContext
- xid
- writeKeys
- branchId
- status
- serverAddr
- info[]
- TxcTable a, b
- schemaName
- tableName
- TxcLine[]
- sqlType
- selectSql
- sql
- where
undo log
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| var8 = new StringBuilder("INSERT INTO "); var8.append(var2); var8.append("(id, xid, branch_id, rollback_info, "); var8.append("gmt_create, gmt_modified, status, server)"); var8.append(" VALUES("); var8.append("?,"); // id=branch_id var8.append("?,"); // xid var8.append("?,"); // branch_id var8.append("?,"); // TxcRuntimeContext.encode var8.append("now(),"); // gmt_create var8.append("now(),"); // gmt_modified var8.append(TxcRuntimeContext.status); // status var8.append(",?)"); // TxcRuntimeContext.server var9 = null;
|
createStatement
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| try { 获取branchId registTrxBranch RegisterMessage -> <- 里面携带gts生成的branchId 存放到TxcRuntimeContext setAutoCommit(false) 解析SQL,生成TxcTable createStatement executeQuery().getMetaData() commit() } catch Throwable { rollback throw new SQLException } finally { setAutoCommit(true) }
|
commit
1 2
| branch commit report branch status (最多重试3次)
|
TxcDataSource是对DruidDataSource的封装:
可以解析SQL,通过resourcemanager
TxcRuntimeContext
id
xid
branchId
string writeKeys
ResourceManager模式
- at 自动 每秒1次 每天0-3点执行
- mt 手动 USR2 signal
- rt 实时
Message通过netty实现
为每个xid建立一个Channel ConcurrentHashMap
xid = server address : xid
建立channel com.taobao.txc.a.b.g,之后进行dauth握手
RegisterMessage(key, branchId, commitMode, bizKey)
ReportStatusMessage(xid)
GlobalCommitMessage(xid)
GlobalRollbackMessage(xid)
hack
- com.taobao.txc.client.a
- com.taobao.txc.common.c.F
Q & A
xid怎么生成的
txc_server_addr:id
@TxcTransaction方法拦截一开始,就请求txc server获取xid,并存放的TxcContext里
为什么OrderService也要配置TxcTransactionScaner
它除了拦截@TxcTransaction方法,还与配置中心打交道,定时刷新本全局事务的配置信息和txc server addrs
References