How to keep a Flink program running forever and automatically read a folder's contents by date (custom Source)

爱做梦的夏夏 2022-01-27

I'm a fourth-year software engineering student currently interning at a tech company. While using Flink to process files I came across something fun that I haven't seen anyone else post about, so here it is.

If you want a Flink job to stay in the RUNNING state, you have to keep it reading data in a loop, which a custom Source can do. The code is as follows:

package com.sunrun.Source;

import com.sunrun.common.PathUtil;

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.time.Duration;
import java.time.LocalDateTime;

public class ReadeHdfsSource extends RichParallelSourceFunction<String> {

    // Flag controlling the outer read loop
    private volatile boolean running;

    // Hadoop configuration used to obtain the HDFS FileSystem
    private transient Configuration configuration;

    // Base path of the input directories
    private String path;

    public ReadeHdfsSource(String path) {
        this.path = path;
    }

    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
        super.open(parameters);
        configuration = new Configuration();
    }

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        boolean subRunning = true;
        String inputPath;
        init(); // initialize the loop flag
        String nowDataTime;

        try {
            while (running) {

                // Get today's date as a string (yyyy-MM-dd)
                nowDataTime = LocalDateTime.now().toLocalDate().toString();
                // Append the date to the base directory
                inputPath = path + nowDataTime;

                FileSystem fileSystem = FileSystem.get(configuration);

                subRunning = true;
                LocalDateTime startTime = LocalDateTime.now();

                // Only read if today's input path exists on HDFS; otherwise skip this round
                if (PathUtil.isCreate(configuration, inputPath)) {
                    collectRecords(sourceContext, fileSystem, inputPath);
                }

                while (subRunning && running) {
                    // Compare the start time with the current time: once a full day has passed
                    // and the current hour is between 3 and 9 in the morning, leave the inner
                    // loop so the outer loop picks up the next day's directory.
                    // This amounts to re-running the read at around 3 a.m. every day.
                    LocalDateTime endTime = LocalDateTime.now();
                    if (Duration.between(startTime, endTime).toDays() == 1
                            && endTime.getHour() >= 3 && endTime.getHour() < 9) {
                        subRunning = false;
                    }
                    Thread.sleep(2000L);
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // Read every file under the given directory and emit each line downstream
    public void collectRecords(SourceContext<String> ctx, FileSystem fs, String path) throws IOException {
        // List the entries under the directory
        FileStatus[] fileStatuses = fs.listStatus(new Path(path));
        // Iterate over the entries and only process regular files
        for (FileStatus fileStatus : fileStatuses) {
            if (fileStatus.isFile()) {
                // Open the file and emit it line by line; try-with-resources closes the streams
                try (FSDataInputStream dataInputStream = fs.open(fileStatus.getPath());
                     BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(dataInputStream))) {
                    String line;
                    while ((line = bufferedReader.readLine()) != null) {
                        ctx.collect(line);
                    }
                }
                // The whole file has been emitted at this point
            }
        }
    }


    @Override
    public void cancel() {
        // Triggered when the job is cancelled from the Flink web dashboard
        running = false;
    }

    // Initialization
    private void init() {
        running = true;
    }
}
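
PathUtil.isCreate(...) is a small project helper from com.sunrun.common that isn't shown in this post; judging from how it is called, it just checks whether the dated directory already exists on HDFS. A minimal sketch of what it might look like (the class and method names come from the import above; the body is an assumption, not the author's code):

package com.sunrun.common;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public class PathUtil {

    // Assumed implementation: true if the given HDFS path already exists
    public static boolean isCreate(Configuration configuration, String path) throws IOException {
        FileSystem fileSystem = FileSystem.get(configuration);
        return fileSystem.exists(new Path(path));
    }
}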

  • cancel() is triggered when the job is cancelled from the Flink web dashboard
  • open() handles initialization (here it creates the Hadoop Configuration)
  • run() contains the main logic of the source
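
To try the source, wire it into a streaming job like any other SourceFunction. Here is a minimal sketch, assuming a placeholder HDFS base path and a print sink (neither is from the original post):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.sunrun.Source.ReadeHdfsSource;

public class ReadeHdfsJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The source appends today's date to this base path, e.g. /data/logs/2022-01-27
        env.addSource(new ReadeHdfsSource("/data/logs/"))
                // with higher parallelism each subtask would re-read the same files
                .setParallelism(1)
                .print();

        env.execute("read-hdfs-by-date");
    }
}

Note that because the source extends RichParallelSourceFunction, running it with a parallelism greater than 1 would emit every file once per parallel subtask, which is why the sketch pins the source parallelism to 1.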