Flink
Flink
部署
- On Yarn
-
yarn-session 开启session yarn-session.sh -jm 1024m -tm 4096m 在session上提交作业 flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m ./examples/batch/WordCount.jar 关闭session echo “stop” | ./bin/yarn-session.sh -id
-D 使用-D将要设置的参数进行设置(-Dtaskmanager.memory.network.min=536346624) -d,–detached 开启分离模式(session启动后client自动关掉) -jm,–jobManagerMemory jobmanager 内存大小(默认: MB) -nm,–name 设置应用程序的名称 -at,–applicationType 设置应用程序的类型 -q,–query 查询使用的Yarn的资源(Memory,Core) -qu,–queue 设置在Yarn上使用的队列. -s,–slots 设置每个TaskManager的Slot数量 -tm,–taskManagerMemory 设置每个TaskManager的内存大小(默认: MB) -z,–zookeeperNamespace
-
pro-job flink run -m yarn-cluster ./examples/batch/WordCount.jar
run -c,–class 程序全路径名称 -C,–classpath -n,–allowNonRestoredState 忽略保存点状态不能被restored的错误 -p,–parallelism 程序并行度(会覆盖进群的配置,集群配置配置 < 程序里配置文件配置 < 运行时指定 ) -s,–fromSavepoint 设置应用程序保存点文件路径 ————for yarn————————— -d,–detached -m,–jobmanager 设置jobmanger地址 -yat,–yarnapplicationType 设置应用程序的类型 -yD <property=value> 设置配置参数 -yid,–yarnapplicationId 将应用程序提交到yarn-session上运行 -yj,–yarnjar flink jar 文件地址 -yjm,–yarnjobManagerMemory 设置yarn模式下jobmanager 内存大小(默认:MB) -ynl,–yarnnodeLabel 设置node标签 -ynm,–yarnname 设置应用程序的名称 -yq,–yarnquery 查询在yarn上的资源(memory,core) -yqu,–yarnqueue 设置应用程序队列 -ys,–yarnslots 设置每个taskmanager的slot数量 -yt,–yarnship -ytm,–yarntaskManagerMemory 设置每个taskmanager 的内存大小(默认:MB)
-
application
-
查看集群作业
- flink list //列出计划和正在运行的作业
- flink list -s //列出计划的作业
- flink list -r //列出正在执行的作业
- flink list -a //列出所有的作业
算子
Map
数据流一对一的转换,作用在每一条数据上,对流经算子的每一条数据进行相应的操作,输入输出类型可变。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<String> elements = env.fromElements("hello","flink");
MapOperator<String, String> map = elements.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
return "ods-" + value;
}
});
map.print();
---------------------------------------------------------------------------------------------------------------------------------
结果:
ods-hello
ods-flink
FlatMap
数据流一对多的转换,作用在每一条数据上,将流经算子的数据拆分成相应的数据进行输出,输入输出数据类型可变。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<String> elements = env.fromElements("hello flink");
FlatMapOperator<String, String> flatMap = elements.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
for (String s : value.split(" ")) {
out.collect("ods-"+s);
}
}
});
flatMap.print();
---------------------------------------------------------------------------------------------------------------------------------
结果:
ods-hello
ods-flink
MapPartition
数据流多对一的转换,作用在每一个分区上,将分区内的多条数据通过迭代操作然后输出结果,输入输出数据类型可变。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<String> elements = env.fromElements("hello","flink");
MapPartitionOperator<String, String> mapPartition = elements.mapPartition(new MapPartitionFunction<String, String>() {
@Override
public void mapPartition(Iterable<String> values, Collector<String> out) throws Exception {
values.forEach(new Consumer<String>() {
@Override
public void accept(String s) {
out.collect("ods-" + s);
}
});
}
});
mapPartition.print();
---------------------------------------------------------------------------------------------------------------------------------
结果:
ods-hello
ods-flink
Filter
数据流一对一的转换,作用在每一条数据上,根据操作函数返回的true和false来确定是否让数据通过进入下一算子
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<String> elements = env.fromElements("hello","flink");
FilterOperator<String> filter = elements.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
return value.startsWith("h");
}
});
filter.print();
---------------------------------------------------------------------------------------------------------------------------------
结果:
hello
Project
数据流一对一的转换,作用在每一条数据上,将带有元组的数据流根据序号选取,只针对java api调用
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<Tuple3<Integer,String,Boolean>> list = new ArrayList<>();
Tuple3 tuple3 = new Tuple3<Integer,String,Boolean>(1,"a",true);
list.add(tuple3);
DataSet<Tuple3<Integer,String,Boolean>> source = env.fromCollection(list);
DataSet<Tuple2<Boolean,Integer>> project = source.project(2,0);
project.print();
---------------------------------------------------------------------------------------------------------------------------------
结果:
(true,1)
Reduce
- 不基于groupBy
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); ArrayList<Tuple3<Integer,String,Boolean>> list = new ArrayList<>(); Tuple3 tuple3 = new Tuple3<Integer,String,Boolean>(1,"a",true); list.add(tuple3); tuple3 = new Tuple3<Integer,String,Boolean>(2,"a",true); list.add(tuple3); tuple3 = new Tuple3<Integer,String,Boolean>(3,"b",true); list.add(tuple3); tuple3 = new Tuple3<Integer,String,Boolean>(4,"b",true); list.add(tuple3); DataSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list); ReduceOperator<Tuple3<Integer, String, Boolean>> reduce = source.reduce(new ReduceFunction<Tuple3<Integer, String, Boolean>>() { @Override public Tuple3<Integer, String, Boolean> reduce(Tuple3<Integer, String, Boolean> value1, Tuple3<Integer, String, Boolean> value2) throws Exception { return new Tuple3<>(value1.f0 + value2.f0, value1.f1, false); } }); reduce.print(); ------------------------------------------------------------------------------------------------------------------------------ 结果: (10,a,false) - 基于groupBy
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); ArrayList<Tuple3<Integer,String,Boolean>> list = new ArrayList<>(); Tuple3 tuple3 = new Tuple3<Integer,String,Boolean>(1,"a",true); list.add(tuple3); tuple3 = new Tuple3<Integer,String,Boolean>(2,"a",true); list.add(tuple3); tuple3 = new Tuple3<Integer,String,Boolean>(3,"b",true); list.add(tuple3); tuple3 = new Tuple3<Integer,String,Boolean>(4,"b",true); list.add(tuple3); DataSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list); UnsortedGrouping<Tuple3<Integer, String, Boolean>> groupBy = source.groupBy(1); ReduceOperator<Tuple3<Integer, String, Boolean>> reduce = groupBy.reduce(new ReduceFunction<Tuple3<Integer, String, Boolean>>() { @Override public Tuple3<Integer, String, Boolean> reduce(Tuple3<Integer, String, Boolean> value1, Tuple3<Integer, String, Boolean> value2) throws Exception { return new Tuple3<>(value1.f0+value2.f0,value1.f1,false); } }); reduce.print(); ------------------------------------------------------------------------------------------------------------------------------ 结果: (3,a,false) (7,b,false)
GroupBy
- 不基于groupBy
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); ArrayList<Tuple3<Integer,String,Boolean>> list = new ArrayList<>(); Tuple3 tuple3 = new Tuple3<Integer,String,Boolean>(1,"a",true); list.add(tuple3); tuple3 = new Tuple3<Integer,String,Boolean>(2,"a",true); list.add(tuple3); tuple3 = new Tuple3<Integer,String,Boolean>(3,"b",true); list.add(tuple3); tuple3 = new Tuple3<Integer,String,Boolean>(4,"b",true); list.add(tuple3); DataSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list); UnsortedGrouping<Tuple3<Integer, String, Boolean>> groupBy = source.groupBy(new KeySelector<Tuple3<Integer, String, Boolean>, String>() { @Override public String getKey(Tuple3<Integer, String, Boolean> value) throws Exception { return value.f1; } }); ReduceOperator<Tuple3<Integer, String, Boolean>> reduce = groupBy.reduce(new ReduceFunction<Tuple3<Integer, String, Boolean>>() { @Override public Tuple3<Integer, String, Boolean> reduce(Tuple3<Integer, String, Boolean> value1, Tuple3<Integer, String, Boolean> value2) throws Exception { return new Tuple3<>(value1.f0+value2.f0,value1.f1,false); } }); reduce.print(); ------------------------------------------------------------------------------------------------------------------------------ 结果: (3,a,false) (7,b,false) - 基于groupBy
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); ArrayList<Tuple3<Integer,String,Boolean>> list = new ArrayList<>(); Tuple3 tuple3 = new Tuple3<Integer,String,Boolean>(1,"a",true); list.add(tuple3); tuple3 = new Tuple3<Integer,String,Boolean>(2,"a",true); list.add(tuple3); tuple3 = new Tuple3<Integer,String,Boolean>(3,"b",true); list.add(tuple3); tuple3 = new Tuple3<Integer,String,Boolean>(4,"b",true); list.add(tuple3); DataSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list); UnsortedGrouping<Tuple3<Integer, String, Boolean>> groupBy = source.groupBy(1); ReduceOperator<Tuple3<Integer, String, Boolean>> reduce = groupBy.reduce(new ReduceFunction<Tuple3<Integer, String, Boolean>>() { @Override public Tuple3<Integer, String, Boolean> reduce(Tuple3<Integer, String, Boolean> value1, Tuple3<Integer, String, Boolean> value2) throws Exception { return new Tuple3<>(value1.f0+value2.f0,value1.f1,false); } }); reduce.print(); ------------------------------------------------------------------------------------------------------------------------------ 结果: (3,a,false) (7,b,false)
ReduceGroup
与reduce的区别在于,reducegroup 可以获取一个组内所有的数据,可以单用也可以和groupby进行组合使用,输入输出类型可变
- 不基于groupBy
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); ArrayList<Tuple3<Integer,String,Boolean>> list = new ArrayList<>(); Tuple3 tuple3 = new Tuple3<Integer,String,Boolean>(1,"a",true); list.add(tuple3); tuple3 = new Tuple3<Integer,String,Boolean>(2,"a",true); list.add(tuple3); tuple3 = new Tuple3<Integer,String,Boolean>(3,"a",true); list.add(tuple3); tuple3 = new Tuple3<Integer,String,Boolean>(4,"b",true); list.add(tuple3); tuple3 = new Tuple3<Integer,String,Boolean>(5,"b",true); list.add(tuple3); tuple3 = new Tuple3<Integer,String,Boolean>(6,"b",true); list.add(tuple3); DataSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list); GroupReduceOperator<Tuple3<Integer, String, Boolean>, Tuple3<Integer, String, Boolean>> reduceGroup = source.reduceGroup(new RichGroupReduceFunction<Tuple3<Integer, String, Boolean>, Tuple3<Integer, String, Boolean>>() { @Override public void reduce(Iterable<Tuple3<Integer, String, Boolean>> values, Collector<Tuple3<Integer, String, Boolean>> out) throws Exception { final int[] tmp = {0}; final String[] key = {""}; values.forEach(new Consumer<Tuple3<Integer, String, Boolean>>() { @Override public void accept(Tuple3<Integer, String, Boolean> in) { if (in.f0%2==0){ tmp[0] +=in.f0; key[0] =in.f1; } } }); out.collect(new Tuple3<>(tmp[0], key[0],false)); } }); reduceGroup.print(); ------------------------------------------------------------------------------------------------------------------------------ 结果: (12,b,false) - 基于groupBy
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); ArrayList<Tuple3<Integer,String,Boolean>> list = new ArrayList<>(); Tuple3 tuple3 = new Tuple3<Integer,String,Boolean>(1,"a",true); list.add(tuple3); tuple3 = new Tuple3<Integer,String,Boolean>(2,"a",true); list.add(tuple3); tuple3 = new Tuple3<Integer,String,Boolean>(3,"a",true); list.add(tuple3); tuple3 = new Tuple3<Integer,String,Boolean>(4,"b",true); list.add(tuple3); tuple3 = new Tuple3<Integer,String,Boolean>(5,"b",true); list.add(tuple3); tuple3 = new Tuple3<Integer,String,Boolean>(6,"b",true); list.add(tuple3); DataSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list); UnsortedGrouping<Tuple3<Integer, String, Boolean>> groupBy = source.groupBy(1); GroupReduceOperator<Tuple3<Integer, String, Boolean>, Tuple3<Integer, String, Boolean>> reduceGroup = groupBy.reduceGroup(new RichGroupReduceFunction<Tuple3<Integer, String, Boolean>, Tuple3<Integer, String, Boolean>>() { @Override public void reduce(Iterable<Tuple3<Integer, String, Boolean>> values, Collector<Tuple3<Integer, String, Boolean>> out) throws Exception { final int[] tmp = {0}; final String[] key = {""}; values.forEach(new Consumer<Tuple3<Integer, String, Boolean>>() { @Override public void accept(Tuple3<Integer, String, Boolean> in) { if (in.f0%2==0){ tmp[0] +=in.f0; key[0] =in.f1; } } }); out.collect(new Tuple3<>(tmp[0], key[0],false)); } }); reduceGroup.print(); ------------------------------------------------------------------------------------------------------------------------------ 结果: (2,a,false) (10,b,false)
SortGroup
组内排序,前提在使用分组聚合以后(groupBy,keyBy)以后,对组内数据进行指定顺序排序操作
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<Tuple3<Integer,String,Boolean>> list = new ArrayList<>();
Tuple3 tuple3 = new Tuple3<Integer,String,Boolean>(1,"a",true);
list.add(tuple3);
tuple3 = new Tuple3<Integer,String,Boolean>(2,"a",true);
list.add(tuple3);
tuple3 = new Tuple3<Integer,String,Boolean>(3,"a",true);
list.add(tuple3);
tuple3 = new Tuple3<Integer,String,Boolean>(4,"b",true);
list.add(tuple3);
tuple3 = new Tuple3<Integer,String,Boolean>(5,"b",true);
list.add(tuple3);
tuple3 = new Tuple3<Integer,String,Boolean>(6,"b",true);
list.add(tuple3);
DataSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list);
UnsortedGrouping<Tuple3<Integer, String, Boolean>> groupBy = source.groupBy(1);
SortedGrouping<Tuple3<Integer, String, Boolean>> sortGroup = groupBy.sortGroup(0, Order.ASCENDING);
sortGroup.first(2).print();
---------------------------------------------------------------------------------------------------------------------------------
结果:
(1,a,true)
(2,a,true)
(4,b,true)
(5,b,true)
GroupCombine
对数据流进行一次预聚合运算,可以减少数据的传输,该运算通常只有本分数据被聚合不能代替reduce操作,一般都是配合reduce或者reduceGroup进行操作,输入输出类型可变
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<String> elements = env.fromElements("hello", "flink");
MapOperator<String, Tuple2<String, Integer>> map = elements.map(word -> new Tuple2<String, Integer>(word, 1)).returns(Types.TUPLE(Types.STRING,
Types.INT));
UnsortedGrouping<Tuple2<String, Integer>> groupBy = map.groupBy(0);
GroupCombineOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> combineGroup = groupBy.combineGroup((Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) -> {
final String[] key = new String[1];
final int[] cnt = {0};
values.forEach(x -> {
key[0] = x.f0;
cnt[0]++;
});
out.collect(new Tuple2<String, Integer>(key[0], cnt[0]));
}).returns(Types.TUPLE(Types.STRING, Types.INT));
;
combineGroup.print();
---------------------------------------------------------------------------------------------------------------------------------
结果:
(flink,1)
(hello,1)
Aggregate
将数据流根据(SUM,MIN,MAX)将某个字段聚合操作,有and运算符时,如果对同一个字段进行操作,只会进行最后操作符(MAX,MIN,SUM)的操作
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<Tuple3<Integer, String, Boolean>> list = new ArrayList<>();
Tuple3 tuple3 = new Tuple3<Integer, String, Boolean>(1, "a", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(2, "a", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(3, "a", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(4, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(5, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(6, "b", false);
list.add(tuple3);
DataSet<Tuple3<Integer,String,Boolean>> source = env.fromCollection(list);
source.aggregate(SUM,0).print();
source.aggregate(MAX,0).print();
source.aggregate(MIN,0).print();
System.out.println("============");
source.aggregate(SUM,0).and(MIN,0).and(MAX,0).print();
source.aggregate(MIN,0).and(MAX,0).print();
source.aggregate(MIN,0).andMax(0).print();
System.out.println("============");
source.aggregate(MAX,0).and(MIN,0).print();
System.out.println("============");
source.aggregate(MAX,0).and(MIN,1).print();
---------------------------------------------------------------------------------------------------------------------------------
结果:
(21,b,false)
(6,b,false)
(1,b,false)
============
(6,b,false)
(6,b,false)
(6,b,false)
============
(1,b,false) //根据第一个字段取最小的,其他字段取最后一条
============
(6,a,false) //根据第一个字段取最大的,根据第二个字段取最小的,其他的取最后一条
MinBy
根据某几个字段求数据里面最小的一条,相当于order by col1,col2 asc
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<Tuple3<Integer, String, Boolean>> list = new ArrayList<>();
Tuple3 tuple3 = new Tuple3<Integer, String, Boolean>(0, "a", false);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(2, "a", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(3, "a", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(4, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(5, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(0, "b", true);
list.add(tuple3);
DataSet<Tuple3<Integer,String,Boolean>> source = env.fromCollection(list);
source.minBy(0,0).print();
source.minBy(0,1).print();
---------------------------------------------------------------------------------------------------------------------------------
结果:
(0,a,false) //只根据第一个字段取最小的那条记录
(0,a,false) //根据第一个字段和第二个字段取最小的那条记录
MaxBy
根据某几个字段求数据里面最大的一条,相当于order by col1,col2 desc
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<Tuple3<Integer, String, Boolean>> list = new ArrayList<>();
Tuple3 tuple3 = new Tuple3<Integer, String, Boolean>(6, "a", false);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(2, "a", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(3, "a", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(4, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(5, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(6, "b", false);
list.add(tuple3);
DataSet<Tuple3<Integer,String,Boolean>> source = env.fromCollection(list);
source.maxBy(0,0).print();
source.maxBy(0,1).print();
---------------------------------------------------------------------------------------------------------------------------------
结果:
(6,a,false) //只根据第一个字段取最大的那条记录
(6,b,false) //根据第一个和第二个字段取最大的那条记录
Distinct
distinct 可以根据字段序号,keyselect或者对象的属性名字直接进行去重
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<Tuple3<Integer, String, Boolean>> list = new ArrayList<>();
Tuple3 tuple3 = new Tuple3<Integer, String, Boolean>(0, "a", false);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(2, "a", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(3, "a", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(4, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(5, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(0, "b", true);
list.add(tuple3);
DataSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list);
source.distinct(0).print();
---------------------------------------------------------------------------------------------------------------------------------
结果:
(3,a,true)
(0,a,false)
(5,b,true)
(2,a,true)
(4,b,true)
Join
-
join
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); ArrayList<Tuple3<Integer, String, Boolean>> list = new ArrayList<>(); Tuple3 tuple3 = new Tuple3<Integer, String, Boolean>(0, "a", false); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(2, "a", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(3, "a", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(4, "b", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(5, "b", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(0, "b", true); list.add(tuple3); DataSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list); source.join(source).where(0).equalTo(0).print(); ----------------------------------------------------------------------------------------------------------------------------- 结果: ((3,a,true),(3,a,true)) ((0,a,false),(0,a,false)) ((0,b,true),(0,a,false)) ((5,b,true),(5,b,true)) ((0,a,false),(0,b,true)) ((0,b,true),(0,b,true)) ((2,a,true),(2,a,true)) ((4,b,true),(4,b,true))input1.join(input2, JoinHint.BROADCAST_HASH_FIRST) - JoinHint.OPTIMIZER_CHOOSES 任系统自己选择 - JoinHint.BROADCAST_HASH_FIRST 对第一个表进行hash,广播第一个表,小表join大表 - JoinHint.BROADCAST_HASH_SECOND 对第二个表进行hash,广播第二个表,大表join小表 - JoinHint.REPARTITION_HASH_FIRST 对每个表都进行分区,并且对第一个表进行hash,小表join大表,但是两个表任然很大 如果无法进行大小估计并且无法重新使用现有的分区和排序顺序,这是系统使用的默认后备策略 - JoinHint.REPARTITION_HASH_SECOND 对每个表都进行分区,并且对第二个表进行hash,大表join小表,但是两个表任然很大 - JoinHint.REPARTITION_SORT_MERGE 对每个表都进行分区,对每个表都进行排序,对有排序的表性能会提高 -
joinWithTiny JoinHint.BROADCAST_HASH_FIRST
----------------------------------------------------------------------------------------------------------------------------- 结果: -
joinWithHuge JoinHint.BROADCAST_HASH_SECOND
----------------------------------------------------------------------------------------------------------------------------- 结果:
OuterJoin
-
LeftOuterJoin
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); ArrayList<Tuple3<Integer, String, Boolean>> list = new ArrayList<>(); Tuple3 tuple3 = new Tuple3<Integer, String, Boolean>(0, "a", false); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(2, "a", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(3, "a", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(4, "b", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(5, "b", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(0, "b", true); list.add(tuple3); ArrayList<Tuple3<Integer, String, Boolean>> list1 = new ArrayList<>(); Tuple3 tuple4 = new Tuple3<Integer, String, Boolean>(0, "a1", false); list1.add(tuple4); tuple4 = new Tuple3<Integer, String, Boolean>(2, "a1", true); list1.add(tuple4); tuple4 = new Tuple3<Integer, String, Boolean>(3, "a1", true); list1.add(tuple4); tuple4 = new Tuple3<Integer, String, Boolean>(4, "b1", true); list1.add(tuple4); DataSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list); DataSource<Tuple3<Integer, String, Boolean>> source1 = env.fromCollection(list1); JoinFunctionAssigner<Tuple3<Integer, String, Boolean>, Tuple3<Integer, String, Boolean>> equalTo = source.leftOuterJoin(source1).where(0) .equalTo(0); equalTo.with(new JoinFunction<Tuple3<Integer,String,Boolean>, Tuple3<Integer,String,Boolean>, Tuple3<Integer,String,Boolean>>() { @Override public Tuple3<Integer, String, Boolean> join(Tuple3<Integer, String, Boolean> first, Tuple3<Integer, String, Boolean> second) throws Exception { return second ==null ? first: new Tuple3<Integer, String, Boolean>(first.f0+second.f0,first.f1+"-"+second.f1,first.f2); } }).print(); System.out.println("==============="); equalTo.with(new FlatJoinFunction<Tuple3<Integer,String,Boolean>, Tuple3<Integer,String,Boolean>, Tuple3<Integer,String,Boolean>>() { @Override public void join(Tuple3<Integer, String, Boolean> first, Tuple3<Integer, String, Boolean> second, Collector<Tuple3<Integer, String, Boolean>> out) throws Exception { out.collect(second ==null ? first: new Tuple3<Integer, String, Boolean>(first.f0+second.f0,first.f1+"-"+second.f1,first.f2)); } }).print(); ----------------------------------------------------------------------------------------------------------------------------- 结果: (6,a-a1,true) (0,a-a1,false) (5,b,true) (0,b-a1,true) (4,a-a1,true) (8,b-b1,true) =============== (6,a-a1,true) (0,a-a1,false) (5,b,true) (0,b-a1,true) (4,a-a1,true) (8,b-b1,true)- OPTIMIZER_CHOOSES - BROADCAST_HASH_SECOND - REPARTITION_HASH_SECOND - REPARTITION_SORT_MERGE -
RightOuterJoin
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); ArrayList<Tuple3<Integer, String, Boolean>> list = new ArrayList<>(); Tuple3 tuple3 = new Tuple3<Integer, String, Boolean>(0, "a", false); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(2, "a", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(3, "a", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(4, "b", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(5, "b", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(0, "b", true); list.add(tuple3); ArrayList<Tuple3<Integer, String, Boolean>> list1 = new ArrayList<>(); Tuple3 tuple4 = new Tuple3<Integer, String, Boolean>(0, "a1", false); list1.add(tuple4); tuple4 = new Tuple3<Integer, String, Boolean>(2, "a1", true); list1.add(tuple4); tuple4 = new Tuple3<Integer, String, Boolean>(3, "a1", true); list1.add(tuple4); tuple4 = new Tuple3<Integer, String, Boolean>(4, "b1", true); list1.add(tuple4); DataSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list); DataSource<Tuple3<Integer, String, Boolean>> source1 = env.fromCollection(list1); JoinFunctionAssigner<Tuple3<Integer, String, Boolean>, Tuple3<Integer, String, Boolean>> equalTo = source.rightOuterJoin(source1).where(0) .equalTo(0); equalTo.with(new JoinFunction<Tuple3<Integer,String,Boolean>, Tuple3<Integer,String,Boolean>, Tuple3<Integer,String,Boolean>>() { @Override public Tuple3<Integer, String, Boolean> join(Tuple3<Integer, String, Boolean> first, Tuple3<Integer, String, Boolean> second) throws Exception { return second ==null ? first: new Tuple3<Integer, String, Boolean>(first.f0+second.f0,first.f1+"-"+second.f1,first.f2); } }).print(); System.out.println("==============="); equalTo.with(new FlatJoinFunction<Tuple3<Integer,String,Boolean>, Tuple3<Integer,String,Boolean>, Tuple3<Integer,String,Boolean>>() { @Override public void join(Tuple3<Integer, String, Boolean> first, Tuple3<Integer, String, Boolean> second, Collector<Tuple3<Integer, String, Boolean>> out) throws Exception { out.collect(second ==null ? first: new Tuple3<Integer, String, Boolean>(first.f0+second.f0,first.f1+"-"+second.f1,first.f2)); } }).print(); ----------------------------------------------------------------------------------------------------------------------------- 结果: (6,a-a1,true) (0,a-a1,false) (0,b-a1,true) (4,a-a1,true) (8,b-b1,true) =============== (6,a-a1,true) (0,a-a1,false) (0,b-a1,true) (4,a-a1,true) (8,b-b1,true)- OPTIMIZER_CHOOSES - BROADCAST_HASH_FIRST - REPARTITION_HASH_FIRST - REPARTITION_SORT_MERGE -
FullOuterJoin
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); ArrayList<Tuple3<Integer, String, Boolean>> list = new ArrayList<>(); Tuple3 tuple3 = new Tuple3<Integer, String, Boolean>(0, "a", false); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(3, "a", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(4, "b", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(5, "b", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(0, "b", true); list.add(tuple3); ArrayList<Tuple3<Integer, String, Boolean>> list1 = new ArrayList<>(); Tuple3 tuple4 = new Tuple3<Integer, String, Boolean>(0, "a1", false); list1.add(tuple4); tuple4 = new Tuple3<Integer, String, Boolean>(2, "a1", true); list1.add(tuple4); tuple4 = new Tuple3<Integer, String, Boolean>(3, "a1", true); list1.add(tuple4); tuple4 = new Tuple3<Integer, String, Boolean>(4, "b1", true); list1.add(tuple4); DataSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list); DataSource<Tuple3<Integer, String, Boolean>> source1 = env.fromCollection(list1); JoinFunctionAssigner<Tuple3<Integer, String, Boolean>, Tuple3<Integer, String, Boolean>> equalTo = source.fullOuterJoin(source1).where(0) .equalTo(0); equalTo.with(new JoinFunction<Tuple3<Integer,String,Boolean>, Tuple3<Integer,String,Boolean>, Tuple3<Integer,String,Boolean>>() { @Override public Tuple3<Integer, String, Boolean> join(Tuple3<Integer, String, Boolean> first, Tuple3<Integer, String, Boolean> second) throws Exception { return first ==null ? second:second==null ? first: new Tuple3<Integer, String, Boolean>(first.f0+second.f0,first.f1+"-"+second .f1,first .f2); } }).print(); System.out.println("==============="); equalTo.with(new FlatJoinFunction<Tuple3<Integer,String,Boolean>, Tuple3<Integer,String,Boolean>, Tuple3<Integer,String,Boolean>>() { @Override public void join(Tuple3<Integer, String, Boolean> first, Tuple3<Integer, String, Boolean> second, Collector<Tuple3<Integer, String, Boolean>> out) throws Exception { out.collect(first ==null ? second:second==null ? first: new Tuple3<Integer, String, Boolean>(first.f0+second.f0,first.f1+"-"+second.f1,first.f2)); } }).print(); ----------------------------------------------------------------------------------------------------------------------------- 结果: (6,a-a1,true) (0,a-a1,false) (0,b-a1,true) (5,b,true) (2,a1,true) (8,b-b1,true) =============== (6,a-a1,true) (0,a-a1,false) (0,b-a1,true) (5,b,true) (2,a1,true) (8,b-b1,true)- OPTIMIZER_CHOOSES - REPARTITION_SORT_MERGE
Cross
笛卡尔积
-
cross
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); ArrayList<Tuple3<Integer, String, Boolean>> list = new ArrayList<>(); Tuple3 tuple3 = new Tuple3<Integer, String, Boolean>(0, "a", false); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(3, "a", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(4, "b", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(5, "b", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(0, "b", true); list.add(tuple3); ArrayList<Tuple3<Integer, String, Boolean>> list1 = new ArrayList<>(); Tuple3 tuple4 = new Tuple3<Integer, String, Boolean>(0, "a1", false); list1.add(tuple4); tuple4 = new Tuple3<Integer, String, Boolean>(2, "a1", true); list1.add(tuple4); tuple4 = new Tuple3<Integer, String, Boolean>(3, "a1", true); list1.add(tuple4); tuple4 = new Tuple3<Integer, String, Boolean>(4, "b1", true); list1.add(tuple4); DataSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list); DataSource<Tuple3<Integer, String, Boolean>> source1 = env.fromCollection(list1); source.cross(source1).with(new CrossFunction<Tuple3<Integer,String,Boolean>, Tuple3<Integer,String,Boolean>, Tuple3<Integer,String,Boolean>>() { @Override public Tuple3<Integer, String, Boolean> cross(Tuple3<Integer, String, Boolean> val1, Tuple3<Integer, String, Boolean> val2) throws Exception { return new Tuple3<>(val1.f0+val2.f0,val1.f1+"-"+val2.f1,val1.f2); } }).print(); ----------------------------------------------------------------------------------------------------------------------------- 结果: (0,a-a1,false) (2,a-a1,false) (3,a-a1,false) (4,a-b1,false) (3,a-a1,true) (5,a-a1,true) (6,a-a1,true) (7,a-b1,true) (4,b-a1,true) (6,b-a1,true) (7,b-a1,true) (8,b-b1,true) (5,b-a1,true) (7,b-a1,true) (8,b-a1,true) (9,b-b1,true) (0,b-a1,true) (2,b-a1,true) (3,b-a1,true) (4,b-b1,true) -
crossWithTiny
----------------------------------------------------------------------------------------------------------------------------- 结果: -
crossWithHuge
----------------------------------------------------------------------------------------------------------------------------- 结果:
CoGroup
coGroup 会将两个流的所有相等不相等的数据都可以输出,可以看做是针对所有key的group,两个流的类型可以不一样
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<Tuple3<Integer, String, Boolean>> list = new ArrayList<>();
Tuple3 tuple3 = new Tuple3<Integer, String, Boolean>(0, "a", false);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(3, "a", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(4, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(5, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(0, "b", true);
list.add(tuple3);
ArrayList<Tuple3<Integer, String, Boolean>> list1 = new ArrayList<>();
Tuple3 tuple4 = new Tuple3<Integer, String, Boolean>(0, "a1", false);
list1.add(tuple4);
tuple4 = new Tuple3<Integer, String, Boolean>(2, "a1", true);
list1.add(tuple4);
tuple4 = new Tuple3<Integer, String, Boolean>(3, "a1", true);
list1.add(tuple4);
tuple4 = new Tuple3<Integer, String, Boolean>(4, "b1", true);
list1.add(tuple4);
DataSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list);
DataSource<Tuple3<Integer, String, Boolean>> source1 = env.fromCollection(list1);
source.coGroup(source1).where(0).equalTo(0).with(new CoGroupFunction<Tuple3<Integer,String,Boolean>, Tuple3<Integer,String,Boolean>, Tuple3<Integer,String,Boolean>>() {
@Override
public void coGroup(Iterable<Tuple3<Integer, String, Boolean>> first, Iterable<Tuple3<Integer, String, Boolean>> second, Collector<Tuple3<Integer, String, Boolean>> out) throws Exception {
first.forEach(out::collect);
second.forEach(out::collect);
}
}).print();
---------------------------------------------------------------------------------------------------------------------------------
结果:
(3,a,true)
(3,a1,true)
(0,a,false)
(0,b,true)
(0,a1,false)
(5,b,true)
(2,a1,true)
(4,b,true)
(4,b1,true)
Union
两个或者多个流必须一样
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<Tuple3<Integer, String, Boolean>> list = new ArrayList<>();
Tuple3 tuple3 = new Tuple3<Integer, String, Boolean>(0, "a", false);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(3, "a", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(4, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(5, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(0, "b", true);
list.add(tuple3);
ArrayList<Tuple3<Integer, String, Boolean>> list1 = new ArrayList<>();
Tuple3 tuple4 = new Tuple3<Integer, String, Boolean>(0, "a1", false);
list1.add(tuple4);
tuple4 = new Tuple3<Integer, String, Boolean>(2, "a1", true);
list1.add(tuple4);
tuple4 = new Tuple3<Integer, String, Boolean>(3, "a1", true);
list1.add(tuple4);
tuple4 = new Tuple3<Integer, String, Boolean>(4, "b1", true);
list1.add(tuple4);
DataSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list);
DataSource<Tuple3<Integer, String, Boolean>> source1 = env.fromCollection(list1);
source.union(source1).print();
---------------------------------------------------------------------------------------------------------------------------------
结果:
(0,a,false)
(0,a1,false)
(3,a,true)
(2,a1,true)
(4,b,true)
(3,a1,true)
(5,b,true)
(4,b1,true)
(0,b,true)
Rebalance
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<Tuple3<Integer, String, Boolean>> list = new ArrayList<>();
Tuple3 tuple3 = new Tuple3<Integer, String, Boolean>(0, "a", false);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(3, "a", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(4, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(5, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(0, "b", true);
list.add(tuple3);
DataSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list);
source.mapPartition(new RichMapPartitionFunction<Tuple3<Integer,String,Boolean>, Tuple4<Integer,String,Boolean,String>>() {
@Override
public void mapPartition(Iterable<Tuple3<Integer, String, Boolean>> values, Collector<Tuple4<Integer, String, Boolean,String>> out)
throws
Exception {
values.forEach(x->{
out.collect(new Tuple4<Integer, String, Boolean,String>(x.f0,x.f1,x.f2,getRuntimeContext().getIndexOfThisSubtask()+""));
});
}
}).rebalance().mapPartition(new RichMapPartitionFunction<Tuple4<Integer,String,Boolean,String>, Tuple5<Integer,String,Boolean,String,String>>() {
@Override
public void mapPartition(Iterable<Tuple4<Integer,String,Boolean,String>> values, Collector<Tuple5<Integer, String, Boolean,String,
String>> out)
throws
Exception {
values.forEach(x->{
out.collect(new Tuple5<Integer, String, Boolean,String,String>(x.f0,x.f1,x.f2,x.f3,getRuntimeContext().getIndexOfThisSubtask()
+""));
});
}
}).print();
---------------------------------------------------------------------------------------------------------------------------------
结果:
(0,a,false,0,0)
(3,a,true,0,1)
(4,b,true,0,2)
(5,b,true,0,3)
(0,b,true,0,4)
PartitionBy
-
partitionByHash
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); ArrayList<Tuple3<Integer, String, Boolean>> list = new ArrayList<>(); Tuple3 tuple3 = new Tuple3<Integer, String, Boolean>(0, "a", false); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(3, "a", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(4, "b", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(5, "b", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(0, "b", true); list.add(tuple3); DataSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list); source.mapPartition(new RichMapPartitionFunction<Tuple3<Integer,String,Boolean>, Tuple4<Integer,String,Boolean,String>>() { @Override public void mapPartition(Iterable<Tuple3<Integer, String, Boolean>> values, Collector<Tuple4<Integer, String, Boolean,String>> out) throws Exception { values.forEach(x->{ out.collect(new Tuple4<Integer, String, Boolean,String>(x.f0,x.f1,x.f2,getRuntimeContext().getIndexOfThisSubtask()+"")); }); } }).partitionByHash(0).mapPartition(new RichMapPartitionFunction<Tuple4<Integer,String,Boolean,String>, Tuple5<Integer,String,Boolean, String,String>>() { @Override public void mapPartition(Iterable<Tuple4<Integer,String,Boolean,String>> values, Collector<Tuple5<Integer, String, Boolean,String, String>> out) throws Exception { values.forEach(x->{ out.collect(new Tuple5<Integer, String, Boolean,String,String>(x.f0,x.f1,x.f2,x.f3,getRuntimeContext().getIndexOfThisSubtask() +"")); }); } }).print(); ----------------------------------------------------------------------------------------------------------------------------- 结果: (3,a,true,0,1) (0,a,false,0,6) (5,b,true,0,6) (0,b,true,0,6) (4,b,true,0,7) -
partitionByRange
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); ArrayList<Tuple3<Integer, String, Boolean>> list = new ArrayList<>(); Tuple3 tuple3 = new Tuple3<Integer, String, Boolean>(0, "a", false); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(3, "a", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(4, "b", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(5, "b", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(0, "b", true); list.add(tuple3); DataSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list); source.mapPartition(new RichMapPartitionFunction<Tuple3<Integer,String,Boolean>, Tuple4<Integer,String,Boolean,String>>() { @Override public void mapPartition(Iterable<Tuple3<Integer, String, Boolean>> values, Collector<Tuple4<Integer, String, Boolean,String>> out) throws Exception { values.forEach(x->{ out.collect(new Tuple4<Integer, String, Boolean,String>(x.f0,x.f1,x.f2,getRuntimeContext().getIndexOfThisSubtask()+"")); }); } }).partitionByRange(0).mapPartition(new RichMapPartitionFunction<Tuple4<Integer,String,Boolean,String>, Tuple5<Integer,String,Boolean, String,String>>() { @Override public void mapPartition(Iterable<Tuple4<Integer,String,Boolean,String>> values, Collector<Tuple5<Integer, String, Boolean,String, String>> out) throws Exception { values.forEach(x->{ out.collect(new Tuple5<Integer, String, Boolean,String,String>(x.f0,x.f1,x.f2,x.f3,getRuntimeContext().getIndexOfThisSubtask() +"")); }); } }).print(); ----------------------------------------------------------------------------------------------------------------------------- 结果: (0,a,false,0,1) (0,b,true,0,1) (3,a,true,0,3) (4,b,true,0,5) (5,b,true,0,6) -
sortPartition
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); ArrayList<Tuple3<Integer, String, Boolean>> list = new ArrayList<>(); Tuple3 tuple3 = new Tuple3<Integer, String, Boolean>(0, "a", false); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(3, "a", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(4, "b", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(5, "b", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(0, "b", true); list.add(tuple3); DataSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list); source.mapPartition(new RichMapPartitionFunction<Tuple3<Integer,String,Boolean>, Tuple3<Integer,String,Boolean>>() { @Override public void mapPartition(Iterable<Tuple3<Integer, String, Boolean>> values, Collector<Tuple3<Integer, String, Boolean>> out) throws Exception { values.forEach(x-> { System.out.println(x); out.collect(x); }); System.out.println("=============="); } }).sortPartition(0,Order.ASCENDING).mapPartition(new RichMapPartitionFunction<Tuple3<Integer,String,Boolean>, Tuple3<Integer,String,Boolean>>() { @Override public void mapPartition(Iterable<Tuple3<Integer, String, Boolean>> values, Collector<Tuple3<Integer, String, Boolean>> out) throws Exception { values.forEach(x-> { System.out.println(x); out.collect(x); }); } }).count(); ----------------------------------------------------------------------------------------------------------------------------- 结果: (0,a,false) (3,a,true) (4,b,true) (5,b,true) (0,b,true) ============== (0,a,false) (0,b,true) (3,a,true) (4,b,true) (5,b,true)
First
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<Tuple3<Integer, String, Boolean>> list = new ArrayList<>();
Tuple3 tuple3 = new Tuple3<Integer, String, Boolean>(0, "a", false);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(3, "a", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(4, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(5, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(0, "b", true);
list.add(tuple3);
DataSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list);
source.first(2).print();
System.out.println("=====================");
source.groupBy(1).first(2).print();
---------------------------------------------------------------------------------------------------------------------------------
结果:
(0,a,false)
(3,a,true)
=====================
(0,a,false)
(3,a,true)
(4,b,true)
(5,b,true)
+++
KeyBy
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ArrayList<Tuple3<Integer, String, Boolean>> list = new ArrayList<>();
Tuple3 tuple3 = new Tuple3<Integer, String, Boolean>(0, "a", false);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(3, "a", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(4, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(5, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(0, "b", true);
list.add(tuple3);
DataStreamSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list);
source.map(new RichMapFunction<Tuple3<Integer,String,Boolean>, Tuple4<Integer,String,Boolean,String>>() {
@Override
public Tuple4<Integer, String, Boolean,String> map(Tuple3<Integer, String, Boolean> value) throws Exception {
return new Tuple4<Integer, String, Boolean,String>(value.f0,value.f1,value.f2,getRuntimeContext().getIndexOfThisSubtask()+"");
}
}).keyBy(1).process(new KeyedProcessFunction<Tuple, Tuple4<Integer,String,Boolean,String>, Tuple5<Integer,String,Boolean,String,
String>>
() {
@Override
public void processElement(Tuple4<Integer, String, Boolean,String> value, Context ctx, Collector<Tuple5<Integer, String, Boolean,String,
String>> out)
throws Exception {
out.collect(new Tuple5<>(value.f0,value.f1,value.f2,value.f3,getRuntimeContext().getIndexOfThisSubtask()+""));
}
}).print();
env.execute();
---------------------------------------------------------------------------------------------------------------------------------
结果:
2> (0,b,true,0,1)
2> (4,b,true,6,1)
2> (5,b,true,7,1)
6> (0,a,false,4,5)
6> (3,a,true,5,5)
2>2表示执行该任务的线程号,就是getRuntimeContext().getIndexOfThisSubtask()+1
Fold
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ArrayList<Tuple3<Integer, String, Boolean>> list = new ArrayList<>();
Tuple3 tuple3 = new Tuple3<Integer, String, Boolean>(0, "a", false);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(3, "a", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(4, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(5, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(0, "b", true);
list.add(tuple3);
DataStreamSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list);
source.keyBy(1).fold("start", new FoldFunction<Tuple3<Integer, String, Boolean>, String>() {
@Override
public String fold(String accumulator, Tuple3<Integer, String, Boolean> value) throws Exception {
return accumulator+"-"+value.f0+","+value.f1+","+value.f2;
}
}).print();
---------------------------------------------------------------------------------------------------------------------------------
结果:
2> start-4,b,true
6> start-0,a,false
2> start-4,b,true-5,b,true
6> start-0,a,false-3,a,true
2> start-4,b,true-5,b,true-0,b,true
fold is deprecated
Window
-
window
需要对数据流先进行keyBy操作,将数据流按照每个key进行分区,再在每个key分区上进行window操作,每个key分区一个window会有很多window。
-
timeWindow
- timeWindow(size)
window(TumblingProcessingTimeWindows.of(size)) or window(TumblingEventTimeWindows.of(size))
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); ArrayList<Tuple3<Integer, String, Boolean>> list = new ArrayList<>(); Tuple3 tuple3 = new Tuple3<Integer, String, Boolean>(0, "a", false); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(3, "a", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(4, "b", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(5, "b", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(0, "b", true); list.add(tuple3); DataStreamSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list); source.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Integer, String, Boolean>>() { @Override public long extractAscendingTimestamp(Tuple3<Integer, String, Boolean> element) { return System.currentTimeMillis(); } }).keyBy(1).timeWindow(Time.milliseconds(1)) // .window(TumblingEventTimeWindows.of(Time.milliseconds(1))) .apply(new WindowFunction<Tuple3<Integer,String,Boolean>, Tuple3<Integer,String,Boolean>, Tuple, TimeWindow>() { @Override public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple3<Integer, String, Boolean>> input, Collector<Tuple3<Integer, String, Boolean>> out) throws Exception { input.forEach(out::collect); } }).print(); ----------------------------------------------------------------------------------------------------------------------- 结果: 2> (4,b,true) 2> (5,b,true) 2> (0,b,true) 6> (0,a,false) 6> (3,a,true) ``` - timeWindow(size,slide) window(SlidingProcessingTimeWindows.of(size, slide)) or window(SlidingEventTimeWindows.of(size, slide)) ```java StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); ArrayList<Tuple3<Integer, String, Boolean>> list = new ArrayList<>(); Tuple3 tuple3 = new Tuple3<Integer, String, Boolean>(0, "a", false); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(3, "a", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(4, "b", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(5, "b", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(0, "b", true); list.add(tuple3); DataStreamSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list); source.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Integer, String, Boolean>>() { @Override public long extractAscendingTimestamp(Tuple3<Integer, String, Boolean> element) { return System.currentTimeMillis(); } }).keyBy(1).timeWindow(Time.milliseconds(1),Time.milliseconds(1)) // .window(SlidingEventTimeWindows.of(Time.milliseconds(1), Time.milliseconds(1))) .apply(new WindowFunction<Tuple3<Integer,String,Boolean>, Tuple3<Integer,String,Boolean>, Tuple, TimeWindow>() { @Override public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple3<Integer, String, Boolean>> input, Collector<Tuple3<Integer, String, Boolean>> out) throws Exception { input.forEach(out::collect); } }).print(); ----------------------------------------------------------------------------------------------------------------------- 结果: 2> (4,b,true) 6> (0,a,false) 2> (5,b,true) 2> (0,b,true) 6> (3,a,true) ``` - timeWindow(size)
-
countWindow
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); ArrayList<Tuple3<Integer, String, Boolean>> list = new ArrayList<>(); Tuple3 tuple3 = new Tuple3<Integer, String, Boolean>(0, "a", false); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(3, "a", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(4, "b", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(5, "b", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(0, "b", true); list.add(tuple3); DataStreamSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list); source.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Integer, String, Boolean>>() { @Override public long extractAscendingTimestamp(Tuple3<Integer, String, Boolean> element) { return System.currentTimeMillis(); } }).keyBy(1).countWindow(2).apply(new WindowFunction<Tuple3<Integer,String,Boolean>, Tuple3<Integer,String,Boolean>, Tuple, GlobalWindow>() { @Override public void apply(Tuple tuple, GlobalWindow window, Iterable<Tuple3<Integer, String, Boolean>> input, Collector<Tuple3<Integer, String, Boolean>> out) throws Exception { input.forEach(out::collect); } }).print(); ----------------------------------------------------------------------------------------------------------------------- 结果: 2> (4,b,true) 6> (0,a,false) 2> (5,b,true) 6> (3,a,true)
-
-
windowAll
不用进行分区直接对数据流进行window操作,所有key都在这一个window上进行操作,只有一个window
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); ArrayList<Tuple3<Integer, String, Boolean>> list = new ArrayList<>(); Tuple3 tuple3 = new Tuple3<Integer, String, Boolean>(0, "a", false); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(3, "a", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(4, "b", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(5, "b", true); list.add(tuple3); tuple3 = new Tuple3<Integer, String, Boolean>(0, "b", true); list.add(tuple3); DataStreamSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list); source.windowAll(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(3))).apply(new AllWindowFunction<Tuple3<Integer,String,Boolean>, Tuple3<Integer,String,Boolean>, GlobalWindow>() { @Override public void apply(GlobalWindow window, Iterable<Tuple3<Integer, String, Boolean>> values, Collector<Tuple3<Integer, String, Boolean>> out) throws Exception { values.forEach(out::collect); } }).print(); ----------------------------------------------------------------------------------------------------------------------------- 结果: 1> (3,a,true) 8> (0,a,false) 2> (4,b,true)
Connect
将两个具有相同或者不同结构的流,融合成一个流。两个流的元素类型可以不同。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
ArrayList<Tuple3<Integer, String, Boolean>> list = new ArrayList<>();
Tuple3 tuple3 = new Tuple3<Integer, String, Boolean>(0, "a", false);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(3, "a", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(4, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(5, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(0, "b", true);
list.add(tuple3);
ArrayList<Tuple3<Integer, String, Boolean>> list1 = new ArrayList<>();
Tuple3 tuple4 = new Tuple3<Integer, String, Boolean>(0, "a1", false);
list1.add(tuple4);
tuple4 = new Tuple3<Integer, String, Boolean>(2, "a1", true);
list1.add(tuple4);
tuple4 = new Tuple3<Integer, String, Boolean>(3, "a1", true);
list1.add(tuple4);
tuple4 = new Tuple3<Integer, String, Boolean>(4, "b1", true);
list1.add(tuple4);
DataStreamSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list);
DataStreamSource<Tuple3<Integer, String, Boolean>> source1 = env.fromCollection(list1);
source.connect(source1).process(new CoProcessFunction<Tuple3<Integer,String,Boolean>, Tuple3<Integer,String,Boolean>, Tuple4<Integer,
String,Boolean,String>>() {
@Override
public void processElement1(Tuple3<Integer, String, Boolean> value, Context ctx, Collector<Tuple4<Integer, String, Boolean,String>> out)
throws Exception {
out.collect(new Tuple4<>(value.f0,value.f1,value.f2,"1"));
}
@Override
public void processElement2(Tuple3<Integer, String, Boolean> value, Context ctx, Collector<Tuple4<Integer, String, Boolean,String>> out) throws Exception {
out.collect(new Tuple4<>(value.f0,value.f1,value.f2,"2"));
}
}).print();
---------------------------------------------------------------------------------------------------------------------------------
结果:
6> (4,b1,true,2)
1> (4,b,true,1)
4> (2,a1,true,2)
7> (0,a,false,1)
3> (0,b,true,1)
2> (5,b,true,1)
3> (0,a1,false,2)
8> (3,a,true,1)
5> (3,a1,true,2)
CoMap
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
ArrayList<Tuple3<Integer, String, Boolean>> list = new ArrayList<>();
Tuple3 tuple3 = new Tuple3<Integer, String, Boolean>(0, "a", false);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(3, "a", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(4, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(5, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(0, "b", true);
list.add(tuple3);
ArrayList<Tuple3<Integer, String, Boolean>> list1 = new ArrayList<>();
Tuple3 tuple4 = new Tuple3<Integer, String, Boolean>(0, "a1", false);
list1.add(tuple4);
tuple4 = new Tuple3<Integer, String, Boolean>(2, "a1", true);
list1.add(tuple4);
tuple4 = new Tuple3<Integer, String, Boolean>(3, "a1", true);
list1.add(tuple4);
tuple4 = new Tuple3<Integer, String, Boolean>(4, "b1", true);
list1.add(tuple4);
DataStreamSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list);
DataStreamSource<Tuple3<Integer, String, Boolean>> source1 = env.fromCollection(list1);
source.connect(source1).map(new CoMapFunction<Tuple3<Integer,String,Boolean>, Tuple3<Integer,String,Boolean>, Tuple4<Integer,String,
Boolean,String>>() {
@Override
public Tuple4<Integer,String, Boolean,String> map1(Tuple3<Integer, String, Boolean> value) throws Exception {
return new Tuple4<>(value.f0,value.f1,value.f2,"1");
}
@Override
public Tuple4<Integer,String, Boolean,String> map2(Tuple3<Integer, String, Boolean> value) throws Exception {
return new Tuple4<>(value.f0,value.f1,value.f2,"2");
}
}).print();
---------------------------------------------------------------------------------------------------------------------------------
结果:
8> (4,b,true,1)
3> (2,a1,true,2)
6> (0,a,false,1)
1> (5,b,true,1)
2> (0,b,true,1)
7> (3,a,true,1)
4> (3,a1,true,2)
5> (4,b1,true,2)
2> (0,a1,false,2)
CoFlatMap
---------------------------------------------------------------------------------------------------------------------------------
结果:
Split
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
ArrayList<Tuple3<Integer, String, Boolean>> list = new ArrayList<>();
Tuple3 tuple3 = new Tuple3<Integer, String, Boolean>(0, "a", false);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(3, "a", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(4, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(5, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(0, "b", true);
list.add(tuple3);
DataStreamSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list);
source.split(new OutputSelector<Tuple3<Integer, String, Boolean>>() {
@Override
public Iterable<String> select(Tuple3<Integer, String, Boolean> value) {
ArrayList<String> list = new ArrayList<>();
if (value.f0%2==0){
list.add("1");
}else {
list.add("2");
}
return list;
}
}).select("1" ).print();
---------------------------------------------------------------------------------------------------------------------------------
结果:
5> (4,b,true)
6> (0,b,true)
4> (0,a,false)
Select
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
ArrayList<Tuple3<Integer, String, Boolean>> list = new ArrayList<>();
Tuple3 tuple3 = new Tuple3<Integer, String, Boolean>(0, "a", false);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(3, "a", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(4, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(5, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(0, "b", true);
list.add(tuple3);
DataStreamSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list);
source.split(new OutputSelector<Tuple3<Integer, String, Boolean>>() {
@Override
public Iterable<String> select(Tuple3<Integer, String, Boolean> value) {
ArrayList<String> list = new ArrayList<>();
if (value.f0%2==0){
list.add("1");
}else {
list.add("2");
}
return list;
}
}).select("1" ).print();
---------------------------------------------------------------------------------------------------------------------------------
结果:
5> (4,b,true)
6> (0,b,true)
4> (0,a,false)
PartitionCustom
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
ArrayList<Tuple3<Integer, String, Boolean>> list = new ArrayList<>();
Tuple3 tuple3 = new Tuple3<Integer, String, Boolean>(0, "a", false);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(3, "a", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(4, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(5, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(0, "b", true);
list.add(tuple3);
DataStreamSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list);
source.map(new RichMapFunction<Tuple3<Integer,String,Boolean>, Tuple4<Integer,String,Boolean,String>>() {
@Override
public Tuple4<Integer,String,Boolean,String> map(Tuple3<Integer, String, Boolean> value) throws Exception {
return new Tuple4<>(value.f0,value.f1,value.f2,getRuntimeContext().getIndexOfThisSubtask()+"");
}
}).partitionCustom(new Partitioner<String>() {
@Override
public int partition(String key, int numPartitions) {
return key.hashCode()%numPartitions;
}
},1).map(new RichMapFunction<Tuple4<Integer,String,Boolean,String>, Tuple5<Integer,String,Boolean,String,String>>() {
@Override
public Tuple5<Integer,String,Boolean,String,String> map(Tuple4<Integer,String,Boolean,String> value) throws Exception {
return new Tuple5<>(value.f0, value.f1, value.f2, value.f3,getRuntimeContext().getIndexOfThisSubtask()+"");
}
}).print();
---------------------------------------------------------------------------------------------------------------------------------
结果:
2> (3,a,true,6,1)
2> (0,a,false,5,1)
3> (0,b,true,1,2)
3> (5,b,true,0,2)
3> (4,b,true,7,2)
Shuffle
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
ArrayList<Tuple3<Integer, String, Boolean>> list = new ArrayList<>();
Tuple3 tuple3 = new Tuple3<Integer, String, Boolean>(0, "a", false);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(3, "a", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(4, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(5, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(0, "b", true);
list.add(tuple3);
DataStreamSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list);
source.map(new RichMapFunction<Tuple3<Integer,String,Boolean>, Tuple4<Integer,String,Boolean,String>>() {
@Override
public Tuple4<Integer,String,Boolean,String> map(Tuple3<Integer, String, Boolean> value) throws Exception {
return new Tuple4<>(value.f0, value.f1, value.f2,getRuntimeContext().getIndexOfThisSubtask()+"");
}
}).shuffle().map(new RichMapFunction<Tuple4<Integer,String,Boolean,String>, Tuple5<Integer,String,Boolean,String,String>>() {
@Override
public Tuple5<Integer,String,Boolean,String,String> map(Tuple4<Integer, String, Boolean, String> value) throws Exception {
return new Tuple5<>(value.f0, value.f1, value.f2, value.f3,getRuntimeContext().getIndexOfThisSubtask()+"");
}
}).print();
---------------------------------------------------------------------------------------------------------------------------------
结果:
8> (0,b,true,0,7)
8> (4,b,true,6,7)
8> (3,a,true,5,7)
8> (0,a,false,4,7)
8> (5,b,true,7,7)
Rebalance
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
ArrayList<Tuple3<Integer, String, Boolean>> list = new ArrayList<>();
Tuple3 tuple3 = new Tuple3<Integer, String, Boolean>(0, "a", false);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(3, "a", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(4, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(5, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(0, "b", true);
list.add(tuple3);
DataStreamSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list);
source.map(new RichMapFunction<Tuple3<Integer,String,Boolean>, Tuple4<Integer,String,Boolean,String>>() {
@Override
public Tuple4<Integer,String,Boolean,String> map(Tuple3<Integer, String, Boolean> value) throws Exception {
return new Tuple4<>(value.f0, value.f1, value.f2,getRuntimeContext().getIndexOfThisSubtask()+"");
}
}).rebalance().map(new RichMapFunction<Tuple4<Integer,String,Boolean,String>, Tuple5<Integer,String,Boolean,String,String>>() {
@Override
public Tuple5<Integer,String,Boolean,String,String> map(Tuple4<Integer, String, Boolean, String> value) throws Exception {
return new Tuple5<>(value.f0, value.f1, value.f2, value.f3,getRuntimeContext().getIndexOfThisSubtask()+"");
}
}).print();
---------------------------------------------------------------------------------------------------------------------------------
结果:
2> (5,b,true,6,1)
6> (0,b,true,7,5)
7> (0,a,false,3,6)
5> (3,a,true,4,4)
3> (4,b,true,5,2)
Rescale
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
ArrayList<Tuple3<Integer, String, Boolean>> list = new ArrayList<>();
Tuple3 tuple3 = new Tuple3<Integer, String, Boolean>(0, "a", false);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(3, "a", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(4, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(5, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(0, "b", true);
list.add(tuple3);
DataStreamSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list);
source.map(new RichMapFunction<Tuple3<Integer,String,Boolean>, Tuple4<Integer,String,Boolean,String>>() {
@Override
public Tuple4<Integer,String,Boolean,String> map(Tuple3<Integer, String, Boolean> value) throws Exception {
return new Tuple4<>(value.f0, value.f1, value.f2,getRuntimeContext().getIndexOfThisSubtask()+"");
}
}).rescale().map(new RichMapFunction<Tuple4<Integer,String,Boolean,String>, Tuple5<Integer,String,Boolean,String,String>>() {
@Override
public Tuple5<Integer,String,Boolean,String,String> map(Tuple4<Integer, String, Boolean, String> value) throws Exception {
return new Tuple5<>(value.f0, value.f1, value.f2, value.f3,getRuntimeContext().getIndexOfThisSubtask()+"");
}
}).print();
---------------------------------------------------------------------------------------------------------------------------------
结果:
4> (0,b,true,3,3)
3> (5,b,true,2,2)
2> (4,b,true,1,1)
1> (3,a,true,0,0)
8> (0,a,false,7,7)
Broadcast
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
ArrayList<Tuple3<Integer, String, Boolean>> list = new ArrayList<>();
Tuple3 tuple3 = new Tuple3<Integer, String, Boolean>(0, "a", false);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(3, "a", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(4, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(5, "b", true);
list.add(tuple3);
tuple3 = new Tuple3<Integer, String, Boolean>(0, "b", true);
list.add(tuple3);
DataStreamSource<Tuple3<Integer, String, Boolean>> source = env.fromCollection(list);
source.map(new RichMapFunction<Tuple3<Integer,String,Boolean>, Tuple4<Integer,String,Boolean,String>>() {
@Override
public Tuple4<Integer,String,Boolean,String> map(Tuple3<Integer, String, Boolean> value) throws Exception {
return new Tuple4<>(value.f0, value.f1, value.f2,getRuntimeContext().getIndexOfThisSubtask()+"");
}
}).broadcast().map(new RichMapFunction<Tuple4<Integer,String,Boolean,String>, Tuple5<Integer,String,Boolean,String,String>>() {
@Override
public Tuple5<Integer,String,Boolean,String,String> map(Tuple4<Integer, String, Boolean, String> value) throws Exception {
return new Tuple5<>(value.f0, value.f1, value.f2, value.f3,getRuntimeContext().getIndexOfThisSubtask()+"");
}
}).print();
---------------------------------------------------------------------------------------------------------------------------------
结果:
5> (0,a,false,1,4)
6> (5,b,true,4,5)
8> (5,b,true,4,7)
5> (5,b,true,4,4)
3> (4,b,true,3,2)
5> (3,a,true,2,4)
5> (4,b,true,3,4)
3> (3,a,true,2,2)
5> (0,b,true,5,4)
4> (3,a,true,2,3)
3> (0,a,false,1,2)
4> (0,a,false,1,3)
3> (5,b,true,4,2)
4> (5,b,true,4,3)
3> (0,b,true,5,2)
4> (4,b,true,3,3)
1> (0,a,false,1,0)
4> (0,b,true,5,3)
2> (5,b,true,4,1)
2> (4,b,true,3,1)
2> (3,a,true,2,1)
1> (5,b,true,4,0)
2> (0,a,false,1,1)
2> (0,b,true,5,1)
8> (0,a,false,1,7)
8> (3,a,true,2,7)
8> (0,b,true,5,7)
8> (4,b,true,3,7)
6> (3,a,true,2,5)
7> (3,a,true,2,6)
6> (4,b,true,3,5)
1> (4,b,true,3,0)
7> (0,a,false,1,6)
6> (0,a,false,1,5)
7> (5,b,true,4,6)
1> (3,a,true,2,0)
6> (0,b,true,5,5)
1> (0,b,true,5,0)
7> (0,b,true,5,6)
7> (4,b,true,3,6)
分区策略
GlobalPartitioner
ShufflePartitioner
RebalancePartitioner
RescalePartitioner
BroadcastPartitioner
ForwardPartitioner
KeyGroupStreamPartitioner
CustomPartitionerWrapper
数据转换策略
forward(转发)
数据从一个算子一对一的转换到下游另一个算子,同机器无网络传送
broadcast(广播)
数据复制一份从上一个算子发送到下游每一个算子
key-based(基于键值)
根据key将数据按照key发送到下游算子
random(随机)
数据均匀分配至下游算子
组件
JobManager
ResourceManager
TaskManager
Dispatcher
+++
Application
- 启动:flink run -c mainclasspath jarpath
- 取消:flink cancel jobid
- 停止:flink stop jobid
Job
Task
job中的一个阶段就是一个task,一个task包括链条连接的多个subtask,一个task运行在一个线程里面,task平分slot里面的内存资源共享slot里面的cpu资源
SubTask
flink job中最小执行单元
算子
Source
使用EventTime,划分滚动窗口,如果使用的是并行的Source,例如KafkaSource,创建Kafka的Topic时有多个分区,每个Source的分区都要满足触发的条件,整个窗口才会被触发
Transformation
- map //DataStream - > DataStream
- flatMap //DataStream -> DataStream
- filter //DataStream -> DataStream
- keyBy //DataStream -> KeyStream
Sink
writeAsCsv必须是元组才能正常写入
WaterMark
决定一个窗口什么时候激活(触发),这时的窗口的最大长度为
materMark>=上一个窗口的结束边界就会触发窗口执行
watermark是flink中窗口延迟触发的机制
在每个算子内部都自己有一个事件时间时钟,事件时间时钟是根据watermark来更新的,。流入算子的数据可能是单分区也可能是多分区的,每个流入算子的分区端都会有一个自己的partition watermark标记,当该分区内进入新的高于之前watermark的watermark数据时,partition watermark标记才会被更新,task内部也维护一个task的watermark数据。如果某个分区的partition watermark < task watermark,那么task watermark会更新为该partition watermark数据,然后把task 把当前更新的task watermark数据发向下游task。
AssignerWithPeriodicWatermarks
windowmax=watermark+windowsize waterMark=数据携带的时间(窗口中最大的时间)-延迟执行的时间
周期性的生成watermark,定期向分区数据流中插入时间水印。默认周期时间为200毫秒,可以使用setAutoWatermakrIntaval()来设置 BoundedOutOfOrdernessTimestampExtractor继承自AssignerWithPeriodicWatermarks 属于周期性watermark
周期性数据水印在特定条件下可能会造成数据错误 例如:env.fromCollection(List((1, “a1”, 158324361000l), (1, “a2”, 158324369000l), (1, “a3”, 158324364000l), (1, “a4”, 158324361000l), (1, “a5”,158324365000l), (1, “a6”, 158324362000l), (1, “a7”,158324367000l))) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Int, String, Long) { override def extractTimestamp(element: (Int, String, Long)): Long = { element._3 } }) .keyBy(0) .window(TumblingEventTimeWindows.of(Time.seconds(4))) .sum(2) .print() 当使用周期性水印AssignerWithPeriodicWatermarks时就会造成数据的错误计算 是因为周期性水印是定期产生的(默认200毫秒)但是在这个周期里可能会出现有多个数据已经过去,这多个数据用于一个水印从而造成数据计算错误
AssignerWithPunctuatedWatermarks
根据事件生成watermark。可以用于根据具体数据来生成watermark,
Window
Keyed Window
使用keyby后流的窗口
GlobalWindow
CountWindow
TimeWindow
- Tumbling
- Sliding
- Session
Non-Keyed Windows
未使用keyby后流的窗口
windowAll
Window 之后的算子
Trigger
window 数据触发器,keyed or non-keyed window 都可以使用
- EventTimeTrigger:事件时间触发器
- ProcessingTimeTrigger:程序时间触发器
- CountTrigger:数量出发器。只发送窗口触发信号
- PurgingTrigger:代理模式触发器,发送窗口触发和数据清理信号
Evictor
window 数据剔除器,可以在window执行前或者执行后剔除window内的元素
- CountEvictor:数量剔除器。在Window中保留指定数量的元素,并从窗口头部开始丢弃其余元素。
- DeltaEvictor: 阈值剔除器。计算Window中最后一个元素与其余每个元素之间的增量,丢弃增量大于或等于阈值的元素。
- TimeEvictor:时间剔除器。保留Window中最近一段时间内的元素,并丢弃其余元素。
AllowedLateness
决定一个窗口什么时候销毁,window 延迟数据是否保留计算,可能会造成窗口的二次触发,会导致结果数据的更新,造成数据不一致。这时窗口的最大长度为windowmax=waterma+windowsize+allowedleteness
State
Managed State
Operator State
operator state 绑定到每个算子的实例上,各个实例拥有自己的state,一个实例无法获取同并行的其他实例的state数据
operator state :记录的是每一个分区的偏移量
Keyed State
keyedstate:在一个subtask中可能有多个state,一个组对应一个key的状态
Raw State
CheckPoint
全自动程序管理,轻量快捷算子级数据快照
开启flink checkpoint 后设置精准一次消费,kafka的offset会保存在savepoint设置的路径里面,还会降offset保存在kafka 特殊topic里面,如果程序重启时没有指定savepiont保存数据的地址会默认根据kafka 特殊topic保存的偏移量消费数据,可以设置不降offset保存在kafka 特殊topic里面使用,setCommitOffsetOnCheckpoints(false)
开启检查点机制
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);
// advanced options:
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// allow job recovery fallback to checkpoint when there is a more recent savepoint
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
Barrier
算子checkpoint的依据,是exactly-once 和at-least-once语义的根据
SavePoint
人工参与管理的application级别的数据快照
手动保存数据快照
-
flink stop jobid 停止job并保存快照,如果在flink-conf.yaml上配置了state.savepoints.dir,停止任务后会自动将快照保存。
-
flink stop jobid -p dirpath 停止job并将快照保存在dirpath
-
flink savepoint jobid [dirpath] 在不结束job的情况下保存快照。如果带有dirpath则会将快照保存在此目录否则会保存在默认配置的保存目录
从数据快照恢复程序
-
直接从savepoint目录恢复 flink run -s dirpath 从dirpath目录恢复程序
-
跳过无法恢复的算子恢复 flink run -s dirpath -n
手动清除数据快照
- flink savepoint -d itemdirpath 手动将数据某个具体快照删除(itemdirpath 快照具体根目录)
CEP(Complex Event Processing)
NFA(Nondeterministic Finite Automaton)
反压
flink 三层buff缓存(resultsubpartition,nettybuff,netty中通过高水位来控制buff是否还可以接收数据,socketbuff)
1.5 之前是基于tcp 窗口的反压机制,发送端根据接收端返回的ack和windox size 来发送数据,当window size 为0时,发送端则不会再附上数据,而是发送一个zerowindow 的探测性数据来确定是否可以再次发送数据,当接收端继续可以接收数据时,发送端才会继续发送数据
基于tcp窗口的反压机制缺点 1.单个task造成的反压,会阻断整个TM-TM的socket,连chekcpoint barrier 也无法发送 2.反压路径较长,导致生效延迟较大
1.5 引入credit 机制实现反压,credit 反压机制是类似于tcp 窗口反压实现的另一种反压机制,resultsubpartition在发送数据时会带有resultbuff里面还存有的数据大小 backlog size,inputchanel在接收到时会计算自己当前还能接收到的数据大小,当inputchanel无法再接收数据时会将credit置为0,告诉result不能再接收消息。result每次发送消息时会检测当前自己的credit数据,当credit为0时 则不会再向netty发送数据从而实现反压机制
Flink 保持数据一直性(Exactly-Once)
-
内部保证 —— checkpoint
-
source 端 —— 可重设数据的读取位置
-
sink 端 —— 从故障恢复时,数据不会重复写入外部系统
-
幂等写入
所谓幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了
从Flink程序sink到的key-value存储中读取数据的应用,在Flink从检查点恢复的过程中,可能会看到不想看到的结果。当重播开始时,之前已经发出的计算结果可能会被更早的结果所覆盖(因为在恢复过程中)。所以,一个消费Flink程序输出数据的应用,可能会观察到时间回退,例如读到了比之前小的计数。
-
事务写入
应用程序中一系列严密的操作,所有操作必须成功完成,否则在每个操作中所作的所有更改都会被撤消。具有原子性:一个事务中的一系列的操作要么全部成功,要么一个都不做。
事务性的方法将不会遭受幂等性写入所遭受的重播不一致的问题。但是,事务性写入却带来了延迟,因为只有在检查点完成以后,我们才能看到计算结果。
Flink提供了两种构建模块来实现事务性sink连接器:write-ahead-log(WAL,预写式日志)sink和两阶段提交sink。
-
预写日志(Write-Ahead-Log , WAL)
(1)把结果数据先当成状态保存,然后在收到chekpoint完成的通知时,一次性写入sink系统 (2)简单易于实现,由于数据提前在状态后端中做了缓存,所以无论什么sink系统,都能用这种方式一批搞定 (3)DataStream API 提供了一个模板类:GenericWriteAheadSink,来实现这种事务性sink
-
两阶段提交(Two-Phase-Commit , 2PC)
(1)对于每个checkpoint,sink任务会启动一个事务,并将接下来所有接受的数据添加到事务里 (2)然后将这些数据写入外部sink系统,但不提交他们——这是只是“预提交” (3)当它收到checkpoint完成的通知时,它才正式提交事务,实现结果的真正写入 (4)这种方式真正实现了exactly-once,它需要一个提供事务支持的外部sink系统。Flink提供了TwoPhaseCommitSinkFunction接口。
-
在使用kafka011 sink 时注意的点:
1.为了保证事务特性,在使用其他程序去消费我们flink sink 数据的kafka时,这个consumer需要设置了
isolation.level = read_committed,那么它只会读取已经提交了的消息。2.Checkpoint超时时间 必需大于 kafka 提交事务时间。
假如checkpoint失败时间高于 kafka事务等待时间,比如,设置了一个checkpoint最多等待10分钟,10分钟后会失败这个checkpoint的保存。而kafka 的事务只能等待5分钟,5分钟后把uncommitted的事务关掉。这个时候6分钟checkpoint成功了,但是对应kafka数据的事务已经失败。这样就无法保证Exactly-once的实现
-
其他
两个流join 必须有等值字段必须都在同一个窗口里面
duplicate key update mysql数据库的更新插入 合为一条sql
并行度: 算子级别 > env级别 > Client级别 > 系统默认级别
在所有Task共享资源槽点名字相同,默认情况下 (pipline) 同一个job的同一个Task中的多个subTask不能在同一个slot槽中
具有并行度的subtask 不能在一个slot槽中 对于同一个job,不同Task【阶段】的subTask可以在同一个资源槽中
- 原文作者:Abiu
- 原文链接:http://bulianwei.github.io/post/Flink/
- 版权声明:本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可,非商业转载请注明出处(作者,原文链接),商业转载请联系作者获得授权。