hybrid logical clock

For performance, distributed transactions today usually offer SI/SSI isolation, implemented with optimistic conflict detection rather than pessimistic 2PC. That requires tracking transaction causality, which is usually done with logical clocks: vector clocks are one example, and so is the zxid in ZAB; the timestamp oracle in Google Percolator is another kind of logical clock, one that does yield a total order, but it is an obvious bottleneck and adds an extra message round trip.

A logical clock, however, does not reflect physical time, so hybrid clocks were proposed: wall time + logical time, one part for humans to read and one for machines. The principle is simple: whenever two nodes exchange a message, the receiver advances its clock so that the sender's event happens-before the receiver's.
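
A minimal Go sketch of the send/receive update rules as usually described for HLC; the type and method names are illustrative, not an existing library's API:

package main

import (
    "fmt"
    "time"
)

// HLC carries a physical part (readable by humans) and a logical counter
// (used by machines to break ties and preserve causality).
type HLC struct {
    wall    int64 // physical component, e.g. Unix milliseconds
    logical int64
}

func physicalNow() int64 { return time.Now().UnixMilli() }

// Tick is called for a local event or right before sending a message.
func (c *HLC) Tick() HLC {
    if now := physicalNow(); now > c.wall {
        c.wall, c.logical = now, 0
    } else {
        c.logical++ // local wall clock hasn't advanced (or went backwards)
    }
    return *c
}

// Update is called when a message carrying the sender's timestamp arrives;
// the result is strictly greater than both the local and the remote clock,
// so the sender's event happens-before the receiver's.
func (c *HLC) Update(remote HLC) HLC {
    now := physicalNow()
    switch {
    case now > c.wall && now > remote.wall:
        c.wall, c.logical = now, 0
    case remote.wall > c.wall:
        c.wall, c.logical = remote.wall, remote.logical+1
    case c.wall > remote.wall:
        c.logical++
    default: // both wall parts are equal
        if remote.logical > c.logical {
            c.logical = remote.logical
        }
        c.logical++
    }
    return *c
}

func main() {
    var sender, receiver HLC
    msgTS := sender.Tick()              // sender stamps the outgoing message
    fmt.Println(receiver.Update(msgTS)) // receiver's clock now sits after the sender's
}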

But wall time itself is fragile. Suppose one machine in a cluster has a broken NTP setup, and the operator makes a typo while correcting the time: 2017-09-09 10:00:00 becomes 2071-09-09 10:00:00. The bad value then infects every machine in the cluster: the wall-time part of every HLC jumps to the year 2071, and there is no manual fix short of discarding history. The system only recovers on its own when 2071 actually arrives, so the wall-time part has lost its meaning.

To solve this, add an epoch field:

HLC
+-------+-----------+--------------+
| epoch | wall time | logical time |
+-------+-----------+--------------+

To repair the 2071 problem, just bump the epoch by 1.
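
Since the epoch is the most significant field in the layout above, bumping it makes every new timestamp compare greater than the poisoned 2071 ones even though the new wall time is smaller. A minimal Go comparison sketch; field names and the sample wall-time values are illustrative:

package main

import "fmt"

// Timestamp is an epoch-extended HLC value, ordered lexicographically
// by (epoch, wall time, logical time): the epoch dominates everything else.
type Timestamp struct {
    Epoch   uint32
    Wall    int64 // e.g. Unix milliseconds
    Logical int64
}

func (a Timestamp) Less(b Timestamp) bool {
    if a.Epoch != b.Epoch {
        return a.Epoch < b.Epoch
    }
    if a.Wall != b.Wall {
        return a.Wall < b.Wall
    }
    return a.Logical < b.Logical
}

func main() {
    poisoned := Timestamp{Epoch: 0, Wall: 3210000000000} // wall time poisoned to roughly year 2071
    repaired := Timestamp{Epoch: 1, Wall: 1500000000000} // back to roughly 2017, but epoch bumped
    fmt.Println(poisoned.Less(repaired))                 // true: new timestamps still move forward
}

With the typo'd timestamps stuck at epoch 0 and the repaired cluster at epoch 1, a 2017 wall time at epoch 1 still orders after a 2071 wall time at epoch 0, so monotonicity is preserved without waiting until 2071.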


Reliability pyramid (SRE)

(figure: the SRE reliability pyramid)


MySQL B+ Tree

(figures: sharding)


batch insert(mysql)

// case 1
INSERT INTO T(v) VALUES (1), (2), (3), (4), (5);
// case 2
for i := 1; i <= 5; i++ {
    INSERT INTO T(v) VALUES (i);
}

What is the difference between case 1 and case 2, assuming auto_commit is on?

Pros

  • Fewer round trips to the MySQL server
  • Less SQL parsing (with prepared statements there is little difference)
  • With the query cache enabled, the cache is invalidated only once, improving the hit rate

Cons

  • It may become one big transaction
    so the batch size must be kept bounded when batch inserting (see the sketch below)
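
A minimal Go sketch of bounding the batch size, assuming a table T(v INT), Go's database/sql with the go-sql-driver/mysql driver, and an arbitrary chunk size of 1000; it only illustrates the "batch must not be too large" point:

package main

import (
    "database/sql"
    "strings"

    _ "github.com/go-sql-driver/mysql" // one possible driver choice
)

// batchInsert inserts values in bounded chunks so each statement stays a
// reasonably small transaction under auto_commit.
func batchInsert(db *sql.DB, values []int, chunkSize int) error {
    for start := 0; start < len(values); start += chunkSize {
        end := start + chunkSize
        if end > len(values) {
            end = len(values)
        }
        chunk := values[start:end]

        // build "INSERT INTO T(v) VALUES (?),(?),...,(?)" for this chunk only
        placeholders := strings.TrimSuffix(strings.Repeat("(?),", len(chunk)), ",")
        args := make([]interface{}, len(chunk))
        for i, v := range chunk {
            args[i] = v
        }
        if _, err := db.Exec("INSERT INTO T(v) VALUES "+placeholders, args...); err != nil {
            return err
        }
    }
    return nil
}

func main() {
    db, err := sql.Open("mysql", "user:pass@tcp(127.0.0.1:3306)/test")
    if err != nil {
        panic(err)
    }
    defer db.Close()

    values := make([]int, 10000)
    for i := range values {
        values[i] = i
    }
    if err := batchInsert(db, values, 1000); err != nil {
        panic(err)
    }
}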

cannot have exactly-once delivery

http://bravenewgeek.com/you-cannot-have-exactly-once-delivery/


RocketMQ explained

Features

  • Producer Group
    when sending transactional messages it acts as the TC (transaction coordinator), so it must span multiple machines, and a transaction state table {offset: P/C/R} is kept
  • Broker tag-based message filter
  • Scheduled messages: arbitrary precision is unsupported, only fixed levels such as 5s, 10s, 1m; queueID = delayLevel - 1
    so message revoke is presumably unsupported
  • Separates the commit log and the consume queue, a bit like the WAL vs table relationship
    they can be placed on different filesystems, but there is no finer-grained control
    the benefit of adding a dispatch step: dispatching can be skipped

Commit Log

${rocketmq.home}\store\commitlog\${fileName}
fileName[n] = fileName[n-1] + mappedFileSize
To keep mappedFileSize identical across files, padding is appended at the tail of each file; the default file size is 1GB
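
Because each file is named by its starting physical offset and all files share the same size, locating a message by physical offset is plain arithmetic. A minimal Go sketch, assuming 20-digit zero-padded names (RocketMQ names commit log files by their starting offset):

package main

import "fmt"

const mappedFileSize = 1 << 30 // default 1GB per commit log file

// locate returns the commit log file holding physicalOffset and the position inside it.
func locate(physicalOffset int64) (fileName string, posInFile int64) {
    startOffset := physicalOffset / mappedFileSize * mappedFileSize
    return fmt.Sprintf("%020d", startOffset), physicalOffset - startOffset
}

func main() {
    fmt.Println(locate(0))         // 00000000000000000000 0
    fmt.Println(locate(1<<30 + 4)) // 00000000001073741824 4
}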

Each message:

For an ordinary message, QueueOffset stores the offset within the consume queue; for a transactional message, it is the offset within the transaction state table
+---------+-------+-----+---------+------+-------------+----------------+----------------+
| MsgSize | Magic | CRC | QueueID | Flag | QueueOffset | PhysicalOffset | SysFlag(P/C/R) |
+---------+-------+-----+---------+------+-------------+----------------+----------------+
+--------------+------------------+-----------+---------------+----+------+-------+------+
| ProducedTime | ProducerHostPort | StoreTime | StoreHostPort | .. | Body | Topic | Prop |
+--------------+------------------+-----------+---------------+----+------+-------+------+

Every append to the commit log synchronously calls dispatch, which feeds the consume queue and the index service:

new DispatchRequest(topic, queueId,
    result.getWroteOffset(), result.getWroteBytes(),
    tagsCode, msg.getStoreTimestamp(),
    result.getLogicsOffset(), msg.getKeys(),
    // Transaction
    msg.getSysFlag(),
    msg.getPreparedTransactionOffset());

queue

A queue is purely a logical concept; it takes part in producer load balancing, much like virtual nodes in consistent hashing. The commit log on each broker is shared by all of that broker's queues, with no separation among them.

broker1: queue0, queue2
broker2: queue0,
then, topicA has 3 queues:
broker1_queue0, broker1_queue2, broker2_queue0
producer.selectOneMessageQueue("topicA", "broker1", "queue0")

Per-queue (partial) message ordering is guaranteed by the producer client.
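
One common way to get that partial ordering is to route every message carrying the same sharding key to the same queue, since ordering inside one queue is preserved. A minimal Go sketch; the MessageQueue type and the queue list are stand-ins for the client's topic route data, not RocketMQ's actual API:

package main

import (
    "fmt"
    "hash/fnv"
)

type MessageQueue struct {
    BrokerName string
    QueueID    int
}

// selectQueueByKey maps a sharding key to a fixed queue, so all messages of
// that key keep their relative order.
func selectQueueByKey(queues []MessageQueue, shardingKey string) MessageQueue {
    h := fnv.New32a()
    h.Write([]byte(shardingKey))
    return queues[int(h.Sum32())%len(queues)]
}

func main() {
    queues := []MessageQueue{
        {"broker1", 0}, {"broker1", 2}, {"broker2", 0}, // topicA's 3 queues from the example above
    }
    fmt.Println(selectQueueByKey(queues, "order-1001")) // every message of order-1001 lands in the same queue
}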

Question

  • How is retention by topic implemented: it is not
    expiry is decided solely by the commit log file's mtime, even though the file mixes multiple topics
  • How is I/O balanced
  • How is compression done
  • If a CRC check fails, are all topics affected?
  • Why store StoreHostPort? How is a topic migrated: it cannot be
  • Writing the commit log requires a lock, and its granularity is too coarse: effectively a db-level lock rather than a table-level one
  • Broker split-brain
  • Failover
  • A topic's commit log is scattered across all brokers

Consume Queue

${rocketmq.home}/store/consumequeue/${topicName}/${queueId}/${fileName}

Reading one message first reads the consume queue (like a MySQL secondary index), then the commit log (like the clustered index).

It uses mmap instead of sendfile, because the reads are random.

+---------------------+-----------------+------------------------+
| CommitLogOffset(8B) | MessageSize(4B) | MessageTagHashcode(8B) |
+---------------------+-----------------+------------------------+
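
A minimal Go sketch of decoding one 20-byte consume queue entry with the layout above, assuming Java's big-endian byte order; the struct and function names are illustrative:

package main

import (
    "encoding/binary"
    "fmt"
)

type cqEntry struct {
    CommitLogOffset int64 // where the full message lives in the commit log
    MessageSize     int32
    TagsHashcode    int64 // used for tag-based filtering without touching the commit log
}

func decodeEntry(b []byte) cqEntry {
    return cqEntry{
        CommitLogOffset: int64(binary.BigEndian.Uint64(b[0:8])),
        MessageSize:     int32(binary.BigEndian.Uint32(b[8:12])),
        TagsHashcode:    int64(binary.BigEndian.Uint64(b[12:20])),
    }
}

func main() {
    buf := make([]byte, 20)
    binary.BigEndian.PutUint64(buf[0:8], 1<<20) // commit log offset
    binary.BigEndian.PutUint32(buf[8:12], 256)  // message size
    binary.BigEndian.PutUint64(buf[12:20], 42)  // tag hashcode
    fmt.Printf("%+v\n", decodeEntry(buf))
}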

Although the consume queue itself is read sequentially during consumption, the subsequent commit log reads are almost all random. And how would compression be optimized? Relying on the page cache plus readahead alone is far from enough.

Producer

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); // from local cache or name server
MessageQueue mq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);

Transaction

// 2PC, 2 messages
// Phase1
producer group write redolog
producer group send a message(type=TransactionPreparedType) to broker
broker append it to CommitLog and return MessageId
broker will not append it to consume queue
// Phase2
producer group write redolog
producer group send a message(type=TransactionCommitType, msgId=$msgId) to broker
broker find the message with msgId in CommitLog and clone it and append it to CommitLog(type=TransactionCommitType|TransactionRollbackType)
if type == TransactionCommitType {
    broker append commit log offset to consume queue
}

State Table

Stored on the broker; scanned once a minute by default.

24B, mmap
+-----------------+------+-----------+-----------------------+--------------+
| CommitLogOffset | Size | Timestamp | ProducerGroupHashcode | State(P/C/R) |
+-----------------+------+-----------+-----------------------+--------------+
A prepare message inserts a row into the table; a commit/rollback message updates the row.

For in-doubt transactions, the broker sends a CHECK_TRANSACTION_STATE request to a randomly chosen machine in the Producer Group; that producer locates the transaction's outcome in its redolog (mmap). Producer Group membership is stored in the name server. A rough sketch of this check-back loop follows.
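
A rough Go sketch of the periodic check-back, only to make the mechanism concrete; every type and the checkTransactionState call are hypothetical stand-ins, not the broker's real code:

package main

import (
    "math/rand"
    "time"
)

type txState byte

const (
    statePrepared txState = iota // P
    stateCommit                  // C
    stateRollback                // R
)

type txRecord struct {
    CommitLogOffset int64
    State           txState
}

type producerConn struct{ addr string }

// checkTransactionState stands in for the CHECK_TRANSACTION_STATE request.
func (p producerConn) checkTransactionState(commitLogOffset int64) {}

// scanPending asks a random producer in the group about every still-prepared entry.
func scanPending(table []txRecord, group []producerConn) {
    for _, rec := range table {
        if rec.State != statePrepared {
            continue // already committed or rolled back
        }
        group[rand.Intn(len(group))].checkTransactionState(rec.CommitLogOffset)
    }
}

func main() {
    group := []producerConn{{"10.0.0.1:8888"}, {"10.0.0.2:8888"}}
    table := []txRecord{{CommitLogOffset: 4096, State: statePrepared}}
    for range time.Tick(time.Minute) { // default: scan once per minute
        scanPending(table, group)
    }
}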

Problems

  • The producer is no longer an ordinary client; it has become a server (the TC) and must not be shut down casually
  • What happens if the machine in the Producer Group that wrote the redolog dies

HA

Granularity is controlled only at the broker level, whereas Kafka controls it per partition.


architecture design checklist

(figures: archeck, demo)


kateway replay messages

Issue

Consumers sometimes need to replay or fast-forward messages, and kateway already supports this: the user sets the offset to the desired position in the web console.

But because a kateway instance is busy consuming a continuous stream of messages, its checkpoint will overwrite that manually set offset. The user therefore has to stop the consumer process first, operate in the web console, and then restart the consumer: not user friendly. Improve this without hurting performance.

Solution

_, stat, err := cg.kz.conn.Get(path)
if cg.lastVer == -1 {
    // first offset commit in this session: remember the current znode version
    cg.lastVer = stat.Version
} else if cg.lastVer != stat.Version {
    // the znode changed behind our back: user manually reset the offset checkpoint
    return ErrRestartConsumerGroup
}
// the user may also trigger a replay right after the Get above;
// the CAS on stat.Version below catches that race
switch err {
case zk.ErrNoNode:
    return cg.kz.create(path, data, false)
case nil:
    newStat, err := cg.kz.conn.Set(path, data, stat.Version)
    if err == nil {
        cg.lastVer = newStat.Version
    }
    return err
default:
    return err
}

how DBMS works

Basics

(figure: DBMS architecture)

Undo log

  • Oracle and MySQL work similarly
  • In MS SQL Server it is part of the transaction log
  • PostgreSQL has no undo log; it relies on MVCC instead, keeping multiple versions of each row

Redo log

  • Oracle and MySQL work similarly
  • In MS SQL Server it is the transaction log
  • In PostgreSQL it is called the WAL

Query Optimization

Most optimizers are based on Selinger's paper: dynamic programming that decomposes the problem into three sub-problems (a toy sketch follows the list)

  • cost estimation
    measured in terms of I/O and CPU cost
  • relational equivalences that define a search space
  • cost-based search
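
A toy Go sketch of the Selinger-style dynamic program over join orders: it only considers left-deep plans, uses one fixed join selectivity, and counts the sum of intermediate result sizes as cost, whereas a real optimizer plugs in per-predicate selectivities and an I/O + CPU cost model:

package main

import "fmt"

type plan struct {
    cost float64 // accumulated cost: sum of intermediate result sizes
    size float64 // estimated cardinality of joining this subset
    last int     // last relation joined, to rebuild the order
    prev int     // the subset before `last` was joined
}

// bestJoinOrder returns the cheapest left-deep join order over relations with
// the given cardinalities, assuming one flat join selectivity.
func bestJoinOrder(card []float64, selectivity float64) (float64, []int) {
    n := len(card)
    full := 1<<n - 1
    best := make(map[int]plan)

    // base case: a single relation costs nothing (access paths are ignored here)
    for i := 0; i < n; i++ {
        best[1<<i] = plan{size: card[i], last: i}
    }

    // grow each subset by one relation at a time
    for s := 1; s <= full; s++ {
        p, ok := best[s]
        if !ok {
            continue
        }
        for i := 0; i < n; i++ {
            if s&(1<<i) != 0 {
                continue // already joined
            }
            ns := s | 1<<i
            size := p.size * card[i] * selectivity
            cost := p.cost + size
            if q, ok := best[ns]; !ok || cost < q.cost {
                best[ns] = plan{cost: cost, size: size, last: i, prev: s}
            }
        }
    }

    // walk back from the full set to recover the join order
    var order []int
    for s := full; s != 0; s = best[s].prev {
        order = append([]int{best[s].last}, order...)
    }
    return best[full].cost, order
}

func main() {
    // three hypothetical relations with 1e6, 1e4 and 1e2 rows
    cost, order := bestJoinOrder([]float64{1e6, 1e4, 1e2}, 1e-4)
    fmt.Println(order, cost) // prints [1 2 0] 10100: the small relations are joined first
}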

Concurrency Control

Gray's paper

  • Distinguishes fine-grained and coarse-grained locks
    the database is a hierarchical structure
  • Proposed multiple isolation levels
    serializable isolation was originally implemented with 2PL (a toy sketch follows the list)
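
A toy Go illustration of (strict) two-phase locking: a transaction acquires locks only during its growing phase and releases them all at commit, never acquiring after releasing. The LockManager is hypothetical, ignores shared/intention lock modes and deadlock handling, and only exists to show the two phases:

package main

import "sync"

// LockManager hands out one exclusive lock per key (no shared or intention modes).
type LockManager struct {
    mu    sync.Mutex
    locks map[string]*sync.Mutex
}

func NewLockManager() *LockManager {
    return &LockManager{locks: make(map[string]*sync.Mutex)}
}

func (lm *LockManager) lockFor(key string) *sync.Mutex {
    lm.mu.Lock()
    defer lm.mu.Unlock()
    if _, ok := lm.locks[key]; !ok {
        lm.locks[key] = &sync.Mutex{}
    }
    return lm.locks[key]
}

// Txn tracks every lock it holds so it can release them all at once.
type Txn struct {
    lm   *LockManager
    held []*sync.Mutex
}

// Acquire runs during the growing phase: locks are taken as data is touched.
func (t *Txn) Acquire(key string) {
    l := t.lm.lockFor(key)
    l.Lock()
    t.held = append(t.held, l)
}

// Commit is the shrinking phase: under strict 2PL all locks are released together.
func (t *Txn) Commit() {
    for _, l := range t.held {
        l.Unlock()
    }
    t.held = nil
}

func main() {
    lm := NewLockManager()
    t := &Txn{lm: lm}
    t.Acquire("row:42") // growing phase
    // ... read or write row 42 ...
    t.Commit() // all locks go at commit; no new lock may be taken afterwards
}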

Database Recovery

IBM's ARIES (1992): Algorithms for Recovery and Isolation Exploiting Semantics.
ARIES can only update the data in-place after the log reaches storage.
It ensures that during recovery committed transactions are redone and uncommitted transactions are undone.
The redo log is physical; the undo log is logical.

No Force, Steal

  • database need not write dirty pages to disk at commit time
    thanks to the redo log, updated pages are written to disk lazily after commit
    No Force
  • database can flush dirty pages to disk at any time
    thanks to the undo log, uncommitted (dirty) pages can be written to disk by the buffer manager
    Steal

ARIES keeps an LSN for each page; the disk page is the basic unit of data management and recovery, and a page write is atomic.

ARIES crash recovery has three phases

  • analysis phase
    scan forward through the log, determine winners & losers
  • redo phase
    repeat history; if Force were used (dirty pages flushed before commit), this phase would not be needed
  • undo phase
    scan backward, undo the losers

ARIES data structures

  • xaction table
  • dirty page table
  • checkpoint

Example

After a crash, we find the following log:
0 BEGIN CHECKPOINT
5 END CHECKPOINT (EMPTY XACT TABLE AND DPT)
10 T1: UPDATE P1 (OLD: YYY NEW: ZZZ)
15 T1: UPDATE P2 (OLD: WWW NEW: XXX)
20 T2: UPDATE P3 (OLD: UUU NEW: VVV)
25 T1: COMMIT
30 T2: UPDATE P1 (OLD: ZZZ NEW: TTT)
Analysis phase:
Scan forward through the log starting at LSN 0.
LSN 5: Initialize XACT table and DPT to empty.
LSN 10: Add (T1, LSN 10) to XACT table. Add (P1, LSN 10) to DPT.
LSN 15: Set LastLSN=15 for T1 in XACT table. Add (P2, LSN 15) to DPT.
LSN 20: Add (T2, LSN 20) to XACT table. Add (P3, LSN 20) to DPT.
LSN 25: Change T1 status to "Commit" in XACT table
LSN 30: Set LastLSN=30 for T2 in XACT table.
Redo phase:
Scan forward through the log starting at LSN 10.
LSN 10: Read page P1, check PageLSN stored in the page. If PageLSN<10, redo LSN 10 (set value to ZZZ) and set the page's PageLSN=10.
LSN 15: Read page P2, check PageLSN stored in the page. If PageLSN<15, redo LSN 15 (set value to XXX) and set the page's PageLSN=15.
LSN 20: Read page P3, check PageLSN stored in the page. If PageLSN<20, redo LSN 20 (set value to VVV) and set the page's PageLSN=20.
LSN 30: Read page P1 if it has been flushed, check PageLSN stored in the page. It will be 10. Redo LSN 30 (set value to TTT) and set the page's PageLSN=30.
Undo phase:
T2 must be undone. Put LSN 30 in ToUndo.
Write Abort record to log for T2
LSN 30: Undo LSN 30 - write a CLR for P1 with "set P1=ZZZ" and undonextLSN=20. Write ZZZ into P1. Put LSN 20 in ToUndo.
LSN 20: Undo LSN 20 - write a CLR for P3 with "set P3=UUU" and undonextLSN=NULL. Write UUU into P3.

ARIES was designed for traditional spinning disks and sequential writes, but the cost is obvious: changing 1 byte requires writing redo 1B + undo 1B + page 1B = 3B in total. What if we did in-place updates on SSD?

Distributed

Mid-1970s: 2PC, where commit requires unanimity and any single participant can veto

References

https://blog.acolyer.org/2016/01/08/aries/
http://cseweb.ucsd.edu/~swanson/papers/SOSP2013-MARS.pdf
https://www.cs.berkeley.edu/~brewer/cs262/Aries.pdf


scalability papers

http://www.perfdynamics.com/Manifesto/USLscalability.html
