返回介绍

基础 API 概念

发布于 2025-05-02 18:19:13 字数 24015 浏览 0 评论 0 收藏

Flink 程序是实现分布式集合转换的常规程序(例如,Filter,映射,更新状态,Join,分组,定义窗口,聚合)。集合最初是从源创建的(例如,通过读取文件,kafka 主题或从本地的内存中集合)。结果通过接收器返回,接收器可以例如将数据写入(分布式)文件或标准输出(例如,命令行终端)。Flink 程序可以在各种环境中运行,独立运行或嵌入其他程序中。执行可以在本地 JVM 中执行,也可以在许多计算机的集群上执行。

根据数据源的类型(即有界或无界源),您可以编写批处理程序或流程序,其中 DataSet API 用于批处理,DataStream API 用于流式处理。本指南将介绍两种 API 共有的基本概念,但请参阅我们的 流处理指南批处理指南 ,了解有关使用每个 API 编写程序的具体信息。

注: 当显示的 API 时,如何使用,我们将用实际的例子 StreamingExecutionEnvironmentDataStream API。 DataSet API 中的概念完全相同,只需替换为 ExecutionEnvironmentDataSet

DataSet 和 DataStream

Flink 具有特殊类 DataSetDataStream 在程序中表示数据。您可以将它们视为可以包含重复项的不可变数据集合。在 DataSet 数据有限的情况下,对于一个 DataStream 数据元的数量可以是无界的。

这些集合在某些关键方面与常规 Java 集合不同。首先,它们是不可变的,这意味着一旦创建它们就无法添加或删除数据元。你也不能简单地检查里面的数据元。

集合最初通过在 Flink 程序添加源创建和新的集合从这些通过将它们使用 API 方法如衍生 mapfilter 等等。

Flink 计划的剖析

Flink 程序看起来像是转换数据集合的常规程序。每个程序包含相同的基本部分:

  1. 获得一个 execution environment
  2. 加载/创建初始数据,
  3. 指定此数据的转换,
  4. 指定放置计算结果的位置,
  5. 触发程序执行
  6. Java
  7. Scala

我们现在将概述每个步骤,请参阅相应部分以获取更多详细信息。请注意,Java DataSet API 的所有核心类都可以在 org.apache.flink.api.java 包中找到, 而 Java DataStream API 的类可以在 org.apache.flink.streaming.api 中 找到 。

StreamExecutionEnvironment 是所有 Flink 计划的基础。您可以使用以下静态方法获取一个 StreamExecutionEnvironment

getExecutionEnvironment()

createLocalEnvironment()

createRemoteEnvironment(String host, int port, String... jarFiles)

通常,您只需要使用 getExecutionEnvironment() ,因为这将根据上下文做正确的事情:如果您在 IDE 中执行程序或作为常规 Java 程序,它将创建一个本地环境,将在本地计算机上执行您的程序。如果您从程序中创建了一个 JAR 文件,并通过 命令行 调用它 ,则 Flink 集群管理器将执行您的 main 方法并 getExecutionEnvironment() 返回一个运行环境,以便在集群上执行您的程序。

对于指定数据源,运行环境有几种方法可以使用各种方法从文件中读取:您可以逐行读取它们,CSV 文件或使用完全自定义数据输入格式。要将文本文件作为一系列行读取,您可以使用:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.readTextFile("file:///path/to/file");

这将为您提供一个 DataStream,然后您可以在其上应用转换来创建新的派生 DataStream。

您可以通过使用转换函数调用 DataStream 上的方法来应用转换。例如,Map 转换如下所示:

DataStream<String> input = ...;

DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
  @Override
  public Integer map(String value) {
    return Integer.parseInt(value);
  }
});

这将通过将原始集合中的每个 String 转换为 Integer 来创建新的 DataStream。

一旦有了包含最终结果的 DataStream,就可以通过创建接收器将其写入外部系统。这些只是创建接收器的一些示例方法:

writeAsText(String path)

print()

我们现在将概述每个步骤,请参阅相应部分以获取更多详细信息。请注意,Scala DataSet API 的所有核心类都可以在 org.apache.flink.api.scala 包中找到, 而 Scala DataStream API 的类可以在 org.apache.flink.streaming.api.scala 中 找到 。

StreamExecutionEnvironment 是所有 Flink 计划的基础。您可以使用以下静态方法获取一个 StreamExecutionEnvironment

getExecutionEnvironment()

createLocalEnvironment()

createRemoteEnvironment(host: String, port: Int, jarFiles: String*)

通常,您只需要使用 getExecutionEnvironment() ,因为这将根据上下文做正确的事情:如果您在 IDE 中执行程序或作为常规 Java 程序,它将创建一个本地环境,将在本地计算机上执行您的程序。如果您从程序中创建了一个 JAR 文件,并通过 命令行 调用它 ,则 Flink 集群管理器将执行您的 main 方法并 getExecutionEnvironment() 返回一个运行环境,以便在集群上执行您的程序。

对于指定数据源,运行环境有几种方法可以使用各种方法从文件中读取:您可以逐行读取它们,CSV 文件或使用完全自定义数据输入格式。要将文本文件作为一系列行读取,您可以使用:

val env = StreamExecutionEnvironment.getExecutionEnvironment()

val text: DataStream[String] = env.readTextFile("file:///path/to/file")

这将为您提供一个 DataStream,然后您可以在其上应用转换来创建新的派生 DataStream。

您可以通过使用转换函数调用 DataSet 上的方法来应用转换。例如,Map 转换如下所示:

val input: DataSet[String] = ...

val mapped = input.map { x => x.toInt }

这将通过将原始集合中的每个 String 转换为 Integer 来创建新的 DataStream。

一旦有了包含最终结果的 DataStream,就可以通过创建接收器将其写入外部系统。这些只是创建接收器的一些示例方法:

writeAsText(path: String)

print()

一旦您指定的完整程序,你需要 触发执行程序 调用 execute()StreamExecutionEnvironment 。根据执行的类型, ExecutionEnvironment 将在本地计算机上触发执行或提交程序以在群集上执行。

execute() 方法返回一个 JobExecutionResult ,包含执行时间和累加器结果。

有关流数据源和接收器的信息,请参阅 流指南 ,以及有关 DataStream 上支持的转换的更深入信息。

有关批处理数据源和接收器的信息,请查看 批处理指南 ,以及有关 DataSet 支持的转换的更深入信息。

懒惰的评价

所有 Flink 程序都是懒惰地执行:当执行程序的 main 方法时,数据加载和转换不会直接发生。而是创建每个 算子操作并将其添加到程序的计划中。当 execute() 运行环境上的调用显式触发执行时,实际执行 算子操作。程序是在本地执行还是在集群上执行取决于运行环境的类型

懒惰的评估使您可以构建 Flink 作为一个整体计划单元执行的复杂程序。

指定 Keys

某些转换(join,coGroup,keyBy,groupBy)要求在数据元集合上定义键。其他转换(Reduce,GroupReduce,Aggregate,Windows)允许数据在应用之前在 Keys 上分组。

DataSet 被分组为

DataSet<...> input = // [...]
DataSet<...> reduced = input
  .groupBy(/*define key here*/)
  .reduceGroup(/*do something*/);

虽然可以使用 DataStream 指定 Keys

DataStream<...> input = // [...]
DataStream<...> windowed = input
  .keyBy(/*define key here*/)
  .window(/*window specification*/);

Flink 的数据模型不基于键值对。因此,您无需将数据集类型物理打包到键和值中。键是“虚拟的”:它们被定义为实际数据上的函数,以指导分组算子。

注意: 在下面的讨论中,我们将使用 DataStream API 和 keyBy 。对于 DataSet API,您只需要替换为 DataSetgroupBy

定义元组的键

最简单的情况是在元组的一个或多个字段上对元组进行分组:

DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)
val input: DataStream[(Int, String, Long)] = // [...] val keyed = input.keyBy(0)

元组在第一个字段(整数类型)上分组。

DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)
val input: DataSet[(Int, String, Long)] = // [...] val grouped = input.groupBy(0,1)

在这里,我们将元组分组在由第一个和第二个字段组成的复合键上。

关于嵌套元组的注释:如果你有一个带有嵌套元组的 DataStream,例如:

DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;

指定 keyBy(0) 将导致系统使用 full Tuple2 作为键(以 Integer 和 Float 为键)。如果要“导航”到嵌套中 Tuple2 ,则必须使用下面解释的字段表达式键。

使用 Field Expressions 定义键

您可以使用基于字符串的字段表达式来引用嵌套字段,并定义用于分组,排序,连接或 coGrouping 的键。

字段表达式可以非常轻松地选择(嵌套)复合类型中的字段,例如 TuplePOJO 类型。

在下面的示例中,我们有一个 WC POJO,其中包含两个字段“word”和“count”。要按字段分组 word ,我们只需将其名称传递给 keyBy() 函数即可。

// some ordinary POJO (Plain old Java Object)
public class WC {
  public String word;
  public int count;
}
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);

字段表达式语法

  • 按字段名称选择 POJO 字段。例如, "user" 指 POJO 类型的“用户”字段。
  • 按字段名称或 0 偏移字段索引选择元组字段。例如 "f0" ,分别 "5" 引用 Java Tuple 类型的第一个和第六个字段。
  • 您可以在 POJO 和 Tuples 中选择嵌套字段。例如, "user.zip" 指 POJO 的“zip”字段,其存储在 POJO 类型的“user”字段中。支持 POJO 和元组的任意嵌套和混合,例如 "f1.user.zip""user.f3.1.zip"
  • 您可以使用 "*" 通配符表达式选择完整类型。这也适用于非 Tuple 或 POJO 类型的类型。

字段表达示例

public static class WC {
  public ComplexNestedClass complex; //nested POJO
  private int count;
  // getter / setter for private field (count)
  public int getCount() {
  return count;
  }
  public void setCount(int c) {
  this.count = c;
  }
}
public static class ComplexNestedClass {
  public Integer someNumber;
  public float someFloat;
  public Tuple3<Long, Long, String> word;
  public IntWritable hadoopCitizen;
}

这些是上面示例代码的有效字段表达式:

  • "count" :类中的 count 字段 WC
  • "complex" :递归选择 POJO 类型的字段复合体的所有字段 ComplexNestedClass
  • "complex.word.f2" :选择嵌套的最后一个字段 Tuple3
  • "complex.hadoopCitizen" :选择 Hadoop IntWritable 类型。

在下面的示例中,我们有一个 WC POJO,其中包含两个字段“word”和“count”。要按字段分组 word ,我们只需将其名称传递给 keyBy() 函数即可。

// some ordinary POJO (Plain old Java Object)
class WC(var word: String, var count: Int) {
  def this() { this("", 0L) }
}
val words: DataStream[WC] = // [...]
val wordCounts = words.keyBy("word").window(/*window specification*/)

// or, as a case class, which is less typing
case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...]
val wordCounts = words.keyBy("word").window(/*window specification*/)

字段表达式语法

  • 按字段名称选择 POJO 字段。例如, "user" 指 POJO 类型的“用户”字段。
  • 通过 1 偏移字段名称或 0 偏移字段索引选择元组字段。例如 "_1" ,分别 "5" 引用 Scala Tuple 类型的第一个和第六个字段。
  • 您可以在 POJO 和 Tuples 中选择嵌套字段。例如, "user.zip" 指 POJO 的“zip”字段,其存储在 POJO 类型的“user”字段中。支持 POJO 和元组的任意嵌套和混合,例如 "_2.user.zip""user._4.1.zip"
  • 您可以使用 "_" 通配符表达式选择完整类型。这也适用于非 Tuple 或 POJO 类型的类型。

字段表达示例

class WC(var complex: ComplexNestedClass, var count: Int) {
  def this() { this(null, 0) }
}

class ComplexNestedClass(
  var someNumber: Int,
  someFloat: Float,
  word: (Long, Long, String),
  hadoopCitizen: IntWritable) {
  def this() { this(0, 0, (0, 0, ""), new IntWritable(0)) }
}

这些是上面示例代码的有效字段表达式:

  • "count" :类中的 count 字段 WC
  • "complex" :递归选择 POJO 类型的字段复合体的所有字段 ComplexNestedClass
  • "complex.word._3" :选择嵌套的最后一个字段 Tuple3
  • "complex.hadoopCitizen" :选择 Hadoop IntWritable 类型。

使用键选择器函数定义键

定义键的另一种方法是“键选择器”函数。键选择器函数将单个数据元作为输入并返回数据元的键。Keys 可以是任何类型,并且可以从确定性计算中导出。

以下示例显示了一个键选择器函数,它只返回一个对象的字段:

// some ordinary POJO
public class WC {public String word; public int count;}
DataStream<WC> words = // [...]
KeyedStream<WC> keyed = words
  .keyBy(new KeySelector<WC, String>() {
   public String getKey(WC wc) { return wc.word; }
   });
// some ordinary case class case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...] val keyed = words.keyBy( _.word )

指定转换函数

大多数转换都需要用户定义的函数。本节列出了如何指定它们的不同方法

实现接口

最基本的方法是实现一个提供的接口:

class MyMapFunction implements MapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
};
data.map(new MyMapFunction());

匿名课程

您可以将函数作为匿名类传递:

data.map(new MapFunction<String, Integer> () {
  public Integer map(String value) { return Integer.parseInt(value); }
});

Java 8 Lambdas

Flink 还支持 Java API 中的 Java 8 Lambdas。

data.filter(s -> s.startsWith("http://"));
data.reduce((i1,i2) -> i1 + i2);

函数丰富

需要用户定义函数的所有转换都可以将 富 函数作为参数。例如,而不是

class MyMapFunction implements MapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
};

你可以写

class MyMapFunction extends RichMapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
};

并像往常一样将函数传递给 map 转换:

data.map(new MyMapFunction());

丰富的函数也可以定义为匿名类:

data.map (new RichMapFunction<String, Integer>() {
  public Integer map(String value) { return Integer.parseInt(value); }
});

Lambda 函数

正如前面的例子中所见,所有 算子操作都接受 lambda 函数来描述 算子操作:

val data: DataSet[String] = // [...] data.filter { _.startsWith("http://") }
val data: DataSet[Int] = // [...] data.reduce { (i1,i2) => i1 + i2 }
// or data.reduce { _ + _ }

函数丰富

将 lambda 函数作为参数的所有转换都可以将 富 函数作为参数。例如,而不是

data.map { x => x.toInt }

你可以写

class MyMapFunction extends RichMapFunction[String, Int] {
  def map(in: String):Int = { in.toInt }
};

并将函数传递给 map 转换:

data.map(new MyMapFunction())

丰富的函数也可以定义为匿名类:

data.map (new RichMapFunction[String, Int] {
  def map(in: String):Int = { in.toInt }
})

丰富的函数提供,除了用户定义的函数(Map,Reduce 等),四种方法: openclosegetRuntimeContext ,和 setRuntimeContext 。这些用于参数化函数(请参阅将 参数传递给函数 ),创建和完成本地状态,访问广播变量(请参阅 广播变量 )以及访问运行时信息(如累加器和计数器)(请参阅 累加器和计数器 )以及有关信息的信息。迭代(参见 迭代 )。

支持的数据类型

Flink 对可以在 DataSet 或 DataStream 中的数据元类型进行了一些限制。原因是系统分析类型以确定有效的执行策略。

有六种不同类别的数据类型:

  1. Java 元组Scala 案例类
  2. Java POJO
  3. 原始类型
  4. 常规课程
  5. Hadoop Writables
  6. 特殊类型

元组和案例类

元组是包含固定数量的具有各种类型的字段的复合类型。Java API 提供 Tuple1 最多的类 Tuple25 。元组的每个字段都可以是包含更多元组的任意 Flink 类型,从而产生嵌套元组。可以使用字段名称直接访问元组的字段 tuple.f4 ,或使用通用 getter 方法 tuple.getField(int position) 。字段索引从 0 开始。请注意,这与 Scala 元组形成鲜明对比,但它与 Java 常规索引更为一致。

DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
  new Tuple2<String, Integer>("hello", 1),
  new Tuple2<String, Integer>("world", 2));

wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
  @Override
  public Integer map(Tuple2<String, Integer> value) throws Exception {
    return value.f1;
  }
});

wordCounts.keyBy(0); // also valid .keyBy("f0")

Scala 案例类(和 Scala 元组是案例类的特例)是包含固定数量的具有各种类型的字段的复合类型。元组字段通过其 1 偏移名称来寻址,例如 _1 第一个字段。案例类字段按名称访问。

case class WordCount(word: String, count: Int)
val input = env.fromElements(
  WordCount("hello", 1),
  WordCount("world", 2)) // Case Class Data Set 
input.keyBy("word")// key by field expression "word" 
val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set 
input2.keyBy(0, 1) // key by field positions 0 and 1

POJOs

如果满足以下要求,则 Flink 将 Java 和 Scala 类视为特殊的 POJO 数据类型:

  • 这堂课必须公开。
  • 它必须有一个没有参数的公共构造函数(默认构造函数)。
  • 所有字段都是公共的,或者必须通过 getter 和 setter 函数访问。对于一个名为 foo getter 和 setter 方法的字段必须命名 getFoo()setFoo()
  • Flink 必须支持字段的类型。目前,Flink 使用 Avro 序列化任意对象(例如 Date )。

Flink 分析了 POJO 类型的结构,即它了解了 POJO 的字段。因此,POJO 类型比一般类型更容易使用。此外,Flink 可以比一般类型更有效地处理 POJO。

以下示例显示了一个包含两个公共字段的简单 POJO。

public class WordWithCount {

  public String word;
  public int count;

  public WordWithCount() {}

  public WordWithCount(String word, int count) {
    this.word = word;
    this.count = count;
  }
}

DataStream<WordWithCount> wordCounts = env.fromElements(
  new WordWithCount("hello", 1),
  new WordWithCount("world", 2));

wordCounts.keyBy("word"); // key by field expression "word"
class WordWithCount(var word: String, var count: Int) {
  def this() {
    this(null, -1)
  }
}

val input = env.fromElements(
  new WordWithCount("hello", 1),
  new WordWithCount("world", 2)) // Case Class Data Set 
input.keyBy("word")// key by field expression "word"

原始类型

Flink 支持所有 Java 和 Scala 的原始类型,如 IntegerStringDouble

一般类别

Flink 支持大多数 Java 和 Scala 类(API 和自定义)。限制适用于包含无法序列化的字段的类,如文件指针,I / O 流或其他本机资源。遵循 Java Beans 约定的类通常可以很好地工作。

所有未标识为 POJO 类型的类(请参阅上面的 POJO 要求)都由 Flink 作为常规类类型处理。Flink 将这些数据类型视为黑盒子,并且无法访问其内容(即,用于有效排序)。使用序列化框架 Kryo 对常规类型进行反序列化。

值 类型手动描述它们的序列化和反序列化。它们不是通过通用序列化框架,而是通过 org.apache.flinktypes.Value 使用方法 read 和实现接口为这些 算子操作提供自定义代码 write 。当通用序列化效率非常低时,使用值类型是合理的。一个示例是将数据元的稀疏向量实现为数组的数据类型。知道数组大部分为零,可以对非零数据元使用特殊编码,而通用序列化只需编写所有数组数据元。

org.apache.flinktypes.CopyableValue 接口以类似的方式支持手动内部克隆逻辑。

Flink 带有与基本数据类型对应的预定义值类型。( ByteValueShortValueIntValueLongValueFloatValueDoubleValueStringValueCharValueBooleanValue )。这些 Value 类型充当基本数据类型的可变变体:它们的值可以被更改,允许程序员重用对象并从垃圾收集器中消除压力。

Hadoop Writables

您可以使用实现该 org.apache.hadoop.Writable 接口的类型。 write()readFields() 方法中定义的序列化逻辑将用于序列化。

特殊类型

您可以使用特殊类型,包括 Scala 的 EitherOptionTry 。Java API 有自己的自定义实现 Either 。与 Scala 类似 Either ,它代表两种可能类型的值, 左 或 右 。 Either 可用于错误处理或需要输出两种不同类型记录的 算子。

类型擦除和类型推断

注意:本节仅适用于 Java。

Java 编译器在编译后抛弃了大部分泛型类型信息。这在 Java 中称为 类型擦除 。这意味着在运行时,对象的实例不再知道其泛型类型。例如,JVM 的实例 DataStream&lt;String&gt;DataStream&lt;Long&gt; 外观相同。

Flink 在准备执行程序时(当调用程序的主要方法时)需要类型信息。Flink Java API 尝试重建以各种方式丢弃的类型信息,并将其显式存储在数据集和 算子中。您可以通过检索类型 DataStream.getType() 。该方法返回一个实例 TypeInformation ,这是 Flink 表示类型的内部方式。

类型推断有其局限性,在某些情况下需要程序员的“合作”。这方面的示例是从集合创建数据集的方法,例如 ExecutionEnvironment.fromCollection(), 您可以传递描述类型的参数的位置。但是通用函数 MapFunction&lt;I, O&gt; 也可能需要额外的类型信息。

ResultTypeQueryable 接口可以通过输入格式和函数来实现明确地告诉 API 他们的返回类型。调用函数的 输入类型 通常可以通过先前 算子操作的结果类型来推断。

累加器和计数器

累加器是具有 添加 算子操作最终累积结果的 简单构造,可在作业结束后使用。

最直接的累加器是一个 计数器 :您可以使用该 Accumulator.add(V value) 方法递增它 。在工作结束时,Flink 将汇总(合并)所有部分结果并将结果发送给客户。在调试过程中,或者如果您想快速了解有关数据的更多信息,累加器非常有用。

Flink 目前有以下 内置累加器 。它们中的每一个都实现了 Accumulator 接口。

  • IntCounterLongCounterDoubleCounter :请参阅下面的使用计数器的示例。
  • 直方图 :离散数量的区间的直方图实现。在内部,它只是一个从 Integer 到 Integer 的映射。您可以使用它来计算值的分布,例如字数统计程序的每行字数的分布。

如何使用累加器:

首先,您必须在要使用它的用户定义转换函数中创建累加器对象(此处为计数器)。

private IntCounter numLines = new IntCounter();

其次,您必须注册累加器对象,通常在 富 函数的 open() 方法中 。在这里您还可以定义名称。

getRuntimeContext().addAccumulator("num-lines", this.numLines);

您现在可以在 算子函数中的任何位置使用累加器,包括在 open()close() 方法中。

this.numLines.add(1);

整个结果将存储在 JobExecutionResultexecute() 运行环境的方法返回的对象中(当前这仅在执行等待作业完成时才有效)。

myJobExecutionResult.getAccumulatorResult("num-lines")

所有累加器每个作业共享一个命名空间。因此,您可以在作业的不同算子函数中使用相同的累加器。Flink 将在内部合并所有具有相同名称的累加器。

关于累加器和迭代的注释:目前累加器的结果仅在整个作业结束后才可用。我们还计划在下一次迭代中使前一次迭代的结果可用。您可以使用 聚合器 来计算每次迭代统计信息,并根据此类统计信息确定迭代的终止。

定制累加器:

要实现自己的累加器,只需编写 Accumulator 接口的实现即可。如果您认为自定义累加器应与 Flink 一起提供,请随意创建拉取请求。

您可以选择实现 AccumulatorSimpleAccumulator

Accumulator&lt;V,R&gt; 最灵活:它定义 V 要添加的值的类型 R ,以及最终结果的结果类型。例如,对于直方图, V 是数字并且 R 是直方图。 SimpleAccumulator 适用于两种类型相同的情况,例如计数器。

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。