[Spark Core] Task Execution Mechanism and Task Source Code Analysis (1)

Introduction
The previous article, "TaskScheduler Source Code and Task Submission Analysis (2)", described how the Driver side splits a job into stages, dispatches tasks according to which Executors are idle, and finally sends task messages from the DriverActor to the executor actors.
To understand the Executor's execution mechanism we first have to understand how Executors are registered on the Driver side, so this article walks through the registration of the Application and of the Executors.
1. The Task class and related classes
1.1 The Task class
Spark divides the tasks executed by Executors into two kinds, ShuffleMapTask and ResultTask, whose source lives in the scheduler package.
Task is the interface sitting between the DAGScheduler and the TaskScheduler: the DAGScheduler wraps every partition of every stage in the DAG into a task, and finally submits the resulting TaskSet to the TaskScheduler.
/**
 * A unit of execution. We have two kinds of Task's in Spark:
 *  - [[org.apache.spark.scheduler.ShuffleMapTask]]
 *  - [[org.apache.spark.scheduler.ResultTask]]
 *
 * A Spark job consists of one or more stages. The very last stage in a job consists of multiple
 * ResultTasks, while earlier stages consist of ShuffleMapTasks. A ResultTask executes the task
 * and sends the task output back to the driver application. A ShuffleMapTask executes the task
 * and divides the task output to multiple buckets (based on the task's partitioner).
 *
 * @param stageId id of the stage this task belongs to
 * @param partitionId index of the number in the RDD
 */
private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable
A Task is tied to one stageId and one partitionId.
It provides the runTask() and kill() methods.
It carries a killed flag, a TaskMetrics field, a TaskContext field, and so on.
Besides these basic methods and fields, the Task companion object provides methods for serializing and deserializing the jar dependencies of a task. A Task has to ensure that the worker node holds every other dependency this run needs, registered under the SparkContext, so the companion object offers helpers that write those dependencies out to a stream and read them back.
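To make that concrete, here is a condensed sketch of the Task abstraction (simplified and abridged from the Spark 1.x source; metrics plumbing and the serialization helpers of the companion object are omitted, and the TaskContextImpl constructor arguments vary between versions):

private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable {

  // Implemented by ShuffleMapTask / ResultTask: the actual work for one partition.
  def runTask(context: TaskContext): T

  // Preferred locations, used by the scheduler for locality-aware placement.
  def preferredLocations: Seq[TaskLocation] = Nil

  // Set when the scheduler asks for this attempt to be killed.
  @volatile @transient private var _killed = false
  def killed: Boolean = _killed

  def kill(interruptThread: Boolean) {
    _killed = true
    if (context != null) {
      context.markInterrupted()
    }
  }

  @transient protected var context: TaskContextImpl = _

  // Called on the executor: set up the TaskContext for this attempt, then delegate to runTask.
  final def run(taskAttemptId: Long, attemptNumber: Int): T = {
    context = new TaskContextImpl(stageId, partitionId, taskAttemptId, attemptNumber)
    try {
      runTask(context)
    } finally {
      context.markTaskCompleted()
    }
  }
}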
1.2 ShuffleMapTask
Corresponds to a ShuffleMap stage; its output is used as input by other stages.
ShuffleMapTask overrides the methods that write its state out and read it back, because what gets written out includes stageId, rdd, dep, partitionId, epoch, and split (one particular partition). For stageId, rdd, and dep there is a unified serialize/deserialize path whose results are cached in memory before being written to the ObjectOutput. Serialization uses Gzip, and the serialized bytes are kept in serializedInfoCache = new HashMap[Int, Array[Byte]]. The reason this part is serialized and cached is that stageId, rdd and dep are what really describe this shuffle task, and caching the serialized result lightens the load on the master node.
/**
 * A ShuffleMapTask divides the elements of an RDD into multiple buckets (based on a partitioner
 * specified in the ShuffleDependency).
 *
 * See [[org.apache.spark.scheduler.Task]] for more information.
 *
 * @param stageId id of the stage this task belongs to
 * @param taskBinary broadcast version of the RDD and the ShuffleDependency. Once deserialized,
 *                   the type should be (RDD[_], ShuffleDependency[_, _, _]).
 * @param partition partition of the RDD this task is associated with
 * @param locs preferred task execution locations for locality scheduling
 */
private[spark] class ShuffleMapTask(
    stageId: Int,
    taskBinary: Broadcast[Array[Byte]],
    partition: Partition,
    @transient private var locs: Seq[TaskLocation])
  extends Task[MapStatus](stageId, partition.index) with Logging {
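The heart of the class is its runTask, which deserializes the broadcast (rdd, dep) pair and writes this partition out through the shuffle writer. A condensed sketch, simplified from the Spark 1.x source (error handling omitted):

override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD and the ShuffleDependency from the broadcast task binary.
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

  // Write this partition's records into the shuffle buckets via the shuffle manager,
  // and return a MapStatus describing where the output lives.
  val manager = SparkEnv.get.shuffleManager
  val writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
  writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
  writer.stop(success = true).get
}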
1.3 ResultTask
Corresponds to a Result stage and directly produces the job's result.
/**
 * A task that sends back the output to the driver application.
 *
 * See [[Task]] for more information.
 *
 * @param stageId id of the stage this task belongs to
 * @param taskBinary broadcasted version of the serialized RDD and the function to apply on each
 *                   partition of the given RDD. Once deserialized, the type should be
 *                   (RDD[T], (TaskContext, Iterator[T]) => U).
 * @param partition partition of the RDD this task is associated with
 * @param locs preferred task execution locations for locality scheduling
 * @param outputId index of the task in this job (a job can launch tasks on only a subset of the
 *                 input RDD's partitions).
 */
private[spark] class ResultTask[T, U](
    stageId: Int,
    taskBinary: Broadcast[Array[Byte]],
    partition: Partition,
    @transient locs: Seq[TaskLocation],
    val outputId: Int)
  extends Task[U](stageId, partition.index) with Serializable {
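Its runTask deserializes the broadcast (rdd, func) pair and applies the function to the iterator of its partition; the return value is what gets shipped back to the driver. A condensed sketch, simplified from the Spark 1.x source:

override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the user function from the broadcast task binary.
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

  // Run the function over this partition; computing rdd.iterator walks the RDD chain
  // (i.e. calls compute) in topological order.
  func(context, rdd.iterator(partition, context))
}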
1.4 TaskSet
TaskSet is a data structure that wraps all the tasks of one stage so they can be submitted to the TaskScheduler.
A TaskSet is a group of identical tasks that can be pipelined: every task runs exactly the same processing logic, just over different data, and each task is responsible for one partition. Pipelining can be called the cornerstone of big-data processing: only when the data is processed as a pipeline can the computation be spread across a cluster. From the point of view of a single task, it takes the processing logic from the data source and executes it step by step in topological order (in practice, by calling the RDDs' compute methods).
/**
 * A set of tasks submitted together to the low-level TaskScheduler, usually representing
 * missing partitions of a particular stage.
 */
private[spark] class TaskSet(
    val tasks: Array[Task[_]],
    val stageId: Int,
    val attempt: Int,
    val priority: Int,
    val properties: Properties) {
  val id: String = stageId + "." + attempt

  override def toString: String = "TaskSet " + id
}
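For context, once the DAGScheduler has built the tasks of a stage it wraps them in a TaskSet and hands them off roughly like this (a condensed sketch of the relevant lines of DAGScheduler.submitMissingTasks; the exact arguments differ between Spark versions):

if (tasks.size > 0) {
  logInfo("Submitting " + tasks.size + " missing tasks from " + stage)
  stage.pendingTasks ++= tasks
  taskScheduler.submitTasks(
    new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
}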
2. Executor registration with the Driver
The LaunchTask message sent by the Driver is received by the Executor, which processes it with launchTask.
Before that can happen, however, note that an Executor that has not registered with the Driver will not process tasks even if it receives a LaunchTask command. So we first need to work out how an Executor gets registered on the Driver side.
2.1 Application registration
Executor registration happens as part of Application registration. Taking Standalone mode as the example:
SparkContext creates the schedulerBackend and the taskScheduler, with the schedulerBackend held as a member of the TaskScheduler object -> when start is called on the TaskScheduler it actually calls backend.start() ->
backend.start() starts an AppClient, and one of the AppClient's parameters, the ApplicationDescription, wraps the command that runs CoarseGrainedExecutorBackend ->
the AppClient internally starts a ClientActor, which once started tries to send the Master the message actor ! RegisterApplication(appDescription) to register an Application.
Below is the part of SparkDeploySchedulerBackend.start that registers the Application:
val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
  args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
  appUIAddress, sc.eventLogDir, sc.eventLogCodec)
client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
client.start()
AppClient submits the Application to the Master
AppClient is the interface through which the Application talks to the Master. It contains a member variable actor of type org.apache.spark.deploy.client.AppClient.ClientActor, which is responsible for all interaction with the Master. The call chain involved in submitting the Application is:
ClientActor.preStart() -> registerWithMaster() -> tryRegisterAllMasters() -> actor ! RegisterApplication(appDescription) -> the Master's receiveWithLogging function handles the RegisterApplication message.
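The client side of that chain is short. A condensed sketch of registerWithMaster/tryRegisterAllMasters inside ClientActor (simplified from the Spark 1.x source; field names and the retry/timeout handling differ slightly between versions):

def tryRegisterAllMasters() {
  for (masterAkkaUrl <- masterAkkaUrls) {
    logInfo("Connecting to master " + masterAkkaUrl + "...")
    val actor = context.actorSelection(masterAkkaUrl)
    actor ! RegisterApplication(appDescription)
  }
}

def registerWithMaster() {
  tryRegisterAllMasters()
  // A retry timer re-sends RegisterApplication until a RegisteredApplication reply
  // arrives or the retry limit is hit (omitted here).
}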
Below is the code that handles the RegisterApplication(appDescription) message (part of receiveWithLogging in Master.scala):
case RegisterApplication(description) => {
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else {
    logInfo("Registering app " + description.name)
    val app = createApplication(description, sender)
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    persistenceEngine.addApplication(app)
    sender ! RegisteredApplication(app.id, masterUrl)
    schedule()
  }
}
This code does the following:
createApplication builds an ApplicationInfo, the data structure that describes this app
registerApplication registers the Application, updating the relevant mappings and adding it to the waiting queue (sketched below)
persistenceEngine persists the Application information; by default nothing is saved, and the two other options are saving to files or to ZooKeeper
the sender is notified that registration succeeded
job scheduling starts (resources are allocated to Applications waiting for them; schedule is called every time a new Application joins or new resources become available)
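A condensed sketch of what registerApplication does with those mappings (simplified from Master.scala in the 1.x line; metrics registration is omitted):

def registerApplication(app: ApplicationInfo): Unit = {
  val appAddress = app.driver.path.address
  if (addressToApp.contains(appAddress)) {
    logInfo("Attempted to re-register application at same address: " + appAddress)
    return
  }
  apps += app                     // all known applications
  idToApp(app.id) = app           // appId -> ApplicationInfo
  actorToApp(app.driver) = app    // driver actor -> ApplicationInfo
  addressToApp(appAddress) = app  // driver address -> ApplicationInfo
  waitingApps += app              // the queue consumed by schedule()
}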
2.2 The schedule function in the Master
schedule() allocates resources to the Applications that are waiting for them; it is called every time a new Application joins or new resources become available. There are currently two strategies for choosing workers (executors) for an Application:
Spread out as much as possible, i.e. distribute an Application over as many different nodes as possible. This is controlled by spark.deploy.spreadOut, whose default value is true (spread out).
Consolidate as much as possible, i.e. place an Application on as few nodes as possible.
For a given Application there can be only one executor per worker, although that executor may own more than one core. With strategy 1 task deployment is slower than with strategy 2, but GC is faster. A small standalone comparison of the two strategies follows the schedule source below.
The source of the schedule function, with explanations in the comments:
private def schedule() {
  if (state != RecoveryState.ALIVE) { return }

  // First schedule drivers: shuffle the alive workers, then place each waiting driver
  // on a worker with enough free memory and cores, visiting workers round-robin.
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
    var launched = false
    var numWorkersVisited = 0
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }

  // Then schedule applications: a simple FIFO pass over waitingApps.
  if (spreadOutApps) {
    // Strategy 1: spread each app over as many usable workers as possible,
    // handing out cores one at a time in round-robin order.
    for (app <- waitingApps if app.coresLeft > 0) {
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(canUse(app, _)).sortBy(_.coresFree).reverse
      val numUsable = usableWorkers.length
      val assigned = new Array[Int](numUsable) // number of cores to give on each worker
      var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
      var pos = 0
      while (toAssign > 0) {
        if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
          toAssign -= 1
          assigned(pos) += 1
        }
        pos = (pos + 1) % numUsable
      }
      // Now that we've decided how many cores to give on each worker, actually give them
      for (pos <- 0 until numUsable) {
        if (assigned(pos) > 0) {
          val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
          launchExecutor(usableWorkers(pos), exec)
          app.state = ApplicationState.RUNNING
        }
      }
    }
  } else {
    // Strategy 2: pack each app onto as few workers as possible,
    // taking all free cores of one worker before moving to the next.
    for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
      for (app <- waitingApps if app.coresLeft > 0) {
        if (canUse(app, worker)) {
          val coresToUse = math.min(worker.coresFree, app.coresLeft)
          if (coresToUse > 0) {
            val exec = app.addExecutor(worker, coresToUse)
            launchExecutor(worker, exec)
            app.state = ApplicationState.RUNNING
          }
        }
      }
    }
  }
}
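To see the difference between the two strategies concretely, here is a small self-contained Scala simulation (toy code, not Spark source): three alive workers with 4 free cores each, and one app that still needs 6 cores.

object AssignmentDemo extends App {
  val coresFree = Array(4, 4, 4)  // free cores on each of three workers
  val coresLeft = 6               // cores the app still needs

  // Strategy 1 (spreadOutApps = true): hand out cores one at a time, round-robin.
  val spread = new Array[Int](coresFree.length)
  var toAssign = math.min(coresLeft, coresFree.sum)
  var pos = 0
  while (toAssign > 0) {
    if (coresFree(pos) - spread(pos) > 0) { spread(pos) += 1; toAssign -= 1 }
    pos = (pos + 1) % coresFree.length
  }
  println(spread.mkString(","))   // 2,2,2 -> three 2-core executors on three workers

  // Strategy 2 (spreadOutApps = false): drain each worker before moving to the next.
  val packed = new Array[Int](coresFree.length)
  var left = coresLeft
  for (i <- coresFree.indices if left > 0) {
    val use = math.min(coresFree(i), left)
    packed(i) = use
    left -= use
  }
  println(packed.mkString(","))   // 4,2,0 -> a 4-core and a 2-core executor on two workers
}

Under spreading the app ends up with three small executors on three workers; under packing it gets two larger executors on only two workers.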
2.3 The launchExecutor function
After a worker has been chosen and the number of CPU cores for the executor on that worker has been decided, the Master calls launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) to send the launch request to the Worker and to tell the AppClient that an executor has been added. At the same time it updates the Master's record of that worker: the executor is added, and the available CPU cores and memory are reduced. The Master does not wait until the executor has actually started successfully on the worker before updating the worker's information; if the worker fails to start the executor, it sends a FAILED message to the Master, and the Master simply updates the worker's information again when that message arrives.
def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  worker.addExecutor(exec)
  worker.actor ! LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
  exec.application.driver ! ExecutorAdded(
    exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}
2.4 Creating the Executor
The following call chain is what happens after the Worker receives the LaunchExecutor message from the Master:
the LaunchExecutor message handler creates an ExecutorRunner (a sketch of this handler follows the list) ->
the ExecutorRunner starts, as a separate process, the ApplicationDescription that was prepared in SparkDeploySchedulerBackend ->
that process is the CoarseGrainedExecutorBackend carried in the ApplicationDescription ->
once CoarseGrainedExecutorBackend starts, it first uses the driverUrl parameter it was given to send a RegisterExecutor message to CoarseGrainedSchedulerBackend::DriverActor ->
the DriverActor replies with RegisteredExecutor ->
CoarseGrainedExecutorBackend then creates an Executor ->
and the Executor is ready.
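A condensed sketch of the Worker-side LaunchExecutor handling mentioned in the first step (simplified from Worker.scala in the 1.x line; the real ExecutorRunner constructor takes many more arguments than shown here):

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  // Build a runner that forks the CoarseGrainedExecutorBackend process described by appDesc.
  val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_ /* further args omitted */)
  executors(appId + "/" + execId) = manager
  manager.start()
  coresUsed += cores_
  memoryUsed += memory_
  master ! ExecutorStateChanged(appId, execId, manager.state, None, None)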
After CoarseGrainedExecutorBackend starts, its preStart function performs the following:
override def preStart() {
  logInfo("Connecting to driver: " + driverUrl)
  driver = context.actorSelection(driverUrl)
  driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
When CoarseGrainedExecutorBackend receives the RegisteredExecutor message, it creates the Executor:
override def receiveWithLogging = {
  case RegisteredExecutor =>
    logInfo("Successfully registered with driver")
    val (hostname, _) = Utils.parseHostPort(hostPort)
    executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
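With the Executor in place, the LaunchTask message mentioned at the start of section 2 can finally be acted on. A condensed sketch of that handler in CoarseGrainedExecutorBackend (simplified from the Spark 1.x source):

case LaunchTask(data) =>
  if (executor == null) {
    logError("Received LaunchTask command but executor was null")
    System.exit(1)
  } else {
    // Deserialize the TaskDescription and hand the serialized Task to the Executor,
    // which wraps it in a TaskRunner and runs it on its thread pool.
    val ser = env.closureSerializer.newInstance()
    val taskDesc = ser.deserialize[TaskDescription](data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
      taskDesc.name, taskDesc.serializedTask)
  }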
Related reading:
Spark Core源码分析: Spark任务模型 (Spark Core source analysis: the Spark task model)
Spark技术内幕: Executor分配详解 (Spark Internals: Executor allocation in detail) — strongly recommended; the author walks through Executor allocation in great detail against the Spark source.
When reposting, please credit the author Jason Ding and the original source.
GitCafe blog (http://jasonding1354.gitcafe.io/)
Github blog (http://jasonding1354.github.io/)
CSDN blog (http://blog.csdn.net/jasonding1354)
Jianshu page (/users/2bd9b48f6ea8/latest_articles)
Search for jasonding1354 on Google to reach my blog homepage.
Common problems when using celery for asynchronous tasks in Python | 峰云就她了
I am using the celery + redis combination here; rabbitmq felt a bit heavyweight.
celery reported an error at startup; after checking the documentation, the problem was confirmed to be a global (environment) variable setting.
[root@devops-ruifengyun ~ ]$
[root@devops-ruifengyun ~ ]$ celery -A tasks worker --loglevel=debug
Running a worker with superuser privileges when the
worker accepts messages serialized with pickle is a very bad idea!
If you really want to continue then you have to set the C_FORCE_ROOT
environment variable (but please think about this before you do).
User information: uid=0 euid=0 gid=0 egid=0
[root@devops-ruifengyun ~ ]$ export C_FORCE_ROOT="true"
[root@devops-ruifengyun ~ ]$ celery -A tasks worker --loglevel=debug
/usr/local/lib/python2.7/dist-packages/celery/platforms.py:762: RuntimeWarning: You are running the worker with superuser privileges, which is
absolutely not recommended!
Please specify a different user using the -u option.
Solution:
export C_FORCE_ROOT="true"
Another error shows up when calling .get() on a result while no result backend is configured:

--> 294         meta = self._get_task_meta_for(task_id)
    295         if cache and meta.get('status') == states.SUCCESS:
    296             self._cache[task_id] = meta
AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'
pip install celery-with-redis
Test code:
[root@devops-ruifengyun ~ ]$ cat celeryconfig.py
BROKER_URL = "redis://127.0.0.1:6379/0"
CELERY_IMPORTS = ("tasks.add", )
[root@devops-ruifengyun ~ ]$
[root@devops-ruifengyun ~ ]$ cat test.py
from tasks import add

if __name__ == '__main__':
    for i in range(100):
        for j in range(100):
            kk = add.delay(i, j)
            kk.ready()
            kk.get()
When the worker then receives the queued tasks, it rejects them as unregistered (this is logged once per message):

The full contents of the message body was:
{'utc': True, 'chord': None, 'args': (0, 0), 'retries': 0, 'expires': None, 'task': 'tasks.add', 'callbacks': None, 'errbacks': None, 'timelimit': (None, None), 'taskset': None, 'kwargs': {}, 'eta': None, 'id': '…a71-ecda-4ef8-98ce-f223a28c0b97'} (209b)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery/worker/consumer.py", line 455, in on_task_received
    strategies[name](message, body,
KeyError: 'tasks.add'
[ 09:57:16,673: ERROR/MainProcess] Received unregistered task of type 'tasks.add'.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you are using relative imports?
Please see http://bit.ly/gLye1c for more information.
The full contents of the message body was:
{'utc': True, 'chord': None, 'args': (0, 0), 'retries': 0, 'expires': None, 'task': 'tasks.add', 'callbacks': None, 'errbacks': None, 'timelimit': (None, None), 'taskset': None, 'kwargs': {}, 'eta': None, 'id': 'eb1b8772-ca7f-b4db2a82aa6'} (209b)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery/worker/consumer.py", line 455, in on_task_received
    strategies[name](message, body,
KeyError: 'tasks.add'
The fix:
It is best to use a configuration file and point the worker at it:
celery --config=celeryconfig --loglevel=INFO
BROKER_URL = 'amqp://'
CELERY_RESULT_BACKEND = 'amqp://'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'America/Los_Angeles'
CELERY_ENABLE_UTC = True
CELERY_IMPORTS = ("tasks",)
Original article: blog.xiaorui.cc