Table of Contents
- 01 Introduction
- 02 Entrypoint Source Code Analysis
- 2.1 SqlJobOptionsParser
- 2.2 FlinkCatalogProvider
- 2.3 TableEnvClassLoader
- 2.4 VvpTableEnvironment
- 2.5 OperationsUtil
- 03 Conclusion
01 Introduction
Earlier posts already covered VVP's validation and submission source code; interested readers can find them here:
- "Ververica Platform Source Code Analysis (Validation)"
- "Ververica Platform Source Code Analysis (Submission)"
This post picks up where those left off and walks through what happens inside the Entrypoint after a job is submitted.
02 Entrypoint Source Code Analysis
The Entrypoint class lives at com.ververica.platform.sql.entrypoint.SqlJobEntrypoint.
Let's look at its code:
/**
* Entry point for Flink SQL jobs.
*
* @author : YangLinWei
* @createTime: 2022/9/13 3:53 PM
* @version: 1.0.0
*/
public class SqlJobEntrypoint {
private final FlinkCatalogProvider flinkCatalogProvider;
private final TableEnvProvider tableEnvProvider;
private final TableEnvClassLoaderProvider tableEnvClassLoaderProvider;
private final OperationSequenceMatcher operationSequenceMatcher;
/**
* Constructor.
*
* @param flinkCatalogProvider catalog provider
* @param tableEnvProvider TableEnvironment provider
* @param tableEnvClassLoaderProvider class loader provider
*/
public SqlJobEntrypoint(FlinkCatalogProvider flinkCatalogProvider, TableEnvProvider tableEnvProvider, TableEnvClassLoaderProvider tableEnvClassLoaderProvider) {
this.operationSequenceMatcher = new OperationSequenceMatcher();
this.flinkCatalogProvider = flinkCatalogProvider;
this.tableEnvProvider = tableEnvProvider;
this.tableEnvClassLoaderProvider = tableEnvClassLoaderProvider;
}
/**
* Program entry point.
*
* @param args job arguments
*/
public static void main(String[] args) throws Exception {
/*** Validate: if no arguments were given, print the help text **/
if (args.length < 1) {
SqlJobOptionsParser.printHelpClient();
return;
}
/*** Parse the arguments into job options **/
SqlJobOptions options = SqlJobOptionsParser.parseClient(args);
/*** Construct the SqlJobEntrypoint **/
SqlJobEntrypoint entrypoint = new SqlJobEntrypoint(BeanAware.<FlinkCatalogProvider>getBean(FlinkCatalogProvider.class), BeanAware.<TableEnvProvider>getBean(TableEnvProvider.class), BeanAware.<TableEnvClassLoaderProvider>getBean(TableEnvClassLoaderProvider.class));
/*** Execute the job **/
entrypoint.execute(options);
}
/**
* Execution method.
*
* @param options job options
*/
void execute(SqlJobOptions options) throws Exception {
/*** Look up the class loader by its key **/
String clKey = options.getClassLoaderKey();
TableEnvClassLoader tEnvClassLoader = this.tableEnvClassLoaderProvider.getClassLoaderForKey(clKey);
/*** Execute **/
NamespaceName namespace = NamespaceName.of(options.getNamespace());
VvpTableEnvironment tEnv = this.tableEnvProvider.getTableEnvironment(namespace, tEnvClassLoader);
ExecutorResult executorResult = (ExecutorResult) ClassLoaderWrapper.execute(() -> executeInClassLoader(options, tEnv), (ClassLoader) tEnvClassLoader);
if (!executorResult.isSuccess) {
throw executorResult.getException();
}
}
/**
* Executes within the class loader.
*
* @param options job options
* @param tEnv TableEnvironment
* @return execution result
*/
private ExecutorResult executeInClassLoader(SqlJobOptions options, VvpTableEnvironment tEnv) {
ExecutorResult.ExecutorResultBuilder resultBuilder = ExecutorResult.builder();
NamespaceName namespace = NamespaceName.of(options.getNamespace());
try {
/*** Configure and build the execution context **/
ExecutionContext executionCtx = this.flinkCatalogProvider.getExecutionContext(namespace, options
.getCatalogName().orElse(null), options
.getDatabaseName().orElse(null));
executionCtx.configureEnvironment((TableEnvironment) tEnv);
configureEnvironment((TableEnvironment) tEnv, options.getDynamicProperties());
/*** Parse the SQL script into the pipeline of operations to execute **/
List<Operation> operations = tEnv.getParser().parse(options.getScript());
/*** Execute each SQL operation **/
OperationsUtil.executeOperations(tEnv, operations);
resultBuilder.isSuccess(true);
} catch (Exception e) {
resultBuilder.isSuccess(false).exception(e);
}
return resultBuilder.build();
}
}
As the code above shows, the overall flow is fairly simple, but the logic behind it is not. Several core classes are involved:
- SqlJobOptionsParser: parses the incoming job arguments;
- FlinkCatalogProvider: creates the VVP-flavored Catalog; it does a lot of work internally, such as downloading dependency JARs and loading them into the class loader;
- TableEnvClassLoader: a custom class loader used to dynamically load the required JARs;
- VvpTableEnvironment: a custom TableEnvironment, and the most critical class here;
- OperationsUtil: a utility class (chain-of-responsibility style) that executes the SQL operations one by one.
Now let's walk through each of these core classes.
2.1 SqlJobOptionsParser
SqlJobOptionsParser is a utility class that parses the incoming arguments into a SqlJobOptions object. Code and commentary:
/**
* Parser for SQL job arguments.
*/
public class SqlJobOptionsParser {
/*** Arguments: namespace, catalog, database, script, class loader key, and dynamic properties **/
public static final Option OPTION_NAMESPACE = Option.builder("n").required(true).longOpt("namespace").numberOfArgs(1).argName("String: namespace").desc("The namespace that the job belongs to.").build();
public static final Option OPTION_DEFAULT_CATALOG = Option.builder("c").required(false).longOpt("catalog").numberOfArgs(1).argName("String: catalog").desc("The default catalog name.").build();
public static final Option OPTION_DEFAULT_DATABASE = Option.builder("d").required(false).longOpt("database").numberOfArgs(1).argName("String: database").desc("The default database name.").build();
public static final Option OPTION_SCRIPT = Option.builder("s").required(true).longOpt("script").numberOfArgs(1).argName("String: sql script").desc("The SQL script to run.").build();
public static final Option OPTION_CLASS_LOADER_KEY = Option.builder("k").required(true).longOpt("class_loader_key").numberOfArgs(1).argName("String: class loader key").desc("The class loader key.").build();
public static final Option OPTION_DYNAMIC_PROPERTY = Option.builder("D").required(false).numberOfArgs(2).argName("property=value").valueSeparator('=').desc("Use value for given property").build();
public static final Options CLIENT_OPTIONS = getClientOptions(new Options());
public static Options getClientOptions(Options options) {
options.addOption(OPTION_NAMESPACE);
options.addOption(OPTION_DEFAULT_CATALOG);
options.addOption(OPTION_DEFAULT_DATABASE);
options.addOption(OPTION_SCRIPT);
options.addOption(OPTION_CLASS_LOADER_KEY);
options.addOption(OPTION_DYNAMIC_PROPERTY);
return options;
}
/**
* Converts the raw args into a SqlJobOptions.
* @param args raw arguments
* @return the parsed options
*/
public static SqlJobOptions parseClient(String[] args) {
try {
DefaultParser parser = new DefaultParser();
CommandLine line = parser.parse(CLIENT_OPTIONS, args, true);
return new SqlJobOptions(line
.getOptionValue(OPTION_NAMESPACE.getOpt()), line
.getOptionValue(OPTION_DEFAULT_CATALOG.getOpt()), line
.getOptionValue(OPTION_DEFAULT_DATABASE.getOpt()), line
.getOptionValue(OPTION_CLASS_LOADER_KEY.getOpt()), line
.getOptionValue(OPTION_SCRIPT.getOpt()), line
.getOptionProperties(OPTION_DYNAMIC_PROPERTY.getOpt()));
} catch (ParseException e) {
printHelpClient();
throw new RuntimeException(e.getMessage());
}
}
/**
* Prints the help text.
*/
public static void printHelpClient() {
System.out.println("./SqlJobEntrypoint [OPTIONS]");
System.out.println();
System.out.println("The following options are available:");
HelpFormatter formatter = new HelpFormatter();
formatter.setLeftPadding(5);
formatter.setWidth(100);
formatter.printHelp(" ", CLIENT_OPTIONS);
System.out.println();
}
}
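SqlJobOptionsParser delegates the actual parsing to Apache Commons CLI. As a dependency-free illustration of the shape of what parseClient produces, here is a minimal hand-rolled sketch; MiniOptionsParser and its flat key scheme are made up for this post, and the real parser also accepts the separated `-D key value` form that Commons CLI's two-argument options allow:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, dependency-free sketch of what SqlJobOptionsParser.parseClient
// effectively produces: short options (-n, -s, -k, ...) plus -Dkey=value
// dynamic properties, flattened into a map.
public class MiniOptionsParser {

    public static Map<String, String> parse(String[] args) {
        Map<String, String> opts = new HashMap<>();
        for (int i = 0; i < args.length; i++) {
            String arg = args[i];
            if (arg.startsWith("-D")) {
                // dynamic property: -Dkey=value
                String[] kv = arg.substring(2).split("=", 2);
                opts.put("property." + kv[0], kv.length > 1 ? kv[1] : "");
            } else if (arg.startsWith("-")) {
                // single-argument option such as -n, -c, -d, -s, -k
                opts.put(arg.substring(1), args[++i]);
            }
        }
        return opts;
    }

    public static void main(String[] args) {
        Map<String, String> opts = parse(new String[]{
                "-n", "default", "-k", "cl-key-1",
                "-s", "INSERT INTO sink SELECT * FROM source",
                "-Dparallelism.default=2"});
        System.out.println(opts.get("n"));                             // default
        System.out.println(opts.get("property.parallelism.default"));  // 2
    }
}
```

The real implementation gets option validation (required flags, help text) for free from Commons CLI, which is why the production class is mostly Option builder declarations.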
2.2 FlinkCatalogProvider
The ultimate purpose of FlinkCatalogProvider is to create the VVP-flavored Catalog. The flow starts from this snippet in the Entrypoint:
/*** Configure and build the execution context **/
ExecutionContext executionCtx = this.flinkCatalogProvider.getExecutionContext(namespace, options
.getCatalogName().orElse(null), options
.getDatabaseName().orElse(null));
executionCtx.configureEnvironment((TableEnvironment) tEnv);
FlinkCatalogProvider's getExecutionContext method:
/**
* Obtains the execution context.
*
* @param namespace namespace
* @param catalog catalog name
* @param database database name
* @return the execution context
*/
public ExecutionContext getExecutionContext(NamespaceName namespace, @Nullable String catalog, @Nullable String database) throws CatalogNotExistException, DatabaseNotExistException {
/*** Resolve the catalog **/
String catName = Optional.ofNullable(Strings.emptyToNull(catalog)).orElse(this.catalogConfig.getDefaultCatalogName());
Catalog cat = getCatalog(namespace, catName).orElseThrow(() -> new CatalogNotExistException(catName));
/*** Resolve the database (the result is discarded; the call validates existence) **/
String dbName = Optional.ofNullable(Strings.emptyToNull(database)).orElse(cat.getDefaultDatabase());
cat.getDatabase(dbName);
return ExecutionContext.builder()
.namespace(namespace)
.catalog(catName)
.database(dbName)
.build();
}
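The catalog and database resolution above is simply "an explicitly passed name wins; an empty or null name falls back to the configured default". Isolated into a tiny runnable sketch (NameResolution is a made-up class; emptyToNull mirrors Guava's Strings.emptyToNull used in the real code):

```java
import java.util.Optional;

// Isolated sketch of the fallback used in getExecutionContext: an explicitly
// passed name wins, an empty or null name falls back to the default.
public class NameResolution {

    // Mirrors Guava's Strings.emptyToNull.
    static String emptyToNull(String s) {
        return (s == null || s.isEmpty()) ? null : s;
    }

    public static String resolve(String requested, String fallback) {
        return Optional.ofNullable(emptyToNull(requested)).orElse(fallback);
    }

    public static void main(String[] args) {
        System.out.println(resolve("my_catalog", "vvp")); // my_catalog
        System.out.println(resolve("", "vvp"));           // vvp
        System.out.println(resolve(null, "vvp"));         // vvp
    }
}
```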
Obtaining a Catalog comes down to convertFromResource:
/**
* Loads a Catalog from its resource definition.
*
* @param catalogResource catalog resource info
* @param tableEnvClassLoader class loader
* @param useCache whether to use the cache (if false, re-download)
* @return the Catalog
*/
private Catalog convertFromResource(Catalog catalogResource, TableEnvClassLoader tableEnvClassLoader, boolean useCache) {
CatalogName catName = CatalogName.parse(catalogResource.getName());
Map<String, String> catProperties = catalogResource.getPropertiesMap();
String catType = catProperties.getOrDefault(CommonCatalogOptions.CATALOG_TYPE.key(), "");
// Note: the decompiler dropped the orElseThrow supplier; a plausible reconstruction is used here.
FlinkCatalogFactory catalogFactory = this.catalogFactories.computeIfAbsent(catType, type -> (FlinkCatalogFactory) this.catalogFactoryProvider.getCatalogFactory(catalogResource).orElseThrow(() -> new RuntimeException("No catalog factory for type: " + type)));
/*** Create the Catalog **/
if (catalogFactory instanceof UnCacheableFlinkCatalogFactory) {
return ((UnCacheableFlinkCatalogFactory) catalogFactory).createCatalog(catName, catProperties, tableEnvClassLoader);
}
if (catalogFactory instanceof ExternalFlinkCatalogFactory) {
Optional<ExternalCatalog> cachedCatalog = useCache ? this.catalogCacheManager.getCatalog(catName, catProperties) : Optional.<ExternalCatalog>empty();
ExternalCatalog externalCat = cachedCatalog.orElseGet(() -> ((ExternalFlinkCatalogFactory) catalogFactory).createCatalog(catName, catProperties));
this.catalogCacheManager.putCatalog(catName, catProperties, externalCat);
tableEnvClassLoader.addClassLoader((ClassLoader) externalCat.getCatalogClassLoader());
return externalCat;
}
throw new RuntimeException(String.format("Unknown FlinkCatalogFactory type: %s", catalogFactory
.getClass().getCanonicalName()));
}
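Note the cache-or-create pattern that convertFromResource uses twice: computeIfAbsent for the factories and orElseGet for cached external catalogs. The factory side, isolated into a runnable sketch (FactoryCache and its string-valued "factories" are stand-ins for the real FlinkCatalogFactory instances):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Isolated sketch of the cache-or-create pattern in convertFromResource:
// computeIfAbsent keys factories by catalog type, so each type is built once.
public class FactoryCache {

    static final AtomicInteger builds = new AtomicInteger();
    static final Map<String, String> factories = new HashMap<>();

    static String factoryFor(String type) {
        return factories.computeIfAbsent(type, t -> {
            builds.incrementAndGet(); // expensive construction happens once per type
            return "factory-for-" + t;
        });
    }

    public static void main(String[] args) {
        factoryFor("vvp");
        factoryFor("vvp");  // cache hit, no second build
        factoryFor("hive");
        System.out.println(builds.get()); // 2
    }
}
```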
Going one level deeper, VvpFlinkCatalogFactory's createCatalog method:
public Catalog createCatalog(CatalogName catName, Map<String, String> properties, TableEnvClassLoader tableEnvClassLoader) {
return new VvpCatalog(catName, properties, this.dao, tableEnvClassLoader, this.connectorClassLoaderProvider, this.udfClassLoaderProvider);
}
OK, so at this point we can see that a VvpCatalog instance is ultimately created. For reasons of length I won't walk through its code line by line here 🤦‍♂️.
2.3 TableEnvClassLoader
TableEnvClassLoader is VVP's custom class loader. Its main job is to dynamically load the classes needed at runtime (note: the JARs to be loaded must be downloaded first).
/**
* Custom class loader.
*
* @author : YangLinWei
* @createTime: 2022/9/13 4:15 PM
* @version: 1.0.0
*/
public class TableEnvClassLoader extends ClassLoader implements LocalDependencyCollector {
private static final Logger log = LoggerFactory.getLogger(TableEnvClassLoader.class);
private final List<ClassLoader> classLoaders = new ArrayList<>();
protected TableEnvClassLoader(ClassLoader parent) {
super(parent);
}
/**
* Creates a class loader.
*
* @return a class loader (parented to the current thread's context class loader)
*/
public static TableEnvClassLoader create() {
return new TableEnvClassLoader(Thread.currentThread().getContextClassLoader());
}
/**
* Adds a delegate class loader.
*
* @param cl class loader
*/
public void addClassLoader(ClassLoader cl) {
this.classLoaders.add(cl);
}
/**
* Loads a class by its fully qualified name, falling back to the delegates.
*
* @param clazzName class name
*/
public Class<?> loadClass(String clazzName) throws ClassNotFoundException {
try {
return super.loadClass(clazzName);
} catch (ClassNotFoundException e) {
for (ClassLoader cl : this.classLoaders) {
try {
return cl.loadClass(clazzName);
} catch (ClassNotFoundException ignored) {
// try the next delegate
}
}
throw e;
}
}
/********************* Resolve resources (local paths of downloaded artifacts) ************************************/
public URL getResource(String name) {
URL res = super.getResource(name);
if (res != null) {
return res;
}
return this.classLoaders.stream()
.map(cl -> cl.getResource(name))
.filter(Objects::nonNull)
.findFirst().orElse(null);
}
public Enumeration<URL> getResources(String name) {
return new StreamEnumerator<>(resources(name));
}
public Stream<URL> resources(String name) {
Stream<URL> parentResources = getParent().resources(name);
Stream<URL> wrappedResources = this.classLoaders.stream().flatMap(cl -> cl.resources(name));
return Stream.concat(parentResources, wrappedResources);
}
public Set<URL> getLocalDependencies() {
return (Set<URL>) this.classLoaders.stream()
.filter(cl -> cl instanceof LocalDependencyCollector)
.flatMap(cl -> ((LocalDependencyCollector) cl).getLocalDependencies().stream())
.collect(Collectors.toSet());
}
public Set<UdfArtifact> getUdfDependencies() {
return (Set<UdfArtifact>) this.classLoaders.stream()
.filter(cl -> cl instanceof UdfAwareClassLoader)
.flatMap(cl -> ((UdfAwareClassLoader) cl).getUdfDependencies().stream())
.collect(Collectors.toSet());
}
public void close() {
this.classLoaders.stream()
.filter(cl -> cl instanceof UdfAwareClassLoader)
.forEach(cl -> {
try {
((UdfAwareClassLoader) cl).close();
} catch (IOException e) {
log.warn("Failed to close UDF classloader. This may cause problems when updating UDFs.", e);
}
});
}
private static class StreamEnumerator<T>
implements Enumeration<T> {
private final ArrayDeque<T> elements;
public StreamEnumerator(Stream<T> elementsStream) {
this.elements = elementsStream.collect(Collectors.toCollection(ArrayDeque::new));
}
public boolean hasMoreElements() {
return !this.elements.isEmpty();
}
public T nextElement() {
return this.elements.pop();
}
}
@VisibleForTesting
public List<ClassLoader> getClassLoaders() {
return this.classLoaders;
}
}
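The interesting part is loadClass: parent-first, then a linear scan over the registered delegates, rethrowing the parent's ClassNotFoundException if nothing matches. The same pattern in a minimal standalone sketch (this is not the VVP class, just the delegation idea):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the delegation pattern in TableEnvClassLoader.loadClass:
// try the parent first, then each registered delegate, and rethrow the
// parent's ClassNotFoundException if nothing can resolve the name.
public class DelegatingClassLoader extends ClassLoader {

    private final List<ClassLoader> delegates = new ArrayList<>();

    public DelegatingClassLoader(ClassLoader parent) {
        super(parent);
    }

    public void addDelegate(ClassLoader cl) {
        delegates.add(cl);
    }

    @Override
    public Class<?> loadClass(String name) throws ClassNotFoundException {
        try {
            return super.loadClass(name); // parent-first
        } catch (ClassNotFoundException parentFailure) {
            for (ClassLoader cl : delegates) {
                try {
                    return cl.loadClass(name);
                } catch (ClassNotFoundException ignored) {
                    // fall through to the next delegate
                }
            }
            throw parentFailure;
        }
    }

    public static void main(String[] args) throws Exception {
        DelegatingClassLoader cl = new DelegatingClassLoader(
                DelegatingClassLoader.class.getClassLoader());
        cl.addDelegate(ClassLoader.getSystemClassLoader());
        // Resolvable via the parent chain:
        System.out.println(cl.loadClass("java.lang.String").getName());
        try {
            cl.loadClass("no.such.Clazz");
        } catch (ClassNotFoundException e) {
            System.out.println("not found, as expected");
        }
    }
}
```

In VVP this is what lets a catalog or UDF JAR loaded at runtime become visible to the whole TableEnvironment: its loader is simply appended to the delegate list.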
2.4 VvpTableEnvironment
VvpTableEnvironment is the most critical class in the whole Entrypoint; it configures the parameters the job needs at runtime. Code and comments:
/**
* Custom TableEnvironment.
* <p>
* Based on org.apache.flink.table.api.internal.TableEnvironmentImpl.
*
* @author : YangLinWei
* @createTime: 2022/9/13 4:21 PM
* @version: 1.0.0
*/
public class VvpTableEnvironment extends TableEnvironmentImpl {
private static final Logger log = LoggerFactory.getLogger(VvpTableEnvironment.class);
private final Field userClassLoaderField;
/**
* Constructor.
*
* @param catalogManager catalog manager
* @param moduleManager module manager
* @param tableConfig table configuration
* @param executor executor
* @param functionCatalog function catalog
* @param planner planner
* @param isStreamingMode whether streaming mode is enabled
* @param tEnvClassLoader class loader
*/
protected VvpTableEnvironment(CatalogManager catalogManager, ModuleManager moduleManager, TableConfig tableConfig, Executor executor, FunctionCatalog functionCatalog, Planner planner, boolean isStreamingMode, TableEnvClassLoader tEnvClassLoader) {
super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode, (ClassLoader) tEnvClassLoader);
/*** Initialize some defaults **/
Configuration conf = getConfig().getConfiguration();
conf.set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
conf.set(PipelineOptions.OBJECT_REUSE, true);
conf.setBoolean("table.exec.hive.infer-source-parallelism", false);
try {
/*** Reflectively obtain the user class loader field **/
this.userClassLoaderField = TableEnvironmentImpl.class.getDeclaredField("userClassLoader");
this.userClassLoaderField.setAccessible(true);
} catch (NoSuchFieldException e) {
String msg = "Cannot get declared field 'TableEnvironmentImpl.userClassLoader'";
log.error(msg, e);
throw new ExecuteOperationException(msg, e);
}
}
/**
* Creates a VvpTableEnvironment.
*
* @param settings environment settings
* @param defaultCatalog default catalog
* @param otherCatalogs additional catalogs
* @param tEnvClassLoader custom class loader
* @return VvpTableEnvironment
*/
public static VvpTableEnvironment create(EnvironmentSettings settings, Catalog defaultCatalog, Map<String, Catalog> otherCatalogs, TableEnvClassLoader tEnvClassLoader) {
TableConfig tableConfig = new TableConfig();
/*** Initialize the CatalogManager **/
CatalogManager catalogManager = CatalogManager.newBuilder().classLoader(Thread.currentThread().getContextClassLoader()).config((ReadableConfig) tableConfig.getConfiguration()).defaultCatalog(settings.getBuiltInCatalogName(), defaultCatalog).build();
/*** Register each additional Catalog with the CatalogManager **/
Objects.requireNonNull(catalogManager);
otherCatalogs.forEach(catalogManager::registerCatalog);
/*** Initialize the FunctionCatalog **/
ModuleManager moduleManager = new ModuleManager();
FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager);
/*** Obtain the Executor **/
ExecutorFactory executorFactory = (ExecutorFactory) FactoryUtil.discoverFactory(
Thread.currentThread().getContextClassLoader(), ExecutorFactory.class, settings
.getExecutor());
Executor executor = executorFactory.create(settings.toConfiguration());
if (!settings.isStreamingMode()) {
log.error("Could not create VvpTableEnvironment in batch mode. Only streaming mode is supported.");
throw new TableException("Currently only supports stream mode!");
}
/*** Initialize the VvpStreamPlanner **/
VvpStreamPlanner vvpStreamPlanner = new VvpStreamPlanner(executor, tableConfig, functionCatalog, catalogManager);
return new VvpTableEnvironment(catalogManager, moduleManager, tableConfig, executor, functionCatalog, (Planner) vvpStreamPlanner, settings
.isStreamingMode(), tEnvClassLoader);
}
}
A few core pieces stand out in this code:
- CatalogManager: registers Catalogs; see org.apache.flink.table.catalog.CatalogManager in the Flink source;
- ExecutorFactory: obtains the Executor; see org.apache.flink.table.factories.FactoryUtil#discoverFactory in the Flink source;
- VvpStreamPlanner: the planner that builds and executes the plan.
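The reflective grab of userClassLoader in the constructor is the standard getDeclaredField/setAccessible trick. Against a hypothetical stand-in class (TableEnvironmentImpl itself is not on the classpath here) it looks like:

```java
import java.lang.reflect.Field;

// Stand-in demonstrating the reflection trick VvpTableEnvironment uses to
// reach the private TableEnvironmentImpl.userClassLoader field.
public class ReflectField {

    // Hypothetical stand-in for TableEnvironmentImpl.
    static class EnvImpl {
        private final String userClassLoader = "app-loader";
    }

    public static Object read(Object target, String fieldName) throws Exception {
        Field f = target.getClass().getDeclaredField(fieldName);
        f.setAccessible(true); // bypass private access
        return f.get(target);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(read(new EnvImpl(), "userClassLoader")); // app-loader
    }
}
```

This kind of access can fail on newer JDKs when the target class lives in a strongly encapsulated module, which is presumably why the constructor fails fast with ExecuteOperationException if the field cannot be found.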
2.5 OperationsUtil
Finally, every SQL statement passed in is executed through OperationsUtil. The flow starts here:
/*** Parse the SQL script into the pipeline of operations to execute **/
List<Operation> operations = tEnv.getParser().parse(options.getScript());
/*** Execute each SQL operation **/
OperationsUtil.executeOperations(tEnv, operations);
The core code:
/**
* Executes the script's operations.
*
* @param tEnv custom TableEnvironment
* @param operations the SQL operations to execute
*/
private static void executeEnvironmentOperations(VvpTableEnvironment tEnv, List<Operation> operations) {
List<Operation> executableOperations = (List<Operation>) operations.stream().filter(operation -> (operationSequenceMatcher.getPhase(operation) == OperationSequenceMatcher.EvaluationPhase.TABLE_ENVIRONMENT)).collect(Collectors.toList());
if (executableOperations.isEmpty()) {
return;
}
if (executableOperations.size() == 1) {
tEnv.executeInternal(executableOperations.get(0));
return;
}
List<ModifyOperation> statementSetOperations = (List<ModifyOperation>) executableOperations.stream().map(operation -> {
if (!(operation instanceof ModifyOperation))
throw new TableException(String.format("Encountered operation '%s' when only %s is expected.", operation, ModifyOperation.class.getSimpleName()));
return (ModifyOperation) operation;
}).collect(Collectors.toList());
/*** Execute **/
tEnv.executeInternal(statementSetOperations);
}
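With stand-in types, the branching in executeEnvironmentOperations (filter by evaluation phase, run a single operation directly, batch several ModifyOperations into one statement set) can be sketched as follows; OperationDispatch, Op, and Phase are all hypothetical stand-ins for the real Flink types:

```java
import java.util.List;
import java.util.stream.Collectors;

// Stand-in sketch of the branching in OperationsUtil.executeEnvironmentOperations:
// filter to the TABLE_ENVIRONMENT phase, run a single statement directly, and
// group multiple statements (which must all be modifications) into one set.
public class OperationDispatch {

    enum Phase { TABLE_ENVIRONMENT, OTHER }

    static class Op {
        final String sql; final Phase phase; final boolean isModify;
        Op(String sql, Phase phase, boolean isModify) {
            this.sql = sql; this.phase = phase; this.isModify = isModify;
        }
    }

    public static String dispatch(List<Op> ops) {
        List<Op> executable = ops.stream()
                .filter(op -> op.phase == Phase.TABLE_ENVIRONMENT)
                .collect(Collectors.toList());
        if (executable.isEmpty()) return "nothing";
        if (executable.size() == 1) return "single:" + executable.get(0).sql;
        // Several statements: all must be modifications (INSERT INTO ...),
        // mirroring the ModifyOperation check in the original.
        if (!executable.stream().allMatch(op -> op.isModify)) {
            throw new IllegalStateException("only modify operations may be batched");
        }
        return "statement-set[" + executable.size() + "]";
    }

    public static void main(String[] args) {
        Op ins1 = new Op("INSERT INTO a SELECT 1", Phase.TABLE_ENVIRONMENT, true);
        Op ins2 = new Op("INSERT INTO b SELECT 2", Phase.TABLE_ENVIRONMENT, true);
        System.out.println(dispatch(List.of(ins1)));       // single:INSERT INTO a SELECT 1
        System.out.println(dispatch(List.of(ins1, ins2))); // statement-set[2]
    }
}
```

Batching the ModifyOperations mirrors Flink's statement sets: multiple INSERTs are compiled into a single job graph instead of one job per statement.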
03 Conclusion
This post walked through the Entrypoint source code of Ververica Platform. I hope it helps; thanks for reading!