SpringCloudStream动态路由Key配置与RabbitMQ实战指南
前言
在实际业务开发中,我们常常会遇到这样的场景:用户完成一系列操作后,系统需要根据不同的操作结果,发送不同类型的通知邮件。如果直接在业务逻辑里同步调用邮件服务,不仅耗时,还会拖慢主流程。这时候,引入消息中间件进行异步解耦,就成了一个自然而然的选择。
免费影视、动漫、音乐、游戏、小说资源长期稳定更新! 👉 点此立即查看 👈
不过,新的问题也随之而来。不同的邮件类型,本质上对应着不同的业务逻辑,我们当然希望它们能被不同的消费者处理。在RabbitMQ的模型里,这就意味着生产者需要根据消息类型,将其投递到不同的队列。如何实现呢?核心就在于路由键(Routing Key)。
如上图所示,一个交换机(Exchange)可以根据不同的路由键,将消息精准地路由到与之绑定的队列中。这正是我们需要的效果。
要实现这个目标,思路其实很清晰:对于消费者而言,它需要声明自己只关心某个特定路由键的消息;而对于生产者,则需要在发送消息时,动态地指定这个消息的路由键。
这里还有一个Spring Cloud Stream的重要概念:group。在绑定RabbitMQ时,一个group就对应一个具体的队列。所以,要区分不同的业务,我们完全可以通过配置不同的group来实现。
例子
接下来,我们通过一个完整的代码示例,来看看如何用Spring Cloud Stream + RabbitMQ实现动态路由。示例包含一个生产者服务和一个消费者服务。
生产者
生产者的配置核心在于两点:一是定义Binder连接RabbitMQ,二是指定动态路由键的表达式。
spring:
application:
name: producer
cloud:
stream:
binders: # 绑定MQ服务信息(此处我们是RabbitMQ)
etpmsRabbitMQ: # 给Binder定义的名称,⽤于后⾯的关联
type: rabbit # MQ类型,如果是Kafka的话,此处配置kafka
environment: # MQ环境配置(⽤户名、密码等)
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: xxxxxx
bindings: # 关联整合通道和binder对象
output: # output是我们定义的通道名称,此处不能乱改
destination: testExchange # 要使⽤的Exchange名称(消息队列主题名称)
content-type: text/plain # application/json # 消息类型设置,⽐如json
binder: etpmsRabbitMQ # 关联MQ服务
rabbit:
bindings:
output:
producer:
# 生产者配置RabbitMq的动态路由键
routingKeyExpression: headers.type
请注意routingKeyExpression: headers.type这行配置。它告诉Spring Cloud Stream:消息的路由键,要从消息头(Header)中名为type的字段去获取。
配置好了,发送消息的代码就非常简单了:
package top.chenyt.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
/**
* @author yantao.chen
*/
@Service
public class ProviderService {
/**
* 将MessageChannel的封装对象Source注⼊到这⾥使⽤
*/
@Autowired
private Source source;
public void sendMessage(String content, String type) {
// 向mq中发送消息(并不是直接操作mq,应该操作的是spring cloud stream)
// 使⽤通道向外发出消息(指的是Source⾥⾯的output通道)
source.output().send(MessageBuilder.withPayload(content).setHeader("type",type).build());
}
}
关键在于MessageBuilder.withPayload(content).setHeader("type",type).build()这一句。我们在构建消息时,通过setHeader方法设置了type这个头信息。发送时,框架会自动根据配置的表达式headers.type提取这个值,并将其作为路由键发送到RabbitMQ。
最后,别忘了在主应用类上启用绑定:
package top.chenyt;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
/**
* @ClassName etpms-parent
* @Author Jinondo
* @Date 2022/1/31 12:42
*/
@SpringBootApplication
@Slf4j
@EnableBinding({Source.class})
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
}
总结一下生产者的关键步骤:在配置文件中定义routingKeyExpression,然后在发送消息时通过setHeader设置对应的值。
消费者
消费者的配置稍微复杂一点,因为我们需要定义多个绑定,每个绑定对应一个队列(即一个group),并指定其监听的路由键。
spring:
application:
name: consumer
cloud:
stream:
binders: # 绑定MQ服务信息(此处我们是RabbitMQ)
etpmsRabbitMQ: # 给Binder定义的名称,⽤于后⾯的关联
type: rabbit # MQ类型,如果是Kafka的话,此处配置kafka
environment: # MQ环境配置(⽤户名、密码等)
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: xxxxx
bindings: # 关联整合通道和binder对象
input: # input是我们定义的通道名称,此处不能乱改
destination: testExchange # 要使⽤的Exchange名称(消息队列主题名称)
content-type: text/plain # application/json # 消息类型设置,⽐如json,自动将对象转为json
binder: etpmsRabbitMQ # 关联MQ服务
group: register
my-input:
destination: testExchange # 要使⽤的Exchange名称(消息队列主题名称)
content-type: text/plain # application/json # 消息类型设置,⽐如json,自动将对象转为json
binder: etpmsRabbitMQ # 关联MQ服务
group: task
rabbit:
bindings:
my-input:
consumer:
bindingRoutingKey: task
input:
consumer:
bindingRoutingKey: register
仔细看这段配置,我们定义了两个绑定:input和my-input。它们连接的是同一个交换机testExchange,但属于不同的group(register和task),这会在RabbitMQ中创建两个独立的队列。
最关键的部分在spring.cloud.stream.rabbit.bindings下面。我们为每个绑定指定了bindingRoutingKey。这意味着:input通道对应的队列,只绑定路由键为register的消息;my-input通道对应的队列,只绑定路由键为task的消息。
接下来,我们需要定义my-input这个自定义通道的接口:
package top.chenyt.consumer;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface MySink {
String MY_INPUT = "my-input";
@Input(MY_INPUT)
SubscribableChannel myinput();
}
然后,编写消息监听器,分别监听这两个通道:
package top.chenyt.consumer;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
/**
* @ClassName etpms-parent
* @Author Jinondo
* @Date 2022/1/31 12:42
*/
@Service
public class ConsumerMsg {
@StreamListener(Sink.INPUT)
public void receiveMessages(Message message) {
System.out.println("========= input接收到的消息:" + message.getPayload());
}
@StreamListener(MySink.MY_INPUT)
public void receiveMessages02(Message message) {
System.out.println("========= myinput接收到的消息:" + message.getPayload());
}
}
最后,在主应用类中启用这两个绑定:
package top.chenyt;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import top.chenyt.consumer.MySink;
/**
* @ClassName etpms-parent
* @Author Jinondo
* @Date 2022/1/31 12:42
*/
@SpringBootApplication
@Slf4j
@EnableBinding({Sink.class, MySink.class})
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}
至此,整个流程就搭建完成了。当生产者发送一个type头为register的消息时,它会被路由到group为register的队列,并由receiveMessages方法处理。同理,type为task的消息则会由receiveMessages02方法处理。这就完美实现了基于消息类型的动态路由。
总结
通过上面的示例,我们可以看到,利用Spring Cloud Stream的routingKeyExpression和bindingRoutingKey配置,可以非常优雅地实现RabbitMQ的动态路由功能,而无需在代码中编写任何RabbitMQ原生API。这种方法清晰地将配置与业务代码分离,提高了可维护性。
其核心逻辑可以概括为:生产者通过消息头动态传递路由键,消费者通过配置静态声明其绑定的路由键。两者配合,再加上group对队列的隔离,就能构建出清晰、灵活的消息路由方案。希望这个实践思路能为大家在处理类似业务场景时提供一个可靠的参考。
您可能感兴趣的文章:
- SpringCloudStream+RabbitMQ使用中遇到的问题及解决
- 解决SpringCloudStream整合Kafka,两个通道对应同一个topic报错的情况
- SpringCloud Stream 快速入门实例教程
- SpringCloud使用Kafka Streams实现实时数据处理
- SpringCloudStream原理和深入使用小结
- SpringCloud中的Stream服务间消息传递详解
- SpringCloudStream中的消息分区数详解
游乐网为非赢利性网站,所展示的游戏/软件/文章内容均来自于互联网或第三方用户上传分享,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系youleyoucom@outlook.com。
同类文章
Linux系统下Node.js代码热更新实现方法详解
在Linux环境下实现Node js代码热更新的几种实用方法 在Linux服务器上维护Node js应用时,代码热更新是个绕不开的话题。毕竟,谁愿意为了每次微小的改动就中断服务、重启整个应用呢?好在,社区已经为我们提供了多种成熟的解决方案,每种都有其适用的场景。下面就来梳理一下这些方法,帮你找到最适
Linux系统下Node.js集群配置详细步骤与指南
在 Linux 上配置 Node js 集群 想在 Linux 环境下提升 Node js 应用的性能和可靠性?配置集群是一个绕不开的经典方案。实现方式有好几种,但最直接、最常用的,莫过于 Node js 自己内置的 cluster 模块。它允许你轻松创建多个工作进程,让它们共享同一个服务器端口,从
Linux Node.js内存限制配置方法与优化指南
如何为Linux上的Node js应用程序配置内存限制 在Linux环境中运行Node js应用时,合理配置内存限制是保障应用稳定性的关键一步。内存溢出不仅会导致应用崩溃,还可能拖累整个系统。那么,有哪些既有效又便于实施的方法呢? 方法一:使用Node js内置的 --max-old-space-s
Linux下Node.js日志管理与高效实践指南
Linux 下 Node js 日志管理实操指南 日志,是应用在服务器上留下的“足迹”。一套清晰、高效的日志管理体系,不仅是排查问题的“时光机”,更是洞察系统健康状况的“听诊器”。今天,我们就来聊聊在 Linux 环境下,如何为你的 Node js 应用构建一套既专业又易于维护的日志方案。 一 核心
Linux环境下JavaScript代码调试方法与技巧详解
在Linux环境中调试Ja vaScript代码,可以使用以下方法: 使用Node js内置的调试器: Node js自带了一个调试器,可以通过命令行启动。要使用调试器,请在运行Ja vaScript文件时添加--inspect或--inspect-brk标志。例如: node --inspect-
- 日榜
- 周榜
- 月榜
1
2
3
4
5
6
7
8
9
10
1
2
3
4
5
6
7
8
9
10
相关攻略
2015-03-10 11:25
2015-03-10 11:05
2021-08-04 13:30
2015-03-10 11:22
2015-03-10 12:39
2022-05-16 18:57
2025-05-23 13:43
2025-05-23 14:01
热门教程
- 游戏攻略
- 安卓教程
- 苹果教程
- 电脑教程
热门话题

