hera Source Code Analysis: The Execution Flow of a Triggered Task


Table of Contents

  • Triggering the task
  • The work side
  • The master side
  • The run method

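In hera, a task can be triggered in many ways: an analyst triggering a manual run from the front end, a scheduled (timed) trigger, a dependency trigger, a rerun trigger, a trigger for a lost signal, and so on. Whatever the trigger type, the final entry point is always the Master#run method (tasks in the development center are triggered through Master#debug instead).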

This article walks through the trigger flow of a manually executed task.

Triggering the task

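In the latest version, manual triggers come in three types: manual execution, manual recovery, and super recovery. Their differences are not covered here (see the hera user documentation); this article uses manual recovery as the example.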



When we click execute, a request is sent to the backend.

The work side

First, look at the stack trace on the work side:

writeAndFlush:28, NettyChannel (com.dfire.core.netty)
writeAndFlush:32, FailFastCluster (com.dfire.core.netty.cluster)
buildMessage:100, WorkerHandleWebRequest (com.dfire.core.netty.worker.request)
handleWebExecute:29, WorkerHandleWebRequest (com.dfire.core.netty.worker.request)
executeJobFromWeb:312, WorkClient (com.dfire.core.netty.worker)
execute:409, ScheduleOperatorController (com.dfire.controller)
invoke:-1, ScheduleOperatorController$$FastClassBySpringCGLIB$$cddb34c8 (com.dfire.controller)
invoke:204, MethodProxy (org.springframework.cglib.proxy)
invokeJoinpoint:738, CglibAopProxy$CglibMethodInvocation (org.springframework.aop.framework)
proceed:157, ReflectiveMethodInvocation (org.springframework.aop.framework)
proceed:85, MethodInvocationProceedingJoinPoint (org.springframework.aop.aspectj)
around:72, HeraAspect (com.dfire.config)
// some frames omitted

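The stack trace shows that before the controller method is invoked, a permission-check method implemented with AOP, HeraAspect#around, runs first. Once the permission check passes, ScheduleOperatorController#execute is called; this method is the entry point for task execution. After that, WorkerHandleWebRequest#handleWebExecute and WorkerHandleWebRequest#buildMessage are called to build the netty message body, and finally a netty message is sent to the master using a fail-fast fault-tolerance strategy (FailFastCluster#writeAndFlush).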
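To make the fail-fast idea concrete, here is a minimal Java sketch under our own assumptions. `MessageChannel`, `SendException`, `FailFastChannel` and `FailFastDemo` are hypothetical stand-ins, not hera's `NettyChannel` or `FailFastCluster`: the wrapper attempts the send exactly once and rethrows any failure immediately instead of retrying, which is what lets a caller such as `buildMessage` unregister its listener right away in its `catch` block.

```java
import java.util.ArrayList;
import java.util.List;

class FailFastDemo {
    /** Counts how many times the underlying channel is hit for one failing send. */
    static int attemptsForFailingSend() {
        List<String> attempts = new ArrayList<>();
        MessageChannel broken = msg -> {
            attempts.add(msg);
            throw new SendException("connection refused");
        };
        try {
            new FailFastChannel(broken).writeAndFlush("WEB_REQUEST");
        } catch (SendException expected) {
            // the caller learns about the failure right away
        }
        return attempts.size();
    }

    public static void main(String[] args) {
        System.out.println(attemptsForFailingSend()); // 1
    }
}

/** Hypothetical stand-in for a network channel (not hera's NettyChannel). */
interface MessageChannel {
    void writeAndFlush(String msg) throws SendException;
}

/** Hypothetical checked exception signalling a failed send. */
class SendException extends Exception {
    SendException(String message) {
        super(message);
    }

    SendException(String message, Throwable cause) {
        super(message, cause);
    }
}

/** Fail-fast wrapper: a single attempt, no retry; any failure reaches the caller at once. */
class FailFastChannel implements MessageChannel {
    private final MessageChannel delegate;

    FailFastChannel(MessageChannel delegate) {
        this.delegate = delegate;
    }

    @Override
    public void writeAndFlush(String msg) throws SendException {
        try {
            delegate.writeAndFlush(msg);
        } catch (SendException e) {
            // No retry: rethrow immediately so the caller can clean up (e.g. remove its listener)
            throw new SendException("fail-fast: send failed for " + msg, e);
        }
    }
}
```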

Now let's look at the controller entry point in detail:

@RequestMapping(value = "/manual", method = RequestMethod.GET)
@ResponseBody
@ApiOperation("Manual execution endpoint")
public JsonResponse execute(@JsonSerialize(using = ToStringSerializer.class) @ApiParam(value = "action (version) id", required = true) Long actionId,
                            @ApiParam(value = "trigger type: 2 = manual execution, 3 = manual recovery, 6 = super recovery", required = true) Integer triggerType,
                            @RequestParam(required = false) @ApiParam(value = "task execution group", required = false) String execUser) throws InterruptedException, ExecutionException, HeraException, TimeoutException {
    // some validation code omitted; heraJob, heraAction and triggerTypeEnum are obtained in the omitted part
    String configs = heraJob.getConfigs();
    // create a hera_action_history object and insert the execution record into MySQL
    HeraJobHistory actionHistory = HeraJobHistory.builder().build();
    actionHistory.setJobId(heraAction.getJobId());
    actionHistory.setActionId(heraAction.getId());
    actionHistory.setTriggerType(triggerTypeEnum.getId());
    actionHistory.setOperator(heraJob.getOwner());
    actionHistory.setIllustrate(execUser);
    actionHistory.setStatus(StatusEnum.RUNNING.toString());
    actionHistory.setStatisticEndTime(heraAction.getStatisticEndTime());
    actionHistory.setHostGroupId(heraAction.getHostGroupId());
    heraJobHistoryService.insert(actionHistory);
    // copy the job's current script and configs onto the action and link it to the new history record
    heraAction.setScript(heraJob.getScript());
    heraAction.setHistoryId(actionHistory.getId());
    heraAction.setConfigs(configs);
    heraAction.setAuto(heraJob.getAuto());
    heraAction.setHostGroupId(heraJob.getHostGroupId());
    heraJobActionService.update(heraAction);
    // send the task execution request to the master
    workClient.executeJobFromWeb(JobExecuteKind.ExecuteKind.ManualKind, actionHistory.getId());

    String ownerId = getOwnerId();
    if (ownerId == null) {
        ownerId = "0";
    }
    // add an operation record
    addJobRecord(heraJob.getId(), String.valueOf(actionId), RecordTypeEnum.Execute, execUser, ownerId);
    return new JsonResponse(true, String.valueOf(actionId));
}

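The code is straightforward and consists of three parts:
1. Create a hera_action_history object and insert the task's execution record into MySQL.
2. Send the task execution message to the master via netty.
3. Add an operation record for the task.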

The second part is the one we care about; let's keep following the stack trace.

public void executeJobFromWeb(ExecuteKind kind, Long id) throws ExecutionException, InterruptedException, TimeoutException {
    // block for at most the request timeout while waiting for the master's reply
    RpcWebResponse.WebResponse response = WorkerHandleWebRequest.handleWebExecute(workContext, kind, id).get(HeraGlobalEnv.getRequestTimeout(), TimeUnit.SECONDS);
    if (response.getStatus() == ResponseStatus.Status.ERROR) {
        ErrorLog.error(response.getErrorText());
    }
}

This code calls WorkerHandleWebRequest.handleWebExecute, which returns a Future, and then blocks the request on future.get for at most HeraGlobalEnv.getRequestTimeout() seconds. Continuing down:
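`Future.get(timeout, unit)` blocks the caller for at most the given time and throws `TimeoutException` if no result is ready by then; in `executeJobFromWeb` that exception simply propagates, since the method declares it. A minimal sketch using only `java.util.concurrent`: `TimedGetDemo`, the sleeping task (a stand-in for waiting on the master's reply) and the millisecond timeouts are illustrative, and unlike hera the sketch turns the timeout into a return value so it can be printed.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedGetDemo {
    /** Submits a task that takes taskMillis and waits at most timeoutMillis for its result. */
    static String callWithTimeout(long taskMillis, long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = pool.submit(() -> {
                Thread.sleep(taskMillis); // stands in for waiting on the master's reply
                return "OK";
            });
            try {
                return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                future.cancel(true); // interrupt the task that is still waiting
                return "TIMEOUT";
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "INTERRUPTED";
            } catch (ExecutionException e) {
                return "ERROR: " + e.getCause();
            }
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(callWithTimeout(10, 2000)); // OK
        System.out.println(callWithTimeout(2000, 50)); // TIMEOUT
    }
}
```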

public static Future<WebResponse> handleWebExecute(final WorkContext workContext, ExecuteKind kind, Long id) {
    return buildMessage(WebRequest.newBuilder()
            .setRid(AtomicIncrease.getAndIncrement())
            .setOperate(WebOperate.ExecuteJob)
            .setEk(kind)
            .setId(String.valueOf(id))
            .build(), workContext, "[Execute] no reply from master within " + HeraGlobalEnv.getRequestTimeout() + " seconds: " + id);
}

private static Future<WebResponse> buildMessage(WebRequest request, WorkContext workContext, String errorMsg) {
    CountDownLatch latch = new CountDownLatch(1);
    // register the listener before sending, so the reply cannot arrive unobserved
    WorkResponseListener responseListener = new WorkResponseListener(request, false, latch, null);
    workContext.getHandler().addListener(responseListener);
    // wait for the reply asynchronously; the latch is released by responseListener
    Future<WebResponse> future = workContext.getWorkWebThreadPool().submit(() -> {
        latch.await(HeraGlobalEnv.getRequestTimeout(), TimeUnit.SECONDS);
        if (!responseListener.getReceiveResult()) {
            ErrorLog.error(errorMsg);
        }
        workContext.getHandler().removeListener(responseListener);
        return responseListener.getWebResponse();
    });
    try {
        // send the request to the master
        workContext.getServerChannel().writeAndFlush(SocketMessage.newBuilder()
                .setKind(SocketMessage.Kind.WEB_REQUEST)
                .setBody(request.toByteString())
                .build());
        SocketLog.info("1.WorkerHandleWebRequest: send web request to master requestId ={}", request.getRid());
    } catch (RemotingException e) {
        // sending failed: unregister the listener right away
        workContext.getHandler().removeListener(responseListener);
        ErrorLog.error("1.WorkerHandleWebRequest: send web request to master exception requestId =" + request.getRid(), e);
    }
    return future;
}

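handleWebExecute builds a new WebRequest object. Note that its operate field is WebOperate.ExecuteJob, its id is the id of the hera_action_history record, and its rid (request id) comes from AtomicIncrease.getAndIncrement().
buildMessage then contains three key pieces of code:
1. CountDownLatch latch = new CountDownLatch(1); this latch is awaited, with a timeout, inside an asynchronous task running in a thread pool, and is released in WorkResponseListener.
2. WorkResponseListener responseListener = new WorkResponseListener(request, false, latch, null); the listener is registered through workContext.getHandler().addListener(responseListener):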

public class WorkResponseListener extends ResponseListenerAdapter {

    private RpcWebRequest.WebRequest request;
    private volatile Boolean receiveResult;
    private CountDownLatch latch;
    private RpcWebResponse.WebResponse webResponse;

    // constructor and getters are not shown in this excerpt

    @Override
    public void onWebResponse(RpcWebResponse.WebResponse response) {
        // only react to the reply whose rid matches this listener's request
        if (request.getRid() == response.getRid()) {
            try {
                webResponse = response;
                receiveResult = true;
            } catch (Exception e) {
                ErrorLog.error("work release exception {}", e);
            } finally {
                // always release the thread waiting in buildMessage
                latch.countDown();
            }
        }
    }
}

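In onWebResponse, when request.getRid() == response.getRid(), the listener stores the response, sets receiveResult to true, and releases the latch; because countDown() sits in a finally block, the latch is released even if an exception is thrown.
3. workContext.getServerChannel().writeAndFlush is called to send the task execution message to the master. The previous article, "hera Source Code Analysis: Distributed Lock at Project Startup", already explained when the serverChannel of workContext is set.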
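The latch-plus-listener handshake described above can be reproduced outside hera. The following is a minimal, self-contained Java sketch under stated assumptions: `RidListener`, `LatchCorrelationDemo` and the string statuses are hypothetical; the simulated master offers each reply to every registered listener (we assume the worker handler does something similar, which is why `onWebResponse` needs the rid check); and an `AtomicLong` stands in for `AtomicIncrease`.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class LatchCorrelationDemo {
    static final AtomicLong NEXT_RID = new AtomicLong();
    static final List<RidListener> LISTENERS = new CopyOnWriteArrayList<>();
    // Daemon threads so the demo JVM can exit without an explicit shutdown
    static final ExecutorService POOL = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    /** Registers a listener, starts a timed wait on its latch, then optionally simulates the reply. */
    static Future<String> send(long timeoutMillis, boolean masterReplies) {
        long rid = NEXT_RID.getAndIncrement();
        RidListener listener = new RidListener(rid);
        LISTENERS.add(listener); // registered before "sending", so the reply cannot be missed
        Future<String> future = POOL.submit(() -> {
            listener.latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            LISTENERS.remove(listener); // unregister whether or not a reply came
            return listener.received ? listener.status : "NO_REPLY";
        });
        if (masterReplies) {
            // Simulated reply: offered to every listener, only the matching rid reacts
            POOL.submit(() -> LISTENERS.forEach(l -> l.onResponse(rid, "OK")));
        }
        return future;
    }

    static String sendAndWait(long timeoutMillis, boolean masterReplies) {
        try {
            return send(timeoutMillis, masterReplies).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "INTERRUPTED";
        } catch (ExecutionException e) {
            return "ERROR";
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAndWait(1000, true)); // OK
        System.out.println(sendAndWait(50, false));  // NO_REPLY
    }
}

/** Hypothetical listener that only accepts the reply whose rid matches its own request. */
class RidListener {
    final long rid;
    final CountDownLatch latch = new CountDownLatch(1);
    volatile boolean received;
    volatile String status;

    RidListener(long rid) {
        this.rid = rid;
    }

    void onResponse(long responseRid, String responseStatus) {
        if (responseRid != rid) {
            return; // a reply to some other request: ignore it
        }
        try {
            status = responseStatus;
            received = true;
        } finally {
            latch.countDown(); // always release the waiting thread
        }
    }
}
```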

The master side

On the master, every netty message is handled by MasterHandler, so the execution request sent by the work side above ends up being processed in MasterHandler#channelRead.

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    SocketMessage socketMessage = (SocketMessage) msg;
    Channel channel = ctx.channel();
    switch (socketMessage.getKind()) {
        // some code omitted
        case WEB_REQUEST:
            final WebRequest webRequest = WebRequest.newBuilder().mergeFrom(socketMessage.getBody()).build();
            switch (webRequest.getOperate()) {
                case ExecuteJob:
                    // handle asynchronously; the result is paired with a fail-back wrapped channel
                    completionService.submit(() ->
                            new ChannelResponse(FailBackCluster.wrap(channel), MasterHandlerWebResponse.handleWebExecute(masterContext, webRequest)));
                    break;
                // some code omitted
            }
            // some code omitted
    }

}

MasterHandler does not process the work side's execution request itself: it submits it asynchronously, through completionService, to MasterHandlerWebResponse.handleWebExecute, and returns the result in a ChannelResponse together with the channel wrapped by FailBackCluster.wrap (a fail-back, i.e. retry-on-failure, wrapper).

public static WebResponse handleWebExecute(MasterContext context, WebRequest request) {
    if (request.getEk() == ExecuteKind.ManualKind || request.getEk() == ExecuteKind.ScheduleKind) {
        Long historyId = Long.parseLong(request.getId());
        HeraJobHistory heraJobHistory = context.getHeraJobHistoryService().findById(historyId);
        HeraJobHistoryVo history = BeanConvertUtils.convert(heraJobHistory);
        context.getMaster().run(history, context.getHeraJobService().findById(history.getJobId()));
        WebResponse webResponse = WebResponse.newBuilder()
                .setRid(request.getRid())
                .setOperate(WebOperate.ExecuteJob)
                .setStatus(Status.OK)
                .build();
        TaskLog.info("MasterHandlerWebResponse: send web execute response, actionId = {} ", history.getJobId());
        return webResponse;
    } else if (request.getEk() == ExecuteKind.DebugKind) {
        Long debugId = Long.parseLong(request.getId());
        HeraDebugHistoryVo debugHistory = context.getHeraDebugHistoryService().findById(debugId);
        TaskLog.info("2-1.MasterHandlerWebResponse: receive web debug response, debugId = " + debugId);
        context.getMaster().debug(debugHistory);

        WebResponse webResponse = WebResponse.newBuilder()
                .setRid(request.getRid())
                .setOperate(WebOperate.ExecuteJob)
                .setStatus(Status.OK)
                .build();
        TaskLog.info("2-2.MasterHandlerWebResponse : send web debug response, debugId = {}", debugId);
        return webResponse;
    }
    return WebResponse.newBuilder()
            .setRid(request.getRid())
            .setErrorText("unrecognized operation type: " + request.getEk())
            .setStatus(Status.ERROR)
            .build();
}

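Here request.getEk() determines whether the execution comes from the development center (DebugKind) or from the scheduling center (ManualKind or ScheduleKind). For manual recovery the value is ExecuteKind.ManualKind, so we only need to look at the if branch: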

  • First, use the hera_action_history id to load the record inserted on the work side
  • Call Master#run
  • Build a WebResponse that carries the original rid and an OK status, and return it

The run method

public void run(HeraJobHistoryVo heraJobHistory, HeraJob heraJob) {
    Long actionId = heraJobHistory.getActionId();
    // duplicate job check
    // 1: check whether the task is already queued or running
    if (checkJobExists(heraJobHistory, false)) {
        return;
    }
    HeraAction heraAction = masterContext.getHeraJobActionService().findById(actionId);
    Set<String> areaList = areaList(heraJob.getAreaId());
    // 2: tasks outside the execution area are marked as successful directly
    if (!areaList.contains(HeraGlobalEnv.getArea()) && !areaList.contains(Constants.ALL_AREA)) {
        ScheduleLog.info("Task not in area {}, marking it as successful directly: {}", HeraGlobalEnv.getArea(), heraJob.getId());
        heraAction.setLastResult(heraAction.getStatus());
        heraAction.setStatus(StatusEnum.SUCCESS.toString());
        heraAction.setHistoryId(heraJobHistory.getId());
        heraAction.setReadyDependency("{}");
        String host = "localhost";
        heraAction.setHost(host);
        Date endTime = new Date();
        heraAction.setStatisticStartTime(endTime);
        heraAction.setStatisticEndTime(endTime);
        masterContext.getHeraJobActionService().update(heraAction);
        heraJobHistory.getLog().append("Task not in area " + HeraGlobalEnv.getArea() + ", marked as successful directly");
        heraJobHistory.setStatusEnum(StatusEnum.SUCCESS);
        heraJobHistory.setEndTime(endTime);
        heraJobHistory.setStartTime(endTime);
        heraJobHistory.setExecuteHost(host);
        masterContext.getHeraJobHistoryService().update(BeanConvertUtils.convert(heraJobHistory));
        // broadcast a success event so that downstream tasks are notified
        HeraJobSuccessEvent successEvent = new HeraJobSuccessEvent(actionId, heraJobHistory.getTriggerType(), heraJobHistory);
        masterContext.getDispatcher().forwardEvent(successEvent);
        return;
    }

    // 3: first set the values required for execution in the database, then add the task to the queue
    heraAction.setLastResult(heraAction.getStatus());
    heraAction.setStatus(StatusEnum.RUNNING.toString());
    heraAction.setHistoryId(heraJobHistory.getId());
    heraAction.setStatisticStartTime(new Date());
    heraAction.setStatisticEndTime(null);
    masterContext.getHeraJobActionService().update(heraAction);
    heraJobHistory.getLog().append(ActionUtil.getTodayString() + " entered the task queue");
    masterContext.getHeraJobHistoryService().update(BeanConvertUtils.convert(heraJobHistory));

    boolean isFixed;
    int priorityLevel = 3;
    Map<String, String> configs = StringUtil.convertStringToMap(heraAction.getConfigs());
    String priorityLevelValue = configs.get("run.priority.level");
    if (priorityLevelValue != null) {
        priorityLevel = Integer.parseInt(priorityLevelValue);
    }
    // an area-specific key (area + POINT + HERA_EMR_FIXED) or the plain key wins; otherwise inherit from the group
    String areaFixed = HeraGlobalEnv.getArea() + Constants.POINT + Constants.HERA_EMR_FIXED;
    if (configs.containsKey(Constants.HERA_EMR_FIXED) || configs.containsKey(areaFixed)) {
        isFixed = Boolean.parseBoolean(configs.get(areaFixed)) || Boolean.parseBoolean(configs.get(Constants.HERA_EMR_FIXED));
    } else {
        isFixed = Boolean.parseBoolean(getInheritVal(heraAction.getGroupId(), areaFixed, Constants.HERA_EMR_FIXED));
    }
    Integer endMinute = masterContext.getHeraJobService().findMustEndMinute(heraAction.getJobId());
    // 4: assemble the JobElement
    JobElement element = JobElement.builder()
            .jobId(heraJobHistory.getActionId())
            .hostGroupId(heraJobHistory.getHostGroupId())
            .priorityLevel(priorityLevel)
            .historyId(heraJobHistory.getId())
            .fixedEmr(isFixed)
            .owner(heraJob.getOwner())
            .costMinute(endMinute)
            .build();
    try {
        element.setTriggerType(heraJobHistory.getTriggerType());
        HeraAction cacheAction = heraActionMap.get(element.getJobId());
        if (cacheAction != null) {
            cacheAction.setStatus(StatusEnum.RUNNING.toString());
        }
        // 5: put the element into the task-scanning queue that matches its trigger type
        switch (heraJobHistory.getTriggerType()) {
            case MANUAL:
                masterContext.getManualQueue().put(element);
                break;
            case AUTO_RERUN:
                masterContext.getRerunQueue().put(element);
                break;
            case MANUAL_RECOVER:
            case SCHEDULE:
                masterContext.getScheduleQueue().put(element);
                break;
            case SUPER_RECOVER:
                masterContext.getSuperRecovery().put(element);
                break;
            default:
                ErrorLog.error("Unsupported trigger type: {}, id: {}", heraJobHistory.getTriggerType().toName(), heraJobHistory.getActionId());
                break;
        }
    } catch (InterruptedException e) {
        ErrorLog.error("Failed to add task " + element.getJobId(), e);
    }
}

The main job of the run method is to put the task to be executed into a different queue according to its trigger type.
Since the code is long, we analyze it step by step:

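  1. checkJobExists checks whether the task is already queued or running. The check is skipped for tasks that allow repeated execution and for tasks triggered by a rerun.
  2. A task outside the execution area is marked as successful directly, and a success event is broadcast to notify downstream tasks. The current area comes from the hera.area setting in application.yml; if a task's area is set to all, it can run in every area.
  3. Set the values required for execution in the database (RUNNING status, history id, start time), then add the task to the queue.
  4. Assemble the JobElement, which is what ultimately goes into the execution queue. Its main fields are costMinute (the task's expected maximum run time in minutes), jobId (the id of the task's execution instance, i.e. the action id), hostGroupId (the machine group that runs the task), priorityLevel (the task's priority level), historyId (the id of the corresponding execution record), fixedEmr (whether the task runs on a fixed cluster), and owner (the group of the task's creator).
  5. Put the task into the task-scanning queue that matches its trigger type, where it waits to be picked up by the master's scanning thread.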

