flink读取蜂巢的数据

  

flink1.8对蜂巢的支持不够好,造成300 w的数据,居然读了2个小时,打算将程序迁移至火花。先把代码贴上。后发现sql不应该有哪条件,去掉后速度还行。

  maven

  
 <代码>
  & lt; dependency>
  & lt; groupId> org.apache.hive
  & lt; artifactId> hive-jdbc
  & lt; version> 1.1.0
  & lt;/dependency>
  
  & lt; dependency>
  & lt; groupId> org.apache.hadoop
  & lt; artifactId> hadoop-common
  & lt; version> 3.1.2
  & lt;/dependency>
  
  & lt; dependency>
  & lt; groupId> jdk.tools
  & lt; artifactId> jdk.tools
  & lt; version> 1.8 & lt;/version>
  & lt; scope> system
  & lt; systemPath> $ {JAVA_HOME}/lib/tools.jar
  & lt;/dependency> 
  

java   

 <代码>
  私人最终的静态字符串driverName=" org.apache.hive.jdbc.HiveDriver”;//jdbc驱动路径
  私人最终的静态字符串url="; ";//蜂巢库地址+库名
  私人用户的静态字符串=" ";//用户名
  私人最终的静态字符串密码=" ! ";//密码
  私人最终的静态字符串表=" ";
  私人最终的静态字符串sql=" ";
  
  公共静态void main (String[]参数){抛出异常
  
  长时间=System.currentTimeMillis ();
  HttpClientUtil.sendDingMessage(“开始同步蜂巢——“+表+”;“+ Utils.getTimeString ());/* *
  *初始化环境
  */最后ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment ();
  env.setParallelism (4);
  
  尝试{
  TypeInformation[]类型=new TypeInformation [] {BasicTypeInfo。STRING_TYPE_INFO BasicTypeInfo.STRING_TYPE_INFO};
  String [] colName=new String[]{“用户”、“名称”};
  RowTypeInfo RowTypeInfo=new RowTypeInfo(类型、colName);
  JDBCInputFormatBuilder builder=JDBCInputFormat.buildJDBCInputFormat () .setDrivername (driverName)
  .setDBUrl (url)
  .setUsername(用户).setPassword(密码);
  
  日历日历=Calendar.getInstance ();
  日历。凝固时间(新日期());
  calendar.add(日历。目前为止,1);//用昨天产出的数据
  SimpleDateFormat sj=new SimpleDateFormat(“名称”);
  字符串d=sj.format (calendar.getTime ());
  
  JDBCInputFormat JDBCInputFormat=建设者。setQuery (sql +”和dt=" + d + "限制100000000”).setRowTypeInfo (rowTypeInfo) .finish ();
  DataSourcerowlist=env.createInput (jdbcInputFormat);
  
  DataSettemp=rowlist。过滤器(新的FilterFunction () {
  
  @Override
  公共布尔过滤器(一行一行){抛出异常
  字符串键=row.getField (0) .toString ();
  字符串值=https://www.yisu.com/zixun/row.getField (1) .toString ();
  如果(key.length () <5 | | key.startsWith (“-”) | | key.startsWith (“$”) | | value.length () & lt; 5 | | value.startsWith (“-”) | | value.startsWith (" $ ")) {
  返回错误;
  其他}{
  返回true;
  }
  }
  
  })。地图(新MapFunction<行,RedisDataModel> () {
  
  @Override
  公共RedisDataModel地图(行值){抛出异常
  新RedisDataModel RedisDataModel m=();
  m.setExpire (1);
  m.setKey (JobConstants.REDIS_FLINK_IMEI_USER + value.getField (0) .toString ());
  m.setGlobal(真正的);
  m.setValue (value.getField (1) .toString ());
  返回m;
  }
  
  });
  
  HttpClientUtil.sendDingMessage(“同步蜂巢——“+表+”完成,开始推送模型,共有“+ temp.count() +“条;“+ Utils.getTimeString ());
  
  RedisOutputFormat redisOutput=RedisOutputFormat.buildRedisOutputFormat ()
  .setHostMaster (AppConfig.getProperty (JobConstants.REDIS_HOST_MASTER))
  .setHostSentinel (AppConfig.getProperty (JobConstants.REDIS_HOST_SENTINELS))
  .setMaxIdle (Integer.parseInt (AppConfig.getProperty (JobConstants.REDIS_MAXIDLE)))
  .setMaxTotal (Integer.parseInt (AppConfig.getProperty (JobConstants.REDIS_MAXTOTAL)))
  .setMaxWaitMillis (Integer.parseInt (AppConfig.getProperty (JobConstants.REDIS_MAXWAITMILLIS)))
  .setTestOnBorrow (Boolean.parseBoolean (AppConfig.getProperty (JobConstants.REDIS_TESTONBORROW)))
  .finish ();
  temp.output (redisOutput);
  env。执行(“蜂巢——”+表+“同步”);
  
  HttpClientUtil.sendDingMessage(“同步蜂巢——“+表+”完成,耗时:”+ (System.currentTimeMillis () -)/1000 +“s”);
  }捕捉(异常e) {
  logger.error (“e);
  HttpClientUtil.sendDingMessage(“同步蜂巢——“+表+“失败,时间戳:“+时间+”,原因:“+ e.toString ());
  }
   

flink读取蜂巢的数据