package org.example.tmp;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class WordCount1 {
    /**
     * Batch WordCount using the Flink DataSet API:
     * split lines into words, count each word, then print the counts
     * sorted by frequency in descending order.
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Bounded input: four lines of space-separated words.
        DataSet<String> lines = env.fromElements(
                "it hadoop spark flink", "it hadoop spark", "it hadoop", "it");

        // Tokenize every line into individual words.
        DataSet<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                for (String word : line.split(" ")) {
                    out.collect(word);
                }
            }
        });

        // Pair each word with an initial count of 1.
        DataSet<Tuple2<String, Integer>> pairs =
                words.map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String word) throws Exception {
                        return Tuple2.of(word, 1);
                    }
                });

        // Group by the word (tuple field 0) and sum the counts (tuple field 1).
        DataSet<Tuple2<String, Integer>> counts = pairs.groupBy(0).sum(1);

        // Parallelism 1 so sortPartition produces a single, globally ordered partition.
        counts.sortPartition(1, Order.DESCENDING).setParallelism(1).print();

        // No env.execute() here: in the DataSet API, print() itself triggers execution
        // (unlike the DataStream API, which requires an explicit execute()).
    }
}
输出结果为:

package org.example.tmp;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;
public class WordCount2 {
    /**
     * Streaming WordCount using the Flink DataStream API over a bounded element source:
     * split lines into words, key by word, and emit a running sum per word.
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // AUTOMATIC: Flink chooses BATCH execution for bounded sources (like fromElements)
        // and STREAMING for unbounded ones. In BATCH mode, only the final count per key
        // is emitted instead of one incremental update per input record.
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        DataStream<String> dataStream = env.fromElements(
                "it hadoop spark flink", "it hadoop spark", "it hadoop", "it");

        // Tokenize every line into individual words.
        DataStream<String> flatMap = dataStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                for (String s : value.split(" ")) {
                    out.collect(s);
                }
            }
        });

        // Pair each word with an initial count of 1.
        DataStream<Tuple2<String, Integer>> mapDataStream =
                flatMap.map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String s) throws Exception {
                        return Tuple2.of(s, 1);
                    }
                });

        // FIX: the index-based keyBy(0) is deprecated (and removed in newer Flink
        // releases). A KeySelector gives a strongly typed key (String instead of the
        // untyped Tuple) and makes explicit which field the stream is partitioned on.
        KeyedStream<Tuple2<String, Integer>, String> keyBy =
                mapDataStream.keyBy(value -> value.f0);

        // Running sum of the counts (tuple field 1) per word.
        DataStream<Tuple2<String, Integer>> sum = keyBy.sum(1).setParallelism(1);
        sum.print();

        // DataStream jobs only run once execute() is called.
        env.execute();
    }
}
输出结果为:

问题:为什么用下面 DataStream 形式编写的 Flink 代码,在执行 sum 统计时,出现次数只有 1 次的单词没有被统计输出?
原:
DataStream<String> dataStream = env.fromElements("it hadoop spark flink", "it hadoop spark", "it hadoop", "it");
改为:
DataStream<String> dataStream = env.fromElements("it hadoop spark flink", "it hadoop spark", "it hadoop", "it","flink","test");
这样改之后,"test" 这个只出现一次的单词同样没有被输出。