今天写了一个稍微复杂的例子,实现了类似mysql group_concat功能,记录一下
MapToString参考错误那篇博客
<代码>公共静态孔隙主要arg (String[])抛出异常{ 最后ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment (); BatchTableEnvironment tableEnv=new BatchTableEnvironment (env TableConfig.DEFAULT ()); tableEnv。registerFunction (“mapToString”,新的mapToString ()); getProjectInfo (env, tableEnv); getProject (env, tableEnv); joinTableProjectWithInfo (tableEnv);=tableEnv表查询。sqlQuery(“从result_agg选择id、名称、类型”); DataSetds=tableEnv。toDataSet(查询,Row.class); ds.print (); ds。writeAsText (“/home/测试”,WriteMode.OVERWRITE); env.execute(多个表); } 公共静态孔隙getProjectInfo (ExecutionEnvironment env BatchTableEnvironment tableEnv) { TypeInformation [] fieldTypes=new TypeInformation [] {BasicTypeInfo。STRING_TYPE_INFO BasicTypeInfo。STRING_TYPE_INFO}; 字段名String []=new String [] {“id”、“类型”}; RowTypeInfo RowTypeInfo=new RowTypeInfo (fieldTypes,字段名); JDBCInputFormat JDBCInputFormat=JDBCInputFormat.buildJDBCInputFormat () .setDrivername (“com.mysql.jdbc.Driver”) .setDBUrl (" jdbc: mysql://ip: 3306/空间? characterEncoding=utf8 ") .setUsername(“用户”).setPassword (pwd) .setQuery(“选择project_fid,演员(project_info_type字符)类型从项目”).setRowTypeInfo (rowTypeInfo) .finish (); DataSource
s=env.createInput (jdbcInputFormat); tableEnv。registerDataSet (“project_info”,年代); aggProjectInfo (tableEnv“project_info”); } 公共静态孔隙aggProjectInfo (BatchTableEnvironment tableEnv,字符串表){ 表tapiResult=tableEnv.scan(表); tapiResult.printSchema ();=tableEnv表查询。sqlQuery(“选择id, mapToString(收集(类型))类型从project_info组id”); tableEnv。registerTable(表名+“_agg”、查询); tapiResult=tableEnv.scan(表名+“_agg”); tapiResult.printSchema (); } 公共静态孔隙getProject (ExecutionEnvironment env, BatchTableEnvironment tableEnv) { TypeInformation [] fieldTypes=new TypeInformation [] {BasicTypeInfo。STRING_TYPE_INFO BasicTypeInfo。STRING_TYPE_INFO}; 字段名String []=new String [] {“pid”、“名称”}; RowTypeInfo RowTypeInfo=new RowTypeInfo (fieldTypes,字段名); JDBCInputFormat JDBCInputFormat=JDBCInputFormat.buildJDBCInputFormat () .setDrivername (“com.mysql.jdbc.Driver”) .setDBUrl (" jdbc: mysql://ip: 3306/空间? characterEncoding=utf8 ") .setUsername(“用户”).setPassword (pwd) .setQuery(“选择支撑材,project_name t_project”) .setRowTypeInfo (rowTypeInfo) .finish (); DataSource
s=env.createInput (jdbcInputFormat); tableEnv。registerDataSet(“项目”); } 公共静态孔隙joinTableProjectWithInfo (BatchTableEnvironment tableEnv) {=tableEnv表结果。sqlQuery(“选择一个。pid作为id、a.name b。从项目类型内连接project_info_agg b a.pid=b.id "); tableEnv。registerTable (“result_agg”,结果); result.printSchema (); }代码>