当前位置:网站首页>Mise en file d'attente des messages en utilisant jedis Listening redis stream
Mise en file d'attente des messages en utilisant jedis Listening redis stream
2022-06-26 05:19:00 【Hu Hailong Blog】
Introduction
Précédemment utiliséSpringBootVa écouter.Redis StreamMise en file d'attente des messages,Ce partage est fait avecJedisPour réaliser la même fonctionnalité,Et vous pouvez continuer à étendre la fonctionnalité,Parce queJedisJe pense que c'est encore plus flexible qu'avant.Cette mise en œuvre de l'écoute peut utiliser plusieurs Threads pour écouter.
Avant de passerSpringBootLien vers l'article de mise en oeuvre:
SpringBoot Utilisé dansRedis Stream Réaliser l'écoute des messages
Présentation vidéo
UtiliserJedisRéaliser l'écoute par soi - mêmeRedis StreamLa fonction de la file d'attente des messages atteint l'effetDemo
Principe de réalisation
Cette mise en œuvre de l'écoute est divisée en écoute de groupe et de consommateur et en utilisation de modexreadSurveillance native de,La différence est que si vous utilisez un moniteur comme un groupe et un consommateur, vous pouvez vous assurer que le message n'est consommé qu'une seule fois par le même consommateur,Pas de consommation répétée de messages,Convient aux scénarios qui exigent l'unicité des données,Comme l'entreposage ou d'autres opérations.Par défautxreadL'implémentation écoute plusieurs Threads qui reçoivent le même message inséré en même temps,Peut être compris comme une façon de recevoir des messages à la radio.
C'est principalement basé surRedis StreamLes commandes suivantes correspondent àJedisMéthode:
- xadd:Créer un groupe
- xread:Lire les données
- xgroup: Créer un groupe
- xreadgroup: Lire le message du Groupe
Ils sont principalement utilisés pour la lecture blockPropriétés,Oui.blockLa propriété est définie à0 Est bloqué jusqu'à ce qu'un nouveau message soit reçu , Et j'ai mis cette étape dans un sondage , Mise en œuvre du blocage entrée dans le prochain blocage après réception du message , Pour obtenir l'effet d'écoute .
Code de mise en œuvre
Cette foisdemoLe Code est simple, Dans une classe , Et tant qu'il y a redis Peut être exécuté directement après avoir modifié la configuration dans le Code ,Il n'est pas nécessaire de créer manuellementstream Ou groupe, etc. .
pom.xmlDocumentation
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>vip.huhailong</groupId>
<artifactId>JRedisMQ</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<jedis.version>4.2.3</jedis.version>
</properties>
<!-- jedis dependency -->
<dependencies>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${jedis.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
</dependency>
<!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.11</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.7</version>
</dependency>
</dependencies>
</project>
Code de mise en œuvre
package jredismq.test;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.params.XReadParams;
import redis.clients.jedis.resps.StreamEntry;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
/** * UtiliserjedisRéaliser l'écoutestreamMessage */
public class JedisStreamMQTest {
private static final Logger logger = LoggerFactory.getLogger(JedisStreamMQTest.class);
public static void main(String[] args) {
// Ce qui suit est modifié en fonction de votre situation
String host = "192.168.1.110";
int port = 6379;
int timeout = 1000;
String password = "huhailong";
int database = 0;
String streamKeyName = "streamtest";
String groupName = "testgroup";
String[]consumerNames = {
"huhailong", "xiaohu"};
String listenerType = "DEFAULT"; //GROUP or DEFAULT
//Création redis Instance du pool de connexion
JedisPool pool = new JedisPool(new GenericObjectPoolConfig<>(),host,port,timeout,password,database);
JedisStreamMQTest test = new JedisStreamMQTest();
test.createGroup(pool,streamKeyName,groupName); //Créer un groupe
if("GROUP".equals(listenerType)){
test.listenerByGroup(pool,streamKeyName,groupName,consumerNames); // Utiliser l'écoute des groupes et des consommateurs
}else{
test.listenerDefault(pool,streamKeyName);
}
new Thread(()->{
//Thread3:Pour écrirestreamDonnées
Jedis jedis = pool.getResource();
while(true) {
try {
Thread.sleep(500L);
Map<String,String> map = new HashMap<>();
map.put("currentTime", LocalDateTime.now().toString());
jedis.xadd(streamKeyName,map, XAddParams.xAddParams());
} catch (Exception e) {
logger.error(e.getMessage());
}
}
}).start();
}
/** * Utiliser l'écoute des groupes et des consommateurs , L'écoute assure que les messages ne sont pas consommés en double , Parce que chaque groupe ne consomme des messages qu'une seule fois par utilisateur * @param keyName stream Nom * @param groupName Nom du Groupe * @param consumerNames Collection de noms de consommateurs */
private void listenerByGroup(JedisPool pool, String keyName, String groupName, String...consumerNames){
Map<String, StreamEntryID> entryIDMap = new HashMap<>();
entryIDMap.put(keyName,StreamEntryID.UNRECEIVED_ENTRY);
// Ce qui suit n'utilise pas de pool de Threads pour démontrer la simplicité , Créer deux Threads directement pour expliquer le problème
IntStream.range(0,2).forEach(i->{
Jedis jedis = pool.getResource(); //CréationjedisExemple
new Thread(()->{
while(true){
try{
Thread.sleep(500L);
//En bas. xreadGroup Équivalence méthodologique et redisDansxreadgroupLes ordres,Oui.block Le temps de blocage est fixé à 0 Indique qu'il a été bloqué jusqu'à ce que le message soit reçu ,Et là - haut.StreamEntryID Set to receive the latest value
List<Map.Entry<String, List<StreamEntry>>> entries = jedis.xreadGroup(groupName, consumerNames[i], XReadGroupParams.xReadGroupParams().block(0), entryIDMap);
logger.info("Thread:-{},result:{}",Thread.currentThread().getName(), JSONObject.toJSONString(entries.get(0).getValue().get(0).getFields()));
// jedis.xack(keyName,groupName,entries.get(0).getValue().get(0).getID()); //Message de confirmation
jedis.xdel(keyName,entries.get(0).getValue().get(0).getID()); //Supprimer le message
} catch (Exception e){
logger.error(e.getMessage());
}
}
}).start();
});
}
/** * Lire sans utiliser le concept de groupe et de consommateur , Plusieurs Threads dupliquent les données de consommation * @param keyName stream Nom */
private void listenerDefault(JedisPool pool, String keyName){
Map<String, StreamEntryID> entryIDMap = new HashMap<>();
entryIDMap.put(keyName,StreamEntryID.LAST_ENTRY);
// Ce qui suit n'utilise pas de pool de Threads pour démontrer la simplicité , Créer deux Threads directement pour expliquer le problème
IntStream.range(0,2).forEach(i->{
new Thread(()->{
Jedis jedis = pool.getResource(); //CréationjedisExemple
while(true){
try{
Thread.sleep(500L);
List<Map.Entry<String, List<StreamEntry>>> entries = jedis.xread(XReadParams.xReadParams().block(0), entryIDMap);
logger.info("Thread:-{},result:{}",Thread.currentThread().getName(), JSONObject.toJSONString(entries.get(0).getValue().get(0).getFields()));
jedis.xdel(keyName,entries.get(0).getValue().get(0).getID());
} catch (Exception e){
logger.error(e.getMessage());
}
}
}).start();
});
}
private void createGroup(JedisPool pool, String keyName, String groupName){
Jedis jedis = pool.getResource();
try{
//StreamEntryID Représente la création d'un groupe et la réception de nouveaux messages , Ceci peut être réglé en fonction de vos besoins ,0 Indique que tous les messages historiques sont lus ,Derrière.boolean La valeur indique si stream Il n'y a pas de création stream
jedis.xgroupCreate(keyName,groupName,StreamEntryID.LAST_ENTRY,true);
} catch (Exception e){
// L'exception a été saisie ici parce qu'il est possible que le Groupe existait déjà au moment de la création.
logger.error(e.getMessage());
}
}
}
BendemoLe Code est encore plus simple, Vous pouvez modifier et encapsuler en fonction de vos besoins . J'explore également la possibilité d'encapsuler et de peaufiner un projet complet de cette façon. , Ne dépend pas d'un cadre tiers, par exemple SpringProjets pour, Pour pouvoir l'utiliser avec souplesse , Un petit ami qui trouve ça utile. !
边栏推荐
- Introduction to GUI programming to game practice (I)
- 2. < tag dynamic programming and conventional problems > lt.343 integer partition
- Henkel database custom operator '~~‘
- 递归遍历目录结构和树状展现
- How to rewrite a pseudo static URL created by zenpart
- Technical past: tcp/ip protocol that has changed the world (precious pictures, caution for mobile phones)
- Tp5.0框架 PDO连接mysql 报错:Too many connections 解决方法
- 程序人生
- 【上采样方式-OpenCV插值】
- cartographer_pose_graph_2d
猜你喜欢

Installation and deployment of alluxio

关于支付接口回调地址参数字段是“notify_url”,签名过后的特殊字符url编码以后再解码后出现错误(¬ , ¢, ¤, £)

Guanghetong and anti international bring 5g R16 powerful performance to the AI edge computing platform based on NVIDIA Jetson Xavier nx

PHP二维/多维数组按照指定的键值来进行升序和降序

6.1 - 6.2 introduction to public key cryptography

The parameter field of the callback address of the payment interface is "notify_url", and an error occurs after encoding and decoding the signed special character URL (,,,,,)

How to rewrite a pseudo static URL created by zenpart

Baidu API map is not displayed in the middle, but in the upper left corner. What's the matter? Resolved!

Tp5.0 framework PDO connection MySQL error: too many connections solution

Codeforces Round #800 (Div. 2)
随机推荐
First day of deep learning and tensorflow learning
vscode config
[greedy college] recommended system engineer training plan
慢慢学JVM之缓存行和伪共享
瀚高数据库自定义操作符‘!~~‘
uni-app吸顶固定样式
How does P2P technology reduce the bandwidth of live video by 75%?
Status of processes and communication between processes
Codeforces Round #800 (Div. 2)
Apktool tool usage document
Installation and deployment of alluxio
Learn from small samples and run to the sea of stars
apktool 工具使用文档
[leetcode] 713: subarray with product less than k
Sentimentin tensorflow_ analysis_ layer
递归遍历目录结构和树状展现
百度API地图的标注不是居中显示,而是显示在左上角是怎么回事?已解决!
Guanghetong and anti international bring 5g R16 powerful performance to the AI edge computing platform based on NVIDIA Jetson Xavier nx
【Unity3D】碰撞体组件Collider
[red team] what preparations should be made to join the red team?