Reed-Solomon erasure code

Intro

Erasure codes are widely used for RAID, backups, cold data, and historical-data storage.
Some also use them to spread one piece of data across multiple cloud providers (e.g. S3, Azure, Rackspace), removing the dependency on any single vendor: a "cloud of clouds".

Usage

A file of size x is split into n data shards plus k parity shards; any k shards (data or parity) can be lost, i.e. at least n shards must survive to recover the file.

$encode -data 4 -par 2 README.md # n=4 k=2
Opening README.md
File split into 6 data+parity shards with 358 bytes/shard.
Writing to README.md.0
Writing to README.md.1
Writing to README.md.2
Writing to README.md.3
Writing to README.md.4 # README.md.4 and README.md.5 are parity shards
Writing to README.md.5
$rm -f README.md.4 README.md.5 # remove the 2 parity shards
$decode -out x README.md
Opening README.md.0
Opening README.md.1
Opening README.md.2
Opening README.md.3
Opening README.md.4
Error reading file open README.md.4: no such file or directory
Opening README.md.5
Error reading file open README.md.5: no such file or directory
Verification failed. Reconstructing data
Writing data to x
$diff x README.md
$ # all the same

Algorithm

File content 'ABCDEFGHIJKLMNOP', encoded as 4+2.
Given n and k, the encoding matrix is fixed.

coding 4 => 4+2
decode after losing 2 shards
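
The encode/decode session above looks like the simple encoder/decoder examples that ship with a Reed-Solomon library for Go. A minimal sketch of the same 4+2 flow, assuming the github.com/klauspost/reedsolomon package:

package main

import (
	"fmt"
	"log"

	"github.com/klauspost/reedsolomon"
)

func main() {
	enc, err := reedsolomon.New(4, 2) // n=4 data shards, k=2 parity shards
	if err != nil {
		log.Fatal(err)
	}

	data := []byte("ABCDEFGHIJKLMNOP")
	shards, err := enc.Split(data) // 6 shards: 4 data + 2 (still empty) parity
	if err != nil {
		log.Fatal(err)
	}
	if err := enc.Encode(shards); err != nil { // fill parity from the data shards
		log.Fatal(err)
	}

	// Lose any 2 shards: one data shard and one parity shard here.
	shards[1] = nil
	shards[5] = nil

	if err := enc.Reconstruct(shards); err != nil { // rebuild the missing shards
		log.Fatal(err)
	}
	ok, err := enc.Verify(shards)
	fmt.Println("verified after reconstruction:", ok, err)
}

Encode has to be run again whenever a data shard changes, since every parity shard depends on all data shards.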

References

http://pages.cs.wisc.edu/~yadi/papers/yadi-infocom2013-paper.pdf


kafka ext4

big file

I currently use ext4 for Kafka storage, with 1GB per segment.

The main ext4 optimizations for sequential access to large files:

  • replace indirect blocks with extents

    • ext3 uses multi-level indirect block mapping, so operating on large files generates many random accesses
      • one extra block read (seek) every 1024 blocks (4MB)
      • ext2/3
    • an ext4 extent describes one contiguous range of data blocks (see the sketch after this list)

      +---------+--------+----------+
      | logical | length | physical |
      +---------+--------+----------+
      |    0    |  1000  |   200    |
      +---------+--------+----------+

      type ext4_extent struct {
          block    uint32 // first logical block the extent covers
          len      uint16 // number of blocks covered by the extent
          start_hi uint16 // high 16 bits of the physical block number
          start    uint32 // low 32 bits of the physical block number
      }

      ext4

  • inode preallocation
    inodes have good locality: inodes of files in the same directory are stored close together, which speeds up directory lookups
  • Multiblock Allocator (mballoc)
    • ext3 can only allocate a single 4KB (block size) block per call
    • ext4 can allocate many blocks at once
  • delayed allocation
    defer block allocation to writeback time
  • online defragmentation with e4defrag
    allocate more contiguous blocks in a temporary inode
    read a data block from the original inode
    move the corresponding block number from the temporary inode to the original inode
    write out the page
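
To make the "one extra seek per 1024 blocks" point above concrete, here is a rough back-of-the-envelope sketch in Go; the 1GB segment size, 4KB block size and the extent count are assumed numbers, not measurements:

package main

import "fmt"

// Rough comparison of mapping metadata for a 1GB Kafka segment with 4KB blocks:
// ext2/3 indirect pointers vs a handful of ext4 extents.
func main() {
	const (
		blockSize  = 4 * 1024      // bytes
		fileSize   = 1 << 30       // 1GB segment, assumed
		ptrsPerBlk = blockSize / 4 // ext3 stores 4-byte block pointers
	)
	dataBlocks := fileSize / blockSize
	// roughly one indirect block read (seek) per 1024 data blocks,
	// ignoring double/triple indirect blocks
	indirectBlocks := dataBlocks / ptrsPerBlk
	fmt.Printf("ext3: %d data blocks need ~%d extra metadata reads\n", dataBlocks, indirectBlocks)

	// if the allocator keeps the file in, say, 8 contiguous extents,
	// the whole mapping fits in a few ext4_extent records
	const assumedExtents = 8
	fmt.Printf("ext4: the same file is described by ~%d extent records\n", assumedExtents)
}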

Benchmark

benchmark

Summary

ext3 vs ext4

http://www.linux-mag.com/id/7271/


fs atomic and ordering

Applications access the file system through POSIX system calls, but POSIX only specifies results; the underlying guarantees are not spelled out, so developers are left guessing.
Different file systems implement those "invisible" parts differently, which causes large behavioral differences; on top of that, each file system has its own tuning options, which makes the picture even harder to reason about.

Example

write(f1, "pp")
write(f2, "qq")

If the writes are not atomic, the file size may change while the content does not (State #A).
Reordering of the two writes produces State #C.
fs

  • size atomicity
  • content atomicity
  • calls out of order

Facts

persistence

atomicity

  • atomic single-sector overwrite
    almost all file systems provide this today, mainly because the underlying disk already does
  • atomic append
    • requires updating the inode and the data block atomically
    • not supported by ext3/ext4/reiserfs in writeback mode
  • multi-block append
    not supported by most file systems today
  • rename/link
    these directory operations are essentially atomic (see the write-then-rename sketch below)
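
Since single-file rename is about the only atomicity you can portably count on, the usual way to replace a file's content atomically is write-to-temp + fsync + rename. A minimal sketch (the helper name and error handling are mine):

package main

import (
	"os"
	"path/filepath"
)

// atomicWriteFile replaces path's content via write-to-temp + fsync + rename.
func atomicWriteFile(path string, data []byte, perm os.FileMode) error {
	dir := filepath.Dir(path)
	tmp, err := os.CreateTemp(dir, ".tmp-*") // same dir, so rename stays on one fs
	if err != nil {
		return err
	}
	defer os.Remove(tmp.Name()) // no-op once the rename has succeeded

	if _, err := tmp.Write(data); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Sync(); err != nil { // make the new content durable first
		tmp.Close()
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	if err := os.Chmod(tmp.Name(), perm); err != nil {
		return err
	}
	// rename is atomic: readers see either the old file or the new one, never a mix
	return os.Rename(tmp.Name(), path)
}

func main() {
	_ = atomicWriteFile("config.json", []byte(`{"ok":true}`), 0644)
}

For durability of the rename itself, the containing directory should also be fsynced; that is an ordering question, which is the next topic.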

ordering


Wake on LAN

WOL is a link-layer protocol: a UDP broadcast carrying a magic packet powers a machine up remotely.
The feature can be enabled or disabled in the BIOS.

type MagicPacket struct {
    header  [6]byte        // 6 bytes of 0xFF
    payload [16]MACAddress // target MAC address repeated 16 times
}
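
A minimal sketch of building and broadcasting such a packet; port 9 and the limited-broadcast address are conventional choices, not something WOL itself mandates:

package main

import (
	"log"
	"net"
)

// wake broadcasts a WOL magic packet: 6 bytes of 0xFF followed by the
// target MAC address repeated 16 times (102 bytes total).
func wake(mac string) error {
	hw, err := net.ParseMAC(mac)
	if err != nil {
		return err
	}
	pkt := make([]byte, 0, 102)
	for i := 0; i < 6; i++ {
		pkt = append(pkt, 0xFF)
	}
	for i := 0; i < 16; i++ {
		pkt = append(pkt, hw...)
	}
	conn, err := net.Dial("udp", "255.255.255.255:9")
	if err != nil {
		return err
	}
	defer conn.Close()
	_, err = conn.Write(pkt)
	return err
}

func main() {
	if err := wake("00:11:22:33:44:55"); err != nil { // MAC is a placeholder
		log.Fatal(err)
	}
}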

Facebook's data centers carve out a separate "cold storage" tier for photos and videos that are no longer viewed (typically 10+ years old). Because
metadata and data are separated, a user request for such content wakes the storage devices up.


Swinging Door Trending

SDT (Swinging Door Trending) is a lossy streaming compression algorithm. For time series that change slowly it achieves very high compression ratios, but it may drop peaks and troughs.
A run of consecutive data points is replaced by a straight line defined by its start and end points.
Data is "restored" by linear interpolation: prev sample <–interpolate–> next sample

The straight line from point a to point e replaces the data points (a, b, c, d, e).
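
A minimal Go sketch of the swinging-door idea; the compDev tolerance and point layout are illustrative, and it archives the last point that still fit whenever the two doors swing past each other:

package main

import (
	"fmt"
	"math"
)

// Point is one time-series sample; timestamps are assumed strictly increasing.
type Point struct {
	T, V float64
}

// sdtCompress keeps a point only when the "door" corridor around the current
// anchor collapses, so the retained points reproduce the series to roughly
// within ±compDev by linear interpolation.
func sdtCompress(in []Point, compDev float64) []Point {
	if len(in) < 3 {
		return in
	}
	out := []Point{in[0]}
	anchor := in[0]
	upper := math.Inf(1)  // tightest slope bound from the upper door pivot
	lower := math.Inf(-1) // tightest slope bound from the lower door pivot
	for i := 1; i < len(in); i++ {
		p := in[i]
		dt := p.T - anchor.T
		if u := (p.V + compDev - anchor.V) / dt; u < upper {
			upper = u
		}
		if l := (p.V - compDev - anchor.V) / dt; l > lower {
			lower = l
		}
		if lower > upper { // the doors have swung past each other
			prev := in[i-1]
			out = append(out, prev) // archive the last point that still fit
			anchor = prev
			dt = p.T - anchor.T // restart the doors against the new anchor
			upper = (p.V + compDev - anchor.V) / dt
			lower = (p.V - compDev - anchor.V) / dt
		}
	}
	return append(out, in[len(in)-1]) // always keep the final sample
}

func main() {
	series := []Point{{0, 1}, {1, 1.1}, {2, 0.9}, {3, 1.0}, {4, 5.0}, {5, 5.1}}
	fmt.Println(sdtCompress(series, 0.5)) // jumps beyond the tolerance are kept
}

Raising compDev trades accuracy for compression ratio; the peak/trough loss mentioned above happens when a spike still fits inside the corridor.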

vs facebook gorilla
http://www.vldb.org/pvldb/vol8/p1816-teller.pdf


socket recv buffer

SO_RCVBUF

BDP * 4 / 3
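
A rough illustration with assumed numbers (1 Gbit/s link, 10ms RTT); note that setting SO_RCVBUF explicitly disables tcp_moderate_rcvbuf autotuning for that socket:

package main

import (
	"fmt"
	"net"
)

// Size SO_RCVBUF from the bandwidth-delay product using the 4/3 rule of thumb
// above. Link speed and RTT are assumed numbers.
func main() {
	const (
		bandwidthBps = 1_000_000_000 // 1 Gbit/s
		rttSeconds   = 0.010         // 10 ms
	)
	bdp := int(bandwidthBps / 8 * rttSeconds) // bytes in flight
	rcvbuf := bdp * 4 / 3
	fmt.Printf("BDP = %d bytes, SO_RCVBUF ~ %d bytes\n", bdp, rcvbuf)

	conn, err := net.Dial("tcp", "127.0.0.1:5001") // e.g. the iperf server below
	if err != nil {
		fmt.Println(err)
		return
	}
	defer conn.Close()
	// Setting the buffer explicitly turns off net.ipv4.tcp_moderate_rcvbuf
	// autotuning for this socket.
	if err := conn.(*net.TCPConn).SetReadBuffer(rcvbuf); err != nil {
		fmt.Println(err)
	}
}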

use iperf for testing

iperf -s -w 200K # server
iperf -c localhost # client

iperf

ethtool -k eth0 # segment offload makes tcpdump see a packet bigger than MTU
net.ipv4.tcp_available_congestion_control
net.ipv4.tcp_congestion_control
net.ipv4.tcp_moderate_rcvbuf
net.ipv4.tcp_slow_start_after_idle

golang pkg github pull request

git remote add upstream https://github.com/funkygao/myrepo.git
git fetch upstream
git checkout -b mybranch
git commit -a
git push upstream mybranch

ElasticSearch internals

Basics

Storage Model

A specialized kind of LSM tree

ES        LSM
========  ==================
translog  WAL
buffer    MemTable
segment   SSTable
merge     merge & compaction

Defaults

node.master: true
node.data: true
index.number_of_shards: 5
index.number_of_replicas: 1
path.data: /path/to/data1,/path/to/data2
http.max_content_length: 100mb

Best Practice

Memory: give half to the ES heap and leave the other half to the page cache.

Indexing

write

  • translog (WAL)

    • one translog per shard
    • it enables real-time CRUD by docID: a lookup checks the translog first, then the segments
    • fsynced to disk every 5s by default
    • the translog lives under path.data and can be striped, but then it competes with the segments for I/O
      $ls nodes/0/indices/twitter/1/
      drwxr-xr-x 3 funky wheel 102B 6 8 17:23 _state
      drwxr-xr-x 5 funky wheel 170B 6 8 17:23 index
      drwxr-xr-x 3 funky wheel 102B 6 8 17:23 translog
  • refresh

    • on segments
    • scheduled periodically (1s); can also be triggered manually via /_refresh
    • merge logic is triggered here
    • after a refresh, all changes made before it become searchable; data still in the in-memory buffer is not searchable
    • refresh
      • writes a new segment but does not fsync it (close does not fsync), then opens the new segment
      • durability is not guaranteed here; that is flush's job
      • the in-memory buffer is cleared
  • flush
    • on the translog
    • every 30s / 200MB / 5000 ops by default; it triggers a commit
    • Any docs in the in-memory buffer are written to a new segment
    • The buffer is cleared
    • A commit point is written to disk
      a commit point is a file that lists the segments ready for search
    • The filesystem cache is flushed with an fsync
    • The old translog is deleted
  • commit
    • fsync every segment that has not yet been fsynced
    • commit
  • merge
    • merge
    • policy
      • tiered (default)
      • log_byte_size
      • log_doc
    • may merge committed and uncommitted segments together
    • asynchronous; does not block indexing/query
    • after merging, the merged big segment is fsynced and the small source segments are deleted
    • ES throttles merging so it does not hog resources: 20MB/s
  • update
    delete, then insert. MySQL implements updates of variable-length columns the same way internally.

Hard commit

Every 30 minutes by default, or when the translog reaches its size threshold (200MB), whichever comes first.

write the in-memory buffer out as a new segment, then clear the buffer
fsync every segment not yet covered by a commit point
write a commit point file
IndexSearcher is opened and all documents are searchable
translog.empty()
translog.sequence++
// even after a sudden power loss, recovery is complete

Merge

  • Happens in parallel with searching; the searcher is switched over to the new segment.
  • Deleting a document creates a new document plus a .del file to record that the old document is deleted
  • Because a document may be updated several times, its deletions can show up in multiple segments

Replication

primary-backup model

Indexing flow on the primary

validate indexing request
local indexing
concurrently dispatch the indexing request to all replicas

Query

read

translog.search(q)
for s in range segments {
    s.search(q)
}

deep pagination

With search(from=50000, size=10), each shard builds a priority queue of size 50010 and fills it from its segments,
while the coordinating node has to build a priority queue of number_of_shards * 50010 entries.

scan+scroll fetches data in bulk with sorting disabled, e.g. during a reindex.
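
A small sketch of why deep pagination hurts, using the numbers above (5 shards assumed):

package main

import "fmt"

// Each shard keeps a priority queue of from+size entries, and the
// coordinating node merges shards*(from+size) entries to return `size` hits.
func main() {
	const (
		shards = 5 // assumed: the default number of primary shards
		from   = 50000
		size   = 10
	)
	perShardQueue := from + size
	coordinatorEntries := shards * perShardQueue
	fmt.Printf("each shard scores and keeps %d docs\n", perShardQueue)
	fmt.Printf("coordinating node merges %d entries to return %d results\n",
		coordinatorEntries, size)
}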

Cluster

Intro

Every node connects to every other node: n*(n-1) connections.
shard = MurMurHash3(document_id) % (num_of_primary_shards)

Node1 uses the document's _id to compute that it belongs on shard0, finds from the cluster state that shard0's primary lives on Node3, and forwards the request to Node3, which indexes the document.
Node3 then forwards the data in parallel (PUSH-model replication) to the replica copies of shard0 on Node1 and Node2. As soon as any replica reports a successful write, Node3 replies to the original receiving node Node1 that the write succeeded, and Node1 returns success to the client.
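
A sketch of the routing formula above; the real hash is Murmur3, and the stdlib FNV-1a below is only a stand-in to keep the example dependency-free:

package main

import (
	"fmt"
	"hash/fnv"
)

// routeShard picks the primary shard: hash(_id) % number_of_primary_shards.
func routeShard(docID string, numPrimaryShards uint32) uint32 {
	h := fnv.New32a() // stand-in for Murmur3
	h.Write([]byte(docID))
	return h.Sum32() % numPrimaryShards
}

func main() {
	// Because the shard count is part of the formula, the number of primary
	// shards cannot change after index creation.
	fmt.Println(routeShard("tweet-4711", 5)) // docID is a placeholder
}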

Scale

scale up

Rebalance

# show how shards are distributed
curl -XGET http://localhost:9200/_cat/shards
# manually rebalance
curl -XPOST 'http://localhost:9200/_cluster/reroute' -d '{
    "commands": [
        {
            "move": {
                "index": "mylog-2016-02-08",
                "shard": 6,
                "from_node": "192.168.0.1",
                "to_node": "192.168.0.2"
            }
        }
    ]}'

How a new node joins

// get the master node and the eligible master nodes
for host = range discovery.zen.ping.unicast.hosts {
    PingResponse = ZenPing.send(host)
}
send('internal:discovery/zen/join') to master
master.reply('internal:discovery/zen/join/validate')
// 2PC join
master.update(ClusterState) and broadcast to all nodes, and wait for minimum_master_nodes acks
ClusterState change committed and confirmation sent

discovery.zen.ping.unicast.hosts is really just a seed list: with 5 nodes you can list only 2, and full cluster membership is still discovered from them; know one node, know the whole cluster.

Master fault detection

By default every node pings the master every 1s, with ping_timeout=30s and ping_retries=3.
When that fails, a new master election is triggered.

Concurrency

Every document carries a version; updates use optimistic locking.
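
A minimal sketch of a version-guarded update over the HTTP API; the index/type/id, the local URL and the expected version are made up:

package main

import (
	"bytes"
	"fmt"
	"net/http"
)

// Optimistic concurrency: resubmit a document only if its version still
// matches; a 409 means someone else updated it first.
func main() {
	doc := []byte(`{"user":"funky","text":"updated"}`)
	url := "http://localhost:9200/blog/user/1?version=3" // expect current version == 3

	req, _ := http.NewRequest(http.MethodPut, url, bytes.NewReader(doc))
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusConflict: // 409: version conflict
		fmt.Println("version conflict, re-read and retry")
	default:
		fmt.Println("status:", resp.Status)
	}
}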

Consistency

  • quorum(default)
  • one
  • all

Shard

allocation

weight_index(node, index)   = indexBalance   * (node.numShards(index) - avgShardsPerNode(index))
weight_node(node, index)    = shardBalance   * (node.numShards()      - avgShardsPerNode)
weight_primary(node, index) = primaryBalance * (node.numPrimaries()   - avgPrimariesPerNode)
weight(node, index) = weight_index(node, index) + weight_node(node, index) + weight_primary(node, index)

If the resulting weight(node, index) exceeds the threshold, a shard migration is triggered.

In an established cluster the shards are spread evenly. But when you add nodes, you will notice that replica shards are moved to the new nodes first,
so the new nodes end up holding mostly replicas while nearly all primary shards stay on the old nodes.

It is hard to find good ratios for the cluster.routing.allocation.balance settings.
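
A toy evaluation of the weight formula above; the balance factors are the defaults as far as I remember (0.55/0.45/0.05, threshold 1.0), the shard counts are made up, and my reading is that the balancer compares the weight difference between nodes against the threshold:

package main

import "fmt"

const (
	indexBalance   = 0.55 // assumed defaults
	shardBalance   = 0.45
	primaryBalance = 0.05
	threshold      = 1.0
)

type node struct {
	shardsOfIndex float64 // shards of this index on the node
	totalShards   float64 // all shards on the node
	primaries     float64 // primary shards on the node
}

func weight(n node, avgShardsOfIndex, avgShards, avgPrimaries float64) float64 {
	return indexBalance*(n.shardsOfIndex-avgShardsOfIndex) +
		shardBalance*(n.totalShards-avgShards) +
		primaryBalance*(n.primaries-avgPrimaries)
}

func main() {
	old := node{shardsOfIndex: 4, totalShards: 12, primaries: 6}
	fresh := node{} // a newly added, empty node

	// cluster-wide averages over the 2 nodes
	w1 := weight(old, 2, 6, 3)
	w2 := weight(fresh, 2, 6, 3)
	fmt.Printf("old=%.2f new=%.2f delta=%.2f (migrate if delta > threshold %.1f)\n",
		w1, w2, w1-w2, threshold)
}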

Initialization

the master assigns a new shard via ClusterState
the node initializes an empty shard and notifies the master
the master marks the shard as started
if this is the first shard with a given id, it is marked as primary

Shrink Index

https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-shrink-index.html
This is not resharding: you can create many shards up front (e.g. 30) for bulk loading, and shrink to 5 once the data is in.
It only hard-links the segments; the index must be made read-only before running it.
Typical scenario: a TSDB with one index per month; once the month is over, the index can be shrunk.

Consensus

Zen discovery (unicast/multicast) has a split-brain problem. Rather than solving it outright, the common mitigation is 3 dedicated master machines plus a leaner master code path, reducing the chance that a missing response is mistaken for a partition.

node1, node2(master), node3
the 2-3 link is down but 1-2 and 1-3 still work; with minimum_master_nodes=2, node3 elects itself as master and node1 votes for it: two masters

Defaults: ping_interval=1s, ping_timeout=3s, ping_retries=3, join_timeout=20*ping_interval
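
The usual guard against the two-master scenario above is quorum sizing: minimum_master_nodes = floor(master_eligible/2) + 1. A tiny sketch:

package main

import "fmt"

// Quorum rule of thumb for discovery.zen.minimum_master_nodes.
func main() {
	for _, n := range []int{1, 2, 3, 5} {
		fmt.Printf("%d master-eligible nodes -> minimum_master_nodes = %d\n", n, n/2+1)
	}
}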

Why ES does not use Paxos (ZooKeeper):

This, to me, is at the heart of our approach to resiliency.
Zookeeper, an excellent product, is a separate external system, with its own communication layer.
By having the discovery module in Elasticsearch using its own infrastructure, specifically the communication layer, it means that we can use the “liveness” of the cluster to assess its health.
Operations happen on the cluster all the time, reads, writes, cluster state updates.
By tying our assessment of health to the same infrastructure, we can actually build a more reliable more responsive system.
We are not saying that we won’t have a formal Zookeeper integration for those who already use Zookeeper elsewhere. But this will be next to a hardened, built in, discovery module.

ClusterState

The master makes all changes and broadcasts them to every node; each node keeps a local copy.
With many indices, fields, shards and nodes it can get large, so ES supports incremental (diff) broadcasts and compression.

{
"cluster_name" : "elasticsearch",
"version" : 11,
"master_node" : "-mq1SRuuQoeEq-3S8SdHqw",
"blocks" : { },
"nodes" : {
"sIh5gQcFThCcz3SO6txvvQ" : {
"name" : "Max",
"transport_address" : "inet[/162.245.23.194:9301]",
"attributes" : { }
},
"-mq1SRuuQoeEq-3S8SdHqw" : {
"name" : "Llyron",
"transport_address" : "inet[/162.245.23.194:9300]",
"attributes" : { }
}
},
"metadata" : {
"templates" : { },
"indices" : {
"blog" : {
"state" : "open",
"settings" : {
"index" : {
"uuid" : "UQMz5vbXSBqFU_8U3u4gYQ",
"number_of_replicas" : "1",
"number_of_shards" : "5",
"version" : {
"created" : "1030099"
}
}
},
"mappings" : {
"user" : {
"properties" : {
"name" : {
"type" : "string"
}
}
}
},
"aliases" : [ ]
}
}
},
"routing_table" : {
"indices" : {
"blog" : {
"shards" : {
"4" : [ {
"state" : "STARTED",
"primary" : true,
"node" : "sIh5gQcFThCcz3SO6txvvQ",
"relocating_node" : null,
"shard" : 4,
"index" : "blog"
}, {
"state" : "STARTED",
"primary" : false,
"node" : "-mq1SRuuQoeEq-3S8SdHqw",
"relocating_node" : null,
"shard" : 4,
"index" : "blog"
} ],
"0" : [ {
"state" : "STARTED",
"primary" : true,
"node" : "sIh5gQcFThCcz3SO6txvvQ",
"relocating_node" : null,
"shard" : 0,
"index" : "blog"
}, {
"state" : "STARTED",
"primary" : false,
"node" : "-mq1SRuuQoeEq-3S8SdHqw",
"relocating_node" : null,
"shard" : 0,
"index" : "blog"
} ],
"3" : [ {
"state" : "STARTED",
"primary" : false,
"node" : "sIh5gQcFThCcz3SO6txvvQ",
"relocating_node" : null,
"shard" : 3,
"index" : "blog"
}, {
"state" : "STARTED",
"primary" : true,
"node" : "-mq1SRuuQoeEq-3S8SdHqw",
"relocating_node" : null,
"shard" : 3,
"index" : "blog"
} ],
"1" : [ {
"state" : "STARTED",
"primary" : false,
"node" : "sIh5gQcFThCcz3SO6txvvQ",
"relocating_node" : null,
"shard" : 1,
"index" : "blog"
}, {
"state" : "STARTED",
"primary" : true,
"node" : "-mq1SRuuQoeEq-3S8SdHqw",
"relocating_node" : null,
"shard" : 1,
"index" : "blog"
} ],
"2" : [ {
"state" : "STARTED",
"primary" : true,
"node" : "sIh5gQcFThCcz3SO6txvvQ",
"relocating_node" : null,
"shard" : 2,
"index" : "blog"
}, {
"state" : "STARTED",
"primary" : false,
"node" : "-mq1SRuuQoeEq-3S8SdHqw",
"relocating_node" : null,
"shard" : 2,
"index" : "blog"
} ]
}
}
}
},
"routing_nodes" : {
"unassigned" : [ ],
"nodes" : {
"sIh5gQcFThCcz3SO6txvvQ" : [ {
"state" : "STARTED",
"primary" : true,
"node" : "sIh5gQcFThCcz3SO6txvvQ",
"relocating_node" : null,
"shard" : 4,
"index" : "blog"
}, {
"state" : "STARTED",
"primary" : true,
"node" : "sIh5gQcFThCcz3SO6txvvQ",
"relocating_node" : null,
"shard" : 0,
"index" : "blog"
}, {
"state" : "STARTED",
"primary" : false,
"node" : "sIh5gQcFThCcz3SO6txvvQ",
"relocating_node" : null,
"shard" : 3,
"index" : "blog"
}, {
"state" : "STARTED",
"primary" : false,
"node" : "sIh5gQcFThCcz3SO6txvvQ",
"relocating_node" : null,
"shard" : 1,
"index" : "blog"
}, {
"state" : "STARTED",
"primary" : true,
"node" : "sIh5gQcFThCcz3SO6txvvQ",
"relocating_node" : null,
"shard" : 2,
"index" : "blog"
} ],
"-mq1SRuuQoeEq-3S8SdHqw" : [ {
"state" : "STARTED",
"primary" : false,
"node" : "-mq1SRuuQoeEq-3S8SdHqw",
"relocating_node" : null,
"shard" : 4,
"index" : "blog"
}, {
"state" : "STARTED",
"primary" : false,
"node" : "-mq1SRuuQoeEq-3S8SdHqw",
"relocating_node" : null,
"shard" : 0,
"index" : "blog"
}, {
"state" : "STARTED",
"primary" : true,
"node" : "-mq1SRuuQoeEq-3S8SdHqw",
"relocating_node" : null,
"shard" : 3,
"index" : "blog"
}, {
"state" : "STARTED",
"primary" : true,
"node" : "-mq1SRuuQoeEq-3S8SdHqw",
"relocating_node" : null,
"shard" : 1,
"index" : "blog"
}, {
"state" : "STARTED",
"primary" : false,
"node" : "-mq1SRuuQoeEq-3S8SdHqw",
"relocating_node" : null,
"shard" : 2,
"index" : "blog"
} ]
}
},
"allocations" : [ ]
}

Internal Data Structures and Algorithms

Roaring bitmap

T-Digest Percentiles

HDRHistogram Percentiles

Use Cases

github

GitHub used Solr before switching to Elasticsearch, which runs across multiple clusters.
The code-search cluster has 26 data nodes and 8 coordinator nodes; each data node has 2TB of SSD, with 510 shards and 2 replicas.

References

https://www.elastic.co/blog/resiliency-elasticsearch
https://github.com/elastic/elasticsearch/issues/2488
http://blog.mikemccandless.com/2011/02/visualizing-lucenes-segment-merges.html
http://blog.trifork.com/2011/04/01/gimme-all-resources-you-have-i-can-use-them/
https://github.com/elastic/elasticsearch/issues/10708
https://github.com/blog/1397-recent-code-search-outages
https://speakerd.s3.amazonaws.com/presentations/d7c3effadc8146a7af6229d3b5640162/friday-colin-g-zach-t-all-about-es-algorithms-and-data-structiures-stage-c.pdf


Kafka vs Kinesis vs Redis

vs Kinesis

In the US East region, sustaining 100K messages/sec on Kinesis costs about $4,787/month.

https://www.quora.com/Amazon-Kinesis-versus-Apache-Kafka-which-of-them-is-the-most-proven-and-high-performance-oriented

vs Redis PubSub

  • Redis Pub/Sub can OOM when subscribers fall behind
  • Kafka can replay messages, scales horizontally, offers HA and asynchronous publishing

facebook Mystery Machine

Data Flow

consistent sampling

local log --> Scribe --> Hive --> UberTrace --> UI

Log Schema

request id
host id
host-local timestamp
unique event label(event name, task name)

Timestamp Normalize

  • local clock skew is ignored
  • the client/server RTT is assumed to be symmetric
Client                Server
  1 |---------------->|
    |                 |--+ 1.1
    |                 |  | logic
    |                 |<-+ 1.2
  2 |<----------------|

1.2 - 1.1 = 0.1
2 - 1 = 1.0
RTT = (1.0 - 0.1)/2 = 0.45
clock(1.1) = 1 + 0.45 = 1.45
clock(1.2) = 1.45 + 0.1 = 1.55

RTT here is an empirical value that stabilizes over many traces: the minimum observed value is used.
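
The same normalization as a small sketch (field names are mine):

package main

import "fmt"

// span holds the four timestamps of one client/server interaction.
// Client and server clocks are not assumed to agree.
type span struct {
	clientSend, clientRecv float64 // 1, 2     (client clock)
	serverRecv, serverSend float64 // 1.1, 1.2 (server clock)
}

// normalize shifts the server-side timestamps onto the client clock,
// assuming the two halves of the round trip are symmetric.
func normalize(s span) (recvNorm, sendNorm float64) {
	processing := s.serverSend - s.serverRecv                // 0.1
	oneWay := (s.clientRecv - s.clientSend - processing) / 2 // 0.45 (the "RTT" above)
	recvNorm = s.clientSend + oneWay                         // 1.45
	sendNorm = recvNorm + processing                         // 1.55
	return
}

func main() {
	r, w := normalize(span{clientSend: 1, clientRecv: 2, serverRecv: 1.1, serverSend: 1.2})
	fmt.Printf("%.2f %.2f\n", r, w) // 1.45 1.55
	// In practice the one-way delay is the minimum observed over many traces,
	// not a per-request estimate.
}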