Earth Guardian

You are not LATE!You are not EARLY!

0%

Android Handler 机制

Handler 机制是 Android 系统处理同一进程不同线程间通信的机制,基于 Linux 系统的 epoll 机制实现。

Android 两大机制:Binder 机制用于处理进程间通信;Handler 机制用于处理进程内的线程间通信,线程间交互。

概念

  • Message
    意为消息,发送到 Handler 进行处理的对象,携带描述信息和任意数据。
  • MessageQueue
    意为消息队列,Message 的集合。
  • Looper
    消息泵,用来从 MessageQueue 中抽取 Message,发送给 Handler 进行处理。
  • Handler
    处理 Looper 抽取出来的 Message

也就是说所有的消息处理都是串行执行的。

他们之间的关系:

  • 每个 Thread 只对应一个 Looper
  • 每个 Looper 只对应一个 MessageQueue
  • 每个 MessageQueue 中有 NMessage
  • 每个 Message 最多指定一个 Handler 来处理事件
  • 每个 Thread 可以对应多个 Handler

在如下操作中都是基于 UI 主线程,在异步任务中使用 Handler 机制更新 UI 必须用 new Handler(); 来初始化。

1
2
3
// 默认使用 UI 主线程的 Looper
Handler mHandler = new Handler();
mHandler.post(new Runnable(){});

源码目录及类对应文件

源码目录结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Framework Java
frameworks/base/core/java/android/os/
Message.java
MessageQueue.java
Looper.java
Handler.java

// Framework JNI
frameworks/base/core/jni/
android_os_MessageQueue.h
android_os_MessageQueue.cpp

// System Native
system/core/libutils/include/utils/Looper.h
system/core/libutils/Looper.cpp

类或接口对应文件

1
2
3
4
5
6
7
8
Message.java: Message
MessageQueue.java: MessageQueue
Looper.java: Looper
Handler.java: Handler
Looper.h: Message, MessageHandler, WeakMessageHandler, LooperCallback,
SimpleLooperCallback, Looper
android_os_MessageQueue.h: MessageQueue
android_os_MessageQueue.cpp: NativeMessageQueue

Message 详解

MessageHandler 机制中的数据容器,和 Binder 机制中的 Parcel 功能一样。Message 数据结构特点:

  • 链表
    Message 是单链表数据结构,成员变量 next 保存下一条消息,而当前消息是链表的头结点。Message 中的消息池 Message sPool 利用链表头结点特性实现栈,快速存取消息;MessageQueue 中的成员变量 Message mMessages 利用链表结构实现先进先出队列,确保先进入的消息先被处理。
  • 存储简单数据
    成员变量 arg1, arg2 用来存储简单整型数据。
  • 存储复杂数据
    成员变量 Object obj, Bundle data 用来存储复杂数据,其中 Bundle 能够存放键值对。

源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
public final class Message implements Parcelable {
// 消息唯一标记
public int what;
// 存储简单数据
public int arg1;
public int arg2;
public Object obj;
// Messenger 用于跨进程通信
public Messenger replyTo;
public int sendingUid = -1;

// 存储复杂数据,可以通过 get/set Data 来设置
/*package*/ Bundle data;
public Bundle getData() {
if (data == null) {
data = new Bundle();
}

return data;
}
public Bundle peekData() {
return data;
}
public void setData(Bundle data) {
this.data = data;
}

// 消息池是单向链表,最大长度为 50
/*package*/ Message next;
private static Message sPool;
private static final int MAX_POOL_SIZE = 50;

// 构造方法,但是强烈推荐使用 Obtain 来获取
// Obtain 会通过消息池来重复使用 Message
public Message() {}
public static Message obtain(Handler h){...}
public static Message obtain(Handler h, int what){...}
public static Message obtain(Handler h, Runnable callback){...}
public static Message obtain(Message orig){...}
public static Message obtain(Handler h, int what,
int arg1, int arg2, Object obj){...}
public static Message obtain(Handler h, int what,
int arg1, int arg2){...}
public static Message obtain(Handler h, int what, Object obj){...}
public static Message obtain() {
synchronized (sPoolSync) {
if (sPool != null) {
Message m = sPool;
sPool = m.next;
m.next = null;
m.flags = 0; // clear in-use flag
sPoolSize--;
return m;
}
}
return new Message();
}

// 消息回收,重置当前消息,并进入消息池
public void recycle() {
if (isInUse()) {
if (gCheckRecycle) {
throw ...
}
return;
}
recycleUnchecked();
}
void recycleUnchecked() {
flags = FLAG_IN_USE;
what = 0; arg1 = 0; arg2 = 0; obj = null;
replyTo = null; sendingUid = -1; when = 0;
target = null; callback = null; data = null;

synchronized (sPoolSync) {
if (sPoolSize < MAX_POOL_SIZE) {
next = sPool;
sPool = this;
sPoolSize++;
}
}
}

// 消息投递的绝对时间
/*package*/ long when;
public long getWhen() {return when;}

// target 通常保存处理该消息的 Handler
/*package*/ Handler target;
public void setTarget(Handler target) {
this.target = target;
}
public Handler getTarget() {
return target;
}

// 如果消息中定义了 callback
// Handler 在处理消息时,直接回调该 callback,不会再执行 handleMessage
/*package*/ Runnable callback;
public Runnable getCallback() {
return callback;
}

// 消息标识
/*package*/ static final int FLAG_IN_USE = 1 << 0;
/*package*/ static final int FLAG_ASYNCHRONOUS = 1 << 1;
/*package*/ static final int FLAGS_TO_CLEAR_ON_COPY_FROM = FLAG_IN_USE;
/*package*/ int flags;
// 是否正在使用
/*package*/ boolean isInUse() {
return ((flags & FLAG_IN_USE) == FLAG_IN_USE);
}
/*package*/ void markInUse() {
flags |= FLAG_IN_USE;
}
// 是否为异步消息
public boolean isAsynchronous () {...}
public void setAsynchronous (boolean async) {...}

// 消息对象浅拷贝
public void copyFrom (Message o) {...}
// Parcelable 相关
public static final Parcelable.Creator<Message> CREATOR
= new Parcelable.Creator<Message>() {...}
public int describeContents() {...}
public void writeToParcel(Parcel dest, int flags) {...}
private void readFromParcel(Parcel source) {...}
...
}

快速查看 Message API

消息池 sPool

Message 中使用了单向链表结构的消息池,sPool 总是指向链表顶部,所以这个链表模拟的是栈结构,即消息池的数据结构为链表实现的栈。

  • 消息池的创建
    源码中可以看出,消息在被回收后 recycleUnchecked(),将当前消息加入到消息池链表中,即消息池才开始有可以重复利用的 Message ;创建消息池使用了 synchronized 关键字,确保线程池的操作是同步的。如果系统中,同时有很多消息在被传递,当一部分消息使用结束后都会被回收,此时消息池会积蓄的越来越多。
  • 消息的重复利用
    创建消息时推荐使用 Message.obtain() 方式,因为每次 obtain 时,都会判断消息池中是否有可以循环利用的消息,如果存在则取出并清空消息标记,否则才新建一个消息。

实例化一个消息,请使用 Message.obtain() ,充分利用消息池循环利用的特点。

重要成员变量

  • long when
    消息投递的绝对时间(投递时当前时间+设置的延迟时间),在 Handler 发送消息或发布任务时,指定具体的值。MessageQueue 在消息加入队列时,会根据 when 值决定消息在队列中的顺序。
  • Handler target
    保存处理该消息的 Handler ,在 Handler.enqueueMessage() 中将 Message 和对应处理该消息的 Handler 关联起来。
  • Runnable callback
    Handler.java 的分析中得出结论:消息的处理有三个优先级,可以直接使用 Message.callback 来进行消息处理。

消息标识 flags

成员变量 flags 有如下四个值:

  • 0
    消息标识清除。表示该消息被创建 new Message()、或者是从消息池中新取出的 Message.obtain()。可以理解是一个新消息,能修改当前消息内容。
  • FLAG_IN_USE
    消息正在被使用。表示消息进入了消息池栈 sPool中、或者进入了消息队列 MessageQueue 中。在消息池中,表示被回收了的消息正在被消息池管理,可以被取出循环利用;在消息队列中,表示消息被投递,正等待被取出处理。
  • FLAG_ASYNCHRONOUS
    表示异步消息;消息默认都是同步的,只能在 MessageHandler 构造方法中,特别指定为异步消息。如果设置了同步屏障,异步消息优先级将高于同步消息;消息队列 MessageQueue.next() 在取出消息时,遇到同步屏障会暂停所有的同步消息,将异步消息取出并处理,直到移除同步屏障。所以异步消息通常和同步屏障配合使用,同步屏障详细分析见 MessageQueue 中的分析。
  • FLAGS_TO_CLEAR_ON_COPY_FROM
    它的值和 FLAG_IN_USE 一样,只有在 copyFrom() 方法中会用到。

Messenger 相关

Message 包含具体的数据信息,Messenger 是一名信差,用于进程间发送指定的 Message 。实质上 Messenger 使用 AIDLHandler 机制来实现进程间的异步通信。详细参考Messenger 详解

  • Messenger replyTo
    回复此消息的回调信差,跨进程时通信时,可以利用 Message 中携带的信差,完成消息回复。
  • sendingUid
    跨进程发送异步消息的进程 ID

Looper 详解

Looper 类设计原则:此类包含基于 MessageQueue 设置和管理事件循环所需的代码。影响队列状态的 API 应该在 MessageQueue, Handler 上定义,而不是在 Looper 本身上定义。例如在队列上定义 idle handlerssync barriers,而在 Looper 上定义准备线程,循环和退出。
Looper 类的主要功能:

  • 提供线程上下文环境,创建与线程绑定的 Looper
  • 创建 MessageQueue
  • loop 循环从 MessageQueue 中获取下一条消息(若无消息线程阻塞等待),指派 Handler 处理消息并回收消息
  • 提供 loop 循环退出方法

源码分析

Looper 类中大部分都是 static 的变量和方法,不能直接实例化,通常都是固定格式来初始化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
public final class Looper {
private static final String TAG = "Looper";

// sThreadLocal.get() will return null unless you've called prepare().
static final ThreadLocal<Looper> sThreadLocal=new ThreadLocal<Looper>();
private static Looper sMainLooper; // guarded by Looper.class

final MessageQueue mQueue;
final Thread mThread;
...

public static void prepare() {
prepare(true);
}

private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException(
"Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}

/**
* Initialize the current thread as a looper, marking it as an
* application's main looper. The main looper for your application
* is created by the Android environment, so you should never need
* to call this function yourself. See also: {@link #prepare()}
*/
public static void prepareMainLooper() {
prepare(false);
synchronized (Looper.class) {
if (sMainLooper != null) {
throw new IllegalStateException(
"The main Looper has already been prepared.");
}
sMainLooper = myLooper();
}
}

// Returns the application's main looper,
// which lives in the main thread of the application.
public static Looper getMainLooper() {
synchronized (Looper.class) {
return sMainLooper;
}
}

public static @Nullable Looper myLooper() {
return sThreadLocal.get();
}

public static @NonNull MessageQueue myQueue() {
return myLooper().mQueue;
}

private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}

public static void loop() {
final Looper me = myLooper();
if (me == null) {throw new RuntimeException("...");}
final MessageQueue queue = me.mQueue;
...
for (;;) {
// 线程会阻塞
Message msg = queue.next(); // might block
if (msg == null) {
// No message indicates that the message queue is quitting.
return;
}

...
msg.target.dispatchMessage(msg);
...
msg.recycleUnchecked();
}
}
public void quit() {
mQueue.quit(false);
}
public void quitSafely() {
mQueue.quit(true);
}
...
}

快速查看 Looper API

重要成员变量

  • sThreadLocal
    结论:一个线程只能对应一个 Looper 。从源码中可以看到,静态变量 sThreadLocal 存储了所有线程对应的 Looper ,而 ThreadLocal 中数据存储的数据结构是一个定制的哈希表,其 key 值是当前线程。
  • sMainLooper
    当前应用主线程对应的 Looper ,在应用对应的主线程 ActivityThread.main() 中初始化。通常我们在 Activity 中获取到的主线程 Looper 对应的就是 sMainLooper

Looper 的初始化

代码中可以看到,Looper 的构造方法是私有的,客户端通常使用 Looper.prepare(...) 来实例化,并初始化应用场景。而 Looper.prepareMainLooper() 是用来初始化主线程环境的,整个 Android Framework 中只有 ActivityThread, SystemServer 这两个带有 main() 方法的类调用过,而这两个类分别用来开启应用主线程和系统主线程。
Looper 的构造方法中,初始化了 mQueuemThread

  • mQueue
    Handler 机制的消息队列 MessageQueue 就是在 Looper 的构造方法中实例化的。
  • mThread
    赋值的是当前线程:可以是主线程或者工作线程。

Looper.loop()

流程非常简单,进入无限循环并不停的从消息队列中获取消息,而获取消息的过程可能会阻塞。 Looper 线程拿到消息后,执行消息处理 Handler.dispatchMessage() 并回收已经处理过的消息 Message.recycleUnchecked()

Looper 退出

Looper 通常是通过 Looper.loop() 进入无线循环,从消息队列中取出消息并处理。Handler 机制为工作线程提供了退出方法 quit/quitSafely ,调用 MessageQueue.quit() 来结束并退出。只要调用了 quit/quitSafely,不管是正在退出还是已经退出,Looper 就不再接收新的消息。Handler 发送的消息,在 MessageQueue.enqueueMessage 中直接返回 false,不做任何处理。

主线程的 Looper 是不允许退出的,在 MessageQueue 的构造方法中设定。prepareMainLooper 调用的是 prepare(false),即不能退出。

典型工作线程流程

Looper 线程典型的实现示例如下,将 prepare()loop() 的分离,来创建一个与 Looper 通信的 Handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class LooperThread extends Thread {
public Handler mHandler;

public void run() {
Looper.prepare();

mHandler = new Handler() {
public void handleMessage(Message msg) {
// process incoming messages here
}
};

Looper.loop();
}
}

Handler 详解

Handler 子类必须是 static 的,避免潜在的内存泄露,在构造方法中可以打开开关做检测。
Handler 类将 Message, Looper, MessageQueue 串起来,给外部提供完整的接口,发送并处理消息。它管理的消息队列 MessageQueueLooper 的成员变量;而 Looper 可以是主线程,也可以是工作线程,在 Handler 初始化时指定。主要功能包含:

  • 初始化一个消息
  • 发布任务或者发送消息
  • 回到 Looper 所在线程处理消息
  • 移除消息或其回调

源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
public class Handler {
...
final Looper mLooper;
final MessageQueue mQueue;
final Callback mCallback;
final boolean mAsynchronous;

public interface Callback {
public boolean handleMessage(Message msg);
}
private static void handleCallback(Message message) {
message.callback.run();
}

// Subclasses must implement this to receive messages.
public void handleMessage(Message msg) {
}

// 消息处理入口,三个优先级:
// 1. 如果 Message 中定义了 Callback,直接处理
// 2. 否则如果 Handler 中定义了 Callback,调用处理
// 3. 最后才是子类重写的 handleMessage
public void dispatchMessage(Message msg) {
if (msg.callback != null) {
handleCallback(msg);
} else {
if (mCallback != null) {
if (mCallback.handleMessage(msg)) {
return;
}
}
handleMessage(msg);
}
}

// 构造方法,最终都是初始化 4 个成员变量
public Handler() {this(null, false);}
public Handler(Callback callback) {this(callback, false);}
public Handler(Looper looper) {this(looper, null, false);}
public Handler(Looper looper, Callback callback) {
this(looper, callback, false);
}
public Handler(boolean async) {this(null, async);}
public Handler(Callback callback, boolean async) {
// 检查子类实例化时,是否存在内存泄露
if (FIND_POTENTIAL_LEAKS) {
final Class<? extends Handler> klass = getClass();
if ((klass.isAnonymousClass() || klass.isMemberClass()
|| klass.isLocalClass()) &&
(klass.getModifiers() & Modifier.STATIC) == 0) {
Log.w(TAG, "The following Handler class should be static " +
"or leaks might occur: " + klass.getCanonicalName());
}
}

mLooper = Looper.myLooper();
if (mLooper == null) {
throw new RuntimeException("Can't create handler inside " +
" thread that has not called Looper.prepare()");
}
mQueue = mLooper.mQueue;
mCallback = callback;
mAsynchronous = async;
}
public Handler(Looper looper, Callback callback, boolean async) {
mLooper = looper;
mQueue = looper.mQueue;
mCallback = callback;
mAsynchronous = async;
}

// 生成对应的消息 Message,但都是对 Message.obtain 的封装
public final Message obtainMessage(){return Message.obtain(this);}
public final Message obtainMessage(int what){
return Message.obtain(this, what);
}
public final Message obtainMessage(int what, Object obj){
return Message.obtain(this, what, obj);
}
public final Message obtainMessage(int what, int arg1, int arg2){
return Message.obtain(this, what, arg1, arg2);
}
public final Message obtainMessage(int what, int arg1, int arg2,
Object obj){
return Message.obtain(this, what, arg1, arg2, obj);
}
private static Message getPostMessage(Runnable r) {
Message m = Message.obtain();
m.callback = r;
return m;
}
private static Message getPostMessage(Runnable r, Object token) {
Message m = Message.obtain();
m.obj = token;
m.callback = r;
return m;
}

// 发布任务,最终都是调用的 enqueueMessage
public final boolean post(Runnable r) {
return sendMessageDelayed(getPostMessage(r), 0);
}
public final boolean postAtTime(Runnable r, long uptimeMillis){
return sendMessageAtTime(getPostMessage(r), uptimeMillis);
}
public final boolean postAtTime(Runnable r, Object token,
long uptimeMillis){
return sendMessageAtTime(getPostMessage(r, token), uptimeMillis);
}
public final boolean postDelayed(Runnable r, long delayMillis) {
return sendMessageDelayed(getPostMessage(r), delayMillis);
}
public final boolean postAtFrontOfQueue(Runnable r) {
return sendMessageAtFrontOfQueue(getPostMessage(r));
}
public final boolean sendMessage(Message msg){
return sendMessageDelayed(msg, 0);
}
public final boolean sendEmptyMessage(int what){
return sendEmptyMessageDelayed(what, 0);
}
public final boolean sendEmptyMessageDelayed(int what,long delayMillis){
Message msg = Message.obtain();
msg.what = what;
return sendMessageDelayed(msg, delayMillis);
}
public final boolean sendEmptyMessageAtTime(int what,long uptimeMillis){
Message msg = Message.obtain();
msg.what = what;
return sendMessageAtTime(msg, uptimeMillis);
}
public final boolean sendMessageDelayed(Message msg, long delayMillis){
if (delayMillis < 0) {
delayMillis = 0;
}
return sendMessageAtTime(msg, SystemClock.uptimeMillis()+delayMillis);
}
public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
return enqueueMessage(queue, msg, uptimeMillis);
}
public final boolean sendMessageAtFrontOfQueue(Message msg) {
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
return enqueueMessage(queue, msg, 0);
}

private boolean enqueueMessage(MessageQueue queue, Message msg,
long uptimeMillis) {
// Message 和 Handler 关联
msg.target = this;
if (mAsynchronous) {
// 设置异步消息
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}

// 移除 Message 和 Messa.callback,最终都是在 MessageQueue 中移除
public final void removeCallbacks(Runnable r){
mQueue.removeMessages(this, r, null);
}
public final void removeCallbacks(Runnable r, Object token){
mQueue.removeMessages(this, r, token);
}
public final void removeMessages(int what) {
mQueue.removeMessages(this, what, null);
}
public final void removeMessages(int what, Object object) {
mQueue.removeMessages(this, what, object);
}
public final void removeCallbacksAndMessages(Object token) {
mQueue.removeCallbacksAndMessages(this, token);
}

// Messenger 相关
IMessenger mMessenger;
final IMessenger getIMessenger() {
synchronized (mQueue) {
if (mMessenger != null) {
return mMessenger;
}
mMessenger = new MessengerImpl();
return mMessenger;
}
}

private final class MessengerImpl extends IMessenger.Stub {
public void send(Message msg) {
msg.sendingUid = Binder.getCallingUid();
Handler.this.sendMessage(msg);
}
}

// 阻塞,同步执行任务
public final boolean runWithScissors(final Runnable r, long timeout) {
if (r == null) {
throw new IllegalArgumentException("runnable must not be null");
}
if (timeout < 0) {
throw new IllegalArgumentException(
"timeout must be non-negative");
}

if (Looper.myLooper() == mLooper) {
r.run();
return true;
}

BlockingRunnable br = new BlockingRunnable(r);
return br.postAndWait(this, timeout);
}
private static final class BlockingRunnable implements Runnable {
private final Runnable mTask;
private boolean mDone;

public BlockingRunnable(Runnable task) {
mTask = task;
}

@Override
public void run() {
try {
mTask.run();
} finally {
synchronized (this) {
mDone = true;
notifyAll();
}
}
}

public boolean postAndWait(Handler handler, long timeout) {
if (!handler.post(this)) {
return false;
}

synchronized (this) {
if (timeout > 0) {
final long expirationTime = SystemClock.uptimeMillis()
+ timeout;
while (!mDone) {
long delay = expirationTime
- SystemClock.uptimeMillis();
if (delay <= 0) {
return false; // timeout
}
try {
wait(delay);
} catch (InterruptedException ex) {
}
}
} else {
while (!mDone) {
try {
wait();
} catch (InterruptedException ex) {
}
}
}
}
return true;
}
}
...
}

快速查看 Handler API

构造方法及成员变量

Handler 的构造方法可以看出来,实例化时主要对 Looper, MessageQueue, Callback, mAsynchronous 四个成员变量的赋值。

  • Looper
    Looper 如果来自于主线程,则不需要通过构造方法赋值;如果来自于工作线程,需要先执行 Looper.prepare 初始化 Looper 使用环境,并作为参数传递到构造方法中。
  • MessageQueue
    Looper 值确定后,消息队列即为其成员变量 mQueue
  • Callback
    可以直接通过 Callback 处理消息事件;特别是不需要自定义 Handler 子类来重写 handleMessage 时,默认为 null
  • mAsynchronous
    是否允许异步执行,默认值为 false 即是同步执行的。它最终会被设置到对应的 Message.flags = FLAG_ASYNCHRONOUS,在 Handler 中被没有其他作用。

自定义 Handler 的子类在实例化时,为了避免内存泄露,通常将子类设置为 static 类型,也就是避免内部类引用当前 Activity 对象。

生成消息

Handler 提供了生成消息的方法,但都是对 Message.obtain 的封装,所以在使用消息前,推荐直接使用 Message 类来生成。

任务发布

不管是通过 post***() 还是 send***() 方法来发布任务或者消息,最终都是调用的 enqueueMessage(),即将 Message 加入到消息队列中 MessageQueueenqueueMessage() 的两个重要动作:

  • HandlerMessage 的关联
    Message 的成员变量 target 存储了处理该消息的 Handler ,在消息入队时,将两者关联起来。
  • 设置异步消息
    Handler.mAsynchronous 成员变量,在入队时将对应的 Message 设置为异步。

消息处理

dispatchMessageHandler 消息处理的入口方法,有三个优先级来处理消息:

  • 优先:如果 Message 中定义了 Callback
  • 其次:如果 Handler 中定义了 Callback
  • 最后:才是调用子类重写的 handleMessage

如果是自定义 Handler 子类,必须重写 handleMessage 来实现消息处理。而消息处理是在 Looper 所在线程来处理的,Looper 如果来自于主线程则在主线程中处理消息;如果来自于工作线程则在工作线程中处理消息。

移除消息

remove***() 系列的方法,是指移除 Message 或者 Message.callback;最终调用的是从 MessageQueue 中移除对应消息。

Messenger 相关

Handler 中实现了 Messenger 用于进程间异步通信的代码。源码中可以看出 Messenger 通过 AIDL 来实现的进程间通信的;而通过 Handler.sendMessage 来实现异步通信。详细参考Messenger 详解

BlockingRunnable 阻塞并同步执行任务

BlockingRunnable 同步等待指定任务执行完成才会返回,通过 Object.wait/notify 实现同步阻塞功能。Handler.runWithScissors 方法是具体的实现,它是一个 @hide 方法,客户端 App 并不能直接调用若。该方法是如果调用线程与 Looper 所在线程如果在同一个线程,直接执行完后退出;如果不是同一线程,可以简单的实现 timeout,方法调用以后会阻塞,直到传入的 runnable 结束或者是 timeout

MessageQueue 详解

MessageQueue 的内部存储了一组消息,以队列的形式对外提供插入和删除的工作。采用单链表的数据结构来存储消息列表,按照消息发生的绝对时间 Message.when 排序。所以消息队列并不是绝对的先进先出队列,是按照消息发生时间先进先出。Looper.loop 会进入无限循环,而它会调用 MessageQueue.next 方法取出下一条消息;当有消息时,取出交给 Handler 处理;当没有消息时,当前线程便会阻塞在 next() 方法的 nativePollOnce() 中,当线程便会释放 CPU 资源进入休眠状态,直到下个消息到达或者有事务发生时,唤醒当前线程;这里的线程休眠唤醒,涉及到 Linux epoll 机制,后面会详细介绍。
MessageQueue 是在多线程环境下操作,所以绝大部分的方法都使用了 synchronized(this) 关键字;是 Handler 机制的核心类,native 代码在 android_os_MessageQueue.cpp 中实现;MessageQueue 还能监听文件描述符,当发生变化时做出相应事件处理,但是 Handler 机制中并没有使用该功能,本文不做介绍。
MessageQueue 主要功能:

  • 初始化 native 环境
  • Message 加入队列
  • 调用 native 方法,当前线程阻塞等待,直到从队列中取出 Message
  • 多线程环境中维护消息队列入队、出队、销毁

源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
public final class MessageQueue {
...
// True if the message queue can be quit.
private final boolean mQuitAllowed;
private boolean mQuitting;

private long mPtr; // used by native code

Message mMessages;
private final ArrayList<IdleHandler> mIdleHandlers
= new ArrayList<IdleHandler>();
private SparseArray<FileDescriptorRecord> mFileDescriptorRecords;
private IdleHandler[] mPendingIdleHandlers;

// Indicates whether next() is blocked waiting
// in pollOnce() with a non-zero timeout.
private boolean mBlocked;

// 构造方法
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}

// Must only be called on the looper thread or the finalizer.
// 释放资源
private void dispose() {
if (mPtr != 0) {
nativeDestroy(mPtr);
mPtr = 0;
}
}

// 当前线程 Idle 状态时,相关方法和接口
public boolean isIdle() {
synchronized (this) {
final long now = SystemClock.uptimeMillis();
return mMessages == null || now < mMessages.when;
}
}
public void addIdleHandler(@NonNull IdleHandler handler) {
if (handler == null) {
throw new NullPointerException("Can't add a null IdleHandler");
}
synchronized (this) {
mIdleHandlers.add(handler);
}
}
public void removeIdleHandler(@NonNull IdleHandler handler) {
synchronized (this) {
mIdleHandlers.remove(handler);
}
}
public static interface IdleHandler {
boolean queueIdle();
}

// 当前线程是否在轮询获取下条消息
public boolean isPolling() {
synchronized (this) {
return isPollingLocked();
}
}
private boolean isPollingLocked() {
// If the loop is quitting then it must not be idling.
// We can assume mPtr != 0 when mQuitting is false.
return !mQuitting && nativeIsPolling(mPtr);
}

// 判断是否存在指定条件的消息
boolean hasMessages(Handler h, int what, Object object) {
if (h == null) {
return false;
}

synchronized (this) {
Message p = mMessages;
while (p != null) {
if (p.target == h && p.what == what &&
(object == null || p.obj == object)) {
return true;
}
p = p.next;
}
return false;
}
}
boolean hasMessages(Handler h, Runnable r, Object object) {...}

// 移除指定消息或整个消息队列
void removeMessages(Handler h, int what, Object object) {
if (h == null) {
return;
}

synchronized (this) {
Message p = mMessages;

// Remove all messages at front.
// 如果消息队列头结点或者头部几个节点,就是目标消息
while (p != null && p.target == h && p.what == what
&& (object == null || p.obj == object)) {
Message n = p.next;
mMessages = n;
p.recycleUnchecked();
p = n;
}

// Remove all messages after front.
// 遍历整个队列,找到目标消息移出队列并回收
while (p != null) {
Message n = p.next;
if (n != null) {
if (n.target == h && n.what == what
&& (object == null || n.obj == object)) {
Message nn = n.next;
n.recycleUnchecked();
p.next = nn;
continue;
}
}
p = n;
}
}
}
void removeMessages(Handler h, Runnable r, Object object) {...}
void removeCallbacksAndMessages(Handler h, Object object) {...}
private void removeAllMessagesLocked() {...}
private void removeAllFutureMessagesLocked() {...}

// 消息队列退出循环
void quit(boolean safe) {
// 是否允许退出?主线程 Looper 是不允许退出的,工作线程可以
if (!mQuitAllowed) {
throw new IllegalStateException(
"Main thread not allowed to quit.");
}

synchronized (this) {
// 如果已经正在退出了返回
if (mQuitting) {
return;
}
mQuitting = true;

// 如果是安全退出,只移除没有投递和处理的消息
// 已经投递的等待处理完毕
// 非安全退出,移除所有消息
if (safe) {
removeAllFutureMessagesLocked();
} else {
removeAllMessagesLocked();
}

//We can assume mPtr!=0 because mQuitting was previously false.
nativeWake(mPtr);
}
}

// 设置和移除同步屏障
// The next barrier token.
// Barriers are indicated by messages with a null target
// whose arg1 field carries the token.
private int mNextBarrierToken;
public int postSyncBarrier() {
return postSyncBarrier(SystemClock.uptimeMillis());
}
private int postSyncBarrier(long when) {
// Enqueue a new sync barrier token.
// We don't need to wake the queue
// because the purpose of a barrier is to stall it.
synchronized (this) {
final int token = mNextBarrierToken++;
// 同步屏障消息 target 为空,arg1 保存令牌
final Message msg = Message.obtain();
msg.markInUse();
msg.when = when;
msg.arg1 = token;

Message prev = null;
Message p = mMessages;
// 同步屏障消息是插入到队列头部的,优先级高
if (when != 0) {
while (p != null && p.when <= when) {
prev = p;
p = p.next;
}
}
if (prev != null) { // invariant: p == prev.next
msg.next = p;
prev.next = msg;
} else {
msg.next = p;
mMessages = msg;
}
return token;
}
}
public void removeSyncBarrier(int token) {
// Remove a sync barrier token from the queue.
// If the queue is no longer stalled by a barrier then wake it.
synchronized (this) {
Message prev = null;
Message p = mMessages;
while (p != null && (p.target != null || p.arg1 != token)) {
prev = p;
p = p.next;
}
if (p == null) {
throw new IllegalStateException(
"The specified message queue synchronization barrier token"
+ " has not been posted or has already been removed.");
}
final boolean needWake;
// 移除同步屏障
if (prev != null) {
prev.next = p.next;
needWake = false;
} else {
mMessages = p.next;
needWake = mMessages == null || mMessages.target != null;
}
// 回收同步屏障
p.recycleUnchecked();

// If the loop is quitting then it is already awake.
// We can assume mPtr != 0 when mQuitting is false.
if (needWake && !mQuitting) {
nativeWake(mPtr);
}
}
}

// 消息入队
boolean enqueueMessage(Message msg, long when) {
// 为空,表示是同步屏障;不能直接入队,需要通过 postSyncBarrie 插入队列头部
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}
// FLAG_IN_USE,表示消息已经在消息池或者消息队列中
if (msg.isInUse()) {
throw new IllegalStateException(msg +
" This message is already in use.");
}

synchronized (this) {
// 如果消息队列正在退出,直接回收当前消息并返回
if (mQuitting) {
IllegalStateException e = new IllegalStateException(msg.target
+ " sending message to a Handler on a dead thread");
Log.w(TAG, e.getMessage(), e);
msg.recycle();
return false;
}

msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
if (p == null || when == 0 || when < p.when) {
// 消息队列为空,或者没有设置消息绝对时间
// 或者新消息绝对时间最小:将当前消息插入到队首
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// 消息队列阻塞情况下,并且队首为同步屏障
// 并且当前消息为异步消息,则唤醒队列
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
// 消息队列中还存在其他异步消息,不唤醒
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
// 按照消息绝对时间排序,插入到指定位置
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}

// We can assume mPtr != 0 because mQuitting is false.
// 唤醒消息队列
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}

// 取出消息
Message next() {
// native 变量地址为空,直接返回
final long ptr = mPtr;
if (ptr == 0) {
return null;
}

int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
// 无限循环等待
for (;;) {
// 线程阻塞前写入所有命令
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}

// 阻塞等待 nextPollTimeoutMillis 时间
nativePollOnce(ptr, nextPollTimeoutMillis);

synchronized (this) {
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
if (msg != null && msg.target == null) {
// 消息队列队首为同步屏障,挂起所有同步消息
// 循环取出所有异步消息
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
if (now < msg.when) {
// 下一条消息时间还没到
// 设置 nextPollTimeoutMillis 为消息延迟时间
nextPollTimeoutMillis = (int) Math.min(msg.when
- now, Integer.MAX_VALUE);
} else {
// Got a message. 获取一条消息并返回
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse();
return msg;
}
} else {
// No more messages. 消息队列为空
nextPollTimeoutMillis = -1;
}

// 消息队列正在退出,释放资源并返回空
if (mQuitting) {
dispose();
return null;
}
// 没有空闲处理器 IdleHandler,初始化计数
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
// 还是没有空闲处理器 IdleHandler,
// 标记消息队列阻塞,进入下一次循环
if (pendingIdleHandlerCount <= 0) {
mBlocked = true;
continue;
}
// 初始化并设置 IdleHandler
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers =
new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers=mIdleHandlers.toArray(mPendingIdleHandlers);
}
// 利用这段空闲时间,线程处理 IdleHandler 任务
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null;

boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}

if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}

// 重置空闲任务计数和阻塞超时时间
pendingIdleHandlerCount = 0;
nextPollTimeoutMillis = 0;
}
}

//
public boolean isPolling() {
synchronized (this) {
return isPollingLocked();
}
}
private boolean isPollingLocked() {
return !mQuitting && nativeIsPolling(mPtr);
}

// 监听文件描述符相关,本文不做介绍
public void addOnFileDescriptorEventListener(@NonNull FileDescriptor fd,
@OnFileDescriptorEventListener.Events int events,
@NonNull OnFileDescriptorEventListener listener) {...}
public void removeOnFileDescriptorEventListener(
@NonNull FileDescriptor fd) {...}
private void updateOnFileDescriptorEventListenerLocked(FileDescriptor fd,
int events, OnFileDescriptorEventListener listener) {...}
// Called from native code.处理文件描述事件回调
private int dispatchEvents(int fd, int events) {...}
public interface OnFileDescriptorEventListener {
...
@Events int onFileDescriptorEvents(@NonNull FileDescriptor fd,
@Events int events);
}
private static final class FileDescriptorRecord {...}

// native 方法
private native static long nativeInit();
private native static void nativeDestroy(long ptr);
/*non-static for callbacks*/
private native void nativePollOnce(long ptr, int timeoutMillis);
private native static void nativeWake(long ptr);
private native static boolean nativeIsPolling(long ptr);
private native static void nativeSetFileDescriptorEvents(long ptr,
int fd, int events);
}

快速查看 MessageQueue API

重要成员变量

  • mMessages
    用于存储消息,利用 Message 链表结构实现先进先出队列,确保先进入或者时间最近的消息先被处理。
  • ArrayList mIdleHandlers
    记录消息队列中有多少个 IdleHandler
  • IdleHandler[] mPendingIdleHandlers
    mIdleHandlers 转换为对应的数组。
  • mBlocked
    next 方法在等待 pollOncemBlocked 记录 next 是否处于阻塞状态。
  • mQuitting
    记录当前消息队列是否正在退出。
  • mNextBarrierToken
    同步屏障令牌,同步屏障 Barriers 表示一个没有指定 HandlerMessage,它的 arg1 参数记录了这个令牌。
  • mFileDescriptorRecords
    存储被监听文件描述符的表,Handler 机制中消息发送并没有使用文件描述符监功能,暂时不做介绍。AOSP 系统源码中仅 frameworks/base/core/java/android/os/ParcelFileDescriptor.java 用到了。

初始化

Handler 机制中,MessageQueue 的实例化是在 Looper 的构造方法中实现的,赋值两个成员变量:

  • mQuitAllowed
    是否运行消息队列退出。主线程的消息队列是不允许退出的,工作线程可以。
  • mPtr
    执行 nativeInit 方法,并将 native 层的 NativeMessageQueue 类对象地址赋值给 Java 层的成员变量 mPtr 。通过这种方式将 Java 层的对象与 native 层的对象关联在了一起。这种在 Java 层保存 native 层对象引用地址来实现关联的方式,在 Android 源代码中能经常看到。

同步屏障

同步屏障 SyncBarrier 是一个特殊的消息 Message ,有两个重要特点:

  • Handler target 为空
  • arg1 记录同步屏障令牌
    同步屏障消息的 what 为空,也就是不能通过 what 来标记消息了,而通过 arg1 记录同步令牌来做为消息标记。令牌非常重要,移除同步屏障时需要指定具体的令牌。

如果设置了同步屏障,消息队列所有的同步消息都会被暂停,next() 只会取出异步消息处理,直到移除同步屏障。如果没有正确移除同步屏障,整个消息队列将会挂起,同时也不再循环取出同步消息执行。

  • 设置同步屏障
    postSyncBarrier() 设置同步屏障时,将它加入到消息队列,并将同步屏障消息设为头结点。也就是同步屏障是插入到队列的头部的,优先级很高,消息通过 enqueueMessage 加入队列都是插入队尾的。设置同步屏障并不会唤醒队列。
  • 移除同步屏障
    removeSyncBarrier(int token) 根据令牌移除同步屏障,遍历消息队列。如果同步屏障位于头结点,且下一条消息为同步消息,表示队列没有被被挂起,唤醒队列。移除同步屏障后,消息队列恢复正常,循环取出消息。

同步屏障相关方法都是 @hide 的,也就是说只会在 Android Framework 内部使用,客户端 App 并不能应用这些功能。并且同步屏障和异步消息在 Android Framework 中使用的也很少,比如搜索到的应用如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
xmt@server005:~/android$ grep -irsn postSyncBarrier frameworks/
frameworks/base/cmds/hid/src/com/android/commands/hid/Device.java:143:
mBarrierToken = getLooper().myQueue().postSyncBarrie ();

frameworks/base/core/java/android/view/ViewRootImpl.java:1225:

public void dispatchInputEvent(InputEvent event,
InputEventReceiver receiver) {
SomeArgs args = SomeArgs.obtain();
args.arg1 = event;
args.arg2 = receiver;
Message msg = mHandler.obtainMessage(MSG_DISPATCH_INPUT_EVENT, args);
// 设置为异步消息
msg.setAsynchronous(true);
mHandler.sendMessage(msg);
}

void scheduleTraversals() {
if (!mTraversalScheduled) {
mTraversalScheduled = true;
// 设置同步屏障
mTraversalBarrier=mHandler.getLooper().getQueue().postSyncBarrier();
mChoreographer.postCallback(
Choreographer.CALLBACK_TRAVERSAL, mTraversalRunnable, null);
if (!mUnbufferedInputDispatch) {
scheduleConsumeBatchedInput();
}
notifyRendererOfFramePending();
pokeDrawLockIfNeeded();
}
}

Android 系统的屏幕点击事件,就是一个异步消息;View 请求启动绘制生命周期 ViewRootImpl.scheduleTraversals 会设置同步屏障,优先处理异步消息。

消息入队 enqueueMessage

后台线程完成耗时操作后,通过对象 Handler.post***/send*** 发送消息,最终会进入到 MessageQueue.enqueueMessage ,而 enqueueMessage 仅仅是将消息按照 when 顺序插入到消息队列中。后台线程不会被阻塞,执行完毕可以退出。

0023-handler-messagequeue-equeuemessage.png

  • 入队顺序 Message.when
    按照消息的绝对时间 Message.when 排序,插入到队列中,如果时间相同就按照入队先后顺序。确保消息投递的绝对时间 when 越小,越早出队。
  • 唤醒队列 needWake
    如果消息队列处于阻塞状态 mBlocked=true 时,插入消息时才需要唤醒队列。但是当消息队列队首为同步屏障,并且新消息本身为异步消息,同时当消息队列中还存在其他异步消息时,此时不会唤醒,具体看上面的流程图。消息插入消息队列,正常情况下不会唤醒,除了图中的几个特殊情况。
  • 消息队列正在退出 mQuitting
    如果消息队列调用了 quit() 方法,表示消息队列正在退出;当消息队列正在退出时,直接回收当前准备入队的消息,并返回 false ,入队失败。

消息队列存储顺序

消息队列中,首先按照 msg.when 排序,结合同步屏障,异步消息,后期补一个图。

消息出队 next

主线程的 Looper.loop() 会无限循环,不停的调用 MessageQueue.next() 获取下一条消息,交给 Handler 处理。而 next 通过 nativePollOnce 方法实现线程的阻塞,阻塞时间为消息的延迟时间。当时间到了后,唤醒主线程取出这个消息;即后台线程发布的消息,回到主线程来处理。

0023-handler-messagequeue-next.png

  • 阻塞时间 nextPollTimeoutMillis
    表示阻塞时间,通常为消息的延迟时间。如果为 0 ,表示不阻塞,直接返回;如果为 -1 ,表示一直阻塞。当消息队列中没有消息时,nextPollTimeoutMillis=-1 使得 next 会一直阻塞,直到有消息进来,更新这个值。
  • 阻塞状态 mBlocked
    next 中的 for 循环取到消息后,next 返回这个消息并退出,消息队列阻塞状态为 falsefor 循环没有取到消息,如果存在 IdleHandler 时,利用这段时间处理空闲任务;否则设置消息队列为阻塞状态 true;重新进入循环继续阻塞等待取消息。
  • 消息出队顺序
    同步消息都是按照 msg.when 即插入顺序先进先出的;当消息队列队首为同步屏障时,挂起队列中所有同步消息,循环取出队列中所有的异步消息,直到移除同步屏障。
  • 消息队列正在退出 mQuitting
    如果消息队列调用了 quit() 方法,表示消息队列正在退出;当消息队列正在退出时,调用 dispose() 方法,销毁 native 环境,跳出 Looper.loop 循环,退出整个机制。
  • 空闲线程 IdleHandler
    当消息队列为空,或者消息投递时间没到时;如果添加了空闲线程处理器 IdleHandler ,此时会利用线程空闲时间执行这些任务。

消息轮询状态

isPolling/isPollingLocked 方法主要是否返回当前线程轮询状态,即当前线程是否处于 epoll_wait 阻塞等待中,该方法在 Android Framework 中使用的也很少:

1
2
frameworks/base/services/core/java/com/android/server/Watchdog.java:121: 
if (mMonitors.size() == 0 && mHandler.getLooper().getQueue().isPolling()) {...}

MessageQueue.mBlocked 也是记录的阻塞状态,和 isPolling/isPollingLocked 功能差不多,只是没有 isPolling/isPollingLocked 方法返回的状态更精细、更准确。

删除消息

  • 删除指定消息
    符合指定消息的可能并不止一个,所以需要遍历整个队列,找到这些符合条件的消息。删除时分两种情况:如果指定消息是头结点,或者头部几个节点都是指定消息,删除指定消息后,重新指定头结点;如果头结点或者新的头结点不是指定消息,依次遍历整个队列,找到目标消息移出队列并回收。
  • 删除所有消息
    从头结点开始遍历队列,移出队列中所有消息并删除。
  • 删除延迟消息
    从头结点开始遍历队列,对比当前时间和 Message.when ,找出延迟消息,删除掉这些还没来得及投递处理的消息。

消息队列退出

MessageQueue.quit() 方法中有如下几个判断:

  • 是否运行退出 mQuitAllowed
    主线程的 Looper 循环是不允许退出的,这在主线程初始化时,构造 MessageQueue 决定的;工作线程可以退出。
  • 是否正在退出 mQuitting
    因为是多线程环境,所以使用了 synchronized 关键字来确保并发正确。
  • 是否安全移除 safe
    是否安全移除,主要区别在于移除哪些消息。如果不安全退出,直接移除队列中所有消息;如果安全退出,对比当前时间和 Message.when ,只移除延迟消息。

MessageQueue.quit() 方法主要是移除消息队列中对应的消息,并设置了 mQuitting=true ;调用 nativeWake() 方法唤醒 next() 方法,如果是安全退出,则取出剩余的消息并返回给 Handler 处理;next 方法中如果取不到消息,当 mQuitting=true 时,释放资源并退出整个 Looper.loop() 循环机制。

IdleHandler 接口

一个回调接口,在消息队列阻塞等待前,当前线程可以利用这段时间做别的事情。

  • MessageQueue.addIdleHandler()
    添加回调接口。
  • MessageQueue.removeIdleHandler()
    移除回调接口。
  • 示例
    比如 ActivityThread 中,利用这个空闲时间,添加了 GcIdle 回收内存和 Idler 释放 AMS 相关资源。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    // 1.内存回收
    Looper.myQueue().addIdleHandler(mGcIdler);
    final class GcIdler implements MessageQueue.IdleHandler {
    @Override
    public final boolean queueIdle() {
    doGcIfNeeded();
    return false;
    }
    }
    void doGcIfNeeded() {
    mGcIdlerScheduled = false;
    final long now = SystemClock.uptimeMillis();
    //Slog.i(TAG, "**** WE MIGHT WANT TO GC: then=" +
    // Binder.getLastGcTime() + "m now=" + now);
    if ((BinderInternal.getLastGcTime()+MIN_TIME_BETWEEN_GCS) < now) {
    //Slog.i(TAG, "**** WE DO, WE DO WANT TO GC!");
    BinderInternal.forceGc("bg");
    }
    }
    // 2. AMS 释放相关资源,调用 activityIdle
    Looper.myQueue().addIdleHandler(new Idler());
    private class Idler implements MessageQueue.IdleHandler {
    @Override
    public final boolean queueIdle() {
    ActivityClientRecord a = mNewActivities;
    ...
    if (a != null) {
    mNewActivities = null;
    IActivityManager am = ActivityManagerNative.getDefault();
    ActivityClientRecord prev;
    do {
    ...
    if (a.activity != null && !a.activity.mFinished) {
    try {
    am.activityIdle(a.token, a.createdConfig, stopProfiling);
    a.createdConfig = null;
    } catch (RemoteException ex) {
    throw ex.rethrowFromSystemServer();
    }
    }
    prev = a;
    a = a.nextIdle;
    prev.nextIdle = null;
    } while (a != null);
    }
    ...
    }
    }

native 方法

简要描述几个 native 方法的作用:

  • private native static long nativeInit(); :初始化 native 对象,绑定 Looper 和线程等
  • private native static void nativeDestroy(long ptr); 销毁 native 对象
  • private native void nativePollOnce(long ptr, int timeoutMillis);
    唯一的非静态方法,也就是说这个方法是属于对象的;当前线程阻塞最多等待 timeoutMillis 时间(类似 Object.wait(timeout)),这段时间内可能会被 nativeWake 唤醒。
  • private native static void nativeWake(long ptr); 唤醒 nativePollOnce 函数
  • private native static boolean nativeIsPolling(long ptr); 查询 nativePollOnce 是否正处于阻塞状态
  • private native static void nativeSetFileDescriptorEvents(long ptr, int fd, int events); 添加文件描述符监听事件

Native JNI

Native JNI 层起到一个连接的作用,Handler 机制中使用的 epoll 等功能都是在系统层实现的,JNI 对这些功能调用做一个封装,相当于外观模式中的外观类。注意:Android Framework 中还有一个 ALooper 类,也是实现的消息机制,它主要是用于多媒体相关,代码路径 frameworks/av/media/ ,和本文介绍的消息机制没有任何关系。
如下根据 MessageQueue 中的 6 个 native 方法逐一展开分析,具体对应的 JNI 类为 Android_os_MessageQueue.cpp

环境初始化 nativeInit

nativeInit 函数主要是初始化所有 native 需要使用的类对象,变量等,主要就是 Looper.cpp 相关资源的初始化;函数的返回值是一个 nativeMessageQueue 对象,它是对 Looper.cpp 相关功能的一个封装。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
// NativeMessageQueue 对象实例化
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
}

// 增加引用计数
nativeMessageQueue->incStrong(env);
return reinterpret_cast<jlong>(nativeMessageQueue);
}

NativeMessageQueue::NativeMessageQueue() :
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
// Looper 对象初始化,及线程绑定和相关环境初始化
mLooper = Looper::getForThread();
if (mLooper == NULL) {
mLooper = new Looper(false);
// TLS 保存当前线程的 Looper 对象
Looper::setForThread(mLooper);
}
}

class NativeMessageQueue : public MessageQueue, public LooperCallback {
public:
NativeMessageQueue();
virtual ~NativeMessageQueue();

virtual void raiseException(JNIEnv* env,
const char* msg, jthrowable exceptionObj);

void pollOnce(JNIEnv* env, jobject obj, int timeoutMillis);
void wake();
void setFileDescriptorEvents(int fd, int events);
virtual int handleEvent(int fd, int events, void* data);

private:
JNIEnv* mPollEnv;
jobject mPollObj;
jthrowable mExceptionObj;
};

销毁 nativeDestroy

native 中的几个关键对象,都使用了智能指针管理,所以销毁时减少引用计数,如果引用不存在自动销毁。

1
2
3
4
5
6
7
8
9
10
11
static void android_os_MessageQueue_nativeDestroy(JNIEnv* env, 
jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue =
reinterpret_cast<NativeMessageQueue*>(ptr);
// 减少引用计数
nativeMessageQueue->decStrong(env);
}

// 析构函数是空的
NativeMessageQueue::~NativeMessageQueue() {
}

轮询 nativePollOnce

唯一的非静态方法,也就是说这个方法是属于对象的,因为每个对象可以处理自己的回调。Handler 机制的核心,nativePollOnce 调用了 epoll 中的 epoll_wait 方法,当前线程阻塞等待:文件描述符事件(本文主要是唤醒事件)更新或者超时。阻塞等待时当前线程会释放 CPU 资源进入休眠状态,直到被唤醒,这个功能是 Linux epoll 机制实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env,jobject obj,
jlong ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue =
reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

// 方法非常简单,调用 Looper.pollOnce
// pollObj 是当前对象,所以该方法不能为是静态的
// Looper.pollOnce 会回调 pollObj 对象的 dispatchEvents 方法
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj,
int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
mLooper->pollOnce(timeoutMillis);
mPollObj = NULL;
mPollEnv = NULL;

if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}

// Looper::pollInner 中会掉用 handleEvent ,这个方法是处理
// 监听的文件描述符事件更新,这里是对 java 方法的回调
int NativeMessageQueue::handleEvent(int fd,int looperEvents,void* data){
...
int newWatchedEvents = mPollEnv->CallIntMethod(mPollObj,
gMessageQueueClassInfo.dispatchEvents, fd, events);
if (!newWatchedEvents) {
return 0; // unregister the fd
}
if (newWatchedEvents != oldWatchedEvents) {
setFileDescriptorEvents(fd, newWatchedEvents);
}
return 1;
}
static struct {
jfieldID mPtr; // native object attached to the DVM MessageQueue
jmethodID dispatchEvents;
} gMessageQueueClassInfo;
// dispatchEvents 对应的 Java 中 MessageQueue.dispatchEvents
gMessageQueueClassInfo.dispatchEvents = GetMethodIDOrDie(env, clazz,
"dispatchEvents", "(II)I");

pollOnce 方法非常简单,需要获取当前对象,所以该方法不能为是静态的。调用了 Looper.pollOnce 方法,它会回调 pollObj 对象的 dispatchEvents 方法。而 Handler 收发消息机制中,并不会使用到文件描述符事件的回调,只需简单了解。

唤醒 nativeWake

Looper.cpp 中,唤醒事件是单独的文件描述符在监听,具体为 eventfd ,用于线程间通信。实际上 nativeWake 函数仅仅是向 eventfd 中写入一个值,将 epoll_wait 函数唤醒。

1
2
3
4
5
6
7
8
9
10
static void android_os_MessageQueue_nativeWake(JNIEnv* env, 
jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue =
reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->wake();
}

void NativeMessageQueue::wake() {
mLooper->wake();
}

是否正在轮询等待 nativeIsPolling

nativeIsPolling 函数返回的是 Looper.cppepoll_wait 是否处于阻塞等待状态;这个方法在 Android Framework 使用的也很少。

1
2
3
4
5
6
static jboolean android_os_MessageQueue_nativeIsPolling(JNIEnv* env, 
jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue =
reinterpret_cast<NativeMessageQueue*>(ptr);
return nativeMessageQueue->getLooper()->isPolling();
}

监听文件描述符 nativeSetFileDescriptorEvents

nativeSetFileDescriptorEvents 函数通过 Looper.cpp::addFd ,向 epoll_wait 中添加文件描述符监听事件,对应的回调 dispatchEventspollOnce 中分析过。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static void android_os_MessageQueue_nativeSetFileDescriptorEvents(
JNIEnv* env, jclass clazz, jlong ptr, jint fd, jint events) {
NativeMessageQueue* nativeMessageQueue =
reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->setFileDescriptorEvents(fd, events);
}
void NativeMessageQueue::setFileDescriptorEvents(int fd, int events){
if (events) {
int looperEvents = 0;
if (events & CALLBACK_EVENT_INPUT) {
looperEvents |= Looper::EVENT_INPUT;
}
if (events & CALLBACK_EVENT_OUTPUT) {
looperEvents |= Looper::EVENT_OUTPUT;
}
mLooper->addFd(fd, Looper::POLL_CALLBACK, looperEvents, this,
reinterpret_cast<void*>(events));
} else {
mLooper->removeFd(fd);
}
}

系统层实现

系统层 Looper.cpp 都是在 Linux 中编程实现,包括下面介绍的线程相关方法、eventfd 文件描述符、epoll 机制。Looper.cpp 实现了完整的 native 消息收发机制,使用 MessageEnvelope 存储消息;Request 存储文件描述符的监听;这些都是提供给 native 层使用,不过 Android Framework 使用的并不是特别多。
本文介绍的 JavaHandler 消息通信机制中,收发消息过程,只会有唤醒 POLL_WAKE 和超时 POLL_TIMEOUT 两种返回事件。最开始看源码时总是将 Java 消息和 native 的消息一一对应,或者相互关联;但实际上它们是两个完全不同的东西,只是共用了 eventfd, epoll 消息处理机制,本文不做 native 消息的介绍,这两个概念不要混淆了。

线程相关方法

TLS: Thread Local Storage 线程本地存储,和 Java ThreadLocal 功能一样,保存线程私有数据。使用同一个变量,不同线程存储和拿到的结果能够被隔离,相当于提供了一个同名而不同值的全局变量(这个全局变量相对于拥有这个变量的线程来说)。

  • pthread_key_t gTLSKey
    gTLSKey 共享变量。
  • pthread_key_create
    pthread_key_create(& gTLSKey, threadDestructor); ,绑定 gTLSKey 变量,该变量所有线程都可以访问,但各个线程可以根据自己的需要往 gTLSKey 中填入不同的值。threadDestructor 是销毁函数,线程退出时会调用该销毁函数。
  • pthread_setspecific
    pthread_setspecific(gTLSKey, looper.get()); ,当前线程向 gTLSKey 中写入数据。
  • pthread_getspecific
    (Looper*)pthread_getspecific(gTLSKey); ,从 gTLSKey 中取出当前线程存储的值。
  • pthread_once 仅执行一次
    函数声明: int pthread_once(pthread_once_t *once_control, void (*init_routine) (void));
    函数功能:变量 once_control 的值设为 PTHREAD_ONCE_INIT 时,本函数保证 init_routine() 函数在进程执行序列中仅执行一次。

根据上面的简单介绍,Looper.cpp 中关于 TLS 线程本地存储的几个函数源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// once_control 变量,即确保函数只执行一次
static pthread_once_t gTLSOnce = PTHREAD_ONCE_INIT;
static pthread_key_t gTLSKey = 0;

sp<Looper> Looper::getForThread() {
// 确保多线程中 initTLSKey 方法只执行一次
int result = pthread_once(& gTLSOnce, initTLSKey);
LOG_ALWAYS_FATAL_IF(result != 0, "pthread_once failed");

// 从 gTLSKey 取出当前线程存储的数据
return (Looper*)pthread_getspecific(gTLSKey);
}

void Looper::initTLSKey() {
// 创建并绑定 gTLSKey
int result = pthread_key_create(& gTLSKey, threadDestructor);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not allocate TLS key.");
}

// 销毁函数
void Looper::threadDestructor(void *st) {
Looper* const self = static_cast<Looper*>(st);
if (self != NULL) {
self->decStrong((void*)threadDestructor);
}
}

void Looper::setForThread(const sp<Looper>& looper) {
// also has side-effect of initializing TLS
sp<Looper> old = getForThread();

if (looper != NULL) {
looper->incStrong((void*)threadDestructor);
}

// 当前线程向 gTLSKey 写入数据
pthread_setspecific(gTLSKey, looper.get());

if (old != NULL) {
old->decStrong((void*)threadDestructor);
}
}

eventfd 文件描述符

eventfd 是一个用来通知事件的文件描述符,用来实现进程/线程间的等待/通知 wait/notify 机制以及数据交互,通过内核取唤醒用户态的事件。
只有一个系统调用接口:int eventfd(unsigned int initval, int flags);,返回一个新建的 eventfd 对象,该对象是一个内核维护的无符号的 64 位整型计数器,初始化值为 initvalflags 有几个可选项:

  • EFD_NONBLOCK :设置对象为非阻塞状态,read 读取 eventfd 时不会阻塞
  • EFD_CLOEXEC :表示执行时关闭;设置此标志后,在执行 exec 时才关闭 eventfd 描述符,否则该描述符一直打开

eventfd 是一个文件描述符,所以直接通过 read/write/close 来读写关闭。常见示例为进程/线程 Aeventfd 中写入数据,进程/线程 Beventfd 中读取数据。Looper.cpp 中源码分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
Looper::Looper(bool allowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX){
// 构造方法中创建 mWakeEventFd 对象
mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0,"Could not make wake event fd: %s",
strerror(errno));

AutoMutex _l(mLock);
rebuildEpollLocked();
}

Looper::~Looper() {
// 析构方法中,关闭 mWakeEventFd 句柄
close(mWakeEventFd);
// 析构方法中,关闭 epoll 句柄
if (mEpollFd >= 0) {
close(mEpollFd);
}
}

void Looper::wake() {
...
uint64_t inc = 1;
// 唤醒函数中,实际是向 mWakeEventFd 中写数据
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd,
&inc, sizeof(uint64_t)));
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
ALOGW("Could not write wake signal: %s", strerror(errno));
}
}
}

void Looper::awoken() {
...
uint64_t counter;
// 将 mWakeEventFd 中的数据都读出来
TEMP_FAILURE_RETRY(read(mWakeEventFd, &counter, sizeof(uint64_t)));
}

epoll 机制

I/O 多路复用就通过一种机制,可以监视多个文件描述符,一旦某个文件描述符就绪(通常是读或者写就绪),能够通知程序进行相应的读写操作。select, poll, epoll 都是 IO 多路复用的机制,本质上它们都是同步 I/O,都需要在读写事件就绪后自己负责进行读写。
epollselectpoll 的增强版本;相对于 select, poll 来说,epoll 更加灵活,没有描述符限制。epoll 使用一个文件描述符可以监听多个其他多个描述符,将用户关系文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的 copy 只需一次。
epoll 只有三个系统调用:

  • epoll_create
    函数声明:int epoll_create(int size); ,创建一个 epoll 的句柄。
    参数 size 用来告诉内核这个监听的数目一共有多大。需要注意的是当创建好 epoll 句柄后,它就是会占用一个 fd 值,使用完后通过 close(mEpollFd); 来关闭。
  • epoll_ctl
    函数声明:int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); ,事件注册函数,告诉内核要监听什么类型的事件以及哪些文件描述符。
    参数 epfdepoll_create 创建的句柄;fd 是被监听文件描述符;op 表示动作,用三个宏来表示:
    EPOLL_CTL_ADD:注册新的 fdepfd 中;
    EPOLL_CTL_MOD:修改已经注册的 fd 的监听事件;
    EPOLL_CTL_DEL:从 epfd 中删除一个 fd
    参数 event 告诉内核需要监听什么事,可以是以下几个宏的集合:
    EPOLLIN :表示对应的文件描述符可以读;
    EPOLLOUT :表示对应的文件描述符可以写;
    EPOLLPRI :表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
    EPOLLERR :表示对应的文件描述符发生错误;
    EPOLLHUP :表示对应的文件描述符被挂断;
    EPOLLET : 将 EPOLL 设为边缘触发 Edge Triggered 模式,这是相对于水平触发 Level Triggered 来说的,关于 ET/LT 主要是效率的区别,不做展开;
    EPOLLONESHOT :只监听一次事件,当监听完这次事件之后,如果还需要继续监听的话,需要再次加入到队列里;
  • epoll_wait
    函数声明:int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);阻塞等待事件的产生。
    参数 events 用来从内核得到事件的集合;maxevents 告诉内核这个 events 有多大,不能大于创建 epoll_create() 时的 size ;参数 timeout 是超时时间(单位为毫秒,0 会不会阻塞立即返回,-1不确定或永久阻塞)。该函数返回需要处理的事件数目,如返回 0 表示已超时。

Looper.cppepoll 除了监听唤醒文件描述符 eventfd,还可以同时监听 MessageQueue 设置的文件描述符,通过 addFd 添加。部分源码解析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
void Looper::rebuildEpollLocked() {
// Close old epoll instance if we have one.
if (mEpollFd >= 0) {
...
close(mEpollFd);
}

// Allocate the new epoll instance and register the wake pipe.
// 创建 epoll 句柄
mEpollFd = epoll_create(EPOLL_SIZE_HINT);
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s"
, strerror(errno));

struct epoll_event eventItem;
// zero out unused members of data field union
memset(& eventItem, 0, sizeof(epoll_event));
// epoll 事件为 mWakeEventFd 可读事件
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeEventFd;
// 注册监听事件,唤醒文件描述符 mWakeEventFd
int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD,
mWakeEventFd, & eventItem);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake
event fd to epoll instance: %s", strerror(errno));
// 其他文件描述符
...
}

int Looper::pollOnce(int timeoutMillis, int* outFd,
int* outEvents, void** outData) {
int result = 0;
for (;;) {
// 其他文件描述符
...

// mWakeEventFd 唤醒事件
if (result != 0) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
if (outFd != NULL) *outFd = 0;
if (outEvents != NULL) *outEvents = 0;
if (outData != NULL) *outData = NULL;
return result;
}

result = pollInner(timeoutMillis);
}
}

int Looper::pollInner(int timeoutMillis) {
// native 消息,timeoutMillis 调整
...

// Poll.
int result = POLL_WAKE;
...
// We are about to idle.
mPolling = true;

struct epoll_event eventItems[EPOLL_MAX_EVENTS];
// epoll 阻塞等待事件的产生: mWakeEventFd 以及其他文件描述符
// 或者 timeoutMillis 超时
int eventCount = epoll_wait(mEpollFd, eventItems,
EPOLL_MAX_EVENTS, timeoutMillis);

// 事件产生,唤醒继续执行
// No longer idling.
mPolling = false;

// Acquire lock.
mLock.lock();

// 保护代码,状态检查
...

// 如果 epoll 是因为超时唤醒,更新 result
// Check for poll timeout.
if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - timeout", this);
#endif
result = POLL_TIMEOUT;
goto Done;
}

...
// 不是超时,检查是哪些文件描述符唤醒的
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;

if (fd == mWakeEventFd) {
// mWakeEventFd 唤醒事件
if (epollEvents & EPOLLIN) {
// 交给 eventfd 处理
awoken();
} else {
...
}
} else {
// 其他文件描述符的唤醒
...
}
}

Done: ;

// native message 的处理
...

// Release lock.
mLock.unlock();

// Invoke all response callbacks.
// 处理 addFd 添加的文件描述符
...
{
// 回调文件描述符监听事件 handleEvent
int callbackResult = response.request.callback->handleEvent
(fd, events, data);
if (callbackResult == 0) {
removeFd(fd, response.request.seq);
}
}
return result;
}

// 返回 epoll_wait 是否处于阻塞等待
bool Looper::isPolling() const {
return mPolling;
}

从代码中可以看到,Handler 机制只监听了 mWakeEventFd 的可读事件,以及超时事件;mPolling 记录了 epoll_wait 是否处于阻塞等待,在 MessageQueue.isPolling 中查看该变量的值。

小结

Looper.cpp 中通过 eventfd 实现线程间通信;通过 epoll 实现监听 eventfd 文件描述符的读写事件,以及 Message 的超时事件。

消息收发流程图

Handler 机制大致流程:

  • Looper.prepare 准备环境,初始化所有相关对象
  • Looper.loop 进入无限循环,阻塞等待获取下一条消息 MessageQueue.next
  • 创建 Handler 子类,初始化
  • 创建 Message ,存储相关信息
  • Handler.sendMessage 发送消息,加入到消息队列 MessageQueue.enqueueMessage
  • LooperMessageQueue 中取出 Message 后,指派给 Handler.handleMessage 处理,处理完后回收 Message.recycleUnchecked
  • Looper.quit 退出机制

0023-handler-sequence-diag.png

HandlerThread 创建与销毁

Android 开发中经常会使用到线程,一想到线程,很多同学就立即使用 new Thread(){...}.start(); 这样的方式。这样如果在一个 Activity 中多次调用上面的代码,那么将创建多个匿名线程,程序运行的越久可能会越来越慢。因此,需要一个 Handler 来启动一个线程,以及删除一个线程,保证线程不会重复的创建。
使用 HandlerThreadHandler 配合实现异步后台任务。特点

  • 由 2 个 Handler 和 1 个 HandlerThread 来实现
  • 后台线程串行执行

源码分析

HandlerThread 是参考 Looper 标准用法实现的工作线程,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class HandlerThread extends Thread {
Looper mLooper;

...

/**
* Call back method that can be explicitly overridden
* if needed to execute some setup before Looper loops.
*/
protected void onLooperPrepared() {
}

@Override
public void run() {
mTid = Process.myTid();
Looper.prepare();
synchronized (this) {
mLooper = Looper.myLooper();
notifyAll();
}
Process.setThreadPriority(mPriority);
onLooperPrepared();
Looper.loop();
mTid = -1;
}

public Looper getLooper() {
if (!isAlive()) {
return null;
}

// If the thread has been started,
// wait until the looper has been created.
synchronized (this) {
while (isAlive() && mLooper == null) {
try {
wait();
} catch (InterruptedException e) {
}
}
}
return mLooper;
}
...
}

HandlerThread 的创建

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// UI线程的Handler
Handler mHandler = new Handler(){
@Override
public void handleMessage(Message msg) {
super.handleMessage(msg);
// 处理UI更新
};

HandlerThread mBackThread = new HandlerThread("mybackthread");
mBackThread.start();
// 后台线程的Handler
Handler mBackHandler = new Handler(mBackThread.getLooper());
mBackHandler.post(new Runnable() {
@Override
public void run() {
// 后台线程执行耗时操作,异步
...
// mHandler发消息,回到主线程更新UI
mHandler.sendMessage(msg);
}
});

注意mBackHandler 的初始化必须在 mBackThread.start(); 之后,否则拿不到这个线程的 looper。源码中可以看到,getLooper 会一直等待;而 Looper 是在 run 中创建的,并且通知等待线程。
这种模式通过 mBackHandler.post(new Runnable() {}) 来实现后台异步任务执行,所有后台任务都是通过 HandlerThread 这个线程执行的,但是 HandlerThread 是串行执行任务的,也就是每次 post 后进入队列排队执行。

HandlerThread的退出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
protected void onDestroy() {
super.onDestroy();
if(mBackThread != null){
mBackThread.quitSafely();
try {
mBackThread.join();
mBackThread = null;
mBackHandler = null;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

Activity.onDestroy 退出时,等待 mBackThread 线程销毁完成再退出!

其他线程间通信机制

  • LocalBroadcastManager 本质还是 Handler 机制
  • EventBus
    查看了 EventBus 源码,UI 主线程间通信还是采用了 Handler 机制:public class HandlerPoster extends Handler implements Poster {...} ;后台线程间通信,采用的是 Java 标准的 notify/wait 机制。

后续

Messenger 信差

Messenger 用于进程间异步通信,其中通过 AIDL 来实现的进程间通信的;通过 Handler.sendMessage 来实现异步通信。详细参考Messenger 详解

OnFileDescriptorEventListener 监听文件描述符

MessageQueue.java 中提供了完整的文件描述符监听功能,包括 Android_os_MessageQueue.cpp, Looper.cpp 中也实现了对应的功能,本文暂不做描述。

Native Loop 机制

Java 层的 Handler 机制,在系统层的 Looper.cpp 文件中重新全部实现了一遍,供 Native 代码使用。本文仅仅介绍了其中的唤醒和轮询两个功能。而 Looper.cpp 中还包含添加文件描述符监听,收发 native 的消息。可以参考:Android Native层Looper详解Android Native Looper机制 - 监听文件描述符

总结

  • Handler 通过 Linux 系统调用的意义
    本文介绍的 Handler 消息收发机制,完全可以使用 Object.wait/notify 来简单实现;但是源码为什么还使用了 Linux epoll 来实现呢?这是因为 Framework 除了提供等待和唤醒方法,还提供了其他文件描述符监听机制等,epoll 机制更强大更高效。
  • 应用程序的主线程处于无限循环状态
    应用主线程 ActivityThread 进入 Looper.loop() 后不允许退出,直到应用退出;也就是说主线程是无限循环的,通过 epoll 机制阻塞等待时释放 CPU 资源,直到超时或事件触发才唤醒主线程。而我们经常遇到的 ActivityANR 限制是针对 Activity 等组件的,实际上并不是主线程。
  • Handler 机制中的线程切换
    Handler 机制通常有两个线程:主线程和后台工作线程。主线程 loop 无限循环阻塞等待;后台工作线程完成耗时任务,发送完消息后就可以退出了;此时阻塞等待的主线程收到 epoll 机制的唤醒,拿到这个消息直接处理。因为消息 Message 是共享变量,所以整个 Handler 机制使用了很多同步锁 synchronized ,所以两个线程之间仅仅只是等待/通知交互。Looper.cpp 底层 epoll 机制并没有涉及数据交互,仅仅是线程间的通知什么时候能够取出消息,相当于 Java notify/wait 机制。而 epoll 还支持监听文件描述符涉及到数据交互,但是不属于本文介绍范围。
  • Handler 机制的退出
    主线程是无法退出的,会一直循环等待,除非退出应用程序;其他线程的退出通过调用 Looper.quit 来退出,退出时会移除消息队列的所有消息,并设置 mQuitting 使得 MessageQueue.next 返回 nullLooper.loop 退出循环。
  • 消息队列的唤醒
    消息队列 MessageQueue 有三个地方会唤醒:退出 quit 必须唤醒;移除同步屏障 removeSyncBarrier 和消息入队 enqueueMessage 满足条件时才唤醒。通过 nativeWake 触发唤醒。

参考文档