返回介绍

13.3 从 HDFS 导出数据

发布于 2025-04-22 19:57:22 字数 7094 浏览 0 评论 0 收藏

Hadoop 中的分析结果通常会被复制到结构化数据存储中,例如关系型数据库或者 NoSQL 数据库,以便于展现或进一步分析。Spring Batch 的主要使用场景之一就是在文件和数据库之间移动数据并进行处理。本节将使用 Spring Batch 从 HDFS 中导出数据,并对数据执行一些基本操作,接下来将数据存储在 HDFS 之外的地方。数据存储的目标是关系型数据库或者 MongoDB。

13.3.1 从 HDFS 到 JDBC

将 MapReduce Job 所产生的结果数据从 HDFS 移到关系型数据库是很常见的操作。Spring Batch 提供了很多内置组件,这样就可以通过配置来执行这个动作。本节的示例位于 ./hadoop/batch-extract 目录下,示例代码是基于《 Spring Batch in Action 》一书的示例而创建的。示例应用程序的领域模型是一个在线商店,它需要维护所销售产品的目录。这个应用程序最初是从本地文件系统的 Flat 文件中读取产品数据,然后再将其写入到关系型数据库的 product 表中。我们已经修改了示例代码,让它从 HDFS 中读取,为展示 Spring Batch 额外的特性( http://code.google.com/p/springbatch-in-action/ )在代码中增加了错误处理逻辑。

为了将从本地文件系统读取数据替换成从 HDFS 中读取,需要在 Spring 中注册 HdfsResource 加载器,它会使用 Spring 的 Resource 抽象读取 HDFS 中的数据。因为 Spring Batch 的 FlatFileItemReader 类是基于 Resource 的抽象,所以在这里可以使用它。Spring 的 Resource 抽象提供了统一方式来从不同的来源中读取 InputStream,例如 URL(http 和 ftp)、Java 类路径或者标准文件系统。Resource 抽象( http://static.springsource.org/spring/docs/current/spring-framework-reference/html/resources.html )也支持通过使用 Ant 类型的正则表达式来读取多个资源的位置。为了配置 Spring 读取 HDFS 并将 HDFS 设置为默认的资源类型(如 hdfs://hostname:port 而不是 file:// ),请将如示例 13-36 所示的 XML 添加到 Spring 配置文件中 。

示例 13-36 配置默认资源加载器使用 HDFS

P244a

Spring Batch 基本概念,例如 Job、Step、ItemReader、处理器以及写入器,已在 13.1.6 小节“Spring Batch 简介”中介绍过了。在本节中,我们将配置这些组件并介绍它们的一些配置属性。然而,我们不会太全面地探讨如何配置以及运行 Spring Batch 应用。在 Spring Batch 中包括非常丰富的内容,如异常处理、通知、数据校验、数据处理以及纵向和横向扩展,本书无法全部涵盖。如果需要了解更多的信息,可以参阅 Spring 参考手册或者之前提到过的任意一本与 Spring Batch 相关的图书。

示例 13-37 是创建 Spring Batch Job 所需的高级配置,在它所使用的 Step 中,会处理位于 HDFS 中的 MapReduce Job 输出文件并将其写入到数据库。

示例 13-37 Spring Batch Job 的配置,由 HDFS 读取并写入到关系型数据库中

P244b

P245a

这个 Job 只定义了一个 Step,它包含了读取器、处理器和写入器。提交间隔要参考数据项的数量来决定,在提交数据库之前对数据项进行处理以及聚合。在实际使用中,会通过调整提交间隔的值以确定哪些值会产生最高的性能,这个属性的值应该是介于 10 到几百之间。这里也介绍了 Spring Batch 灵活的错误处理机制:使用 skip-limit 和 skippable-exception-classes 属性来设定在处理过程中,该 Step 失效前所允许发生特定错误的次数。skiplimit 属性决定了在 Job 失效前抛出异常次数。在这个例子中,允许抛出异常 FlatFileParseException 共 5 次。为了跟踪所有未正确处理的行,需要配置一个监听器,将错误数据写入到独立的数据库表中。这个监听器继承了 Spring Batch 的 SkipListenerSupport 类,我们重写了方法 onSkipInRead(Throwable t)来提供失败行的信息。

因为要读取许多 MapReduce Job 生成的文件(例如,part-r-00001 和 part-r-00002),所以我们使用了 Spring Batch 的 MultiResourceItemReader,并传入 HDFS 目录名称作为 Job 参数,以及真正从 HDFS 中读取单个文件的 FlatFileItemReader 引用,如示例 13-28 所示。

示例 13-38 Spring Batch HDFS 读取器的配置

P245b

当启动 Job 时,通过编程方式或者在使用 Spring Batch 管理功能时通过 REST API 或 web 应用方式来提供 job 参数 hdfsSourceDirectory。设置 bean 的作用域为 Step 可以解析 jobParameter 变量。示例 13-39 使用主 Java 类来加载 Spring Batch 配置并启动 Job。

示例 13-39 启动带参数的 Spring Batch job

P246a

通过 MultiResourceItemReader 类,可以处理位于 HDFS 目录 /data/analysis/results 中匹配表达式 part-*的多个文件。每一个 MultiResourceItemReader 发现的文件都会被 FlatFileItemReader 处理。文件中的内容样例都如示例 13-40 所示。在这个文件中有 4 个列,分别表示产品 ID、名称、说明和价格(在样例数据中描述为空)。

示例 13-40 导入数据库的 HDFS 文件的样本内容

P221

这里配置了两个 DefaultLineMapper 类型的协同对象来指定 FlatFileItemReader 的功能。第一个是 Spring Batch 提供的 DelimitedLineTokenizer 类,它会读取一行输入,默认情况下会将逗号作为分隔符。字段名称和值都会被放置在 Spring Batch 的 FieldSet 对象中。FieldSet 类似 JDBC 的结果集,但它是从文件中读取数据。FieldSet 允许以名称或者位置来访问列,并且会将这些列的值转换为 Java 类型,如 String、Integer 或者 BigDecimal。第二个协作对象是 ProductFieldSetMapper,这个类是由我们提供的,它会将 FieldSet 转换成自定义的领域对象,如示例中的 Product 类。

当从数据库读取数据并写入到 HDFS 时,ProductFieldSetMapper 非常类似于前面章节中所使用的 ProductRowMapper,如示例 13-41 所示。

示例 13-41 将 FieldSet 转换成 Product 领域对象

P247a

最后要配置的两个部分是 ItemProcessor 与 ItemWriter,如示例 13-42 所示。

示例 13-42 Spring Batch 数据项处理与 JDBC 写入器的配置

P247b

ItemProcessors 通常用于转换、过滤或者校验数据。在示例 13-43 中,使用了简单的过滤器,它会过滤掉产品描述 ID 由 PRI 开始的记录。根据协议,过滤掉符合规则的数据项时,ItemProcessor 会返回 null 值。需要注意的是,如果不想要进行任何处理而是希望直接将输入文件复制到数据库中可以从 tasklet 的 XML 配置块中将处理器属性删除。

示例 13-43 简单的过滤器 ItemProcessor

P247c

JdbcBatchItemWriter 会将一批 SQL 语句组合一起提交到数据库。批次大小为之前所定义的提交间隔。我们使用标准的 JDBC DataSource 连接数据库,并且 SQL 语句为指定的内联方式。使用 SQL 语句的优点是可以使用命名参数来取代位置?占位符。这是由 beanPropertyItemSqlParameterSourceProvider 提供的功能,这样可以使得 Product 对象中属性的名称与 SQL 语句中:name 的值关联。示例 13-44 为使用 H2 数据库的 product 表模式。

示例 13-44 product 的模式定义

P248a

启动数据库、将样例数据复制到 HDFS 中,然后创建 Spring Batch 模式并执行如示例 13-45 所示的命令。运行这些命令后,会启动与 H2 交互的 Web 控制台。

示例 13-45 构建示例并启动数据库

P248b

接下来运行导出程序,如示例 13-46 所示。

示例 13-46 运行导出 Job

P248c

可以使用 H2 的 Web 控制台查看导入的数据。Spring Batch 也提供了管理控制台,可以用它来浏览哪些 Job 可以执行,也可以看到每个执行 Job 的状态。要启动管理控制台,请执行 sh./target/appassembler/bin/launchSpringBatchAdmin ,打开浏览器并访问 http://localhost:8080/springbatchadmin/jobs/executions ,并从表中选择最近执行 Job 的链接。点击特定 Job 执行的链接后,便可以查看 Job 状态的详细内容。

13.3.2 从 HDFS 到 MongoDB

如果要取代关系型数据库,将数据写入到 MongoDB,需要将 ItemWriter 的实现由 JdbcBatchItemWriter 改为 MongoItemWriter。示例 13-47 展示了简单的 MongoItemWriter 实现,它使用 MongoDB 的批处理功能来写入数据项列表,只需调用一次数据库操作即可将数据项插入到集合中。

示例 13-47 写入 MongoDB 的 ItemWirter 实现

P249a

Spring 的 MongoTemplate(实现了 MongoOperations 接口)提供了将 Java 类转换为 MongoDB 内部数据结构格式 DbObject 的功能。可以使用 Mongo XML 命名空间来指定到 MongoDB 的关联。示例 13-48 展示了 MongoItemWriter 的配置以及到 MongoDB 连接的底层依赖。

示例 13-48 从 HDFS 中读取数据并写入到 MongoDB 的 Spring Batch Job 配置

P249b

P250a

再次运行应用程序将会在测试数据库中产生 product 集合的内容,如示例 13-49 所示。

示例 13-49 在 MongoDB Shell 中查看导出的 product 集合

P250b

这个示例展示了如何基于不同的 Spring Data 项目以最少的代码构建重要的新功能。只要遵循 MongoItemWriter 的实现模式,就可以轻松为 Redis 或 GemFire 创建 ItemWriters,它的代码就如同本节所示的代码一样简单。

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。