当前位置:网站首页>利用线程通信、解决缓存穿透数据库雪崩
利用线程通信、解决缓存穿透数据库雪崩
2020-11-07 18:55:00 【lis1314】
业务场景:
有一个面向C端的查询接口,访问量很大,假设我们使用缓存技术进行了传统的优化,第一次查询数据时,查缓存->缓存没有->查数据库->写入缓存
但是可能会面临一个问题、在同一时刻有很多用户(假设1W)查询同一条数据(假设商品ID一致)、此时数据并没有在缓存、可能会造成数据库雪崩,原因是这个时刻可能因为同一条数据对数据库进行了1W次的SQL查询(这里不讨论预先写入缓存方案,同时这里说的1W次也是极端的情况)。
我们怎么进行优化,这是本篇文章讨论的重点:
思路效果:
无论有多少用户访问查询、假设他们查询商品数据携带的ID一致、那么只产生1条查询、其他用户线程共用同一个查询结果,那么也就从n(10000)变成了1。
这个方案适合任何多线程工作去除重复效果提高性能的场景,这里只是其中一个场景。
下面讨论实现:
A、B、C 3个线程并发查询商品ID为1的数据,传统的缓存优化缓存中没有数据时,会发起3个SQL查询访问数据库,例sql:select * from products where id = 1;
那么我们如何利用行级锁(用户ID维度)来达到A、B、C,3个线程只查询一次数据库得到对应的结果呢?
思路:首先我们需要一个通信维度(用户ID一致认为是同一行为),如果访问入参的用户ID一样,我们可以实现如下的一个效果:
第一个进入的线程进行查询操作(假设是A),那么B、C线程我们让他进入等待的状态,等待线程A的查询结果,最终达到只查询一次数据库的操作。
代码灵感来自阿里开源缓存框架jetcache,下面是改良后的公共实现,可以通用
package com.xxx.utils.sync;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.slf4j.Logger;
import com.alibaba.fastjson.JSON;
/**
* 同步工具类<br/>
* 主要预防优化缓存雪崩、数据雪崩</br>
* 多线程访问的情况下、认为是同一操作的重复行为
*
*/
public class SyncUtil {
private static final Logger logger = org.slf4j.LoggerFactory.getLogger(SyncUtil.class);
private ConcurrentHashMap<Object, LoaderLock> loaderMap = new ConcurrentHashMap<>();
private Function<Object, Object> KeyConvertor;
public SyncUtil() {
this(null);
}
/**
* @param KeyConvertor 设置key转换规则<br/>
* 可以为空,默认使用fastJson
*
*/
public SyncUtil(Function<Object, Object> KeyConvertor) {
if(KeyConvertor == null) {
this.KeyConvertor = new Function<Object, Object>() {
@Override
public Object apply(Object originalKey) {
if (originalKey == null) {
return null;
}
if (originalKey instanceof String) {
return originalKey;
}
return JSON.toJSONString(originalKey);
}
};
}else {
this.KeyConvertor = KeyConvertor;
}
}
/**
* 同步加载函数<br/>
* 例如:A、B、C三个线程同一时刻查询执行某一个操作(查询数据库等),实际上查询的参数条件一样<br/>
* 这个时候我们可以对线程进行优化、防止数据雪崩缓存穿透,让其中的一个线程进行查库操作,其他线程等待具体工作线程的返回结果<br/>
* 如:A线程进行查库、B、C线程进行等待A线程的返回结果。
* @param <K>
* @param <V>
* @param timeout 等待时间(毫秒)(假设A线程查询超过等待时间、那么B、C线程放弃等待自己去执行业务查询)
* @param key 辨别是否是同一操作的key
* @param loader 加载数据的函数<br/>
* Function<String, String> loader = new Function<String, String>() {
@Override
public String apply(String t) {
return "dbData";
}
};
*
* @return
*/
@SuppressWarnings("unchecked")
public <K, V> V synchronizedLoad(Integer timeout, K key, Function<K, V> loader) {
Object lockKey = buildLoaderLockKey(key);
while (true) {
//有没有线程去做具体的获取业务数据的逻辑
boolean create[] = new boolean[1];
LoaderLock ll = loaderMap.computeIfAbsent(lockKey, (unusedKey) -> {
create[0] = true;
LoaderLock loaderLock = new LoaderLock();
loaderLock.signal = new CountDownLatch(1);
loaderLock.loaderThread = Thread.currentThread();
return loaderLock;
});
if (create[0] || ll.loaderThread == Thread.currentThread()) {
try {
//第一个进入的线程进行真实的访问操作
V loadedValue = loader.apply(key);
ll.success = true;
ll.value = loadedValue;
return loadedValue;
} finally {
if (create[0]) {
ll.signal.countDown();
loaderMap.remove(lockKey);
}
}
} else {
//其他线程进行等待操作
try {
if (timeout == null) {
ll.signal.await();
} else {
boolean ok = ll.signal.await(timeout, TimeUnit.MILLISECONDS);
if (!ok) {
//如果等待超时,放弃等待,当前线程直接进行访问
logger.info("loader wait timeout:" + timeout);
return loader.apply(key);
}
}
} catch (InterruptedException e) {
logger.warn("loader wait interrupted");
return loader.apply(key);
}
//共用同一个返回结果
if (ll.success) {
return (V) ll.value;
} else {
continue;
}
}
}
}
private Object buildLoaderLockKey(Object key) {
return KeyConvertor.apply(key);
}
static class LoaderLock {
CountDownLatch signal;
Thread loaderThread;
volatile boolean success;
volatile Object value;
}
}
测试代码
package test;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.xxx.MessageServiceApplication;
import com.xxx.mapper.ActivityInfoMapper;
import com.xxx.model.ActivityInfo;
import com.xxx.utils.sync.SyncUtil;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = { MessageServiceApplication.class })
public class QueryTest {
@Autowired
private ActivityInfoMapper activityInfoMapper;
SyncUtil syncUtil = new SyncUtil();
private AtomicInteger count = new AtomicInteger();
private CyclicBarrier barrier = new CyclicBarrier(100);
ExecutorService pool = Executors.newFixedThreadPool(100);
public void synSelect(Long id) throws Exception {
for (int i = 0; i < 100; i++) {
pool.execute(()->{
try {
barrier.await();//线程等待,触发多线程同步的效果
// realSelect1(id);//传统查询
realSelect2(id);//优化后
} catch (Exception e) {
e.printStackTrace();
}
});
}
pool.shutdown();
while(!pool.isTerminated()){
}
System.out.println();
}
public ActivityInfo realSelect2(Long id) {
//线程通信的访问
Function<Long,ActivityInfo> loader = (param)->{
//记录真实的访问数据库次数(1)
count.incrementAndGet();
return activityInfoMapper.selectById(param);
};
return syncUtil.synchronizedLoad(3000, id, loader);
}
public ActivityInfo realSelect1(Long id) {
//记录真实的访问数据库次数(100)
count.incrementAndGet();
return activityInfoMapper.selectById(id);
}
@Test
public void testSelect() throws Exception {
synSelect(1L);
System.out.println("真实调用次数="+count.get());
}
}
传统的查询结果
优化后的查询结果,只执行了1次SQL
版权声明
本文为[lis1314]所创,转载请带上原文链接,感谢
https://my.oschina.net/lis1314/blog/4707646
边栏推荐
- How to optimize the decoding performance of dynamsoft barcode reader
- [graffiti footprints of Internet of things] mainstream communication mode of Internet of things
- VARCHART XGantt如何在日历上表示工作日
- If you want to forget the WiFi network you used to connect to your Mac, try this!
- 关于DevOps的七大误解,99%的人都曾中过招!
- 11.Service更新
- Win7 how to quickly type CMD and get to the required directory
- Rech8.0 learning days 12 rh134
- Do you really know how to use search engines?
- Developing STM32 USB with cubemx
猜你喜欢
一种超参数优化技术-Hyperopt
2020-11-06: go, let's talk about the scheduler.
The first choice for lightweight GPU applications is the NVIDIA vgpu instance launched by Jingdong Zhilian cloud
Yum [errno 256] no more mirrors to try solution
想要忘记以前连接到Mac的WiFi网络,试试这个方法!
11. Service update
HMS core push service helps e-commerce app to carry out refined operation
In simple terms, the large front-end framework angular6 practical course (angular6 node.js 、keystonejs、
使用RabbitMQ实现分布式事务
【涂鸦物联网足迹】物联网主流通信方式
随机推荐
The JS solution cannot be executed after Ajax loads HTML
9.集群之间服务通信 RoutingMesh
如何利用PopupWindow实现弹出菜单并解决焦点获取以及与软键盘冲突问题
Using JSON webtoken (JWT) to generate token in nodejs
深入浅出大前端框架Angular6实战教程(Angular6、node.js、keystonejs、
Implementation of nginx version of microservice architecture
Logo design company, Nanjing
Application of UHF RFID medical blood management system
DOM节点操作
FreeSWITCH视频会议“标准”解决方案
使用RabbitMQ实现分布式事务
RECH8.0版本学习 days 12 rh134部分
Talk about sharing before paying
Kubernetes (1): introduction to kubernetes
如何解决谷歌Chrome浏览器空白页的问题
Tips for Mac novices
7. Swarm builds clusters
Insomnia all night
In simple terms, the large front-end framework angular6 practical course (angular6 node.js 、keystonejs、
How to deploy Gantt chart quickly and correctly