blockchain

Overview

目前公开的bitcoin只能支持每秒7笔的吞吐量,一般对于大额交易来说,安全的交易确认时间为1小时(6个block生成时间)

中本聪设计比特币时,为区块设置了1MB的容量限制,使每一个区块只能容纳4096个交易;同时,工作量证明机制使得确认交易并将交易记录到区块链中需要约10分钟

bitcoin是一种通货紧缩货币,到2140年到达最大值:2100万个比特币

blockchain的所谓去中心化,实际上是建立一个少数服从多数的分布式权威网络,只要这个网络上节点足够多,就是可以信赖的
以前,用户trust银行;现在,用户trust majority nodes in blockchain network

Bitcoin accounts: address
public key: 1MKe24pNsLmFYk9mJd1dXHkKj9h5YhoEey
private key: 5KkKR3VAjjPbHPzi3pWEHVQWrVa3C4fwD4PjR9wWgSV2D3kdmeM

Currently, the top 10 mining pools consistently create about 90% of the blocks, and China-based pools create more than 60% of the blocks.
所以,虽然bitcoin设计上是去中心的,但目前的现状是实际上控制在少数人手里

miners

bitcoin

  • Blocks
    481,823
  • Total BTC
    16.523M
  • Difficulty
    923233068449
  • Network total
    7,983,858 trillion hash per second
  • Blocks/hour
    7.25

Key Design

BlockChain是以牺牲效率和性能为代价但换来了可靠的安全的最终一致性的分布式数据库

  • 与数据库不同,它不保存状态,只保存commit log,而通过commit log来计算出状态
    bitcoin的blockchain里并不保存balance,只有[]transaction(inputs, outputs, sender, receiver)
  • no range query
  • blockchain可以想象成只有1个table的replicated database
  • each transaction must commit before next transaction can enter proposal phase
    • throughput bottleneck

关键技术

  • P2P
    节点发现和通信
  • hash, 非对称加密
    sender: sign(privatekey_of_sender, publickey_of_receiver)
  • merkle tree
  • wallet app

Block

blockchain

Merkle Tree

每一个block有一个merkle树,叶子节点是transaction,每个中间节点有2个指针(2叉树)
如果修改某一个transaction叶子,就会传递到tree root

merkle tree,目前还没有在bitcoin中使用,但它的作用是:
确认一个交易是否被confirm,只要下载block header+merkle tree,而不用下载整个block

Proof of Work

sha256(‘pay me, joe, 25 bitcoins. 000001’) => 4699f0e443b73ddd6fdc14d4662395361fa21f54e647c64e643a49c54fef511c
sha256(‘pay me, joe, 25 bitcoins. 000013’) => 0ac09823cf2309c63d425c21b1c3d83f6dbc4b8acfb80b37b2db3d544192b7e9

猜到第13个nonce,就得到1个0开头的hash
bitcoin目前的难度是需要14个0开头,实际上每增加1个0,就把原来的计算量提高了16倍

bitcoin sha256(input + nonce),其中input包括

  • computer code
  • receiving address(public key)
  • transactions
  • timestamp
  • prev hash

Sybil Attack

女巫攻击,来源是70年代的一部叫做《Sybil》的美国系列片。片中的女主角人格混乱,扮演着16个角色。

Transaction

1
2
3
4
5
6
7
8
9
1. client send payment to replica nodes, proposing new log entries
2. proposal会在replica nodes间进行广播(unconfirmed transaction),进行leader election: proposal phase
transaction临时保存在node(miner)的内存
3. 通过PoW,leader胜出,本地append到blockchain,并广播到其他replicas
miner之间是竞争关系,都在正确尽快solve the puzzle to be leader
miner会把unconfirmed transactions进行合并(batch),生成到一个block(一个block最多4096个transactions)
4. 所有replicas append to local blockchain,transaction confirmed
block chain实际上实现了global transaction time order

Block Acceptance

收到miner发来的新block后,node会very block, verify each transaction, verify not spent
都成功,就加入local blockchain

如果local blockchain没有与main blockchain同步,那么verify block会失败,它会通过P2P请求同步,把unsynced blocks追上:

1
2
3
4
node1: block 1, 2, 3, 4
main blockchain: 1, 2, 3, 4, 5, 6, 7
此时node2成功创建了block8,并广播给node1
node1发现block 5-7还没有同步过来,先进行同步,然后再把block8 append to local blockchain

Confirm

确认数 = 该交易所在的block在blockchain中后面的block数量 + 1
如果确认数 >= 6,那么就可以认为无法逆转了

Consensus

通过PoW,每次生成block时进行一次leader选举,由leader生产new block,猜数成功的node自认为成为了leader,然后通过P2P广播(gossip)
由于猜数比较困难,多节点同时成为leader并且在接收到其他leader广播前猜成功的可能性非常小,但还是存在可能性,即多主,这就需要解决冲突

miner一旦recv a new block,就意识到在这个block的race上自己输了,它会立即把一些pending transactions和这个收到的block作为prev hash来构建下一个block

Conflict Resolve

采用longest chain rule,发现冲突时,block chain fork branch,在发现longest chain时,把short chain删除:但可能会造成confirmed transaction lost

conflict
resolve

Double Spent

double-spent

Public keys as identities

Alice pays Bob,Alice会把这个交易通过private key进行签名,广播给miners

  • Bob不需要参与
  • 交易信息里只包含Alice和Bob的public keys(address)
  • You can create a new identity at any time by generating a new key pair
    with no central authority or registry

验证身份verify

signature

比特币的所有权是通过数字密钥、比特币地址、数字签名来确立的
密钥实现了去中心化的信任,所有权认证

创建交易时,payer利用自己的private key给transaction签名,同时把自己的public key存放在消息里:(payer_public_key, signature)
miners通过这个信息,就可以verify这个transaction确实是payer发出的
同时,transaction里也包含了payee的public key信息,只有payee利用他的private key才能解开

key pair,非对称加密的特性:

  • 用私钥加密的内容,只能用公钥解密
    如果能通过(payer_public_key, signature)能解密,就证明了payer的身份
  • 用公钥加密的内容,只能用私钥解密
    只有payee才能用他的私钥解密交易
    每个交易必须有效的签名才能被存入ledger
    当payee花销这笔钱时,他必须给这个交易签名,由于这笔钱已经在ledger里记录了payee的公钥,只有payee才能签名
  • 公钥用于收钱,私钥用于花钱时生成数字签名
  • 通过私钥能计算出公钥,但反过来不行
    只要私钥不丢,公钥丢了也没关系

如果我知道Alice的public key(X),然后创建一笔交易:X支付给me 1BTC,系统是怎么知道有诈的?
首先我不知道X对应的私钥,只能拿我的私钥对交易加签名,miner通过X和signature就能验证:invalid signature

Upgrade

https://github.com/bitcoin/bips

BIP(bitcoin improvement proposal)在github上发布,miners自愿地下载、启动,但不是强制的

Q & A

为什么blocks通过hash进行chain,而不是通过普通的编号?

因为hash具有不可篡改性,hash本身包含了内容的指纹

同时有很多client进行交易,full node一直在瞎忙活?

T1, T2, Tn被分发到n个full node,那么每个node都开始猜数,其中一个node(N1)猜中,开始广播,此时N2~Nn还在为自己的Tn进行猜数,发现new block,就停下手中的活,重新生成新的block,并重新猜数
如果N1在广播时,消息还没有传到Nx,而此时Nx的猜数工作是不会停的;如果此时Nx也猜数成功,那么在它还没有收到N1广播前,就会广播自己的new block,此时fork出现
对于某一个full node,它可能会并发接收多个交易请求,它会进行串行化

如果识别一个account

通过wallet程序,每次交易都可以生成新的public/private key pair,由wallet管理并保存。如果wallet的数据丢了,那么bitcoin就无法证明是你的了

如何动态维护PoW的难度?

每个节点计算最近2016个solution,因为频率应该是10分钟一个,因此2016个,应该是2016/(6*24)=14天
而每个block里都有timestamp

1
2
3
4
elapsed = timestamp(block(-2016)) - timestamp(block(-1))
// 例如,假如elapsed = 7 day,那么说明难度系统太低了
// 即,移动平均数
difficulty = difficulty * 2

如何保证bitcoin address唯一?

Bitcoin address是由34个alphanumerical组成的字符串,但排除了O/T/l/0,它的空间是58^34,即
904798310844700775313327215140493940623303545877497699631104
但里面有几位用于checksum,它的实际空间是2^160,即
1461501637330902918203684832716283019655932542976

一个恶意node可以做什么,不能做什么?

它能

  • Refuse to relay valid transactions to other nodes
    但其他节点会replay
  • Attempt to create blocks that include or exclude specific transactions of his choosing
  • Attempt to create a ‘longer chain’ of blocks that make previously accepted blocks become ‘orphans’ and not part of the main chain

它不能

  • Steal bitcoins from your account
  • Make payments on your behalf or pretend to be you

为什么建议每次transaction都新生成key pair?

公钥虽然没有安全问题,但有隐私,毕竟它属于某一个人,如果我的交易都用一个key pair,那么最终这些交易可以发现是一个人的行为

如果写错了接受者payee公钥,这笔钱能回来吗?

首先,公钥(bitcoin address)是有校验位的,写错的时候,基本上就可以在提交时离线发现
如果恰巧校验一致,而地址是不存在的,那么交易会成功,但由于那个payee不存在,也就不存在对应的private key,也就无法spend it:黑洞,那笔钱永远消失了

如果本来我想给A钱,却输入时写成了B的address,那么:
Bitcoin transactions are not reversible. Sending to the wrong person cannot be undone.

Block timestamp如果做假怎么办?

A timestamp is accepted as valid if it is greater than the median timestamp of previous 11 blocks, and less than the network-adjusted time + 2 hours.
“Network-adjusted time” is the median of the timestamps returned by all nodes connected to you.

Whenever a node connects to another node, it gets a UTC timestamp from it, and stores its offset from node-local UTC. The network-adjusted time is then the node-local UTC plus the median offset from all connected nodes. Network time is never adjusted more than 70 minutes from local system time, however.

如果真有人控制了51%计算能力,会发生什么?

attacker只能把他刚花的钱payment取消掉,即double spent

据说bitcoin最大吞吐量是7 TPS,怎么计算来的?

每个block最大1MB,4096个transaction,10分钟产生一个block

4096/(10*60) = 6.83 = 7

miner做恶的惩罚

miner竞争成功后,创建一个block,它会获得奖励;如果它发布一个非法的block,那么大部分miners会拒绝;并在下一个block时,该非法block被取消,同时它的奖励也被取消。除非,它能保证它一直是竞争成功

seed nodes?

hard coded
https://github.com/bitcoin/bitcoin/blob/863e995b79ec388bf292d80f181912d01e20e2e5/src/net.cpp#L1198

1
2
3
4
5
6
7
8
unsigned int pnSeed[] =
{
0x959bd347, 0xf8de42b2, 0x73bc0518, 0xea6edc50, 0x21b00a4d, 0xc725b43d, 0xd665464d, 0x1a2a770e,
0x27c93946, 0x65b2fa46, 0xb80ae255, 0x66b3b446, 0xb1877a3e, 0x6ee89e3e, 0xc3175b40, 0x2a01a83c,
0x95b1363a, 0xa079ad3d, 0xe6ca801f, 0x027f4f4a, 0x34f7f03a, 0xf790f04a, 0x16ca801f, 0x2f4d5e40,
0x3a4d5e40, 0xc43a322e, 0xc8159753, 0x14d4724c, 0x7919a118, 0xe0bdb34e, 0x68a16b2e, 0xff64b44d,
// 列出了500多个节点
}

References

https://bitcoincharts.com/
https://blockchain.info/
https://en.bitcoin.it/wiki/Vocabulary
http://www.cs.rice.edu/Conferences/IPTPS02/101.pdf
https://www.cryptocompare.com/wallets/guides/how-to-create-a-bitcoin-address-from-a-public-key/

Share Comments

perceptual hash

这种算法的优点是简单快速,不受图片大小缩放的影响,缺点是图片的内容不能变更。如果在图片上加几个文字,它就认不出来了。所以,它的最佳用途是根据缩略图,找出原图。

实际应用中,往往采用更强大的pHash算法和SIFT算法,它们能够识别图片的变形。只要变形程度不超过25%,它们就能匹配原图。这些算法虽然更复杂,但是原理与上面的简便算法是一样的,就是先将图片转化成Hash字符串,然后再进行比较。

比较图片的相似度
图片尺寸可以不同,不同的长宽比例,小范围的颜色不同(亮度、对比度等)
对旋转都不具有鲁棒性:但可以sample argument

1
2
3
4
5
6
7
8
9
10
11
12
13
img = resize(img, (8, 8))
img.grayscalize(level=64)
avg = img.average() // e,g. 78
for i, b = range img { // (8, 8) => img[0...63]
if b >= avg {
img[i] = 1
} else {
img[i] = 0
}
}
// img现在就是一个64位的二进制整数, 这就是这张图片的指纹
HanmingDistance(img1, img2) // 汉明距离,如果小于5就说明两副图相似
Share Comments

GFS evolution and BigTable

GFS

GFS,2001年开发出来,3个人,1年,论文发表于2003
BigTable,2004年开发出来,论文发表于2006

master

  • replicated operation(redo) log with checkpoint to shadow masters
    • 当时没有自动failover,全是人工操作DNS alias
    • read请求可以使用shadow masters
    • 生成checkpoint时,master切换到新的redo log file,创建新线程生成checkpoint
      在此过程中发生的变化,记录到新redo log file
    • operation log保证了mutation的全局有序
    • save checkpoint
      很可能是通过mmap+msync
  • bottleneck?
    • 当时部署了多个cluster,根据业务属性
      最大的超过1000节点,300TB
    • 过高的OPS
      由于redo log需要复制,master的tps也就在1万左右
      • client cache
        何时invalidate and recall master?
        • primary unreachable
        • primary reponse: no lease
      • client batch request
      • lease
    • 内存容量
      • prefix compression to reduce memory footprint
      • 每个chunk(64MB),metadata占内存64B
        如果保存1百万个文件(64TB),需要64MB内存
        如果保存10亿个文件(64PB),需要64GB内存
        实际上是到了5千万个文件,10PB的时候,master已经到达瓶颈了

metadata

1
2
3
4
5
6
7
8
9
10
11
type metadata {
filename string // prefix compression
owner uid
perm acl
chunks []chunk {
chunk_id int64
version int
refCount int // for snapshot COW
chunkservers []string // ip addr
}
}

delete

rename to hidden file,保留3天,之后真正删除

write/mutation

  • operation log replicated between master and shadows
  • client write data flow to pipelined chain chunkservers
  • primary write control flow to secondary chunkservers

read(filename, offset, size)

1
2
3
4
5
6
7
8
9
client根据(offset, size)计算出需要哪些chunks,假设chunk1, chunk2
client把(filename, chunk index [1, 2])类似batch一样发送master
master返回{chunk1: {chunkId1, [ip1, ip2, ip3]}, chunk2: {chunkId2, [ip4, ip5, ip6]}}
client.putInCache(metadata)
client顺序地获取chunk1, chunk2
对于chunk1,根据ip1-3与client ip的距离,找最近的,取chunk;如果失败,找下一个最近的chunkserver
client.sendToIp1(chunkId1, offset1, size1)
// stale read是存在的,read的时候chunkserver可能发现data corruption

磁盘错误的解决

file -> chunk -> block

在每个chunkserver上执行,通过checksum检测

每个chunk由多个64KB的block组成,每个block有一个32位的checksum

read repair,如果chunkserver发现了一个非法block,会返回client err,同时向master汇报

  • client会从其他replica read
  • master会从其他有效的replica把这整个chunk clone到另外一个chunkserver,然后告诉有问题的chunkserver删除那个chunk

Questions

chunk为什么64MB那么大?

  • 减少master内存的占用
  • 减少client与master的交互
    同一个chunk的R/W,client只需要与master交互一次
  • 可以很容易看到机器里哪些机器磁盘快满了,而做迁移
  • 可以减少带宽的hotspot
    如果没有chunk,那么1TB的文件就只能从一个replica读
    有了chunk,可以从多个replica读
  • sharding the load
  • 加快recovery时间
    每个chunk,可以很快地被clone到一台新机器
  • 如果一个file只有1MB,那么实际存储空间是1MB,而不是64MB
    但它会增加master file count问题
  • 可以独立replicate
    • 文件的一部分损坏,可以迅速从好的replica恢复
    • 支持更细粒度的placement
  • 支持超大文件

chunk hotspot问题

MapReduce时,需要把代码发布到GFS,很可能小于64MB,即只有1个chunk,当很多mapper时,这个chunkserver就成为hotspot
解决办法是:增加replication factor

但最终的解决方案是:client间P2P,互相读

master为什么不用Paxos?

master通过redo log sync replication来提高可靠性,但没有election过程,都是完全手工进行failover

我猜,chubby当时还没有启动,chubby论文发表于2006

master为什么不持久化chunk location?

其他的metadata是有redo log replication并持久化的,他们的变化,都是主动产生的,例如创建一个文件,增加一个chunk
而由于chunkserver随时可能crash,不受控制,因此通过heartbeat来计算并存放内存,通过heartbeat,master又可以修正chunkserver的一些错误,例如orphan chunk

Data flow为什么pipelined chain,而不并发?

为了避免产生网络瓶颈,同时为了更充分利用high latency links
通过ip地址,感知chunkserver直接的距离

Total Latency = (B/T) + (R*L)

2PC,避免了client(coordinator) crash问题,因为primary成为了coordinator,而它是有failover的

1
2
3
4
5
6
7
8
9
10
11
12
13
client负责把一个write请求分成多个chunk的请求
Phase1: data flow client -> chained chunkservers
相当于prepare,但数据不落盘
client由近及远地chain把数据写入相应chunkserver的LRU buffer
这个顺序跟primary在哪无关
Phase2: control flow client -> primary -> secondary chunkservers
相当于commit,数据visible
确定mutation order
Phase1出错,则等master来修复,把crashed chunkserver摘除
Phase2出错,primary->secondary,这个阶段,那么primary返回client err,client会重试,此时可能出现不一致的状态,但最终master会修复

为什么搞个primary角色,而不让master做?

为了减轻master负担,所以搞了个二级调度:
跨chunk,master负责;chunk内部,primary负责

master如何revoke primary lease?

在lease expire后,master可能什么都不做
在lease expire前,master会sendto(primary)让它取消;如果sendto失败,那么只能等expire

为什么data flow和control flow分开?

如果不分开,那么所有的数据都是client->primary->secondary
分开后,比较轻量级的control flow必须走primary扩散;重量级的data flow可以根据物理拓扑进行优化

GFS vs Ceph

  • 论文2003 vs 2006
  • chunk(64MB) vs Object(4MB)
    object size可配
  • master vs mon(Paxos)
  • chunkserver vs osd
  • replication
    • GFS
      2PC, decouple data/control flow
    • Ceph
      client <-> osd
  • Ceph通过PG+crunch提高了扩展性
    GFS通过allocation table的方式
  • GFS上直接跑MapReduce
    计算向存储locality
  • Ceph更通用,latency更好
    GFS通过lease提高扩展性,但遇到错误时只能等expire
  • 节点的变化
    • GFS
      chunkserver向master汇报,自动加入,完全不需要人工参与
    • Ceph
      需要通过ceph osd命令,手工执行
  • namespace
    GFS是directory,Ceph是flat object id

2009 GFS回顾

GFS在使用了10年的过程中,发现了一些问题,对这些问题,有的是通过上层的应用来解决的,有的是修改GFS解决的

master ops压力

最开始的设计,考虑的是支撑几百TB,几百万个文件。但很快,到了几十PB,这对master有了压力

  • master在recover的时候,也变慢
  • master要维护的数据更多
  • client与master的交互变慢
    每次open,client都要请求master
    MapReduce下,可能突然多出很多task,每个都需要open,master处理能力也就是每秒几千个请求
    解决办法是在应用层垂直切分,弄多个cluster,应用通过静态的NameSpace找自己的master,同时提升单个master能力到数万ops

随着GFS的内部推广,越来越多的千奇百怪的上层应用连接进来

  • 最开始是爬虫和索引系统
  • 然后QA和research组用GFS来保存large data sets
  • 再然后,就有50多个用户了
  • 在此过程中GFS不断地调整以满足新use case

file-count问题

很早就发现了,例如:

  • 前端机上要把log发到GFS保存以便MapReduce分析,前端机很多,每个log每天会logrotate,log的种类也越来越多
  • gmail需要保存很多小文件
    解决办法是把多个文件合并,绕开file-count问题,同时增加quota功能,限制file-count和storage space
    长远的办法:在开发distributed multi-master系统,一个cluster可以有上百个master,每个master可以存1亿个文件,但
    如果都是小文件,会有新的问题出现:more seek
    再后来,建立在GFS之上的BigTable推出了,帮助GFS直接面对应用对小文件、多文件的需求,BigTable层给解决了,BigTable在使用GFS时,仍然是大文件、少文件

latency问题

GFS设计是只考虑吞吐率,而少考虑latency

error recovery慢

如果write一个文件,需要写到3个chunkserver,如果其中一个卡住了或crash,master会发觉(heartbeat),它会开新的一个chunkserver replica从其他chunkserver pull
master会把这个lock,以便新的client不能write(等恢复后再unlock)
而这个pullchunk操作,为了防止bandwidth burst,是有throttle的,限制在5-10MB/s,即一个64MB chunk,需要10s左右
等恢复到3个ok的时候再返回给client,client再继续write
在此过程中,client一直是block的

master failover慢

刚开始master failover完全靠人工,可能需要1h;后来增加了自动master failover,需要几分钟;再改进,可以在几秒钟内完成master自动切换

为吞吐量而设计的batch增加latency

解决办法

BigTable是无法忍受那么高的延时的,它的transaction log是最大的瓶颈,存储在GFS:
2个log(secondary log),一个慢,就切换到另外一个,这2个log任意时刻只有1个active,并且log entry里有sequence号,以便replay时防重
google使用这个request redundancy or timeout方法很广泛,为了解决search long tail latency,一样思路

Gmail是多机房部署的,一个卡了,切到另外机房

consistency

client一直push the write till it succeeds
但如果中途client crash了,会造成中间状态:不同client读同一个文件,可能发现文件长度不同
解决办法:使用append,offset统一由primary管理

但append由于没有reply保护机制,也有问题:
client write,primary分配一个offset,并call secondary,有的secondary收到有的没收到,此时primary crash
master会选另外一个作为primary,它可能会分配一个新的offset,造成该数据重复
如果为此再设计一套consensus,得不偿失
解决办法:single writer,让上层应用保证不并发写

Colossus

Colossus is specifically designed for BigTable, 没有GFS那么通用
In other words, it was built specifically for use with the new Caffeine search-indexing system, and though it may be used in some form with other Google services, it is not the sort of thing that is designed to span the entire Google infrastructure.

  • automatically sharded metadata layer
  • EC
  • client driven replication
  • metadata space has enabled availability analysis

BigTable

(row, column, time) -> value

在有了GFS和Chubby后,Google就可以在上面搭建BigTable了,一个GFS的索引服务
但BigTable论文对很多细节都没有提到:SSTable的实现、tabletserver的HA,B+数的metadata table算法

为了管理巨大的table,按照row key做sharding,每个shard称为tablet(100-200MB,再大就split),每台机器存储100-1000个tablet
row key是一级索引,column是二级索引,版本号(timestamp)是三级索引

redo log和SSTable都存放在GFS的Chubby管理元信息的分布式LSM Tree

tabletserver没有任何的持久化数据,只是操作memtable,真正的数据存放在哪里只有GFS知道,那为什么需要master在chubby上分配tablet给tabletserver?
因为memtable是有状态的: level0

tabletserver的HA?
通过chubby ephemeral node,死了master会让别的server接管,通过GFS上的redo log恢复memtable
为了保证强一致性系统,同一时刻同一份数据只能一台tabletserver服务,tabletserver对每个tablet是没有备份的
当它crash,由于只需要排序很少的操作日志并且加载服务的tablet的索引,宕机恢复可以做到一分钟以内;在此期间,一部分rowkey不可用

split and migration?
在没有crash情况下,只需要修改metadata和从sstable加载索引数据,效率很高

与GFS的对应

  • commit log
    每台机器一个commit log文件,与GFS File一一对应
  • sstable
    HBase中Column Family的名称会被作为文件系统中的目录名称,每个CF存储成一个HDFS的HFile
    据google工作过的人说:Column Families are stored in their own SSTable,应该是这样
    sstable对应一个GFS File
    sstable block=64KB,它与GFS的block相同
    sstable block为了压缩和索引(binary search),GFS block为了checksum

Highlights

redo log合并

一台机器一个redo log,而不是一个tablet一个redo log(每个机器有100-1000个tablet),否则GFS受不了
group commit

带来的问题:恢复时麻烦了
如果一天机器crash了,它上面的tablets会被master分配到很多其他的tabletserver上
例如,分配到了100台新tabletserver,他们都会read redo log and filter,这样redo log被读了100次
解决办法:利用类似MapReduce机制,在recovery之前先给redo log排序

加速tablet迁移

1
2
3
4
5
sourceTablet.miniorCompaction() // 把memtable里内容dump到GFS的SSTable
sourceTablet.stopServe()
sourceTablet.miniorCompaction() // 把in-flight commit log对应的操作也持久化到GFS
// 这样targetTablet就不需要从commit log recover了
master.doSwitch()

SSTable由多个64KB的block组成

压缩以block为单位,虽然相比在整个SSTable上压缩比小(浪费空间),但对于随机读,可以只uncompress block而非整个SSTable

经验和教训

遇到了新的问题

  • 发现了Chubby的bug
  • network corruption
    通过给RPC增加checksum解决
  • delay adding features until clear how it will be used
    刚开始想给API增加一个通用的事务机制,后来发现大部分人只需要单行事务
  • 不仅监控server,也监控client
    扩展了RPC,采样detailed trace of important actions
  • 设计和实现都要简单、明了
    BigTable代码10万行C++
    tabletserver的membership协议的设计,最初:master给tabletserver发lease
    结果:在网络出问题时大大降低了可用性(master无法reach tabletserver就只能等expire)
    改进:实现了更复杂的协议,也利用了Chubby里非常少见的特性
    结果:大量时间在调试edge case,很多时间在调试Chubby的代码
    最终:回到简单的设计,只依赖Chubby,而且只使用它通用的特性

Questions

按照rowkey来shard,那么可能造成hotspot问题,client端比较难控制

2009 BigTable回顾

部署了500+个BigTable集群,最大的机器:70+ PB data; sustained: 10M ops/sec; 30+ GB/s I/O

  • Lots of work on scaling
  • Improved performance isolation
  • Improved protection against corruption
  • Configure on per-table
  • 异地机房复制: 增加了最终一致性模型
  • Coprocessor

    References

http://queue.acm.org/detail.cfm?id=1594206
http://google-file-system.wikispaces.asu.edu/
http://static.usenix.org/publications/login/2010-08/openpdfs/maltzahn.pdf
https://stephenholiday.com/notes/bigtable/

Share Comments

replication models

Models

  • primary-replica(failover, replica readable)/multi-master/chain
  • replica/ec
  • push/pull
  • sync/async
1
2
3
4
5
6
7
8
9
10
Dynamo PUSH coordinator -> preference list next nodes in hash ring
Raft/ZK PUSH master commit log -> majority ack
GFS/Hadoop chain replication
ES PUSH primary-replica model, master concurrently dispatch index req to all replicas
MySQL PUSH master async PUSH to slaves binlog
Kafka PULL(Fetch) ISR
redis
Ceph PUSH primary-replica sync model with degrade mode 同步写入
Swift
Aurora PUSH primary instance R(3)+W(4)>N(6)
Share Comments

Google container evolution

Babbysitter & GlobalWorkQueue -> cgroup -> Borg -> Omega -> Kubernetes

Container

容器提供的隔离还不完善,OS无法管理的,容器无法隔离(例如CPU Cache,内存带宽),为了防止(cloud)恶意用户,需要在容器外加一层VM保护

container = isolation + image

数据中心:机器为中心 -> 应用为中心
在Application与OS之间增加一层抽象:container
container的依赖大部分都存放在image了,除了OS的系统调用
实际上,应用仍然会受OS影响,像/proc, ioctl

好处

  • 更高的资源利用率
  • 监控的是应用,而不是混在一起的机器
  • 开发者、管理员不需关心OS
  • load balancer不再balance traffic across machines: across application instances
  • logs are keyed by application, not machine
  • 可以更容易识别是应用的失败还是机器的失败

教训

  • 不要给container端口号
    Borg做法是物理机上的所有容器共用主机的ip,每个容器分单独的端口号
    Kubernetes为每个pod分配不同ip,network身份=application身份
  • 不要给container编号
    用label
  • 不用暴露raw state
    减少client端的复杂度
    K8s里,都通过API Service访问状态信息

还没有很好解决的问题

  • Configuration
  • Dependency management
    手工配,最终一定不一致
    自动,无法获取足够的语义信息
Share Comments

Ceph Internals

RADOS

Key ideas

components

  • 把传统文件系统架构分隔成client component and storage component
    client负责更上层的抽象(file, block, object),storage负责底层object, disk
  • seperation of data and metadata
    各管各的

Object

Object是最终落地存储文件的基本单位,可以类比fs block,默认4MB
如果一个文件 > object size,文件会被stripped,这个操作是client端完成的

It does not use a directory hierarchy or a tree structure for storage
It is stored in a flat-address space containing billions of objects without any complexity

Object组成

  • key
  • value
    在fs里用一个文件存储
  • metadata
    可以存放在扩展的fs attr里
    因此对底层文件系统是有要求的,ext4fs对extended attr数量限制的太少,可以用xfs
1
2
3
4
5
+---logical------+ +--physical layer -+
| | | |
cluster -> pool -> pg ---> osd -> object -> fs files
| |
+-crush-+

Lookup

1
2
3
4
5
6
7
8
9
10
11
file = ino
obj = (ino, ono)
obj_hash = hash(ino, ono)
pg = obj_hash & (num_pg-1) // num_pg是2整数幂,例如codis里1024
// 官方推荐num_pg是osd总数的数百倍
// num_pg如果变化,会引起大量的数据迁移
osds_for_pg = crush(pg) // list of osds,此处是唯一需要全局cluster map的地方
// 其他的,都是算出来的
// osds_for_pg is ordered,len(osds_for_pg)=replicate factor
primary = osds_for_pg[0]
replicas = osds_for_pg[1:]

lookup

crush

一个hash算法,目标

  • 数据均匀的分布到集群中
  • 需要考虑各个OSD权重的不同(根据读写性能的差异,磁盘的容量的大小差异等设置不同的权重)
  • 当有OSD损坏需要数据迁移时,数据的迁移量尽可能的少

有点像一致性哈希:failure, addition, removal of nodes result in near-minimal object migration

crush

PG

PG(placement group)与Couchbase里的vbucket一样,codis里也类似,都是presharding技术

在object_id与osd中间增加一层pg,减少由于osd数量变化造成大量的数据迁移
PG使得文件的定位问题,变成了通过crush定位PG的问题,数量大大减少
例如,1万个osd,100亿个文件,30万个pg: 100亿 vs 30万

线上尽量不要更改PG的数量,PG的数量的变更将导致整个集群动起来(各个OSD之间copy数据),大量数据均衡期间读写性能下降严重

1
Total PGs = (Total_number_of_OSD * 100) / max_replication_count
1
2
3
4
5
6
7
8
$ceph osd map pool1 object1
osdmap e566 pool 'pool1' (10) object 'object1' -> pg 10.bac4decb (10.3c) -> up [0,6,3] acting [0,6,3]
$
// osdmap e566: osd map epoch 566
// pool 'pool1' (10): pool name and pool id,pool id从0开始,每新建+1
// pg 10.bac4decb (10.3c): placement group number 10.3c(pg id是16进制,256个pg,那么他们的id: 0, 1, f, fe, ff)
pg id实际上是pool_id.pg_id(pool id=10, pg id=3c)
// up [0,6,3]: osd up set contains osd.0, osd.6, osd.3

同一个PG内的osd通过heartbeat相互检查对方状态,大部分情况下不需要mon参与,减少了mon负担

mon

相当于Ceph的zookeeper

mon quorum负责整个Ceph cluster中所有OSD状态(cluster map),然后以增量、异步、lazy方式扩散到each OSD和client
mon被动接收osd的上报请求,作为reponse把cluster map返回,不主动push cluster map to osd
如果client和它要访问的PG内部各个OSD看到的cluster map不一致,则这几方会首先同步cluster map,然后即可正常访问

MON跟踪cluster状态:OSD, PG, CRUSH maps
同一个PG的osd除了向mon发送心跳外,还互相发心跳以及坚持pg数据replica是否正常

Replication

是primary-replica model, 强一致性,读和写只能向primary发起请求
read write
read write
其中replicas在复制到buffer时就ack,如果某个replica复制时失败,进入degrade状态

2PC Write

write 2PC

1
2
3
4
5
6
phase1: client从primay接收ack
此时,数据已经replicate到每个replica的内存
phase2: client从primary接收commit
此时,数据已经replicate到每个replica的磁盘
client才把本地的write buffer cache清除

scrub机制

read verify

Questions

consistency?

strongly consistent

Can Ceph replace facebook Haystack?

Haystack解决的实际上是metadata访问造成的IO问题,解决的方法是metadata完全内存,无IO
每个图片的read,需要NetApp进行3次IO: dir inode, file inode, file data
haystack每个图片read,需要1次IO,metadata保证在内存
在一个4TB的SATA盘存储20M个文件,每个inode如果需要500B,那么如果inode都存放内存,需要10GB内存

  • 减少每个metadata的大小
    去掉大部分不需要的数据,例如uid, gid, atime等
    xfs里每个metadata占536字节,haystack每个图片需要10字节
  • 减少metadata的总数量
    多个图片(needle)合并进一个大文件(superblock)

用户上传一张图片,facebook会将其生成4种尺寸的图片,每种存储3份,因此写入量是用户上传量的12倍。

Posix规范里的metadata很多对图片服务器不需要,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
struct inode {
loff_t i_size; // 文件大小
struct list_head i_list; // backing dev IO list
struct list_head i_devices;
blkcnt_t i_blocks;
struct super_block *i_sb;
struct timespec i_atime;
struct timespec i_mtime;
struct timespec i_ctime;
umode_t i_mode;
uid_t i_uid;
gid_t i_gid;
dev_t i_rdev;
unsigned int i_nlink;
...
}

Ceph解决了一个文件到分布式系统里的定位问题(crush(pg)),但osd并没有解决local store的问题: 如果一个osd上存储过多文件(例如10M),性能会下降明显

Why EC is slow?

Google Colossus采用的是(6,3)EC,存储overhead=(6+3)/6=150%
把数据分成6个data block+3个parity block,恢复任意一个数据块需要6个I/O
生成parity block和恢复数据时,需要进行额外的encode/decode,造成性能损失

Store small file, waste space?

object size是可以设置的

References

http://ceph.com/papers/weil-crush-sc06.pdf
http://ceph.com/papers/weil-rados-pdsw07.pdf
http://ceph.com/papers/weil-ceph-osdi06.pdf
http://www.xsky.com/tec/ceph72hours/
https://blogs.rdoproject.org/6427/ceph-and-swift-why-we-are-not-fighting
https://users.soe.ucsc.edu/~elm/Papers/sc04.pdf

Share Comments

Dynamo - A flawed architecture

Author

Joydeep Sen Sarma,印度人,07-11在facebook担任Data Infrastructure Lead,目前在一个创业公司任CTO
在facebook期间,从0搭建了数千节点的hadoop集群,也是Cassandra的core committer,参与开发Hive
https://www.linkedin.com/in/joydeeps/

编写在2009年,Dynamo paper发布于2007

‘Flaws’

R+W>N

作者主要以Cassandra的实现来评论Dynamoc论文,从而忽略了vector clock逻辑时钟来跟踪数据版本号来检测数据冲突
因此,遭到很多围观群众的声讨

作者的主要思想是:由于hinted handoff,缺少central commit log,缺少resync/rejoin barrier,transient failure会导致stale read

Cassandra解决办法,是尝试通过central commit log,那就变成了Raft了
https://issues.apache.org/jira/browse/CASSANDRA-225

作者如果提到下面的场景,可能更有说服力

1
2
3
4
5
6
7
8
9
10
// W=3 R=3 N=5(1,2,3,4,5)
Op W R
==== ===== =====
a=5 1,2,3
3,4,5 // 由于有节点overlap,写入的数据可以读出来
2,3crash
del(a) 1,4,5
2,3recover
1,2,3 // 2,3读出a=5,而1没有key(a),如何处理冲突?

此外,由于sloppy quorum,下面的场景R+W>N也一样无法保证一致性

1
2
3
4
5
6
7
8
9
10
11
12
// W=3 R=3 N=5(1,2,3,4,5)
ring(A, B, C, D, E, F, G, A)
client set(k1)=3 // md5(k1),发现它的coordinator是A,client send req to A
A会先写本地磁盘,然后并发向B,C复制,成功后,return to client ok
现在A, B, C全部crash
set(k1)=8 // 由于hinted handoff,它的coordinator变成D
D写盘、复制到E,F,ok。// 当然D,E,F里的数据有hint metadata,表示他们只是代替A,B,C
// 等A,B,C活,再transfer to them and cleanup local store
然后A, B, C全部recover
get(k1),读到是3,而不是8
// 因为D,E,F的检测A/B/C活以及transfer hinted data是异步的

WAN

如果一个node crash,’hinted handoff’会把数据写入一致性哈希环上的下一个node:可能是另外一个DC node
等crashed node恢复,如果网络partitioned,这个node会很久无法赶上数据,直到partition解除

但preference list里,已经考虑到了,每个key都会复制到不同的机房

论文里的”facts”

  • 论文提到,商业系统通常通过同步复制保证一致性
    大部分数据库的复制都是异步的
  • 论文提到,中心化架构降低availability
    不对,中心化会造成扩展瓶颈,并不会降低可用性,SPOF有非常多的方法解决

Werner Vogels的回复

Dynamo SOSP论文有2个目的

  • 展示如何利用多种技术创建一个生产系统
  • 对学术界的一个反馈,学术到生产环境遇到的困难和解决办法

本论文不是一个blueprint,拿它就可以做出一个Dynamo clone的

我认为我的论文真正的贡献,是让人设计系统时的权衡trade-off

Dynamo回顾

Java实现的,通过md5(key)进行partition,由coordinator node复制read/write/replicate到一致性哈希虚拟节点环上
gossip进行membership管理和健康检查,提出了R/W/N的quorum模式

  • no updates are rejected due to failures or concurrent writes
  • zero-hop DHT
    为了latency,每个node有全局的路由信息,不产生hop
  • resolve update conflicts,在read时由应用做,write时不检测
    • 这与大部分系统相反,原因是为了always writeable
    • read时,让应用处理冲突,而不是Dynamo store做
      store做,由于抽象层,只能简单的类似last win的策略
      应用更懂,可以做类似union等高级处理策略
  • md5(key) => target nodes
    hash conflict? 可以不用考虑,无非就是让一台机器多处理了一个key而已
  • vector clock
    [(node, counter), (node, counter), …]
    get(key)返回的ctx,在put(key, ctx, val)时会带过去:ctx里包含该key的vector clock信息
  • R W N
    get/put latency都取决于R/W里最慢节点的latency
  • get(key)
    如果没有冲突,返回一个值;发现冲突,返回list
    返回的每个value都有对应的一个context(vector clock version)
  • replication
    • write coordinator
      put(key, ctx, val)根据md5(key)计算出coordinator node,它负责写入本地磁盘,同时向顺时针后面的N-1个alive节点进行复制
      由于后面N-1节点可能come and go,它是动态的,coordinator盲目地找出后面活着的:这样才能有高可用 sloppy quorum
  • coordinator and replication
    负责生成新的vector clock,并把(key, val, vc)写入本地
    同时向顺时针后面的N-1个healthy节点进行复制(dead nodes跳过)
    但由于虚拟节点的存在,需要保证复制到的节点不在一台机器上
    preference list是一个key对应的storage node,在ring上,如果顺时针发现有机器重叠就忽略并继续顺时针找
  • 节点的加入和退出
    手动显示
  • local persistence engine
    插件架构,amazon主要使用的是BerkelayDB,但也可以使用mysql/BDB Java等
  • SLA
    99.9% 300ms

Replication

每个key都被复制在N台机器(其中一台是coordinator),coordinator向顺时针方向的N-1个机器复制
preference list is list of nodes that is responsible for storing a particular key,通常>N,它是通过gossip在每个节点上最终一致

1
2
3
4
5
6
7
8
9
10
11
12
13
14
okN = 0
for node = range preferenceList {
// 论文中没有提到复制是顺序还是并发
// 如果顺序,latency是个问题
// 如果并发,为了HA和latency,>N个节点会进行复制,造成空间浪费
if replicateTo(node) == ok {
okN++
}
// all read/write are performed on the first N healthy nodes from pref list
if okN == N {
break
}
}

Merkle树

1
2
3
4
5
6
7
8
9
node ring(A, B, C, D, E, A), W=3
A crash
那么本来应该复制到A的数据,会复制到D
由于membership change是显示的,D知道这个数据是它临时替A代管的,它会写到临时存储,并携带hint(A) meta info,并定期检查A是否recover
如果A ok,D会把本来该写到A的数据transfer给A,成功后,本地删除
但,如果D永久crash,而A recover,那么这些hinted data,A就无从transfer了
此时,通过Merkle进行检查、增量同步
但它的问题是当有node进入、离开时,这个树要重新创建,在key多的时候非常费时,因此,它只能异步后台执行

IDCs

只是在preference list上配一下,架构上并没有过多考虑
仍然是coordinator负责复制,首先本地机房,然后远程机房

References

http://jsensarma.com/blog/?p=55
http://jsensarma.com/blog/?p=64
https://timyang.net/data/dynamo-flawed-architecture-chinese/
https://news.ycombinator.com/item?id=915212
http://www.allthingsdistributed.com/files/amazon-dynamo-sosp2007.pdf

Share Comments

MapReduce - A major step backwards

Author

都是数据库领域的权威

  • Michael Stonebraker
    美国工程院院士,冯诺依曼奖的获得者,第一届SIGMOD Edgar F. Codd创新奖的得主,曾担任Informix CTO
    他在1992年提出对象关系数据库模型,在加州伯克利分校任计算机教授达25年,目前是麻省理工学院教授
    是SQL Server/Sysbase奠基人,87年左右,Sybase联合了微软,共同开发SQL Server。1994年(7y),两家公司合作终止
    可以认为,Stonebraker教授是目前主流数据库的奠基人
  • David J. DeWitt
    美国工程院院士
    In 1995, he was named a Fellow of the ACM and received the ACM SIGMOD Innovations Award
    In 2009, ACM recognized the seminal contributions of his Gamma parallel database system project with the ACM Software Systems Award
    IEEE awarded him the 2009 IEEE Piore Award
    He was also a Technical Fellow at Microsoft, leading the Microsoft Jim Gray Systems Lab at Madison, Wisconsin.

Background

2008年1月,一个数据库专栏读者让作者表达一下对MapReduce(OSDI 2004)的看法而引出

Viewpoint

MapReduce是历史后退

从IBM IMS的第一个数据库系统(1968)到现在的40年,数据库领域学到了3个经验,而MapReduce把这些经验完全摒弃,仿佛回到了60年代DBMS还没出现的时候

  • schema is good
    防止garbage进入系统
    MapReduce由于涉及到(k, v),一定也是可以用schema表达的
  • 要把schema从应用中decouple
    存放在catalog里,而不是hard coding在应用层
  • high level query is good
    70年代就充分讨论过应用通过SQL那样的语言访问DBMS好,还是更底层的语言
    MapReduce就像是DBMS的汇编语言

MapReduce的实现糟糕

他们应该好好学学25年来并行数据库方面的知识

1. Index vs Brute force

在非full scan场景下,index必不可少。此外,DBMS里还有query optimizer,而MapReduce里只能brute force full scan

MapReduce能提供自动的并行执行,但这些东西80年代就已经在学术界研究了,并在80年代末出现了类似Teradata那样的商业产品

2. Record Skew导致性能问题

David J. DeWitt在并行数据库方向已经找到了解决方案,但MapReduce没有使用,好像skew问题根本不存在一样:
Map阶段,相同key的记录数量是不均衡的,这就导致Reduce阶段,有的reduce快、有的慢,最终Job的执行时间取决于最慢的那一个

3. 吞吐率和IO问题

1
2
3
4
// map=1000 reduce=500
1000个map实例,每个map进程都会创建500个本地文件
reduce阶段,每个reduce都会到1000个map机器上PULL file
这会导致每个map机器上500个文件进行大量的random IO

而并行数据库,采用的是PUSH模型。而MapReduce要修改成PUSH模型是比较困难的,因为它的HA非常依赖它目前的PULL模型
MapReduce uses a pull model for moving data between Mappers and Reducers

(Jeff:
承认了这个问题的存在。但google在实现的时候,通过batching, sorting, grouping of intermediate data and smart scheduling of reads缓解了这个问题
只所以使用PULL MODEL,是考虑到容错。大部分MR Job都会遇到几次错误,除了硬件、软件错误外,Google内部的调度系统是抢占式的:它可能会把M/R kill以疼出资源给更高优先级的进程,如果PUSH,那么就需要重新执行所有的Map任务)

MapReduce并不新颖

他们以为他们发现了一个解决大数据处理的新模式,实际上20年前就有了

“Application of Hash to Data Base Machine and Its Architecture”/1983,是第一个提出把大数据集切分成小数据集的
Teradata使用MapReduce同样的技术卖他们的商业产品已经20多年了

就算MapReduce允许用户自己写map/reduce来控制,80年代中期的POSTGRES系统就支持用户自定义函数

缺少DBMS已经具备的功能

  • Bulk loader
  • Update
    MapReduce is read-only
  • Transaction
  • Views
    Hbase处已经提了
  • Integrity constraints
    Schema处已经提了
  • Referential integrity
    防止garbase进入系统

与DBMS工具不兼容

  • BI tools
  • Data mining tools
  • Replication tools
  • ER modelling

SQL on hadoop

作者已经发行类似Pig那样的系统

Comments

作者好像觉得MR是用来替代DBMS的

  • MapReduce与DBMS的定位不同,它不是OLTP
    Index好,但cost也很高,在MR里index只会是成本而没有收益
    用正确的工具解决不同的问题
  • Be a lover, not a fighter!
  • Google本身已经证明了它的scalability
  • With really large datasets and distributed sytems the RDBMS paradigms stop working
    and that is where a system like Mapreduce is needed
  • 数据库那些人的问题:什么东西都是数据库
  • 作者应该再写个“Airplanes: A major step backward”

Jeff Dean在ACM of communication上面的回馈

http://cs.smith.edu/dftwiki/images/3/3c/MapReduceFlexibleDataProcessingTool.pdf

  • MR可以通过实现Reader/Writer来支持更多的存储系统
    而并行数据库,必须要把数据load进系统后才能开始计算
  • MR可以使用index,它并不一定总是full table scan
  • MR可以完成比SQL更复杂的查询
    UDF可以帮助DBMS,但目前商业产品上的实现要么缺功能,要么buggy

References

http://database.cs.brown.edu/projects/mapreduce-vs-dbms/

Share Comments

conferences and journals

Overview

按照USnews的分类,Computer Science被分为四个大类:AI, Programming Language, System, Theory

PSU的CiteSeer(又名ResearchIndex),有各个会议的影响因子,算是比较权威的

System

包括OS、Architecture、Network、Database等

OS

OSDI、SOSP,这两个是OS最好的会议,每两年开一次,轮流开

  • SOSP: The ACM Symposium on Operating Systems Principles
    由ACM下属的SIGOPS(special interest group on operation system)于1967年创办,每届收录20篇
    GFS就是发表在SOSP 2003的, Dynamo是SOSP 2007
  • OSDI: USENIX Symposium on Operating Systems Design and Implementation
    USENIX是一个于1975年成立的Advanced Computing Systems Association
    OSDI是USENIX 1994创办的,每届会议举行3天,每届收录27篇文章,每个小方向3篇
    MapReduce是发表在OSDI 2004的

Architecture

  • ISCA
  • HPCA
  • MICRO

Database

数据库方向的三大顶级会议,SIGMOD和VLDB算是公认第一档次

  • ACM SIGMOD
  • VLDB:International Conference on Very Large Data Bases
  • ICDE:International Conference on Data Engineering

Security

  • IEEE Security and Privacy
  • CCS: ACM Computer and Communications Security
  • NDSS (Network and Distributed Systems Security)

References

http://blog.sina.com.cn/s/blog_b3b900710102whvj.html
http://sigops.org/sosp/sosp15/current/index.html

Share Comments

committee

技术委员会

Share Comments