BigData related

Open Source

  • Storage
    • HDFS
      • RCFile
      • ORCFile
      • Parquet
    • Kudu
  • NoSQL
    • Cassandra/DynamoDB
    • Hbase
    • Voldemort
    • Espresso
  • SQL-on-hadoop
    • Hive(on MR)
      SQL解析,物理执行是通过生成map/reduce job完成的
    • ad-hoc query to group,filter,aggregate data
      • Presto
        Facebook开发的Hive,但后来放弃了,转而开发了Presto。
        一个Presto query可以跨越多个数据源
      • Impala(Dremel的启发下开发的C++)
        Cloudera虽然支持Hive,但自己开发了Impala。底层HDFS存储Parquet,建议用Kudu替换HDFS。
        Kudu是columnar datastore,但不提供SQL解析执行,SQL部分由Impala完成。
        定位于短查询,如果节点失效,查询会重头开始,没有fault-tolerant。
      • SparkSQL(Shark)
        Shark最初是在Hive的代码基础上进行的,后来重新实现了:SparkSQL
      • Stinger/Tez
      • Hive-on-Spark/Hive-on-Tez
        Hortonworks
      • Google Dremel
        只能查询一个table,不能join
        • Apache Drill
        • Druid
        • IBM BigSQL
  • Stream Processing
    • Storm
    • Heron
    • Spark mini batch
    • Flink
    • Samza
  • ETL
  • Network OS
    • yarn
    • mesos
    • k8s
    • Apache Tez
      一个Hadoop DAG技术框架,解决MR的问题一个替代方案,依赖YARN
  • tools
    • sqoop

序列化

  • thrift
  • avro
  • hessian
  • protocol buffer

Apache Kylin

MOLAP engine built on Hive

OLAP

Slice/Dice/Drill-down/Roll-up/Pivot

  • ROLAP
  • MOLAP
  • HOLAP

Cube

Cube = all combination of dimensions
Cuboid = one combination of dimensions(all cuboids)

Cube is immutable!

  • One fact table
    has ever growing records
  • A few dimension tables
    relatively static, like users and products
  • Hive tables must be synced into Kylin first
  • Measures
    • sum
    • count
    • distinct count(HyperLogLog)
    • avg
    • max
    • min
  • Incremental Build
    • Segment
      • 与ES的实现思路类似
        • query时aggregate
        • merge small cubes into a larger one
      • cube is immutable
    • 只支持按时间维度

Visualization

  • saiku
  • Caravel
  • Zeppelin
  • superset
  • Cboard
Share Comments

知名公司功能模块的实现笔记

Storage

微信支付的交易记录

之前kv,每个用户一个key(相当于redis list),这样问题是:

  • value会大
  • 无法根据条件filter value

改进后:
没有用户多个value,其中1个root value,保存metadata,其他value为data
多value解决了以前单value大的问题,但:

  • 多了一次请求
    先root,再value
  • root成为新瓶颈
    • 可以把root也变成一个link list
      按照时间倒排,新的是head,老的是tail
    • 但越以前的数据,越慢

数据库的高可靠、强一致

  1. 数据库异步复制,主备数据无法保证一致
    主死后,判断哪些用户最近没有做过主库update,他们可以在从库上进行新的操作;否则,在数据库恢复前不允许写操作

  2. Failover库
    订单编号里有1位,表示存放在主库还是failover库。当主死了,健康检查发现,通知订单编号生成器,它修改该bit,DAL会
    把后续的数据写入failover库。
    主库恢复后,健康健康发现,修改bit,DAL后续继续写主库。
    经过一段时间后,主库+failover库的记录都会迁移到历史库。
    failover机制,使得主备切换期间,仍然可以对外服务,用户不中断

  3. 业务数据可以归为3大类

  • 状态型
    读写比例差不多
  • 流水型
    例如订单
  • 配置型
    读多写少,不要求严格的一致性

算法

biparties graph(二部图/二分图)

对多种查询条件进行归并缓存,提高缓存命中率

查询条件:

1
2
3
4
(row1, row5) => (field3, field7)
(row2) => (field2, field4)
(row3) => (field3, field5)
(row4, row6) => (field1, field4, field8)

利用二部图把图切分,把零散查询归并为少数集中的查询

M$ Cloud Design Pattern

  • cache-aside
  • ciruit breaker
  • compensating transation
  • competing consumers
  • CQRS
  • event sourcing
  • external configuration store
  • federated identity
  • gatekeeper(gateway)
  • health endpoint monitoring
  • index table
  • lead election
  • materialized view
  • pipes and filters
  • priority queue
  • queue-based load leveling
  • retry
  • runtime reconfiguration
  • scheduler agent supervisor
  • sharding
  • static content hosting
  • throttling
  • valet key
Share Comments

OtterTune

DB tuning问题

传统方法:DBA把生产库copy到另外一个机器,回放sql,根据metrics尝试调节某一个参数看效果,再尝试多个

  • 数据库的调优参数过多,通常都是数百个,而且新的版本会增加更多参数
  • 同一个db的参数之间是互相依赖的
  • 依赖于运行环境
    例如,增加innobuffer,在某些条件下是正回馈,但如果物理机内存开始swap,增加它会起副作用
  • 跟业务数据有关

OtterTune解决办法

通过controller收集数据库的参数和负载metrics以及hardware profile,定期向中央报告。
分析系统从中央取得原始数据,进行分析,提供参数配置建议,生效后,持续分析比较,找出合适的参数配置

target workload

  • latency
  • throughput

Controller

就是agent,定期把通过配置文件配置的数据库实例的paramters & metrics通过HTTP POST以JSON格式上传
同时,它是trial and error的执行者,对参数不断的尝试取得样本数据,它必须有修改数据库参数权限,甚至restart db

PostgreSQL

  • SELECT version()
  • parameters
    SHOW ALL
  • metrics
    • select * from pg_stat_archiver/pg_stat_bgwriter/pg_stat_database/pg_statio_user_indexes/…

分析

  • 通过factor analysis(FA)对收集来的metrics进行降维
    例如,read_in_bytes/read_in_kbytes
  • 通过k-means对metrics进行聚类
  • 通过Lasso线性回归,来发现哪些参数对target workload有重大影响
  • workload mapping
    通过Euclidean计算目标workload与历史采样数据的相似度,找出最相似的
  • 推荐配置
    通过GaussianProcess(GP)回归进行训练
  • controller接受推荐的配置apply on DB,并收集新的数据

References

http://db.cs.cmu.edu/papers/2017/p1009-van-aken.pdf
https://github.com/cmu-db/ottertune

Share Comments

TimescaleDB

time series数据的现有方案问题

  • 传统RDBMS
    • 无法支持high ingest rate,尤其index无法完全放入内存后
    • delete的成本高
      时序数据是有retention的
  • NoSQL和time series database
    Cassandar, MongoDB
    OpenTSDB, InfluxDB
    • 通常缺乏丰富的查询接口,复杂查询是高延时
  • Hadoop/Spark
    ingest rate可以高,但查询慢

TimescaleDB解决办法

Intro

PostgreSQL上的一个插件,目前只支持单机部署,cluster功能还在开发
因此,PQ具备的功能它都有,此外还针对tsdb提供了一些方便的函数

  • hypertable
    相当于多个chunk(物理)上的逻辑统一
  • chunk
    相当于shard,物理属性,通过time interval和[partition key]进行路由
    chunk就是PQ的table

解决方法

high inject rate

传统mysql/postgresql,受限于index,如果无法放到内存,性能会大大降低

在100亿条数据的hypertable,单机单磁盘,仍然可以享受10万插入/秒(in batch).

TimescaleDB solves this through its heavy utilization of time-space partitioning, even when running on a single machine.
So all writes to recent time intervals are only to tables that remain in memory, and updating any secondary indexes is also fast as a result.

retention

删除数据时,不是delete by row,而是delete by chunk,把整个chunk(table)删除就快了

SELECT drop_chunks(interval '7 days', 'conditions');

chunk partition

目前不支持adaptive time intervals,需要在创建hypertable时手工指定chunk_time_interval(默认1个月)

Share Comments

blockchain

Overview

目前公开的bitcoin只能支持每秒7笔的吞吐量,一般对于大额交易来说,安全的交易确认时间为1小时(6个block生成时间)

中本聪设计比特币时,为区块设置了1MB的容量限制,使每一个区块只能容纳4096个交易;同时,工作量证明机制使得确认交易并将交易记录到区块链中需要约10分钟

bitcoin是一种通货紧缩货币,到2140年到达最大值:2100万个比特币

blockchain的所谓去中心化,实际上是建立一个少数服从多数的分布式权威网络,只要这个网络上节点足够多,就是可以信赖的
以前,用户trust银行;现在,用户trust majority nodes in blockchain network

Bitcoin accounts: address
public key: 1MKe24pNsLmFYk9mJd1dXHkKj9h5YhoEey
private key: 5KkKR3VAjjPbHPzi3pWEHVQWrVa3C4fwD4PjR9wWgSV2D3kdmeM

Currently, the top 10 mining pools consistently create about 90% of the blocks, and China-based pools create more than 60% of the blocks.
所以,虽然bitcoin设计上是去中心的,但目前的现状是实际上控制在少数人手里

miners

bitcoin

  • Blocks
    481,823
  • Total BTC
    16.523M
  • Difficulty
    923233068449
  • Network total
    7,983,858 trillion hash per second
  • Blocks/hour
    7.25

Key Design

BlockChain是以牺牲效率和性能为代价但换来了可靠的安全的最终一致性的分布式数据库

  • 与数据库不同,它不保存状态,只保存commit log,而通过commit log来计算出状态
    bitcoin的blockchain里并不保存balance,只有[]transaction(inputs, outputs, sender, receiver)
  • no range query
  • blockchain可以想象成只有1个table的replicated database
  • each transaction must commit before next transaction can enter proposal phase
    • throughput bottleneck

关键技术

  • P2P
    节点发现和通信
  • hash, 非对称加密
    sender: sign(privatekey_of_sender, publickey_of_receiver)
  • merkle tree
  • wallet app

Block

blockchain

Merkle Tree

每一个block有一个merkle树,叶子节点是transaction,每个中间节点有2个指针(2叉树)
如果修改某一个transaction叶子,就会传递到tree root

merkle tree,目前还没有在bitcoin中使用,但它的作用是:
确认一个交易是否被confirm,只要下载block header+merkle tree,而不用下载整个block

Proof of Work

sha256(‘pay me, joe, 25 bitcoins. 000001’) => 4699f0e443b73ddd6fdc14d4662395361fa21f54e647c64e643a49c54fef511c
sha256(‘pay me, joe, 25 bitcoins. 000013’) => 0ac09823cf2309c63d425c21b1c3d83f6dbc4b8acfb80b37b2db3d544192b7e9

猜到第13个nonce,就得到1个0开头的hash
bitcoin目前的难度是需要14个0开头,实际上每增加1个0,就把原来的计算量提高了16倍

bitcoin sha256(input + nonce),其中input包括

  • computer code
  • receiving address(public key)
  • transactions
  • timestamp
  • prev hash

Sybil Attack

女巫攻击,来源是70年代的一部叫做《Sybil》的美国系列片。片中的女主角人格混乱,扮演着16个角色。

Transaction

1
2
3
4
5
6
7
8
9
1. client send payment to replica nodes, proposing new log entries
2. proposal会在replica nodes间进行广播(unconfirmed transaction),进行leader election: proposal phase
transaction临时保存在node(miner)的内存
3. 通过PoW,leader胜出,本地append到blockchain,并广播到其他replicas
miner之间是竞争关系,都在正确尽快solve the puzzle to be leader
miner会把unconfirmed transactions进行合并(batch),生成到一个block(一个block最多4096个transactions)
4. 所有replicas append to local blockchain,transaction confirmed
block chain实际上实现了global transaction time order

Block Acceptance

收到miner发来的新block后,node会very block, verify each transaction, verify not spent
都成功,就加入local blockchain

如果local blockchain没有与main blockchain同步,那么verify block会失败,它会通过P2P请求同步,把unsynced blocks追上:

1
2
3
4
node1: block 1, 2, 3, 4
main blockchain: 1, 2, 3, 4, 5, 6, 7
此时node2成功创建了block8,并广播给node1
node1发现block 5-7还没有同步过来,先进行同步,然后再把block8 append to local blockchain

Confirm

确认数 = 该交易所在的block在blockchain中后面的block数量 + 1
如果确认数 >= 6,那么就可以认为无法逆转了

Consensus

通过PoW,每次生成block时进行一次leader选举,由leader生产new block,猜数成功的node自认为成为了leader,然后通过P2P广播(gossip)
由于猜数比较困难,多节点同时成为leader并且在接收到其他leader广播前猜成功的可能性非常小,但还是存在可能性,即多主,这就需要解决冲突

miner一旦recv a new block,就意识到在这个block的race上自己输了,它会立即把一些pending transactions和这个收到的block作为prev hash来构建下一个block

Conflict Resolve

采用longest chain rule,发现冲突时,block chain fork branch,在发现longest chain时,把short chain删除:但可能会造成confirmed transaction lost

conflict
resolve

Double Spent

double-spent

Public keys as identities

Alice pays Bob,Alice会把这个交易通过private key进行签名,广播给miners

  • Bob不需要参与
  • 交易信息里只包含Alice和Bob的public keys(address)
  • You can create a new identity at any time by generating a new key pair
    with no central authority or registry

验证身份verify

signature

比特币的所有权是通过数字密钥、比特币地址、数字签名来确立的
密钥实现了去中心化的信任,所有权认证

创建交易时,payer利用自己的private key给transaction签名,同时把自己的public key存放在消息里:(payer_public_key, signature)
miners通过这个信息,就可以verify这个transaction确实是payer发出的
同时,transaction里也包含了payee的public key信息,只有payee利用他的private key才能解开

key pair,非对称加密的特性:

  • 用私钥加密的内容,只能用公钥解密
    如果能通过(payer_public_key, signature)能解密,就证明了payer的身份
  • 用公钥加密的内容,只能用私钥解密
    只有payee才能用他的私钥解密交易
    每个交易必须有效的签名才能被存入ledger
    当payee花销这笔钱时,他必须给这个交易签名,由于这笔钱已经在ledger里记录了payee的公钥,只有payee才能签名
  • 公钥用于收钱,私钥用于花钱时生成数字签名
  • 通过私钥能计算出公钥,但反过来不行
    只要私钥不丢,公钥丢了也没关系

如果我知道Alice的public key(X),然后创建一笔交易:X支付给me 1BTC,系统是怎么知道有诈的?
首先我不知道X对应的私钥,只能拿我的私钥对交易加签名,miner通过X和signature就能验证:invalid signature

Upgrade

https://github.com/bitcoin/bips

BIP(bitcoin improvement proposal)在github上发布,miners自愿地下载、启动,但不是强制的

Q & A

为什么blocks通过hash进行chain,而不是通过普通的编号?

因为hash具有不可篡改性,hash本身包含了内容的指纹

同时有很多client进行交易,full node一直在瞎忙活?

T1, T2, Tn被分发到n个full node,那么每个node都开始猜数,其中一个node(N1)猜中,开始广播,此时N2~Nn还在为自己的Tn进行猜数,发现new block,就停下手中的活,重新生成新的block,并重新猜数
如果N1在广播时,消息还没有传到Nx,而此时Nx的猜数工作是不会停的;如果此时Nx也猜数成功,那么在它还没有收到N1广播前,就会广播自己的new block,此时fork出现
对于某一个full node,它可能会并发接收多个交易请求,它会进行串行化

如果识别一个account

通过wallet程序,每次交易都可以生成新的public/private key pair,由wallet管理并保存。如果wallet的数据丢了,那么bitcoin就无法证明是你的了

如何动态维护PoW的难度?

每个节点计算最近2016个solution,因为频率应该是10分钟一个,因此2016个,应该是2016/(6*24)=14天
而每个block里都有timestamp

1
2
3
4
elapsed = timestamp(block(-2016)) - timestamp(block(-1))
// 例如,假如elapsed = 7 day,那么说明难度系统太低了
// 即,移动平均数
difficulty = difficulty * 2

如何保证bitcoin address唯一?

Bitcoin address是由34个alphanumerical组成的字符串,但排除了O/T/l/0,它的空间是58^34,即
904798310844700775313327215140493940623303545877497699631104
但里面有几位用于checksum,它的实际空间是2^160,即
1461501637330902918203684832716283019655932542976

一个恶意node可以做什么,不能做什么?

它能

  • Refuse to relay valid transactions to other nodes
    但其他节点会replay
  • Attempt to create blocks that include or exclude specific transactions of his choosing
  • Attempt to create a ‘longer chain’ of blocks that make previously accepted blocks become ‘orphans’ and not part of the main chain

它不能

  • Steal bitcoins from your account
  • Make payments on your behalf or pretend to be you

为什么建议每次transaction都新生成key pair?

公钥虽然没有安全问题,但有隐私,毕竟它属于某一个人,如果我的交易都用一个key pair,那么最终这些交易可以发现是一个人的行为

如果写错了接受者payee公钥,这笔钱能回来吗?

首先,公钥(bitcoin address)是有校验位的,写错的时候,基本上就可以在提交时离线发现
如果恰巧校验一致,而地址是不存在的,那么交易会成功,但由于那个payee不存在,也就不存在对应的private key,也就无法spend it:黑洞,那笔钱永远消失了

如果本来我想给A钱,却输入时写成了B的address,那么:
Bitcoin transactions are not reversible. Sending to the wrong person cannot be undone.

Block timestamp如果做假怎么办?

A timestamp is accepted as valid if it is greater than the median timestamp of previous 11 blocks, and less than the network-adjusted time + 2 hours.
“Network-adjusted time” is the median of the timestamps returned by all nodes connected to you.

Whenever a node connects to another node, it gets a UTC timestamp from it, and stores its offset from node-local UTC. The network-adjusted time is then the node-local UTC plus the median offset from all connected nodes. Network time is never adjusted more than 70 minutes from local system time, however.

如果真有人控制了51%计算能力,会发生什么?

attacker只能把他刚花的钱payment取消掉,即double spent

据说bitcoin最大吞吐量是7 TPS,怎么计算来的?

每个block最大1MB,4096个transaction,10分钟产生一个block

4096/(10*60) = 6.83 = 7

miner做恶的惩罚

miner竞争成功后,创建一个block,它会获得奖励;如果它发布一个非法的block,那么大部分miners会拒绝;并在下一个block时,该非法block被取消,同时它的奖励也被取消。除非,它能保证它一直是竞争成功

seed nodes?

hard coded
https://github.com/bitcoin/bitcoin/blob/863e995b79ec388bf292d80f181912d01e20e2e5/src/net.cpp#L1198

1
2
3
4
5
6
7
8
unsigned int pnSeed[] =
{
0x959bd347, 0xf8de42b2, 0x73bc0518, 0xea6edc50, 0x21b00a4d, 0xc725b43d, 0xd665464d, 0x1a2a770e,
0x27c93946, 0x65b2fa46, 0xb80ae255, 0x66b3b446, 0xb1877a3e, 0x6ee89e3e, 0xc3175b40, 0x2a01a83c,
0x95b1363a, 0xa079ad3d, 0xe6ca801f, 0x027f4f4a, 0x34f7f03a, 0xf790f04a, 0x16ca801f, 0x2f4d5e40,
0x3a4d5e40, 0xc43a322e, 0xc8159753, 0x14d4724c, 0x7919a118, 0xe0bdb34e, 0x68a16b2e, 0xff64b44d,
// 列出了500多个节点
}

References

https://bitcoincharts.com/
https://blockchain.info/
https://en.bitcoin.it/wiki/Vocabulary
http://www.cs.rice.edu/Conferences/IPTPS02/101.pdf
https://www.cryptocompare.com/wallets/guides/how-to-create-a-bitcoin-address-from-a-public-key/

Share Comments

perceptual hash

这种算法的优点是简单快速,不受图片大小缩放的影响,缺点是图片的内容不能变更。如果在图片上加几个文字,它就认不出来了。所以,它的最佳用途是根据缩略图,找出原图。

实际应用中,往往采用更强大的pHash算法和SIFT算法,它们能够识别图片的变形。只要变形程度不超过25%,它们就能匹配原图。这些算法虽然更复杂,但是原理与上面的简便算法是一样的,就是先将图片转化成Hash字符串,然后再进行比较。

比较图片的相似度
图片尺寸可以不同,不同的长宽比例,小范围的颜色不同(亮度、对比度等)
对旋转都不具有鲁棒性:但可以sample argument

1
2
3
4
5
6
7
8
9
10
11
12
13
img = resize(img, (8, 8))
img.grayscalize(level=64)
avg = img.average() // e,g. 78
for i, b = range img { // (8, 8) => img[0...63]
if b >= avg {
img[i] = 1
} else {
img[i] = 0
}
}
// img现在就是一个64位的二进制整数, 这就是这张图片的指纹
HanmingDistance(img1, img2) // 汉明距离,如果小于5就说明两副图相似
Share Comments

GFS evolution and BigTable

GFS

GFS,2001年开发出来,3个人,1年,论文发表于2003
BigTable,2004年开发出来,论文发表于2006

master

  • replicated operation(redo) log with checkpoint to shadow masters
    • 当时没有自动failover,全是人工操作DNS alias
    • read请求可以使用shadow masters
    • 生成checkpoint时,master切换到新的redo log file,创建新线程生成checkpoint
      在此过程中发生的变化,记录到新redo log file
    • operation log保证了mutation的全局有序
    • save checkpoint
      很可能是通过mmap+msync
  • bottleneck?
    • 当时部署了多个cluster,根据业务属性
      最大的超过1000节点,300TB
    • 过高的OPS
      由于redo log需要复制,master的tps也就在1万左右
      • client cache
        何时invalidate and recall master?
        • primary unreachable
        • primary reponse: no lease
      • client batch request
      • lease
    • 内存容量
      • prefix compression to reduce memory footprint
      • 每个chunk(64MB),metadata占内存64B
        如果保存1百万个文件(64TB),需要64MB内存
        如果保存10亿个文件(64PB),需要64GB内存
        实际上是到了5千万个文件,10PB的时候,master已经到达瓶颈了

metadata

1
2
3
4
5
6
7
8
9
10
11
type metadata {
filename string // prefix compression
owner uid
perm acl
chunks []chunk {
chunk_id int64
version int
refCount int // for snapshot COW
chunkservers []string // ip addr
}
}

delete

rename to hidden file,保留3天,之后真正删除

write/mutation

  • operation log replicated between master and shadows
  • client write data flow to pipelined chain chunkservers
  • primary write control flow to secondary chunkservers

read(filename, offset, size)

1
2
3
4
5
6
7
8
9
client根据(offset, size)计算出需要哪些chunks,假设chunk1, chunk2
client把(filename, chunk index [1, 2])类似batch一样发送master
master返回{chunk1: {chunkId1, [ip1, ip2, ip3]}, chunk2: {chunkId2, [ip4, ip5, ip6]}}
client.putInCache(metadata)
client顺序地获取chunk1, chunk2
对于chunk1,根据ip1-3与client ip的距离,找最近的,取chunk;如果失败,找下一个最近的chunkserver
client.sendToIp1(chunkId1, offset1, size1)
// stale read是存在的,read的时候chunkserver可能发现data corruption

磁盘错误的解决

file -> chunk -> block

在每个chunkserver上执行,通过checksum检测

每个chunk由多个64KB的block组成,每个block有一个32位的checksum

read repair,如果chunkserver发现了一个非法block,会返回client err,同时向master汇报

  • client会从其他replica read
  • master会从其他有效的replica把这整个chunk clone到另外一个chunkserver,然后告诉有问题的chunkserver删除那个chunk

Questions

chunk为什么64MB那么大?

  • 减少master内存的占用
  • 减少client与master的交互
    同一个chunk的R/W,client只需要与master交互一次
  • 可以很容易看到机器里哪些机器磁盘快满了,而做迁移
  • 可以减少带宽的hotspot
    如果没有chunk,那么1TB的文件就只能从一个replica读
    有了chunk,可以从多个replica读
  • sharding the load
  • 加快recovery时间
    每个chunk,可以很快地被clone到一台新机器
  • 如果一个file只有1MB,那么实际存储空间是1MB,而不是64MB
    但它会增加master file count问题
  • 可以独立replicate
    • 文件的一部分损坏,可以迅速从好的replica恢复
    • 支持更细粒度的placement
  • 支持超大文件

chunk hotspot问题

MapReduce时,需要把代码发布到GFS,很可能小于64MB,即只有1个chunk,当很多mapper时,这个chunkserver就成为hotspot
解决办法是:增加replication factor

但最终的解决方案是:client间P2P,互相读

master为什么不用Paxos?

master通过redo log sync replication来提高可靠性,但没有election过程,都是完全手工进行failover

我猜,chubby当时还没有启动,chubby论文发表于2006

master为什么不持久化chunk location?

其他的metadata是有redo log replication并持久化的,他们的变化,都是主动产生的,例如创建一个文件,增加一个chunk
而由于chunkserver随时可能crash,不受控制,因此通过heartbeat来计算并存放内存,通过heartbeat,master又可以修正chunkserver的一些错误,例如orphan chunk

Data flow为什么pipelined chain,而不并发?

为了避免产生网络瓶颈,同时为了更充分利用high latency links
通过ip地址,感知chunkserver直接的距离

Total Latency = (B/T) + (R*L)

2PC,避免了client(coordinator) crash问题,因为primary成为了coordinator,而它是有failover的

1
2
3
4
5
6
7
8
9
10
11
12
13
client负责把一个write请求分成多个chunk的请求
Phase1: data flow client -> chained chunkservers
相当于prepare,但数据不落盘
client由近及远地chain把数据写入相应chunkserver的LRU buffer
这个顺序跟primary在哪无关
Phase2: control flow client -> primary -> secondary chunkservers
相当于commit,数据visible
确定mutation order
Phase1出错,则等master来修复,把crashed chunkserver摘除
Phase2出错,primary->secondary,这个阶段,那么primary返回client err,client会重试,此时可能出现不一致的状态,但最终master会修复

为什么搞个primary角色,而不让master做?

为了减轻master负担,所以搞了个二级调度:
跨chunk,master负责;chunk内部,primary负责

master如何revoke primary lease?

在lease expire后,master可能什么都不做
在lease expire前,master会sendto(primary)让它取消;如果sendto失败,那么只能等expire

为什么data flow和control flow分开?

如果不分开,那么所有的数据都是client->primary->secondary
分开后,比较轻量级的control flow必须走primary扩散;重量级的data flow可以根据物理拓扑进行优化

GFS vs Ceph

  • 论文2003 vs 2006
  • chunk(64MB) vs Object(4MB)
    object size可配
  • master vs mon(Paxos)
  • chunkserver vs osd
  • replication
    • GFS
      2PC, decouple data/control flow
    • Ceph
      client <-> osd
  • Ceph通过PG+crunch提高了扩展性
    GFS通过allocation table的方式
  • GFS上直接跑MapReduce
    计算向存储locality
  • Ceph更通用,latency更好
    GFS通过lease提高扩展性,但遇到错误时只能等expire
  • 节点的变化
    • GFS
      chunkserver向master汇报,自动加入,完全不需要人工参与
    • Ceph
      需要通过ceph osd命令,手工执行
  • namespace
    GFS是directory,Ceph是flat object id

2009 GFS回顾

GFS在使用了10年的过程中,发现了一些问题,对这些问题,有的是通过上层的应用来解决的,有的是修改GFS解决的

master ops压力

最开始的设计,考虑的是支撑几百TB,几百万个文件。但很快,到了几十PB,这对master有了压力

  • master在recover的时候,也变慢
  • master要维护的数据更多
  • client与master的交互变慢
    每次open,client都要请求master
    MapReduce下,可能突然多出很多task,每个都需要open,master处理能力也就是每秒几千个请求
    解决办法是在应用层垂直切分,弄多个cluster,应用通过静态的NameSpace找自己的master,同时提升单个master能力到数万ops

随着GFS的内部推广,越来越多的千奇百怪的上层应用连接进来

  • 最开始是爬虫和索引系统
  • 然后QA和research组用GFS来保存large data sets
  • 再然后,就有50多个用户了
  • 在此过程中GFS不断地调整以满足新use case

file-count问题

很早就发现了,例如:

  • 前端机上要把log发到GFS保存以便MapReduce分析,前端机很多,每个log每天会logrotate,log的种类也越来越多
  • gmail需要保存很多小文件
    解决办法是把多个文件合并,绕开file-count问题,同时增加quota功能,限制file-count和storage space
    长远的办法:在开发distributed multi-master系统,一个cluster可以有上百个master,每个master可以存1亿个文件,但
    如果都是小文件,会有新的问题出现:more seek
    再后来,建立在GFS之上的BigTable推出了,帮助GFS直接面对应用对小文件、多文件的需求,BigTable层给解决了,BigTable在使用GFS时,仍然是大文件、少文件

latency问题

GFS设计是只考虑吞吐率,而少考虑latency

error recovery慢

如果write一个文件,需要写到3个chunkserver,如果其中一个卡住了或crash,master会发觉(heartbeat),它会开新的一个chunkserver replica从其他chunkserver pull
master会把这个lock,以便新的client不能write(等恢复后再unlock)
而这个pullchunk操作,为了防止bandwidth burst,是有throttle的,限制在5-10MB/s,即一个64MB chunk,需要10s左右
等恢复到3个ok的时候再返回给client,client再继续write
在此过程中,client一直是block的

master failover慢

刚开始master failover完全靠人工,可能需要1h;后来增加了自动master failover,需要几分钟;再改进,可以在几秒钟内完成master自动切换

为吞吐量而设计的batch增加latency

解决办法

BigTable是无法忍受那么高的延时的,它的transaction log是最大的瓶颈,存储在GFS:
2个log(secondary log),一个慢,就切换到另外一个,这2个log任意时刻只有1个active,并且log entry里有sequence号,以便replay时防重
google使用这个request redundancy or timeout方法很广泛,为了解决search long tail latency,一样思路

Gmail是多机房部署的,一个卡了,切到另外机房

consistency

client一直push the write till it succeeds
但如果中途client crash了,会造成中间状态:不同client读同一个文件,可能发现文件长度不同
解决办法:使用append,offset统一由primary管理

但append由于没有reply保护机制,也有问题:
client write,primary分配一个offset,并call secondary,有的secondary收到有的没收到,此时primary crash
master会选另外一个作为primary,它可能会分配一个新的offset,造成该数据重复
如果为此再设计一套consensus,得不偿失
解决办法:single writer,让上层应用保证不并发写

Colossus

Colossus is specifically designed for BigTable, 没有GFS那么通用
In other words, it was built specifically for use with the new Caffeine search-indexing system, and though it may be used in some form with other Google services, it is not the sort of thing that is designed to span the entire Google infrastructure.

  • automatically sharded metadata layer
  • EC
  • client driven replication
  • metadata space has enabled availability analysis

BigTable

(row, column, time) -> value

在有了GFS和Chubby后,Google就可以在上面搭建BigTable了,一个GFS的索引服务
但BigTable论文对很多细节都没有提到:SSTable的实现、tabletserver的HA,B+数的metadata table算法

为了管理巨大的table,按照row key做sharding,每个shard称为tablet(100-200MB,再大就split),每台机器存储100-1000个tablet
row key是一级索引,column是二级索引,版本号(timestamp)是三级索引

redo log和SSTable都存放在GFS的Chubby管理元信息的分布式LSM Tree

tabletserver没有任何的持久化数据,只是操作memtable,真正的数据存放在哪里只有GFS知道,那为什么需要master在chubby上分配tablet给tabletserver?
因为memtable是有状态的: level0

tabletserver的HA?
通过chubby ephemeral node,死了master会让别的server接管,通过GFS上的redo log恢复memtable
为了保证强一致性系统,同一时刻同一份数据只能一台tabletserver服务,tabletserver对每个tablet是没有备份的
当它crash,由于只需要排序很少的操作日志并且加载服务的tablet的索引,宕机恢复可以做到一分钟以内;在此期间,一部分rowkey不可用

split and migration?
在没有crash情况下,只需要修改metadata和从sstable加载索引数据,效率很高

与GFS的对应

  • commit log
    每台机器一个commit log文件,与GFS File一一对应
  • sstable
    HBase中Column Family的名称会被作为文件系统中的目录名称,每个CF存储成一个HDFS的HFile
    据google工作过的人说:Column Families are stored in their own SSTable,应该是这样
    sstable对应一个GFS File
    sstable block=64KB,它与GFS的block相同
    sstable block为了压缩和索引(binary search),GFS block为了checksum

Highlights

redo log合并

一台机器一个redo log,而不是一个tablet一个redo log(每个机器有100-1000个tablet),否则GFS受不了
group commit

带来的问题:恢复时麻烦了
如果一天机器crash了,它上面的tablets会被master分配到很多其他的tabletserver上
例如,分配到了100台新tabletserver,他们都会read redo log and filter,这样redo log被读了100次
解决办法:利用类似MapReduce机制,在recovery之前先给redo log排序

加速tablet迁移

1
2
3
4
5
sourceTablet.miniorCompaction() // 把memtable里内容dump到GFS的SSTable
sourceTablet.stopServe()
sourceTablet.miniorCompaction() // 把in-flight commit log对应的操作也持久化到GFS
// 这样targetTablet就不需要从commit log recover了
master.doSwitch()

SSTable由多个64KB的block组成

压缩以block为单位,虽然相比在整个SSTable上压缩比小(浪费空间),但对于随机读,可以只uncompress block而非整个SSTable

经验和教训

遇到了新的问题

  • 发现了Chubby的bug
  • network corruption
    通过给RPC增加checksum解决
  • delay adding features until clear how it will be used
    刚开始想给API增加一个通用的事务机制,后来发现大部分人只需要单行事务
  • 不仅监控server,也监控client
    扩展了RPC,采样detailed trace of important actions
  • 设计和实现都要简单、明了
    BigTable代码10万行C++
    tabletserver的membership协议的设计,最初:master给tabletserver发lease
    结果:在网络出问题时大大降低了可用性(master无法reach tabletserver就只能等expire)
    改进:实现了更复杂的协议,也利用了Chubby里非常少见的特性
    结果:大量时间在调试edge case,很多时间在调试Chubby的代码
    最终:回到简单的设计,只依赖Chubby,而且只使用它通用的特性

Questions

按照rowkey来shard,那么可能造成hotspot问题,client端比较难控制

2009 BigTable回顾

部署了500+个BigTable集群,最大的机器:70+ PB data; sustained: 10M ops/sec; 30+ GB/s I/O

  • Lots of work on scaling
  • Improved performance isolation
  • Improved protection against corruption
  • Configure on per-table
  • 异地机房复制: 增加了最终一致性模型
  • Coprocessor

    References

http://queue.acm.org/detail.cfm?id=1594206
http://google-file-system.wikispaces.asu.edu/
http://static.usenix.org/publications/login/2010-08/openpdfs/maltzahn.pdf
https://stephenholiday.com/notes/bigtable/

Share Comments

replication models

Models

  • primary-replica(failover, replica readable)/multi-master/chain
  • replica/ec
  • push/pull
  • sync/async
1
2
3
4
5
6
7
8
9
10
Dynamo PUSH coordinator -> preference list next nodes in hash ring
Raft/ZK PUSH master commit log -> majority ack
GFS/Hadoop chain replication
ES PUSH primary-replica model, master concurrently dispatch index req to all replicas
MySQL PUSH master async PUSH to slaves binlog
Kafka PULL(Fetch) ISR
redis
Ceph PUSH primary-replica sync model with degrade mode 同步写入
Swift
Aurora PUSH primary instance R(3)+W(4)>N(6)
Share Comments

Google container evolution

Babbysitter & GlobalWorkQueue -> cgroup -> Borg -> Omega -> Kubernetes

Container

容器提供的隔离还不完善,OS无法管理的,容器无法隔离(例如CPU Cache,内存带宽),为了防止(cloud)恶意用户,需要在容器外加一层VM保护

container = isolation + image

数据中心:机器为中心 -> 应用为中心
在Application与OS之间增加一层抽象:container
container的依赖大部分都存放在image了,除了OS的系统调用
实际上,应用仍然会受OS影响,像/proc, ioctl

好处

  • 更高的资源利用率
  • 监控的是应用,而不是混在一起的机器
  • 开发者、管理员不需关心OS
  • load balancer不再balance traffic across machines: across application instances
  • logs are keyed by application, not machine
  • 可以更容易识别是应用的失败还是机器的失败

教训

  • 不要给container端口号
    Borg做法是物理机上的所有容器共用主机的ip,每个容器分单独的端口号
    Kubernetes为每个pod分配不同ip,network身份=application身份
  • 不要给container编号
    用label
  • 不用暴露raw state
    减少client端的复杂度
    K8s里,都通过API Service访问状态信息

还没有很好解决的问题

  • Configuration
  • Dependency management
    手工配,最终一定不一致
    自动,无法获取足够的语义信息
Share Comments

Ceph Internals

RADOS

Key ideas

components

  • 把传统文件系统架构分隔成client component and storage component
    client负责更上层的抽象(file, block, object),storage负责底层object, disk
  • seperation of data and metadata
    各管各的

Object

Object是最终落地存储文件的基本单位,可以类比fs block,默认4MB
如果一个文件 > object size,文件会被stripped,这个操作是client端完成的

It does not use a directory hierarchy or a tree structure for storage
It is stored in a flat-address space containing billions of objects without any complexity

Object组成

  • key
  • value
    在fs里用一个文件存储
  • metadata
    可以存放在扩展的fs attr里
    因此对底层文件系统是有要求的,ext4fs对extended attr数量限制的太少,可以用xfs
1
2
3
4
5
+---logical------+ +--physical layer -+
| | | |
cluster -> pool -> pg ---> osd -> object -> fs files
| |
+-crush-+

Lookup

1
2
3
4
5
6
7
8
9
10
11
file = ino
obj = (ino, ono)
obj_hash = hash(ino, ono)
pg = obj_hash & (num_pg-1) // num_pg是2整数幂,例如codis里1024
// 官方推荐num_pg是osd总数的数百倍
// num_pg如果变化,会引起大量的数据迁移
osds_for_pg = crush(pg) // list of osds,此处是唯一需要全局cluster map的地方
// 其他的,都是算出来的
// osds_for_pg is ordered,len(osds_for_pg)=replicate factor
primary = osds_for_pg[0]
replicas = osds_for_pg[1:]

lookup

crush

一个hash算法,目标

  • 数据均匀的分布到集群中
  • 需要考虑各个OSD权重的不同(根据读写性能的差异,磁盘的容量的大小差异等设置不同的权重)
  • 当有OSD损坏需要数据迁移时,数据的迁移量尽可能的少

有点像一致性哈希:failure, addition, removal of nodes result in near-minimal object migration

crush

PG

PG(placement group)与Couchbase里的vbucket一样,codis里也类似,都是presharding技术

在object_id与osd中间增加一层pg,减少由于osd数量变化造成大量的数据迁移
PG使得文件的定位问题,变成了通过crush定位PG的问题,数量大大减少
例如,1万个osd,100亿个文件,30万个pg: 100亿 vs 30万

线上尽量不要更改PG的数量,PG的数量的变更将导致整个集群动起来(各个OSD之间copy数据),大量数据均衡期间读写性能下降严重

1
Total PGs = (Total_number_of_OSD * 100) / max_replication_count
1
2
3
4
5
6
7
8
$ceph osd map pool1 object1
osdmap e566 pool 'pool1' (10) object 'object1' -> pg 10.bac4decb (10.3c) -> up [0,6,3] acting [0,6,3]
$
// osdmap e566: osd map epoch 566
// pool 'pool1' (10): pool name and pool id,pool id从0开始,每新建+1
// pg 10.bac4decb (10.3c): placement group number 10.3c(pg id是16进制,256个pg,那么他们的id: 0, 1, f, fe, ff)
pg id实际上是pool_id.pg_id(pool id=10, pg id=3c)
// up [0,6,3]: osd up set contains osd.0, osd.6, osd.3

同一个PG内的osd通过heartbeat相互检查对方状态,大部分情况下不需要mon参与,减少了mon负担

mon

相当于Ceph的zookeeper

mon quorum负责整个Ceph cluster中所有OSD状态(cluster map),然后以增量、异步、lazy方式扩散到each OSD和client
mon被动接收osd的上报请求,作为reponse把cluster map返回,不主动push cluster map to osd
如果client和它要访问的PG内部各个OSD看到的cluster map不一致,则这几方会首先同步cluster map,然后即可正常访问

MON跟踪cluster状态:OSD, PG, CRUSH maps
同一个PG的osd除了向mon发送心跳外,还互相发心跳以及坚持pg数据replica是否正常

Replication

是primary-replica model, 强一致性,读和写只能向primary发起请求
read write
read write
其中replicas在复制到buffer时就ack,如果某个replica复制时失败,进入degrade状态

2PC Write

write 2PC

1
2
3
4
5
6
phase1: client从primay接收ack
此时,数据已经replicate到每个replica的内存
phase2: client从primary接收commit
此时,数据已经replicate到每个replica的磁盘
client才把本地的write buffer cache清除

scrub机制

read verify

Questions

consistency?

strongly consistent

Can Ceph replace facebook Haystack?

Haystack解决的实际上是metadata访问造成的IO问题,解决的方法是metadata完全内存,无IO
每个图片的read,需要NetApp进行3次IO: dir inode, file inode, file data
haystack每个图片read,需要1次IO,metadata保证在内存
在一个4TB的SATA盘存储20M个文件,每个inode如果需要500B,那么如果inode都存放内存,需要10GB内存

  • 减少每个metadata的大小
    去掉大部分不需要的数据,例如uid, gid, atime等
    xfs里每个metadata占536字节,haystack每个图片需要10字节
  • 减少metadata的总数量
    多个图片(needle)合并进一个大文件(superblock)

用户上传一张图片,facebook会将其生成4种尺寸的图片,每种存储3份,因此写入量是用户上传量的12倍。

Posix规范里的metadata很多对图片服务器不需要,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
struct inode {
loff_t i_size; // 文件大小
struct list_head i_list; // backing dev IO list
struct list_head i_devices;
blkcnt_t i_blocks;
struct super_block *i_sb;
struct timespec i_atime;
struct timespec i_mtime;
struct timespec i_ctime;
umode_t i_mode;
uid_t i_uid;
gid_t i_gid;
dev_t i_rdev;
unsigned int i_nlink;
...
}

Ceph解决了一个文件到分布式系统里的定位问题(crush(pg)),但osd并没有解决local store的问题: 如果一个osd上存储过多文件(例如10M),性能会下降明显

Why EC is slow?

Google Colossus采用的是(6,3)EC,存储overhead=(6+3)/6=150%
把数据分成6个data block+3个parity block,恢复任意一个数据块需要6个I/O
生成parity block和恢复数据时,需要进行额外的encode/decode,造成性能损失

Store small file, waste space?

object size是可以设置的

References

http://ceph.com/papers/weil-crush-sc06.pdf
http://ceph.com/papers/weil-rados-pdsw07.pdf
http://ceph.com/papers/weil-ceph-osdi06.pdf
http://www.xsky.com/tec/ceph72hours/
https://blogs.rdoproject.org/6427/ceph-and-swift-why-we-are-not-fighting
https://users.soe.ucsc.edu/~elm/Papers/sc04.pdf

Share Comments