- Apache Flink 文档
- 概念
- 数据流编程模型
- 分布式运行时环境
- 教程
- API 教程
- DataStream API 教程
- Setup 教程
- 本地安装教程
- 在 Windows 上运行 Flink
- 例子
- 批处理示例
- 应用开发
- 项目构建设置
- Java 项目模板
- Scala 的项目模板
- 配置依赖关系,连接器,库
- 基础 API 概念
- Scala API 扩展
- Java Lambda 表达式
- Flink DataStream API 编程指南
- 活动时间
- 事件时间/处理时间/摄取时间
- 活动时间和水印
- 状态与容错
- 算子
- DataStream 转换
- 物理分区
- 任务链和资源组
- 流连接器
- 旁路输出
- Python 编程指南(流)Beta
- 测试
- 实验特点
- Flink DataSet API 编程指南
- 数据集转换
- 容错
- 在数据集中压缩数据元
- 迭代
- Python 编程指南 Beta
- 连接器
- Hadoop 兼容性测试版
- 本地执行
- 群集执行
- Table API 和 SQL
- 概念和通用 API
- 流处理概念
- 连接到外部系统
- Table API
- SQL
- 内置函数
- 用户定义的源和接收器
- 用户定义的函数
- SQL 客户端测试版
- 数据类型和序列化
- 为 Flink 程序注册自定义序列化程序
- 管理执行
- 执行配置
- 程序打包和分布式执行
- 并行执行
- 执行计划
- 重启策略
- 类库
- FlinkCEP - Flink 的复杂事件处理
- 风暴兼容性 Beta
- 项目配置
- 执行 Storm 拓扑
- 在 Flink 流程序中嵌入 Storm 算子
- Flink Extensions
- Storm 兼容性示例
- Gelly:Flink Graph API
- 图 API
- FlinkML - Flink 的机器学习
- 最佳实践
- API 迁移指南
- 部署和运营
- 集群和部署
- 独立群集
- YARN 设置
- Mesos 设置
- Kubernetes 设置
- Docker 设置
- 亚马逊网络服务(AWS)
- Google Compute Engine 设置
- 先决条件
- 在 Google Compute Engine 上部署 Flink
- MapR 设置
- Hadoop 集成
- JobManager 高可用性(HA)
- 状态和容错
- 检查点
- 保存点
- 状态后台
- 调整检查点和大状态
- 配置
- 生产准备清单
- 命令行界面
- Scala REPL
- Kerberos 身份验证设置和配置
- SSL 设置
- 文件系统
- 升级应用程序和 Flink 版本
- 调试和监控
- 度量
- 如何使用日志记录
- 历史服务器
- 监控检查点
- 监测背压
- 监控 REST API
- 调试 Windows 和事件时间
- 调试类加载
- 应用程序分析
- 使用 Java Flight Recorder 进行性能分析
- 使用 JITWatch 进行分析
- Flink Development
- 将 Flink 导入 IDE
- 从 Source 建立 Flink
- 内幕
- 组件堆栈
- 数据流容错
- 工作和调度
- 任务生命周期
- 文件系统
- 实现
- 坚持保证
- 更新文件内容
- 覆盖文件
- 线程安全
基础 API 概念
Flink 程序是实现分布式集合转换的常规程序(例如,Filter,映射,更新状态,Join,分组,定义窗口,聚合)。集合最初是从源创建的(例如,通过读取文件,kafka 主题或从本地的内存中集合)。结果通过接收器返回,接收器可以例如将数据写入(分布式)文件或标准输出(例如,命令行终端)。Flink 程序可以在各种环境中运行,独立运行或嵌入其他程序中。执行可以在本地 JVM 中执行,也可以在许多计算机的集群上执行。
根据数据源的类型(即有界或无界源),您可以编写批处理程序或流程序,其中 DataSet API 用于批处理,DataStream API 用于流式处理。本指南将介绍两种 API 共有的基本概念,但请参阅我们的 流处理指南 和 批处理指南 ,了解有关使用每个 API 编写程序的具体信息。
注: 当显示的 API 时,如何使用,我们将用实际的例子 StreamingExecutionEnvironment 和 DataStream API。 DataSet API 中的概念完全相同,只需替换为 ExecutionEnvironment 和 DataSet 。
DataSet 和 DataStream
Flink 具有特殊类 DataSet 并 DataStream 在程序中表示数据。您可以将它们视为可以包含重复项的不可变数据集合。在 DataSet 数据有限的情况下,对于一个 DataStream 数据元的数量可以是无界的。
这些集合在某些关键方面与常规 Java 集合不同。首先,它们是不可变的,这意味着一旦创建它们就无法添加或删除数据元。你也不能简单地检查里面的数据元。
集合最初通过在 Flink 程序添加源创建和新的集合从这些通过将它们使用 API 方法如衍生 map , filter 等等。
Flink 计划的剖析
Flink 程序看起来像是转换数据集合的常规程序。每个程序包含相同的基本部分:
我们现在将概述每个步骤,请参阅相应部分以获取更多详细信息。请注意,Java DataSet API 的所有核心类都可以在 org.apache.flink.api.java 包中找到, 而 Java DataStream API 的类可以在 org.apache.flink.streaming.api 中 找到 。
这 StreamExecutionEnvironment 是所有 Flink 计划的基础。您可以使用以下静态方法获取一个 StreamExecutionEnvironment :
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String... jarFiles)
通常,您只需要使用 getExecutionEnvironment() ,因为这将根据上下文做正确的事情:如果您在 IDE 中执行程序或作为常规 Java 程序,它将创建一个本地环境,将在本地计算机上执行您的程序。如果您从程序中创建了一个 JAR 文件,并通过 命令行 调用它 ,则 Flink 集群管理器将执行您的 main 方法并 getExecutionEnvironment() 返回一个运行环境,以便在集群上执行您的程序。
对于指定数据源,运行环境有几种方法可以使用各种方法从文件中读取:您可以逐行读取它们,CSV 文件或使用完全自定义数据输入格式。要将文本文件作为一系列行读取,您可以使用:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("file:///path/to/file");
这将为您提供一个 DataStream,然后您可以在其上应用转换来创建新的派生 DataStream。
您可以通过使用转换函数调用 DataStream 上的方法来应用转换。例如,Map 转换如下所示:
DataStream<String> input = ...;
DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
@Override
public Integer map(String value) {
return Integer.parseInt(value);
}
});
这将通过将原始集合中的每个 String 转换为 Integer 来创建新的 DataStream。
一旦有了包含最终结果的 DataStream,就可以通过创建接收器将其写入外部系统。这些只是创建接收器的一些示例方法:
writeAsText(String path)
print()
我们现在将概述每个步骤,请参阅相应部分以获取更多详细信息。请注意,Scala DataSet API 的所有核心类都可以在 org.apache.flink.api.scala 包中找到, 而 Scala DataStream API 的类可以在 org.apache.flink.streaming.api.scala 中 找到 。
这 StreamExecutionEnvironment 是所有 Flink 计划的基础。您可以使用以下静态方法获取一个 StreamExecutionEnvironment :
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
通常,您只需要使用 getExecutionEnvironment() ,因为这将根据上下文做正确的事情:如果您在 IDE 中执行程序或作为常规 Java 程序,它将创建一个本地环境,将在本地计算机上执行您的程序。如果您从程序中创建了一个 JAR 文件,并通过 命令行 调用它 ,则 Flink 集群管理器将执行您的 main 方法并 getExecutionEnvironment() 返回一个运行环境,以便在集群上执行您的程序。
对于指定数据源,运行环境有几种方法可以使用各种方法从文件中读取:您可以逐行读取它们,CSV 文件或使用完全自定义数据输入格式。要将文本文件作为一系列行读取,您可以使用:
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val text: DataStream[String] = env.readTextFile("file:///path/to/file")
这将为您提供一个 DataStream,然后您可以在其上应用转换来创建新的派生 DataStream。
您可以通过使用转换函数调用 DataSet 上的方法来应用转换。例如,Map 转换如下所示:
val input: DataSet[String] = ...
val mapped = input.map { x => x.toInt }
这将通过将原始集合中的每个 String 转换为 Integer 来创建新的 DataStream。
一旦有了包含最终结果的 DataStream,就可以通过创建接收器将其写入外部系统。这些只是创建接收器的一些示例方法:
writeAsText(path: String)
print()
一旦您指定的完整程序,你需要 触发执行程序 调用 execute() 上 StreamExecutionEnvironment 。根据执行的类型, ExecutionEnvironment 将在本地计算机上触发执行或提交程序以在群集上执行。
该 execute() 方法返回一个 JobExecutionResult ,包含执行时间和累加器结果。
有关流数据源和接收器的信息,请参阅 流指南 ,以及有关 DataStream 上支持的转换的更深入信息。
有关批处理数据源和接收器的信息,请查看 批处理指南 ,以及有关 DataSet 支持的转换的更深入信息。
懒惰的评价
所有 Flink 程序都是懒惰地执行:当执行程序的 main 方法时,数据加载和转换不会直接发生。而是创建每个 算子操作并将其添加到程序的计划中。当 execute() 运行环境上的调用显式触发执行时,实际执行 算子操作。程序是在本地执行还是在集群上执行取决于运行环境的类型
懒惰的评估使您可以构建 Flink 作为一个整体计划单元执行的复杂程序。
指定 Keys
某些转换(join,coGroup,keyBy,groupBy)要求在数据元集合上定义键。其他转换(Reduce,GroupReduce,Aggregate,Windows)允许数据在应用之前在 Keys 上分组。
DataSet 被分组为
DataSet<...> input = // [...]
DataSet<...> reduced = input
.groupBy(/*define key here*/)
.reduceGroup(/*do something*/);
虽然可以使用 DataStream 指定 Keys
DataStream<...> input = // [...]
DataStream<...> windowed = input
.keyBy(/*define key here*/)
.window(/*window specification*/);
Flink 的数据模型不基于键值对。因此,您无需将数据集类型物理打包到键和值中。键是“虚拟的”:它们被定义为实际数据上的函数,以指导分组算子。
注意: 在下面的讨论中,我们将使用 DataStream API 和 keyBy 。对于 DataSet API,您只需要替换为 DataSet 和 groupBy 。
定义元组的键
最简单的情况是在元组的一个或多个字段上对元组进行分组:
DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)
val input: DataStream[(Int, String, Long)] = // [...] val keyed = input.keyBy(0)
元组在第一个字段(整数类型)上分组。
DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)
val input: DataSet[(Int, String, Long)] = // [...] val grouped = input.groupBy(0,1)
在这里,我们将元组分组在由第一个和第二个字段组成的复合键上。
关于嵌套元组的注释:如果你有一个带有嵌套元组的 DataStream,例如:
DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
指定 keyBy(0) 将导致系统使用 full Tuple2 作为键(以 Integer 和 Float 为键)。如果要“导航”到嵌套中 Tuple2 ,则必须使用下面解释的字段表达式键。
使用 Field Expressions 定义键
您可以使用基于字符串的字段表达式来引用嵌套字段,并定义用于分组,排序,连接或 coGrouping 的键。
字段表达式可以非常轻松地选择(嵌套)复合类型中的字段,例如 Tuple 和 POJO 类型。
在下面的示例中,我们有一个 WC POJO,其中包含两个字段“word”和“count”。要按字段分组 word ,我们只需将其名称传递给 keyBy() 函数即可。
// some ordinary POJO (Plain old Java Object)
public class WC {
public String word;
public int count;
}
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);
字段表达式语法 :
- 按字段名称选择 POJO 字段。例如,
"user"指 POJO 类型的“用户”字段。 - 按字段名称或 0 偏移字段索引选择元组字段。例如
"f0",分别"5"引用 Java Tuple 类型的第一个和第六个字段。 - 您可以在 POJO 和 Tuples 中选择嵌套字段。例如,
"user.zip"指 POJO 的“zip”字段,其存储在 POJO 类型的“user”字段中。支持 POJO 和元组的任意嵌套和混合,例如"f1.user.zip"或"user.f3.1.zip"。 - 您可以使用
"*"通配符表达式选择完整类型。这也适用于非 Tuple 或 POJO 类型的类型。
字段表达示例 :
public static class WC {
public ComplexNestedClass complex; //nested POJO
private int count;
// getter / setter for private field (count)
public int getCount() {
return count;
}
public void setCount(int c) {
this.count = c;
}
}
public static class ComplexNestedClass {
public Integer someNumber;
public float someFloat;
public Tuple3<Long, Long, String> word;
public IntWritable hadoopCitizen;
}
这些是上面示例代码的有效字段表达式:
"count":类中的 count 字段WC。"complex":递归选择 POJO 类型的字段复合体的所有字段ComplexNestedClass。"complex.word.f2":选择嵌套的最后一个字段Tuple3。"complex.hadoopCitizen":选择 HadoopIntWritable类型。
在下面的示例中,我们有一个 WC POJO,其中包含两个字段“word”和“count”。要按字段分组 word ,我们只需将其名称传递给 keyBy() 函数即可。
// some ordinary POJO (Plain old Java Object)
class WC(var word: String, var count: Int) {
def this() { this("", 0L) }
}
val words: DataStream[WC] = // [...]
val wordCounts = words.keyBy("word").window(/*window specification*/)
// or, as a case class, which is less typing
case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...]
val wordCounts = words.keyBy("word").window(/*window specification*/)
字段表达式语法 :
- 按字段名称选择 POJO 字段。例如,
"user"指 POJO 类型的“用户”字段。 - 通过 1 偏移字段名称或 0 偏移字段索引选择元组字段。例如
"_1",分别"5"引用 Scala Tuple 类型的第一个和第六个字段。 - 您可以在 POJO 和 Tuples 中选择嵌套字段。例如,
"user.zip"指 POJO 的“zip”字段,其存储在 POJO 类型的“user”字段中。支持 POJO 和元组的任意嵌套和混合,例如"_2.user.zip"或"user._4.1.zip"。 - 您可以使用
"_"通配符表达式选择完整类型。这也适用于非 Tuple 或 POJO 类型的类型。
字段表达示例 :
class WC(var complex: ComplexNestedClass, var count: Int) {
def this() { this(null, 0) }
}
class ComplexNestedClass(
var someNumber: Int,
someFloat: Float,
word: (Long, Long, String),
hadoopCitizen: IntWritable) {
def this() { this(0, 0, (0, 0, ""), new IntWritable(0)) }
}
这些是上面示例代码的有效字段表达式:
"count":类中的 count 字段WC。"complex":递归选择 POJO 类型的字段复合体的所有字段ComplexNestedClass。"complex.word._3":选择嵌套的最后一个字段Tuple3。"complex.hadoopCitizen":选择 HadoopIntWritable类型。
使用键选择器函数定义键
定义键的另一种方法是“键选择器”函数。键选择器函数将单个数据元作为输入并返回数据元的键。Keys 可以是任何类型,并且可以从确定性计算中导出。
以下示例显示了一个键选择器函数,它只返回一个对象的字段:
// some ordinary POJO
public class WC {public String word; public int count;}
DataStream<WC> words = // [...]
KeyedStream<WC> keyed = words
.keyBy(new KeySelector<WC, String>() {
public String getKey(WC wc) { return wc.word; }
});
// some ordinary case class case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...] val keyed = words.keyBy( _.word )
指定转换函数
大多数转换都需要用户定义的函数。本节列出了如何指定它们的不同方法
实现接口
最基本的方法是实现一个提供的接口:
class MyMapFunction implements MapFunction<String, Integer> {
public Integer map(String value) { return Integer.parseInt(value); }
};
data.map(new MyMapFunction());
匿名课程
您可以将函数作为匿名类传递:
data.map(new MapFunction<String, Integer> () {
public Integer map(String value) { return Integer.parseInt(value); }
});
Java 8 Lambdas
Flink 还支持 Java API 中的 Java 8 Lambdas。
data.filter(s -> s.startsWith("http://"));
data.reduce((i1,i2) -> i1 + i2);
函数丰富
需要用户定义函数的所有转换都可以将 富 函数作为参数。例如,而不是
class MyMapFunction implements MapFunction<String, Integer> {
public Integer map(String value) { return Integer.parseInt(value); }
};
你可以写
class MyMapFunction extends RichMapFunction<String, Integer> {
public Integer map(String value) { return Integer.parseInt(value); }
};
并像往常一样将函数传递给 map 转换:
data.map(new MyMapFunction());
丰富的函数也可以定义为匿名类:
data.map (new RichMapFunction<String, Integer>() {
public Integer map(String value) { return Integer.parseInt(value); }
});
Lambda 函数
正如前面的例子中所见,所有 算子操作都接受 lambda 函数来描述 算子操作:
val data: DataSet[String] = // [...] data.filter { _.startsWith("http://") }
val data: DataSet[Int] = // [...] data.reduce { (i1,i2) => i1 + i2 }
// or data.reduce { _ + _ }
函数丰富
将 lambda 函数作为参数的所有转换都可以将 富 函数作为参数。例如,而不是
data.map { x => x.toInt }
你可以写
class MyMapFunction extends RichMapFunction[String, Int] {
def map(in: String):Int = { in.toInt }
};
并将函数传递给 map 转换:
data.map(new MyMapFunction())
丰富的函数也可以定义为匿名类:
data.map (new RichMapFunction[String, Int] {
def map(in: String):Int = { in.toInt }
})
丰富的函数提供,除了用户定义的函数(Map,Reduce 等),四种方法: open , close , getRuntimeContext ,和 setRuntimeContext 。这些用于参数化函数(请参阅将 参数传递给函数 ),创建和完成本地状态,访问广播变量(请参阅 广播变量 )以及访问运行时信息(如累加器和计数器)(请参阅 累加器和计数器 )以及有关信息的信息。迭代(参见 迭代 )。
支持的数据类型
Flink 对可以在 DataSet 或 DataStream 中的数据元类型进行了一些限制。原因是系统分析类型以确定有效的执行策略。
有六种不同类别的数据类型:
- Java 元组 和 Scala 案例类
- Java POJO
- 原始类型
- 常规课程
- 值
- Hadoop Writables
- 特殊类型
元组和案例类
元组是包含固定数量的具有各种类型的字段的复合类型。Java API 提供 Tuple1 最多的类 Tuple25 。元组的每个字段都可以是包含更多元组的任意 Flink 类型,从而产生嵌套元组。可以使用字段名称直接访问元组的字段 tuple.f4 ,或使用通用 getter 方法 tuple.getField(int position) 。字段索引从 0 开始。请注意,这与 Scala 元组形成鲜明对比,但它与 Java 常规索引更为一致。
DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
new Tuple2<String, Integer>("hello", 1),
new Tuple2<String, Integer>("world", 2));
wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
@Override
public Integer map(Tuple2<String, Integer> value) throws Exception {
return value.f1;
}
});
wordCounts.keyBy(0); // also valid .keyBy("f0")
Scala 案例类(和 Scala 元组是案例类的特例)是包含固定数量的具有各种类型的字段的复合类型。元组字段通过其 1 偏移名称来寻址,例如 _1 第一个字段。案例类字段按名称访问。
case class WordCount(word: String, count: Int)
val input = env.fromElements(
WordCount("hello", 1),
WordCount("world", 2)) // Case Class Data Set
input.keyBy("word")// key by field expression "word"
val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set
input2.keyBy(0, 1) // key by field positions 0 and 1
POJOs
如果满足以下要求,则 Flink 将 Java 和 Scala 类视为特殊的 POJO 数据类型:
- 这堂课必须公开。
- 它必须有一个没有参数的公共构造函数(默认构造函数)。
- 所有字段都是公共的,或者必须通过 getter 和 setter 函数访问。对于一个名为
foogetter 和 setter 方法的字段必须命名getFoo()和setFoo()。 - Flink 必须支持字段的类型。目前,Flink 使用 Avro 序列化任意对象(例如
Date)。
Flink 分析了 POJO 类型的结构,即它了解了 POJO 的字段。因此,POJO 类型比一般类型更容易使用。此外,Flink 可以比一般类型更有效地处理 POJO。
以下示例显示了一个包含两个公共字段的简单 POJO。
public class WordWithCount {
public String word;
public int count;
public WordWithCount() {}
public WordWithCount(String word, int count) {
this.word = word;
this.count = count;
}
}
DataStream<WordWithCount> wordCounts = env.fromElements(
new WordWithCount("hello", 1),
new WordWithCount("world", 2));
wordCounts.keyBy("word"); // key by field expression "word"
class WordWithCount(var word: String, var count: Int) {
def this() {
this(null, -1)
}
}
val input = env.fromElements(
new WordWithCount("hello", 1),
new WordWithCount("world", 2)) // Case Class Data Set
input.keyBy("word")// key by field expression "word"
原始类型
Flink 支持所有 Java 和 Scala 的原始类型,如 Integer , String 和 Double 。
一般类别
Flink 支持大多数 Java 和 Scala 类(API 和自定义)。限制适用于包含无法序列化的字段的类,如文件指针,I / O 流或其他本机资源。遵循 Java Beans 约定的类通常可以很好地工作。
所有未标识为 POJO 类型的类(请参阅上面的 POJO 要求)都由 Flink 作为常规类类型处理。Flink 将这些数据类型视为黑盒子,并且无法访问其内容(即,用于有效排序)。使用序列化框架 Kryo 对常规类型进行反序列化。
值
值 类型手动描述它们的序列化和反序列化。它们不是通过通用序列化框架,而是通过 org.apache.flinktypes.Value 使用方法 read 和实现接口为这些 算子操作提供自定义代码 write 。当通用序列化效率非常低时,使用值类型是合理的。一个示例是将数据元的稀疏向量实现为数组的数据类型。知道数组大部分为零,可以对非零数据元使用特殊编码,而通用序列化只需编写所有数组数据元。
该 org.apache.flinktypes.CopyableValue 接口以类似的方式支持手动内部克隆逻辑。
Flink 带有与基本数据类型对应的预定义值类型。( ByteValue , ShortValue , IntValue , LongValue , FloatValue , DoubleValue , StringValue , CharValue , BooleanValue )。这些 Value 类型充当基本数据类型的可变变体:它们的值可以被更改,允许程序员重用对象并从垃圾收集器中消除压力。
Hadoop Writables
您可以使用实现该 org.apache.hadoop.Writable 接口的类型。 write() 和 readFields() 方法中定义的序列化逻辑将用于序列化。
特殊类型
您可以使用特殊类型,包括 Scala 的 Either , Option 和 Try 。Java API 有自己的自定义实现 Either 。与 Scala 类似 Either ,它代表两种可能类型的值, 左 或 右 。 Either 可用于错误处理或需要输出两种不同类型记录的 算子。
类型擦除和类型推断
注意:本节仅适用于 Java。
Java 编译器在编译后抛弃了大部分泛型类型信息。这在 Java 中称为 类型擦除 。这意味着在运行时,对象的实例不再知道其泛型类型。例如,JVM 的实例 DataStream<String> 和 DataStream<Long> 外观相同。
Flink 在准备执行程序时(当调用程序的主要方法时)需要类型信息。Flink Java API 尝试重建以各种方式丢弃的类型信息,并将其显式存储在数据集和 算子中。您可以通过检索类型 DataStream.getType() 。该方法返回一个实例 TypeInformation ,这是 Flink 表示类型的内部方式。
类型推断有其局限性,在某些情况下需要程序员的“合作”。这方面的示例是从集合创建数据集的方法,例如 ExecutionEnvironment.fromCollection(), 您可以传递描述类型的参数的位置。但是通用函数 MapFunction<I, O> 也可能需要额外的类型信息。
该 ResultTypeQueryable 接口可以通过输入格式和函数来实现明确地告诉 API 他们的返回类型。调用函数的 输入类型 通常可以通过先前 算子操作的结果类型来推断。
累加器和计数器
累加器是具有 添加 算子操作 和 最终累积结果的 简单构造,可在作业结束后使用。
最直接的累加器是一个 计数器 :您可以使用该 Accumulator.add(V value) 方法递增它 。在工作结束时,Flink 将汇总(合并)所有部分结果并将结果发送给客户。在调试过程中,或者如果您想快速了解有关数据的更多信息,累加器非常有用。
Flink 目前有以下 内置累加器 。它们中的每一个都实现了 Accumulator 接口。
- IntCounter , LongCounter 和 DoubleCounter :请参阅下面的使用计数器的示例。
- 直方图 :离散数量的区间的直方图实现。在内部,它只是一个从 Integer 到 Integer 的映射。您可以使用它来计算值的分布,例如字数统计程序的每行字数的分布。
如何使用累加器:
首先,您必须在要使用它的用户定义转换函数中创建累加器对象(此处为计数器)。
private IntCounter numLines = new IntCounter();
其次,您必须注册累加器对象,通常在 富 函数的 open() 方法中 。在这里您还可以定义名称。
getRuntimeContext().addAccumulator("num-lines", this.numLines);
您现在可以在 算子函数中的任何位置使用累加器,包括在 open() 和 close() 方法中。
this.numLines.add(1);
整个结果将存储在 JobExecutionResult 从 execute() 运行环境的方法返回的对象中(当前这仅在执行等待作业完成时才有效)。
myJobExecutionResult.getAccumulatorResult("num-lines")
所有累加器每个作业共享一个命名空间。因此,您可以在作业的不同算子函数中使用相同的累加器。Flink 将在内部合并所有具有相同名称的累加器。
关于累加器和迭代的注释:目前累加器的结果仅在整个作业结束后才可用。我们还计划在下一次迭代中使前一次迭代的结果可用。您可以使用 聚合器 来计算每次迭代统计信息,并根据此类统计信息确定迭代的终止。
定制累加器:
要实现自己的累加器,只需编写 Accumulator 接口的实现即可。如果您认为自定义累加器应与 Flink 一起提供,请随意创建拉取请求。
您可以选择实现 Accumulator 或 SimpleAccumulator 。
Accumulator<V,R> 最灵活:它定义 V 要添加的值的类型 R ,以及最终结果的结果类型。例如,对于直方图, V 是数字并且 R 是直方图。 SimpleAccumulator 适用于两种类型相同的情况,例如计数器。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论