- 前言
- 第一部分 基础应用开发
- 第 1 章 Spring Boot 入门
- 第 2 章 在 Spring Boot 中使用数据库
- 第 3 章 Spring Boot 界面设计
- 第 4 章 提高数据库访问性能
- 第 5 章 Spring Boot 安全设计
- 第二部分 分布式应用开发
- 第 6 章 Spring Boot SSO
- 第 7 章 使用分布式文件系统
- 第 8 章 云应用开发
- 第 9 章 构建高性能的服务平台
- 第三部分 核心技术源代码分析
- 第 10 章 Spring Boot 自动配置实现原理
- 第 11 章 Spring Boot 数据访问实现原理
- 第 12 章 微服务核心技术实现原理
- 附录 A 安装 Neo4j
- 附录 B 安装 MongoDB
- 附录 C 安装 Redis
- 附录 D 安装 RabbitMQ
- 结束语
12.1 配置管理实现原理
在第 8 章的实例中,我们知道,配置管理的在线更新功能使用事件总线,即 spring-cloud-bus 来发布状态变化,并且使用分布式消息来发布更新事件,而分布式消息最终使用了 RabbitMQ 来实现消息收发。
12.1.1 在线更新流程
使用配置管理,实现在线更新一般遵循下列步骤:
1)更新 Git 仓库的配置文件。
2)以 POST 指令触发更新请求。
3)配置管理服务器从 Git 仓库中读取配置文件,并将配置文件分发给各个客户端,同时在 RabbitMQ 中发布一个更新消息。
4)客户端订阅 RabbitMQ 消息,收到消息后执行更新。
在使用配置管理的演示实例中,使用如下 POST 指令来触发在线更新:
curl – X POST http://localhost:8888/bus/refresh
接收这个更新指令的实现方法如代码清单 12-1 所示,其中的 publish 将会发布一个更新事件,调用 RabbitMQ 进行消息发布,然后由客户端收到消息后执行更新。代码中定义了请求更新的链接 refresh,并可使用 destination 来指定更新目标。
代码清单 12-1 接收更新指令的源代码
package org.springframework.cloud.bus.endpoint;
......
public class RefreshBusEndpoint extends AbstractBusEndpoint {
public RefreshBusEndpoint(ApplicationEventPublisher context, String id, Bus
Endpoint delegate) {
super(context, id, delegate);
}
@RequestMapping(
value = {"refresh"},
method = {RequestMethod.POST}
)
@ResponseBody
public void refresh(@RequestParam(
value = "destination",
required = false
) String destination) {
this.publish(new RefreshRemoteApplicationEvent(this, this.getInstance
Id(), destination));
}
}
12.1.2 更新消息的分发原理
配置管理服务器中的消息分发是从 spring-cloud-bus 中调用 spring-cloud-stream 组件来实现的,而 spring-cloud-stream 使用 RabbitMQ 实现了分布式消息的分发。
RabbitMQ 的消息服务一般需要创建一个交换机 Exchange 和一个队列 Queue,然后将交换机和队列进行绑定。而在设计配置服务器时并没有做这方面的工作,所做的工作仅仅是配置引用 spring-cloud-bus 的依赖和设置连接 RabbitMQ 服务器的参数而已。这个工作其实已经由 spring-cloud-stream 帮我们实现了。
从 RabbitMessageChannelBinder 的源代码中可以看到这部分的实现原理,代码清单 12-2 是一个消息发布方的队列绑定的实现。其中 exchangeName 是一个交换机的名字,baseQueueName 是一个队列的名字,并且从代码中也可以看出它使用了 TopicExchange 交换机,这是 RabbitMQ 中 4 种交换机(Fanout、Direct、Topic、Header)的其中一种,并且也可以看出代码中使用 setRoutingKey 将交换机和队列做了绑定。
代码清单 12-2 RabbitMessageChannelBinder 的源代码
package org.springframework.cloud.stream.binder.rabbit;
......
public class RabbitMessageChannelBinder extends MessageChannelBinderSupport implements DisposableBean {
private AmqpOutboundEndpoint buildOutboundEndpoint(String name, RabbitMessage
ChannelBinder.RabbitPropertiesAccessor properties, RabbitTemplate rabbitTemplate) {
String prefix = properties.getPrefix(this.defaultPrefix);
String exchangeName = applyPrefix(prefix, name);
String partitionKeyExtractorClass = properties.getPartitionKeyExtractor
Class();
Expression partitionKeyExpression = properties.getPartitionKeyExpression();
TopicExchange exchange = new TopicExchange(exchangeName);
this.declareExchange(exchangeName, exchange);
AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(rabbitTemplate);
endpoint.setExchangeName(exchange.getName());
String baseQueueName = exchangeName + ".default";
if(partitionKeyExpression == null && !StringUtils.hasText(partitionKey
ExtractorClass)) {
Queue var15 = new Queue(baseQueueName, true, false, false, this.
queueArgs(properties, baseQueueName));
this.declareQueue(baseQueueName, var15);
this.autoBindDLQ(baseQueueName, baseQueueName, properties);
endpoint.setRoutingKey(name);
org.springframework.amqp.core.Binding var16 = BindingBuilder.bind
(var15).to(exchange).with(name);
this.declareBinding(baseQueueName, var16);
} else {
endpoint.setExpressionRoutingKey(EXPRESSION_PARSER.parseExpression
(this.buildPartitionRoutingExpression(name)));
for(int i = 0; i < properties.getNextModuleCount(); ++i) {
String partitionSuffix = "-" + i;
String partitionQueueName = baseQueueName + partitionSuffix;
Queue queue = new Queue(partitionQueueName, true, false, false,
this.queueArgs(properties, partitionQueueName));
this.declareQueue(queue.getName(), queue);
this.autoBindDLQ(baseQueueName, baseQueueName + partitionSuffix,
properties);
this.declareBinding(queue.getName(), BindingBuilder.bind(queue).
to(exchange).with(name + partitionSuffix));
}
}
this.configureOutboundHandler(endpoint, properties);
return endpoint;
}
......
}
现在我们更加明白,为什么使用 Spring Boot 可以那么简单,就是因为一些复杂的配置和方法都已经由 Spring Boot 及其所调用的一些组件实现了。至于在使用 RabbitMQ 中进行消息发布的实现,最终是由 RabbitTemplate 执行 doSend,将消息发布到 RabbitMQ 服务器上,如代码清单 12-3 所示。
代码清单 12-3 消息发布源代码
package org.springframework.amqp.rabbit.core;
......
public class RabbitTemplate extends RabbitAccessor implements BeanFactoryAware,
RabbitOperations, MessageListener, ListenerContainerAware, Listener {
public void send(Message message) throws AmqpException {
this.send(this.exchange, this.routingKey, message);
}
public void send(final String exchange, final String routingKey, final Message
message, final CorrelationData correlationData) throws AmqpException {
this.execute(new ChannelCallback() {
public Object doInRabbit(Channel channel) throws Exception {
RabbitTemplate.this.doSend(channel, exchange, routingKey,
message,RabbitTemplate.this.returnCallback != null && ((Boolean)RabbitTemplate.
this.mandatoryExpression.getValue(RabbitTemplate.this.evaluationContext,
message, Boolean.class)).booleanValue(), correlationData);
return null;
}
}, this.obtainTargetConnectionFactoryIfNecessary(this.sendConnection
FactorySelectorExpression, message));
}
......
}
使用配置管理服务的客户端都订阅了 RabbitMQ 服务器的消息,当收到更新消息时,即从配置管理服务器中取得更新文件,然后在本地上执行更新配置的流程。
有关消息的发布和订阅的实现方法,最后通过一个简单的实例,使用 spring-cloud-stream,更加形象地说明这种分布式消息的发布和接收的原理。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论