当前位置:网站首页>MQ release confirmation
MQ release confirmation
2022-07-24 21:26:00 【A light wind and light clouds】
Release confirmation principle
The producer sets the channel to confirm Pattern , Once the channel enters confirm Pattern , All messages published on this channel will be assigned a unique ID( from 1 Start ), Once the message is delivered to all matching queues ,broker A confirmation will be sent to the producer ( Unique with message ID), This allows the producer to know that the message has reached the destination queue correctly , If messages and queues are persistent , Then the confirmation message will be sent after the message is written to disk ,broker In the confirmation message returned to the producer delivery-tag The domain contains the serial number of the confirmation message , Besides broker You can also set basic.ack Of multiple Domain , Indicates that all messages before this serial number have been processed .
confirm The biggest advantage of the pattern is that it's asynchronous , Once a message is released , The producer application can continue to send the next message while waiting for the channel to return an acknowledgement , When the message is finally confirmed , The producer application can process the confirmation message through the callback method , If RabbitMQ The message is lost due to its own internal error , Will send a nack news , The producer application can also handle this in the callback method nack news
Publish a confirmed policy
Open the method of publishing confirmation
Publish confirmation is not enabled by default , If you want to open, you need to call the method confirmSelect, Whenever you want to use release confirmation , Need to be in channel Call this method
Single confirmation release
This is a simple way to confirm , It is a way to confirm the release synchronously , That is, after publishing a message, only it is confirmed to be published , Follow up messages can continue to be released ,waitForConfirmsOrDie(long) This method returns... Only when the message is confirmed , If the message is not acknowledged within the specified time range, it will throw an exception .
One of the biggest disadvantages of this confirmation method is : The release speed is particularly slow , Because if you don't confirm the published message, it will block the publication of all subsequent messages , This method provides a throughput of up to hundreds of published messages per second . Of course, this may be enough for some applications .
Code implementation : The tool class obtains the channel
public class untils {
public static Channel getChannel() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.231.132");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
Realization :
public static void publishMessageIndivdually()throws Exception{
String quequeName= UUID.randomUUID().toString();
Channel channel = untils.getChannel();
// Open release confirmation
channel.confirmSelect();
long begin=System.currentTimeMillis();
for(int i=0;i<1000;i++)
{
String message=i+"";
channel.basicPublish("",quequeName,null,message.getBytes(StandardCharsets.UTF_8));
// Server return false Or no return within the timeout period , Producers can resend messages
boolean flag=channel.waitForConfirms();
// if(flag)
// {
// System.out.println(" Message sent successfully ");
// }
}
long end=System.currentTimeMillis();
System.out.println(" Release "+1000+" A separate confirmation message , Time consuming "+(end-begin)+"ms");
}Batch confirmation release
The way above is very slow , Compared with a single waiting for confirmation message , Publishing a batch of messages and then confirming them together can greatly improve throughput , Of course, the disadvantage of this method is : When a failure causes a problem with the release , I don't know which message is wrong , We must save the whole batch in memory , To record important information and then republish the news . Of course, this scheme is still synchronous , It also blocks the release of messages
public static void publishMessageBatch()throws Exception{
String quequeName= UUID.randomUUID().toString();
Channel channel = untils.getChannel();
// Open release confirmation
channel.confirmSelect();
// Batch confirmation message size
int batchSize = 100;
// Number of unacknowledged messages
int outstandingMessageCount = 0;
long begin=System.currentTimeMillis();
for(int i=0;i<1000;i++)
{
String message=i+"";
channel.basicPublish("",quequeName,null,message.getBytes(StandardCharsets.UTF_8));
outstandingMessageCount++;
if(outstandingMessageCount==batchSize)
{
channel.waitForConfirms();
outstandingMessageCount=0;
}
}
long end=System.currentTimeMillis();
System.out.println(" Release "+"1000 A batch confirmation message , Time consuming "+(end-begin)+"ms");
}Asynchronously confirm publishing
Asynchronous acknowledgement, although the programming logic is more complex than the previous two , But the most cost-effective , Neither reliability nor efficiency can be said , It uses callback functions to achieve reliable message delivery , This middleware also ensures whether the delivery is successful through function callback , Let's explain in detail how asynchronous confirmation is implemented .

How to handle asynchronous unacknowledged messages
The best solution is to put unacknowledged messages into a memory based queue that can be accessed by the publishing thread , For example, with ConcurrentLinkedQueue This line is in confirm callbacks And the publishing thread .
public static void publishMessaheAsync()throws Exception{
String quequeName= UUID.randomUUID().toString();
Channel channel = untils.getChannel();
channel.queueDeclare(quequeName,false,false,false,null);
// Open release confirmation
channel.confirmSelect();
/**
* A thread safe and orderly hash table , Suitable for high concurrency
*1. Easily associate serial numbers with messages
* 2. Easily delete entries in batches , Just give the serial number
* 3. Support concurrent access
*/
ConcurrentSkipListMap<Long,String> outstandingConfirms=new ConcurrentSkipListMap<>();
/**
* Acknowledge a callback to the message
* 1. The serial number of the message
* 2.true Messages less than or equal to the current serial number can be confirmed
* false Message confirming the current serial number
*/
ConfirmCallback ackCallback=(sequenceNumber,multiple)->{
if(multiple)
{
// The returned message is the unconfirmed message with the current serial number less than or equal to , It's a map
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber, true);
// Eliminate the unacknowledged messages in this section
confirmed.clear();
}
else
{
// Only messages that know the current serial number
outstandingConfirms.remove(sequenceNumber);
}
};
ConfirmCallback nackCallback=(sequenceNumber,multiple)->{
String message=outstandingConfirms.get(sequenceNumber);
System.out.println(" Release the news "+message+" Has not been confirmed , Serial number "+sequenceNumber);
};
/**
* Add a listener for asynchronous acknowledgement
* 1. Callback to acknowledge receipt of message
* 2. Callback for message not received
*/
channel.addConfirmListener(ackCallback,nackCallback);
long begin=System.currentTimeMillis();
for(int i=0;i<1000;i++)
{
String message=" news "+i;
/**
* channel.getNextPublishSeqNo() Get the sequence number of the next message
* All are unacknowledged message bodies
*/
outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
channel.basicPublish("",quequeName,null,message.getBytes(StandardCharsets.UTF_8));
}
long end=System.currentTimeMillis();
System.out.println(" Release 1000 An asynchronous confirmation message , Time consuming "+(end-begin)+"ms");
}result :
Separate message
Synchronization waiting for confirmation , Simple , But the throughput is very limited .
Publish news in bulk
Batch synchronization waiting for confirmation , Simple , Reasonable throughput , Once something goes wrong, it's hard to infer which one There was a problem with the message .
Asynchronous processing
Optimal performance and resource usage , In the case of errors can be well controlled , But it's a little harder to implement
边栏推荐
- A simple method -- determine whether the dictionary has changed
- 驱动子系统开发
- Which bank outlet in Zhejiang can buy ETF fund products?
- IO flow overview
- Penetration test - command execution injection
- What are intelligent investment advisory products?
- Detailed OSPF configuration of layer 3 switch / router [Huawei ENSP experiment]
- Intel internship mentor layout problem 1
- Leetcode 15. sum of three numbers
- Practical skills!!
猜你喜欢

None of the most complete MySQL commands in history is applicable to work and interview (supreme Collection Edition)

Node installation using NVM succeeded, but NPM installation failed (error while downloading, TLS handshake timeout)

01_ UE4 advanced_ PBR material

Penetration test - command execution injection

Native applets are introduced using vant webapp

Five digital transformation strategies of B2B Enterprises

Detailed explanation of ThreadLocal

Sword finger offer 15. number of 1 in binary
Hilditch refinement (implementation I)

C synchronous asynchronous callback state machine async await demo
随机推荐
rogabet note 1.1
[jzof] 04 search in two-dimensional array
Selenium test page content download function
Smarter! Airiot accelerates the upgrading of energy conservation and emission reduction in the coal industry
APR learning failure problem location and troubleshooting
Alibaba cloud and parallel cloud launched the cloud XR platform to support the rapid landing of immersive experience applications
C # image template matching and marking
Application layer - typical protocol analysis
With this PDF, I successfully got offers from ant, jd.com, Xiaomi, Tencent and other major manufacturers
Put apples
Bring new people's experience
Information system project manager must recite the core examination site (47) project subcontract
Nested printing in CAD web pages
Scientific computing toolkit SciPy Fourier transform
Brand new: the latest ranking of programming languages in July
Top 10 in China's HCI software market: Huawei, Xinhua, Shenxin, VMware, Lenovo, smartx, Inspur, Qingyun, lutanli, dawn
[feature selection] several methods of feature selection
Redis (12) -- redis server
Dtable launched in the public beta, which is not only a table, but also a business application builder
Hilditch refinement (implementation I)