flink1.8对蜂巢的支持不够好,造成300 w的数据,居然读了2个小时,打算将程序迁移至火花。先把代码贴上。后发现sql不应该有哪条件,去掉后速度还行。
maven
<代码> & lt; dependency> & lt; groupId> org.apache.hive & lt; artifactId> hive-jdbc & lt; version> 1.1.0 & lt;/dependency> & lt; dependency> & lt; groupId> org.apache.hadoop & lt; artifactId> hadoop-common & lt; version> 3.1.2 & lt;/dependency> & lt; dependency> & lt; groupId> jdk.tools & lt; artifactId> jdk.tools & lt; version> 1.8 & lt;/version> & lt; scope> system & lt; systemPath> $ {JAVA_HOME}/lib/tools.jar & lt;/dependency>代码>
java
<代码> 私人最终的静态字符串driverName=" org.apache.hive.jdbc.HiveDriver”;//jdbc驱动路径 私人最终的静态字符串url="; ";//蜂巢库地址+库名 私人用户的静态字符串=" ";//用户名 私人最终的静态字符串密码=" ! ";//密码 私人最终的静态字符串表=" "; 私人最终的静态字符串sql=" "; 公共静态void main (String[]参数){抛出异常 长时间=System.currentTimeMillis (); HttpClientUtil.sendDingMessage(“开始同步蜂巢——“+表+”;“+ Utils.getTimeString ());/* * *初始化环境 */最后ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment (); env.setParallelism (4); 尝试{ TypeInformation[]类型=new TypeInformation [] {BasicTypeInfo。STRING_TYPE_INFO BasicTypeInfo.STRING_TYPE_INFO}; String [] colName=new String[]{“用户”、“名称”}; RowTypeInfo RowTypeInfo=new RowTypeInfo(类型、colName); JDBCInputFormatBuilder builder=JDBCInputFormat.buildJDBCInputFormat () .setDrivername (driverName) .setDBUrl (url) .setUsername(用户).setPassword(密码); 日历日历=Calendar.getInstance (); 日历。凝固时间(新日期()); calendar.add(日历。目前为止,1);//用昨天产出的数据 SimpleDateFormat sj=new SimpleDateFormat(“名称”); 字符串d=sj.format (calendar.getTime ()); JDBCInputFormat JDBCInputFormat=建设者。setQuery (sql +”和dt=" + d + "限制100000000”).setRowTypeInfo (rowTypeInfo) .finish (); DataSourcerowlist=env.createInput (jdbcInputFormat); DataSet
temp=rowlist。过滤器(新的FilterFunction () { @Override 公共布尔过滤器(一行一行){抛出异常 字符串键=row.getField (0) .toString (); 字符串值=https://www.yisu.com/zixun/row.getField (1) .toString (); 如果(key.length () <5 | | key.startsWith (“-”) | | key.startsWith (“$”) | | value.length () & lt; 5 | | value.startsWith (“-”) | | value.startsWith (" $ ")) { 返回错误; 其他}{ 返回true; } } })。地图(新MapFunction<行,RedisDataModel> () { @Override 公共RedisDataModel地图(行值){抛出异常 新RedisDataModel RedisDataModel m=(); m.setExpire (1); m.setKey (JobConstants.REDIS_FLINK_IMEI_USER + value.getField (0) .toString ()); m.setGlobal(真正的); m.setValue (value.getField (1) .toString ()); 返回m; } }); HttpClientUtil.sendDingMessage(“同步蜂巢——“+表+”完成,开始推送模型,共有“+ temp.count() +“条;“+ Utils.getTimeString ()); RedisOutputFormat redisOutput=RedisOutputFormat.buildRedisOutputFormat () .setHostMaster (AppConfig.getProperty (JobConstants.REDIS_HOST_MASTER)) .setHostSentinel (AppConfig.getProperty (JobConstants.REDIS_HOST_SENTINELS)) .setMaxIdle (Integer.parseInt (AppConfig.getProperty (JobConstants.REDIS_MAXIDLE))) .setMaxTotal (Integer.parseInt (AppConfig.getProperty (JobConstants.REDIS_MAXTOTAL))) .setMaxWaitMillis (Integer.parseInt (AppConfig.getProperty (JobConstants.REDIS_MAXWAITMILLIS))) .setTestOnBorrow (Boolean.parseBoolean (AppConfig.getProperty (JobConstants.REDIS_TESTONBORROW))) .finish (); temp.output (redisOutput); env。执行(“蜂巢——”+表+“同步”); HttpClientUtil.sendDingMessage(“同步蜂巢——“+表+”完成,耗时:”+ (System.currentTimeMillis () -)/1000 +“s”); }捕捉(异常e) { logger.error (“e); HttpClientUtil.sendDingMessage(“同步蜂巢——“+表+“失败,时间戳:“+时间+”,原因:“+ e.toString ()); } 代码>