锁主要用来解决数据同步的问题,即并发操作时,数据究竟如何保持我们想要的结果。
全局变量的问题如果某线程中有主体函数和中断函数,如下:
int a = 0;
void interrupt_handle()
{
a++;
}
void thread_func()
{
a++;
}
我们可以看出来其对全局变量a都进行后自加操作。
编译器处理a++的流程:
- mov eax,[a] 把a加入某个寄存器中
- add eax,1 将寄存器里的值加一
- mov [a],eax 把eax的值返回内存
那么考虑如下情况:
线程函数运行到一半(即完成12两步),此时只把eax加一了,a本身还没得到eax返回的值,eax=1,a=0
此时中断函数强行插入(完成123三步),此时eax=1,a=1
然后中断结束了线程得继续完成第三步mov [a],eax,这个时候a仍然=1,显然不对
以下表表示两个函数的发生顺序:
总之,就是t2时刻的中断,导致eax虽然加一了但是这个结果没有返回给a,对a的操作丢失了。
解决这个问题有两种思路:
- 使a++变成一个完整的操作,不能被打断,要么不执行,要么一次性执行完
- 使中断要么出现在操作前,要么出现在操作后,不要操作到一半中断
上面解决思路第一点,就是将a++变成一个原子操作。
首先明确无法让编译器来生成、判别哪些是原子操作。且硬件平台不一定支持原子操作指令。
x86 平台支持很多原子指令,我们只需要直接应用这些指令,比如原子加、原子减,原子读写等,用汇编代码写出对应的原子操作函数就行了。现代 C 语言已经支持嵌入汇编代码,可以在 C 函数中按照特定的方式嵌入汇编代码了,实现原子操作就更方便了:
先看GCC 设计了一种特有的嵌入方式,它规定了汇编代码嵌入的形式和嵌入汇编代码需要由哪几个部分组成,如下面代码所示:
__asm__ __volatile__(代码部分:输出部分列表: 输入部分列表:损坏部分列表);
看到代码模板从 asm 开始(当然也可以是 asm),紧跟着 volatile,然后是跟着一对括号,最后以分号结束。括号里大致分为 4 个部分:
- 汇编代码部分,这里是实际嵌入的汇编代码。
- 输出列表部分,让 GCC 能够处理 C 语言左值表达式与汇编代码的结合。
- 输入列表部分,也是让 GCC 能够处理 C 语言表达式、变量、常量,让它们能够输入到汇编代码中去。
- 损坏列表部分,告诉 GCC 汇编代码中用到了哪些寄存器,以便 GCC 在汇编代码运行前,生成保存它们的代码,并且在生成的汇编代码运行后,恢复它们(寄存器)的代码。
它们之间用冒号隔开,如果只有汇编代码部分,后面的冒号可以省略。但是有输入列表部分而没有输出列表部分的时候,输出列表部分的冒号就必须要写,否则 GCC 没办法判断,同样的道理对于其它部分也一样。
好,现在我们来看一下原子操作怎么实现的:
//定义一个原子类型
typedef struct s_ATOMIC{
volatile s32_t a_count; //在变量前加上volatile,是为了禁止编译器优化,使其每次都从内存中加载变量
}atomic_t;
//原子读
static inline s32_t atomic_read(const atomic_t *v)
{
//x86平台取地址处是原子
return (*(volatile u32_t*)&(v)->a_count);
}
//原子写
static inline void atomic_write(atomic_t *v, int i)
{
//x86平台把一个值写入一个地址处也是原子的
v->a_count = i;
}
//原子加上一个整数
static inline void atomic_add(int i, atomic_t *v)
{
__asm__ __volatile__("lock;" "addl %1,%0"
: "+m" (v->a_count)
: "ir" (i));
}
//原子减去一个整数
static inline void atomic_sub(int i, atomic_t *v)
{
__asm__ __volatile__("lock;" "subl %1,%0"
: "+m" (v->a_count)
: "ir" (i));
}
//原子加1
static inline void atomic_inc(atomic_t *v)
{
__asm__ __volatile__("lock;" "incl %0"
: "+m" (v->a_count));
}
//原子减1
static inline void atomic_dec(atomic_t *v)
{
__asm__ __volatile__("lock;" "decl %0"
: "+m" (v->a_count));
}
上述代码中lock 前缀的 addl、subl、incl、decl 指令都是原子操作,lock 前缀表示锁定总线。
它会使 CPU 宣告一个 LOCK# 信号,这样就能确保在多处理器系统或多线程竞争的环境下互斥地使用这个内存地址。
用上面一个函数 atomic_add 为例子说一下,如下所示。
static inline void atomic_add(int i, atomic_t *v)
{
__asm__ __volatile__("lock;" "addl %1,%0"
: "+m" (v->a_count)
: "ir" (i));
}
//"lock;" "addl %1,%0" 是汇编指令部分,%1,%0是占位符,它表示输出、输入列表中变量或表态式,占位符的数字从输出部分开始依次增加,这些变量或者表态式会被GCC处理成寄存器、内存、立即数放在指令中。
//: "+m" (v->a_count) 是输出列表部分,“+m”表示(v->a_count)和内存地址关联
//: "ir" (i) 是输入列表部分,“ir” 表示i是和立即数或者寄存器关联
有了这些原子操作函数之后 ,前面场景中的代码就变成下面这样了:无论有没有中断,或者什么时间来中断,都不会出错。
atomic_t a = {0};
void interrupt_handle()
{
atomic_inc(&a);
}
void thread_func()
{
atomic_inc(&a);
}
中断控制——单核复杂变量
首先我们明确,原子操作的适用范围仅限单体变量:
因为我们操作变量,本质上是操作寄存器,只能一个步骤一个步骤的操作,对于复杂数据结构的操作执行时间长,且难以保证原子性。x86平台的一些原子指令只支持add、inc、sub、dec等操作都是对整数进行操作。
所以需要中断的控制机制,来保证操作数据的过程中不发生数据错误。
写代码实现关闭开启、中断了,x86 CPU 上关闭、开启中断有专门的指令,即 **cli(关中断)、sti (开启中断)**指令,它们主要是对 CPU 的 eflags 寄存器的 IF 位(第 9 位)进行清除和设置,CPU 正是通过此位来决定是否响应中断信号。这两条指令只能 Ring0 权限才能执行,代码如下:
//关闭中断
void hal_cli()
{
__asm__ __volatile__("cli": : :"memory");
}
//开启中断
void hal_sti()
{
__asm__ __volatile__("sti": : :"memory");
}
//使用场景
void foo()
{
hal_cli();
//操作数据……
hal_sti();
}
void bar()
{
hal_cli();
//操作数据……
hal_sti();
}
但其实这段代码有很大的问题,hal_cli(),hal_sti(),无法嵌套使用:
void foo()
{
hal_cli();
//操作数据第一步……
hal_sti();
}
void bar()
{
hal_cli();
foo();
//操作数据第二步……
hal_sti();
}
假设我们调用bar这个函数,先关中断,然后调用foo函数,处理好了foo函数又开启了中断,回到bar函数时bar函数还以为中断是关闭的,因为他并不知道在foo这个函数里开启了中断,然后bar又开启了中断。
我们只要修改一下开启、关闭中断的函数就行了。
指令 cli(关中断)、sti(开启中断)的主要操作是对 CPU 的 eflags 寄存器的 IF 位(第9位)进行清除和设置,来达到是否响应中断的效果。 在关闭中断(cli)前先保存 eflags 寄存器(pushfl 将eflags寄存器压入当前栈顶),然后在需要开启中断的时候恢复之前保存的 eflags 寄存器(popfl 把当前栈顶的数据弹出到 eflags 寄存器中),这样就达到了开关中断的效果。 是否开启中断完全取决于上一次 eflags 寄存器中的值
我们可以这样操作:在关闭中断函数中先保存 eflags 寄存器,然后执行 cli 指令,在开启中断函数中直接恢复之前保存的 eflags 寄存器就行了(orm 框架嵌套事务,类似的处理逻辑。开启事务时候,事务层级+1, 提交事务时候 -1. 只有事务层级为0时候,才最终提交数据库),具体代码如下
typedef u32_t cpuflg_t;
static inline void hal_save_flags_cli(cpuflg_t* flags)
{
__asm__ __volatile__(
"pushfl tn" //把eflags寄存器压入当前栈顶
"cli tn" //关闭中断
"popl %0 tn"//把当前栈顶弹出到eflags为地址的内存中
: "=m"(*flags)
:
: "memory"
);
}
static inline void hal_restore_flags_sti(cpuflg_t* flags)
{
__asm__ __volatile__(
"pushl %0 tn"//把flags为地址处的值寄存器压入当前栈顶
"popfl tn" //把当前栈顶弹出到eflags寄存器中
:
: "m"(*flags)
: "memory"
);
}
自旋锁——协调多核心CPU
关中断只能保证单核机器上的原子性:
因为以前是单 CPU,同一时刻只有一条代码执行流,除了中断会中止当前代码执行流,转而运行另一条代码执行流(中断处理程序),再无其它代码执行流。这种情况下只要控制了中断,就能安全地操作全局数据。
但在多核情况下,同一时刻系统中存在多条代码执行流,中断只能控制本地核心的CPU中断,无法控制其他CPU核心的中断。
自旋锁的原理:
- 首先读取一个数据,然后判断其是否已经被加锁了
- 如果已经被加锁,回到1
- 如果未加锁,则加锁然后返回结束
要正确执行它,必需保证读锁操作、判断操作、加锁操作是原子操作。否则很明显会出现两个CPU都觉得这个变量没有加锁然后同时锁住的情况。
x86 CPU 给我们提供了一个原子交换指令,xchg,它可以让寄存器里的一个值跟内存空间中的一个值做交换。例如,让 eax=memlock,memlock=eax 这个动作是原子的,不受其它 CPU 干扰。
//自旋锁结构
typedef struct
{
volatile u32_t lock;//volatile可以防止编译器优化,保证其它代码始终从内存加载lock变量的值
} spinlock_t;
//锁初始化函数
static inline void x86_spin_lock_init(spinlock_t * lock)
{
lock->lock = 0;//锁值初始化为0是未加锁状态
}
//加锁函数
static inline void x86_spin_lock(spinlock_t * lock)
{
__asm__ __volatile__ (
"1: n"
"lock; xchg %0, %1 n"//把值为1的寄存器和lock内存中的值进行交换
"cmpl $0, %0 n" //用0和交换回来的值进行比较
"jnz 2f n" //不等于0则跳转后面2标号处运行
"jmp 3f n" //若等于0则跳转后面3标号处返回
"2: n"
"cmpl $0, %1 n"//用0和lock内存中的值进行比较
"jne 2b n"//若不等于0则跳转到前面2标号处运行继续比较
"jmp 1b n"//若等于0则跳转到前面1标号处运行,交换并加锁
"3: n" :
: "r"(1), "m"(*lock));
}
//解锁函数
static inline void x86_spin_unlock(spinlock_t * lock)
{
__asm__ __volatile__(
"movl $0, %0n"//解锁把lock内存中的值设为0就行
:
: "m"(*lock));
}
代码编号1-2是进行判断,如果判断为已经被加锁了,则跳到2循环判断(即等待其他线程解锁)直到内存m被解锁(即变为0)
其中,%0 对应 “r”(1),表示由编译器自动分配一个通用寄存器,并填入值 1,例如 mov eax,1。而 %1 对应"m"(*lock),表示 lock 是内存地址。把 1 和内存中的值进行交换,若内存中是 1,则不会影响;因为本身写入就是 1,若内存中是 0,一交换,内存中就变成了 1,即加锁成功。
自旋锁依然有中断嵌套问题,所以仍要注意中断(假设某CPU获取了自旋锁,然后产生了中断,中断的代码里也尝试获取这个自旋锁,那么就是死循环)
需要在处理中断的过程中也能使用,如下所示。实现了关中断下获取自旋锁,以及恢复中断状态释放自旋锁。在中断环境下也完美地解决了问题。
static inline void x86_spin_lock_disable_irq(spinlock_t * lock,cpuflg_t* flags)
{
__asm__ __volatile__(
"pushfq nt"
"cli nt"
"popq %0 nt"
"1: nt"
"lock; xchg %1, %2 nt"
"cmpl $0,%1 nt"
"jnz 2f nt"
"jmp 3f n"
"2: nt"
"cmpl $0,%2 nt"
"jne 2b nt"
"jmp 1b nt"
"3: n"
:"=m"(*flags)
: "r"(1), "m"(*lock));
}
static inline void x86_spin_unlock_enabled_irq(spinlock_t* lock,cpuflg_t* flags)
{
__asm__ __volatile__(
"movl $0, %0nt"
"pushq %1 nt"
"popfq nt"
:
: "m"(*lock), "m"(*flags));
}
信号量——长时间等待
无论是原子操作还是自旋锁,都不适合长时间等待的情况。
所以需要一种同步机制,既能保证同一时刻只有一个代码流访问数据,又能在资源不够的情况下,其他CPU可以执行任务。
操作系统的理论书,应该对信号量这个词并不陌生。信号量是 1965 年荷兰学者 Edsger Dijkstra 提出的,是一种用于资源互斥或者进程间同步的机制。这里我们就来看看如何实现这一机制。
假设有这么一个场景:微信等待从键盘输入信息,然后把信息发送出去。
这个功能如何实现?
首先,需求:
- 需要一块内存来充当缓冲区,保存键盘按键码。
- 需要一套控制机制,比如读取缓冲区时,为空怎么处理;或者缓冲区有按键码的时候没有对应的执行流读取又怎么办。
所以需求明确如下:
- 等待:如果微信这个程序,读取缓冲区的时候发现为空,就进入等待;
- 互斥:同一时刻,只能有一个代码执行流操作这个缓冲区;
- 唤醒:当我们按下键盘的时候,得把按键码写入到这个缓冲区里,并且看微信是否在等待我这个缓冲区,如果是就激活微信(让其从等待中跳出);如还有其他程序竞争我这个缓冲区,得让竞争失败的一方继续等待。
所以三大问题是:等待,互斥,唤醒
这需要一种全新的数据结构来解决这个问题。信号量数据结构内至少需要一个变量来表示互斥,比如这个变量大于0那你这个程序执行流可以运行,反之则等待。等待还存在竞争关系,所以需要一个等待链数据结构,保存等待的代码执行流。
这个数据结构的实现代码如下所示。
这里假定信号量数据结构中的 sem_count 初始化为 1,sem_waitlst 等待链初始化为空。
#define SEM_FLG_MUTEX 0
#define SEM_FLG_MULTI 1
#define SEM_MUTEX_ONE_LOCK 1
#define SEM_MULTI_LOCK 0
//等待链数据结构,用于挂载等待代码执行流(线程)的结构,里面有用于挂载代码执行流的链表和计数器变量,这里我们先不深入研究这个数据结构。
typedef struct s_KWLST
{
spinlock_t wl_lock;
uint_t wl_tdnr;
list_h_t wl_list;
}kwlst_t;
//信号量数据结构
typedef struct s_SEM
{
spinlock_t sem_lock;//维护sem_t自身数据的自旋锁
uint_t sem_flg;//信号量相关的标志
sint_t sem_count;//信号量计数值
kwlst_t sem_waitlst;//用于挂载等待代码执行流(线程)结构
}sem_t;
信号量数据结构中:
- 需要一个维护信号量自身数据的一个自旋锁
- 信号量需要一个标志位和一个数值位
- 需要挂载等待链
那么这个等待链的数据结构:
- 首先肯定也需要一个自旋锁
- 标志位和计数器。
刚才的例子如果不是很清楚,我们可以看打印机的例子:
假设有三个进程打算调用打印机。很明显无法同时打印,假设进程1抢到了信号。
在进程1抢到信号之前,信号量是1.进程1抢到了信号量之后:
先加锁,保证同时没有其他CPU或者中断来改变这个信号量。加完锁了之后,将这个信号量减一。就变成0.
这时检测信号量大于等于0,可以关闭中断,继续执行。
如果进程1被中断了,也不用担心其他进程会调用:因为进程2从等待状态尝试调用打印机,也需要加自身自旋锁,关中断,这个时候进程2会发现信号量值为-1,检测信号量小于0了,就不能执行打印,将其挂入等待链表。进程3类似。
如果进程1继续执行了,等他执行完就可以把信号量加一。这一信号量又回到了初始的1.让剩下的进程23来抢占这个信号量。
所以按照上述流程,信号量的适用步骤如下:
1.获取信号量
- 对用于保护信号量自身的自旋锁 sem_lock 进行加锁;
- 对信号值“减一”。检测现在信号值sem_count是否大于等于0;
- 如果大于等于0: 表示获取信号量成功;
- 如果小于0,就不能执行代码流,需要让进程进入等待链等待,等信号量被放出之后再与其他进程抢占这个信号量。
- 最后释放自旋锁sem_lock解锁。
2.抢到信号量的进程代码执行流开始执行相关操作,比如从缓冲区读取键盘输入的数据,比如控制打印机硬件进行打印操作。
3.释放信号量
- 释放和获取一样,首先一定要对用于保护信号量自身的自旋锁sem_lock 进行加锁;
- 对信号值加一,然后检测器是否大于0;
- 不管检测是否大于0,都会标记信号量释放成功。但是如果大于0(通常会大于0),我们要唤醒其他等待链中的进程。如果小于1则出现错误,需要挂起系统。
- 最后释放自旋锁sem_lock。
所以获取和释放信号量代码按下:信号量有两个操作:down、up
//获取信号量
void krlsem_down(sem_t* sem)
{
cpuflg_t cpufg;
start_step:
krlspinlock_cli(&sem->sem_lock,&cpufg);
if(sem->sem_count<1)
{//如果信号量值小于1,则让代码执行流(线程)睡眠
krlwlst_wait(&sem->sem_waitlst);
krlspinunlock_sti(&sem->sem_lock,&cpufg);
krlschedul();//切换代码执行流,下次恢复执行时依然从下一行开始执行,所以要goto开始处重新获取信号量
goto start_step;
}
sem->sem_count--;//信号量值减1,表示成功获取信号量
krlspinunlock_sti(&sem->sem_lock,&cpufg);
return;
}
//释放信号量
void krlsem_up(sem_t* sem)
{
cpuflg_t cpufg;
krlspinlock_cli(&sem->sem_lock,&cpufg);
sem->sem_count++;//释放信号量
if(sem->sem_count<1)
{//如果小于1,则说数据结构出错了,挂起系统
krlspinunlock_sti(&sem->sem_lock,&cpufg);
hal_sysdie("sem up err");
}
//唤醒该信号量上所有等待的代码执行流(线程)
krlwlst_allup(&sem->sem_waitlst);
krlspinunlock_sti(&sem->sem_lock,&cpufg);
krlsched_set_schedflgs();
return;
}
上述代码中的 krlspinlock_cli,krlspinunlock_sti 两个函数,只是对前面自旋锁函数的一个封装,其余函数后面再说。
总结- 原子操作:适用于单个全局变量,定义了原子变量以及原子读、写、加减。
- 中断控制:当多个变量需要操作时就不适合用原子变量了,单核状态下不会有其他条件干扰,所以操作全局数据时关闭中断,处理完了再开启中断。由此避免中断信号对数据的影响。
- 自旋锁:多核CPU出现时,控制中断不适用了。因为系统中同时有多个CPU的多个代码流在执行。由此我们开发了自旋锁,要么一下子获取锁,加锁然后返回。要么就一直等着直到获取锁。
- 信号量:对于长时间等待数据信号的问题,信号量才能解决。定义信号量数据结构和等待链数据结构。信号量的核心三点:等待。互斥。唤醒。使用信号量也是三点:获取信号量,执行代码流,释放信号量。



