Linux内核分析及应用
上QQ阅读APP看书,第一时间看更新

第2章 并发

很多程序员在面试的时候经常会被问到线程安全相关的问题,比如什么是线程安全,什么又是线程不安全,假如线程不安全,如何解决才能做到线程安全。这时候,往往会出现五花八门的答案,而且大多数都是本末倒置。很多时候,人们经常会用一些现象来回答问题,比如房价高这个问题,很多时候大家就会归结于某些现象:温州炒房团、丈母娘经济、对比国际大城市房价等。但是,我们需要的是“原理性的解释”,比如影响房价的经济学原理如供需关系、不均衡分布等。

再回归到线程安全问题,这是一个非常经典的问题,需要搞懂并发原理,才能搞清楚线程安全。任何事物的发展,都是有因果关系的,就像霍金博士一生孜孜不倦地研究,无非也就是想搞懂人类从哪里来,站在何方,将要去向哪里等大问题。所以针对并发这样的话题,我们学习的思路应该是这样的:

‰并发到底是什么,如何在系统中产生。

‰并发会带来什么问题。

‰如何解决并发带来的问题。

我觉得这个思考方式,应该可以用于大部分技术原理的学习和研究了。只有带着正确的问题出发,才有可能得到你想要的答案。下面我们就根据以上3个问题对并发相关的话题进行探讨,在后续的章节中,我还会反复强调这样的思考方式。

本章先介绍并发原理,再分析Linux中的并发相关工具,最后介绍开源软件中的并发问题是如何解决的。

2.1 什么是并发

首先我们需要搞清楚到底什么是并发,它在系统中又是以何种形式存在的。

2.1.1 并发是如何产生的

在操作系统中,一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理器上运行,这种情形叫并发。但是,在任一个时刻只有一个程序在处理器上运行。

从这个过程中我们大致可以了解到,并发主要和处理器(CPU)有关,当同时有多个运行中的程序需要占用处理器资源,就形成了并发。图2-1总结了并发的两种场景,第一种场景是多个进程使用同一个处理器内核资源,第二种场景是多个进程使用不同的处理器内核资源。

图2-1 两种并发场景

2.1.2 并发会带来什么问题

针对上面介绍的并发两种场景,会有不同的问题。我们先来分析第一种场景,多个进程同时使用同一个处理器核(core)资源。我们知道一个处理器核在同一时刻只能被一个进程占用,那么,从微观角度讲真正的并发应该不存在,应该不会有任何问题才对呀?很遗憾,事实情况并非如此,为了防止CPU资源被同一个进程长期占用,大部分硬件都会提供时钟中断机制,在中断发生的时候,会进行进程的切换,当前进程会让出CPU,并且让其他进程能获得CPU的机会。因为进程切换的存在,假如共享同一个内存变量,就会存在代码临界区,比如i++操作,就不能保证原子性,如图2-2所示。因为i++其实分为两个步骤:

1)add i

2)set i

图2-2 多个进程同时使用同一个处理器核的情况

假设i=0,当进程1执行完add i后,就发生了切换。进程2重新开始执行add i,那么2个进程都执行完i++之后,结果i的值还是1。

所以,在这种情况下,并发带来的问题就是进程切换造成的代码临界区。

我们来分析并发的第二种场景,多个进程同时使用多个CPU核。在这种情况下,会引发两种问题。第一种问题和多个进程使用1个CPU核引发的问题一样,由于先天就是多个核并行执行多个进程的程序,假如共享同一个变量操作,必然会存在代码临界区。

第二种问题如图2-3所示,我们可以发现,因为CPU每个核都维护了一个L2 cache(二级缓存),其目的是为了减少与内存之间的交互,提升数据的访问速度。但是这样,就会造成主存中的数据复制存在多份在各自的L2 cache中,导致数据不一致。这就是CPU二级缓存和内存之间的可见性问题。

图2-3 多个进程同时使用多个处理器核的情况

2.1.3 如何解决并发带来的问题

上节分析了并发带来的问题,归根结底就2类:

‰代码临界区的问题。

‰主存可见性的问题。

下面我们分别来介绍这两类问题的解决方案。

先说代码临界区问题。孙子曰:“百战百胜,非善之善者也;不战而屈人之兵,善之善者也。”也就是说最好的战争方式,就是不要发动战争,通过谋略让对手投降。杀敌一千,自损八百,很是划不来。所以,处理代码临界区的问题也是一样,最好的方式就是消除临界区。很多时候,临界区是由于自己考虑不周到,代码编写方式不正确造成的,只要设计得当,是有可能消除的。

不过凡事无绝对,假如不能消除临界区,那么我们只能硬着头皮想办法对付了。前面我们分析临界区出现问题是因为多个进程同时进入了临界区,造成了逻辑的混乱。所以,我们可以把临界区作为一个整体,让多个进程串行通过临界区,达到保护临界区的目的。这样的机制我们就叫做同步。同步在技术上一般都是通过锁机制来解决的,后面我们会具体分析Linux中的不同锁实现方式。

另外像i++这样的操作,一般都会在硬件级别提供原子操作指令作为解决方案,本章我们也会介绍原子变量的实现方法,一般都会通过cmpxgl这样原子指令来支持。

接着来看主存可见性的问题。多个进程依赖同一个内存变量,那么为了保证可见性,可以通过让L2 cache强制失效,都去主存中取数据。有时候编译器为了提升程序执行效率,都会对编译后的代码进行优化,让某些指令在上下文中的结果依赖L2 cache,我们可以通过内存屏障等方式,去除编译器优化,本章后面会具体介绍这种方法。

2.2 操作系统会在哪些场景遇到并发

在互联网时代来临之前,内核虽然生来就被设计成支持多用户的,但是很少面临高并发请求考验,多用户的操作很多时候都是人工来进行的,人敲键盘的速度再快也很难达到秒级的。所以,最开始,并发仅仅针对内核级别,给内核加了一把大内核锁(BKL)。一旦某个用户在使用内核,其他用户则无法获取内核资源。

但是大内核锁太粗暴了,粒度太大。在互联网应用场景就吃不消了。互联网时代,针对不同的细节场景,开发了不同的内核工具来解决相应的问题。图2-4介绍了Linux内核不同并发场景提供的工具实现。

图2-4 Linux内核针对不同并发场景的工具实现

我把操作系统和并发相关的场景归为4类:

1)和CPU相关的原子变量(Atomic)和自旋锁(Spin_lock)。

在并发访问的时候,我们需要保证对变量操作的原子性,通过Atomic变量解决该问题。其实自旋锁的使用场景和互斥锁类似,都是为了保护临界区资源,但是自旋锁是在CPU上进行的忙等,所以暂时就把它和原子变量归为一类了。

2)围绕代码临界区控制的相关工具有:信号量(Semaphore)、互斥(Mutex)、读写锁(Rw-lock)、抢占(Preempt)。

有时候要对多个线程进行精细化控制,就要用到信号量了,下面引用百度百科中的例子:

以一个停车场的运作为例。简单起见,假设停车场只有三个车位,一开始三个车位都是空的。这时如果同时来了五辆车,看门人允许其中三辆直接进入,然后放下车拦,剩下的车则必须在入口等待,此后来的车也都不得不在入口处等待。这时,有一辆车离开停车场,看门人得知后,打开车拦,放入外面的一辆进去,如果又离开两辆车,则又可以放入两辆车,如此往复。在这个停车场系统中,车位是公共资源,每辆车好比一个线程,看门人就是起到了信号量的作用。

互斥从某种角度来讲,可以理解为池子大小为1的信号量,它和信号量的原理类似,都会让无法获取资源的线程睡眠。

很多时候并发的访问往往都是读大于写,为了提高该场景的性能,内核提供了读写锁进行优化访问控制。

3)从CPU缓存角度,为优化多核本地访问的性能,内核提供了per-cpu变量。

在多核场景,为了解决并发访问内存的问题,经常需要锁住总线,这样效率很低。很多时候并发的最好方案就是没有并发,per-cpu变量的设计正是基于这样的思路。

4)从内存角度,为提升多核同时访问内存的效率提供了RCU机制,另外,为了解决内存访问有序性问题,提供了内存屏障(memory barrier)。假如需要多核同时写同一共享数据,要保证不出问题,我能想到的也就是Copy On Write这样的思路,RCU机制就是基于这个思路的实现。

程序在运行时内存实际的访问顺序和程序代码编写的访问顺序不一定一致,这就是内存乱序访问。内存乱序访问行为出现的理由是为了提升程序运行时的性能。在并发场景下,这种乱序就具有不确定性,内存屏障就是用来消除这种不确定性,保证并发场景的可靠性。

2.3 Linux中并发工具的实现

通过上一节的介绍,我们大概了解了内核中的并发场景,以及Linux提供的相应工具,本节把这些工具的实现简单分析一下。

2.3.1 原子变量

原子变量是在并发场景经常使用的工具,很多并发工具都是基于原子变量来实现的,比如自旋锁。原子变量对其进行的读写操作都必须保证原子性,也就是原子操作。

1.什么是原子操作

对于i++这样的操作,如果要在双核的CPU上每核都执行这条指令,假如现在i=1,那么执行完之后,你希望第一个核执行完之后i被设置为2,第二个核执行完之后i被设置为3。但是,由于i++这样的执行不是原子操作,所以2个核有可能同时取到i的值为1,然后加完之后i最终为2。

这种问题是典型的“读-修改-写”场景,避免该场景引发不一致问题就是确保这样的操作在芯片级是原子的。

x86在多核环境下,多核竞争数据总线的时候,提供了Lock指令来进行锁总线的操作,在《Intel开发者手册》卷3A,8.1.2.2中说明了Lock指令可以影响的指令集:

1)位测试和修改的指令(BTS、BTR和BTC)。

2)交换指令(XADD、CMPXCHG和CMPXCHG8B)。

3)Lock前缀会自动加在XCHG指令前。

4)单操作数逻辑运算指令:INC、DEC、NOT和NEG。

5)双操作数的逻辑运算指令:ADD、ADC、SUB、SBB、AND、OR和XOR。

2.原子变量(atomic)的实现

定义如下:

typedef struct {
    int counter;
} atomic_t;

add和sub方法:

static __always_inline void atomic_add(int i, atomic_t *v)
{
    asm volatile(LOCK_PREFIX "addl %1, %0"
                  : "+m" (v->counter)
                  : "ir" (i));
}
static __always_inline void atomic_sub(int i, atomic_t *v)
{
    asm volatile(LOCK_PREFIX "subl %1, %0"
                  : "+m" (v->counter)
                  : "ir" (i));
}

通过之前分析我们知道intel的原子指令保证操作的原子性。并且多核环境下使用lock来锁总线,保证串行访问总线。

读取方法为:

static __always_inline int atomic_read(const atomic_t *v)
{
    return READ_ONCE((v)->counter);
}

在读的时候为了防止脏读,READ_ONCE中加上了volatile去除编译器优化。

2.3.2 自旋锁

1.为什么使用自旋锁

由于自旋锁(Spin_lock)只是将当前线程不停地执行循环体,而不改变线程的运行状态,所以响应速度更快。但当线程数不断增加时,性能下降明显,因为每个线程都需要执行,占用CPU时间。所以它保护的临界区必须小,且操作过程必须短。很多时候内核资源只锁毫秒级别的时间片段,因此等待自旋锁的释放不会消耗太多CPU的时间。

2.自旋锁的实现

自旋锁其实是通过一个属性标志来控制访问锁的请求是否能满足,我们先来看一下spinlock的定义:

typedef struct spinlock {
    union {
        struct raw_spinlock rlock;
#ifdef CONFIG_DEBUG_LOCK_ALLOC
# define LOCK_PADSIZE (offsetof(struct raw_spinlock, dep_map))
        struct {
            u8 __padding[LOCK_PADSIZE];
            struct lockdep_map dep_map;
        };
#endif
    };
} spinlock_t;

去除debug的干扰,我们可以看到spinlock的核心成员为:

struct raw_spinlock rlock

接着看raw_spinlock的结构:

typedef struct raw_spinlock {
    arch_spinlock_t raw_lock;
#ifdef CONFIG_GENERIC_LOCKBREAK
    unsigned int break_lock;
#endif
#ifdef CONFIG_DEBUG_SPINLOCK
    unsigned int magic, owner_cpu;
    void *owner;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
    struct lockdep_map dep_map;
#endif
} raw_spinlock_t;

可以看到raw_spinlock最终依赖与体系结构相关的arch_spinlock_t结构,我们以x86为例,该结构如下所示:

typedef struct arch_spinlock {
    union {
        __ticketpair_t head_tail;
        struct __raw_tickets {
            __ticket_t head, tail;
        } tickets;
    };
} arch_spinlock_t;

其中__ticketpair_t为16位整数,__ticket_t为8位整数。

通过spin_lock_init宏可以初始化自旋锁,init的过程可以理解为把head_tail的值设置为1,并且为未锁住的状态。

下面是获取锁的过程:

static __always_inline int arch_spin_trylock(arch_spinlock_t *lock)
{
    arch_spinlock_t old, new;
    old.tickets = READ_ONCE(lock->tickets);
    if (! __tickets_equal(old.tickets.head, old.tickets.tail))
        return 0;
    new.head_tail = old.head_tail + (TICKET_LOCK_INC << TICKET_SHIFT); // tail+1
    new.head_tail &= ~TICKET_SLOWPATH_FLAG;
    // cmpxchg是一个完全内存屏障(full barrier)
    return cmpxchg(&lock->head_tail, old.head_tail, new.head_tail) == old.head_
tail;
}

其中:

static inline int   __tickets_equal(__ticket_t one, __ticket_t two)
{
    return ! ((one ^ two) & ~TICKET_SLOWPATH_FLAG);
}

__tickets_equal的过程one和two先做异或,假如两者一样则返回0, TICKET_SLOW-PATH_FLAG为0,取反后则变为OXFF,那么该函数表明假如one和two相等则返回真;否则返回假。

arch_spin_trylock的过程为:

1)校验锁的head和tail是否相等,假如不相等,则获取锁失败,返回0。

2)给tail+1。

3)比较lock->head_tail和old.head_tail的值是否相等,如果相等,则把new.head_tail赋给new.head_tail并且返回1。

接着我们来看释放锁的过程:

static __always_inline void arch_spin_unlock(arch_spinlock_t *lock)
{
    if (TICKET_SLOWPATH_FLAG &&
        static_key_false(&paravirt_ticketlocks_enabled)) {
        __ticket_t head;
        BUILD_BUG_ON(((__ticket_t)NR_CPUS) ! = NR_CPUS);
        head = xadd(&lock->tickets.head, TICKET_LOCK_INC);
        if (unlikely(head & TICKET_SLOWPATH_FLAG)) {
            head &= ~TICKET_SLOWPATH_FLAG;
            __ticket_unlock_kick(lock, (head + TICKET_LOCK_INC));
        }
    } else
        __add(&lock->tickets.head, TICKET_LOCK_INC, UNLOCK_LOCK_PREFIX);
}

这个函数的关键就在于:

__add(&lock->tickets.head, TICKET_LOCK_INC, UNLOCK_LOCK_PREFIX);

解锁的过程就是给__add&lock->tickets.head做+1操作。

接下来看判断是否上锁的条件:

static inline int arch_spin_is_locked(arch_spinlock_t *lock)

{

struct __raw_tickets tmp = READ_ONCE(lock->tickets);

return ! __tickets_equal(tmp.tail, tmp.head);

}

从上面的函数我们可以知道,其实就是判断tail和head是否相等,假如不相等则说明已经上锁了。

最后我们来看一下循环等待获取锁的过程:

static __always_inline void arch_spin_lock(arch_spinlock_t *lock)
{
    register struct __raw_tickets inc = { .tail = TICKET_LOCK_INC };
    inc = xadd(&lock->tickets, inc);
    if (likely(inc.head == inc.tail))
        goto out;
    for (; ; ) {
        unsigned count = SPIN_THRESHOLD;
        do {
            inc.head = READ_ONCE(lock->tickets.head);
            if (__tickets_equal(inc.head, inc.tail))
                  goto clear_slowpath;
            cpu_relax();
        } while (--count);
        __ticket_lock_spinning(lock, inc.tail);
    }
clear_slowpath:
    __ticket_check_and_clear_slowpath(lock, inc.head);
out:
    barrier();
}

这个过程步骤如下:

1)tail ++。

2)假如tail++之前tail和head相等,则说明现在已经获得了锁,退出。

3)假如tail和head不相等,则循环等待,直到相等为止。

图2-5说明了整个加锁和释放锁的过程,每次上锁都会进行tail++。解锁进行head++,当head==tail的时候,则说明未上锁。

图2-5 获取和释放自旋锁的过程

2.3.3 信号量

通过前面的介绍,我们已经知道信号量(Sema-phore)用于保护有限数量的临界资源,在操作完共享资源后,需释放信号量,以便另外的进程来获得资源。获得和释放应该成对出现。从操作系统的理论角度讲,信号量实现了一个加锁原语,即让等待者睡眠,直到等待的资源变为空闲。

下面我们来分析信号量的实现,其定义如下:

struct semaphore {
    raw_spinlock_t        lock;                     //  获取计数器的自旋锁
    unsigned int          count;                    //计数器
    struct list_head     wait_list;               //等待队列
};

图2-6描述了信号量获取和释放的原理,即down和up的过程。在down的过程中,假如count>0,则做count-操作;否则执行__down,并且在获取自旋锁的时候保存中断到eflags寄存器,最后再恢复中断。

图2-6 信号量获取和释放的原理图

其中__down的执行过程为:

1)先把当前task的waiter放入wait_list队列尾部。

2)进入死循环中。

3)假如task状态满足signal_pending_state,则跳出循环,并且从等待队列中删除,返回EINTR异常。

4)假如等待的超时时间用完了,则跳出循环,并且从等待队列中删除,返回ETIME异常。

5)设置task状态为之前传入的TASK_UNINTERRUPTIBLE(该状态只能被wake_up唤醒)。

6)释放sem上的lock。

7)调用schedule_timeout,直到timout后被唤醒,然后重新申请sem->lock。

8)假如waiter.up状态变为true了,则说明到了被up唤醒的状态了,则返回0。

在up的过程中,先获取sem->lock,并且保存中断。如果sem->wait_list为空,则直接做sem->count++操作;否则执行__up。

其中__up的执行过程为:

1)从sem->wait_list队列中找到第一个等待的任务。

2)从等待队列中删除该任务。

3)把waiter->up设置为true。

4)尝试唤醒该进程。

2.3.4 互斥锁

互斥锁(Mutex)从功能上来讲和自旋锁类似,都是为了控制同一时刻只能有一个线程进入临界区。从实现上来讲,自旋锁是在CPU上实现忙等,而互斥锁则会让无法进入临界区的线程休眠。从某种角度来讲,互斥锁其实就是退化版的信号量。下面是互斥锁的定义:

struct mutex {
    //  1: unlocked, 0: locked, 小于0: locked, 在锁上有等待者
    atomic_t                 count;
    spinlock_t              wait_lock;
    struct list_head      wait_list;
…
};

可以发现count只有两种状态1和0,1为unlock;0为locked。其他实现都和信号量类似,大家可以结合代码并且参考信号量的实现来自己分析。

2.3.5 读写锁

在很多时候,并发访问都是读大于写的场景,假如把读者当做写者一样加锁,那么对性能影响较大。所以读写锁(rw-lock)分别对读者和写者进行了处理,来优化解决该场景下的性能问题。

下面我们来看Linux对读写锁的实现,首先来看一下在x86中对其的定义:

typedef struct qrwlock {
    atomic_t                cnts;
    arch_spinlock_t       wait_lock;
} arch_rwlock_t;

其中原子变量cnts初始化为0,自旋锁wait_lock初始化为未上锁状态。

结合图2-7我们来分析其实现原理:

图2-7 读写锁实现原理

获取读锁的过程如下:

1)如果cnts低八位的读部分为0,那么就进入下一步;否则获得锁失败。

2)对高位的读为+1。

3)再进行判断是否写位置为0,如果是则返回1,说明获得了锁。

4)如果写锁被别人获得了,那么就把高位减1,并且返回0,获得读锁失败。

释放读锁的过程只要把cnts的高位减1即可。

获取写锁的过程如下:

1)假如cnts为0,则if条件不满足,说明没有读者和写者;否则要是存在读者或者写者,返回0,获取写锁失败。

2)把cnts的低八位写标志设置为OXFF。

释放写锁则直接把低八位的读标志设置为0。

2.3.6 抢占

我们先回顾一下,一个进程什么时候会放弃CPU:

‰时间片用完后调用schedule函数。

‰由于IO等原因自己主动调用schedule。

‰其他情况,当前进程被其他进程替换的时候。

那么,加入抢占(preempt)的概念后,当前进程就有可能被替换,假如当你按下键盘的时候,键盘中断程序运行之后会让进程B替换你当前的工作进程A,原因是B进程优先级比较高,这就是抢占。

内核要完成抢占必然需要打开本地中断,这两者是不可分割的,如图2-8所示。

图2-8 用户键盘输入发生抢占

下面我们来看Linux中开启抢占的实现:

#define preempt_enable() \
do { \
    barrier(); \
    if (unlikely(preempt_count_dec_and_test())) \
        __preempt_schedule(); \
} while (0)

假如__preempt_count-1之后还是大于0,那么将会执行:__preempt_schedule();

asmlinkage __visible void __sched notrace preempt_schedule(void)
{
    if (likely(! preemptible()))
        return;
    preempt_schedule_common();
}
#define preemptible()(preempt_count() == 0 && ! irqs_disabled())
static void __sched notrace preempt_schedule_common(void)
{
    do {
        preempt_disable_notrace();
        __schedule(true);
        preempt_enable_no_resched_notrace();
    } while (need_resched());
}

preempt_schedule函数检查是否允许本地中断,以及当前进程的preempt_count字段是否为0,如果两个条件都为真,它就调用schedule()选择另外一个进程来运行。因此,内核抢占可能在结束内核控制路径(通常是一个中断处理程序)时发生,也可能在异常处理程序调用preempt_enable()重新允许内核抢占时发生。

2.3.7 per-cpu变量

目前生产环境的服务器大多数跑的都是SMP(对称多处理器结构),如图2-9所示。因为SMP系统多个核心与内存交互的时候,因为L1 cache的存在,会出现一致性的问题。所以,最好的方式就是每个核自己维护一份变量,自己用自己的,这样就不会出现一致性问题了。

图2-9 独立L1 cache的SMP处理器架构

为了解决这个问题,Linux引入了per-cpu变量,可以在编译时声明,也可以在系统运行时动态生成。

首先来感受一下per-cpu变量的使用方法。per-cpu变量在使用之前需要先进行定义,编译期间创建一个per-cpu变量:

DEFINE_PER_CPU(int, my_percpu);                    //声明一个变量
DEFINE_PER_CPU(int[3], my_percpu_array);         //声明一个数组

使用编译时生成的per-cpu变量:

ptr = get_cpu_var(my_percpu);
// 使用ptr
put_cpu_var(my_percpu);

也可以使用下列宏来访问特定CPU上的per-cpu变量:

per_cpu(my_percpu, cpu_id);

per-cpu变量导出,供模块使用:

EXPORT_PER_CPU_SYMBOL(per_cpu_var);
EXPORT_PER_CPU_SYMBOL_GPL(per_cpu_var);

动态分配per-cpu变量:

void *alloc_percpu(type);
void *__alloc_percpu(size_t size, size_t align);

使用动态生成的per-cpu变量:

int cpu;
cpu = get_cpu();
ptr = per_cpu_ptr(my_percpu);
// 使用ptr
put_cpu();

接下来我们通过per-cpu变量的初始化过程来分析其实现原理,系统在启动时会调用__init setup_per_cpu_areas为per-cpu变量申请内存空间:

void __init setup_per_cpu_areas(void)
{
    unsigned int cpu;
    unsigned long delta;
    int rc;
…
#ifdef CONFIG_X86_64
        atom_size = PMD_SIZE;
#else
        atom_size = PAGE_SIZE;
#endif
        rc = pcpu_embed_first_chunk(PERCPU_FIRST_CHUNK_RESERVE,
                    dyn_size, atom_size,
                    pcpu_cpu_distance,
                    pcpu_fc_alloc, pcpu_fc_free);
        …
}
if (rc < 0)
  rc = pcpu_page_first_chunk(PERCPU_FIRST_CHUNK_RESERVE,
            pcpu_fc_alloc, pcpu_fc_free,
            pcpup_populate_pte);
…
/* percpu区域已初始化并且可以使用 */
delta = (unsigned long)pcpu_base_addr - (unsigned long)__per_cpu_start;
for_each_possible_cpu(cpu) {
  per_cpu_offset(cpu) = delta + pcpu_unit_offsets[cpu];
  per_cpu(this_cpu_off, cpu) = per_cpu_offset(cpu);
  per_cpu(cpu_number, cpu) = cpu;
  setup_percpu_segment(cpu);
  setup_stack_canary_segment(cpu);
  //下面进行early init阶段需要初始化的per_cpu数据
#ifdef CONFIG_X86_LOCAL_APIC
        per_cpu(x86_cpu_to_apicid, cpu) =
            early_per_cpu_map(x86_cpu_to_apicid, cpu);
        per_cpu(x86_bios_cpu_apicid, cpu) =
            early_per_cpu_map(x86_bios_cpu_apicid, cpu);
#endif
#ifdef CONFIG_X86_32
        per_cpu(x86_cpu_to_logical_apicid, cpu) =
            early_per_cpu_map(x86_cpu_to_logical_apicid, cpu);
#endif
#ifdef CONFIG_X86_64
        per_cpu(irq_stack_ptr, cpu) =
            per_cpu(irq_stack_union.irq_stack, cpu) +
            IRQ_STACK_SIZE -64;
#endif
#ifdef CONFIG_NUMA
        per_cpu(x86_cpu_to_node_map, cpu) =
            early_per_cpu_map(x86_cpu_to_node_map, cpu);
…
}

其中两个关键步骤为:

1)pcpu_page_first_chunk。先分配一块bootmem区间p,作为一级指针,然后为每个CPU分配n个页,依次把指针存放在p中。p[0]..p[n-1]属于cpu0, p[n]-p[2n-1]属于CPU2,依次类推。接着建立一个长度为n×NR_CPUS的虚拟空间(vmalloc_early),并把虚拟空间对应的物理页框设置为p数组指向的pages。然后把每CPU变量__per_cpu_load拷贝至每个CPU自己的虚拟地址空间中。

2)将.data.percpu中的数据拷贝到其中,每个CPU各有一份。由于数据从__per_cpu_start处转移到各CPU自己的专有数据区中了,因此存取其中的变量就不能再用原先的值了,比如存取per_cpu__runqueues就不能再用per_cpu__runqueues了,需要做一个偏移量的调整,即需要加上各CPU自己的专有数据区首地址相对于__per_cpu_start的偏移量。在这里也就是__per_cpu_offset[i],其中CPU i的专有数据区相对于__per_cpu_start的偏移量为__per_cpu_offset[i]。

经过这样的处理,.data.percpu这个section在系统初始化后就可以释放了。

其中__per_cpu_load被重定向到了.data..percpu区域,和__per_cpu_start位置是一样的:

#define PERCPU_SECTION(cacheline)
    . = ALIGN(PAGE_SIZE);
    .data..percpu : AT(ADDR(.data..percpu) - LOAD_OFFSET) {
        VMLINUX_SYMBOL(__per_cpu_load) = .;
        PERCPU_INPUT(cacheline)
    }

图2-10为per-cpu变量的初始化流程,我们可以发现,经过setup_per_cpu_areas函数,per_cpu变量被拷贝到了各自的虚拟地址空间。原来的per_cpu变量区域,即__per_cpu_start和__per_cpu_end区域将会被删除。

图2-10 per-cpu变量的初始化

2.3.8 RCU机制

在Linux中,RCU(Read, Copy, Update)机制用于解决多个CPU同时读写共享数据的场景。它允许多个CPU同时进行写操作,而且不使用锁,其思想类似于copy on write的原理,并且实现垃圾回收器来回收旧数据。

使用RCU机制有几个前提条件:

‰ RCU使用在读者多而写者少的情况。RCU和读写锁相似。但RCU的读者占锁没有任何的系统开销。写者与写者之间必须要保持同步,且写者必须要等它之前的读者全部都退出之后才能释放之前的资源。

‰ RCU保护的是指针。这一点尤其重要,因为指针赋值是一条单指令,即一个原子操作,因此更改指针指向没必要考虑它的同步,只需要考虑cache的影响。

‰读者是可以嵌套的,也就是说rcu_read_lock()可以嵌套调用。

‰读者在持有rcu_read_lock()的时候,不能发生进程上下文切换;否则,因为写者需要要等待读者完成,写者进程也会一直被阻塞。

下面是whatisRCU.txt中使用RCU锁的例子:

struct foo {
    int a;
    char b;
    long c;
};
DEFINE_SPINLOCK(foo_mutex);
struct foo *gbl_foo;
void foo_update_a(int new_a)
{
    struct foo *new_fp;
    struct foo *old_fp;
    new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);
    spin_lock(&foo_mutex);
    old_fp = gbl_foo;
    *new_fp = *old_fp;
    new_fp->a = new_a;
    rcu_assign_pointer(gbl_foo, new_fp);
    spin_unlock(&foo_mutex);
    synchronize_rcu();
    kfree(old_fp);
}
int foo_get_a(void)
{
    int retval;
    rcu_read_lock();
    retval = rcu_dereference(gbl_foo)->a;
    rcu_read_unlock();
    return retval;
}

如上代码中,RCU用于保护全局指针struct foo *gbl_foo. foo_get_a()用来从RCU保护的结构中取得gbl_foo的值。而foo_update_a()用来更新被RCU保护的gbl_foo的值。

我们再思考一下,为什么要在foo_update_a()中使用自旋锁foo_mutex呢?

假设中间没有使用自旋锁。那foo_update_a()的代码如下:

void foo_update_a(int new_a)
{
    struct foo *new_fp;
    struct foo *old_fp;
    new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);
    old_fp = gbl_foo;
    1:-------------------------
    *new_fp = *old_fp;
    new_fp->a = new_a;
    rcu_assign_pointer(gbl_foo, new_fp);
    synchronize_rcu();
    kfree(old_fp);
}

假设A进程在上面代码的----标识处被B进程抢点,B进程也执行了goo_ipdate_a(),等B执行完后,再切换回A进程,此时,A进程所持的old_fd实际上已经被B进程给释放掉了,此后A进程对old_fd的操作都是非法的。

RCU API说明

我们在上面也看到了几个有关RCU的核心API,它们为别是:

rcu_read_lock()
rcu_read_unlock()
synchronize_rcu()
rcu_assign_pointer()
rcu_dereference()

其中:

‰ rcu_read_lock()和rcu_read_unlock()用来保持一个读者的RCU临界区,在该临界区内不允许发生上下文切换。

‰ rcu_dereference():读者调用它来获得一个被RCU保护的指针。

‰ rcu_assign_pointer():写者使用该函数来为被RCU保护的指针分配一个新的值,这样是为了安全地从写者到读者更改其值,这个函数会返回一个新值。

‰ rcu_dereference:实现也很简单,因为它们本身都是原子操作,因为只是为了cache一致性,插上了内存屏障,可以让其他的读者/写者看到保护指针的最新值。

‰ synchronize_rcu:函数由写者来调用,它将阻塞写者,直到所有读执行单元完成对临界区的访问后,写者才可以继续下一步操作。如果有多个RCU写者调用该函数,它们将在所有读执行单元完成对临界区的访问后全部被唤醒。

结合图2-11我们来说明Linux RCU机制的思路:

图2-11 Linux RCU机制实现原理

1)对于读操作,可以直接对共享资源进行访问,但前提是需要CPU支持访存操作的原子化,现代CPU对这一点都做了保证。但是RCU的读操作上下文是不可抢占的,所以读访问共享资源时可以采用read_rcu_lock(),该函数的功能是停止抢占。

2)对于写操作,思路类似于copy on write,需要将原来的老数据做一次拷贝,然后对其进行修改,之后再用新数据更新老数据,这时采用了rcu_assign_pointer()宏,在该函数中首先通过内存屏障,然后修改老数据。这个操作完成之后,老数据需要回收,操作线程向系统注册回收方法,等待回收。这个思路可以实现读者与写者之间的并发操作,但是不能解决多个写者之间的同步,所以当存在多个写者时,需要通过锁机制对其进行互斥,也就是在同一时刻只能存在一个写者。

3)在RCU机制中存在一个垃圾回收的后台进程,用于回收老数据。回收时间点就是在更新之前的所有读者全部退出时。由此可见,写者在更新之后是需要睡眠等待的,需要等待读者完成操作,如果在这个时刻读者被抢占或者睡眠,那么很可能会导致系统死锁。因为此时写者在等待读者,读者被抢占或者睡眠,如果正在运行的线程需要访问读者和写者已经占用的资源,那么将导致死锁。

那究竟怎么去判断当前的写者已经操作完了呢?我们在之前看到,读者在调用rcu_read_lock的时候会禁止抢占,因此只需要判断所有的CPU都进过了一次上下文切换,就说明所有读者已经退出了。参考http://www.ibm.com/developerworks/cn/linux/l-rcu/中对RCU过程有详细描述。

2.3.9 内存屏障

程序在运行过程中,对内存访问不一定按照代码编写的顺序来进行。这是因为有两种情况存在:

‰编译器对代码进行优化。

‰多CPU架构存在指令乱序访问内存的可能。

为解决这两个问题,分别需要通过不同的内存屏障来避免内存乱序。首先我们来看第一种情况,编译器优化。例如有如下场景:线程1执行:

while (! condition);
print(x);

线程2执行:

x = 100;
condition = 1;

condition初始值为0,结果线程1打印出来不一定为100,因为编译器优化后,有可能线程2先执行了condition = 1;后执行x = 100;我们可以在gcc编译的时候加上O2或者O3的选项,就会发生编译器优化。

为了消除该场景下编译器优化带来的不确定性,可以使用内存屏障:

#define barrier() __asm__ __volatile__("" ::: "memory")
x = 100;
barrier()
condition = 1;

另外,可以给变量加上volatile来去除编译器优化:

#define ACCESS_ONCE(x) (*(volatile typeof(x) *)&(x))
ACCESS_ONCE(x) = 100;
ACCESS_ONCE(condition) = 1;

接着我们来看多CPU运行当中内存访问乱序的问题,图2-12是Intel CPU的P6微架构,目前大部分的Inter CPU都沿用了该架构的思路,其他都是一些小的优化。从图中可以看到,CPU在处理指令的时候,为了提升性能,减少等待内存中的数据,采用了乱序执行引擎。

图2-12 Intel CPU的P6微架构

注意

很多时候我们并不能保证代码是按照我们书写的顺序来运行的。

假设如下代码:

volatile int x, y, r1, r2;
void start()
{
    x = y = r1 = r2 = 0;
}
void end()
{
    assert(! (r1 == 0 && r2 == 0));
}
void run1()
{
    x = 1;
    r1 = y;
}
void run2()
{
    y = 1;
    r2 = x;
}

代码执行顺序为:

1)start()

2)线程1执行run1()

3)线程2执行run2()

4)调用end()

结果r1或者r2均有可能为0,原因就是乱序执行引擎的存在。要解决这个问题,在Pentium 4微处理器中引入了汇编语言指令lfence、sfence和mfence,它们分别有效地实现读内存barrier、写内存barrier和“读-写”内存barrier:

#define mb()       asm volatile("mfence":::"memory")
#define rmb()     asm volatile("lfence":::"memory")
#define wmb()     asm volatile("sfence" ::: "memory")

可以这样修改:

void run1()
{
x = 1;
mb();
    r1 = y;
}
void run2()
{
y = 1;
mb();
    r2 = x;
}

2.4 常见开源软件中的并发问题分析

前一节介绍了Linux中相关并发工具,实际场景中有很多应用,下面我们来对几个开源软件的并发设计进行分析。

2.4.1 Nginx原子性

下面我们通过分析Nginx中的原子变量实现,来介绍程序如何能做到保证原子性的。Nginx为了保证原子性设计了atomic函数,atomic的代码如下:

static ngx_inline ngx_atomic_uint_t
ngx_atomic_cmp_set(ngx_atomic_t *lock, ngx_atomic_uint_t old,
ngx_atomic_uint_t set)
{
    u_char res;
    __asm__ volatile (
    NGX_SMP_LOCK
    ”cmpxchgl %3, %1; ”
    ”sete %0; ”
    :“=a”(res) :“m”(*lock), “a”(old), “r”(set) :“cc”, “memory”);
    return res;
}

atomic的工作原理如下:

1)在多核环境下,NGX_SMP_LOCK其实就是一条lock指令,用于锁住总线。

2)cmpxchgl会保证指令同步执行。

3)sete根据cmpxchgl执行的结果(eflags中的zf标志位)来设置res的值。

其中假如cmpxchgl执行完之后,时间片轮转,这个时候eflags中的值会在堆栈中保持,这是CPU task切换机制所保证的。所以,等时间片切换回来再次执行sete的时候,也不会导致并发问题。

至于信号量、互斥锁,最终还得依赖原子性的保证,具体锁实现可以有兴趣自己再去阅读源代码。

下面是ngx_spinlock的实现,依赖了原子变量的ngx_atomic_cmp_set:

void
ngx_spinlock(ngx_atomic_t *lock, ngx_atomic_int_t value, ngx_uint_t spin)
{
#if (NGX_HAVE_ATOMIC_OPS)
    ngx_uint_t   i, n;
    for ( ; ; ) {
        if (*lock == 0 && ngx_atomic_cmp_set(lock, 0, value)) {
            return;
        }
        if (ngx_ncpu > 1) {
            for (n = 1; n < spin; n <<= 1) {
                  for (i = 0; i < n; i++) {
                      ngx_cpu_pause();
                  }
                  if (*lock == 0 && ngx_atomic_cmp_set(lock, 0, value)) {
                      return;
                  }
            }
        }
        ngx_sched_yield();
    }
#else
#if (NGX_THREADS)
#error ngx_spinlock() or ngx_atomic_cmp_set() are not defined !
#endif
#endif
}

在上面的代码中,Nginx的spinlock主要实现过程如下:

1)进入死循环。

2)假如可以获得锁,则return。

3)循环CPU的个数次来通过ngx_atomic_cmp_set获得锁,假如获得了,则return;否则继续死循环。

2.4.2 Memcached中的互斥锁

Memcached也使用了mutex这样的互斥锁,来控制对item的访问,代码如下:

void *item_trylock(uint32_t hv) {
    pthread_mutex_t *lock = &item_locks[hv & hashmask(item_lock_hashpower)];
    if (pthread_mutex_trylock(lock) == 0) {
        return lock;
    }
    return NULL;
}

注意

Memcached的互斥锁粒度比较细,可以看到,针对每个item,都加了一把锁,这样在并发的时候,可以尽量减少冲突,提高性能。

Memcached在锁的获得过程中,使用了pthread_mutex_trylock:

void item_trylock_unlock(void *lock) {
    mutex_unlock((pthread_mutex_t *) lock);
}
void item_unlock(uint32_t hv) {
    uint8_t *lock_type = pthread_getspecific(item_lock_type_key);
    if (likely(*lock_type == ITEM_LOCK_GRANULAR)) {
        mutex_unlock(&item_locks[hv & hashmask(item_lock_hashpower)]);
    } else {
        mutex_unlock(&item_global_lock);
    }
}

Memcached中,锁的释放过程也是同样的道理,首先从item_locks数组中找到锁对象。然后通过mutex_unlock来解锁。

2.4.3 Redis无锁解决方案

Redis的服务器程序采用单进程、单线程的模型来处理客户端的请求。对读写等事件的响应是通过对epoll函数的包装来做到的。

图2-13是Redis服务器模型原理,整个服务器初始化的过程如下:

图2-13 Redis服务器模型

1)初始化asEventLoop。

2)初始化服务器socket监听,并且绑定acceptTcpHandler事件函数,以应对建立客户端连接的请求。

3)绑定beforesleep函数到eventLoop,并且调用aeMain来启动epoll主循环。

4)主循环响应客户端要求建立连接的请求。

5)主循环读取客户端命令,并执行。

6)如有数据回写则初始化writeEvent,将数据提交到c-replay队列。主循环需要处理此事件的时候则读取数据写回客户端。

因为Redis是单线程的模型,所以,所有的操作都是先来后到串行的,因此,在这个方案中,可以不需要锁,也没有并发的存在,模型假设了所有操作都是基于内存的操作,速度是非常快的。

2.4.4 Linux中惊群问题分析

Linux中惊群相关的问题鼎鼎有名,但是在网上搜索相关资料,发现都是只言片语,不是说得很完整,本节对这个问题进行系统的总结。

惊群是在多线程或者多进程的场景下,多个线程或者进程在同一条件下睡眠,当唤醒条件发生的时候,会同时唤醒这些睡眠进程或者线程,但是只有一个是可以执行成功的,相当于其他几个进程和线程被唤醒后存在执行开销的浪费。

在Linux中,以下场景下会触发惊群:

‰多个进程或者线程在获取同一把锁的时候睡眠。

‰多个进程或者线程同时进行accept。

‰多个进程在同一个epoll上竞争。

‰多个进程在多个epoll上对于同一个监听的socket进行accept。

下面我们分别来举例说明这几个场景,及其解决方案。

1. Linux中通用的解决方案

Linux提供了一个wake_up_process方法,用于唤醒一个指定的进程,其声明如下:

int wake_up_process(struct task_struct *p)

那么,假如有一堆的进程同时睡眠的时候,我们如何来维护这些睡眠的进程,并且如何只让其中一个被唤醒呢?

Linux通过工作队列的方式来解决这个问题,在进程睡眠之前,会先进行一个特定的操作:

prepare_to_wait_exclusive(wait_queue_head_t *q, wait_queue_t *wait, int state)
{
    unsigned long flags;
    wait->flags |= WQ_FLAG_EXCLUSIVE;
    spin_lock_irqsave(&q->lock, flags);
    if (list_empty(&wait->task_list))
        __add_wait_queue_tail(q, wait);
    set_current_state(state);
    spin_unlock_irqrestore(&q->lock, flags);
}

以上prepare_to_wait_exclusive函数主要是将当前的flags加上了WQ_FLAG_EXCLU-SIVE的标志,然后放入到工作队列的尾部,最后设置相应的状态,例如TASK_INTERR-UPTIBLE表示可以被wake_up唤醒。

当我们需要进行唤醒的时候,Linux提供了_ _wake_up_common方法,来唤醒工作队列中的进程:

static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
            int nr_exclusive, int wake_flags, void *key)
{
    wait_queue_t *curr, *next;
    list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
        unsigned flags = curr->flags;
        if (curr->func(curr, mode, wake_flags, key) &&
                  (flags & WQ_FLAG_EXCLUSIVE) && ! --nr_exclusive)
            break;
    }
}

上面的_ _wake_up_common方法会遍历工作队列,寻找flags中含有WQ_FLAG_EXCLUSIVE标志的进程,当nr_exclusive减为0的时候,就会跳出循环,所以只能唤醒nr_exclusive个进程,比如nr_exclusive=1。

其中curr->func的回调函数是通过DEFINE_WAIT(wait)宏来定义的:

#define DEFINE_WAIT_FUNC(name, function)
    wait_queue_t name = {
        .private        = current,
        .func            = function,
        .task_list     = LIST_HEAD_INIT((name).task_list),
    }
#define DEFINE_WAIT(name) DEFINE_WAIT_FUNC(name, autoremove_wake_function)

通过上面的代码可以发现回调函数为autoremove_wake_function:

int autoremove_wake_function(wait_queue_t *wait, unsigned mode, int sync, void
    *key)
{
    int ret = default_wake_function(wait, mode, sync, key);
    if (ret)
        list_del_init(&wait->task_list);
    return ret;
}
int default_wake_function(wait_queue_t *curr, unsigned mode, int wake_flags,
            void *key)
{
    return try_to_wake_up(curr->private, mode, wake_flags);
}

autoremove_wake_function最终通过default_wake_function调用try_to_wake_up来实现唤醒指定的进程。整个流程见图2-14。

图2-14 Linux进程唤醒流程

2. socket accept场景下的惊群及解决方案

在Linux中,针对服务器监听的socket进行accept操作,假如没有新的accept事件,那么会进行睡眠。sys_accept调用最终会在TCP层执行inet_csk_accept函数:

struct sock *inet_csk_accept(struct sock *sk, int flags, int *err)
{
    struct inet_connection_sock *icsk = inet_csk(sk);
    struct request_sock_queue *queue = &icsk->icsk_accept_queue;
    struct request_sock *req;
    struct sock *newsk;
    int error;
    lock_sock(sk);
...
// 阻塞等待,直到有全连接建立。如果用户设置了等待超时时间,超时后会退出
    error = inet_csk_wait_for_connect(sk, timeo);
...
out:
    release_sock(sk);
...

inet_csk_accept在等待accept连接到来的时候,会执行inet_csk_wait_for_connect:

static int inet_csk_wait_for_connect(struct sock *sk, long timeo)
{
    struct inet_connection_sock *icsk = inet_csk(sk);
    DEFINE_WAIT(wait);
    ...
    for (; ; ) {
        prepare_to_wait_exclusive(sk_sleep(sk), &wait,
                          TASK_INTERRUPTIBLE);
        ...
        if (reqsk_queue_empty(&icsk->icsk_accept_queue))
            timeo = schedule_timeout(timeo);
        ...
    }
      ..
}

上面的过程看着眼熟吗,prepare_to_wait_exclusive的作用在上一个例子已经介绍过了,这里会把当前的进程通过DEFINE_WAIT(wait)包装成wait_queue_t结构,并且放入到监听socket的等待队列尾部。然后通过schedule_timeout让当前进程睡眠timeo个时间。

该进程被唤醒有几种可能:

‰睡眠timeo后被timer定时器唤醒。

‰ accept事件到来被唤醒。

第2种被唤醒的场景是由网络层的代码触发的。以TCP V4协议为例,其执行过程为:tcp_v4_rcv->tcp_v4_do_rcv->tcp_child_process,在tcp_child_process方法中会调用父socket,也就是监听socket的parent->sk_data_ready(parent)方法,在sock_init_data的时候,我们发现,该函数的定义如下:

sk->sk_data_ready = sock_def_readable;
sock_def_readable函数实现为:
static void sock_def_readable(struct sock *sk)
{
    struct socket_wq *wq;
    rcu_read_lock();
    wq = rcu_dereference(sk->sk_wq);
    if (skwq_has_sleeper(wq))
        wake_up_interruptible_sync_poll(&wq->wait, POLLIN | POLLPRI |
            POLLRDNORM | POLLRDBAND);
    sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
    rcu_read_unlock();
}

sock_def_readable首先判断是否在等待队列中有睡眠的进程,然后通过wake_up_interruptible_sync_poll进行唤醒。其实现如下:

#define wake_up_interruptible_sync_poll(x, m)
    __wake_up_sync_key((x), TASK_INTERRUPTIBLE, 1, (void *) (m))
void __wake_up_sync_key(wait_queue_head_t *q, unsigned int mode,
            int nr_exclusive, void *key)
{
    unsigned long flags;
    int wake_flags = 1;
    if (unlikely(! q))
        return;
    if (unlikely(nr_exclusive ! = 1))
        wake_flags = 0;
    spin_lock_irqsave(&q->lock, flags);
    __wake_up_common(q, mode, nr_exclusive, wake_flags, key);
    spin_unlock_irqrestore(&q->lock, flags);
}

最终发现是由_ _wake_up_common来唤醒的,和前面介绍的是一样的,并且nr_exclusive为1。说明只会唤醒一个,不会发生惊群。

那么,在inet_csk_accept的时候,lock_sock(sk)操作为什么不能避免惊群呢?理论上锁住了监听的socket,每次只有一个进程可以accept了呀。事实上,lock_sock(sk)的时候,要是拿不到锁,也会进行睡眠,假如不做特殊处理,也有可能惊群,lock_sock最终调用_ _lock_sock:

static void __lock_sock(struct sock *sk)
    __releases(&sk->sk_lock.slock)
    __acquires(&sk->sk_lock.slock)
{
    DEFINE_WAIT(wait);
    for (; ; ) {
        prepare_to_wait_exclusive(&sk->sk_lock.wq, &wait,
                TASK_UNINTERRUPTIBLE);
        spin_unlock_bh(&sk->sk_lock.slock);
        schedule();
        spin_lock_bh(&sk->sk_lock.slock);
        if (! sock_owned_by_user(sk))
            break;
    }
    finish_wait(&sk->sk_lock.wq, &wait);
}

当无法获得上锁条件进行schedule放弃CPU之前,会先进行prepare_to_wait_exclusive,这个动作前面已经解释得很清楚了。所以,假如同时有多个进程在lock_sock阻塞的时候,也仅会被唤醒一个。

最后,图2-15描述了accept的整体流程图。

图2-15 Linux accept的流程

3. epoll的惊群解决方案

在使用epoll的时候,我们会在注册事件后调用epoll_wait,该系统调用会调用ep_poll方法:

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
        int maxevents, long timeout)
{
  ..
// 没有事件,所以需要睡眠。当有事件到来时,睡眠会被ep_poll_callback函数唤醒
// 将current初始化为等待队列项wait后,放入ep→wg这个等待队列中
init_waitqueue_entry(&wait, current);
    __add_wait_queue_exclusive(&ep->wq, &wait);
    for (; ; ) {
        // 执行ep_poll_callback()唤醒时应当将当前进程唤醒,所以当前进程状态应该为“可唤醒”
          TASK_INTERRUPTIBLE
        set_current_state(TASK_INTERRUPTIBLE)
        // 如果就绪队列不为空(已经有文件的状态就绪)或者超时,则退出循环
        if (ep_events_available(ep) || timed_out)
            break;
        // 如果当前进程接收到信号,则退出循环,返回EINTR错误
        if (signal_pending(current)) {
            res = -EINTR;
            break;
        }
        spin_unlock_irqrestore(&ep->lock, flags);
        // 放弃CPU休眠一段时间
        if (! schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
            timed_out = 1;
        spin_lock_irqsave(&ep->lock, flags);
    }
    __remove_wait_queue(&ep->wq, &wait);
    __set_current_state(TASK_RUNNING);
}
  ..

我们发现,假如没有事件,需要睡眠,通过_ _add_wait_queue_exclusive将当前进程放入等待队列的队头中,其实现如下:

static inline void
__add_wait_queue_exclusive(wait_queue_head_t *q, wait_queue_t *wait)
{
    wait->flags |= WQ_FLAG_EXCLUSIVE;
    __add_wait_queue(q, wait);
}

其中WQ_FLAG_EXCLUSIVE用于赋给flgas,表示该睡眠进程是一个互斥进程。

睡眠的当前进程会被回调函数ep_poll_callback唤醒,其实现如下:

static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void
    *key)
{
..
wake_up_locked(&ep->wq);
...
}
#define wake_up_locked(x)   __wake_up_locked((x), TASK_NORMAL, 1)
void __wake_up_locked(wait_queue_head_t *q, unsigned int mode, int nr)
{
    __wake_up_common(q, mode, nr, 0, NULL);
}

ep_poll_callback最终通过__wake_up_common函数来唤醒等待队列中的互斥进程。

4. Nginx为什么还有惊群问题

我们分析一下Nginx为什么还会有惊群问题呢?Nginx不是已经使用了epoll了吗?epoll上面又已经解决了,为什么还会有这个问题呢?原因是Nginx的master在fork出多个worker进程后,worker进程才创建出多个epoll,所以多个进程假如同时进行epoll_wait,还是有可能会发生惊群问题,因为每个worker都维护了一个进程。

worker在循环中,会执行ngx_process_events_and_timers,我们来看它的实现:

void
ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
  ..
if (ngx_use_accept_mutex) {
        if (ngx_accept_disabled > 0) {
            ngx_accept_disabled--;
        } else {
            if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
                  return;
            }
            if (ngx_accept_mutex_held) {
                  flags |= NGX_POST_EVENTS;
            } else {
                  if (timer == NGX_TIMER_INFINITE
                      || timer > ngx_accept_mutex_delay)
                  {
                      timer = ngx_accept_mutex_delay;
                  }
            }
        }
    }
...

上面的代码解释如下:假如ngx_accept_disabled>0,表示现在该woker已经压力很大了,所以不再接受新的处理。否则,会先尝试获取互斥锁,ngx_trylock_accept_mutex。

至于ngx_accept_disabled的大小设定,在每次accept事件处理完之后,进行相应的设置:

void
ngx_event_accept(ngx_event_t *ev)
{
  ..
ngx_accept_disabled = ngx_cycle->connection_n / 8
    - ngx_cycle->free_connection_n;
...
}

上面这个值的意思为最大连接数的八分之一减去空闲连接的数量。大于0说明空闲连接的数量都已经少于八分之一了。

通过上面代码可以发现,不管是woker的负载平衡,还是惊群问题的解决,都需要满足ngx_use_accept_mutex条件,可以通过修改配置解决,如下所示:

events {
accept_mutex on;
}

因为Nginx的worker数量本来就有限,与CPU核数相当,所以,打开该锁意义不是很大,另外在高并发场景下,因为惊群锁的存在,吞吐量反而会下降,Nginx在最新版本里也默认是关闭该锁的。

只有针对Apache这种多线程模型,而且会fork出成百上千个线程的,这个问题才会严重。我们来看Nginx作者的说法:“操作系统有可能会唤醒等待在accept()和select()调用阻塞的所有进程,这会引发惊群问题。在有很多worker的Apache(数百个或者更多)中会引发这个问题,但是假如你使用仅仅只有数个(通常为CPU核数)worker的Nginx,就不会引发这个问题。因此在Nginx中,你在使用select/kqueue/epoll等(除了accept())来调度进入的连接,可以关闭accept_mutex。”

2.4.5 解决MyCat同步问题

MyCat是用Java开发的开源数据库中间件,其服务器采用的是reactor模型(关于I/O模型,我们在I/O的章节中会具体介绍)。图2-16是我整理的MyCat的服务器中心领域模型。

图2-16 MyCat服务器领域模型

这是一个典型的Reactor模型,NIOReactorPool会预先分配N个Reactor工作线程,并且每个Reactor会维护一个selector,当事件就绪后,Reactor就会执行相关事件的回调函数。

基于这个思路MyCat中所有的I/O操作都是异步操作,但是我自定义的handler有个同步的过程,没办法,业务就是这么依赖了第三方。我只能这样编写代码:

final CountDownLatch Latch=New CountDownLatch(1);
Ctx.executeNaiveSQLSequnceJob(dataNodes, sql, new  SQLJobHandler(){private
    List<byte[]>fields;
    @Override
    public boolean onRowData(String dataNode, byte[]rowData){String c1=ResultSe
        tUtil, getColumnValAsString(rowData, fields, θ);
        String c2=ResultSetUtil, getColumnVaLasString(riwData, fields,1);
        share.seDataNode(c1);
        share.setIndex(c2);
        cacheLock.writeLock().lock();
        try{
            cache.put(bid, share);
            latch.countDown();
            retrurn false;
        }finally{
            cacheLock.writeLock().unlock();
        }
    }
    @Override
    public void onHeader(String dataNode, byte[]header, List<byte[]>fields)
        this.fields=fields;
    }
    @Override
    public void finished(String dataNode, boolean failde){latch.countDown();
    }
});
    try{
        latch.await(5, TimeUnit.SECONDS);
    }catch(InterruptedException e){

然后在实际测试过程中,发现偶然会出现线程卡死现象,我们回顾图2-16就发现了问题。因为MyCat的客户端连接(FrontedConnection)和后端MySQL的连接共用一个Reactor的池子,所以有可能会发生前端和后端同时被分配同一个Reactor,那么要是前端没退出,后端必然没法执行,然后互相等待造成死锁。

为了解决我的问题,我给前端单独分配了个池子,如下所示:

LUCGER.into(using nio network nanater);
NIOReactorPool  reactorPool=new  NIOReactorPool(BufferPool.LOCAL_BUF_THREAD_
    PREX+"NIOREACTOR",
    processors.length);
NIOReactorPool  clientReactorPool=new  NIOReactorPool(BufferPool.LOCAL_BUF_
    THREAD_PREX+" CLIENT_NIOREACTOR, "processors.length);
connector=new  NIO  Connector(BufferPool.LOCAL_BUF_THREAD_PREX+"NIO  Connector",
    reactorPool);
((NIOConnector)connector).start();
manager=new  NIOAcceptor(BufferPool.LOCAL_BUF_THREAD_PREX+NAME+"Manager", system.
    getBindIp(), system.getManagerPort(), mf, reactorPool);
server=new  NIOAcceptor(BufferPoo.LOCAL_BUF_THREAD_PREX+NAME+"Sever", syetem.
    getBindIp(), system.getServerPort(), sf, clientReactorPool);

2.4.6 false-sharing问题解决方案

CPU能从本地缓存中取数据就不会从内存中取,而内存中的数据和缓存中的数据一般都是按行读取的,也就是所谓的缓存行,一般为64个字节,当我们操作数据的时候,假如刚好多个变量在同一个缓存行的时候,多线程同时操作就会让之前的缓存行失效,导致程序效率降低。如图2-17所示,两个变量共享了同一个缓存行,从L1~L3cache,只要当X更新时,Y也就被踢出了缓存,反之亦然,重新从内存载入数据。

图2-17 False-sharing问题

为解决该问题,很多时候只能通过以空间换时间来搞定,比如在X和Y中间添加一个不使用的变量,仅仅用来占据空间,隔开缓存行那么就会把X和Y分割为2个缓存行,各自更新,相互不受影响,就是浪费空间而已。

下面我们来看两个具体例子。

1. Jetty中的解决方案

Jetty在实现BlockingArrayQueue的时候,会加上以下代码:

private long _space0;
private long _space1;
private long _space2;
private long _space3;
private long _space4;
private long _space5;
private long _space6;
private long _space7;

2. Nginx的解决方案

在C程序中,Nginx也有类似的实现:

typedef union
{
    erts_smp_rwmtx_t rwmtx;
    byte cache_line_align_[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(erts_smp_
        rwmtx_t))];
}erts_meta_main_tab_lock_t;
erts_meta_main_tab_lock_t main_tab_lock[16]

在下一章介绍内存slab分配器的时候(3.5.2节),着色也是用来解决false sharing的问题。

2.5 本章小结

并发一直是计算机工程领域的一个重要话题,有很多书籍和文章,甚至有很多论文专门对此进行过探讨。很多初学者觉得并发很难懂,很麻烦。其实所有的问题归根结底都是由简单的道理组成的,我认为,脱离计算机的底层原理来谈并发都是水中月,镜中花,尤其对初学者来说,会陷入在一堆并发编程的API中难以自拔。

本章开篇就阐述了到底什么是并发,并发会引发的问题,这样便于后续更加深入理解并发相关处理。对于应用程序员来讲,不管你是用C或是Java,甚至是Go语言,我们面临的并发问题,操作系统同样面临类似的问题。所以,只有在理解了操作系统的并发场景后,我们才会理解Linux内核的并发工具:atomicspin_lock、semaphore、mutex、读写锁、per-cpu、抢占、内存屏障、RCU机制等。

最后,分析了常见开源软件遇到的一些并发解决方案:Nginx的原子性、Memcached的互斥锁、Linux中惊群问题分析、解决Mycat中的同步问题、伪共享问题解决方案等,将这些应用与Linux内核的相关实现对照,就能做到融会贯通。