Android 广播机制浅析【3】分发广播

本分对广播分发的流程进行分析,主要包括广播入队后对于广播分发给各个接收者的过程以及接收者处理广播的过程,流程比较长,涉及 SystemServer 进程到接收者进程的通信。

AMS 分发

广播的分发统一都是通过 com.android.server.am.BroadcastQueue#scheduleBroadcastsLocked()

1
2
3
4
5
6
7
8
final BroadcastHandler mHandler;
public void scheduleBroadcastsLocked() {
if (mBroadcastsScheduled) { //上一个消息还在处理则返回
return;
}
mHandler.sendMessage(mHandler.obtainMessage(BROADCAST_INTENT_MSG, this));
mBroadcastsScheduled = true;
}

mHandler 是使用 AMS 中的 HandlerThread 的 Looper,因此这里流程就从 SystemServer 的 Binder 线程切换到了 HandlerThread 中,mHandler 中处理 BROADCAST_INTENT_MSG 会调用到 processNextBroadcast()

1
2
3
4
5
final void processNextBroadcast(boolean fromMsg) {
synchronized (mService) {
processNextBroadcastLocked(fromMsg, false);
}
}

processNextBroadcastLocked() 会一直持有 mService 对象锁,因为广播分发的逻辑主要都在这里,因此这部分逻辑比较长,一步一步来看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
final void processNextBroadcastLocked(boolean fromMsg, boolean skipOomAdj) {
BroadcastRecord r;
if (fromMsg) {
mBroadcastsScheduled = false; // 可以继续通过 handler 分发下一个广播
}
// 1、处理无序广播,
while (mParallelBroadcasts.size() > 0) {
r = mParallelBroadcasts.remove(0); //从无序广播队列中取出第一个广播
r.dispatchTime = SystemClock.uptimeMillis(); //设置广播的分发时间
//遍历所有的接收者
for (int i=0; i<r.receivers.size(); i++) {
Object target = r.receivers.get(i);
// 1.1 分发广播给动态注册接收者
deliverToRegisteredReceiverLocked(r, (BroadcastFilter)target, false, i);
}
}
//mPendingBroadcast 表示的是等待进程启动来处理的广播,如果进程存活则直接 return 继续等待
//如果该进程已经被杀,则重置 mPendingBroadcast 为空
if (mPendingBroadcast != null) { ... }
// 2、处理有序广播
do { // 在这个循环中主要是获取第一个不为空的广播对象
r = mDispatcher.getNextBroadcastLocked(now); //从有序广播队列中获取广播
if (r == null) {...}//如果没有需要处理的广播,则返回
// 2.1、第一处 ANR 逻辑,判断是否处理广播总耗时超时,后面 ANR 节分析
if (mService.mProcessesReady && !r.timeoutExempt && r.dispatchTime > 0) {...}
//广播状态不是 IDLE 则直接返回
if (r.state != BroadcastRecord.IDLE) return;
// 2.2、如果广播接收者为空、下一个接收者下标越界、广播被中止等则继续分发下一个广播
if (r.receivers == null || r.nextReceiver >= numReceivers
|| r.resultAbort || forceReceive) {
performReceiveLocked(...); // 处理广播
cancelBroadcastTimeoutLocked(); // 取消广播 ANR 超时
mDispatcher.retireBroadcastLocked(r); // 重置 mCurrentBroadcast 为 null
r = null;
continue;
}
...
} while (r == null);
// recIdx 表示当前接收者的下标,r.nextReceiver 为下一个接收者的下标,默认为 0
int recIdx = r.nextReceiver++;
// 设置广播的接收者被分发的时间,用于判断是否接收者处理耗时
r.receiverTime = SystemClock.uptimeMillis();
// 如果是该有序广播首次分发,则设置分发时间,用来判断是否广播总耗时超时
if (recIdx == 0) {
r.dispatchTime = r.receiverTime;
r.dispatchClockTime = System.currentTimeMillis();
}
// 如果此时没有超时任务,则开始超时计时,总时间就是该接收者的被分发时间+超时阈值
if (!mPendingBroadcastTimeoutMessage) {
long timeoutTime = r.receiverTime + mConstants.TIMEOUT;
setBroadcastTimeoutLocked(timeoutTime);
}
final Object nextReceiver = r.receivers.get(recIdx);
// 先处理动态注册的接收者
if (nextReceiver instanceof BroadcastFilter) {
BroadcastFilter filter = (BroadcastFilter)nextReceiver;
deliverToRegisteredReceiverLocked(r, filter, r.ordered, recIdx);
return;
}
// 再处理静态注册的接收者
ResolveInfo info = (ResolveInfo)nextReceiver;
r.manifestCount++;
r.delivery[recIdx] = BroadcastRecord.DELIVERY_DELIVERED;
r.state = BroadcastRecord.APP_RECEIVE;
r.curComponent = component;
r.curReceiver = info.activityInfo;
// 如果接收者进程正在运行,则直接开始分发
if (app != null && app.thread != null && !app.killed) {
processCurBroadcastLocked(r, app, skipOomAdj);
return;
}
//否则启动该接收者进程
if ((r.curApp=mService.startProcessLocked(...)) == null) {
//进程启动失败,结束这个广播的分发,继续分发下一个广播
finishReceiverLocked(r, r.resultCode, r.resultData,
r.resultExtras, r.resultAbort, false);
scheduleBroadcastsLocked();
r.state = BroadcastRecord.IDLE;
return;
}
// 进程启动成功,将该广播记录为 mPendingBroadcast,等待下一次调度时处理
maybeAddAllowBackgroundActivityStartsToken(r.curApp, r);
mPendingBroadcast = r;
mPendingBroadcastRecvIndex = recIdx;
}

这里先是通过 deliverToRegisteredReceiverLocked() 来分发无序广播,接着处理有序广播,优先是动态注册的接收者,也是通过 deliverToRegisteredReceiverLocked() 来分发,而静态注册的接收者则是通过 processCurBroadcastLocked() 分发:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private void deliverToRegisteredReceiverLocked(BroadcastRecord r,
BroadcastFilter filter, boolean ordered, int index) {
...
if (ordered) {
...
r.state = BroadcastRecord.CALL_IN_RECEIVE;
}
performReceiveLocked(...);
if (ordered) {
r.state = BroadcastRecord.CALL_DONE_RECEIVE;
}
...
}

void performReceiveLocked(ProcessRecord app, IIntentReceiver receiver,
Intent intent, int resultCode, String data, Bundle extras,
boolean ordered, boolean sticky, int sendingUser)throws RemoteException {
if (app != null) { //如果进程存在
if (app.thread != null) { // 通过 ApplicationThread 分发
app.thread.scheduleRegisteredReceiver(receiver, intent, resultCode,
data, extras, ordered, sticky, sendingUser, app.getReportedProcState());
} else { //进程已经死亡,抛出异常
// Application has died. Receiver doesn't exist.
throw new RemoteException("app.thread must not be null");
}
} else {//通过注册时传入的接收者进程中的 IIntentReceiver 类型对象来分发
receiver.performReceive(intent, resultCode, data, extras, ordered,
sticky, sendingUser);
}
}

因为 IIntentReceiver.aidlIApplicationThread.aidl 都是由 oneway 关键字修饰,因此这里调用后就会立即返回,不会等到接收者进程处理完成:

1
2
3
4
5
6
7
8
9
oneway interface IIntentReceiver {
@UnsupportedAppUsage
void performReceive(in Intent intent, int resultCode, String data,
in Bundle extras, boolean ordered, boolean sticky, int sendingUser);
}

oneway interface IApplicationThread {
...
}

这里如果获取到了对应进程,则通过 ApplicationThread 来分发主要是为了保证所有的接收者可以按照顺序进行接收,其内部还是调用了IIntentReceiver#performReceive

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private class ApplicationThread extends IApplicationThread.Stub {
...
// This function exists to make sure all receiver dispatching is
// correctly ordered, since these are one-way calls and the binder driver
// applies transaction ordering per object for such calls.
public void scheduleRegisteredReceiver(IIntentReceiver receiver, Intent intent,
int resultCode, String dataStr, Bundle extras, boolean ordered,
boolean sticky, int sendingUser, int processState) throws RemoteException {
updateProcessState(processState, false);
receiver.performReceive(intent, resultCode, dataStr, extras, ordered,
sticky, sendingUser);
}
...
}

接收者进程处理

至此就是从 SystemServer 进程走到了接收者进程的 Binder 线程中,之前看到 IIntentReceiver 在服务端的实现是 android.app.LoadedApk.ReceiverDispatcher.InnerReceiver:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
static final class ReceiverDispatcher {

final static class InnerReceiver extends IIntentReceiver.Stub {
//弱引用持有
final WeakReference<LoadedApk.ReceiverDispatcher> mDispatcher;

@Override
public void performReceive(Intent intent, int resultCode, String data,
Bundle extras, boolean ordered, boolean sticky, int sendingUser) {
final LoadedApk.ReceiverDispatcher rd = mDispatcher.get();
if (rd != null) {
rd.performReceive(intent, resultCode, data, extras,
ordered, sticky, sendingUser);
} else {
//如果接收者已经注销,则直接通知 AMS 继续处理完毕
IActivityManager mgr = ActivityManager.getService();
mgr.finishReceiver(this, resultCode, data, extras, false, intent.getFlags());
}
}
}

final class Args extends BroadcastReceiver.PendingResult {..}

final IIntentReceiver.Stub mIIntentReceiver;

ReceiverDispatcher(BroadcastReceiver receiver, Context context, Handler activityThread, Instrumentation instrumentation, boolean registered) {
mIIntentReceiver = new InnerReceiver(this, !registered);
}

@UnsupportedAppUsage
IIntentReceiver getIIntentReceiver() { //在注册广播时调用传递给 AMS
return mIIntentReceiver;
}

public void performReceive(Intent intent, int resultCode, String data,
Bundle extras, boolean ordered, boolean sticky, int sendingUser) {
// 将具体的处理任务封装到了 Args 中,并发送到指定的 handler 中,这里默认为主线程 handler
final Args args = new Args(...);
if (intent == null || !mActivityThread.post(args.getRunnable())) {
if (mRegistered && ordered) {
IActivityManager mgr = ActivityManager.getService();
args.sendFinished(mgr);
}
}
}
}

Args 也是 ReceiverDispatcher 的内部类,其 getRunnable 方法返回一个 Runnanle 对象,主要逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
public final Runnable getRunnable() {
return () -> {
final IActivityManager mgr = ActivityManager.getService();
Trace.traceBegin(Trace.TRACE_TAG_ACTIVITY_MANAGER, "broadcastReceiveReg");
receiver.onReceive(mContext, intent);
if (receiver.getPendingResult() != null) {
finish();
}
Trace.traceEnd(Trace.TRACE_TAG_ACTIVITY_MANAGER);
};
}

这里主要做了两件事:调用接收者的 onReceive 方法并主动通过调用 finish() 来结束处理,可以通过关键字 broadcastReceiveReg 在分析 Systrace 时快速定位到这里。这里也是一个广播 ANR 容易发生的地方,如果当前 handler 中一直有耗时任务在处理,还不等到执行 onReceive 就会报 ANR。

接收者进程完成处理

Args 是继承自 BroadcastReceiver.PendingResult 的,自身并没有覆写 finish 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public final void finish() {
if (mType == TYPE_COMPONENT) { //静态注册的广播
final IActivityManager mgr = ActivityManager.getService();
if (QueuedWork.hasPendingWork()) {
QueuedWork.queue(new Runnable() {
@Override public void run() {
sendFinished(mgr);
}
}, false);
} else {
sendFinished(mgr);
}
} else if (mOrderedHint && mType != TYPE_UNREGISTERED) { //动态注册的广播
final IActivityManager mgr = ActivityManager.getService();
sendFinished(mgr);
}
}

public void sendFinished(IActivityManager am) {
synchronized (this) {
if (mFinished) {
throw new IllegalStateException("Broadcast already finished");
}
mFinished = true;
//这里 mAbortBroadcast 在调用 BroadcastReceiver#abortBroadcast 中止有序广播继续发送时
//被设置为 true
am.finishReceiver(mToken, mResultCode, mResultData, mResultExtras,
mAbortBroadcast, mFlags);
}
}

这里对静态注册和动态注册的广播处理有些许不同,但最后都是需要调用到 AMS 这边:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void finishReceiver(IBinder who, int resultCode, String resultData,
Bundle resultExtras, boolean resultAbort, int flags) {
// 获取对应的广播队列
queue = (flags & Intent.FLAG_RECEIVER_FOREGROUND) != 0 ? mFgBroadcastQueue : mBgBroadcastQueue;
//获取有序广播(无序广播不会走到这里)
BroadcastRecord r = queue.getMatchingOrderedReceiver(who);
if (r != null) {
doNext = r.queue.finishReceiverLocked(r, resultCode, resultData, resultExtras, resultAbort, true);
}
if (doNext) { //继续处理下一个广播
r.queue.processNextBroadcastLocked(/*fromMsg=*/ false, /*skipOomAdj=*/ true);
}
}

finishReceiverLocked 中主要是对状态的一些重置工作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public boolean finishReceiverLocked(BroadcastRecord r, int resultCode,
String resultData, Bundle resultExtras, boolean resultAbort, boolean waitForServices) {
r.state = BroadcastRecord.IDLE;
final ActivityInfo receiver = r.curReceiver;
r.receiver = null;
r.curReceiver = null;
r.curApp = null;
mPendingBroadcast = null;

//有设置 FLAG_RECEIVER_NO_ABORT 则有序广播不允许被中止
if (resultAbort && (r.intent.getFlags()&Intent.FLAG_RECEIVER_NO_ABORT) == 0) {
r.resultAbort = resultAbort;
} else {
r.resultAbort = false;
}

// 如果 mDelayBehindServices 为 true,即后台广播队列
if (waitForServices && r.curComponent != null && r.queue.mDelayBehindServices
&& r.queue.mDispatcher.getActiveBroadcastLocked() == r) {
ActivityInfo nextReceiver; //获取下一个接收者
// 如果下一个接收者不和当前接收者同进程
if (receiver == null || nextReceiver == null
|| receiver.applicationInfo.uid != nextReceiver.applicationInfo.uid
|| !receiver.processName.equals(nextReceiver.processName)) {
//如果当前用户已经启动的后台服务数量超过最大,则延迟分发到下一个接收者
if (mService.mServices.hasBackgroundServicesLocked(r.userId)) {
Slog.i(TAG, "Delay finish: " + r.curComponent.flattenToShortString());
r.state = BroadcastRecord.WAITING_SERVICES;
return false;
}
}
}
r.curComponent = null;
//如果该广播此时已经被接收者处理或者已经调用 performReceiveLocked 完成,则处理下一个接收者
return state == BroadcastRecord.APP_RECEIVE || state == BroadcastRecord.CALL_DONE_RECEIVE;
}

这里主要做了以下事情:

  • 重置广播中的一些属性,例如 receiver 等以及 mPendingBroadcast
  • 如果中止广播,则判断是否有设置 FLAG_RECEIVER_NO_ABORT 属性来不允许中止
  • 判断当前后台服务是否已经运行的够多,如果是则延迟处理后台广播的分发
  • 如果此时广播已经调用完成了 BroadcastQueue#processNextBroadcastLocked 或者是 BroadcastQueue#performReceiveLocked,即状态此时为 APP_RECEIVE 或是 CALL_DONE_RECEIVE,则去处理下一个接收者

参考

Android Developer:广播概览