栖静林

不忘初心,方得始终


  • 首页

  • 分类

  • 归档

  • 标签

动态代理的由来

发表于 2016-05-01   |   分类于 JAVA   |     |   阅读次数

我们在学习的过程中一直会遇到各种高大上的技术名词或缩写,其中动态代理就是其中之一。之前只是了解过动态代理是代理模式的一种实现,其中JDK的动态代理是基于接口的,CGlib(Code Generation Library)的动态代理是基于类的,通过动态代理可以在原本实现的基础上做到一些增强功能或额外操作,等等,可以说得头头是道,但还是不能做到知根知底,一清二楚。比如基于接口是什么意思?基于类又是什么意思?为什么?怎么个动态法?JDK的动态代理是怎么设计出来的?今天就通过博文揭开动态代理的今生前世,看看它是怎么逆袭成高大上的。

代理模式

概述

以下概念引自GOF所著《Design Patterns》的中译本及英文版。

代理模式
为其他对象提供一种代理以控制对这个对象的访问。
Proxy Pattern
Provide a surrogate or placeholder for another object to control access to it.

通俗点讲,在代理模式中,至少有两个角色:代理类proxy和被代理类real,外界对real的访问都是通过proxy传达完成的。可以说real对外界是不可见的或者透明的,因为他们对外公开的方法是一模一样的(通过实现相同的接口或继承相同的类保证)。这时proxy相当于一个中介,在外界调用真正到达real时,proxy可以做一些额外操作,比如安全验证、记录访问日志等。下面通过具体的UML图来解释。

模式的组成结构

这里以基于接口为例来讲,下面是代理模式的UML图。
UML图
代理模式包括三种角色:

  • 抽象主题(Subject):抽象主题是一个接口,该接口是对象和它的代理所共用的接口,即是RealSubject和Proxy所实现的接口。
  • 实际主题(RealSubject):实际主题是实现抽象主题接口的类。实际主题的实例是代理角色实例所要代理的对象。
  • 代理(Proxy):代理是实现抽象主题接口的类(代理和实际主题实现了相同的接口)。代理含有主题接口声明的变量,该变量用来存放RealSubject的实例引用,这样一来,代理的实例就可以控制对它所包含的RealSubject的实例访问,即可以控制对它所代理对象的访问。

一个例子

登录注册是最常见的基本功能,如果我们还想记录登录的时间,就可以让代理类来完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//抽象主题,统一对外公开的方法
public interface IUserService {
public void login(String username);
}
//实际主题,对外是不可见的
public class UserServiceImpl implements IUserService {
public void login(String username) {
System.out.println(username + "登录成功");
}
}
//代理,外界对实际主题访问的中介
public class UserServiceImplProxy implements IUserService {
//被代理类的实例
IUserService userService = new UserServiceImpl();
public void login(String username) {
System.out.println("当前时间:" + Calendar.getInstance().toString());
//调用被代理类的方法
userService.login(username);
}
}

静态代理

上边写得例子属于动态代理的前身:静态代理,静态是什么意思呢?对于上边的代码,它是写死的,被代理的方法、被代理的原始类和被代理的接口都是写死固定不变的,即login方法、UserServiceImpl类和IUserService接口是确定的。而动态代理的动态,是指这三者是不确定的,未知的,可变的,不是写死的。是不是还不太懂?那我们来想一下静态代理都有哪些缺点,为什么需要动态代理。

第一个缺点,对于接口IUserService,如果还有其他的方法,比如注册、修改个人资料等,那就要在代理类中对每一个方法进行一对一的代理,而且每一个代理方法都非常类似,结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//一般化的代理类中的方法
public ResultType methodName(ParamType paramName0, ParamType paramName1...) {
//实际调用前,增强功能、额外操作
doSomethingBefore();
//调用被代理类的方法
ResultType result = target.methodName(paramName0, paramName1...);
//实际调用后,增强功能、额外操作
doSomethingAfter();
//返回被代理类方法的执行结果
return result;
}

可以想象当有很多需要被代理的方法时,会存在大量重复冗余的代码。

第二个缺点,上边第一个缺点只是针对被代理的方法需要变化时带来的不便,那如果被代理的原始类和被代理的接口也需要变化呢?比如现在不只是用户相关的接口(IUserService)需要记录访问时间,其他任何接口都需要记录,即如果当前有N个接口,每个接口中有M个方法,那按照静态代理一对一进行编码,就至少需要写N*M个代理方法,而且结构非常类似,光是想想就爆炸。

那应该怎么改进呢?

动态代理

现在我们针对上边的两个缺点,来一步步分析静态代理是怎么进化成动态代理的。如果不想写那么多重复的,想合并到一起,就需要把写死的变化的部分转化为动态可变的。

对于上边代理方法的结构,变化的部分只有一行,即调用被代理类的方法,其中被代理类target可以不同,对应的方法methodName也可以不同。静态代理中target和methodName是写死的,比如上边的UserServiceImpl和login,那能不能把target和methodName写成变量,比如String类型的,然后在运行时根据传进来的类型名和方法名实现动态调用执行呢?可以吗?有这种技术吗?机智如你有没有想到反射呢?这时上边的代理方法的结构就可以写成如下形式了(示意代码,勿抠细节)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//使用反射后,通用的代理方法
public Object methodProxy(String targetFullName, String methodName, Object[] args) {
//实际调用前,增强功能、额外操作
doSomethingBefore();
/**
*调用被代理类的方法
*1. 根据targetFullName反射获取被代理类的实例
*2. 根据methodName获取被代理方法的实例methodInstance (java.lang.reflect.Method类型)
*3. 调用methodInstance类型的invoke方法完成调用
*/
Object target = Class.forName(targetFullName).newInstance();
Method methodInstance = Class.forName(targetFullName).getMethod(methodName, new Class[] {
args[0].getClass(),
args[1].getClass(),
......
});
Object result = methodInstance.invoke(target, args);
//实际调用后,增强功能、额外操作
doSomethingAfter();
//返回被代理类方法的执行结果
return result;
}

这样写得话就可以做到“一次编写,到处复用”了,每个代理类中所有的代理方法都可以像上边那样,调用并返回methodProxy。不同的只是:在不同的代理类中,变量targetFullName不同;在不同的方法中,变量methodName不同。这时,无论被代理的原始类和被代理的接口是谁,其对应的代理类,结构都非常类似,如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//代理,外界对实际主题访问的中介
public class ProxyClassName implements SubjectInterfaceName {
//被代理类的全名称
String targetFullName;
//构造时绑定被代理类的全名称
public ProxyClassName(String targetFullName) {
this.targetFullName = targetFullName;
}
//一般化的代理类中的方法
public ResultType methodName0(ParamType paramName0, ParamType paramName1...) {
//调用上边的通用方法
return (ResultType) methodProxy(targetFullName, methodName0, new Object[] {
paramName0,
paramName1,
......
});
}
//一般化的代理类中的方法
public ResultType methodName1(ParamType paramName0, ParamType paramName1...) {
//调用上边的通用方法
return (ResultType) methodProxy(targetFullName, methodName1, new Object[] {
paramName0,
paramName1,
......
});
}
//......所有SubjectInterfaceName接口中定义的方法都有如上实现
}

前面我们已经针对被代理的方法中把变化的部分转化为动态可变的部分,通过反射实现了被代理的方法的改善,使得方法内重复的部分“一次编写,到处复用”。那么对于上边这个代理类的结构,你有没有觉得它也有很多重复冗余的地方呢?想象现在有很多个上边这种结构的代理类,它们不同的的部分有哪些呢:1.代理类的类名ProxyClassName;2.被代理类实现的接口名SubjectInterfaceName;3.内部各个方法签名methodName0(args),methodName1(args)…;除此之外,所有的代码完全一样,那有没有什么办法能够像参照模版一样,根据给定不同的类名ProxyClassName,接口名SubjectInterfaceName自动生成如上结构的代理类代码呢?是的,你没听错,我们需要的是用代码生成代码的技术,像工厂一样批量生产代理类。可行吗?有这种技术吗?机智如你有没有想到字节码生成技术呢?完全可以按照上边的模版拼凑出我们需要的代理类,其中只需要提供需要实现的接口SubjectInterfaceName,类名ProxyClassName没有要求,我们可以自动生成,内部的所有方法名可以通过对接口进行反射获取。

这时我们改善完成的代理模式就进化成了动态代理,只需要我们自己写methodProxy方法中的额外操作,然后指定一个接口,就得到了我们需要的代理类,简单方便,主要有两个组成部分:

  1. 工厂:利用字节码生成技术,生成指定的代理类;
  2. 通用的代理方法;

本来想继续用伪码的形式写出一个结构来,但不太好表达,我们还是直接看JDK的动态代理吧,有了上边的理解,你应该就能领会到JDK设计的巧妙了。

JDK的动态代理

在JDK的动态代理实现中,对应上边工厂组件的是类Proxy,对应通用代理方法methodProxy的是接口InvocationHandler中的invoke方法。为什么是接口呢?因为这样我们可以通过实现该接口自定义需要的增强功能或额外操作,别忘了这才是目的。其中类Proxy中有一个InvocationHandler类型的成员变量h,然后类Proxy生成的代理类(extends Proxy implements SubjectInterfaceName)中,所有的方法调用,都会通过”super.h.invoke(Object, Method, object[])”这行代码委托到我们自定义的invoke方法中。下面我们使用JDK的动态代理实现最开始的登录功能,看看改进的效果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
//抽象主题,统一对外公开的方法
public interface IUserService {
public void login(String username);
}
//实际主题,对外是不可见的
public class UserServiceImpl implements IUserService {
public void login(String username) {
System.out.println(username + "登录成功");
}
}
//自定义的InvocationHandler实现类,主要用来定义通用调用方法中的额外操作
public class MyInvocationHandler implements InvocationHandler {
//被代理的目标类
public Object target;
//通过构造函数绑定目标类
public MyInvocationHandler(Object target) {
this.target = target;
}
//通用的调用方法
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
//实际调用前
System.out.println("之前的额外操作");
//调用代理类的方法
Object result = method.invoke(target, args);
//实际调用后
System.out.println("之后的额外操作");
//返回调用结果
return result;
}
//通过Proxy类的静态方法获取代理类的实例
public Object getProxy() {
return Proxy.newProxyInstance(
//当前线程上下文的类加载器
Thread.currentThread().getContextClassLoader(),
//目标类实现的所有接口
target.getClass().getInterfaces(),
//当前类,主要用于调用invoke方法
this);
}
}
//测试类
public class Main {
public static void main(String[] args) {
//绑定需要代理的类
MyInvocationHandler invocationHandler = new MyInvocationHandler(new UserServiceImpl());
//利用字节码生成技术生成代理类,并获取其实例(注意声明的类型是IUserService)
IUserService proxy = (IUserService) invocationHandler.getProxy();
//透明地调用对应的方法
proxy.login("ksni88");
}
}
执行结果:
之前的额外操作
ksni88登陆成功
之后的额外操作

可以看到,使用动态代理时,我们根本不用自己写代理类,只用定义我们真正需要的额外操作,然后指定需要被代理的目标类,其他的一切工作都省掉了,相比静态代理可要高明高效的多。看到这里,你可能感觉还不太舒服,因为我们没有自己写代理类,不知道它到底是什么样子,需要看一眼才放心,对吧?我们可以在上边main()方法中加入下面这句:

1
System.setProperty("sun.misc.ProxyGenerator.saveGeneratedFiles", "true");

之后在项目目录下新建目录com\sun\proxy,然后再次运行程序,将在刚才目录下产生一个名为”$Proxy0.class”的代理类Class文件,这就是自动生成的代理类,可以使用反编译工具看一下它长什么样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
public final class $Proxy0
extends Proxy
implements IUserService
{
private static Method m1;
private static Method m0;
private static Method m3;
private static Method m2;
public $Proxy0(InvocationHandler paramInvocationHandler)
throws
{
super(paramInvocationHandler);
}
public final boolean equals(Object paramObject)
throws
{
try
{
return ((Boolean)this.h.invoke(this, m1, new Object[] { paramObject })).booleanValue();
}
catch (Error|RuntimeException localError)
{
throw localError;
}
catch (Throwable localThrowable)
{
throw new UndeclaredThrowableException(localThrowable);
}
}
public final int hashCode()
throws
{
try
{
return ((Integer)this.h.invoke(this, m0, null)).intValue();
}
catch (Error|RuntimeException localError)
{
throw localError;
}
catch (Throwable localThrowable)
{
throw new UndeclaredThrowableException(localThrowable);
}
}
public final void **login**(String paramString)
throws
{
try
{
this.h.invoke(this, m3, new Object[] { paramString });
return;
}
catch (Error|RuntimeException localError)
{
throw localError;
}
catch (Throwable localThrowable)
{
throw new UndeclaredThrowableException(localThrowable);
}
}
public final String toString()
throws
{
try
{
return (String)this.h.invoke(this, m2, null);
}
catch (Error|RuntimeException localError)
{
throw localError;
}
catch (Throwable localThrowable)
{
throw new UndeclaredThrowableException(localThrowable);
}
}
static
{
try
{
m1 = Class.forName("java.lang.Object").getMethod("equals", new Class[] { Class.forName("java.lang.Object") });
m0 = Class.forName("java.lang.Object").getMethod("hashCode", new Class[0]);
m3 = Class.forName("testProxy.IUserService").getMethod("login", new Class[] { Class.forName("java.lang.String") });
m2 = Class.forName("java.lang.Object").getMethod("toString", new Class[0]);
return;
}
catch (NoSuchMethodException localNoSuchMethodException)
{
throw new NoSuchMethodError(localNoSuchMethodException.getMessage());
}
catch (ClassNotFoundException localClassNotFoundException)
{
throw new NoClassDefFoundError(localClassNotFoundException.getMessage());
}
}
}

通过以上反编译得到的代码,可以观察到自动生成的代理类有以下几个特点:

  1. 该代理类继承Proxy类,实现了被代理类实现的接口;
  2. 该代理类中的所有方法调用,全部委托到父类Proxy的成员变量InvocationHandler中的invoke方法,从而执行额外操作和反射调用;
  3. 该代理类不仅实现了所有接口中定义的方法,还重写了Object类中的equals、hashCode、toString三个方法;

看到这里,不知道你有没有完全理解动态代理的设计和由来,反正我是懂了(哈哈。。不懂就多看几遍,多想想)。而在博文一开始提出的问题,现在也可以得到解答了。

  1. 为什么JDK的动态代理是基于接口的?—因为总要有一个父类或接口来定义代理类和被代理类对外公开的方法,而JDK的动态代理生成的代理类已经继承了Proxy类,所以只能通过接口来定义了。而CGlib生成的代理类,是直接继承被代理类的,所以是基于类的。
  2. 什么是基于接口,基于类?—从上一问的回答也可以看出,所谓的基于接口或类,即代理类和被代理类对外公开的方法是由类或接口定义的。
  3. 怎么个动态法?怎么设计的?—不懂得就再多看几遍,敲敲想想吧。

动态代理的优点

最后,我还想引用《深入理解JAVA虚拟机》中的原话来总结动态代理的优点,作者总结得实在是太好了,一起来感受一下吧。

动态代理中所谓的“动态”,是针对使用Java代码实际编写了代理类的“静态”代理而言的,它的优势不在于省去了编写代理类那一点工作量,而是实现了可以在原始类和接口还未知的时候,就确定代理类的代理行为,当代理类与原始类脱离直接联系后,就可以很灵活地重用于不同的应用场景之中。

你之前是不是觉得不就省了点代码吗??哈哈,我也是。

初探MySQL索引

发表于 2016-04-14   |   分类于 MySQL   |     |   阅读次数

在上个月搜狐面试中,面试官看我有很多项目都用到了MySQL,于是问我有没有用过索引,我说没有,然后他诧异地问我遇到过的最大的单表数据量有多大,我赶紧掩饰说没多大也就上万吧,然后就没有然后了。。想来也是羞愧万分,用了这么久的MySQL竟然没有接触过索引,这几天正好需要维护一个去年暑假的项目,跑了大半年数据量已经很大了,借此机会学习实践一下索引。先把最大的体会放出来:对于索引,只能说,不用不知道,用了离不了!

简单介绍一下项目内容:该项目数据库相关的主要是每天定时定期地从6个新闻门户网站上爬取定量的新闻存储到数据库中,以供后期的查取。由于当时需求方要求不高,自己的水平和要求也不高,所以当时既没有使用索引,也没有定期删除过期的新闻数据,结果就造成news新闻表中的数据随着时间的推移无限量增长,两个月前谢帆做iOS版本用接口取数据时更是要等五到十分钟才有反应,当时通过自增id得知表数据近百万了,但由于已经交付大半年了,没有理由主动联系对方,也就无能为力了。这几天对方主动联系做一些小改动,这才下决心学习一下索引,全面改善一下。

预备知识

在搜狐面试结束以后,我查过一些关于索引的资料,简单了解了索引的分类、结构、原理等,这么多的内容我也没有进行系统全面的学习,但其中最重要最基础的就是B+树了,了解了什么是B+树以后,最好再了解一下索引对B+树的应用,看完觉得不过瘾还可以接着看一下MySQL索引的原理,最后一篇一开始就看可能会不太懂,但只用先记住其中最重要的一条规则:最左前缀原则,经过实践再回头看就柳暗花明了。

索引实践

先贴出进行实践的数据表结构,非常简单。
表结构

建立索引

先找到需要使用索引的SQL,功能很简单,就是分页获取指定类别的最新新闻的id和title,具体做法是按照date对指定category_id的新闻进行排序,然后按照分页参数选取数据,下面是具体的示例代码。

1
2
3
4
5
select id, title
from news_copy
where category_id=1
order by date desc
limit 1000,10

病急乱投医,按照此文给的建议:

如果对于有where条件,又想走索引用limit的,必须设计一个索引,将where放第一位,limit用到的主键放第2位,而且只能select主键。

我在该表上建立了联合索引category_id_date=(category_id, date),这句话是相当精简正确的,但我当时注意力全放在怎么建索引了,没有在意最后一句话,这也造成了后面一些困惑,先且接着往下看。

造大量假数据

建立完索引,就需要造大量假数据进行测试了,我找到去年开发时的news表,里边存有5000多条数据,然后通过存储过程把该表数据循环添加到另一张结构一样的news_copy表中,循环100次,下图是添加后的数据量。
数据量
可以看到有近60W条数据,下边分别是建索引前后的效果图:
建索引前
建索引后
可以看到效率相差10倍,而且这个差距肯定会随着数据量的增加而变大。然而,虽然建立索引大大增加了查询效率,但建索引后的查询效率还是不能让我们满意。这时我们就该回头关注SQL本身了,能不能进行优化呢?

SQL优化

依然是病急乱投医(不过通过实践遇到问题就解决是最快的学习途径,只是不够系统全面),查了一通SQL优化以后,在繁杂的搜索结果里有一篇博文让我相见恨晚,像是捡了宝一样赶紧按照他说的改了一下SQL:

1
2
3
4
5
6
7
select n.id, title from news_copy n
inner join
(select id from news_copy
where category_id=1
order by date desc
limit 1000,10) t
on n.id=t.id

执行结果如下:
SQL优化后
是不是惊到了?竟然相差近百倍。这又是为什么呢?这里就要引出另外一个概念了:覆盖索引。

覆盖索引

索引一般用来快速定位想要查询的数据的位置,然后据此精确读取,所以有两步读取操作,第一步读索引,第二步根据索引读数据并返回。换句话说,索引只是用来定位数据,而不是直接返回数据。然而覆盖索引就是一个特例,下面是覆盖索引的概念:

A covering index is a special case where the index itself contains the required data field(s) and can return the data.

其实覆盖索引并不是一个“强概念”,而更像一个简单的名称:当索引字段包含所有需要查询的字段时,可以只读取一次索引就能返回查询结果,这种特例使用到的索引叫做覆盖索引。显而易见,当使用覆盖索引时查询是飞快的(因为不用回表查询)。

读到这里你应该还是不太明白优化后的SQL它具体因为什么变快了,它的执行过程和优化前有什么区别啊?对!你需要了解具体的执行过程才能彻底明白,但在解释两者的执行过程之前,你还需要了解一下SQL语句一般情况下的执行过程和先后顺序(你是不是发现预备知识有点多呢?不要骂娘,谁让我们是半路出家,确实需要一些上下文信息才能理解得清楚明白,我也是顺藤摸瓜,需要什么学什么)。

SQL的整体执行过程

这里说的执行过程是一个宏观上的,把整个SQL语句当作一个整体,看一下这个整体从执行开始到返回结果都经历了哪些过程(参考:博文):

  1. 应用通过MySQL API把查询命令发送给MySQL服务器,然后被解析;
  2. 检查权限、MySQL optimizer进行优化,经过解析和优化后的查询命令被编译为CPU可运行的二进制形式的查询计划(query plan),并可以被缓存;
  3. 如果存在索引,那么先扫描索引,如果数据被索引覆盖,那么不需要额外的查找,如果不是,根据索引查找和读取对应的记录;
  4. 如果有关联查询,查询次序是扫描第一张表找到满足条件的记录,按照第一张表和第二张表的关联键值,扫描第二张表查找满足条件的记录,按此顺序循环;
  5. 输出查询结果;

这里需要注意的是第3步,这一步是SQL语句真正开始执行的开始,如果没有可用索引,就会进行全表扫描,读取所有记录的所有字段;而如果有可用索引,会先扫描索引,然后根据索引读取对应记录的所有字段。注意我说的是所有字段,而不只是select的字段,至于具体的过程,继续往下看。

SQL的具体先后顺序

上边讲得是把SQL语句作为一个整体看,这里是深入SQL语句内部,讨论各个SQL关键字的执行先后顺序(参考:博文):

1
2
3
4
5
6
7
8
9
10
(7) SELECT
(8) DISTINCT <select_list>
(1) FROM <left_table>
(3) <join_type> JOIN <right_table>
(2) ON <join_condition>
(4) WHERE <where_condition>
(5) GROUP BY <group_by_list>
(6) HAVING <having_condition>
(9) ORDER BY <order_by_condition>
(10) LIMIT <limit_number>

前边的数字表示该行SQL被执行的次序。

SQL执行的开始是FROM语句,查看目标表是否有可用索引(见上),然后取所有字段,然后根据WHERE查询条件选取满足条件的记录,然后根据SELECT语句选取需要的字段,最后根据LIMIT语句选取并返回需要的数据结果。所以不要天真地认为每次查询只会读取SELECT指定的字段,查询不是一个从零挑选增加成结果的过程,而是从满筛选去除成结果的过程(其实道理很简单,因为我们需要把所有数据读到内存中进行处理,而不能在文件层面有选择的读取)。

优化后的SQL为什么快

现在我们终于可以分析一下我们的SQL优化前后具体有什么区别了,先记住一个数据再继续看:表news_copy中category_id=1的记录共5W多条。

  1. 优化前的执行过程:根据where语句从表news_copy中选出所有category_id=1的记录:这一步因为where字段category_id满足使用索引的最左前缀条件,所以这里是先利用索引读取所有category_id=1的5W多条记录的id(为什么索引里存有主键呢?不明白的可以再仔细看看这篇博文里的聚簇索引了,前边推荐过),然后根据这5W多个id回表查询5W多条记录的所有字段,然后根据order by对这些记录排序(这里可以利用索引),然后根据limit选取从第1000条记录开始的10条记录,然后根据select筛选字段id和title并返回。
    说明:上边排序时利用到的了limit和order by组合使用时的优化,所以上边排序时并不是对5W多条记录全排序,而是在前1010条记录排好序后就停止了排序。
  2. 现在再来看优化后的执行过程:先执行子查询t,同样利用索引读取5W多个id,但注意这里select只有id字段,所以利用到了覆盖索引,所以这里不再回表查询5W多条记录的所有字段,而是直接在索引中完成剩下的order by和limit语句,然后子查询返回了1010条id,这时外查询进行连接查询,只用根据这1010个id读取1010条记录的所有字段,然后筛选id和title字段并返回。

所以两者的主要差别在于IO读取的数据量,前者高达5W多条,后者只有1010条。
对于上边的解释,你可能还会疑问,我怎么知道SQL有没有利用覆盖索引呢?对于平常的SQL,怎么查看SQL的执行效率呢?这就需要神命令EXPLAIN来帮忙了。

explain & show profile

在写这篇博文时,我也想知道我的SQL到底有没有用覆盖索引,网上查了一下发现可以使用explain命令来查看,使用简单方便,可以看下图:
explain优化前
explain优化后
可以看到输出有很多参数列,其中如果使用了覆盖索引,在最后的Extra列中就会显示“Using index”,否则不会。这里可以清楚的看到优化后的子查询使用到了覆盖索引。其中rows列表示查询可能读取的记录行数,也可以清楚的看到,优化后的SQL在子查询中有5W多行,即5W多个id,然后外查询只有1020行。而优化前的SQL涉及5W多行,不止id,而是所有字段。所以通过explain命令,我们可以大概知道我们的SQL会涉及多大的数据量,会不会使用覆盖索引等信息,其实explain还可以提供给我们更多的信息,前提是你对explain命令的输出足够熟悉,这里不再过多介绍,感兴趣可以看explain命令说明和explain命令输出参数说明。

另外,如果你还想知道SQL在执行的各个阶段的耗时,比如CPU、IO等详细信息,可以看一下show profile命令,但该命令在版本5.6.7中被弃用了,转而被Performance Schema替代。

后记

对于这个项目的维护,梦非早就催我让我改善一下,但我总是推脱,既不想费心维护一个已经交付的项目,也不想费力去学习索引,而一直拖到该项目又有改动时才动手学习。平时我们可能有很多这种情况,给自己找各种不动手学习的借口:索引?哪来那么多数据让我测啊?分布式?哪来那么多机器让我试啊?而又总是在关键时刻方恨少。其实只要我们下定决心,稍微推动自己一下下,就能获益匪浅,而对于学习掌握的知识,一定是有用的,只是时间早晚而已。这篇博文我是14号开始着手整理写得,写一半的时候,17号腾讯面试就又问了索引,基本把博文讲了一遍,当时简直庆幸自己刚学过索引。另外关于写博客本身,也是获益匪浅,写的过程就是把自己的理解和思路表达出来的过程,不仅能加深自己的理解,发现盲点,还能锻炼自己的表达沟通能力,现在我写一篇博文大概要一周时间,举步维艰,相信坚持下去会有提高。

volatile和Java内存模型

发表于 2016-04-10   |   分类于 Java   |     |   阅读次数

本文是在读了周志明老师的《深入理解Java虚拟机》之后写得,算是读书笔记吧,部分讲解思路和图均出自此书,因为我想不出比这种讲解思路更容易理解的讲法。

在接触volatile之前,我们只知道“原子性”,原子性很好理解,这里也不再啰嗦。但当volatile引出“可见性”的时候,就不像原子性那样可以一两句说清楚说明白了。上网一查还总是让我们先学一下Java的内存模型,难道就不能像原子性那样简单直白的说明白吗?其实我们可以回想一下第一次接触“原子性”的时候是怎么去理解的,原子性就是它的字面意思,一个操作像原子(虽然物理上的原子还是可分的,引用东哥的话说,不要在意这些细节)那样是细不可分的,最小的操作。但我们真正理解和明白原子性,是通过知道了汇编语言和机器语言之后的事。因为正是由于高级语言要编译成汇编语言,最后对应于机器语言才能执行的这个过程,造成了高级语言存在非原子性操作的问题。同样的,可以说是Java的内存模型造成了可见性这个问题,所以如果想从原理上,而不只是字面上去理解“可见性”,确实需要先学一下Java的内存模型,其实是很简单的一个东西。

Java内存模型

为了更容易地理解Java内存模型,我们可以参考一下本科学过的硬件上的缓存设计。为了协调存储设备和处理器之间的速度差距,所以引入了高速缓存Cache:将运算需要使用到的数据复制到Cache中,在运算过程中CPU只在Cache上进行存取,在运算结束以后再把Cache中的数据同步写回到内存中,这样避免了CPU等待相对缓慢的内存读写,交互关系如下图:
处理器、高速缓存、主内存间的交互关系
上图中的缓存一致性协议不是重点,重点是这种结构设计。
类似的,Java内存模型规定,所有的变量(不包括局部变量和方法参数这种线程私有的,不会被共享的变量)都存储在主内存中(此处的主内存是虚拟机内存的一部分),每条线程还有自己的工作内存(可与上边的Cache类比),线程的工作内存中保存了该线程使用到的变量的主内存副本拷贝,线程对变量的所有操作都必须在工作内存中进行,而不能直接读写主内存中的变量。不同线程之间也无法直接访问对方的工作内存,线程间的变量传递均通过主内存完成,交互关系如下图:
线程、工作内存、主内存间的交互关系
同样的,上图中的交互操作不是重点,重点是这种结构设计。
需要注意的是,这里所讲的主内存、工作内存,均是虚拟机内存的一部分,它们和JVM内存区域中的Java堆、栈、方法区等不是同一个层次的内存划分,前者更偏向逻辑上的划分,后者更偏向物理上的划分,如果要强行对应的话,主内存主要对应于Java堆中的对象实例数据部分,工作内存主要对应于栈中的部分区域。

可见性

在了解了Java内存模型以后,我们再去理解“可见性”时,就清楚明白多了。正因为工作内存的存在,所以某些线程正在操作的共享变量可能不是最新值,这就造成了“不可见性”问题。所以

可见性是指当一个线程修改了共享变量的值,其他线程能够立即得知这个修改。

那要怎么保证可见性呢?很简单,只要消除造成“不可见性”的原因————工作内存就好了,是的,如果能够跳过工作内存,直接操作主内存,那就一定是可见的,但这样就破坏了这种结构设计。所以正常的,也是Java volatile关键字采用的,用以下两条规则保证可见性:

  1. 在每次修改该变量后,立即从工作内存同步写回到主内存中;
  2. 在每次使用该变量前,立即从主内存中再读取到工作内存中;

要注意的是,volatile变量也是有工作内存副本的,它并没有破环这种结构设计,它与普通变量的区别仅仅是以上两条规则,即读取和写回的及时性。

Java中保证可见性的关键字

在Java中,一共有三个关键字可以保证可见性:

  1. volatile:通过上边两条规则保证;
  2. final:初始化后不可变,不存在修改的情况;
  3. synchronized:通过内存交互规则之一保证,“对一个变量执行unlock操作之前,必须先把此变量同步回主内存中”,注意此处说的并不是同步块中的操作,而单单是指被synchronized修饰的变量本身。

volatile

对volatile的误解

至于具体的误解是什么,我就不说了,因为在我身上经常发生本来记得很对的概念,看了一眼反例就再也分不清了的蠢事。所以这里只把结论放出来,并解释为什么会这样。

volatile只能保证被修饰变量的可见性,不保证对其操作的原子性,对其所有的操作都和普通变量一样是非线程安全的。

volatile对变量的影响仅仅只是可见性,这是我们一直强调的一点,如果你已经理解了什么是可见性,但当可见性和原子性这两个概念放在一起时有点晕的话,看了下边的例子可能就清楚多了。

1
2
static volatile int num;
num++;

如果volatile能保证变量操作的原子性,那上边num++这个操作应该是原子操作,即它应该是一个细不可分的、不会被“中途打断”的操作,但稍微清醒一点的你也会意识到这是不可能的。这个操作最起码要分成三个原子操作才能完成:

  1. 读取num当前值;
  2. 对num当前值进行加1运算;
  3. 回写运算后的num值;

在多线程环境下,任一个正在进行num++操作的线程,都有可能在上边三个原子操作之间的任一点被切换上下文。这也是可以通过Javap反汇编印证的,下边是num++反汇编对应的字节码指令:

1
2
3
4
getstatic
iconst_1
iadd
pustatic

可以清楚的看到num++操作被编译成了四条字节码指令,所以可见性和原子性之间并没有什么必然联系,而volatile只保证变量的可见性,对于num++这个操作而言,volatile的唯一作用是,当且仅当putstatic指令执行完以后,任一线程通过getstatic指令读取到的num值都是最新的。(这里用字节码来说明原子性不是很严谨,因为即使编译出来只有一条字节码指令,也并不意味着执行这条指令就是一个原子操作,因为一条字节码指令在真正被执行时可能会转化为多条本地机器码指令,所以通过汇编代码而非字节码指令会更严谨一些,但此处使用字节码就已经能说明问题了)。

volatile的两个作用

前边我们一直在讲volatile能保证可见性,其实这只是它的作用之一,它还有另外一个作用就是禁止指令重排序优化。

从硬件架构上讲,指令重排序是指CPU采用了允许将多条指令不按程序规定的顺序分开发送给各相应电路单元处理。但并不是说指令任意重排,CPU需要能正确处理指令依赖情况以保障程序能得出正确的执行结果。

可以结合CPU流水线理解上边的概念。那指令重排序对程序的并发执行又有什么影响呢?为什么volatile要禁止指令重排序呢?下边通过DCL单例模式的代码来说明。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Singleton {
private volatile static Singleton instance;
public static Singleton getInstance() {
if(instance == null) {
synchronized (Singleton.class) {
if(instance == null)
instance = new Singleton();
}
}
return instance;
}
private Singleton(){}
public void doSomething() {
//do something
}
}

如果instance没有使用volatile修饰,那么instance = new Singleton()对应的指令就有可能被重排序。该行代码起码有三个步骤需要完成:

  1. 在Java堆中分配一块内存用来存储Singleton实例;
  2. 在该内存块上初始化Singleton实例;
  3. 将该内存块对应的地址引用赋值给instance;

现在假设有线程A调用了Singleton.getInstance(),当执行到instance = new Singleton()这行代码时,经过指令重排序,上边第3个步骤被重排序到第2个步骤之前,并在执行了第3个步骤以后,将要执行第2个步骤之前,进行上下文切换至线程B,线程B再调用Singleton.getInstance()方法时,instance已经不等于null了,这时再继续调用doSomething()方法时,由于还没有初始化,所以会出现异常。虽然这种情况非常极端,但不是没有可能,所以只有使用volatile修饰instance的DCL单例模式才是线程安全的(在JDK1.5之前volatile还不能完全屏蔽指令重排序,所以即使使用volatile也不能保证DCL单例模式的线程安全,但在JDK1.5修复此问题之后完全是线程安全的)。

volatile的底层实现原理

这里所说的实现原理,即JVM是怎么实现volatile的可见性和禁止指令重排序这两个特性的。这时我们可以对比观察使用volatile和不使用volatile关键字时所生成汇编代码的差别,具体的代码不再贴出(纯汇编,贴出来意义不大不是重点),感兴趣可以自行实验。其中一个明显的差别是,在volatile变量每次赋值指令之后,都会紧随一条“lock add1 $0x0,(%esp)”指令(不一定是ESP寄存器,也可能是其他寄存器,取决于栈指针的指向)。这个操作就相当于传说中的内存屏障,以下引用《深入理解Java虚拟机》中的原话来说明该指令是如何保证可见性的:

当只有一个CPU访问内存时,并不需要内存屏障,但如果有两个或更多CPU访问同一块内存,且其中有一个在观测另一个,就需要内存屏障来保证一致性了。这条指令中的“add1 $0x0,(%esp)”(把ESP寄存器的值加0)显然是一个空操作(采用这个空操作而不是空操作指令nop是因为IA32手册规定lock前缀不允许配合nop指令使用),关键在于lock前缀,查询IA32手册,它的作用是使得本CPU的Cache写入了内存,该写入动作也会引起别的CPU或者别的内核无效化其Cache,所以通过这样一个空操作,可让前面volatile变量的修改对其他CPU立即可见。

这段引用的重点在于后半句,关于lock前缀的解释,也就是volatile是使用硬件指令来保证可见性的。那lock前缀又是怎么保证禁止指令重排序呢?依然通过引用《深入理解Java虚拟机》中的原话来说明:

从硬件架构上讲,指令重排序是指CPU采用了允许将多条指令不按程序规定的顺序分开发送给各相应电路单元处理。但并不是说指令任意重排,CPU需要能正确处理指令依赖情况以保障程序能得出正确的执行结果。譬如指令1把地址A中的值加10,指令2把地址A中的值乘以2,指令3把地址B中的值减去3,这时指令1和指令2是有依赖的,它们之间的顺序不能重排————(A+10)*2与A*2+10显然不相等,但指令3可以重排到指令1、2之前或者中间,只要保证CPU执行后面依赖到A、B值的操作时能获取到正确的A和B值即可。所以在本内CPU中,重排序看起来依然是有序的。因此,lock add1 $0x0,(%esp)指令把修改同步到内存时,意味着所有之前的操作都已经执行完成,这样便形成了“指令重排序无法越过内存屏障”的效果。

看了上边的引用,你可能还是一头雾水,这个解释给人的感觉像是只可意会不可言传一样,而且关于内存屏障,《深入理解Java虚拟机》一书并没有过多介绍,只说“重排序时不能把后面的指令重排序到内存屏障之前的位置”,但这个限制显然是不够的,难道屏障之前的代码就可以重排序了?那DCL单例模式依然是有问题的。应该有更严格的限制才对,比如绝对禁止屏障前后的代码重排序。经过进一步google,也找到一些说明,但由于太偏向底层,我也没有太深究,引用一篇博文中的话简单说明一下内存屏障:

with lock prefix, memory related instructions are processed specially to act as a memory barrier, similar to mfence instruction. Dave Dice explains in his blog that they benchmarked the 2 kinds of barrier, and the lock add seems the most efficient one on today’s architecture. So this barrier ensures that there is no reordering before and after this instruction and also drains all instructions pending into Store Buffer. After executing these instructions, all writes are visible to all other threads through cache subsystem or main memory. This costs some latency to wait for this drain.

后记

之前刚接触volatile时,看到过一个说法,volatile只适合简单的get和set方法,当时不太懂。这次在查内存屏障时,不知道在哪看到一句话,意思是对于volatile变量,单独的读操作和单独的写操作是原子操作,但任意的读写操作的组合不再是原子操作,这句话像是戳破了最后一张纸一样,一句话解释清楚了之前num++不是原子操作的原因,并给出了言简意赅的结论。这时再回头去查之前那个说法时才恍然大悟,看下面代码:

1
2
3
4
5
6
7
private volatile int i = 0;
public int get() {
return i;
}
public void set(int i) {
this.i = i;
}

如果我说,上边的get和set方法对于变量i的操作,不但是原子操作,而且两个方法之间还是同步的,即此处volatile的作用完全等同于synchronized的作用,你是不是需要思考一会?这么神奇吗?

先来解释为什么是同步的,我们可以先用熟悉的synchronized想一下同步效果是什么样。如果两个方法是synchronized修饰的,那多线程环境下,任一时刻只有一个线程在执行get和set方法,其他线程都处于阻塞排队状态,则,任一时刻,任一线程通过get方法得到的,一定是变量i此刻的最新值。这就是synchronized在此处达到的同步效果,是不是很熟悉?这不就是保证了变量i的可见性嘛。再来看此处volatile达到的同步效果,由于volatile语义使然,同样的,任一时刻,任一线程通过get方法得到的,一定是变量i此刻的最新值。

再来解释原子性,这时我们可以考虑一下如果变量i没有使用volatile修饰,结合我们上边说的内存模型,当一个线程A执行this.i=i时,其实至少是有两步的:1.对工作内存中的i进行写操作;2.把工作内存中的i同步回写到主内存中;那么当在这两步之间进行上下文切换至其他线程时,通过get方法将看不到i的最新值。而volatile可以保证this.i=i这行代码的原子性。此处的原子性可能和我们通常理解的原子性不太一样,因为我们之前遇到的原子性都是在机器码的层面上,而这里是由于JAVA的内存模型导致的。

这时我们不妨回头想一下当看到“两个方法之间还是同步的,即此处volatile的作用完全等同于synchronized的作用”这句话时为什么会有点诧异呢?因为我们通常理解的“同步”,更多的关注点在于“原子性”上,因为我们通常需要同步的操作不像上边的代码只有一行,而是一系列相关的操作。而此处的“同步”,更多的关注点在于“可见性”上,所以只能说我们对于“同步”的认识还不够深。

关于volatile的正确用法,还有很多需要学习,这里推荐一篇很好的博文《正确使用 Volatile 变量》,值得一看。最后通过引用《深入理解Java虚拟机》书中的话来结束此文,既作为对内存模型的总结,更作为对“同步”的正确认识和理解。

Java内存模型是围绕着并发过程中如何处理原子性、可见性和有序性这3个特征来建立的。
有序性:Java程序中天然的有序性可以总结为一句话:如果在线程内观察,所有的操作都是有序的;如果在一个线程中观察另一个线程,所有的操作都是无序的。前半句是指“线程内表现为串行的语义”,后半句是指“指令重排序”现象和“工作内存与主内存同步延迟”现象。

线程池全面解析

发表于 2016-04-04   |   分类于 Java   |     |   阅读次数

对于线程池,我在项目中只是简单的使用,大概知道JDK有四种线程池,但对于具体的实现和原理并不是很清楚,所以本文是在对线程池有一些了解之后,结合源码全面总结并讲解JDK线程池的实现和原理。但在开写之前呢,并没有预想到线程池有这么大的内容量,本想一下午看完,结果看了两三天,笔记也是越写越长,显得有些杂乱,可以根据目录按需选看。

为什么要使用线程池?

诸如 Web 服务器、数据库服务器、文件服务器或邮件服务器之类的许多服务器应用程序都面向处理来自某些远程来源的大量短小的任务。不管请求如何到达,服务器应用程序中经常出现的情况是:单个任务处理的时间很短而请求的数目却是巨大的。构建服务器应用程序的一个过于简单的模型应该是:每当一个请求到达就创建一个新线程,然后在新线程中为请求服务。然而对于原型开发这种方法工作得很好,但如果试图部署以这种方式运行的服务器应用程序,那么这种方法的严重不足就很明显。比如创建和销毁线程的时间比真正执行任务的时间还多、过多的线程创建造成系统不稳定甚至崩溃。

线程池为线程生命周期开销问题和资源不足问题提供了解决方案,使用线程池的好处有以下三点:

  1. 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗;
  2. 提高响应速度:当任务到达时,任务可以不需要的等到线程创建就能立即执行;
  3. 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

最简单的线程池模型

至少要有两个集合,其中一个集合用来存放线程,另一个集合用来存放等待执行的任务。针对线程的个数控制、任务执行的顺序、没有可用线程或任务满了以后对应的策略,JDK提供了几种常用的、实现好的线程池供我们使用,下面详细讲解JDK的线程池(这里先抛出一个重要的问题,对于这个简单模型,我只说了它静态的存储结构,但它的动态操作,比如把任务交给线程来执行这个动作是怎么完成的?线程执行完以后又怎么达到线程复用的?在讲解完JDK的四种线程池以后,我会在此文最后通过源码来回答这些问题)。

JDK提供的线程池

UML图:

JDK线程池UML

标记一下比较重要的类:

ExecutorService真正的线程池接口
ScheduledExecutorService能和Timer/TimerTask类似,解决那些需要任务重复执行的问题
ThreadPoolExecutorExecutorService的默认实现
ScheduledThreadPoolExecutor继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现

ThreadPoolExecutor构造参数

对于我们上边提到的JDK提供的几种常用的线程池,其实都是通过ThreadPoolExecutor构造的,只是构造参数不同导致各个线程池的特性不同,所以在介绍那几种具体的线程池之前,先来了解一下ThreadPoolExecutor各个构造参数的含义,下边是其构造函数代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

可以看到最多可以配置7个参数,下边一一讲解:

  1. corePoolSize:池中保存的所有线程的个数,包括空闲线程,也叫核心线程数;
  2. maximumPoolSize:池中允许的最大线程个数;
  3. keepAliveTime:当现有线程个数大于corePoolSize时,此为终止多余的空余线程等待新任务的最长时间;
  4. unit:keepAliveTime参数的时间单位;
  5. workQueue:用于保存待执行的任务的阻塞队列(BlockingQueue,(上边提到还要有一个集合用于存放线程,JDK并没有提供可设置的方法,默认使用HashSet)阻塞队列和普通队列的区别如下表,它多了一些可能会造成阻塞的操作,如取的时候队列是空的,存的时候队列是满的情况(内部使用的是ReentrantLock进行同步,所以可以选择使用公平锁,Condition作为信号量):

    \可能报异常返回布尔值/null可能阻塞可能阻塞指定时间
    入队add(e)boolean offer(e)put(e)offer(e, timeout, unit)
    出队remove(e)E poll()take()poll(timeout, unit)
    查看element()E peek()无无

    • 其中JDK提供了六种(前五个都extends AbstractQueue implements BlockingQueue,第六个的泛型为Runnable):
      1. ArrayBlockingQueue:一个基于数组结构的有界阻塞队列,大小不可动态扩展,遵循FIFO原则;内部实现有三个int变量,count用来判断满或空,putIndex用于指示下一个可以存放的位置,takeIndex用于指示下一个可以取的位置,所以写操作并没有造成复制移动。
      2. LinkedBlockingQueue:一个基于链表结构的无界阻塞队列,默认大小是MAX_VALUE,如果构造时指定了大小,则大小是不可动态扩展的,遵循FIFO原则;内部实现有静态内部类Node,空的头引用head(item==null),尾引用last,AtomicInteger count用来判断满或空。
      3. SynchronousQueue:一个不存储元素的阻塞队列,进出队列的顺序取决于是否采用公平模式,之所以不存储元素,是因为它的每一个put操作,都要阻塞等待一个poll操作,反之亦然,所以该队列不存在peek、遍历等操作;内部实现复杂,没看懂,类注释说明是对“dual stack and dual queue”的扩展实现,LIFO stack适用于非公平模式,FIFO queue适用于公平模式。
      4. PriorityBlockingQueue:一个基于平衡二叉堆(数组,n,2n+1,2n+2)的无界阻塞队列,进出队列的顺序取决于排序顺序,排序标准基于提供的Comparator或者元素的natural order,大小可以动态扩展,默认初始大小为11,最大限制为MAX_VALUE-8(避免某些VM OOM),存操作会对堆调整,取操作会取走堆顶元素(array[0]);
      5. DelayQueue:一个内部使用PriorityQueue(基于平衡二叉堆(数组,n,2n+1,2n+2))的无界阻塞队列,性质同上,其中Delayed是一个接口,内部只有一个方法long getDelay(TimeUnit unit),该接口还继承了Comparable接口。排序标准:队头是过期(即getDelay方法返回值小于等于0)时间最长的元素,即getDelay返回值最小的元素,如果没有过期的元素,也就没有队头,此时poll方法会返回null,take方法会阻塞,其他所有方法不检查是否过期,和普通队列一样。
      6. DelayedWorkQueue:一种特殊的DelayQueue(所以上述特性都适用),属于ScheduledThreadPoolExecutor的静态内部类,也是基于平衡二叉堆的无界阻塞队列(唯一的区别是,如果元素类型是ScheduledFutureTask的话,还会设置每个ScheduledFutureTask在堆中的heapIndex,提高其取消和删除的效率)。但其内部使用的数组类型不是泛型,而是写死的RunnableScheduledFuture,所以该队列的元素类型只能是RunnableScheduledFuture。但使用该队列时,必须声明为BlockingQueue,解释如下:
        1
        2
        3
        4
        5
        6
        7
        8
        9
        /**
        * Specialized delay queue. To mesh with TPE declarations, this
        * class must be declared as a BlockingQueue<Runnable> even though
        * it can only hold RunnableScheduledFutures.
        */
        static class DelayedWorkQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable> {
        /*class code*/
        }
  6. threadFactory:创建新线程时使用的工厂,该接口只有一个方法:Thread newThread(Runnable r);使用的默认工厂只是对Thread进行了命名,原封不动的包装了Runnable的run方法,可以通过自己的实现达到一些自定义的包装效果;

  7. handler:当现有线程个数等于maximumPoolSize && 队列已满的时候,对新提交的任务的处理策略,JDK提供了四种可选的策略,对应的实现其实很简单,可以通过实现RejectedExecutionHandler接口自定义处理策略,该接口只有一个方法:

    1
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);

    下边详细看JDK实现的四种策略:

    • AbortPolicy:直接抛出异常,方法体如下:

      1
      2
      3
      4
      5
      public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
      throw new RejectedExecutionException("Task " + r.toString() +
      " rejected from " +
      e.toString());
      }
    • DiscardPolicy:没有任何处理,丢弃掉,方法体如下:

      1
      2
      public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
      }
    • DiscardOldestPolicy:取出并丢弃掉队列头的任务,然后重新添加,方法体如下:

      1
      2
      3
      4
      5
      6
      public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
      if (!e.isShutdown()) {
      e.getQueue().poll();
      e.execute(r);
      }
      }
    • CallerRunsPolicy:由提交该任务的线程直接执行该任务,方法体如下:

      1
      2
      3
      4
      5
      public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
      if (!e.isShutdown()) {
      r.run();
      }
      }

线程池的处理流程

线程池的处理流程
如上图所示,大体流程依次如下:

  1. 第一步 是 看线程池是否关闭 && corePoolSize是否满了;
  2. corePoolSize满了以后,第二步 是看 线程池是否关闭 && 队列是否满了(判断队列是否已满用的是上表中的offer(e)方法),此处对线程池的状态检查是double check,即添加到队列以后,会再次检查状态,如果此时线程池是关闭状态,则回滚刚才的添加动作。
  3. 队列满了以后,第三步是看 线程池是否关闭 && maximumPoolSize是否满了;
  4. maximumPoolSize满了以后,第四步,也是最后一步,再执行拒绝策略。
    特别要注意判断队列已满和maximumPoolSize已满的先后顺序

JDK提供的四种常用的线程池

提供这四种线程池的类是Executors,该类相当于一个工厂类,提供四种线程池和一些通用方法,现在的关注点是这四种线程池,前边已经提到它们都是通过ThreadPoolExecutor构造的,只是参数不同导致它们特性不同,下边通过源码进行讲解。

  1. FixedThreadPool:固定大小线程池

    1
    2
    3
    4
    5
    public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
    }

    可以看到,它的corePoolSize == maximumPoolSize,这有两点影响,1. 线程个数不会增长,即永远不会走上述流程中的第三步,当队列满了以后,直接执行拒绝策略;2. 不会存在大于corePoolSize的额外线程,所以线程池中的所有线程不会因为空闲时间太长而终止,当且仅当整个线程池ShutDown时才终止,所以第三、四个参数是无用的。同时也可以看到它使用的是无界阻塞队列,所以要么所有任务正常执行或等待执行,要么任务实在太多OOM造成系统崩溃,拒绝策略永远不会被执行。而且当有线程因为任务抛出异常而终止时,线程池会新建一个线程顶替它,不会出现线程泄露。

  2. SingleThreadPool:单线程池

    1
    2
    3
    4
    5
    6
    public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>()));
    }

    可以看到,它就是一个corePoolSize和maximumPoolSize都为1的FixedThreadPool。所以以上所说的性质也都适用于它。它可以保证任务的顺序执行。

  3. CachedThreadPool:缓存线程池

    1
    2
    3
    4
    5
    public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());
    }

    可以看到,它的corePoolSize==0,maximumPoolSize==Integer.MAX_VALUE,队列使用的是SynchronousQueue,最长空闲时间是60s。所以它的性质如下:

    • 因为corePoolSize==0,所以当该线程池空闲足够长的时间时(单个线程最长空闲时间是60s),线程池中就没有存储任何线程了,没有任何消耗;
    • 因为corePoolSize==0,所以按照上述线程池的流程,每次到来新任务时,第一步,当前线程个数总是<corePoolSize的,所以新任务总会走第二步,尝试加到队列里。而又因为它使用的队列是SynchronousQueue,以及添加到队列的操作使用的是offer(e)方法,此方法只有当前正好有线程等待取数据时才会返回true,所以此处它永远返回false,即相当于队列永远是满的,所以新任务总会走第三步,而又因为maximumPoolSize==Integer.MAX_VALUE,所以对于到来的新任务,总会新建线程去执行它。要么所有任务正常执行,要么任务实在太多OOM造成系统崩溃,拒绝策略永远不会被执行,这和FixedThreadPool是一样的,但两者如果造成OOM的话,原因是不同的,FixedThreadPool是因为队列所占空间,CachedThreadPool是因为线程所占空间。
  4. ScheduledThreadPool:调度线程池

    1
    2
    3
    4
    5
    6
    7
    8
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
    new DelayedWorkQueue());
    }

    该线程池和以上线程池大不相同,需要另开一篇专门讲解。但其特性是,可以让任务延迟指定的时间执行,也可以让任务在指定的间隔时间内重复执行。要使用这些特性,就必须使用其新增的ScheduledFutureTask方法,而如果使用execute方法的话,它会包装成delay=0的任务按照延迟任务来执行。

JDK线程池实现原理

线程池的整体流程我们上边已经说了,但其中的细节和关键点并没有提及,现在通过源码来看其具体是怎么实现的。

  1. 其实上边说的整体流程,全部是在execute方法中控制的,源码如下,不再累述:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    public void execute(Runnable command) {
    if (command == null)
    throw new NullPointerException();
    /*
    * Proceed in 3 steps:
    *
    * 1. If fewer than corePoolSize threads are running, try to
    * start a new thread with the given command as its first
    * task. The call to addWorker atomically checks runState and
    * workerCount, and so prevents false alarms that would add
    * threads when it shouldn't, by returning false.
    *
    * 2. If a task can be successfully queued, then we still need
    * to double-check whether we should have added a thread
    * (because existing ones died since last checking) or that
    * the pool shut down since entry into this method. So we
    * recheck state and if necessary roll back the enqueuing if
    * stopped, or start a new thread if there are none.
    *
    * 3. If we cannot queue task, then we try to add a new
    * thread. If it fails, we know we are shut down or saturated
    * and so reject the task.
    */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
    return;
    c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
    reject(command);
    else if (workerCountOf(recheck) == 0)
    addWorker(null, false);
    }
    else if (!addWorker(command, false))
    reject(command);
    }
  2. 要说明的是,线程池把其中的线程包装成一个Worker类,该类实现了Runnable接口,内部定义了两个重要的成员变量:

    • 一个Runnable firstTask成员变量用来保存要执行的任务,然后在自己的run方法中调用firstTask的run方法;
    • 成员变量Thread t,每个Worker就是在这个线程t中运行的,其中t是由构造函数中指定的线程工厂生产的。这个设计有点绕,因为我们一般写得Runnable都是在本类外new一个Thread来执行自己,但这个Worker是自己指定由哪个Thread来执行自己,控制权在自己手里,外界需要先从Worker这里获取到这个Thread,然后start。上图中的addWorker(command,true)方法就是构造一个Worker,然后添加到HashSet中,然后获取其成员变量Thread t,然后t.start()开始执行该任务(该方法略长,不上代码了,方法参数command就是要执行的任务Runnable类,第二个参数为true时,检查线程个数时上限以corePoolSize为准,否则以maximumPoolSize为准)。
  3. 那么问题来了,这个线程t执行完该任务以后,怎么避免终止而被复用呢?玄机就在Worker的run方法中,它不只是简单的调用了firstTask的run方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    public void run() {
    runWorker(this);
    }
    final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
    while (task != null || (task = getTask()) != null) {
    w.lock();
    // If pool is stopping, ensure thread is interrupted;
    // if not, ensure thread is not interrupted. This
    // requires a recheck in second case to deal with
    // shutdownNow race while clearing interrupt
    if ((runStateAtLeast(ctl.get(), STOP) ||
    (Thread.interrupted() &&
    runStateAtLeast(ctl.get(), STOP))) &&
    !wt.isInterrupted())
    wt.interrupt();
    try {
    beforeExecute(wt, task);
    Throwable thrown = null;
    try {
    task.run();
    } catch (RuntimeException x) {
    thrown = x; throw x;
    } catch (Error x) {
    thrown = x; throw x;
    } catch (Throwable x) {
    thrown = x; throw new Error(x);
    } finally {
    afterExecute(task, thrown);
    }
    } finally {
    task = null;
    w.completedTasks++;
    w.unlock();
    }
    }
    completedAbruptly = false;
    } finally {
    processWorkerExit(w, completedAbruptly);
    }
    }

    到这里可以清楚的看到,task的run方法是在一个while循环中被调用的,这个while循环就是达到复用的关键。循环条件中的getTask()就是从队列中取任务。至此才算真相大白,线程池中的每个线程,在执行完任务以后,都会循环去取队列中的任务接着执行,就是这样实现线程复用的。
    这里也可以注意到,除了调用task的run方法以外,还有一些额外的操作,在文章一开始提到的线程监控功能,就是在这里完成的,JDK提供了一些默认监控参数,我们也可以通过继承重写beforeExecute和afterExecute方法实现自定义的监控,比如执行时间等记录,以下是JDK提供的参数:

    • taskCount:线程池需要执行的任务数量。
    • completedTaskCount:线程池在运行过程中已完成的任务数量。小于或等于taskCount。
    • largestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过。如等于线程池的最大大小,则表示线程池曾经满了。
    • getPoolSize:线程池的线程数量。如果线程池不销毁的话,池里的线程不会自动销毁,所以这个大小只增不减。
    • getActiveCount:获取活动的线程数。
  4. 如果你还记得前边Exexutors构造参数的话,你应该注意到我们还剩最后一个问题没有讲清楚,那就是当线程池中没有可执行的任务时,是怎么控制多余线程在Timeout时间后终止的?机智如你一定发现上图中,如果while循环条件失败,即对应没有可执行的任务,while循环之后还有一个finally语句块,玄机应该就在这个processWorkerExit方法吧:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
    decrementWorkerCount();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
    completedTaskCount += w.completedTasks;
    workers.remove(w);
    } finally {
    mainLock.unlock();
    }
    tryTerminate();
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
    if (!completedAbruptly) {
    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
    if (min == 0 && ! workQueue.isEmpty())
    min = 1;
    if (workerCountOf(c) >= min)
    return; // replacement not needed
    }
    addWorker(null, false);
    }
    }

    看完这个方法的源码我是有点意外的,和我预想的处理方式完全不一样,接下来一步步讲:

    1. 通过参数completedAbruptly的注释我们可以知道它==true时表示用户任务是由于用户代码抛出异常而终止的(这和runWorker方法中对其赋值是对应的),该方法做的第一件事是判断任务结束的原因,如果是因为用户代码抛出异常,那此时执行该任务的线程已经因为异常而终止了,接着decrementWorkerCount()是通过CAS执行线程个数减1操作;
    2. 该方法做的第二件事,加锁,把该Worker完成的任务个数,汇总到整个线程池完成的任务个数上(开个小岔,这么做难道不会造成监控参数completedTaskCount的准确性延迟吗?其实并不会,因为当我们通过get方法获取该参数时,会加锁遍历所有的Worker汇总已完成的任务个数),然后从工作线程集合HashSet中去除该Worker。是的!这里没有判断是否超时,直接删了,而且要注意,这里是通过HashSet的remove方法删的,并没有对线程个数进行修改!先别急,接着往下看;
    3. 该方法做的第三件事,tryTerminate(),通过该方法的注释可以了解到,这是一个任何牵扯到线程个数减少或从队列中取任务的操作都要做的保险措施,不是重点,接着往下看;
    4. 第四件事才是关于线程个数维护的,首先检查线程池状态,如果没有关闭,接下来的分支很有意思:a)如果用户任务是因为异常终止的,就直接通过addWorker添加一个没有任务的工作线程顶替因异常挂掉的线程;b)如果用户任务是正常运行结束的,就根据allowCoreThreadTimeOut设置最少线程个数,然后检查当前是否少于最少个数,不少的话方法就结束了,否则通过addWorker添加一个没有任务的工作线程。这里可以再回顾一下上边addWorker方法的源码,该方法新建包装了一个工作线程,然后start线程开始通过runWorker方法执行任务,但这里任务是空的,也就是runWorker方法里while循环直接结束了,然后又会回到这里,如果一直没有新任务到来的话,线程池就是这样,循环往复的创建线程,删除线程来维护线程个数的。
      看到这,如果上边讲得你已经理解消化了,你可能还会有疑问:如果我们在线程池空闲时,在它删除一个线程之后,创建充数的线程之前,获取这个时刻的线程个数的话,会不会少于corePoolSize呢?答案是不会的!该方法唯一一处对线程个数有操作的是,当线程是异常终止时,对线程个数减1操作。其他所有情况,空闲时只是不断的创建和删除线程实体,但并没有修改线程个数。
      这个方法看似分支很少,其实处理了很多种情况,可以通过下边的流程图梳理一下:
      processWorkerExit流程图
  5. 看到这里的时候,你如果还记得我们一开始的问题,应该会反应过来,这个processWorkerExit方法只是单纯的控制线程个数,但并没有依据keepAliveTime指定的空闲时间来裁决线程的去留啊!!我一开始也是笃定时间判断控制是在这里的!如果不在这里,那究竟在哪里呢?直接查看都有哪些方法调用过keepAliveTime参数就知道了:
    keepAliveTime调用
    通过排除法我们可以确定,原来是在被我们忽视的getTask()方法里!!真是没想到啊!这个方法其他的源码我们可以不看,只用看一行我们就什么都明白了:

    1
    2
    3
    Runnable r = timed ?
    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    workQueue.take();

    原来是在从队列中取任务的时候做到时间控制的!一开始我们可能是想不到的,但看到这里,仔细一想,放在这里真是再合适不过了:当一个线程在取任务的时候,说明这个线程是空闲的,当它从队列中取任务超时而失败的时候,它就已经超过了空闲时间上限了,这时候的流程是:runWorker方法中的while循环条件失败,走进processWorkerExit方法做最后的线程个数控制,简直完美!

  6. 参考
    1. http://ifeve.com/java-threadpool/
    2. http://blog.csdn.net/it_man/article/details/7193727
    3. http://www.oschina.net/question/565065_86540
    4. https://www.ibm.com/developerworks/cn/java/j-jtp0730/

Hello World

发表于 2016-04-04   |     |   阅读次数

Welcome to Hexo! This is your very first post. Check documentation for more info. If you get any problems when using Hexo, you can find the answer in troubleshooting or you can ask me on GitHub.

Quick Start

Create a new post

1
$ hexo new "My New Post"

More info: Writing

Run server

1
$ hexo server

More info: Server

Generate static files

1
$ hexo generate

More info: Generating

Deploy to remote sites

1
$ hexo deploy

More info: Deployment

12
ksni88

ksni88

15 日志
5 分类
23 标签
Links
  • 海神
  • 杭城梦少
© 2016 ksni88
由 Hexo 强力驱动
主题 - NexT.Pisces