LinkedIn Espresso

What

Distributed Document Store

  • RESTful API
  • MySQL as the storage engine
  • Helix for cluster management
  • Databus asynchronously replicates the commit log across data centers
  • Schemas are stored in ZooKeeper; schema evolution relies on Avro compatibility rules

References

https://engineering.linkedin.com/espresso/introducing-espresso-linkedins-hot-new-distributed-document-store



Multi-Data Center Consistency

MDCC (Multi-Data Center Consistency) provides a strong-consistency model for distributed databases that span data centers

mdcc

References

http://mdcc.cs.berkeley.edu/


asynchronous distributed snapshot

How do you take a globally consistent logical snapshot of a distributed system?
Snapshot = node state + channel state

Sending rule

node.recordState()
for conn in allOutboundConns {
    // the marker goes out before any further message on this channel
    conn.send(marker)
}

Receiving rule

msg = conn.recv()
if msg.isMarker() {
    if !node.stateRecorded() {
        node.recordState()    // local state recorded at time t0
        Channel(conn) = []    // this channel is empty in the snapshot
        // then run the sending rule: marker out on every outbound channel
    } else {
        // channel state = messages received on this channel between t0 and
        // the arrival of this marker; the node still processes them normally,
        // but they are not part of the recorded node state
        Channel(conn) = msgsReceivedOn(conn, t0, now())
    }
}
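
A minimal single-process sketch of the two rules above, assuming one snapshot at a time; the msg/node types and field names are made up for illustration (maps are assumed to be initialized elsewhere), not taken from any real implementation.

package snapshot

type msg struct {
	marker  bool
	payload string
}

type node struct {
	state        []string           // live state (the "balls" in the demo below)
	snapshot     []string           // copy of state taken on the first marker
	chanSnapshot map[string][]msg   // recorded in-flight messages, per inbound channel
	chanDone     map[string]bool    // has the marker already arrived on this inbound channel?
	recorded     bool
	outbound     map[string]chan msg
}

// Sending rule: record local state, then put a marker on every outbound
// channel before any further message is sent on it.
func (n *node) startSnapshot() {
	n.snapshot = append([]string(nil), n.state...)
	n.recorded = true
	for _, out := range n.outbound {
		out <- msg{marker: true}
	}
}

// Receiving rule, called for every message read from inbound channel `from`.
func (n *node) onReceive(from string, m msg) {
	if m.marker {
		if !n.recorded {
			n.startSnapshot()          // first marker: record local state...
			n.chanSnapshot[from] = nil // ...and this channel is empty in the snapshot
		}
		n.chanDone[from] = true // stop recording this channel
		return
	}
	if n.recorded && !n.chanDone[from] {
		// in-flight message: part of this channel's snapshot,
		// yet still applied to the live state as usual
		n.chanSnapshot[from] = append(n.chanSnapshot[from], m)
	}
	n.state = append(n.state, m.payload)
}

On this node the snapshot is complete once chanDone is true for every inbound channel.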

Demo

snapshot

a)
P records its own snapshot: P(red, green, blue)
P sends a marker on Channel(PQ)
b)
P sends its green ball to Q; this message follows the marker
Meanwhile, Q sends its orange ball to P, leaving Q(brown, pink)
c)
Q receives the marker on Channel(PQ) // Q is a receiver
Q records its own snapshot: Q(brown, pink)
Channel(PQ) = []
// since Q had already sent its orange ball to P, Q is also a sender
Q sends a marker on Channel(QP)
d)
P receives the orange ball, then the marker
Since P has already recorded its state, Channel(QP) = [orange]
The final snapshot of the distributed system:
P(red, green, blue)
Channel(PQ) = []
Q(brown, pink)
Channel(QP) = [orange]

FAQ

How is it initiated?

The global distributed snapshot can be initiated by a single node or by several nodes concurrently

When does it finish?

When every node has recorded its local state and the state of all of its inbound channels

Uses

Failure recovery

Unlike Apache Storm's per-record acks, Apache Flink's failure recovery uses a variant of the Chandy-Lamport algorithm
The checkpoint coordinator is the JobManager

data sources periodically inject markers into the data stream.

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(4)
env.enableCheckpointing(1000) // the sources inject a marker (barrier) every 1s

Whenever an operator receives such a marker, it checkpoints its internal state.

class StateMachineMapper extends FlatMapFunction[Event, Alert] with Checkpointed[mutable.HashMap[Int, State]] {

  private[this] val states = new mutable.HashMap[Int, State]()

  override def flatMap(t: Event, out: Collector[Alert]): Unit = {
    // get and remove the current state
    val state = states.remove(t.sourceAddress).getOrElse(InitialState)
    val nextState = state.transition(t.event)
    if (nextState == InvalidTransition) {
      // raise an alert
      out.collect(Alert(t.sourceAddress, state, t.event))
    } else if (!nextState.terminal) {
      // put back to states
      states.put(t.sourceAddress, nextState)
    }
  }

  override def snapshotState(checkpointId: Long, timestamp: Long): mutable.HashMap[Int, State] = {
    // the barrier (marker) is injected at the data source and flows with the records as part of the data stream
    //
    // snapshotState() and flatMap() are strictly serialized, never concurrent;
    // by the time this method is called the operator has already received the barrier (marker)
    // after this method returns, Flink automatically forwards the barrier to my output streams
    // and then persists the states (JobManager memory by default, or HDFS)
    states
  }

  override def restoreState(state: mutable.HashMap[Int, State]): Unit = {
    // after a failure, Flink stops the dataflow and then restarts the operator (StateMachineMapper)
    states ++= state
  }
}

snapshot

References

http://research.microsoft.com/en-us/um/people/lamport/pubs/chandy.pdf
https://arxiv.org/abs/1506.08603
https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html
https://github.com/StephanEwen/flink-demos/tree/master/streaming-state-machine


https

curl https://baidu.com
How the ~270 ms passed: in the ssldump trace below each server round trip costs roughly 48 ms; the first 72 ms before the ClientHello is presumably DNS plus the TCP handshake, followed by two round trips for the TLS handshake, one for the HTTP request/response, and one for the TCP FIN

1 1  0.0721 (0.0721)  C>S  Handshake
      ClientHello
        Version 3.1
        cipher suites
          TLS_EMPTY_RENEGOTIATION_INFO_SCSV
          TLS_DHE_RSA_WITH_AES_256_CBC_SHA
          TLS_DHE_RSA_WITH_AES_256_CBC_SHA256
          TLS_DHE_DSS_WITH_AES_256_CBC_SHA
          TLS_RSA_WITH_AES_256_CBC_SHA
          TLS_RSA_WITH_AES_256_CBC_SHA256
          TLS_DHE_RSA_WITH_AES_128_CBC_SHA
          TLS_DHE_RSA_WITH_AES_128_CBC_SHA256
          TLS_DHE_DSS_WITH_AES_128_CBC_SHA
          TLS_RSA_WITH_RC4_128_SHA
          TLS_RSA_WITH_RC4_128_MD5
          TLS_RSA_WITH_AES_128_CBC_SHA
          TLS_RSA_WITH_AES_128_CBC_SHA256
          TLS_DHE_RSA_WITH_3DES_EDE_CBC_SHA
          TLS_DHE_DSS_WITH_3DES_EDE_CBC_SHA
          TLS_RSA_WITH_3DES_EDE_CBC_SHA
        compression methods
          NULL
1 2  0.1202 (0.0480)  S>C  Handshake
      ServerHello
        Version 3.1
        session_id[32]=
          b3 ea 99 ee 5a 4c 03 e8 e0 74 95 09 f1 11 09 2a
          9d f5 8f 2a 26 7a d3 7f 71 ff dc 39 62 66 b0 f9
        cipherSuite         TLS_RSA_WITH_AES_128_CBC_SHA
        compressionMethod   NULL
1 3  0.1205 (0.0002)  S>C  Handshake
      Certificate
1 4  0.1205 (0.0000)  S>C  Handshake
      ServerHelloDone
1 5  0.1244 (0.0039)  C>S  Handshake
      ClientKeyExchange
1 6  0.1244 (0.0000)  C>S  ChangeCipherSpec
1 7  0.1244 (0.0000)  C>S  Handshake
1 8  0.1737 (0.0492)  S>C  ChangeCipherSpec
1 9  0.1737 (0.0000)  S>C  Handshake
1 10 0.1738 (0.0001)  C>S  application_data
1 11 0.2232 (0.0493)  S>C  application_data
1 12 0.2233 (0.0001)  C>S  Alert
1    0.2234 (0.0000)  C>S  TCP FIN
1    0.2709 (0.0475)  S>C  TCP FIN


hybrid logical clock

For performance, distributed transactions today usually offer SI/SSI isolation, implemented with optimistic conflict detection rather than pessimistic 2PC. This requires capturing transaction causality, which is normally done with a logical clock that yields a total order: a vector clock is one example, and so is the zxid in ZAB. The timestamp oracle behind Google Percolator's total order is yet another kind of logical clock, but it is an obvious bottleneck and adds an extra message round trip.

A logical clock, however, reflects nothing about physical time, which is why the hybrid logical clock was proposed: wall time + logical time, the former for humans and the latter for machines. The principle is simple: whenever nodes exchange a message, the clocks are merged so that the sender's event happens before the receiver's.

But wall time itself is fragile. Suppose NTP breaks on one machine in a cluster and the administrator, fixing the time by hand, types 2071-09-09 10:00:00 instead of 2017-09-09 10:00:00. The bogus value infects every machine in the cluster: the wall-time part of every HLC jumps to the year 2071 and cannot be repaired manually unless you are willing to discard history; the system only recovers by itself once 2071 actually arrives, so the wall-time part loses its meaning.

To fix this, add an epoch field:

HLC
+-------+-----------+--------------+
| epoch | wall time | logical time |
+-------+-----------+--------------+

To repair the 2071 incident, just bump the epoch by 1: since the epoch is compared before wall time, timestamps taken with the corrected wall clocks still move forward
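
A minimal sketch of an HLC with the epoch|wall|logical layout above; the type and function names are made up here, and the update rule is a simplification of the HLC paper's receive rule (compare epoch first, then wall time, then the logical counter):

package hlc

// HLC timestamps compare lexicographically by (epoch, wall, logical).
type HLC struct {
	Epoch   uint32
	Wall    int64 // physical time, e.g. Unix milliseconds
	Logical uint32
}

func (a HLC) Less(b HLC) bool {
	if a.Epoch != b.Epoch {
		return a.Epoch < b.Epoch
	}
	if a.Wall != b.Wall {
		return a.Wall < b.Wall
	}
	return a.Logical < b.Logical
}

// Update merges the local clock with the timestamp carried on a received
// message so that the sender's event happens before the receiver's.
// now is the local physical clock reading, epoch the current cluster epoch.
func Update(local, remote HLC, now int64, epoch uint32) HLC {
	physical := HLC{Epoch: epoch, Wall: now}
	if local.Less(physical) && remote.Less(physical) {
		return physical // physical clock already dominates: logical resets to 0
	}
	// otherwise take the larger of the two timestamps and bump its logical part
	next := local
	if next.Less(remote) {
		next = remote
	}
	next.Logical++
	return next
}

After the 2071 incident is repaired the cluster runs with epoch N+1 while the poisoned timestamps carry epoch N, so every new timestamp compares greater even though its wall time is smaller.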


Reliability pyramid (SRE)

SRE


MySQL B+ Tree

sharding


batch insert(mysql)

// case 1: one multi-row insert
INSERT INTO T(v) VALUES (1), (2), (3), (4), (5);
// case 2: five single-row inserts
for i = 1; i <= 5; i++ {
    INSERT INTO T(v) VALUES (i);
}

What difference does case 1 vs case 2 make? Assume auto_commit is on

Pros

  • Fewer round trips to the MySQL server
  • Less SQL parsing (with prepared statements the difference largely disappears)
  • With the query cache enabled, the cache is invalidated only once, improving the cache hit rate

Cons

  • It can become one huge transaction,
    so keep each batch reasonably small when doing batch inserts (see the sketch below)
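
A minimal sketch in Go (database/sql; the chunk size is arbitrary and the table T is the one from the example above) of splitting a large batch into bounded multi-row INSERTs so no single statement becomes a huge transaction:

package batchinsert

import (
	"database/sql"
	"fmt"
	"strings"
)

// insertInChunks issues one multi-row INSERT per chunk, so under auto_commit
// each chunk is its own reasonably sized transaction.
func insertInChunks(db *sql.DB, values []int, chunkSize int) error {
	for start := 0; start < len(values); start += chunkSize {
		end := start + chunkSize
		if end > len(values) {
			end = len(values)
		}
		placeholders := make([]string, 0, end-start)
		args := make([]interface{}, 0, end-start)
		for _, v := range values[start:end] {
			placeholders = append(placeholders, "(?)")
			args = append(args, v)
		}
		query := fmt.Sprintf("INSERT INTO T(v) VALUES %s", strings.Join(placeholders, ", "))
		if _, err := db.Exec(query, args...); err != nil {
			return err
		}
	}
	return nil
}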

cannot have exactly-once delivery

http://bravenewgeek.com/you-cannot-have-exactly-once-delivery/


RocketMQ internals

Features

  • Producer Group
    When sending transactional messages it acts as the TC (transaction coordinator), must span multiple machines, and keeps a transaction state table {offset: P/C/R}
  • Broker tag-based message filter
  • Delayed messages: arbitrary precision is not supported, only fixed levels such as 5s, 10s, 1m, with queueID = delayLevel - 1
    Consequently, message revoke is presumably not supported
  • Commit log and consume queue are separated, somewhat like the WAL vs. table relationship
    They can be placed on different filesystems, but there is nothing finer grained
    The benefit of the extra dispatch step: you can choose not to dispatch

Commit Log

${rocketmq.home}/store/commitlog/${fileName}
fileName[n] = fileName[n-1] + mappedFileSize
To keep mappedFileSize identical for every file, padding is appended at each file's tail; the default size is 1GB
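
A small sketch of the naming rule above, assuming (as a simplification of RocketMQ's behavior) that a commit log file is named after its starting physical offset, zero-padded to 20 digits:

package commitlog

import (
	"fmt"
	"strconv"
)

// nextFileName derives the next commit log file name from the previous one:
// names are starting offsets, so fileName[n] = fileName[n-1] + mappedFileSize.
func nextFileName(prev string, mappedFileSize int64) (string, error) {
	startOffset, err := strconv.ParseInt(prev, 10, 64)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("%020d", startOffset+mappedFileSize), nil
}

// e.g. nextFileName("00000000000000000000", 1<<30) == "00000000001073741824"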

Each message

For an ordinary message, QueueOffset is the offset within the consume queue; for a transactional message it is the offset within the transaction state table
+---------+-------+-----+---------+------+-------------+----------------+----------------+
| MsgSize | Magic | CRC | QueueID | Flag | QueueOffset | PhysicalOffset | SysFlag(P/C/R) |
+---------+-------+-----+---------+------+-------------+----------------+----------------+
+--------------+------------------+-----------+---------------+----+------+-------+------+
| ProducedTime | ProducerHostPort | StoreTime | StoreHostPort | .. | Body | Topic | Prop |
+--------------+------------------+-----------+---------------+----+------+-------+------+

Every append to the commit log synchronously calls dispatch, which fans the entry out to the consume queue and the index service

new DispatchRequest(topic, queueId,
        result.getWroteOffset(), result.getWroteBytes(),
        tagsCode, msg.getStoreTimestamp(),
        result.getLogicsOffset(), msg.getKeys(),
        // Transaction
        msg.getSysFlag(),
        msg.getPreparedTransactionOffset());

queue

A queue is purely a logical concept; it takes part in producer load balancing much like the virtual nodes in consistent hashing
On each broker, the commit log is shared by all of that broker's queues, with no separation whatsoever

broker1: queue0, queue2
broker2: queue0
then topicA has 3 queues:
    broker1_queue0, broker1_queue2, broker2_queue0
producer.selectOneMessageQueue("topicA", "broker1", "queue0")

Partial (per-queue) ordering of messages is guaranteed by the producer client, e.g. by always routing one key to the same queue (see the sketch below)
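
A hypothetical key-based selector (not RocketMQ's actual MessageQueueSelector API), just to illustrate how a producer can keep all messages of one key in a single queue and therefore in order:

package producer

import "hash/fnv"

// selectQueue maps an ordering key to a queue index, so every message
// carrying that key lands in the same queue and stays ordered within it.
func selectQueue(orderKey string, queueCount int) int {
	h := fnv.New32a()
	h.Write([]byte(orderKey))
	return int(h.Sum32() % uint32(queueCount))
}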

Question

  • How is retention by topic implemented? It is not:
    expiry is decided solely by the commit log file's mtime, even though the file mixes multiple topics
  • How is I/O balanced?
  • How is compression done?
  • If a CRC check fails, are all topics affected?
  • Why store StoreHostPort? How can a topic be migrated? It cannot
  • Writing the commit log requires a lock whose granularity is too coarse: effectively a database-level lock rather than a table-level one
  • Broker split-brain
  • failover
  • A topic's commit log is scattered across all brokers

Consume Queue

${rocketmq.home}/store/consumequeue/${topicName}/${queueId}/${fileName}

To read one message, first read the consume queue (analogous to a MySQL secondary index), then the commit log (the clustered index)

sendfile is not used; mmap is used instead, because the reads are random

+---------------------+-----------------+------------------------+
| CommitLogOffset(8B) | MessageSize(4B) | MessageTagHashcode(8B) |
+---------------------+-----------------+------------------------+

Although the consume queue is read sequentially during consumption, the commit log reads that follow are almost all random; besides,
how could compression be optimized? Page cache plus readahead alone is far from enough
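
A minimal sketch of the two-step read (made-up file handles, big-endian layout assumed; not RocketMQ's actual classes): a 20-byte consume queue entry yields the commit log offset and size, then the message bytes are read from the commit log:

package consumequeue

import (
	"encoding/binary"
	"os"
)

const cqEntrySize = 20 // CommitLogOffset(8B) + MessageSize(4B) + MessageTagHashcode(8B)

// readMessage resolves the index-th entry of a consume queue file and then
// reads the raw message from the commit log: the "secondary index ->
// clustered index" lookup described above.
func readMessage(consumeQueue, commitLog *os.File, index int64) ([]byte, error) {
	entry := make([]byte, cqEntrySize)
	if _, err := consumeQueue.ReadAt(entry, index*cqEntrySize); err != nil {
		return nil, err
	}
	offset := int64(binary.BigEndian.Uint64(entry[0:8]))
	size := binary.BigEndian.Uint32(entry[8:12])
	// entry[12:20] holds the tag hashcode used for broker-side filtering

	buf := make([]byte, size)
	if _, err := commitLog.ReadAt(buf, offset); err != nil {
		return nil, err
	}
	return buf, nil
}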

Producer

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); // from local cache or name server
MessageQueue mq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);

Transaction

// 2PC, 2 messages
// Phase 1
producer group writes a redolog entry
producer group sends a message (type=TransactionPreparedType) to the broker
broker appends it to the CommitLog and returns the MessageId
broker does not append it to the consume queue
// Phase 2
producer group writes a redolog entry
producer group sends a message (type=TransactionCommitType, msgId=$msgId) to the broker
broker finds the message with msgId in the CommitLog, clones it, and appends the clone to the CommitLog (type=TransactionCommitType|TransactionRollbackType)
if type == TransactionCommitType {
    broker appends the commit log offset to the consume queue
}

State Table

Stored on the broker; scanned once every minute by default

24B, mmap
+-----------------+------+-----------+-----------------------+--------------+
| CommitLogOffset | Size | Timestamp | ProducerGroupHashcode | State(P/C/R) |
+-----------------+------+-----------+-----------------------+--------------+
a prepare message inserts a row into the table
a commit/rollback message updates the row

For an in-doubt transaction, the broker sends a CHECK_TRANSACTION_STATE request to a randomly chosen machine in the Producer Group
The Producer Group locates the transaction state via its redolog (mmap)
Producer Group membership is stored in the namesvr

Problems

  • The Producer is no longer an ordinary client; it has become a server (the TC) and must not be shut down casually
  • What happens if the machine in the Producer Group that wrote the redolog dies?

HA

Granularity is controlled only at the broker level, whereas Kafka does it per partition
