12.3 Spring WebFlux 实战
在第 11 章中通过一个促销活动的例子展示了 Spring Boot 的微服务开发过程。本节将采用 Spring WebFlux 框架重新改造一下促销活动的微服务项目。
在第 11 章中,microservice-promotion 项目是基于 Spring Boot 开发的,这里将使用 Spring WebFlux 框架进行项目改造,并进行完整的代码展示。
(1)在 pom.xml 文件中添加相关依赖,引入 spring-boot-starter-data-redis-reactive 包,具体依赖如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://
www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://
maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.9.RELEASE</version>
<relativePath/> <!--lookup parent from repository -->
</parent>
<groupId>com.example.microservice.promotion</groupId>
<artifactId>microservice-promotion</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>microservice-promotion</name>
<description>microservice-promotion project for Spring Boot
</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive
</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2.2.5.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config
</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery
</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel
</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.2.3</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>(2)修改 application.xml 配置文件,在其中配置数据库连接方式,代码如下:
server:
port: 8081
spring:
application:
name: microservice-promotion(3)由于集成了 Nacos 和 Sentinel 中间件,因此需要修改 bootstrap.xml 配置文件,代码如下:
spring:
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
ip: 127.0.0.1
port: 80
namespace: 40421527-56ff-410b-8ca8-e025aca9e946
group: default
config:
server-addr: 127.0.0.1:8848
file-extension: properties
namespace: 40421527-56ff-410b-8ca8-e025aca9e946
group: default
sentinel:
enabled: true
transport:
dashboard: 127.0.0.1:8888
clientIp: 127.0.0.1
port: 8719
log:
dir: /log/sentinel
filter:
enabled: false
management:
endpoint:
metrics:
enabled: true
prometheus:
enabled: true
endpoints:
web:
base-path: /
exposure:
include: health,info,status,prometheus
metrics:
export:
prometheus:
enabled: true
tags:
application: ${spring.application.name}
web:
server:
request:
autotime:
enabled: true
percentiles-histogram: on
percentiles:
-0.9
-0.99
client:
request:
autotime:
enabled: true
percentiles-histogram: on
percentiles:
-0.9
-0.99(4)本例使用 log4j2 日志架构,配置如下:
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
<properties>
<property name="LOG_HOME">/log</property>
</properties>
<Appenders>
<Console name="CONSOLE" target="SYSTEM_OUT" >
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p
[%t] %c{1.} %msg%n"/>
</Console>
<RollingRandomAccessFile name="INFO_FILE" fileName=
"${LOG_HOME}/info.log"
filePattern="${LOG_HOME}/info-%d{HH}-
%i.log" immediateFlush="true">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS}
[%traceId] %-5p %c{1.} %msg%n"/>
<Policies>
<TimeBasedTriggeringPolicy />
</Policies>
<DefaultRolloverStrategy max="1"/>
<Filters>
<ThresholdFilter level="error" onMatch="ACCEPT"
onMismatch="NEUTRAL"/>
<ThresholdFilter level="info" onMatch="ACCEPT"
onMismatch="DENY"/>
</Filters>
</RollingRandomAccessFile>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="CONSOLE" />
<AppenderRef ref="INFO_FILE" />
</Root>
</Loggers>
</Configuration>(5)将 Redis 配置信息集成到 Nacos 上,具体的 Redis 信息如下:
redis.promotion.host=127.0.0.1 redis.promotion.port=6379 redis.promotion.password=test redis.promotion.maxTotal=2000 redis.promotion.maxIdle=100 redis.promotion.minIdle=40 redis.promotion.maxWaitMillis=3000 redis.promotion.timeBetweenEvictionRunsMillis=30000 redis.promotion.commandTimeout=3000
(6)Redis 自动配置如下:
新建 RedisProperties.class 文件,代码如下:
package com.example.promotion.config;
import lombok.Data;
import org.springframework.boot.context.properties.Configuration
Properties;
@Data
@ConfigurationProperties(prefix = "redis")
public class RedisProperties {
private RedisInfo promotion;
@Data
public static class RedisInfo{
protected int maxTotal = 2000; //最大连接数
protected int maxIdle = 100; //最大空闲数
protected int minIdle = 40; //最小空闲数
protected int maxWaitMillis = 3000; //最长等待时间
//空闲回收休眠时间
protected int timeBetweenEvictionRunsMillis = 30000;
protected int commandTimeout = 3000; //命令执行超时时间
private String host; //Redis 地址
private int port; //Redis 端口
private String password; //Redis 密码
}
}新建 RedisAutoConfiguration.class 文件,代码如下:
package com.example.promotion.config;
import java.time.Duration;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.boot.autoconfigure.condition.Conditional
OnClass;
import org.springframework.boot.autoconfigure.condition.Conditional
OnProperty;
import org.springframework.boot.context.properties.EnableConfiguration
Properties;
import org.springframework.cloud.context.config.annotation.Refresh
Scope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisStandalone
Configuration;
import org.springframework.data.redis.connection.lettuce.Lettuce
ClientConfiguration;
import org.springframework.data.redis.connection.lettuce.Lettuce
ConnectionFactory;
import org.springframework.data.redis.connection.lettuce.Lettuce
PoolingClientConfiguration;
import org.springframework.data.redis.core. ReactiveStringRedis
Template;
@ConditionalOnClass(LettuceConnectionFactory.class)
@Configuration
@EnableConfigurationProperties(RedisProperties.class)
@ConditionalOnProperty("redis.promotion.host")
public class RedisAutoConfiguration {
@Bean
@RefreshScope
public GenericObjectPoolConfig genericObjectPoolConfig(Redis
Properties properties) {
//通用线程池配置
GenericObjectPoolConfig genericObjectPoolConfig = new Generic
ObjectPoolConfig();
//设置最大连接数
genericObjectPoolConfig.setMaxTotal(properties.getPromotion().
getMaxTotal());
//设置最大空闲数
genericObjectPoolConfig.setMaxIdle(properties.getPromotion().
getMaxIdle());
//设置最小空闲数
genericObjectPoolConfig.setMinIdle(properties.getPromotion().
getMinIdle());
//设置最长等待时间
genericObjectPoolConfig.setMaxWaitMillis(properties.get
Promotion().getMaxWaitMillis());
//从连接池取出连接时检查有效性
genericObjectPoolConfig.setTestOnBorrow(true);
//连接返回时检查有效性
genericObjectPoolConfig.setTestOnReturn(true);
//空闲时检查有效性
genericObjectPoolConfig.setTestWhileIdle(true);
//空闲回收休眠时间
genericObjectPoolConfig.setTimeBetweenEvictionRunsMillis
(properties.getPromotion().getTimeBetweenEvictionRunsMillis());
return genericObjectPoolConfig;
}
@Bean
@RefreshScope
public LettuceClientConfiguration lettuceClientConfiguration
(RedisProperties properties, GenericObjectPoolConfig genericObject
PoolConfig) {
//Lettuce 客户端配置
LettucePoolingClientConfiguration build = LettucePooling
ClientConfiguration.builder()
.commandTimeout(Duration.ofMillis(properties.get
Promotion().getCommandTimeout()))
.shutdownTimeout(Duration.ZERO)
.poolConfig(genericObjectPoolConfig)
.build();
return build;
}
@Bean
@RefreshScope
public LettuceConnectionFactory lettuceConnectionFactory
(RedisProperties properties,
LettuceClientConfiguration lettuceClientConfiguration) {
//Redis 配置
RedisStandaloneConfiguration redisConfiguration = new
RedisStandaloneConfiguration(properties.getPromotion().getHost(),
properties.getPromotion().getPort());
redisConfiguration.setPassword(properties.getPromotion().
getPassword());
//Lettuce 连接工厂
LettuceConnectionFactory lettuceConnectionFactory = new
LettuceConnectionFactory(redisConfiguration, lettuceClientConfiguration);
return lettuceConnectionFactory;
}
@Bean(name = "redisTemplate")
public ReactiveStringRedisTemplate reactiveStringRedisTemplate
(LettuceConnectionFactory lettuceConnectionFactory) {
//StringRedisTemplate 声明
return new ReactiveStringRedisTemplate(lettuceConnection
Factory, RedisSerializationContext.string());
}
}(7)新建 Sentinel 切面配置,代码如下:
package com.example.promotion.config;
import com.alibaba.csp.sentinel.annotation.aspectj.SentinelResource
Aspect;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class SentinelConfig {
@Bean
public SentinelResourceAspect sentinelResourceAspect() {
//Sentinel 切面声明
return new SentinelResourceAspect();
}
}(8)新建 Model 层对象 PromotionEntity,代码如下:
package com.example.microservice.promotion.model;
import java.io.Serializable;
import lombok.Data;
@Data
public class PromotionEntity implements Serializable {
private static final long serialVersionUID = 1L;
//促销活动 id
private Integer id;
//促销活动名称
private String name;
//促销活动开始时间
private Integer beginTime;
//促销活动结束时间
private Integer endTime;
//活动奖品
private String prize;
}(9)接口返回通用状态码及 Redis 主键操作 key 声明。新增 Constant.class 文件,代码如下:
package com.example.promotion.constants;
public class Constant {
//接口成功返回状态码
public static final String SUCCESS_CODE = "S00000";
//接口失败返回状态码
public static final String ERROR_CODE = "F00001";
//接口成功返回信息
public static final String SUCCESS_MSG = "success";
//促销活动 Redis 存储结构 key
public static final String REDIS_PROMOTION_KEY = "promotion:{0}";
//活动奖品领取记录
public static final String REDIS_PRIZE_KEY = "promotion:{0}:{1}";
}(10)PromotionPushController 接口代码如下:
package com.example.microservice.promotion.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.annotation.SentinelResource;
import com.example.microservice.promotion.constants.Constant;
import com.example.microservice.promotion.service.BlockHandlerService;
import com.example.microservice.promotion.service.FallBackService;
import com.example.microservice.promotion.service.PromotionPushService;
import cn.hutool.json.JSONObject;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
@Slf4j
@RestController
@RequestMapping("/api")
public class PromotionPushController {
@Autowired
private PromotionPushService promotionPushService;
//促销活动投放接口,/api/pushPromotion?id=xx
@GetMapping("pushPromotion")
@ResponseBody
@SentinelResource(value = "pushPromotion", entryType = EntryType.IN,
blockHandler = "promotionPushBlockHandle", blockHandlerClass =
{BlockHandlerService.class}, defaultFallback = "fallback", fallback
Class = {FallBackService.class})
public Mono<ResponseEntity<JSONObject>> pushPromotion(Integer id) {
Mono<ResponseEntity<JSONObject>> mono = Mono.empty();
try {
//调用促销活动投放服务方法
return promotionPushService.pushPromotion(id);
} catch (Exception e) {
//记录错误日志
log.error("push promotion error!");
JSONObject jsonObject = new JSONObject();
jsonObject.put("code", Constant.ERROR_CODE);
jsonObject.put("msg", "push promotion error!");
return Mono.just(ResponseEntity.ok(jsonObject));
}
}
//领取奖品接口,/api/ getPrize?id=xx&device=xx
@GetMapping("getPrize")
@ResponseBody
@SentinelResource(value = "getPrize", entryType = EntryType.IN,
blockHandler = "prizeBlockHandle", blockHandlerClass = {BlockHandler
Service.class}, defaultFallback = "fallback", fallbackClass =
{FallBackService.class})
public Mono<ResponseEntity<JSONObject>> getPrize(Integer id,
String device) {
try {
//调用领取奖品服务方法
return promotionPushService.getPrize(id, device);
} catch (Exception e) {
//记录错误日志
log.error("get prize error!");
JSONObject jsonObject = new JSONObject();
jsonObject.put("code", Constant.ERROR_CODE);
jsonObject.put("msg", "get prize error!");
return Mono.just(ResponseEntity.ok(jsonObject));
}
}
}(11)PromotionPushService 代码如下:
package com.example.microservice.promotion.service;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.ReactiveHashOperations;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import com.example.microservice.promotion.constants.Constant;
import com.example.microservice.promotion.model.PromotionEntity;
import cn.hutool.json.JSONObject;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Service
@Slf4j
public class PromotionPushService {
@Autowired
private ReactiveStringRedisTemplate reactiveStringRedisTemplate;
//促销活动投放方法
public Mono<ResponseEntity<JSONObject>> pushPromotion(Integer id) {
//组装促销活动 Redis key
String key = MessageFormat.format(Constant.REDIS_PROMOTION_
KEY, String.valueOf(id));
//采用 ReactiveStringRedisTemplate 查询促销活动信息
ReactiveHashOperations<String, String, String> reactiveHash
Operations = reactiveStringRedisTemplate.opsForHash();
Flux<Entry<String, String>> flux = reactiveHashOperations.
entries(key);
Map<String, String> map = new HashMap<>();
flux.subscribe(entry -> {
String k = entry.getKey();
String value = entry.getValue();
map.put(k, value);
});
flux.blockLast(Duration.ofMillis(1000)); //先查询,最多阻塞 1s
if (MapUtils.isNotEmpty(map)) {
String name = (String) map.get("name");
String prize = (String) map.get("prize");
Integer beginTime = Integer.valueOf((String) map.get
("beginTime"));
Integer endTime = Integer.valueOf((String) map.get
("endTime"));
Integer currentTime = (int) (System.currentTimeMillis()/
1000);
//判断促销活动投放条件,如果在促销活动时间内,则投放
if (currentTime >= beginTime && currentTime <= endTime) {
//组装 PromotionEntity 对象
PromotionEntity promotionEntity = new PromotionEntity();
promotionEntity.setBeginTime(beginTime);
promotionEntity.setEndTime(endTime);
promotionEntity.setId(id);
promotionEntity.setName(name);
promotionEntity.setPrize(prize);
log.info("push promotion success");
JSONObject jsonObject = new JSONObject(promotionEntity);
return Mono.just(ResponseEntity.ok(jsonObject));
}
}
JSONObject jsonObject = new JSONObject();
jsonObject.put("code", Constant.ERROR_CODE);
jsonObject.put("msg", "push promotion error!");
return Mono.just(ResponseEntity.ok(jsonObject));
}
//领取奖品的方法
public Mono<ResponseEntity<JSONObject>> getPrize(Integer id,
String device) {
//组装领取奖品记录 Redis key
String key = MessageFormat.format(Constant.REDIS_PRIZE_KEY,
String.valueOf(id), device);
//查询领取奖品记录
Mono<String> mono = reactiveStringRedisTemplate.opsForValue().
get(key);
String value = mono.block(Duration.ofMillis(1000));
//领取奖品判断条件,如果领取过,则不再发放
if (StringUtils.isEmpty(value)) {
String promotionKey = MessageFormat.format(Constant.REDIS_
PROMOTION_KEY, String.valueOf(id));
ReactiveHashOperations<String, String, String> reactive
HashOperations = reactiveStringRedisTemplate.opsForHash();
Flux<Entry<String, String>> flux = reactiveHashOperations.
entries(promotionKey);
Map<String, String> map = new HashMap<>();
flux.subscribe(entry -> {
String k = entry.getKey();
String v = entry.getValue();
if (StringUtils.equals("prize", k)) {
map.put(k, v);
}
});
//先查询,最多阻塞 1s
flux.blockLast(Duration.ofMillis(1000));
if (MapUtils.isNotEmpty(map)) {
String prize = map.get("prize");
log.info("get prize success");
JSONObject jsonObject = new JSONObject();
jsonObject.put("奖品", prize);
return Mono.just(ResponseEntity.ok(jsonObject));
}
}
JSONObject jsonObject = new JSONObject();
jsonObject.put("code", Constant.ERROR_CODE);
jsonObject.put("msg", "prize is exist!");
return Mono.just(ResponseEntity.ok(jsonObject));
}
}(12)限流代码如下:
package com.example.microservice.promotion.service;
import org.springframework.http.ResponseEntity;
import com.example.microservice.promotion.constants.Constant;
import cn.hutool.json.JSONObject;
import reactor.core.publisher.Mono;
//限流通用类
public final class BlockHandlerService {
public static Mono<ResponseEntity<JSONObject>> promotionPush
BlockHandle(Integer id) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("code", Constant.ERROR_CODE);
jsonObject.put("msg", "pushPromotion blcok!");
return Mono.just(ResponseEntity.ok(jsonObject));
}
public static Mono<ResponseEntity<JSONObject>> prizeBlockHandle
(Integer id, String device) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("code", Constant.ERROR_CODE);
jsonObject.put("msg", "get prize blcok!");
return Mono.just(ResponseEntity.ok(jsonObject));
}
}(13)降级代码如下:
package com.example.microservice.promotion.service;
import org.springframework.http.ResponseEntity;
import com.example.microservice.promotion.constants.Constant;
import cn.hutool.json.JSONObject;
import reactor.core.publisher.Mono;
//降级通用类
public final class FallBackService {
public static Mono<ResponseEntity<JSONObject>> defaultFallBack
(Throwable ex){
JSONObject jsonObject = new JSONObject();
jsonObject.put("code", Constant.ERROR_CODE);
jsonObject.put("msg", "pushPromotion fallback!");
return Mono.just(ResponseEntity.ok(jsonObject));
}
}(14)MicroservicePromotionApplication 代码如下:
package com.example.microservice.promotion;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.redis.RedisAuto
Configuration;
import org.springframework.boot.autoconfigure.data.redis.Redis
ReactiveAutoConfiguration;
import org.springframework.boot.autoconfigure.data.redis.Redis
RepositoriesAutoConfiguration;
import org.springframework.cloud.client.discovery.EnableDiscovery
Client;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
@SpringBootApplication(exclude = {RedisAutoConfiguration.class, Redis
RepositoriesAutoConfiguration.class, RedisReactiveAutoConfiguration.
class})
@EnableAspectJAutoProxy //开启切面
@EnableDiscoveryClient //开启服务发现
public class MicroservicePromotionApplication {
public static void main(String[] args) {
SpringApplication.run(MicroservicePromotionApplication.class,
args);
}
}此时,启动 MicroservicePromotionApplication 主类即可访问促销活动接口 http://localhost:8081/api/pushPromotion?id=3,返回结果如下:
{
id: 3,
name: "会员促销活动",
beginTime: 1614822680,
endTime: 1617176808,
prize: "3 天免费会员"
}访问领取奖品接口 http://localhost/api/getPrize?id=3&device= 3af57d0545766ec9 40d2c32a6567cc06aed,返回结果如下:
{
奖品: "3 天免费会员"
}绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论