Handler
机制是 Android
系统处理同一进程不同线程间通信的机制,基于 Linux
系统的 epoll
机制实现。
Android
两大机制:Binder
机制用于处理进程间通信;Handler
机制用于处理进程内的线程间通信,线程间交互。
概念
Message
意为消息,发送到 Handler
进行处理的对象,携带描述信息和任意数据。
MessageQueue
意为消息队列,Message
的集合。
Looper
消息泵,用来从 MessageQueue
中抽取 Message
,发送给 Handler
进行处理。
Handler
处理 Looper
抽取出来的 Message
。
也就是说所有的消息处理都是串行执行 的。
他们之间的关系:
每个 Thread
只对应一个 Looper
每个 Looper
只对应一个 MessageQueue
每个 MessageQueue
中有 N
个 Message
每个 Message
最多指定一个 Handler
来处理事件
每个 Thread
可以对应多个 Handler
在如下操作中都是基于 UI
主线程,在异步任务中使用 Handler
机制更新 UI
必须用 new Handler();
来初始化。
1 2 3 Handler mHandler = new Handler(); mHandler.post(new Runnable(){});
源码目录及类对应文件 源码目录结构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 frameworks/base/core/java/android/os/ Message.java MessageQueue.java Looper.java Handler.java frameworks/base/core/jni/ android_os_MessageQueue.h android_os_MessageQueue.cpp system/core/libutils/include/utils/Looper.h system/core/libutils/Looper.cpp
类或接口对应文件 1 2 3 4 5 6 7 8 Message.java: Message MessageQueue.java: MessageQueue Looper.java: Looper Handler.java: Handler Looper.h: Message, MessageHandler, WeakMessageHandler, LooperCallback, SimpleLooperCallback, Looper android_os_MessageQueue.h: MessageQueue android_os_MessageQueue.cpp: NativeMessageQueue
Message
详解Message
是 Handler
机制中的数据容器,和 Binder
机制中的 Parcel
功能一样。Message
数据结构特点:
链表Message
是单链表数据结构,成员变量 next
保存下一条消息,而当前消息是链表的头结点。Message
中的消息池 Message sPool
利用链表头结点特性实现栈,快速存取消息;MessageQueue
中的成员变量 Message mMessages
利用链表结构实现先进先出队列,确保先进入的消息先被处理。
存储简单数据 成员变量 arg1, arg2
用来存储简单整型数据。
存储复杂数据 成员变量 Object obj, Bundle data
用来存储复杂数据,其中 Bundle
能够存放键值对。
源码分析 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 public final class Message implements Parcelable { public int what; public int arg1; public int arg2; public Object obj; public Messenger replyTo; public int sendingUid = -1 ; Bundle data; public Bundle getData () { if (data == null ) { data = new Bundle(); } return data; } public Bundle peekData () { return data; } public void setData (Bundle data) { this .data = data; } Message next; private static Message sPool; private static final int MAX_POOL_SIZE = 50 ; public Message () {} public static Message obtain (Handler h) {...} public static Message obtain (Handler h, int what) {...} public static Message obtain (Handler h, Runnable callback) {...} public static Message obtain (Message orig) {...} public static Message obtain (Handler h, int what, int arg1, int arg2, Object obj) {...} public static Message obtain (Handler h, int what, int arg1, int arg2) {...} public static Message obtain (Handler h, int what, Object obj) {...} public static Message obtain () { synchronized (sPoolSync) { if (sPool != null ) { Message m = sPool; sPool = m.next; m.next = null ; m.flags = 0 ; sPoolSize--; return m; } } return new Message(); } public void recycle () { if (isInUse()) { if (gCheckRecycle) { throw ... } return ; } recycleUnchecked(); } void recycleUnchecked () { flags = FLAG_IN_USE; what = 0 ; arg1 = 0 ; arg2 = 0 ; obj = null ; replyTo = null ; sendingUid = -1 ; when = 0 ; target = null ; callback = null ; data = null ; synchronized (sPoolSync) { if (sPoolSize < MAX_POOL_SIZE) { next = sPool; sPool = this ; sPoolSize++; } } } long when; public long getWhen () {return when;} Handler target; public void setTarget (Handler target) { this .target = target; } public Handler getTarget () { return target; } Runnable callback; public Runnable getCallback () { return callback; } static final int FLAG_IN_USE = 1 << 0 ; static final int FLAG_ASYNCHRONOUS = 1 << 1 ; static final int FLAGS_TO_CLEAR_ON_COPY_FROM = FLAG_IN_USE; int flags; boolean isInUse () { return ((flags & FLAG_IN_USE) == FLAG_IN_USE); } void markInUse () { flags |= FLAG_IN_USE; } public boolean isAsynchronous () {...} public void setAsynchronous (boolean async) {...} public void copyFrom (Message o) {...} public static final Parcelable.Creator<Message> CREATOR = new Parcelable.Creator<Message>() {...} public int describeContents () {...} public void writeToParcel (Parcel dest, int flags) {...} private void readFromParcel (Parcel source) {...} ... }
快速查看 Message API 。
消息池 sPool
Message
中使用了单向链表结构的消息池,sPool
总是指向链表顶部,所以这个链表模拟的是栈结构,即消息池的数据结构为链表实现的栈。
消息池的创建 源码中可以看出,消息在被回收后 recycleUnchecked()
,将当前消息加入到消息池链表中,即消息池才开始有可以重复利用的 Message
;创建消息池使用了 synchronized
关键字,确保线程池的操作是同步的。如果系统中,同时有很多消息在被传递,当一部分消息使用结束后都会被回收,此时消息池会积蓄的越来越多。
消息的重复利用 创建消息时推荐使用 Message.obtain()
方式,因为每次 obtain
时,都会判断消息池中是否有可以循环利用的消息,如果存在则取出并清空消息标记,否则才新建一个消息。
实例化一个消息,请使用 Message.obtain()
,充分利用消息池循环利用的特点。
重要成员变量
long when
消息投递的绝对时间(投递时当前时间+设置的延迟时间),在 Handler
发送消息或发布任务时,指定具体的值。MessageQueue
在消息加入队列时,会根据 when
值决定消息在队列中的顺序。
Handler target
保存处理该消息的 Handler
,在 Handler.enqueueMessage()
中将 Message
和对应处理该消息的 Handler
关联起来。
Runnable callback
在 Handler.java
的分析中得出结论:消息的处理有三个优先级,可以直接使用 Message.callback
来进行消息处理。
消息标识 flags
成员变量 flags
有如下四个值:
0 消息标识清除。表示该消息被创建 new Message()
、或者是从消息池中新取出的 Message.obtain()
。可以理解是一个新消息,能修改当前消息内容。
FLAG_IN_USE
消息正在被使用。表示消息进入了消息池栈 sPool
中、或者进入了消息队列 MessageQueue
中。在消息池中,表示被回收了的消息正在被消息池管理,可以被取出循环利用;在消息队列中,表示消息被投递,正等待被取出处理。
FLAG_ASYNCHRONOUS
表示异步消息;消息默认都是同步的,只能在 Message
和 Handler
构造方法中,特别指定为异步消息。如果设置了同步屏障,异步消息优先级将高于同步消息;消息队列 MessageQueue.next()
在取出消息时,遇到同步屏障会暂停所有的同步消息,将异步消息取出并处理,直到移除同步屏障。所以异步消息通常和同步屏障配合使用,同步屏障详细分析见 MessageQueue
中的分析。
FLAGS_TO_CLEAR_ON_COPY_FROM
它的值和 FLAG_IN_USE
一样,只有在 copyFrom()
方法中会用到。
Messenger
相关Message
包含具体的数据信息,Messenger
是一名信差,用于进程间发送指定的 Message
。实质上 Messenger
使用 AIDL
和 Handler
机制来实现进程间的异步通信。详细参考Messenger 详解 。
Messenger replyTo
回复此消息的回调信差,跨进程时通信时,可以利用 Message
中携带的信差,完成消息回复。
sendingUid
跨进程发送异步消息的进程 ID
。
Looper
详解Looper
类设计原则:此类包含基于 MessageQueue
设置和管理事件循环所需的代码。影响队列状态的 API
应该在 MessageQueue, Handler
上定义,而不是在 Looper
本身上定义。例如在队列上定义 idle handlers
和 sync barriers
,而在 Looper
上定义准备线程,循环和退出。Looper
类的主要功能:
提供线程上下文环境,创建与线程绑定的 Looper
创建 MessageQueue
loop
循环从 MessageQueue
中获取下一条消息(若无消息线程阻塞等待),指派 Handler
处理消息并回收消息
提供 loop
循环退出方法
源码分析 Looper
类中大部分都是 static
的变量和方法,不能直接实例化,通常都是固定格式来初始化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 public final class Looper { private static final String TAG = "Looper" ; static final ThreadLocal<Looper> sThreadLocal=new ThreadLocal<Looper>(); private static Looper sMainLooper; final MessageQueue mQueue; final Thread mThread; ... public static void prepare () { prepare(true ); } private static void prepare (boolean quitAllowed) { if (sThreadLocal.get() != null ) { throw new RuntimeException( "Only one Looper may be created per thread" ); } sThreadLocal.set(new Looper(quitAllowed)); } public static void prepareMainLooper () { prepare(false ); synchronized (Looper.class) { if (sMainLooper != null ) { throw new IllegalStateException( "The main Looper has already been prepared." ); } sMainLooper = myLooper(); } } public static Looper getMainLooper () { synchronized (Looper.class) { return sMainLooper; } } public static @Nullable Looper myLooper () { return sThreadLocal.get(); } public static @NonNull MessageQueue myQueue () { return myLooper().mQueue; } private Looper (boolean quitAllowed) { mQueue = new MessageQueue(quitAllowed); mThread = Thread.currentThread(); } public static void loop () { final Looper me = myLooper(); if (me == null ) {throw new RuntimeException("..." );} final MessageQueue queue = me.mQueue; ... for (;;) { Message msg = queue.next(); if (msg == null ) { return ; } ... msg.target.dispatchMessage(msg); ... msg.recycleUnchecked(); } } public void quit () { mQueue.quit(false ); } public void quitSafely () { mQueue.quit(true ); } ... }
快速查看 Looper API 。
重要成员变量
sThreadLocal
结论:一个线程只能对应一个 Looper
。从源码中可以看到,静态变量 sThreadLocal
存储了所有线程对应的 Looper
,而 ThreadLocal
中数据存储的数据结构是一个定制的哈希表,其 key
值是当前线程。
sMainLooper
当前应用主线程对应的 Looper
,在应用对应的主线程 ActivityThread.main()
中初始化。通常我们在 Activity
中获取到的主线程 Looper
对应的就是 sMainLooper
。
Looper
的初始化代码中可以看到,Looper
的构造方法是私有的,客户端通常使用 Looper.prepare(...)
来实例化,并初始化应用场景。而 Looper.prepareMainLooper()
是用来初始化主线程环境的,整个 Android Framework
中只有 ActivityThread, SystemServer
这两个带有 main()
方法的类调用过,而这两个类分别用来开启应用主线程和系统主线程。Looper
的构造方法中,初始化了 mQueue
和 mThread
:
mQueue
Handler
机制的消息队列 MessageQueue
就是在 Looper
的构造方法中实例化的。
mThread
赋值的是当前线程:可以是主线程或者工作线程。
Looper.loop()
流程非常简单,进入无限循环并不停的从消息队列中获取消息,而获取消息的过程可能会阻塞。 Looper
线程拿到消息后,执行消息处理 Handler.dispatchMessage()
并回收已经处理过的消息 Message.recycleUnchecked()
。
Looper
退出Looper
通常是通过 Looper.loop()
进入无线循环,从消息队列中取出消息并处理。Handler
机制为工作线程提供了退出方法 quit/quitSafely
,调用 MessageQueue.quit()
来结束并退出。只要调用了 quit/quitSafely
,不管是正在退出还是已经退出,Looper
就不再接收新的消息。Handler
发送的消息,在 MessageQueue.enqueueMessage
中直接返回 false
,不做任何处理。
主线程的 Looper
是不允许退出的,在 MessageQueue
的构造方法中设定。prepareMainLooper
调用的是 prepare(false)
,即不能退出。
典型工作线程流程 Looper
线程典型的实现示例如下,将 prepare()
和 loop()
的分离,来创建一个与 Looper
通信的 Handler
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 class LooperThread extends Thread { public Handler mHandler; public void run () { Looper.prepare(); mHandler = new Handler() { public void handleMessage (Message msg) { } }; Looper.loop(); } }
Handler
详解Handler
子类必须是 static
的,避免潜在的内存泄露,在构造方法中可以打开开关做检测。Handler
类将 Message, Looper, MessageQueue
串起来,给外部提供完整的接口,发送并处理消息。它管理的消息队列 MessageQueue
是 Looper
的成员变量;而 Looper
可以是主线程,也可以是工作线程,在 Handler
初始化时指定。主要功能包含:
初始化一个消息
发布任务或者发送消息
回到 Looper
所在线程处理消息
移除消息或其回调
源码分析 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 public class Handler { ... final Looper mLooper; final MessageQueue mQueue; final Callback mCallback; final boolean mAsynchronous; public interface Callback { public boolean handleMessage (Message msg) ; } private static void handleCallback (Message message) { message.callback.run(); } public void handleMessage (Message msg) { } public void dispatchMessage (Message msg) { if (msg.callback != null ) { handleCallback(msg); } else { if (mCallback != null ) { if (mCallback.handleMessage(msg)) { return ; } } handleMessage(msg); } } public Handler () {this (null , false );} public Handler (Callback callback) {this (callback, false );} public Handler (Looper looper) {this (looper, null , false );} public Handler (Looper looper, Callback callback) { this (looper, callback, false ); } public Handler (boolean async) {this (null , async);} public Handler (Callback callback, boolean async) { if (FIND_POTENTIAL_LEAKS) { final Class<? extends Handler> klass = getClass(); if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) && (klass.getModifiers() & Modifier.STATIC) == 0 ) { Log.w(TAG, "The following Handler class should be static " + "or leaks might occur: " + klass.getCanonicalName()); } } mLooper = Looper.myLooper(); if (mLooper == null ) { throw new RuntimeException("Can't create handler inside " + " thread that has not called Looper.prepare()" ); } mQueue = mLooper.mQueue; mCallback = callback; mAsynchronous = async; } public Handler (Looper looper, Callback callback, boolean async) { mLooper = looper; mQueue = looper.mQueue; mCallback = callback; mAsynchronous = async; } public final Message obtainMessage () {return Message.obtain(this );} public final Message obtainMessage (int what) { return Message.obtain(this , what); } public final Message obtainMessage (int what, Object obj) { return Message.obtain(this , what, obj); } public final Message obtainMessage (int what, int arg1, int arg2) { return Message.obtain(this , what, arg1, arg2); } public final Message obtainMessage (int what, int arg1, int arg2, Object obj) { return Message.obtain(this , what, arg1, arg2, obj); } private static Message getPostMessage (Runnable r) { Message m = Message.obtain(); m.callback = r; return m; } private static Message getPostMessage (Runnable r, Object token) { Message m = Message.obtain(); m.obj = token; m.callback = r; return m; } public final boolean post (Runnable r) { return sendMessageDelayed(getPostMessage(r), 0 ); } public final boolean postAtTime (Runnable r, long uptimeMillis) { return sendMessageAtTime(getPostMessage(r), uptimeMillis); } public final boolean postAtTime (Runnable r, Object token, long uptimeMillis) { return sendMessageAtTime(getPostMessage(r, token), uptimeMillis); } public final boolean postDelayed (Runnable r, long delayMillis) { return sendMessageDelayed(getPostMessage(r), delayMillis); } public final boolean postAtFrontOfQueue (Runnable r) { return sendMessageAtFrontOfQueue(getPostMessage(r)); } public final boolean sendMessage (Message msg) { return sendMessageDelayed(msg, 0 ); } public final boolean sendEmptyMessage (int what) { return sendEmptyMessageDelayed(what, 0 ); } public final boolean sendEmptyMessageDelayed (int what,long delayMillis) { Message msg = Message.obtain(); msg.what = what; return sendMessageDelayed(msg, delayMillis); } public final boolean sendEmptyMessageAtTime (int what,long uptimeMillis) { Message msg = Message.obtain(); msg.what = what; return sendMessageAtTime(msg, uptimeMillis); } public final boolean sendMessageDelayed (Message msg, long delayMillis) { if (delayMillis < 0 ) { delayMillis = 0 ; } return sendMessageAtTime(msg, SystemClock.uptimeMillis()+delayMillis); } public boolean sendMessageAtTime (Message msg, long uptimeMillis) { MessageQueue queue = mQueue; if (queue == null ) { RuntimeException e = new RuntimeException( this + " sendMessageAtTime() called with no mQueue" ); Log.w("Looper" , e.getMessage(), e); return false ; } return enqueueMessage(queue, msg, uptimeMillis); } public final boolean sendMessageAtFrontOfQueue (Message msg) { MessageQueue queue = mQueue; if (queue == null ) { RuntimeException e = new RuntimeException( this + " sendMessageAtTime() called with no mQueue" ); Log.w("Looper" , e.getMessage(), e); return false ; } return enqueueMessage(queue, msg, 0 ); } private boolean enqueueMessage (MessageQueue queue, Message msg, long uptimeMillis) { msg.target = this ; if (mAsynchronous) { msg.setAsynchronous(true ); } return queue.enqueueMessage(msg, uptimeMillis); } public final void removeCallbacks (Runnable r) { mQueue.removeMessages(this , r, null ); } public final void removeCallbacks (Runnable r, Object token) { mQueue.removeMessages(this , r, token); } public final void removeMessages (int what) { mQueue.removeMessages(this , what, null ); } public final void removeMessages (int what, Object object) { mQueue.removeMessages(this , what, object); } public final void removeCallbacksAndMessages (Object token) { mQueue.removeCallbacksAndMessages(this , token); } IMessenger mMessenger; final IMessenger getIMessenger () { synchronized (mQueue) { if (mMessenger != null ) { return mMessenger; } mMessenger = new MessengerImpl(); return mMessenger; } } private final class MessengerImpl extends IMessenger .Stub { public void send (Message msg) { msg.sendingUid = Binder.getCallingUid(); Handler.this .sendMessage(msg); } } public final boolean runWithScissors (final Runnable r, long timeout) { if (r == null ) { throw new IllegalArgumentException("runnable must not be null" ); } if (timeout < 0 ) { throw new IllegalArgumentException( "timeout must be non-negative" ); } if (Looper.myLooper() == mLooper) { r.run(); return true ; } BlockingRunnable br = new BlockingRunnable(r); return br.postAndWait(this , timeout); } private static final class BlockingRunnable implements Runnable { private final Runnable mTask; private boolean mDone; public BlockingRunnable (Runnable task) { mTask = task; } @Override public void run () { try { mTask.run(); } finally { synchronized (this ) { mDone = true ; notifyAll(); } } } public boolean postAndWait (Handler handler, long timeout) { if (!handler.post(this )) { return false ; } synchronized (this ) { if (timeout > 0 ) { final long expirationTime = SystemClock.uptimeMillis() + timeout; while (!mDone) { long delay = expirationTime - SystemClock.uptimeMillis(); if (delay <= 0 ) { return false ; } try { wait(delay); } catch (InterruptedException ex) { } } } else { while (!mDone) { try { wait(); } catch (InterruptedException ex) { } } } } return true ; } } ... }
快速查看 Handler API 。
构造方法及成员变量 从 Handler
的构造方法可以看出来,实例化时主要对 Looper, MessageQueue, Callback, mAsynchronous
四个成员变量的赋值。
Looper
Looper
如果来自于主线程,则不需要通过构造方法赋值;如果来自于工作线程,需要先执行 Looper.prepare
初始化 Looper
使用环境,并作为参数传递到构造方法中。
MessageQueue
Looper
值确定后,消息队列即为其成员变量 mQueue
。
Callback
可以直接通过 Callback
处理消息事件;特别是不需要自定义 Handler
子类来重写 handleMessage
时,默认为 null
。
mAsynchronous
是否允许异步执行,默认值为 false
即是同步执行的。它最终会被设置到对应的 Message.flags = FLAG_ASYNCHRONOUS
,在 Handler
中被没有其他作用。
自定义 Handler
的子类在实例化时,为了避免内存泄露,通常将子类设置为 static
类型,也就是避免内部类引用当前 Activity
对象。
生成消息 Handler
提供了生成消息的方法,但都是对 Message.obtain
的封装,所以在使用消息前,推荐直接使用 Message
类来生成。
任务发布 不管是通过 post***()
还是 send***()
方法来发布任务或者消息,最终都是调用的 enqueueMessage()
,即将 Message
加入到消息队列中 MessageQueue
。enqueueMessage()
的两个重要动作:
Handler
和 Message
的关联Message
的成员变量 target
存储了处理该消息的 Handler
,在消息入队时,将两者关联起来。
设置异步消息Handler.mAsynchronous
成员变量,在入队时将对应的 Message
设置为异步。
消息处理 dispatchMessage
是 Handler
消息处理的入口方法,有三个优先级来处理消息:
优先:如果 Message
中定义了 Callback
其次:如果 Handler
中定义了 Callback
最后:才是调用子类重写的 handleMessage
如果是自定义 Handler
子类,必须重写 handleMessage
来实现消息处理。而消息处理是在 Looper
所在线程来处理的,Looper
如果来自于主线程则在主线程中处理消息;如果来自于工作线程则在工作线程中处理消息。
移除消息 remove***()
系列的方法,是指移除 Message
或者 Message.callback
;最终调用的是从 MessageQueue
中移除对应消息。
Messenger
相关Handler
中实现了 Messenger
用于进程间异步通信的代码。源码中可以看出 Messenger
通过 AIDL
来实现的进程间通信的;而通过 Handler.sendMessage
来实现异步通信。详细参考Messenger 详解 。
BlockingRunnable
阻塞并同步执行任务BlockingRunnable
同步等待指定任务执行完成才会返回,通过 Object.wait/notify
实现同步阻塞功能。Handler.runWithScissors
方法是具体的实现,它是一个 @hide
方法,客户端 App
并不能直接调用若。该方法是如果调用线程与 Looper
所在线程如果在同一个线程,直接执行完后退出;如果不是同一线程,可以简单的实现 timeout
,方法调用以后会阻塞,直到传入的 runnable
结束或者是 timeout
。
MessageQueue
详解MessageQueue
的内部存储了一组消息,以队列的形式对外提供插入和删除的工作。采用单链表的数据结构来存储消息列表,按照消息发生的绝对时间 Message.when
排序。所以消息队列并不是绝对的先进先出队列,是按照消息发生时间先进先出。Looper.loop
会进入无限循环,而它会调用 MessageQueue.next
方法取出下一条消息;当有消息时,取出交给 Handler
处理;当没有消息时,当前线程便会阻塞在 next()
方法的 nativePollOnce()
中,当线程便会释放 CPU
资源进入休眠状态,直到下个消息到达或者有事务发生时,唤醒当前线程;这里的线程休眠唤醒,涉及到 Linux epoll
机制,后面会详细介绍。MessageQueue
是在多线程环境下操作,所以绝大部分的方法都使用了 synchronized(this)
关键字;是 Handler
机制的核心类,native
代码在 android_os_MessageQueue.cpp
中实现;MessageQueue
还能监听文件描述符,当发生变化时做出相应事件处理,但是 Handler
机制中并没有使用该功能,本文不做介绍。MessageQueue
主要功能:
初始化 native
环境
将 Message
加入队列
调用 native
方法,当前线程阻塞等待,直到从队列中取出 Message
多线程环境中维护消息队列入队、出队、销毁
源码分析 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 public final class MessageQueue { ... private final boolean mQuitAllowed; private boolean mQuitting; private long mPtr; Message mMessages; private final ArrayList<IdleHandler> mIdleHandlers = new ArrayList<IdleHandler>(); private SparseArray<FileDescriptorRecord> mFileDescriptorRecords; private IdleHandler[] mPendingIdleHandlers; private boolean mBlocked; MessageQueue(boolean quitAllowed) { mQuitAllowed = quitAllowed; mPtr = nativeInit(); } private void dispose () { if (mPtr != 0 ) { nativeDestroy(mPtr); mPtr = 0 ; } } public boolean isIdle () { synchronized (this ) { final long now = SystemClock.uptimeMillis(); return mMessages == null || now < mMessages.when; } } public void addIdleHandler (@NonNull IdleHandler handler) { if (handler == null ) { throw new NullPointerException("Can't add a null IdleHandler" ); } synchronized (this ) { mIdleHandlers.add(handler); } } public void removeIdleHandler (@NonNull IdleHandler handler) { synchronized (this ) { mIdleHandlers.remove(handler); } } public static interface IdleHandler { boolean queueIdle () ; } public boolean isPolling () { synchronized (this ) { return isPollingLocked(); } } private boolean isPollingLocked () { return !mQuitting && nativeIsPolling(mPtr); } boolean hasMessages (Handler h, int what, Object object) { if (h == null ) { return false ; } synchronized (this ) { Message p = mMessages; while (p != null ) { if (p.target == h && p.what == what && (object == null || p.obj == object)) { return true ; } p = p.next; } return false ; } } boolean hasMessages (Handler h, Runnable r, Object object) {...} void removeMessages (Handler h, int what, Object object) { if (h == null ) { return ; } synchronized (this ) { Message p = mMessages; while (p != null && p.target == h && p.what == what && (object == null || p.obj == object)) { Message n = p.next; mMessages = n; p.recycleUnchecked(); p = n; } while (p != null ) { Message n = p.next; if (n != null ) { if (n.target == h && n.what == what && (object == null || n.obj == object)) { Message nn = n.next; n.recycleUnchecked(); p.next = nn; continue ; } } p = n; } } } void removeMessages (Handler h, Runnable r, Object object) {...} void removeCallbacksAndMessages (Handler h, Object object) {...} private void removeAllMessagesLocked () {...} private void removeAllFutureMessagesLocked () {...} void quit (boolean safe) { if (!mQuitAllowed) { throw new IllegalStateException( "Main thread not allowed to quit." ); } synchronized (this ) { if (mQuitting) { return ; } mQuitting = true ; if (safe) { removeAllFutureMessagesLocked(); } else { removeAllMessagesLocked(); } nativeWake(mPtr); } } private int mNextBarrierToken; public int postSyncBarrier () { return postSyncBarrier(SystemClock.uptimeMillis()); } private int postSyncBarrier (long when) { synchronized (this ) { final int token = mNextBarrierToken++; final Message msg = Message.obtain(); msg.markInUse(); msg.when = when; msg.arg1 = token; Message prev = null ; Message p = mMessages; if (when != 0 ) { while (p != null && p.when <= when) { prev = p; p = p.next; } } if (prev != null ) { msg.next = p; prev.next = msg; } else { msg.next = p; mMessages = msg; } return token; } } public void removeSyncBarrier (int token) { synchronized (this ) { Message prev = null ; Message p = mMessages; while (p != null && (p.target != null || p.arg1 != token)) { prev = p; p = p.next; } if (p == null ) { throw new IllegalStateException( "The specified message queue synchronization barrier token" + " has not been posted or has already been removed." ); } final boolean needWake; if (prev != null ) { prev.next = p.next; needWake = false ; } else { mMessages = p.next; needWake = mMessages == null || mMessages.target != null ; } p.recycleUnchecked(); if (needWake && !mQuitting) { nativeWake(mPtr); } } } boolean enqueueMessage (Message msg, long when) { if (msg.target == null ) { throw new IllegalArgumentException("Message must have a target." ); } if (msg.isInUse()) { throw new IllegalStateException(msg + " This message is already in use." ); } synchronized (this ) { if (mQuitting) { IllegalStateException e = new IllegalStateException(msg.target + " sending message to a Handler on a dead thread" ); Log.w(TAG, e.getMessage(), e); msg.recycle(); return false ; } msg.markInUse(); msg.when = when; Message p = mMessages; boolean needWake; if (p == null || when == 0 || when < p.when) { msg.next = p; mMessages = msg; needWake = mBlocked; } else { needWake = mBlocked && p.target == null && msg.isAsynchronous(); Message prev; for (;;) { prev = p; p = p.next; if (p == null || when < p.when) { break ; } if (needWake && p.isAsynchronous()) { needWake = false ; } } msg.next = p; prev.next = msg; } if (needWake) { nativeWake(mPtr); } } return true ; } Message next () { final long ptr = mPtr; if (ptr == 0 ) { return null ; } int pendingIdleHandlerCount = -1 ; int nextPollTimeoutMillis = 0 ; for (;;) { if (nextPollTimeoutMillis != 0 ) { Binder.flushPendingCommands(); } nativePollOnce(ptr, nextPollTimeoutMillis); synchronized (this ) { final long now = SystemClock.uptimeMillis(); Message prevMsg = null ; Message msg = mMessages; if (msg != null && msg.target == null ) { do { prevMsg = msg; msg = msg.next; } while (msg != null && !msg.isAsynchronous()); } if (msg != null ) { if (now < msg.when) { nextPollTimeoutMillis = (int ) Math.min(msg.when - now, Integer.MAX_VALUE); } else { mBlocked = false ; if (prevMsg != null ) { prevMsg.next = msg.next; } else { mMessages = msg.next; } msg.next = null ; if (DEBUG) Log.v(TAG, "Returning message: " + msg); msg.markInUse(); return msg; } } else { nextPollTimeoutMillis = -1 ; } if (mQuitting) { dispose(); return null ; } if (pendingIdleHandlerCount < 0 && (mMessages == null || now < mMessages.when)) { pendingIdleHandlerCount = mIdleHandlers.size(); } if (pendingIdleHandlerCount <= 0 ) { mBlocked = true ; continue ; } if (mPendingIdleHandlers == null ) { mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4 )]; } mPendingIdleHandlers=mIdleHandlers.toArray(mPendingIdleHandlers); } for (int i = 0 ; i < pendingIdleHandlerCount; i++) { final IdleHandler idler = mPendingIdleHandlers[i]; mPendingIdleHandlers[i] = null ; boolean keep = false ; try { keep = idler.queueIdle(); } catch (Throwable t) { Log.wtf(TAG, "IdleHandler threw exception" , t); } if (!keep) { synchronized (this ) { mIdleHandlers.remove(idler); } } } pendingIdleHandlerCount = 0 ; nextPollTimeoutMillis = 0 ; } } public boolean isPolling () { synchronized (this ) { return isPollingLocked(); } } private boolean isPollingLocked () { return !mQuitting && nativeIsPolling(mPtr); } public void addOnFileDescriptorEventListener (@NonNull FileDescriptor fd, @OnFileDescriptorEventListener .Events int events, @NonNull OnFileDescriptorEventListener listener) {...} public void removeOnFileDescriptorEventListener ( @NonNull FileDescriptor fd) {...} private void updateOnFileDescriptorEventListenerLocked (FileDescriptor fd, int events, OnFileDescriptorEventListener listener) {...} private int dispatchEvents (int fd, int events) {...} public interface OnFileDescriptorEventListener { ... @Events int onFileDescriptorEvents (@NonNull FileDescriptor fd, @Events int events) ; } private static final class FileDescriptorRecord {...} private native static long nativeInit () ; private native static void nativeDestroy (long ptr) ; private native void nativePollOnce (long ptr, int timeoutMillis) ; private native static void nativeWake (long ptr) ; private native static boolean nativeIsPolling (long ptr) ; private native static void nativeSetFileDescriptorEvents (long ptr, int fd, int events) ;}
快速查看 MessageQueue API
重要成员变量
mMessages
用于存储消息,利用 Message
链表结构实现先进先出队列,确保先进入或者时间最近的消息先被处理。
ArrayList mIdleHandlers
记录消息队列中有多少个 IdleHandler
。
IdleHandler[] mPendingIdleHandlers
由 mIdleHandlers
转换为对应的数组。
mBlocked
当 next
方法在等待 pollOnce
,mBlocked
记录 next
是否处于阻塞状态。
mQuitting
记录当前消息队列是否正在退出。
mNextBarrierToken
同步屏障令牌,同步屏障 Barriers
表示一个没有指定 Handler
的 Message
,它的 arg1
参数记录了这个令牌。
mFileDescriptorRecords
存储被监听文件描述符的表,Handler
机制中消息发送并没有使用文件描述符监功能,暂时不做介绍。AOSP
系统源码中仅 frameworks/base/core/java/android/os/ParcelFileDescriptor.java
用到了。
初始化 Handler
机制中,MessageQueue
的实例化是在 Looper
的构造方法中实现的,赋值两个成员变量:
mQuitAllowed
是否运行消息队列退出。主线程的消息队列是不允许退出的,工作线程可以。
mPtr
执行 nativeInit
方法,并将 native
层的 NativeMessageQueue
类对象地址赋值给 Java
层的成员变量 mPtr
。通过这种方式将 Java
层的对象与 native
层的对象关联在了一起。这种在 Java
层保存 native
层对象引用地址来实现关联的方式,在 Android
源代码中能经常看到。
同步屏障 同步屏障 SyncBarrier
是一个特殊的消息 Message
,有两个重要特点:
Handler target
为空
arg1
记录同步屏障令牌 同步屏障消息的 what
为空,也就是不能通过 what
来标记消息了,而通过 arg1
记录同步令牌来做为消息标记。令牌非常重要,移除同步屏障时需要指定具体的令牌。
如果设置了同步屏障,消息队列所有的同步消息都会被暂停,next()
只会取出异步消息处理,直到移除同步屏障。如果没有正确移除同步屏障,整个消息队列将会挂起,同时也不再循环取出同步消息执行。
设置同步屏障postSyncBarrier()
设置同步屏障时,将它加入到消息队列,并将同步屏障消息设为头结点。也就是同步屏障是插入到队列的头部的,优先级很高,消息通过 enqueueMessage
加入队列都是插入队尾的。设置同步屏障并不会唤醒队列。
移除同步屏障removeSyncBarrier(int token)
根据令牌移除同步屏障,遍历消息队列。如果同步屏障位于头结点,且下一条消息为同步消息,表示队列没有被被挂起,唤醒队列。移除同步屏障后,消息队列恢复正常,循环取出消息。
同步屏障相关方法都是 @hide
的,也就是说只会在 Android Framework
内部使用,客户端 App
并不能应用这些功能。并且同步屏障和异步消息在 Android Framework
中使用的也很少,比如搜索到的应用如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 xmt@server005 :~/android$ grep -irsn postSyncBarrier frameworks/ frameworks/base/cmds/hid/src/com/android/commands/hid/Device.java:143 : mBarrierToken = getLooper().myQueue().postSyncBarrie (); frameworks/base/core/java/android/view/ViewRootImpl.java:1225 : public void dispatchInputEvent (InputEvent event, InputEventReceiver receiver) { SomeArgs args = SomeArgs.obtain(); args.arg1 = event; args.arg2 = receiver; Message msg = mHandler.obtainMessage(MSG_DISPATCH_INPUT_EVENT, args); msg.setAsynchronous(true ); mHandler.sendMessage(msg); } void scheduleTraversals () { if (!mTraversalScheduled) { mTraversalScheduled = true ; mTraversalBarrier=mHandler.getLooper().getQueue().postSyncBarrier(); mChoreographer.postCallback( Choreographer.CALLBACK_TRAVERSAL, mTraversalRunnable, null ); if (!mUnbufferedInputDispatch) { scheduleConsumeBatchedInput(); } notifyRendererOfFramePending(); pokeDrawLockIfNeeded(); } }
Android
系统的屏幕点击事件,就是一个异步消息;View
请求启动绘制生命周期 ViewRootImpl.scheduleTraversals
会设置同步屏障,优先处理异步消息。
消息入队 enqueueMessage
后台线程完成耗时操作后,通过对象 Handler.post***/send***
发送消息,最终会进入到 MessageQueue.enqueueMessage
,而 enqueueMessage
仅仅是将消息按照 when
顺序插入到消息队列中。后台线程不会被阻塞,执行完毕可以退出。
入队顺序 Message.when
按照消息的绝对时间 Message.when
排序,插入到队列中,如果时间相同就按照入队先后顺序。确保消息投递的绝对时间 when
越小,越早出队。
唤醒队列 needWake
如果消息队列处于阻塞状态 mBlocked=true
时,插入消息时才需要唤醒队列。但是当消息队列队首为同步屏障,并且新消息本身为异步消息,同时当消息队列中还存在其他异步消息时,此时不会唤醒,具体看上面的流程图。消息插入消息队列,正常情况下不会唤醒,除了图中的几个特殊情况。
消息队列正在退出 mQuitting
如果消息队列调用了 quit()
方法,表示消息队列正在退出;当消息队列正在退出时,直接回收当前准备入队的消息,并返回 false
,入队失败。
消息队列存储顺序 消息队列中,首先按照 msg.when
排序,结合同步屏障,异步消息,后期补一个图。
消息出队 next
主线程的 Looper.loop()
会无限循环,不停的调用 MessageQueue.next()
获取下一条消息,交给 Handler
处理。而 next
通过 nativePollOnce
方法实现线程的阻塞,阻塞时间为消息的延迟时间。当时间到了后,唤醒主线程取出这个消息;即后台线程发布的消息,回到主线程来处理。
阻塞时间 nextPollTimeoutMillis
表示阻塞时间,通常为消息的延迟时间。如果为 0 ,表示不阻塞,直接返回;如果为 -1 ,表示一直阻塞。当消息队列中没有消息时,nextPollTimeoutMillis=-1
使得 next
会一直阻塞,直到有消息进来,更新这个值。
阻塞状态 mBlocked
next
中的 for
循环取到消息后,next
返回这个消息并退出,消息队列阻塞状态为 false
;for
循环没有取到消息,如果存在 IdleHandler
时,利用这段时间处理空闲任务;否则设置消息队列为阻塞状态 true
;重新进入循环继续阻塞等待取消息。
消息出队顺序 同步消息都是按照 msg.when
即插入顺序先进先出的;当消息队列队首为同步屏障时,挂起队列中所有同步消息,循环取出队列中所有的异步消息,直到移除同步屏障。
消息队列正在退出 mQuitting
如果消息队列调用了 quit()
方法,表示消息队列正在退出;当消息队列正在退出时,调用 dispose()
方法,销毁 native
环境,跳出 Looper.loop
循环,退出整个机制。
空闲线程 IdleHandler
当消息队列为空,或者消息投递时间没到时;如果添加了空闲线程处理器 IdleHandler
,此时会利用线程空闲时间执行这些任务。
消息轮询状态 isPolling/isPollingLocked
方法主要是否返回当前线程轮询状态,即当前线程是否处于 epoll_wait
阻塞等待中,该方法在 Android Framework
中使用的也很少:
1 2 frameworks/base/services/core/java/com/android/server/Watchdog.java:121 : if (mMonitors.size() == 0 && mHandler.getLooper().getQueue().isPolling()) {...}
MessageQueue.mBlocked
也是记录的阻塞状态,和 isPolling/isPollingLocked
功能差不多,只是没有 isPolling/isPollingLocked
方法返回的状态更精细、更准确。
删除消息
删除指定消息 符合指定消息的可能并不止一个,所以需要遍历整个队列,找到这些符合条件的消息。删除时分两种情况:如果指定消息是头结点,或者头部几个节点都是指定消息,删除指定消息后,重新指定头结点;如果头结点或者新的头结点不是指定消息,依次遍历整个队列,找到目标消息移出队列并回收。
删除所有消息 从头结点开始遍历队列,移出队列中所有消息并删除。
删除延迟消息 从头结点开始遍历队列,对比当前时间和 Message.when
,找出延迟消息,删除掉这些还没来得及投递处理的消息。
消息队列退出 MessageQueue.quit()
方法中有如下几个判断:
是否运行退出 mQuitAllowed
主线程的 Looper
循环是不允许退出的,这在主线程初始化时,构造 MessageQueue
决定的;工作线程可以退出。
是否正在退出 mQuitting
因为是多线程环境,所以使用了 synchronized
关键字来确保并发正确。
是否安全移除 safe
是否安全移除,主要区别在于移除哪些消息。如果不安全退出,直接移除队列中所有消息;如果安全退出,对比当前时间和 Message.when
,只移除延迟消息。
MessageQueue.quit()
方法主要是移除消息队列中对应的消息,并设置了 mQuitting=true
;调用 nativeWake()
方法唤醒 next()
方法,如果是安全退出,则取出剩余的消息并返回给 Handler
处理;next
方法中如果取不到消息,当 mQuitting=true
时,释放资源并退出整个 Looper.loop()
循环机制。
IdleHandler
接口一个回调接口,在消息队列阻塞等待前 ,当前线程可以利用这段时间做别的事情。
MessageQueue.addIdleHandler()
添加回调接口。
MessageQueue.removeIdleHandler()
移除回调接口。
示例 比如 ActivityThread
中,利用这个空闲时间,添加了 GcIdle
回收内存和 Idler
释放 AMS
相关资源。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 Looper.myQueue().addIdleHandler(mGcIdler); final class GcIdler implements MessageQueue .IdleHandler { @Override public final boolean queueIdle () { doGcIfNeeded(); return false ; } } void doGcIfNeeded () { mGcIdlerScheduled = false ; final long now = SystemClock.uptimeMillis(); if ((BinderInternal.getLastGcTime()+MIN_TIME_BETWEEN_GCS) < now) { BinderInternal.forceGc("bg" ); } } Looper.myQueue().addIdleHandler(new Idler()); private class Idler implements MessageQueue .IdleHandler { @Override public final boolean queueIdle () { ActivityClientRecord a = mNewActivities; ... if (a != null ) { mNewActivities = null ; IActivityManager am = ActivityManagerNative.getDefault(); ActivityClientRecord prev; do { ... if (a.activity != null && !a.activity.mFinished) { try { am.activityIdle(a.token, a.createdConfig, stopProfiling); a.createdConfig = null ; } catch (RemoteException ex) { throw ex.rethrowFromSystemServer(); } } prev = a; a = a.nextIdle; prev.nextIdle = null ; } while (a != null ); } ... } }
native
方法简要描述几个 native
方法的作用:
private native static long nativeInit();
:初始化 native
对象,绑定 Looper
和线程等
private native static void nativeDestroy(long ptr);
销毁 native
对象
private native void nativePollOnce(long ptr, int timeoutMillis);
唯一的非静态方法,也就是说这个方法是属于对象的;当前线程阻塞最多等待 timeoutMillis
时间(类似 Object.wait(timeout)
),这段时间内可能会被 nativeWake
唤醒。
private native static void nativeWake(long ptr);
唤醒 nativePollOnce
函数
private native static boolean nativeIsPolling(long ptr);
查询 nativePollOnce
是否正处于阻塞状态
private native static void nativeSetFileDescriptorEvents(long ptr, int fd, int events);
添加文件描述符监听事件
Native JNI
层Native JNI
层起到一个连接的作用,Handler
机制中使用的 epoll
等功能都是在系统层实现的,JNI
对这些功能调用做一个封装,相当于外观模式中的外观类。注意:Android Framework
中还有一个 ALooper
类,也是实现的消息机制,它主要是用于多媒体相关,代码路径 frameworks/av/media/
,和本文介绍的消息机制没有任何关系。 如下根据 MessageQueue
中的 6 个 native
方法逐一展开分析,具体对应的 JNI
类为 Android_os_MessageQueue.cpp
:
环境初始化 nativeInit
nativeInit
函数主要是初始化所有 native
需要使用的类对象,变量等,主要就是 Looper.cpp
相关资源的初始化;函数的返回值是一个 nativeMessageQueue
对象,它是对 Looper.cpp
相关功能的一个封装。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 static jlong android_os_MessageQueue_nativeInit (JNIEnv* env, jclass clazz) { NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue(); if (!nativeMessageQueue) { jniThrowRuntimeException(env, "Unable to allocate native queue" ); return 0 ; } nativeMessageQueue->incStrong(env); return reinterpret_cast <jlong>(nativeMessageQueue); } NativeMessageQueue::NativeMessageQueue() : mPollEnv(NULL ), mPollObj(NULL ), mExceptionObj(NULL ) { mLooper = Looper::getForThread(); if (mLooper == NULL ) { mLooper = new Looper(false ); Looper::setForThread(mLooper); } } class NativeMessageQueue : public MessageQueue, public LooperCallback {public : NativeMessageQueue(); virtual ~NativeMessageQueue(); virtual void raiseException (JNIEnv* env, const char * msg, jthrowable exceptionObj) ; void pollOnce (JNIEnv* env, jobject obj, int timeoutMillis) ; void wake () ; void setFileDescriptorEvents (int fd, int events) ; virtual int handleEvent (int fd, int events, void * data) ; private : JNIEnv* mPollEnv; jobject mPollObj; jthrowable mExceptionObj; };
销毁 nativeDestroy
native
中的几个关键对象,都使用了智能指针管理,所以销毁时减少引用计数,如果引用不存在自动销毁。
1 2 3 4 5 6 7 8 9 10 11 static void android_os_MessageQueue_nativeDestroy (JNIEnv* env, jclass clazz, jlong ptr) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast <NativeMessageQueue*>(ptr); nativeMessageQueue->decStrong(env); } NativeMessageQueue::~NativeMessageQueue() { }
轮询 nativePollOnce
唯一的非静态方法,也就是说这个方法是属于对象的,因为每个对象可以处理自己的回调。Handler
机制的核心,nativePollOnce
调用了 epoll
中的 epoll_wait
方法,当前线程阻塞等待:文件描述符事件(本文主要是唤醒事件)更新或者超时。 阻塞等待时当前线程会释放 CPU
资源进入休眠状态,直到被唤醒,这个功能是 Linux epoll
机制实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 static void android_os_MessageQueue_nativePollOnce (JNIEnv* env,jobject obj, jlong ptr, jint timeoutMillis) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast <NativeMessageQueue*>(ptr); nativeMessageQueue->pollOnce(env, obj, timeoutMillis); } void NativeMessageQueue::pollOnce (JNIEnv* env, jobject pollObj, int timeoutMillis) { mPollEnv = env; mPollObj = pollObj; mLooper->pollOnce(timeoutMillis); mPollObj = NULL ; mPollEnv = NULL ; if (mExceptionObj) { env->Throw(mExceptionObj); env->DeleteLocalRef(mExceptionObj); mExceptionObj = NULL ; } } int NativeMessageQueue::handleEvent (int fd,int looperEvents,void * data) { ... int newWatchedEvents = mPollEnv->CallIntMethod(mPollObj, gMessageQueueClassInfo.dispatchEvents, fd, events); if (!newWatchedEvents) { return 0 ; } if (newWatchedEvents != oldWatchedEvents) { setFileDescriptorEvents(fd, newWatchedEvents); } return 1 ; } static struct { jfieldID mPtr; jmethodID dispatchEvents; } gMessageQueueClassInfo; gMessageQueueClassInfo.dispatchEvents = GetMethodIDOrDie(env, clazz, "dispatchEvents" , "(II)I" );
pollOnce
方法非常简单,需要获取当前对象,所以该方法不能为是静态的。调用了 Looper.pollOnce
方法,它会回调 pollObj
对象的 dispatchEvents
方法。而 Handler
收发消息机制中,并不会使用到文件描述符事件的回调,只需简单了解。
唤醒 nativeWake
Looper.cpp
中,唤醒事件是单独的文件描述符在监听,具体为 eventfd
,用于线程间通信。实际上 nativeWake
函数仅仅是向 eventfd
中写入一个值,将 epoll_wait
函数唤醒。
1 2 3 4 5 6 7 8 9 10 static void android_os_MessageQueue_nativeWake (JNIEnv* env, jclass clazz, jlong ptr) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast <NativeMessageQueue*>(ptr); nativeMessageQueue->wake(); } void NativeMessageQueue::wake () { mLooper->wake(); }
是否正在轮询等待 nativeIsPolling
nativeIsPolling
函数返回的是 Looper.cpp
中 epoll_wait
是否处于阻塞等待状态;这个方法在 Android Framework
使用的也很少。
1 2 3 4 5 6 static jboolean android_os_MessageQueue_nativeIsPolling (JNIEnv* env, jclass clazz, jlong ptr) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast <NativeMessageQueue*>(ptr); return nativeMessageQueue->getLooper()->isPolling(); }
监听文件描述符 nativeSetFileDescriptorEvents
nativeSetFileDescriptorEvents
函数通过 Looper.cpp::addFd
,向 epoll_wait
中添加文件描述符监听事件,对应的回调 dispatchEvents
在 pollOnce
中分析过。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 static void android_os_MessageQueue_nativeSetFileDescriptorEvents ( JNIEnv* env, jclass clazz, jlong ptr, jint fd, jint events) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast <NativeMessageQueue*>(ptr); nativeMessageQueue->setFileDescriptorEvents(fd, events); } void NativeMessageQueue::setFileDescriptorEvents (int fd, int events) { if (events) { int looperEvents = 0 ; if (events & CALLBACK_EVENT_INPUT) { looperEvents |= Looper::EVENT_INPUT; } if (events & CALLBACK_EVENT_OUTPUT) { looperEvents |= Looper::EVENT_OUTPUT; } mLooper->addFd(fd, Looper::POLL_CALLBACK, looperEvents, this , reinterpret_cast <void *>(events)); } else { mLooper->removeFd(fd); } }
系统层实现 系统层 Looper.cpp
都是在 Linux
中编程实现,包括下面介绍的线程相关方法、eventfd
文件描述符、epoll
机制。Looper.cpp
实现了完整的 native
消息收发机制,使用 MessageEnvelope
存储消息;Request
存储文件描述符的监听;这些都是提供给 native
层使用,不过 Android Framework
使用的并不是特别多。 本文介绍的 Java
层 Handler
消息通信机制中,收发消息过程,只会有唤醒 POLL_WAKE
和超时 POLL_TIMEOUT
两种返回事件。最开始看源码时总是将 Java
消息和 native
的消息一一对应,或者相互关联;但实际上它们是两个完全不同的东西,只是共用了 eventfd, epoll
消息处理机制,本文不做 native
消息的介绍,这两个概念不要混淆了。
线程相关方法 TLS: Thread Local Storage
线程本地存储,和 Java ThreadLocal
功能一样,保存线程私有数据。使用同一个变量,不同线程存储和拿到的结果能够被隔离,相当于提供了一个同名而不同值的全局变量(这个全局变量相对于拥有这个变量的线程来说)。
pthread_key_t gTLSKey
gTLSKey
共享变量。
pthread_key_create
pthread_key_create(& gTLSKey, threadDestructor);
,绑定 gTLSKey
变量,该变量所有线程都可以访问,但各个线程可以根据自己的需要往 gTLSKey
中填入不同的值。threadDestructor
是销毁函数,线程退出时会调用该销毁函数。
pthread_setspecific
pthread_setspecific(gTLSKey, looper.get());
,当前线程向 gTLSKey
中写入数据。
pthread_getspecific
(Looper*)pthread_getspecific(gTLSKey);
,从 gTLSKey
中取出当前线程存储的值。
pthread_once
仅执行一次 函数声明: int pthread_once(pthread_once_t *once_control, void (*init_routine) (void));
函数功能:变量 once_control
的值设为 PTHREAD_ONCE_INIT
时,本函数保证 init_routine()
函数在进程执行序列中仅执行一次。
根据上面的简单介绍,Looper.cpp
中关于 TLS
线程本地存储的几个函数源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 static pthread_once_t gTLSOnce = PTHREAD_ONCE_INIT;static pthread_key_t gTLSKey = 0 ;sp<Looper> Looper::getForThread () { int result = pthread_once(& gTLSOnce, initTLSKey); LOG_ALWAYS_FATAL_IF(result != 0 , "pthread_once failed" ); return (Looper*)pthread_getspecific(gTLSKey); } void Looper::initTLSKey () { int result = pthread_key_create(& gTLSKey, threadDestructor); LOG_ALWAYS_FATAL_IF(result != 0 , "Could not allocate TLS key." ); } void Looper::threadDestructor (void *st) { Looper* const self = static_cast <Looper*>(st); if (self != NULL ) { self->decStrong((void *)threadDestructor); } } void Looper::setForThread (const sp<Looper>& looper) { sp<Looper> old = getForThread(); if (looper != NULL ) { looper->incStrong((void *)threadDestructor); } pthread_setspecific(gTLSKey, looper.get()); if (old != NULL ) { old->decStrong((void *)threadDestructor); } }
eventfd
文件描述符eventfd
是一个用来通知事件的文件描述符,用来实现进程/线程间的等待/通知 wait/notify
机制以及数据交互,通过内核取唤醒用户态的事件。 只有一个系统调用接口:int eventfd(unsigned int initval, int flags);
,返回一个新建的 eventfd
对象,该对象是一个内核维护的无符号的 64 位整型计数器,初始化值为 initval
;flags
有几个可选项:
EFD_NONBLOCK
:设置对象为非阻塞状态,read
读取 eventfd
时不会阻塞
EFD_CLOEXEC
:表示执行时关闭;设置此标志后,在执行 exec
时才关闭 eventfd
描述符,否则该描述符一直打开
eventfd
是一个文件描述符,所以直接通过 read/write/close
来读写关闭。常见示例为进程/线程 A
向 eventfd
中写入数据,进程/线程 B
从 eventfd
中读取数据。Looper.cpp
中源码分析:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 Looper::Looper(bool allowNonCallbacks) : mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false ), mPolling(false ), mEpollFd(-1 ), mEpollRebuildRequired(false ), mNextRequestSeq(0 ), mResponseIndex(0 ), mNextMessageUptime(LLONG_MAX){ mWakeEventFd = eventfd(0 , EFD_NONBLOCK | EFD_CLOEXEC); LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0 ,"Could not make wake event fd: %s" , strerror(errno)); AutoMutex _l(mLock); rebuildEpollLocked(); } Looper::~Looper() { close(mWakeEventFd); if (mEpollFd >= 0 ) { close(mEpollFd); } } void Looper::wake () { ... uint64_t inc = 1 ; ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof (uint64_t ))); if (nWrite != sizeof (uint64_t )) { if (errno != EAGAIN) { ALOGW("Could not write wake signal: %s" , strerror(errno)); } } } void Looper::awoken () { ... uint64_t counter; TEMP_FAILURE_RETRY(read(mWakeEventFd, &counter, sizeof (uint64_t ))); }
epoll
机制I/O
多路复用就通过一种机制,可以监视多个文件描述符,一旦某个文件描述符就绪(通常是读或者写就绪),能够通知程序进行相应的读写操作。select, poll, epoll
都是 IO
多路复用的机制,本质上它们都是同步 I/O
,都需要在读写事件就绪后自己负责进行读写。epoll
是 select
和 poll
的增强版本;相对于 select, poll
来说,epoll
更加灵活,没有描述符限制。epoll
使用一个文件描述符可以监听多个其他多个描述符,将用户关系文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的 copy
只需一次。epoll
只有三个系统调用:
epoll_create
函数声明:int epoll_create(int size);
,创建一个 epoll
的句柄。 参数 size
用来告诉内核这个监听的数目一共有多大。需要注意的是当创建好 epoll
句柄后,它就是会占用一个 fd
值,使用完后通过 close(mEpollFd);
来关闭。
epoll_ctl
函数声明:int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
,事件注册函数,告诉内核要监听什么类型的事件以及哪些文件描述符。 参数 epfd
是 epoll_create
创建的句柄;fd
是被监听文件描述符;op
表示动作,用三个宏来表示:EPOLL_CTL_ADD
:注册新的 fd
到 epfd
中;EPOLL_CTL_MOD
:修改已经注册的 fd
的监听事件;EPOLL_CTL_DEL
:从 epfd
中删除一个 fd
; 参数 event
告诉内核需要监听什么事,可以是以下几个宏的集合:EPOLLIN
:表示对应的文件描述符可以读;EPOLLOUT
:表示对应的文件描述符可以写;EPOLLPRI
:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);EPOLLERR
:表示对应的文件描述符发生错误;EPOLLHUP
:表示对应的文件描述符被挂断;EPOLLET
: 将 EPOLL
设为边缘触发 Edge Triggered
模式,这是相对于水平触发 Level Triggered
来说的,关于 ET/LT
主要是效率的区别,不做展开;EPOLLONESHOT
:只监听一次事件,当监听完这次事件之后,如果还需要继续监听的话,需要再次加入到队列里;
epoll_wait
函数声明:int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
,阻塞等待 事件的产生。 参数 events
用来从内核得到事件的集合;maxevents
告诉内核这个 events
有多大,不能大于创建 epoll_create()
时的 size
;参数 timeout
是超时时间(单位为毫秒,0 会不会阻塞 立即返回,-1不确定或永久阻塞)。该函数返回需要处理的事件数目,如返回 0 表示已超时。
Looper.cpp
中 epoll
除了监听唤醒文件描述符 eventfd
,还可以同时监听 MessageQueue
设置的文件描述符,通过 addFd
添加。部分源码解析:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 void Looper::rebuildEpollLocked () { if (mEpollFd >= 0 ) { ... close(mEpollFd); } mEpollFd = epoll_create(EPOLL_SIZE_HINT); LOG_ALWAYS_FATAL_IF(mEpollFd < 0 , "Could not create epoll instance: %s" , strerror(errno)); struct epoll_event eventItem ; memset (& eventItem, 0 , sizeof (epoll_event)); eventItem.events = EPOLLIN; eventItem.data.fd = mWakeEventFd; int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem); LOG_ALWAYS_FATAL_IF(result != 0 , "Could not add wake event fd to epoll instance: %s" , strerror(errno)); ... } int Looper::pollOnce (int timeoutMillis, int * outFd, int * outEvents, void ** outData) { int result = 0 ; for (;;) { ... if (result != 0 ) { #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - returning result %d" , this , result); #endif if (outFd != NULL ) *outFd = 0 ; if (outEvents != NULL ) *outEvents = 0 ; if (outData != NULL ) *outData = NULL ; return result; } result = pollInner(timeoutMillis); } } int Looper::pollInner (int timeoutMillis) { ... int result = POLL_WAKE; ... mPolling = true ; struct epoll_event eventItems [EPOLL_MAX_EVENTS ]; int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); mPolling = false ; mLock.lock(); ... if (eventCount == 0 ) { #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - timeout" , this ); #endif result = POLL_TIMEOUT; goto Done; } ... for (int i = 0 ; i < eventCount; i++) { int fd = eventItems[i].data.fd; uint32_t epollEvents = eventItems[i].events; if (fd == mWakeEventFd) { if (epollEvents & EPOLLIN) { awoken(); } else { ... } } else { ... } } Done: ; ... mLock.unlock(); ... { int callbackResult = response.request.callback->handleEvent (fd, events, data); if (callbackResult == 0 ) { removeFd(fd, response.request.seq); } } return result; } bool Looper::isPolling () const { return mPolling; }
从代码中可以看到,Handler
机制只监听了 mWakeEventFd
的可读事件,以及超时事件;mPolling
记录了 epoll_wait
是否处于阻塞等待,在 MessageQueue.isPolling
中查看该变量的值。
小结 Looper.cpp
中通过 eventfd
实现线程间通信;通过 epoll
实现监听 eventfd
文件描述符的读写事件,以及 Message
的超时事件。
消息收发流程图 Handler
机制大致流程:
Looper.prepare
准备环境,初始化所有相关对象
Looper.loop
进入无限循环,阻塞等待获取下一条消息 MessageQueue.next
创建 Handler
子类,初始化
创建 Message
,存储相关信息
Handler.sendMessage
发送消息,加入到消息队列 MessageQueue.enqueueMessage
Looper
从 MessageQueue
中取出 Message
后,指派给 Handler.handleMessage
处理,处理完后回收 Message.recycleUnchecked
Looper.quit
退出机制
HandlerThread
创建与销毁在 Android
开发中经常会使用到线程,一想到线程,很多同学就立即使用 new Thread(){...}.start();
这样的方式。这样如果在一个 Activity
中多次调用上面的代码,那么将创建多个匿名线程,程序运行的越久可能会越来越慢。因此,需要一个 Handler
来启动一个线程,以及删除一个线程,保证线程不会重复的创建。 使用 HandlerThread
和 Handler
配合实现异步后台任务。特点 :
由 2 个 Handler
和 1 个 HandlerThread
来实现
后台线程串行执行
源码分析 HandlerThread
是参考 Looper
标准用法实现的工作线程,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public class HandlerThread extends Thread { Looper mLooper; ... protected void onLooperPrepared () { } @Override public void run () { mTid = Process.myTid(); Looper.prepare(); synchronized (this ) { mLooper = Looper.myLooper(); notifyAll(); } Process.setThreadPriority(mPriority); onLooperPrepared(); Looper.loop(); mTid = -1 ; } public Looper getLooper () { if (!isAlive()) { return null ; } synchronized (this ) { while (isAlive() && mLooper == null ) { try { wait(); } catch (InterruptedException e) { } } } return mLooper; } ... }
HandlerThread
的创建代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 Handler mHandler = new Handler(){ @Override public void handleMessage (Message msg) { super .handleMessage(msg); }; HandlerThread mBackThread = new HandlerThread("mybackthread" ); mBackThread.start(); Handler mBackHandler = new Handler(mBackThread.getLooper()); mBackHandler.post(new Runnable() { @Override public void run () { ... mHandler.sendMessage(msg); } });
注意 :mBackHandler
的初始化必须在 mBackThread.start();
之后,否则拿不到这个线程的 looper
。源码中可以看到,getLooper
会一直等待;而 Looper
是在 run
中创建的,并且通知等待线程。 这种模式通过 mBackHandler.post(new Runnable() {})
来实现后台异步任务执行,所有后台任务都是通过 HandlerThread
这个线程执行的,但是 HandlerThread
是串行执行任务的,也就是每次 post
后进入队列排队执行。
HandlerThread
的退出1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Override protected void onDestroy () { super .onDestroy(); if (mBackThread != null ){ mBackThread.quitSafely(); try { mBackThread.join(); mBackThread = null ; mBackHandler = null ; } catch (InterruptedException e) { e.printStackTrace(); } } }
在 Activity.onDestroy
退出时,等待 mBackThread
线程销毁完成再退出!
其他线程间通信机制
LocalBroadcastManager
本质还是 Handler
机制
EventBus
查看了 EventBus
源码,UI
主线程间通信还是采用了 Handler 机制:public class HandlerPoster extends Handler implements Poster {...}
;后台线程间通信,采用的是 Java
标准的 notify/wait
机制。
后续 Messenger
信差Messenger
用于进程间异步通信,其中通过 AIDL
来实现的进程间通信的;通过 Handler.sendMessage
来实现异步通信。详细参考Messenger 详解 。
OnFileDescriptorEventListener
监听文件描述符MessageQueue.java
中提供了完整的文件描述符监听功能,包括 Android_os_MessageQueue.cpp, Looper.cpp
中也实现了对应的功能,本文暂不做描述。
Native Loop
机制Java
层的 Handler
机制,在系统层的 Looper.cpp
文件中重新全部实现了一遍,供 Native
代码使用。本文仅仅介绍了其中的唤醒和轮询两个功能。而 Looper.cpp
中还包含添加文件描述符监听,收发 native
的消息。可以参考:Android Native层Looper详解 ,Android Native Looper机制 - 监听文件描述符 。
总结
Handler
通过 Linux
系统调用的意义 本文介绍的 Handler
消息收发机制,完全可以使用 Object.wait/notify
来简单实现;但是源码为什么还使用了 Linux epoll
来实现呢?这是因为 Framework
除了提供等待和唤醒方法,还提供了其他文件描述符监听机制等,epoll
机制更强大更高效。
应用程序的主线程处于无限循环状态 应用主线程 ActivityThread
进入 Looper.loop()
后不允许退出,直到应用退出;也就是说主线程是无限循环的,通过 epoll
机制阻塞等待时释放 CPU
资源,直到超时或事件触发才唤醒主线程。而我们经常遇到的 Activity
的 ANR
限制是针对 Activity
等组件的,实际上并不是主线程。
Handler
机制中的线程切换Handler
机制通常有两个线程:主线程和后台工作线程。主线程 loop
无限循环阻塞等待;后台工作线程完成耗时任务,发送完消息后就可以退出了;此时阻塞等待的主线程收到 epoll
机制的唤醒,拿到这个消息直接处理。因为消息 Message
是共享变量,所以整个 Handler
机制使用了很多同步锁 synchronized
,所以两个线程之间仅仅只是等待/通知交互。Looper.cpp
底层 epoll
机制并没有涉及数据交互,仅仅是线程间的通知什么时候能够取出消息,相当于 Java notify/wait
机制。而 epoll
还支持监听文件描述符涉及到数据交互,但是不属于本文介绍范围。
Handler
机制的退出 主线程是无法退出的,会一直循环等待,除非退出应用程序;其他线程的退出通过调用 Looper.quit
来退出,退出时会移除消息队列的所有消息,并设置 mQuitting
使得 MessageQueue.next
返回 null
,Looper.loop
退出循环。
消息队列的唤醒 消息队列 MessageQueue
有三个地方会唤醒:退出 quit
必须唤醒;移除同步屏障 removeSyncBarrier
和消息入队 enqueueMessage
满足条件时才唤醒。通过 nativeWake
触发唤醒。
参考文档