Flink编程基本步骤:
1.创建流执行环境 StreamExecutionEnviroment.getExecutionEnviroment() 获取流环境。
2.加载数据源 Source
3.转换操作 Transformation
4.输出出去Sink,落地到其它的数据仓库,直接打印输出.
关于Flink 数据的基本操作 —— 四种分类
-  单条数据的操作 map filter 
-  多条数据的操作 window 
-  多个流合并成一个流操作 connect union join 
-  将一个流拆分成多个流操作 ,(split 过期),测输出流(OutputTag)output 
Flink输入数据源 source
自带预定义Source
-  基于本地集合Source -  应用场景,当程序写完之后,测试当前功能是否可用,开发测试用。 
-  分类 -  从元素 fromElements 
-  从集合 fromCollection 
-  基于Sequence的generateSequence 
-  基于开始和结束的DataStream ,fromSequence 
 
-  
 
-  
-  并行度设置 -  并行度设置方式 1.设置配置文件 flink-conf.yaml parallelism.default: 1 2.在client端提交任务设置并行度 flink run -p 1 3.在程序中设置全局并行度 env.setParallelism(1) 4.算子级别的并行度设置 算子.setParallelism(1) 优先级: 算子优先级 > 程序中全局并行度 > client提交作业并行度 > 配置文件中的并行度 
-  全局并行度设置 env.setParallelism(1); 
-  算子并行度设置 source.print().setParallelism(2); 
 
-  
-  基于文件的Source -  批的方式读取文件,只读取一次——readTextFile 
-  流的方式读取文件,实时根据指定周期去监控文件—— readFile 监控方式 watchType 两种 -  FileProcessingMode.PROCESS_CONTINUOUSLY 主要用于修改删除操作比较多场景,根据周期持续读取整个文件(破坏仅一次语义特性) 
-  FileProcessingMode.Process_Once 主要用于仅读取一次,读完就 exit 
 
-  
 
-  
-  基于Socket的Source env.socketTextStream("node1", 9999);
自定义Source
-  主要用于指定特定格式的数据源,生成有界或无界数据流 -  有界数据流 - 读取 mysql ,读取 文件 
-  无界数据流 - for 循环生成数据 
 
-  
-  需求 - 通过用户自定义的方式生成数据源。
-  需求案例 -  自定义实现SourceFunction接口案例 只需要重写 run方法和 cancel方法 
 
-  
package cn.itcast.flink.api;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.text.SimpleDateFormat;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
 * Author itcast
 * Date 2021/11/29 14:56
 * Desc - 每1秒钟随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)
 * public class Order
 * String oid;
 * int uid;
 * double money;
 * long timestamp;
 * String datetime;
 * 每一秒钟生成一条数据
 * 打印输出每条数据
 * 执行流环境
 */
public class OrderSource {
    public static void main(String[] args) throws Exception {
        //1.创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.设置并行度
        env.setParallelism(1);
        //3.获取自定义数据源
        //实现方式
        DataStreamSource<Order> source = env.addSource(new OrderEmitSource());
        //4.打印输出
        source.printToErr();
        //5.执行流环境
        env.execute();
    }
    public static class OrderEmitSource implements SourceFunction<Order> {
        //定义一个标记,用于标识当前持续生成数据
        private volatile boolean isRunning = true;
        /**
         * 实现数据的生成,并将生成的数据输出 ctx 输出
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void run(SourceContext<Order> ctx) throws Exception {
            //定义随机数
            Random rm = new Random();
            //时间转换格式工具
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            //死循环,一直生成数据
            while (isRunning) {
                //随机数
                String oid = UUID.randomUUID().toString();
                //用户id ,随机 0~5 之间值
                int uid = rm.nextInt(6);
                //money 0~100之间的
                double money = rm.nextDouble()*100;
                //时间戳
                long timestamp = System.currentTimeMillis();
                //当前时间
                String datetime = sdf.format(timestamp);
                Order order = new Order(
                        oid,
                        uid,
                        money,
                        timestamp,
                        datetime
                );
                //收集数据
                ctx.collect(order);
                //程序休眠一秒接着执行
                TimeUnit.SECONDS.sleep(1);
            }
        }
        /**
         * 用户取消生成的时候,取消生成
         */
        @Override
        public void cancel() {
            isRunning = false;
        }
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String oid;
        private int uid;
        private double money;
        private long timestamp;
        private String datetime;
    }
}实现ParallelSourceFunction 接口案例
并行化生成数据,算子上设置并行度 setParallelism(n)
package cn.itcast.flink.api;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.text.SimpleDateFormat;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
 * Author itcast
 * Date 2021/11/29 14:56
 * Desc - 每1秒钟随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)
 * public class Order
 * String oid;
 * int uid;
 * double money;
 * long timestamp;
 * String datetime;
 * 每一秒钟生成一条数据
 * 打印输出每条数据
 * 执行流环境
 */
public class OrderParallelismSource {
    public static void main(String[] args) throws Exception {
        //1.创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.设置并行度
        env.setParallelism(1);
        //3.获取自定义数据源
        //实现方式
        DataStreamSource<Order> source = env.addSource(new OrderEmitSource()).setParallelism(6);
        //4.打印输出 
        source.printToErr();
        //5.执行流环境
        env.execute();
    }
    public static class OrderEmitSource implements ParallelSourceFunction<Order> {
        //定义一个标记,用于标识当前持续生成数据
        private volatile boolean isRunning = true;
        /**
         * 实现数据的生成,并将生成的数据输出 ctx 输出
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void run(SourceContext<Order> ctx) throws Exception {
            //定义随机数
            Random rm = new Random();
            //时间转换格式工具
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            //死循环,一直生成数据
            while (isRunning) {
                //随机数
                String oid = UUID.randomUUID().toString();
                //用户id ,随机 0~5 之间值
                int uid = rm.nextInt(6);
                //money 0~100之间的
                double money = rm.nextDouble()*100;
                //时间戳
                long timestamp = System.currentTimeMillis();
                //当前时间
                String datetime = sdf.format(timestamp);
                Order order = new Order(
                        oid,
                        uid,
                        money,
                        timestamp,
                        datetime
                );
                //收集数据
                ctx.collect(order);
                //程序休眠一秒接着执行
                TimeUnit.SECONDS.sleep(5);
            }
        }
        /**
         * 用户取消生成的时候,取消生成
         */
        @Override
        public void cancel() {
            isRunning = false;
        }
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String oid;
        private int uid;
        private double money;
        private long timestamp;
        private String datetime;
    }
}实现RichParallelSourceFunction案例
-  Rich 是富函数继承了 AbstractRichFunciton,实现了 
-  生命周期的 open 和 close 方法 -  open 方法,用于实现当前生成的初始化条件 
-  close 方法,用于生成数据结束的收尾工作 
-  getRuntimeContext 方法,用于获取当前的程序的上下文对象(参数、环境变量、状态、累加器等) 
 
-  
-  案例 - 从数据库中读取数据
-  1:初始化工作 —— 创建数据库和数据表 
# 创建数据库
create database test;
# 使用数据库
use test;
# 创建表和导入数据
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user`  (
  `id` int(11) NOT NULL,
  `username` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `password` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;
-- ----------------------------
-- Records of user
-- ----------------------------
INSERT INTO `user` VALUES (10, 'dazhuang', '123456', '大壮');
INSERT INTO `user` VALUES (11, 'erya', '123456', '二丫');
INSERT INTO `user` VALUES (12, 'sanpang', '123456', '三胖');
SET FOREIGN_KEY_CHECKS = 1;2:Flink读取MySQL的数据源
package cn.itcast.flink.source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.concurrent.TimeUnit;
/**
 * Author itcast
 * Date 2022/1/11 16:17
 * Desc 读取mysql数据表并打印输出
 * 开发步骤:
 * 1.创建和准备数据库和数据表  flink
 * 2.获取流执行环境
 * 3.设置并行度
 * 4.添加自定义数据源,从mysql中读取数据,实现 RichSourceFunction ,rich 增强富功能 open close getRuntimeContext
 * 4.1. open 初始化动作,创建连接,创建 statement ,获取变量
 * 4.2. run方法 读取数据表中数据并封装成对象
 * 4.3. close方法 关闭statement和连接
 * 5. 打印结果输出
 * 6. 执行流环境
 */
public class UserSource {
    public static void main(String[] args) throws Exception {
        //2.获取流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //3.设置并行度
        env.setParallelism(1);
        //4.添加自定义数据源,从mysql中读取数据,实现 RichSourceFunction ,rich 增强富功能 open close getRuntimeContext
        DataStreamSource<User> source = env.addSource(new RichSourceFunction<User>() {
            Connection conn = null;
            Statement statement = null;
            //标记
            boolean isRunning = true;
            /**
             * 在所有执行source,首先要做的初始化工作
             * @param parameters
             * @throws Exception
             */
            @Override
            public void open(Configuration parameters) throws Exception {
                //1.设置 driver 驱动
                Class.forName("com.mysql.jdbc.Driver");
                //2.获取连接 设置 url 用户名 密码
                conn = DriverManager.getConnection(
                        "jdbc:mysql://node1:3306/flink?useSSL=false",
                        "root",
                        "123456"
                );
                //3.创建 statement 基于 sql
                statement = conn.createStatement();
            }
            /**
             * 所有的元素都在这里执行
             * @param ctx
             * @throws Exception
             */
            @Override
            public void run(SourceContext<User> ctx) throws Exception {
                String sql = "select id,username,password,name from user";
                while (isRunning) {
                    //1.读取数据 statement.executeQuery 得到 ResultSet 结果集
                    ResultSet rs = statement.executeQuery(sql);
                    //2.遍历 ResultSet 是否有数据 hasNext() = true
                    while (rs.next()) {
                        User user = new User();
                        //3.将每条数据 赋值 对象 User
                        int id = rs.getInt("id");
                        String username = rs.getString("username");
                        String password = rs.getString("password");
                        String name = rs.getString("name");
                        user.setId(id);
                        user.setUsername(username);
                        user.setPassword(password);
                        user.setName(name);
                        //4.将 User 收集 ctx.collect(user)
                        ctx.collect(user);
                    }
                    TimeUnit.MINUTES.sleep(5);
                }
            }
            @Override
            public void cancel() {
                //将flag置为 false
                isRunning = false;
            }
            /**
             * 所有的元素执行完毕的收尾工作
             * @throws Exception
             */
            @Override
            public void close() throws Exception {
                //关闭 statement
                if (!statement.isClosed()) {
                    statement.close();
                }
                //关闭 connection
                if (!conn.isClosed()) {
                    conn.close();
                }
            }
        });
        //4.1. open 初始化动作,创建连接,创建 statement ,获取变量
        //4.2. run方法 读取数据表中数据并封装成对象
        //4.3. close方法 关闭statement和连接
        //5. 打印结果输出
        source.printToErr();
        //6. 执行流环境
        env.execute();
    }
    public static class User {
        // id
        private int id;
        // username
        private String username;
        // password
        private String password;
        // name
        private String name;
        public int getId() {
            return id;
        }
        public void setId(int id) {
            this.id = id;
        }
        public String getUsername() {
            return username;
        }
        public void setUsername(String username) {
            this.username = username;
        }
        public String getPassword() {
            return password;
        }
        public void setPassword(String password) {
            this.password = password;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        @Override
        public String toString() {
            return "User{" +
                    "id=" + id +
                    ", username='" + username + '\'' +
                    ", password='" + password + '\'' +
                    ", name='" + name + '\'' +
                    '}';
        }
    }
}










