RCFile, Parquet, Dremel

Overview

Hadoop storage formats: TextFile, SequenceFile, the Hive-optimized RCFile, and Parquet, which is similar to Google's Dremel.

Data placement for MapReduce

  • row store
  • column store
  • hybrid PAX store

RCFile - Record Columnar File

An implementation of PAX: the table is first split horizontally into multiple row groups, and each row group is then split vertically by column.
This guarantees that all of a row's data lives in a single block (avoiding reads across multiple blocks for one row), while each column's data occupies contiguous storage on disk.

table = RCFile -> block -> row group -> (sync marker, metadata header(RLE), column store(gzip))

rcfile

For a table, all row groups have the same size.
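The horizontal-then-vertical split can be sketched as a toy transpose. This only illustrates the PAX idea, not the actual RCFile writer; `toRowGroups` is a hypothetical name and the group size of 2 is arbitrary:

```go
package main

import "fmt"

// toRowGroups splits rows horizontally into fixed-size row groups,
// then stores each group column by column (the PAX layout).
func toRowGroups(rows [][]string, groupSize int) [][][]string {
	var groups [][][]string
	for start := 0; start < len(rows); start += groupSize {
		end := start + groupSize
		if end > len(rows) {
			end = len(rows)
		}
		chunk := rows[start:end]
		// transpose: one slice per column, values stored contiguously
		cols := make([][]string, len(chunk[0]))
		for c := range cols {
			for _, row := range chunk {
				cols[c] = append(cols[c], row[c])
			}
		}
		groups = append(groups, cols)
	}
	return groups
}

func main() {
	rows := [][]string{
		{"r1c1", "r1c2"},
		{"r2c1", "r2c2"},
		{"r3c1", "r3c2"},
	}
	groups := toRowGroups(rows, 2)
	fmt.Println(len(groups))  // 2 row groups
	fmt.Println(groups[0][0]) // column 0 of group 0: [r1c1 r2c1]
}
```

All columns of a row stay inside one group (one block), yet within the group each column is contiguous, which is what makes per-column compression (gzip) and column pruning possible.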

ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java

Usage

CREATE TABLE demo_t (
datatime string,
section string,
domain string,
province string,
city string,
idc string)
STORED AS RCFILE;

ORCFile

Parquet

References

http://web.cse.ohio-state.edu/hpcs/WWW/HTML/publications/papers/TR-11-4.pdf
http://www.ece.eng.wayne.edu/~sjiang/ECE7650-winter-14/Topic3-RCFile.pdf


bitcask

A Log-Structured Hash Table for KV storage
Riak's default storage engine is bitcask; when bitcask was being developed, LevelDB had not yet been released.
Recent news: Basho, the company behind Riak, is close to dissolving — the CTO has left and the company reportedly has only one or two people remaining.

Data

data files

older data files will be merged and compacted

data entry

Index

reside in mem

hash index

Read value_sz bytes starting at value_pos in the file identified by file_id, and you have the value you need.

Write

append to active data file
update hash table index in memory

Hint File

To speed up index construction at cold start: when rebuilding the hash table, there is no need to full-scan the data files; scanning the much smaller hint files is enough.

hint file

Issues

  • Since the index contains every key, the amount of data bitcask can hold is bounded by memory; each index entry takes at least 40 bytes
  • No range/sort operations
    If the hash table were replaced with a skiplist, and older data files were merge-sorted, range queries could be supported

hashicorp/raft implementation

Primer

Log entry = (Term, Index, Command)
in RDBMS, Log=WAL/redo log, FSM=records

Components

  • FSM
  • LogStore
    • LastIndex
    • FirstIndex
  • StableStore
    • CurrentTerm
    • LastVoteTerm
    • LastVoteCand
  • SnapshotStore
  • PeerStore

RPC Protocol

msgpack serializer

  • AppendEntries
    Initiated by the leader
    If a response comes back with Success=false (and a newer term), the leader steps down
    • Term
      Each term begins with an election
    • Leader
      After a partition, the old leader discovers the new leader through this field while replicating logs, and steps down
    • PrevLogIndex, PrevLogTerm
      These ensure that logs at the same term/index have identical content, and that all preceding logs are identical as well: safety
    • LeaderCommitIndex
      Only committed logs may be applied to the FSM
    • []Log
      • Term, Index
      • Type
        • LogCommand
          replicates a client command
        • LogAddPeer
        • LogRemovePeer
        • LogNoop
        • LogBarrier
  • RequestVote
    Initiated by a candidate; if the vote is not Granted, it steps down to follower
    A leader/candidate may also receive a RequestVote
    While soliciting votes, a candidate may receive an AppendEntries; it compares terms to decide whether to become a follower or to reject
    • Term
    • LastLogIndex, LastLogTerm
      A voter refuses to vote if the candidate's log is less up-to-date than its own,
      preventing a candidate that lacks some committed entries from becoming leader
  • InstallSnapshot
    Initiated by the leader

2PC

Leader’s log is ‘the truth’

2 phase commit

2.1 is the first AppendEntries; after 3.1 the leader applies to its local FSM, updates its commit index, and can then reply to the client.
4.2 is the next AppendEntries, in which the leader tells followers the latest commit index; only then do followers apply to their FSMs.

When the Commit Index is updated, the node will pass all commands between the new and old Commit Index to the state machine.

What phase 2 is for:
an uncommitted log entry may still be rolled back

5 nodes A(leader),B,C,D,E are partitioned into A(leader)/B and C(leader)/D/E
A: apply(set x=9)
C: apply(set x=8)
Since B only exchanges heartbeats with its leader (A), B has no idea a partition happened. Without phase 2, B would modify its FSM directly, breaking consensus.
After the partition heals, A steps down to follower, and the (set x=9) log entry on A/B is rolled back.

Commit

For a leader to decide an entry is committed

  • must be stored on majority
  • at least one new entry from current term must also be stored on majority

commit safety

1) (term2, index3): S1 is leader; index3 is acked by S2 but not by S3,
i.e. S3.LastIndex=2. Then S1 crashes.
2) S5 becomes the term3 leader (S3, S4 vote for it).
S5 makes 3 appends in term3, but due to the partition they are not replicated; the other machines only advance to term=3.
A new election follows, and S1 becomes the term4 leader.
3) S1 recovers as term4 leader and sends (term2, index3) to S3.
At this point, (term2, index3) cannot be considered safe to commit:
if S1 crashes now, currentTerm(S2,S3)=2,
S5 could still become the leader of a later term, and the 'committed' log would be lost.
4) But if S1 gets one majority-acked log entry in term4,
(term2, index3) becomes safe to commit, because the majority now has currentTerm=4,
so S5 can no longer become leader.

Election

The leader must send heartbeats to its peers to suppress new elections (if nothing is ever Applied, how often should heartbeats go out?)

Handling the RequestVote RPC

// Within one term there is only one leader, but because of partitions
// multiple leaders may coexist in different terms: the newer leader has the higher term
if req.Term < r.getCurrentTerm() {
	// rejected
	return
}
if req.Term > r.getCurrentTerm() {
	// Ensure transition to follower
	r.setState(Follower)
	r.setCurrentTerm(req.Term)
}
// Each server casts at most one vote per term, first come first served
lastVoteTerm := r.stable.GetUint64(keyLastVoteTerm)
if req.Term == lastVoteTerm {
	return
}
// Guarantee that the server elected as new leader holds every committed log entry
lastIdx, lastTerm := r.getLastEntry()
if lastTerm > req.LastLogTerm {
	return
}
if lastTerm == req.LastLogTerm && lastIdx > req.LastLogIndex {
	return
}
// Every vote must be persisted
r.persistVote(req.Term, req.Candidate)
resp.Granted = true // vote granted

Replication

The leader keeps a followerReplication state for each follower

type followerReplication struct {
	currentTerm uint64
	matchIndex  uint64
	nextIndex   uint64 // during AppendEntries, nextIndex-- truncates conflicts until the sync point is found
	// initial value: 1 + LastLogIndex(leader)
	lastContact time.Time
}
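The nextIndex backoff can be sketched as follows. `findSyncPoint` is a hypothetical simplification that inspects both logs directly, whereas the real leader backs off one step per rejected AppendEntries response:

```go
package main

import "fmt"

// findSyncPoint walks nextIndex backwards until the entry just before it
// (PrevLogIndex/PrevLogTerm) matches the follower's log — the sync point.
// leader and follower are slices of term numbers, log indexes start at 1.
func findSyncPoint(leader, follower []uint64) int {
	next := len(leader) + 1 // initial value: 1 + LastLogIndex(leader)
	for next > 1 {
		prev := next - 1
		if prev <= len(follower) && follower[prev-1] == leader[prev-1] {
			break // previous entry matches: replication can start at next
		}
		next-- // conflict or missing entry: back off
	}
	return next
}

func main() {
	leader := []uint64{1, 1, 2, 3}   // terms of entries 1..4
	follower := []uint64{1, 1, 2, 2} // entry 4 conflicts
	// entries from index 4 on get truncated and overwritten
	fmt.Println(findSyncPoint(leader, follower)) // 4
}
```

Once the sync point is found, the leader both deletes the follower's extraneous entries and fills in the missing ones, which is exactly the "repair follower logs" behavior below.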

Repair follower logs

  • delete extraneous entries
  • fill in missing entries

startup

fetch currentTerm from StableStore and set the current term
fetch the last Log from LogStore and set lastLogIndex/lastLogTerm
fetch peers from PeerStore
enter Follower state
restoreSnapshot: fetch the snapshot via SnapshotStore and call FSM.Restore
start 3 goroutines: run(runFollower/runCandidate/runLeader)/runFSM/runSnapshot

runFSM

runs the FSM's 3 methods:
Apply, Restore, Snapshot

runLeader

r.leaderCh <- true
// start an async replication goroutine for every peer (except self)
for _, peer := range r.peers {
	r.startReplication(peer)
}
// first send an AppendEntries(AddPeer) as a noop heartbeat
// leader loop
for r.getState() == Leader {
	select {
	case rpc := <-r.rpcCh:
		r.processRPC(rpc)
	case <-r.stepDown:
		r.setState(Follower)
	case <-r.commitCh: // signaled by inflight once a log has majority acks
		r.processLogs
	case log := <-r.applyCh:
		r.dispatchLogs {
			// write locally first
			// on failure, step down to Follower
			LogStore.StoreLogs(logs)

			// then replicate to followers
			// inflight notifies r.commitCh once a majority acks
			inflight.StartAll(logs)
			notify each follower replication goroutine
		}
	case <-lease:
		// if quorum was not contacted within LeaderLeaseTimeout,
		// step down to Follower
	case <-r.shutdownCh:
		return
	}
}

appendEntries RPC

func appendEntries(rpc RPC, a *AppendEntriesRequest) {
	// always send a response, whether we succeed or fail
	defer rpc.Respond(resp)
	// smaller term than ours: reject
	if a.Term < r.getCurrentTerm() {
		return
	}
	if a.Term > r.getCurrentTerm() || r.getState() != Follower {
		r.setState(Follower)
		r.setCurrentTerm(a.Term)
		resp.Term = a.Term
	}
	r.setLeader(a.Leader)
	// log conflict detection
	if a.PrevLogEntry > 0 {
		// fetch the local Log at index a.PrevLogEntry and check whether
		// a.PrevLogTerm equals Log.Term; if not, the logs conflict: return
	}
	// process a.Entries, e.g. with PrevLogEntry=1:
	// index:      1 2 3 4 5
	// local Logs: x y z
	// a.Entries:    o p q w
	r.logs.DeleteRange(2, 3) // drop the conflicting suffix
	r.logs.StoreLogs(a.Entries)
	// handle the piggybacked phase-2 commit
	if a.LeaderCommitIndex > 0 && a.LeaderCommitIndex > r.getCommitIndex() {
	}
}

runCandidate

electSelf
in between, it may receive RPCs, and SetPeers may be called

Configuration Change

Uses a 2PC-style approach: C_old -> C_old+new -> C_new

http://zookeeper.apache.org/doc/trunk/zookeeperReconfig.html
Since 3.5.0, ZooKeeper also supports dynamic cluster reconfiguration.

Q & A

time

broadcast time << election timeout << MTBF
[T, 2T]

Why does each node need to persist its current term?

It is a best guess, persisted for recovery after a crash.

During recovery, can't the node just fetch it from the leader?

Why does each node need to persist votedFor?

To guarantee election safety: allow at most one winner per term.
In term1, A votes for B, then A crashes; after A recovers, without a persisted votedFor it might vote again in term1, this time for C.

After becoming leader, send a heartbeat immediately, or wait for the heartbeat timeout?

If AppendEntries to a follower fails, how does the leader handle it?

Retry forever; the Leader's log is 'the truth'.
Logs are idempotent: thanks to term/index, duplicates are easy to detect.

On the client side, however, there is no such guarantee:
what if the leader crashes after executing a command but before responding?
If the client blindly retries, the command may be executed twice.
Solution: the client attaches an id to each command it sends, ensuring idempotence;
the leader keeps the latest seen index per client.
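The dedup-by-command-id idea can be sketched as a toy state machine; the names and id scheme here are assumptions, not the library's API:

```go
package main

import "fmt"

// stateMachine deduplicates client commands by id before applying them,
// which makes blind client retries safe.
type stateMachine struct {
	applied map[string]bool // command ids already executed
	x       int
}

func (s *stateMachine) applyOnce(id string, delta int) {
	if s.applied[id] {
		return // duplicate retry: already executed, ignore
	}
	s.applied[id] = true
	s.x += delta
}

func main() {
	s := &stateMachine{applied: map[string]bool{}}
	s.applyOnce("cmd-1", 5)
	s.applyOnce("cmd-1", 5) // client retried after a timeout
	fmt.Println(s.x)        // 5, not 10
}
```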

Engineering

Config

HeartbeatTimeout = ElectionTimeout = 1s
LeaderLeaseTimeout = 500ms
CommitTimeout = 50ms
MaxAppendEntries = 64
SnapshotInterval = 2m
SnapshotThreshold = 8192

Group Commit

0 < MaxAppendEntries <= 1024

newLog := <-r.applyCh
ready := []*logFuture{newLog}
GROUP_COMMIT:
for i := 0; i < r.conf.MaxAppendEntries; i++ {
	select {
	case newLog := <-r.applyCh:
		ready = append(ready, newLog)
	default:
		break GROUP_COMMIT // a bare break would only exit the select
	}
}

Lease

Besides followers detecting leader liveness by passively receiving heartbeats, the leader itself uses responses
from a majority of followers to judge whether it has been partitioned away; if so, it enters Follower state.

Pipeline

Used only for AppendEntries: via a channel, multiple RPCs are sent to a follower without waiting for responses.
On any error response, pipeline mode is cancelled immediately.

max outstanding AppendEntries RPC calls = 128

Limitations

  • Apply([]Log)
    can only be initiated on the leader; followers do not automatically redispatch
    applyCh is unbuffered

  • StableStore.GetUint64
    if the key is not found, the returned error must be "not found"

  • LeaderCh() might lose event

Paxos

consensus overview

server replication (SR), log replication (LR), synchronisation service (SS), barrier orchestration (BO), service discovery (SD), leader election (LE), metadata management (MM), and Message Queues (Q).

CAP

Proof sketch:
take two nodes separated by a network partition, with two clients sending them conflicting requests. To preserve C, one node must reject a request, sacrificing A; to preserve A, C must be sacrificed.

References

1985 FLP Impossibility Result
1988 Oki and Liskov’s Viewstamped Replication
1998 Lamport’s original Paxos paper “The Part-Time Parliament”
2000 Brewer proposes CAP conjecture during Keynote at PODC
2002 Proof of CAP by Gilbert and Lynch (CAP was only formally proved 2 years later)
2005 Lamport’s Technical Report on Generalized Consensus & Paxos
2013 Raft Consensus Paper first available online

http://www.cnblogs.com/foxmailed/p/3418143.html
http://raftuserstudy.s3-website-us-west-1.amazonaws.com/study/raft.pptx

http://www.read.seas.harvard.edu/~kohler/class/08w-dsi/chandra07paxos.pdf
https://www.cse.buffalo.edu//~demirbas/publications/cloudConsensus.pdf
http://www.cl.cam.ac.uk/techreports/UCAM-CL-TR-857.pdf


Linux 2.6.19 TCP implementation

read

read

  • /proc/sys/net/ipv4/tcp_rmem
    TCP recv Buffer
  • net.core.netdev_max_backlog
    maximum number of packets queued at a device, which are waiting to be processed by the TCP receiving process
    Socket Backlog, dropped if full
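Both knobs above are tunable via sysctl; a minimal /etc/sysctl.conf fragment (the values are illustrative, not recommendations):

```
# receive buffer: min / default / max bytes per TCP socket
net.ipv4.tcp_rmem = 4096 87380 6291456
# per-device packet backlog; packets are dropped when it is full
net.core.netdev_max_backlog = 2000
```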

write

write

  • txqueuelen
    ifconfig eth0 txqueuelen

an IM implementation

Architecture

Pros

  • upstream and downstream messages are handled separately via kafka
  • the transport channel and the business logic are decoupled

Cons

  • the logic servers must be aware of the session servers (router); scaling out is handled via consistent hashing
  • multi-IDC deployment is possible, but the design does not really account for it
    Tencent actually synchronizes routers globally across all IDCs, so each IDC can even query its local router module directly
  • offline messages are not implemented yet
  • the reliability handling is rather naive; there are several cases where messages can be lost
  • service addresses are statically bound in configuration
    • a VIP LB can mitigate this
    • every comet must be configured with all logic addrs
    • every logic must be configured with all router addrs
    • every job must be configured with all comet addrs

Components

  • access server / frontend (comet)
    receives and delivers messages
    the only tier clients touch, via websocket/http
  • business server (logic)
    authentication, route lookup, persisting messages to kafka
  • session store server (router)
    holds the routing table
  • push server (job)
    pulls messages from kafka, finds the target comet server via the router data, and delivers

kafka

type KafkaMsg struct {
	OP       string
	RoomId   int32
	ServerId int32
	SubKeys  []string
	Msg      []byte
}

Q & A

What if comet crash?

Clients hold a token, so sessions are not a problem.

What if router crash?

big trouble

What if job crash?

job is stateless; a kafka consumer group rebalance occurs, no impact.
If there is only one job, just restart it.

A sends a message to B, C, D — how many kafka msgs are produced?

Multiple (at push time, job->comet, some RPC requests can be batched)

But a message to a room produces only one kafka msg, regardless of how many members the room has.

What if client conn broken?

At send time, the routing info is already written into the kafka message.
At push time, job uses the routing info from kafka, but it may have changed by then, so messages can be lost.


geo-distributed storage system

Primer

Network of network.

Why

  • scale
    one datacenter too small
  • disaster tolerance
    protect data against catastrophes
  • availability
    keep working after intermittent problems access
  • locality
    serve users from a nearby datacenter

Challenges

  • high latency
  • low bandwidth
  • congestion
    overprovisioning prohibitive
  • network partitions

Replication

       read-write     | state machine
-----+----------------+--------------
sync | ABD            | Paxos
async| logical clock  | CRDT

ABD: the Attiya–Bar-Noy–Dolev atomic register algorithm

Congestion

Motivation

  • bandwidth across datacenter is expensive
  • storage usage subject to spikes

How do we provision cross-datacenter bandwidth?

  • for peak load
    high cost, low utilization
  • for average load
    network congestion

Solution

  • weak consistency
    async replication
  • priority messages
    should be small

vivace algorithm

read-write algorithm based on ABD

Algorithm

  • separate the data from its timestamp
    that is, separate data from metadata; metadata gets higher priority
  • replicate data locally first
  • replicate timestamp remotely with prioritized msg
  • replicate data remotely in background

write

obtain timestamp ts
write data,ts to f+1 temporary local replicas (big msg)
write only ts to f+1 real replicas (small msg, cross datacenter)
in background, send data to real replicas (big msg)

read

read ts from f+1 replicas (small msg)
read data associated with ts from 1 replica (big msg, often local)
write only ts to f+1 real replicas (small msg, cross datacenter)

Transaction

snapshot isolation(SI)

total ordering of update transactions

Problems

  • it orders the commit time of all transactions
    even those that do not conflict with each other
  • forbids some scenarios we want to allow for efficiency

parallel snapshot isolation(PSI)

causality: if T1 commits at site S before T2 starts at site S then T2 does not commit before T1 at any site

SI                    PSI
-------------------   ---------------------------
1 commit time         1 commit time per site
1 timeline            1 timeline per site
read from snapshot    read from snapshot at site
no w-w conflict       no w-w conflict
                      causality property

idea1: preferred sites

Each key is assigned a unique preferred site (e.g. a user in Changsha has the Changsha IDC as its preferred site); at that site the key can use fast commit (without cross-site communication).
Similar to primary/backup, except that preferred sites are not mandatory: a key can be modified at any site.

But the problems are:

  • what if many sites often modify a key?
  • no good way to assign a preferred site to key

idea2: CRDT

serializable

Both SI and PSI suffer from write skew.
Transaction chains [SOSP 2013] can efficiently provide serializable isolation.

References

http://dprg.cs.uiuc.edu/docs/vivace-atc/vivace-atc12.pdf


chain replication

Unlike primary/backup replication, this is a ROWAA (read one, write all available) approach, with higher availability than quorum-based schemes.

chain replication

References

https://www.usenix.org/legacy/event/osdi04/tech/full_papers/renesse/renesse.pdf
http://snookles.com/scott/publications/erlang2010-slf.pdf
https://github.com/CorfuDB/CorfuDB/wiki/White-papers


http Preconnect

https://www.igvita.com/2015/08/17/eliminating-roundtrips-with-preconnect/


TLS Session Resumption

TLS

There are two mechanisms that can be used to eliminate a round trip for subsequent TLS connections (discussed below):

  • TLS session IDs
    • In the ServerHello, the server generates a 32-byte session ID for the client; in later TLS handshakes, the client can send this id in its ClientHello, and the server will restore the cached TLS context and avoid the 2nd round trip of the TLS handshake
    • nginx supports this via
      • ssl_session_cache
      • ssl_session_timeout
  • TLS session tickets
    Like session IDs, except the session state is stored on the client
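A minimal nginx fragment enabling the shared session-ID cache (the 10m values are illustrative, not recommendations):

```nginx
http {
    # cache shared across worker processes; roughly 4000 sessions per MB
    ssl_session_cache   shared:SSL:10m;
    ssl_session_timeout 10m;
}
```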

https dialog


AWS Marketplace

Why shop here?

  • cloud experience
    install/deploy that software on your own EC2 with 1-Click

Why sell here?

  • AWS has already built a strong ecosystem for ISVs
  • gain new customers
  • enable usage-based billing