0
点赞
收藏
分享

微信扫一扫

Flink源码剖析:Jar包任务提交流程

沪钢木子 2021-09-21 阅读 67

Flink基于用户程序生成JobGraph,提交到集群进行分布式部署运行。本篇从源码角度讲解一下Flink Jar包是如何被提交到集群的。(本文源码基于Flink 1.11.3)

1 Flink run 提交Jar包流程分析

首先分析run脚本可以找到入口类CliFrontend,这个类在main方法中解析参数,基于第二个参数定位到run方法:

try {

    // do actionswitch (action) {

        case ACTION_RUN:

            run(params);

            return0;

        case ACTION_RUN_APPLICATION:

            runApplication(params);

            return0;

        case ACTION_LIST:

            list(params);

            return0;

        case ACTION_INFO:

            info(params);

            return0;

        case ACTION_CANCEL:

            cancel(params);

            return0;

        case ACTION_STOP:

            stop(params);

            return0;

        case ACTION_SAVEPOINT:

            savepoint(params);

            return0;

        case"-h":

        case"--help":

            ...

            return0;

        case"-v":

        case"--version":

            ...

        default:

            ...

            return1;

    }

}

在run方法中,根据classpath、用户指定的jar、main函数等信息创建PackagedProgram。在Flink中通过Jar方式提交的任务都封装成了PackagedProgram对象。

protectedvoidrun(String[] args)throws Exception {

    ...

    finalProgramOptions programOptions = ProgramOptions.create(commandLine);

    finalPackagedProgram program = getPackagedProgram(programOptions);

    // 把用户的jar配置到config里面finalList jobJars = program.getJobJarAndDependencies();

    finalConfiguration effectiveConfiguration = getEffectiveConfiguration(

            activeCommandLine, commandLine, programOptions, jobJars);

    try {

        executeProgram(effectiveConfiguration, program);

    } finally {

        program.deleteExtractedLibraries();

    }

}

创建PackagedProgram后,有个非常关键的步骤就是这个effectiveConfig,这里面会把相关的Jar都放入pipeline.jars这个属性里,后面pipeline提交作业时,这些jar也会一起提交到集群。

其中比较关键的是Flink的类加载机制,为了避免用户自己的jar内与其他用户冲突,采用了逆转类加载顺序的机制。

private PackagedProgram(

        @Nullable File jarFile,

        List classpaths,

        @Nullable String entryPointClassName,

        Configuration configuration,

        SavepointRestoreSettings savepointRestoreSettings,

        String... args) throws ProgramInvocationException {

    // 依赖的资源this.classpaths = checkNotNull(classpaths);

    // 保存点配置this.savepointSettings = checkNotNull(savepointRestoreSettings);

    // 参数配置this.args = checkNotNull(args);

    // 用户jarthis.jarFile = loadJarFile(jarFile);

    // 自定义类加载this.userCodeClassLoader = ClientUtils.buildUserCodeClassLoader(

        getJobJarAndDependencies(),

        classpaths,

        getClass().getClassLoader(),

        configuration);

    // 加载main函数this.mainClass = loadMainClass(

        entryPointClassName !=null? entryPointClassName : getEntryPointClassNameFromJar(this.jarFile),

        userCodeClassLoader);

}

在类加载器工具类中根据参数classloader.resolve-order决定是父类优先还是子类优先,默认是使用子类优先模式。

executeProgram方法内部是启动任务的核心,在完成一系列的环境初始化后(主要是类加载以及一些输出信息),会调用packagedProgram的invokeInteractiveModeForExecution的,在这个方法里通过反射调用用户的main方法。

privatestaticvoidcallMainMethod(Class entryClass, String[] args)

    throws ProgramInvocationException {

    ...

    Method mainMethod = entryClass.getMethod("main", String[].class);

    mainMethod.invoke(null, (Object) args);

    ...

}

执行用户的main方法后,就是flink的标准流程了。创建env、构建StreamDAG、生成Pipeline、提交到集群、阻塞运行。当main程序执行完毕,整个run脚本程序也就退出了。

总结来说,Flink提交Jar任务的流程是:

1 脚本入口程序根据参数决定做什么操作

2 创建PackagedProgram,准备相关jar和类加载器

3 通过反射调用用户Main方法

4 构建Pipeline,提交到集群

2 通过PackagedProgram获取Pipeline

有的时候不想通过阻塞的方式卡任务执行状态,需要通过类似JobClient的客户端异步查询程序状态,并提供停止退出的能力。

要了解这个流程,首先要了解Pipeline是什么。用户编写的Flink程序,无论是DataStream API还是SQL,最终编译出的都是Pipeline。只是DataStream API编译出的是StreamGraph,而SQL编译出的Plan。Pipeline会在env.execute()中进行编译并提交到集群。

既然这样,此时可以思考一个问题:Jar包任务是独立的Main方法,如何能抽取其中的用户程序获得Pipeline呢?

通过浏览源码的单元测试,发现了一个很好用的工具类:PackagedProgramUtils。

publicstatic Pipeline getPipelineFromProgram(

        PackagedProgram program,

        Configuration configuration,

        int parallelism,

        booleansuppressOutput)throws CompilerException, ProgramInvocationException {

    // 切换classloaderfinalClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();

    Thread.currentThread().setContextClassLoader(program.getUserCodeClassLoader());

    // 创建envOptimizerPlanEnvironment benv =new OptimizerPlanEnvironment(

        configuration,

        program.getUserCodeClassLoader(),

        parallelism);

    benv.setAsContext();

    StreamPlanEnvironment senv =new StreamPlanEnvironment(

        configuration,

        program.getUserCodeClassLoader(),

        parallelism);

    senv.setAsContext();

    try {

        // 执行用户main方法        program.invokeInteractiveModeForExecution();

    } catch (Throwable t) {

        if(benv.getPipeline() !=null) {

            return benv.getPipeline();

        }

        if(senv.getPipeline() !=null) {

            return senv.getPipeline();

        }

        ...

    } finally {

        // 重置classloader    }

}

这个工具类首先在线程内创建了一个env,这个env通过threadload保存到当前线程中。当通过反射调用用户代码main方法时,内部的getEnv函数直接从threadlocal中获取到这个env。

ThreadLocal factory =newThreadLocal<>();publicstatic StreamExecutionEnvironment getExecutionEnvironment() {

        return Utils.resolveFactory(factory , contextEnvironmentFactory)

            .map(StreamExecutionEnvironmentFactory::createExecutionEnvironment)

            .orElseGet(StreamExecutionEnvironment::createLocalEnvironment);

    }

再回头看看env有什么特殊的。

publicclassStreamPlanEnvironmentextends StreamExecutionEnvironment {

    private Pipeline pipeline;

    public Pipeline getPipeline() {

        return pipeline;

    }

    @Override

    public JobClient executeAsync(StreamGraph streamGraph) {

        pipeline = streamGraph;

        // do not go on with anything now!thrownew ProgramAbortException();

    }

}

原来是重写了executeAysnc方法,当用户执行env.execute时,触发异常,从而在PackagedProgramUtils里面拦截异常,获取到用户到pipeline。

总结起来流程如下:

3 编程实战

通过阅读上述源码,可以学习到:

1 classloader类加载的父类优先和子类优先问题

2 threadlocal线程级本地变量的使用

3 PackagedProgramUtils 利用枚举作为工具类

4 PackagedProgramUtils 利用重写env,拦截异常获取pipeline。

关于pipeline如何提交到集群、如何运行。

举报

相关推荐

0 条评论