0
点赞
收藏
分享

微信扫一扫

dremio CommandCreator 简单说明

CommandCreator 主要是基于不同的业务规则生成对应的 CommandRunner,以下是一个简单说明

CommandCreator 的作用

  • 基于request 创建包装的command
  • 包装多种 CommandRunner (基于用户请求类型,包装不同的 CommandRunner)
  • 构建后续任务处理需要的上下文信息(比如sqlConverter,sqlHandlerConfig)

参考类

dremio CommandCreator 简单说明_java

 

 

CommandRunner 类

参考子类,后边会介绍下,jdbc 以及web 经常会用到HandlerToExec

dremio CommandCreator 简单说明_任务处理_02

 

 

调用

实际 CommandCreator 的调用是由 AttemptManager 的 run 方法执行的,同时源头是 ForemenWorkManager(和 drill 一致)。AttemptManager 我以前简单介绍过,可以参考。

  • run 部分处理
    对于CommandCreator 的依赖处理核心是使用了CommandPool

 

@Override

public void run() {

// rename the thread we're using for debugging purposes

final Thread currentThread = Thread.currentThread();

final String originalName = currentThread.getName();

currentThread.setName(queryIdString + ":foreman");

 

 

try {

injector.injectChecked(queryContext.getExecutionControls(), INJECTOR_TRY_BEGINNING_ERROR,

ForemanException.class);

 

observer.beginState(AttemptObserver.toEvent(AttemptEvent.State.PENDING));

 

observer.queryStarted(queryRequest, queryContext.getSession().getCredentials().getUserName());

 

String ruleSetEngine = ruleBasedEngineSelector.resolveAndUpdateEngine(queryContext);

ResourceSchedulingProperties resourceSchedulingProperties = new ResourceSchedulingProperties();

resourceSchedulingProperties.setRoutingEngine(queryContext.getSession().getRoutingEngine());

resourceSchedulingProperties.setRuleSetEngine(ruleSetEngine);

final GroupResourceInformation groupResourceInformation =

maestroService.getGroupResourceInformation(queryContext.getOptions(), resourceSchedulingProperties);

queryContext.setGroupResourceInformation(groupResourceInformation);

 

// Checks for Run Query privileges for the selected Engine

checkRunQueryAccessPrivilege(groupResourceInformation);

 

// planning is done in the command pool

commandPool.submit(CommandPool.Priority.LOW, attemptId.toString() + ":foreman-planning",

(waitInMillis) -> {

observer.commandPoolWait(waitInMillis);

 

injector.injectPause(queryContext.getExecutionControls(), INJECTOR_PENDING_PAUSE, logger);

injector.injectChecked(queryContext.getExecutionControls(), INJECTOR_PENDING_ERROR,

ForemanException.class);

 

plan();

injector.injectPause(queryContext.getExecutionControls(), INJECTOR_PLAN_PAUSE, logger);

injector.injectChecked(queryContext.getExecutionControls(), INJECTOR_PLAN_ERROR,

ForemanException.class);

return null;

}, runInSameThread).get(); // 等待plan()中的创建成功

plan 方法,会等待实际的CommandRunner plan 执行完成

private void plan() throws Exception {

// query parsing and dataset retrieval(both from source and kvstore).

observer.beginState(AttemptObserver.toEvent(AttemptEvent.State.METADATA_RETRIEVAL));

 

CommandCreator creator = newCommandCreator(queryContext, observer, prepareId);

command = creator.toCommand();

logger.debug("Using command: {}.", command);

 

injector.injectPause(queryContext.getExecutionControls(), INJECTOR_METADATA_RETRIEVAL_PAUSE, logger);

 

switch (command.getCommandType()) {

case ASYNC_QUERY:

Preconditions.checkState(command instanceof AsyncCommand, "Asynchronous query must be an AsyncCommand");

command.plan();

break;

 

case SYNC_QUERY:

case SYNC_RESPONSE:

moveToState(QueryState.STARTING, null);

command.plan();

extraResultData = command.execute();

addToEventQueue(QueryState.COMPLETED, null);

break;

 

default:

throw new IllegalStateException(

String.format("command type %s not supported in plan()", command.getCommandType()));

}

profileTracker.setPrepareId(prepareId.value);

}

参考资料

sabot/kernel/src/main/java/com/dremio/exec/planner/sql/handlers/commands/CommandCreator.java

sabot/kernel/src/main/java/com/dremio/exec/work/foreman/AttemptManager.java

举报

相关推荐

0 条评论