一、分区
1、rebalance轮询分区
package cn._51doit.flink.day03;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
public class RebalancingPartitioning {

    /**
     * Demo of {@code rebalance()}: records produced by a parallelism-1 map
     * operator are distributed round-robin across the downstream sink subtasks.
     */
    public static void main(String[] args) throws Exception {
        // Local environment with the Flink web UI enabled, useful for inspecting the job graph.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // Non-parallel socket source reading text lines from localhost:8888.
        DataStreamSource<String> words = env.socketTextStream("localhost", 8888);

        // Tag every record with the index of the (single) upstream map subtask.
        SingleOutputStreamOperator<String> mapDataStream = words
                .map(new RichMapFunction<String, String>() {
                    @Override
                    public String map(String value) throws Exception {
                        int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
                        return value + " :" + subtaskIndex;
                    }
                })
                .setParallelism(1);

        // Round-robin the records to the downstream subtasks.
        DataStream<String> rebalanced = mapDataStream.rebalance();

        // Print each record together with the sink subtask that received it,
        // so the round-robin distribution is visible on the console.
        rebalanced.addSink(new RichSinkFunction<String>() {
            @Override
            public void invoke(String value, Context context) throws Exception {
                int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
                System.out.println(value + " -> " + subtaskIndex);
            }
        });

        env.execute();
    }
}
2、Shuffle随机分区
package cn._51doit.flink.day03;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
public class ShufflePartitioningDemo {

    /**
     * Demo of {@code shuffle()}: records produced by a parallelism-2 map
     * operator are sent to randomly chosen downstream sink subtasks.
     */
    public static void main(String[] args) throws Exception {
        // Local environment with the Flink web UI enabled.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // Non-parallel socket source (parallelism is always 1).
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        // Map runs with parallelism 2; each record is tagged with the producing subtask index.
        SingleOutputStreamOperator<String> mapped = lines
                .map(new RichMapFunction<String, String>() {
                    @Override
                    public String map(String value) throws Exception {
                        int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
                        return value + " : " + subtaskIndex;
                    }
                })
                .setParallelism(2);

        // Randomly redistribute records to the downstream subtasks.
        DataStream<String> shuffled = mapped.shuffle();

        // Print each record with the sink subtask that received it,
        // making the random distribution observable.
        shuffled.addSink(new RichSinkFunction<String>() {
            @Override
            public void invoke(String value, Context context) throws Exception {
                int sinkSubtask = getRuntimeContext().getIndexOfThisSubtask();
                System.out.println(value + " -> " + sinkSubtask);
            }
        });

        env.execute();
    }
}
3、Broadcast广播分区
package cn._51doit.flink.day03;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
public class BroadcastPartitioningDemo {

    /**
     * Demo of {@code broadcast()}: every record produced by the upstream map
     * operator is replicated to ALL downstream sink subtasks (each subtask
     * prints every record once).
     */
    public static void main(String[] args) throws Exception {
        // Local environment with the Flink web UI enabled.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // Non-parallel socket source (parallelism is always 1).
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        // Map runs with parallelism 1; each record is tagged with the producing subtask index.
        SingleOutputStreamOperator<String> mapped = lines
                .map(new RichMapFunction<String, String>() {
                    @Override
                    public String map(String value) throws Exception {
                        int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
                        return value + " : " + subtaskIndex;
                    }
                })
                .setParallelism(1);

        // Broadcast: the upstream operator replicates every record to ALL downstream subtasks.
        // (Renamed from the misleading `shuffled` left over from the shuffle demo.)
        DataStream<String> broadcasted = mapped.broadcast();

        // Each sink subtask prints every record, demonstrating the replication.
        broadcasted.addSink(new RichSinkFunction<String>() {
            @Override
            public void invoke(String value, Context context) throws Exception {
                int sinkSubtask = getRuntimeContext().getIndexOfThisSubtask();
                System.out.println(value + " -> " + sinkSubtask);
            }
        });

        env.execute();
    }
}
4、Custom自定义分区
package cn._51doit.flink.day03;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
public class CustomPartitioningDemo {

    /**
     * Demo of {@code partitionCustom(...)}: records are routed to a downstream
     * subtask chosen by a user-defined {@link Partitioner} keyed on the word itself.
     *
     * <p>Routing rule: "spark" -> 1, "flink" -> 2, "hadoop" -> 3, anything else -> 0.
     * The chosen index is taken modulo {@code numPartitions} so the partitioner
     * never returns an out-of-range index when the downstream parallelism is
     * smaller than 4 (the original code would fail at runtime in that case).
     */
    public static void main(String[] args) throws Exception {
        // Local environment with the Flink web UI enabled.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // Non-parallel socket source (parallelism is always 1).
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        // Pair each word with the index of the map subtask that processed it.
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapped =
                lines.map(new RichMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
                        return Tuple2.of(value, subtaskIndex);
                    }
                });

        // Route records by the custom rule below; the key selector extracts the word (f0).
        DataStream<Tuple2<String, Integer>> partitioned =
                mapped.partitionCustom(new Partitioner<String>() {
                    @Override
                    public int partition(String key, int numPartitions) {
                        int res = 0;
                        if ("spark".equals(key)) {
                            res = 1;
                        } else if ("flink".equals(key)) {
                            res = 2;
                        } else if ("hadoop".equals(key)) {
                            res = 3;
                        }
                        // Guard against downstream parallelism < 4: a returned index
                        // must always lie in [0, numPartitions).
                        return res % numPartitions;
                    }
                }, tp -> tp.f0);

        // Print each word with the upstream subtask that produced it and the
        // downstream subtask that received it, making the routing visible.
        partitioned.addSink(new RichSinkFunction<Tuple2<String, Integer>>() {
            @Override
            public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
                int sinkSubtask = getRuntimeContext().getIndexOfThisSubtask();
                System.out.println(value.f0 + " , 上游 " + value.f1 + " -> 下游 " + sinkSubtask);
            }
        });

        env.execute();
    }
}