博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spring Boot系列十七 Spring Boot 集成 websocket,使用RabbitMQ做为消息代理
阅读量:6177 次
发布时间:2019-06-21

本文共 5645 字,大约阅读时间需要 18 分钟。

1. 概述

在上篇文章中我们使用的消息代理是spring内置的简单消息代理,简单消息代理非常适合入门,但是只支持STOMP命令的子集(如不支持acks, receipts),依赖于消息发送循环,并且不支持集群。我们可以使用外部的消息代理(如RabbitMQ, ActiveMQ),来实现全功能消息代理。本文以集成RabbitMQ为例。本文的主要内容如下:

  • 使用RabbitMQ做websocket消息代理的准备工作和消息流程图
  • Spring Boot使用RabbitMQ做websocket的主要代码
  • 演示在RabbitMQ不同目的的(destination)用法

2. 使用RabbitMQ做websocket消息代理的准备工作和消息流程图

关于RabbitMQ的用法,可以参考本作者的

2.1. 使用RabbitMQ做websocket消息代理的准备工作

我们选择类似RabbitMQ全功能的消息代理。安装消息代理后,以支持STOMP的情况情况运行服务。 我们在RabbitMQ上启动rabbitmq_web_stomp插件

  1. 在RabbitMQ上启动rabbitmq_web_stomp插件,在rabbitMQ上执行如下命令:sudo rabbitmq-plugins enable rabbitmq_web_stomp
  2. 登录RabbitMQ管理平台,看到如下信息,发现已经开启stomp代理服务

2.2. 消息流程图

此图和使用简单消息最大的不同是"broker relay"用于通过TCP将消息传递给外部STOMP代理(如这里是RabbitMQ),并将消息从代理传递给订阅客户

3. Spring Boot使用RabbitMQ做websocket的主要代码

3.1. pom.xml

首先在上一篇文章的基础上增加如下jar

io.projectreactor
reactor-net
2.0.8.RELEASE
io.netty
netty-all
4.1.22.Final
复制代码

3.2. BroadcastRabbitMQCtl

和上文BroadcastCtl类似,这里略

3.3. WebSocketRabbitMQMessageBrokerConfigurer 配置

配置外部Rabibitmq替代Simple Broker做消息代理:在configureMessageBroker()方法中配置外部RabbitMQ的地址、帐号密码连接到RabbitMQ

@Configuration// 此注解开使用STOMP协议来传输基于消息代理的消息,此时可以在@Controller类中使用@MessageMapping@EnableWebSocketMessageBrokerpublic class WebSocketRabbitMQMessageBrokerConfigurer extends AbstractWebSocketMessageBrokerConfigurer {    @Override    public void registerStompEndpoints(StompEndpointRegistry registry) {        /**         * 注册 Stomp的端点         *         * addEndpoint:添加STOMP协议的端点。这个HTTP URL是供WebSocket或SockJS客户端访问的地址         * withSockJS:指定端点使用SockJS协议          */        registry.addEndpoint("/websocket-rabbitmq").withSockJS();    }    @Override    public void configureMessageBroker(MessageBrokerRegistry registry) {        /**         * 配置消息代理         * 使用RabbitMQ做为消息代理,替换默认的Simple Broker         */	registry		// "STOMP broker relay"处理所有消息将消息发送到外部的消息代理                .enableStompBrokerRelay("/exchange","/topic","/queue","/amq/queue")                .setRelayHost("192.168.0.113")                .setClientLogin("hry")                .setClientPasscode("hry")                .setSystemLogin("hry")                .setSystemPasscode("hry")                .setSystemHeartbeatSendInterval(5000)                .setSystemHeartbeatReceiveInterval(4000);                ;}}复制代码

3.4. ws-broadcast-rabbitmq.jsp

这里的jsp和上面的jsp类似,这里略

3.5. 测试方法

执行启动类: WebSocketRabbitMQApplication 如果连接RabbitMQ,会打印如下信息:

2018-03-26 23:22:04.354 [reactor-tcp-io-1] INFO  o.s.m.s.s.StompBrokerRelayMessageHandler - "System" session connected.2018-03-26 23:22:04.358 [reactor-tcp-io-1] INFO  o.s.m.s.s.StompBrokerRelayMessageHandler - BrokerAvailabilityEvent[available=true, StompBrokerRelay[192.168.0.113:61613]]复制代码

测试请求: http://127.0.0.1:8080//broadcast-rabbitmq/index 具体测试的配置见下方

4. 演示在RabbitMQ不同目的的(destination)用法

WebSocketRabbitMQMessageBrokerConfigurer中我们需要配置消息代理的前缀。在RabbitMQ中合法的目的前缀:/temp-queue, /exchange, /topic, /queue, /amq/queue, /reply-queue/. 我们这里演示以上后4个的用法

4.1. /exchange/exchangename/[routing_key]

通过交换机订阅/发布消息,交换机需要手动创建,参数说明 a. /exchange:固定值 b. exchangename:交换机名称 c. [routing_key]:路由键,可选

对于接收者端,该 destination 会创建一个唯一的、自动删除的随机queue, 并根据 routing_key将该 queue 绑定到所给的 exchangename,实现对该队列的消息订阅。 对于发送者端,消息就会被发送到定义的 exchangename中,并且指定了 routing_key。

在本文的代码基础进行如下修改

  1. 在RabbitMQ上创建名为rabbitmq交换机

  2. 在BroadcastRabbitMQCtl中修改发送者代码

    SendTo("/exchange/rabbitmq/get-response")public ResponseMessage broadcast(RequestMessage requestMessage){…}复制代码
  3. 在ws-broadcast-rabbitmq.jsp中修改接收者的代码

    stompClient.subscribe('/exchange/rabbitmq/get-response', function(respnose){                showResponse(JSON.parse(respnose.body).responseMessage);            })复制代码

测试: 打开两个页面,其中一个页面发送3次,这3个消息被两个都收到

4.2. /queue/queuename

使用默认交换机订阅/发布消息,默认由stomp自动创建一个持久化队列,参数说明 a. /queue:固定值 b. queuename:自动创建一个持久化队列

对于接收者端,订阅队列queuename的消息 对于接收者端,向queuename发送消息 [对于 SEND frame,destination 只会在第一次发送消息的时候会定义的共享 queue]

在本文的代码基础进行如下修改

  1. 在BroadcastRabbitMQCtl中修改代码

    @SendTo("/queue/rabbitmq")public ResponseMessage broadcast(RequestMessage requestMessage){…}复制代码
  2. 在ws-broadcast-rabbitmq.jsp中修改接收者的代码

    stompClient.subscribe(                '/queue/rabbitmq',                function(respnose){                showResponse(JSON.parse(respnose.body).responseMessage);            });复制代码

测试: 打开两个页面,其中一个页面发送7次,这7个消息被两个页面轮流接收

4.3. /amq/queue/queuename

和上文的"/queue/queuename"相似,两者的区别是 a. 与/queue/queuename的区别在于队列不由stomp自动进行创建,队列不存在失败

这种情况下无论是发送者还是接收者都不会产生队列。 但如果该队列不存在,接收者会报错。

在本文的代码基础进行如下修改

  1. 在RabbitMQ上手动创建名为rabbitmq2的队列

  2. 在BroadcastRabbitMQCtl中修改代码

    @SendTo("/amq/queue/rabbitmq2")    public ResponseMessage broadcast(RequestMessage requestMessage){..}复制代码
  3. 在ws-broadcast-rabbitmq.jsp中修改接收者的代码

    stompClient.subscribe(                '/amq/queue/rabbitmq2',                function(respnose){                showResponse(JSON.parse(respnose.body).responseMessage);            });复制代码

测试: 对于 SUBCRIBE frame,destination 会实现对队列的消息订阅。 对于 SEND frame,消息会通过默认的 exhcange 直接被发送到队列中。

打开两个页面,其中一个页面发送7次,这7个消息被两个页面轮流接收

4.4. /topic/routing_key

通过amq.topic交换机订阅/发布消息,订阅时默认创建一个临时队列,通过routing_key与topic进行绑定 a. /topic:固定前缀 b. routing_key:路由键

对于发送者端,会创建出自动删除的、非持久的队列并根据 routing_key路由键绑定到 amq.topic 交换机 上,同时实现对该队列的订阅。 对于发送者端,消息会被发送到 amq.topic 交换机中。

在本文的代码基础进行如下修改

  1. 在BroadcastRabbitMQCtl中修改代码

    @SendTo("/topic/get-response")    public ResponseMessage broadcast(RequestMessage requestMessage){…}复制代码
  2. 在ws-broadcast-rabbitmq.jsp中修改接收者的代码

    stompClient.subscribe(                '/topic/get-response',                function(respnose){                showResponse(JSON.parse(respnose.body).responseMessage);            });复制代码

测试: 打开两个页面,其中一个页面发送4次,这4个消息同时被两个都收到

5. 代码

所有的详细代码见github代码,请尽量使用

转载地址:http://cizda.baihongyu.com/

你可能感兴趣的文章
计算机硬件、驱动程序和操作系统
查看>>
mysql 下载及安装方法
查看>>
字符串的比较查找与替换
查看>>
XML 文件解析--含Unicode字符的XML文件
查看>>
单利模式
查看>>
Two Sum
查看>>
Spring MVC绑定 List 对象参数
查看>>
spark 添加第三方依赖
查看>>
配置虚拟目录
查看>>
RHCE 学习笔记(31) - 防火墙 (下)
查看>>
linux 命令
查看>>
【Linux学习记录】Linux主机规划与安装
查看>>
springboot 拦截器
查看>>
Call requires API level 4 (current min is 3):类似异常解决
查看>>
zookeeper学习之zkclient节点增删改查<九>
查看>>
HDP上安装impala
查看>>
第6篇-JAVA面向对象Ⅱ
查看>>
Centos5.8 x86_64下安装DRBD+Heartbeat+NFS
查看>>
8月27日 第五次课作业
查看>>
青云云平台开启***服务
查看>>