Flink入门wordCount

  

Flink的编程模型
1,获取Flink上下文;
ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment ();
2,加载,创建数据,数据集

3,数据转换;转换

4,数据结果存放;
5,触发执行。
env.execution

  

<强>下面为flink输出wordcount数据:

  

进口org.apache.flink.api.common.functions.FlatMapFunction;
进口org.apache.flink.api.java.DataSet;
进口org.apache.flink.api.java.ExecutionEnvironment;
进口org.apache.flink.api.java.tuple.Tuple2;
进口org.apache.flink.util.Collector;

  公共类FlinkMain {

  
 <代码> @SuppressWarnings(“串行”)
  公共静态类LineSplit实现FlatMapFunction<字符串,Tuple2<字符串,Integer>在{
  
  @SuppressWarnings(“rawtypes”)
  @Override/* *
  * @param价值原数据
  * @param出输出的数据
  */公共空间flatMap(字符串值,Collector祝辞){抛出异常
  String[]令牌=价值。分割(" ");
  (字符串标记:令牌){
  如果令牌!=零,,token.length()在0){
  Tuple2 t=新Tuple2<字符串,Integer>(令牌,1);
  out.collect (t);
  }
  }
  }
  
  }
  
  公共静态void main (String [] args){抛出异常//创建flink上下文
  ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment ();//创建数据集
  DataSet文本=env.fromElements(“是”、“没有”、“问题”);//对数据集转换
  DataSet比;数=文本。flatMap(新LineSplit ());//输出转换后的数据集(印刷中包含了env.execute执行)
  count.print ();
  System.out.println (“- - - - - - - - - - - - - - - - - - - - - - - -”);//对数据集分组统计转换,0,1是下标,对应Tuple2类中的参数
  数=count.groupBy (0) .sum (1);//控制台输出数据集
  count.print ();
  System.out.println (“- - - - - - - - - - - - - - - - - - - - - - - -”);
  } 
  

}   

<强> Flink使用sql方式转换数据
进口java.util.ArrayList;
进口并不知道;

  

进口org.apache.flink.api.java.DataSet;
进口org.apache.flink.api.java.ExecutionEnvironment;
进口org.apache.flink.table.api.Table;
进口org.apache.flink.table.api.TableEnvironment;
进口org.apache.flink.table.api.java.BatchTableEnvironment;

  公共类FlinkMain2 {

  
 <代码> @SuppressWarnings({“不”、“rawtypes”})
  公共静态void main (String [] args){抛出异常//创建flink上下文
  ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment ();
  BatchTableEnvironment tEnv=TableEnvironment.getTableEnvironment (env);
  
  List=new ArrayList列表();
  字符串workStr=笆腔蛎挥形侍狻?
  String[]令牌=workStr。分割(" ");
  (字符串标记:令牌){
  如果令牌!=零,,token.length()在0){
  列表。添加(新WordCount(令牌,1));
  }
  }//创建数据集
  DataSet输入=env.fromCollection(列表);//注册为数据表wordCount为数据库表,总之,频率为wordCount表字段
  tEnv。registerDataSet (“wordCount”,输入“词、频率”);
  
  表=tEnv表。sqlQuery(“选择单词,(频率)和频率从wordCount组词”);
  
  DataSetres=tEnv。toDataSet(表、WordCount.class);//控制台输出
  res.print ();
  
  }
  
  公共静态类WordCount {
  公共字符串字;
  公共长频率;
  公共WordCount () {}
  
  公共WordCount(字符串字长频率){
  这一点。词=单词;
  this.frequency=频率;
  }
  
  @Override
  公共字符串toString () {
  返回“词语:“+ +”一词,词频:”+频率;
  }
  } 
  

}

Flink入门wordCount