当前位置:网站首页>rabbmitMQ 简单模式<一>
rabbmitMQ 简单模式<一>
2022-06-22 03:12:00 【风华浪浪】
这是我第三次学习RabbmitMq了
一年前从上海离职时,男哥我同事(工作这几年最大大佬)告诉我要精通一门语言,比如python、Java。其它语言只是辅助,不要门门通,门门松。 周捷告诉我,一般的逻辑很简单,CURD数据库的增删改查,没有任何难度,外加celery足以搞定90%业务场景。 他告诉我,工资20k+,消息队列任务调度,是一大关。无论爬虫 scrapy、pyspider分布式靠什么任务调度,web后端秒杀、抢票靠什么任务调度,任务靠消息中间件, 爬虫kafka、后端rabbmitmq
即消息队列通常基于“先进先出”的数据消费原则, 经常用于解决应用程序解耦、消息异步传递,流量削峰等场合,从而实现实现高性能、高可用、可伸缩和最终一致性业务架构,
一、pymysql 执行过程
import pymysql # 链接MySQL db = pymysql.connect("localhost","testuser","test123","TESTDB" ) # 创建游标对象cursor cursor = db.cursor() # 向指定的表插入数据 cursor.execute("SELECT VERSION()") # 关闭数据库连接 db.close()
二、生产者 producer
链接rabbmitmq
创建队列
向指定的队列插入数据
import pika 链接rabbmitmq 阻塞连接(BlockingConnection) credentials = pika.PlainCredentials('zhangsan', '1qaz2wsx') connection = pika.BlockingConnection( pika.ConnectionParameters('192.168.56.1', credentials=credentials) ) 创建链接的通道,类似于mysql游标 channel = connection.channel() 声明queue channel.queue_declare(queue='hello') 发送消息内容 channel.basic_publish( exchange='', routing_key='hello', 向指定队列中插入数据 body='hello world' ) print('[x] sent hello world') connection.close()
三、消费者
import pika # 输入普通凭据(登录用户名和密码) credentials = pika.PlainCredentials('admin', 'admin') connection = pika.BlockingConnection( # 阻塞连接 pika.ConnectionParameters('106.13.168.8', credentials=credentials) ) 创建链接的通道,类似于mysql游标 channel = connection.channel() # 声明queue channel.queue_declare(queue='hello') 发送方确认机制 channel.confirm_delivery() def callback(ch, method,properties, body): 函数名可以自定义,传给on_message_callback print(f'[x] receive---->{ch}, {method} {properties} {body}') 消费者消费完成后返回表示符 ch.basic_ack(delivery_tag=method.delivery_tag) # 消费消息内容(队列, 调用函数, 是否确认) channel.basic_consume(queue='hello', on_message_callback=callback, no_ack=True) print('[x] waiting formessages to exit press CTRL + C') channel.start_consuming()
应答参数
no_ack=True如果消费过程中报错,没有执行完毕确认机制,再次启动消费者时,该消息虽然报错,但是已经取出,获取不到该条数据,导致数据丢失。
手动应答安全,牺牲效率;自动应答效率高,不安全,可能丢数据ch.basic_ack(delivery_tag=method.delivery_tag)消费者消费完成后返回表示符
持久化参数
channel.queue_declare(queue='hello2', durable=True) # 原来队列为空即删除, durable=True 声明队列可持久化,而不是队列里的数据可持久化(消费者生产者都要声明) channel.basic_publish(exchange='', routing_key='hello2', body='Hello World!', properties=pika.BasicProperties( delivery_mode=2 消息持久化 ) )若’一个发消息,两个收消息,收消息是公平的依次分发。这种方式叫轮询分发(round-robin)不管谁忙,都不会多给消息,总是你一个我一个。想要做到公平分发(fair dispatch),必须关闭自动应答ack,改成手动应答。使用basicQos(perfetch=1)限制每次只发送不超过1条消息到同一个消费者,消费者必须手动反馈告知队列,才会发送下一个。
channel.basic_qos(prefetch_count=1)
其它参数
ch<BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x0703A0D0> params=<ConnectionParameters host=106.13.168.8 port=5672 virtual_host=/ ssl=False> > > >method<Basic.Deliver( [ 'consumer_tag=ctag1.33acbf32007a459caba7ef90a622c38d', 'delivery_tag=1112', 'exchange=', 'redelivered=False', 'routing_key=hello' ] ) >properties<BasicProperties>bodyb'hello world'
https://www.cnblogs.com/pyedu/p/11866829.html
https://www.lylinux.net/article/2019/8/23/60.html

如果在发送方发出消息后,如果exchange写错了,或者没有任何队列绑定我们发送的exchange,那么在这时候发送方是对此浑然不知的,而rabbitmq为了解决这个问题下面介绍下。
生产者将信道设置成confirm(确认)模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这就使得生产者知晓消息已经正确到达了目的地了。如果消息和队列是持久化的,那么确认消息会在消息写入磁盘之后发出。RabbitMQ回传给生产者的确认消息中的deliveryTag包含了确认消息的序号,此外RabbitMQ也可以设置channel.basicAck方法中的multiple参数,表示到这个序号之前的所有消息都已经得到了处理,如下图所示:

发送方确认机制最大的好处在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用程序便可以通过回调方法来处理该确认消息,如果rabbitmq因为自身内部错误导致消息丢失,就会发送一条nack(basic.nack)命令,生产者应用程序同样可以在回调方法中处理该nack命令。
边栏推荐
- A component required a bean of type 'com.example.demo3.service.UserServiceImp' that could not be fou
- Deep learning final review
- 【NVMe2.0b 11】NVMe Reset
- TX2 mirror source settings
- [kubernetes series] what is kubernetes?
- unity3D C# 在区间内生成不重复的随机数
- sequelize 常用命令使用
- Classification of traffic signs
- Usage of tail
- Overview of web framework and program development
猜你喜欢
随机推荐
Primary key in efcore
装饰器《二》 property - 简答逻辑
Implementation of epoll+threadpool high concurrency network IO model
php使用composer
Selenium entry level project - Doudou quiz
【爬虫笔记1】环境搭建和必要工具Selenium
AtCoder Regular Contest 142
Select in golang concurrent programming
Project management software development project management
[QNX Hypervisor 2.2用户手册]5.5 启动和使用Guest
微信小程序聊天 表情
(问题解决) 缺少gcr.io/kubebuilder/kube-rbac-proxy:v0.8.0
Classic case of JS operation node (three-level linkage)
A solution to memory leak in server
图数据平台解决方案:集群部署
golang并发编程之原子操作详解
BOM 属性、方法、事件应用案例
On map state mapping
Figure data platform solution: single node deployment
【NVMe2.0b 5】NVM Subsystem
![[nvme2.0b 8] nvme queue arbitration mechanism](/img/35/c5098623c14749711b205ef97c34a7.png)





![[nvme2.0b 6] nvme queue model](/img/e9/d29001cebeebe9677b02ffb7c25726.png)

