Fork me on GitHub

监控Yarn资源调度平台资源状态

目录

  • 背景
  • 第一部分 Yarn状态数据接口
  • 第二部分 Java实现
  • 第三部分 总结
  • 参考文献及资料

背景

目前国内大部分企业级的大数据平台资源调度系统都是基于Yarn集群。生产环境上,各种大数据计算框架运行在Yarn上,就需要对Yarn平台的资源情况进行实时监控。虽然Yarn本身提供一个Web管理界面展示平台资源使用情况,但是这些运行状态数据需要实时获取和监控。随着智能化运维推进,需要对监控数据能实时分析、异常检测、自动故障处理。这些场景都需要能实时获取到Yarn平台的状态监控数据。

本文将详细介绍各种监控实现的方法,并重点介绍Java实现。

第一部分 Yarn状态数据接口

1.1 命令行方式

yarn命令在{hadoop_home}/bin路径下,对于部署hadoop客户端的客户端需要加载命令环境变量。

  • 参看任务信息
1
2
3
4
# 查看所有任务信息
yarn application -list
# 查看正在运行的任务信息(带过滤参数appStates)
yarn application -list -appStates RUNNING

这里参数appStates的状态有:ALL,NEW,NEW_SAVING,SUBMITTED,ACCEPTED,RUNNING,FINISHED,FAILED,KILLED

另外还可以指定计算框架的类型,例如:

1
2
# 参看所有MapReduce任务
yarn application -list -appTypes MAPREDUCE
  • 参看指定任务状态信息
1
yarn application -status application_1575989345612_32134

1.2 Restful Api接口

ResourceManager允许用户通过REST API获取有关群集的信息:群集上的状态、群集上的指标、调度程序信息,另外还有群集中节点的信息以及集群上应用程序的运行信息。

  • 查询整个集群指标
1
GET http://http address:port/ws/v1/cluster/metrics
  • 查询集群调度器详情
1
GET http://http address:port/ws/v1/cluster/scheduler
  • 监控任务
1
2
curl http://http address:port/ws/v1/cluster/apps/state
GET http://http address:port/ws/v1/cluster/apps/state
  • 查看指定任务
1
GET http://http address:port/ws/v1/cluster/apps/
  • 查看指定任务的详细信息
1
curl http://http address:port/proxy/ws/v2/mapreduce/info
  • 杀死任务

yarn application -kill application_id

1
2
curl -v -X PUT -d '{"state": "KILLED"}' http://http address:port>/ws/v1/cluster/apps/
PUT http://http address:port/ws/v1/cluster/apps/state

1.2 JMX Metrics监控

首先需要开启jmx,编辑{hadoop_home}/etc/hadoop/yarn-env.sh配置文件,最后天下下面三行配置:

1
2
3
YARN_OPTS="$YARN_OPTS -Dcom.sun.management.jmxremote.authenticate=false"
YARN_OPTS="$YARN_OPTS -Dcom.sun.management.jmxremote.port=8001"
YARN_OPTS="$YARN_OPTS -Dcom.sun.management.jmxremote.ssl=false"

其中8001是服务监听端口。jmx提供了Cluster、Queue、Jvm、FSQueueMetrics信息。

1
2
3
4
# 获取YARN相关的jmx
http://http address:8088/jmx
# 如果想获取NameNode的jmx
http://http address:50070/jmx

上面的方式会获取服务所有的信息(json格式)。如果需要精准获得准确信息,org.apache.hadoop.jmx.JMXJsonServlet类支持三个参数:callbackqryget。其中qry用于过滤,下面的url用于查询Yarnspark用户在default队列上任务信息。

1
http://192.168.1.2:8088/jmx?qry=Hadoop:service=ResourceManager,name=QueueMetrics,q0=root,q1=default,user=spark

更详细的信息参考官网:https://hadoop.apache.org/docs/r1.0.4/api/org/apache/hadoop/jmx/JMXJsonServlet.html

1.3 Python Api接口

对于Python有第三方包支持和yarn进行交互,github地址为:https://github.com/CODAIT/hadoop-yarn-api-python-client

案例代码:

1
2
3
4
5
6
7
from yarn_api_client import ApplicationMaster, HistoryServer, NodeManager, ResourceManager

rm = ResourceManager(address='192.168.1.2', port=8088)
# 获取到ResourceManager的所有apps的信息
rm.cluster_applications().data
# 获取到ResourceManager的具体任务的信息
rm.cluster_application('application_1437445095118_265798').data

对于Hadoop安全集群,还需要部署认证包requests_kerberos。具体可以参考说明文档:https://python-client-for-hadoop-yarn-api.readthedocs.io/en/latest/index.html

第二部分 Java实现

2.1 maven依赖

根据Hadoop的版本添加下面的依赖包:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.2</version>
</dependency>

2.2 接口实现

我们将相关配置文件放在resources/conf路径下面,涉及的文件有:

1
2
3
4
5
6
7
# 集群配置文件
core-site.xml
hdfs-site.xml
yarn-site.xml
# 安全认证文件
user.keytab
krb5.conf

下面是案例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package com.main.yarnmonitor;

import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;

import java.io.File;
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

/**
* @program: yarnmonitor
* @description:
* @author: rongxiang
* @create: 2020-03-27 16:44
**/


public class yarnmonitor {
//配置文件路径
private static String confPath = Thread.currentThread().getContextClassLoader().getResource("").getPath()+ File.separator + "conf";

public static void main(String[] args) {
//加载配置文件
Configuration configuration = initConfiguration(confPath);
//初始化安全集群Kerberos配置
initKerberosENV(configuration);
//初始化Yarn 客户端
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(configuration);
yarnClient.start();
try {
//获得运行的任务应用清单
List<ApplicationReport> applications = yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
//存储需要关注的任务信息
HashMap<String, ArrayList<String>> applicationInformation = new HashMap<>();

for (ApplicationReport application:applications) {
String applicationType = application.getApplicationType();
// 只关注CPU核数资源使用超过500的任务
if (getApplicationInfo(application).get(1)>=500) {
applicationInformation.put(String.valueOf(application.getApplicationId()), new ArrayList<String>(){{
add(String.valueOf(application.getName()));
add(String.valueOf(application.getApplicationType()));
add(String.valueOf(application.getQueue()));
add(String.valueOf(getApplicationInfo(application).get(0)));
add(String.valueOf(getApplicationInfo(application).get(1)));
add(String.valueOf(getApplicationInfo(application).get(2)));
}});
}
System.out.println(applicationInformation);
}
//设置监控信息发送邮件
if(!applicationInformation.isEmpty()){
//发送邮件
sendMailYarn();
}
} catch (YarnException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}

/**
* 从ApplicationReport获取信息
* @return applicationInfo
*/
public static ArrayList<Integer> getApplicationInfo(ApplicationReport Application) {
ArrayList<Integer> applicationInfo = new ArrayList<>();
ApplicationResourceUsageReport resourceReport = Application.getApplicationResourceUsageReport();
if (resourceReport != null) {
Resource usedResources = resourceReport.getUsedResources();
int allocatedMb = usedResources.getMemory();
int allocatedVcores = usedResources.getVirtualCores();
int runningContainers = resourceReport.getNumUsedContainers();
//赋值
applicationInfo.add(allocatedMb);
applicationInfo.add(allocatedVcores);
applicationInfo.add(runningContainers);
}
return applicationInfo;
}

/**
* 初始化YARN Configuration
* @return configuration
*/
public static Configuration initConfiguration(String confPath) {
Configuration configuration = new Configuration();
System.out.println(confPath + File.separator + "core-site.xml");
configuration.addResource(new Path(confPath + File.separator + "core-site.xml"));
configuration.addResource(new Path(confPath + File.separator + "hdfs-site.xml"));
configuration.addResource(new Path(confPath + File.separator + "yarn-site.xml"));
return configuration;
}

/**
* 安全集群配置(如果非安全集群这无需该方法)
*/
public static void initKerberosENV(Configuration conf) {
System.setProperty("java.security.krb5.conf", confPath+File.separator+"krb5.conf");
System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
System.setProperty("sun.security.krb5.debug", "false");
try {
UserGroupInformation.setConfiguration(conf);
UserGroupInformation.loginUserFromKeytab("user@HADOOP.COM", confPath+File.separator+"user.keytab");
System.out.println(UserGroupInformation.getCurrentUser());
} catch (IOException e) {
e.printStackTrace();
}
}
}

Yarn客户端YarnClient中定义了方法getApplications,获取到正在运行的任务清单,返回数据类型是:List<ApplicationReport>,如下:

1
List<ApplicationReport> applications = yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING));

对于数据类型ApplicationReport具有方法getApplicationResourceUsageReport()获得每个Yarn任务的ApplicationResourceUsageReport(任务资源报告):

1
ApplicationResourceUsageReport resourceReport = Application.getApplicationResourceUsageReport();

ApplicationResourceUsageReport提供了获取各类资源的方法:

1
2
3
4
5
6
7
Resource usedResources = resourceReport.getUsedResources();
//任务使用的内存资源
int allocatedMb = usedResources.getMemory();
//任务使用的CPU资源
int allocatedVcores = usedResources.getVirtualCores();
//任务使用的容器的数量
int runningContainers = resourceReport.getNumUsedContainers();

第三部分 总结

Java的案例中我们使用了HashMap(applicationInformation)数据类型存储关注的任务信息,然后使用邮件接口发出。在实际使用中可以根据需要存储在elasticsearch集群。

另外对于其他方法,作者没有实际使用,可能存在部分信息未涵盖,可以参考官网文档使用。

参考文献及资料

1、YARN Application Security,链接:https://hadoop.apache.org/docs/r2.7.4/hadoop-yarn/hadoop-yarn-site/YarnApplicationSecurity.html

2、ApplicationResourceUsageReport接口,链接:http://hadoop.apache.org/docs/r2.7.0/api/org/apache/hadoop/yarn/api/records/ApplicationResourceUsageReport.html

3、ResourceManager REST API’s,链接:https://hadoop.apache.org/docs/r2.7.3/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html

4、基于Yarn API的Spark程序监控,链接:https://yq.aliyun.com/articles/710902

0%