storm acker

Acker

对于Spout产生的每一个tuple, storm都会进行跟踪,利用RotatingMap存放内存,但有保护措施,不会打爆

当Spout触发fail动作时,storm不会自动重发失败的tuple,只是向Spout发送fail消息,触发Spout.fail回调,真正的重发需要在Spout.fail里实现

tuple tree,中间任意一个edge fail,会理解触发Spout的fail,但后面的Bolt的执行不受影响。做无用功?

backtype.storm.daemon.acker

Spout Executor

spout executor

Bolt Executor

bolt executor

Share Comments

monkey patch golang

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package main
import (
"syscall"
"unsafe"
)
func a() int { return 1 }
func b() int { return 2 }
func getPage(p uintptr) []byte {
return (*(*[0xFFFFFF]byte)(unsafe.Pointer(p & ^uintptr(syscall.Getpagesize()-1))))[:syscall.Getpagesize()]
}
func rawMemoryAccess(b uintptr) []byte {
return (*(*[0xFF]byte)(unsafe.Pointer(b)))[:]
}
func assembleJump(f func() int) []byte {
funcVal := *(*uintptr)(unsafe.Pointer(&f))
return []byte{
0x48, 0xC7, 0xC2,
byte(funcVal >> 0),
byte(funcVal >> 8),
byte(funcVal >> 16),
byte(funcVal >> 24), // MOV rdx, funcVal
0xFF, 0x22, // JMP rdx
}
}
func replace(orig, replacement func() int) {
bytes := assembleJump(replacement)
functionLocation := **(**uintptr)(unsafe.Pointer(&orig))
window := rawMemoryAccess(functionLocation)
page := getPage(functionLocation)
syscall.Mprotect(page, syscall.PROT_READ|syscall.PROT_WRITE|syscall.PROT_EXEC)
copy(window, bytes)
}
func main() {
replace(a, b)
print(a())
}

Reference

https://software.intel.com/en-us/articles/introduction-to-x64-assembly
https://www.hopperapp.com/

Share Comments

KafkaSpout

config

Topology config

  • TOPOLOGY_WORKERS
    • 整个topology在所有节点上的java进程总数
    • 例如,设置成25,parallelism=150,那么每个worker进程会创建150/25=6个线程执行task
  • TOPOLOGY_ACKER_EXECUTORS = 20
    • 不设或者为null,it=TOPOLOGY_WORKERS,即one acker task per worker
    • 设置为0,表示turn off ack/reliability
  • TOPOLOGY_MAX_SPOUT_PENDING = 5000

    1
    2
    3
    (defn executor-max-spout-pending [storm-conf num-tasks]
    (let [p (storm-conf TOPOLOGY-MAX-SPOUT-PENDING)]
    (if p (* p num-tasks))))
    • max in-flight(not ack or fail) spout tuples on a single spout task at once
    • 如果不指定,默认是1
  • TOPOLOGY_BACKPRESSURE_ENABLE = false

  • TOPOLOGY_MESSAGE_TIMEOUT_SECS
    30s by default

KafkaSpout config

1
2
3
fetchSizeBytes = 1024 * 1024 * 2 // 1048576=1M by default FetchRequest
fetchMaxWait = 10000 // by default
forceFromStart = false

emit/ack/fail flow

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class PartitionManager {
SortedSet<Long> _pending, failed = new TreeSet();
LinkedList<MessageAndRealOffset> _waitingToEmit = new LinkedList();
func EmitState next(SpoutOutputCollector collector) {
if (this._waitingToEmit.isEmpty()) {
// 如果内存里数据都发出,就调用kafka consumer一次性批量填充内存_waitingToEmit
// 填充时,如果发现failed里有东西,那么就从head of failed(offset) FetchRequest: 重发机制
this.fill();
}
// 从LinkedList _waitingToEmit里取一条消息
MessageAndRealOffset toEmit = (MessageAndRealOffset)this._waitingToEmit.pollFirst();
// emit时指定了messageID
// BasicBoltExecutor.execute会通过template method自动执行_collector.getOutputter().ack(input)
// 即KafkaSpout.ack -> PartitionManager.ack
collector.emit(tup, new PartitionManager.KafkaMessageId(this._partition, toEmit.offset));
// 该tuple处于pending state
}
// Note: a tuple will be acked or failed by the exact same Spout task that created it
func ack(Long offset) {
this._pending.remove(offset)
}
func fail(Long offset) {
this.failed.add(offset);
// kafka consumer会reset offset to the failed msg,重新消费
}
}
class TopologyBuilder {
public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism_hint) {
return setBolt(id, new BasicBoltExecutor(bolt), parallelism_hint);
}
}
class BasicBoltExecutor {
public void execute(Tuple input) {
_collector.setContext(input);
try {
_bolt.execute(input, _collector);
_collector.getOutputter().ack(input);
} catch(FailedException e) {
_collector.getOutputter().fail(input);
}
}
}

Bolt ack

KafkaSpout产生的每个tuple,Bolt必须进行ack,否则30s后KafkaSpout会认为emitted tuple tree not fully processed,进行重发

1
2
3
4
5
6
class MyBolt {
public void execute(Tuple tuple) {
_collector.emit(new Values(foo, bar))
_collector.ack(tuple)
}
}

OOM

如果消息处理一直不ack,累计的unacked msg越来越多,会不会OOM?
NO
KafkaSpout只保留offset,不会保存每条emitted but no ack/fail msg

spout throttle

1.0.0之前,只能用TOPOLOGY_MAX_SPOUT_PENDING控制
但这个参数很难控制,它有一些与其他参数配合使用才能生效的机制,而且如果使用Trident语义又完全不同
1.0.0之后,可以通过backpressure
backpressure

Storm messaging

  • intra-worker
    Disruptor
  • inter-worker
    0MQ/Netty

storm messaing

References

http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/
http://jobs.one2team.com/apache-storms/
http://woodding2008.iteye.com/blog/2335673

Share Comments

aliware

https://www.aliyun.com/aliware

  • EDAS
    • Enterprise Distributed Application Service
    • RPC framework + elasticjob + qconf + GTS + Dapper + autoscale
    • 鉴权数据下放到服务机器,避免性能瓶颈
  • MQ
  • DRDS
    • TDDL proxy
    • avg() => sum()/count()
    • 扩容,切换时client可能会失败,需要client retry
    • 通过/ TDDL: XXX/ sql注释来实现特定SQL路由规则
    • 可以冷热数据分离
  • ARMS
    streaming processing,具有nifi式visualized workflow editor
  • GTS

    1
    2
    3
    4
    5
    6
    7
    @GtsTransaction(timeout=1000*60)
    public void transfer(DataSource db1, DataSource db2) {
    // 强一致,但涉及的事务比较大时,性能下降非常明显
    // 通过soft state MQ的最终一致性吞吐量要好的多
    db1.getConnection().execute("sql1")
    db2.getConnection().execute("sql2")
    }
  • SchedulerX

  • CSB
    cloud service bus,相当于api gateway,包含协议转换
Share Comments

TiDB KV Mapping

Data

1
2
Key: tablePrefix_rowPrefix_tableID_rowID
Value: [col1, col2, col3, col4]

Unique Index

1
2
Key: tablePrefix_idxPrefix_tableID_indexID_indexColumnsValue
Value: rowID

Non-Unique Index

1
2
Key: tablePrefix_idxPrefix_tableID_indexID_ColumnsValue_rowID
Value: nil

1
2
3
4
5
var (
tablePrefix = []byte{'t'}
recordPrefixSep = []byte("_r")
indexPrefixSep = []byte("_i")
)
Share Comments

SSD vs HDD

http://www.pcgamer.com/hard-drive-vs-ssd-performance/2/

1
2
3
RandRead RandWrite SeqRead SeqWrite
SSD 50 200 2300 1300
HDD 0.6 1.0 200 130
Share Comments

kafka internals

Overview

overview

Controller

负责

  • leadership change of a partition
    each partition leader can independently update ISR
  • new topics; deleted topics
  • replica re-assignment

曾经的设计是没有controller,每个broker要决策时都通过zk,加入controller坏处就是要实现controller failover

1
2
3
4
5
6
7
class KafkaController {
partitionStateMachine PartitionStateMachine
replicaStateMachine ReplicaStateMachine
// controller选举,并LeaderChangeListener
controllerElector ZookeeperLeaderElector
}

ControllerContext

一个全局变量。在选举为controller时,一次性从zk里读入所有状态信息KafkaController.initializeControllerContext

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class ControllerContext {
epoch, epochZkVersion, correlationId
controllerChannelManager // 维护controller与每个broker之间的socket conn
// 初始化时从/brokers/ids里取得的,并在BrokerChangeListener里修改
liveBrokersUnderlying: Set[Broker]
// 初始化时从/brokers/topics里取得的,并在TopicChangeListener里修改
allTopics: Set[String]
// 初始化时从/brokers/topics/$topic里一一取得,并在如下情况下修改
// - assignReplicasToPartitions.assignReplicasToPartitions
// - updateAssignedReplicasForPartition
// - TopicChangeListener
// - ReplicaStateMachine.handleStateChange
partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] // AR
// 初始化时从/brokers/topics/$topic/$partitionID/state一一取得,如下情况下修改
// - PartitionStateMachine.initializeLeaderAndIsrForPartition
// - PartitionStateMachine.electLeaderForPartition
partitionLeadershipInfo: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch]
}

ControllerChannelManager

controller为每个broker建立一个socket(BlockingChannel)连接和一个thread用于从内存batch取request进行逐条socket send/receive
e,g. 1个8节点的机器,controller上为这部分工作,需要建立(7个线程 + 7个socket conn)
与每个broker的连接是并发的,互不干扰;对于某一个broker,请求是完全串行的

config
  • controller.socket.timeout.ms
    default 30s, socket connect/io timeout
broker shutdown/startup/crash?

zk.watch(“/brokers/ids”), BrokerChangeListener会调用ControllerChannelManager.addBroker/removeBroker

conn broken/timeout?

如果socket send/receive失败,那么自动重连重发,backoff=300ms,死循环
但如果broker长时间无法reach,它会触发zk.watch(“/brokers/ids”),removeBroker,死循环退出

ControllerBrokerRequestBatch

controller -> broker,这里的圣旨请求有3种,都满足幂等性

  • LeaderAndIsrRequest
    replicaManager.becomeLeaderOrFollower
  • UpdateMetadataRequest
    replicaManager.maybeUpdateMetadataCache
  • StopReplicaRequest
    删除topic

onBrokerStartup

由BrokerChangeListener触发

1
2
3
4
5
6
7
sendUpdateMetadataRequest(newBrokers)
// 告诉它它上面的所有partitions
replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers, OnlineReplica)
// 让所有的NewPartition/OfflinePartition进行leader election
partitionStateMachine.triggerOnlinePartitionStateChange()

onBrokerFailure

找出受影响的状态,并触发partitionStateMachine、replicaStateMachine的状态切换

StateMachine

只有controller那台机器的state machine才会启动

terms

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Replica {
broker_id int
partition Partition // a Replica belongs to a Partition
log Log
hw, leo long
isLeader bool
}
Partition {
topic string
partition_id int
leader Replica
ISR Set[Replica]
AR Set[Replica] // assigned replicas
zkVersion long // for CAS
}

PartitionStateMachine

每个partition的状态,负责分配、选举partion的leader

1
2
3
4
5
6
7
8
9
10
11
class PartitionStateMachine {
// - NonExistentPartition
// - NewPartition
// - OnlinePartition
// - OfflinePartition
partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty
topicChangeListener // childChanges("/brokers/topics")
addPartitionsListener // all.dataChanges("/brokers/topics/$topic")
deleteTopicsListener // childChanges("/admin/delete_topics")
}

  • PartitionLeaderSelector.selectLeader
    • OfflinePartitionLeaderSelector
    • ReassignedPartitionLeaderSelector
    • PreferredReplicaPartitionLeaderSelector
    • ControlledShutdownLeaderSelector

ReplicaStateMachine

每个partition在assigned replic(AR)上的状态,track每个broker的存活

1
2
3
4
5
6
7
8
9
10
class ReplicaStateMachine {
replicaState: mutable.Map[PartitionAndReplica, ReplicaState] = mutable.Map.empty
brokerRequestBatch = new ControllerBrokerRequestBatch
// 只有controller关心每个broker的存活,broker自己是不关心的
// 而且broker die只有2种标准,不会因为conn broken认为broker死
// 1. broker id ephemeral znode deleted
// 2. broker controlled shutdown
brokerChangeListener // childChanges("/brokers/ids")
}

replica state transition

startup

对所有replica,如果其对应的broker活,那么就置为OnlineReplica,否则ReplicaDeletionIneligible

state transition
  • 校验当前状态与目标状态
  • 维护内存replicaState
  • 必要时通过brokerRequestBatch广播给所有broker

Failover

broker failover

每个broker有KafkaHealthcheck,它向/brokers/ids/$id这个ephemeral znode写数据,session expire就会重写,其中timestamp置为当前时间

1
{"jmx_port":-1,"timestamp":"1460677527837","host":"10.1.1.1","version":1,"port":9002}

controller的BrokerChangeListener监视所有broker的存活
参考 onBrokerStartup, onBrokerFailure

leader failover

一个partition的leader broker crash了,controlle选举出新的leader后,该new leader的LEO会成为新的HW
leader负责维护/propogate HW,以及每个follower的LEO

存在一个partition多Leader脑裂

1
2
3
4
5
partition0 broker(A, B, C),A是leader
A GC很久,crontroller认为A死,让B成为leader,写zk ISR znode,正在此时A活了但还没有收到controller发来的RPC,此时A、B都是leader
如果client1连接A,clientB连接B,他们都发消息?
A活过来的时候,A认为它的ISR=(A,B,C),clientA发的消息commit条件是A,B,C都ack by fetch request才可以,但B在promoted to leader后会先stop fetching from previous leader。
因此,A只有shrink ISR后才可能commit消息,但shrink时它会写zk,通过CAS失败,此时A意识到它已经不是leader

controller failover

controller session expire example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
broker1,是controller
在sessionTimeout*2/3=4s内还没有收到response,就会try next zk server
zk connected,发现session(0x35c1876461ec9d3)已经expire了
触发KafkaController.SessionExpirationListener {
onControllerResignation()
brokerState设置为RunningAsBroker // 之前是RunningAsController
controllerElector.elect {
switch write("/controller") {
case ok:
onBecomingLeader {
// 1 successfully elected as leader
// Broker 1 starting become controller state transition
controllerEpoch.increment()
register listeners
replicaStateMachine.startup()
partitionStateMachine.startup()
brokerState = RunningAsController
sendUpdateMetadataRequest(to all live brokers)
}
case ZkNodeExistsException:
read("/controller") and get leaderID
}
}
}
broker0:
ZookeeperLeaderElector.LeaderChangeListener被触发,它watch("/controller") {
elect
}

Misc

  • ReplicationUtils.updateLeaderAndIsr

References

https://issues.apache.org/jira/browse/KAFKA-1460
https://cwiki.apache.org/confluence/display/KAFKA/kafka+Detailed+Replication+Design+V3
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals

Share Comments

scale

scale
growth

Share Comments

gnatsd

2011年用Ruby完成了一个版本,后来用golang重写,目前只维护golang版本
NATS = Not Another Tibco Server

协议类似redis
持久化通过上层的NATS Streaming完成

竞品是0mq/nanomsg/aeron

https://github.com/real-logic/Aeron

Share Comments

microservice gateway

需要的功能

  • traffic management
    • failure recovery
    • A/B testing, canary releases
    • discovery, load balance
    • throttling
    • rate limit
    • fault injection
  • observability
    • metrics, monitor, alert, insights
  • policy enforcement
    • access control
  • service identity and security
    • end-to-end authz
Share Comments