RocketMQ Internals

Features

  • Producer Group
    When sending transactional messages it acts as the TC (transaction coordinator); it must span multiple machines and keeps a transaction state table {offset: P/C/R}
  • Broker tag-based message filter
  • Scheduled messages do not support arbitrary precision, only fixed levels such as 5s, 10s, 1m; queueID = delayLevel - 1 (see the sketch after this list)
    Because of this, message revoke is presumably not supported
  • Separates the commit log from the consume log, somewhat like the WAL/table relationship
    They can be placed on different filesystems, but nothing finer-grained is possible
    The benefit of the extra dispatch step: dispatching can be skipped
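
A minimal sketch of the delay-level-to-queue mapping mentioned above, assuming a fixed level table; the class, helper, and level values are illustrative, not RocketMQ's actual code.

import java.util.Arrays;
import java.util.List;

// Illustration of fixed delay levels: a scheduled message is routed to
// queueId = delayLevel - 1 of the internal delay queue, so only the
// predefined precisions are possible.
public class DelayLevelSketch {
    // Assumed subset of the default delay levels (the real table is configurable).
    static final List<String> LEVELS = Arrays.asList("1s", "5s", "10s", "30s", "1m", "2m");

    static int queueIdForDelayLevel(int delayLevel) {
        if (delayLevel < 1 || delayLevel > LEVELS.size()) {
            throw new IllegalArgumentException("unsupported delay level: " + delayLevel);
        }
        return delayLevel - 1; // queueID = delayLevel - 1
    }

    public static void main(String[] args) {
        System.out.println("delayLevel=2 (" + LEVELS.get(1) + ") -> queueId=" + queueIdForDelayLevel(2));
    }
}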

Commit Log

${rocketmq.home}/store/commitlog/${fileName}
fileName[n] = fileName[n-1] + mappedFileSize
To keep mappedFileSize identical, padding is appended at each file's tail; the default file size is 1GB
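
A hedged sketch of the file-name progression above, assuming RocketMQ's convention that each commit log file is named by its starting physical offset, zero-padded to 20 digits; the class and helper names are illustrative.

// fileName[n] = fileName[n-1] + mappedFileSize, with the file named by its
// starting physical offset zero-padded to 20 digits.
public class CommitLogFileName {
    static final long MAPPED_FILE_SIZE = 1L << 30; // default 1GB per file

    static String nextFileName(String current) {
        long nextOffset = Long.parseLong(current) + MAPPED_FILE_SIZE;
        return String.format("%020d", nextOffset);
    }

    public static void main(String[] args) {
        String first = String.format("%020d", 0L);
        System.out.println(first);               // 00000000000000000000
        System.out.println(nextFileName(first)); // 00000000001073741824
    }
}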

Each message

For ordinary messages, QueueOffset stores the offset within the consume queue; for transactional messages, it is the offset within the transaction state table
+---------+-------+-----+---------+------+-------------+----------------+----------------+
| MsgSize | Magic | CRC | QueueID | Flag | QueueOffset | PhysicalOffset | SysFlag(P/C/R) |
+---------+-------+-----+---------+------+-------------+----------------+----------------+
+--------------+------------------+-----------+---------------+----+------+-------+------+
| ProducedTime | ProducerHostPort | StoreTime | StoreHostPort | .. | Body | Topic | Prop |
+--------------+------------------+-----------+---------------+----+------+-------+------+
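
A rough sketch of walking the leading fixed-width fields listed above with a ByteBuffer; the field widths used here (4-byte ints, 8-byte offsets) follow the common RocketMQ layout but are assumptions for illustration, not a byte-exact re-implementation.

import java.nio.ByteBuffer;

// Decode the leading header fields in the order shown in the table above.
public class MessageHeaderSketch {
    static void decode(ByteBuffer buf) {
        int msgSize          = buf.getInt();  // MsgSize
        int magic            = buf.getInt();  // Magic
        int bodyCrc          = buf.getInt();  // CRC over the body
        int queueId          = buf.getInt();  // QueueID
        int flag             = buf.getInt();  // Flag
        long queueOffset     = buf.getLong(); // consume-queue offset, or state-table offset for tx messages
        long physicalOffset  = buf.getLong(); // PhysicalOffset in the commit log
        int sysFlag          = buf.getInt();  // SysFlag: prepared / commit / rollback bits
        System.out.printf("size=%d magic=%x queueId=%d queueOffset=%d physicalOffset=%d sysFlag=%x%n",
                msgSize, magic, queueId, queueOffset, physicalOffset, sysFlag);
    }
}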

Every append to the commit log synchronously calls dispatch, which forwards the message to the consume queue and the index service

new DispatchRequest(topic, queueId,
    result.getWroteOffset(), result.getWroteBytes(),
    tagsCode, msg.getStoreTimestamp(),
    result.getLogicsOffset(), msg.getKeys(),
    // Transaction
    msg.getSysFlag(),
    msg.getPreparedTransactionOffset());
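
To make the dispatch step concrete, here is a hedged sketch of what a consume-queue dispatcher does with such a request: append one fixed 20-byte entry (offset, size, tag hashcode) for the target (topic, queueId). Class and method names are illustrative, not the actual RocketMQ dispatcher.

import java.nio.ByteBuffer;

// Encode one consume-queue entry from a dispatch request's fields.
class ConsumeQueueDispatcherSketch {
    static final int CQ_ENTRY_SIZE = 8 + 4 + 8; // 20 bytes per entry

    ByteBuffer encodeEntry(long commitLogOffset, int messageSize, long tagsCode) {
        ByteBuffer entry = ByteBuffer.allocate(CQ_ENTRY_SIZE);
        entry.putLong(commitLogOffset); // where the full message lives in the commit log
        entry.putInt(messageSize);      // how many bytes to read there
        entry.putLong(tagsCode);        // tag hashcode used for broker-side filtering
        entry.flip();
        return entry;
    }
}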

Queue

A queue is purely a logical concept; it participates in producer load balancing, similar to virtual nodes in consistent hashing
On each broker, the commit log is shared by all of that machine's queues, with no separation whatsoever

broker1: queue0, queue2
broker2: queue0
then, topicA has 3 queues:
broker1_queue0, broker1_queue2, broker2_queue0
producer.selectOneMessageQueue("topicA", "broker1", "queue0")

The partial (per-queue) ordering of messages is guaranteed by the producer client
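
A minimal sketch of how a producer can enforce that ordering: pin every message carrying the same business key (e.g. an order id) to the same queue, so FIFO holds within that queue. The helper below is hypothetical, not a RocketMQ API.

import java.util.List;

// Same key -> same queue -> per-key FIFO, which is all the broker needs to preserve.
class OrderedQueueSelectSketch {
    static <Q> Q selectByOrderKey(List<Q> queues, Object orderKey) {
        int idx = Math.abs(orderKey.hashCode() % queues.size());
        return queues.get(idx);
    }
}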

Questions

  • How is retention by topic implemented? It is not
    Expiration is decided solely by the commit log file's mtime, even though messages from multiple topics are mixed inside
  • How is I/O balancing done?
  • How is compression done?
  • If a CRC error occurs, are all topics affected?
  • Why store StoreHostPort? How can a topic be migrated? It cannot be
  • Writing the commit log requires a lock whose granularity is too coarse: effectively a db-level lock rather than a table-level one
  • Broker split-brain
  • Failover
  • A topic's commit log is scattered across all brokers

Consume Queue

${rocketmq.home}/store/consumequeue/${topicName}/${queueId}/${fileName}

To read a message, the consume queue is read first (similar to a MySQL secondary index), then the commit log (the clustered index)

sendfile is not used; mmap is used instead, because the reads are random

+---------------------+-----------------+------------------------+
| CommitLogOffset(8B) | MessageSize(4B) | MessageTagHashcode(8B) |
+---------------------+-----------------+------------------------+
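
A small worked example of the secondary-index lookup under the 20-byte layout above: the file position of the n-th entry is n * 20, and decoding it yields the (commitLogOffset, size) pair needed for the second read. Class and method names are illustrative.

import java.nio.ByteBuffer;

// Locate and decode the n-th 20-byte consume-queue entry.
class ConsumeQueueLookupSketch {
    static final int CQ_ENTRY_SIZE = 20;

    static long filePosition(long logicalOffset) {
        return logicalOffset * CQ_ENTRY_SIZE; // sequential, fixed-width entries
    }

    static long[] decodeEntry(ByteBuffer mapped, long logicalOffset) {
        mapped.position((int) filePosition(logicalOffset));
        long commitLogOffset = mapped.getLong(); // where to read in the commit log
        int size             = mapped.getInt();  // how much to read
        long tagsCode        = mapped.getLong(); // tag hashcode for filtering
        return new long[]{commitLogOffset, size, tagsCode};
    }
}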

Although the consume queue itself is read sequentially during consumption, the subsequent commit log reads are almost entirely random. Moreover,
how should compression be optimized? Relying on page cache + readahead alone is far from enough

Producer

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); // from local cache or name server
MessageQueue mq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
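
For context, a hedged end-to-end usage sketch with the Apache RocketMQ client (package names differ in the older Alibaba releases); the group name, name-server address, and topic are placeholders. The three internal steps above (route lookup, queue selection, send) all happen inside producer.send().

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class ProducerExample {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("example_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876"); // placeholder name server address
        producer.start();
        Message msg = new Message("topicA", "tagA", "hello".getBytes());
        SendResult result = producer.send(msg);    // route lookup + queue selection + send
        System.out.println(result.getSendStatus());
        producer.shutdown();
    }
}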

Transaction

// 2PC, 2 messages
// Phase 1
producer group writes redolog
producer group sends a message (type=TransactionPreparedType) to broker
broker appends it to CommitLog and returns MessageId
broker does not append it to the consume queue
// Phase 2
producer group writes redolog
producer group sends a message (type=TransactionCommitType, msgId=$msgId) to broker
broker finds the message with msgId in CommitLog, clones it and appends it to CommitLog (type=TransactionCommitType|TransactionRollbackType)
if type == TransactionCommitType {
    broker appends the commit log offset to the consume queue
}
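
A hedged sketch of the same two-phase flow from the producer side, using the Apache RocketMQ 4.x client API (the older Alibaba client exposed a similar but differently named LocalTransactionExecuter interface); the group name, address, and topic are placeholders, and the listener bodies are stubs.

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class TxProducerExample {
    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer("tx_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876"); // placeholder
        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // Phase 2 decision: run the local transaction, then commit or roll back
                return LocalTransactionState.COMMIT_MESSAGE;
            }
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // Answers the broker's CHECK_TRANSACTION_STATE callback for pending messages
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        producer.start();
        // Phase 1: the prepared (half) message is appended to the commit log only
        producer.sendMessageInTransaction(new Message("topicA", "hello-tx".getBytes()), null);
        producer.shutdown();
    }
}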

State Table

Stored on the broker; scanned once per minute by default

24B, mmap
+-----------------+------+-----------+-----------------------+--------------+
| CommitLogOffset | Size | Timestamp | ProducerGroupHashcode | State(P/C/R) |
+-----------------+------+-----------+-----------------------+--------------+
A prepare message inserts a row into the table
A commit/rollback message updates the row
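
A small sketch of encoding one 24-byte state-table record per the layout above; the field widths other than the 8-byte CommitLogOffset are assumptions chosen to add up to 24 bytes (4B size, 4B timestamp in seconds, 4B group hashcode, 4B state).

import java.nio.ByteBuffer;

// One mmap-friendly fixed-width state-table record.
class TranStateRecordSketch {
    static final int STATE_PREPARED = 0, STATE_COMMITTED = 1, STATE_ROLLBACK = 2;

    static ByteBuffer encode(long commitLogOffset, int size, int timestampSec,
                             int producerGroupHash, int state) {
        ByteBuffer record = ByteBuffer.allocate(24);
        record.putLong(commitLogOffset);
        record.putInt(size);
        record.putInt(timestampSec);
        record.putInt(producerGroupHash);
        record.putInt(state); // P/C/R
        record.flip();
        return record;
    }
}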

For pending transactions, the broker sends a CHECK_TRANSACTION_STATE request to a randomly chosen machine in the Producer Group
The Producer Group locates the transaction state from its redolog (mmap)
Producer Group information is stored in the name server (namesvr)

Problems

  • The Producer is no longer an ordinary client; it has become a server (the TC) and can no longer be shut down casually
  • What happens if the machine in the Producer Group that wrote the redolog dies?

HA

The granularity is controlled only at the Broker level, whereas Kafka controls it at the partition level
