Ververica Platform Source Code Analysis (Submit Feature)


2022-09-17


Contents

  • 01 Introduction
  • 02 Submission Flow Analysis
  • 2.1 Controller Layer
  • 2.2 Service Layer
  • 2.3 PreviewQuerySubmitter
  • 2.3.1 SessionCluster (Flink Session Cluster Instance)
  • 2.3.2 SessionClusterEndpoint (Job Submission Client)
  • 2.3.3 submitJobGraph (Submitting the Job)
  • 2.3.4 JobGraphTranslator (JobGraph Translator)
  • 03 Closing

01 Introduction

Ververica Platform official documentation: https://docs.ververica.com

The previous post, "Ververica Platform Source Code Analysis (Validation Feature)", analyzed VVP's Flink SQL validation feature. This post continues with VVP's submit feature.


02 Submission Flow Analysis

The submit endpoint:

curl 'http://127.0.0.1:31158/sql/v1beta1/namespaces/default/sqlscripts:submit-preview' \
-H 'Accept: application/json, text/plain, */*' \
-H 'Accept-Language: en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7' \
-H 'Connection: keep-alive' \
-H 'Content-Type: application/json' \
-H 'Origin: http://127.0.0.1:31158' \
-H 'Referer: http://127.0.0.1:31158/app/' \
-H 'User-Agent: Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Mobile Safari/537.36' \
-H 'X-Requested-With: XMLHttpRequest' \
--data-raw '{"statement":"SELECT * FROM pepole"}' \
--compressed

2.1 Controller Layer

Here is the Controller-layer code, with comments added (com.ververica.platform.sql.controller.SqlController#submitPreview):

/**
 * Submit-preview endpoint.
 *
 * @param ns        the namespace
 * @param statement the SQL statement
 * @return a future that completes with the submit-preview response
 */
@PostMapping({"sqlscripts:submit-preview"})
@PreAuthorize("isNamespaceEditor(#ns)")
public CompletableFuture<SubmitPreviewResponse> submitPreview(@PathVariable NamespaceName ns, @RequestBody Statement statement) {

    // Build the submit-preview request from the namespace name and the SQL statement
    SubmitPreviewRequest request = SubmitPreviewRequest.newBuilder().setParent(ns.getName()).setStatement(statement).build();

    // Submit the preview request; the stream observer completes the returned future
    CompletableFuture<SubmitPreviewResponse> future = new CompletableFuture<>();
    SimpleUnaryStreamObserver<SubmitPreviewResponse> streamObserver = new SimpleUnaryStreamObserver(future);
    this.sqlService.submitPreview(request, (StreamObserver) streamObserver);

    return future;

}

As shown, the core call is this.sqlService.submitPreview(request, (StreamObserver) streamObserver). Let's look at its implementation.
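The controller turns a callback-style service call into a CompletableFuture. The following is a minimal sketch of that bridging idea, not VVP's actual SimpleUnaryStreamObserver: the Observer interface is a simplified stand-in for gRPC's StreamObserver, and UnaryFutureObserver, ObserverBridgeDemo and echoService are hypothetical names introduced only for illustration.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for gRPC's StreamObserver so the sketch compiles on its own. */
interface Observer<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
}

/** Completes a future from the callbacks of a unary (single-response) call. */
final class UnaryFutureObserver<T> implements Observer<T> {
    private final CompletableFuture<T> future;
    private T value;

    UnaryFutureObserver(CompletableFuture<T> future) {
        this.future = future;
    }

    @Override
    public void onNext(T value) {
        this.value = value; // a unary call delivers a single value
    }

    @Override
    public void onError(Throwable t) {
        future.completeExceptionally(t);
    }

    @Override
    public void onCompleted() {
        future.complete(value);
    }
}

final class ObserverBridgeDemo {
    /** Hypothetical service that answers through the observer instead of a return value. */
    static void echoService(String request, Observer<String> observer) {
        if (request.isEmpty()) {
            observer.onError(new IllegalArgumentException("empty statement"));
            return;
        }
        observer.onNext("accepted: " + request);
        observer.onCompleted();
    }

    /** Mirrors the controller: create the future, wrap it in an observer, call the service. */
    static CompletableFuture<String> call(String request) {
        CompletableFuture<String> future = new CompletableFuture<>();
        echoService(request, new UnaryFutureObserver<>(future));
        return future;
    }

    public static void main(String[] args) {
        System.out.println(call("SELECT 1").join());
    }
}
```

With this shape the web layer can return the future immediately, and errors reported through onError surface as an exceptionally completed future.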

2.2 Service Layer

The submit-preview method is com.ververica.platform.sql.service.SqlService#submitPreview, implemented as follows:

/**
 * Submit / preview.
 *
 * @param request          the request payload
 * @param responseObserver observer that receives the response or the error
 */
public void submitPreview(SubmitPreviewRequest request, StreamObserver<SubmitPreviewResponse> responseObserver) {

    ExecutionContext execContext;

    // Parse the namespace name into a NamespaceName
    NamespaceName namespace = NamespaceName.parse(request.getParent());

    try {
        // Get the execution context for the namespace
        execContext = this.flinkCatalogProvider.getExecutionContext(namespace, null, null);
    } catch (CatalogNotExistException e) {
        responseObserver.onError((Throwable) Status.NOT_FOUND
                .withDescription("Default catalog not found: " + e.getMessage())
                .asException());
        return;
    } catch (DatabaseNotExistException e) {
        responseObserver.onError((Throwable) Status.NOT_FOUND
                .withDescription("Default database not found: " + e.getMessage())
                .asException());
        return;
    }

    try {
        // Submit the preview query and pass the result to the observer
        SubmitPreviewResponse response = this.previewQuerySubmitter.submitPreviewQuery(namespace, execContext, request.getStatement());
        responseObserver.onNext(response);
        responseObserver.onCompleted();
    } catch (NoPreviewSessionClusterException | com.ververica.platform.sql.exception.PreviewSessionClusterFlinkVersionUnsupportedException e) {
        log.error("Error submitting preview", e);
        responseObserver.onError((Throwable) ((AsGrpcStatusException) e).asGrpcStatusException());
    } catch (InvalidPreviewQueryException e) {
        log.error("Error submitting preview: {}", request.getStatement(), e);
        responseObserver.onError((Throwable) e.asGrpcStatusException());
    } catch (Exception e) {
        log.error("Unknown error during preview query submission: {}", request.getStatement(), e);
        responseObserver.onError((Throwable) Status.INTERNAL
                .withDescription(String.format("Error during query submission: %s", new Object[]{e.getMessage()}))
                .asException());
    }
}

The core call here is SubmitPreviewResponse response = this.previewQuerySubmitter.submitPreviewQuery(namespace, execContext, request.getStatement()). Let's look at its implementation.
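The service method above follows one error-handling pattern: domain exceptions carry their own status, and anything unexpected is wrapped as INTERNAL. A minimal sketch of that pattern follows. Code, CarriesCode, InvalidQueryException and ErrorMapping are hypothetical names, and the INVALID_ARGUMENT mapping is an assumption, since the article does not show which status the real exceptions return.

```java
/** Hypothetical status codes; only NOT_FOUND and INTERNAL appear in the article's code. */
enum Code { NOT_FOUND, INVALID_ARGUMENT, INTERNAL }

/** Hypothetical counterpart of AsGrpcStatusException: the exception knows its own code. */
interface CarriesCode {
    Code code();
}

/** Hypothetical domain exception for a query that cannot be previewed. */
final class InvalidQueryException extends RuntimeException implements CarriesCode {
    InvalidQueryException(String message) {
        super(message);
    }

    @Override
    public Code code() {
        return Code.INVALID_ARGUMENT; // assumed mapping, for illustration only
    }
}

final class ErrorMapping {
    /** Domain exceptions report their own code; everything else becomes INTERNAL. */
    static String describe(Exception e) {
        if (e instanceof CarriesCode) {
            return ((CarriesCode) e).code() + ": " + e.getMessage();
        }
        return Code.INTERNAL + ": Error during query submission: " + e.getMessage();
    }

    public static void main(String[] args) {
        System.out.println(describe(new InvalidQueryException("bad sql")));
        System.out.println(describe(new IllegalStateException("boom")));
    }
}
```

Keeping the status on the exception itself means the service needs only one generic catch branch per exception family, plus a final fallback.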

2.3 PreviewQuerySubmitter

The implementation is in com.ververica.platform.sql.service.PreviewQuerySubmitter#submitPreviewQuery:

/**
 * Submit a preview query.
 *
 * @param namespace        the namespace
 * @param executionContext the execution context
 * @param statement        the SQL to execute
 * @return the submit-preview response
 */
public SubmitPreviewResponse submitPreviewQuery(NamespaceName namespace, ExecutionContext executionContext, Statement statement)
        throws NoSuchResourceException, NoPreviewSessionClusterException, PreviewSessionClusterFlinkVersionUnsupportedException,
        PreviewSessionClusterNotRunningException, IOException {

    // Validate the SQL
    ValidationResultWithOperations validationResponse = this.validationService.validate(statement.getStatement(), executionContext);
    checkValidationResponse(validationResponse);

    // Get the unique resource ID of the Session cluster
    String sessionClusterResourceId = ((Namespace) this.namespaceStorage.get((ResourceName) namespace)).getPreviewSessionClusterName();
    if (sessionClusterResourceId.isEmpty()) {
        throw new NoPreviewSessionClusterException();
    }

    // Get and verify the Session cluster instance (checks the Flink image tag and the running state)
    SessionClusterName clusterName = SessionClusterName.of(namespace, sessionClusterResourceId);
    SessionCluster sessionCluster = this.executor.getSessionCluster(clusterName);
    verifyFlinkImageTag(sessionCluster);
    verifyClusterRunning(sessionCluster);

    // Get the job submission client
    SessionClusterEndpoint clusterEndpoint = this.executor.getEndpoint(clusterName);

    // Create and submit the Flink job
    JobGraph jobGraph = createJobGraph(namespace, statement);
    submitJobGraph(clusterName, clusterEndpoint, jobGraph);

    // Notify the Session cluster of the submitted query, then build the response
    ResolvedSchema outputSchema = getOutputSchema(validationResponse);
    byte[] rowType = InstantiationUtil.serializeObject(outputSchema.toSourceRowDataType());
    NotifyQuerySubmittedRequest notificationRequest = NotifyQuerySubmittedRequest.newBuilder()
            .setParent(clusterName.getName())
            .setCollectSinkOperatorId(JobGraphUtil.getCollectSinkOperatorId(jobGraph).toString())
            .setFlinkJobId(jobGraph.getJobID().toHexString())
            .setDataType(ByteString.copyFrom(rowType))
            .build();
    NotifyQuerySubmittedResponse notificationResponse = this.executor.<NotifyQuerySubmittedResponse>execute(clusterEndpoint, client -> client.notifyQuerySubmitted(notificationRequest));

    return SubmitPreviewResponse.newBuilder()
            .setQueryName(notificationResponse.getQueryName())
            .addAllColumnNames(outputSchema.getColumnNames())
            .build();
}

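From the code, submission consists of a few core steps:

  • ① Validate the SQL;
  • ② Get the running Flink Session cluster;
  • ③ Create the job (JobGraph) and submit it to the Session cluster.

Steps ② and ③ matter most, so let's look at them.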

2.3.1 SessionCluster (Flink Session Cluster Instance)

The code that obtains the SessionCluster:

// Get the unique resource ID of the Session cluster
String sessionClusterResourceId = ((Namespace) this.namespaceStorage.get((ResourceName) namespace)).getPreviewSessionClusterName();
if (sessionClusterResourceId.isEmpty()) {
    throw new NoPreviewSessionClusterException();
}

// Get and verify the Session cluster instance (checks the Flink image tag and the running state)
SessionClusterName clusterName = SessionClusterName.of(namespace, sessionClusterResourceId);
SessionCluster sessionCluster = this.executor.getSessionCluster(clusterName);
verifyFlinkImageTag(sessionCluster);
verifyClusterRunning(sessionCluster);

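The code above breaks down into these steps:

  • First, use the namespace name to look up the Session cluster's unique resource ID in the resource storage class (ResourceStorage);
  • Then, combine the namespace name and that resource ID into the cluster name (SessionClusterName);
  • Finally, use the cluster name to fetch the SessionCluster instance from the ResultFetcherExecutor.

What remains are the two verification methods: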

/**
 * Verify the Flink image tag.
 *
 * @param sessionCluster the Session cluster
 */
private void verifyFlinkImageTag(SessionCluster sessionCluster) throws PreviewSessionClusterFlinkVersionUnsupportedException {
    // The Flink image tag required for SQL deployments
    String requiredFlinkImageTag = this.fvmIndex.getFlinkImageTagForSqlDeployments();

    // The Flink image tag the cluster actually runs
    String actualFlinkImageTag = sessionCluster.getSpec().getFlinkImageTag();

    // The required tag and the actual tag must match
    if (!actualFlinkImageTag.equals(requiredFlinkImageTag)) {
        throw PreviewSessionClusterFlinkVersionUnsupportedException.create(requiredFlinkImageTag, actualFlinkImageTag);
    }
}


/**
 * Verify that the Flink Session cluster is running.
 *
 * @param sessionCluster the Flink Session cluster
 */
private void verifyClusterRunning(SessionCluster sessionCluster) throws PreviewSessionClusterNotRunningException {

    // The spec state must be RUNNING and the status state must be RUNNING or UPDATING
    SessionClusterSpec.StateEnum state = sessionCluster.getSpec().getState();
    SessionClusterStatus.StateEnum status = sessionCluster.getStatus().getState();
    if (state == SessionClusterSpec.StateEnum.RUNNING && (status == SessionClusterStatus.StateEnum.RUNNING || status == SessionClusterStatus.StateEnum.UPDATING)) {
        return;
    }
    throw PreviewSessionClusterNotRunningException.create(sessionCluster);
}
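verifyClusterRunning compares two different states: the desired state from the spec and the observed state from the status. A small sketch of that predicate follows. The enums are simplified stand-ins; every value other than RUNNING and UPDATING is an assumption added for illustration, as are the names DesiredState, ObservedState and ClusterStateCheck.

```java
/** Hypothetical stand-in for SessionClusterSpec.StateEnum (what the user asked for). */
enum DesiredState { RUNNING, STOPPED }

/** Hypothetical stand-in for SessionClusterStatus.StateEnum (what the cluster reports). */
enum ObservedState { STARTING, RUNNING, UPDATING, STOPPED, FAILED }

final class ClusterStateCheck {
    /**
     * Same condition as in the article: the cluster must be meant to run,
     * and it must currently be running or in the middle of an update.
     */
    static boolean acceptsPreviewJobs(DesiredState desired, ObservedState observed) {
        return desired == DesiredState.RUNNING
                && (observed == ObservedState.RUNNING || observed == ObservedState.UPDATING);
    }

    public static void main(String[] args) {
        System.out.println(acceptsPreviewJobs(DesiredState.RUNNING, ObservedState.UPDATING));
        System.out.println(acceptsPreviewJobs(DesiredState.STOPPED, ObservedState.RUNNING));
    }
}
```

Checking both sides rejects a cluster that is still up but has already been asked to stop.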

2.3.2 SessionClusterEndpoint (Job Submission Client)

Once created, the job is submitted to the SessionClusterEndpoint. Here is how the endpoint is obtained:

// Get the job submission client
SessionClusterEndpoint clusterEndpoint = this.executor.getEndpoint(clusterName);

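Tracing through a chain of calls leads to com.ververica.platform.appmanager.client.api.SessionClusterResourceApi#getEndpointUsingGETCall, shown below. It builds the HTTP GET call that looks up the Session cluster's endpoint through the appmanager API; the job is later submitted to that endpoint: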

/**
 * Builds the HTTP GET call for the getEndpoint API.
 *
 * @param name                    the Session cluster name
 * @param namespace               the namespace
 * @param progressListener        the response progress listener
 * @param progressRequestListener the request progress listener
 * @return the prepared call
 */
public Call getEndpointUsingGETCall(String name, String namespace, final ProgressResponseBody.ProgressListener progressListener, ProgressRequestBody.ProgressRequestListener progressRequestListener) throws ApiException {
    Object localVarPostBody = null;

    String localVarPath = "/api/v1/namespaces/{namespace}/sessionclusters/{name}:getEndpoint"
            .replaceAll("\\{name\\}", this.apiClient.escapeString(name.toString()))
            .replaceAll("\\{namespace\\}", this.apiClient.escapeString(namespace.toString()));

    List<Pair> localVarQueryParams = new ArrayList<>();
    List<Pair> localVarCollectionQueryParams = new ArrayList<>();
    Map<String, String> localVarHeaderParams = new HashMap<>();
    Map<String, Object> localVarFormParams = new HashMap<>();

    String[] localVarAccepts = {"application/json", "application/yaml"};
    String localVarAccept = this.apiClient.selectHeaderAccept(localVarAccepts);
    if (localVarAccept != null) localVarHeaderParams.put("Accept", localVarAccept);

    String[] localVarContentTypes = {"application/json", "application/yaml"};

    String localVarContentType = this.apiClient.selectHeaderContentType(localVarContentTypes);
    localVarHeaderParams.put("Content-Type", localVarContentType);

    if (progressListener != null) {
        this.apiClient.getHttpClient().networkInterceptors().add(new Interceptor() {
            public Response intercept(Interceptor.Chain chain) throws IOException {
                Response originalResponse = chain.proceed(chain.request());
                return originalResponse.newBuilder()
                        .body((ResponseBody) new ProgressResponseBody(originalResponse.body(), progressListener))
                        .build();
            }
        });
    }

    String[] localVarAuthNames = {"apiKey"};
    return this.apiClient.buildCall(localVarPath, "GET", localVarQueryParams, localVarCollectionQueryParams, localVarPostBody, localVarHeaderParams, localVarFormParams, localVarAuthNames, progressRequestListener);
}
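The first step of getEndpointUsingGETCall is filling the {namespace} and {name} placeholders of the path template with escaped values. A minimal sketch of that step follows. EndpointPath is a hypothetical class, and its escape method is only an assumption about what apiClient.escapeString does (URL-encoding with spaces as %20); the article does not show that method.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

final class EndpointPath {
    static final String TEMPLATE =
            "/api/v1/namespaces/{namespace}/sessionclusters/{name}:getEndpoint";

    /** Assumed escaping: URL-encode, then turn form-style '+' into '%20' for use in a path. */
    static String escape(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8").replace("+", "%20");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always available", e);
        }
    }

    /** Uses String.replace (literal match), so the braces need no regex escaping. */
    static String build(String namespace, String name) {
        return TEMPLATE
                .replace("{namespace}", escape(namespace))
                .replace("{name}", escape(name));
    }

    public static void main(String[] args) {
        System.out.println(build("default", "my cluster"));
    }
}
```

The listing in the article uses replaceAll with escaped braces; the literal String.replace used here is a deliberate simplification that also avoids special handling of $ and \ in the replacement text.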

2.3.3 submitJobGraph (Submitting the Job)

Now for the most important step: building the job and submitting it.

① The code that builds the job:

/**
 * Build the job (JobGraph).
 *
 * @param namespace the namespace
 * @param statement the SQL
 * @return the resulting JobGraph
 */
private JobGraph createJobGraph(NamespaceName namespace, Statement statement) {
    JobGraph jobGraph;

    // Generate the job ID
    String jobId = UUID.randomUUID().toString().replaceAll("-", "");

    // Build the JobGraphSpec from the Flink version, the job ID and the SQL to execute
    FlinkVersion supportedFlinkVer = this.fvmIndex.getFlinkVersionForSqlDeployments();
    JobGraphSpec jgSpec = getJobGraphSpec(statement, jobId, supportedFlinkVer);

    // Create the JobGraphTranslator from the namespace, the JobGraphSpec and the supported Flink version
    JobGraphTranslator jobGraphTranslator = new JobGraphTranslator(namespace, jgSpec, this.tEnvClassLoaderProvider, this.validationService, supportedFlinkVer);

    try {
        // Translate to a JobGraph on the thread pool, then set the job name via reflection
        Objects.requireNonNull(jobGraphTranslator);
        jobGraph = ((JobGraphTranslationResult) this.jobGraphThreadPool.getExecutorService().<JobGraphTranslationResult>submit(jobGraphTranslator::translateJobGraph).get()).getJobGraph();
        String sqlQuery = statement.getStatement();
        setLimitedJobName(jobGraph, sqlQuery, 20);
    } catch (RejectedExecutionException e) {
        throw new PreviewExecutionException("Too many concurrent query translation requests", e);
    } catch (InterruptedException | IllegalAccessException | NoSuchFieldException | java.util.concurrent.ExecutionException e) {
        throw new PreviewExecutionException(e.getMessage(), e);
    }

    // Attach the dependencies the job needs
    addJobGraphDependencies(jobGraph, jobGraphTranslator);
    return jobGraph;
}
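Two small details in createJobGraph are easy to overlook: the job ID is a UUID with the dashes removed, which yields the 32 hexadecimal characters later passed to JobID.fromHexString, and the job name is derived from the SQL with a length limit of 20. A sketch of both follows. PreviewJobNaming is a hypothetical class, and limitedName is only a guess at what setLimitedJobName might do (collapse whitespace, then truncate); the article does not show that method.

```java
import java.util.UUID;

final class PreviewJobNaming {
    /** A UUID is 128 bits, so removing the four dashes leaves 32 lowercase hex characters. */
    static String newJobIdHex() {
        return UUID.randomUUID().toString().replace("-", "");
    }

    /** Hypothetical name limiter: put the SQL on one line and cut it to maxLen characters. */
    static String limitedName(String sql, int maxLen) {
        String oneLine = sql.trim().replaceAll("\\s+", " ");
        return oneLine.length() <= maxLen ? oneLine : oneLine.substring(0, maxLen);
    }

    public static void main(String[] args) {
        System.out.println(newJobIdHex());
        System.out.println(limitedName("SELECT a,\n  b FROM t WHERE a > 1", 20));
    }
}
```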

② The code that submits the job:

/**
 * Submit the job.
 *
 * @param clusterName     the cluster name
 * @param clusterEndpoint the submission client endpoint
 * @param jobGraph        the job
 */
private void submitJobGraph(SessionClusterName clusterName, SessionClusterEndpoint clusterEndpoint, JobGraph jobGraph) {
    Configuration configuration = new Configuration();
    configuration.setString(RestOptions.ADDRESS, clusterEndpoint.getHost());

    try {
        // Submit the job with Flink's REST client
        RestClusterClient<String> client = new RestClusterClient(configuration, clusterName.getResourceId());
        client.submitJob(jobGraph).get();
    } catch (Exception e) {
        throw new PreviewExecutionException("Could not submit job graph: " + e.getMessage(), e);
    }
}

2.3.4 JobGraphTranslator (JobGraph Translator)

As noted, the core line of the job-creation method is:

jobGraph = ((JobGraphTranslationResult) this.jobGraphThreadPool.getExecutorService().<JobGraphTranslationResult>submit(jobGraphTranslator::translateJobGraph).get()).getJobGraph();
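The translation runs on a dedicated thread pool, and createJobGraph maps RejectedExecutionException to "Too many concurrent query translation requests". A minimal sketch of how a bounded pool produces that behavior follows. BoundedTranslationPool is a hypothetical class, and the pool and queue sizes are assumptions; the article does not show how jobGraphThreadPool is configured.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BoundedTranslationPool {
    /** Fixed number of workers plus a bounded queue; extra submissions are rejected. */
    static ExecutorService newPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.AbortPolicy());
    }

    /** Submits the task, waits for the result, and translates failures like the article does. */
    static String translate(ExecutorService pool, Callable<String> task) {
        try {
            return pool.submit(task).get();
        } catch (RejectedExecutionException e) {
            throw new IllegalStateException("Too many concurrent query translation requests", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag for callers
            throw new IllegalStateException(e.getMessage(), e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause().getMessage(), e.getCause());
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = newPool(2, 4);
        System.out.println(translate(pool, () -> "job graph for SELECT 1"));
        pool.shutdown();
    }
}
```

Rejecting instead of queueing without limit keeps a burst of preview requests from piling up expensive translations in memory.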

The implementation of translateJobGraph() is in org.apache.flink.client.deployment.application.JobGraphTranslator#translateJobGraph(); the full code is:

/**
 * Translate to a JobGraph.
 *
 * @return the translation result
 */
public JobGraphTranslationResult translateJobGraph() throws ValidationException, FlinkException, IOException {
    return translateJobGraph(SqlJobEntrypoint.class);
}

/**
 * Translate to a JobGraph.
 *
 * @param entryClass the entry class
 * @return the translation result
 */
public JobGraphTranslationResult translateJobGraph(Class<?> entryClass) throws ValidationException, FlinkException, IOException {

    // Check that the requested Flink version can be fulfilled
    checkIfFlinkVersionRequestCanBeFulfilled(this.jobGraphSpec);

    // Validate the SQL
    String sqlScript = this.jobGraphSpec.getSqlStatement();
    ValidationResultWithOperations result = this.validationService.validateForJobGraphTranslation(this.namespace, sqlScript);
    if (ValidationResultUtil.isInvalidResult(result.getResult())) {
        String errorMessage = getErrorMessage(result.getErrorDetails());
        throw new ValidationException(String.format("SQL Script could not be translated. SQL Script: %s\nError: %s\nMessage: %s",
                new Object[]{sqlScript, result.getResult().toString(), errorMessage}));
    }
    if (!isSupportedQuery(result.getResult())) {
        throw new ValidationException(String.format("Either a single 'INSERT INTO' statement or multiple 'INSERT INTO' statements wrapped in a 'BEGIN STATEMENT SET' block are supported for deployments. SQL Script: %s", new Object[]{sqlScript}));
    }

    SourcesAndSinksVisitor sourcesAndSinksVisitor = new SourcesAndSinksVisitor(result.getTEnv());
    sourcesAndSinksVisitor.visit(result.getOperations());

    // Register the current thread's context class loader under a key and fetch the resulting class loader
    String classLoaderKey = this.tEnvClassLoaderProvider.createClassLoaderForKey(Thread.currentThread().getContextClassLoader());
    TableEnvClassLoader classLoader = this.tEnvClassLoaderProvider.getClassLoaderForKey(classLoaderKey);

    try {
        // Build the JobGraph, including the Flink version, the SQL, the sources and the sinks
        SavepointRestoreSettings savepointRestoreSettings = JobGraphUtil.createSavepointRestoreSettings(this.jobGraphSpec);

        Configuration configuration = new Configuration();
        Objects.requireNonNull(configuration);
        this.jobGraphSpec.getFullFlinkConfigurationMap().forEach(configuration::setString);

        String jobId = this.jobGraphSpec.getJobId();

        String[] args = buildArgs(classLoaderKey);

        log.debug("Generate job graph for job: {} with args: {}", jobId, Arrays.toString((Object[]) args));

        ClassPathJobGraphRetriever jobGraphRetriever = ClassPathJobGraphRetriever
                .newBuilder(JobID.fromHexString(jobId), savepointRestoreSettings, args, (ClassLoader) classLoader)
                .setJobClassName(entryClass.getCanonicalName())
                .build();

        JobGraph jobGraph = jobGraphRetriever.retrieveJobGraph(configuration);

        this.udfArtifacts.addAll(classLoader.getUdfDependencies());

        this.localDependencies.addAll(classLoader.getLocalDependencies());

        return JobGraphTranslationResult.builder()
                .jobGraph(jobGraph)
                .flinkVersion(this.jobGraphSpec.getFlinkVersion())
                .flinkConfiguration(configuration.toMap())
                .sqlStatement(sqlScript)
                .sinks(sourcesAndSinksVisitor.getSinks())
                .sources(sourcesAndSinksVisitor.getSources())
                .build();

    } finally {
        classLoader.close();
        this.tEnvClassLoaderProvider.deleteClassLoaderForKey(classLoaderKey);
    }

}
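translateJobGraph registers a class loader under a string key, passes that key through buildArgs, and always removes the entry in a finally block. A minimal sketch of this register, use, clean-up pattern follows. LoaderRegistry is a hypothetical class, not VVP's tEnvClassLoaderProvider, and the idea that the key lets the entry class find the loader again is an inference from the code, not something the article states.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical registry that hands out string keys for class loaders. */
final class LoaderRegistry {
    private final Map<String, ClassLoader> loaders = new ConcurrentHashMap<>();

    String register(ClassLoader loader) {
        String key = UUID.randomUUID().toString();
        loaders.put(key, loader);
        return key;
    }

    ClassLoader get(String key) {
        return loaders.get(key);
    }

    void remove(String key) {
        loaders.remove(key);
    }

    int size() {
        return loaders.size();
    }

    /** Runs the body with a registered key and removes the entry even if the body throws. */
    <R> R withRegistered(ClassLoader loader, Function<String, R> body) {
        String key = register(loader);
        try {
            return body.apply(key);
        } finally {
            remove(key);
        }
    }

    public static void main(String[] args) {
        LoaderRegistry registry = new LoaderRegistry();
        ClassLoader loader = ClassLoader.getSystemClassLoader();
        // The key is a plain string, so it can travel as a program argument.
        boolean found = registry.withRegistered(loader, key -> registry.get(key) == loader);
        System.out.println(found + ", entries left: " + registry.size());
    }
}
```

A string key can be passed as a plain program argument, which a ClassLoader reference cannot, and the finally block keeps the registry from leaking loaders when translation fails.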

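OK, at this point we know the whole job submission flow. So what does the submitted job actually run? From translateJobGraph() we can see that the JobGraph is built with com.ververica.platform.sql.entrypoint.SqlJobEntrypoint as its entry class. For reasons of length, that class will be covered in the next post.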

03 Closing

This post covered VVP's submit feature. I hope it helps, and thanks for reading.

