当前位置:网站首页>activemq--可持久化机制之JDBC代码
activemq--可持久化机制之JDBC代码
2022-07-25 09:12:00 【你怎么不笑了】
编码测试
一定要开启持久化!!!
messageProducer.setDeliverMode(DeliveryMode.PERSISTENT);
队列
生产者
public class JmsProduceJDBC {
public static final String ACTIVEMQ_URL = "tcp://localhost:61616";
public static final String USERNAME = "admin";
public static final String PASSWORD = "hll123";
public static final String QUEUE_NAME = "jdbc01";
public static void main(String[] args) throws Exception {
//1.按照给定的url创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD,ACTIVEMQ_URL);
// 2.通过工厂连接connection 和启动
Connection connection = activeMQConnectionFactory.createConnection();
// 3.启动
connection.start();
// 4.创建会话session
//两个参数,第一个事务,第二个签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.创建目的地,队列、主题,这里用队列
Queue queue = session.createQueue(QUEUE_NAME);
// 6.创建消息的生产者
MessageProducer messageProducer = session.createProducer(queue);
/** * 持久化必须设置 */
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 7.通过MessageProducer生产3条消息发送到消息队列中
for (int i = 1; i <= 6; i++) {
//8.创建消息
TextMessage textMessage = session.createTextMessage("msg:" + LocalDateTime.now());
//9.发送消息
messageProducer.send(textMessage);
}
// 10.关闭资源
messageProducer.close();
session.close();
connection.close();
System.out.println(" **** 消息发送到MQ完成 **** ");
}
}
生产6条消息:

在数据库ACTIVEMQ_MSGS表中,会生成6条数据,就是上一步生产的消息

消费者
public class JmsConsumerJDBC {
public static final String ACTIVEMQ_URL = "tcp://localhost:61616";
public static final String USERNAME = "admin";
public static final String PASSWORD = "hll123";
public static final String QUEUE_NAME = "jdbc01";
public static void main(String[] args) throws Exception {
//创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_URL);
// 创建连接connection
Connection connection = activeMQConnectionFactory.createConnection();
//开启连接
connection.start();
//创建会话session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建队列,同生产者一致
Queue queue = session.createQueue(QUEUE_NAME);
//创建消息消费者
MessageConsumer messageConsumer = session.createConsumer(queue);
/** * 方法2:通过监听器的方式 */
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("**** 消费者接收到消息 ****:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.in.read(); // 必须加行代码,不然程序会直接往下执行结束了
messageConsumer.close();
session.close();
connection.close();
System.out.println("**** 消费者消费消息完成 ****");
}
}
启动消费者,会消息掉已生产的消息,mq控制台和数据库数据都会消费


队列消费总结:
- 当DeliveryMode设置为NON_PERSISTENCE时,消息被保存在内存中
- 当DeliveryMode设置为PERSISTENCE时,消息保存在broker的相应的文件或者数据库中
队列中的消息一旦被consumer消费就从Broker中删除
主题
一定是先启动消费者订阅主题
消费者
public class JmsConsumerTopicJDBC {
public static final String ACTIVEMQ_URL = "tcp://localhost:61616";
public static final String USERNAME = "admin";
public static final String PASSWORD = "hll123";
public static final String TOPIC_NAME = "topic-jdbc";
public static void main(String[] args) throws Exception {
/** * 持久化主题消息订阅,类似于微信公众号订阅 * 需要先启动消费者,订阅上主题之后,后续生产主题消息,消费者(订阅者)就会接收到消息 * 消费者(订阅者)订阅主题之后,不管是在线还是离线状态,只要保持正常订阅状态,期间生产的消息都会接收到。离线的会在再次在线后接收到之前的消息 */
System.out.println("jdbc-1"); //模拟订阅用户
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.setClientID("jdbc-1"); // 设置clientId,表明订阅者
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "jdbc-1");
connection.start();
Message message = topicSubscriber.receive();
while (null != message) {
TextMessage textMessage = (TextMessage) message;
System.out.println("收到的持久化topic消息:" + textMessage.getText());
message = topicSubscriber.receive();
}
session.close();
connection.close();
}
}
启动消费者:


查看数据库,ACTIVEMQ_ACKS表中新增一条记录,为当前订阅者的信息

生产者
public class JmsProduceTopicJDBC {
public static final String ACTIVEMQ_URL = "tcp://localhost61616";
public static final String USERNAME = "admin";
public static final String PASSWORD = "hll123";
public static final String TOPIC_NAME = "topic-jdbc";
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer messageProducer = session.createProducer(topic);
// connection启动之前必须设置持久化主题
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
for (int i = 1; i <= 3; i++) {
TextMessage textMessage = session.createTextMessage("jdbc-msg:" + i);
messageProducer.send(textMessage);
}
messageProducer.close();
session.close();
connection.close();
System.out.println(" **** 持久化消息发送到MQ完成 **** ");
}
}
启动生产者:


查看数据库:ACTIVEMQ_MSGS会新增消费的数据,ACTIVEMQ_ACKS的LAST_ACKED_ID会更新为最后消费消息的ID
ACTIVEMQ_MSGS里的topic消息在消费后是不会立刻删除的,而queue在消费后自动删除


小总结
queue
生产的消息在没有消费的情况下,消息会存在
activemq_msgs表中,只要任意一个消费者消费这些消息后,这些消息就会立即删除topic
一般是先启动消费者订阅之后,再通过生产者生产消息,之后消息也会存在
activemq_msgs表中,activemq_acks表存的是消费者订阅信息开发注意事项
1.mysql驱动包(或者其他数据库)和对应的数据库连接池jar包需要放到activemq目录下的lib中
2.初次配置完成,数据库生成表之后,activemq.xml中配置
createTablesOnStartup=false3.BeanFactory not initialized or already closed异常
将操作系统的机器名带有的"_"符号去掉,重启操作系统
边栏推荐
- Silicon Valley classroom lesson 15 - Tencent cloud deployment
- How to choose a low code software development platform?
- PL/SQL工具导出sql文件所使用的命令是什么?
- Disable module (attribute node) in LabVIEW
- Shell脚本
- [hero planet July training leetcode problem solving daily] 19th binary tree
- Network principle (2) -- network development
- Sticky.js page scrolling div fixed position plug-in
- JS small game source code magic tower breakthrough Download
- OpenCV实现简单的人脸追踪
猜你喜欢

Bi business interview with data center and business intelligence (I): preparation for Industry and business research
![[stl]stack & queue simulation implementation](/img/92/c040c0e937e2666ee179189c60a3f2.png)
[stl]stack & queue simulation implementation
![[hero planet July training leetcode problem solving daily] 19th binary tree](/img/16/d4beab998f00e09bb45c64673bb2c8.png)
[hero planet July training leetcode problem solving daily] 19th binary tree

360度拖拽全景图插件tpanorama.js

How to write the code of wechat applet implementation tab
![[BUUCTF-n1book][第二章 web进阶]SSRF Training](/img/29/8894d04b27e0e73c4458c27bd9b935.png)
[BUUCTF-n1book][第二章 web进阶]SSRF Training

OpenCV实现简单的人脸追踪

Opencv realizes simple face tracking

The simplest sklearn environment configuration tutorial in the whole network (100% success)

51 MCU internal peripherals: serial port communication
随机推荐
[SCADA case] myscada helps VIB company realize the modernization and upgrading of production line
JDBC快速入门
这家十年内容产业基建公司,竟是隐形的Web3先行者
黑马程序员JDBC
Dark horse programmer JDBC
[NPM] the "NPM" item cannot be recognized as the name of cmdlets, functions, script files or runnable programs. Please check the spelling of the name. If the path is included, make sure the path is co
[BUUCTF-n1book][第二章 web进阶]SSRF Training
Redis learning notes
Solve the syntaxerror: unexpected end of JSON input
[graduation project] cinema booking management system based on micro Service Framework
How does Youxuan database encrypt data?
[Development Tutorial 9] crazy shell · open source Bluetooth smart health watch - storage
Algorithm --- flip digit (kotlin)
Ctfhub skill tree Web
Neural network learning (1) Introduction
Shell脚本
[deep learning] overview | the latest progress of deep learning
学习周刊-总第 63 期-一款开源的本地代码片段管理工具
Composition of the interview must ask items
Sina Weibo client (4) - set navigation bar theme