Flink 入门：WordCount
Flink的编程模型
1、获取Flink上下文;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
2、加载、创建数据;
DataSet
3、数据转换;
Transformation
4、数据结果存放（Sink，例如 print()、writeAsText()）;
5、触发执行。
env.execute();
下面是使用 Flink 统计并输出 WordCount 结果的示例：
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class FlinkMain {
@SuppressWarnings("serial")public static class LineSplit implements FlatMapFunction>{ @SuppressWarnings("rawtypes") @Override /** * @param value 原数据 * @param out 输出的数据 */ public void flatMap(String value, Collector> out) throws Exception { String[] tokens = value.split(" "); for (String token : tokens) { if(token!=null && token.length()>0){ Tuple2 t = new Tuple2(token,1); out.collect(t); } } }}public static void main(String[] args) throws Exception { //创建flink上下文 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //创建数据集 DataSet text = env.fromElements("to be","or no to be","is question"); //对数据集转换 DataSet> count = text.flatMap(new LineSplit()); //输出转换后的数据集(print中包含了env.execute执行) count.print(); System.out.println("-----------------------"); //对数据集分组统计转换,0,1是下标,对应Tuple2类中的参数 count = count.groupBy(0).sum(1); //控制台输出数据集 count.print(); System.out.println("-----------------------");}
}
Flink 使用 SQL 方式转换数据
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
public class FlinkMain2 {
@SuppressWarnings({ "unchecked", "rawtypes" })public static void main(String[] args) throws Exception { //创建flink上下文 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); List list = new ArrayList(); String workStr = "to be or no to be is question"; String[] tokens = workStr.split(" "); for (String token : tokens) { if(token!=null && token.length()>0){ list.add( new WordCount(token,1)); } } //创建数据集 DataSet input = env.fromCollection(list); //注册为数据表wordCount为数据库表,word,frequency为wordCount表字段 tEnv.registerDataSet("wordCount", input, "word, frequency"); Table table = tEnv.sqlQuery(" SELECT word, SUM(frequency) as frequency FROM wordCount GROUP BY word" ); DataSet res = tEnv.toDataSet(table, WordCount.class); //控制台输出 res.print();}public static class WordCount { public String word; public long frequency; public WordCount(){} public WordCount(String word, long frequency) { this.word = word; this.frequency = frequency; } @Override public String toString() { return "词语:" + word + ",词频:" + frequency; }}
}