Flink的编程模型
1,获取Flink上下文;
ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment ();
2,加载,创建数据,数据集
3,数据转换;转换
4,数据结果存放;
5,触发执行。
env.execution
<强>下面为flink输出wordcount数据:强>
进口org.apache.flink.api.common.functions.FlatMapFunction;
进口org.apache.flink.api.java.DataSet;
进口org.apache.flink.api.java.ExecutionEnvironment;
进口org.apache.flink.api.java.tuple.Tuple2;
进口org.apache.flink.util.Collector;
<代码> @SuppressWarnings(“串行”) 公共静态类LineSplit实现FlatMapFunction<字符串,Tuple2<字符串,Integer>在{ @SuppressWarnings(“rawtypes”) @Override/* * * @param价值原数据 * @param出输出的数据 */公共空间flatMap(字符串值,Collector祝辞){抛出异常 String[]令牌=价值。分割(" "); (字符串标记:令牌){ 如果令牌!=零,,token.length()在0){ Tuple2 t=新Tuple2<字符串,Integer>(令牌,1); out.collect (t); } } } } 公共静态void main (String [] args){抛出异常//创建flink上下文 ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment ();//创建数据集 DataSet 文本=env.fromElements(“是”、“没有”、“问题”);//对数据集转换 DataSet 比;数=文本。flatMap(新LineSplit ());//输出转换后的数据集(印刷中包含了env.execute执行) count.print (); System.out.println (“- - - - - - - - - - - - - - - - - - - - - - - -”);//对数据集分组统计转换,0,1是下标,对应Tuple2类中的参数 数=count.groupBy (0) .sum (1);//控制台输出数据集 count.print (); System.out.println (“- - - - - - - - - - - - - - - - - - - - - - - -”); }代码>
}
<强> Flink使用sql方式转换数据强>
进口java.util.ArrayList;
进口并不知道;
进口org.apache.flink.api.java.DataSet;
进口org.apache.flink.api.java.ExecutionEnvironment;
进口org.apache.flink.table.api.Table;
进口org.apache.flink.table.api.TableEnvironment;
进口org.apache.flink.table.api.java.BatchTableEnvironment;
<代码> @SuppressWarnings({“不”、“rawtypes”}) 公共静态void main (String [] args){抛出异常//创建flink上下文 ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment (); BatchTableEnvironment tEnv=TableEnvironment.getTableEnvironment (env); List=new ArrayList列表(); 字符串workStr=笆腔蛎挥形侍狻? String[]令牌=workStr。分割(" "); (字符串标记:令牌){ 如果令牌!=零,,token.length()在0){ 列表。添加(新WordCount(令牌,1)); } }//创建数据集 DataSet 输入=env.fromCollection(列表);//注册为数据表wordCount为数据库表,总之,频率为wordCount表字段 tEnv。registerDataSet (“wordCount”,输入“词、频率”); 表=tEnv表。sqlQuery(“选择单词,(频率)和频率从wordCount组词”); DataSet res=tEnv。toDataSet(表、WordCount.class);//控制台输出 res.print (); } 公共静态类WordCount { 公共字符串字; 公共长频率; 公共WordCount () {} 公共WordCount(字符串字长频率){ 这一点。词=单词; this.frequency=频率; } @Override 公共字符串toString () { 返回“词语:“+ +”一词,词频:”+频率; } }代码>
}