Executor是Storm的核心运行单位,一个Executor实际上就是一个线程,一个Worker中可以启动一个或多个Executor,而一个Executor又可以含有多个Task,task是逻辑上的运行单位。
Storm会根据一个组件的Task数目及并行度设置来计算哪些Task应该被分配到哪些Executor中,若未设置Task数目,则默认情况下Task数目与Executor数目相同,每个Executor只有一个Task.
Executor中的数据
mk-executor-data函数用于定义Executor中含有的数据
:worker:worker-context:executor-id:task-ids:component-id:open-or-prepare-was-called:storm-conf:receivce-queue:storm-id:conf:shared-executor-data:storm-active-atom:batch-transfer-queue
:transfer-fn:suicide-fn:storm-cluster-state:stats:type:task->component:deserializer:sampler
Executor的输入和输出
对应两个Disruptor Queue ,分别用于输入和输出
Executor的输入及处理
worker初始化的时候,会给它所包含的每一个Executor创建一个Disruptor Queue,用于接收数据,在创建Executor时,可根据其executor-id从worker的:executor-receive-queue-map中获得该队列的引用:
:receive-queue((:executor-receive-queue-worker) executor-id)
当worker的接收线程从ZMQ收到数据后,线程会根据目标的Taskid找到对应的Executor,并将数据发送到该executor所对应的接收Disruptor Queue中;对于输入Disruptor Queue中的消息,Bolt类型的executor会调用bolt对象的execute方法来处理,而spout类型的Executor则调用Spout对象的Ack或Fail方法处理。
mk-task-receiver函数定义了一个函数来处理Disruptor Queue中的消息,它通过调用disruptor/clojure-handler函数获得一个消息处理函数,会在接收到新消息时被调用
在创建Spout/Bolt时,会调用mk-task-receiver函数并将结果存储于event-handler变量中:
event-handler(mk-task-receiver executor-data tuple-action-fn)
在Spout中以非阻塞方式接收数据:
(disruptor/consume-batch receive-queue event-handler)
在Bolt中,则以阻塞方式接收数据:
(disruptor/consumer-batch-when-available receive-queue event-handler)
Bolt的消息循环主要调用evnet-handler中的方法来对消息进行处理。
Executor的输出及发送
每个Executor都会产生一个用于输出的Disruptor Queue对象,Executor在发送消息时首先会将消息内容发送到该队列。Executor会启动一个发送线程来处理该队列中的数据,该线程调用Worker中由mk-transfer-fn产生的函数对数据进行处理,或者把数据通过ZMQ发送到其他Worker,或者直接发送到与该Worker上的其他Executor想对应的接收Disruptor Queue中。
start-barch-transfer->worker-handler!函数会调用disruptor/consumer-loog*函数来启动用于发送数据队列的发送线程。
Spout类型的Executor
mk-threads函数用于创建与Executor对应的消息循环主线程,Spout和Bolt有着不同的消息循环策略
(defmulti mk-threads executor-selector)
mk-threads函数的主消息循环通过async-loop方法实现
准备消息循环的数据
emitted-count用来记录Executor发送的消息数目,empty-emit-streak变量则用来记录连续调用nextTuple函数且无消息发送的数目,它会被当做spout-wait-strategy的emitEmpty方法的参数。
Spout输入处理函数
对于Spout类型的Executor来讲,输入消息主要是系统消息,例如对发出消息进行回复的Ack/Fail消息,系统的tick消息等,Spout主线程消息循环需要做很多工作,为了不影响其他工作,Spout会采用非阻塞的方式从接收队列中获取消息。
ack-spout-msg函数主要调用用户的Spout对象的Ack回调方法,同时更新相关的统计信息
fail-spout-msg函数主要调用Spout对象的Fail回调方法;
Spout消息发送函数
spout使用send-spout-msg函数来发送消息:
out-stream-id是消息的streamId;values是消息内容,message-id是消息的messageid,表示是否要对消息进行跟踪,out-task-id则是消息的接收端的taskId,用于向直接留发送消息。
调用tasks-fn函数来获得消息的目标taskId,tasks-fn是task的主要函数,它会根据消息的streamid和消息内容来确定哪个Task将接收该流的消息,以及以何种方式来接收该流的消息,transfer-fn函数来发送消息,会将消息发送至Executor的发送队列中。
Spout对象的初始化
Spout对象的open操作,其中open 方法只会被调用一次:
等待直到Topology处于活跃状态,对Executor中每一个Spout进行操作,send-spout-msg函数会利用tasks-fn函数来悬着目标taskid,Executor中每个Spout都定义了send-spout-msg方法,这是由于消息接收端taskid可能会与当前task相关,例如:直接分组方式中,消息接收端的taskID和当前task是预先绑定的。类似的,send-spout-msg还需要利用tasks-fn来选择接收端taskId,同时在进行消息跟踪时记录发送消息的taskId,故Storm将这些函数定义为task级别而非Executor共享。
调用Spout对象的open回调方法,同时实例化SpoutOutputCollector,主要调用send-spout-msg来发送消息。
消息循环:
async-loop循环,正常下,该函数会依次执行接收队列的消息,重发之前未成功发送的消息,调用nexttuple发送新的消息;
以非阻塞的方式对接收队列中的消息进行处理
优先发送overflow-buffer中的数据(由于Executro中发送消息队列已满,数据将被缓存在overflow-buffer中)
若overflow-buffer为空,并且pending 存储的数据少于max-spout-pending,或者未设置max-spout-pending,最后需要topology处于活跃状态,则spout可以发送消息。
依次调用spout的nextTuple回调方法来发送消息,并最终调用send-spout-msg函数将消息发送到Executor的消息发送队列中。send-spout-msg函数会更新emitted-count。
若Topology处于非活跃状态,则睡眠100毫秒。
若emitted-count与上次发送消息的curr-count相同,则表明nextTuple函数没有发送出去消息,此时调用spout-wait-strategy的emptyEmit方法来处理
Bolt类型的Executor
准备消息循环的数据
Bolt类型的executor的消息循环较少,主要是定义可tuple-action-fn函数,该函数会根据Taskid获得对应的bolt对象并调用其Execute方法。
Bolt的输入处理函数
Bolt的消息发送函数
bolt使用Bolt-emit函数来发送消息,获取消息接收端的Taskid集合,调用transfer-fn函数发送消息。
Bolt对象的初始化
创建Executor
Worker是如何创建一个Executor的:
调用mk-executor-data来创建Executor的数据
调用mk-task来创建Executor中每个Task对用的数据
调用start-batch-transfer->worker-handler!方法启动executor的数据发送线程
调用mk-threads方法来获得Executor的主循环线程
实例化RunningExecutor对象用以操作Executor
实例化Shutdownable,用于退出Executor并清理相关资源,包括:
结束Disruptor Queue的消息循环
结束Executor中启动的线程
清理用户钩子的数据
断开Zookeeper的连接
依次调用Executor中Spout和Bolt的close方法
辅助函数介绍
组件的Grouper函数
获得当前组件中一个流的所有接收端及其接收方式是Executor中的重要算法,是task-fn函数完成各种操作的前提
outbound-components函数用于获取从组件到某个流的分组函数,task-fn函数通过调用该分组函数可获取消息的目标Task集合
mk-grouper函数是Executor的核心方法,它会返回一个函数,该函数返回一个TaskId集合,代表消息发送的目的Task集合。
Storm的几种分组方式
字段分组
首先通过消息中与分组字段名列表对应的值计算得到一个哈希值,然后将该哈希值模除目标节点的个数,选出这条消息的目标节点,这种分组方式下,每条消息只会到达某个目标节点。
随机分组
表示将输入消息随机地分配到目标节点
全部分组
表示所有的接收端节点都会收到代发消息
无分组
表示Storm将随机选择一个目标节点,并将所有的消息都将发送到该节点上。
直接分组
表示消息将发送到某一个特定节点,这在传递控制信息时非常有用
自定义分组
用户需要实现CustomStreanGrouping接口完成自定义。
public interface CustonStreamGrouping extends Serializable{}
Local_or_Shuffle Grouping
会首先选择消息发送到属于同一个Worker的目标Task上,如果没有找到这样的task,就通过随机方式来选择消息的接收端。
带流量控制的错误报告方法
Storm会将Executor产生的错误记录到Zookeeper中,而Nimbus通过访问ZooKeeper获得该数据并将其显示在StromUI上。过于频繁以及过多的错误报告都会对zooKeeper造成影响,故Storm采用了相应的控制策略,限制同一时间段内报告错误的数量,并对Zookeeper中老数据进行清理。
触发系统Ticks
setup-ticks!函数用来定期向该Executor的接收消息队列发送Tick消息,Executor在收到Tick消息之后,就会执行发送队列的超时操作。因此,setup-ticks!主要用于对Spout节点发送出去的消息进行超时操作。
图示