官术网_书友最值得收藏!

第2章 并發

很多程序員在面試的時候經常會被問到線程安全相關的問題,比如什么是線程安全,什么又是線程不安全,假如線程不安全,如何解決才能做到線程安全。這時候,往往會出現五花八門的答案,而且大多數都是本末倒置。很多時候,人們經常會用一些現象來回答問題,比如房價高這個問題,很多時候大家就會歸結于某些現象:溫州炒房團、丈母娘經濟、對比國際大城市房價等。但是,我們需要的是“原理性的解釋”,比如影響房價的經濟學原理如供需關系、不均衡分布等。

再回歸到線程安全問題,這是一個非常經典的問題,需要搞懂并發原理,才能搞清楚線程安全。任何事物的發展,都是有因果關系的,就像霍金博士一生孜孜不倦地研究,無非也就是想搞懂人類從哪里來,站在何方,將要去向哪里等大問題。所以針對并發這樣的話題,我們學習的思路應該是這樣的:

?并發到底是什么,如何在系統中產生。

?并發會帶來什么問題。

?如何解決并發帶來的問題。

我覺得這個思考方式,應該可以用于大部分技術原理的學習和研究了。只有帶著正確的問題出發,才有可能得到你想要的答案。下面我們就根據以上3個問題對并發相關的話題進行探討,在后續的章節中,我還會反復強調這樣的思考方式。

本章先介紹并發原理,再分析Linux中的并發相關工具,最后介紹開源軟件中的并發問題是如何解決的。

2.1 什么是并發

首先我們需要搞清楚到底什么是并發,它在系統中又是以何種形式存在的。

2.1.1 并發是如何產生的

在操作系統中,一個時間段中有幾個程序都處于已啟動運行到運行完畢之間,且這幾個程序都是在同一個處理器上運行,這種情形叫并發。但是,在任一個時刻只有一個程序在處理器上運行。

從這個過程中我們大致可以了解到,并發主要和處理器(CPU)有關,當同時有多個運行中的程序需要占用處理器資源,就形成了并發。圖2-1總結了并發的兩種場景,第一種場景是多個進程使用同一個處理器內核資源,第二種場景是多個進程使用不同的處理器內核資源。

圖2-1 兩種并發場景

2.1.2 并發會帶來什么問題

針對上面介紹的并發兩種場景,會有不同的問題。我們先來分析第一種場景,多個進程同時使用同一個處理器核(core)資源。我們知道一個處理器核在同一時刻只能被一個進程占用,那么,從微觀角度講真正的并發應該不存在,應該不會有任何問題才對呀?很遺憾,事實情況并非如此,為了防止CPU資源被同一個進程長期占用,大部分硬件都會提供時鐘中斷機制,在中斷發生的時候,會進行進程的切換,當前進程會讓出CPU,并且讓其他進程能獲得CPU的機會。因為進程切換的存在,假如共享同一個內存變量,就會存在代碼臨界區,比如i++操作,就不能保證原子性,如圖2-2所示。因為i++其實分為兩個步驟:

1)add i

2)set i

圖2-2 多個進程同時使用同一個處理器核的情況

假設i=0,當進程1執行完add i后,就發生了切換。進程2重新開始執行add i,那么2個進程都執行完i++之后,結果i的值還是1。

所以,在這種情況下,并發帶來的問題就是進程切換造成的代碼臨界區。

我們來分析并發的第二種場景,多個進程同時使用多個CPU核。在這種情況下,會引發兩種問題。第一種問題和多個進程使用1個CPU核引發的問題一樣,由于先天就是多個核并行執行多個進程的程序,假如共享同一個變量操作,必然會存在代碼臨界區。

第二種問題如圖2-3所示,我們可以發現,因為CPU每個核都維護了一個L2 cache(二級緩存),其目的是為了減少與內存之間的交互,提升數據的訪問速度。但是這樣,就會造成主存中的數據復制存在多份在各自的L2 cache中,導致數據不一致。這就是CPU二級緩存和內存之間的可見性問題。

圖2-3 多個進程同時使用多個處理器核的情況

2.1.3 如何解決并發帶來的問題

上節分析了并發帶來的問題,歸根結底就2類:

?代碼臨界區的問題。

?主存可見性的問題。

下面我們分別來介紹這兩類問題的解決方案。

先說代碼臨界區問題。孫子曰:“百戰百勝,非善之善者也;不戰而屈人之兵,善之善者也。”也就是說最好的戰爭方式,就是不要發動戰爭,通過謀略讓對手投降。殺敵一千,自損八百,很是劃不來。所以,處理代碼臨界區的問題也是一樣,最好的方式就是消除臨界區。很多時候,臨界區是由于自己考慮不周到,代碼編寫方式不正確造成的,只要設計得當,是有可能消除的。

不過凡事無絕對,假如不能消除臨界區,那么我們只能硬著頭皮想辦法對付了。前面我們分析臨界區出現問題是因為多個進程同時進入了臨界區,造成了邏輯的混亂。所以,我們可以把臨界區作為一個整體,讓多個進程串行通過臨界區,達到保護臨界區的目的。這樣的機制我們就叫做同步。同步在技術上一般都是通過鎖機制來解決的,后面我們會具體分析Linux中的不同鎖實現方式。

另外像i++這樣的操作,一般都會在硬件級別提供原子操作指令作為解決方案,本章我們也會介紹原子變量的實現方法,一般都會通過cmpxgl這樣原子指令來支持。

接著來看主存可見性的問題。多個進程依賴同一個內存變量,那么為了保證可見性,可以通過讓L2 cache強制失效,都去主存中取數據。有時候編譯器為了提升程序執行效率,都會對編譯后的代碼進行優化,讓某些指令在上下文中的結果依賴L2 cache,我們可以通過內存屏障等方式,去除編譯器優化,本章后面會具體介紹這種方法。

2.2 操作系統會在哪些場景遇到并發

在互聯網時代來臨之前,內核雖然生來就被設計成支持多用戶的,但是很少面臨高并發請求考驗,多用戶的操作很多時候都是人工來進行的,人敲鍵盤的速度再快也很難達到秒級的。所以,最開始,并發僅僅針對內核級別,給內核加了一把大內核鎖(BKL)。一旦某個用戶在使用內核,其他用戶則無法獲取內核資源。

但是大內核鎖太粗暴了,粒度太大。在互聯網應用場景就吃不消了。互聯網時代,針對不同的細節場景,開發了不同的內核工具來解決相應的問題。圖2-4介紹了Linux內核不同并發場景提供的工具實現。

圖2-4 Linux內核針對不同并發場景的工具實現

我把操作系統和并發相關的場景歸為4類:

1)和CPU相關的原子變量(Atomic)和自旋鎖(Spin_lock)。

在并發訪問的時候,我們需要保證對變量操作的原子性,通過Atomic變量解決該問題。其實自旋鎖的使用場景和互斥鎖類似,都是為了保護臨界區資源,但是自旋鎖是在CPU上進行的忙等,所以暫時就把它和原子變量歸為一類了。

2)圍繞代碼臨界區控制的相關工具有:信號量(Semaphore)、互斥(Mutex)、讀寫鎖(Rw-lock)、搶占(Preempt)。

有時候要對多個線程進行精細化控制,就要用到信號量了,下面引用百度百科中的例子:

以一個停車場的運作為例。簡單起見,假設停車場只有三個車位,一開始三個車位都是空的。這時如果同時來了五輛車,看門人允許其中三輛直接進入,然后放下車攔,剩下的車則必須在入口等待,此后來的車也都不得不在入口處等待。這時,有一輛車離開停車場,看門人得知后,打開車攔,放入外面的一輛進去,如果又離開兩輛車,則又可以放入兩輛車,如此往復。在這個停車場系統中,車位是公共資源,每輛車好比一個線程,看門人就是起到了信號量的作用。

互斥從某種角度來講,可以理解為池子大小為1的信號量,它和信號量的原理類似,都會讓無法獲取資源的線程睡眠。

很多時候并發的訪問往往都是讀大于寫,為了提高該場景的性能,內核提供了讀寫鎖進行優化訪問控制。

3)從CPU緩存角度,為優化多核本地訪問的性能,內核提供了per-cpu變量。

在多核場景,為了解決并發訪問內存的問題,經常需要鎖住總線,這樣效率很低。很多時候并發的最好方案就是沒有并發,per-cpu變量的設計正是基于這樣的思路。

4)從內存角度,為提升多核同時訪問內存的效率提供了RCU機制,另外,為了解決內存訪問有序性問題,提供了內存屏障(memory barrier)。假如需要多核同時寫同一共享數據,要保證不出問題,我能想到的也就是Copy On Write這樣的思路,RCU機制就是基于這個思路的實現。

程序在運行時內存實際的訪問順序和程序代碼編寫的訪問順序不一定一致,這就是內存亂序訪問。內存亂序訪問行為出現的理由是為了提升程序運行時的性能。在并發場景下,這種亂序就具有不確定性,內存屏障就是用來消除這種不確定性,保證并發場景的可靠性。

2.3 Linux中并發工具的實現

通過上一節的介紹,我們大概了解了內核中的并發場景,以及Linux提供的相應工具,本節把這些工具的實現簡單分析一下。

2.3.1 原子變量

原子變量是在并發場景經常使用的工具,很多并發工具都是基于原子變量來實現的,比如自旋鎖。原子變量對其進行的讀寫操作都必須保證原子性,也就是原子操作。

1.什么是原子操作

對于i++這樣的操作,如果要在雙核的CPU上每核都執行這條指令,假如現在i=1,那么執行完之后,你希望第一個核執行完之后i被設置為2,第二個核執行完之后i被設置為3。但是,由于i++這樣的執行不是原子操作,所以2個核有可能同時取到i的值為1,然后加完之后i最終為2。

這種問題是典型的“讀-修改-寫”場景,避免該場景引發不一致問題就是確保這樣的操作在芯片級是原子的。

x86在多核環境下,多核競爭數據總線的時候,提供了Lock指令來進行鎖總線的操作,在《Intel開發者手冊》卷3A,8.1.2.2中說明了Lock指令可以影響的指令集:

1)位測試和修改的指令(BTS、BTR和BTC)。

2)交換指令(XADD、CMPXCHG和CMPXCHG8B)。

3)Lock前綴會自動加在XCHG指令前。

4)單操作數邏輯運算指令:INC、DEC、NOT和NEG。

5)雙操作數的邏輯運算指令:ADD、ADC、SUB、SBB、AND、OR和XOR。

2.原子變量(atomic)的實現

定義如下:

typedef struct {
    int counter;
} atomic_t;

add和sub方法:

static __always_inline void atomic_add(int i, atomic_t *v)
{
    asm volatile(LOCK_PREFIX "addl %1, %0"
                  : "+m" (v->counter)
                  : "ir" (i));
}
static __always_inline void atomic_sub(int i, atomic_t *v)
{
    asm volatile(LOCK_PREFIX "subl %1, %0"
                  : "+m" (v->counter)
                  : "ir" (i));
}

通過之前分析我們知道intel的原子指令保證操作的原子性。并且多核環境下使用lock來鎖總線,保證串行訪問總線。

讀取方法為:

static __always_inline int atomic_read(const atomic_t *v)
{
    return READ_ONCE((v)->counter);
}

在讀的時候為了防止臟讀,READ_ONCE中加上了volatile去除編譯器優化。

2.3.2 自旋鎖

1.為什么使用自旋鎖

由于自旋鎖(Spin_lock)只是將當前線程不停地執行循環體,而不改變線程的運行狀態,所以響應速度更快。但當線程數不斷增加時,性能下降明顯,因為每個線程都需要執行,占用CPU時間。所以它保護的臨界區必須小,且操作過程必須短。很多時候內核資源只鎖毫秒級別的時間片段,因此等待自旋鎖的釋放不會消耗太多CPU的時間。

2.自旋鎖的實現

自旋鎖其實是通過一個屬性標志來控制訪問鎖的請求是否能滿足,我們先來看一下spinlock的定義:

typedef struct spinlock {
    union {
        struct raw_spinlock rlock;
#ifdef CONFIG_DEBUG_LOCK_ALLOC
# define LOCK_PADSIZE (offsetof(struct raw_spinlock, dep_map))
        struct {
            u8 __padding[LOCK_PADSIZE];
            struct lockdep_map dep_map;
        };
#endif
    };
} spinlock_t;

去除debug的干擾,我們可以看到spinlock的核心成員為:

struct raw_spinlock rlock

接著看raw_spinlock的結構:

typedef struct raw_spinlock {
    arch_spinlock_t raw_lock;
#ifdef CONFIG_GENERIC_LOCKBREAK
    unsigned int break_lock;
#endif
#ifdef CONFIG_DEBUG_SPINLOCK
    unsigned int magic, owner_cpu;
    void *owner;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
    struct lockdep_map dep_map;
#endif
} raw_spinlock_t;

可以看到raw_spinlock最終依賴與體系結構相關的arch_spinlock_t結構,我們以x86為例,該結構如下所示:

typedef struct arch_spinlock {
    union {
        __ticketpair_t head_tail;
        struct __raw_tickets {
            __ticket_t head, tail;
        } tickets;
    };
} arch_spinlock_t;

其中__ticketpair_t為16位整數,__ticket_t為8位整數。

通過spin_lock_init宏可以初始化自旋鎖,init的過程可以理解為把head_tail的值設置為1,并且為未鎖住的狀態。

下面是獲取鎖的過程:

static __always_inline int arch_spin_trylock(arch_spinlock_t *lock)
{
    arch_spinlock_t old, new;
    old.tickets = READ_ONCE(lock->tickets);
    if (! __tickets_equal(old.tickets.head, old.tickets.tail))
        return 0;
    new.head_tail = old.head_tail + (TICKET_LOCK_INC << TICKET_SHIFT); // tail+1
    new.head_tail &= ~TICKET_SLOWPATH_FLAG;
    // cmpxchg是一個完全內存屏障(full barrier)
    return cmpxchg(&lock->head_tail, old.head_tail, new.head_tail) == old.head_
tail;
}

其中:

static inline int   __tickets_equal(__ticket_t one, __ticket_t two)
{
    return ! ((one ^ two) & ~TICKET_SLOWPATH_FLAG);
}

__tickets_equal的過程one和two先做異或,假如兩者一樣則返回0, TICKET_SLOW-PATH_FLAG為0,取反后則變為OXFF,那么該函數表明假如one和two相等則返回真;否則返回假。

arch_spin_trylock的過程為:

1)校驗鎖的head和tail是否相等,假如不相等,則獲取鎖失敗,返回0。

2)給tail+1。

3)比較lock->head_tail和old.head_tail的值是否相等,如果相等,則把new.head_tail賦給new.head_tail并且返回1。

接著我們來看釋放鎖的過程:

static __always_inline void arch_spin_unlock(arch_spinlock_t *lock)
{
    if (TICKET_SLOWPATH_FLAG &&
        static_key_false(&paravirt_ticketlocks_enabled)) {
        __ticket_t head;
        BUILD_BUG_ON(((__ticket_t)NR_CPUS) ! = NR_CPUS);
        head = xadd(&lock->tickets.head, TICKET_LOCK_INC);
        if (unlikely(head & TICKET_SLOWPATH_FLAG)) {
            head &= ~TICKET_SLOWPATH_FLAG;
            __ticket_unlock_kick(lock, (head + TICKET_LOCK_INC));
        }
    } else
        __add(&lock->tickets.head, TICKET_LOCK_INC, UNLOCK_LOCK_PREFIX);
}

這個函數的關鍵就在于:

__add(&lock->tickets.head, TICKET_LOCK_INC, UNLOCK_LOCK_PREFIX);

解鎖的過程就是給__add&lock->tickets.head做+1操作。

接下來看判斷是否上鎖的條件:

static inline int arch_spin_is_locked(arch_spinlock_t *lock)

{

struct __raw_tickets tmp = READ_ONCE(lock->tickets);

return ! __tickets_equal(tmp.tail, tmp.head);

}

從上面的函數我們可以知道,其實就是判斷tail和head是否相等,假如不相等則說明已經上鎖了。

最后我們來看一下循環等待獲取鎖的過程:

static __always_inline void arch_spin_lock(arch_spinlock_t *lock)
{
    register struct __raw_tickets inc = { .tail = TICKET_LOCK_INC };
    inc = xadd(&lock->tickets, inc);
    if (likely(inc.head == inc.tail))
        goto out;
    for (; ; ) {
        unsigned count = SPIN_THRESHOLD;
        do {
            inc.head = READ_ONCE(lock->tickets.head);
            if (__tickets_equal(inc.head, inc.tail))
                  goto clear_slowpath;
            cpu_relax();
        } while (--count);
        __ticket_lock_spinning(lock, inc.tail);
    }
clear_slowpath:
    __ticket_check_and_clear_slowpath(lock, inc.head);
out:
    barrier();
}

這個過程步驟如下:

1)tail ++。

2)假如tail++之前tail和head相等,則說明現在已經獲得了鎖,退出。

3)假如tail和head不相等,則循環等待,直到相等為止。

圖2-5說明了整個加鎖和釋放鎖的過程,每次上鎖都會進行tail++。解鎖進行head++,當head==tail的時候,則說明未上鎖。

圖2-5 獲取和釋放自旋鎖的過程

2.3.3 信號量

通過前面的介紹,我們已經知道信號量(Sema-phore)用于保護有限數量的臨界資源,在操作完共享資源后,需釋放信號量,以便另外的進程來獲得資源。獲得和釋放應該成對出現。從操作系統的理論角度講,信號量實現了一個加鎖原語,即讓等待者睡眠,直到等待的資源變為空閑。

下面我們來分析信號量的實現,其定義如下:

struct semaphore {
    raw_spinlock_t        lock;                     //  獲取計數器的自旋鎖
    unsigned int          count;                    //計數器
    struct list_head     wait_list;               //等待隊列
};

圖2-6描述了信號量獲取和釋放的原理,即down和up的過程。在down的過程中,假如count>0,則做count-操作;否則執行__down,并且在獲取自旋鎖的時候保存中斷到eflags寄存器,最后再恢復中斷。

圖2-6 信號量獲取和釋放的原理圖

其中__down的執行過程為:

1)先把當前task的waiter放入wait_list隊列尾部。

2)進入死循環中。

3)假如task狀態滿足signal_pending_state,則跳出循環,并且從等待隊列中刪除,返回EINTR異常。

4)假如等待的超時時間用完了,則跳出循環,并且從等待隊列中刪除,返回ETIME異常。

5)設置task狀態為之前傳入的TASK_UNINTERRUPTIBLE(該狀態只能被wake_up喚醒)。

6)釋放sem上的lock。

7)調用schedule_timeout,直到timout后被喚醒,然后重新申請sem->lock。

8)假如waiter.up狀態變為true了,則說明到了被up喚醒的狀態了,則返回0。

在up的過程中,先獲取sem->lock,并且保存中斷。如果sem->wait_list為空,則直接做sem->count++操作;否則執行__up。

其中__up的執行過程為:

1)從sem->wait_list隊列中找到第一個等待的任務。

2)從等待隊列中刪除該任務。

3)把waiter->up設置為true。

4)嘗試喚醒該進程。

2.3.4 互斥鎖

互斥鎖(Mutex)從功能上來講和自旋鎖類似,都是為了控制同一時刻只能有一個線程進入臨界區。從實現上來講,自旋鎖是在CPU上實現忙等,而互斥鎖則會讓無法進入臨界區的線程休眠。從某種角度來講,互斥鎖其實就是退化版的信號量。下面是互斥鎖的定義:

struct mutex {
    //  1: unlocked, 0: locked, 小于0: locked, 在鎖上有等待者
    atomic_t                 count;
    spinlock_t              wait_lock;
    struct list_head      wait_list;
…
};

可以發現count只有兩種狀態1和0,1為unlock;0為locked。其他實現都和信號量類似,大家可以結合代碼并且參考信號量的實現來自己分析。

2.3.5 讀寫鎖

在很多時候,并發訪問都是讀大于寫的場景,假如把讀者當做寫者一樣加鎖,那么對性能影響較大。所以讀寫鎖(rw-lock)分別對讀者和寫者進行了處理,來優化解決該場景下的性能問題。

下面我們來看Linux對讀寫鎖的實現,首先來看一下在x86中對其的定義:

typedef struct qrwlock {
    atomic_t                cnts;
    arch_spinlock_t       wait_lock;
} arch_rwlock_t;

其中原子變量cnts初始化為0,自旋鎖wait_lock初始化為未上鎖狀態。

結合圖2-7我們來分析其實現原理:

圖2-7 讀寫鎖實現原理

獲取讀鎖的過程如下:

1)如果cnts低八位的讀部分為0,那么就進入下一步;否則獲得鎖失敗。

2)對高位的讀為+1。

3)再進行判斷是否寫位置為0,如果是則返回1,說明獲得了鎖。

4)如果寫鎖被別人獲得了,那么就把高位減1,并且返回0,獲得讀鎖失敗。

釋放讀鎖的過程只要把cnts的高位減1即可。

獲取寫鎖的過程如下:

1)假如cnts為0,則if條件不滿足,說明沒有讀者和寫者;否則要是存在讀者或者寫者,返回0,獲取寫鎖失敗。

2)把cnts的低八位寫標志設置為OXFF。

釋放寫鎖則直接把低八位的讀標志設置為0。

2.3.6 搶占

我們先回顧一下,一個進程什么時候會放棄CPU:

?時間片用完后調用schedule函數。

?由于IO等原因自己主動調用schedule。

?其他情況,當前進程被其他進程替換的時候。

那么,加入搶占(preempt)的概念后,當前進程就有可能被替換,假如當你按下鍵盤的時候,鍵盤中斷程序運行之后會讓進程B替換你當前的工作進程A,原因是B進程優先級比較高,這就是搶占。

內核要完成搶占必然需要打開本地中斷,這兩者是不可分割的,如圖2-8所示。

圖2-8 用戶鍵盤輸入發生搶占

下面我們來看Linux中開啟搶占的實現:

#define preempt_enable() \
do { \
    barrier(); \
    if (unlikely(preempt_count_dec_and_test())) \
        __preempt_schedule(); \
} while (0)

假如__preempt_count-1之后還是大于0,那么將會執行:__preempt_schedule();

asmlinkage __visible void __sched notrace preempt_schedule(void)
{
    if (likely(! preemptible()))
        return;
    preempt_schedule_common();
}
#define preemptible()(preempt_count() == 0 && ! irqs_disabled())
static void __sched notrace preempt_schedule_common(void)
{
    do {
        preempt_disable_notrace();
        __schedule(true);
        preempt_enable_no_resched_notrace();
    } while (need_resched());
}

preempt_schedule函數檢查是否允許本地中斷,以及當前進程的preempt_count字段是否為0,如果兩個條件都為真,它就調用schedule()選擇另外一個進程來運行。因此,內核搶占可能在結束內核控制路徑(通常是一個中斷處理程序)時發生,也可能在異常處理程序調用preempt_enable()重新允許內核搶占時發生。

2.3.7 per-cpu變量

目前生產環境的服務器大多數跑的都是SMP(對稱多處理器結構),如圖2-9所示。因為SMP系統多個核心與內存交互的時候,因為L1 cache的存在,會出現一致性的問題。所以,最好的方式就是每個核自己維護一份變量,自己用自己的,這樣就不會出現一致性問題了。

圖2-9 獨立L1 cache的SMP處理器架構

為了解決這個問題,Linux引入了per-cpu變量,可以在編譯時聲明,也可以在系統運行時動態生成。

首先來感受一下per-cpu變量的使用方法。per-cpu變量在使用之前需要先進行定義,編譯期間創建一個per-cpu變量:

DEFINE_PER_CPU(int, my_percpu);                    //聲明一個變量
DEFINE_PER_CPU(int[3], my_percpu_array);         //聲明一個數組

使用編譯時生成的per-cpu變量:

ptr = get_cpu_var(my_percpu);
// 使用ptr
put_cpu_var(my_percpu);

也可以使用下列宏來訪問特定CPU上的per-cpu變量:

per_cpu(my_percpu, cpu_id);

per-cpu變量導出,供模塊使用:

EXPORT_PER_CPU_SYMBOL(per_cpu_var);
EXPORT_PER_CPU_SYMBOL_GPL(per_cpu_var);

動態分配per-cpu變量:

void *alloc_percpu(type);
void *__alloc_percpu(size_t size, size_t align);

使用動態生成的per-cpu變量:

int cpu;
cpu = get_cpu();
ptr = per_cpu_ptr(my_percpu);
// 使用ptr
put_cpu();

接下來我們通過per-cpu變量的初始化過程來分析其實現原理,系統在啟動時會調用__init setup_per_cpu_areas為per-cpu變量申請內存空間:

void __init setup_per_cpu_areas(void)
{
    unsigned int cpu;
    unsigned long delta;
    int rc;
…
#ifdef CONFIG_X86_64
        atom_size = PMD_SIZE;
#else
        atom_size = PAGE_SIZE;
#endif
        rc = pcpu_embed_first_chunk(PERCPU_FIRST_CHUNK_RESERVE,
                    dyn_size, atom_size,
                    pcpu_cpu_distance,
                    pcpu_fc_alloc, pcpu_fc_free);
        …
}
if (rc < 0)
  rc = pcpu_page_first_chunk(PERCPU_FIRST_CHUNK_RESERVE,
            pcpu_fc_alloc, pcpu_fc_free,
            pcpup_populate_pte);
…
/* percpu區域已初始化并且可以使用 */
delta = (unsigned long)pcpu_base_addr - (unsigned long)__per_cpu_start;
for_each_possible_cpu(cpu) {
  per_cpu_offset(cpu) = delta + pcpu_unit_offsets[cpu];
  per_cpu(this_cpu_off, cpu) = per_cpu_offset(cpu);
  per_cpu(cpu_number, cpu) = cpu;
  setup_percpu_segment(cpu);
  setup_stack_canary_segment(cpu);
  //下面進行early init階段需要初始化的per_cpu數據
#ifdef CONFIG_X86_LOCAL_APIC
        per_cpu(x86_cpu_to_apicid, cpu) =
            early_per_cpu_map(x86_cpu_to_apicid, cpu);
        per_cpu(x86_bios_cpu_apicid, cpu) =
            early_per_cpu_map(x86_bios_cpu_apicid, cpu);
#endif
#ifdef CONFIG_X86_32
        per_cpu(x86_cpu_to_logical_apicid, cpu) =
            early_per_cpu_map(x86_cpu_to_logical_apicid, cpu);
#endif
#ifdef CONFIG_X86_64
        per_cpu(irq_stack_ptr, cpu) =
            per_cpu(irq_stack_union.irq_stack, cpu) +
            IRQ_STACK_SIZE -64;
#endif
#ifdef CONFIG_NUMA
        per_cpu(x86_cpu_to_node_map, cpu) =
            early_per_cpu_map(x86_cpu_to_node_map, cpu);
…
}

其中兩個關鍵步驟為:

1)pcpu_page_first_chunk。先分配一塊bootmem區間p,作為一級指針,然后為每個CPU分配n個頁,依次把指針存放在p中。p[0]..p[n-1]屬于cpu0, p[n]-p[2n-1]屬于CPU2,依次類推。接著建立一個長度為n×NR_CPUS的虛擬空間(vmalloc_early),并把虛擬空間對應的物理頁框設置為p數組指向的pages。然后把每CPU變量__per_cpu_load拷貝至每個CPU自己的虛擬地址空間中。

2)將.data.percpu中的數據拷貝到其中,每個CPU各有一份。由于數據從__per_cpu_start處轉移到各CPU自己的專有數據區中了,因此存取其中的變量就不能再用原先的值了,比如存取per_cpu__runqueues就不能再用per_cpu__runqueues了,需要做一個偏移量的調整,即需要加上各CPU自己的專有數據區首地址相對于__per_cpu_start的偏移量。在這里也就是__per_cpu_offset[i],其中CPU i的專有數據區相對于__per_cpu_start的偏移量為__per_cpu_offset[i]。

經過這樣的處理,.data.percpu這個section在系統初始化后就可以釋放了。

其中__per_cpu_load被重定向到了.data..percpu區域,和__per_cpu_start位置是一樣的:

#define PERCPU_SECTION(cacheline)
    . = ALIGN(PAGE_SIZE);
    .data..percpu : AT(ADDR(.data..percpu) - LOAD_OFFSET) {
        VMLINUX_SYMBOL(__per_cpu_load) = .;
        PERCPU_INPUT(cacheline)
    }

圖2-10為per-cpu變量的初始化流程,我們可以發現,經過setup_per_cpu_areas函數,per_cpu變量被拷貝到了各自的虛擬地址空間。原來的per_cpu變量區域,即__per_cpu_start和__per_cpu_end區域將會被刪除。

圖2-10 per-cpu變量的初始化

2.3.8 RCU機制

在Linux中,RCU(Read, Copy, Update)機制用于解決多個CPU同時讀寫共享數據的場景。它允許多個CPU同時進行寫操作,而且不使用鎖,其思想類似于copy on write的原理,并且實現垃圾回收器來回收舊數據。

使用RCU機制有幾個前提條件:

? RCU使用在讀者多而寫者少的情況。RCU和讀寫鎖相似。但RCU的讀者占鎖沒有任何的系統開銷。寫者與寫者之間必須要保持同步,且寫者必須要等它之前的讀者全部都退出之后才能釋放之前的資源。

? RCU保護的是指針。這一點尤其重要,因為指針賦值是一條單指令,即一個原子操作,因此更改指針指向沒必要考慮它的同步,只需要考慮cache的影響。

?讀者是可以嵌套的,也就是說rcu_read_lock()可以嵌套調用。

?讀者在持有rcu_read_lock()的時候,不能發生進程上下文切換;否則,因為寫者需要要等待讀者完成,寫者進程也會一直被阻塞。

下面是whatisRCU.txt中使用RCU鎖的例子:

struct foo {
    int a;
    char b;
    long c;
};
DEFINE_SPINLOCK(foo_mutex);
struct foo *gbl_foo;
void foo_update_a(int new_a)
{
    struct foo *new_fp;
    struct foo *old_fp;
    new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);
    spin_lock(&foo_mutex);
    old_fp = gbl_foo;
    *new_fp = *old_fp;
    new_fp->a = new_a;
    rcu_assign_pointer(gbl_foo, new_fp);
    spin_unlock(&foo_mutex);
    synchronize_rcu();
    kfree(old_fp);
}
int foo_get_a(void)
{
    int retval;
    rcu_read_lock();
    retval = rcu_dereference(gbl_foo)->a;
    rcu_read_unlock();
    return retval;
}

如上代碼中,RCU用于保護全局指針struct foo *gbl_foo. foo_get_a()用來從RCU保護的結構中取得gbl_foo的值。而foo_update_a()用來更新被RCU保護的gbl_foo的值。

我們再思考一下,為什么要在foo_update_a()中使用自旋鎖foo_mutex呢?

假設中間沒有使用自旋鎖。那foo_update_a()的代碼如下:

void foo_update_a(int new_a)
{
    struct foo *new_fp;
    struct foo *old_fp;
    new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);
    old_fp = gbl_foo;
    1:-------------------------
    *new_fp = *old_fp;
    new_fp->a = new_a;
    rcu_assign_pointer(gbl_foo, new_fp);
    synchronize_rcu();
    kfree(old_fp);
}

假設A進程在上面代碼的----標識處被B進程搶點,B進程也執行了goo_ipdate_a(),等B執行完后,再切換回A進程,此時,A進程所持的old_fd實際上已經被B進程給釋放掉了,此后A進程對old_fd的操作都是非法的。

RCU API說明

我們在上面也看到了幾個有關RCU的核心API,它們為別是:

rcu_read_lock()
rcu_read_unlock()
synchronize_rcu()
rcu_assign_pointer()
rcu_dereference()

其中:

? rcu_read_lock()和rcu_read_unlock()用來保持一個讀者的RCU臨界區,在該臨界區內不允許發生上下文切換。

? rcu_dereference():讀者調用它來獲得一個被RCU保護的指針。

? rcu_assign_pointer():寫者使用該函數來為被RCU保護的指針分配一個新的值,這樣是為了安全地從寫者到讀者更改其值,這個函數會返回一個新值。

? rcu_dereference:實現也很簡單,因為它們本身都是原子操作,因為只是為了cache一致性,插上了內存屏障,可以讓其他的讀者/寫者看到保護指針的最新值。

? synchronize_rcu:函數由寫者來調用,它將阻塞寫者,直到所有讀執行單元完成對臨界區的訪問后,寫者才可以繼續下一步操作。如果有多個RCU寫者調用該函數,它們將在所有讀執行單元完成對臨界區的訪問后全部被喚醒。

結合圖2-11我們來說明Linux RCU機制的思路:

圖2-11 Linux RCU機制實現原理

1)對于讀操作,可以直接對共享資源進行訪問,但前提是需要CPU支持訪存操作的原子化,現代CPU對這一點都做了保證。但是RCU的讀操作上下文是不可搶占的,所以讀訪問共享資源時可以采用read_rcu_lock(),該函數的功能是停止搶占。

2)對于寫操作,思路類似于copy on write,需要將原來的老數據做一次拷貝,然后對其進行修改,之后再用新數據更新老數據,這時采用了rcu_assign_pointer()宏,在該函數中首先通過內存屏障,然后修改老數據。這個操作完成之后,老數據需要回收,操作線程向系統注冊回收方法,等待回收。這個思路可以實現讀者與寫者之間的并發操作,但是不能解決多個寫者之間的同步,所以當存在多個寫者時,需要通過鎖機制對其進行互斥,也就是在同一時刻只能存在一個寫者。

3)在RCU機制中存在一個垃圾回收的后臺進程,用于回收老數據。回收時間點就是在更新之前的所有讀者全部退出時。由此可見,寫者在更新之后是需要睡眠等待的,需要等待讀者完成操作,如果在這個時刻讀者被搶占或者睡眠,那么很可能會導致系統死鎖。因為此時寫者在等待讀者,讀者被搶占或者睡眠,如果正在運行的線程需要訪問讀者和寫者已經占用的資源,那么將導致死鎖。

那究竟怎么去判斷當前的寫者已經操作完了呢?我們在之前看到,讀者在調用rcu_read_lock的時候會禁止搶占,因此只需要判斷所有的CPU都進過了一次上下文切換,就說明所有讀者已經退出了。參考http://www.ibm.com/developerworks/cn/linux/l-rcu/中對RCU過程有詳細描述。

2.3.9 內存屏障

程序在運行過程中,對內存訪問不一定按照代碼編寫的順序來進行。這是因為有兩種情況存在:

?編譯器對代碼進行優化。

?多CPU架構存在指令亂序訪問內存的可能。

為解決這兩個問題,分別需要通過不同的內存屏障來避免內存亂序。首先我們來看第一種情況,編譯器優化。例如有如下場景:線程1執行:

while (! condition);
print(x);

線程2執行:

x = 100;
condition = 1;

condition初始值為0,結果線程1打印出來不一定為100,因為編譯器優化后,有可能線程2先執行了condition = 1;后執行x = 100;我們可以在gcc編譯的時候加上O2或者O3的選項,就會發生編譯器優化。

為了消除該場景下編譯器優化帶來的不確定性,可以使用內存屏障:

#define barrier() __asm__ __volatile__("" ::: "memory")
x = 100;
barrier()
condition = 1;

另外,可以給變量加上volatile來去除編譯器優化:

#define ACCESS_ONCE(x) (*(volatile typeof(x) *)&(x))
ACCESS_ONCE(x) = 100;
ACCESS_ONCE(condition) = 1;

接著我們來看多CPU運行當中內存訪問亂序的問題,圖2-12是Intel CPU的P6微架構,目前大部分的Inter CPU都沿用了該架構的思路,其他都是一些小的優化。從圖中可以看到,CPU在處理指令的時候,為了提升性能,減少等待內存中的數據,采用了亂序執行引擎。

圖2-12 Intel CPU的P6微架構

注意

很多時候我們并不能保證代碼是按照我們書寫的順序來運行的。

假設如下代碼:

volatile int x, y, r1, r2;
void start()
{
    x = y = r1 = r2 = 0;
}
void end()
{
    assert(! (r1 == 0 && r2 == 0));
}
void run1()
{
    x = 1;
    r1 = y;
}
void run2()
{
    y = 1;
    r2 = x;
}

代碼執行順序為:

1)start()

2)線程1執行run1()

3)線程2執行run2()

4)調用end()

結果r1或者r2均有可能為0,原因就是亂序執行引擎的存在。要解決這個問題,在Pentium 4微處理器中引入了匯編語言指令lfence、sfence和mfence,它們分別有效地實現讀內存barrier、寫內存barrier和“讀-寫”內存barrier:

#define mb()       asm volatile("mfence":::"memory")
#define rmb()     asm volatile("lfence":::"memory")
#define wmb()     asm volatile("sfence" ::: "memory")

可以這樣修改:

void run1()
{
x = 1;
mb();
    r1 = y;
}
void run2()
{
y = 1;
mb();
    r2 = x;
}

2.4 常見開源軟件中的并發問題分析

前一節介紹了Linux中相關并發工具,實際場景中有很多應用,下面我們來對幾個開源軟件的并發設計進行分析。

2.4.1 Nginx原子性

下面我們通過分析Nginx中的原子變量實現,來介紹程序如何能做到保證原子性的。Nginx為了保證原子性設計了atomic函數,atomic的代碼如下:

static ngx_inline ngx_atomic_uint_t
ngx_atomic_cmp_set(ngx_atomic_t *lock, ngx_atomic_uint_t old,
ngx_atomic_uint_t set)
{
    u_char res;
    __asm__ volatile (
    NGX_SMP_LOCK
    ”cmpxchgl %3, %1; ”
    ”sete %0; ”
    :“=a”(res) :“m”(*lock), “a”(old), “r”(set) :“cc”, “memory”);
    return res;
}

atomic的工作原理如下:

1)在多核環境下,NGX_SMP_LOCK其實就是一條lock指令,用于鎖住總線。

2)cmpxchgl會保證指令同步執行。

3)sete根據cmpxchgl執行的結果(eflags中的zf標志位)來設置res的值。

其中假如cmpxchgl執行完之后,時間片輪轉,這個時候eflags中的值會在堆棧中保持,這是CPU task切換機制所保證的。所以,等時間片切換回來再次執行sete的時候,也不會導致并發問題。

至于信號量、互斥鎖,最終還得依賴原子性的保證,具體鎖實現可以有興趣自己再去閱讀源代碼。

下面是ngx_spinlock的實現,依賴了原子變量的ngx_atomic_cmp_set:

void
ngx_spinlock(ngx_atomic_t *lock, ngx_atomic_int_t value, ngx_uint_t spin)
{
#if (NGX_HAVE_ATOMIC_OPS)
    ngx_uint_t   i, n;
    for ( ; ; ) {
        if (*lock == 0 && ngx_atomic_cmp_set(lock, 0, value)) {
            return;
        }
        if (ngx_ncpu > 1) {
            for (n = 1; n < spin; n <<= 1) {
                  for (i = 0; i < n; i++) {
                      ngx_cpu_pause();
                  }
                  if (*lock == 0 && ngx_atomic_cmp_set(lock, 0, value)) {
                      return;
                  }
            }
        }
        ngx_sched_yield();
    }
#else
#if (NGX_THREADS)
#error ngx_spinlock() or ngx_atomic_cmp_set() are not defined !
#endif
#endif
}

在上面的代碼中,Nginx的spinlock主要實現過程如下:

1)進入死循環。

2)假如可以獲得鎖,則return。

3)循環CPU的個數次來通過ngx_atomic_cmp_set獲得鎖,假如獲得了,則return;否則繼續死循環。

2.4.2 Memcached中的互斥鎖

Memcached也使用了mutex這樣的互斥鎖,來控制對item的訪問,代碼如下:

void *item_trylock(uint32_t hv) {
    pthread_mutex_t *lock = &item_locks[hv & hashmask(item_lock_hashpower)];
    if (pthread_mutex_trylock(lock) == 0) {
        return lock;
    }
    return NULL;
}

注意

Memcached的互斥鎖粒度比較細,可以看到,針對每個item,都加了一把鎖,這樣在并發的時候,可以盡量減少沖突,提高性能。

Memcached在鎖的獲得過程中,使用了pthread_mutex_trylock:

void item_trylock_unlock(void *lock) {
    mutex_unlock((pthread_mutex_t *) lock);
}
void item_unlock(uint32_t hv) {
    uint8_t *lock_type = pthread_getspecific(item_lock_type_key);
    if (likely(*lock_type == ITEM_LOCK_GRANULAR)) {
        mutex_unlock(&item_locks[hv & hashmask(item_lock_hashpower)]);
    } else {
        mutex_unlock(&item_global_lock);
    }
}

Memcached中,鎖的釋放過程也是同樣的道理,首先從item_locks數組中找到鎖對象。然后通過mutex_unlock來解鎖。

2.4.3 Redis無鎖解決方案

Redis的服務器程序采用單進程、單線程的模型來處理客戶端的請求。對讀寫等事件的響應是通過對epoll函數的包裝來做到的。

圖2-13是Redis服務器模型原理,整個服務器初始化的過程如下:

圖2-13 Redis服務器模型

1)初始化asEventLoop。

2)初始化服務器socket監聽,并且綁定acceptTcpHandler事件函數,以應對建立客戶端連接的請求。

3)綁定beforesleep函數到eventLoop,并且調用aeMain來啟動epoll主循環。

4)主循環響應客戶端要求建立連接的請求。

5)主循環讀取客戶端命令,并執行。

6)如有數據回寫則初始化writeEvent,將數據提交到c-replay隊列。主循環需要處理此事件的時候則讀取數據寫回客戶端。

因為Redis是單線程的模型,所以,所有的操作都是先來后到串行的,因此,在這個方案中,可以不需要鎖,也沒有并發的存在,模型假設了所有操作都是基于內存的操作,速度是非常快的。

2.4.4 Linux中驚群問題分析

Linux中驚群相關的問題鼎鼎有名,但是在網上搜索相關資料,發現都是只言片語,不是說得很完整,本節對這個問題進行系統的總結。

驚群是在多線程或者多進程的場景下,多個線程或者進程在同一條件下睡眠,當喚醒條件發生的時候,會同時喚醒這些睡眠進程或者線程,但是只有一個是可以執行成功的,相當于其他幾個進程和線程被喚醒后存在執行開銷的浪費。

在Linux中,以下場景下會觸發驚群:

?多個進程或者線程在獲取同一把鎖的時候睡眠。

?多個進程或者線程同時進行accept。

?多個進程在同一個epoll上競爭。

?多個進程在多個epoll上對于同一個監聽的socket進行accept。

下面我們分別來舉例說明這幾個場景,及其解決方案。

1. Linux中通用的解決方案

Linux提供了一個wake_up_process方法,用于喚醒一個指定的進程,其聲明如下:

int wake_up_process(struct task_struct *p)

那么,假如有一堆的進程同時睡眠的時候,我們如何來維護這些睡眠的進程,并且如何只讓其中一個被喚醒呢?

Linux通過工作隊列的方式來解決這個問題,在進程睡眠之前,會先進行一個特定的操作:

prepare_to_wait_exclusive(wait_queue_head_t *q, wait_queue_t *wait, int state)
{
    unsigned long flags;
    wait->flags |= WQ_FLAG_EXCLUSIVE;
    spin_lock_irqsave(&q->lock, flags);
    if (list_empty(&wait->task_list))
        __add_wait_queue_tail(q, wait);
    set_current_state(state);
    spin_unlock_irqrestore(&q->lock, flags);
}

以上prepare_to_wait_exclusive函數主要是將當前的flags加上了WQ_FLAG_EXCLU-SIVE的標志,然后放入到工作隊列的尾部,最后設置相應的狀態,例如TASK_INTERR-UPTIBLE表示可以被wake_up喚醒。

當我們需要進行喚醒的時候,Linux提供了_ _wake_up_common方法,來喚醒工作隊列中的進程:

static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
            int nr_exclusive, int wake_flags, void *key)
{
    wait_queue_t *curr, *next;
    list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
        unsigned flags = curr->flags;
        if (curr->func(curr, mode, wake_flags, key) &&
                  (flags & WQ_FLAG_EXCLUSIVE) && ! --nr_exclusive)
            break;
    }
}

上面的_ _wake_up_common方法會遍歷工作隊列,尋找flags中含有WQ_FLAG_EXCLUSIVE標志的進程,當nr_exclusive減為0的時候,就會跳出循環,所以只能喚醒nr_exclusive個進程,比如nr_exclusive=1。

其中curr->func的回調函數是通過DEFINE_WAIT(wait)宏來定義的:

#define DEFINE_WAIT_FUNC(name, function)
    wait_queue_t name = {
        .private        = current,
        .func            = function,
        .task_list     = LIST_HEAD_INIT((name).task_list),
    }
#define DEFINE_WAIT(name) DEFINE_WAIT_FUNC(name, autoremove_wake_function)

通過上面的代碼可以發現回調函數為autoremove_wake_function:

int autoremove_wake_function(wait_queue_t *wait, unsigned mode, int sync, void
    *key)
{
    int ret = default_wake_function(wait, mode, sync, key);
    if (ret)
        list_del_init(&wait->task_list);
    return ret;
}
int default_wake_function(wait_queue_t *curr, unsigned mode, int wake_flags,
            void *key)
{
    return try_to_wake_up(curr->private, mode, wake_flags);
}

autoremove_wake_function最終通過default_wake_function調用try_to_wake_up來實現喚醒指定的進程。整個流程見圖2-14。

圖2-14 Linux進程喚醒流程

2. socket accept場景下的驚群及解決方案

在Linux中,針對服務器監聽的socket進行accept操作,假如沒有新的accept事件,那么會進行睡眠。sys_accept調用最終會在TCP層執行inet_csk_accept函數:

struct sock *inet_csk_accept(struct sock *sk, int flags, int *err)
{
    struct inet_connection_sock *icsk = inet_csk(sk);
    struct request_sock_queue *queue = &icsk->icsk_accept_queue;
    struct request_sock *req;
    struct sock *newsk;
    int error;
    lock_sock(sk);
...
// 阻塞等待,直到有全連接建立。如果用戶設置了等待超時時間,超時后會退出
    error = inet_csk_wait_for_connect(sk, timeo);
...
out:
    release_sock(sk);
...

inet_csk_accept在等待accept連接到來的時候,會執行inet_csk_wait_for_connect:

static int inet_csk_wait_for_connect(struct sock *sk, long timeo)
{
    struct inet_connection_sock *icsk = inet_csk(sk);
    DEFINE_WAIT(wait);
    ...
    for (; ; ) {
        prepare_to_wait_exclusive(sk_sleep(sk), &wait,
                          TASK_INTERRUPTIBLE);
        ...
        if (reqsk_queue_empty(&icsk->icsk_accept_queue))
            timeo = schedule_timeout(timeo);
        ...
    }
      ..
}

上面的過程看著眼熟嗎,prepare_to_wait_exclusive的作用在上一個例子已經介紹過了,這里會把當前的進程通過DEFINE_WAIT(wait)包裝成wait_queue_t結構,并且放入到監聽socket的等待隊列尾部。然后通過schedule_timeout讓當前進程睡眠timeo個時間。

該進程被喚醒有幾種可能:

?睡眠timeo后被timer定時器喚醒。

? accept事件到來被喚醒。

第2種被喚醒的場景是由網絡層的代碼觸發的。以TCP V4協議為例,其執行過程為:tcp_v4_rcv->tcp_v4_do_rcv->tcp_child_process,在tcp_child_process方法中會調用父socket,也就是監聽socket的parent->sk_data_ready(parent)方法,在sock_init_data的時候,我們發現,該函數的定義如下:

sk->sk_data_ready = sock_def_readable;
sock_def_readable函數實現為:
static void sock_def_readable(struct sock *sk)
{
    struct socket_wq *wq;
    rcu_read_lock();
    wq = rcu_dereference(sk->sk_wq);
    if (skwq_has_sleeper(wq))
        wake_up_interruptible_sync_poll(&wq->wait, POLLIN | POLLPRI |
            POLLRDNORM | POLLRDBAND);
    sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
    rcu_read_unlock();
}

sock_def_readable首先判斷是否在等待隊列中有睡眠的進程,然后通過wake_up_interruptible_sync_poll進行喚醒。其實現如下:

#define wake_up_interruptible_sync_poll(x, m)
    __wake_up_sync_key((x), TASK_INTERRUPTIBLE, 1, (void *) (m))
void __wake_up_sync_key(wait_queue_head_t *q, unsigned int mode,
            int nr_exclusive, void *key)
{
    unsigned long flags;
    int wake_flags = 1;
    if (unlikely(! q))
        return;
    if (unlikely(nr_exclusive ! = 1))
        wake_flags = 0;
    spin_lock_irqsave(&q->lock, flags);
    __wake_up_common(q, mode, nr_exclusive, wake_flags, key);
    spin_unlock_irqrestore(&q->lock, flags);
}

最終發現是由_ _wake_up_common來喚醒的,和前面介紹的是一樣的,并且nr_exclusive為1。說明只會喚醒一個,不會發生驚群。

那么,在inet_csk_accept的時候,lock_sock(sk)操作為什么不能避免驚群呢?理論上鎖住了監聽的socket,每次只有一個進程可以accept了呀。事實上,lock_sock(sk)的時候,要是拿不到鎖,也會進行睡眠,假如不做特殊處理,也有可能驚群,lock_sock最終調用_ _lock_sock:

static void __lock_sock(struct sock *sk)
    __releases(&sk->sk_lock.slock)
    __acquires(&sk->sk_lock.slock)
{
    DEFINE_WAIT(wait);
    for (; ; ) {
        prepare_to_wait_exclusive(&sk->sk_lock.wq, &wait,
                TASK_UNINTERRUPTIBLE);
        spin_unlock_bh(&sk->sk_lock.slock);
        schedule();
        spin_lock_bh(&sk->sk_lock.slock);
        if (! sock_owned_by_user(sk))
            break;
    }
    finish_wait(&sk->sk_lock.wq, &wait);
}

當無法獲得上鎖條件進行schedule放棄CPU之前,會先進行prepare_to_wait_exclusive,這個動作前面已經解釋得很清楚了。所以,假如同時有多個進程在lock_sock阻塞的時候,也僅會被喚醒一個。

最后,圖2-15描述了accept的整體流程圖。

圖2-15 Linux accept的流程

3. epoll的驚群解決方案

在使用epoll的時候,我們會在注冊事件后調用epoll_wait,該系統調用會調用ep_poll方法:

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
        int maxevents, long timeout)
{
  ..
// 沒有事件,所以需要睡眠。當有事件到來時,睡眠會被ep_poll_callback函數喚醒
// 將current初始化為等待隊列項wait后,放入ep→wg這個等待隊列中
init_waitqueue_entry(&wait, current);
    __add_wait_queue_exclusive(&ep->wq, &wait);
    for (; ; ) {
        // 執行ep_poll_callback()喚醒時應當將當前進程喚醒,所以當前進程狀態應該為“可喚醒”
          TASK_INTERRUPTIBLE
        set_current_state(TASK_INTERRUPTIBLE)
        // 如果就緒隊列不為空(已經有文件的狀態就緒)或者超時,則退出循環
        if (ep_events_available(ep) || timed_out)
            break;
        // 如果當前進程接收到信號,則退出循環,返回EINTR錯誤
        if (signal_pending(current)) {
            res = -EINTR;
            break;
        }
        spin_unlock_irqrestore(&ep->lock, flags);
        // 放棄CPU休眠一段時間
        if (! schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
            timed_out = 1;
        spin_lock_irqsave(&ep->lock, flags);
    }
    __remove_wait_queue(&ep->wq, &wait);
    __set_current_state(TASK_RUNNING);
}
  ..

我們發現,假如沒有事件,需要睡眠,通過_ _add_wait_queue_exclusive將當前進程放入等待隊列的隊頭中,其實現如下:

static inline void
__add_wait_queue_exclusive(wait_queue_head_t *q, wait_queue_t *wait)
{
    wait->flags |= WQ_FLAG_EXCLUSIVE;
    __add_wait_queue(q, wait);
}

其中WQ_FLAG_EXCLUSIVE用于賦給flgas,表示該睡眠進程是一個互斥進程。

睡眠的當前進程會被回調函數ep_poll_callback喚醒,其實現如下:

static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void
    *key)
{
..
wake_up_locked(&ep->wq);
...
}
#define wake_up_locked(x)   __wake_up_locked((x), TASK_NORMAL, 1)
void __wake_up_locked(wait_queue_head_t *q, unsigned int mode, int nr)
{
    __wake_up_common(q, mode, nr, 0, NULL);
}

ep_poll_callback最終通過__wake_up_common函數來喚醒等待隊列中的互斥進程。

4. Nginx為什么還有驚群問題

我們分析一下Nginx為什么還會有驚群問題呢?Nginx不是已經使用了epoll了嗎?epoll上面又已經解決了,為什么還會有這個問題呢?原因是Nginx的master在fork出多個worker進程后,worker進程才創建出多個epoll,所以多個進程假如同時進行epoll_wait,還是有可能會發生驚群問題,因為每個worker都維護了一個進程。

worker在循環中,會執行ngx_process_events_and_timers,我們來看它的實現:

void
ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
  ..
if (ngx_use_accept_mutex) {
        if (ngx_accept_disabled > 0) {
            ngx_accept_disabled--;
        } else {
            if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
                  return;
            }
            if (ngx_accept_mutex_held) {
                  flags |= NGX_POST_EVENTS;
            } else {
                  if (timer == NGX_TIMER_INFINITE
                      || timer > ngx_accept_mutex_delay)
                  {
                      timer = ngx_accept_mutex_delay;
                  }
            }
        }
    }
...

上面的代碼解釋如下:假如ngx_accept_disabled>0,表示現在該woker已經壓力很大了,所以不再接受新的處理。否則,會先嘗試獲取互斥鎖,ngx_trylock_accept_mutex。

至于ngx_accept_disabled的大小設定,在每次accept事件處理完之后,進行相應的設置:

void
ngx_event_accept(ngx_event_t *ev)
{
  ..
ngx_accept_disabled = ngx_cycle->connection_n / 8
    - ngx_cycle->free_connection_n;
...
}

上面這個值的意思為最大連接數的八分之一減去空閑連接的數量。大于0說明空閑連接的數量都已經少于八分之一了。

通過上面代碼可以發現,不管是woker的負載平衡,還是驚群問題的解決,都需要滿足ngx_use_accept_mutex條件,可以通過修改配置解決,如下所示:

events {
accept_mutex on;
}

因為Nginx的worker數量本來就有限,與CPU核數相當,所以,打開該鎖意義不是很大,另外在高并發場景下,因為驚群鎖的存在,吞吐量反而會下降,Nginx在最新版本里也默認是關閉該鎖的。

只有針對Apache這種多線程模型,而且會fork出成百上千個線程的,這個問題才會嚴重。我們來看Nginx作者的說法:“操作系統有可能會喚醒等待在accept()和select()調用阻塞的所有進程,這會引發驚群問題。在有很多worker的Apache(數百個或者更多)中會引發這個問題,但是假如你使用僅僅只有數個(通常為CPU核數)worker的Nginx,就不會引發這個問題。因此在Nginx中,你在使用select/kqueue/epoll等(除了accept())來調度進入的連接,可以關閉accept_mutex。”

2.4.5 解決MyCat同步問題

MyCat是用Java開發的開源數據庫中間件,其服務器采用的是reactor模型(關于I/O模型,我們在I/O的章節中會具體介紹)。圖2-16是我整理的MyCat的服務器中心領域模型。

圖2-16 MyCat服務器領域模型

這是一個典型的Reactor模型,NIOReactorPool會預先分配N個Reactor工作線程,并且每個Reactor會維護一個selector,當事件就緒后,Reactor就會執行相關事件的回調函數。

基于這個思路MyCat中所有的I/O操作都是異步操作,但是我自定義的handler有個同步的過程,沒辦法,業務就是這么依賴了第三方。我只能這樣編寫代碼:

final CountDownLatch Latch=New CountDownLatch(1);
Ctx.executeNaiveSQLSequnceJob(dataNodes, sql, new  SQLJobHandler(){private
    List<byte[]>fields;
    @Override
    public boolean onRowData(String dataNode, byte[]rowData){String c1=ResultSe
        tUtil, getColumnValAsString(rowData, fields, θ);
        String c2=ResultSetUtil, getColumnVaLasString(riwData, fields,1);
        share.seDataNode(c1);
        share.setIndex(c2);
        cacheLock.writeLock().lock();
        try{
            cache.put(bid, share);
            latch.countDown();
            retrurn false;
        }finally{
            cacheLock.writeLock().unlock();
        }
    }
    @Override
    public void onHeader(String dataNode, byte[]header, List<byte[]>fields)
        this.fields=fields;
    }
    @Override
    public void finished(String dataNode, boolean failde){latch.countDown();
    }
});
    try{
        latch.await(5, TimeUnit.SECONDS);
    }catch(InterruptedException e){

然后在實際測試過程中,發現偶然會出現線程卡死現象,我們回顧圖2-16就發現了問題。因為MyCat的客戶端連接(FrontedConnection)和后端MySQL的連接共用一個Reactor的池子,所以有可能會發生前端和后端同時被分配同一個Reactor,那么要是前端沒退出,后端必然沒法執行,然后互相等待造成死鎖。

為了解決我的問題,我給前端單獨分配了個池子,如下所示:

LUCGER.into(using nio network nanater);
NIOReactorPool  reactorPool=new  NIOReactorPool(BufferPool.LOCAL_BUF_THREAD_
    PREX+"NIOREACTOR",
    processors.length);
NIOReactorPool  clientReactorPool=new  NIOReactorPool(BufferPool.LOCAL_BUF_
    THREAD_PREX+" CLIENT_NIOREACTOR, "processors.length);
connector=new  NIO  Connector(BufferPool.LOCAL_BUF_THREAD_PREX+"NIO  Connector",
    reactorPool);
((NIOConnector)connector).start();
manager=new  NIOAcceptor(BufferPool.LOCAL_BUF_THREAD_PREX+NAME+"Manager", system.
    getBindIp(), system.getManagerPort(), mf, reactorPool);
server=new  NIOAcceptor(BufferPoo.LOCAL_BUF_THREAD_PREX+NAME+"Sever", syetem.
    getBindIp(), system.getServerPort(), sf, clientReactorPool);

2.4.6 false-sharing問題解決方案

CPU能從本地緩存中取數據就不會從內存中取,而內存中的數據和緩存中的數據一般都是按行讀取的,也就是所謂的緩存行,一般為64個字節,當我們操作數據的時候,假如剛好多個變量在同一個緩存行的時候,多線程同時操作就會讓之前的緩存行失效,導致程序效率降低。如圖2-17所示,兩個變量共享了同一個緩存行,從L1~L3cache,只要當X更新時,Y也就被踢出了緩存,反之亦然,重新從內存載入數據。

圖2-17 False-sharing問題

為解決該問題,很多時候只能通過以空間換時間來搞定,比如在X和Y中間添加一個不使用的變量,僅僅用來占據空間,隔開緩存行那么就會把X和Y分割為2個緩存行,各自更新,相互不受影響,就是浪費空間而已。

下面我們來看兩個具體例子。

1. Jetty中的解決方案

Jetty在實現BlockingArrayQueue的時候,會加上以下代碼:

private long _space0;
private long _space1;
private long _space2;
private long _space3;
private long _space4;
private long _space5;
private long _space6;
private long _space7;

2. Nginx的解決方案

在C程序中,Nginx也有類似的實現:

typedef union
{
    erts_smp_rwmtx_t rwmtx;
    byte cache_line_align_[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(erts_smp_
        rwmtx_t))];
}erts_meta_main_tab_lock_t;
erts_meta_main_tab_lock_t main_tab_lock[16]

在下一章介紹內存slab分配器的時候(3.5.2節),著色也是用來解決false sharing的問題。

2.5 本章小結

并發一直是計算機工程領域的一個重要話題,有很多書籍和文章,甚至有很多論文專門對此進行過探討。很多初學者覺得并發很難懂,很麻煩。其實所有的問題歸根結底都是由簡單的道理組成的,我認為,脫離計算機的底層原理來談并發都是水中月,鏡中花,尤其對初學者來說,會陷入在一堆并發編程的API中難以自拔。

本章開篇就闡述了到底什么是并發,并發會引發的問題,這樣便于后續更加深入理解并發相關處理。對于應用程序員來講,不管你是用C或是Java,甚至是Go語言,我們面臨的并發問題,操作系統同樣面臨類似的問題。所以,只有在理解了操作系統的并發場景后,我們才會理解Linux內核的并發工具:atomicspin_lock、semaphore、mutex、讀寫鎖、per-cpu、搶占、內存屏障、RCU機制等。

最后,分析了常見開源軟件遇到的一些并發解決方案:Nginx的原子性、Memcached的互斥鎖、Linux中驚群問題分析、解決Mycat中的同步問題、偽共享問題解決方案等,將這些應用與Linux內核的相關實現對照,就能做到融會貫通。

主站蜘蛛池模板: 东源县| 遂溪县| 工布江达县| 宾阳县| 渝中区| 封丘县| 桦甸市| 鸡泽县| 米易县| 丹阳市| 黄龙县| 荣成市| 连南| 北碚区| 河间市| 宜昌市| 库车县| 太谷县| 浦东新区| 内江市| 威远县| 五指山市| 伽师县| 新和县| 黔南| 邳州市| 海宁市| 崇阳县| 梁山县| 滨州市| 阿鲁科尔沁旗| 崇仁县| 河源市| 乳源| 丹凤县| 竹山县| 北流市| 东安县| 建平县| 读书| 合江县|