当前位置:网站首页>利用线程通信、解决缓存穿透数据库雪崩
利用线程通信、解决缓存穿透数据库雪崩
2020-11-07 18:55:00 【lis1314】
业务场景:
有一个面向C端的查询接口,访问量很大,假设我们使用缓存技术进行了传统的优化,第一次查询数据时,查缓存->缓存没有->查数据库->写入缓存
但是可能会面临一个问题、在同一时刻有很多用户(假设1W)查询同一条数据(假设商品ID一致)、此时数据并没有在缓存、可能会造成数据库雪崩,原因是这个时刻可能因为同一条数据对数据库进行了1W次的SQL查询(这里不讨论预先写入缓存方案,同时这里说的1W次也是极端的情况)。
我们怎么进行优化,这是本篇文章讨论的重点:
思路效果:
无论有多少用户访问查询、假设他们查询商品数据携带的ID一致、那么只产生1条查询、其他用户线程共用同一个查询结果,那么也就从n(10000)变成了1。
这个方案适合任何多线程工作去除重复效果提高性能的场景,这里只是其中一个场景。
下面讨论实现:
A、B、C 3个线程并发查询商品ID为1的数据,传统的缓存优化缓存中没有数据时,会发起3个SQL查询访问数据库,例sql:select * from products where id = 1;
那么我们如何利用行级锁(用户ID维度)来达到A、B、C,3个线程只查询一次数据库得到对应的结果呢?
思路:首先我们需要一个通信维度(用户ID一致认为是同一行为),如果访问入参的用户ID一样,我们可以实现如下的一个效果:
第一个进入的线程进行查询操作(假设是A),那么B、C线程我们让他进入等待的状态,等待线程A的查询结果,最终达到只查询一次数据库的操作。
代码灵感来自阿里开源缓存框架jetcache,下面是改良后的公共实现,可以通用
package com.xxx.utils.sync;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.slf4j.Logger;
import com.alibaba.fastjson.JSON;
/**
* 同步工具类<br/>
* 主要预防优化缓存雪崩、数据雪崩</br>
* 多线程访问的情况下、认为是同一操作的重复行为
*
*/
public class SyncUtil {
private static final Logger logger = org.slf4j.LoggerFactory.getLogger(SyncUtil.class);
private ConcurrentHashMap<Object, LoaderLock> loaderMap = new ConcurrentHashMap<>();
private Function<Object, Object> KeyConvertor;
public SyncUtil() {
this(null);
}
/**
* @param KeyConvertor 设置key转换规则<br/>
* 可以为空,默认使用fastJson
*
*/
public SyncUtil(Function<Object, Object> KeyConvertor) {
if(KeyConvertor == null) {
this.KeyConvertor = new Function<Object, Object>() {
@Override
public Object apply(Object originalKey) {
if (originalKey == null) {
return null;
}
if (originalKey instanceof String) {
return originalKey;
}
return JSON.toJSONString(originalKey);
}
};
}else {
this.KeyConvertor = KeyConvertor;
}
}
/**
* 同步加载函数<br/>
* 例如:A、B、C三个线程同一时刻查询执行某一个操作(查询数据库等),实际上查询的参数条件一样<br/>
* 这个时候我们可以对线程进行优化、防止数据雪崩缓存穿透,让其中的一个线程进行查库操作,其他线程等待具体工作线程的返回结果<br/>
* 如:A线程进行查库、B、C线程进行等待A线程的返回结果。
* @param <K>
* @param <V>
* @param timeout 等待时间(毫秒)(假设A线程查询超过等待时间、那么B、C线程放弃等待自己去执行业务查询)
* @param key 辨别是否是同一操作的key
* @param loader 加载数据的函数<br/>
* Function<String, String> loader = new Function<String, String>() {
@Override
public String apply(String t) {
return "dbData";
}
};
*
* @return
*/
@SuppressWarnings("unchecked")
public <K, V> V synchronizedLoad(Integer timeout, K key, Function<K, V> loader) {
Object lockKey = buildLoaderLockKey(key);
while (true) {
//有没有线程去做具体的获取业务数据的逻辑
boolean create[] = new boolean[1];
LoaderLock ll = loaderMap.computeIfAbsent(lockKey, (unusedKey) -> {
create[0] = true;
LoaderLock loaderLock = new LoaderLock();
loaderLock.signal = new CountDownLatch(1);
loaderLock.loaderThread = Thread.currentThread();
return loaderLock;
});
if (create[0] || ll.loaderThread == Thread.currentThread()) {
try {
//第一个进入的线程进行真实的访问操作
V loadedValue = loader.apply(key);
ll.success = true;
ll.value = loadedValue;
return loadedValue;
} finally {
if (create[0]) {
ll.signal.countDown();
loaderMap.remove(lockKey);
}
}
} else {
//其他线程进行等待操作
try {
if (timeout == null) {
ll.signal.await();
} else {
boolean ok = ll.signal.await(timeout, TimeUnit.MILLISECONDS);
if (!ok) {
//如果等待超时,放弃等待,当前线程直接进行访问
logger.info("loader wait timeout:" + timeout);
return loader.apply(key);
}
}
} catch (InterruptedException e) {
logger.warn("loader wait interrupted");
return loader.apply(key);
}
//共用同一个返回结果
if (ll.success) {
return (V) ll.value;
} else {
continue;
}
}
}
}
private Object buildLoaderLockKey(Object key) {
return KeyConvertor.apply(key);
}
static class LoaderLock {
CountDownLatch signal;
Thread loaderThread;
volatile boolean success;
volatile Object value;
}
}
测试代码
package test;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.xxx.MessageServiceApplication;
import com.xxx.mapper.ActivityInfoMapper;
import com.xxx.model.ActivityInfo;
import com.xxx.utils.sync.SyncUtil;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = { MessageServiceApplication.class })
public class QueryTest {
@Autowired
private ActivityInfoMapper activityInfoMapper;
SyncUtil syncUtil = new SyncUtil();
private AtomicInteger count = new AtomicInteger();
private CyclicBarrier barrier = new CyclicBarrier(100);
ExecutorService pool = Executors.newFixedThreadPool(100);
public void synSelect(Long id) throws Exception {
for (int i = 0; i < 100; i++) {
pool.execute(()->{
try {
barrier.await();//线程等待,触发多线程同步的效果
// realSelect1(id);//传统查询
realSelect2(id);//优化后
} catch (Exception e) {
e.printStackTrace();
}
});
}
pool.shutdown();
while(!pool.isTerminated()){
}
System.out.println();
}
public ActivityInfo realSelect2(Long id) {
//线程通信的访问
Function<Long,ActivityInfo> loader = (param)->{
//记录真实的访问数据库次数(1)
count.incrementAndGet();
return activityInfoMapper.selectById(param);
};
return syncUtil.synchronizedLoad(3000, id, loader);
}
public ActivityInfo realSelect1(Long id) {
//记录真实的访问数据库次数(100)
count.incrementAndGet();
return activityInfoMapper.selectById(id);
}
@Test
public void testSelect() throws Exception {
synSelect(1L);
System.out.println("真实调用次数="+count.get());
}
}
传统的查询结果

优化后的查询结果,只执行了1次SQL

版权声明
本文为[lis1314]所创,转载请带上原文链接,感谢
https://my.oschina.net/lis1314/blog/4707646
边栏推荐
- 如何解决谷歌Chrome浏览器空白页的问题
- 想要忘记以前连接到Mac的WiFi网络,试试这个方法!
- Python3 operating gitlab
- ImageMagick - add watermark
- If you want to forget the WiFi network you used to connect to your Mac, try this!
- 把 4个消息队列都拉到一个群里后,他们吵起来了
- 11.Service更新
- Python 3 operates the Jenkins module API
- MongoDB下,启动服务时,出现“服务没有响应控制功能”解决方法
- 图像处理工具包ImagXpress使用教程,如何查看事件
猜你喜欢

C# 枚举权限 |和||,&和&&的区别

10000! Ideal car recalls all defective cars: 97 accidents have occurred and losses will be expanded

idea 激活到 2089 失效

Kubernetes (1): introduction to kubernetes

confd

Mac新手必备小技巧

Solution to st link USB communication error in stlink Download

JVM class loading mechanism

Jenkins pipline stage setting timeout

Gantt chart grouping activities tutorial
随机推荐
How to create an interactive kernel density chart
idea 激活到 2089 失效
Chinese sub forum of | 2020 PostgreSQL Asia Conference: Pan Juan
A kind of super parameter optimization technology hyperopt
Classroom exercises
条形码识别器Dynamsoft Barcode Reader v7.5全新上线!
Opencv computer vision learning (10) -- image transform (Fourier transform, high pass filter, low pass filter)
Key points of C language -- index article (let you fully understand indicators) | understand indicators from memory | complete analysis of indicators
Two dimensional code location and alarm system of Expressway
【笔记】Error while loading PyV8 binary: exit code 1解决方法
Is blazor ready to serve the enterprise?
[original] the impact of arm platform memory and cache on the real-time performance of xenomai
HandlerMethodArgumentResolver使用和原理
New features of vue3
MongoDB下,启动服务时,出现“服务没有响应控制功能”解决方法
Jenkins入门(二)声明式流水线Jenkins Pipeline
pc端与移动端适配解决方案之rem
[note] error while loading pyv8 binary: exit code 1 solution
谈了多年的数字化转型,为什么还有很多企业依然“口头管理”
The first choice for lightweight GPU applications is the NVIDIA vgpu instance launched by Jingdong Zhilian cloud