NiFi 中的处理器(一):GenerateTableFetch
1.简介
该处理器用于 生成在表中执行分页查询的 SQL 查询语句,分区(属性 partition)大小以及表的行数决定页面的大小和数量以及生成的流文件。此外,可以通过设置最大值列(Max-Value)来实现增量抓取数据,处理器会跟踪列的最大值,从而 只抓取列值超过已记录到的最大值的行,该处理器只在主节点上运行,可以接受传入的连接。
提供传入连接与否,处理器的行为是不同的:
- 如果没有指定传入连接,处理器将根据指定的处理器调度生成 SQL 语句。许多字段都支持表达式语言,但是没有流文件属性可用。但是,可以使用变量注册表评估属性。
- 如果指定了传入连接,并且处理器任务没有可用的流文件,则不执行任何工作。
- 如果指定了传入连接,并且处理程序任务有可用流文件,则流文件的属性可以在表达式语言中用于表名等字段。但是,
Max-Value列和返回字段的列必须为空或者引用每个指定表中可用的列(多表查询,字段设置成属性表达式语言就可以了)。
关于该组件的相关配置在 官方文档 中已经描述的很清楚了,此处不再赘述。
2.应用场景
GenerateTableFetch 使用其属性和指定的数据库连接生成包含 SQL 语句的流文件,这些 SQL 语句可用于从表中获取分页的数据。GenerateTableFetch 执行对数据库的查询,以确定当前行数和最大值,如果指定了最大值列,则收集其最大值列的值大于 GenerateTableFetch 最后观察到的值的行数。这允许增量获取新行,而不是每次生成 SQL 来获取整个表。如果没有设置最大值列,那么处理器将生成 SQL 来每次获取整个表。
为了生成将获取分页数据的 SQL,默认情况下 GenerateTableFetch 将生成基于最大值列(如果存在)对数据排序的 SQL,并使用结果集的行号来确定每个页面。例如,如果最大值列是一个整数 id,分区大小为
10
10
10,那么第一个页面的 SQL 可能是 SELECT * FROM myTable LIMIT 10,第二个页面可能是 SELECT * FROM myTable OFFSET 10 LIMIT 10,依此类推。
根据数据库、行数等,对数据进行排序可能是一项昂贵的操作。或者,也可以使用 Column for Value Partitioning 属性指定一个列,该列的值将用于确定页面。如果设置了,GenerateTableFetch 将确定列的最小值和最大值,并使用最小值作为初始偏移量。然后,获取页面的 SQL 基于这个初始偏移量和值的总差(即最大值 - 最小值)除以页面大小。例如,如果列 id 用于值分区,然后列值
100
100
100 到
200
200
200,页面大小为
10
10
10 的 SQL 来获取第一页可能是 SELECT * FROM myTable id > = 100 and id < 110,第二页可能是 SELECT * from myTable id > = 110 and id < 120 等等。
重要的是,将用于值分区的列设置为可以强制类型为长整数(即不是日期或时间戳)的列,并且为了获得最佳性能,列值是均匀分布的,而不是稀疏的。作为上面的反例,考虑一个列 id,其值分别为
100
100
100、
2000
2000
2000 和
30000
30000
30000。如果分区大小为
100
100
100,那么列值相对稀疏,因此 “第二页” 的 SQL 将返回零行,直到查询中的值变为 id >= 2000 为止,每个页面都将返回零行。另一个反例是值不是均匀分布的:假设一个值为
100
、
200
、
201
、
202
、
…
299
100、200、201、202、…299
100、200、201、202、…299。然后,第一个页面的 SQL 将返回值为 id = 100 的一行,第二个页面将返回值为
200
…
…
299
200…… 299
200……299 的
100
100
100 行。这可能导致下游处理时间不一致,因为页面可能包含非常不同的行数。由于这些原因,建议使用足够密集(而不是稀疏)且分布相当均匀的列进行值分区。
3.示例
首先配置好数据库 DBCPConnectionPool。

然后配置 GenerateTableFetch 中的 Database Connection Pooling Service。

3.1 案例一:无输入流文件,来源表含增量字段



保存状态:

3.2 案例二:无输入流文件,不含增量字段

输出:

3.3 案例三:无输入流文件,带自增 id

当然,实际增量抽取时,max-value 也设置成 id。



3.4 案例四:有来源流文件,查多表,无增量字段

使用 GenerateFlowFile 配置一个多表表名数组,切割 json,然后将表名提取到属性中:


输出结果:



3.5 案例五:有来源流文件,查多表,带增量字段
只需将案例四中 json 修改一下:



输出结果:



状态:











