欢迎转载,转载请注明出处,徽沪一郎。
本文从外部消息在worker进程内部的转化,传递及处理过程入手,一步步分析在worker-data中的数据项存在的原因和意义。试图从代码实现的角度来回答,如果是从头开始实现worker的话,该如何来定义消息接口,如何实现各自接口上的消息处理。
Topology到Worker的映射关系
Topology由Spout,Bolt组成,其逻辑关系大体如下图所示。
无论是Spout或Bolt的处理逻辑都需要在进程或线程内执行,那么它们与进程及线程间的映射关系又是如何呢。有关这个问题,Understanding the Parallelism of a Storm Topology 一文作了很好的总结,现重复一下其要点。
- worker是进程,executor对应于线程,spout或bolt是一个个的task
- 同一个worker只会执行同一个topology相关的task
- 在同一个executor中可以执行多个同类型的task, 即在同一个executor中,要么全部是bolt类的task,要么全部是spout类的task
- 运行的时候,spout和bolt需要被包装成一个又一个task
worker,executor, task三者之间的关系可以用下图表示
小结一下,Worker=Process, Executor=Thread, Task=Spout or Bolt.
每一个executor使用的是actor pattern,high level的处理逻辑如下图所示
外部消息的接收和处理
在源码走读之四一文中总结了worker进程内的各种类型的thread,也即executor,这个等同于定义了进程内部和进程间的接口类型。那么这些接口上的消息在具体流传和处理过程中需要定义哪些数据结构,针对这些数据结构,又要做哪些必要的处理呢?
换句话说,就是为什么在worker.clj中有哪些数据和函数存在,不这样做,可以不?
先图示一下,外部消息处理的大概流程。
注:圈起来的数字表示消息转换和处理的序列。
步骤一
监听端口准备就绪,接收线程在收到外部的消息后,面临的问题就是如何确定由哪个task来处理该消息。接收到的tuple中含有task-id,根据task-id可以知道运行该task的executor,executor中有receive-message-queue即(incoming queue)来存放外部的tuple. 定义的数据结构需要反映这个转换过程task-id->executor->receive-queue-map.
那么在worker-data中哪些数据项与这个过程相关呢
- :port
- :executor-receive-queue-map
- :short-executor-receive-queue-map
- :task->short-executor
- :transfer-local-fn
transfer-local-fn将数据从接收线程发送到spout或bolt所在的executor线程。
步骤二
接下来数据会被传递到executor,于是又牵涉到executor的数据结构问题。executor-data由函数mk-executor-data创建,其内容与worker-data比较起来相对较少。
executor收到tuple之后,第一步需要进行反序列化,storm中使用kyro来进行序列化和反序列化,这也是为什么在executor中有该数据项的原因。
executor中与步骤2相关的数据项
- :type executor-type
- :receive-queue
- :deserializer (executor-data中的数据项)
步骤三:
步骤2处理结束,会产生相应的tuple发送到外部。这个过程需要多解释一下,首先tuple不是直接发送给worker的transfer-thread(负责向其它进程发送消息),而是发送给send-handler线程,每一个executor在创建的时候最起码会有两个线程被创建,一个用于运行bolt或spout的处理逻辑,另一个用以负责缓存bolt或spout产生的对外发送的tuple。
一旦snd-hander中的tuple数量达到阀值,这些被缓存的tuple会一次性发送给worker级别的transfer-thread.
executor中与步骤3相关的数据项
- :transfer-fn (mk-executor-transfer-fn batch-transfer->worker)
- :batch-transfer-queue
在步骤3中生成outgoing的tuple,tuple生成的时候需要回答两个基本问题
- tuple中含有哪些字段 -- 该问题的解答由spout或bolt中的declareOutFields来解决
- 由哪个node+port来接收该tuple -- 由grouping来解决,这个时候就可以看出为什么需要task这一层的逻辑抽象了,有关grouping的详细解释,请参考fxjwind撰写的Storm-源码分析-Streaming Grouping (backtype.storm.daemon.executor)
步骤四:
处理逻辑很简单,先将数据缓存,然后在达到阀值之后,一起传送给transfer-thread.
start-batch-transfer->worker-handler
(let [worker-transfer-fn (:transfer-fn worker)
cached-emit (MutableObject. (ArrayList.))
storm-conf (:storm-conf executor-data)
serializer (KryoTupleSerializer. storm-conf (:worker-context executor-data))
]
(disruptor/consume-loop*
(:batch-transfer-queue executor-data)
(disruptor/handler [o seq-id batch-end?]
(let [^ArrayList alist (.getObject cached-emit)]
(.add alist o)
(when batch-end?
(worker-transfer-fn serializer alist)
(.setObject cached-emit (ArrayList.))
)))
:kill-fn (:report-error-and-die executor-data))))
worker-transfer-fn是worker中的transfer-fn,由mk-transfer-fn生成。
(let [local-tasks (-> worker :task-ids set)
local-transfer (:transfer-local-fn worker)
^DisruptorQueue transfer-queue (:transfer-queue worker)]
(fn [^KryoTupleSerializer serializer tuple-batch]
(let [local (ArrayList.)
remote (ArrayList.)]
(fast-list-iter [[task tuple :as pair] tuple-batch]
(if (local-tasks task)
(.add local pair)
(.add remote pair)
))
(local-transfer local)
;; not using map because the lazy seq shows up in perf profiles
(let [serialized-pairs (fast-list-for [[task ^TupleImpl tuple] remote] [task (.serialize serializer tuple)])]
(disruptor/publish transfer-queue serialized-pairs)
)))))
步骤五:
处理函数mk-transfer-tuples-handler,主要进行序列化,将序列化后的数据发送给目的地址。
(let [^DisruptorQueue transfer-queue (:transfer-queue worker)
drainer (ArrayList.)
node+port->socket (:cached-node+port->socket worker)
task->node+port (:cached-task->node+port worker)
endpoint-socket-lock (:endpoint-socket-lock worker)
]
(disruptor/clojure-handler
(fn [packets _ batch-end?]
(.addAll drainer packets)
(when batch-end?
(read-locked endpoint-socket-lock
(let [node+port->socket @node+port->socket
task->node+port @task->node+port]
;; consider doing some automatic batching here (would need to not be serialized at this point to remove per-javascript一种新的对象创建方式-Object.create() - yupeng 阅读原文»
1.Object.create() 是什么?
Object.create(proto [, propertiesObject ]) 是E5中提出的一种新的对象创建方式,第一个参数是要继承的原型,如果不是一个子函数,可以传一个null,第二个参数是对象的属性描述符,这个参数是可选的。
例如:
this.desc = desc;
this.color = "red";
}
Car.prototype = {
getInfo: function() {
return 'A ' + this.color + ' ' + this.desc + '.';
}
};
//instantiate object using the constructor function
var car = Object.create(Car.prototype);
car.color = "blue";
alert(car.getInfo());
结果为:A blue undefined.
2.propertiesObject 参数的详细解释:(默认都为false)
数据属性
- writable:是否可任意写
- configurable:是否能够删除,是否能够被修改
- enumerable:是否能用 for in 枚举
- value:值
访问属性:
- get(): 访问
- set(): 设置
3.例子:直接看例子就知道怎么用。
<html>
<head>
<title>yupeng's document </title>
<meta charset="utf-8"/>
</head>
<body>
<script type="text/javascript">
var obj = {
a:function(){
console.log(100)
},
b:function(){
console.log(200)
},
c:function(){
console.log(300)
}
}
var newObj = {};
newObj = Object.create(obj,{
t1:{
value:'yupeng',
writable:true
},
bar: {
configurable: false,
get: function() { return bar; },
set: function(value) { bar=value }
}
})
console.log(newObj.a());
console.log(newObj.t1);
newObj.t1='yupeng1'
console.log(newObj.t1);
newObj.bar=201;
console.log(newObj.bar)
function Parent() { }
var parent = new Parent();
var child = Object.create(parent, {
dataDescriptor: {
value: "This property uses this string as its value.",
writable: true,
enumerable: true
},
accessorDescriptor: {
get: function () { return "I am returning: " + accessorDescriptor; },
set: function (val) { accessorDescriptor = val; },
configurable: true
没有评论:
发表评论