Note that this Application's name is not fixed, so the program you gave above does not meet my requirement. Please provide the complete program.
Time: 2024-02-20 21:00:44 · Views: 18
Understood. I can modify the code to match the task name with a regular expression when fetching application info. Here is the complete program:
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class HiveSQLCircuitBreaker {

    private static final int THREAD_POOL_SIZE = 1;
    private static final int CHECK_INTERVAL_SECONDS = 60;
    private static final int THRESHOLD = 100;
    private static final String TASK_NAME_PATTERN = ".*HiveSQL.*";

    private static volatile boolean running = true;
    private static final Configuration conf = new YarnConfiguration();

    public static void main(String[] args) {
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(conf);
        yarnClient.start();

        ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
        executor.submit(() -> {
            while (running) {
                try {
                    checkTempFiles(yarnClient);
                    Thread.sleep(CHECK_INTERVAL_SECONDS * 1000L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                } catch (YarnException | IOException e) {
                    e.printStackTrace();
                }
            }
        });

        // Stop the monitor thread and the YARN client when the JVM exits
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            running = false;
            executor.shutdownNow();
            yarnClient.stop();
        }));
    }

    private static void checkTempFiles(YarnClient yarnClient) throws YarnException, IOException {
        List<ApplicationReport> apps = yarnClient.getApplications();
        for (ApplicationReport app : apps) {
            // Match the application name against the regular expression
            if (app.getName().matches(TASK_NAME_PATTERN)) {
                ApplicationId appId = app.getApplicationId();
                // Collect this application's temp files
                List<String> tempFiles = getTempFiles(appId);
                // Kill the application if the temp file count exceeds the threshold
                if (tempFiles.size() > THRESHOLD) {
                    killTask(yarnClient, appId);
                }
            }
        }
    }

    private static List<String> getTempFiles(ApplicationId appId) throws IOException {
        List<String> tempFiles = new ArrayList<>();
        // Look for the application's "tmp" directory under each configured NodeManager log dir
        String[] logDirs = conf.getStrings(YarnConfiguration.NM_LOG_DIRS);
        if (logDirs == null) {
            return tempFiles;
        }
        for (String logDir : logDirs) {
            Path tempPath = new Path(logDir + "/" + appId.toString(), "tmp");
            FileSystem fs = tempPath.getFileSystem(conf);
            if (fs.exists(tempPath)) {
                for (FileStatus status : fs.listStatus(tempPath)) {
                    if (status.getPath().getName().startsWith("hive_")) {
                        tempFiles.add(status.getPath().toString());
                    }
                }
            }
        }
        return tempFiles;
    }

    private static void killTask(YarnClient yarnClient, ApplicationId appId)
            throws IOException, YarnException {
        // Reuse the shared client instead of creating a new one for each kill
        yarnClient.killApplication(appId);
    }
}
```
In this program, the regular expression `".*HiveSQL.*"` is used to match the application name: any application whose name contains the string "HiveSQL" is treated as a HiveSQL task. The YARN client is also initialized and stopped once at program scope rather than inside the loop, which avoids creating and destroying a client object on every check and makes the program more efficient.
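To see the name-matching rule in isolation, here is a minimal sketch; the class name, helper method, and sample application names are illustrative, but the pattern is exactly the one used above:

```java
import java.util.regex.Pattern;

public class TaskNameMatchDemo {
    // Same pattern as the circuit breaker: matches any name containing "HiveSQL"
    private static final Pattern TASK_NAME_PATTERN = Pattern.compile(".*HiveSQL.*");

    // String.matches(...) in the main program behaves like this full-string match
    public static boolean isHiveSqlTask(String appName) {
        return TASK_NAME_PATTERN.matcher(appName).matches();
    }

    public static void main(String[] args) {
        System.out.println(isHiveSqlTask("HiveSQL-query-20240220")); // true
        System.out.println(isHiveSqlTask("etl_HiveSQL_daily"));      // true
        System.out.println(isHiveSqlTask("spark-etl-job"));          // false
    }
}
```

Because the pattern is anchored by `.*` on both sides, "HiveSQL" may appear anywhere in the name; note that the match is case-sensitive, so a name like "hivesql-job" would not match unless the pattern is compiled with `Pattern.CASE_INSENSITIVE`.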