Flink使用记录

Posted by zengchengjie on Monday, February 28, 2022

部署

参考链接

  • 执行vim docker-compose.yml 输入:
version: "2.1"
services:
  jobmanager:
    image: flink:1.9.2-scala_2.12
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: flink:1.9.2-scala_2.12
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  • 执行命令docker-compose up -d,控制台显示如下:
# docker-compose up -d
Creating network "flink_default" with the default driver
Pulling jobmanager (flink:1.9.2-scala_2.12)...
1.9.2-scala_2.12: Pulling from library/flink
90fe46dd8199: Pull complete
35a4f1977689: Pull complete
bbc37f14aded: Pull complete
4b4691f30000: Pull complete
52474d271bab: Pull complete
7a30d0654650: Pull complete
e209fd853f96: Pull complete
6869a6e80cc7: Pull complete
5ffa7eecab0d: Pull complete
e8842e3b19e4: Pull complete
165e691614fd: Pull complete
47825e844a84: Pull complete
Digest: sha256:2988d416b440031f5efb2f6e39651a54fe2e9ab148ab23d9aa1665f947baa026
Status: Downloaded newer image for flink:1.9.2-scala_2.12
Creating flink_jobmanager_1 ... done
Creating flink_taskmanager_1 ... done
  • 访问宿主机的8081端口,显示如下:

概念

点击阅读flink官方文档

Apache基金会下的流处理项目

fink在德语里意味着快速灵巧

场景 点击数据 联网数据

和spark的区别

spark是一种微批处理,也是一种意义上的批处理

批处理对于flink来说,是一种有界的数据流

流处理是一种无界的数据流

flink来一个处理一个

flink提供了source和sink两个模块,source负责读取,即输入,sink负责输出。

data source

Source 是你的程序从中读取其输入的地方。你可以用 StreamExecutionEnvironment.addSource(sourceFunction) 将一个 source 关联到你的程序。Flink 自带了许多预先实现的 source functions,不过你仍然可以通过实现 SourceFunction 接口编写自定义的非并行 source,也可以通过实现 ParallelSourceFunction 接口或者继承 RichParallelSourceFunction 类编写自定义的并行 sources。

通过 StreamExecutionEnvironment 可以访问多种预定义的 stream source:

基于文件:

  • readTextFile(path) - 读取文本文件,例如遵守 TextInputFormat 规范的文件,逐行读取并将它们作为字符串返回。

  • readFile(fileInputFormat, path) - 按照指定的文件输入格式读取(一次)文件。

  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 这是前两个方法内部调用的方法。它基于给定的 fileInputFormat 读取路径 path 上的文件。根据提供的 watchType 的不同,source 可能定期(每 interval 毫秒)监控路径上的新数据(watchType 为 FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理一次当前路径中的数据然后退出(watchType 为 FileProcessingMode.PROCESS_ONCE)。使用 pathFilter,用户可以进一步排除正在处理的文件。

    实现:

    在底层,Flink 将文件读取过程拆分为两个子任务,即 目录监控数据读取。每个子任务都由一个单独的实体实现。监控由单个非并行(并行度 = 1)任务实现,而读取由多个并行运行的任务执行。后者的并行度和作业的并行度相等。单个监控任务的作用是扫描目录(定期或仅扫描一次,取决于 watchType),找到要处理的文件,将它们划分为 分片,并将这些分片分配给下游 reader。Reader 是将实际获取数据的角色。每个分片只能被一个 reader 读取,而一个 reader 可以一个一个地读取多个分片。

    重要提示:

    1. 如果 watchType 设置为 FileProcessingMode.PROCESS_CONTINUOUSLY,当一个文件被修改时,它的内容会被完全重新处理。这可能会打破 “精确一次” 的语义,因为在文件末尾追加数据将导致重新处理文件的所有内容。
    2. 如果 watchType 设置为 FileProcessingMode.PROCESS_ONCE,source 扫描一次路径然后退出,无需等待 reader 读完文件内容。当然,reader 会继续读取数据,直到所有文件内容都读完。关闭 source 会导致在那之后不再有检查点。这可能会导致节点故障后恢复速度变慢,因为作业将从最后一个检查点恢复读取。

基于套接字:

  • socketTextStream - 从套接字读取。元素可以由分隔符分隔。

基于集合:

  • fromCollection(Collection) - 从 Java Java.util.Collection 创建数据流。集合中的所有元素必须属于同一类型。
  • fromCollection(Iterator, Class) - 从迭代器创建数据流。class 参数指定迭代器返回元素的数据类型。
  • fromElements(T ...) - 从给定的对象序列中创建数据流。所有的对象必须属于同一类型。
  • fromParallelCollection(SplittableIterator, Class) - 从迭代器并行创建数据流。class 参数指定迭代器返回元素的数据类型。
  • generateSequence(from, to) - 基于给定间隔内的数字序列并行生成数据流。

自定义:

  • addSource - 关联一个新的 source function。例如,你可以使用 addSource(new FlinkKafkaConsumer<>(...)) 来从 Apache Kafka 获取数据。更多详细信息见连接器

Data Sinks

Data sinks 使用 DataStream 并将它们转发到文件、套接字、外部系统或打印它们。Flink 自带了多种内置的输出格式,这些格式相关的实现封装在 DataStreams 的算子里:

  • writeAsText() / TextOutputFormat - 将元素按行写成字符串。通过调用每个元素的 toString() 方法获得字符串。
  • writeAsCsv(...) / CsvOutputFormat - 将元组写成逗号分隔值文件。行和字段的分隔符是可配置的。每个字段的值来自对象的 toString() 方法。
  • print() / printToErr() - 在标准输出/标准错误流上打印每个元素的 toString() 值。 可选地,可以提供一个前缀(msg)附加到输出。这有助于区分不同的 print 调用。如果并行度大于1,输出结果将附带输出任务标识符的前缀。
  • writeUsingOutputFormat() / FileOutputFormat - 自定义文件输出的方法和基类。支持自定义 object 到 byte 的转换。
  • writeToSocket - 根据 SerializationSchema 将元素写入套接字。
  • addSink - 调用自定义 sink function。Flink 捆绑了连接到其他系统(例如 Apache Kafka)的连接器,这些连接器被实现为 sink functions。

Scala

注意,DataStream 的 write*() 方法主要用于调试目的。它们不参与 Flink 的 checkpointing,这意味着这些函数通常具有至少有一次语义。刷新到目标系统的数据取决于 OutputFormat 的实现。这意味着并非所有发送到 OutputFormat 的元素都会立即显示在目标系统中。此外,在失败的情况下,这些记录可能会丢失。

为了将流可靠地、精准一次地传输到文件系统中,请使用 StreamingFileSink。此外,通过 .addSink(...) 方法调用的自定义实现也可以参与 Flink 的 checkpointing,以实现精准一次的语义。

实践

socket作为输入流

执行官网示例代码,该代码基于流窗口的单词统计应用程序,计算 5 秒窗口内来自 Web 套接字的单词数。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

}

要运行示例程序,首先从终端使用 netcat 启动输入流:

nc -l 9999

在代码启动前,需要安装nc或者ncat,模拟数据请求 一台机器:nc -l 9999 另一台机器 nc 192.168.10.38 2333即可相互通信,在这里,执行nc -l 9999即可

kafka作为输入流

flink消费kafka数据参考文档

flink-kafka官方文档

注:FlinkKafkaConsumer已被弃用并将在 Flink 1.15 中移除,请改用KafkaSource。

常见问题:

以上内容参考自官方文档,点击阅读更多详情

大厂flink实践案例

Flink在唯品会的实践

贝壳基于flink的实时计算演算之路

flink实时计算在微博的应用

flink在有赞实时计算的实践

阿里云开发者社区flink相关实践