Flink源码分析系列文档目录
请点击:Flink 源码分析系列文档目录
flink.sh文件
Flink提交任务的脚本为flink.sh。我们去寻找flink.sh中Java入口代码。
flink.sh
# ... (preceding script code omitted)
# Replace the shell process with the JVM and run the CliFrontend main class, passing the
# JVM arguments, the log settings and the classpath produced by manglePathList, and
# forwarding every argument given to this script ("$@").
exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"
Java代码部分分析
通过flink.sh不难发现Flink提交任务的入口类为CliFrontend。找到这个类的main方法:
/**
 * Entry point of the Flink command line client.
 *
 * <p>Resolves the configuration directory, loads the global configuration and the custom
 * command lines, installs the security configuration, and then runs
 * {@code parseParameters} inside the installed security context. The JVM exits with the
 * code returned by {@code parseParameters}; any throwable raised once the front end is
 * being set up is logged, printed, and turned into exit code 31.
 *
 * @param args raw command line arguments
 */
public static void main(final String[] args) {
    // Log information about the environment the client is running in.
    EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

    // Step 1: resolve the directory that holds the Flink configuration.
    final String confDir = getConfigurationDirectoryFromEnv();

    // Step 2: load the global configuration from that directory.
    final Configuration flinkConfig = GlobalConfiguration.loadConfiguration(confDir);

    // Step 3: collect the custom command lines.
    final List<CustomCommandLine<?>> commandLines = loadCustomCommandLines(flinkConfig, confDir);

    try {
        final CliFrontend frontend = new CliFrontend(flinkConfig, commandLines);

        // Install the security configuration before any action is executed.
        SecurityUtils.install(new SecurityConfiguration(frontend.configuration));

        // Dispatch the requested action inside the installed security context.
        final int exitCode = SecurityUtils.getInstalledContext()
                .runSecured(() -> frontend.parseParameters(args));
        System.exit(exitCode);
    }
    catch (Throwable t) {
        // Unwrap UndeclaredThrowableException layers so the reported error is the real one.
        final Throwable cause = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
        LOG.error("Fatal error while running command line interface.", cause);
        cause.printStackTrace();
        System.exit(31);
    }
}
由以上分析可知,提交Flink任务的主要逻辑位于:
cli.parseParameters(args)
在分析主要逻辑之前,我们先着重分析一下main方法的各个执行环节。
getConfigurationDirectoryFromEnv方法源码:
/**
 * Determines the Flink configuration directory.
 *
 * <p>The directory named by the {@code ConfigConstants.ENV_FLINK_CONF_DIR} environment
 * variable wins when the variable is set; in that case the directory must exist. Without
 * the variable, {@code CONFIG_DIRECTORY_FALLBACK_1} and then
 * {@code CONFIG_DIRECTORY_FALLBACK_2} are probed in that order.
 *
 * @return the path of the configuration directory
 * @throws RuntimeException if the variable points to a non-existent path, or if the
 *         variable is unset and neither fallback exists
 */
public static String getConfigurationDirectoryFromEnv() {
    final String envLocation = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);

    // An explicitly configured directory is authoritative: use it or fail, never fall back.
    if (envLocation != null) {
        if (!new File(envLocation).exists()) {
            throw new RuntimeException("The configuration directory '" + envLocation + "', specified in the '" +
                ConfigConstants.ENV_FLINK_CONF_DIR + "' environment variable, does not exist.");
        }
        return envLocation;
    }

    // No environment variable: try the first fallback location.
    if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) {
        return CONFIG_DIRECTORY_FALLBACK_1;
    }

    // Then the second fallback location.
    if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) {
        return CONFIG_DIRECTORY_FALLBACK_2;
    }

    // Nothing usable was found.
    throw new RuntimeException("The configuration directory was not specified. " +
            "Please specify the directory containing the configuration file through the '" +
        ConfigConstants.ENV_FLINK_CONF_DIR + "' environment variable.");
}
loadCustomCommandLines方法:
/**
 * Builds the ordered list of custom command lines available to the client.
 *
 * <p>The YARN session CLI is loaded by class name and added first when loading succeeds;
 * a loading failure is only logged as a warning. {@code DefaultCLI} is always appended
 * last.
 *
 * @param configuration the Flink configuration handed to each command line
 * @param configurationDirectory the configuration directory handed to the YARN CLI
 * @return the custom command lines, with {@code DefaultCLI} as the final element
 */
public static List<CustomCommandLine<?>> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {
    final List<CustomCommandLine<?>> commandLines = new ArrayList<>(2);

    // The YARN session CLI gets a special initialization so that all of its options are
    // prefixed with y/yarn.
    // Ordering matters: DefaultCLI has to come last, because getActiveCustomCommandLine(..)
    // probes the entries in order and DefaultCLI's isActive always returns true.
    final String yarnSessionCliClassName = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";

    try {
        final CustomCommandLine<?> yarnSessionCli = loadCustomCommandLine(
            yarnSessionCliClassName,
            configuration,
            configurationDirectory,
            "y",      // short option prefix
            "yarn");  // long option prefix
        commandLines.add(yarnSessionCli);
    } catch (NoClassDefFoundError | Exception e) {
        // Best effort: the client keeps working without the YARN CLI.
        LOG.warn("Could not load CLI class {}.", yarnSessionCliClassName, e);
    }

    // The catch-all command line goes in last.
    commandLines.add(new DefaultCLI(configuration));
    return commandLines;
}
接下来分析关键部分,即cli.parseParameters(args)这一句:
/**
 * Interprets the first argument as the action to perform and dispatches to the matching
 * handler, passing the remaining arguments along.
 *
 * @param args the command line arguments; {@code args[0]} is the action
 * @return 0 when the action (or help/version output) completed, 1 when no action or an
 *         unknown action was given, otherwise the code produced by the matching
 *         exception handler
 */
public int parseParameters(String[] args) {
    // An action is mandatory; without one only the help text can be shown.
    if (args.length == 0) {
        CliFrontendParser.printHelp(customCommandLines);
        System.out.println("Please specify an action.");
        return 1;
    }

    final String requestedAction = args[0];

    // Everything after the action belongs to the action itself.
    final String[] actionArgs = Arrays.copyOfRange(args, 1, args.length);

    try {
        // One branch per supported action: run, list, info, cancel, stop, savepoint.
        switch (requestedAction) {
            case ACTION_RUN:
                run(actionArgs);
                return 0;
            case ACTION_LIST:
                list(actionArgs);
                return 0;
            case ACTION_INFO:
                info(actionArgs);
                return 0;
            case ACTION_CANCEL:
                cancel(actionArgs);
                return 0;
            case ACTION_STOP:
                stop(actionArgs);
                return 0;
            case ACTION_SAVEPOINT:
                savepoint(actionArgs);
                return 0;
            case "-h":
            case "--help":
                CliFrontendParser.printHelp(customCommandLines);
                return 0;
            case "-v":
            case "--version":
                final String flinkVersion = EnvironmentInformation.getVersion();
                final String revision = EnvironmentInformation.getRevisionInformation().commitId;
                System.out.print("Version: " + flinkVersion);
                // The commit id is appended only when it is known.
                if (revision.equals(EnvironmentInformation.UNKNOWN)) {
                    System.out.println("");
                } else {
                    System.out.println(", Commit ID: " + revision);
                }
                return 0;
            default:
                System.out.printf("\"%s\" is not a valid action.\n", requestedAction);
                System.out.println();
                System.out.println("Valid actions are \"run\", \"list\", \"info\", \"savepoint\", \"stop\", or \"cancel\".");
                System.out.println();
                System.out.println("Specify the version option (-v or --version) to print Flink version.");
                System.out.println();
                System.out.println("Specify the help option (-h or --help) to get help on the command.");
                return 1;
        }
    } catch (CliArgsException argsError) {
        return handleArgException(argsError);
    } catch (ProgramParametrizationException parametrizationError) {
        return handleParametrizationException(parametrizationError);
    } catch (ProgramMissingJobException ignored) {
        // The handler takes no argument; the exception itself carries nothing it needs.
        return handleMissingJobException();
    } catch (Exception unexpected) {
        return handleError(unexpected);
    }
}
下面重点看下run方法。
/**
 * Executes the {@code run} action: parses the run options, builds the
 * {@code PackagedProgram}, selects the active custom command line and runs the program.
 *
 * <p>Once the program has been built, {@code program.deleteExtractedLibraries()} is always
 * invoked, including when selecting the active command line fails.
 *
 * @param args the arguments following the {@code run} action
 * @throws CliArgsException if a non-Python program has no JAR file path, or if building
 *         the program fails with a {@code FileNotFoundException}
 * @throws Exception any other failure raised while parsing, building or running
 */
protected void run(String[] args) throws Exception {
    LOG.info("Running 'run' command.");

    // Merge the generic run options with those contributed by the custom command lines.
    final Options commandOptions = CliFrontendParser.getRunCommandOptions();
    final Options commandLineOptions = CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions);
    final CommandLine commandLine = CliFrontendParser.parse(commandLineOptions, args, true);
    final RunOptions runOptions = new RunOptions(commandLine);

    // evaluate help flag
    if (runOptions.isPrintHelp()) {
        CliFrontendParser.printHelpForRun(customCommandLines);
        return;
    }

    // Java program should be specified a JAR file
    if (!runOptions.isPython() && runOptions.getJarFilePath() == null) {
        throw new CliArgsException("Java program should be specified a JAR file.");
    }

    final PackagedProgram program;
    try {
        LOG.info("Building program from JAR file");
        program = buildProgram(runOptions);
    }
    catch (FileNotFoundException e) {
        throw new CliArgsException("Could not build the program from JAR file.", e);
    }

    try {
        // Resolve the active command line inside the try block: the program already exists
        // at this point, so its extracted libraries must be cleaned up even if no command
        // line is active and the lookup throws.
        final CustomCommandLine<?> customCommandLine = getActiveCustomCommandLine(commandLine);

        // This call does the actual work of deploying/submitting the program.
        runProgram(customCommandLine, commandLine, runOptions, program);
    } finally {
        program.deleteExtractedLibraries();
    }
}
这段代码主要分为三部分:解析运行参数;将用户提交的jar包封装为PackagedProgram;执行程序并在结束后进行清理操作。
这里需要先了解一下如何获取活跃(active)的commandLine:
/**
 * Returns the first custom command line, in the iteration order of
 * {@code customCommandLines}, whose {@code isActive} accepts the given command line.
 *
 * @param commandLine the parsed command line handed to each candidate
 * @return the first candidate reporting itself as active
 * @throws IllegalStateException if no candidate is active
 */
public CustomCommandLine<?> getActiveCustomCommandLine(CommandLine commandLine) {
    // The stream is lazy, so candidates after the first active one are never probed.
    return customCommandLines.stream()
        .filter(candidate -> candidate.isActive(commandLine))
        .findFirst()
        .orElseThrow(() -> new IllegalStateException("No command-line ran."));
}
这个方法很简单:按顺序遍历customCommandLines列表,遇到第一个处于active状态的CustomCommandLine就立即返回。loadCustomCommandLines方法会先加载FlinkYarnSessionCli,最后才加入DefaultCLI,因此当FlinkYarnSessionCli为active时会优先返回FlinkYarnSessionCli。而DefaultCLI的isActive方法总是返回true,所以它必须放在列表的最后作为兜底。
FlinkYarnSessionCli的isActive方法:
/**
 * Decides whether this command line should handle the given parsed command line.
 *
 * <p>It is active when the value of the address option equals {@code ID}, when the
 * application id option is present, or when the YARN properties file mode applies and an
 * application id was obtained from the YARN properties.
 *
 * @param commandLine the parsed command line
 * @return {@code true} if any of the conditions above holds
 */
public boolean isActive(CommandLine commandLine) {
    // Value of the address option, or null when the option is absent.
    // NOTE(review): presumably this is the -m/--jobmanager option and ID is "yarn-cluster" — confirm.
    final String addressValue = commandLine.getOptionValue(addressOption.getOpt(), null);
    // Whether an application id was passed explicitly on the command line.
    final boolean explicitApplicationId = commandLine.hasOption(applicationId.getOpt());

    if (ID.equals(addressValue) || explicitApplicationId) {
        return true;
    }

    // Otherwise fall back to an application id taken from the YARN properties.
    return isYarnPropertiesFileMode(commandLine) && yarnApplicationIdFromYarnProperties != null;
}
下面分析下runProgram方法。
/**
 * Runs the packaged program against a cluster chosen through the given custom command line.
 *
 * <p>Three paths exist:
 * <ul>
 *   <li>no cluster id and detached mode: a JobGraph is built and deployed as a job cluster,
 *       then the client is closed;</li>
 *   <li>a cluster id is available: the client for that existing cluster is retrieved and the
 *       program is executed through it;</li>
 *   <li>no cluster id and not detached: a session cluster is deployed, the program is
 *       executed through it, and the cluster is shut down afterwards.</li>
 * </ul>
 * The cluster descriptor is closed in every case.
 *
 * @param customCommandLine the active custom command line
 * @param commandLine the parsed command line
 * @param runOptions the options of the run action
 * @param program the program to execute
 * @param <T> the cluster id type
 * @throws ProgramInvocationException declared by this method; raised by the calls it makes
 * @throws FlinkException declared by this method; raised by the calls it makes
 */
private <T> void runProgram(
        CustomCommandLine<T> customCommandLine,
        CommandLine commandLine,
        RunOptions runOptions,
        PackagedProgram program) throws ProgramInvocationException, FlinkException {
    final ClusterDescriptor<T> clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine);
    try {
        // NOTE(review): per the article's author, FlinkYarnSessionCli returns the YARN application id
        // given on the command line, or the one read from the `.yarn-properties-username` file when the
        // command line does not override the YARN properties, while DefaultCLI returns a
        // StandaloneClusterId. None of this is visible in this method — confirm in the implementations.
        final T clusterId = customCommandLine.getClusterId(commandLine);
        final ClusterClient<T> client;
        // directly deploy the job if the cluster is started in job mode and detached
        // This branch is taken when the job is submitted in detached mode and there is no cluster id.
        if (clusterId == null && runOptions.getDetachedMode()) {
            // Use the default parallelism when the run options report -1.
            // NOTE(review): -1 presumably equals ExecutionConfig.PARALLELISM_DEFAULT, which the other branch compares against — confirm.
            int parallelism = runOptions.getParallelism() == -1 ? defaultParallelism : runOptions.getParallelism();
            // Build the JobGraph from the program.
            final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
            // Obtain the cluster specification from the command line.
            final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
            
            // Deploy the job to a job cluster.
            client = clusterDescriptor.deployJobCluster(
                clusterSpecification,
                jobGraph,
                runOptions.getDetachedMode());
            logAndSysout("Job has been submitted with JobID " + jobGraph.getJobID());
            try {
                client.close();
            } catch (Exception e) {
                // Best effort: a failure to close the client is only logged.
                LOG.info("Could not properly shut down the client.", e);
            }
        } else {
            final Thread shutdownHook;
            if (clusterId != null) {
                // An existing cluster is targeted: connect to it; it was not started here, so no hook is needed.
                client = clusterDescriptor.retrieve(clusterId);
                shutdownHook = null;
            } else {
                // also in job mode we have to deploy a session cluster because the job
                // might consist of multiple parts (e.g. when using collect)
                final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
                client = clusterDescriptor.deploySessionCluster(clusterSpecification);
                // if not running in detached mode, add a shutdown hook to shut down cluster if client exits
                // there's a race-condition here if cli is killed before shutdown hook is installed
                // The hook is installed only in attached mode with shutdown-on-attached-exit enabled;
                // it calls the client's shutDownCluster method when the JVM exits.
                if (!runOptions.getDetachedMode() && runOptions.isShutdownOnAttachedExit()) {
                    shutdownHook = ShutdownHookUtil.addShutdownHook(client::shutDownCluster, client.getClass().getSimpleName(), LOG);
                } else {
                    shutdownHook = null;
                }
            }
            try {
                client.setDetached(runOptions.getDetachedMode());
                LOG.debug("{}", runOptions.getSavepointRestoreSettings());
                int userParallelism = runOptions.getParallelism();
                LOG.debug("User parallelism is set to {}", userParallelism);
                // Fall back to the default parallelism when the user did not set one.
                if (ExecutionConfig.PARALLELISM_DEFAULT == userParallelism) {
                    userParallelism = defaultParallelism;
                }
                // Run the program submitted by the user.
                executeProgram(program, client, userParallelism);
            } finally {
                if (clusterId == null && !client.isDetached()) {
                    // terminate the cluster only if we have started it before and if it's not detached
                    try {
                        client.shutDownCluster();
                    } catch (final Exception e) {
                        LOG.info("Could not properly terminate the Flink cluster.", e);
                    }
                    if (shutdownHook != null) {
                        // we do not need the hook anymore as we have just tried to shutdown the cluster.
                        ShutdownHookUtil.removeShutdownHook(shutdownHook, client.getClass().getSimpleName(), LOG);
                    }
                }
                try {
                    client.close();
                } catch (Exception e) {
                    // Best effort: a failure to close the client is only logged.
                    LOG.info("Could not properly shut down the client.", e);
                }
            }
        }
    } finally {
        // The descriptor is closed on every path; a failure to close it is only logged.
        try {
            clusterDescriptor.close();
        } catch (Exception e) {
            LOG.info("Could not properly close the cluster descriptor.", e);
        }
    }
}
executeProgram方法:
/**
 * Runs the program through the given client and reports the outcome.
 *
 * <p>When the submission result is a job execution result, the job id, net runtime and any
 * accumulator results are printed to standard output; otherwise only the job id of the
 * submitted job is reported.
 *
 * @param program the program to run
 * @param client the client used to run it
 * @param parallelism the parallelism passed to the client
 * @throws ProgramMissingJobException if the client returns no submission result
 * @throws ProgramInvocationException declared by this method; raised by the client call
 */
protected void executeProgram(PackagedProgram program, ClusterClient<?> client, int parallelism) throws ProgramMissingJobException, ProgramInvocationException {
    logAndSysout("Starting execution of program");

    // The client runs the program and hands back the submission result.
    final JobSubmissionResult submission = client.run(program, parallelism);

    if (submission == null) {
        throw new ProgramMissingJobException("No JobSubmissionResult returned, please make sure you called " +
            "ExecutionEnvironment.execute()");
    }

    // A plain submission result carries nothing beyond the job id.
    if (!submission.isJobExecutionResult()) {
        logAndSysout("Job has been submitted with JobID " + submission.getJobID());
        return;
    }

    logAndSysout("Program execution finished");
    final JobExecutionResult finished = submission.getJobExecutionResult();
    System.out.println("Job with JobID " + finished.getJobID() + " has finished.");
    System.out.println("Job Runtime: " + finished.getNetRuntime() + " ms");

    final Map<String, Object> accumulators = finished.getAllAccumulatorResults();
    if (!accumulators.isEmpty()) {
        System.out.println("Accumulator Results: ");
        System.out.println(AccumulatorHelper.getResultsFormatted(accumulators));
    }
}
ClusterClient的run方法
/**
 * Runs the given program by invoking its main method with a context environment factory
 * installed, and returns the last job execution result recorded on this client.
 *
 * <p>The thread's context classloader is switched to the program's user-code classloader
 * for the duration of the call and restored afterwards; the context environment is unset
 * after the main method returns or throws.
 *
 * @param prog the program to run
 * @param parallelism the parallelism handed to the context environment factory
 * @return the value of {@code lastJobExecutionResult} after the program's main method ran
 * @throws ProgramInvocationException declared by this method; raised while invoking the program
 * @throws ProgramMissingJobException if {@code lastJobExecutionResult} is still null after
 *         the main method returned
 */
public JobSubmissionResult run(PackagedProgram prog, int parallelism)
        throws ProgramInvocationException, ProgramMissingJobException {
    // Remember the current context classloader so it can be restored at the end.
    final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
    try {
        // Make the user-code classloader the thread's context classloader.
        Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
        log.info("Starting program (detached: {})", isDetached());
        // Collect all libraries of the user program.
        final List<URL> libraries = prog.getAllLibraries();
        // Key step: because the user code is submitted through this client, a
        // ContextEnvironmentFactory bound to this client is installed as the context, so that
        // getExecutionEnvironment() in user code obtains an environment created by this factory.
        ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, libraries,
                prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, isDetached(),
                prog.getSavepointSettings());
        ContextEnvironment.setAsContext(factory);
        try {
            // invoke main method
            // Call the main method of the user program.
            prog.invokeInteractiveModeForExecution();
            if (lastJobExecutionResult == null) {
                throw new ProgramMissingJobException("The program didn't contain a Flink job.");
            }
            return this.lastJobExecutionResult;
        } finally {
            // Always remove the context again, whether the main method succeeded or not.
            ContextEnvironment.unsetContext();
        }
    }
    finally {
        // Restore the original context classloader.
        Thread.currentThread().setContextClassLoader(contextClassLoader);
    }
}
ContextEnvironment的setAsContext方法:
/**
 * Installs the given factory as the context environment factory by delegating to
 * {@code initializeContextEnvironment}.
 *
 * @param factory the factory to install
 */
static void setAsContext(ContextEnvironmentFactory factory) {
    initializeContextEnvironment(factory);
}
initializeContextEnvironment位于ExecutionEnvironment中,如下所示:
/**
 * Stores the given factory both in the static {@code contextEnvironmentFactory} field and
 * in {@code threadLocalContextEnvironmentFactory}.
 *
 * @param ctx the factory to install; must not be null
 */
protected static void initializeContextEnvironment(ExecutionEnvironmentFactory ctx) {
    // Set the global (static) context environment factory; checkNotNull rejects a null argument.
    contextEnvironmentFactory = Preconditions.checkNotNull(ctx);
    // Also record the value just stored in the static field in the thread-local holder.
    // NOTE(review): this reads the static field back rather than using ctx directly — confirm
    // that is intended if several threads can call this method concurrently.
    threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory);
}
回忆下用户代码的第一句:XXXEnvironment.getExecutionEnvironment()。代码如下:
/**
 * Returns the execution environment for the current context.
 *
 * <p>When {@code Utils.resolveFactory} yields a factory from the thread-local or the static
 * holder, the environment is created by that factory; otherwise a local environment is
 * created.
 *
 * @return an environment from the resolved factory, or a local environment if none resolves
 */
public static ExecutionEnvironment getExecutionEnvironment() {
    // A factory is present when one was installed via initializeContextEnvironment (for example
    // by the client when a job is submitted through the CLI); without one, such as when running
    // directly from an IDE, ExecutionEnvironment::createLocalEnvironment is used.
    return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
        .map(ExecutionEnvironmentFactory::createExecutionEnvironment)
        .orElseGet(ExecutionEnvironment::createLocalEnvironment);
}
PackagedProgram的invokeInteractiveModeForExecution方法
/**
 * Invokes the main method of this program's {@code mainClass} with the stored {@code args}.
 *
 * @throws ProgramInvocationException if the main method cannot be looked up or invoked, or
 *         if it fails
 */
public void invokeInteractiveModeForExecution() throws ProgramInvocationException{
    callMainMethod(mainClass, args);
}
callMainMethod方法:
/**
 * Validates the entry class and reflectively invokes its {@code main(String[])} method.
 *
 * <p>The class must be public and must expose a public static {@code main(String[])}.
 * Failures of the user's main method are unwrapped: {@code Error}s,
 * {@code ProgramParametrizationException}s and {@code ProgramInvocationException}s are
 * rethrown as they are, anything else is wrapped in a {@code ProgramInvocationException}
 * that keeps the original exception as its cause.
 *
 * @param entryClass the class whose main method is invoked
 * @param args the arguments passed to the main method
 * @throws ProgramInvocationException if the class or its main method does not meet the
 *         requirements above, if the method cannot be invoked, or if it fails
 */
private static void callMainMethod(Class<?> entryClass, String[] args) throws ProgramInvocationException {
    Method mainMethod;
    // The entry class has to be public.
    if (!Modifier.isPublic(entryClass.getModifiers())) {
        throw new ProgramInvocationException("The class " + entryClass.getName() + " must be public.");
    }
    // It has to provide a main(String[]) method.
    try {
        mainMethod = entryClass.getMethod("main", String[].class);
    } catch (NoSuchMethodException e) {
        // Keep the lookup failure as the cause so the original stack trace is not lost.
        throw new ProgramInvocationException("The class " + entryClass.getName() + " has no main(String[]) method.", e);
    }
    catch (Throwable t) {
        throw new ProgramInvocationException("Could not look up the main(String[]) method from the class " +
                entryClass.getName() + ": " + t.getMessage(), t);
    }
    // The method has to be static.
    if (!Modifier.isStatic(mainMethod.getModifiers())) {
        throw new ProgramInvocationException("The class " + entryClass.getName() + " declares a non-static main method.");
    }
    // The method has to be public.
    if (!Modifier.isPublic(mainMethod.getModifiers())) {
        throw new ProgramInvocationException("The class " + entryClass.getName() + " declares a non-public main method.");
    }
    // Invoke main reflectively; the cast to Object passes the array as the single argument
    // instead of having it spread as varargs.
    try {
        mainMethod.invoke(null, (Object) args);
    }
    catch (IllegalArgumentException e) {
        throw new ProgramInvocationException("Could not invoke the main method, arguments are not matching.", e);
    }
    catch (IllegalAccessException e) {
        throw new ProgramInvocationException("Access to the main method was denied: " + e.getMessage(), e);
    }
    catch (InvocationTargetException e) {
        // The user's main method itself threw: surface the real exception, not the reflection wrapper.
        Throwable exceptionInMethod = e.getTargetException();
        if (exceptionInMethod instanceof Error) {
            throw (Error) exceptionInMethod;
        } else if (exceptionInMethod instanceof ProgramParametrizationException) {
            throw (ProgramParametrizationException) exceptionInMethod;
        } else if (exceptionInMethod instanceof ProgramInvocationException) {
            throw (ProgramInvocationException) exceptionInMethod;
        } else {
            throw new ProgramInvocationException("The main method caused an error: " + exceptionInMethod.getMessage(), exceptionInMethod);
        }
    }
    catch (Throwable t) {
        throw new ProgramInvocationException("An error occurred while invoking the program's main method: " + t.getMessage(), t);
    }
}
到这里用户代码已经得到了执行。










