0
点赞
收藏
分享

微信扫一扫

Flink SQL代码补全提示(源码分析)


文章目录

  • ​​01 引言​​
  • ​​02 案例​​
  • ​​2.1 源码案例​​
  • ​​2.2 举例​​
  • ​​03 源码分析​​
  • ​​3.1 SqlParserHelper​​
  • ​​3.2 ParserImpl​​
  • ​​3.2.1 ExtendedParser​​
  • ​​3.2.2 SqlAdvisor​​
  • ​​3.3 比对​​
  • ​​04 文末​​

01 引言

Flink 源码地址: ​​https://github.com/apache/flink​​

使用过​​Navicat​​​的童鞋都知道,当我们写​​SQL​​​的时候,工具会根据我们输入的内容弹出提示,这样可以很方便我们去写​​SQL​​,如下:

Flink SQL代码补全提示(源码分析)_ide


Flink也是支持SQL的,当然它也有对应的接口支持SQL提示,本文来讲讲。

02 案例

2.1 源码案例

Flink源码里面已经有代码提示的demo了,具体在​​CliClientTest.java(点击即可打开)​​ ,具体在如下方法里实现了:

Flink SQL代码补全提示(源码分析)_sql_02


具体的入参为

参数名

含义

statement

当前输入的SQL

position

SQL末端光标的位置

返回的内容为可能的SQL提示集合

2.2 举例

比如目前输入的SQL为:

select *

入参的内容为:

参数名


statement

“select * fr”

position

11

返回的结果为集合:

FROM

接下来分析下其源码。

03 源码分析

从上面的例子,我们可以看到,获取提示的方法如下:

@Override
public List<String> completeStatement(String sessionId, String statement, int position) {
receivedStatement = statement;
receivedPosition = position;
return Arrays.asList(helper.getSqlParser().getCompletionHints(statement, position));
}

主要的核心是这一句:helper.getSqlParser().getCompletionHints(statement, position)

指得是通过helper获取sql解析器,然后调用里面的getCompletionHints方法来获取提示

接下来看看​​helper​​类。

3.1 SqlParserHelper

从下图可以得知,​​helper​​也是测试包里面的一个类:

Flink SQL代码补全提示(源码分析)_实时计算_03


​helper​​​只是用于指引用户如何实现的一个工具类在​​Flink​​​对外的​​API​​里是不存在的,这个不重要,我们看看里面的逻辑(里面已写注释):

/**
* SqlParser 工具类
*
* @author : YangLinWei
* @createTime: 2022/9/22 2:37 下午
* @version: 1.0.0
*/
public class SqlParserHelper {

// 从这个TableEnvironment里获取SqlParser解析器实例
private TableEnvironment tableEnv;

/**
* 构造函数
* <p>
* 使用默认EnvironmentSettings配置(当然用户可以根据自己的业务场景来设置配置)来初始化TableEnvironment
*/
public SqlParserHelper() {
tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
}

/**
* 构造函数
* <p>
* 主要使用指定的SqlDialect来初始化TableEnvironment
*/
public SqlParserHelper(SqlDialect sqlDialect) {
if (sqlDialect == null || SqlDialect.DEFAULT == sqlDialect) {
tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
} else if (SqlDialect.HIVE == sqlDialect) {
HiveCatalog hiveCatalog = HiveTestUtils.createHiveCatalog();
tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
tableEnv.getConfig().setSqlDialect(sqlDialect);
tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
tableEnv.useCatalog(hiveCatalog.getName());
}
}

/**
* 准备一些Flink DDL 来用于测试
*/
public void registerTables() {
registerTable(
"create table MyTable (a int, b bigint, c varchar(32)) "
+ "with ('connector' = 'filesystem', 'path' = '/non', 'format' = 'csv')");
registerTable(
"create table MyOtherTable (a int, b bigint) "
+ "with ('connector' = 'filesystem', 'path' = '/non', 'format' = 'csv')");
registerTable(
"create table MySink (a int, c varchar(32)) with ('connector' = 'COLLECTION' )");
registerTable("create view MyView as select * from MyTable");
}

public void registerTable(String createTableStmt) {
tableEnv.executeSql(createTableStmt);
}

/**
* 获取SqlParser解析器
*
* @return Sql解析器
*/
public Parser getSqlParser() {
return ((TableEnvironmentInternal) tableEnv).getParser();
}

public TableEnvironment getTableEnv() {
return tableEnv;
}
}

从代码可以分析得出,如果要调用Flink里面的代码提示接口,需要先初始化TableEnviorment,然后获取Sql解析器

接下来,看看Sql解析器的代码。

3.2 ParserImpl

从如下代码提示,可以看到解析器的实现有几个,这里使用的是​​blink​​​包里面的​​ParserImpl​​。

Flink SQL代码补全提示(源码分析)_hive_04

​ParserImpl​​​ 实现了​​Parser​​​接口,其中​​Parser​​接口的代码及注释如下:

/**
* SQL解析器
* <p>
* 主要解析SQL为SQL对象
*
* @author : YangLinWei
* @createTime: 2022/9/22 3:00 下午
* @version: 1.0.0
*/
@Internal
public interface Parser {

/**
* 解析String类型的SQL入口
* <p>
* 注意:</b>如果创建的{@link Operation}是一个{@link QueryOperation},
* 它必须以{@link Planner#translate(List)}方法能够理解的形式出现。
*
* @param statement 待解析的SQL
*
* @return 将查询解析为关系 {@link Operation}s
*/
List<Operation> parse(String statement);

/**
* 解析SQL标识符集
*
* @param identifier SQL唯一标识
*
* @return 被解析的SQL唯一标识
*/
UnresolvedIdentifier parseIdentifier(String identifier);

/**
* 解析SQL表达式入口
*
* @param sqlExpression 将被解析的SQL表达式
* @param inputRowType SQL表达式中可用的字段
* @param outputType 预期的顶级输出类型(如果可用)
*
* @return 解析后的表达式
*/
ResolvedExpression parseSqlExpression(
String sqlExpression, RowType inputRowType, @Nullable LogicalType outputType);

/**
* 在给定的游标位置返回给定语句的代码提示
* <p>
* 注意:补全是不区分大小写的。
*
* @param statement 部分或轻微错误的SQL语句
* @param position 光标位置
*
* @return 当前光标位置的完成提示
*/
String[] getCompletionHints(String statement, int position);
}

在本文,我们关注的是​​getCompletionHints​​的方法,其实现代码如下(含注释):

/**
* 在给定的游标位置返回给定语句的代码提示
* <p>
* 注意:补全是不区分大小写的。
*
* @param statement 部分或轻微错误的SQL语句
* @param cursor 光标位置
*
* @return 当前光标位置的完成提示集合
*/
public String[] getCompletionHints(String statement, int cursor) {

// 使用ExtendedParser来获取 代码提示的内容集
List<String> candidates =
new ArrayList<>(
Arrays.asList(EXTENDED_PARSER.getCompletionHints(statement, cursor)));

// 使用SqlAdvisor来获取 代码提示的内容集
SqlAdvisorValidator validator = validatorSupplier.get().getSqlAdvisorValidator();
SqlAdvisor advisor =
new SqlAdvisor(validator, validatorSupplier.get().config().getParserConfig());
String[] replaced = new String[1];

List<String> sqlHints =
advisor.getCompletionHints(statement, cursor, replaced).stream()
.map(item -> item.toIdentifier().toString())
.collect(Collectors.toList());

// 取代码提示的并集,并返回
candidates.addAll(sqlHints);

return candidates.toArray(new String[0]);
}

从代码可以得知,获取提示内容集的逻辑是分别使用“ExtendedParser”和“​SqlAdvisor​”来获取提示内容的集合,进一步看看这两个类

3.2.1 ExtendedParser

ExtendedParser获取SQL提示的方法代码如下(含注释):

public String[] getCompletionHints(String statement, int cursor) {
// SQL 语句转为大写
String normalizedStatement = statement.trim().toUpperCase();
List<String> hints = new ArrayList<>();

// 遍历 解析策略 集合
for (ExtendedParseStrategy strategy : PARSE_STRATEGIES) {
// 判断查询的SQL语句是否策略里面hints集合的开头,如果是,则返回提示
for (String hint : strategy.getHints()) {
if (hint.startsWith(normalizedStatement) && cursor < hint.length()) {
hints.add(getCompletionHint(normalizedStatement, hint));
}
}
}

return hints.toArray(new String[0]);
}

上述代码主要做的事情就是判断传入的SQL是否为定义的策略集合里面的提示内容的前缀,如果是,则直接返回提示

策略集合的实现有几个:

Flink SQL代码补全提示(源码分析)_实时计算_05


整理里面getHints方法的内容,有如下的内容:

策略者

getHints方法内容

ClearOperationParseStrategy

CLEAR

HelpOperationParseStrategy

HELP

QuitOperationParseStrategy

EXIT、QUIT

ResetOperationParseStrategy

RESET

SetOperationParseStrategy

SET

总的来说,就是根据SQL语句,能返回的提示就是以上表格​​getHints​​方法里面返回的内容合集之一。

3.2.2 SqlAdvisor

SqlAdvisor获取SQL提示的方法代码如下(含注释):

public List<SqlMoniker> getCompletionHints(String sql, int cursor, String[] replaced) {
int wordStart = cursor;

boolean quoted;

// 获取关键字起始游标
for(quoted = false; wordStart > 0 && Character.isJavaIdentifierPart(sql.charAt(wordStart - 1)); --wordStart) {
}

if (wordStart > 0 && sql.charAt(wordStart - 1) == this.quoteStart()) {
quoted = true;
--wordStart;
}

if (wordStart < 0) {
return Collections.emptyList();
} else {

// 获取关键字结束游标
int wordEnd;
for(wordEnd = cursor; wordEnd < sql.length() && Character.isJavaIdentifierPart(sql.charAt(wordEnd)); ++wordEnd) {
}

if (quoted && wordEnd < sql.length() && sql.charAt(wordEnd) == this.quoteEnd()) {
++wordEnd;
}

// 获取关键字
String word = replaced[0] = sql.substring(wordStart, cursor);
if (wordStart < wordEnd) {
// SQL去除关键字
sql = sql.substring(0, wordStart) + sql.substring(wordEnd);
}

// 根据 已去除关键字的SQL + 关键字进一步查询
List<SqlMoniker> completionHints = this.getCompletionHints0(sql, wordStart);
if (quoted) {
word = word.substring(1);
}

if (word.isEmpty()) {
return completionHints;
} else {
List<SqlMoniker> result = new ArrayList();
Casing preferredCasing = this.getPreferredCasing(word);
boolean ignoreCase = preferredCasing != Casing.UNCHANGED;
Iterator var12 = completionHints.iterator();

while(var12.hasNext()) {
SqlMoniker hint = (SqlMoniker)var12.next();
List<String> names = hint.getFullyQualifiedNames();
String cname = (String)Util.last(names);
if (cname.regionMatches(ignoreCase, 0, word, 0, word.length())) {
result.add(hint);
}
}

return result;
}
}
}

从上述代码,可以看到主要是为了获取 ​​sql​​​ 中的关键字以及去除关键字后的​​sql​​,比如:

select

那么关键字为 “​​fro​​”,去除关键字后的sql为“​​select *​​​ ”,进一步看​​getCompletionHints0​​方法,代码及注释如下:

/**
*
* @Param sql 去除关键字后的sql
* @Param cursor 关键字的起始游标
*
*/
public List<SqlMoniker> getCompletionHints0(String sql, int cursor) {
// 封装SQL,后缀加上 "_suggest_"
String simpleSql = this.simplifySql(sql, cursor);
int idx = simpleSql.indexOf("_suggest_");
if (idx < 0) {
return Collections.emptyList();
} else {
SqlParserPos pos = new SqlParserPos(1, idx + 1);
// 获取SQL提示
return this.getCompletionHints(simpleSql, pos);
}
}

进一步查询SQL提示:

Flink SQL代码补全提示(源码分析)_hive_06


上述的代码不是重点,最为重要的提示是从上述红框里获取的,及从validator里面的lookupHints里获取,看看是怎样获取的。

Flink SQL代码补全提示(源码分析)_sql_07


进一步查看​​lookupSelectHints​​方法:

Flink SQL代码补全提示(源码分析)_sql_08


继续断点,查看lookupFromHints方法:

Flink SQL代码补全提示(源码分析)_ide_09


继续看看​​SqlValidatorUtil.getSchemaObjectMonikers​​方法:

Flink SQL代码补全提示(源码分析)_hive_10

可以看出,内容是从SqlValidatorCatalogReader.getAllSchemaObjectNames里获取出来的,也就是把所有的元数据获取出来(依次从 catalog -> database -> table遍历出来)

代码跟踪了那么久,其实主要的核心还是在​​SqlValidatorCatalogReader.getAllSchemaObjectNames​​获取所有的元数据的,这里就不再做解析了,其实就是不断遍历的一个过程。

3.3 比对

到最后,拿之前解析出来的关键字与获取到的元数据进行​​match​​​匹配就可以得出最终的​​SQL​​提示了:

Flink SQL代码补全提示(源码分析)_ide_11

04 文末

本文主要讲解了Flink SQL的代码补全提示功能,本质其实就是使用SqlValidatorCatalogReader.getAllSchemaObjectNames 来获取所有的元数据,然后跟关键字匹配,并返回提示结果集。

谢谢大家的阅读,希望能帮助到大家,本文完!


举报

相关推荐

0 条评论