如何使用Apache Flink实现自定义下沉

  介绍

如何使用Apache Flink实现自定义,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

套接字发送过来的数据,把字符串类型转成对象,然后把Java对象保存到Mysql数据库中。

创建数据库和表

 create  database  imooc_flink;
  create  table 学生(
  id  int (11), NOT  NULL  AUTO_INCREMENT,
  name  varchar (25),
  age  int (10),
  primary 关键(id)
  )

导入mysql依赖:

 & lt; dependency>
  & lt; groupId> mysql
  & lt; artifactId> mysql-connector-java
  & lt; version> 8.0.15
  & lt;/dependency> 

创建POJO学生

 package  com.vincent.course05;
  
  public  class  Student  {
  
  ,,,private  int  id;
  ,,,private  String 名称;
  ,,,private  int 年龄;
  
  ,,@Override
  ,,,public  String  toString (), {
  ,,,,,,,return “学生{“+
  ,,,,,,,,,,,,,,,“id=? +, id  +
  ,,,,,,,,,,,,,,,,,,的名字=& # 39;“,+,name  +, & # 39; \ & # 39; & # 39; +
  ,,,,,,,,,,,,,,,,,,岁=?+,age  +
  ,,,,,,,,,,,,,,,& # 39;}& # 39;;
  ,,,}
  
  ,,,public  int  getId (), {
  ,,,,,,,return  id;
  ,,,}
  
  ,,,public  void  setId (int  id), {
  ,,,,,,,this.id =, id;
  ,,,}
  
  ,,,public  String  getName (), {
  ,,,,,,,return 名称;
  ,,,}
  
  ,,,public  void  setName (String 名称),{
  ,,,,,,,this.name =,名称;
  ,,,}
  
  ,,,public  int  getAge (), {
  ,,,,,,,return 年龄;
  ,,,}
  
  ,,,public  void  setAge (int 年龄),{
  ,,,,,,,this.age =,年龄;
  ,,,}
  }

然后创建连接,SinkToMySQL.java

 package  com.vincent.course05;
  
  import  org.apache.flink.configuration.Configuration;
  import  org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
  
  import  java.sql.Connection;
  import  java.sql.DriverManager;
  import  java.sql.PreparedStatement;
  
  public  class  SinkToMySQL  extends  RichSinkFunction< Student>, {
  ,,,PreparedStatement  ps;
  ,,,private  Connection 连接;
  
  ,,,/* *
  ,,,,*,open(),方法中建立连接,这样不用每次,invoke 的时候都要建立连接和释放连接
  ,,,,
  ,,,,*,@param 参数
  ,,,,*,@throws 例外
  ,,,*/,,@Override
  ,,,public  void 开放(Configuration 参数),throws  Exception  {
  ,,,,,,,super.open(参数);
  ,,,,,,,connection =, getConnection ();
  ,,,,,,,String  sql =,“insert  into 学生(id,名称,,年龄),值(?,?,?);“;
  ,,,,,,,ps =, this.connection.prepareStatement (sql);
  ,,,}
  
  ,,@Override
  ,,,public  void 关闭(),throws  Exception  {
  ,,,,,,,super.close ();
  ,,,,,,,//关闭连接和释放资源
  ,,,,,,,if  (connection  !=, null), {
  ,,,,,,,,,,,connection.close ();
  ,,,,,,,}
  ,,,,,,,if  (ps  !=, null), {
  ,,,,,,,,,,,ps.close ();
  ,,,,,,,}
  ,,,}
  
  ,,,/* *
  ,,,,*,每条数据的插入都要调用一次,调用(),方法
  ,,,,
  ,,,,*,@param 价值
  ,,,,*,@param 上下文
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null

如何使用Apache Flink实现自定义下沉