ElasticSearch internals

Basics

Storage Model

A special kind of LSM tree

ES        LSM
========  ==================
translog  WAL
buffer    MemTable
segment   SSTable
merge     merge & compaction

Defaults

node.master: true
node.data: true
index.number_of_shards: 5
index.number_of_replicas: 1
path.data: /path/to/data1,/path/to/data2
http.max_content_length: 100mb

Best Practice

Memory: give half to the ES heap, leave the other half to the OS page cache.

Indexing

write

  • translog (WAL)

    • one translog per shard (each shard is a Lucene index, so effectively one translog per Lucene index)
    • it enables real-time CRUD by docID: lookups hit the translog first, then the segments
    • fsynced to disk every 5s by default
    • the translog lives under path.data and can be striped across data paths, but then it competes with the segments for I/O
      $ ls nodes/0/indices/twitter/1/
      drwxr-xr-x 3 funky wheel 102B 6 8 17:23 _state
      drwxr-xr-x 5 funky wheel 170B 6 8 17:23 index
      drwxr-xr-x 3 funky wheel 102B 6 8 17:23 translog
  • refresh

    • on segments
    • scheduled periodically (1s by default); can also be triggered manually via /_refresh
    • merge logic is also triggered here
    • after a refresh, every change made before it becomes searchable; data still sitting in the in-memory buffer is not searchable
    • refresh
      • produces a new segment but does not fsync it (close does not trigger fsync), then opens the new segment
      • does not guarantee durability; that is flush's job
      • clears the in-memory buffer
  • flush
    • on translog
    • triggered every 30s / 200MB / 5000 ops by default; it performs a commit
    • Any docs in the in-memory buffer are written to a new segment
    • The buffer is cleared
    • A commit point is written to disk
      a commit point is a file containing the list of segments ready for search
    • The filesystem cache is flushed with an fsync
    • The old translog is deleted
  • commit
    • fsync, one by one, every segment that has not yet been fsynced
  • merge
    • policy
      • tiered (default)
      • log_byte_size
      • log_doc
    • may merge committed and uncommitted segments together
    • asynchronous; does not block indexing/query
    • after merging, the merged big segment is fsynced and the old small segments are deleted
    • ES throttles merge progress internally so it does not consume too many resources: 20MB/s
  • update
    delete, then insert. MySQL implements updates of variable-length columns internally the same way.
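
Tying the pieces above together, here is a minimal Go sketch of the write path (translog append, in-memory buffer, refresh into a searchable segment). The types and names are illustrative only, not ES internals:

package main

import "fmt"

// shard is a toy model: every write goes to the translog (WAL) and the
// in-memory buffer; refresh turns the buffer into a searchable segment
// without fsync; durability comes later from flush/commit.
type shard struct {
    translog []string   // WAL, fsynced periodically
    buffer   []string   // in-memory buffer, not searchable
    segments [][]string // searchable segments
}

func (s *shard) index(doc string) {
    s.translog = append(s.translog, doc) // WAL first
    s.buffer = append(s.buffer, doc)
}

func (s *shard) refresh() {
    if len(s.buffer) == 0 {
        return
    }
    s.segments = append(s.segments, s.buffer) // new segment, opened for search
    s.buffer = nil                            // buffer cleared
}

func main() {
    s := &shard{}
    s.index(`{"user":"kimchy"}`)
    fmt.Println(len(s.segments)) // 0: not searchable yet
    s.refresh()
    fmt.Println(len(s.segments)) // 1: searchable after refresh
}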

Hard commit

Every 30 minutes by default, or when the translog reaches its size limit (200MB), whichever comes first.

write the in-memory buffer out as a new segment, and clear the buffer
fsync every segment that does not yet have a commit point
write the commit point file
IndexSearcher is opened and all documents are searchable
translog.empty()
translog.sequence ++
// if the machine loses power right now, everything can be fully recovered

Merge

  • Happens in parallel to searching; the searcher is then switched to the new, merged segment.
  • Deleting a document does not rewrite the segment; a .del file keeps track of which documents are deleted (an update writes a new document and marks the old one deleted).
  • Because a document may be updated multiple times, its delete markers may appear in several different segments.

Replication

primary-backup model

Indexing flow on the primary

validate indexing request
local indexing
concurrently dispatch the indexing request to all replicas

Query

read

translog.search(q)
for _, s := range segments {
    s.search(q)
}

deep pagination

search(from=50000, size=10): each shard builds a priority queue of size 50010 and pulls results from every segment until the queue is full.
The coordinating node then has to build a priority queue of size number_of_shards * 50010.
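
A quick back-of-the-envelope sketch of that fan-out cost, assuming a 5-shard index:

package main

import "fmt"

// Cost of search(from=50000, size=10) with the numbers above.
func main() {
    from, size := 50000, 10
    numberOfShards := 5

    perShardQueue := from + size                       // each shard keeps its own top (from+size) hits
    coordinatorQueue := numberOfShards * perShardQueue // the coordinating node merges all of them

    fmt.Println(perShardQueue)    // 50010
    fmt.Println(coordinatorQueue) // 250050
}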

scan+scroll can pull data in bulk (with sorting disabled), e.g. when reindexing.

Cluster

Intro

All nodes are connected to each other: n*(n-1) connections.
shard = murmur3(document_id) % num_of_primary_shards

Node1 uses the document's _id to work out that it belongs on shard0, learns from the cluster state that shard0's primary lives on Node3, and forwards the request to Node3, which indexes the data.
Node3 then forwards the data in parallel (PUSH-model replication) to the replica copies of shard0 on Node1 and Node2. Once any node reports that its replica write succeeded, Node3 responds to the original receiving node, Node1, declaring the write successful, and Node1 returns success to the client.
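
A small sketch of the routing rule above. FNV-1a stands in for the Murmur3 hash ES actually uses; the point is the hash-modulo, which is also why number_of_shards cannot change after index creation:

package main

import (
    "fmt"
    "hash/fnv"
)

// routeShard mirrors the rule above: hash(_id) % num_of_primary_shards.
func routeShard(docID string, numPrimaryShards uint32) uint32 {
    h := fnv.New32a()
    h.Write([]byte(docID))
    return h.Sum32() % numPrimaryShards
}

func main() {
    for _, id := range []string{"tweet-1", "tweet-2", "tweet-3"} {
        fmt.Printf("%s -> shard %d\n", id, routeShard(id, 5))
    }
}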

Scale

scale up

Rebalance

# show the shard distribution
curl -XGET http://localhost:9200/_cat/shards
# manually rebalance (move a shard)
curl -XPOST 'http://localhost:9200/_cluster/reroute' -d '{
  "commands": [
    {
      "move": {
        "index": "mylog-2016-02-08",
        "shard": 6,
        "from_node": "192.168.0.1",
        "to_node": "192.168.0.2"
      }
    }
]}'

How a new node joins

// get the master node and the master-eligible nodes
for host = range discovery.zen.ping.unicast.hosts {
    PingResponse = ZenPing.send(host)
}
send('internal:discovery/zen/join') to master
master.reply('internal:discovery/zen/join/validate')
// two-phase commit of the join
master.update(ClusterState) and broadcast to all nodes, and wait for minimum_master_nodes acks
ClusterState change committed and confirmation sent

discovery.zen.ping.unicast.hosts is really just a seed list: with 5 nodes you can configure only 2 of them, since the full node list can be learned from any one of them (know one node, know the whole cluster).

Master fault detection

By default every node pings the master every 1s, with ping_timeout=30s and ping_retries=3.
If that fails, a new master election is triggered.

Concurrency

Each document carries a version; concurrent updates use optimistic locking on that version.
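
A toy sketch of what version-based optimistic locking means for a write, using a hypothetical in-memory store rather than the real ES API:

package main

import "fmt"

// doc models ES-style optimistic concurrency: every document carries a version,
// and a write only succeeds if the caller's expected version matches the stored one.
type doc struct {
    version int64
    body    string
}

var store = map[string]*doc{"1": {version: 3, body: "hello"}}

// updateIfVersion is a hypothetical helper, not a real ES client call.
func updateIfVersion(id string, expected int64, body string) error {
    d, ok := store[id]
    if !ok || d.version != expected {
        return fmt.Errorf("version conflict: expected %d", expected)
    }
    d.version++
    d.body = body
    return nil
}

func main() {
    fmt.Println(updateIfVersion("1", 3, "world")) // <nil>
    fmt.Println(updateIfVersion("1", 3, "stale")) // version conflict
}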

Consistency

  • quorum(default)
  • one
  • all

Shard

allocation

weight_index(node, index)   = indexBalance   * (node.numShards(index) - avgShardsPerNode(index))
weight_node(node, index)    = shardBalance   * (node.numShards()      - avgShardsPerNode)
weight_primary(node, index) = primaryBalance * (node.numPrimaries()   - avgPrimariesPerNode)
weight(node, index) = weight_index(node, index) + weight_node(node, index) + weight_primary(node, index)

If the resulting weight(node, index) exceeds the threshold, a shard relocation takes place.
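
Restating the formulas above as code, with made-up balance factors and node counts (not necessarily the shipped defaults):

package main

import "fmt"

// weight restates the allocation formula above with plain floats.
func weight(indexBalance, shardBalance, primaryBalance float64,
    shardsOfIndexOnNode, avgShardsOfIndexPerNode float64,
    shardsOnNode, avgShardsPerNode float64,
    primariesOnNode, avgPrimariesPerNode float64) float64 {
    wIndex := indexBalance * (shardsOfIndexOnNode - avgShardsOfIndexPerNode)
    wNode := shardBalance * (shardsOnNode - avgShardsPerNode)
    wPrimary := primaryBalance * (primariesOnNode - avgPrimariesPerNode)
    return wIndex + wNode + wPrimary
}

func main() {
    // a node holding 3 shards of an index (cluster average 2), 10 shards overall (average 8),
    // and 6 primaries (average 5)
    w := weight(0.55, 0.45, 0.05, 3, 2, 10, 8, 6, 5)
    fmt.Printf("weight=%.2f\n", w) // relocation is considered once this exceeds the threshold
}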

In an already established cluster, shards stay evenly distributed. But when you add nodes, you will notice it always moves replica shards to the new nodes first,
so the new nodes end up holding nothing but replicas while almost all primary shards stay on the old nodes.

The cluster.routing.allocation.balance.* settings exist for this, but a suitable ratio is hard to find.

Initialization

the master assigns a new shard via ClusterState
the node initializes an empty shard and notifies the master
the master marks the shard as started
if this is the first shard with a specific id, it is marked as primary

Shrink Index

https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-shrink-index.html
This is not resharding: you can start with many shards (e.g. 30) for bulk loading, and once the data is in place, shrink down to 5.
It only hard-links the segments; the index has to be made read-only before the shrink runs.
Typical scenario: a TSDB with one index per month; once the month is over, the index can be shrunk.

Consensus

Zen discovery (unicast/multicast) has a split-brain problem. ES did not solve it head-on; instead the advice is 3 dedicated master nodes plus optimized master logic, reducing the chance that a missing response is mistaken for a partition.

node1, node2 (master), node3
the node2-node3 link is broken, but node1 can still reach both node2 and node3; with minimum_master_nodes=2, node3 elects itself as a new master and node1 votes for it, leaving 2 masters

Defaults: ping_interval=1s, ping_timeout=3s, ping_retries=3, join_timeout=20*ping_interval

The reason for not using Paxos (ZooKeeper):

This, to me, is at the heart of our approach to resiliency.
Zookeeper, an excellent product, is a separate external system, with its own communication layer.
By having the discovery module in Elasticsearch using its own infrastructure, specifically the communication layer, it means that we can use the “liveness” of the cluster to assess its health.
Operations happen on the cluster all the time, reads, writes, cluster state updates.
By tying our assessment of health to the same infrastructure, we can actually build a more reliable more responsive system.
We are not saying that we won’t have a formal Zookeeper integration for those who already use Zookeeper elsewhere. But this will be next to a hardened, built in, discovery module.

ClusterState

The master is responsible for changes and broadcasts them to every node in the cluster; each node keeps a local copy.
With many indices, fields, shards, and nodes it can grow large, so ES provides incremental (diff) broadcasting and compression.

{
"cluster_name" : "elasticsearch",
"version" : 11,
"master_node" : "-mq1SRuuQoeEq-3S8SdHqw",
"blocks" : { },
"nodes" : {
"sIh5gQcFThCcz3SO6txvvQ" : {
"name" : "Max",
"transport_address" : "inet[/162.245.23.194:9301]",
"attributes" : { }
},
"-mq1SRuuQoeEq-3S8SdHqw" : {
"name" : "Llyron",
"transport_address" : "inet[/162.245.23.194:9300]",
"attributes" : { }
}
},
"metadata" : {
"templates" : { },
"indices" : {
"blog" : {
"state" : "open",
"settings" : {
"index" : {
"uuid" : "UQMz5vbXSBqFU_8U3u4gYQ",
"number_of_replicas" : "1",
"number_of_shards" : "5",
"version" : {
"created" : "1030099"
}
}
},
"mappings" : {
"user" : {
"properties" : {
"name" : {
"type" : "string"
}
}
}
},
"aliases" : [ ]
}
}
},
"routing_table" : {
"indices" : {
"blog" : {
"shards" : {
"4" : [ {
"state" : "STARTED",
"primary" : true,
"node" : "sIh5gQcFThCcz3SO6txvvQ",
"relocating_node" : null,
"shard" : 4,
"index" : "blog"
}, {
"state" : "STARTED",
"primary" : false,
"node" : "-mq1SRuuQoeEq-3S8SdHqw",
"relocating_node" : null,
"shard" : 4,
"index" : "blog"
} ],
"0" : [ {
"state" : "STARTED",
"primary" : true,
"node" : "sIh5gQcFThCcz3SO6txvvQ",
"relocating_node" : null,
"shard" : 0,
"index" : "blog"
}, {
"state" : "STARTED",
"primary" : false,
"node" : "-mq1SRuuQoeEq-3S8SdHqw",
"relocating_node" : null,
"shard" : 0,
"index" : "blog"
} ],
"3" : [ {
"state" : "STARTED",
"primary" : false,
"node" : "sIh5gQcFThCcz3SO6txvvQ",
"relocating_node" : null,
"shard" : 3,
"index" : "blog"
}, {
"state" : "STARTED",
"primary" : true,
"node" : "-mq1SRuuQoeEq-3S8SdHqw",
"relocating_node" : null,
"shard" : 3,
"index" : "blog"
} ],
"1" : [ {
"state" : "STARTED",
"primary" : false,
"node" : "sIh5gQcFThCcz3SO6txvvQ",
"relocating_node" : null,
"shard" : 1,
"index" : "blog"
}, {
"state" : "STARTED",
"primary" : true,
"node" : "-mq1SRuuQoeEq-3S8SdHqw",
"relocating_node" : null,
"shard" : 1,
"index" : "blog"
} ],
"2" : [ {
"state" : "STARTED",
"primary" : true,
"node" : "sIh5gQcFThCcz3SO6txvvQ",
"relocating_node" : null,
"shard" : 2,
"index" : "blog"
}, {
"state" : "STARTED",
"primary" : false,
"node" : "-mq1SRuuQoeEq-3S8SdHqw",
"relocating_node" : null,
"shard" : 2,
"index" : "blog"
} ]
}
}
}
},
"routing_nodes" : {
"unassigned" : [ ],
"nodes" : {
"sIh5gQcFThCcz3SO6txvvQ" : [ {
"state" : "STARTED",
"primary" : true,
"node" : "sIh5gQcFThCcz3SO6txvvQ",
"relocating_node" : null,
"shard" : 4,
"index" : "blog"
}, {
"state" : "STARTED",
"primary" : true,
"node" : "sIh5gQcFThCcz3SO6txvvQ",
"relocating_node" : null,
"shard" : 0,
"index" : "blog"
}, {
"state" : "STARTED",
"primary" : false,
"node" : "sIh5gQcFThCcz3SO6txvvQ",
"relocating_node" : null,
"shard" : 3,
"index" : "blog"
}, {
"state" : "STARTED",
"primary" : false,
"node" : "sIh5gQcFThCcz3SO6txvvQ",
"relocating_node" : null,
"shard" : 1,
"index" : "blog"
}, {
"state" : "STARTED",
"primary" : true,
"node" : "sIh5gQcFThCcz3SO6txvvQ",
"relocating_node" : null,
"shard" : 2,
"index" : "blog"
} ],
"-mq1SRuuQoeEq-3S8SdHqw" : [ {
"state" : "STARTED",
"primary" : false,
"node" : "-mq1SRuuQoeEq-3S8SdHqw",
"relocating_node" : null,
"shard" : 4,
"index" : "blog"
}, {
"state" : "STARTED",
"primary" : false,
"node" : "-mq1SRuuQoeEq-3S8SdHqw",
"relocating_node" : null,
"shard" : 0,
"index" : "blog"
}, {
"state" : "STARTED",
"primary" : true,
"node" : "-mq1SRuuQoeEq-3S8SdHqw",
"relocating_node" : null,
"shard" : 3,
"index" : "blog"
}, {
"state" : "STARTED",
"primary" : true,
"node" : "-mq1SRuuQoeEq-3S8SdHqw",
"relocating_node" : null,
"shard" : 1,
"index" : "blog"
}, {
"state" : "STARTED",
"primary" : false,
"node" : "-mq1SRuuQoeEq-3S8SdHqw",
"relocating_node" : null,
"shard" : 2,
"index" : "blog"
} ]
}
},
"allocations" : [ ]
}

Internal Data Structures and Algorithms

Roaring bitmap

T-Digest Percentiles

HDRHistogram Percentiles

Use Cases

github

GitHub used Solr before, then switched to Elasticsearch, running on multiple clusters.
The code-search cluster has 26 data nodes and 8 coordinator nodes; each data node has 2TB of SSD, with 510 shards and 2 replicas.

References

https://www.elastic.co/blog/resiliency-elasticsearch
https://github.com/elastic/elasticsearch/issues/2488
http://blog.mikemccandless.com/2011/02/visualizing-lucenes-segment-merges.html
http://blog.trifork.com/2011/04/01/gimme-all-resources-you-have-i-can-use-them/
https://github.com/elastic/elasticsearch/issues/10708
https://github.com/blog/1397-recent-code-search-outages
https://speakerd.s3.amazonaws.com/presentations/d7c3effadc8146a7af6229d3b5640162/friday-colin-g-zach-t-all-about-es-algorithms-and-data-structiures-stage-c.pdf


Kafka vs Kinesis vs Redis

vs Kinesis

In the US East region, sustaining 100k messages/sec on Kinesis costs about $4,787/month.

https://www.quora.com/Amazon-Kinesis-versus-Apache-Kafka-which-of-them-is-the-most-proven-and-high-performance-oriented

vs Redis PubSub

  • Redis: risk of OOM
  • Kafka: message replay, horizontal scaling, HA, async publish

facebook Mystery Machine

Data Flow

consistent sampling

local log --> Scribe --> Hive --> UberTrace --> UI

Log Schema

request id
host id
host-local timestamp
unique event label(event name, task name)

Timestamp Normalization

  • local clock skew is not accounted for
  • the RTT between client and server is assumed to be symmetric
Client              Server
  1|--------------->|
   |                |--+ 1.1
   |                |  |  logic
   |                |<-+ 1.2
  2|<---------------|

1.2 - 1.1 = 0.1
2 - 1 = 1.0
RTT = (1.0 - 0.1)/2 = 0.45
clock(1.1) = 1 + 0.45 = 1.45
clock(1.2) = 1.45 + 0.1 = 1.55
The RTT used is an empirical value that stabilizes over a large number of traces: the minimum observed value is taken.
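
The same arithmetic as a small Go sketch, with the numbers from the example above:

package main

import "fmt"

// normalize maps server-side timestamps onto the client clock, assuming a
// symmetric RTT as described above. All values are illustrative.
func normalize(clientSend, clientRecv, serverStart, serverEnd float64) (normStart, normEnd float64) {
    serverTime := serverEnd - serverStart
    oneWay := ((clientRecv - clientSend) - serverTime) / 2
    normStart = clientSend + oneWay
    normEnd = normStart + serverTime
    return
}

func main() {
    s, e := normalize(1, 2, 1.1, 1.2)
    fmt.Println(s, e) // 1.45 1.55
}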

storm acker

Acker

Storm tracks every tuple produced by a Spout, keeping the bookkeeping in an in-memory RotatingMap; there are safeguards so it cannot blow up memory.

When a fail is triggered, Storm does not automatically resend the failed tuple; it only sends a fail message to the Spout, invoking the Spout.fail callback. The actual retry has to be implemented inside Spout.fail.

If any edge in the tuple tree fails, the Spout's fail is triggered immediately, yet the downstream Bolts keep executing. Wasted work?

backtype.storm.daemon.acker
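
The acker's bookkeeping is usually described as a single 64-bit XOR per spout tuple: every anchor ID is XORed in when emitted and XORed in again when acked, so the value returns to 0 exactly when the whole tree is done. A toy sketch of that idea (not the real acker protocol):

package main

import "fmt"

// acker keeps one XOR value per spout tuple (rootID).
type acker struct {
    pending map[int64]uint64 // rootID -> ack val
}

func (a *acker) track(rootID int64, anchorID uint64) { a.pending[rootID] ^= anchorID }

// ack XORs the anchor back in; a zero value means the tree is fully processed.
func (a *acker) ack(rootID int64, anchorID uint64) bool {
    a.pending[rootID] ^= anchorID
    return a.pending[rootID] == 0
}

func main() {
    a := &acker{pending: map[int64]uint64{}}
    a.track(1, 0xAAAA)            // spout emits anchor 0xAAAA
    a.track(1, 0xBBBB)            // a bolt emits a child anchored to 0xBBBB
    fmt.Println(a.ack(1, 0xAAAA)) // false: tree not complete yet
    fmt.Println(a.ack(1, 0xBBBB)) // true: tuple tree fully acked
}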

Spout Executor

spout executor

Bolt Executor

bolt executor


monkey patch golang

package main

import (
    "syscall"
    "unsafe"
)

func a() int { return 1 }
func b() int { return 2 }

// getPage returns the memory page containing address p.
func getPage(p uintptr) []byte {
    return (*(*[0xFFFFFF]byte)(unsafe.Pointer(p & ^uintptr(syscall.Getpagesize()-1))))[:syscall.Getpagesize()]
}

func rawMemoryAccess(b uintptr) []byte {
    return (*(*[0xFF]byte)(unsafe.Pointer(b)))[:]
}

// assembleJump builds machine code that jumps to the function value f points at.
func assembleJump(f func() int) []byte {
    funcVal := *(*uintptr)(unsafe.Pointer(&f))
    return []byte{
        0x48, 0xC7, 0xC2,
        byte(funcVal >> 0),
        byte(funcVal >> 8),
        byte(funcVal >> 16),
        byte(funcVal >> 24), // MOV rdx, funcVal
        0xFF, 0x22, // JMP [rdx]
    }
}

func replace(orig, replacement func() int) {
    bytes := assembleJump(replacement)
    functionLocation := **(**uintptr)(unsafe.Pointer(&orig))
    window := rawMemoryAccess(functionLocation)
    page := getPage(functionLocation)
    // make the code page writable, then overwrite the start of orig with the jump stub
    syscall.Mprotect(page, syscall.PROT_READ|syscall.PROT_WRITE|syscall.PROT_EXEC)
    copy(window, bytes)
}

func main() {
    replace(a, b)
    print(a()) // prints 2
}

References

https://software.intel.com/en-us/articles/introduction-to-x64-assembly
https://www.hopperapp.com/


KafkaSpout

config

Topology config

  • TOPOLOGY_WORKERS
    • total number of JVM worker processes for the topology across all nodes
    • e.g., with 25 workers and parallelism=150, each worker process creates 150/25 = 6 executor threads to run tasks
  • TOPOLOGY_ACKER_EXECUTORS = 20
    • if unset or null, it defaults to TOPOLOGY_WORKERS, i.e. one acker task per worker
    • setting it to 0 turns off acking/reliability
  • TOPOLOGY_MAX_SPOUT_PENDING = 5000

    (defn executor-max-spout-pending [storm-conf num-tasks]
      (let [p (storm-conf TOPOLOGY-MAX-SPOUT-PENDING)]
        (if p (* p num-tasks))))
    • max in-flight (not yet acked or failed) spout tuples on a single spout task at once
    • if not set, the default is 1
  • TOPOLOGY_BACKPRESSURE_ENABLE = false

  • TOPOLOGY_MESSAGE_TIMEOUT_SECS
    30s by default

KafkaSpout config

fetchSizeBytes = 1024 * 1024 * 2 // FetchRequest size; the default is 1048576 (1MB)
fetchMaxWait = 10000 // by default
forceFromStart = false

emit/ack/fail flow

class PartitionManager {
    SortedSet<Long> _pending, failed = new TreeSet();
    LinkedList<MessageAndRealOffset> _waitingToEmit = new LinkedList();

    EmitState next(SpoutOutputCollector collector) {
        if (this._waitingToEmit.isEmpty()) {
            // once everything in memory has been emitted, do one batched kafka fetch to refill _waitingToEmit
            // while refilling, if `failed` is non-empty, the FetchRequest starts from the head of failed (lowest offset): the retry mechanism
            this.fill();
        }
        // take one message from the LinkedList _waitingToEmit
        MessageAndRealOffset toEmit = (MessageAndRealOffset) this._waitingToEmit.pollFirst();
        // emit with a messageID attached
        // BasicBoltExecutor.execute will call _collector.getOutputter().ack(input) automatically (template method),
        // i.e. KafkaSpout.ack -> PartitionManager.ack
        collector.emit(tup, new PartitionManager.KafkaMessageId(this._partition, toEmit.offset));
        // the tuple is now in pending state
    }

    // Note: a tuple will be acked or failed by the exact same Spout task that created it
    void ack(Long offset) {
        this._pending.remove(offset);
    }

    void fail(Long offset) {
        this.failed.add(offset);
        // the kafka consumer will reset its offset to the failed msg and re-consume it
    }
}

class TopologyBuilder {
    public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism_hint) {
        return setBolt(id, new BasicBoltExecutor(bolt), parallelism_hint);
    }
}

class BasicBoltExecutor {
    public void execute(Tuple input) {
        _collector.setContext(input);
        try {
            _bolt.execute(input, _collector);
            _collector.getOutputter().ack(input);
        } catch (FailedException e) {
            _collector.getOutputter().fail(input);
        }
    }
}

Bolt ack

Every tuple produced by KafkaSpout must be acked by the Bolt; otherwise, after 30s, KafkaSpout treats the emitted tuple tree as not fully processed and re-emits it.

class MyBolt {
    public void execute(Tuple tuple) {
        _collector.emit(new Values(foo, bar));
        _collector.ack(tuple);
    }
}

OOM

If messages are processed but never acked, will the growing backlog of unacked messages cause an OOM?
NO
KafkaSpout only keeps offsets; it does not retain every emitted-but-not-yet-acked/failed message

spout throttle

Before 1.0.0 the only knob was TOPOLOGY_MAX_SPOUT_PENDING,
but that parameter is hard to get right: it only takes effect in combination with other settings, and under Trident its semantics are completely different.
Since 1.0.0, backpressure can be used instead.
backpressure

Storm messaging

  • intra-worker
    Disruptor
  • inter-worker
    0MQ/Netty

storm messaging

References

http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/
http://jobs.one2team.com/apache-storms/
http://woodding2008.iteye.com/blog/2335673


aliware

https://www.aliyun.com/aliware

  • EDAS
    • Enterprise Distributed Application Service
    • RPC framework + elasticjob + qconf + GTS + Dapper + autoscale
    • auth data is pushed down to the service hosts to avoid a central performance bottleneck
  • MQ
  • DRDS
    • TDDL proxy
    • avg() => sum()/count()
    • during scale-out or switchover the client may see failures and must retry
    • specific SQL routing rules are expressed via /*TDDL:XXX*/ SQL comment hints
    • supports hot/cold data separation
  • ARMS
    stream processing, with a NiFi-style visual workflow editor
  • GTS

    @GtsTransaction(timeout=1000*60)
    public void transfer(DataSource db1, DataSource db2) {
        // strongly consistent, but when the transaction gets large, performance drops sharply
        // soft-state eventual consistency via MQ gives much better throughput
        db1.getConnection().execute("sql1");
        db2.getConnection().execute("sql2");
    }
  • SchedulerX

  • CSB
    cloud service bus, essentially an API gateway, including protocol translation

TiDB KV Mapping

Data

Key: tablePrefix_rowPrefix_tableID_rowID
Value: [col1, col2, col3, col4]

Unique Index

Key: tablePrefix_idxPrefix_tableID_indexID_indexColumnsValue
Value: rowID

Non-Unique Index

Key: tablePrefix_idxPrefix_tableID_indexID_ColumnsValue_rowID
Value: nil

var (
    tablePrefix     = []byte{'t'}
    recordPrefixSep = []byte("_r")
    indexPrefixSep  = []byte("_i")
)
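
A rough sketch of assembling a row key from these prefixes. The big-endian integer encoding below is a stand-in for TiDB's memcomparable codec, and the exact layout may differ slightly from TiDB proper:

package main

import (
    "encoding/binary"
    "fmt"
)

var (
    tablePrefix     = []byte{'t'}
    recordPrefixSep = []byte("_r")
)

// encodeInt appends a fixed-width big-endian integer (stand-in for TiDB's codec).
func encodeInt(b []byte, v int64) []byte {
    var buf [8]byte
    binary.BigEndian.PutUint64(buf[:], uint64(v))
    return append(b, buf[:]...)
}

// rowKey concatenates the prefixes above with the table and row IDs.
func rowKey(tableID, rowID int64) []byte {
    key := append([]byte{}, tablePrefix...)
    key = encodeInt(key, tableID)
    key = append(key, recordPrefixSep...)
    return encodeInt(key, rowID)
}

func main() {
    fmt.Printf("%q\n", rowKey(10, 1))
}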

SSD vs HDD

http://www.pcgamer.com/hard-drive-vs-ssd-performance/2/

        RandRead   RandWrite   SeqRead   SeqWrite
SSD     50         200         2300      1300
HDD     0.6        1.0         200       130

kafka internals

Overview

overview

Controller

Responsible for:

  • leadership change of a partition
    each partition leader can independently update ISR
  • new topics; deleted topics
  • replica re-assignment

An earlier design had no controller: every broker went through zk whenever it had a decision to make. The downside of introducing a controller is having to implement controller failover.

class KafkaController {
  partitionStateMachine PartitionStateMachine
  replicaStateMachine   ReplicaStateMachine
  // controller election, plus the LeaderChangeListener
  controllerElector ZookeeperLeaderElector
}

ControllerContext

A single global variable. When elected controller, the broker reads all state from zk in one shot in KafkaController.initializeControllerContext.

class ControllerContext {
  epoch, epochZkVersion, correlationId
  controllerChannelManager // maintains the socket conn between the controller and every broker
  // loaded from /brokers/ids at init, updated by BrokerChangeListener
  liveBrokersUnderlying: Set[Broker]
  // loaded from /brokers/topics at init, updated by TopicChangeListener
  allTopics: Set[String]
  // loaded from /brokers/topics/$topic one by one at init, updated by:
  // - assignReplicasToPartitions.assignReplicasToPartitions
  // - updateAssignedReplicasForPartition
  // - TopicChangeListener
  // - ReplicaStateMachine.handleStateChange
  partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] // AR
  // loaded from /brokers/topics/$topic/$partitionID/state one by one at init, updated by:
  // - PartitionStateMachine.initializeLeaderAndIsrForPartition
  // - PartitionStateMachine.electLeaderForPartition
  partitionLeadershipInfo: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch]
}

ControllerChannelManager

The controller keeps one socket (BlockingChannel) connection plus one thread per broker; the thread takes requests from an in-memory batch and sends/receives them over the socket one at a time.
E.g., in an 8-node cluster the controller needs 7 threads + 7 socket connections for this work.
Connections to different brokers proceed concurrently without interfering with each other; requests to any single broker are strictly serialized.
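
A sketch of that per-broker send loop in Go, with goroutines and channels standing in for the controller's per-broker threads and request batches (names are illustrative):

package main

import (
    "fmt"
    "sync"
)

// brokerSender holds the per-broker request queue: one queue + one goroutine per
// broker, so requests to one broker are serialized while brokers proceed in parallel.
type brokerSender struct {
    queue chan string // controller -> broker requests
}

func startSender(brokerID int, wg *sync.WaitGroup) *brokerSender {
    s := &brokerSender{queue: make(chan string, 16)}
    wg.Add(1)
    go func() {
        defer wg.Done()
        for req := range s.queue { // strictly one request at a time per broker
            fmt.Printf("broker %d <- %s\n", brokerID, req)
        }
    }()
    return s
}

func main() {
    var wg sync.WaitGroup
    senders := map[int]*brokerSender{}
    for id := 1; id <= 3; id++ {
        senders[id] = startSender(id, &wg)
    }
    for id, s := range senders {
        s.queue <- fmt.Sprintf("LeaderAndIsrRequest for broker %d", id)
        close(s.queue)
    }
    wg.Wait()
}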

config
  • controller.socket.timeout.ms
    default 30s, socket connect/io timeout
broker shutdown/startup/crash?

zk.watch("/brokers/ids"): BrokerChangeListener calls ControllerChannelManager.addBroker/removeBroker

conn broken/timeout?

If a socket send/receive fails, the controller reconnects and resends automatically with a 300ms backoff, looping forever.
But if the broker is unreachable for long enough, zk.watch("/brokers/ids") fires, removeBroker is called, and the loop exits.

ControllerBrokerRequestBatch

controller -> broker: there are 3 kinds of these control requests, and all of them are idempotent

  • LeaderAndIsrRequest
    replicaManager.becomeLeaderOrFollower
  • UpdateMetadataRequest
    replicaManager.maybeUpdateMetadataCache
  • StopReplicaRequest
    topic deletion

onBrokerStartup

Triggered by BrokerChangeListener

sendUpdateMetadataRequest(newBrokers)
// tell the new broker about all the partitions it hosts
replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers, OnlineReplica)
// have all NewPartition/OfflinePartition partitions go through leader election
partitionStateMachine.triggerOnlinePartitionStateChange()

onBrokerFailure

Work out which state is affected, then trigger the corresponding partitionStateMachine and replicaStateMachine transitions.

StateMachine

The state machines run only on the controller node.

terms

Replica {
  broker_id int
  partition Partition // a Replica belongs to a Partition
  log       Log
  hw, leo   long
  isLeader  bool
}
Partition {
  topic        string
  partition_id int
  leader       Replica
  ISR          Set[Replica]
  AR           Set[Replica] // assigned replicas
  zkVersion    long // for CAS
}

PartitionStateMachine

Tracks the state of every partition; responsible for assigning partitions and electing partition leaders.

class PartitionStateMachine {
  // - NonExistentPartition
  // - NewPartition
  // - OnlinePartition
  // - OfflinePartition
  partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty
  topicChangeListener   // childChanges("/brokers/topics")
  addPartitionsListener // all.dataChanges("/brokers/topics/$topic")
  deleteTopicsListener  // childChanges("/admin/delete_topics")
}

  • PartitionLeaderSelector.selectLeader
    • OfflinePartitionLeaderSelector
    • ReassignedPartitionLeaderSelector
    • PreferredReplicaPartitionLeaderSelector
    • ControlledShutdownLeaderSelector

ReplicaStateMachine

Tracks the state of each partition on its assigned replicas (AR), and tracks whether each broker is alive.

class ReplicaStateMachine {
  replicaState: mutable.Map[PartitionAndReplica, ReplicaState] = mutable.Map.empty
  brokerRequestBatch = new ControllerBrokerRequestBatch
  // only the controller cares whether each broker is alive; brokers themselves do not
  // and a broker is only considered dead in 2 cases (never merely because a conn broke):
  // 1. its broker id ephemeral znode is deleted
  // 2. it performs a controlled shutdown
  brokerChangeListener // childChanges("/brokers/ids")
}

replica state transition

startup

For every replica: if its broker is alive, set it to OnlineReplica, otherwise to ReplicaDeletionIneligible.

state transition
  • validate the current state against the target state
  • update the in-memory replicaState
  • when necessary, broadcast to all brokers via brokerRequestBatch

Failover

broker failover

Every broker runs a KafkaHealthcheck that writes to the ephemeral znode /brokers/ids/$id; whenever the session expires it rewrites it, setting timestamp to the current time.

{"jmx_port":-1,"timestamp":"1460677527837","host":"10.1.1.1","version":1,"port":9002}

The controller's BrokerChangeListener watches the liveness of all brokers.
See onBrokerStartup and onBrokerFailure.

leader failover

When a partition's leader broker crashes and the controller elects a new leader, that new leader's LEO becomes the new HW.
The leader is responsible for maintaining and propagating the HW, and for tracking each follower's LEO.

Split brain: a partition can temporarily have multiple leaders

partition0 lives on brokers (A, B, C); A is the leader
A pauses in a long GC; the controller decides A is dead, makes B the leader and writes the new ISR znode in zk. Right then A comes back alive but has not yet received the controller's RPC, so at this moment both A and B believe they are the leader
what if client1 is connected to A and client2 to B, and both send messages?
when A comes back it still believes its ISR=(A,B,C), so a message sent to A can only be committed after A, B and C have all acked it via fetch requests; but B, once promoted to leader, first stops fetching from the previous leader
therefore A can only commit messages after shrinking its ISR, and the shrink requires a zk write that fails its CAS check; at that point A realizes it is no longer the leader
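
The zk write that saves the day is a conditional set on zkVersion. A toy sketch of that CAS (not the real ZkUtils API):

package main

import (
    "errors"
    "fmt"
)

// znode models the version check ZooKeeper performs on every conditional set.
type znode struct {
    data    string
    version int
}

var errBadVersion = errors.New("zk: version conflict")

// setIfVersion only applies the write when the caller's expected version matches,
// mirroring how a stale leader's ISR-shrink attempt fails its CAS.
func (z *znode) setIfVersion(data string, expected int) error {
    if z.version != expected {
        return errBadVersion
    }
    z.data = data
    z.version++
    return nil
}

func main() {
    isr := &znode{data: "ISR=(A,B,C)", version: 7}

    // the new leader B has already bumped the znode:
    _ = isr.setIfVersion("ISR=(B,C)", 7)

    // the stale leader A still holds version 7 and tries to shrink the ISR:
    fmt.Println(isr.setIfVersion("ISR=(A)", 7)) // version conflict -> A learns it is no longer leader
}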

controller failover

controller session expire example

broker1 is the controller
if no response arrives within sessionTimeout*2/3 = 4s, it tries the next zk server
zk connected; it finds that the session (0x35c1876461ec9d3) has already expired
this triggers KafkaController.SessionExpirationListener {
    onControllerResignation()
    brokerState set to RunningAsBroker // it was RunningAsController
    controllerElector.elect {
        switch write("/controller") {
        case ok:
            onBecomingLeader {
                // 1 successfully elected as leader
                // Broker 1 starting become controller state transition
                controllerEpoch.increment()
                register listeners
                replicaStateMachine.startup()
                partitionStateMachine.startup()
                brokerState = RunningAsController
                sendUpdateMetadataRequest(to all live brokers)
            }
        case ZkNodeExistsException:
            read("/controller") and get leaderID
        }
    }
}
broker0:
ZookeeperLeaderElector.LeaderChangeListener fires; it watch("/controller") {
    elect
}

Misc

  • ReplicationUtils.updateLeaderAndIsr

References

https://issues.apache.org/jira/browse/KAFKA-1460
https://cwiki.apache.org/confluence/display/KAFKA/kafka+Detailed+Replication+Design+V3
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
