当前位置:网站首页>RedisTemplate Pipeline 管道使用
RedisTemplate Pipeline 管道使用
2022-07-22 22:45:00 【Thai_】
官网文档:https://docs.spring.io/spring-data/redis/docs/current/reference/html/
一、前言
当需要执行大批量的写入或者查询时,使用 redis 一条条命令的执行性能肯定没有一次性执行完要好;假设执行完一条 redis 命令的网络耗时为 20ms,有 1 万条命令需要执行,算一下光发送这些命令的网络耗时就达到 200,000ms(200s),这是不能接受的,我们可以使用 RedisTemplate 提供的管道进行批量执行。
根据官网的描述:Redis 提供了对 pipelining 的支持,在向服务器发送多个命令时,无需等待每一条命令响应,然后在一个步骤中读取所有的响应。经过打包命令发送与返回,在一定程度上节省了网络io耗时。
二、Pipelining 介绍与使用
我们使用 Spring 的RedisTemplate 来执行管道操作,RedisTemplate 提供了管道的方法,如下图:
可以看到主要为 SessionCallback 与 RedisCallback,它们的区别主要为 API 的封装,RedisCallback 为原生的 api,SessionCallback 为 Spring 封装的 api。
下图为 SessionCallback 的方法:
追踪源码可以知道入参 RedisOperations<K, V> operations 其实就是 RedisTemplate 本身,因此所有操作都是经过封装的 api
private Object executeSession(SessionCallback<?> session) {
return session.execute(this);
}
下图为 RedisConnection 的方法:
同样的,我们追踪源码可以知道入参为 RedisConnection
public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {
// 略...
RedisConnectionFactory factory = getRequiredConnectionFactory();
RedisConnection conn = RedisConnectionUtils.getConnection(factory, enableTransactionSupport);
try {
boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);
RedisConnection connToUse = preProcessConnection(conn, existingConnection);
boolean pipelineStatus = connToUse.isPipelined();
if (pipeline && !pipelineStatus) {
connToUse.openPipeline();
}
RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
// 入参为 connection
T result = action.doInRedis(connToExpose);
// close pipeline
if (pipeline && !pipelineStatus) {
connToUse.closePipeline();
}
return postProcessResult(result, connToUse, existingConnection);
} finally {
RedisConnectionUtils.releaseConnection(conn, factory, enableTransactionSupport);
}
}
因此它们两个的主要区别就是提供的方法不同,如果想使用原生的 api 则使用 RedisCallback,想使用 Spring 给我们封装后的 api 则使用 SessionCallback。
介绍了管道的基本实现,下面以 SessionCallback 实现来讲讲如何使用:
写入:
SessionCallback<?> sessionCallback = new SessionCallback<>() {
@Override
public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
// 转为你 RedisTemplate 即可
RedisTemplate<String, Object> ops = (RedisTemplate<String, Object>) operations;
ops.opsForValue().set("key", "value");
// 必须返回 null,
return null;
}
};
redisTemplate.executePipelined(sessionCallback);
需要注意的是返回值必须为 null,否则会报错;源码判断结果是否为 null 逻辑如下:
Object result = executeSession(session);
if (result != null) {
throw new InvalidDataAccessApiUsageException("Callback cannot return a non-null value as it gets overwritten by the pipeline");
}
读取:
SessionCallback<?> sessionCallback = new SessionCallback<>() {
@Override
public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
// 转为你 RedisTemplate 即可
RedisTemplate<String, Object> ops = (RedisTemplate<String, Object>) operations;
ops.opsForValue().get("key");
// 必须返回 null,
return null;
}
};
redisTemplate.executePipelined(sessionCallback);
需要注意的是我们不能直接将结果存储起来,类似这样 ×
List<Object> results = new ArrayList<>(10);
SessionCallback<?> sessionCallback = new SessionCallback<>() {
@Override
public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
// 转为你 RedisTemplate 即可
RedisTemplate<String, Object> ops = (RedisTemplate<String, Object>) operations;
results.add(ops.opsForValue().get("key"));
// 必须返回 null,
return null;
}
};
这样子是拿不到需要的查询结果,正确的做法是从管道执行的返回结果获取 √
List<Object> resultObjs = redisTemplate.executePipelined(sessionCallback);
而从源码可以知道,结果确实也是从管道中拿到的
Object result = executeSession(session);
if (result != null) {
throw new InvalidDataAccessApiUsageException("Callback cannot return a non-null value as it gets overwritten by the pipeline");
}
// 里面的实现大致是从 future 中 get 拿到结果;具体步骤这里不做分析,有兴趣的可以自己看源码
List<Object> closePipeline = connection.closePipeline();
三、测试
下面测试一下使用管道与不使用管道写入 key 耗时。注:Redis 为单节点
- 先测试不使用管道,写入 1 万个 key 耗时
@Test
public void write() {
TimeInterval timer = DateUtil.timer();
String keyPrefix = "writeTest:";
ValueOperations<String, Object> operations = redisTemplate.opsForValue();
for (int i = 0; i < 10000; i++) {
operations.set(keyPrefix + i, i);
}
System.out.println("写入 1 万个 key 耗时:" + timer.intervalMs() + " ms");
}
输出结果为:写入 1 万个 key 耗时:5796 ms
- 再测试使用管道的耗时情况
@Test
public void write() {
TimeInterval timer = DateUtil.timer();
String keyPrefix = "writeTest:";
SessionCallback<?> sessionCallback = new SessionCallback<>() {
@Override
public <K, V> Object execute(RedisOperations<K, V> opt) throws DataAccessException {
RedisTemplate<String, Object> template = (RedisTemplate<String, Object>) opt;
ValueOperations<String, Object> operations = template.opsForValue();
for (int i = 0; i < 10000; i++) {
operations.set(keyPrefix + i, i);
}
return null;
}
};
redisTemplate.executePipelined(sessionCallback);
System.out.println("写入 1 万个 key 耗时:" + timer.intervalMs() + " ms");
}
输出结果为:写入 1 万个 key 耗时:626 ms
从输出的结果就能明显看出管道的使用对批量插入与读取的性能有很大的提升!!
四、总结
本文主要介绍了在 Spring 中使用管道的两种方式,分别为 SessionCallback 与 RedisCallback 的方法重载;介绍了它们之间的区别,并介绍了 SessionCallback 方法如何进行写入与读取;最后简单的对使用管道与不使用管道在批量写入的情况下性能的差异。
边栏推荐
猜你喜欢

读书笔记->统计学】12-02 置信区间的构建-t分布概念简介

数的三次方根

为什么有的人把代码写的如此复杂?

I can't be angry with "voluntary salary reduction". I'm naked. I'm three times in four days. How can it end like this?

获取一个控件宽度

Hcip --- BGP comprehensive experiment

张宇高数30讲总结

学习总结 | 真实记录 MindSpore 两日集训营能带给你什么(一)!

互联网流量编排方案

There are 13 detailed methods for JMeter to view the response of the result tree!
随机推荐
Redis事务与锁机制
uni-app进阶之内嵌应用【day14】
Live broadcast preview | live broadcast Seminar on open source security governance models and tools
Redis 配置文件
Web resource sharing
There are 13 detailed methods for JMeter to view the response of the result tree!
Hcip --- BGP comprehensive experiment
Redis profile
第三章 栈
读书笔记->统计学】12-02 置信区间的构建-t分布概念简介
开发者分享|『啃书吧:深度学习与MindSpore实践』第一章
为什么有的人把代码写的如此复杂?
Flick enables mapstate to implement keyedstate
学习总结 | 真实记录 MindSpore 两日集训营能带给你什么(一)!
技术干货 | 数据处理好难?来看MindSpore提供的解决思路!
将childNodes返回的伪数组转化为真数组
How to use C language to realize simple employee information management system
Can PHP split arrays
Object prototype of typescript object extension__ proto__ And prototype
c语言十进制数转二进制数