flink多表连接的例子

  

今天写了一个稍微复杂的例子,实现了类似mysql group_concat功能,记录一下
MapToString参考错误那篇博客

  
 <代码>公共静态孔隙主要arg (String[])抛出异常{
  
  最后ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment ();
  BatchTableEnvironment tableEnv=new BatchTableEnvironment (env TableConfig.DEFAULT ());
  tableEnv。registerFunction (“mapToString”,新的mapToString ());
  
  getProjectInfo (env, tableEnv);
  getProject (env, tableEnv);
  joinTableProjectWithInfo (tableEnv);=tableEnv表查询。sqlQuery(“从result_agg选择id、名称、类型”);
  DataSetds=tableEnv。toDataSet(查询,Row.class);
  ds.print ();
  ds。writeAsText (“/home/测试”,WriteMode.OVERWRITE);
  env.execute(多个表);
  }
  
  公共静态孔隙getProjectInfo (ExecutionEnvironment env BatchTableEnvironment tableEnv) {
  
  TypeInformation [] fieldTypes=new TypeInformation [] {BasicTypeInfo。STRING_TYPE_INFO BasicTypeInfo。STRING_TYPE_INFO};
  字段名String []=new String [] {“id”、“类型”};
  RowTypeInfo RowTypeInfo=new RowTypeInfo (fieldTypes,字段名);
  JDBCInputFormat JDBCInputFormat=JDBCInputFormat.buildJDBCInputFormat () .setDrivername (“com.mysql.jdbc.Driver”)
  .setDBUrl (" jdbc: mysql://ip: 3306/空间? characterEncoding=utf8 ")
  .setUsername(“用户”).setPassword (pwd)
  .setQuery(“选择project_fid,演员(project_info_type字符)类型从项目”).setRowTypeInfo (rowTypeInfo) .finish ();
  DataSources=env.createInput (jdbcInputFormat);
  tableEnv。registerDataSet (“project_info”,年代);
  aggProjectInfo (tableEnv“project_info”);
  }
  
  公共静态孔隙aggProjectInfo (BatchTableEnvironment tableEnv,字符串表){
  表tapiResult=tableEnv.scan(表);
  tapiResult.printSchema ();=tableEnv表查询。sqlQuery(“选择id, mapToString(收集(类型))类型从project_info组id”);
  tableEnv。registerTable(表名+“_agg”、查询);
  tapiResult=tableEnv.scan(表名+“_agg”);
  tapiResult.printSchema ();
  }
  
  公共静态孔隙getProject (ExecutionEnvironment env, BatchTableEnvironment tableEnv) {
  
  TypeInformation [] fieldTypes=new TypeInformation [] {BasicTypeInfo。STRING_TYPE_INFO BasicTypeInfo。STRING_TYPE_INFO};
  字段名String []=new String [] {“pid”、“名称”};
  RowTypeInfo RowTypeInfo=new RowTypeInfo (fieldTypes,字段名);
  JDBCInputFormat JDBCInputFormat=JDBCInputFormat.buildJDBCInputFormat () .setDrivername (“com.mysql.jdbc.Driver”)
  .setDBUrl (" jdbc: mysql://ip: 3306/空间? characterEncoding=utf8 ")
  .setUsername(“用户”).setPassword (pwd)
  .setQuery(“选择支撑材,project_name t_project”) .setRowTypeInfo (rowTypeInfo) .finish ();
  DataSources=env.createInput (jdbcInputFormat);
  tableEnv。registerDataSet(“项目”);
  
  }
  
  公共静态孔隙joinTableProjectWithInfo (BatchTableEnvironment tableEnv) {=tableEnv表结果。sqlQuery(“选择一个。pid作为id、a.name b。从项目类型内连接project_info_agg b a.pid=b.id ");
  tableEnv。registerTable (“result_agg”,结果);
  result.printSchema ();
  } 

flink多表连接的例子