Learning Flink (Part 2): Your First Flink Application

Development Environment

This article uses:

  • JDK 8
  • Maven 3
  • IntelliJ IDEA

Preparation

Edit pom.xml and add the Flink dependencies and the build plugins:

<dependencies>
  <!-- provided scope: the Flink runtime on the cluster already ships these classes -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>1.6.0</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.6.0</version>
    <scope>provided</scope>
  </dependency>
</dependencies>
<build>
    <finalName>flink-demo</finalName>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <version>2.15.2</version>
            <executions>
                <execution>
                    <id>compile</id>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                    <phase>compile</phase>
                </execution>
                <execution>
                    <id>test-compile</id>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                    <phase>test-compile</phase>
                </execution>
                <execution>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.0.0</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.dyingbleed.demo.flink.Application</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

WordCount

package com.dyingbleed.demo.flink

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._

object Application {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment // ①

    env.socketTextStream("127.0.0.1", 7777) // ②
        .flatMap(line => line.split(" ").map(word => (word, 1))) // ③
        .keyBy(0) // ④
        .timeWindow(Time.seconds(30)) // ⑤
        .sum(1) // ⑥
        .print()

    env.execute("word count") // ⑦
  }

}

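① Create an execution environment. getExecutionEnvironment chooses one based on how the program is started: when run locally (for example from the IDE), it is backed by an org.apache.flink.streaming.api.environment.LocalStreamEnvironment; when the job is submitted to a cluster through the Flink command-line client, it returns an environment bound to that cluster. (An org.apache.flink.streaming.api.environment.RemoteStreamEnvironment is what createRemoteEnvironment returns, not getExecutionEnvironment.)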

② Connect to the TCP server at 127.0.0.1:7777 and read its text line by line, producing a DataStream of strings.

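③ Apply a flatMap transformation that splits each line on spaces and emits a (word, 1) pair for every word. Note: org.apache.flink.streaming.api.scala._ must be imported explicitly; it supplies the implicit TypeInformation the Scala API needs for (String, Int), and without it the code does not compile.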

④ Partition the stream by word. The argument is the 0-based position of the tuple field to key on, so 0 selects the word.

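⑤ Apply a 30-second tumbling time window: words and their counts are aggregated once every 30 seconds. In Flink 1.6 the default time characteristic is processing time, so windows follow the machine's clock.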

⑥ Sum the counts within each window. The argument is the position of the tuple field to sum, so 1 selects the count.

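⑦ Trigger execution. The calls above only build the dataflow; nothing runs until env.execute is called. "word count" is the job name.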
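To make steps ③, ④ and ⑥ concrete without a running cluster, here is a plain-Scala model of what each one does to the records. It is a sketch, not Flink code: WordCountSteps and its functions are hypothetical names, ordinary collections stand in for the stream, and windowing (⑤) is left out.

```scala
object WordCountSteps {
  // ③ Same body as the flatMap in Application: split on a single space and
  // pair every token with an initial count of 1.
  def tokenize(line: String): Seq[(String, Int)] =
    line.split(" ").map(word => (word, 1)).toSeq

  // ④ Model of keyBy(0): group records by the tuple field at position 0
  // (0-based), so all pairs for the same word land in the same group.
  def keyByPosition[T <: Product](records: Seq[T], position: Int): Map[Any, Seq[T]] =
    records.groupBy(record => record.productElement(position))

  // ⑥ Model of sum(1) within one group: add up the Int at position 1.
  // Every record in the group has the same word, so take it from the first one.
  def sumSecondField(group: Seq[(String, Int)]): (String, Int) =
    (group.head._1, group.map(_._2).sum)

  def main(args: Array[String]): Unit = {
    val pairs = Seq("hello world", "hello flink").flatMap(tokenize)
    keyByPosition(pairs, 0).values.map(sumSecondField).foreach(println)
  }
}
```

One behavior the model makes visible: because step ③ splits on a single space, two consecutive spaces produce an empty-string token that is counted like a word; splitting on "\\s+" and dropping empty tokens would avoid that. The Scala API also accepts a key selector such as keyBy(_._1) instead of a field position, and newer Flink releases deprecate the positional form.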

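Running

Step 1: start a TCP server that listens on local port 7777. In a terminal, run (some netcat variants expect nc -l -p 7777 instead):

nc -l 7777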

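Step 2: run the WordCount application in Local mode, for example by launching Application.main from IntelliJ IDEA. Because the Flink dependencies are declared with provided scope, the IDEA run configuration may need its option for including "provided" dependencies enabled; otherwise the Flink classes are missing at startup.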

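Step 3: in the nc terminal, type:

hello world
hello flink

Once the current 30-second window closes, the application prints the counts for that window (the line order may vary, and with a parallelism greater than 1 each line is prefixed with the number of the printing subtask, such as 3>):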

(hello,2)
(world,1)
(flink,1)
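Why are both hello lines counted together, and what happens if they arrive in different windows? The sketch below models the windowing in plain Scala, assuming epoch-aligned tumbling windows with no offset (Flink's default alignment for tumbling windows) and treating each typed line's arrival time as its processing timestamp. WindowedWordCount, windowStart and countPerWindow are hypothetical names, and the input timestamps are made up for illustration.

```scala
object WindowedWordCount {
  // Matches Time.seconds(30) in Application.
  val WindowSizeMs: Long = 30 * 1000L

  // Start of the tumbling window [start, start + size) that holds a
  // non-negative timestamp, with windows aligned to the epoch.
  def windowStart(timestampMs: Long): Long =
    timestampMs - timestampMs % WindowSizeMs

  // Models the whole pipeline: tokenize, assign each word to a window,
  // then count per (window, word). Counting hits equals summing the 1s.
  def countPerWindow(lines: Seq[(Long, String)]): Map[Long, Map[String, Int]] =
    lines
      .flatMap { case (ts, line) =>
        val start = windowStart(ts)
        line.split(" ").toSeq.map(word => (start, word))
      }
      .groupBy(_._1)
      .map { case (start, entries) =>
        start -> entries.groupBy(_._2).map { case (word, hits) => word -> hits.size }
      }

  def main(args: Array[String]): Unit = {
    // Hypothetical arrival times (ms) for lines typed into nc.
    val input = Seq((1000L, "hello world"), (5000L, "hello flink"), (31000L, "hello"))
    countPerWindow(input).toSeq.sortBy(_._1).foreach { case (start, counts) =>
      counts.foreach { case (word, n) => println(s"window $start: ($word,$n)") }
    }
  }
}
```

With these inputs, the window starting at 0 yields the three records shown above, while the third line falls into the next window and is reported separately as (hello,1). Because windows are aligned to the clock rather than to job start, the first result can appear less than 30 seconds after you type.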