当前位置:网站首页>消息队列的使用
消息队列的使用
2022-06-27 11:56:00 【瑾琳】
3.1 RabbitMQ的核心:
核心官网有介绍,说的connecnton,channel之类的,到底怎么样,who care?
总体来看,我们关注业务实现是:1)消息怎么投递的。2)消费者怎么消费消息。3)消息是否是可靠投递。4)消息投递方式。5)消息的生命周期。6)消息队列生命周期
3.2 消息是怎么投递的?(记住一点,生产者消息投递都是面向交换机的)
RabbitMQ 是面向交换机投递消息的。交换机可能绑定有许多队列,交换机如何将消息投递给这些队列呢?
首先说一下面向交换机的设计的优势:1)这明显借助了数据链路层那个交换机的设计思想。除了层级分明以外,还能从分提高链路利用率(可能有点抽像)。
2)从代码层面来看:如果没有交换机,你至少得维护一个十分庞大的路由表,然后从路由表正确投递消息,有了交互机,这里路
由表就会被拆分到多个交换机里面,效果不必多说。
3)然后就是高度的解耦,不同的交换机可有不同的路由规则,要是没有交换机。。。。。。
在RabbitMQ,交换机有4种投递方式,就是枚举类BuiltinExchangeType的4个枚举变量:
DIRECT:会将所有消息先取消息的ROUTE_KEY,然后投递到与ROUTE_KEY绑定的队列里面(if(msg.routekey.equals(queue.routekey)))。
FANOUT:此种模式下,根本不检查消息的ROUTE_KEY,直接投送到交换机所拥有的所有队列里面。
TOPIC,HEADERS自行看一下官网怎么说的,不想码字了^_^||
总结起来就一个函数就把消息发出去了:channel.basicPublish(excange_name,route_key,false,bs,"test".getBytes());可以去官网查一下这个API
3.3 消费者怎么消费消息(记住一点,消费者消费消息是面向消息队列的,这与生成者有点不一样)
还不是就是TCP长连接心跳的那些事,就是这么一个API:channel.basicConsume(QUEUE_AUTODELETE, true, consumer);consumer是Consumer类的一个实例,
你直接去处理回调接口就ok了
3.4 消息传递是否可靠
很明显是可靠的,除非你将消息队列,声明成非持久模式,这事你又重启了机器。这会丢失消息的。还有就是他有应答机制,你可以通过设置消费者消费消息的模式,
去手动应答。channel.basicConsume(?,autoACk,?)的autoAck参数设置
3.5 消息的生命周期
一旦受到消费者应答,标识消息已被消费,则消息被回收掉。
3.6 队列生命周期
channel.queueDeclare(QUEUE_NAME,false,false,true,null);
第二个参数设置为true,会将消息持久化到磁盘,第四个参数设置为true表示没有消息并且没有连接则删除改队列,详情可以查一下API
四、一个示例
4.1 生产者代码:
自行导入相关依赖包或相关依赖
复制代码
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("username");
factory.setPort(5672);//注意这里的端口与管理插件的端口不一样
factory.setPassword("pwd");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明一个dirent模式的交换机
channel.exchangeDeclare("exchange_name",BuiltinExchangeType.DIRECT,true);
//声明一个非持久化自动删除的队列
channel.queueDeclare("queue_name",false,false,true,null);//如果该队列不在被使用就删除他 zhe
//将绑定到改交换机
channel.queueBind("queue_name","exchange_name","route_key");
//声明一个消息头部
Map<String,Object> header=new HashMap<>();
AMQP.BasicProperties.Builder b= new AMQP.BasicProperties.Builder();
header.put("charset","utf-8");
b.headers(header);
AMQP.BasicProperties bp=b.build();
//将消息发出去
channel.basicPublish("exchange_name","route_key",false,bp,"test3".getBytes());
复制代码
4.2 消费者代码
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("username");
factory.setPort(5672);//注意这里的端口与管理插件的端口不一样
factory.setPassword("pwd");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明一个dirent模式的交换机
channel.exchangeDeclare("exchange_name",BuiltinExchangeType.DIRECT,true);
//声明一个非持久化自动删除的队列
channel.queueDeclare("queue_name",false,false,true,null);//如果该队列不在被使用就删除他 zhe
//将绑定到改交换机
channel.queueBind("queue_name","exchange_name","route_key");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume("queue_name", true, consumer);
边栏推荐
- How to modify a node_ Files in modules
- R语言使用epiDisplay包的followup.plot函数可视化多个ID(病例)监测指标的纵向随访图、使用stress.labels参数在可视化图像中为强调线添加标签信息
- Xuri 3sdb, installing the original ROS
- Redis distributed lock 15 ask, what have you mastered?
- Tidb 6.0: making Tso more efficient tidb Book rush
- 【On nacos】快速上手 Nacos
- .NET6接入Skywalking链路追踪完整流程
- . Net6 access skywalking link tracking complete process
- 动态规划【三】(区间dp)石子合并
- What is the TCP 3-time handshake process?
猜你喜欢
Youboxun attended the openharmony technology day to create a new generation of secure payment terminals
Histrix工作原理
Dynamic programming [4] (counting class DP) example: integer partition
pull request
esp32s3 IPERF例程测试 esp32s3吞吐量测试
Research Report on the overall scale, major manufacturers, major regions, products and application segments of hydraulic torque in the global market in 2022
I.MX6ULL启动方式
[on Nacos] get started quickly
Salesforce 容器化 ISV 场景下的软件供应链安全落地实践
MapReduce原理剖析(深入源码)
随机推荐
The R language uses the follow up The plot function visualizes the longitudinal follow-up map of multiple ID (case) monitoring indicators, and uses stress The labels parameter adds label information t
千万不要错过,新媒体运营15个宝藏公众号分享
Shell script learning notes
Time management understood after being urged to work at home
亚马逊测评掉评、留不上评是怎么回事呢?要如何应对?
Wechat applet realizes five-star evaluation
如何修改 node_modules 里的文件
TiDB 6.0:让 TSO 更高效丨TiDB Book Rush
进程间通信详解
面试突击60:什么情况会导致 MySQL 索引失效?
StarCraft's Bug King ia retired for 2 years to engage in AI, and lamented that it was inferior
Four memory areas (stack, heap, global, code area)
Minimum editing distance (linear DP writing method)
Master formula
Getting started with go web programming: validators
The DBSCAN function of FPC package in R language performs density clustering analysis on data, and the plot function visualizes the clustering graph
Wechat applet payment password input
[tcapulusdb knowledge base] tcapulusdb doc acceptance - Introduction to creating game area
深入理解 happens-before 原则
Research Report on the overall scale, major producers, major regions, products and application segments of swine vaccine in the global market in 2022