本人是一名大四软件工程专业的学生，现在在一家科技公司实习。在利用 Flink 处理文件时发现了一些好玩的东西，看没有人发过，就发出来和大家分享。
想让 Flink 程序一直处于 running 状态的话，可以让它不断循环读取数据，自定义 Source 即可，代码如下：
package com.sunrun.Source;
import com.sunrun.common.HadoopUtil;
import com.sunrun.common.PathUtil;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.LocalTime;
/**
 * A Flink source that, once per day, reads every file under the HDFS directory
 * {@code path + <today's date>} and emits each line as a record, then waits
 * until the next processing window (03:00–09:00 of the following day).
 */
public class ReadeHdfsSource extends RichParallelSourceFunction<String> {
    // Loop flag; volatile so that cancel() (called from another thread) is
    // visible to run(). Primitive boolean avoids unboxing a possibly-null Boolean.
    private volatile boolean running;
    // Hadoop configuration used to obtain the HDFS FileSystem; transient because
    // org.apache.hadoop.conf.Configuration is not Java-serializable.
    private transient Configuration configuration;
    // Base HDFS directory; the current date (yyyy-MM-dd) is appended each day.
    private final String path;

    /**
     * @param path base HDFS directory to read; the current date string is
     *             appended to it on every outer-loop iteration
     */
    public ReadeHdfsSource(String path) {
        this.path = path;
    }

    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
        super.open(parameters);
        configuration = new Configuration();
    }

    /**
     * Main source loop. Each iteration reads today's directory (if it exists),
     * then polls every 2 s until at least one full day has elapsed AND the
     * current hour is in the 03:00–09:00 window, before reading again.
     */
    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        running = true;
        try {
            while (running) {
                // Build today's input directory, e.g. /base/2024-01-31
                String nowDataTime = LocalDateTime.now().toLocalDate().toString();
                String inputPath = path + nowDataTime;
                FileSystem fileSystem = FileSystem.get(configuration);
                LocalDateTime startTime = LocalDateTime.now();
                // Skip the read entirely when today's directory does not exist yet.
                if (PathUtil.isCreate(configuration, inputPath)) {
                    collectRecords(sourceContext, fileSystem, inputPath);
                }
                // BUGFIX: the original condition used `||`, which is true for
                // every hour of the day (each hour is either < 9 or >= 3);
                // `&&` matches the stated intent of "between 3 AM and 9 AM".
                // Also check `running` here so cancel() exits this wait loop
                // promptly instead of sleeping for up to a day.
                boolean waiting = true;
                while (running && waiting) {
                    LocalDateTime endTime = LocalDateTime.now();
                    int hour = endTime.getHour();
                    // `>= 1` rather than `== 1` so the window is not missed if
                    // more than 48 hours ever elapse between reads.
                    if (Duration.between(startTime, endTime).toDays() >= 1
                            && hour >= 3 && hour < 9) {
                        waiting = false;
                    }
                    Thread.sleep(2000L);
                }
            }
        } catch (InterruptedException e) {
            // Flink interrupts the source thread on cancellation; restore the
            // interrupt status and return cleanly instead of failing the job.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Emits every line of every regular file directly under {@code path}.
     * Streams are closed via try-with-resources (the original leaked one
     * input stream and reader per file).
     *
     * @param ctx  Flink source context to emit records into
     * @param fs   HDFS file system handle
     * @param path directory whose immediate files are read
     * @throws IOException if listing or reading a file fails
     */
    public void collectRecords(SourceContext<String> ctx, FileSystem fs, String path) throws IOException {
        FileStatus[] fileStatuses = fs.listStatus(new Path(path));
        for (FileStatus status : fileStatuses) {
            if (!status.isFile()) {
                continue;
            }
            // Read the file line by line; charset is pinned to UTF-8 instead of
            // the platform default used implicitly by the original.
            try (FSDataInputStream in = fs.open(status.getPath());
                 BufferedReader reader = new BufferedReader(
                         new InputStreamReader(in, StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    ctx.collect(line);
                }
            }
        }
    }

    @Override
    public void cancel() {
        // Invoked when the job is cancelled from the Flink dashboard;
        // both loops in run() observe this flag and terminate.
        running = false;
    }
}
- cancel() 在 Flink Dashboard 页面中点击取消（cancel）按钮时触发
- open()为初始化
- run()为source程序运行的主逻辑程序