非线程安全的缓存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| package simple_cache;
import java.util.HashMap; import java.util.concurrent.TimeUnit;
public class Cache1 { private HashMap<String, Integer> cache = new HashMap<>();
public Integer compute(String userId) throws InterruptedException { Integer result = cache.get(userId);
if (result == null) { result = doCompute(userId); cache.put(userId, result); } return result; }
private Integer doCompute(String userId) throws InterruptedException { TimeUnit.SECONDS.sleep(5); return new Integer(userId); }
public static void main(String[] args) throws InterruptedException { Cache1 cache1 = new Cache1(); System.out.println("开始计算了"); Integer result = cache1.compute("13"); System.out.println("第一次计算结果:" + result);
result = cache1.compute("13"); System.out.println("第二次计算结果:" + result); } }
|
用synchronized保证并发安全
- 性能差(用synchronized之后,线程变成串行的了,多个线程不能同时访问computer方法了,与缓存的使用性不符合)
- 代码复用性差
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| package simple_cache;
import java.util.HashMap; import java.util.concurrent.TimeUnit;
public class Cache1 { private HashMap<String, Integer> cache = new HashMap<>();
public synchronized Integer compute(String userId) throws InterruptedException { Integer result = cache.get(userId);
if (result == null) { result = doCompute(userId); cache.put(userId, result); } return result; }
private Integer doCompute(String userId) throws InterruptedException { TimeUnit.SECONDS.sleep(5); return new Integer(userId); }
public static void main(String[] args) throws InterruptedException { Cache1 cache1 = new Cache1(); System.out.println("开始计算了"); Integer result = cache1.compute("13"); System.out.println("第一次计算结果:" + result);
result = cache1.compute("13"); System.out.println("第二次计算结果:" + result); } }
|
给HashMap加final关键字
- 属性被声明为final后,该变量则只能被赋值一次。且一旦被赋值,final的变量就不能再被改变。
- 所以我们把它加上final关键字,增强安全性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| package simple_cache;
import java.util.HashMap; import java.util.concurrent.TimeUnit;
public class Cache1 { private final HashMap<String, Integer> cache = new HashMap<>();
public synchronized Integer compute(String userId) throws InterruptedException { Integer result = cache.get(userId);
if (result == null) { result = doCompute(userId); cache.put(userId, result); } return result; }
private Integer doCompute(String userId) throws InterruptedException { TimeUnit.SECONDS.sleep(5); return new Integer(userId); }
public static void main(String[] args) throws InterruptedException { Cache1 cache1 = new Cache1(); System.out.println("开始计算了"); Integer result = cache1.compute("13"); System.out.println("第一次计算结果:" + result);
result = cache1.compute("13"); System.out.println("第二次计算结果:" + result); } }
|
用装饰者模式将缓存与计算逻辑解耦
Computable接口
1 2 3 4 5 6
| package simple_cache.computable;
public interface Computable <A, V>{ V compute(A arg) throws Exception; }
|
计算逻辑ExpensiveFunction
1 2 3 4 5 6 7 8 9 10 11
| package simple_cache.computable;
public class ExpensiveFunction implements Computable<String, Integer>{
@Override public Integer compute(String arg) throws Exception { Thread.sleep(5000); return Integer.valueOf(arg); } }
|
缓存
缓存Cache2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| package simple_cache;
import simple_cache.computable.Computable; import simple_cache.computable.ExpensiveFunction;
import java.util.HashMap; import java.util.Map;
public class Cache2<A, V> implements Computable<A, V> { private final Map<A, V> cache = new HashMap<>();
private final Computable<A, V> c;
public Cache2(Computable<A, V> c) { this.c = c; }
@Override public synchronized V compute(A arg) throws Exception { System.out.println("进入缓存机制"); V result = cache.get(arg); if (result == null) { result = c.compute(arg); cache.put(arg, result); } return result; }
public static void main(String[] args) throws Exception { Cache2<String, Integer> expensiveCompute = new Cache2<>(new ExpensiveFunction()); Integer result = expensiveCompute.compute("13"); System.out.println("第一次计算结果:" + result);
result = expensiveCompute.compute("13"); System.out.println("第二次计算结果:" + result); } }
|
多线程并发查询
缓存Cache3多线程并发查询
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| package simple_cache;
import simple_cache.computable.Computable; import simple_cache.computable.ExpensiveFunction;
import java.util.HashMap; import java.util.Map;
public class Cache3<A, V> implements Computable<A, V> { private final Map<A, V> cache = new HashMap<>();
private final Computable<A, V> c;
public Cache3(Computable<A, V> c) { this.c = c; }
@Override public synchronized V compute(A arg) throws Exception { System.out.println("进入缓存机制"); V result = cache.get(arg); if (result == null) { result = c.compute(arg); cache.put(arg, result); } return result; }
public static void main(String[] args) throws Exception { Cache3<String, Integer> expensiveCompute = new Cache3<>(new ExpensiveFunction()); new Thread(new Runnable() { @Override public void run() { try { Integer result = expensiveCompute.compute("667"); System.out.println("第一个线程:" + result); } catch (Exception e) { e.printStackTrace(); } } }).start();
new Thread(new Runnable() { @Override public void run() { try { Integer result = expensiveCompute.compute("667"); System.out.println("第二个线程:" + result); } catch (Exception e) { e.printStackTrace(); } } }).start();
new Thread(new Runnable() { @Override public void run() { try { Integer result = expensiveCompute.compute("666"); System.out.println("第三个线程:" + result); } catch (Exception e) { e.printStackTrace(); } } }).start(); } }
|
1 2 3 4 5 6 7
| 输出:其实第二个线程根本没必要等待第三个线程执行完毕 进入缓存机制 进入缓存机制 第一个线程:667 进入缓存机制 第三个线程:666 第二个线程:667
|
锁性能优化
减小锁的粒度
Cache4减小锁的粒度,试图提高性能
- 缺点:虽然提高了并发效率,但是并不意味着就是线程安全的,还需要考虑到同时读写等情况
- 线程仍然不够安全,虽然多个线程不能同时写了,但是如果在写的同时读,同样是线程不安全的
- 但是,其实没必要自己实现线程安全的HashMap,也不应该加synchronized,因为我们自己实现的性能远不如现有的并发集合
- 我们来使用ConcurrentHashMap优化我们的缓存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
| package simple_cache;
import simple_cache.computable.Computable; import simple_cache.computable.ExpensiveFunction;
import java.util.HashMap; import java.util.Map;
public class Cache4<A, V> implements Computable<A, V> { private final Map<A, V> cache = new HashMap<>();
private final Computable<A, V> c;
public Cache4(Computable<A, V> c) { this.c = c; }
@Override public V compute(A arg) throws Exception { System.out.println("进入缓存机制"); V result = cache.get(arg); if (result == null) { result = c.compute(arg); synchronized (this) { cache.put(arg, result); } } return result; }
public static void main(String[] args) throws Exception { Cache4<String, Integer> expensiveCompute = new Cache4<>(new ExpensiveFunction()); new Thread(new Runnable() { @Override public void run() { try { Integer result = expensiveCompute.compute("667"); System.out.println("第一个线程:" + result); } catch (Exception e) { e.printStackTrace(); } } }).start();
new Thread(new Runnable() { @Override public void run() { try { Integer result = expensiveCompute.compute("667"); System.out.println("第二个线程:" + result); } catch (Exception e) { e.printStackTrace(); } } }).start();
new Thread(new Runnable() { @Override public void run() { try { Integer result = expensiveCompute.compute("666"); System.out.println("第三个线程:" + result); } catch (Exception e) { e.printStackTrace(); } } }).start(); } }
|
并发集合ConcurrentHashMap
Cache5采用并发集合ConcurrentHashMap保证并发安全
- 缺点:在计算完成前,另一个要求计算相同值的请求到来,会导致计算两遍,这和缓存想避免多次计算的初衷恰恰相反,是不可接受的
示意图:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| package simple_cache;
import simple_cache.computable.Computable; import simple_cache.computable.ExpensiveFunction;
import java.util.Map; import java.util.concurrent.ConcurrentHashMap;
public class Cache5<A, V> implements Computable<A, V> { private final Map<A, V> cache = new ConcurrentHashMap<>();
private final Computable<A, V> c;
public Cache5(Computable<A, V> c) { this.c = c; }
@Override public V compute(A arg) throws Exception { System.out.println("进入缓存机制"); V result = cache.get(arg); if (result == null) { result = c.compute(arg); cache.put(arg, result); } return result; }
public static void main(String[] args) throws Exception { Cache5<String, Integer> expensiveCompute = new Cache5<>(new ExpensiveFunction()); new Thread(new Runnable() { @Override public void run() { try { Integer result = expensiveCompute.compute("667"); System.out.println("第一个线程:" + result); } catch (Exception e) { e.printStackTrace(); } } }).start();
new Thread(new Runnable() { @Override public void run() { try { Integer result = expensiveCompute.compute("667"); System.out.println("第二个线程:" + result); } catch (Exception e) { e.printStackTrace(); } } }).start();
new Thread(new Runnable() { @Override public void run() { try { Integer result = expensiveCompute.compute("666"); System.out.println("第三个线程:" + result); } catch (Exception e) { e.printStackTrace(); } } }).start(); } }
|
演示ConcurrentHashMap的缺点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| package simple_cache;
import simple_cache.computable.Computable; import simple_cache.computable.ExpensiveFunction;
import java.util.Map; import java.util.concurrent.ConcurrentHashMap;
public class Cache6<A, V> implements Computable<A, V> { private final Map<A, V> cache = new ConcurrentHashMap<>();
private final Computable<A, V> c;
public Cache6(Computable<A, V> c) { this.c = c; }
@Override public V compute(A arg) throws Exception { System.out.println("进入缓存机制"); V result = cache.get(arg); if (result == null) { result = c.compute(arg); cache.put(arg, result); } return result; }
public static void main(String[] args) throws Exception { Cache6<String, Integer> expensiveCompute = new Cache6<>(new ExpensiveFunction()); new Thread(new Runnable() { @Override public void run() { try { Integer result = expensiveCompute.compute("666"); System.out.println("第一个线程:" + result); } catch (Exception e) { e.printStackTrace(); } } }).start();
new Thread(new Runnable() { @Override public void run() { try { Integer result = expensiveCompute.compute("667"); System.out.println("第二个线程:" + result); } catch (Exception e) { e.printStackTrace(); } } }).start();
new Thread(new Runnable() { @Override public void run() { try { Integer result = expensiveCompute.compute("666"); System.out.println("第三个线程:" + result); } catch (Exception e) { e.printStackTrace(); } } }).start(); } }
|
结果:
三个线程同时进入缓存机制,5秒后同时给出了缓存结果,其实第一个线程和第三个线程重复计算了,并没有用到缓存结果,而是自己重新算了一遍
1 2 3 4 5 6
| 进入缓存机制 进入缓存机制 进入缓存机制 第三个线程:666 第二个线程:667 第一个线程:666
|
避免重复计算Future和Callable
利用Future避免重复计算
- 动机:现在不同的线程进来以后,确实可以同时计算,但是如果两个线程脚前脚后,也就是相差无几的进来请求同一个数据那么我们来看看会出现什么问题:重复计算
- 这个例子只有2个线程,并不可怕,但是如果是100个线程都请求同样的内容,却都需要重新计算,那么会造成巨大的浪费
- 后面的线程,如果能知道前面的线程正在计算,并且计算的内容和我自己要计算的内容一样,那么我只要等待你的计算结果写入缓存之后直接使用即可
- 缺点:如果有两个同时计算666的线程,同时调用cache.get()方法那么返回的结果都为null,后面还是会创建两个任务去计算相同的值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
| package simple_cache;
import simple_cache.computable.Computable; import simple_cache.computable.ExpensiveFunction;
import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.FutureTask;
public class Cache7<A, V> implements Computable<A, V> { private final Map<A, Future<V>> cache = new ConcurrentHashMap<>();
private final Computable<A, V> c;
public Cache7(Computable<A, V> c) { this.c = c; }
@Override public V compute(A arg) throws Exception { Future<V> f = cache.get(arg); if (f == null) { Callable<V> callable = new Callable<V>() { @Override public V call() throws Exception { return c.compute(arg); } };
FutureTask<V> ft = new FutureTask<>(callable); f = ft;
cache.put(arg, ft); System.out.println("从FutureTask调用了计算函数"); ft.run(); } return f.get(); }
public static void main(String[] args) throws Exception { Cache7<String, Integer> expensiveCompute = new Cache7<>(new ExpensiveFunction()); new Thread(new Runnable() { @Override public void run() { try { Integer result = expensiveCompute.compute("666"); System.out.println("第一个线程:" + result); } catch (Exception e) { e.printStackTrace(); } } }).start();
new Thread(new Runnable() { @Override public void run() { try { Integer result = expensiveCompute.compute("666"); System.out.println("第二个线程:" + result); } catch (Exception e) { e.printStackTrace(); } } }).start();
new Thread(new Runnable() { @Override public void run() { try { Integer result = expensiveCompute.compute("667"); System.out.println("第三个线程:" + result); } catch (Exception e) { e.printStackTrace(); } } }).start(); } }
|
结果:
1 2 3 4 5
| 从FutureTask调用了计算函数 从FutureTask调用了计算函数 第二个线程:666 第一个线程:666 第三个线程:667
|
缺点:如果有两个同时计算666的线程,同时调用cache.get()方法那么返回的结果都为null,后面还是会创建两个任务去计算相同的值
1 2 3 4 5 6
| 从FutureTask调用了计算函数 从FutureTask调用了计算函数 从FutureTask调用了计算函数 第一个线程:666 第二个线程:666 第三个线程:667
|
原子操作putIfAbsent
利用putIfAbsent优化Future小概率的重复计算
- 利用Future可以避免重复计算,但是如果有两个同时计算666的线程,同时调用cache.get()方法那么返回的结果都为null,后面还是会创建两个任务去计算相同的值

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
| package simple_cache;
import simple_cache.computable.Computable; import simple_cache.computable.ExpensiveFunction;
import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.FutureTask;
public class Cache8<A, V> implements Computable<A, V> { private final Map<A, Future<V>> cache = new ConcurrentHashMap<>();
private final Computable<A, V> c;
public Cache8(Computable<A, V> c) { this.c = c; }
@Override public V compute(A arg) throws Exception { Future<V> f = cache.get(arg); if (f == null) { Callable<V> callable = new Callable<V>() { @Override public V call() throws Exception { return c.compute(arg); } };
FutureTask<V> ft = new FutureTask<>(callable); f = cache.putIfAbsent(arg, ft);
if (f == null) { f = ft; System.out.println("从FutureTask调用了计算函数"); ft.run(); } } return f.get(); }
public static void main(String[] args) throws Exception { Cache8<String, Integer> expensiveCompute = new Cache8<>(new ExpensiveFunction()); new Thread(new Runnable() { @Override public void run() { try { Integer result = expensiveCompute.compute("666"); System.out.println("第一个线程:" + result); } catch (Exception e) { e.printStackTrace(); } } }).start();
new Thread(new Runnable() { @Override public void run() { try { Integer result = expensiveCompute.compute("666"); System.out.println("第二个线程:" + result); } catch (Exception e) { e.printStackTrace(); } } }).start();
new Thread(new Runnable() { @Override public void run() { try { Integer result = expensiveCompute.compute("667"); System.out.println("第三个线程:" + result); } catch (Exception e) { e.printStackTrace(); } } }).start(); } }
|
结果:
1 2 3 4 5
| 从FutureTask调用了计算函数 从FutureTask调用了计算函数 第二个线程:666 第三个线程:667 第一个线程:666
|
ExcecutionException
MayFail
耗时计算的实现类,有概率计算失败
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| package simple_cache.computable;
import java.io.IOException;
public class MayFail implements Computable<String, Integer>{
@Override public Integer compute(String arg) throws Exception { double random = Math.random(); if (random > 0.5) { throw new IOException("读取文件出错"); } Thread.sleep(3000); return Integer.valueOf(arg); } }
|
正确的异常处理逻辑(各司其职)
- 这3种异常之所以用不同的catch块捕获,是因为它们的处理逻辑是不同的
- CancellationException和InterruptedException是人为取消的,那么我们应该立即终止任务
- 但是如果是计算错误,且我们明确知道多试几次就可以得到答案,那么我们的逻辑应该是重试,尝试多次直到正确的结果出现
- 在这里,我们加上while(true)来保证计算出错不会影响我们的逻辑,然后如果是计算错误,就进入下一个循环,重新计算,直到计算成功;如果是人为取消,那么就抛出异常然后结束运行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
| package simple_cache;
import simple_cache.computable.Computable; import simple_cache.computable.ExpensiveFunction; import simple_cache.computable.MayFail;
import java.util.Map; import java.util.concurrent.*;
public class Cache9<A, V> implements Computable<A, V> { private final Map<A, Future<V>> cache = new ConcurrentHashMap<>();
private final Computable<A, V> c;
public Cache9(Computable<A, V> c) { this.c = c; }
@Override public V compute(A arg) throws ExecutionException, InterruptedException { while (true) { Future<V> f = cache.get(arg); if (f == null) { Callable<V> callable = new Callable<V>() { @Override public V call() throws Exception { return c.compute(arg); } };
FutureTask<V> ft = new FutureTask<>(callable); f = cache.putIfAbsent(arg, ft);
if (f == null) { f = ft; System.out.println("从FutureTask调用了计算函数"); ft.run(); } } try { return f.get(); } catch (CancellationException e) { System.out.println("被取消了"); throw e; } catch (InterruptedException e) { throw e; } catch (ExecutionException e) { System.out.println("计算错误,需要重试"); } } }
public static void main(String[] args) throws Exception { Cache9<String, Integer> expensiveCompute = new Cache9<>(new MayFail()); new Thread(new Runnable() { @Override public void run() { try { Integer result = expensiveCompute.compute("666"); System.out.println("第一个线程:" + result); } catch (Exception e) { e.printStackTrace(); } } }).start();
new Thread(new Runnable() { @Override public void run() { try { Integer result = expensiveCompute.compute("666"); System.out.println("第二个线程:" + result); } catch (Exception e) { e.printStackTrace(); } } }).start();
new Thread(new Runnable() { @Override public void run() { try { Integer result = expensiveCompute.compute("667"); System.out.println("第三个线程:" + result); } catch (Exception e) { e.printStackTrace(); } } }).start(); } }
|
结果:一直循环(为什么?因为缓存被污染了)
1 2 3 4 5
| 计算错误,需要重试 计算错误,需要重试 计算错误,需要重试 计算错误,需要重试 ...
|
缓存污染问题
使用cache.remove(arg);清除被污染的缓存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
| package simple_cache;
import simple_cache.computable.Computable; import simple_cache.computable.ExpensiveFunction; import simple_cache.computable.MayFail;
import java.util.Map; import java.util.concurrent.*;
public class Cache9<A, V> implements Computable<A, V> { private final Map<A, Future<V>> cache = new ConcurrentHashMap<>();
private final Computable<A, V> c;
public Cache9(Computable<A, V> c) { this.c = c; }
@Override public V compute(A arg) throws ExecutionException, InterruptedException { while (true) { Future<V> f = cache.get(arg); if (f == null) { Callable<V> callable = new Callable<V>() { @Override public V call() throws Exception { return c.compute(arg); } };
FutureTask<V> ft = new FutureTask<>(callable); f = cache.putIfAbsent(arg, ft);
if (f == null) { f = ft; System.out.println("从FutureTask调用了计算函数"); ft.run(); } } try { return f.get(); } catch (CancellationException e) { System.out.println("被取消了"); cache.remove(arg); throw e; } catch (InterruptedException e) { cache.remove(arg); throw e; } catch (ExecutionException e) { System.out.println("计算错误,需要重试"); cache.remove(arg); } } }
public static void main(String[] args) throws Exception { Cache9<String, Integer> expensiveCompute = new Cache9<>(new MayFail()); new Thread(new Runnable() { @Override public void run() { try { Integer result = expensiveCompute.compute("666"); System.out.println("第一个线程:" + result); } catch (Exception e) { e.printStackTrace(); } } }).start();
new Thread(new Runnable() { @Override public void run() { try { Integer result = expensiveCompute.compute("666"); System.out.println("第二个线程:" + result); } catch (Exception e) { e.printStackTrace(); } } }).start();
new Thread(new Runnable() { @Override public void run() { try { Integer result = expensiveCompute.compute("667"); System.out.println("第三个线程:" + result); } catch (Exception e) { e.printStackTrace(); } } }).start(); } }
|
结果:
1 2 3 4 5 6 7 8 9
| 从FutureTask调用了计算函数 从FutureTask调用了计算函数 计算错误,需要重试 计算错误,需要重试 从FutureTask调用了计算函数 从FutureTask调用了计算函数 第一个线程:666 第二个线程:666 第三个线程:667
|
缓存过期功能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
| package simple_cache;
import simple_cache.computable.Computable; import simple_cache.computable.MayFail;
import java.util.Map; import java.util.concurrent.*;
public class Cache10<A, V> implements Computable<A, V> { private final Map<A, Future<V>> cache = new ConcurrentHashMap<>();
private final Computable<A, V> c;
public Cache10(Computable<A, V> c) { this.c = c; }
@Override public V compute(A arg) throws ExecutionException, InterruptedException { while (true) { Future<V> f = cache.get(arg); if (f == null) { Callable<V> callable = new Callable<V>() { @Override public V call() throws Exception { return c.compute(arg); } };
FutureTask<V> ft = new FutureTask<>(callable); f = cache.putIfAbsent(arg, ft);
if (f == null) { f = ft; System.out.println("从FutureTask调用了计算函数"); ft.run(); } } try { return f.get(); } catch (CancellationException e) { System.out.println("被取消了"); cache.remove(arg); throw e; } catch (InterruptedException e) { cache.remove(arg); throw e; } catch (ExecutionException e) { System.out.println("计算错误,需要重试"); cache.remove(arg); } } }
public final static ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
public V compute(A arg, long expire) throws ExecutionException, InterruptedException { if (expire > 0) { executor.schedule(new Runnable() { @Override public void run() { expire(arg); } }, expire, TimeUnit.MILLISECONDS); } return compute(arg); }
public synchronized void expire(A key) { Future<V> future = cache.get(key); if (future != null) { if (!future.isDone()) { System.out.println("Future任务被取消"); future.cancel(true); } System.out.println("过期时间到,缓存被清除"); cache.remove(key); } }
public static void main(String[] args) throws Exception { Cache10<String, Integer> expensiveCompute = new Cache10<>(new MayFail()); new Thread(new Runnable() { @Override public void run() { try { Integer result = expensiveCompute.compute("666", 5000L); System.out.println("第一次计算结果:" + result); } catch (Exception e) { e.printStackTrace(); } } }).start();
new Thread(new Runnable() { @Override public void run() { try { Integer result = expensiveCompute.compute("666"); System.out.println("第二次计算结果:" + result); } catch (Exception e) { e.printStackTrace(); } } }).start();
new Thread(new Runnable() { @Override public void run() { try { Integer result = expensiveCompute.compute("667"); System.out.println("第三次计算结果:" + result); } catch (Exception e) { e.printStackTrace(); } } }).start();
Thread.sleep(6000); Integer result = expensiveCompute.compute("666"); System.out.println("第四次计算结果:" + result); } }
|
结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| 从FutureTask调用了计算函数 从FutureTask调用了计算函数 计算错误,需要重试 从FutureTask调用了计算函数 计算错误,需要重试 从FutureTask调用了计算函数 计算错误,需要重试 从FutureTask调用了计算函数 计算错误,需要重试 从FutureTask调用了计算函数 计算错误,需要重试 从FutureTask调用了计算函数 第一次计算结果:666 第二次计算结果:666 第三次计算结果:667 过期时间到,缓存被清除 从FutureTask调用了计算函数 计算错误,需要重试 从FutureTask调用了计算函数 第四次计算结果:666
|
高并发访问时
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
| package simple_cache;
import simple_cache.computable.Computable; import simple_cache.computable.MayFail;
import java.util.Map; import java.util.concurrent.*;
public class Cache11<A, V> implements Computable<A, V> { private final Map<A, Future<V>> cache = new ConcurrentHashMap<>();
private final Computable<A, V> c;
public Cache11(Computable<A, V> c) { this.c = c; }
@Override public V compute(A arg) throws ExecutionException, InterruptedException { while (true) { Future<V> f = cache.get(arg); if (f == null) { Callable<V> callable = new Callable<V>() { @Override public V call() throws Exception { return c.compute(arg); } };
FutureTask<V> ft = new FutureTask<>(callable); f = cache.putIfAbsent(arg, ft);
if (f == null) { f = ft; System.out.println("从FutureTask调用了计算函数"); ft.run(); } } try { return f.get(); } catch (CancellationException e) { System.out.println("被取消了"); cache.remove(arg); throw e; } catch (InterruptedException e) { cache.remove(arg); throw e; } catch (ExecutionException e) { System.out.println("计算错误,需要重试"); cache.remove(arg); } } }
public V computeRandomExpire(A arg) throws ExecutionException, InterruptedException { long randomExpire = (long) (Math.random() * 10000); return compute(arg, randomExpire); }
public final static ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
public V compute(A arg, long expire) throws ExecutionException, InterruptedException { if (expire > 0) { executor.schedule(new Runnable() { @Override public void run() { expire(arg); } }, expire, TimeUnit.MILLISECONDS); } return compute(arg); }
public synchronized void expire(A key) { Future<V> future = cache.get(key); if (future != null) { if (!future.isDone()) { System.out.println("Future任务被取消"); future.cancel(true); } System.out.println("过期时间到,缓存被清除"); cache.remove(key); } }
public static void main(String[] args) throws Exception { Cache11<String, Integer> expensiveCompute = new Cache11<>(new MayFail()); new Thread(new Runnable() { @Override public void run() { try { Integer result = expensiveCompute.compute("666", 5000L); System.out.println("第一次计算结果:" + result); } catch (Exception e) { e.printStackTrace(); } } }).start();
new Thread(new Runnable() { @Override public void run() { try { Integer result = expensiveCompute.compute("666"); System.out.println("第二次计算结果:" + result); } catch (Exception e) { e.printStackTrace(); } } }).start();
new Thread(new Runnable() { @Override public void run() { try { Integer result = expensiveCompute.compute("667"); System.out.println("第三次计算结果:" + result); } catch (Exception e) { e.printStackTrace(); } } }).start();
Thread.sleep(6000); Integer result = expensiveCompute.compute("666"); System.out.println("第四次计算结果:" + result); } }
|
模拟大量请求,观测缓存效果
用线程池创建大量线程get,用了缓存后,总体耗时大大减少,体现了缓存的作用
注:IO密集型(某大厂实践经验)
核心线程数 = CPU核数 / (1-阻塞系数)
或者
CPU密集型:核心线程数 = CPU核数 + 1 IO密集型:核心线程数 = CPU核数 * 2
60万并发:
线程池大小17:124ms
100万并发:
线程池大小21:356ms
线程池大小17:148ms
线程池大小40:375ms
说明是CPU密集型
不用CountDownLatch
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| import computable.ExpensiveFunction;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
public class QpsTest1 {
static Cache12<String, Integer> expensiveComputer = new Cache12<>(new ExpensiveFunction());
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(17);
long start = System.currentTimeMillis();
for (int i = 0; i < 1050000; i++) { service.submit(() -> { Integer result = null; try { result = expensiveComputer.compute("666"); } catch (ExecutionException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } }); } service.shutdown();
while (!service.isTerminated()) { }
System.out.println("总耗时:"+(System.currentTimeMillis() - start)); } }
|
用CountDownLatch
40万并发:
线程池大小17:107ms
CountDownLatch概念
CountDownLatch是一个同步工具类,用来协调多个线程之间的同步,或者说起到线程之间的通信(而不是用作互斥的作用)。
CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。使用一个计数器进行实现。计数器初始值为线程的数量。当每一个线程完成自己任务后,计数器的值就会减一。当计数器的值为0时,表示所有的线程都已经完成一些任务,然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务。
CountDownLatch的用法
CountDownLatch典型用法:1、某一线程在开始运行前等待n个线程执行完毕。将CountDownLatch的计数器初始化为new CountDownLatch(n),每当一个任务线程执行完毕,就将计数器减1 countdownLatch.countDown(),当计数器的值变为0时,在CountDownLatch上await()的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
CountDownLatch典型用法:2、实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的CountDownLatch(1),将其计算器初始化为1,多个线程在开始执行任务前首先countdownlatch.await(),当主线程调用countDown()时,计数器变为0,多个线程同时被唤醒。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
| import computable.ExpensiveFunction;
import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
public class QpsTest2 {
static Cache12<String, Integer> expensiveComputer = new Cache12<>(new ExpensiveFunction());
public static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(17);
long start = System.currentTimeMillis();
for (int i = 0; i < 100; i++) { service.submit(() -> { Integer result = null; try { System.out.println(Thread.currentThread().getName()+"开始等待"); countDownLatch.await(); SimpleDateFormat dateFormat = ThreadSafeFormatter.dateFormatter.get(); String time = dateFormat.format(new Date()); System.out.println(Thread.currentThread().getName()+" "+time+"被放行"); result = expensiveComputer.compute("666");
} catch (ExecutionException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(result); }); }
Thread.sleep(5000); countDownLatch.countDown(); service.shutdown(); } }
class ThreadSafeFormatter {
public static ThreadLocal<SimpleDateFormat> dateFormatter = new ThreadLocal<SimpleDateFormat>() {
@Override protected SimpleDateFormat initialValue() { return new SimpleDateFormat("mm:ss"); }
@Override public SimpleDateFormat get() { return super.get(); } }; }
|