热门IT资讯网

Flink入门wordCount

发表于:2024-11-24 作者:热门IT资讯网编辑
编辑最后更新 2024年11月24日,Flink的编程模型1、获取Flink上下文;ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();2、加

Flink的编程模型
1、获取Flink上下文;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
2、加载、创建数据;
DataSet
3、数据转换;
Transformation
4、数据结果存放;
5、触发执行。
env.execute()

下面是使用Flink实现WordCount并在控制台输出结果的示例:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class FlinkMain {

@SuppressWarnings("serial")public static class LineSplit implements FlatMapFunction>{    @SuppressWarnings("rawtypes")    @Override    /**     * @param value 原数据     * @param out 输出的数据     */    public void flatMap(String value, Collector> out) throws Exception {        String[] tokens = value.split(" ");        for (String token : tokens) {            if(token!=null && token.length()>0){                Tuple2 t = new Tuple2(token,1);                out.collect(t);            }        }    }}public static void main(String[] args) throws Exception {    //创建flink上下文    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();    //创建数据集    DataSet text = env.fromElements("to be","or no to be","is question");    //对数据集转换    DataSet> count = text.flatMap(new LineSplit());    //输出转换后的数据集(print中包含了env.execute执行)    count.print();    System.out.println("-----------------------");    //对数据集分组统计转换,0,1是下标,对应Tuple2类中的参数    count = count.groupBy(0).sum(1);    //控制台输出数据集    count.print();    System.out.println("-----------------------");}

}

Flink使用sql方式转换数据
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;

public class FlinkMain2 {

@SuppressWarnings({ "unchecked", "rawtypes" })public static void main(String[] args) throws Exception {    //创建flink上下文    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();    BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);    List list = new ArrayList();    String workStr = "to be or no to be is question";    String[] tokens = workStr.split(" ");    for (String token : tokens) {        if(token!=null && token.length()>0){            list.add( new WordCount(token,1));        }    }    //创建数据集    DataSet input = env.fromCollection(list);    //注册为数据表wordCount为数据库表,word,frequency为wordCount表字段    tEnv.registerDataSet("wordCount", input, "word, frequency");    Table table = tEnv.sqlQuery(" SELECT word, SUM(frequency) as frequency FROM wordCount GROUP BY word" );    DataSet res = tEnv.toDataSet(table, WordCount.class);    //控制台输出    res.print();}public static class WordCount    {    public String word;    public long frequency;    public WordCount(){}    public WordCount(String word, long frequency) {        this.word = word;        this.frequency = frequency;    }    @Override    public String toString() {        return "词语:" + word + ",词频:" + frequency;    }}

}

0