0
点赞
收藏
分享

微信扫一扫

大数据之Flink分区

萨科潘 2022-02-11 阅读 36
flink

一、分区

1、Rebalance轮询分区

package cn._51doit.flink.day03;


import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * Demo of round-robin ("rebalance") partitioning in a Flink streaming job.
 *
 * <p>Lines are read from a socket, tagged with the index of the map subtask
 * that handled them, redistributed with {@code rebalance()}, and printed by the
 * sink together with the index of the sink subtask that received them.
 */
public class RebalancingPartitioning {

    /**
     * Builds and runs the job.
     *
     * @param args unused
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // Text lines arriving on localhost:8888.
        DataStreamSource<String> words = env.socketTextStream("localhost", 8888);

        // Tag every line with the upstream subtask index; this map runs with parallelism 1.
        SingleOutputStreamOperator<String> tagged =
                words.map(new SubtaskTaggingMapper()).setParallelism(1);

        // Send the records to the downstream subtasks in round-robin fashion.
        DataStream<String> rebalanced = tagged.rebalance();

        rebalanced.addSink(new SubtaskPrintingSink());

        env.execute();
    }

    /** Appends the index of the map subtask that processed the record. */
    private static class SubtaskTaggingMapper extends RichMapFunction<String, String> {

        @Override
        public String map(String value) throws Exception {
            return value + " :" + getRuntimeContext().getIndexOfThisSubtask();
        }
    }

    /** Prints each record followed by the index of the sink subtask that received it. */
    private static class SubtaskPrintingSink extends RichSinkFunction<String> {

        @Override
        public void invoke(String value, Context context) throws Exception {
            System.out.println(value + " -> " + getRuntimeContext().getIndexOfThisSubtask());
        }
    }
}

2、Shuffle随机分区

package cn._51doit.flink.day03;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * Demo of random ("shuffle") partitioning in a Flink streaming job.
 *
 * <p>Each input line is tagged with the index of the map subtask that handled
 * it, sent to a randomly chosen downstream subtask via {@code shuffle()}, and
 * printed with the index of the receiving sink subtask.
 */
public class ShufflePartitioningDemo {

    /**
     * Builds and runs the job.
     *
     * @param args unused
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // The socket source is a non-parallel source, i.e. it runs with parallelism 1.
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        lines.map(new UpstreamIndexTagger())
                .setParallelism(2)   // the map stage runs with parallelism 2
                .shuffle()           // random redistribution to the downstream subtasks
                .addSink(new DownstreamIndexPrinter());

        env.execute();
    }

    /** Appends the index of the map subtask that processed the record. */
    private static class UpstreamIndexTagger extends RichMapFunction<String, String> {

        @Override
        public String map(String value) throws Exception {
            int upstream = getRuntimeContext().getIndexOfThisSubtask();
            return value + " : " + upstream;
        }
    }

    /** Prints each record followed by the index of the sink subtask that received it. */
    private static class DownstreamIndexPrinter extends RichSinkFunction<String> {

        @Override
        public void invoke(String value, Context context) throws Exception {
            int downstream = getRuntimeContext().getIndexOfThisSubtask();
            System.out.println(value + " -> " + downstream);
        }
    }
}

3、Broadcast广播分区

package cn._51doit.flink.day03;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * Demo of broadcast partitioning in a Flink streaming job.
 *
 * <p>Each input line is tagged with the index of the map subtask that handled
 * it and then broadcast, so every downstream sink subtask receives and prints
 * its own copy of the record.
 */
public class BroadcastPartitioningDemo {

    /**
     * Builds and runs the job.
     *
     * @param args unused
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // The socket source is a non-parallel source, i.e. it runs with parallelism 1.
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        // The map stage is explicitly pinned to parallelism 1.
        SingleOutputStreamOperator<String> tagged =
                lines.map(new UpstreamIndexTagger()).setParallelism(1);

        // Broadcast: the upstream operator sends every record to all downstream subtasks.
        DataStream<String> broadcasted = tagged.broadcast();

        broadcasted.addSink(new DownstreamIndexPrinter());

        env.execute();
    }

    /** Appends the index of the map subtask that processed the record. */
    private static class UpstreamIndexTagger extends RichMapFunction<String, String> {

        @Override
        public String map(String value) throws Exception {
            return value + " : " + getRuntimeContext().getIndexOfThisSubtask();
        }
    }

    /** Prints each record followed by the index of the sink subtask that received it. */
    private static class DownstreamIndexPrinter extends RichSinkFunction<String> {

        @Override
        public void invoke(String value, Context context) throws Exception {
            System.out.println(value + " -> " + getRuntimeContext().getIndexOfThisSubtask());
        }
    }
}

4、Custom自定义分区

package cn._51doit.flink.day03;

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * Demo of custom partitioning in a Flink streaming job.
 *
 * <p>Each input line is wrapped in a tuple together with the index of the map
 * subtask that handled it. The line itself is used as the partitioning key and
 * a user-defined {@link Partitioner} decides which downstream subtask receives
 * the record. The sink prints the line, the upstream subtask index and the
 * downstream subtask index.
 */
public class CustomPartitioningDemo {

    /**
     * Builds and runs the job.
     *
     * @param args unused
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // The socket source is a non-parallel source, i.e. it runs with parallelism 1.
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        // No explicit parallelism is set on the map, so it uses the environment default.
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapped = lines.map(new RichMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                return Tuple2.of(value, indexOfThisSubtask);
            }
        });

        // Partition by the custom rule below, keyed on the line text (tuple field f0).
        DataStream<Tuple2<String, Integer>> partitioned = mapped.partitionCustom(new Partitioner<String>() {

            /**
             * Maps a key to a downstream partition index.
             *
             * @param key           the line text
             * @param numPartitions number of downstream subtasks
             * @return 1 for "spark", 2 for "flink", 3 for "hadoop" and 0 for
             *         anything else, reduced modulo {@code numPartitions}
             */
            @Override
            public int partition(String key, int numPartitions) {
                int res = 0;
                if ("spark".equals(key)) {
                    res = 1;
                } else if ("flink".equals(key)) {
                    res = 2;
                } else if ("hadoop".equals(key)) {
                    res = 3;
                }
                // The returned index must lie in [0, numPartitions). Without the
                // modulo the job fails whenever the downstream parallelism is
                // below 4 (e.g. on a machine with fewer than four cores).
                return res % numPartitions;
            }
        }, tp -> tp.f0);

        partitioned.addSink(new RichSinkFunction<Tuple2<String, Integer>>() {

            @Override
            public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
                // Index of the sink subtask that received this record.
                int index = getRuntimeContext().getIndexOfThisSubtask();
                System.out.println(value.f0 + " , 上游 " + value.f1 + " -> 下游 " + index);
            }
        });

        env.execute();
    }
}
举报

相关推荐

0 条评论