8f3895f7b19067e9ae9fa7f2d296dc55
我所理解的 iOS 并发编程

无论在哪个平台,并发编程都是一个让人头疼的问题。庆幸的是,相对于服务端,客户端的并发编程简单了许多。这篇文章主要讲述一些基于 iOS 平台的一些并发编程相关东西,我写博客习惯于先介绍原理,后介绍用法,毕竟对于 API 的使用,官网有更好的文档。

一些原理性的东西

为了便于理解,这里先解释一些相关概念。如果你对这些概念已经很熟悉,可以直接跳过。

1.进程

从操作系统定义上来说,进程就是系统进行资源分配和调度的基本单位,系统创建一个线程后,会为其分配对应的资源。在 iOS 系统中,进程可以理解为就是一个 App。iOS 并没有提供可以创建进程的 API,即使你调用 fork() 函数,也不能创建新的进程。所以,本文所说的并发编程,都是针对线程来说的

2.线程

线程是程序执行流的最小单元。一般情况下,一个进程会有多个线程,或者至少有一个线程。一个线程有创建、就绪、运行、阻塞和死亡五种状态。线程可以共享进程的资源,所有的问题也是因为共享资源引起的。

3.并发

操作系统引入线程的概念,是为了使过个 CPU 更好的协调运行,充分发挥他们的并行处理能力。例如在 iOS 系统中,你可以在主线程中进行 UI 操作,然后另启一些线程来处理与 UI 操作无关的事情,两件事情并行处理,速度比较快。这就是并发的大致概念。

4.时间片

按照 wiki 上面解释:是分时操作系统分配给每个正在运行的进程微观上的一段CPU时间(在抢占内核中是:从进程开始运行直到被抢占的时间)。线程可以被认为是 ”微进程“,因此这个概念也可以用到线程方面。

一般操作系统使用时间片轮转算法进行调度,即每次调度时,总是选择就绪队列的队首进程,让其在CPU上运行一个系统预先设置好的时间片。一个时间片内没有完成运行的进程,返回到绪队列末尾重新排队,等待下一次调度。不同的操作系统,时间片的范围不一致,一般都是毫秒(ms)级别。

4.死锁

死锁是由于多个线程(进程)在执行过程中,因为争夺资源而造成的互相等待现象,你可以理解为卡主了。产生死锁的必要条件有四个:

  • 互斥条件 : 指进程对所分配到的资源进行排它性使用,即在一段时间内某资源只由一个进程占用。如果此时还有其它进程请求资源,则请求者只能等待,直至占有资源的进程用毕释放。
  • 请求和保持条件 : 指进程已经保持至少一个资源,但又提出了新的资源请求,而该资源已被其它进程占有,此时请求进程阻塞,但又对自己已获得的其它资源保持不放。
  • 不可剥夺条件 : 指进程已获得的资源,在未使用完之前,不能被剥夺,只能在使用完时由自己释放。
  • 环路等待条件 : 指在发生死锁时,必然存在一个进程——资源的环形链,即进程集合{P0,P1,P2,···,Pn}中的P0正在等待一个P1占用的资源;P1正在等待P2占用的资源,……,Pn正在等待已被P0占用的资源。

为了便于理解,这里举一个例子:一座桥,同一时间只允许一辆车经过(互斥)。两辆车 A,B 从桥的两端开上桥,走到桥的中间。此时 A 车不肯退(不可剥夺),又想占用 B 车所占据的道路;B 车此时也不肯退,又想占用 A 车所占据的道路(请求和保持)。此时,A 等待 B 占用的资源,B 等待 A 占用的资源(环路等待),两车僵持下去,就形成了死锁现象

5.线程安全

当多个线程同时访问一块共享资源(例如数据库),因为时序性问题,会导致数据错乱,这就是线程不安全。例如数据库中某个整形字段的 value 为 0,此时两个线程同时对其进行写入操作,线程 A 拿到原值为 0,加一后变为 1;线程 B 并不是在 A 加完后拿的,而是和 A 同时拿的,加完后也是 1,加了两次,理想值应该为 2,但是数据库中最终值却是 1。实际开发场景可能要比这个复杂的多。

所谓的线程安全,可以理解为在多个线程操作(例如读写操作)这部分数据时,不会出现问题。

Lock

因为线程共享进程资源,在并发情况下,就会出现线程安全问题。为了解决此问题,就出现了锁这个概念。在多线程环境下,当你访问一些共享数据时,拿到访问权限,给数据加锁,在这期间其他线程不可访问,直到你操作完之后进行解锁,其他线程才可以对其进行操作。

iOS 提供了多种锁,ibireme 大神的 这篇文章 对这些锁进行了性能分析,我这里直接把图 cp 过来了:

下面针对这些锁,逐一分析。

1.OSSpinLock

ibireme 大神的文章也说了,虽然这个锁性能最高,但是已经不安全了,建议不再使用,这里简单说一下。

OSSpinLock 是一种自旋锁,主要提供了加锁(OSSpinLockLock)、尝试枷锁(OSSpinLockTry)和解锁(OSSpinLockUnlock)三个方法。对一块资源进行加锁时,如果尝试加锁失败,不会进入睡眠状态,而是一直进行询问(自旋),占用 CPU资源,不适用于较长时间的任务。在自旋期间,因为占用 CPU 导致低优先级线程拿不到 CUP 资源,无法完成任务并释放锁,从而形成了优先级反转

so,虽然性能很高,但是不要用了。而且 Apple 也已经将这个类比较为 deprecate 了。

自旋锁 & 互斥锁
两者大体类似,区别在于:自旋锁属于 busy-waiting 类型锁,尝试加锁失败,会一直处于询问状态,占用 CPU 资源,效率高;互斥锁属于 sleep-waiting 类型锁,在尝试失败之后,会被阻塞,然后进行上下文切换置于等待队列,因为有上下文切换,效率较低。
在 iOS 中 NSLock 属于互斥锁。

优先级反转 :当一个高优先级任务访问共享资源时,该资源已经被一个低优先级任务抢占,阻塞了高优先级任务;同时,该低优先级任务被一个次高优先级的任务所抢先,从而无法及时地释放该临界资源。最终使得任务优先级被倒置,发生阻塞。(引用自 wiki

关于自旋锁的原理,bestswifter 的文章 深入理解 iOS 开发中的锁 这篇文章讲得很好,我这里大部分锁的知识引用于此,建议读一下原文。

自旋锁是加不上就一直尝试,也就是一个循环,直到尝试加上锁,伪代码如下:

bool lock = false; // 一开始没有锁上,任何线程都可以申请锁  
do {  
    while(test_and_set(&lock); // test_and_set 是一个原子操作,尝试加锁
        Critical section  // 临界区
    lock = false; // 相当于释放锁,这样别的线程可以进入临界区
        Reminder section // 不需要锁保护的代码        
}

使用 :

OSSpinLock spinLock = OS_SPINLOCK_INIT;
OSSpinLockLock(&spinLock);
// 被锁住的资源
OSSpinLockUnlock(&spinLock);

2.dispatch_semaphore

dispatch_semaphore 并不属于锁,而是信号量。两者的区别如下:

  • 锁是用于线程互斥操作,一个线程锁住了某个资源,其他线程都无法访问,直到整个线程解锁;信号量用于线程同步,一个线程完成了某个动作通过信号量告诉别的线程,别的线程再进行操作。
  • 锁的作用域是线程之间;信号量的作用域是线程和进程之间。
  • 信号量有时候可以充当锁的作用,初次之前还有其他作用。
  • 如果转化为数值,锁可以认为只有 0 和 1;信号量可以大于零和小于零,有多个值。

dispatch_semaphore 使用分为三步:create、wait 和 signal。如下:

    // create
    dispatch_semaphore_t semaphore = dispatch_semaphore_create(1);

    // thread A
    dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
        dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
        // execute task A
        NSLog(@"task A");
        sleep(10);
        dispatch_semaphore_signal(semaphore);
    });

    // thread B
    dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
        dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
        // execute task B
        NSLog(@"task B");
        dispatch_semaphore_signal(semaphore);
    });

执行结果:

2018-05-03 21:40:09.068586+0800 ConcurrencyTest[44084:1384262] task A
2018-05-03 21:40:19.072951+0800 ConcurrencyTest[44084:1384265] task B

thread A,B 是两个异步线程,一般情况下,各自执行自己的事件,互不干涉。但是根据 console 输出,B 是在 A 执行完了 10s 执行之后才执行的,显然受到阻塞。使用 dispatch_semaphore 大致执行过程这样:创建 semaphore 时,信号量值为 1;执行到线程 A 的 dispatch_semaphore_wait 时,信号量值减 1,变为 0;然后执行任务 A,执行完毕后 sleep 方法阻塞当前线程 10s;与此同时,线程 B 执行到了 dispatch_semaphore_wait,由于信号量此时为 0,且线程 A 中设置的为 DISPATCH_TIME_FOREVER,因此需要等到线程 A sleep 10s 之后,执行 dispatch_semaphore_signal 将信号量置为 1,线程 B 的任务才开始执行。

根据上面的描述,dispatch_semaphore 的原理大致也就了解了。GCD 源码 对这些方法定义如下:

long
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
    long value = dispatch_atomic_dec2o(dsema, dsema_value);
    dispatch_atomic_acquire_barrier();
    if (fastpath(value >= 0)) {
        return 0;
    }
    return _dispatch_semaphore_wait_slow(dsema, timeout);
}

static long
_dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
        dispatch_time_t timeout)
{
    long orig;

again:
    // Mach semaphores appear to sometimes spuriously wake up. Therefore,
    // we keep a parallel count of the number of times a Mach semaphore is
    // signaled (6880961).
    while ((orig = dsema->dsema_sent_ksignals)) {
        if (dispatch_atomic_cmpxchg2o(dsema, dsema_sent_ksignals, orig,
                orig - 1)) {
            return 0;
        }
    }

    struct timespec _timeout;
    int ret;

    switch (timeout) {
    default:
        do {
            uint64_t nsec = _dispatch_timeout(timeout);
            _timeout.tv_sec = (typeof(_timeout.tv_sec))(nsec / NSEC_PER_SEC);
            _timeout.tv_nsec = (typeof(_timeout.tv_nsec))(nsec % NSEC_PER_SEC);
            ret = slowpath(sem_timedwait(&dsema->dsema_sem, &_timeout));
        } while (ret == -1 && errno == EINTR);

        if (ret == -1 && errno != ETIMEDOUT) {
            DISPATCH_SEMAPHORE_VERIFY_RET(ret);
            break;
        }
        // Fall through and try to undo what the fast path did to
        // dsema->dsema_value
    case DISPATCH_TIME_NOW:
        while ((orig = dsema->dsema_value) < 0) {
            if (dispatch_atomic_cmpxchg2o(dsema, dsema_value, orig, orig + 1)) {
                errno = ETIMEDOUT;
                return -1;
            }
        }
        // Another thread called semaphore_signal().
        // Fall through and drain the wakeup.
    case DISPATCH_TIME_FOREVER:
        do {
            ret = sem_wait(&dsema->dsema_sem);
        } while (ret != 0);
        DISPATCH_SEMAPHORE_VERIFY_RET(ret);
        break;
    }

    goto again;
}

以上时对 wait 方法的定义,如果你不想看代码,可以直接听我说:

  • 调用 dispatch_semaphore_wait 方法时,如果信号量大于 0,直接返回;否则进入后续步骤。
  • _dispatch_semaphore_wait_slow 方法根据传入 timeout 参数不同,使用 switch-case 处理。
  • 如果传入的是 DISPATCH_TIME_NOW 参数,将信号量加 1 并立即返回。
  • 如果传入的是一个超时时间,调用系统的 semaphore_timedwait 方法进行等待,直至超时。
  • 如果传入的是 DISPATCH_TIME_FOREVER 参数,调用系统的 semaphore_wait 进行等待,直到收到 singal 信号。

至于 dispatch_semaphore_signal 就比较简单了,源码如下:

long
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
    dispatch_atomic_release_barrier();
    long value = dispatch_atomic_inc2o(dsema, dsema_value);
    if (fastpath(value > 0)) {
        return 0;
    }
    if (slowpath(value == LONG_MIN)) {
        DISPATCH_CLIENT_CRASH("Unbalanced call to dispatch_semaphore_signal()");
    }
    return _dispatch_semaphore_signal_slow(dsema);
}
  • 现将信号量加 1,大于 0 直接返回。
  • 小于 0 返回 _dispatch_semaphore_signal_slow,这个方法的作用是调用内核的 semaphore_signal 函数唤醒信号量,然后返回 1。

3.pthread_mutex

Pthreads 是 POSIX Threads 的缩写。pthread_mutex 属于互斥锁,即尝试加锁失败后悔阻塞线程并睡眠,会进行上下文切换。锁的类型主要有三种:PTHREAD_MUTEX_NORMALPTHREAD_MUTEX_ERRORCHECKPTHREAD_MUTEX_RECURSIVE

  • PTHREAD_MUTEX_NORMAL,普通锁,当一个线程加锁以后,其余请求锁的线程将形成一个等待队列,并在解锁后按优先级获得锁。这种锁策略保证了资源分配的公平性。
  • PTHREAD_MUTEX_ERRORCHECK,检错锁,如果同一个线程请求同一个锁,则返回 EDEADLK。否则和 PTHREAD_MUTEX_NORMAL 相同。
  • PTHREAD_MUTEX_RECURSIVE,递归锁,允许一个线程进行递归申请锁。

使用如下:

    pthread_mutex_t mutex;   // 定义锁
    pthread_mutexattr_t attr; // 定义 mutexattr_t 变量
    pthread_mutexattr_init(&attr); // 初始化attr为默认属性
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL);  // 设置锁的属性
    pthread_mutex_init(&mutex, &attr); // 创建锁

    pthread_mutex_lock(&mutex); // 申请锁
    // 临界区
    pthread_mutex_unlock(&mutex); // 释放锁

4.NSLock

NSLock 属于互斥锁,是 Objective-C 封装的一个对象。虽然我们不知道 Objective-C 是如何实现的,但是我们可以在 swift 源码 中找到他的实现 :

...
internal var mutex = _PthreadMutexPointer.allocate(capacity: 1)
...
open func lock() {
        pthread_mutex_lock(mutex)
    }

open func unlock() {
   pthread_mutex_unlock(mutex)
#if os(macOS) || os(iOS)
  // Wakeup any threads waiting in lock(before:)
   pthread_mutex_lock(timeoutMutex)
   pthread_cond_broadcast(timeoutCond)
   pthread_mutex_unlock(timeoutMutex)
#endif
}

可以看出他只是将 pthread_mutex 封装了一下。只因为比 pthread_mutex 慢一些,难道是因为方法层级之间的调用,多了几次压栈操作???

常规使用:

NSLock *mutexLock = [NSLock new];
[mutexLock lock];
// 临界区
[muteLock unlock];

4.NSCondition & NSConditionLock

NSCondition 可以同时起到 lock 和条件变量的作用。同样你可以在 swift 源码 中找到他的实现 :

open class NSCondition: NSObject, NSLocking {
    internal var mutex = _PthreadMutexPointer.allocate(capacity: 1)
    internal var cond = _PthreadCondPointer.allocate(capacity: 1)

    public override init() {
        pthread_mutex_init(mutex, nil)
        pthread_cond_init(cond, nil)
    }

    deinit {
        pthread_mutex_destroy(mutex)
        pthread_cond_destroy(cond)
        mutex.deinitialize(count: 1)
        cond.deinitialize(count: 1)
        mutex.deallocate()
        cond.deallocate()
    }

    open func lock() {
        pthread_mutex_lock(mutex)
    }

    open func unlock() {
        pthread_mutex_unlock(mutex)
    }

    open func wait() {
        pthread_cond_wait(cond, mutex)
    }

    open func wait(until limit: Date) -> Bool {
        guard var timeout = timeSpecFrom(date: limit) else {
            return false
        }
        return pthread_cond_timedwait(cond, mutex, &timeout) == 0
    }

    open func signal() {
        pthread_cond_signal(cond)
    }

    open func broadcast() {
        pthread_cond_broadcast(cond)
    }

    open var name: String?
}

可以看出,它还是遵循 NSLocking 协议,lock 方法同样还是使用的 pthread_mutex,wait 和 signal 使用的是 pthread_cond_waitpthread_cond_signal

使用 NSCondition 是,先对要操作的临界区加锁,然后因为条件不满足,使用 wait 方法阻塞线程;待条件满足之后,使用 signal 方法进行通知。下面是一个 生产者-消费者的例子:

NSCondition *condition = [NSCondition new];
NSMutableArray *products = [NSMutableArray array];

// consume
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
    [condition lock];
    while (products.count == 0) {
        [condition wait];
    }
    [products removeObjectAtIndex:0];
    [condition unlock];
});

// product
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
    [condition lock];
    [products addObject:[NSObject new]];
    [condition signal];
    [condition unlock];
});

NSConditionLock 是通过使用 NSCondition 来实现的,遵循 NSLocking 协议,然后这是 swift 源码 (源码比较占篇幅,我这里简化一下):

open class NSConditionLock : NSObject, NSLocking {
    internal var _cond = NSCondition()

    ...

    open func lock(whenCondition condition: Int) {
        let _ = lock(whenCondition: condition, before: Date.distantFuture)
    }

    open func `try`() -> Bool {
        return lock(before: Date.distantPast)
    }

    open func tryLock(whenCondition condition: Int) -> Bool {
        return lock(whenCondition: condition, before: Date.distantPast)
    }

    open func unlock(withCondition condition: Int) {
        _cond.lock()
        _thread = nil
        _value = condition
        _cond.broadcast()
        _cond.unlock()
    }

    open func lock(before limit: Date) -> Bool {
        _cond.lock()
        while _thread != nil {
            if !_cond.wait(until: limit) {
                _cond.unlock()
                return false
            }
        }
        _thread = pthread_self()
        _cond.unlock()
        return true
    }

    open func lock(whenCondition condition: Int, before limit: Date) -> Bool {
        _cond.lock()
        while _thread != nil || _value != condition {
            if !_cond.wait(until: limit) {
                _cond.unlock()
                return false
            }
        }
        _thread = pthread_self()
        _cond.unlock()
        return true
    }

    ...
}

可以看出它使用了一个 NSCondition 全局变量来实现 lock 和 unlock 方法,都是一些简单的代码逻辑,就不详细说了。

使用 NSConditionLock 注意:

  • 初始化 NSConditionLock 会设置一个 condition,只有满足这个 condition 才能加锁。
  • -[unlockWithCondition:] 并不是满足条件时解锁,而是解锁后,修改 condition 值
typedef NS_ENUM(NSInteger, CTLockCondition) {
    CTLockConditionNone = 0,
    CTLockConditionPlay,
    CTLockConditionShow
};

- (void)testConditionLock {
    NSConditionLock *conditionLock = [[NSConditionLock alloc] initWithCondition:CTLockConditionPlay];

    // thread one
    dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
        [conditionLock lockWhenCondition:CTLockConditionNone];
        NSLog(@"thread one");
        sleep(2);
        [conditionLock unlock];
    });

    // thread two
    dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
        sleep(1);
        if ([conditionLock tryLockWhenCondition:CTLockConditionPlay]) {
            NSLog(@"thread two");
            [conditionLock unlockWithCondition:CTLockConditionShow];
            NSLog(@"thread two unlocked");
        } else {
            NSLog(@"thread two try lock failed");
        }
    });

    // thread three
    dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
        sleep(2);
        if ([conditionLock tryLockWhenCondition:CTLockConditionPlay]) {
            NSLog(@"thread three");
            [conditionLock unlock];
            NSLog(@"thread three locked success");
        } else {
            NSLog(@"thread three try lock failed");
        }
    });
}

// thread four
    dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
        sleep(4);
        if ([conditionLock tryLockWhenCondition:CTLockConditionShow]) {
            NSLog(@"thread four");
            [conditionLock unlock];
            NSLog(@"thread four unlocked success");
        } else {
            NSLog(@"thread four try lock failed");
        }
    });
}

然后看输出结果 :

2018-05-05 16:34:33.801855+0800 ConcurrencyTest[97128:3100768] thread two
2018-05-05 16:34:33.802312+0800 ConcurrencyTest[97128:3100768] thread two unlocked
2018-05-05 16:34:34.804384+0800 ConcurrencyTest[97128:3100776] thread three try lock failed
2018-05-05 16:34:35.806634+0800 ConcurrencyTest[97128:3100778] thread four
2018-05-05 16:34:35.806883+0800 ConcurrencyTest[97128:3100778] thread four unlocked success

可以看出,thread one 因为条件和初始化不符,加锁失败,未输出 log; thread two 条件相符,解锁成功,并修改加锁条件;thread three 使用原来的加锁条件,显然无法加锁,尝试加锁失败; thread four 使用修改后的条件,加锁成功。

5. NSRecursiveLock

NSRecursiveLock 属于递归锁。然后这是 swift 源码,只贴一下关键部分:

open class NSRecursiveLock: NSObject, NSLocking {
    ...
    public override init() {
        super.init()
#if CYGWIN
        var attrib : pthread_mutexattr_t? = nil
#else
        var attrib = pthread_mutexattr_t()
#endif
        withUnsafeMutablePointer(to: &attrib) { attrs in
            pthread_mutexattr_settype(attrs, Int32(PTHREAD_MUTEX_RECURSIVE))
            pthread_mutex_init(mutex, attrs)
        }
    }

    ...
}

它是使用 PTHREAD_MUTEX_RECURSIVE 类型的 pthread_mutex_t 初始化的。递归所可以在一个线程中重复调用,然后底层会记录加锁和解锁次数,当二者次数相同时,才能正确解锁,释放这块临界区。

使用例子:

- (void)testRecursiveLock {
    NSRecursiveLock *recursiveLock = [NSRecursiveLock new];

    int (^__block fibBlock)(int) = ^(int num) {
        [recursiveLock lock];

        if (num < 0) {
            [recursiveLock unlock];
            return 0;
        }

        if (num == 1 || num == 2) {
            [recursiveLock unlock];
            return num;
        }
        int newValue = fibBlock(num - 1) + fibBlock(num - 2);
        [recursiveLock unlock];
        return newValue;
    };

    int value = fibBlock(10);
    NSLog(@"value is %d", value);
}

6. @synchronized

@synchronized 是牺牲性能来换取语法上的简洁。如果你想深入了解,建议你去读 这篇文章。这里说一下他的大概原理:

@synchronized 的加锁过程,大概是这个样子:

@try {
    objc_sync_enter(obj); // lock
    // 临界区
} @finally {
    objc_sync_exit(obj);    // unlock
}

@synchronized 的存储结构,是使用哈希表来实现的。当你传入一个对象后,会为这个对象分配一个锁。锁和对象打包成一个对象,然后和一个锁在进行二次打包成一个对象,可以理解为 value;通过一个算法,根据对象的地址得到一个值,作为 key。然后以 key-value 的形式写入哈希表。结构大概是这个样子:

存储的时候,是以哈希表结构存储,不是我上面画的顺序存储,上面只是一个节点而已。

@synchronized 的使用就很简单了 :

NSMutableArray *elementArray = [NSMutableArray array];

@synchronized(elementArray) {
   [elementArray addObject:[NSObject new]];
}

Pthreads

前面也说了,pthreads 是 POSIX Threads 的缩写。这个东西一般我们用不到,这里简单介绍一下。Pthreads 是POSIX的线程标准,定义了创建和操纵线程的一套API。实现POSIX 线程标准的库常被称作Pthreads,一般用于Unix-like POSIX 系统,如Linux、 Solaris。

NSThread

NSThread 是对内核 mach kernel 中的 mach thread 的封装,一个 NSThread 对象就是一个线程。使用频率比较低,除了 API 的使用,没什么可讲的。如果你已经熟悉这些 API,可以跳过这一节了。

1.初始化线程执行一个 task

使用初始化方法初始化一个 NSTherad 对象,调用 -[cancel]-[start-[main] 方法对线程进行操作,一般线程执行完即销毁,或者因为某种异常退出。

/** 使用 target 对象的中的方法作为执行主体,可以通过 argument 传递一些参数。
- (instancetype)initWithTarget:(id)target selector:(SEL)selector object:(nullable id)argument;

/** 使用 block 对象作为执行主体 */
- (instancetype)initWithBlock:(void (^)(void))block;

/** 类方法,上面对象方法需要调用 -[start] 方法启动线程,下面两个方法不需要手动启动 */
+ (void)detachNewThreadWithBlock:(void (^)(void))block;
+ (void)detachNewThreadSelector:(SEL)selector toTarget:(id)target withObject:(nullable id)argument;

2.在主线程执行一个 task

/** 说一下最后一个参数,这里你至少指定一个 mode 执行 selector,如果你传 nil 或者空数组,selector 不会执行,虽然方法定义写了 nullable */
- (void)performSelectorOnMainThread:(SEL)aSelector withObject:(nullable id)arg waitUntilDone:(BOOL)wait modes:(nullable NSArray<NSString *> *)array;

- (void)performSelectorOnMainThread:(SEL)aSelector withObject:(nullable id)arg waitUntilDone:(BOOL)wait;

3.在其他线程执行一个 task

/** modes 参数同上一个 */
- (void)performSelector:(SEL)aSelector onThread:(NSThread *)thr withObject:(nullable id)arg waitUntilDone:(BOOL)wait modes:(nullable NSArray<NSString *> *)array;

- (void)performSelector:(SEL)aSelector onThread:(NSThread *)thr withObject:(nullable id)arg waitUntilDone:(BOOL)wait

4.在后台线程执行一个 task

- (void)performSelectorInBackground:(SEL)aSelector withObject:(nullable id)arg;

5.获取当前线程

@property (class, readonly, strong) NSThread *currentThread;

使用线程相关方法时,记得设置好 name,方便后面调试。同时也设置好优先级等其他参数。

performSelector: 系列方法已经不太安全,慎用。

Grand Central Dispatch (GCD)

GCD 是基于 C 实现的一套 API,而且是开源的,如果有兴趣,可以在 这里 down 一份源码研究一下。GCD 是由系统帮我们处理多线程调度,很是方便,也是使用频率最高的。这一章节主要讲解一下 GCD 的原理和使用。

在讲解之前,我们先有个概览,看一下 GCD 为我们提供了那些东西:

系统所提供的 API,完全可以满足我们日常开发需求了。下面就根据这些模块分别讲解一下。

1. Dispatch Queue

GCD 为我们提供了两类队列,串行队列并行队列。两者的区别是:

  • 串行队列中,按照 FIFO 的顺序执行任务,前面一个任务执行完,后面一个才开始执行。
  • 并行队列中,也是按照 FIFO 的顺序执行任务,只要前一个被拿去执行,继而后面一个就开始执行,后面的任务无需等到前面的任务执行完再开始执行。

除此之外,还要解释一个容易混淆的概念,并发并行

  • 并发:是指单独部分可以同时执行,但是需要系统决定怎样发生。
  • 并行:两个任务互不干扰,同时执行。单核设备,系统需要通过切换上下文来实现并发;多核设备,系统可以通过并行来执行并发任务。

最后,还有一个概念,同步异步

  • 同步 : 同步执行的任务会阻塞当前线程。
  • 异步 : 异步执行的任务不会阻塞当前线程。是否开启新的线程,由系统管理。如果当前有空闲的线程,使用当前线程执行这个异步任务;如果没有空闲的线程,而且线程数量没有达到系统最大,则开启新的线程;如果线程数量已经达到系统最大,则需要等待其他线程中任务执行完毕。
队列

我们使用时,一般使用这几个队列:

  • 主队列 - dispatch_get_main_queue :一个特殊的串行队列。在 GCD 中,方法主队列中的任务都是在主线程执行。当我们更新 UI 时想 dispatch 到主线程,可以使用这个队列。

    - (void)viewDidLoad {
    [super viewDidLoad];
        dispatch_async(dispatch_get_main_queue(),   ^{
            // UI 相关操作
        });
    }
  • 全局并行队列 - dispatch_get_global_queue : 系统提供的一个全局并行队列,我们可以通过指定参数,来获取不同优先级的队列。系统提供了四个优先级,所以也可以认为系统为我们提供了四个并行队列,分别为 :

    • DISPATCH_QUEUE_PRIORITY_HIGH
    • DISPATCH_QUEUE_PRIORITY_DEFAULT
    • DISPATCH_QUEUE_PRIORITY_LOW
    • DISPATCH_QUEUE_PRIORITY_BACKGROUND
    dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0);
    dispatch_async(queue, ^{
        // 相关操作
    });
  • 自定义队列 :你可以自己定义串行或者并行队列,来执行一些相关的任务,平时开发中也建议用自定义队列。创建自定义队列时,需要两个参数。一个是队列的名字,方便我们再调试时查找队列使用,命名方式采用的是反向 DNS 命名规则;一个是队列类型,传 NULL 或者 DISPATCH_QUEUE_SERIAL 代表串行队列,传 DISPATCH_QUEUE_CONCURRENT 代表并行队列,通常情况下,不要传 NULL,会降低可读性。
    DISPATCH_QUEUE_SERIAL_INACTIVE 代表串行不活跃队列,DISPATCH_QUEUE_CONCURRENT_INACTIVE 代表并行不活跃队列,在执行 block 任务时,需要被激活。

    dispatch_queue_t queue = dispatch_queue_create("com.bool.dispatch",DISPATCH_QUEUE_SERIAL);
  • 你可以使用 dispatch_queue_set_specificdispatch_queue_get_specificdispatch_get_specific 方法,为 queue 设置关联的 key 或者根据 key 找到关联对象等操作。

可以说,系统为我们提供了 5 中不同的队列,运行在主线程中的 main queue;3 个不同优先级的 global queue; 一个优先级更低的 background queue。除此之外,开发者可以自定义一些串行和并行队列,这些自定义队列中被调度的所有 block 最终都会被放到系统全局队列和线程池中,后面会讲这部分原理。盗用一张经典图:

同步 VS 异步

我们大多数情况下,都是使用 dispatch_asyn() 做异步操作,因为程序本来就是顺序执行,很少用到同步操作。有时候我们会把 dispatch_syn() 当做锁来用,以达到保护的作用。

系统维护的是一个队列,根据 FIFO 的规则,将 dispatch 到队列中的任务一一执行。有时候我们想把一些任务延后执行以下,例如 App 启动时,我想让主线程中一个耗时的工作放在后,可以尝试用一下 dispatch_asyn(),相当于把任务重新追加到了队尾。

dispatch_async(dispatch_get_main_queue(), ^{
        // 想要延后的任务
    });

通常情况下,我们使用 dispatch_asyn() 是不会造成死锁的。死锁一般出现在使用 dispatch_syn() 的时候。例如:

dispatch_sync(dispatch_get_main_queue(), ^{
   NSLog(@"dead lock");
});

想上面这样写,启动就会报错误。以下情况也如此:

dispatch_queue_t queue = dispatch_queue_create("com.bool.dispatch", DISPATCH_QUEUE_SERIAL);
    dispatch_async(queue, ^{
        NSLog(@"dispatch asyn");
        dispatch_sync(queue, ^{
            NSLog(@"dispatch asyn -> dispatch syn");
        });
    });

在上面的代码中,dispatch_asyn() 整个 block(称作 blcok_asyn) 当做一个任务追加到串行队列队尾,然后开始执行。在 block_asyn 内部中,又进行了 dispatch_syn(),想想要执行 block_syn。因为是串行队列,需要前一个执行完(block_asyn),再执行后面一个(block_syn);但是要执行完 block_asyn,需要执行内部的 block_syn。互相等待,形成死锁。

现实开发中,还有更复杂的死锁场景。不过现在编译器很友好,我们能在编译执行时就检测到了。

基本原理

针对下面这几行代码,我们分析一下它的底层过程:

- (void)viewDidLoad {
    [super viewDidLoad];
    dispatch_queue_t queue = dispatch_queue_create("com.bool.dispatch", DISPATCH_QUEUE_SERIAL);
    dispatch_async(queue, ^{
        NSLog(@"dispatch asyn test");
    });
}

创建队列

源码很长,但实际只有一个方法,逻辑比较清晰,如下:

/** 开发者调用的方法 */
dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
    return _dispatch_queue_create_with_target(label, attr,
            DISPATCH_TARGET_QUEUE_DEFAULT, true);
}

/** 内部实际调用方法 */
DISPATCH_NOINLINE
static dispatch_queue_t
_dispatch_queue_create_with_target(const char *label, dispatch_queue_attr_t dqa,
        dispatch_queue_t tq, bool legacy)
{
    // 1.初步判断
    if (!slowpath(dqa)) {
        dqa = _dispatch_get_default_queue_attr();
    } else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
        DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
    }

    // 2.配置队列参数
    dispatch_qos_t qos = _dispatch_priority_qos(dqa->dqa_qos_and_relpri);
#if !HAVE_PTHREAD_WORKQUEUE_QOS
    if (qos == DISPATCH_QOS_USER_INTERACTIVE) {
        qos = DISPATCH_QOS_USER_INITIATED;
    }
    if (qos == DISPATCH_QOS_MAINTENANCE) {
        qos = DISPATCH_QOS_BACKGROUND;
    }
#endif // !HAVE_PTHREAD_WORKQUEUE_QOS

    _dispatch_queue_attr_overcommit_t overcommit = dqa->dqa_overcommit;
    if (overcommit != _dispatch_queue_attr_overcommit_unspecified && tq) {
        if (tq->do_targetq) {
            DISPATCH_CLIENT_CRASH(tq, "Cannot specify both overcommit and "
                    "a non-global target queue");
        }
    }

    if (tq && !tq->do_targetq &&
            tq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) {
        // Handle discrepancies between attr and target queue, attributes win
        if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
            if (tq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) {
                overcommit = _dispatch_queue_attr_overcommit_enabled;
            } else {
                overcommit = _dispatch_queue_attr_overcommit_disabled;
            }
        }
        if (qos == DISPATCH_QOS_UNSPECIFIED) {
            dispatch_qos_t tq_qos = _dispatch_priority_qos(tq->dq_priority);
            tq = _dispatch_get_root_queue(tq_qos,
                    overcommit == _dispatch_queue_attr_overcommit_enabled);
        } else {
            tq = NULL;
        }
    } else if (tq && !tq->do_targetq) {
        // target is a pthread or runloop root queue, setting QoS or overcommit
        // is disallowed
        if (overcommit != _dispatch_queue_attr_overcommit_unspecified) {
            DISPATCH_CLIENT_CRASH(tq, "Cannot specify an overcommit attribute "
                    "and use this kind of target queue");
        }
        if (qos != DISPATCH_QOS_UNSPECIFIED) {
            DISPATCH_CLIENT_CRASH(tq, "Cannot specify a QoS attribute "
                    "and use this kind of target queue");
        }
    } else {
        if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
             // Serial queues default to overcommit!
            overcommit = dqa->dqa_concurrent ?
                    _dispatch_queue_attr_overcommit_disabled :
                    _dispatch_queue_attr_overcommit_enabled;
        }
    }
    if (!tq) {
        tq = _dispatch_get_root_queue(
                qos == DISPATCH_QOS_UNSPECIFIED ? DISPATCH_QOS_DEFAULT : qos,
                overcommit == _dispatch_queue_attr_overcommit_enabled);
        if (slowpath(!tq)) {
            DISPATCH_CLIENT_CRASH(qos, "Invalid queue attribute");
        }
    }

    // 3. 初始化队列
    if (legacy) {
        // if any of these attributes is specified, use non legacy classes
        if (dqa->dqa_inactive || dqa->dqa_autorelease_frequency) {
            legacy = false;
        }
    }

    const void *vtable;
    dispatch_queue_flags_t dqf = 0;
    if (legacy) {
        vtable = DISPATCH_VTABLE(queue);
    } else if (dqa->dqa_concurrent) {
        vtable = DISPATCH_VTABLE(queue_concurrent);
    } else {
        vtable = DISPATCH_VTABLE(queue_serial);
    }
    switch (dqa->dqa_autorelease_frequency) {
    case DISPATCH_AUTORELEASE_FREQUENCY_NEVER:
        dqf |= DQF_AUTORELEASE_NEVER;
        break;
    case DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM:
        dqf |= DQF_AUTORELEASE_ALWAYS;
        break;
    }
    if (legacy) {
        dqf |= DQF_LEGACY;
    }
    if (label) {
        const char *tmp = _dispatch_strdup_if_mutable(label);
        if (tmp != label) {
            dqf |= DQF_LABEL_NEEDS_FREE;
            label = tmp;
        }
    }

    dispatch_queue_t dq = _dispatch_object_alloc(vtable,
            sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);
    _dispatch_queue_init(dq, dqf, dqa->dqa_concurrent ?
            DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
            (dqa->dqa_inactive ? DISPATCH_QUEUE_INACTIVE : 0));

    dq->dq_label = label;
#if HAVE_PTHREAD_WORKQUEUE_QOS
    dq->dq_priority = dqa->dqa_qos_and_relpri;
    if (overcommit == _dispatch_queue_attr_overcommit_enabled) {
        dq->dq_priority |= DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
    }
#endif
    _dispatch_retain(tq);
    if (qos == QOS_CLASS_UNSPECIFIED) {
        // legacy way of inherithing the QoS from the target
        _dispatch_queue_priority_inherit_from_target(dq, tq);
    }
    if (!dqa->dqa_inactive) {
        _dispatch_queue_inherit_wlh_from_target(dq, tq);
    }
    dq->do_targetq = tq;
    _dispatch_object_debug(dq, "%s", __func__);
    return _dispatch_introspection_queue_create(dq);
}

根据代码生成的流程图,不想看代码直接看图,下同:

根据流程图,这个方法的步骤如下:

  • 开发者调用 dispatch_queue_create() 方法之后,内部会调用 _dispatch_queue_create_with_target() 方法。
  • 然后进行初步判断,多数情况下,我们是不会传队列类型的,都是穿 NULL,所以这里是个 slowpath。如果传了参数,但是不是规定的队列类型,系统会认为你是个智障,并抛出错误。
  • 然后初始化一些配置项。主要是 target_queue,overcommit 项和 qos。target_queue 是依赖的目标队列,像任何队列提交的任务(block),最终都会放到目标队列中执行;支持 overcommit 时,每当想队列提交一个任务时,都会开一个新的线程处理,这样是为了避免单一线程任务太多而过载;qos 是队列优先级,之前已经说过。
  • 然后进入判断分支。普通的串行队列的目标队列,就是一个支持 overcommit 的全局队列(对应 else 分支);当前 tq 对象的引用计数为 DISPATCH_OBJECT_GLOBAL_REFCNT (永远不会释放)时,且还没有目标队列时,才可以设置 overcommit 项,而且当优先级为 DISPATCH_QOS_UNSPECIFIED 时,需要重置 tq (对应 if 分支);其他情况(else if 分支)。
  • 然后配置队列的标识,以方便在调试时找到自己的那个队列。
  • 使用 _dispatch_object_alloc 方法申请一个 dispatch_queue_t 对象空间,dq。
  • 根据传入的信息(并行 or 串行;活跃 or 非活跃)来初始化这个队列。并行队列的 width 会设置为 DISPATCH_QUEUE_WIDTH_MAX 即最大,不设限;串行的会设置为 1。
  • 将上面获得配置项,目标队列,是否支持 overcommit,优先级和 dq 绑定。
  • 返回这个队列。返回去还输出了一句信息,便于调试。

异步执行

这个版本异步执行的代码,因为方法拆分很多,所以显得很乱。源码如下:

/** 开发者调用 */
void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
    dispatch_continuation_t dc = _dispatch_continuation_alloc();
    uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT;

    _dispatch_continuation_init(dc, dq, work, 0, 0, dc_flags);
    _dispatch_continuation_async(dq, dc);
}

/** 内部调用,包一层,再深入调用 */
DISPATCH_NOINLINE
void
_dispatch_continuation_async(dispatch_queue_t dq, dispatch_continuation_t dc)
{
    _dispatch_continuation_async2(dq, dc,
            dc->dc_flags & DISPATCH_OBJ_BARRIER_BIT);
}

/** 根据 barrier 关键字区别串行还是并行,分两支 */
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_async2(dispatch_queue_t dq, dispatch_continuation_t dc,
        bool barrier)
{
    if (fastpath(barrier || !DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width))) {
        // 串行
        return _dispatch_continuation_push(dq, dc);
    }

    // 并行
    return _dispatch_async_f2(dq, dc);
}

/** 并行又多了一层调用,就是这个方法 */
DISPATCH_NOINLINE
static void
_dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc)
{
    if (slowpath(dq->dq_items_tail)) {// 少路径
        return _dispatch_continuation_push(dq, dc);
    }

    if (slowpath(!_dispatch_queue_try_acquire_async(dq))) {// 少路径
        return _dispatch_continuation_push(dq, dc);
    }
    // 多路径
    return _dispatch_async_f_redirect(dq, dc,
            _dispatch_continuation_override_qos(dq, dc));
}

/** 主要用来重定向 */
DISPATCH_NOINLINE
static void
_dispatch_async_f_redirect(dispatch_queue_t dq,
        dispatch_object_t dou, dispatch_qos_t qos)
{
    if (!slowpath(_dispatch_object_is_redirection(dou))) {
        dou._dc = _dispatch_async_redirect_wrap(dq, dou);
    }
    dq = dq->do_targetq;

    // Find the queue to redirect to
    while (slowpath(DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width))) {
        if (!fastpath(_dispatch_queue_try_acquire_async(dq))) {
            break;
        }
        if (!dou._dc->dc_ctxt) {
            dou._dc->dc_ctxt = (void *)
                    (uintptr_t)_dispatch_queue_autorelease_frequency(dq);
        }
        dq = dq->do_targetq;
    }

    // 同步异步最终都是调用的这个方法,将任务追加到队列中
    dx_push(dq, dou, qos);
}

... 省略一些调用层级,

/** 核心方法,通过 dc_flags 参数区分了是 group,还是串行,还是并行 */
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_invoke_inline(dispatch_object_t dou, voucher_t ov,
        dispatch_invoke_flags_t flags)
{
    dispatch_continuation_t dc = dou._dc, dc1;
    dispatch_invoke_with_autoreleasepool(flags, {
        uintptr_t dc_flags = dc->dc_flags;
        _dispatch_continuation_voucher_adopt(dc, ov, dc_flags);
        if (dc_flags & DISPATCH_OBJ_CONSUME_BIT) { // 并行
            dc1 = _dispatch_continuation_free_cacheonly(dc);
        } else {
            dc1 = NULL;
        }
        if (unlikely(dc_flags & DISPATCH_OBJ_GROUP_BIT)) { // group
            _dispatch_continuation_with_group_invoke(dc);
        } else { // 串行
            _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
            _dispatch_introspection_queue_item_complete(dou);
        }
        if (unlikely(dc1)) {
            _dispatch_continuation_free_to_cache_limit(dc1);
        }
    });
    _dispatch_perfmon_workitem_inc();
}

不想看代码,直接看图:

根据流程图描述一下过程:

  • 首先开发者调用 dispatch_async() 方法,然后内部创建了一个 _dispatch_continuation_init 队列,将 queue、block 这些信息和这个 dc 绑定起来。这过程中 copy 了 block。
  • 然后经过了几个层次的调用,主要为了区分并行还是串行。
  • 如果是串行(这种情况比较常见,所以是 fastpath),直接就 dx_push 了,其实就是讲任务追加到一个链表里面。
  • 如果是并行,需要做重定向。之前我们说过,放到队列中的任务,最终都会以各种形式追加到目标队列里面。在 _dispatch_async_f_redirect 方法中,重新寻找依赖目标队列,然后追加过去。
  • 经过一系列调用,我们会在 _dispatch_continuation_invoke_inline 方法里区分串行还是并行。因为这个方法会被频繁调用,所以定义成了内联函数。对于串行队列,我们使用信号量控制,执行前信号量置为 wait,执行完毕后发送 singal;对于调度组,我们会在执行完之后调用 dispatch_group_leave
  • 底层的线程池,是使用 pthread 维护的,所以最终都会使用 pthread 来处理这些任务。

同步执行

同步执行,相对来说比较简单,源码如下 :

/** 开发者调用 */
void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
    if (unlikely(_dispatch_block_has_private_data(work))) {
        return _dispatch_sync_block_with_private_data(dq, work, 0);
    }
    dispatch_sync_f(dq, work, _dispatch_Block_invoke(work));
}

/** 内部调用 */
DISPATCH_NOINLINE
void
dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
    if (likely(dq->dq_width == 1)) {
        return dispatch_barrier_sync_f(dq, ctxt, func);
    }

    // Global concurrent queues and queues bound to non-dispatch threads
    // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
    if (unlikely(!_dispatch_queue_try_reserve_sync_width(dq))) {
        return _dispatch_sync_f_slow(dq, ctxt, func, 0);
    }

    _dispatch_introspection_sync_begin(dq);
    if (unlikely(dq->do_targetq->do_targetq)) {
        return _dispatch_sync_recurse(dq, ctxt, func, 0);
    }
    _dispatch_sync_invoke_and_complete(dq, ctxt, func);
}

同步执行,相对来说简单些,大体逻辑差不多。偷懒一下,就不画图了,直接描述:

  • 开发者使用 dispatch_sync() 方法,大多数路径,都会调用 dispatch_sync_f() 方法。
  • 如果是串行队列,则通过 dispatch_barrier_sync_f() 方法来保证原子操作。
  • 如果不是串行的(一般很少),我们使用 _dispatch_introspection_sync_begin_dispatch_sync_invoke_and_complete 来保证同步。
dispatch_after

dispatch_after 一般用于延后执行一些任务,可以用来代替 NSTimer,因为有时候 NSTimer 问题太多了。在后面的一章里,我会总体讲一下多线程中的问题,这里就不详细说了。一般我们这样来使用 dispatch_after :

- (void)viewDidLoad {
    [super viewDidLoad];
    dispatch_queue_t queue = dispatch_queue_create("com.bool.dispatch", DISPATCH_QUEUE_SERIAL);
    dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(NSEC_PER_SEC * 2.0f)),queue, ^{
        // 2.0 second execute
    });
}

在做页面过渡时,刚进入到新的页面我们并不会立即更新一些 view,为了引起用户注意,我们会过会儿再进行更新,可以中此 API 来完成。

源码如下:

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
        void *ctxt, void *handler, bool block)
{
    dispatch_timer_source_refs_t dt;
    dispatch_source_t ds;
    uint64_t leeway, delta;

    if (when == DISPATCH_TIME_FOREVER) {
#if DISPATCH_DEBUG
        DISPATCH_CLIENT_CRASH(0, "dispatch_after called with 'when' == infinity");
#endif
        return;
    }

    delta = _dispatch_timeout(when);
    if (delta == 0) {
        if (block) {
            return dispatch_async(queue, handler);
        }
        return dispatch_async_f(queue, ctxt, handler);
    }
    leeway = delta / 10; // <rdar://problem/13447496>

    if (leeway < NSEC_PER_MSEC) leeway = NSEC_PER_MSEC;
    if (leeway > 60 * NSEC_PER_SEC) leeway = 60 * NSEC_PER_SEC;

    // this function can and should be optimized to not use a dispatch source
    ds = dispatch_source_create(&_dispatch_source_type_after, 0, 0, queue);
    dt = ds->ds_timer_refs;

    dispatch_continuation_t dc = _dispatch_continuation_alloc();
    if (block) {
        _dispatch_continuation_init(dc, ds, handler, 0, 0, 0);
    } else {
        _dispatch_continuation_init_f(dc, ds, ctxt, handler, 0, 0, 0);
    }
    // reference `ds` so that it doesn't show up as a leak
    dc->dc_data = ds;
    _dispatch_trace_continuation_push(ds->_as_dq, dc);
    os_atomic_store2o(dt, ds_handler[DS_EVENT_HANDLER], dc, relaxed);

    if ((int64_t)when < 0) {
        // wall clock
        when = (dispatch_time_t)-((int64_t)when);
    } else {
        // absolute clock
        dt->du_fflags |= DISPATCH_TIMER_CLOCK_MACH;
        leeway = _dispatch_time_nano2mach(leeway);
    }
    dt->dt_timer.target = when;
    dt->dt_timer.interval = UINT64_MAX;
    dt->dt_timer.deadline = when + leeway;
    dispatch_activate(ds);
}

dispatch_after() 内部会调用 _dispatch_after() 方法,然后先判断延迟时间。如果为 DISPATCH_TIME_FOREVER(永远不执行),则会出现异常;如果为 0 则立即执行;否则的话会创建一个 dispatch_timer_source_refs_t 结构体指针,将上下文相关信息与之关联。然后使用 dispatch_source 相关方法,将定时器和 block 任务关联起来。定时器时间到时,取出 block 任务开始执行。

dispatch_once

如果我们有一段代码,在 App 生命周期内最好只初始化一次,这时候使用 dispatch_once 最好不过了。例如我们单例中经常这样用:

+ (instancetype)sharedManager {
    static BLDispatchManager *sharedInstance = nil;
    static dispatch_once_t onceToken;
    dispatch_once(&onceToken, ^{
        sharedInstance = [[BLDispatchManager alloc] initPrivate];
    });

    return sharedInstance;
}

还有在定义 NSDateFormatter 时使用:

- (NSString *)todayDateString {
    static NSDateFormatter *formatter = nil;
    static dispatch_once_t onceToken;
    dispatch_once(&onceToken, ^{
        formatter = [NSDateFormatter new];
        formatter.locale = [NSLocale localeWithLocaleIdentifier:@"en_US_POSIX"];
        formatter.timeZone = [NSTimeZone timeZoneForSecondsFromGMT:8 * 3600];
        formatter.dateFormat = @"yyyyMMdd";
    });

    return [formatter stringFromDate:[NSDate date]];
}

因为这是很常用的一个代码片段,所以被加在了 Xcode 的 code snippet 中。

它的源代码如下:

/** 一个结构体,里面为当前的信号量、线程端口和指向下一个节点的指针 */
typedef struct _dispatch_once_waiter_s {
    volatile struct _dispatch_once_waiter_s *volatile dow_next;
    dispatch_thread_event_s dow_event;
    mach_port_t dow_thread;
} *_dispatch_once_waiter_t;

/** 我们调用的方法 */
void
dispatch_once(dispatch_once_t *val, dispatch_block_t block)
{
    dispatch_once_f(val, block, _dispatch_Block_invoke(block));
}

/** 实际执行的方法 */
DISPATCH_NOINLINE
void
dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
#if !DISPATCH_ONCE_INLINE_FASTPATH
    if (likely(os_atomic_load(val, acquire) == DLOCK_ONCE_DONE)) {
        return;
    }
#endif // !DISPATCH_ONCE_INLINE_FASTPATH
    return dispatch_once_f_slow(val, ctxt, func);
}

DISPATCH_ONCE_SLOW_INLINE
static void
dispatch_once_f_slow(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
#if DISPATCH_GATE_USE_FOR_DISPATCH_ONCE
    dispatch_once_gate_t l = (dispatch_once_gate_t)val;

    if (_dispatch_once_gate_tryenter(l)) {
        _dispatch_client_callout(ctxt, func);
        _dispatch_once_gate_broadcast(l);
    } else {
        _dispatch_once_gate_wait(l);
    }
#else
    _dispatch_once_waiter_t volatile *vval = (_dispatch_once_waiter_t*)val;
    struct _dispatch_once_waiter_s dow = { };
    _dispatch_once_waiter_t tail = &dow, next, tmp;
    dispatch_thread_event_t event;

    if (os_atomic_cmpxchg(vval, NULL, tail, acquire)) {
        dow.dow_thread = _dispatch_tid_self();
        _dispatch_client_callout(ctxt, func);

        next = (_dispatch_once_waiter_t)_dispatch_once_xchg_done(val);
        while (next != tail) {
            tmp = (_dispatch_once_waiter_t)_dispatch_wait_until(next->dow_next);
            event = &next->dow_event;
            next = tmp;
            _dispatch_thread_event_signal(event);
        }
    } else {
        _dispatch_thread_event_init(&dow.dow_event);
        next = *vval;
        for (;;) {
            if (next == DISPATCH_ONCE_DONE) {
                break;
            }
            if (os_atomic_cmpxchgv(vval, next, tail, &next, release)) {
                dow.dow_thread = next->dow_thread;
                dow.dow_next = next;
                if (dow.dow_thread) {
                    pthread_priority_t pp = _dispatch_get_priority();
                    _dispatch_thread_override_start(dow.dow_thread, pp, val);
                }
                _dispatch_thread_event_wait(&dow.dow_event);
                if (dow.dow_thread) {
                    _dispatch_thread_override_end(dow.dow_thread, val);
                }
                break;
            }
        }
        _dispatch_thread_event_destroy(&dow.dow_event);
    }
#endif
}

不想看代码直接看图 (emmm... 根据逻辑画完图才发现,其实这个图也挺乱的,所以我将两个主分支用不同颜色标记处理):

根据这个图,我来表述一下主要过程:

  • 我们调用 dispatch_once() 方法之后,内部多数情况下会调用 dispatch_once_f_slow() 方法,这个方法才是真正的执行方法。

  • os_atomic_cmpxchg(vval, NULL, tail, acquire) 这个方法,执行过程实际是这个样子

    if (*vval == NULL) {
        *vval = tail = &dow;
        return true;
    } else {
        return false
    }

    我们初始化的 once_token,也就是 *vval 实际是 0,所以第一次执行时是返回 true 的。if() 中的这个方法是原子操作,也就是说,如果多个线程同时调用这个方法,只有一个线程会进入 true 的分支,其他都进入 else 分支。

  • 这里先说进入 true 分支。进入之后,会执行对应的 block,也就是对应的任务。然后 next 指向 *vval, *vval 标记为 DISPATCH_ONCE_DONE,即执行的是这样一个过程:

    ```c
    next = (_dispatch_once_waiter_t)_dispatch_once_xchg_done(val);
    // 实际执行时这样的
    next = *vval;

top Created with Sketch.