失眠网,内容丰富有趣,生活中的好帮手!
失眠网 > YARN-client提交任务处理过程

YARN-client提交任务处理过程

时间:2021-05-29 14:19:50

相关推荐

YARN-client提交任务处理过程

Client向RM提交任务的过程大致分为七步,如下图:

1. Client向RM发出请求

2. RM返回一个ApplicationID作为回应

3. Client向RM回应Application Submission Context(ASC)。ASC包括ApplicationID、user、queue,以及其他一些启动AM相关的信息,除此之外,还有一个Container Launch Context(CLC),CLC包含了资源请求数(内存与CPU),job files,安全token,以及其他一些用以在一个node上启动AM的信息。任务一旦提交以后,client可以请求RM去杀死应用或查询应用的运行状态

4. 当RM接受到ASC后,它会调度一个合适的container来启动AM,这个container经常被称作为container 0。AM需要请求其他的container来运行任务,如果没有合适的container,AM就不能启动。当有合适的container时,RM发请求到合适的NM上,来启动AM。这时候,AM的PRC与监控的URL就已经建立了。

5. 当AM启动起来后,RM回应给AM集群的最小与最大资源等信息。这时AM必须决定如何使用那么当前可用的资源。YARN不像那些请求固定资源的scheduler,它能够根据集群的当前状态动态调整。

6. AM根据从RM那里得知的可使用的资源,它会请求一些一定数目的container。This request can be very specific,including containers with multiples of the resource minimum values (e.g., extra memory)。

7. RM将会根据调度策略,尽可能的满足AM申请的container。也就是会分配container给AM,然后这些container的node manager会与AM进行通信,AM会向这些container的node manager发送启动容器的必要配置。

同时,在一个job运行时,AM会向RM汇报心跳与进度信息,在这些心跳过程中,AM可能去申请或释放container。会当任务完成时,AM向RM发送一条任务结束信息然后退出。如下图所示:

接下来,用一个简单的例子来说明整个过程。

代码参考:/trumanz/hadoop_the_definitive_guide/tree/master/ch04-yarn/yarnExample

YARN client编写

1.创建一个Application

YarnClientApplication app = yarnClient.createApplication();

createApplication()方法在hadoop源码中yarn project中的org.apache.hadoop.yarn.client.api.impl中YARNClientImpl.java实现。

2.设置Application的名字

app.getApplicationSubmissionContext().setApplicationName( "truman.ApplicationMaster");

getApplicationSubmissionContext()方法位于org.apache.hadoop.yarn.client.api中的YarnClientApplication.java中

setApplicationName()方法位于org.apache.hadoop.yarn.client.api中的ApplicationSubmissionContextPBImpl.java中

3.设置Application的内存和CPU需求以及优先级和queue信息,YARN中RM将根据这些信息来选择合适的container来启动APP master,这个container经常被称作为container 0。

app.getApplicationSubmissionContext().setResource(Resource.newInstance(100, 1));app.getApplicationSubmissionContext().setPriority(Priority.newInstance(0));app.getApplicationSubmissionContext().setQueue("default");

setResource(),setResource(),setQueue()这些方法都在org.apache.hadoop.yarn.client.api中的ApplicationSubmissionContextPBImpl.java中

4.设置ContainerLaunchContext,这一步,amContainer中包含了App Master执行所需要的资源文件,环境变量和启动命令,这里将资源文件上传到了HDFS,这样在Node Manager就可以通过HDFS取得这些文件。

app.getApplicationSubmissionContext().setAMContainerSpec(amContainer);

setAMContainerSpec()方法位于org.apache.hadoop.yarn.client.api中的ApplicationSubmissionContextPBImpl.java中。

5.提交应用给RM

ApplicationId appId = yarnClient.submitApplication(app.getApplicationSubmissionContext());

submitApplication()方法位于org.apache.hadoop.yarn.client.api.impl中YARNClientImpl.java中。

对于client的编写还是比较简单的,不需要维护状态,只需要提交相应的消息给RM就行。

YARN APP Master编写

这部分编写比较复杂,AM需要与RM和NM通信,交互。

通过RM,申请container,并接受RM的一些信息,如可用的container资源,结束container等。

通过NM,启动container,并接收NM的信息,如container的状态变化以及Node状态变化等。

1.创建一个AMRMClientAsync对象,负责与RM交互通信

amRMClient = AMRMClientAsync.createAMRMClientAsync( 1000, new RMCallbackHandler());

这里的RMCallbackHandler 是我们编写的继承自AMRMClientAsync.CallbackHandler 的一个类,其功能是处理由Resource Manager收到的消息,

其需要实现的方法由如下

public void onContainersCompleted(List<ContainerStatus> statuses);

public void onContainersAllocated(List<Container> containers) ;

public void onShutdownRequest() ;

public void onNodesUpdated(List<NodeReport> updatedNodes) ;

public void onError(Throwable e) ;

这里不考虑异常的情况下,只写onContainersAllocated, onContainersCompleted 这两个既可以, 一个是当有新的Container 可以使用, 一个是Container 运行结束。

在onContainersAllocated 我们需要编写 启动container 的代码,amNMClient.startContainerAsync(container, ctx); 这里的ctx 同Yarn Client 中第4步中的amContainer 是同一个类型, 即这个container 执行的一些资源,环境变量与命令等, 因为这是在回调函数中,为了保证时效性,这个操作最好放在线程池中异步操作。

在onContainersCompleted 中,如果是失败的Container,我们需要重新申请并启动Container,(这一点有可能是YARN的 Fair Schedule 中会强制退出某些Container 以释放资源) 成功的将做记录既可以。

2.创建一个NMClientAsyncImpl对象,负责与NM交互通信

amNMClient = new NMClientAsyncImpl(new NMCallbackHandler());

这里NMCallbackHandler 使我们需要编写的继承自NMClientAsync.CallbackHandler 的对象,其功能是处理由Node Manager 收到的消息

public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse);

public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus);

public void onContainerStopped(ContainerId containerId) ;

public void onStartContainerError(ContainerId containerId, Throwable t);

public void onGetContainerStatusError(ContainerId containerId, Throwable t) ;

public void onStopContainerError(ContainerId containerId, Throwable t);

这里简单的不考虑异常的情况下,这些函数可以写一个空函数体,忽略掉处理。

3.将自己(AM)注册到RM上

RegisterApplicationMasterResponse response = amRMClient.registerApplicationMaster(NetUtils.getHostname(), -1, "");

该函数将自己注册到RM上,没有提供RPC port和trackURL。

该方法在org.apache.hadoop.yarn.client.api.impl的AMRMClientImpl.java中。

4.向RM申请container

ContainerRequest containerAsk = new ContainerRequest(//100*10M + 1vcpuResource.newInstance(100, 1), null, null,Priority.newInstance(0));amRMClient.addContainerRequest(containerAsk);

这里一个containerAsk 表示申请一个 Container, 这里的对nodes和rasks 设置为NULL,猜测MapReduce应该由参数来尝试申请靠近HDFS block的container的。

addContainerRequest()方法在org.apache.hadoop.yarn.client.api.impl的AMRMClientImpl.java中。

5.等待container执行完毕,清除退出

我的代码如下, 循环等待container 执行完毕,并上报执行结果

void waitComplete() throws YarnException, IOException{while(numTotalContainers.get() != numCompletedConatiners.get()){try{Thread.sleep(1000);LOG.info("waitComplete" + ", numTotalContainers=" + numTotalContainers.get() +", numCompletedConatiners=" + numCompletedConatiners.get());} catch (InterruptedException ex){}}exeService.shutdown();amNMClient.stop();amRMClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "dummy Message", null);amRMClient.stop();}

YARNContainer Application

真正处理数据的是由APP master中amNMClient.startContainerAsync(container, ctx)提交的 Container application,也就是提交给具体的container执行的工作。然后这这个应用并不需要特殊编写,任何程序通过提交相应的运行信息都可以在这些Node中的某个Container 中执行, 所以这个程序可以是一个复杂的MapReduce Task 或者 是一个简单的脚本。

总结:

YARN 提供了对cluster 资源管理 和 作业调度的功能。

编写一个应用运行在YARN 之上,比较复杂的是App Mstr 的编写,其需要维护container 的状态并能共做一些错误恢复,重启应用的操作。 比较简答的是Client的编写,只需要提交必须的信息既可以,不需要维护状态。 真正运行处理数据的是Container Application ,这个程序可以不需要针对YARN做代码编写。

如果觉得《YARN-client提交任务处理过程》对你有帮助,请点赞、收藏,并留下你的观点哦!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。