cannot have exactly-once delivery

http://bravenewgeek.com/you-cannot-have-exactly-once-delivery/


RocketMQ Explained

Features

  • Producer Group
    When sending transactional messages it acts as the TC (transaction coordinator); it must run on multiple machines and persist the transaction state table {offset: P/C/R}
  • Broker tag-based message filter
  • Scheduled messages: arbitrary precision is not supported, only fixed delay levels such as 5s, 10s, 1m; queueID = delayLevel - 1
    Consequently, message revoke is presumably not supported
  • Commit log and consume log are separated, much like the WAL vs. table relationship
    They can be placed on different filesystems, but there is no finer-grained placement control
    The benefit of the extra dispatch step: dispatching can be skipped

Commit Log

${rocketmq.home}\store\commitlog\${fileName}
fileName[n] = fileName[n-1] + mappedFileSize
To keep mappedFileSize identical, padding is appended at each file's tail; the default size is 1GB
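The naming rule above can be sketched in Go. The 20-digit zero-padded name equal to the segment's starting offset matches RocketMQ's convention; `fileNameOf` itself is a made-up helper for illustration:

```go
package main

import "fmt"

const mappedFileSize = 1 << 30 // 1GB, the default commit log segment size

// fileNameOf returns the 20-digit, zero-padded name of segment n, which
// equals the starting physical offset of that segment, so
// fileName[n] = fileName[n-1] + mappedFileSize holds by construction.
func fileNameOf(n int64) string {
	return fmt.Sprintf("%020d", n*mappedFileSize)
}

func main() {
	fmt.Println(fileNameOf(0)) // 00000000000000000000
	fmt.Println(fileNameOf(1)) // 00000000001073741824
}
```

Because the file name is the starting offset, locating the segment that holds a given physical offset is a simple division.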

Each message

QueueOffset: for an ordinary message, it stores the offset within the consume log; for a transactional message, the offset within the transaction state table
+---------+-------+-----+---------+------+-------------+----------------+----------------+
| MsgSize | Magic | CRC | QueueID | Flag | QueueOffset | PhysicalOffset | SysFlag(P/C/R) |
+---------+-------+-----+---------+------+-------------+----------------+----------------+
+--------------+------------------+-----------+---------------+----+------+-------+------+
| ProducedTime | ProducerHostPort | StoreTime | StoreHostPort | .. | Body | Topic | Prop |
+--------------+------------------+-----------+---------------+----+------+-------+------+

Every append to the commit log synchronously calls dispatch, which distributes the entry to the consume queue and the index service

new DispatchRequest(topic, queueId,
result.getWroteOffset(), result.getWroteBytes(),
tagsCode, msg.getStoreTimestamp(),
result.getLogicsOffset(), msg.getKeys(),
// Transaction
msg.getSysFlag(),
msg.getPreparedTransactionOffset());

queue

A queue is purely a logical concept; producers use it for load balancing, similar to virtual nodes in consistent hashing
On each broker, the commit log is shared by all queues on that machine, with no distinction between them

broker1: queue0, queue2
broker2: queue0,
then, topicA has 3 queues:
broker1_queue0, broker1_queue2, broker2_queue0
producer.selectOneMessageQueue("topicA", "broker1", "queue0")

Partial (per-queue) message ordering is guaranteed by the producer client
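A minimal Go sketch of how a producer might balance over the flat queue list above (the round-robin index mimics `selectOneMessageQueue`; the types and field names are illustrative, not the real RocketMQ classes):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// MessageQueue is a simplified stand-in: a topic's queues form one flat
// (broker, queueId) list, e.g. broker1_queue0, broker1_queue2, broker2_queue0.
type MessageQueue struct {
	Broker  string
	QueueID int
}

type TopicPublishInfo struct {
	queues []MessageQueue
	index  int64
}

// SelectOneMessageQueue rotates an index over the queue list, spreading
// sends across brokers, similar to virtual nodes in consistent hashing.
func (t *TopicPublishInfo) SelectOneMessageQueue() MessageQueue {
	i := atomic.AddInt64(&t.index, 1)
	return t.queues[int(i)%len(t.queues)]
}

func main() {
	info := &TopicPublishInfo{queues: []MessageQueue{
		{"broker1", 0}, {"broker1", 2}, {"broker2", 0},
	}}
	for i := 0; i < 4; i++ {
		q := info.SelectOneMessageQueue()
		fmt.Println(q.Broker, q.QueueID)
	}
}
```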

Question

  • How to implement retention by topic: not implemented
    Expiry is judged solely by the commit log file's mtime, even though the file mixes multiple topics
  • How to do I/O balancing
  • How to do compression
  • If a CRC check fails, are all topics affected?
  • Why store StoreHostPort? How to migrate a topic: migration is impossible
  • Writing the commit log requires a lock whose granularity is too coarse: effectively a db-level lock rather than table-level
  • Broker split-brain
  • Failover
  • A topic's commit log is scattered across all brokers

Consume Queue

${rocketmq.home}/store/consumequeue/${topicName}/${queueId}/${fileName}

Reading a message first consults the consume queue (like a MySQL secondary index), then the commit log (the clustered index)

sendfile is not used; mmap is used instead, because the reads are random

+---------------------+-----------------+------------------------+
| CommitLogOffset(8B) | MessageSize(4B) | MessageTagHashcode(8B) |
+---------------------+-----------------+------------------------+

Although the consume queue itself is read sequentially during consumption, the subsequent commit log reads are almost all random. Besides,
how should compression be optimized? Page cache + readahead alone is far from enough

Producer

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); // from local cache or name server
MessageQueue mq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);

Transaction

// 2PC, 2 messages
// Phase 1
producer group writes redolog
producer group sends a message (type=TransactionPreparedType) to broker
broker appends it to CommitLog and returns the MessageId
broker does not append it to the consume queue
// Phase 2
producer group writes redolog
producer group sends a message (type=TransactionCommitType, msgId=$msgId) to broker
broker finds the message with msgId in CommitLog, clones it, and appends the clone to CommitLog (type=TransactionCommitType|TransactionRollbackType)
if type == TransactionCommitType {
    broker appends the commit log offset to the consume queue
}

State Table

Stored on the broker; scanned once per minute by default

24B, mmap
+-----------------+------+-----------+-----------------------+--------------+
| CommitLogOffset | Size | Timestamp | ProducerGroupHashcode | State(P/C/R) |
+-----------------+------+-----------+-----------------------+--------------+
a prepare message inserts a row into the table
a commit/rollback message updates the row

For in-doubt transactions, the broker sends a CHECK_TRANSACTION_STATE request to a randomly picked machine in the producer group
The producer group locates the transaction state via its redolog (mmap)
Producer group information is stored in the name server
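The state-table lifecycle above (prepare inserts, commit/rollback updates, periodic sweep for in-doubt rows) can be sketched in Go; all type and method names here are illustrative, not RocketMQ's:

```go
package main

import "fmt"

// TxState models the P/C/R column of the state table.
type TxState byte

const (
	Prepared TxState = iota
	Committed
	Rolledback
)

type StateRow struct {
	CommitLogOffset int64
	ProducerGroup   uint32
	State           TxState
}

type StateTable struct {
	rows map[int64]*StateRow
}

// Prepare inserts a row for a TransactionPreparedType message.
func (t *StateTable) Prepare(offset int64, group uint32) {
	t.rows[offset] = &StateRow{offset, group, Prepared}
}

// Resolve updates the row on a commit or rollback message.
func (t *StateTable) Resolve(offset int64, commit bool) {
	if r, ok := t.rows[offset]; ok {
		if commit {
			r.State = Committed
		} else {
			r.State = Rolledback
		}
	}
}

// Pending returns the offsets of unresolved transactions: the candidates
// for a CHECK_TRANSACTION_STATE callback to the producer group.
func (t *StateTable) Pending() []int64 {
	var out []int64
	for off, r := range t.rows {
		if r.State == Prepared {
			out = append(out, off)
		}
	}
	return out
}

func main() {
	st := &StateTable{rows: map[int64]*StateRow{}}
	st.Prepare(100, 1)
	st.Prepare(200, 1)
	st.Resolve(100, true)
	fmt.Println(len(st.Pending())) // 1
}
```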

Problems

  • The producer is no longer an ordinary client; it has become a server (the TC) and must not be shut down casually
  • What if the machine in the producer group that wrote the redolog dies?

HA

Granularity is only at the broker level, whereas Kafka's HA is per partition


architecture design checklist

archeck

demo


kateway replay messages

Issue

Consumers need to replay/fast-forward messages, and kateway currently supports this:
the user sets the offset to a specified position in the web console.

But because kateway on that machine keeps consuming the incoming stream, its checkpoint will overwrite the manually-set offset.
So the user must first shut down the consuming process, operate in the web console, then restart it: not user friendly.
Improve this without hurting performance.

Solution

_, stat, err := cg.kz.conn.Get(path)
switch err {
case zk.ErrNoNode:
	return cg.kz.create(path, data, false)
case nil:
	if cg.lastVer == -1 {
		// first offset commit in this process
		cg.lastVer = stat.Version
	} else if cg.lastVer != stat.Version {
		// user manually reset the offset checkpoint
		return ErrRestartConsumerGroup
	}
	// The user might hit "replay" right after our Get; the CAS on
	// stat.Version below catches that race.
	newStat, err := cg.kz.conn.Set(path, data, stat.Version)
	if err == nil {
		cg.lastVer = newStat.Version
	}
	return err
default:
	return err
}

how DBMS works

Basics

dbms

Undo log

  • Oracle and MySQL use similar mechanisms
  • In MS SQL Server it is called the transaction log
  • PostgreSQL has no undo log; it relies on MVCC, storing multiple versions of each row in the table itself

Redo log

  • Oracle and MySQL use similar mechanisms
  • In MS SQL Server it is called the transaction log
  • In PostgreSQL it is called the WAL

Query Optimization

Mostly based on Selinger's paper: a dynamic programming algorithm that decomposes the problem into 3 subproblems

  • cost estimation
    measured in terms of I/O and CPU cost
  • relational equivalences that define a search space
  • cost-based search

Concurrency Control

Gray's paper

  • Distinguishes fine-grained and coarse-grained locks
    A database is a hierarchical structure
  • Introduced multiple isolation levels
    Originally, serializable isolation was implemented with 2PL

Database Recovery

IBM's ARIES algorithm (1992), Algorithms for Recovery and Isolation Exploiting Semantics
ARIES can only update the data in-place after the log reaches storage
It guarantees that during recovery, committed transactions are redone and uncommitted transactions are undone
The redo log is physical; the undo log is logical

No Force, Steal

  • database need not write dirty pages to disk at commit time
    Thanks to the redo log, updated pages are written to disk lazily after commit
    No Force
  • database can flush dirty pages to disk at any time
    Thanks to the undo log, uncommitted (dirty) pages can be written to disk by the buffer manager
    Steal

ARIES keeps an LSN on each page; the disk page is the basic unit of data management and recovery, and page writes are atomic

ARIES crash recovery has 3 phases

  • analysis phase
    scans forward to determine the winners & losers
  • redo phase
    with Force (flushing dirty pages before commit), the redo phase would be unnecessary
    repeat history
  • undo phase
    scans backward to undo the losers

ARIES data structures

  • xaction table
  • dirty page table
  • checkpoint

Example

After a crash, we find the following log:
0 BEGIN CHECKPOINT
5 END CHECKPOINT (EMPTY XACT TABLE AND DPT)
10 T1: UPDATE P1 (OLD: YYY NEW: ZZZ)
15 T1: UPDATE P2 (OLD: WWW NEW: XXX)
20 T2: UPDATE P3 (OLD: UUU NEW: VVV)
25 T1: COMMIT
30 T2: UPDATE P1 (OLD: ZZZ NEW: TTT)
Analysis phase:
Scan forward through the log starting at LSN 0.
LSN 5: Initialize XACT table and DPT to empty.
LSN 10: Add (T1, LSN 10) to XACT table. Add (P1, LSN 10) to DPT.
LSN 15: Set LastLSN=15 for T1 in XACT table. Add (P2, LSN 15) to DPT.
LSN 20: Add (T2, LSN 20) to XACT table. Add (P3, LSN 20) to DPT.
LSN 25: Change T1 status to "Commit" in XACT table
LSN 30: Set LastLSN=30 for T2 in XACT table.
Redo phase:
Scan forward through the log starting at LSN 10.
LSN 10: Read page P1, check PageLSN stored in the page. If PageLSN<10, redo LSN 10 (set value to ZZZ) and set the page's PageLSN=10.
LSN 15: Read page P2, check PageLSN stored in the page. If PageLSN<15, redo LSN 15 (set value to XXX) and set the page's PageLSN=15.
LSN 20: Read page P3, check PageLSN stored in the page. If PageLSN<20, redo LSN 20 (set value to VVV) and set the page's PageLSN=20.
LSN 30: Read page P1 if it has been flushed, check PageLSN stored in the page. It will be 10. Redo LSN 30 (set value to TTT) and set the page's PageLSN=30.
Undo phase:
T2 must be undone. Put LSN 30 in ToUndo.
Write Abort record to log for T2
LSN 30: Undo LSN 30 - write a CLR for P1 with "set P1=ZZZ" and undonextLSN=20. Write ZZZ into P1. Put LSN 20 in ToUndo.
LSN 20: Undo LSN 20 - write a CLR for P3 with "set P3=UUU" and undonextLSN=NULL. Write UUU into P3.
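The analysis phase of the example above can be sketched in Go. This is a simplified model under stated assumptions: only update and commit records, no checkpoint or end records, and `recLSN` is the first LSN that touched a page:

```go
package main

import "fmt"

// Rec is one log record; Page is empty for commit records.
type Rec struct {
	LSN    int
	Txn    string
	Page   string
	Commit bool
}

// analysis scans the log forward, rebuilding the transaction table
// (last LSN and status per txn) and the dirty page table (recLSN per page).
func analysis(records []Rec) (lastLSN map[string]int, status map[string]string, dpt map[string]int) {
	lastLSN, status, dpt = map[string]int{}, map[string]string{}, map[string]int{}
	for _, r := range records {
		if r.Commit {
			status[r.Txn] = "commit"
			continue
		}
		lastLSN[r.Txn] = r.LSN
		status[r.Txn] = "active"
		if _, seen := dpt[r.Page]; !seen {
			dpt[r.Page] = r.LSN // recLSN: first LSN that may have dirtied the page
		}
	}
	return
}

func main() {
	records := []Rec{
		{10, "T1", "P1", false},
		{15, "T1", "P2", false},
		{20, "T2", "P3", false},
		{25, "T1", "", true},
		{30, "T2", "P1", false},
	}
	lastLSN, status, dpt := analysis(records)
	fmt.Println(lastLSN["T2"], status["T1"], dpt["P1"]) // 30 commit 10
}
```

Matching the walkthrough above: T1 ends committed (a winner), T2 is still active with LastLSN=30 (a loser to undo), and P1's recLSN stays 10 even after LSN 30 touches it again.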

ARIES was designed for spinning disks (sequential writes), but the cost is obvious: modifying 1 byte requires writing 1B of redo + 1B of undo + 1B of page = 3B
What if we did in-place updates on SSD?

Distributed

mid-1970s: 2PC, where any single participant can veto the commit

References

https://blog.acolyer.org/2016/01/08/aries/
http://cseweb.ucsd.edu/~swanson/papers/SOSP2013-MARS.pdf
https://www.cs.berkeley.edu/~brewer/cs262/Aries.pdf


scalability papers

http://www.perfdynamics.com/Manifesto/USLscalability.html


PostgreSQL MVCC

Internals

Unlike MySQL, which records uncommitted changes in the undo log, PostgreSQL stores all row versions in the table data structure.

Each row has 2 hidden fields

  • Tmin: id of the txn that inserted the row
  • Tmax: id of the txn that deleted the row
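A minimal Go sketch of how Tmin/Tmax drive visibility, under simplifying assumptions (a committed-txn set and a snapshot txn id; real PostgreSQL visibility rules with hint bits and in-progress lists are more involved):

```go
package main

import "fmt"

// RowVersion carries the two hidden fields; Tmax == 0 means "not deleted".
type RowVersion struct {
	Tmin, Tmax int
}

// visible reports whether a row version should be seen by a snapshot:
// inserted by an earlier committed txn, and not deleted by one.
func visible(r RowVersion, snapshotTxn int, committed map[int]bool) bool {
	if !committed[r.Tmin] || r.Tmin >= snapshotTxn {
		return false // inserted by an uncommitted or later txn
	}
	if r.Tmax != 0 && committed[r.Tmax] && r.Tmax < snapshotTxn {
		return false // deleted before our snapshot
	}
	return true
}

func main() {
	committed := map[int]bool{100: true, 105: true}
	fmt.Println(visible(RowVersion{100, 0}, 110, committed))   // true
	fmt.Println(visible(RowVersion{100, 105}, 110, committed)) // false: deleted
	fmt.Println(visible(RowVersion{120, 0}, 110, committed))   // false: future txn
}
```

This also shows why deleted versions can't be reclaimed immediately: an older snapshot may still find them visible, which is VACUUM's job to decide.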

INSERT

insert

DELETE

delete

A DELETE does not physically remove the row right away; the VACUUM process purges it later

UPDATE

update


VNC protocol

WebRTC

WebRTC provides direct data and media stream transfer between two browsers without an external server involved: P2P

After clicking the "Screen share" button in the browser

// sender
take screenshots via the OS API and send them at a given FPS
// optimization: split the screen into chunks and only build frames from chunks that changed between timer ticks
before sending, frames are encoded as H.264 or VP8
sent over HTTPS
// receiver
decode the received frames and display them

What WebRTC implements here is read-only screen sharing: the receiver cannot control the sender's screen

Implementation

<body>
<p><input type="button" id="share" value="Screen share" /></p>
<p><video id="video" autoplay /></p>
</body>
<script>
navigator.getUserMedia = navigator.webkitGetUserMedia || navigator.getUserMedia;
$('#share').click(function() {
navigator.getUserMedia({
audio: false ,
video: {
mandatory: {
chromeMediaSource: 'screen' ,
maxWidth: 1280 ,
maxHeight: 720
} ,
optional: [ ]
}
}, function(stream) {
// we've got media stream
// so the received stream can be transmitted via WebRTC the same way as web camera and easily played in <video> component on the other side
document.getElementById('video').src = window.URL.createObjectURL(stream);
} , function() {
alert('Error. Try in latest Chrome with Screen sharing enabled in about:flags.');
})
})
</script>

VNC

Remote Frame Buffer; works with X11, Windows, Mac
The endpoint the remote user sits at (display, keyboard, mouse) is called the client; the endpoint supplying framebuffer updates is called the server

显示协议

pixel(x, y) => encoded pixel data

C->S message types

SetEncodings

Raw, CopyRect, RRE, Hextile, TRLE, ZRLE

FramebufferUpdateRequest

The most important display message: it tells the server which regions of the screen to send back

client.send(messageType, incremental, x, y, width, height) => server
// incremental>0: send the region back only if its content changed; if unchanged, send nothing
server.reply(messageType, rectangleN, [{x, y, width, height, color}, ...]) => client

KeyEvent

Keyboard actions on the client side

PointerEvent

Mouse/pointer actions on the client side

vnc browser

http://guacamole.incubator.apache.org/
https://github.com/novnc/noVNC

References

http://www.tuicool.com/articles/Rzqumu
https://github.com/macton/hterm
chrome://flags/#enable-usermedia-screen-capture


LSM for SSD

Basics

leveldb

  • (immutable) memtable: sorted skiplist
  • SSTable: sorted string table
  • SSTables in L0 may have overlapping keys because they are flushed directly from immutable memtables

SSTable file

+-------------------+-------------------------+------------+
| index block(16KB) | bloom-filter block(4KB) | data block |
+-------------------+-------------------------+------------+

Get(key)

locate key in memtable
if found then return
locate key in immutable memtable
if found then return
for level := 0; level <= 6; level++ {
    // for L0, every SSTable must be checked, because their keys overlap
    // to contain this, besides the Bloom filter, leveldb caps the number of L0 files:
    // once it exceeds 8, a compaction (L0 -> L1) is triggered
    // for the other levels, files are sorted and disjoint, so the right SSTable can be located directly
    //
    // worst case, Get(key) reads 8 L0 files plus one file for each of L1-L6: 14 files
    locate key in SSTable in $level
    if found then return
}

SSD

A mainstream SSD such as the Samsung 960 Pro delivers ~440K random reads/s
with block size = 4KB

LSM was designed for spinning disks; on SSD there is room for optimization, since random reads need not be feared as much

Optimization

The bulk of an LSM-Tree's cost is compaction (merge sort), which causes I/O amplification (~50x):

  • read many files into memory
  • sort them
  • write them back to disk

io

To optimize compaction, make the LSM Tree smaller; RocksDB does this via compression.

On SSD, consider separating keys from values: keep only the sorted keys and value pointers in the LSM Tree, and store the values directly in the WAL.

key: 16B
pointer(value): 16B

2M k/v pairs need 64MB
2B k/v pairs need 64GB
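The key/value separation above (the WiscKey idea from the referenced paper) can be sketched in Go; the `Store` type and a map standing in for the LSM tree are simplifications for illustration:

```go
package main

import "fmt"

// ValuePointer is what the LSM tree stores instead of the value itself.
type ValuePointer struct {
	Offset int64 // position in the value log
	Size   int32
}

type Store struct {
	vlog  []byte                  // append-only value log (the WAL can double as it)
	index map[string]ValuePointer // stands in for the LSM tree of sorted keys
}

// Put appends the value to the log and records only a small pointer in the
// index, so compaction never has to rewrite the (large) values.
func (s *Store) Put(key string, value []byte) {
	s.index[key] = ValuePointer{int64(len(s.vlog)), int32(len(value))}
	s.vlog = append(s.vlog, value...)
}

// Get does one index lookup plus one random read into the value log;
// on SSD that extra random read is cheap.
func (s *Store) Get(key string) ([]byte, bool) {
	p, ok := s.index[key]
	if !ok {
		return nil, false
	}
	return s.vlog[p.Offset : p.Offset+int64(p.Size)], true
}

func main() {
	s := &Store{index: map[string]ValuePointer{}}
	s.Put("k1", []byte("hello"))
	v, _ := s.Get("k1")
	fmt.Println(string(v)) // hello
}
```

With 16B keys and 16B pointers, each entry costs 32B in the tree, which is where the 2M entries = 64MB and 2B entries = 64GB figures come from.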

Reference

https://www.usenix.org/system/files/conference/fast16/fast16-papers-lu.pdf


InnoDB MVCC

Basic

InnoDB implements txn rollback and MVCC through the undo log, while concurrency control (isolation) is implemented with locks.
The undo log is split into insert undo and update undo (delete being a special kind of update). On rollback:

  • an insert just discards its insert undo log
  • an update follows DB_ROLL_PTR / DB_TRX_ID to locate the pre-modification version and restores it

Unlike the redo log, there is no standalone undo log file on disk: all undo logs live in the main ibd data file (the tablespace), even when the client enables file-per-table.

Internal storage

InnoDB reserves hidden fields (system columns) for every row

  • DB_ROW_ID
  • DB_TRX_ID
  • DB_ROLL_PTR
typedef ib_uint64_t ib_id_t;
typedef ib_id_t row_id_t;
typedef ib_id_t trx_id_t;
typedef ib_id_t roll_ptr_t;

syscol

How the undo log works

  • the transaction modifies the original data under an exclusive lock
  • the pre-modification data is stored in the undo log, linked to the main record via the roll pointer
  • on success (commit) nothing else is done; on failure (rollback) the data in the undo log is restored

Demo

demo
demo
demo

InnoDB runs a purge thread that finds undo logs older than the oldest active transaction and deletes them

Issues

Rollback cost

When a transaction commits normally, InnoDB only needs to set its state to COMMIT, with no extra work.
But rolling back a transaction that touched many rows can be very expensive.

write skew

MVCC implemented via the undo log works fine when modifying a single row, but problems can arise across multiple rows

begin;
update table set col1=2 where id=1; // succeeds; an undo log record is created
update table set col2=3 where id=2; // fails
rollback;

When rolling back row(id=1), since it is not locked, another txn may already have modified it; the rollback would then clobber an already-committed transaction

Solving this requires control at the application layer

Another example

Regulations allow a family at most 3 pets (the constraint). Alice and Bob are one family and currently have dog(1) + cat(1).
If, concurrently, Alice buys another cat while Bob buys another dog, the two transactions exhibit write skew,
because under repeatable read a transaction becomes visible to other txns only after it commits.
TxnAlice    TxnBob
cat=cat+1   dog=dog+1
Both transactions succeed, yet together they violate the constraint
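The anomaly above can be sketched as plain Go: both transactions check the constraint against the same stale snapshot, write disjoint rows, and each commit is individually "legal" (the pet counts and `tryBuy` helper are of course illustrative):

```go
package main

import "fmt"

// tryBuy applies the constraint check (at most 3 pets) against a
// possibly stale snapshot total.
func tryBuy(snapshotTotal int) bool {
	return snapshotTotal+1 <= 3
}

func main() {
	dogs, cats := 1, 1      // committed state: dog(1) + cat(1)
	snapshot := dogs + cats // both txns read this same snapshot

	if tryBuy(snapshot) { // TxnAlice buys a cat
		cats++
	}
	if tryBuy(snapshot) { // TxnBob buys a dog, unaware of Alice's txn
		dogs++
	}
	fmt.Println(dogs+cats, dogs+cats <= 3) // 4 false: constraint violated
}
```

A serializable scheduler would have aborted one of the two; under snapshot-style isolation the application must enforce such cross-row constraints itself (e.g. with select ... for update).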

Under the RR isolation level, plain selects take no locks and use MVCC for consistent reads, i.e. snapshot reads.
update, insert, delete, select ... for update, and select ... lock in share mode all take locks and read the current version: a READ COMMITTED-style read.
Except for lock in share mode, which takes an S lock, the rest take X locks.

References

https://dev.mysql.com/doc/refman/5.7/en/innodb-locks-set.html
https://blog.jcole.us/innodb/
http://jimgray.azurewebsites.net/WICS_99_TP/
