博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Storm中的Executor
阅读量:7222 次
发布时间:2019-06-29

本文共 4988 字,大约阅读时间需要 16 分钟。

hot3.png

Executor是Storm的核心运行单位,一个Executor实际上就是一个线程,一个Worker中可以启动一个或多个Executor,而一个Executor又可以含有多个Task,task是逻辑上的运行单位。

Storm会根据一个组件的Task数目及并行度设置来计算哪些Task应该被分配到哪些Executor中,若未设置Task数目,则默认情况下Task数目与Executor数目相同,每个Executor只有一个Task.

 

Executor中的数据

mk-executor-data函数用于定义Executor中含有的数据

:worker:worker-context:executor-id:task-ids:component-id:open-or-prepare-was-called:storm-conf:receivce-queue:storm-id:conf:shared-executor-data:storm-active-atom:batch-transfer-queue

:transfer-fn:suicide-fn:storm-cluster-state:stats:type:task->component:deserializer:sampler

 

Executor的输入和输出

对应两个Disruptor Queue ,分别用于输入和输出

Executor的输入及处理

worker初始化的时候,会给它所包含的每一个Executor创建一个Disruptor Queue,用于接收数据,在创建Executor时,可根据其executor-id从worker的:executor-receive-queue-map中获得该队列的引用:

:receive-queue((:executor-receive-queue-worker) executor-id)

 

      当worker的接收线程从ZMQ收到数据后,线程会根据目标的Taskid找到对应的Executor,并将数据发送到该executor所对应的接收Disruptor Queue中;对于输入Disruptor Queue中的消息,Bolt类型的executor会调用bolt对象的execute方法来处理,而spout类型的Executor则调用Spout对象的Ack或Fail方法处理。

 

    mk-task-receiver函数定义了一个函数来处理Disruptor Queue中的消息,它通过调用disruptor/clojure-handler函数获得一个消息处理函数,会在接收到新消息时被调用

 

    在创建Spout/Bolt时,会调用mk-task-receiver函数并将结果存储于event-handler变量中:

event-handler(mk-task-receiver executor-data tuple-action-fn)

在Spout中以非阻塞方式接收数据:

(disruptor/consume-batch receive-queue event-handler)

在Bolt中,则以阻塞方式接收数据:

(disruptor/consumer-batch-when-available receive-queue event-handler)

Bolt的消息循环主要调用evnet-handler中的方法来对消息进行处理。

Executor的输出及发送  

      每个Executor都会产生一个用于输出的Disruptor Queue对象,Executor在发送消息时首先会将消息内容发送到该队列。Executor会启动一个发送线程来处理该队列中的数据,该线程调用Worker中由mk-transfer-fn产生的函数对数据进行处理,或者把数据通过ZMQ发送到其他Worker,或者直接发送到与该Worker上的其他Executor想对应的接收Disruptor Queue中。

   start-barch-transfer->worker-handler!函数会调用disruptor/consumer-loog*函数来启动用于发送数据队列的发送线程。

 

Spout类型的Executor

mk-threads函数用于创建与Executor对应的消息循环主线程,Spout和Bolt有着不同的消息循环策略

(defmulti mk-threads executor-selector)

 mk-threads函数的主消息循环通过async-loop方法实现

准备消息循环的数据

emitted-count用来记录Executor发送的消息数目,empty-emit-streak变量则用来记录连续调用nextTuple函数且无消息发送的数目,它会被当做spout-wait-strategy的emitEmpty方法的参数。

Spout输入处理函数

对于Spout类型的Executor来讲,输入消息主要是系统消息,例如对发出消息进行回复的Ack/Fail消息,系统的tick消息等,Spout主线程消息循环需要做很多工作,为了不影响其他工作,Spout会采用非阻塞的方式从接收队列中获取消息。

ack-spout-msg函数主要调用用户的Spout对象的Ack回调方法,同时更新相关的统计信息

fail-spout-msg函数主要调用Spout对象的Fail回调方法;

Spout消息发送函数

spout使用send-spout-msg函数来发送消息:

out-stream-id是消息的streamId;values是消息内容,message-id是消息的messageid,表示是否要对消息进行跟踪,out-task-id则是消息的接收端的taskId,用于向直接留发送消息。

调用tasks-fn函数来获得消息的目标taskId,tasks-fn是task的主要函数,它会根据消息的streamid和消息内容来确定哪个Task将接收该流的消息,以及以何种方式来接收该流的消息,transfer-fn函数来发送消息,会将消息发送至Executor的发送队列中。

Spout对象的初始化

Spout对象的open操作,其中open 方法只会被调用一次:

等待直到Topology处于活跃状态,对Executor中每一个Spout进行操作,send-spout-msg函数会利用tasks-fn函数来悬着目标taskid,Executor中每个Spout都定义了send-spout-msg方法,这是由于消息接收端taskid可能会与当前task相关,例如:直接分组方式中,消息接收端的taskID和当前task是预先绑定的。类似的,send-spout-msg还需要利用tasks-fn来选择接收端taskId,同时在进行消息跟踪时记录发送消息的taskId,故Storm将这些函数定义为task级别而非Executor共享。

调用Spout对象的open回调方法,同时实例化SpoutOutputCollector,主要调用send-spout-msg来发送消息。

消息循环:

async-loop循环,正常下,该函数会依次执行接收队列的消息,重发之前未成功发送的消息,调用nexttuple发送新的消息;

以非阻塞的方式对接收队列中的消息进行处理

优先发送overflow-buffer中的数据(由于Executro中发送消息队列已满,数据将被缓存在overflow-buffer中)

若overflow-buffer为空,并且pending 存储的数据少于max-spout-pending,或者未设置max-spout-pending,最后需要topology处于活跃状态,则spout可以发送消息。

依次调用spout的nextTuple回调方法来发送消息,并最终调用send-spout-msg函数将消息发送到Executor的消息发送队列中。send-spout-msg函数会更新emitted-count。

 若Topology处于非活跃状态,则睡眠100毫秒。

若emitted-count与上次发送消息的curr-count相同,则表明nextTuple函数没有发送出去消息,此时调用spout-wait-strategy的emptyEmit方法来处理

Bolt类型的Executor

 

准备消息循环的数据

 

Bolt类型的executor的消息循环较少,主要是定义可tuple-action-fn函数,该函数会根据Taskid获得对应的bolt对象并调用其Execute方法。

 

Bolt的输入处理函数

Bolt的消息发送函数

bolt使用Bolt-emit函数来发送消息,获取消息接收端的Taskid集合,调用transfer-fn函数发送消息。

 

Bolt对象的初始化

 

创建Executor

Worker是如何创建一个Executor的:

调用mk-executor-data来创建Executor的数据

调用mk-task来创建Executor中每个Task对用的数据

调用start-batch-transfer->worker-handler!方法启动executor的数据发送线程

调用mk-threads方法来获得Executor的主循环线程

实例化RunningExecutor对象用以操作Executor

实例化Shutdownable,用于退出Executor并清理相关资源,包括:

结束Disruptor Queue的消息循环

结束Executor中启动的线程

清理用户钩子的数据

断开Zookeeper的连接

依次调用Executor中Spout和Bolt的close方法

 

辅助函数介绍

 

 

 

组件的Grouper函数

获得当前组件中一个流的所有接收端及其接收方式是Executor中的重要算法,是task-fn函数完成各种操作的前提

outbound-components函数用于获取从组件到某个流的分组函数,task-fn函数通过调用该分组函数可获取消息的目标Task集合

mk-grouper函数是Executor的核心方法,它会返回一个函数,该函数返回一个TaskId集合,代表消息发送的目的Task集合。

Storm的几种分组方式

字段分组

首先通过消息中与分组字段名列表对应的值计算得到一个哈希值,然后将该哈希值模除目标节点的个数,选出这条消息的目标节点,这种分组方式下,每条消息只会到达某个目标节点。

随机分组

  表示将输入消息随机地分配到目标节点

全部分组

  表示所有的接收端节点都会收到代发消息

无分组

  表示Storm将随机选择一个目标节点,并将所有的消息都将发送到该节点上。

直接分组

   表示消息将发送到某一个特定节点,这在传递控制信息时非常有用

自定义分组

用户需要实现CustomStreanGrouping接口完成自定义。

public interface CustonStreamGrouping extends Serializable{}

Local_or_Shuffle Grouping

  会首先选择消息发送到属于同一个Worker的目标Task上,如果没有找到这样的task,就通过随机方式来选择消息的接收端。

 

带流量控制的错误报告方法

   Storm会将Executor产生的错误记录到Zookeeper中,而Nimbus通过访问ZooKeeper获得该数据并将其显示在StromUI上。过于频繁以及过多的错误报告都会对zooKeeper造成影响,故Storm采用了相应的控制策略,限制同一时间段内报告错误的数量,并对Zookeeper中老数据进行清理。

 

触发系统Ticks

setup-ticks!函数用来定期向该Executor的接收消息队列发送Tick消息,Executor在收到Tick消息之后,就会执行发送队列的超时操作。因此,setup-ticks!主要用于对Spout节点发送出去的消息进行超时操作。

 

图示

 

 

转载于:https://my.oschina.net/iioschina/blog/822363

你可能感兴趣的文章
艾伟也谈项目管理,代码背后的点滴
查看>>
一起谈.NET技术,在 Linux 操作系统中运行 ASP.NET 4 (下)
查看>>
javascript异步编程系列【七】----扫盲,我们为什么要用Jscex
查看>>
.N“.NET研究”ET中的异步编程(二)- 传统的异步编程
查看>>
C#汉字转拼音代码分享|建议收藏
查看>>
WindowsServer2003+IIS6+ASP+NET+PHP+MSSQL+MYSQL配置说明 |备份于waw.cnblogs.com
查看>>
opengl 链接
查看>>
JSF---->事件处理--值变事件
查看>>
MVC 数据验证
查看>>
MVC中几种常用ActionResult
查看>>
Shiro标签
查看>>
sql 无法删除当前数据库,因为当前数据库正在使用
查看>>
.NET中使用OracleHelper
查看>>
[BuildRelease]安装文件的种类
查看>>
周鸿祎向雷军开炮:山寨成不了乔布斯
查看>>
WYSE POCKETCLOUD手把手教你如何用手机遥控你的电脑!!(转)
查看>>
UML——用例图
查看>>
[转载]最锋利的Visual Studio Web开发工具扩展:Web Essentials详解
查看>>
/etc/ld.so.conf文件
查看>>
iphone 如何实现全屏截图
查看>>