手写实现高性能缓存

非线程安全的缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package simple_cache;

import java.util.HashMap;
import java.util.concurrent.TimeUnit;

// 最简单的缓存形式:HashMap
public class Cache1 {
private HashMap<String, Integer> cache = new HashMap<>();

public Integer compute(String userId) throws InterruptedException {
Integer result = cache.get(userId);

// 先检查HashMap里面有没有保存过之前的计算结果
if (result == null) {
// 如果缓存中找不到,那么需要现在来计算一下结果,并保存到HashMap中
result = doCompute(userId);
cache.put(userId, result);
} return result;
}

// 模拟实际的业务中的计算逻辑,采用sleep代替
private Integer doCompute(String userId) throws InterruptedException {
TimeUnit.SECONDS.sleep(5);
return new Integer(userId);
}

public static void main(String[] args) throws InterruptedException {
Cache1 cache1 = new Cache1();
System.out.println("开始计算了");
Integer result = cache1.compute("13");
System.out.println("第一次计算结果:" + result);

result = cache1.compute("13");
System.out.println("第二次计算结果:" + result);
}
}

用synchronized保证并发安全

  • 性能差(用synchronized之后,线程变成串行的了,多个线程不能同时访问computer方法了,与缓存的使用性不符合)
  • 代码复用性差
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package simple_cache;

import java.util.HashMap;
import java.util.concurrent.TimeUnit;

// 最简单的缓存形式:HashMap
public class Cache1 {
private HashMap<String, Integer> cache = new HashMap<>();

// 用synchronized保证并发安全
public synchronized Integer compute(String userId) throws InterruptedException {
Integer result = cache.get(userId);

// 先检查HashMap里面有没有保存过之前的计算结果
if (result == null) {
// 如果缓存中找不到,那么需要现在来计算一下结果,并保存到HashMap中
result = doCompute(userId);
cache.put(userId, result);
}
return result;
}

// 模拟实际的业务中的计算逻辑,采用sleep代替
private Integer doCompute(String userId) throws InterruptedException {
TimeUnit.SECONDS.sleep(5);
return new Integer(userId);
}

public static void main(String[] args) throws InterruptedException {
Cache1 cache1 = new Cache1();
System.out.println("开始计算了");
Integer result = cache1.compute("13");
System.out.println("第一次计算结果:" + result);

result = cache1.compute("13");
System.out.println("第二次计算结果:" + result);
}
}

给HashMap加final关键字

  • 属性被声明为final后,该变量则只能被赋值一次。且一旦被赋值,final的变量就不能再被改变。
  • 所以我们把它加上final关键字,增强安全性。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package simple_cache;

import java.util.HashMap;
import java.util.concurrent.TimeUnit;

// 最简单的缓存形式:HashMap
public class Cache1 {
private final HashMap<String, Integer> cache = new HashMap<>();

// 用synchronized保证并发安全
public synchronized Integer compute(String userId) throws InterruptedException {
Integer result = cache.get(userId);

// 先检查HashMap里面有没有保存过之前的计算结果
if (result == null) {
// 如果缓存中找不到,那么需要现在来计算一下结果,并保存到HashMap中
result = doCompute(userId);
cache.put(userId, result);
}
return result;
}

// 模拟实际的业务中的计算逻辑,采用sleep代替
private Integer doCompute(String userId) throws InterruptedException {
TimeUnit.SECONDS.sleep(5);
return new Integer(userId);
}

public static void main(String[] args) throws InterruptedException {
Cache1 cache1 = new Cache1();
System.out.println("开始计算了");
Integer result = cache1.compute("13");
System.out.println("第一次计算结果:" + result);

result = cache1.compute("13");
System.out.println("第二次计算结果:" + result);
}
}

用装饰者模式将缓存与计算逻辑解耦

  • 有一个计算函数computer,用来代表耗时计算,每个计算器都要实现这个接口,这样就可以无侵入实现缓存功能

  • 我们假设ExpensiveFunction类是耗时计算的实现类,实现了Computable接口,但是其本身不具备缓存功能,也不需要考虑缓存的事情

  • 缺点:

    • 性能差,不能并行计算(还是synchronized的问题)
    • Cache3 当多个线程同时想计算的时候,需要慢慢等待,严重时,性能甚至比不用缓存更差

Computable接口

1
2
3
4
5
6
package simple_cache.computable;

// 有一个计算函数compute,用来代表耗时计算,每个计算器都要实现这个接口,这样就可以无侵入实现缓存功能
public interface Computable <A, V>{
V compute(A arg) throws Exception;
}

计算逻辑ExpensiveFunction

1
2
3
4
5
6
7
8
9
10
11
package simple_cache.computable;

// ExpensiveFunction类是耗时计算的实现类,实现了Computable接口,但是其本身不具备缓存功能,也不需要考虑缓存的事情
public class ExpensiveFunction implements Computable<String, Integer>{

@Override
public Integer compute(String arg) throws Exception {
Thread.sleep(5000);
return Integer.valueOf(arg);
}
}

缓存

缓存Cache2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package simple_cache;

import simple_cache.computable.Computable;
import simple_cache.computable.ExpensiveFunction;

import java.util.HashMap;
import java.util.Map;

// 用装饰者模式,给计算器自动添加缓存功能
public class Cache2<A, V> implements Computable<A, V> {
private final Map<A, V> cache = new HashMap<>();

private final Computable<A, V> c;

public Cache2(Computable<A, V> c) {
this.c = c;
}


@Override
public synchronized V compute(A arg) throws Exception {
System.out.println("进入缓存机制");
V result = cache.get(arg);
if (result == null) {
result = c.compute(arg);
cache.put(arg, result);
}
return result;
}

public static void main(String[] args) throws Exception {
Cache2<String, Integer> expensiveCompute = new Cache2<>(new ExpensiveFunction());
Integer result = expensiveCompute.compute("13");
System.out.println("第一次计算结果:" + result);

result = expensiveCompute.compute("13");
System.out.println("第二次计算结果:" + result);
}
}

多线程并发查询

缓存Cache3多线程并发查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package simple_cache;

import simple_cache.computable.Computable;
import simple_cache.computable.ExpensiveFunction;

import java.util.HashMap;
import java.util.Map;

// 用装饰者模式,给计算器自动添加缓存功能
public class Cache3<A, V> implements Computable<A, V> {
private final Map<A, V> cache = new HashMap<>();

private final Computable<A, V> c;

public Cache3(Computable<A, V> c) {
this.c = c;
}


@Override
public synchronized V compute(A arg) throws Exception {
System.out.println("进入缓存机制");
V result = cache.get(arg);
if (result == null) {
result = c.compute(arg);
cache.put(arg, result);
}
return result;
}

public static void main(String[] args) throws Exception {
Cache3<String, Integer> expensiveCompute = new Cache3<>(new ExpensiveFunction());
// 第一个线程
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveCompute.compute("667");
System.out.println("第一个线程:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();

// 第二个线程
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveCompute.compute("667");
System.out.println("第二个线程:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();


// 第三个线程
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveCompute.compute("666");
System.out.println("第三个线程:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}
1
2
3
4
5
6
7
输出:其实第二个线程根本没必要等待第三个线程执行完毕
进入缓存机制
进入缓存机制
第一个线程:667
进入缓存机制
第三个线程:666
第二个线程:667

锁性能优化

减小锁的粒度

Cache4减小锁的粒度,试图提高性能

  • 缺点:虽然提高了并发效率,但是并不意味着就是线程安全的,还需要考虑到同时读写等情况
    • 线程仍然不够安全,虽然多个线程不能同时写了,但是如果在写的同时读,同样是线程不安全的
  • 但是,其实没必要自己实现线程安全的HashMap,也不应该加synchronized,因为我们自己实现的性能远不如现有的并发集合
  • 我们来使用ConcurrentHashMap优化我们的缓存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package simple_cache;

import simple_cache.computable.Computable;
import simple_cache.computable.ExpensiveFunction;

import java.util.HashMap;
import java.util.Map;

// 缩小synchronized的粒度,提高性能,但是依然并发不安全
public class Cache4<A, V> implements Computable<A, V> {
private final Map<A, V> cache = new HashMap<>();

private final Computable<A, V> c;

public Cache4(Computable<A, V> c) {
this.c = c;
}


@Override
public V compute(A arg) throws Exception {
System.out.println("进入缓存机制");
V result = cache.get(arg);
if (result == null) {
result = c.compute(arg);
// 缩小synchronized保护的范围
synchronized (this) {
cache.put(arg, result);
}
}
return result;
}

public static void main(String[] args) throws Exception {
Cache4<String, Integer> expensiveCompute = new Cache4<>(new ExpensiveFunction());
// 第一个线程
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveCompute.compute("667");
System.out.println("第一个线程:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();

// 第二个线程
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveCompute.compute("667");
System.out.println("第二个线程:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();


// 第三个线程
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveCompute.compute("666");
System.out.println("第三个线程:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}

并发集合ConcurrentHashMap

Cache5采用并发集合ConcurrentHashMap保证并发安全

  • 缺点:在计算完成前,另一个要求计算相同值的请求到来,会导致计算两遍,这和缓存想避免多次计算的初衷恰恰相反,是不可接受的

示意图:

image-20220407124801817

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package simple_cache;

import simple_cache.computable.Computable;
import simple_cache.computable.ExpensiveFunction;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// 采用ConcurrentHashMap保证并发安全
public class Cache5<A, V> implements Computable<A, V> {
private final Map<A, V> cache = new ConcurrentHashMap<>();

private final Computable<A, V> c;

public Cache5(Computable<A, V> c) {
this.c = c;
}


@Override
public V compute(A arg) throws Exception {
System.out.println("进入缓存机制");
V result = cache.get(arg);
if (result == null) {
result = c.compute(arg);
cache.put(arg, result);
}
return result;
}

public static void main(String[] args) throws Exception {
Cache5<String, Integer> expensiveCompute = new Cache5<>(new ExpensiveFunction());
// 第一个线程
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveCompute.compute("667");
System.out.println("第一个线程:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();

// 第二个线程
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveCompute.compute("667");
System.out.println("第二个线程:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();


// 第三个线程
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveCompute.compute("666");
System.out.println("第三个线程:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}

演示ConcurrentHashMap的缺点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package simple_cache;

import simple_cache.computable.Computable;
import simple_cache.computable.ExpensiveFunction;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// 演示ConcurrentHashMap的缺点
public class Cache6<A, V> implements Computable<A, V> {
private final Map<A, V> cache = new ConcurrentHashMap<>();

private final Computable<A, V> c;

public Cache6(Computable<A, V> c) {
this.c = c;
}


@Override
public V compute(A arg) throws Exception {
System.out.println("进入缓存机制");
V result = cache.get(arg);
if (result == null) {
result = c.compute(arg);
cache.put(arg, result);
}
return result;
}

public static void main(String[] args) throws Exception {
Cache6<String, Integer> expensiveCompute = new Cache6<>(new ExpensiveFunction());
// 第一个线程
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveCompute.compute("666");
System.out.println("第一个线程:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();

// 第二个线程
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveCompute.compute("667");
System.out.println("第二个线程:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();


// 第三个线程
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveCompute.compute("666");
System.out.println("第三个线程:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}

结果:

三个线程同时进入缓存机制,5秒后同时给出了缓存结果,其实第一个线程和第三个线程重复计算了,并没有用到缓存结果,而是自己重新算了一遍

1
2
3
4
5
6
进入缓存机制
进入缓存机制
进入缓存机制
第三个线程:666
第二个线程:667
第一个线程:666

避免重复计算Future和Callable

利用Future避免重复计算

  • 动机:现在不同的线程进来以后,确实可以同时计算,但是如果两个线程脚前脚后,也就是相差无几的进来请求同一个数据那么我们来看看会出现什么问题:重复计算
  • 这个例子只有2个线程,并不可怕,但是如果是100个线程都请求同样的内容,却都需要重新计算,那么会造成巨大的浪费
  • 后面的线程,如果能知道前面的线程正在计算,并且计算的内容和我自己要计算的内容一样,那么我只要等待你的计算结果写入缓存之后直接使用即可
  • 缺点:如果有两个同时计算666的线程,同时调用cache.get()方法那么返回的结果都为null,后面还是会创建两个任务去计算相同的值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package simple_cache;

import simple_cache.computable.Computable;
import simple_cache.computable.ExpensiveFunction;

import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

// 利用Future避免重复计算
public class Cache7<A, V> implements Computable<A, V> {
// Future包装value
private final Map<A, Future<V>> cache = new ConcurrentHashMap<>();

private final Computable<A, V> c;

public Cache7(Computable<A, V> c) {
this.c = c;
}


@Override
public V compute(A arg) throws Exception {
// 由于ConcurrentHashMap的可见性,当前一个线程写入了cache.put(arg, ft);的时候,后面的缓存就会得到f不是null
Future<V> f = cache.get(arg);
if (f == null) {
// 新建一个任务
Callable<V> callable = new Callable<V>() {
@Override
public V call() throws Exception {
return c.compute(arg);
}
};

FutureTask<V> ft = new FutureTask<>(callable);
f = ft;

// 先将ft放入缓存,然后再进行计算
cache.put(arg, ft);
System.out.println("从FutureTask调用了计算函数");
ft.run();
}
// get()不是立即返回的,而是在算完了结果以后才会返回;有结果之前get()会堵塞
// 当ft.run();执行完毕之后, Future<V> f里面就有值了
return f.get();
}

public static void main(String[] args) throws Exception {
Cache7<String, Integer> expensiveCompute = new Cache7<>(new ExpensiveFunction());
// 第一个线程
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveCompute.compute("666");
System.out.println("第一个线程:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();

// 第二个线程
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveCompute.compute("666");
System.out.println("第二个线程:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();


// 第三个线程
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveCompute.compute("667");
System.out.println("第三个线程:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}

结果:

1
2
3
4
5
从FutureTask调用了计算函数
从FutureTask调用了计算函数
第二个线程:666
第一个线程:666
第三个线程:667

缺点:如果有两个同时计算666的线程,同时调用cache.get()方法那么返回的结果都为null,后面还是会创建两个任务去计算相同的值

1
2
3
4
5
6
从FutureTask调用了计算函数
从FutureTask调用了计算函数
从FutureTask调用了计算函数
第一个线程:666
第二个线程:666
第三个线程:667

原子操作putIfAbsent

利用putIfAbsent优化Future小概率的重复计算

  • 利用Future可以避免重复计算,但是如果有两个同时计算666的线程,同时调用cache.get()方法那么返回的结果都为null,后面还是会创建两个任务去计算相同的值

image-20220407190524546

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package simple_cache;

import simple_cache.computable.Computable;
import simple_cache.computable.ExpensiveFunction;

import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class Cache8<A, V> implements Computable<A, V> {
// Future包装value
private final Map<A, Future<V>> cache = new ConcurrentHashMap<>();

private final Computable<A, V> c;

public Cache8(Computable<A, V> c) {
this.c = c;
}


@Override
public V compute(A arg) throws Exception {
// 由于ConcurrentHashMap的可见性,当前一个线程写入了cache.put(arg, ft);的时候,后面的缓存就会得到f不是null
Future<V> f = cache.get(arg);
if (f == null) {
// 新建一个任务
Callable<V> callable = new Callable<V>() {
@Override
public V call() throws Exception {
return c.compute(arg);
}
};

FutureTask<V> ft = new FutureTask<>(callable);
f = cache.putIfAbsent(arg, ft);

if (f == null) {
f = ft;
System.out.println("从FutureTask调用了计算函数");
ft.run();
}
}
return f.get();
}

public static void main(String[] args) throws Exception {
Cache8<String, Integer> expensiveCompute = new Cache8<>(new ExpensiveFunction());
// 第一个线程
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveCompute.compute("666");
System.out.println("第一个线程:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();

// 第二个线程
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveCompute.compute("666");
System.out.println("第二个线程:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();


// 第三个线程
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveCompute.compute("667");
System.out.println("第三个线程:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}

结果:

1
2
3
4
5
从FutureTask调用了计算函数
从FutureTask调用了计算函数
第二个线程:666
第三个线程:667
第一个线程:666

ExcecutionException

MayFail

耗时计算的实现类,有概率计算失败

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package simple_cache.computable;

import java.io.IOException;

// 耗时计算的实现类,有概率计算失败
public class MayFail implements Computable<String, Integer>{

@Override
public Integer compute(String arg) throws Exception {
double random = Math.random();
if (random > 0.5) {
throw new IOException("读取文件出错");
}
Thread.sleep(3000);
return Integer.valueOf(arg);
}
}

正确的异常处理逻辑(各司其职)

  • 这3种异常之所以用不同的catch块捕获,是因为它们的处理逻辑是不同的
    • CancellationException和InterruptedException是人为取消的,那么我们应该立即终止任务
    • 但是如果是计算错误,且我们明确知道多试几次就可以得到答案,那么我们的逻辑应该是重试,尝试多次直到正确的结果出现
    • 在这里,我们加上while(true)来保证计算出错不会影响我们的逻辑,然后如果是计算错误,就进入下一个循环,重新计算,直到计算成功;如果是人为取消,那么就抛出异常然后结束运行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package simple_cache;

import simple_cache.computable.Computable;
import simple_cache.computable.ExpensiveFunction;
import simple_cache.computable.MayFail;

import java.util.Map;
import java.util.concurrent.*;

public class Cache9<A, V> implements Computable<A, V> {
// Future包装value
private final Map<A, Future<V>> cache = new ConcurrentHashMap<>();

private final Computable<A, V> c;

public Cache9(Computable<A, V> c) {
this.c = c;
}


@Override
public V compute(A arg) throws ExecutionException, InterruptedException {
// 由于ConcurrentHashMap的可见性,当前一个线程写入了cache.put(arg, ft);的时候,后面的缓存就会得到f不是null
while (true) {
Future<V> f = cache.get(arg);
if (f == null) {
// 新建一个任务
Callable<V> callable = new Callable<V>() {
@Override
public V call() throws Exception {
return c.compute(arg);
}
};

FutureTask<V> ft = new FutureTask<>(callable);
f = cache.putIfAbsent(arg, ft);

if (f == null) {
f = ft;
System.out.println("从FutureTask调用了计算函数");
ft.run();
}
}
try {
return f.get();
} catch (CancellationException e) {
System.out.println("被取消了");
throw e;
} catch (InterruptedException e) {
throw e;
} catch (ExecutionException e) {
System.out.println("计算错误,需要重试");
}
}
}

public static void main(String[] args) throws Exception {
Cache9<String, Integer> expensiveCompute = new Cache9<>(new MayFail());
// 第一个线程
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveCompute.compute("666");
System.out.println("第一个线程:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();

// 第二个线程
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveCompute.compute("666");
System.out.println("第二个线程:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();

// 第三个线程
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveCompute.compute("667");
System.out.println("第三个线程:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}

结果:一直循环(为什么?因为缓存被污染了)

1
2
3
4
5
计算错误,需要重试
计算错误,需要重试
计算错误,需要重试
计算错误,需要重试
...

缓存污染问题

使用cache.remove(arg);清除被污染的缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package simple_cache;

import simple_cache.computable.Computable;
import simple_cache.computable.ExpensiveFunction;
import simple_cache.computable.MayFail;

import java.util.Map;
import java.util.concurrent.*;

public class Cache9<A, V> implements Computable<A, V> {
// Future包装value
private final Map<A, Future<V>> cache = new ConcurrentHashMap<>();

private final Computable<A, V> c;

public Cache9(Computable<A, V> c) {
this.c = c;
}


@Override
public V compute(A arg) throws ExecutionException, InterruptedException {
// 由于ConcurrentHashMap的可见性,当前一个线程写入了cache.put(arg, ft);的时候,后面的缓存就会得到f不是null
while (true) {
Future<V> f = cache.get(arg);
if (f == null) {
// 新建一个任务
Callable<V> callable = new Callable<V>() {
@Override
public V call() throws Exception {
return c.compute(arg);
}
};

FutureTask<V> ft = new FutureTask<>(callable);
f = cache.putIfAbsent(arg, ft);

if (f == null) {
f = ft;
System.out.println("从FutureTask调用了计算函数");
ft.run();
}
}
try {
return f.get();
} catch (CancellationException e) {
// 每个异常抛出以后,缓存都被清理掉了
System.out.println("被取消了");
cache.remove(arg);
throw e;
} catch (InterruptedException e) {
cache.remove(arg);
throw e;
} catch (ExecutionException e) {
System.out.println("计算错误,需要重试");
cache.remove(arg);
}
}
}

public static void main(String[] args) throws Exception {
Cache9<String, Integer> expensiveCompute = new Cache9<>(new MayFail());
// 第一个线程
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveCompute.compute("666");
System.out.println("第一个线程:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();

// 第二个线程
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveCompute.compute("666");
System.out.println("第二个线程:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();

// 第三个线程
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveCompute.compute("667");
System.out.println("第三个线程:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}

结果:

1
2
3
4
5
6
7
8
9
从FutureTask调用了计算函数
从FutureTask调用了计算函数
计算错误,需要重试
计算错误,需要重试
从FutureTask调用了计算函数
从FutureTask调用了计算函数
第一个线程:666
第二个线程:666
第三个线程:667

缓存过期功能

  • 为每个结果指定过期时间,并定期扫描过期的元素

  • 出于安全性考虑,缓存需要设置有效期,到期自动失效

  • 否则如果缓存一直不失效,那么会带来缓存不一致等问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package simple_cache;

import simple_cache.computable.Computable;
import simple_cache.computable.MayFail;

import java.util.Map;
import java.util.concurrent.*;

// 出于安全性考虑,缓存需要设置有效期,到期自动失效否则如果缓存一直不失效,那么会带来缓存不一致等问题
public class Cache10<A, V> implements Computable<A, V> {
// Future包装value
private final Map<A, Future<V>> cache = new ConcurrentHashMap<>();

private final Computable<A, V> c;

public Cache10(Computable<A, V> c) {
this.c = c;
}


@Override
public V compute(A arg) throws ExecutionException, InterruptedException {
// 由于ConcurrentHashMap的可见性,当前一个线程写入了cache.put(arg, ft);的时候,后面的缓存就会得到f不是null
while (true) {
Future<V> f = cache.get(arg);
if (f == null) {
// 新建一个任务
Callable<V> callable = new Callable<V>() {
@Override
public V call() throws Exception {
return c.compute(arg);
}
};

FutureTask<V> ft = new FutureTask<>(callable);
f = cache.putIfAbsent(arg, ft);

if (f == null) {
f = ft;
System.out.println("从FutureTask调用了计算函数");
ft.run();
}
}
try {
return f.get();
} catch (CancellationException e) {
// 每个异常抛出以后,缓存都被清理掉了
System.out.println("被取消了");
cache.remove(arg);
throw e;
} catch (InterruptedException e) {
cache.remove(arg);
throw e;
} catch (ExecutionException e) {
System.out.println("计算错误,需要重试");
cache.remove(arg);
}
}
}

public final static ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);

public V compute(A arg, long expire) throws ExecutionException, InterruptedException {
if (expire > 0) {
executor.schedule(new Runnable() {
@Override
public void run() {
expire(arg);
}
}, expire, TimeUnit.MILLISECONDS);
}
return compute(arg);
}

public synchronized void expire(A key) {
Future<V> future = cache.get(key);
if (future != null) {
// 如果任务时间到了,还没有完成的话,直接取消任务
if (!future.isDone()) {
System.out.println("Future任务被取消");
future.cancel(true);
}
System.out.println("过期时间到,缓存被清除");
cache.remove(key);
}
}

public static void main(String[] args) throws Exception {
Cache10<String, Integer> expensiveCompute = new Cache10<>(new MayFail());
// 第一个线程
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveCompute.compute("666", 5000L);// 5秒后过期
System.out.println("第一次计算结果:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();

// 第二个线程
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveCompute.compute("666");
System.out.println("第二次计算结果:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();

// 第三个线程
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveCompute.compute("667");
System.out.println("第三次计算结果:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();

Thread.sleep(6000);
Integer result = expensiveCompute.compute("666");
System.out.println("第四次计算结果:" + result);
}
}

结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
从FutureTask调用了计算函数
从FutureTask调用了计算函数
计算错误,需要重试
从FutureTask调用了计算函数
计算错误,需要重试
从FutureTask调用了计算函数
计算错误,需要重试
从FutureTask调用了计算函数
计算错误,需要重试
从FutureTask调用了计算函数
计算错误,需要重试
从FutureTask调用了计算函数
第一次计算结果:666
第二次计算结果:666
第三次计算结果:667
过期时间到,缓存被清除
从FutureTask调用了计算函数
计算错误,需要重试
从FutureTask调用了计算函数
第四次计算结果:666

高并发访问时

  • 如果同时过期,那么同时都拿不到缓存,导致打爆cpu和MySQL,造成缓存雪崩、缓存击穿等高并发下的缓存问题

  • 缓存过期时间设置为随机

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package simple_cache;

import simple_cache.computable.Computable;
import simple_cache.computable.MayFail;

import java.util.Map;
import java.util.concurrent.*;

// 出于安全性考虑,缓存需要设置有效期,到期自动失效否则如果缓存一直不失效,那么会带来缓存不一致等问题
public class Cache11<A, V> implements Computable<A, V> {
// Future包装value
private final Map<A, Future<V>> cache = new ConcurrentHashMap<>();

private final Computable<A, V> c;

public Cache11(Computable<A, V> c) {
this.c = c;
}


@Override
public V compute(A arg) throws ExecutionException, InterruptedException {
// 由于ConcurrentHashMap的可见性,当前一个线程写入了cache.put(arg, ft);的时候,后面的缓存就会得到f不是null
while (true) {
Future<V> f = cache.get(arg);
if (f == null) {
// 新建一个任务
Callable<V> callable = new Callable<V>() {
@Override
public V call() throws Exception {
return c.compute(arg);
}
};

FutureTask<V> ft = new FutureTask<>(callable);
f = cache.putIfAbsent(arg, ft);

if (f == null) {
f = ft;
System.out.println("从FutureTask调用了计算函数");
ft.run();
}
}
try {
return f.get();
} catch (CancellationException e) {
// 每个异常抛出以后,缓存都被清理掉了
System.out.println("被取消了");
cache.remove(arg);
throw e;
} catch (InterruptedException e) {
cache.remove(arg);
throw e;
} catch (ExecutionException e) {
System.out.println("计算错误,需要重试");
cache.remove(arg);
}
}
}

// 缓存过期时间设置为随机
public V computeRandomExpire(A arg) throws ExecutionException, InterruptedException {
long randomExpire = (long) (Math.random() * 10000);
return compute(arg, randomExpire);
}

public final static ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);

public V compute(A arg, long expire) throws ExecutionException, InterruptedException {
if (expire > 0) {
executor.schedule(new Runnable() {
@Override
public void run() {
expire(arg);
}
}, expire, TimeUnit.MILLISECONDS);
}
return compute(arg);
}

public synchronized void expire(A key) {
Future<V> future = cache.get(key);
if (future != null) {
// 如果任务时间到了,还没有完成的话,直接取消任务
if (!future.isDone()) {
System.out.println("Future任务被取消");
future.cancel(true);
}
System.out.println("过期时间到,缓存被清除");
cache.remove(key);
}
}

public static void main(String[] args) throws Exception {
Cache11<String, Integer> expensiveCompute = new Cache11<>(new MayFail());
// 第一个线程
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveCompute.compute("666", 5000L);// 5秒后过期
System.out.println("第一次计算结果:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();

// 第二个线程
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveCompute.compute("666");
System.out.println("第二次计算结果:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();

// 第三个线程
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveCompute.compute("667");
System.out.println("第三次计算结果:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();

Thread.sleep(6000);
Integer result = expensiveCompute.compute("666");
System.out.println("第四次计算结果:" + result);
}
}

模拟大量请求,观测缓存效果

用线程池创建大量线程get,用了缓存后,总体耗时大大减少,体现了缓存的作用

注:IO密集型(某大厂实践经验)

核心线程数 = CPU核数 / (1-阻塞系数)

或者

CPU密集型:核心线程数 = CPU核数 + 1 IO密集型:核心线程数 = CPU核数 * 2

60万并发:

线程池大小17:124ms

100万并发:

线程池大小21:356ms

线程池大小17:148ms

线程池大小40:375ms

说明是CPU密集型

不用CountDownLatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import computable.ExpensiveFunction;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Created by guolin
* 模拟大量请求,观测缓存效果,没用CountDownLatch的情况下,在16核20线程机器上,200ms可以承受105万并发查询缓存
*/
public class QpsTest1 {

static Cache12<String, Integer> expensiveComputer = new Cache12<>(new ExpensiveFunction());

public static void main(String[] args) {

// 线程池大小
ExecutorService service = Executors.newFixedThreadPool(17);

long start = System.currentTimeMillis();

for (int i = 0; i < 1050000; i++) {
service.submit(() -> {
Integer result = null;
try {
result = expensiveComputer.compute("666");
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
//System.out.println(result);
});
}
// 关闭线程池
service.shutdown();

// 如果线程池关闭则true
while (!service.isTerminated()) {
}

System.out.println("总耗时:"+(System.currentTimeMillis() - start));
}
}

用CountDownLatch

40万并发:

线程池大小17:107ms

CountDownLatch概念

CountDownLatch是一个同步工具类,用来协调多个线程之间的同步,或者说起到线程之间的通信(而不是用作互斥的作用)。

CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。使用一个计数器进行实现。计数器初始值为线程的数量。当每一个线程完成自己任务后,计数器的值就会减一。当计数器的值为0时,表示所有的线程都已经完成一些任务,然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务。

CountDownLatch的用法

CountDownLatch典型用法:1、某一线程在开始运行前等待n个线程执行完毕。将CountDownLatch的计数器初始化为new CountDownLatch(n),每当一个任务线程执行完毕,就将计数器减1 countdownLatch.countDown(),当计数器的值变为0时,在CountDownLatch上await()的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。

CountDownLatch典型用法:2、实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的CountDownLatch(1),将其计算器初始化为1,多个线程在开始执行任务前首先countdownlatch.await(),当主线程调用countDown()时,计数器变为0,多个线程同时被唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import computable.ExpensiveFunction;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Created by guolin
* 模拟大量请求,观测缓存效果
* 用CountDownLatch的情况下,在16核20线程机器上,200ms可以承受105万并发查询缓存
*/
public class QpsTest2 {

static Cache12<String, Integer> expensiveComputer = new Cache12<>(new ExpensiveFunction());

// 同步工具类:等待1个线程执行完毕,其他线程统一开始,最大压测
public static CountDownLatch countDownLatch = new CountDownLatch(1);

public static void main(String[] args) throws InterruptedException {

// 线程池大小
ExecutorService service = Executors.newFixedThreadPool(17);

long start = System.currentTimeMillis();

for (int i = 0; i < 100; i++) {
service.submit(() -> {
Integer result = null;
try {
System.out.println(Thread.currentThread().getName()+"开始等待");
countDownLatch.await();
SimpleDateFormat dateFormat = ThreadSafeFormatter.dateFormatter.get();
String time = dateFormat.format(new Date());
System.out.println(Thread.currentThread().getName()+" "+time+"被放行");
result = expensiveComputer.compute("666");

} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(result);
});
}

Thread.sleep(5000);
countDownLatch.countDown();
service.shutdown();
}
}

class ThreadSafeFormatter {

public static ThreadLocal<SimpleDateFormat> dateFormatter = new ThreadLocal<SimpleDateFormat>() {

//每个线程会调用本方法一次,用于初始化
@Override
protected SimpleDateFormat initialValue() {
return new SimpleDateFormat("mm:ss");
}

//首次调用本方法时,会调用initialValue();后面的调用会返回第一次创建的值
@Override
public SimpleDateFormat get() {
return super.get();
}
};
}
-------------本文结束感谢您的阅读-------------