shan

操作系统-并发

2020-05-31

本文为操作系统导论中关于并发部分的读书笔记。其中主要涉及多线程、锁、信号量等相关知识。

线程

介绍

经典观点是一个程序只有一个执行点(一个程序计数器,用来存放要执行的指令),但多线程程序会有多个执行点(多个程序计数器,每个都用于取指令和执行),换个角度看,每个线程类似于独立的进程,只有一点区别:他们共享地址空间,从而能够访问相同的数据

线程与进程之间的相同点:

  • 有一个程序计数器,记录程序从哪里获取指令。
  • 线程切换和进程切换也有类似的上下文切换,对于进程会将状态保存到进程控制块(Process Control Block,PCB),线程需要一个线程控制块(Thread Control Block,TCB)。

线程与进程之间的不同点:

  • 在上下文切换的过程中,线程相比于进程地址空间保持不变。
  • 线程与进程之间另一个主要区别是栈,在简单的传统进程地址空间模型中,通常位于地址空间的底部;在多进程中,每个线程都有一个栈。如下图中的单线程和多线程的地址空间示例。
image-20200531212324465

上图显示,两个栈跨越了进程的地址空间。因此,所有位于栈上的变量、参数、返回值和其他放在栈上的东西,被放置在有称为线程本地存储的地方,即线程相关的栈。

下面的术语是关于并发代码的:

  • 临界区(critical section):访问共享资源的一段代码,资源通常是一个变量或者数据结构。
  • 竞态条件(race condition):出现在多个执行线程大致同时进入临界区,他们都视图更新共享的数据结构,导致了出现了不希望的结果。
  • 不确定性(indeterminate):程序由一个或多个竞态条件组成,程序的输出因运行而异,具体取决于哪个线程在何时运行。这导致不确定性。
  • 互斥(mutual exclusion):保证只有一个线程临界区,从而避免出现竞态,并产生确定的程序输出。

线程API

创建线程

1
2
3
4
5
6
#include <pthread.h>

int pthread_create(pthread_t *thread,
const pthread_attr_t *attr,
void *(*start_routine)(void *),
void *arg);

该函数有4 个参数:threadattrstart_routinearg

第一个参数 thread 是指向pthread_t 结构类型的指针,我们将利用这个结构与该线程交互,因此需要将它传入pthread_create(),以便将它初始化。

第二个参数 attr 用于指定该线程可能具有的任何属性。

第三个start_routine参数用于决定线程应该在哪个函数中运行,在C 中,我们把它称为一个函数指针(function pointer)。

第四个参数arg 就是要传递给线程开始执行的函数的参数。

等待线程完成

调用函数 pthread_join() ,该函数有两个参数,第一个参数为 pthread_t 类型,用于指定要等待的线程。第二个参数是一个指针,指向你希望得到的返回值。

可以提供互斥进入临界区的那些函数,这方面最基本的函数是:

1
2
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);

条件变量:当线程之间必须发生某种信号时,如果一个线程在等待另一个线程继续执行默写操作,条件变量就很有用,其主要函数如下:

1
2
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_signal(pthread_cond_t *cond);

通过对并发的介绍,我们看到了并发编程的一个基本问题:我们希望原子式执行一系列指令,但由于单处理器上的中断,很难做到这点。我们采用锁这种凡是,来解决这一问题。

锁的基本思想

锁就是一个变量,因此我们需要声明一个某种类型的锁变量(lock variable),才能使用。这个锁变量保存了锁在某一时刻的状态,要么是可用的(available),表示没有线程持有锁,要么是被占用的(acquired),表示有一个线程持有锁,正处于临界区。我们也可以保存其他的信息,比如持有锁的线程,或者请求获取锁的线程队列,但这些信息会被隐藏起来,锁的使用者不会发现。

锁为程序员提供了最小程度的调度控制。我们把线程视为程序员创建的实体,但是被操作系统调度,具体方式由操作系统做出选择。锁让程序员获得一些控制权。通过给临界区加锁,可以保证临界区内只有一个线程活跃。锁将原本由操作系统调度的混乱状态变得更为可控。

Pthread 锁

POSIX 库将锁称为互斥量(mutex),它们被用来提供线程之间的互斥。我们能用不同的锁来保护不同的变量,这样可以增加并发:不同于任何临界区都使用同一个大锁(粗粒度锁策略),通常大家会用不同的锁保护不同的数据和结构,从而允许更多的线程进入临界区(细粒度方案)。

实现一个锁

上面对锁已经有了一定的理解,那么如何实现一个锁,需要什么样的硬件支持?什么样的操作系统的支持?

各种计算机体系结构的指令集都增加了一些不同的硬件原语,我们不研究这些指令是如何实现的,只研究如何使用它们来实现像锁这样的互斥原语。我们也
会研究操作系统如何发展完善,支持实现成熟复杂的锁库。

锁的标准

为了实现锁,我们应该设立一些标准:

  • 提供互斥(mutual exclusion),最基本的,锁是否有效,能够阻止多个线程进入临界区。
  • 公平性(fairness),让每个竞争的线程有公平的机会抢到锁。
  • 性能(performance),使用锁之后增加的时间开销最小。

互斥的解决方案

  • 在临界区关闭中断,这个解决方案是为但处理器系统开发的。这种方案优点是简单,缺点很多:

    1. 一个贪婪的程序在它开始就调用lock(),从而独占处理器。
    2. 不支持多处理器。
    3. 关闭中断导致中断丢失,可能会导致严重的系统问题。
    4. 效率太低。
  • 测试并设置指令(test-and-set instruction),也叫做原子交换(atomic exchange),这里实现了一种简单的锁:自旋锁(spin lock),利用CPU周期,一直自旋,直到锁可用。在单处理器上,需要抢占式的调度器(preemptive scheduler,即不断通过时钟中断一个线程,运行其他线程)。否则自旋锁在单CPU上无法使用,因为自旋的线程永远不会放弃CPU。优点:这是一个正确的锁,一次只允许一个线程进入临界区。缺点:

    1. 自旋锁不提供任何公平性的保证。
    2. 在单CPU的情况下,性能开销十分巨大,假设一个线程持有锁进入临界区时被抢占。调度器可能会运行其他每一个线程,而其他线程都在竞争锁,都会在放弃CPU 之前,自旋一个时间片,浪费CPU 周期。但在多CPU上性能表现不错。
  • 比较并交换指令(compare-and-swap,compare-and-exchange (x86)):实现自旋锁的情况下,与上面类似。

  • 链接的加载(load-link)和条件式的存储(store-conditional):可以用这两个指令来配合使用实现并发结构。链接的加载指令和典型加载指令类似,都是从内存中取出值存入一个寄存器。关键区别来自条件式存储(store-conditional)指令,只有上一次加载的地址在期间都没有更新时,才会成功,(同时更新刚才链接的加载的地址的值)。成功时,条件存储返回1,并将 ptr 指的值更新为value。失败时,返回0,并且不会更新值。

  • 获取并增加指令(fetch-and-add):他能原子的返回特定地址的旧值,并让该值自增一。我们用这个特性,实现一个ticket锁。实现如下面代码所示,此法保证所有的线程都能抢到锁。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    # include <stdio.h>
    # 实现ticket锁

    int FetchAndAdd(int *ptr) {
    int old = *ptr;
    *ptr = old + 1;
    return old;
    }
    typedef struct lock_t {
    int ticket;
    int turn;
    } lock_t;

    void lock_init(lock_t *lock) {
    lock->ticket = 0;
    lock->turn = 0;
    }

    void lock(lock_t *lock) {
    int myturn = FetchAndAdd(&lock->ticket);
    while (lock->turn != myturn)
    ; // spin
    }
    void unlock(lock_t *lock) {
    FetchAndAdd(&lock->turn);
    }
  • 在要自旋的时候,放弃CPU,这样可以节省CPU的开销。

  • 使用队列,让休眠替代自旋。将之前的测试并设置和等待队列结合,实现了一个更高性能的锁。其次,我们通过队列来控制谁会获得锁,避免饿死。

  • 两阶段锁(two-phase lock),在需要快速释放锁的场景内很有用。

基于锁的并发数据结构

对于特定数据结构,如何加锁才能让该结构功能正确?如何对该数据结构加锁,才能保证高性能,即并发访问?

并发链表、并发队列、并发散列表等结构的实现见操作系统导论第二十九章部分。下面是这些部分的简述

计数器

没有同步机制的计数器代码很简单,但是线程安全的并且保持高性能的可扩展计数器已经研究多年了。

懒惰计数器是最近研究提出的,其基本思想是:懒惰计数器通过多个局部计数器和一个全局计数器来实现一个逻辑计数器,其中每个CPU核心有一个局部计数器。如果核心上的线程想增加计数器,那就增加它的局部计数器,为了保持全局计数器的更新,局部值会定期转移给全局计数器,方法是获取全局锁,让全局计数器加上局部计数器的值,然后把局部计数器置零。这种局部转全局的频度,取决于一个阈值,阈值越小,懒惰计数器则趋近于非扩展计数器。阈值越大,扩展性越强,全局计数器与实际计数的偏差越大。

下面为懒惰计数器的基本实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#include <pthread.h>

#define NUMCPUS 4


typedef struct counter_t {
int global; // global count
pthread_mutex_t glock; // global lock
int local[NUMCPUS]; // local count (per cpu)
pthread_mutex_t llock[NUMCPUS]; // ... and locks
int threshold; // update frequency
} counter_t;

// init: record threshold, init locks, init values
// of all local counts and global count
void init(counter_t *c, int threshold) {
c->threshold = threshold;

c->global = 0;
pthread_mutex_init(&c->glock, NULL);

int i;
for (i = 0; i < NUMCPUS; i++) {
c->local[i] = 0;
pthread_mutex_init(&c->llock[i], NULL);
}
}

// update: usually, just grab local lock and update local amount
// once local count has risen by 'threshold', grab global
// lock and transfer local values to it
void update(counter_t *c, int threadID, int amt) {
pthread_mutex_lock(&c->llock[threadID]);
c->local[threadID] += amt; // assumes amt > 0
if (c->local[threadID] >= c->threshold) { // transfer to global
pthread_mutex_lock(&c->glock);
c->global += c->local[threadID];
pthread_mutex_unlock(&c->glock);
c->local[threadID] = 0;
}
pthread_mutex_unlock(&c->llock[threadID]);
}

// get: just return global amount (which may not be perfect)
int get(counter_t *c) {
pthread_mutex_lock(&c->glock);
int val = c->global;
pthread_mutex_unlock(&c->glock);
return val; // only approximate!
}

并发链表

下面为并发链表的代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include <stdlib.h>
#include <pthread.h>

typedef struct node_t {
int key;
struct node_t *next;
} node_t;

typedef struct list_t {
node_t *head;
pthread_mutex_t lock;
} list_t;

void List_Init(list_t *L) {
L->head = NULL;
pthread_mutex_init(&L->lock, NULL);
}

int List_Insert(list_t *L, int key) {
pthread_mutex_lock(&L->lock);
node_t *new = malloc(sizeof(node_t));
if (new == NULL) {
perror("malloc");
pthread_mutex_unlock(&L->lock);
return -1;
}
new->key = key;
new->next = L->head;
L->head = new;
pthread_mutex_unlock(&L->lock);
return 0;
}

int List_Lookup(list_t *L, int key) {
pthread_mutex_lock(&L->lock);
node_t *curr = L->head;
while (curr) {
if (curr->key == key) {
pthread_mutex_unlock(&L->lock);
return 0;
}
curr = curr->next;
}
pthread_mutex_unlock(&L->lock);
return -1;
}

上面的代码在插入函数入口处获取锁,结束时释放锁,但是如果malloc失败会存在问题,代码在插入失败时必须释放锁。因此我们需要对代码进行改动,保持并发的正确,在失败的时候也释放锁。

这里调整的代码目的是为了让获取和释放锁只环绕插入代码的真正临界区。另一个改动是查找,跳出主查找循环,到单一的返回路径,这样降低了不小心忘记释放锁的可能性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
int List_Insert(list_t *L, int key) {
node_t *new = malloc(sizeof(node_t));
if (new == NULL) {
perror("malloc");
pthread_mutex_unlock(&L->lock);
return -1;
}
new->key = key;
pthread_mutex_lock(&L->lock);
new->next = L->head;
L->head = new;
pthread_mutex_unlock(&L->lock);
return 0;
}

int List_Lookup(list_t *L, int key) {
int rv = -1;
pthread_mutex_lock(&L->lock);
node_t *curr = L->head;
while (curr) {
if (curr->key == key) {
rv = 0;
break;
}
curr = curr->next;
}
pthread_mutex_unlock(&L->lock);
return rv;
}

但这个链表的扩展性不好,这里有种方法增加到链表中,理论上增加了并发性,这种技术称为过手锁(hand-over-hand locking),也叫锁耦合(lock coupling),原理是:每个节点都有一个锁,替代之前整个链表一个锁,遍历链表时,首先抢占下一个节点的锁,然后释放当前节点的锁。从理论上可以增加并发,但是由于对于锁的消耗开销巨大,很难比单锁的方法快。

并发队列

根据队列的数据结构,分为队列头和队列尾,在这两个特殊的位置加两把锁,使得入队操作和出队操作可以并发执行,因为入队只访问tail锁,出队只访问head锁。在下面的例子中添加了一个假节点,该节点分开了头和尾的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#include <stdlib.h>
#include <pthread.h>
#include <assert.h>

typedef struct __node_t {
int value;
struct __node_t *next;
}node_t;

typedef struct queue_t {
node_t *head;
node_t *tail;
pthread_mutex_t headLock;
pthread_mutex_t tailLock;
} queue_t;

void Queue_Init(queue_t *q) {
node_t *temp = malloc(sizeof(node_t));
temp->next = NULL;
q->head = q->tail = temp;
pthread_mutex_init(&q->headLock, NULL);
pthread_mutex_init(&q->tailLock, NULL);
}

void Queue_Enqueue(queue_t *q, int value){
node_t *tmp = malloc(sizeof(node_t));
assert(tmp != NULL);
tmp->value = value;
tmp->next = NULL;

pthread_mutex_lock(&q->tailLock);
q->tail->next = tmp;
q->tail = tmp;
pthread_mutex_unlock(&q->tailLock);
}

int Queue_Dequeue(queue_t *q, int *value) {
pthread_mutex_lock(&q->headLock);
node_t *tmp = q->head;
node_t *newHead = tmp->next;
if (newHead == NULL) {
pthread_mutex_unlock(&q->headLock);
return -1;
}
*value = newHead->value;
q->head = newHead;
pthread_mutex_unlock(&q->headLock);
free(tmp);
return 0;
}

条件变量

上面学习了锁的概念以及如何通过硬件和操作系统支持的正确组合来实现锁。然而在很多情况下,线程需要检查某一条件满足之后,才会集训运行,如父线程需要检查子线程他是否执行完毕(join)。

多线程程序中,一个线程等待某些条件是很常见的。简单的方案是自旋直到条件满足,这是极其低效的,某些情况下甚至是错误的。那么,线程应该如何等待一个条件?

线程可以使用条件变量(condition variable),来等待一个条件变成真。条件变量是一个显式队列,当某些执行状态不满足时,线程可以把自己加入队列,等待该条件。另外某个线程,当它改变了上述状态时,就可以唤醒一个或多个等待线程。

条件变量有两种相关操作:wait() 和 signal(),线程要睡眠的时候调用wait(),当线程想要唤醒等待在某个条件变量上的睡眠线程时,调用signal()。

生产者/消费者问题

生产者/消费者问题,也叫做有界缓冲区(bounded buffer)问题。假设有一个或多个生产者和一个或多个消费者线程,生产者把生产的数据放入缓冲区,消费者从缓冲区取走数据项,以某种方式消费。

因为有界缓冲区是共享资源,所以我们必须通过同步机制来访问它,以免产生竞态条件。

增加更多缓冲区槽位,这样在睡眠之前,可以生产多个值。同样,睡眠之前可以消费多个值。单个生产者和消费者时,这种方案因为上下文切换少,提高了效率。多个生产者和消费者时,它甚至支持并发生产和消费,从而提高了并发。下节将详细介绍这个问题。

信号量

本节的问题是如何使用信号量代替锁和条件变量?什么是信号量?什么是二值信号量?用锁和条件变量来实现信号量是否简单?不用锁和条件变量,如何实现信号量?

信号量的定义

信号量是一个有整数值的对象,可以用两个函数来操作它,在POSIX标准中,是sem_wait()sem_post()。因为信号量的初始值能够决定其行为,所以首先要初始化信号量,才能调用其他函数与之交互。

信号量接口的几个需要说明的方面:

  1. sem_wait() 要么立刻返回,要么会让调用线程挂起,直到之后的一个post操作。也可能多个调用线程都调用sem_wait(),因此都在队列中等待被唤醒。
  2. sem_post()并没有等待某些条件满足,它直接增加信号量的值,如果有等待线程,唤醒其中一个。
  3. 当信号量的值为负数时,这个值就是等待线程的个数。

二值信号量(锁):使用信号量来实现锁,因为锁只有两个状态,所以这种用法有时也叫做二值信号量(binary semaphore)。其使用方法如下:

1
2
3
4
5
6
sem_t m;
sem_init(&m, 0, X); //根据不同的需求,X的值不同

sem_wait(&m);
//竞态条件,加锁部分
sem_post(&m);

信号量作为条件变量

信号量也可以用在一个线程暂停执行,等待某一条件成立的场景。因为等待线程在等待某些条件发生变化,所以我们将信号量作为条件变量(condition variable)

父线程调用sem_wait(),子线程调用sem_post(),父线程应该等待子线程的完成,这时需要将信号量的初始值设置为0,才能发挥应有的功能,具体情况如下:

  • 第一种情况,父线程创建了子线程,但子线程没有运行,这种情况下,父线程调用sem_wait()会先于子线程调用sem_post(),我们希望父线程等待子线程运行,故当初始值为0时,父线程运行将信号量减为-1,等待睡眠;子线程运行的时候,调用sem_post(),信号量增加为0,唤醒父线程,父线程从sem_wait() 返回,完成该程序。
  • 第二种情况是,子线程在父线程调用sem_wait()之前结束,在这种情况下,子线程会先调用sem_post(),将信号量从0增加到1,然后当父线程运行时,会调用sem_wait(),发现信号量值为1,于是父线程将信号量从1减到0,没有等待,直接从sem_wait()返回执行。

相关演示的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
sem_t s;
void * child(void *args){
printf("child\n");
sem_post(&s);
return NULL;
}

int main(int argc, char *argv[]) {
int X = 0;
sem_init(&s, 0, X);
printf("parent: begin\n");
pthread_t c;
pthread_create(c, NULL, child, NULL);
sem_wait(&s);
printf("parent: ned\n");
return 0;
}

生产者/消费者问题

这个问题详细描述参考上一章,可以利用二值信号量加锁进行解决。

首先用empty和full这两个信号量表示缓冲区空或者满(假设MAX=1,情况复合预期)。

当MAX > 1时,put() 和 get() 会产生竞态条件,此时用二值信号量对产生竞态的操作进行加锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
int buffer[MAX];
int fill = 0;
int use = 0;

void put(int value){
buffer[fill] = value;
fill = (fill + 1) % MAX;
}

int get() {
int tmp = buffer[use];
user = (use + 1) % MAX;
return tmp;
}


sem_t empty;
sem_t full;
sem_t mutex;

void *producer(void *arg) {
int i;
for (i=0; i < loops; i++) {
sem_wait(&empty);
sem_wait(&mutex);
put(i);
sem_post(&mutex);
sem_post(&full);
}
}

void *consumer(void *arg) {
int i;
for (i=0; i < loops; i++) {
sem_wait(&empty);
sem_wait(&mutex);
int tmp = get();
sem_post(&mutex);
sem_post(&full);
print("%d\n", tmp);
}
}


int main(int argc, char *argv[]) {
sem_inti(&empty, 0, MAX);
sem_init(&full, 0, 0);
sem_init(&mutex, 0, 1);
}

读者-写者锁

对更加灵活的锁定原语的渴望,它承认不同的数据结构范文可能需要不同类型的锁。如一个并发链表有很多插入和查找操作,查找操作只是读取该结构,只要没有进行插入操作,就可以并发的执行多个查找操作。读者-写者锁(reader-writer lock)就是用来完成这种操作的。

如果摸个线程要更新数据结构,需要调用rwlock_acquire_lock() 获得写锁,调用rwlock_release_writelock() 释放锁。内部通过一个writelock的信号量保证只有一个写者锁进入临界区,从而更新数据结构,其实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#include <semaphore.h>
#include <stdio.h>
#include <pthread.h>

typedef struct _rwlock_t {
sem_t lock;
sem_t writelock;
int readers;
} rwlock_t;

void rwlock_init(rwlock_t *rw) {
rw->readers = 0;
sem_init(&rw->lock, 0, 1);
sem_init(&rw->writelock, 0, 1);
}

void rwlock_acquire_readlock(rwlock_t *rw) {
sem_wait(&rw->lock);
rw->readers++;
if (rw->readers == 1) {
sem_wait(&rw->writelock);
}
sem_post(&rw->lock);
}

void rwlock_release_readlock(rwlock_t *rw){
sem_wait(&rw->lock);
rw->readers--;
if (rw->readers==0){
sem_post(&rw->writelock);
}
sem_post(&rw->lock);
}

void rwlock_acquire_writelock(rwlock_t *rw) {
sem_wait(&rw->writelock);
}

void rwlock_release_writelock(rwlock_t *rw) {
sem_post(&rw->writelock);
}

如何实现信号量

可以利用底层的同步原语锁和条件变量来实现自己的信号量,如下代码实现了一个叫Zemaphore的信号量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include <semaphore.h>
#include <stdio.h>
#include <pthread.h>

typedef struct _Zem_t {
int value;
pthread_cond_t cond;
pthread_mutex_t lock;
} Zem_t;

void Zem_init(Zem_t *s, int value) {
s->value = value;
Cond_init(&s->cond);
Mutex_init(&s->lock);
}

void Zem_wait(Zem_t *s){
Mutex_lock(&s->lock);
while (s->value <= 0)
Cond_wait(&s->cond, &s->lock);
s->value--;
Mutex_unlock(&s->lock);
}

void Zem_post(Zem_t *s) {
Mutex_lock(&s->lock);
s->value++;
Cond_signal(&s->cond);
Mutex_unlock(&s->lock);
}

常见并发问题

在复杂的并发程序中,有死锁和非死锁这两种类型的缺陷,主要集中在四个著名的开源应用:MySQL、Apache、Mozilla、OpenOffice等。

非死锁缺陷

非死锁缺陷占了并发问题的大多数,我们主要讨论两种:违反原子性(atomicity violation)缺陷和错误顺序(order violation)缺陷。

违反原子性缺陷:违反了多次内存访问中预期可串行性,即代码段本意是原子的,但在执行中却没有强制实现原子性。

违反顺序缺陷:两个内存访问的预期顺序被打破了,即A应该在B之前执行,但是实际运行中却不是这个顺序。

死锁缺陷

死锁(deadlock)是一种在许多复杂并发系统中出现的经典问题。例如,当线程1 持有锁L1,正在等待另外一个锁L2,而线程2 持有锁L2,却在等待锁L1 释放时,死锁就产生了。

死锁发生的原因

  • 在大型代码库里,组件之间会有复杂的依赖。
  • 另一个原因是封装(encapsulation)。一般封装的模块会让软件开发更容易,一般会隐藏细节,这很容易导致死锁的发生。

死锁产生的条件

  • 互斥:线程对于需要的资源进行互斥的访问;
  • 持有并等待:线程持有了资源,同时又在等待其他的资源;
  • 非抢占式:线程获得的资源,不能被抢占。
  • 循环等待:线程之间存在一个环路,环路上每个线程都额外持有一个资源,而这个资源又是下一个线程要申请的。

死锁预防

  • 让代码不产生循环等待,即获取锁时,提供一个全序(total ordering)。但锁的全序可能很难做到,因此,偏序(partial
    ordering)可能是一种有用的方法,安排锁的获取并避免死锁。
  • 可以通过原子抢锁避免持有并等待的导致的死锁。
  • 在获取锁时,先进行尝试获取。
  • 通过强大的硬件指令,构造出不需要锁的数据结构,避免互斥问题导致的死锁。

通过调度避免死锁:了解全局信息,包括不同线程在运行中对锁的需求,从而使得后续的调度能够避免产生死锁。

允许死锁偶尔发生,检查到死锁在采取行动。

基于事件的并发

基于事件的并发(event-based concurrency)主要针对两方面的问题:

  1. 多线程应用中,正确处理并发很有难度。
  2. 开发人员无法控制多线程在某一时刻的调度。

事件循环

主循环等待某些事件的发生,然后依次处理这些发生的事件,处理事件的代码叫做时间处理程序(event handler),处理程序在处理一个事件时,它是系统中发生的唯一活动。因此,调度就是决定接下来处理哪个事件,而这个结构称为事件循环(event loop)。其伪代码如下:

1
2
3
4
5
while (1) {
events = getEvents();
for (e in events)
processEvent(e);
}

但这也带来一个更大的问题:基于事件的服务器如何决定哪个事件发生,尤其是对于网络和磁盘I/O?具体来说,事件服务器如何确定是否有它的消息已经到达?

select() 介绍

大多数操作系统提供了基本的API来解决如何接收事件的问题,如select() 或者poll() 系统调用。这些接口作用主要是:检查是否有任何应该关注的进入I/O。例如网络服务希望检查数据包是否到达。下面以select为例:

1
2
3
4
5
6
int select(int nfds,
fd_set *restrict readfds,
fd_set *restrict writefds,
fd_set *restrict errorfds,
struct timeval *restrict timeout
);

select() 会检查I/O描述符集合,它们的地址通过readfdswritefdserrorfds传入,分别查看它们中的某些描述符是否已经准备好读取,是否准备好写入,或者异常处理等。在每个集合中检查前n fds 个描述符,即检查描述符集合中从0nfds-1的描述符,返回时,select() 用给定请求操作准备好的描述符组成的子集替换成给定的描述符集合。select() 返回所有集合中就绪描述符的总数。

select() 使用

下面是一个用select() 来查看哪些网络描述符在它们上面有传入的消息的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

int main() {
// 打开一系列的sockets,未展示
// 主循环
while (1) {
fd_set readFDs;
FD_ZERO(&readFDs);

// 现在为描述符设置位
int fd;
for (fd=minFD; fd < maxFD; fd++)
FD_SET(fd, &readFDs);

// do the select
int rc = select(maxFD+1, &readFDs, NULL, NULL, NULL);

// 用FD_ISSET()检查哪个有数据
inf fd;
for (fd=minFD; fd < maxFD; fd++)
if (FD_ISSET(fd, &readFDs))
processFD(fd);
}
}

初始化完成后,服务器进入无限循环。在循环内部,它使用FD_ZERO()宏首先清除文件描述符集合,然后使用FD_SET()将所有从minFDmaxFD的文件描述符包含到集合中。例如,这组描述符可能表示服务器正在关注的所有网络套接字。最后,服务器调用select()来查看哪些连接有可用的数据。然后,通过在循环中使用FD_ISSET(),事件服务器可以查看哪些描述符已准备好数据并处理传入的数据。

异步I/O

现代操作系统引入了新的方法来向磁盘系统发出异步I/O请求(asynchronous I/O)。这些接口使得应用程序能够发出I/O请求,并在I/O完成之前立即将控制权返回给调用者,另外加的接口让应用程序能够确定各种I/O是否已经完成。

为了解决需要检查每个I/O是否完成导致的重复性问题,一些系统提供了基于中断(interrupt)的方法。此方法使用 Unix信号(signal)在异步I/O完成时通知应用程序,从而消除了重复轮询系统的需要。

信号(signal):在现在Unix变体中普遍存在,最简单的信号提供了一种与进程进行通信的方式。具体来说,可以将信号传递给应用程序。这样会让应用程序停止当前的任何工作,开始运行信号处理程序(signal handler),即应用程序中默写处理信号的代码。完成后,该进程就恢复其先前的行为。

没有异步I/O的系统中,纯粹基于事件的方法是无法实现的。

状态管理

基于事件的方法的另一个问题是,这种方法的代码比传统的基于线程的方法更加复杂。其原因如下:当时间处理程序发出异步I/O时,它必须打包一些程序状态,以便下一个时间处理程序在I/O最终完成时使用。这个额外的工作在基于线程的程序中是不需要的,因为程序需要的状态在线程栈中。手工栈管理时基于事件编程的基础。

基于事件的方法当系统从单核CPU转向多核时,它就变得更加复杂了。为了利用多核,事件服务器必须运行多个事件处理程序,发生这种情况时,就会出现常见的同步问题,并且必须采用锁的常用的解决方案。另一个问题是,它不能很好的和某些类型的系统活动集成,如分页。

使用支付宝打赏
使用微信打赏

若你觉得我的文章对你有帮助,欢迎点击上方按钮对我打赏

扫描二维码,分享此文章