怎样使用Apache Flink中SQL APIx的表

  介绍

本篇文章为大家展示了怎样使用Apache Flink中SQL APIx的表,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。

什么是Flink关系型API ?

虽然Flink已经支持了数据集和数据流API,但是有没有一种更好的方式去编程,而不用关心具体的API实现?不需要去了解Java和Scala的具体实现。

Flink提供了三个分层的API。每个API提供了一个不同的简洁性和可表达性和目标之间的权衡不同的用例。

Flink提供了三层API,每一层API提供了一个在简洁性和表达力之间的权衡,。

怎样使用Apache Flink中表的SQL APIx”> </p> <p>最低层是一个有状态的事件驱动。在这一层进行开发是非常麻烦的。</p> <p>虽然很多功能基于数据集和DataStreamAPI是可以完成的,需要熟悉这两套API,而且必须要熟悉Java和Scala,这是有一定的难度的。一个框架如果在使用的过程中没法使用SQL来处理,那么这个框架就有很大的限制,虽然对于开发人员无所谓,但是对于用户来说却不显示。因此SQL是非常面向大众语言。</p> <p>好比MapReduce使用蜂巢SQL,火花使用火花SQL, Flink使用Flink SQL。</p> <p>虽然Flink支持批处理/流处理,那么如何做到API层面的统一? </p> <p>这样表和SQL应运而生。</p> <p>这其实就是一个关系型API,操作起来如同操作Mysql一样简单。</p> <blockquote> <p> Apache Flink特性两个关系表API和SQL API————统一的流和批处理。表为Scala API是一个综合语言查询API和Java允许查询的组合关系运算符如选择,过滤器,并加入在一个非常直观的方式只</p> </引用> <p> Apache Flink通过使用表和SQL API两大特性,来统一批处理和流处理只表API是一个查询API,集成了Scala和Java语言,并且允许使用选择滤波器加入等操作。</p> <p>使用表的SQL API需要额外依赖Java: </p> <p> </p> <>之前,,,,,,,,& lt; dependency>
  ,,,,,,,,,,,& lt; groupId> org.apache.flink</groupId>
  ,,,,,,,,,,,& lt; artifactId> flink-streaming-scala_2.11</artifactId>
  ,,,,,,,,,,,& lt; version> $ {flink.version} & lt;/version>
  ,,,,,,,& lt;/dependency>
  ,,,,,,,& lt; dependency>
  ,,,,,,,,,,,& lt; groupId> org.apache.flink</groupId>
  ,,,,,,,,,,,& lt; artifactId> flink-table-planner_2.11</artifactId>
  ,,,,,,,,,,,& lt; version> $ {flink.version} & lt;/version>
  ,,,,,,,& lt;/dependency>
  ,,,,,,,& lt; dependency>
  ,,,,,,,,,,,& lt; groupId> org.apache.flink</groupId>
  ,,,,,,,,,,,& lt; artifactId> flink-table-api-java-bridge_2.11</artifactId>
  ,,,,,,,,,,,& lt; version> $ {flink.version} & lt;/version>
  ,,,,,,,& lt;/dependency> scala: </pre> <p> </p> <>之前,,,,,,,,& lt; dependency>
  ,,,,,,,,,,,& lt; groupId> org.apache.flink</groupId>
  ,,,,,,,,,,,& lt; artifactId> flink-table-planner_2.11</artifactId>
  ,,,,,,,,,,,& lt; version> $ {flink.version} & lt;/version>
  ,,,,,,,& lt;/dependency>
  ,,,,,,,& lt; dependency>
  ,,,,,,,,,,,& lt; groupId> org.apache.flink</groupId>
  ,,,,,,,,,,,& lt; artifactId> flink-table-api-scala-bridge_2.11</artifactId>
  ,,,,,,,,,,,& lt; version> $ {flink.version} & lt;/version>
  ,,,,,,,& lt;/dependency> </pre> <h3>使用表的SQL API编程</h3> <p>首先导入上面的依赖,然后读取sales.csv文件,文件内容如下:</p> <pre> transactionId, customerId, itemId amountPaid
  111年,1100 0
  112年,2505 0
  113年,3510 0
  114年,4600 0
  115年,2500 0
  116年,2500 0
  117年,2500 0
  118年,2500 0
  119年,3500 0
  120年,2500 0
  121年,4500 0
  122年,2500 0
  123年,4500 0
  124 1 2500 .0 Scala </pre> <h3> </h3> <pre> object  TableSQLAPI  {
  
  def 才能;主要(args:数组[String]):, Unit =, {
  ,,,val  bEnv =ExecutionEnvironment.getExecutionEnvironment
  ,,,val  bTableEnv =, BatchTableEnvironment.create (bEnv)
  ,,,val  filePath=癊:/测试/sales.csv怎样使用Apache Flink中SQL APIx的表