从源码角度看Handler

简介

Handler这套线程异步通信框架在Android中的地位是不亚于Binder的,因为其基础设计简单、涉及的知识面广、业务使用场景多等原因,十分适合应用层的初中级的工程师进行深入学习

这篇文章中我将分析Handler核心功能的源码,分析将贯穿着framework, native和kernel的知识点:

  1. Handler发送异步消息原理
  2. Looper派发消息原理
  3. 消息分割栏的原理与视图绘制的运用
  4. epoll_create, epoll_ctl, epoll_wait三部曲的源码分析
  5. epoll中生产者与消费者模型的运用

同时按惯例,会在开篇给出总括全局的类图与架构图,方面阅读中定位理解与阅读后的回顾

设计图

类图

  • Message: 消息的抽象
  • Handler: 发送消息的工具类clo
  • MessageQueue: 主要维护了消息队列,同时也是与native通信的中枢
  • Looper: 循环获取消息并进行派发
  • Messenger: 可以跨进程传输的消息抽象

架构图

  1. 一个APP中运行着多个线程,不同线程间可以互相拿到对方的Handler对象
  2. MessageQueue和native直接通信,native中又和kernel通信,这样的调用链赋予了APP使用系统内核资源的能力
  3. epoll机制在kernel中维护了一个链表与一颗红黑树是它效率优于poll与select的基础

发送跨线程异步消息: Handler.post()

使用Handler的前提是获取到它的引用对象,然后才能够在对应的MessageQueue的消息队列插入消息。能够这样做的根本原因在于,线程之间的内存是可以相互访问的,这也是Handler能够实现跨线程通信的基本原理之一

下面从最常用的Handler.post()方法入手,看看消息发送的实现原理,这里需要说明的一点是,使用Handler发送消息的花样很多,但最终都需要调用到MessageQueue.enqueueMessage()方法来实现,这里就不一一介绍API的使用了

frameworks/base/core/java/android/os/Handler.java

public final boolean post(Runnable r)
{
    return  sendMessageDelayed(getPostMessage(r), 0);
}

public final boolean postAtTime(Runnable r, long uptimeMillis)
{
    return sendMessageAtTime(getPostMessage(r), uptimeMillis);
}

public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
    // 这里获取到队列,队列是该线程唯一的,在Handler初始化时获取
    MessageQueue queue = mQueue;
    ...
    return enqueueMessage(queue, msg, uptimeMillis);
}

private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
    msg.target = this;
    // 判断是否为异步消息,异步消息将不会受到分割栏的影响
    if (mAsynchronous) {
        msg.setAsynchronous(true);
    }
    // 最终调用到MessageQueue去插入消息
    return queue.enqueueMessage(msg, uptimeMillis);
}

可以看到Handler只是一个类似于工具的类,最终的消息管理方面的操作还是需要委托给MessageQueue去做

frameworks/base/core/java/android/os/MessageQueue.java

boolean enqueueMessage(Message msg, long when) {
    // 这里会强制target成员的设置,分割栏的插入不是调用这个方法实现的
    if (msg.target == null) {
        throw new IllegalArgumentException("Message must have a target.");

        // 因为不同的线程都可以调用这个方法,所以使用类锁保证消息队列的异步安全
        synchronized (this) {
            msg.markInUse();
            msg.when = when;
            Message p = mMessages;
            boolean needWake;
            if (p == null || when == 0 || when < p.when) {
                msg.next = p;
                mMessages = msg;
                needWake = mBlocked;
            } else {
                needWake = mBlocked && p.target == null && msg.isAsynchronous();
                Message prev;
                // 遍历消息队列,插入到合适的位置
                for (;;) {
                    prev = p;
                    p = p.next;
                    if (p == null || when < p.when) {
                        break;
                    }
                    if (needWake && p.isAsynchronous()) {
                        needWake = false;
                    }
                }
                msg.next = p; // invariant: p == prev.next
                prev.next = msg;
            }

            // 调用到native尝试唤醒对端设备
            if (needWake) {
                nativeWake(mPtr);
            }
        }
        return true;
    }
}

frameworks/base/core/jni/android_os_MessageQueue.cpp

static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
    // 通过ptr获取到NativeMessageQueue对象
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->wake();
}

void NativeMessageQueue::wake() {
    // 委托给native层的Looper进行处理
    mLooper->wake();
}

system/core/libutils/Looper.cpp

void Looper::wake() {
    uint64_t inc = 1;
    // 向文件描述符为mWakeEventFd设备写入,以此来唤醒监听设备的epoll
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
    ...
}

总结一下消息发送的主要工作:

  1. 在java层的消息队列中根据时间插入消息
  2. 在native层,向对应的fd设备写入数据,以此来唤醒监听该设备的epoll

处理消息: Handler.handleMessage()

handleMessage完全是处于被动调用的状态,每当消息到来时,会从kernel依次调用的native,再到java层的Looper

这里直接看Looper是如何获取到消息,并调用handleMessage的,省去了Looper.prepare()代码的分析

frameworks/base/core/java/android/os/Looper.java

public static void loop() {
    // 从ThreadLocal中取出之前放入的Looper对象
    final Looper me = myLooper();
    // 获取到队列
    final MessageQueue queue = me.mQueue;

    Binder.clearCallingIdentity();
    final long ident = Binder.clearCallingIdentity();

    for (;;) {
        // 获取下一个消息,这个方法在没有消息时会产生阻塞,只有在消息到来时才会触发
        Message msg = queue.next(); // might block

        ...

        // 派发消息,最终会调用到Handler.handleMessage方法
        try {
            msg.target.dispatchMessage(msg);
        } finally {
            if (traceTag != 0) {
                Trace.traceEnd(traceTag);
            }
        }

        ...

        // 回收Message
        msg.recycleUnchecked();
    }
}

继续看看MessageQueue.next的实现:

frameworks/base/core/java/android/os/MessageQueue.java

Message next() {
    // mPtr实际上是NativeMesssageQueue的引用,方便在native层查询到MessageQueue
    final long ptr = mPtr;

    int pendingIdleHandlerCount = -1; // -1 only during first iteration
    int nextPollTimeoutMillis = 0;
    for (;;) {
        if (nextPollTimeoutMillis != 0) {
            Binder.flushPendingCommands();
        }

        // 调用native方法,之后调用到epoll_wait
        nativePollOnce(ptr, nextPollTimeoutMillis);

        synchronized (this) {
            final long now = SystemClock.uptimeMillis();
            Message prevMsg = null;
            Message msg = mMessages;
            if (msg != null && msg.target == null) {
                // 消息分割栏,后面会做分析
                do {
                    prevMsg = msg;
                    msg = msg.next;
                } while (msg != null && !msg.isAsynchronous());
            }
            if (msg != null) {
                // 如果当前时间没有已经就绪的message,那么重置poll事件的时间
                if (now < msg.when) {
                    nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                } else {
                    // Got a message.
                    mBlocked = false;
                    if (prevMsg != null) {
                        prevMsg.next = msg.next;
                    } else {
                        mMessages = msg.next;
                    }
                    msg.next = null;

                    msg.markInUse();
                    // 如果该消息已经就绪,那么将它返回给Looper进行派发
                    return msg;
                }
            } else {
                // No more messages.
                nextPollTimeoutMillis = -1;
            }
        }

        // 如果此次没有处理消息,则用来处理IdleHandler的操作,把优先级不高的操作放到这里去执行,尽量不浪费事件片
        for (int i = 0; i < pendingIdleHandlerCount; i++) {
            final IdleHandler idler = mPendingIdleHandlers[i];
            mPendingIdleHandlers[i] = null; // release the reference to the handler

            boolean keep = false;
            try {
                keep = idler.queueIdle();
            } catch (Throwable t) {
                Log.wtf(TAG, "IdleHandler threw exception", t);
            }

            if (!keep) {
                synchronized (this) {
                    mIdleHandlers.remove(idler);
                }
            }
        }
    }
}

核心操作nativePollOnce同样是在native层进行处理

frameworks/base/core/jni/android_os_MessageQueue.cpp

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, jlong ptr, jint timeoutMillis) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;
    mPollObj = pollObj;
    // 同样也是委托Looper进行处理
    mLooper->pollOnce(timeoutMillis);
    mPollObj = NULL;
    mPollEnv = NULL;
}

system/core/libutils/Looper.cpp

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        // 获取操作实际上是从reponse队列中拿取的
        ...

        if (result != 0) {
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = 0;
            if (outData != NULL) *outData = NULL;
            return result;
        }

        // 本质上在循环的调用pollInner方法,直到获取到了结果
        result = pollInner(timeoutMillis);
    }
}

int Looper::pollInner(int timeoutMillis) {
    ...

    // Poll.
    int result = POLL_WAKE;
    // 清空response队列
    mResponses.clear();
    mResponseIndex = 0;

    // We are about to idle.
    mPolling = true;

    // 初始化epoll_event
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    // 使用epoll_wait调用的kernel去请求事件,kernel获取到事件后会通过mmap将epoll_event信息返回到native层
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

    // No longer idling.
    mPolling = false;

    // Acquire lock.
    mLock.lock();

    ...

    // 获取到信息后,接下来就是读取了
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeEventFd) {
            if (epollEvents & EPOLLIN) {
                awoken();
            }
        } else {
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                // 插入到response队列中供后续获取
                pushResponse(events, mRequests.valueAt(requestIndex));
            } 
        }
    }
Done: ;

    mNextMessageUptime = LLONG_MAX;
    while (mMessageEnvelopes.size() != 0) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        if (messageEnvelope.uptime <= now) {
            { // obtain handler
                sp<MessageHandler> handler = messageEnvelope.handler;
                Message message = messageEnvelope.message;
                mMessageEnvelopes.removeAt(0);
                mSendingMessage = true;
                mLock.unlock();
                // 首先处理native层的Message
                handler->handleMessage(message);
            } // release handler
        }
    }

    // Release lock.
    mLock.unlock();
    ...
    return result;
}

native层Looper.pollInner()方法是获取Message的主要操作:

  1. 调用epoll_wait在kernel中获取设备事件,该方法后续会做分析
  2. 获取到事件后进行解析并插入到response队列中
  3. 处理native事件

可以看到,该方法会首先处理native层的Message,也就是说Handler这套框架对于native的消息是优先派发的

设置同步分割栏: MessageQueue.postSyncBarrier()

同步分割栏的原理其实很简单,本质上就是通过创建一个target成员为NULL的Message并插入到消息队列中,这样在这个特殊的Message之后的消息就不会被处理了,只有当这个Message被移除后才会继续执行之后的Message

最经典的实现就是ViewRootImpl调用scheduleTraversals方法进行视图更新时的使用:

frameworks/base/core/java/android/view/ViewRootImpl.java

void scheduleTraversals() {
    if (!mTraversalScheduled) {
        mTraversalScheduled = true;
        // 执行分割操作后会获取到分割令牌,使用它可以移除分割栏
        mTraversalBarrier = mHandler.getLooper().getQueue().postSyncBarrier();
        // 发出一个有异步标志的Message,避免被分割
        mChoreographer.postCallback(
            Choreographer.CALLBACK_TRAVERSAL, mTraversalRunnable, null);
        ...
    }
}

在执行doTraversal方法后,才会移出分割栏:

void doTraversal() {
    if (mTraversalScheduled) {
        mTraversalScheduled = false;
        mHandler.getLooper().getQueue().removeSyncBarrier(mTraversalBarrier);

        performTraversals();

        ...
    }
}

这样做的原因是,doTraversal的操作是通过Handler进行处理的,然而这个消息队列却是整个主线程公用的,比如说四大组件的各个生命周期的调用,然而doTraversal的内容是更新视图UI,这个任务无疑是最高优先级的。所以在这之前,需要确保队列中其它同步消息不会影响到它的执行

这里继续跟一下MessageQueue.postSyncBarrier()的实现:

frameworks/base/core/java/android/os/MessageQueue.java

public int postSyncBarrier() {
    return postSyncBarrier(SystemClock.uptimeMillis());
}

private int postSyncBarrier(long when) {
    synchronized (this) {
        final int token = mNextBarrierToken++;
        final Message msg = Message.obtain();
        msg.markInUse();
        msg.when = when;
        msg.arg1 = token;
        // 注意这里,并没有为target成员进行初始化

        Message prev = null;
        Message p = mMessages;
        // 插入到队列中
        if (when != 0) {
            while (p != null && p.when <= when) {
                prev = p;
                p = p.next;
            }
        }
        if (prev != null) { // invariant: p == prev.next
            msg.next = p;
            prev.next = msg;
        } else {
            msg.next = p;
            mMessages = msg;
        }
        return token;
    }
}

可以看到,设置分割栏和普通的post Message是一样的,不同的是target是空的

下面接着来看看分割栏真正起作用的地方:

frameworks/base/core/java/android/os/MessageQueue.java

Message next() {
    ...
    for (;;) {
        ...
        // 进行队列遍历
           Message msg = mMessages;
        if (msg != null && msg.target == null) {
            do {
                prevMsg = msg;
                msg = msg.next;
            // 如果target为NULL,将会陷入这个循环,除非是有异步标志的消息才会跳出循环
            } while (msg != null && !msg.isAsynchronous());
        }
        ...
    }
}

跨进程异步消息: Messenger

Messenger是Android开发中算是比较冷门的一种跨进程通信方式,它的实现是依托着Binder+Handler这两种技术的配合,使得跨进程调用后服务端任务能够在指定的线程中被执行

这种方式其实就类似于ActivityThread的binder call接收后,使用指定Handler去post消息是一样的,不同的在于Messenger的对其进行了封装,使得开发者可以轻松的实现而不需要更多的代码

原生有个典型实现的MessengerService,可以参考它的思路:

top Created with Sketch.