ActiveMQ -- JDBC code of persistent mechanism
2022-07-25 09:17:00 【Why don't you laugh】
Code test
Be sure to enable persistent delivery!
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
queue
producer
import java.time.LocalDateTime;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JmsProduceJDBC {
    public static final String ACTIVEMQ_URL = "tcp://localhost:61616";
    public static final String USERNAME = "admin";
    public static final String PASSWORD = "hll123";
    public static final String QUEUE_NAME = "jdbc01";

    public static void main(String[] args) throws Exception {
        // 1. Create the connection factory for the given broker URL
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_URL);
        // 2. Obtain a connection from the factory
        Connection connection = activeMQConnectionFactory.createConnection();
        // 3. Start the connection
        connection.start();
        // 4. Create a session
        // Two parameters: the first is whether the session is transacted, the second is the acknowledgment mode
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5. Create the destination (queue or topic); a queue is used here
        Queue queue = session.createQueue(QUEUE_NAME);
        // 6. Create the message producer
        MessageProducer messageProducer = session.createProducer(queue);
        // Persistent delivery must be enabled
        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
        // 7. Use the MessageProducer to send 6 messages to the queue
        for (int i = 1; i <= 6; i++) {
            // 8. Create a message
            TextMessage textMessage = session.createTextMessage("msg:" + LocalDateTime.now());
            // 9. Send the message
            messageProducer.send(textMessage);
        }
        // 10. Release resources
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println(" **** Messages sent to MQ **** ");
    }
}
Run the producer to send 6 messages.

The ACTIVEMQ_MSGS table in the database then contains 6 rows, one for each message just produced.
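The row count can also be checked from code rather than from a database client. The following is only a minimal sketch using plain JDBC. The class name MsgTableCheck, the command-line arguments, and the "queue://" prefix of the CONTAINER value are assumptions for illustration; check them against the tables your broker actually created. The JDBC driver for your database must be on the classpath.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class MsgTableCheck {
    // Assumes the default table name and a CONTAINER column holding the destination name
    static final String COUNT_SQL = "SELECT COUNT(*) FROM ACTIVEMQ_MSGS WHERE CONTAINER = ?";

    // Assumption: the broker stores a queue destination as "queue://<name>"
    static String containerName(String queueName) {
        return "queue://" + queueName;
    }

    // Counts the rows currently stored for one queue
    static long countMessages(Connection conn, String queueName) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(COUNT_SQL)) {
            ps.setString(1, containerName(queueName));
            try (ResultSet rs = ps.executeQuery()) {
                rs.next();
                return rs.getLong(1);
            }
        }
    }

    public static void main(String[] args) throws SQLException {
        if (args.length < 3) {
            System.out.println("usage: MsgTableCheck <jdbc-url> <db-user> <db-password>");
            return;
        }
        try (Connection conn = DriverManager.getConnection(args[0], args[1], args[2])) {
            System.out.println("rows for jdbc01: " + countMessages(conn, "jdbc01"));
        }
    }
}
```

Run it once after the producer and once after the consumer to compare the two counts.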

consumer
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JmsConsumerJDBC {
    public static final String ACTIVEMQ_URL = "tcp://localhost:61616";
    public static final String USERNAME = "admin";
    public static final String PASSWORD = "hll123";
    public static final String QUEUE_NAME = "jdbc01";

    public static void main(String[] args) throws Exception {
        // Create the connection factory
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_URL);
        // Create the connection
        Connection connection = activeMQConnectionFactory.createConnection();
        // Start the connection
        connection.start();
        // Create a non-transacted session with automatic acknowledgment
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Create the queue; the name must match the producer's
        Queue queue = session.createQueue(QUEUE_NAME);
        // Create the message consumer
        MessageConsumer messageConsumer = session.createConsumer(queue);
        // Receive messages asynchronously through a listener
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("**** Consumer received message ****:" + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        // Block here; otherwise main would run straight through and close the consumer before any message arrives
        System.in.read();
        messageConsumer.close();
        session.close();
        connection.close();
        System.out.println("**** Consumer finished ****");
    }
}
Start the consumer. The messages produced earlier are consumed and then deleted, both in the MQ console and in the database.


Queue consumption summary:
- When DeliveryMode is set to NON_PERSISTENT, messages are kept in memory
- When DeliveryMode is set to PERSISTENT, messages are kept in the broker's corresponding file or database
Once a message in the queue has been consumed by a consumer, it is deleted from the broker
topic
The consumer must be started first so that it subscribes to the topic
consumer
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JmsConsumerTopicJDBC {
    public static final String ACTIVEMQ_URL = "tcp://localhost:61616";
    public static final String USERNAME = "admin";
    public static final String PASSWORD = "hll123";
    public static final String TOPIC_NAME = "topic-jdbc";

    public static void main(String[] args) throws Exception {
        /*
         * Durable topic subscription, similar to following a WeChat official account.
         * Start the consumer first so that it subscribes to the topic; messages produced
         * afterwards are delivered to the consumer (subscriber).
         * Once subscribed, the subscriber receives every message produced while the
         * subscription remains active, whether it is online or offline. An offline
         * subscriber receives the missed messages when it comes back online.
         */
        System.out.println("jdbc-1"); // Simulated subscriber
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.setClientID("jdbc-1"); // Set the client ID that identifies this subscriber
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
        TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "jdbc-1");
        connection.start();
        // receive() blocks until the next message arrives
        Message message = topicSubscriber.receive();
        while (null != message) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println(" Received persistent topic message:" + textMessage.getText());
            message = topicSubscriber.receive();
        }
        session.close();
        connection.close();
    }
}
Start the consumer.

In the database, a new record is added to the ACTIVEMQ_ACKS table, holding the current subscriber's information.
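To look at the subscriber record without a database client, a query along the following lines could be used. This is a hedged sketch, not part of the original walkthrough. The class name AcksTableCheck and the command-line arguments are made up for illustration, and the column names CONTAINER, CLIENT_ID and SUB_NAME as well as the "topic://" prefix are assumptions about the default schema (only LAST_ACKED_ID is named in this article), so verify them against your own table.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class AcksTableCheck {
    // Assumed column names; adjust them if your ACTIVEMQ_ACKS table differs
    static final String ACKS_SQL =
            "SELECT CLIENT_ID, SUB_NAME, LAST_ACKED_ID FROM ACTIVEMQ_ACKS WHERE CONTAINER = ?";

    // Formats one subscriber row for printing
    static String describe(String clientId, String subName, long lastAckedId) {
        return String.format("client=%s sub=%s lastAcked=%d", clientId, subName, lastAckedId);
    }

    // Prints every durable subscriber recorded for one topic
    static void printSubscribers(Connection conn, String topicName) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(ACKS_SQL)) {
            ps.setString(1, "topic://" + topicName); // assumed storage format of the destination
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    System.out.println(describe(rs.getString(1), rs.getString(2), rs.getLong(3)));
                }
            }
        }
    }

    public static void main(String[] args) throws SQLException {
        if (args.length < 3) {
            System.out.println("usage: AcksTableCheck <jdbc-url> <db-user> <db-password>");
            return;
        }
        try (Connection conn = DriverManager.getConnection(args[0], args[1], args[2])) {
            printSubscribers(conn, "topic-jdbc");
        }
    }
}
```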

producer
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JmsProduceTopicJDBC {
    public static final String ACTIVEMQ_URL = "tcp://localhost:61616";
    public static final String USERNAME = "admin";
    public static final String PASSWORD = "hll123";
    public static final String TOPIC_NAME = "topic-jdbc";

    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
        MessageProducer messageProducer = session.createProducer(topic);
        // Set the persistent delivery mode before starting the connection
        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
        connection.start();
        for (int i = 1; i <= 3; i++) {
            TextMessage textMessage = session.createTextMessage("jdbc-msg:" + i);
            messageProducer.send(textMessage);
        }
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println(" **** Persistent messages sent to MQ **** ");
    }
}
Start the producer.

In the database, the produced messages are added to ACTIVEMQ_MSGS, and LAST_ACKED_ID in ACTIVEMQ_ACKS is updated to the ID of the last message consumed.
Unlike queue messages, which are deleted automatically once consumed, topic messages in ACTIVEMQ_MSGS are not deleted immediately after consumption.
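The difference between the two behaviours can be made concrete with a toy in-memory model. This is purely illustrative and is not how ActiveMQ is implemented: the class StoreModel and all of its methods are hypothetical, one map stands in for ACTIVEMQ_MSGS and another for the LAST_ACKED_ID column of ACTIVEMQ_ACKS, and the model never cleans up topic rows at all.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StoreModel {
    // Toy stand-in for ACTIVEMQ_MSGS: message id -> body, in insertion order
    private final Map<Long, String> msgs = new LinkedHashMap<>();
    // Toy stand-in for ACTIVEMQ_ACKS: subscriber name -> last acknowledged id
    private final Map<String, Long> acks = new LinkedHashMap<>();
    private long nextId = 1;

    // Stores a message and returns its id
    long store(String body) {
        msgs.put(nextId, body);
        return nextId++;
    }

    // Queue behaviour: the row is removed as soon as the message is consumed
    String consumeFromQueue() {
        Iterator<Map.Entry<Long, String>> it = msgs.entrySet().iterator();
        if (!it.hasNext()) {
            return null;
        }
        String body = it.next().getValue();
        it.remove();
        return body;
    }

    // Registers a durable subscriber that has acknowledged nothing yet
    void subscribe(String name) {
        acks.putIfAbsent(name, 0L);
    }

    // Topic behaviour: rows stay in place; only the subscriber's last acknowledged id advances
    List<String> consumeFromTopic(String name) {
        long last = acks.get(name);
        List<String> delivered = new ArrayList<>();
        for (Map.Entry<Long, String> e : msgs.entrySet()) {
            if (e.getKey() > last) {
                delivered.add(e.getValue());
                last = e.getKey();
            }
        }
        acks.put(name, last);
        return delivered;
    }

    int rowCount() {
        return msgs.size();
    }

    long lastAcked(String name) {
        return acks.get(name);
    }
}
```

In this model, consuming from a queue shrinks the message map, while consuming from a topic leaves it untouched and only moves the subscriber's marker forward.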


A small summary
queue
If messages are produced but not yet consumed, they are stored in the activemq_msgs table. As soon as any consumer consumes them, they are deleted immediately.
topic
Typically the consumer is started first to subscribe, and then the producer sends messages. The messages are likewise stored in the activemq_msgs table, and the activemq_acks table stores the consumers' subscription information.
Development considerations
1. The MySQL driver jar (or the driver for whichever database you use) and the jar of the corresponding database connection pool must be placed in the lib directory of the ActiveMQ installation.
2. Once the initial configuration is done and the database tables have been generated, set createTablesOnStartup=false in activemq.xml.
3. If the "BeanFactory not initialized or already closed" exception occurs, remove the "_" character from the operating system's machine name and restart the operating system.
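For the third point, the machine name can be checked before starting the broker. The snippet below is a small sketch using only the JDK; the class name HostNameCheck and the helper hasUnderscore are invented for illustration, and the name Java resolves may differ from the one your operating system displays.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class HostNameCheck {
    // True if the given machine name contains the "_" character
    static boolean hasUnderscore(String hostName) {
        return hostName.indexOf('_') >= 0;
    }

    public static void main(String[] args) {
        try {
            String host = InetAddress.getLocalHost().getHostName();
            if (hasUnderscore(host)) {
                System.out.println(host + " contains '_'; consider renaming the machine and restarting");
            } else {
                System.out.println(host + " contains no '_'");
            }
        } catch (UnknownHostException e) {
            System.out.println("Could not resolve the local host name: " + e.getMessage());
        }
    }
}
```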