Using Jedis to listen on a Redis Stream as a message queue
2022-06-26 05:20:00 【Huhailong blog】
Introduction
I previously used Spring Boot to listen on a Redis Stream and use it as a message queue. This post does the same thing with Jedis, which I find more flexible than the Spring Boot approach and easier to extend. This implementation can listen from multiple threads.
The earlier Spring Boot article:
Using Redis Stream in Spring Boot to listen for messages
Video demo
Demo: listening on a Redis Stream with Jedis to get message-queue behavior
How it works
This demo implements listening in two ways: through consumer groups (xreadgroup) and through plain xread. With a group, each message is delivered to only one consumer in that group, so it is not consumed repeatedly. This suits scenarios where each record must be processed once, such as writing records to a database. With plain xread, if several threads are listening, every thread receives the same newly added message, which behaves like a broadcast.
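To make the difference between the two modes concrete, here is a toy in-memory model. It does not talk to Redis; the class DeliveryModel and its methods are hypothetical names made up for this illustration. It also hands group messages out round-robin for simplicity, whereas Redis gives an entry to whichever consumer in the group reads it first.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory model of the two delivery modes; not a Redis client. */
class DeliveryModel {
    private final List<List<String>> inboxes = new ArrayList<>();
    private int next = 0; // consumer that receives the next group message (round-robin simplification)

    DeliveryModel(int consumers) {
        for (int i = 0; i < consumers; i++) {
            inboxes.add(new ArrayList<>());
        }
    }

    /** xread-style fan-out: every listener receives its own copy of the message. */
    void broadcast(String message) {
        for (List<String> inbox : inboxes) {
            inbox.add(message);
        }
    }

    /** xreadgroup-style delivery: exactly one consumer in the group receives the message. */
    void deliverToGroup(String message) {
        inboxes.get(next).add(message);
        next = (next + 1) % inboxes.size();
    }

    List<String> inbox(int consumer) {
        return inboxes.get(consumer);
    }

    public static void main(String[] args) {
        DeliveryModel fanOut = new DeliveryModel(2);
        DeliveryModel group = new DeliveryModel(2);
        for (String m : new String[] {"m1", "m2"}) {
            fanOut.broadcast(m);
            group.deliverToGroup(m);
        }
        System.out.println("xread-style:      " + fanOut.inbox(0) + " " + fanOut.inbox(1));
        System.out.println("xreadgroup-style: " + group.inbox(0) + " " + group.inbox(1));
    }
}
```

In the fan-out case both inboxes end up holding m1 and m2; in the group case the two messages are split between the consumers, so each message is handled once.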
The demo relies mainly on the following Redis Stream commands and their corresponding Jedis methods:
- xadd: append an entry to a stream
- xread: read entries
- xgroup: create a consumer group
- xreadgroup: read entries as a consumer in a group
The read commands are used with the block option. Setting block to 0 blocks until a new message arrives. I put that call in a loop: each time a message arrives it is handled, and the loop then issues the next blocking read, which gives the listening effect.
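The shape of that loop can be sketched without a Redis server. In the sketch below, ToyStream is a hypothetical in-memory stand-in for a stream: add plays the role of xadd, and readAfter plays the role of a read with block set to 0. The listener keeps a cursor so that each read resumes after the last entry it has already seen. This illustrates the pattern only and is not the Jedis API.

```java
import java.util.ArrayList;
import java.util.List;

class BlockingListener {
    /** Toy in-memory stream; a stand-in so the loop runs without a Redis server. */
    static class ToyStream {
        private final List<String> entries = new ArrayList<>();

        /** Appends an entry (the role xadd plays) and wakes any blocked reader. */
        synchronized void add(String value) {
            entries.add(value);
            notifyAll();
        }

        /** Blocks until at least one entry exists after position cursor, then returns those entries. */
        synchronized List<String> readAfter(int cursor) throws InterruptedException {
            while (entries.size() <= cursor) {
                wait();
            }
            return new ArrayList<>(entries.subList(cursor, entries.size()));
        }
    }

    /** Reads until at least `expected` entries have been handled, blocking between batches. */
    static List<String> listen(ToyStream stream, int expected) {
        List<String> received = new ArrayList<>();
        int cursor = 0; // number of entries already handled
        try {
            while (received.size() < expected) {
                List<String> batch = stream.readAfter(cursor); // blocks until something new arrives
                received.addAll(batch);                        // "handle" the batch
                cursor += batch.size();                        // next read resumes after the last entry seen
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt and stop listening
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        ToyStream stream = new ToyStream();
        Thread writer = new Thread(() -> {
            for (int i = 1; i <= 3; i++) {
                stream.add("msg-" + i);
            }
        });
        writer.start();
        System.out.println(listen(stream, 3)); // prints [msg-1, msg-2, msg-3]
        writer.join();
    }
}
```

Keeping a cursor is a design choice of this sketch: because each read starts from the last position handled, entries that arrive while a batch is being processed are picked up by the next read.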
Implementation code
The demo code is simple and lives in a single class. As long as a Redis server is available, you can run it after changing the configuration values in the code; you do not need to create the stream or the group by hand.
pom.xml file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>vip.huhailong</groupId>
<artifactId>JRedisMQ</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<jedis.version>4.2.3</jedis.version>
</properties>
<!-- jedis dependency -->
<dependencies>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${jedis.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
</dependency>
<!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.11</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.7</version>
</dependency>
</dependencies>
</project>
JedisStreamMQTest class
package jredismq.test;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.params.XReadParams;
import redis.clients.jedis.resps.StreamEntry;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
/** Listens for messages on a Redis Stream using Jedis. */
public class JedisStreamMQTest {
private static final Logger logger = LoggerFactory.getLogger(JedisStreamMQTest.class);
public static void main(String[] args) {
// Adjust the following values for your own environment
String host = "192.168.1.110";
int port = 6379;
int timeout = 1000;
String password = "huhailong";
int database = 0;
String streamKeyName = "streamtest";
String groupName = "testgroup";
String[] consumerNames = {"huhailong", "xiaohu"};
String listenerType = "DEFAULT"; //GROUP or DEFAULT
// Create the Redis connection pool
JedisPool pool = new JedisPool(new GenericObjectPoolConfig<>(),host,port,timeout,password,database);
JedisStreamMQTest test = new JedisStreamMQTest();
test.createGroup(pool,streamKeyName,groupName); // Create groups
if("GROUP".equals(listenerType)){
test.listenerByGroup(pool,streamKeyName,groupName,consumerNames); // Use groups and consumers to listen
}else{
test.listenerDefault(pool,streamKeyName);
}
new Thread(()->{
// Third thread: appends an entry to the stream every 500 ms
Jedis jedis = pool.getResource();
while(true) {
try {
Thread.sleep(500L);
Map<String,String> map = new HashMap<>();
map.put("currentTime", LocalDateTime.now().toString());
jedis.xadd(streamKeyName,map, XAddParams.xAddParams());
} catch (Exception e) {
logger.error(e.getMessage());
}
}
}).start();
}
/**
 * Listens using a consumer group. Each message is delivered to only one consumer in the group, so it is not consumed repeatedly.
 * @param pool Jedis connection pool
 * @param keyName stream name
 * @param groupName group name
 * @param consumerNames consumer names
 */
private void listenerByGroup(JedisPool pool, String keyName, String groupName, String...consumerNames){
Map<String, StreamEntryID> entryIDMap = new HashMap<>();
entryIDMap.put(keyName,StreamEntryID.UNRECEIVED_ENTRY);
// For simplicity this demo does not use a thread pool; it starts two threads directly
IntStream.range(0,2).forEach(i->{
Jedis jedis = pool.getResource(); // borrow a Jedis instance from the pool
new Thread(()->{
while(true){
try{
Thread.sleep(500L);
// xreadGroup corresponds to the Redis xreadgroup command. block(0) blocks until a message arrives; the UNRECEIVED_ENTRY ID set above asks for entries not yet delivered to any consumer in the group
List<Map.Entry<String, List<StreamEntry>>> entries = jedis.xreadGroup(groupName, consumerNames[i], XReadGroupParams.xReadGroupParams().block(0), entryIDMap);
logger.info("Thread:-{},result:{}",Thread.currentThread().getName(), JSONObject.toJSONString(entries.get(0).getValue().get(0).getFields()));
jedis.xack(keyName,groupName,entries.get(0).getValue().get(0).getID()); // acknowledge the message so it leaves the group's pending list
jedis.xdel(keyName,entries.get(0).getValue().get(0).getID()); // delete the message from the stream
} catch (Exception e){
logger.error(e.getMessage());
}
}
}).start();
});
}
/**
 * Listens with plain xread, without groups or consumers. Every listening thread receives the same messages.
 * @param pool Jedis connection pool
 * @param keyName stream name
 */
private void listenerDefault(JedisPool pool, String keyName){
Map<String, StreamEntryID> entryIDMap = new HashMap<>();
// LAST_ENTRY is "$": each xread call returns only entries added after that call starts, so entries added between two calls are skipped
entryIDMap.put(keyName,StreamEntryID.LAST_ENTRY);
// For simplicity this demo does not use a thread pool; it starts two threads directly
IntStream.range(0,2).forEach(i->{
new Thread(()->{
Jedis jedis = pool.getResource(); // borrow a Jedis instance from the pool
while(true){
try{
Thread.sleep(500L);
List<Map.Entry<String, List<StreamEntry>>> entries = jedis.xread(XReadParams.xReadParams().block(0), entryIDMap);
logger.info("Thread:-{},result:{}",Thread.currentThread().getName(), JSONObject.toJSONString(entries.get(0).getValue().get(0).getFields()));
jedis.xdel(keyName,entries.get(0).getValue().get(0).getID());
} catch (Exception e){
logger.error(e.getMessage());
}
}
}).start();
});
}
private void createGroup(JedisPool pool, String keyName, String groupName){
// try-with-resources returns the connection to the pool when done
try (Jedis jedis = pool.getResource()) {
// LAST_ENTRY makes the group receive only new messages; use an ID of 0 instead to read all historical messages. The final boolean creates the stream if it does not exist
jedis.xgroupCreate(keyName,groupName,StreamEntryID.LAST_ENTRY,true);
} catch (Exception e){
// An exception is expected here when the group already exists
logger.error(e.getMessage());
}
}
}
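The listing above starts raw threads and notes in its comments that it does not use a thread pool. As a rough sketch of the pooled alternative, the listener tasks could be handed to an ExecutorService, one task per consumer name. ListenerPool and runAll are hypothetical names for this illustration, and the task body is only a placeholder where a real listener would run its blocking read loop.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ListenerPool {
    /** Runs one task per consumer name on a fixed-size pool and waits for the tasks to finish. */
    static List<String> runAll(String... consumerNames) {
        List<String> log = new CopyOnWriteArrayList<>(); // thread-safe list shared by the tasks
        ExecutorService pool = Executors.newFixedThreadPool(consumerNames.length);
        for (String name : consumerNames) {
            // Placeholder task: a real listener would run its blocking read loop here.
            pool.execute(() -> log.add(name + " started"));
        }
        pool.shutdown(); // accept no new tasks; tasks already submitted still run
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runAll("huhailong", "xiaohu"));
    }
}
```

A pool gives one place to shut the listeners down, which the raw-thread version lacks. The order of the log entries depends on thread scheduling, so no fixed output is shown.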
This demo is fairly simple; you can modify and wrap it to suit your needs. I am continuing to explore packaging this approach into a complete project that does not depend on third-party frameworks such as Spring, so that it can be used flexibly. If you found this useful, please give it a like!