部署
docker部署flink
- 执行vim docker-compose.yml 输入:
version: "2.1"
services:
jobmanager:
image: flink:1.9.2-scala_2.12
expose:
- "6123"
ports:
- "8081:8081"
command: jobmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
taskmanager:
image: flink:1.9.2-scala_2.12
expose:
- "6121"
- "6122"
depends_on:
- jobmanager
command: taskmanager
links:
- "jobmanager:jobmanager"
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
- 执行命令docker-compose up -d,控制台显示如下:
# docker-compose up -d
Creating network "flink_default" with the default driver
Pulling jobmanager (flink:1.9.2-scala_2.12)...
1.9.2-scala_2.12: Pulling from library/flink
90fe46dd8199: Pull complete
35a4f1977689: Pull complete
bbc37f14aded: Pull complete
4b4691f30000: Pull complete
52474d271bab: Pull complete
7a30d0654650: Pull complete
e209fd853f96: Pull complete
6869a6e80cc7: Pull complete
5ffa7eecab0d: Pull complete
e8842e3b19e4: Pull complete
165e691614fd: Pull complete
47825e844a84: Pull complete
Digest: sha256:2988d416b440031f5efb2f6e39651a54fe2e9ab148ab23d9aa1665f947baa026
Status: Downloaded newer image for flink:1.9.2-scala_2.12
Creating flink_jobmanager_1 ... done
Creating flink_taskmanager_1 ... done
- 访问宿主机的8081端口,显示如下:
概念
Apache基金会下的流处理项目
fink在德语里意味着快速灵巧
场景 点击数据 联网数据
和spark的区别
spark是一种微批处理,也是一种意义上的批处理
批处理对于flink来说,是一种有界的数据流
流处理是一种无界的数据流
flink来一个处理一个
flink提供了source和sink两个模块,source负责读取,即输入,sink负责输出。
data source
Source 是你的程序从中读取其输入的地方。你可以用 StreamExecutionEnvironment.addSource(sourceFunction)
将一个 source 关联到你的程序。Flink 自带了许多预先实现的 source functions,不过你仍然可以通过实现 SourceFunction
接口编写自定义的非并行 source,也可以通过实现 ParallelSourceFunction
接口或者继承 RichParallelSourceFunction
类编写自定义的并行 sources。
通过 StreamExecutionEnvironment
可以访问多种预定义的 stream source:
基于文件:
-
readTextFile(path)
- 读取文本文件,例如遵守 TextInputFormat 规范的文件,逐行读取并将它们作为字符串返回。 -
readFile(fileInputFormat, path)
- 按照指定的文件输入格式读取(一次)文件。 -
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
- 这是前两个方法内部调用的方法。它基于给定的fileInputFormat
读取路径path
上的文件。根据提供的watchType
的不同,source 可能定期(每interval
毫秒)监控路径上的新数据(watchType 为FileProcessingMode.PROCESS_CONTINUOUSLY
),或者处理一次当前路径中的数据然后退出(watchType 为FileProcessingMode.PROCESS_ONCE
)。使用pathFilter
,用户可以进一步排除正在处理的文件。实现:
在底层,Flink 将文件读取过程拆分为两个子任务,即 目录监控 和 数据读取。每个子任务都由一个单独的实体实现。监控由单个非并行(并行度 = 1)任务实现,而读取由多个并行运行的任务执行。后者的并行度和作业的并行度相等。单个监控任务的作用是扫描目录(定期或仅扫描一次,取决于
watchType
),找到要处理的文件,将它们划分为 分片,并将这些分片分配给下游 reader。Reader 是将实际获取数据的角色。每个分片只能被一个 reader 读取,而一个 reader 可以一个一个地读取多个分片。重要提示:
- 如果
watchType
设置为FileProcessingMode.PROCESS_CONTINUOUSLY
,当一个文件被修改时,它的内容会被完全重新处理。这可能会打破 “精确一次” 的语义,因为在文件末尾追加数据将导致重新处理文件的所有内容。 - 如果
watchType
设置为FileProcessingMode.PROCESS_ONCE
,source 扫描一次路径然后退出,无需等待 reader 读完文件内容。当然,reader 会继续读取数据,直到所有文件内容都读完。关闭 source 会导致在那之后不再有检查点。这可能会导致节点故障后恢复速度变慢,因为作业将从最后一个检查点恢复读取。
- 如果
基于套接字:
socketTextStream
- 从套接字读取。元素可以由分隔符分隔。
基于集合:
fromCollection(Collection)
- 从 Java Java.util.Collection 创建数据流。集合中的所有元素必须属于同一类型。fromCollection(Iterator, Class)
- 从迭代器创建数据流。class 参数指定迭代器返回元素的数据类型。fromElements(T ...)
- 从给定的对象序列中创建数据流。所有的对象必须属于同一类型。fromParallelCollection(SplittableIterator, Class)
- 从迭代器并行创建数据流。class 参数指定迭代器返回元素的数据类型。generateSequence(from, to)
- 基于给定间隔内的数字序列并行生成数据流。
自定义:
addSource
- 关联一个新的 source function。例如,你可以使用addSource(new FlinkKafkaConsumer<>(...))
来从 Apache Kafka 获取数据。更多详细信息见连接器。
Data Sinks
Data sinks 使用 DataStream 并将它们转发到文件、套接字、外部系统或打印它们。Flink 自带了多种内置的输出格式,这些格式相关的实现封装在 DataStreams 的算子里:
writeAsText()
/TextOutputFormat
- 将元素按行写成字符串。通过调用每个元素的 toString() 方法获得字符串。writeAsCsv(...)
/CsvOutputFormat
- 将元组写成逗号分隔值文件。行和字段的分隔符是可配置的。每个字段的值来自对象的 toString() 方法。print()
/printToErr()
- 在标准输出/标准错误流上打印每个元素的 toString() 值。 可选地,可以提供一个前缀(msg)附加到输出。这有助于区分不同的 print 调用。如果并行度大于1,输出结果将附带输出任务标识符的前缀。writeUsingOutputFormat()
/FileOutputFormat
- 自定义文件输出的方法和基类。支持自定义 object 到 byte 的转换。writeToSocket
- 根据SerializationSchema
将元素写入套接字。addSink
- 调用自定义 sink function。Flink 捆绑了连接到其他系统(例如 Apache Kafka)的连接器,这些连接器被实现为 sink functions。
Scala
注意,DataStream 的 write*()
方法主要用于调试目的。它们不参与 Flink 的 checkpointing,这意味着这些函数通常具有至少有一次语义。刷新到目标系统的数据取决于 OutputFormat 的实现。这意味着并非所有发送到 OutputFormat 的元素都会立即显示在目标系统中。此外,在失败的情况下,这些记录可能会丢失。
为了将流可靠地、精准一次地传输到文件系统中,请使用 StreamingFileSink
。此外,通过 .addSink(...)
方法调用的自定义实现也可以参与 Flink 的 checkpointing,以实现精准一次的语义。
实践
socket作为输入流
执行官网示例代码,该代码基于流窗口的单词统计应用程序,计算 5 秒窗口内来自 Web 套接字的单词数。
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WindowWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.keyBy(value -> value.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1);
dataStream.print();
env.execute("Window WordCount");
}
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}
要运行示例程序,首先从终端使用 netcat 启动输入流:
nc -l 9999
在代码启动前,需要安装nc或者ncat,模拟数据请求 一台机器:nc -l 9999 另一台机器 nc 192.168.10.38 2333即可相互通信,在这里,执行nc -l 9999即可
kafka作为输入流
注:FlinkKafkaConsumer
已被弃用并将在 Flink 1.15 中移除,请改用
KafkaSource。