介绍
如何使用Apache Flink实现自定义,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
套接字发送过来的数据,把字符串类型转成对象,然后把Java对象保存到Mysql数据库中。
创建数据库和表
create database imooc_flink; create table 学生( id int (11), NOT NULL AUTO_INCREMENT, name varchar (25), age int (10), primary 关键(id) )
导入mysql依赖:
& lt; dependency> & lt; groupId> mysql & lt; artifactId> mysql-connector-java & lt; version> 8.0.15 & lt;/dependency>
创建POJO学生
package com.vincent.course05; public class  Student { ,,,private int id; ,,,private String 名称; ,,,private int 年龄; ,,@Override ,,,public String toString (), { ,,,,,,,return “学生{“+ ,,,,,,,,,,,,,,,“id=? +, id + ,,,,,,,,,,,,,,,,,,的名字=& # 39;“,+,name +, & # 39; \ & # 39; & # 39; + ,,,,,,,,,,,,,,,,,,岁=?+,age + ,,,,,,,,,,,,,,,& # 39;}& # 39;; ,,,} ,,,public int getId (), { ,,,,,,,return id; ,,,} ,,,public void setId (int id), { ,,,,,,,this.id =, id; ,,,} ,,,public String getName (), { ,,,,,,,return 名称; ,,,} ,,,public void setName (String 名称),{ ,,,,,,,this.name =,名称; ,,,} ,,,public int getAge (), { ,,,,,,,return 年龄; ,,,} ,,,public void setAge (int 年龄),{ ,,,,,,,this.age =,年龄; ,,,} }
然后创建连接,SinkToMySQL.java
package com.vincent.course05; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; public class  SinkToMySQL extends RichSinkFunction< Student>, { ,,,PreparedStatement ps; ,,,private Connection 连接; ,,,/* * ,,,,*,open(),方法中建立连接,这样不用每次,invoke 的时候都要建立连接和释放连接 ,,,, ,,,,*,@param 参数 ,,,,*,@throws 例外 ,,,*/,,@Override ,,,public void 开放(Configuration 参数),throws Exception { ,,,,,,,super.open(参数); ,,,,,,,connection =, getConnection (); ,,,,,,,String sql =,“insert into 学生(id,名称,,年龄),值(?,?,?);“; ,,,,,,,ps =, this.connection.prepareStatement (sql); ,,,} ,,@Override ,,,public void 关闭(),throws Exception { ,,,,,,,super.close (); ,,,,,,,//关闭连接和释放资源 ,,,,,,,if (connection !=, null), { ,,,,,,,,,,,connection.close (); ,,,,,,,} ,,,,,,,if (ps !=, null), { ,,,,,,,,,,,ps.close (); ,,,,,,,} ,,,} ,,,/* * ,,,,*,每条数据的插入都要调用一次,调用(),方法 ,,,, ,,,,*,@param 价值 ,,,,*,@param 上下文 null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null如何使用Apache Flink实现自定义下沉