kafka redesign

Goals

  • support many topics
    • locating one partition's data among many becomes a needle-in-a-haystack problem
  • I/O optimization
    • read/write isolation
    • the index file causes random synchronous writes

apache bookkeeper

Features

  • No topic/partition concept: a stream is composed of multiple ledgers, and each ledger is bounded
    createLedger(int ensSize, int writeQuorumSize, int ackQuorumSize)
  • A ledger has only an int id, no name
  • Each entry (log record) has a unique int64 id
  • Striped write: entries are interleaved across the bookies
  • There is no explicit master/slave relationship among the bookie storage nodes
  • Shared WAL
  • Single writer
  • Quorum-vote replication; read consistency is guaranteed by the LastAddConfirmedId stored in zk
  • Bookies do not communicate with other bookies; the client performs the concurrent broadcast/quorum

VS Kafka

(figure: vs kafka)

At createLedger time, the client decides the placement (org.apache.bookkeeper.client.EnsemblePlacementPolicy) and stores the ensemble in zookeeper.
For example, with 5 bookies: createLedger(3, 3, 2). See the sketch below.
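
To make striped writes concrete, here is a minimal Go sketch (illustrative names, not the BookKeeper client API) of how an entry id maps to the bookies it is written to:

package main

import "fmt"

// writeSet returns the bookies (as indices into the ledger's ensemble) that
// entry entryID is written to: writeQuorum consecutive bookies starting at
// entryID % ensembleSize. The function name and layout are illustrative,
// not the real BookKeeper client API.
func writeSet(entryID int64, ensembleSize, writeQuorum int) []int {
	set := make([]int, 0, writeQuorum)
	for i := 0; i < writeQuorum; i++ {
		set = append(set, int((entryID+int64(i))%int64(ensembleSize)))
	}
	return set
}

func main() {
	// createLedger(3, 3, 2): ensemble=3, writeQuorum=3, ackQuorum=2
	// every entry is written to all 3 bookies; the add is acked once 2 respond
	for e := int64(0); e < 3; e++ {
		fmt.Printf("entry %d -> bookies %v\n", e, writeSet(e, 3, 3))
	}
	// a wider ensemble, e.g. (ensemble=5, writeQuorum=3), shows the striping:
	for e := int64(0); e < 5; e++ {
		fmt.Printf("entry %d -> bookies %v\n", e, writeSet(e, 5, 3))
	}
}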

IO Model

(figure: io)

  • Read/write isolation
  • Disk 1: Journal (WAL) Device
    • {timestamp}.txn
  • Disk 2: Ledger Device
    • Data lives in multiple ledger directories
    • LastLogMark marks the point up to which index+data have been persisted to the Ledger Device; WAL files before it can be deleted
    • Writes are asynchronous
    • And sequential
      • All active ledgers share a single entry logger
      • Reads are served via the ledger index/cache
  • [Disk 3]: Index Device
    • By default Disk 2 and Disk 3 share the same device
  • Once an entry is written to the Memtable, the client can be acked

I/O is split into 4 types, each optimized separately (a write-path sketch follows the list):

  • sync sequential write: shared WAL
  • async random write: group commit from Memtable
  • tail read: from Memtable
  • random read: from (index + os pagecache)
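
A rough Go sketch of that write path, with made-up types rather than BookKeeper's real code: a sync sequential append to the shared WAL, an insert into the Memtable (after which the client is acked), and a background group commit to the entry log:

package main

import (
	"os"
	"sync"
	"time"
)

// bookie is a toy model of the I/O split: Disk 1 takes sync sequential WAL
// appends, Disk 2 takes async, batched (group-commit) flushes of the Memtable.
type bookie struct {
	wal      *os.File // journal device (Disk 1)
	entryLog *os.File // ledger device (Disk 2), shared by all active ledgers
	mu       sync.Mutex
	memtable map[string][]byte // tail reads are served from here
}

// addEntry appends to the WAL, updates the Memtable, and then acks.
func (b *bookie) addEntry(key string, val []byte) error {
	if _, err := b.wal.Write(val); err != nil { // sequential write
		return err
	}
	if err := b.wal.Sync(); err != nil { // the only sync on the write path
		return err
	}
	b.mu.Lock()
	b.memtable[key] = val
	b.mu.Unlock()
	return nil // ack the client here
}

// flushLoop periodically group-commits the Memtable to the entry log,
// turning random writes into sequential ones; after a flush the WAL up to
// LastLogMark could be trimmed.
func (b *bookie) flushLoop() {
	for range time.Tick(time.Second) {
		b.mu.Lock()
		for _, v := range b.memtable {
			b.entryLog.Write(v)
		}
		b.memtable = map[string][]byte{}
		b.mu.Unlock()
		b.entryLog.Sync()
	}
}

func main() {
	wal, _ := os.CreateTemp("", "journal-*.txn")
	elog, _ := os.CreateTemp("", "entrylog-*")
	b := &bookie{wal: wal, entryLog: elog, memtable: map[string][]byte{}}
	go b.flushLoop()
	_ = b.addEntry("ledger1/entry0", []byte("hello"))
}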

References

https://github.com/ivankelly/bookkeeper-tutorial
https://github.com/twitter/DistributedLog


SSD

Primer

Physical units of flash memory

  • Page
    unit for read & write
  • Block
    unit for erase

Physical characteristics

  • Erase before re-write
  • Sequential write within a block

(figure: ssd)

Optimal I/O for SSD

  • The larger the I/O request size, the better
  • Conform to the physical characteristics (see the sketch below)
    • page- or block-aligned
    • segmented sequential writes within a block
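
A small Go sketch of the alignment rule (the page/block sizes are illustrative; real values depend on the device):

package main

import "fmt"

// Illustrative sizes; actual page/block sizes depend on the SSD model.
const (
	pageSize  = int64(4 * 1024)   // unit for read & write
	blockSize = int64(256 * 1024) // unit for erase (here, 64 pages)
)

// alignUp rounds n up to the next multiple of unit.
func alignUp(n, unit int64) int64 {
	return (n + unit - 1) / unit * unit
}

func main() {
	off, size := int64(5000), int64(7000)
	alignedOff := off / pageSize * pageSize   // round the offset down to a page boundary
	alignedEnd := alignUp(off+size, pageSize) // round the end up to a page boundary
	fmt.Println("page-aligned request:", alignedOff, alignedEnd-alignedOff)
	blocks := (alignUp(off+size, blockSize) - off/blockSize*blockSize) / blockSize
	fmt.Println("erase blocks spanned:", blocks)
}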

http://codecapsule.com/2014/02/12/coding-for-ssds-part-6-a-summary-what-every-programmer-should-know-about-solid-state-drives/
http://www.open-open.com/lib/view/open1423106687217.html


oklog

The ingester handles write optimization (the WAL); the storage node handles read optimization.
Similar to RocketMQ: CQRS.

  • ingester = commit log
  • storage node = consume queue

The difference: oklog's storage nodes replicate from ingesters via pull-mode replication and can live on different machines from the ingesters,
whereas RocketMQ keeps the commit log and consume queue on the same broker.

  • Kafka couples reads and writes, so they cannot be scaled independently
  • CQRS decouples reads and writes, so each side scales independently

produce

Producers connect to multiple ingesters through forwarders; ingesters gossip with each other to balance load, and an overloaded ingester negotiates with the forwarders to redirect traffic elsewhere.

query

scatter-gather: the query fans out to every storage node and the results are merged (see the sketch below)
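
A minimal Go sketch of the scatter-gather query (storeNode is a stand-in; in oklog the fan-out is HTTP to the store nodes):

package main

import (
	"fmt"
	"sort"
	"sync"
)

// storeNode is a stand-in for an oklog storage node; Query would really be a network call.
type storeNode interface {
	Query(q string) []string
}

// scatterGather fans the query out to all nodes in parallel and merges the results.
func scatterGather(nodes []storeNode, q string) []string {
	var (
		mu  sync.Mutex
		wg  sync.WaitGroup
		out []string
	)
	for _, n := range nodes {
		wg.Add(1)
		go func(n storeNode) {
			defer wg.Done()
			res := n.Query(q) // scatter
			mu.Lock()
			out = append(out, res...) // gather
			mu.Unlock()
		}(n)
	}
	wg.Wait()
	sort.Strings(out) // merge step: e.g. order records by timestamp
	return out
}

type fakeNode []string

func (f fakeNode) Query(string) []string { return f }

func main() {
	nodes := []storeNode{fakeNode{"2017-01-01 a"}, fakeNode{"2017-01-02 b"}}
	fmt.Println(scatterGather(nodes, "error"))
}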


db trigger

Drawbacks of database triggers

  • How do you monitor them?
  • How do you version-control the trigger code?
  • How do you test them?
  • How do you deploy them?
  • Performance overhead
  • Multi-tenancy
  • Resource isolation
  • They cannot be released frequently, so how do you cope with fast-changing requirements?

materialized view

A materialized view can be understood as a cache of query results, a derived result.
"Heterogeneous table" might be an even more fitting way to think of it.

(figure: materialized view)

Unlike an ordinary view, it physically exists, and the database keeps it consistent with the base tables.
It can be rebuilt from the source store at any time; the application never updates it: it is readonly.

MySQL does not provide this feature, but a materialized view is easy to build with dbus (a database change stream).
PostgreSQL provides materialized views natively.
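
A toy Go sketch of the "derived, readonly, rebuildable" idea (not tied to any particular database): the view is computed from the source-of-truth records and only ever read by the application:

package main

import "fmt"

// Order is the source-of-truth record.
type Order struct {
	ID       int
	Customer string
	Amount   int
}

// rebuildView derives a readonly "materialized view": total amount per customer.
// It can be thrown away and rebuilt from the source store at any time.
func rebuildView(source []Order) map[string]int {
	view := make(map[string]int)
	for _, o := range source {
		view[o.Customer] += o.Amount
	}
	return view
}

func main() {
	source := []Order{{1, "alice", 10}, {2, "bob", 5}, {3, "alice", 7}}
	view := rebuildView(source) // the application only reads this
	fmt.Println(view["alice"])  // 17
}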

References

https://docs.microsoft.com/en-us/azure/architecture/patterns/materialized-view


cache invalidation

// read path (cache-aside)
val = cache.get(key)
if val == nil {
    val = db.get(key)
    cache.put(key, val)
}
return val

// write path: write to the db, then to the cache
db.put(key, val)
cache.put(key, val)

The write path is a dual write to two stores with no ordering guarantee, so concurrent writers can interleave and leave the db with one value and the cache with another: a dual-write conflict.

If all you need is eventual consistency, then driving cache invalidation from dbus (the database change stream) is the most effective approach; a sketch follows.
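
A hedged Go sketch of that approach (ChangeEvent and the channel stand in for a Databus/binlog consumer, not a real API): writers only touch the db, and a consumer of the commit log evicts stale cache entries; readers repopulate the cache on the next miss:

package main

import (
	"fmt"
	"sync"
)

// ChangeEvent is a stand-in for a row-change record coming off the DB's
// commit log (binlog / Databus); not a real Databus API.
type ChangeEvent struct {
	Key string
	Val string
}

type cache struct {
	mu sync.Mutex
	m  map[string]string
}

func (c *cache) Del(k string) { c.mu.Lock(); delete(c.m, k); c.mu.Unlock() }

// invalidate consumes the change stream and evicts affected keys; the next
// read misses and reloads from the DB, giving eventual consistency without
// the dual-write race.
func invalidate(c *cache, events <-chan ChangeEvent) {
	for ev := range events {
		c.Del(ev.Key)
	}
}

func main() {
	c := &cache{m: map[string]string{"user:1": "old"}}
	events := make(chan ChangeEvent, 1)
	events <- ChangeEvent{Key: "user:1", Val: "new"}
	close(events)
	invalidate(c, events)
	fmt.Println(len(c.m)) // 0: the stale entry has been evicted
}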

https://martinfowler.com/bliki/TwoHardThings.html


LinkedIn Espresso

What

Distributed Document Store

  • RESTful API
  • MySQL as the storage engine
  • Helix manages the cluster
  • Databus asynchronously replicates the commit log across data centers
  • Schemas are stored in zookeeper; schema evolution is achieved via Avro compatibility

References

https://engineering.linkedin.com/espresso/introducing-espresso-linkedins-hot-new-distributed-document-store



Multi-Data Center Consistency

MDCC (Multi-Data Center Consistency) provides a strong consistency model for a distributed database spanning data centers.

(figure: mdcc)

References

http://mdcc.cs.berkeley.edu/


asynchronous distributed snapshot

How do you take a globally, logically consistent snapshot of a distributed system?
Snapshot = Node State + Channel State

Sending rule

node.recordState()
for conn in allConns {
    // send the marker before any further outbound msg on this conn
    conn.send(marker)
}

Receiving rule

msg = conn.recv()
if msg.isMarker() {
    if !node.stateRecorded() {
        // first marker seen: snapshot the node now, so nothing was
        // in flight on this channel before the marker
        node.recordState()
        Channel(conn) = []
        // then apply the sending rule: relay the marker on all outbound conns
    } else {
        // state was recorded earlier: the channel state is every msg
        // received on conn between recordState() and this marker
        Channel(conn) = msgsReceivedSince(node.recordTime, conn)
        // those in-flight msgs are recorded as channel state only;
        // they are not part of the recorded node state
    }
}

Demo

(figure: snapshot)

a)
P takes its own snapshot: P(red, green, blue)
P sends a marker on Channel(PQ)
b)
P sends its green ball to Q; this message comes after the marker
At the same time, Q sends its orange ball to P, so now Q(brown, pink)
c)
Q receives the marker on Channel(PQ) // Q is a receiver
Q takes its own snapshot: Q(brown, pink)
Channel(PQ) = []
// Q had earlier sent its orange ball to P, so Q is also a sender
Q sends a marker on Channel(QP)
d)
P receives the orange ball, then the marker
Since P has already recorded its state, Channel(QP) = [orange]
The final snapshot of the distributed system:
P(red, green, blue)
Channel(PQ) = []
Q(brown, pink)
Channel(QP) = [orange]
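
A compact Go sketch of both rules (toy types; buffered in-memory channels stand in for the network), replayed against step c) above, where Q receives P's marker on Channel(PQ):

package main

import "fmt"

type msg struct {
	marker  bool
	payload string
}

// node implements the two Chandy-Lamport rules over named channels.
type node struct {
	state         []string // local state (the balls it currently holds)
	recordedState []string // snapshot of the local state
	recorded      bool
	inbound       []string
	recording     map[string]bool     // inbound channels still being recorded
	channelState  map[string][]string // recorded in-flight msgs per inbound channel
	outbound      map[string]chan msg
}

// startSnapshot records the node state, starts recording every inbound
// channel, and sends a marker on every outbound channel (sending rule).
func (n *node) startSnapshot() {
	n.recordedState = append([]string(nil), n.state...)
	n.recorded = true
	for _, c := range n.inbound {
		n.recording[c] = true
	}
	for _, out := range n.outbound {
		out <- msg{marker: true}
	}
}

// onReceive applies the receiving rule for a message arriving on channel from.
func (n *node) onReceive(from string, m msg) {
	if m.marker {
		if !n.recorded {
			n.startSnapshot() // first marker: snapshot now, so Channel(from) stays empty
		}
		n.recording[from] = false // the marker closes this channel's recording window
		return
	}
	if n.recorded && n.recording[from] {
		// in flight at the snapshot cut: record as channel state, not node state
		n.channelState[from] = append(n.channelState[from], m.payload)
	}
	n.state = append(n.state, m.payload) // normal application processing
}

func main() {
	q := &node{
		state:        []string{"brown", "pink"},
		inbound:      []string{"PQ"},
		recording:    map[string]bool{},
		channelState: map[string][]string{},
		outbound:     map[string]chan msg{"QP": make(chan msg, 1)},
	}
	q.onReceive("PQ", msg{marker: true})
	fmt.Println(q.recordedState, q.channelState["PQ"]) // [brown pink] []
}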

FAQ

How is it initiated?

The global distributed snapshot can be initiated by one node or by several nodes concurrently.

When does it finish?

When every node has completed its local snapshot.

What is it used for?

Failure recovery.

Unlike Apache Storm's per-record acking, Apache Flink's failure recovery uses a variant of the Chandy-Lamport algorithm.
The checkpoint coordinator is the JobManager.

data sources periodically inject markers into the data stream.

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(4)
env.enableCheckpointing(1000) // the data sources emit a marker (barrier) every 1s

Whenever an operator receives such a marker, it checkpoints its internal state.

// imports assumed from the flink-demos streaming-state-machine example;
// Event, Alert, State, InitialState and InvalidTransition are defined there
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.checkpoint.Checkpointed
import org.apache.flink.util.Collector
import scala.collection.mutable

class StateMachineMapper extends FlatMapFunction[Event, Alert] with Checkpointed[mutable.HashMap[Int, State]] {

  private[this] val states = new mutable.HashMap[Int, State]()

  override def flatMap(t: Event, out: Collector[Alert]): Unit = {
    // get and remove the current state
    val state = states.remove(t.sourceAddress).getOrElse(InitialState)
    val nextState = state.transition(t.event)
    if (nextState == InvalidTransition) {
      // raise an alert
      out.collect(Alert(t.sourceAddress, state, t.event))
    } else if (!nextState.terminal) {
      // put it back into states
      states.put(t.sourceAddress, nextState)
    }
  }

  override def snapshotState(checkpointId: Long, timestamp: Long): mutable.HashMap[Int, State] = {
    // the barrier (marker) is injected by the data source and flows with the records as part of the data stream
    //
    // snapshotState() and flatMap() are guaranteed to run serially
    // at this point the operator has already received the barrier (marker)
    // after this method returns, Flink automatically forwards the barrier to my output streams
    // after that, the returned states are persisted (JobManager memory by default, or HDFS)
    states
  }

  override def restoreState(state: mutable.HashMap[Int, State]): Unit = {
    // on failure, Flink stops the dataflow and then restarts the operator (StateMachineMapper)
    states ++= state
  }
}

(figure: snapshot)

References

http://research.microsoft.com/en-us/um/people/lamport/pubs/chandy.pdf
https://arxiv.org/abs/1506.08603
https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html
https://github.com/StephanEwen/flink-demos/tree/master/streaming-state-machine
