如何使用SQL读取卡夫卡并写入MySQL

  介绍

今天就跟大家聊聊有关如何使用SQL读取卡夫卡并写入MySQL,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

SqlSubmit的实现

笔者一开始是想用SQL客户机来贯穿整个演示环节,但可惜1.9版本SQL CLI还不支持处理创建表语句。所以笔者就只好自己写了个简单的提交脚本。后来想,想也挺好的,可以让听众同时了解如何通过SQL的方式,和编程的方式使用Flink SQL。

SqlSubmit的主要任务是执行和提交一个SQL文件,实现非常简单,就是通过正则表达式匹配每个语句块。如果是创建表或插入开头,则会调用tEnv.sqlUpdate(…)。如果是开集头,则会将配置设置到TableConfig上。其核心代码主要如下所示:

 EnvironmentSettings  settings =, EnvironmentSettings.newInstance ()
  ,,,,,,,.useBlinkPlanner ()
  ,,,,,,,.inStreamingMode ()
  ,,,,,,,.build();//,创建一个使用,Blink  Planner 的,TableEnvironment,,并工作在流模式TableEnvironment  tEnv =, TableEnvironment.create(设置);//,读取,SQL 文件List, SQL =, Files.readAllLines(路径);//,通过正则表达式匹配前缀,来区分不同的,SQL 语句List, calls =, SqlCommandParser.parse (sql);//,根据不同的,SQL 语句,调用,TableEnvironment 执行for  (SqlCommandCall  call :调用),{switch 才能;(call.command), {,,, case 集:,,,,,,String  key =, call.operands [0];,,,,,, String  value =, call.operands[1];,,,,,,//,设置参数
  ,,,,,tEnv.getConfig () .getConfiguration () .setString(键,值),,,,,,,休息,,,,,case  CREATE_TABLE:,,,,,, String  ddl =, call.operands [0];
  ,,,,,tEnv.sqlUpdate (ddl),,,,,,,休息,,,,,case  INSERT_INTO:,,,,,, String  dml =, call.operands [0];
  ,,,,,tEnv.sqlUpdate (dml);,,,,,,休息,,,,,默认值:,,,,,,throw  new  RuntimeException (“Unsupported 命令:“,+,call.command);
  ,,}
  }//,提交作业tEnv.execute (“SQL  Job"); 

使用DDL连接卡夫卡源表

在flink-sql-submit项目中,我们准备了一份测试数据集(来自阿里云天池公开数据集,特别鸣谢),位于src/main/资源/user_behavior。日志。数据以JSON格式编码,大概长这个样子:

 {“user_id":,“543462”,,“item_id":“1715”,,,“category_id":,“1464116”,,“behavior":,“pv",,“ts":,“2017 - 11 - 26 t01:00:00z"}
  {“user_id":,“662867”,,“item_id":“2244074”,,“category_id":,“1575622”,,“behavior":,“pv",,“ts":,“2017 - 11 - 26 t01:00:00z"} 

为了模拟真实的卡夫卡数据源,笔者还特地写了一个source-generator。sh脚本(感兴趣的可以看下源码),会自动读取user_behavior。日志的数据并以默认每毫秒1条的速率灌到卡夫卡的user_behavior主题中。

有了数据源后,我们就可以用DDL去创建并连接这个卡夫卡中的话题(详见src/main/资源/q1.sql)。

 CREATE  TABLE  user_log  (
  ,,,user_id  VARCHAR、
  ,,,item_id  VARCHAR、
  ,,,category_id  VARCHAR、
  ,,,behavior  VARCHAR、
  ,,,ts 时间戳
  ),WITH  (,,, & # 39; connector.type& # 39;,=, & # 39;卡夫卡# 39;,,,,使用,kafka  connector ,,, & # 39; connector.version& # 39;,=, & # 39;通用# 39;,,,,,kafka 版本,universal 支持,0.11,以上的版本,,,,& # 39;connector.topic& # 39;,=, & # 39; user_behavior& # 39;,,,,, kafka  topic ,,, & # 39; connector.startup-mode& # 39;,=, & # 39; earliest-offset& # 39;,,,,从起始,offset 开始读取,,,,& # 39;connector.properties.0.key& # 39;,=, & # 39; zookeeper.connect& # 39;,,,,,连接信息,,,,& # 39;connector.properties.0.value& # 39;,=, & # 39; localhost: 2181 & # 39;,,
  ,,,& # 39;connector.properties.1.key& # 39;,=, & # 39; bootstrap.servers& # 39;,,,,, & # 39; connector.properties.1.value& # 39;,=, & # 39; localhost: 9092 & # 39;,,
  ,,,& # 39;update-mode& # 39;,=, & # 39;添加# 39;,,,,,& # 39;format.type& # 39;,=, & # 39; json # 39;,,,,,数据源格式为,json ,,, & # 39; format.derive-schema& # 39;,=, & # 39;真正的# 39;,,,从,DDL  schema 确定,json 解析规则
  )

注:可能有用户会觉得其中的connector.properties.0。null   null   null   null   null   null   null   null   null   null   null   null   null

如何使用SQL读取卡夫卡并写入MySQL