1:Operating System Overview

1 xv6系统的启动过程:

1.1xv6引导器

当x86系列的PC机启动的时候,首先会执行BIOS程序,BIOS程序一般会存放在固定的ROM中,一般在磁盘固定扇区中.BIOS 的作用是在启动时进行硬件的准备工作,接着BIOS程序会把控制权递交给操作系统.具体来说,BIOS会把控制权递交给从引导扇区中的固定的代码中(BIOS会把引导扇区存储的代码加载到内存0x7c00处),接着引导程序会把操作系统内核载入到内存中,控制权递交给内核,程序是M态的.

在xv6系统,引导程序由汇编引导程序和代码引导程序.

1.2 内核态进入用户态

阅读kernel.asm(内核整体的代码)

1
2
3
4
5
6
7
8
9
10
11
Disassembly of section .text:

0000000080000000 <_entry>:
80000000:00009117 auipcsp,0x9
80000004:86013103 ldsp,-1952(sp) # 80008860 <_GLOBAL_OFFSET_TABLE_+0x8>
80000008:6505 luia0,0x1
8000000a:f14025f3 csrra1,mhartid
8000000e:0585 addia1,a1,1
80000010:02b50533 mula0,a0,a1
80000014:912a addsp,sp,a0
80000016:652050ef jalra,80005668 <start>

我们看到了_entry这个标签,也就是说内核是从_entry开始运行的,那我们首先查看一下entry.S的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
\# qemu -kernel loads the kernel at 0x80000000
# and causes each CPU to jump there.
# kernel.ld causes the following code to
# be placed at 0x80000000.
.section .text
.global _entry
_entry:
# set up a stack for C.
# stack0 is declared in start.c,
# with a 4096-byte stack per CPU.
# sp = stack0 + (hartid \* 4096)
la sp, stack0
li a0, 1024\*4
csrr a1, mhartid
addi a1, a1, 1
mul a0, a0, a1
add sp, sp, a0
# jump to start() in start.c
call start

引导程序会把内存载入到0x80000000这个地址是因为在0~0x80000000这个地址范围内还有I/O设备等(还有程序的逻辑地址)

entry.S开始设置了一个栈,栈的带下是1024*4=4KB,其中mhartid是运行当前程序的CPU核的ID,那么第i个核的栈地址空间就分配到stack+(hartid)*4096~stack+(hartid+1)*4096这个范围内.

因为这个操作系统是运行在多核的RISC-V操作系统上,由多个核同时访问一个内存空间,所以说每个核的CPU只在允许的内存空间中执行代码.其中每个核的寄存器又是不一样的,所以说可以修改每个核的sp寄存器来区分不同的核的代码运行空间.

在entry.S执行完操作之后,根据汇编代码,程序会跳转到start的这个函数中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// entry.S jumps here in machine mode on stack0.
void start()
{
// set M Previous Privilege mode to Supervisor, for mret.
unsigned long x = r_mstatus();
x &= ~MSTATUS_MPP_MASK;
x = MSTATUS_MPP_S;
w_mstatus(x);

// set M Exception Program Counter to main, for mret.
// requires gcc -mcmodel=medany
w_mepc((uint64)main);

// disable paging for now.
w_satp(0);

// delegate all interrupts and exceptions to supervisor mode.
w_medeleg(0xffff);
w_mideleg(0xffff);
w_sie(r_sie() SIE_SEIE SIE_STIE SIE_SSIE);

// configure Physical Memory Protection to give supervisor mode
// access to all of physical memory.
w_pmpaddr0(0x3fffffffffffffull);
w_pmpcfg0(0xf);

// ask for clock interrupts.
timerinit();

// keep each CPU's hartid in its tp register, for cpuid().
int id = r_mhartid();
w_tp(id);

// switch to supervisor mode and jump to main().
asm volatile("mret");
}

在start函数中我们还是会执行在M态执行的操作:

首先会执行mepc寄存器的操作,改变了mepc寄存器就相当于改变了断点值.这个寄存器相当于S态中断进入到M态的时候,触发中断的PC,当M态回到S态的时候,就继续从断点处执行.当然这个操作还制定了M态比S态更加接近于内核.

接着下一步就是取消分页,在这一部分,虚拟地址是和实际的物理地址是一一对应的.

再下一步,将所有的中断委托给S态进行处理.

再下一步,指定程序允许的物理地址,在S态我们允许访问所有的物理地址

在下一步,对时钟芯片编程以产生计时器中断.

再下一步,取CPU的核id

最后一步,返回到main()函数,执行mret指令.

操作系统接着就会进入main函数,main函数主要初始化设备和一些子系统,然后调用userinit()函数来生成第一个进程,第一个进程只会运行很基础的程序,这个程序再initcode.S中已经声明.(这个时候已经进入U态了)

1
2
3
4
5
6
7
8
9
10
11
12
\# Initial process that execs /init.
# This code runs in user space.

#include "syscall.h"

# exec(init, argv)
.globl start
start:
la a0, init
la a1, argv
li a7, SYS_exec
ecall

系统调用的参数是a0a7,其中a0a6代表argv等,a7代表具体执行什么系统调用.

这个时候系统会执行init程序,init程序这个时候就会加载sh程序,并且会初始化一个新的文件描述器(你可以认为是标准的I/O输入输出,控制台输出,因为Unix类型的系统会把设备当成文件来看),然后执行Shell程序,系统开始执行

2 操作系统接口与系统调用

这个操作系统没有图形化界面,目前只能执行基本的键盘命令.

操作系统通过接口向用户程序提供服务。设计一个好的接口实际上是很难的。一方面我们希望接口设计得简单和精准,使其易于正确地实现;另一方面,我们可能忍不住想为应用提供一些更加复杂的功能。解决这种矛盾的办法是让接口的设计依赖于少量的_机制_ (mechanism),而通过这些机制的组合提供强大、通用的功能。

xv6提供 Unix 操作系统中的基本接口(由 Ken Thompson 和 Dennis Ritchie 引入),同时模仿 Unix 的内部设计。Unix 里机制结合良好的窄接口提供了令人吃惊的通用性。这样的接口设计非常成功,使得包括 BSD,Linux,Mac OS X,Solaris (甚至 Microsoft Windows 在某种程度上)都有类似 Unix 的接口。理解 xv6 是理解这些操作系统的一个良好起点。

xv6 使用了传统的内核概念 – 一个向其他运行中程序提供服务的特殊程序。每一个运行中程序(称之为进程)都拥有包含指令、数据、栈的内存空间。指令实现了程序的运算,数据是用于运算过程的变量,栈管理了程序的过程调用。

听上去很绕是吧,我们可以简单地理解,就是操作系统类似于高速公路的服务区,程序就是司机,司机在高速公路上开车相当于程序的正常执行,进入高速公路服务区司机就不会继续开车了,程序暂停执行.所以说程序获得操作系统提供的服务是通过中断的方式获得的.这个中断可以简称访管中断.

程序想获得操作系统的服务,先通过访管中断进入中断处理程序,这个时候就进入到了S态,在中断处理程序中根据某种特殊寄存器的值跳转到特殊的地址执行特殊的程序,这种特殊的程序叫做系统调用.因为用户态的程序权限有限,所以说要向操作系统提获取给更高的权限.只能通过中断的方式获取.

总得来说,进程通过系统调用使用内核服务。系统调用会进入内核,让内核执行服务然后返回。所以进程总是在用户空间和内核空间之间交替运行。

3 更多的基础知识

3.1 内核的组成

RISC-V的CPU主要分成三个态,操作系统会在三个态中穿插进行.

其中M态是机器态,在M态的操作系统有最高的权限,最高的优先级,可以执行所有指令,但是操作系统一般只在刚开始启动的时候是M态,在执行了一段初始化代码后就会降低到S态.

操作系统的内核一般是在S态进行运行,在S态,我们可以执行所有的指令,包括一部分特权指令,特权指令不知道的回去翻一下操作系统书.

在操作系统的空间划分中,我们一般划分内核态和用户态空间,在S态的时候,所有的程序和堆栈都是在内核态空间的,在U态的时候,所有的程序和堆栈都是在用户态空间的.

综上所述,内核主要由内核态空间和一些系统调用组成.这种内核一般称为monolithic kernel.

对于另外一种microkernel的操作系统,它们会把一部分应该在S态运行的代码下放到U态防止出现问题,这个叫做微内核.

3.2 程序的逻辑地址

程序空间主要由基本的代码和数据,栈,堆,栈帧组成.其中栈帧保存了当前程序执行的时候一些基本的寄存器、断点信息、页表信息和CPU信息.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
struct trapframe {
/\* 0 \*/ uint64 kernel_satp; // kernel page table
/\* 8 \*/ uint64 kernel_sp; // top of process's kernel stack
/\* 16 \*/ uint64 kernel_trap; // usertrap()
/\* 24 \*/ uint64 epc; // saved user program counter
/\* 32 \*/ uint64 kernel_hartid; // saved kernel tp
/\* 40 \*/ uint64 ra;
/\* 48 \*/ uint64 sp;
/\* 56 \*/ uint64 gp;
/\* 64 \*/ uint64 tp;
/\* 72 \*/ uint64 t0;
/\* 80 \*/ uint64 t1;
/\* 88 \*/ uint64 t2;
/\* 96 \*/ uint64 s0;
/\* 104 \*/ uint64 s1;
/\* 112 \*/ uint64 a0;
/\* 120 \*/ uint64 a1;
/\* 128 \*/ uint64 a2;
/\* 136 \*/ uint64 a3;
/\* 144 \*/ uint64 a4;
/\* 152 \*/ uint64 a5;
/\* 160 \*/ uint64 a6;
/\* 168 \*/ uint64 a7;
/\* 176 \*/ uint64 s2;
/\* 184 \*/ uint64 s3;
/\* 192 \*/ uint64 s4;
/\* 200 \*/ uint64 s5;
/\* 208 \*/ uint64 s6;
/\* 216 \*/ uint64 s7;
/\* 224 \*/ uint64 s8;
/\* 232 \*/ uint64 s9;
/\* 240 \*/ uint64 s10;
/\* 248 \*/ uint64 s11;
/\* 256 \*/ uint64 t3;
/\* 264 \*/ uint64 t4;
/\* 272 \*/ uint64 t5;
/\* 280 \*/ uint64 t6;
};

3.3 进程

进程就是运行的代码,进程可以通过调用ecall指令来进入到S态.其中进入到S态的何处就是有S态进行定义的.S态也可以调用sret指令回到断点处继续执行指令.

下面给顶了进程的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
struct proc {
struct spinlock lock;

// p->lock must be held when using these:
enum procstate state; // Process state
void \*chan; // If non-zero, sleeping on chan
int killed; // If non-zero, have been killed
int xstate; // Exit status to be returned to parent's wait
int pid; // Process ID

// wait_lock must be held when using this:
struct proc \*parent; // Parent process

// these are private to the process, so p->lock need not be held.
uint64 kstack; // Virtual address of kernel stack
uint64 sz; // Size of process memory (bytes)
pagetable_t pagetable; // User page table
struct trapframe \*trapframe; // data page for trampoline.S
struct context context; // swtch() here to run process
struct file \*ofile[NOFILE]; // Open files
struct inode \*cwd; // Current directory
char name[16]; // Process name (debugging)
};

其中trapframe就是栈帧,pagetable就是页表.(可以复习一下OS关于内存的论述),state用于描述进程的状态.然后chan我猜测是进程等待队列的下一个元素.file就是打开的文件,cwd就是当前的目录,kstack是内核栈的位置,parent就是父进程.

进程之间是用数组来进行组合的.

在这一节你需要知道的是:

每一个进程都有一个页表来标记va和pa,不同的进程靠不同的页表来物理地相互阻隔,由于页表不同,不同的进程访问相同的虚拟地址,却是访问不同的物理地址,达到了物理的阻塞.

每一个进程再一定的时间会被其他的进程打断,这个时候CPU停止对于这个进程的执行,转而执行其他的进程.

每个进程都有两个栈,在U态处理的时候访问用户栈,在S态处理的时候访问内核栈.当进程被打断的时候信息会存储在用户态栈中.

总之,一个进程有控制流,什么时候控制CPU,还有数据流,对于内存和栈的访问.

2:Trap&Syscall

中断和系统调用

在RISC-V中有三种事件会使得CPU放弃对当前执行的程序的运行转而去处理这些事件.

  • 系统调用,当当前程序执行ecall指令的时候
  • 异常:指令的执行出现问题,比如说除0等.内部
  • 中断:当设备传来需要中断的信号.外部

我们首先先注意到一点就是CPU进入到中断然后从中断中恢复,程序本身是不可查的,也就是说程序并不知道它被中断了,犹如做了一个梦一样,这是非常重要的,就是怎么进入中断,怎么样从中断回来.

有了这么一个基本的要求,我们可以得到大概的处理思路,基本上来说就是中断首先要进入内核的状态进行处理.并且可以分成4步:RISC-V的CPU首先在硬件层面上作出一些反应,接着就是执行一段汇编代码来进入到内核状态.进入到内核状态后就是一段中断例程,这个程序是所有中断共享的,然后再根据中断的类型不同再进入到不同的中断处理的程序.

对于中断我们又可以分成三类,对于这三类有不同的做法,分别是内核态中断,用户态中断和时钟中断.对于处理中断的程序,我们一般称为handler.

RISC-V硬件

首先RISC-V有几个处理中断的硬件结构:

  • stvec寄存器:存储中断处理程序(例程)的第一条指令,当中断发生的时候RISC-V的CPU会跳转到stvec寄存器对应的地址.这个寄存器也叫中断入口寄存器
  • sepc寄存器:当中断发生的时候RISC-V CPU会保存当前PC寄存器的值在sepc中.
  • scause寄存器:表示中断的原因和来源,为什么会发生此中断.
  • sscratch寄存器:内核会放一个值在这里,这一个值对于中断程序的开始很有用.
  • sstatus寄存器:设置中断屏蔽的寄存器.

上述寄存器在U状态下不可读写.并且上述的寄存器还有一个M开头的版本,用于处理M模式下的中断.对于每一个CPU都有一套寄存器来管理程序运行.

那么硬件具体会做什么呢;

  1. 如果当前中断是设备中断,并且sstatus寄存器内设置了屏蔽,就不做任何事.
  2. 设置sstatus寄存器的值,屏蔽中断.
  3. 把当前PC寄存器的值copy给sepc寄存器.
  4. 保存当前的模式,在sstatus寄存器.
  5. 设置scause,保存中断的原因.
  6. 设置当前状态为S态.
  7. 把stvec寄存器的值给PC.
  8. 转而执行PC寄存器对应的指令.

用户态的中断

这里讲述了当执行用户态的代码的时候会发生什么.

当用户段代码出现了中断现象的时候,首先就会执行uservec->usertrap->中断处理 ->usertrapret->userret.

对于RISC-V的处理中,主要是内核态空间和用户态空间都维持了页表,但是RISC-V的硬件并没有在中断发生的时候在硬件的层面上更换页表,所以说xv6操作系统需要在处理中断的时候把页表替换成内核的页表,并且这个内核的页表可以与stvec寄存器的值对应,不会发生缺页中断.

xv6的解决之道就是添加一个trampoline页,trampoline就是以个特殊的页,这个页包含了uservec和userret两部分,并且这个页存在于所有进程的页表,自然也存在于内核态空间下的页表.并且这个页是分配在虚拟地址空间的最后一个部分,所以说很难与用户进程发生冲突.

这个trampoline页存在于任何一个进程和内核的页表,并且映射的虚拟地址都是一样的,定义在TRAMPOLINE这个C语言宏中.并且stvec这个寄存器存储的地址,就指向trampoline这个页的uservec这个部分,所以说当用户态发生中断的时候,RISC-V硬件处理完之后就可以立刻转化成内核态然后接着运行.因为U态和S态的页表是部分一样的,起码对于trampoline的记录是一样的

由于stvec寄存器存储了userret的地址,所以中断一开始的时候会进入uservec这个部分执行.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
uservec:    
#
# trap.c sets stvec to point here, so
# traps from user space start here,
# in supervisor mode, but with a
# user page table.
#
# sscratch points to where the process's p->trapframe is
# mapped into user space, at TRAPFRAME.
#

# swap a0 and sscratch
# so that a0 is TRAPFRAME
csrrw a0, sscratch, a0

# save the user registers in TRAPFRAME
sd ra, 40(a0)
sd sp, 48(a0)
sd gp, 56(a0)
sd tp, 64(a0)
sd t0, 72(a0)
sd t1, 80(a0)
sd t2, 88(a0)
sd s0, 96(a0)
sd s1, 104(a0)
sd a1, 120(a0)
sd a2, 128(a0)
sd a3, 136(a0)
sd a4, 144(a0)
sd a5, 152(a0)
sd a6, 160(a0)
sd a7, 168(a0)
sd s2, 176(a0)
sd s3, 184(a0)
sd s4, 192(a0)
sd s5, 200(a0)
sd s6, 208(a0)
sd s7, 216(a0)
sd s8, 224(a0)
sd s9, 232(a0)
sd s10, 240(a0)
sd s11, 248(a0)
sd t3, 256(a0)
sd t4, 264(a0)
sd t5, 272(a0)
sd t6, 280(a0)

# save the user a0 in p->trapframe->a0
csrr t0, sscratch
sd t0, 112(a0)

# restore kernel stack pointer from p->trapframe->kernel_sp
ld sp, 8(a0)

# make tp hold the current hartid, from p->trapframe->kernel_hartid
ld tp, 32(a0)

# load the address of usertrap(), p->trapframe->kernel_trap
ld t0, 16(a0)

# restore kernel page table from p->trapframe->kernel_satp
ld t1, 0(a0)
csrw satp, t1
sfence.vma zero, zero

# a0 is no longer valid, since the kernel page
# table does not specially map p->tf.

# jump to usertrap(), which does not return
jr t0

对于中断操作,我们知道我们得把所有的寄存器存放到内存中,但是对于RISC-V的汇编语言,我们还得有一个寄存器来存储应该访问的内存的地址.但是通用寄存器都已经失去了作用了,所以说RISC-V提供一个寄存器叫做sscratch寄存器,这个时候就可以把a0先暂时存储到sscratch寄存器中,然后再把a0从sscratch寄存器中取出.在这里这个寄存器主要是存放了栈帧的首地址,新的栈帧就会存放在sscratch表示的地址中,在原文中提到,在返回到U态时,内核通过设置sscratch寄存器来制定下一次中断时栈帧的地址.

对于栈帧的处理同样需要页表,在xv6系统中,对于每一个进程我们都会申请一个trapframe页,这个页的虚拟地址永远指定在TRAPFRAME这个地方上.栈帧元素的一系列初始化都是在进程创建的时候都已经保存好了.其实所有进程都会有一个栈帧,并且栈帧的虚拟地址是一样的,但是虚拟地址是一样的由于每个进程的页表又不是一样的,所以说对应的物理地址是不一样的.

那对于内核态的代码,我们不能通过TRAPFRAME这个虚拟地址来访问进程的栈帧结构,那么我们应该怎么办呢?

我们看到p->trapframe的构造过程.首先就是p->trapframe保存的是kalloc直接分配的物理地址,p->trapframe = (struct **trapframe** \*)**kalloc**(),直接保存的物理地址.

对于每一个进程,首先要申请一个页面,然后把这个页面的物理地址保存到p->trapframe这个结构中,接着每个进程都要调用proc_pagetable函数,执行`mappages(pagetable, TRAPFRAMEPGSIZE, (uint64)(p->trapframe), PTE_R  PTE_W),把这个物理地址映射到TRAPFRAME这个va中.

所以说对于每个进程,在用户态访问trapframe都是访问TRAPFRAME这个va,由于每个进程的页表映射不同导致最后的实际物理地址不同.

综上所述:内核使用p->trapframe保存的物理地址访问栈帧结构,所有的用户态程序使用TRAPFRAME这个同样的虚拟地址访问栈帧结构,但是由于页表不同导致访问的实际物理地址不一样.

最后就是进程进入到内核态,访问p->trapframe就是物理地址,就不会访问TRAPFRAME这个虚拟地址

由于栈帧已经保存好了内核栈的地址,内核页表的地址,以及CPU的核号,所以说接下来的操作就是读取栈帧,读取内核栈地址,内核页表的地址以及下一个trap函数的入口地址.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
void
usertrap(void)
{
int which_dev = 0;

if((r_sstatus() & SSTATUS_SPP) != 0)
panic("usertrap: not from user mode");

// send interrupts and exceptions to kerneltrap(),
// since we're now in the kernel.
w_stvec((uint64)kernelvec);

struct proc \*p = myproc();

// save user program counter.
p->trapframe->epc = r_sepc();

if(r_scause() == 8){
// system call

if(p->killed)
exit(-1);

// sepc points to the ecall instruction,
// but we want to return to the next instruction.
p->trapframe->epc += 4;

// an interrupt will change sstatus &c registers,
// so don't enable until done with those registers.
intr_on();

syscall();
} else if((which_dev = devintr()) != 0){
// ok
} else {
printf("usertrap(): unexpected scause %p pid=%d\\n", r_scause(), p->pid);
printf(" sepc=%p stval=%p\\n", r_sepc(), r_stval());
p->killed = 1;
}

if(p->killed)
exit(-1);

// give up the CPU if this is a timer interrupt.
if(which_dev == 2)
yield();

usertrapret();
}

接着调用trap.c()中的usertrap函数,这个时候就已经进入内核态了,首先第一步就是对stvec寄存器进行修改,因为对于用户态和内核态发生中断,进入的中断程序还是不一样的,然后接着在trapframe里面保存sepc寄存器(就是中断的断点),因为有可能调用yield(),所以说保存断点非常有必要.如果trap是syscall的话,接着就调用syscall函数即可,如果是设备故障的话,就先保存设备的编号,如果不是设备中断的话就是指令的异常这个时候就退出就可以了.如果是时钟中断(which_dev==2)就处理一下.

这个就是中断处理,对于不同类型的中断有不同的处理,处理完之后就要返回U态了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
void
usertrapret(void)
{
struct proc \*p = myproc();

// we're about to switch the destination of traps from
// kerneltrap() to usertrap(), so turn off interrupts until
// we're back in user space, where usertrap() is correct.
intr_off();

// send syscalls, interrupts, and exceptions to trampoline.S
w_stvec(TRAMPOLINE + (uservec - trampoline));

// set up trapframe values that uservec will need when
// the process next re-enters the kernel.
p->trapframe->kernel_satp = r_satp(); // kernel page table
p->trapframe->kernel_sp = p->kstack + PGSIZE; // process's kernel stack
p->trapframe->kernel_trap = (uint64)usertrap;
p->trapframe->kernel_hartid = r_tp(); // hartid for cpuid()

// set up the registers that trampoline.S's sret will use
// to get to user space.

// set S Previous Privilege mode to User.
unsigned long x = r_sstatus();
x &= ~SSTATUS_SPP; // clear SPP to 0 for user mode
x = SSTATUS_SPIE; // enable interrupts in user mode
w_sstatus(x);

// set S Exception Program Counter to the saved user pc.
w_sepc(p->trapframe->epc);

// tell trampoline.S the user page table to switch to.
uint64 satp = MAKE_SATP(p->pagetable);

// jump to trampoline.S at the top of memory, which
// switches to the user page table, restores user registers,
// and switches to user mode with sret.
uint64 fn = TRAMPOLINE + (userret - trampoline);
((void (\*)(uint64,uint64))fn)(TRAPFRAME, satp);
}

首先第一步就是调用usertrapret函数,这个函数首先第一步就是做stvec寄存器的写入,回忆一下:一开始在进入内核的时候为了防止内核出现中断就把stvec寄存器改成kerbelvec,现在要返回U态了就把中断入口改成uservec即可.然后就是处理栈帧了,把内核页表地址,内核栈和usertrap地址,CPU核号保存进去.接着就是改变status寄存器的数值,改成用户态的寄存器,然后调取断点地址,把断点地址写到sepc寄存器里面(这样子就是进入内核态保存用户态断点,退出内核态的时候把断点进行加载,防止内核态也出现中断),接着切换页表,切换到用户态的页表,然后接着跳转到userret函数中.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
userret:
# userret(TRAPFRAME, pagetable)
# switch from kernel to user.
# usertrapret() calls here.
# a0: TRAPFRAME, in user page table.
# a1: user page table, for satp.

# switch to the user page table.
csrw satp, a1
sfence.vma zero, zero

# put the saved user a0 in sscratch, so we
# can swap it with our a0 (TRAPFRAME) in the last step.
ld t0, 112(a0)
csrw sscratch, t0

# restore all but a0 from TRAPFRAME
ld ra, 40(a0)
ld sp, 48(a0)
ld gp, 56(a0)
ld tp, 64(a0)
ld t0, 72(a0)
ld t1, 80(a0)
ld t2, 88(a0)
ld s0, 96(a0)
ld s1, 104(a0)
ld a1, 120(a0)
ld a2, 128(a0)
ld a3, 136(a0)
ld a4, 144(a0)
ld a5, 152(a0)
ld a6, 160(a0)
ld a7, 168(a0)
ld s2, 176(a0)
ld s3, 184(a0)
ld s4, 192(a0)
ld s5, 200(a0)
ld s6, 208(a0)
ld s7, 216(a0)
ld s8, 224(a0)
ld s9, 232(a0)
ld s10, 240(a0)
ld s11, 248(a0)
ld t3, 256(a0)
ld t4, 264(a0)
ld t5, 272(a0)
ld t6, 280(a0)

# restore user a0, and save TRAPFRAME in sscratch
csrrw a0, sscratch, a0

# return to user mode and user pc.
# usertrapret() set up sstatus and sepc.
sret

这个时候会进行函数调用,进入userret这个函数之前,TRAPFRAME作为第一个参数,第二个参数就是用户态页表的地址,首先第一步就是加载用户态页表(处理逻辑:先获得satp,再加载到a1寄存器,接着取出来),接着就是把栈帧中存储的寄存器值全部加载到真实的寄存器中,最后一步就是把栈帧头部的虚拟地址保存到sscratch寄存器,下一次执行中断操作的时候就可以直接读取sscratch寄存器的内容确定栈帧的地址.

最后执行sret,把sepc寄存器的内容给pc,转换为U态,中断结束

总结下来: 导出保存在寄存器的栈帧首虚拟地址->把寄存器保存到trapframe中->加载内核态页表->存储断点->执行中断处理->加载断点->加载用户态页表->把trapframe的内容加载到真实的寄存器->把栈帧首地址放入寄存器中.

其实内核可以修改trapframe中的寄存器值,在中断结束后再把栈帧的值加载到真实的寄存器中.

调用系统函数.

我们接着第二章来说,在执行userinit函数之后,就执行initcode.S

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include "syscall.h"

# exec(init, argv)
.globl start
start:
la a0, init
la a1, argv
li a7, SYS_exec
ecall

# for(;;) exit();
exit:
li a7, SYS_exit
ecall
jal exit

# char init[] = "/init\\0";
init:
.string "/init\\0"

# char \*argv[] = { init, 0 };
.p2align 2
argv:
.long init
.long 0

这是一个标准的调用系统调用的样本,a0~a6存储系统调用需要的参数,a7传递了系统调用号,表示执行何种系统调用,传递完参数后就执行ecall.ecall是一个硬件指令,会把状态调整为S态然后执行uservec函数,接着就是我们熟知的trap处理函数.

在syscall()函数中,我们可以知道这个函数根据a7寄存器表示的系统调用号来找到函数指针然后进行调用,这里构思很巧妙,就是构建系统调用函数指针来进行跳转.

接着返回的时候就把返回值传递给a0寄存器.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
static uint64 (\*syscalls[])(void) = {
[SYS_fork] sys_fork,
[SYS_exit] sys_exit,
[SYS_wait] sys_wait,
[SYS_pipe] sys_pipe,
[SYS_read] sys_read,
[SYS_kill] sys_kill,
[SYS_exec] sys_exec,
[SYS_fstat] sys_fstat,
[SYS_chdir] sys_chdir,
[SYS_dup] sys_dup,
[SYS_getpid] sys_getpid,
[SYS_sbrk] sys_sbrk,
[SYS_sleep] sys_sleep,
[SYS_uptime] sys_uptime,
[SYS_open] sys_open,
[SYS_write] sys_write,
[SYS_mknod] sys_mknod,
[SYS_unlink] sys_unlink,
[SYS_link] sys_link,
[SYS_mkdir] sys_mkdir,
[SYS_close] sys_close,
};
//系统调用号,系统调用函数
void
syscall(void)
{
int num;
struct proc \*p = myproc();

num = p->trapframe->a7;
if(num > 0 && num < NELEM(syscalls) && syscalls[num]) {
p->trapframe->a0 = syscalls[num]();
} else {
printf("%d %s: unknown sys call %d\\n",
p->pid, p->name, num);
p->trapframe->a0 = -1;
}
}

系统调用的参数.

系统调用会传递参数进入,对于RISC-V来说,朴素的思想就是把参数传递到寄存器中,然后系统调用函数读取存储在寄存器中的数据,比如说argint和atgaddr,argfd等.

对于直接传递的参数,我们可以直接读取没有大问题,但是对于传递指针的参数,我们就需要进行额外的处理,第一个问题呢就是我们不知道程序是不是友好的,有可能用户程序通过传递地址来修改内核的内存,这样就导致了不安全的情况的发生.第二个问题就是xv6的内核态和用户态页表是不一样的.

所以说xv6的做法就是对于获得字符串的函数argstr(),去构建一个新的函数fetchstr去安全地获得数据,这个函数就会调用copyinstr()函数.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
int
copyinstr(pagetable_t pagetable, char \*dst, uint64 srcva, uint64 max)
{
uint64 n, va0, pa0;
int got_null = 0;

while(got_null == 0 && max > 0){
va0 = PGROUNDDOWN(srcva);
pa0 = walkaddr(pagetable, va0);
if(pa0 == 0)
return -1;
n = PGSIZE - (srcva - va0);
if(n > max)
n = max;

char \*p = (char \*) (pa0 + (srcva - va0));
while(n > 0){
if(\*p == '\\0'){
\*dst = '\\0';
got_null = 1;
break;
} else {
\*dst = \*p;
}
--n;
--max;
p++;
dst++;
}

srcva = va0 + PGSIZE;
}
if(got_null){
return 0;
} else {
return -1;
}
}

这个函数会从pagetable这个页表对应的虚拟地址srcva处copy max字节的元素到内核页表的dst处.做法就是调用walkaddr来找到pagetable中srcva对应的物理地址,然后从这个物理地址中拷贝字节到dst中,由于现在是S态,所以说页表是内核态对应的页表,所以说提取用户态地址上的数据要查找用户态页表,然后把数据存放到内核态的地址就直接查询内核态页表就可以了.

总体的思路就是,找到这个用户态虚拟地址对应的物理地址,取出这个物理地址上的元素,给到dst(内核态虚拟地址).

内核态引发的中断

在内核态引发中断,由于stvec寄存器已经发生了改变,所以进入的中断程序已经是kernelvec这个程序了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
kernelvec:
// make room to save registers.
addi sp, sp, -256

// save the registers.
sd ra, 0(sp)
sd sp, 8(sp)
sd gp, 16(sp)
sd tp, 24(sp)
sd t0, 32(sp)
sd t1, 40(sp)
sd t2, 48(sp)
sd s0, 56(sp)
sd s1, 64(sp)
sd a0, 72(sp)
sd a1, 80(sp)
sd a2, 88(sp)
sd a3, 96(sp)
sd a4, 104(sp)
sd a5, 112(sp)
sd a6, 120(sp)
sd a7, 128(sp)
sd s2, 136(sp)
sd s3, 144(sp)
sd s4, 152(sp)
sd s5, 160(sp)
sd s6, 168(sp)
sd s7, 176(sp)
sd s8, 184(sp)
sd s9, 192(sp)
sd s10, 200(sp)
sd s11, 208(sp)
sd t3, 216(sp)
sd t4, 224(sp)
sd t5, 232(sp)
sd t6, 240(sp)

// call the C trap handler in trap.c
call kerneltrap

但是由于已经在内核态了,所以说就不需要切换堆栈,也不需要切换页表,也不需要构建trapframe了,直接把寄存器的数值存到堆栈,调用处理内核中断的函数kerneltrap.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void 
kerneltrap()
{
int which_dev = 0;
uint64 sepc = r_sepc();
uint64 sstatus = r_sstatus();
uint64 scause = r_scause();

if((sstatus & SSTATUS_SPP) == 0)
panic("kerneltrap: not from supervisor mode");
if(intr_get() != 0)
panic("kerneltrap: interrupts enabled");

if((which_dev = devintr()) == 0){
printf("scause %p\\n", scause);
printf("sepc=%p stval=%p\\n", r_sepc(), r_stval());
panic("kerneltrap");
}

// give up the CPU if this is a timer interrupt.
if(which_dev == 2 && myproc() != 0 && myproc()->state == RUNNING)
yield();

// the yield() may have caused some traps to occur,
// so restore trap registers for use by kernelvec.S's sepc instruction.
w_sepc(sepc);
w_sstatus(sstatus);
}

kerneltrap就只用处理两种类型的中断了,分别是设备I/O和指令执行错误.在这里和usertrap其实是一样的,先获得设备号,如果设备号没有,那就是指令执行错误,打出错误信息

当然也是一样yiled()执行完了之后会导致其他进程执行,其他进程的时候会继续引发中断,所以说朴素的思想就是把sepc保存下来,再最后中断返回的时候把保存的sepc写入即可.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// restore registers.
ld ra, 0(sp)
ld sp, 8(sp)
ld gp, 16(sp)
// not this, in case we moved CPUs: ld tp, 24(sp)
ld t0, 32(sp)
ld t1, 40(sp)
ld t2, 48(sp)
ld s0, 56(sp)
ld s1, 64(sp)
ld a0, 72(sp)
ld a1, 80(sp)
ld a2, 88(sp)
ld a3, 96(sp)
ld a4, 104(sp)
ld a5, 112(sp)
ld a6, 120(sp)
ld a7, 128(sp)
ld s2, 136(sp)
ld s3, 144(sp)
ld s4, 152(sp)
ld s5, 160(sp)
ld s6, 168(sp)
ld s7, 176(sp)
ld s8, 184(sp)
ld s9, 192(sp)
ld s10, 200(sp)
ld s11, 208(sp)
ld t3, 216(sp)
ld t4, 224(sp)
ld t5, 232(sp)
ld t6, 240(sp)

addi sp, sp, 256

// return to whatever we were doing in the kernel.
sret

最后返回的时候也是只需要把寄存器取出来即可.

缺页中断

当访问一个va,但是页表中没有对应的pa的时候,就会引发缺页中断,系统会处理这个缺页中断或者退出执行,或者分配一个新的页给这个进程.

3: Memory&Pagetable

分页的硬件

RISC-V的指令(包括用户态下的或者内核态下的)里面的地址操作数其实代表了虚拟地址.但是对应地,RAM或者叫做物理内存,自然也有物理地址,物理地址真实唯一地标记实际内存空间,可能RAM的第10006个区块地址就是0x10006.所以说就有页表这个东西,把指令提供的逻辑地址转化到实际内存的物理地址.

xv6会运行RISC-V支持的Sv39架构,页表是一个连接虚拟地址和实际的物理地址的一个桥梁,CPU给页表一个虚拟地址,页表会返回一个物理地址.

其中虚拟地址分成两部分,低12位就是offset,代表一个页有​大,中间的27位是index值,负责寻找到对应的表项位置,比如说如果index=5就代表要找到第五个表项.后面的25位不需要.页表的每一项对应44位的PPN和10位的flags.其中flags标记这一项的一些控制信息,PPN则和offset一块组成物理地址.虚拟地址是​的空间,而物理地址是​.

总的来说是分三步走.

  • 虚拟地址分成index和Offset两部分.
  • 找到页表中的第index项.获取其中的PPN和flags
  • PPN和虚拟地址的Offset组成物理地址.

页表给OS给操作系统提供了va和pa互换的途径,其中内存被划分成4KB的块,我们称之为页.

实际的操作可能更加复杂,SV39维护的是一个多级页表.虚拟地址转化为物理地址需要分三步走.

首先我们发现页表是三级结构,第一层页表的首地址保存在satp寄存器中,有512个表项,其中表项存储着下一级页表(第二层)的首地址.第二级页表也是由512个表项组成,其中每一个表项存着下一级页表(第三层)的首地址.第三级页表里面存储的就是对应的物理地址的PPN.

所以说va分成L2,L1,L0和Offset分成四部分.

  • 首先在第一级页表中找到第L2个表项,这样就找到第二级页表的首地址.
  • 然后在第二级页表中找到第L1个表项,这样就找到第三级页表的首地址.
  • 最后在第三级页表中找到第L0个表项,这样就能获取到PPN,然后拿PPN和offset组合在一起就可以了.
  • 如果在任何一次寻找的时候Flags显示这个页表项不可用,那么就引发缺页中断.

三级页表非常好用而别比较高效,因为一开始的时候我们不需要要那么大的空间存放页表,我们可以边运行程序遍扩充页表的大小.

但是CPU这样去访问页表需要3次访存指令.这样子访问就很慢,所以说CPU设计了一个类似于cache的东西来保存页表信息,这个表叫做TLB.CPU首先会在TLB中查找页表元素.如果TLB miss了才会调用访存操作来获取页表元素.

每一个页表都都存储了flag位,其中PTE_V存着这个页表项究竟是不是可用的.PTE_W表示指令是否可以往这个页是否可写,PTE_X表示这个页是否可执行,PTE_U表示在用户态下是否可以访问这一页.

在硬件层面上我们必须指定第一级页表的首地址,这个页表首地址存放在satp寄存器中,由于这个是CPU,所以说不同CPU的satp寄存器的值都是不一样的.我们还知道每个进程的第一级页表的首地址也是不一样的(每个进程都有不同的页表记录地址).这为每个CPU运行不同的进程提供了一句.

我们的用户程序在虚拟内存上进行读写,提供的地址也是虚拟地址,虚拟内存其实就是由许多的实际的DRAM(存储器件)组成的虚拟化而已.

内核地址空间

xv6对每个进程都维护了一份页表(每个进程都有一个页表),来表示不同进程的虚拟地址空间.当然xv6也会给内核态地址空间维护一个页表,也就是说xv6的地址空间=若干个用户态进程的地址空间+内核地址空间.

QEMU会模拟一个RAM(物理存储器),这个存储器的地址空间是0x80000000~0x86400000.在xv6系统中称为PHYSTOP.QEMU还把各种I/O设备,比如说磁盘等的地址映射到0x80000000的地址之下,xv6操作系统可以通过直接访问这些物理地址来操控这些设备(比如说访问0x10001000来访问VIRTIO disk),而不是通过访问RAM来间接地访问设备.

内核通过直接访问映射来访问RAM和上文提到的设备,也就是说程序提到的虚拟地址=物理地址.(也就是说,xv6访问内存和设备是bare linking的,物理地址就是虚拟地址,同样地,在页表中,对应的虚拟地址=物理地址).

当然内核用户状态下也有不是直接链接的比如说trampoline页(看syscall&trap一章)和内核态栈(若干个内核态栈之间有一个Guard页)不是直接连的.

如何创建一个地址空间?

所有的xv6关于地址的处理全部放在vm.c这个文件中.

最关键的就是数据结构就是pagetable_t这个数据结构,这个数据结构本质上就是一个uint64*类型的一个指针,这个代表了第一级页表的首地址.可以是用户进程页表的首地址,也可以是内核页表的首地址.

最重要的函数有walk,这个函数负责给定一个va,然后找到对应的PTE.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
pte_t \*  
walk(pagetable_t pagetable, uint64 va, int alloc)
{
 if(va >= MAXVA)
   panic("walk");

 for(int level = 2; level > 0; level--) {
   pte_t \*pte = &pagetable[PX(level, va)];
   if(\*pte & PTE_V) {
     pagetable = (pagetable_t)PTE2PA(\*pte);
  } else {
     if(!alloc  (pagetable = (pde_t\*)kalloc()) == 0)
       return 0;
     memset(pagetable, 0, PGSIZE);
     \*pte = PA2PTE(pagetable)  PTE_V;
  }
}
 return &pagetable[PX(0, va)];
}

int
mappages(pagetable_t pagetable, uint64 va, uint64 size, uint64 pa, int perm)
{
 uint64 a, last;
 pte_t \*pte;

 if(size == 0)
   panic("mappages: size");
 
 a = PGROUNDDOWN(va);
 last = PGROUNDDOWN(va + size - 1);
 for(;;){
   if((pte = walk(pagetable, a, 1)) == 0)
     return -1;
   if(\*pte & PTE_V)
     panic("mappages: remap");
   \*pte = PA2PTE(pa)  perm  PTE_V;
   if(a == last)
     break;
   a += PGSIZE;
   pa += PGSIZE;
}
 return 0;
}

void
kvmmap(pagetable_t kpgtbl, uint64 va, uint64 pa, uint64 sz, int perm)
{
 if(mappages(kpgtbl, va, sz, pa, perm) != 0)
   panic("kvmmap");
}

pagetable_t
kvmmake(void)
{
 pagetable_t kpgtbl;

 kpgtbl = (pagetable_t) kalloc();
 memset(kpgtbl, 0, PGSIZE);

 // uart registers
 kvmmap(kpgtbl, UART0, UART0, PGSIZE, PTE_R  PTE_W);

 // virtio mmio disk interface
 kvmmap(kpgtbl, VIRTIO0, VIRTIO0, PGSIZE, PTE_R  PTE_W);

 // PLIC
 kvmmap(kpgtbl, PLIC, PLIC, 0x400000, PTE_R  PTE_W);

 // map kernel text executable and read-only.
 kvmmap(kpgtbl, KERNBASE, KERNBASE, (uint64)etext-KERNBASE, PTE_R  PTE_X);

 // map kernel data and the physical RAM we'll make use of.
 kvmmap(kpgtbl, (uint64)etext, (uint64)etext, PHYSTOP-(uint64)etext, PTE_R  PTE_W);

 // map the trampoline for trap entry/exit to
 // the highest virtual address in the kernel.
 kvmmap(kpgtbl, TRAMPOLINE, (uint64)trampoline, PGSIZE, PTE_R  PTE_X);

 // map kernel stacks
 proc_mapstacks(kpgtbl);
 
 return kpgtbl;
}

void
kvminithart()
{
 w_satp(MAKE_SATP(kernel_pagetable));
 sfence_vma();
}

void
kinit()
{
 initlock(&kmem.lock, "kmem");
 freerange(end, (void\*)PHYSTOP);
}

void \*
kalloc(void)
{
 struct run \*r;

 acquire(&kmem.lock);
 r = kmem.freelist;
 if(r)
   kmem.freelist = r->next;
 release(&kmem.lock);

 if(r)
   memset((char\*)r, 5, PGSIZE); // fill with junk
 return (void\*)r;
}

void
kfree(void \*pa)
{
 struct run \*r;

 if(((uint64)pa % PGSIZE) != 0  (char\*)pa < end  (uint64)pa >= PHYSTOP)
   panic("kfree");

 // Fill with junk to catch dangling refs.
 memset(pa, 1, PGSIZE);

 r = (struct run\*)pa;

 acquire(&kmem.lock);
 r->next = kmem.freelist;
 kmem.freelist = r;
 release(&kmem.lock);
}

int
growproc(int n)
{
 uint sz;
 struct proc \*p = myproc();

 sz = p->sz;
 if(n > 0){
   if((sz = uvmalloc(p->pagetable, sz, sz + n)) == 0) {
     return -1;
  }
} else if(n < 0){
   sz = uvmdealloc(p->pagetable, sz, sz + n);
}
 p->sz = sz;
 return 0;
}

sbrk是一个系统调用,这个调用帮助我们实现进程内存空间的增长和消亡.

sbrk系统调用?

同样,为了防止栈溢出,我们有一个guard page来保护.

stack页是一个页,然后里面的参数是由exec程序创建的,有各种参数以及参数的地址.还有main执行完PC的返回值.

其中trapframe这个页是映射到可用物理空间的,在kernel态是直接映射的,所以说不用担心kernel态访问不了用户态的kernel.

第一:一个用户进程只使用一张页表,不同的用户进程的物理地址是相互隔离的不会被打扰的.第二,用户看见的虚拟地址是连续的但其实物理地址不是连续的,这样加大了分配的灵活性.第三,trampoline页是所有用户通用的,也就是说每个用户的页表一定有一个MAXVA-PGSIZE->trampoline的映射.

用户地址空间是从0~MAXVA的.然后当用户程序需要更多的内存的时候,xv6就会使用kalloc来获取新的页,然后接着建立pa和va的关系(和内核是一样的).对于虚拟地址,如果用户进程暂时不需要使用,就可以把页表的PTE_V置0表示不需要使用.

用户进程地址空间.

同样,在释放的时候,也是获得这个释放的物理块地址,把它放到freelist的队首中.

kalloc.c中我们知道,每次申请都会调用一次kalloc函数.kalloc函数每一次从freelist中取出一块来进行返回.这个freelist已经在kinit函数中初始化好了,就是从end(内核态空间的占用的最后一个地址)到PHYSTOP这个区域内.

如何申请物理块?

我们知道TLB会存储一些页表信息,CPU同样也会切换进程,切换进程的时候我们不想让下一个进程知道我们的页表信息,这个时候就会调用sfence_vma()函数来对TLB的内容进行一次部分刷新.

这个时候这个函数会把kernel_pagetable写进satp寄存器中,这个时候页表正式进入工作,之后的地址就是需要页表一级的转化,并且当前页表的第一级首地址就是kernel_pagetable.

在S态的main函数执行了kvminithart函数来初始化了内核态页表.

上面的所有函数实现的基础就是在bare linking上面的,也就是说执行的情况中虚拟地址=物理地址,我们才可以方便地访问和处理.

进行了若干次的虚拟地址和物理地址的映射,这个时候最后一步就是调用proc_mapstacks.对每一个进程都分配了一个内核栈.然后也调用了kvmmap来进行地址的映射.最后返回一个内核态页表.

在操作系统初始化的时候,就调用kvminit函数对内存空间进行初始化,kvminit调用了kvmmake函数.kvmmake又调用了若干个kvmmap函数.在调用这一段函数的时候,xv6还没有开启份页功能,所以说在这一部分执行的指令可以直接访问物理内存.kvmmake函数首先申请物理内存的一页作为内核态页表的一页.然后接着调用kvmmap函数在kernel态的页表中添加对于若干个虚拟地址的映射.

然后又copyout和copyin,这个函数可以从用户态的虚拟地址中获取信息传递给内核态.

给定va和pa,然后添加va和pa的连接,放入页表中,这个时候va和pa正式有了联系.

还有一个就是mappages.这个函数负责添加页表项,就是给页表添加一项,让一段虚拟地址和一段物理地址进行匹配.

这个函数是不是跟我们之前说的读法是一样的,三层的页表就需要我们去读三次,有哪一次发现Valid位(PTE_V不对)就返回为0,然后申请一个新页即可.

4:Interrupt&Device Manage

设备管理

一个设备驱动程序就是操作系统对特定的设备进行管理的程序,这些程序让设备执行操作,并且处理设备引起的中断,并且与因为设备I/O而被阻塞的进程.设备驱动程序往往非常难设计,因为设备和设备驱动程序是一起工作的,而且编写设备驱动程序需要对硬件接口有着深入的了解,这一点往往非常难.

设备会通过引发中断来通知操作系统进行处理,在中断的那一部分我们说过,操作系统通过识别中断来源来判断这是个设备中断,然后调用设备中断处理程序.其中函数会调用devintr这个函数来获取究竟是什么设备发生了中断.

许多设备中断的程序一般分成两个部分,第一个部分在进程的内核态执行,一般来说用户程序会执行read和write调用以希望从设备中获取一些信息.这一部分的内容可能负责把用户的请求传送给设备,让设备执行用户的请求.第二个部分就是设备中断处理,这一部分一般是设备处理完用户的请求,处理完之后设备会向操作系统发送一个中断请求,这一部分的代码就是用来处理设备的中断请求的,把结果传递给用户程序并唤醒相关的进程.

控制台输入

关于控制台,关于控制台的一些代码存放到了console.c这个文件中,控制台驱动程序可以接受用户输入的字符,通过UART这个特殊的硬件.控制台驱动程序一次性获得一行输入,用户进程,比如说shell程序会通过read这个系统调用来获得控制台输入.综合起来就是

QEMU模拟的UART硬件->操作系统的内核->用户程序的read系统调用.

在实际的电脑中,16550芯片会管理RS232这个串行链路来连接到其他终端,在QEMU中,这个模拟的芯片连接你的键盘和屏幕.

对于操作系统(软件)来说:我们可以像访问内存一样来访问UART硬件,在之前内存管理的时候我们已经提到了,我们可以通过访问UART0这个地址来像访问内存一样来访问设备.在UART设备中存储了许多寄存器数据,操作系统可以通过UART0地址+偏移来访问寄存器数据.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#define RHR 0                 // receive holding register (for input bytes)
#define THR 0 // transmit holding register (for output bytes)
#define IER 1 // interrupt enable register
#define IER_RX_ENABLE (1<<0)
#define IER_TX_ENABLE (1<<1)
#define FCR 2 // FIFO control register
#define FCR_FIFO_ENABLE (1<<0)
#define FCR_FIFO_CLEAR (3<<1) // clear the content of the two FIFOs
#define ISR 2 // interrupt status register
#define LCR 3 // line control register
#define LCR_EIGHT_BITS (3<<0)
#define LCR_BAUD_LATCH (1<<7) // special mode to set baud rate
#define LSR 5 // line status register
#define LSR_RX_READY (1<<0) // input is waiting to be read from RHR
#define LSR_TX_IDLE (1<<5) // THR can accept another character to send

首先xv6的S态的main函数会调用consoleinit函数.这个函数会初始化UART硬件.

1
2
3
4
5
6
7
8
9
10
11
12
void
consoleinit(void)
{
initlock(&cons.lock, "cons");

uartinit();

// connect read and write system calls
// to consoleread and consolewrite.
devsw[CONSOLE].read = consoleread;
devsw[CONSOLE].write = consolewrite;
}

uartinit()的代码保证了UART在收到每一次键盘输入的时候都会引发中断,然后每一次传输完一整个字符还会送出一个trasmit complete中断.

1
WriteReg(IER, IER_TX_ENABLE  IER_RX_ENABLE);

接着UART硬件也会引发一个中断,trap函数会判断这是什么类型中断,发现是设备引起的中断,就转而调用处理设备中断的函数devintr,接着这个函数通过调用PLIC判断是什么设备引起的中断,发现是UART设备,转而调用uartintr.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
int
devintr()
{
uint64 scause = r_scause();

if((scause & 0x8000000000000000L) &&
(scause & 0xff) == 9){
// this is a supervisor external interrupt, via PLIC.

// irq indicates which device interrupted.
int irq = plic_claim();

if(irq == UART0_IRQ){
uartintr();
} else if(irq == VIRTIO0_IRQ){
virtio_disk_intr();
} else if(irq){
printf("unexpected interrupt irq=%d\\n", irq);
}

// the PLIC allows each device to raise at most one
// interrupt at a time; tell the PLIC the device is
// now allowed to interrupt again.
if(irq)
plic_complete(irq);

return 1;
} else if(scause == 0x8000000000000001L){
// software interrupt from a machine-mode timer interrupt,
// forwarded by timervec in kernelvec.S.

if(cpuid() == 0){
clockintr();
}

// acknowledge the software interrupt by clearing
// the SSIP bit in sip.
w_sip(r_sip() & ~2);

return 2;
} else {
return 0;
}
}
//中断处理分成两个部分,前面部分把存储在UART寄存器的键盘输入发送.
void
uartintr(void)
{
//keyborad->RHR
// read and process incoming characters.(处理控制台输入)
while(1){
int c = uartgetc();
if(c == -1)
break;
consoleintr(c);
}

// send buffered characters.(处理控制台输出)
acquire(&uart_tx_lock);
uartstart();
release(&uart_tx_lock);
}

接着从uartintr函数中从UART寄存器中获取一个字符,再把字符递交给consoleintr函数.

1
2
3
4
5
6
7
8
9
10
int
uartgetc(void)
{
if(ReadReg(LSR) & 0x01){
// input data is ready.
return ReadReg(RHR);
} else {
return -1;
}
}

就是从LSR寄存器判断,然后从RHR寄存器获得数据.

consoleintr负责把所有UART输入的元素存储起来,存储到cons.buf这个数组中,然后当输入的是换行,就可以唤醒一个正在运行consoleread的进程.这个进程会执行consoleread函数,consoleread函数会读取缓冲区内的数据,然后返回给用户态.

每一次唤醒,consoleread就是读取一行的元素,然后把数据传递给用户态.

总结:用户键盘输入->中断一次->UART把中断的输入读取出来送到consoleintr->consointr调用consoleread函数

控制台输出

read()系统调用能获得用户的键盘输入,write()系统调用可以在控制器中进行输出.

UART设备每一次从THR寄存器中输出一字节的数据,它就会产生一个中断,和之前一样,uartintr会调用uartstart函数.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void
uartstart()
{
while(1){
if(uart_tx_w == uart_tx_r){
// transmit buffer is empty.
return;
}

if((ReadReg(LSR) & LSR_TX_IDLE) == 0){
// the UART transmit holding register is full,
// so we cannot give it another byte.
// it will interrupt when it's ready for a new byte.
return;
}

int c = uart_tx_buf[uart_tx_r % UART_TX_BUF_SIZE];
uart_tx_r += 1;

// maybe uartputc() is waiting for space in the buffer.
wakeup(&uart_tx_r);

WriteReg(THR, c);
}
}

每一次输出一字节的数据都需要看看在缓冲区内有没有其他的数据需要去输出.这个函数就是检查缓冲区内还有没有数据要写,如果要写,就放到THR寄存器中等待去写.

然后机器就会从THR寄存器中读取要输出的内容,输出成功就触发中断,看看还有没有要要写的内容,有的话就接着放入THR寄存器中.

特别地,第一个字节会在uartputc这个系统调用中进行输出.其他的字节是通过字节

设备驱动的并行性

你会发现,每一次进入consolereadconsoleintr都会获取一个锁,这个锁会保证不可能同时有两个进程执行这个函数,当两个进程同时执行consoleread的时候,有可能会把一整句话分成两部分交付给两个进程,这个是不对的.加上锁可以保证同时只有一个进程进入这个函数

1
2
3
cd abab
process1 :c a
process2 :dabb

还有一个可能就是一个进程在等待consoleread的结束,另外一个进程正在运行,这个时候执行中断操作可能会把console输入传递给另外一个进程,这个时候我们也需要上一个锁.

Another way in which concurrency requires care in drivers is that one process may be waiting for input from a device, but the interrupt signaling arrival of the input may arrive when a different process (or no process at all) is running. Thus interrupt handlers are not allowed to think about the process or code that they have interrupted. For example, an interrupt handler cannot safely call copyout with the current process’s page table. Interrupt handlers typically do relatively little work (e.g., just copy the input data to a buffer), and wake up top-half code to do the rest.

时钟中断

RISC-V的CPU在一定的时间段就会触发一次时钟中断,RISC-V希望时钟中断能在M态处理而不是在S态处理.xv6选择在一个特殊的方法来处理时钟中断.

在start.c中,我们设置了把所有中断都放在S态进行处理.但是我们在timeinit函数中创建了一个专属于时钟中断的处理模式.主要有几点:

  • 配置了CLINT硬件,这个硬件会在一定间隔时间触发一次中断.
  • 配置trapframe,这样可以把通用寄存器的数据放到CLINT寄存器中
  • 由于M态的中断只有时钟中断,中断向量配置为timevec.

在M态下的时钟中断处理函数在是timervec:这个保存了一部分寄存器,然后告诉CLINT硬件什么时候产生下一次时钟中断,然后引发一个S态的软件中断.

在执行用户态或者是内核态的代码的时候都会引发时钟中断,时钟中断尽量不要打扰正在执行关键任务的进程.所以说RISC-V允许引起一个软件中断,这个中断是S态引起的.

当中断被关闭的时候,说明正在执行很关键的任务,代码可以选择拒绝时钟中断的执行,如果没有关中断,这个软件中断就会打断正在执行的代码,执行时钟中断的操作,放弃对CPU的占用.

总结:进入M态处理中断->引发一个S态的中断->如果执行关键任务,先不管中断,如果不执行关键任务,就放弃对于CPU的占用->调度给其他进程.

5:MultiPlexing

多路复用

在xv6中,我们拥有的进程数往往要比CPU核要多.那么我们通过多路复用来进行调度.我们这个时候多路复用来进行调度.所有的进程共用几个多路复用器,使用的方法一般是时分复用.

首先xv6当等待设备完成I/O或者等待子进程退出的时候,就会使用sleepwakeup系统调用来切换进程的状态.同时xv6操作系统并不会允许一个进程占用CPU太多时间,当一个进程连续占用CPU一段时间这个进程也会强制改变状态.这个对于进程来说,它被唤醒和打断是无法侦查的.所以说对于一个进程来说相当于占用了属于自己的CPU.

完成多路复用有一定的挑战,第一点就是如何进行进程的切换,切换CPU的运行状态以及其他部件的状态.第二点,对于用户进程,怎么处理可以让用户进程无法觉察到自己失去了CPU的控制权.这里xv6使用时钟中断来进行切换,(每段时间暂停一遍,进入内核态执行进程切换函数,这样子对于用户进程是透明的).第三点,所有CPU核都会执行一组进程,我们需要设计一个锁结构来防止race的出现.第四点,一个进程的所有内存和其他的资源在进程退出的时候都必须得释放,但是在释放的时候我们很难释放内核态栈.第五点,每一个核都需要知道自己运行进程的序号,否则我们进入内核态的时候不知道使用哪个栈.最后一点,就是sleep和wakeup的系统调用让进程放弃CPU.但是我们需要的注意是在唤醒进程的时候的race现象.

上下文切换

在切换进程的时候首先用户态先进入内核态,然后会把上下文信息放入到内核栈,切换到新的进程,然后新的进程的上下文信息会从内核栈中取出,再切换到用户态.每个进程会拥有一个内核态栈,因为多个进程共用一个内核栈是非常危险的.

保存的信息就是CPU的寄存器的值,同时,恢复也是恢复保存在内核态栈的寄存器值.swtch函数会执行寄存器的保存和提取.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
.globl swtch
swtch:
sd ra, 0(a0)
sd sp, 8(a0)
sd s0, 16(a0)
sd s1, 24(a0)
sd s2, 32(a0)
sd s3, 40(a0)
sd s4, 48(a0)
sd s5, 56(a0)
sd s6, 64(a0)
sd s7, 72(a0)
sd s8, 80(a0)
sd s9, 88(a0)
sd s10, 96(a0)
sd s11, 104(a0)

ld ra, 0(a1)
ld sp, 8(a1)
ld s0, 16(a1)
ld s1, 24(a1)
ld s2, 32(a1)
ld s3, 40(a1)
ld s4, 48(a1)
ld s5, 56(a1)
ld s6, 64(a1)
ld s7, 72(a1)
ld s8, 80(a1)
ld s9, 88(a1)
ld s10, 96(a1)
ld s11, 104(a1)

ret

swtch(&c->context, &p->context);

在这里swtch首先把当前的寄存器信息存放到a0对应的内核栈中,再从a1对应的内核栈中取出数据放到寄存器中.对于这个函数,它并不知道这是什么进程在执行stwch调用.

现在我们知道context(上下文)的内容了,对于每一个进程和CPU的数据结构都有一部分保存上下文.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Saved registers for kernel context switches.
struct context {
uint64 ra;
uint64 sp;

// callee-saved
uint64 s0;
uint64 s1;
uint64 s2;
uint64 s3;
uint64 s4;
uint64 s5;
uint64 s6;
uint64 s7;
uint64 s8;
uint64 s9;
uint64 s10;
uint64 s11;
};

在C语言中,a0分别是老进程的context字段的地址,a1是新的进程的context字段的地址.我们发现这里只保存了callee-saved寄存器.但是你会发现,ra寄存器的值被改变了,所以说我知道当函数返回的时候,返回地址改变了,所以说这下pc就变成之前调用swtch的进程调用swtch的PC.这听上去很绕,简单的说就是反悔的PC不是这个进程调用之前的那个PC,而是上个进程调用之前的PC.

Swtch takes two arguments: struct context *old and struct context *new. It saves the current registers in old, loads registers from new, and returns.

我们回到之前的代码,在trap中最后调用了yield函数.

1
2
3
4
5
6
7
8
9
void
yield(void)
{
struct proc \*p = myproc();
acquire(&p->lock);
p->state = RUNNABLE;
sched();
release(&p->lock);
}

首先yield函数讲当前进程的状态改成“可执行”,接着又调用sched()函数.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void
sched(void)
{
int intena;
struct proc \*p = myproc();

if(!holding(&p->lock))
panic("sched p->lock");
if(mycpu()->noff != 1)
panic("sched locks");
if(p->state == RUNNING)
panic("sched running");
if(intr_get())
panic("sched interruptible");

intena = mycpu()->intena;
swtch(&p->context, &mycpu()->context);
mycpu()->intena = intena;
}

先判断各种情况,这个不是特别重要,重要的是我执行了swtch函数,这个函数会把当前进程的上下文保存,然后把scheduler()的上下文拿出来开始执行中间的调度过程.这个调度过程是每个CPU都特有的调度过程,其上下文存放在cpu的context里面,这个context区间每个CPU核都有一个.这下返回的地址不是sched()函数而是scheduler()函数了.也就是说这个地方很巧妙地改变ra寄存器让程序的返回地址改变,返回的是调度函数.这种思路只改变了部分上下文就可以改变运行的程序,非常妙.(最重要的是每个CPU一个防止race现象)

调度方法

现在完成了第一步,从原进程到调度程序,对于第一步,都是先获得进程的锁,然后更改进程的状态然后调用sched,这个对于sleep还是yiled还是exit都是一样.sched函数会进行一定的检查,然后最后sched会调用swtch转移到scheduler函数.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void
scheduler(void)
{
struct proc \*p;
struct cpu \*c = mycpu();

c->proc = 0;
for(;;){
// Avoid deadlock by ensuring that devices can interrupt.
intr_on();

for(p = proc; p < &proc[NPROC]; p++) {
acquire(&p->lock);
if(p->state == RUNNABLE) {
// Switch to chosen process. It is the process's job
// to release its lock and then reacquire it
// before jumping back to us.
p->state = RUNNING;
c->proc = p;
swtch(&c->context, &p->context);

// Process is done running for now.
// It should have changed its p->state before coming back.
c->proc = 0;
}
release(&p->lock);
}
}
}

接着根据scheduler的返回地址开始执行,首先这个函数是一个永远循环下去的循环,接着把上一次调用swtch进程的锁给释放了.你会发现这个十分巧妙,因为进程之间的解锁和上锁是通过swtch实现的,swtch的调用者已经有了锁,接着这个锁传递给scheduler.

由于scheduler是swtch(&c->context, &p->context);开始执行的,所以说第一步就是标记CPU正在运行的进程,把这个进程改成0(NULL),然后再来释放这个锁.(因为当前上锁的进程一定是上一次进入的进程.)这个时候在sched()上的锁,在scheduler()释放.在scheduler()获得的锁,在sched()释放因为这是一个回环.这个时候你可以认为scheduler和sched是一个回环.当然也不绝对,因为当新的进程第一次运行的时候,返回是从forkret函数返回的,这个是在frok函数一开始就已经设定好的.第一次运行执行forkret函数,然后通过usertrapret返回到用户态.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void
forkret(void)
{
static int first = 1;

// Still holding p->lock from scheduler.
release(&myproc()->lock);

if (first) {
// File system initialization must be run in the context of a
// regular process (e.g., because it calls sleep), and thus cannot
// be run from main().
first = 0;
fsinit(ROOTDEV);
}

usertrapret();
}

scheduler运行一个永远运行的循环,首先找到一个可以运行的进程,一直运行直到调用yield()函数,这个可以运行的定义就是p->state==RUNNABLE.每一次找到可以运行的进程,都会把这个进程的进程信息放到当前cpu的结构体里面,标记为RUNNING.

首先我们知道,当一个进程的状态是RUNNING,我们可以安全地保存这个进程的上下文,因为现在这个CPU的寄存器是属于某个进成的寄存器,还有一个就是挑选属于RUNNABLE的进程它也是为了防止出现问题.

由于要保存进程的状态,所以说这就是为什么xv6需要给处理进程的代码上锁了,就是因为进程状态这个变量就是临界变量,这块代码是临界区

当前的CPU和进程信息.

我们需要记录当前的进程指针来获取信息,一般来说,如果你的机器是一个核的,我们可以设置一个全局变量,但是我们的机器是多核的,每个核执行不同的进程,这个方案就有一定的问题.

所以说xv6维护一个结构体叫做CPU,这个CPU存储着当前CPU核正在运行什么核.

1
2
3
4
5
6
7
// Per-CPU state.
struct cpu {
struct proc \*proc; // The process running on this cpu, or null.
struct context context; // swtch() here to enter scheduler().
int noff; // Depth of push_off() nesting.
int intena; // Were interrupts enabled before push_off()?
};

这个结构体存放着scheduler的上下文,以及当前运行的进程.

1
2
3
4
5
6
7
8
9
10
11
12
13
struct cpu\*
mycpu(void) {
int id = cpuid();
struct cpu \*c = &cpus[id];
return c;
}

int
cpuid()
{
int id = r_tp();
return id;
}

这个时候可以获得cpuid以及对应的结构体.因为CPU的核号是存放在tp寄存器的.在mstart中,就是在启动的时候已经把CPU的核号已经送入到tp寄存器中了,在M-mode的时候,usertrapret把tp寄存器保存在trapoline寄存器中.并且编译器永远不会把tp寄存器改变,所以说我们可以很方便地获得cpu核的id值.

当然,为了保证CPU返回不会被时钟中断打扰,调用这个函数要求使用cpu结构体的时候关闭中断,当使用完毕之后再来开启中断.

当然我们还可以使用myproc()函数来获得当前cpu运行的进程结构体:

1
2
3
4
5
6
7
8
9
// Return the current struct proc \*, or zero if none.
struct proc\*
myproc(void) {
push_off();
struct cpu \*c = mycpu();
struct proc \*p = c->proc;
pop_off();
return p;
}

睡眠与唤醒

sleep和wakeup调用给底层提供了一个同步接口,我们可以根据这个同步接口构建一个叫做信号量的顶层接口,信号量的定义和P-V操作我们都在操作系统课上学过了,我们就不需要解释究竟什么是P-V操作.贴个代码吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
100 struct semaphore {
101 struct spinlock lock;
102 int count;
103 };
104

400 void
401 V(struct semaphore \*s)
402 {
403 acquire(&s->lock);
404 s->count += 1;
405 wakeup(s);
406 release(&s->lock);
407 }
408
409 void
410 P(struct semaphore \*s)
411 {
412 acquire(&s->lock);
413 while(s->count == 0)
414 sleep(s, &s->lock);
415 s->count -= 1;
416 release(&s->lock);
417 }

P-V操作需要获得锁,因为对于信号量,同时可以有多个进程对信号量进行操作.

总而言之,P-V操作要求我们sleep(s,s->lock),要求这个进程为了s信号灯而等待,放弃当前CPU的占用,wakeup(s)要求通知所有为s信号灯而等待的进程,有位占了.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// Atomically release lock and sleep on chan.
// Reacquires lock when awakened.
void
sleep(void \*chan, struct spinlock \*lk)
{
struct proc \*p = myproc();

// Must acquire p->lock in order to
// change p->state and then call sched.
// Once we hold p->lock, we can be
// guaranteed that we won't miss any wakeup
// (wakeup locks p->lock),
// so it's okay to release lk.

acquire(&p->lock); //DOC: sleeplock1
release(lk);

// Go to sleep.
p->chan = chan;
p->state = SLEEPING;

sched();

// Tidy up.
p->chan = 0;

// Reacquire original lock.
release(&p->lock);
acquire(lk);
}

首先先标记一下,这个目前是睡眠状态.还标记一下睡眠的理由,就是proc的chan元素.然后进行进程调度,因为这个程序在返回的时候还是需要对信号灯进行更改,所以说在返回的时候还是需要信号灯的锁.但是当进程进入睡眠状态就可以不需要信号灯的锁了.记录进程是为了谁而睡眠的十分重要.

对应的,在wakeup()函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Wake up all processes sleeping on chan.
// Must be called without any p->lock.
void
wakeup(void \*chan)
{
struct proc \*p;

for(p = proc; p < &proc[NPROC]; p++) {
if(p != myproc()){
acquire(&p->lock);
if(p->state == SLEEPING && p->chan == chan) {
p->state = RUNNABLE;
}
release(&p->lock);
}
}
}

这个时候就寻找一个正在睡眠的并且因为chan这个原因睡眠的进程,表示你要的资源已经到达,这个时候就可以提醒这些进程现在可以执行了.(为什么要一次性全部通知,这样会错吗?其实不会,因为是执行while循环的,如果发现s->count还是小于0,那我还是接着睡吧zzz)

由于我们对很多临界变量,比如说信号灯,进程控制块进行了修改,所以说锁结构是必要的.

管道

xv6的管道使用sleep和wakeup的操作来进行复杂的同步通讯.我们在之前已经了解过Linux的管道系统.

1
2
3
4
5
6
7
8
struct pipe {
struct spinlock lock;
char data[PIPESIZE];
uint nread; // number of bytes read
uint nwrite; // number of bytes written
int readopen; // read fd is still open
int writeopen; // write fd is still open
};

在xv6,我们使用pipe这个数据结构,首先我们看到一个锁结构,还有一个数据buffer.还有nread和nwrite来表示目前管道的两端已经读出了几个元素,已经写入几个元素.这个数据buffer还是一个循环队列,也就是第PIPELINE-1项之后就是第0项.循环队列的判空和判满非常容易.由于在这里我们使用了类似于TCP id的模式处理,所以说我们读区元素使用%运算来进行读取.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
int
pipewrite(struct pipe \*pi, uint64 addr, int n)
{
int i = 0;
struct proc \*pr = myproc();

acquire(&pi->lock);
while(i < n){
if(pi->readopen == 0 pr->killed){
release(&pi->lock);
return -1;
}
if(pi->nwrite == pi->nread + PIPESIZE){ //DOC: pipewrite-full
wakeup(&pi->nread);
sleep(&pi->nwrite, &pi->lock);
} else {
char ch;
if(copyin(pr->pagetable, &ch, addr + i, 1) == -1)
break;
pi->data[pi->nwrite++ % PIPESIZE] = ch;
i++;
}
}
wakeup(&pi->nread);
release(&pi->lock);

return i;
}

首先就是pipewrite,在对管道这个临界资源修改之前我们先上锁,所以说读的时候不能写,写的时候不能读,也不能同时读也不能同时写,这个就是锁的魅力.

首先就是各种异常情况,这个不说了,接着就是满的情况,那么只能让暂时让写的进程休眠,顺便让那群没字节可读的进程醒过来.如果没有满,那就一个一个字节地往buffer里面写,写的操作就是普通的循环队列,只不过这里巧妙地使用了wakeup和sleep调节满和空之间的平衡.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
int
piperead(struct pipe \*pi, uint64 addr, int n)
{
int i;
struct proc \*pr = myproc();
char ch;

acquire(&pi->lock);
while(pi->nread == pi->nwrite && pi->writeopen){ //DOC: pipe-empty
if(pr->killed){
release(&pi->lock);
return -1;
}
sleep(&pi->nread, &pi->lock); //DOC: piperead-sleep
}
for(i = 0; i < n; i++){ //DOC: piperead-copy
if(pi->nread == pi->nwrite)
break;
ch = pi->data[pi->nread++ % PIPESIZE];
if(copyout(pr->pagetable, addr + i, &ch, 1) == -1)
break;
}
wakeup(&pi->nwrite); //DOC: piperead-wakeup
release(&pi->lock);
return i;
}

对于读也是一样,如果管道是空的,那么没字节可读,那就让自己休眠.顺便通知写的进程抓紧来写,如果不是空的,那就慢慢来读.一个字一个字地读.

由于读写区间都上了锁,所以说这样可以保证只有一个进程能对一个管道进行处理,防止乱套.

等待,退出和杀死的系统调用

首先就是wait的系统调用:wait调用就是等待子进程退出,退出了之后父进程就可以继续执行.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
int
wait(uint64 addr)
{
struct proc \*np;
int havekids, pid;
struct proc \*p = myproc();

acquire(&wait_lock);

for(;;){
// Scan through table looking for exited children.
havekids = 0;
for(np = proc; np < &proc[NPROC]; np++){
if(np->parent == p){
// make sure the child isn't still in exit() or swtch().
acquire(&np->lock);

havekids = 1;
if(np->state == ZOMBIE){
// Found one.
pid = np->pid;
if(addr != 0 && copyout(p->pagetable, addr, (char \*)&np->xstate,
sizeof(np->xstate)) < 0) {
release(&np->lock);
release(&wait_lock);
return -1;
}
freeproc(np);
release(&np->lock);
release(&wait_lock);
return pid;
}
release(&np->lock);
}
}

// No point waiting if we don't have any children.
if(!havekids p->killed){
release(&wait_lock);
return -1;
}

// Wait for a child to exit.
sleep(p, &wait_lock); //DOC: wait-sleep
}
}

这个程序的代码非常好懂,由于这个wait比较菜,只需要任何一个子进程退出就可以了,所以说实现起来不难.首先判断这个进程有没有一个儿子,并且这个儿子就是僵尸状态的,如果有的话就太好了,我就不用wait了,我就把这个儿子给释放了.并且把信息传递给用户那边.如果没有找到,那还是乖乖等着吧.我们获得这些锁,第一是为了保护进程,第二个是为了防止多线程访问..导致这个pid和havekids出现问题.

接着就是exit了,由于wait调用要等待有没有儿子退出,所以说sleep和wakeup是要配套的,wakeup就是在exit里面实现的.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
void
exit(int status)
{
struct proc \*p = myproc();

if(p == initproc)
panic("init exiting");

// Close all open files.
for(int fd = 0; fd < NOFILE; fd++){
if(p->ofile[fd]){
struct file \*f = p->ofile[fd];
fileclose(f);
p->ofile[fd] = 0;
}
}

begin_op();
iput(p->cwd);
end_op();
p->cwd = 0;

acquire(&wait_lock);

// Give any children to init.
reparent(p);

// Parent might be sleeping in wait().
wakeup(p->parent);

acquire(&p->lock);

p->xstate = status;
p->state = ZOMBIE;

release(&wait_lock);

// Jump into the scheduler, never to return.
sched();
panic("zombie exit");
}

首先看看是不是init的守护进程要退出,不是就还好.第一步就是解决打开的文件问题,一一关闭文件即可.接着就是reparent,就是如果它的父进程要被exit掉,但是子进程还在存活,就需要使用reparent来处理父进程.如果这个进程有父进程,顺便叫醒正在沉睡的,等待它的儿子调用exit()的爸爸.接着由于这个已经退出了,所以说转进程调度吧.

exit是自己退场,那么kill是勒令让别人退场.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
int
kill(int pid)
{
struct proc \*p;

for(p = proc; p < &proc[NPROC]; p++){
acquire(&p->lock);
if(p->pid == pid){
p->killed = 1;
if(p->state == SLEEPING){
// Wake process from sleep().
p->state = RUNNABLE;
}
release(&p->lock);
return 0;
}
release(&p->lock);
}
return -1;
}

//usertrapret()
if(p->killed)
exit(-1);

这个就像注射了慢性毒药,我在kill函数什么都不做,我只是设定一个killed为1,然后这个进程在执行usertrapret的时候由于killed值为1,这个时候就它会自己调用exit()然后退场.

6:File System

1.概览

xv6的文件系统由7层组成,首先就是最下面的硬件层,cache层在上面通过缓存硬件块来实现操作系统同步地访问硬盘块(这样可以降低操作系统访问硬盘块的时间),并且可以进行简单的同步管理,这样子可以保证只有一个进程同时访问一个硬盘块.记录层让更高层次的程序在在一次处理中能够处理多个硬件块,保证这些硬件块是同步处理的(要么都不处理,要么都处理).inode层负责描述文件,其中一个文件对应着一个inode,这个inode存储着许多文件的信息.其中inode层负责存储文件的控制信息,其中有一个索引负责带领文件找到索引本身.文件目录层负责实现具体的文件目录.路经名层负责完善文件树.这样可以根据文件的路径取访问文件了.文件描述器层负责完善许多UNIX抽象文件接口,负责给用户程序提供文件系统相关的系统调用.

硬盘把数据按照硬盘扇区为单位连续地存放在磁盘中,每一个磁盘扇区大小是512字节.其中第0块是第一个512字节.第1块是第513-1024字节,以此类推.xv6操作系统维护一个buf结构体.存储在这个结构体的数据可能和磁盘实际存储的数据不一样.有一种可能就是数据写进buf结构体,但还是没有写进磁盘块.(类似于cache,cache也有脏数据嘛)

还需要注意的是,在操作系统中,磁盘块的大小一般是磁盘扇区大小的两倍.所以说在xv6中我们认为一块就是两个扇区,就是1024字节.到后面我们逻辑上认为一块就是两个扇区,1024字节.

1
2
3
4
5
6
7
8
9
10
11
12
#define BSIZE 1024  // block size
struct buf {
int valid; // has data been read from disk?
int disk; // does disk "own" buf?
uint dev;
uint blockno;
struct sleeplock lock;
uint refcnt;
struct buf \*prev; // LRU cache list
struct buf \*next;
uchar data[BSIZE];
};

一个文件系统必须有一个计划,就是哪些磁盘块存放控制文件总体信息的inode,哪些磁盘块存放具体的文件.在这里,xv6把磁盘分成了几个部分,第0块是boot块,这里面存放着引导操作系统的代码.第1块又被称为“超级块”,超级块中存放了关于整个文件系统的原数据(包括文件系统所有块的数量,inode的数量,还有文件块的数量).从2后面就是记录,接着就是inode,接着就是bit map(这个bitmap可以帮助我们确定哪些块已经被使用了),最后就是存放的文件块.

2.buffer cache层.

buffer cache层有两个作用,第一个是同步,在内存中只有一个磁盘块的拷贝,还有就是只有一个进程能够访问这个磁盘块的拷贝.第二个作用就是可以把比较常用的磁盘块放入缓存区里面,这样可以加快进程读写磁盘的速度.

buffer cache层主要提供了两个API,分别是bread()和bwrite().

bread()负责获取一个块的存储在内存中的副本,我们之后的读写操作就是在内存中的副本进行的.bwirte()就负责把内存中副本写入到磁盘中.总的来说是read()操作把磁盘中信息读取到内存,write就是把内存中的信息写入到磁盘.在最后我们要调用brelease来释放这个内存块的锁.总之bread()返回了一个带锁的buffer,brelease就负责把这个锁释放掉.

因为这个本质上也是一个cache.当操作系统要求访问一个不再buffer的磁盘块,它就需要淘汰,淘汰就选用最久未使用算法即可.

Cache层重要数据结构定义

buf数据结构定义。valid字段表示该buf中是否存储了有效内容;字段disk的意思是缓冲区的内容已经被提交到了磁盘,这可能会改变缓冲区(如,把磁盘中的数据写到data,这个类似于cache的脏位,代表这个是不是在cache中写过);devblockno标识该buf存储的磁盘块设备和扇区号;data则是buf实际缓存的内容;而prevnext就代表一个双向链表.其中refcnt代表这个块是不是可用的,或者代表这个buf链接了多少个磁盘块.

1
2
3
4
5
6
7
8
9
10
11
struct buf {
int valid; // has data been read from disk?
int disk; // does disk "own" buf?
uint dev;
uint blockno;
struct sleeplock lock;
uint refcnt;
struct buf \*prev; // LRU cache list
struct buf \*next;
uchar data[BSIZE];
};

bcache数据结构定义。bcache用于管理所有的buffer,将所有的buf用双链表进行连接,head是链表头,不存储实际的buf。所有的代码访问bcache是访问head,并不会访问buf数组.我们了解过用硬件实现的cache,但是这是第一次遇见用软件实现的cache.

1
2
3
4
5
6
7
8
9
struct {
struct spinlock lock;
struct buf buf[NBUF];

// Linked list of all buffers, through prev/next.
// Sorted by how recently the buffer was used.
// head.next is most recent, head.prev is least.
struct buf head;
} bcache;

Cache层的函数定义

binit()函数:初始化bcache,把所有的buf使用双链表进行连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void
binit(void)
{
struct buf \*b;

initlock(&bcache.lock, "bcache");

// Create linked list of buffers
bcache.head.prev = &bcache.head;
bcache.head.next = &bcache.head;
for(b = bcache.buf; b < bcache.buf+NBUF; b++){
b->next = bcache.head.next;
b->prev = &bcache.head;
initsleeplock(&b->lock, "buffer");
bcache.head.next->prev = b;
bcache.head.next = b;
}
}

bget()函数:

bget函数首先获取bcache.lock,至多扫描两遍双链表,第一遍从前到后,如果磁盘块已经缓存至buf中,则在第一遍循环之后就会返回该buf;否则执行第二次反向扫描,寻找目前未使用的buf,将buf的必要字段进行标记之后返回该buf。

注意:在将buf返回之前,需要获取该buf的锁(buf.lock);赋值语句b->valid = 0,它确保了bread将从磁盘读取块数据,而不是错误地使用缓冲区里之前的数据。获取锁的目标是,现在只有一个进程能够访问这段cache,这是为了方便各进程进行同步.当然我们获得了我们需要的元素之后我们就可以释放bcache.lock了.

如果所有的块都是忙的,只能返回busy的信息了.我们不需要考虑同步的问题,我们设计的锁能保证同时只有一个进程访问bcache和buf结构体.同样,非零的refcnt保证了只有一个dev number.

睡眠锁保证了就算有许多个进程要访问这个块,仅仅是睡眠而已,因为有多个进程要访问这个文件是很正常的.这样子也可以保证在只有一个进程同时读写这个cache.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Look through buffer cache for block on device dev.
// If not found, allocate a buffer.
// In either case, return locked buffer.
static struct buf\*
bget(uint dev, uint blockno)
{
struct buf \*b;

acquire(&bcache.lock);

// Is the block already cached?
for(b = bcache.head.next; b != &bcache.head; b = b->next){
if(b->dev == dev && b->blockno == blockno){
b->refcnt++;
release(&bcache.lock);
acquiresleep(&b->lock);
return b;
}
}

// Not cached.
// Recycle the least recently used (LRU) unused buffer.
for(b = bcache.head.prev; b != &bcache.head; b = b->prev){
if(b->refcnt == 0) {
b->dev = dev;
b->blockno = blockno;
b->valid = 0;
b->refcnt = 1;
release(&bcache.lock);
acquiresleep(&b->lock);
return b;
}
}
panic("bget: no buffers");
}

bread()函数:

  • 内部调用了bget函数,该函数保证了返回的已经获取了buf.lockbuf

  • 如果buf还没有获取有效的数据内容,则需要通过virtio_disk_rw()这个函数接口载入数据并将buf的有效位置为1.virtio_disk_rw()函数可以读取磁盘里面的信息,然后把信息传递给

  • 返回这个buf

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    // Return a locked buf with the contents of the indicated block.
    struct buf\*
    bread(uint dev, uint blockno)
    {
    struct buf \*b;

    b = bget(dev, blockno);
    if(!b->valid) {
    virtio_disk_rw(b, 0);
    b->valid = 1;
    }
    return b;
    }

    > `bwrite()`函数把buf中的数据写回磁盘,要求持有`buf.lock`锁

    // Write b's contents to disk. Must be locked.
    void
    bwrite(struct buf \*b)
    {
    if(!holdingsleep(&b->lock))
    panic("bwrite");
    virtio_disk_rw(b, 1);
    }

    brelse()函数:如果调用者完成了对缓冲区的操作,它必须调用brelse来释放它。

  • 首先需要判断是否持有参数buf的锁;

  • 如果持有锁则释放该buf 的锁;这样子其他进程就可以访问这个cache了.所以说综上就是一个进程获取了锁并成功进入之后,执行完brelse就可以让下一个进程接着执行.

  • 获取bcache.lock之后,将buf的引用计数refcnt减1,根据buf的引用计数是否为0来决定是否将buf调整到双链表的表头。

  • 对缓冲区的移动使得列表按照最近使用(释放)进行排序:列表里的第一个缓冲区是最近被使用的,最后一个则是最近最少被使用的。bget里的两个循环利用了这点:在最坏的情况下,扫描一个已经存在的缓冲区必须处理整个列表,当引用处于良好的位置的时候,先检查最近使用的缓冲区将减少扫描时间。通过反向扫描(跟随prev指针),对要重新使用的缓冲区的扫描查找到了最少使用的缓冲区。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    // Release a locked buffer.
    // Move to the head of the most-recently-used list.
    void
    brelse(struct buf \*b)
    {
    if(!holdingsleep(&b->lock))
    panic("brelse");

    releasesleep(&b->lock);

    acquire(&bcache.lock);
    b->refcnt--;
    if (b->refcnt == 0) {
    // no one is waiting for it.
    b->next->prev = b->prev;
    b->prev->next = b->next;
    b->next = bcache.head.next;
    b->prev = &bcache.head;
    bcache.head.next->prev = b;
    bcache.head.next = b;
    }

    release(&bcache.lock);
    }

    > `bpin`和`bunpin`用于调整引用计数`refcnt`.pin就是+1,unpin就是-1.

    void
    bpin(struct buf \*b) {
    acquire(&bcache.lock);
    b->refcnt++;
    release(&bcache.lock);
    }

    void
    bunpin(struct buf \*b) {
    acquire(&bcache.lock);
    b->refcnt--;
    release(&bcache.lock);
    }

Log层

前言

在文件系统的设计里最有趣的问题之一是故障恢复。许多文件系统的操作包含了多次写磁盘的操作,在一串写操作之后的崩溃使得磁盘上的文件系统处于不一致的状态。

xv6系统调用不直接写入硬盘上文件系统的数据结构。相反,它把一个描述放在磁盘上,这个描述是它在一个log里所期望的所有磁盘写操作。一旦系统调用日志记录了所有的写操作,它往磁盘上写入一个特殊的commit记录用来表示那个日志包含了一个完整的操作。那时系统调用才会把所有的写操作写入到磁盘文件系统里的数据结构中。当那些写操作都完成后,系统调用删除磁盘上的日志。

如果系统崩溃并重启,在运行任何进程之前,文件系统代码按如下描述从崩溃中恢复:如果日志被标记为包含一个完整的操作,则恢复代码把写操作复制到磁盘文件系统。如果日志不是标记为包含完整的操作,恢复代码忽略这个日志。恢复代码最后删除日志完成所有的操作。

超级块结构体定义以及磁盘布局

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Disk layout:
// [ boot block super block log inode blocks
// free bit map data blocks]
//
// mkfs computes the super block and builds an initial file system. The
// super block describes the disk layout:
struct superblock {
uint magic; // Must be FSMAGIC
uint size; // Size of file system image (blocks)
uint nblocks; // Number of data blocks
uint ninodes; // Number of inodes.
uint nlog; // Number of log blocks
uint logstart; // Block number of first log block
uint inodestart; // Block number of first inode block
uint bmapstart; // Block number of first free map block
};

Log层结构体定义

这个数据结构和磁盘上的log区域一一对应的。磁盘上的log区域由log.start开始的连续log.sizeblock组成。第一个block即为logheader,待写磁盘块的个数记录在logheader.n中,待写磁盘块号记录在block数组中。log.dev表示该log位于哪一个磁盘(xv6实际上只有一个)。log.outstanding记录了目前有多少个进程正在并行地对磁盘进行写。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Contents of the header block, used for both the on-disk header block
// and to keep track in memory of logged block# before commit.
struct logheader {
int n;
int block[LOGSIZE];
};

struct log {
struct spinlock lock;
int start;
int size;
int outstanding; // how many FS sys calls are executing.
int committing; // in commit(), please wait.
int dev;
struct logheader lh;
};

Log层函数的定义

initlog函数:使用超级块中的字段初始化log。

1
2
3
4
5
6
7
8
9
10
11
12
void
initlog(int dev, struct superblock \*sb)
{
if (sizeof(struct logheader) >= BSIZE)
panic("initlog: too big logheader");

initlock(&log.lock, "log");
log.start = sb->logstart;
log.size = sb->nlog;
log.dev = dev;
recover_from_log();
}

recover_from_log函数,根据函数名称应该是根据之前写入磁盘的日志信息恢复磁盘数据的操作

  • 从磁盘读取log header

  • 将提交的日志写入磁盘

  • 清空日志记录

    1
    2
    3
    4
    5
    6
    7
    8
    static void
    recover_from_log(void)
    {
    read_head();
    install_trans(1); // if committed, copy from log to disk
    log.lh.n = 0;
    write_head(); // clear the log
    }

    从磁盘的读取log header到内存中,位于log所在磁盘块的第一个磁盘块

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    // Read the log header from disk into the in-memory log header
    static void
    read_head(void)
    {
    struct buf \*buf = bread(log.dev, log.start);
    struct logheader \*lh = (struct logheader \*) (buf->data);
    int i;
    log.lh.n = lh->n;
    for (i = 0; i < log.lh.n; i++) {
    log.lh.block[i] = lh->block[i];
    }
    brelse(buf);
    }

    recovering的作用是啥?

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    // Copy committed blocks from log to their home location
    static void
    install_trans(int recovering)
    {
    int tail;

    for (tail = 0; tail < log.lh.n; tail++) {
    struct buf \*lbuf = bread(log.dev, log.start+tail+1); // read log block
    struct buf \*dbuf = bread(log.dev, log.lh.block[tail]); // read dst
    memmove(dbuf->data, lbuf->data, BSIZE); // copy block to dst
    bwrite(dbuf); // write dst to disk
    if(recovering == 0)
    bunpin(dbuf);
    brelse(lbuf);
    brelse(dbuf);
    }
    }

    begin_op()函数:发起写请求,任何对bcache中的block进行写操作之前都需要调用这个函数。

  • 如果日志处于commit状态,则需要等待;

  • 否则判断本次写操作是否会使得transaction需要写的磁盘块超出LOGSIZE;

  • 如果没有超过则将log.outstanding字段加1;

  • 如果超过则需要等待。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    // called at the start of each FS system call.
    void
    begin_op(void)
    {
    acquire(&log.lock);
    while(1){
    if(log.committing){
    sleep(&log, &log.lock);
    } else if(log.lh.n + (log.outstanding+1)\*MAXOPBLOCKS > LOGSIZE){
    // this op might exhaust log space; wait for commit.
    sleep(&log, &log.lock);
    } else {
    log.outstanding += 1;
    release(&log.lock);
    break;
    }
    }
    }
  • 在结束写请求的时候调用end_op()函数,将log.outstanding字段减1;

  • 如果log.committing字段为1,表面日志处于提交状态,抛出异常;

  • 如果所有的进程都结束了写请求,则将log.committing字段以及do_commit置为1;

  • 如果do_commit为1则调用commit函数,将本轮进程写的结果写回磁盘

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    // called at the end of each FS system call.
    // commits if this was the last outstanding operation.
    void
    end_op(void)
    {
    int do_commit = 0;

    acquire(&log.lock);
    log.outstanding -= 1;
    if(log.committing)
    panic("log.committing");
    if(log.outstanding == 0){
    do_commit = 1;
    log.committing = 1;
    } else {
    // begin_op() may be waiting for log space,
    // and decrementing log.outstanding has decreased
    // the amount of reserved space.
    wakeup(&log);
    }
    release(&log.lock);

    if(do_commit){
    // call commit w/o holding locks, since not allowed
    // to sleep with locks.
    commit();
    acquire(&log.lock);
    log.committing = 0;
    wakeup(&log);
    release(&log.lock);
    }
    }

    log_write函数用于把块修改写入内存中的log

通常情况下,我们调用bread函数读取一个块,然后对块的内容进行了修改,然后调用brelse释放这个块。**问题在于log怎么知道哪些块需要写回呢?得用logheader来记录,所以必须有个函数来写logheader记录块的修改,这个函数就是log_write**。

  • 如果是第一次修改这个块,那么执行log.lh.n++操作;

  • 当一个块在单个事务中被多次写入时,log_write会发现它,并在日志里为那个块分配相同的位置。这个优化被称为合并(absorption);

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    void
    log_write(struct buf \*b)
    {
    int i;

    acquire(&log.lock);
    if (log.lh.n >= LOGSIZE log.lh.n >= log.size - 1)
    panic("too big a transaction");
    if (log.outstanding < 1)
    panic("log_write outside of trans");

    for (i = 0; i < log.lh.n; i++) {
    if (log.lh.block[i] == b->blockno) // log absorption
    break;
    }
    log.lh.block[i] = b->blockno;
    if (i == log.lh.n) { // Add new block to log?
    bpin(b);
    log.lh.n++;
    }
    release(&log.lock);
    }

    commit函数可以分为几个阶段:

  • 把内存中需要修改的block写回磁盘的log区域(write_log)

  • 把内存中logheader写回磁盘的log区域头(write_head)

  • 把log区域的磁盘块写回到磁盘的相应位置(install_trans)

  • 把磁盘log区域头标记需要写回的block数目的位置清零log.lh.n = 0 & write_head

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    static void
    commit()
    {
    if (log.lh.n > 0) {
    write_log(); // Write modified blocks from cache to log
    write_head(); // Write header to disk -- the real commit
    install_trans(0); // Now install writes to home locations
    log.lh.n = 0;
    write_head(); // Erase the transaction from the log
    }
    }

    // Copy modified blocks from cache to log.
    static void
    write_log(void)
    {
    int tail;

    for (tail = 0; tail < log.lh.n; tail++) {
    struct buf \*to = bread(log.dev, log.start+tail+1); // log block
    struct buf \*from = bread(log.dev, log.lh.block[tail]); // cache block
    memmove(to->data, from->data, BSIZE);
    bwrite(to); // write the log
    brelse(from);
    brelse(to);
    }
    }

inode层

磁盘上的inode被填充在一个被称为inode块的连续磁盘区域上。每个inode都是一样大小,所以给定一个编号n,很容易就找到磁盘上的第n个inode

两个分配和回收磁盘块的函数

遍历bitmap,分配或者回收block

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// Allocate a zeroed disk block.
static uint
balloc(uint dev)
{
int b, bi, m;
struct buf \*bp;

bp = 0;
for(b = 0; b < sb.size; b += BPB){
bp = bread(dev, BBLOCK(b, sb));
for(bi = 0; bi < BPB && b + bi < sb.size; bi++){
m = 1 << (bi % 8);
if((bp->data[bi/8] & m) == 0){ // Is block free?
bp->data[bi/8] = m; // Mark block in use.
log_write(bp);
brelse(bp);
bzero(dev, b + bi);
return b + bi;
}
}
brelse(bp);
}
panic("balloc: out of blocks");
}

// Free a disk block.
static void
bfree(int dev, uint b)
{
struct buf \*bp;
int bi, m;

bp = bread(dev, BBLOCK(b, sb));
bi = b % BPB;
m = 1 << (bi % 8);
if((bp->data[bi/8] & m) == 0)
panic("freeing free block");
bp->data[bi/8] &= ~m;
log_write(bp);
brelse(bp);
}

inode层的数据结构定义

磁盘上 i 结点结构定义

磁盘上的inode通过struct dinode来定义。type字段区分了文件、目录、和特殊文件(设备)。如类型值为0则表示磁盘inode是空闲的。字段nlink记录了引用当前inode的目录条目的数量,用以识别这个磁盘inode和它的数据块何时应该被释放。size字段记录了文件内容的字节数。addrs数组记录了保存了文件内容的磁盘块的块号,addrs里的前NDIRECT个块被称为直接块(direct blocks);第NDIRECT+1个块记录了NINDIRECT个块的数据,它被称为间接块(indirect block)。

对数据结构中 majorminor 字段的解释:

在Unix系系统中,一切皆是文件。所有硬盘,键盘,网卡等设备都有文件来代表,对应着/dev/下面的文件。对于应用程序来说,可以像对待普通文件一样打开、关闭、读写这些设备文件。但是,这种文件名比如:/dev/sda 、/dev/raw/raw1都是用户空间名称,OS Kernel根本不知道这个名称代指什么。在内核空间是通过majorminor``device number来区分设备的。

**major device number**:可以看做是设备驱动程序,被同一设备驱动程序管理的设备有相同的major device number。这个数字实际是Kernel 中device driver table的索引。这个表保存着不同的设备驱动程序。

**minor device number**:代表被访问的具体设备。也就是说,Kernel根据major device number找到设备驱动程序,然后再从minor device number获得设备位置等属性。

1
2
3
4
5
6
7
8
9
10
11
12
13
// 下面的宏定义用于type字段,type = 0表示该dinode尚未分配
#define T_DIR 1
#define T_FILE 2
#define T_DEVICE 3
// On-disk inode structure
struct dinode {
short type; // File type
short major; // Major device number (T_DEVICE only)
short minor; // Minor device number (T_DEVICE only)
short nlink; // Number of links to inode in file system
uint size; // Size of file (bytes)
uint addrs[NDIRECT+1]; // Data block addresses
};

内存中需保存的i结点信息数据结构定义,除了从磁盘读取的inode信息,还定义了一些其他的字段。内核把活动inode的集合保存在内存中,即struct inoderef字段记录了C指针引用内存里inode的次数,当那个计数降为0的时候内核就会从内存中丢弃这个inode。igetiput函数请求和释放到一个inode的指针,这会修改这个引用计数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// in-memory copy of an inode
struct inode {
uint dev; // Device number
uint inum; // Inode number
int ref; // Reference count
struct sleeplock lock; // protects everything below here
int valid; // inode has been read from disk?

short type; // copy of disk inode
short major;
short minor;
short nlink;
uint size;
uint addrs[NDIRECT+1];
};

4.3 inode层函数定义

iget()函数

遍历itable寻找 i 结点是否已经被加载至内存中,如是则将该 i 结点的ref引用计数加1;在遍历的过程中使用empty对空的 i 结点进行保存;如果 i 结点还未被加载至内存则使用空 i 结点对需要获取的 i 结点进行保存。最后返回对应的 i 结点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
struct {
struct spinlock lock;
struct inode inode[NINODE];
} itable;
// Find the inode with number inum on device dev
// and return the in-memory copy. Does not lock
// the inode and does not read it from disk.
static struct inode\*
iget(uint dev, uint inum)
{
struct inode \*ip, \*empty;

acquire(&itable.lock);

// Is the inode already in the table?
empty = 0;
for(ip = &itable.inode[0]; ip < &itable.inode[NINODE]; ip++){
if(ip->ref > 0 && ip->dev == dev && ip->inum == inum){
ip->ref++;
release(&itable.lock);
return ip;
}
if(empty == 0 && ip->ref == 0) // Remember empty slot.
empty = ip;
}

// Recycle an inode entry.
if(empty == 0)
panic("iget: no inodes");

ip = empty;
ip->dev = dev;
ip->inum = inum;
ip->ref = 1;
ip->valid = 0;
release(&itable.lock);

return ip;
}

ilock函数用于获取ip所☞的 i 结点的锁,同时如果该 i 结点的内容处于无效状态时,需要从磁盘中读取对应设备和结点号的 i 结点,把相关字段拷贝至内存 i 结点中,将该 i 结点的有效位置为1,保证 i 结点的内容是有效的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// Inodes per block.
#define IPB (BSIZE / sizeof(struct dinode))

// Block containing inode i
#define IBLOCK(i, sb) ((i) / IPB + sb.inodestart)

// Lock the given inode.
// Reads the inode from disk if necessary.
void
ilock(struct inode \*ip)
{
struct buf \*bp;
struct dinode \*dip;

if(ip == 0 ip->ref < 1)
panic("ilock");

acquiresleep(&ip->lock);

if(ip->valid == 0){
bp = bread(ip->dev, IBLOCK(ip->inum, sb));
dip = (struct dinode\*)bp->data + ip->inum%IPB;
ip->type = dip->type;
ip->major = dip->major;
ip->minor = dip->minor;
ip->nlink = dip->nlink;
ip->size = dip->size;
memmove(ip->addrs, dip->addrs, sizeof(ip->addrs));
brelse(bp);
ip->valid = 1;
if(ip->type == 0)
panic("ilock: no type");
}
}

iunlock() 函数:释放ip所☞的 i 结点的锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
// Unlock the given inode.
void
iunlock(struct inode \*ip)
{
if(ip == 0 !holdingsleep(&ip->lock) ip->ref < 1)
panic("iunlock");

releasesleep(&ip->lock);
}

> `idup()`函数类似于`bpin` ,`iput()`函数类似于`brelse`函数。
>
> 当`iput`函数判断自己是最后一个持有该inode指针、该inode的内容是有效的、该inode的引用链接数为0时,会将该inode进行删除。

// Increment reference count for ip.
// Returns ip to enable ip = idup(ip1) idiom.
struct inode\*
idup(struct inode \*ip)
{
acquire(&itable.lock);
ip->ref++;
release(&itable.lock);
return ip;
}
// Drop a reference to an in-memory inode.
// If that was the last reference, the inode table entry can
// be recycled.
// If that was the last reference and the inode has no links
// to it, free the inode (and its content) on disk.
// All calls to iput() must be inside a transaction in
// case it has to free the inode.
void
iput(struct inode \*ip)
{
acquire(&itable.lock);

if(ip->ref == 1 && ip->valid && ip->nlink == 0){
// inode has no links and no other references: truncate and free.

// ip->ref == 1 means no other process can have ip locked,
// so this acquiresleep() won't block (or deadlock).
acquiresleep(&ip->lock);

release(&itable.lock);

itrunc(ip);
ip->type = 0;
iupdate(ip);
ip->valid = 0;

releasesleep(&ip->lock);

acquire(&itable.lock);
}

ip->ref--;
release(&itable.lock);
}

> `itrunc()`函数先释放直接块,再释放间接块里所列出的那些块,最后再释放间接块自身。

// Truncate(截断) inode (discard contents).
// Caller must hold ip->lock.
void
itrunc(struct inode \*ip)
{
int i, j;
struct buf \*bp;
uint \*a;
// 遍历前面NDIRECT个直接块,并进行释放块操作
for(i = 0; i < NDIRECT; i++){
if(ip->addrs[i]){
bfree(ip->dev, ip->addrs[i]);
ip->addrs[i] = 0;
}
}

if(ip->addrs[NDIRECT]){
bp = bread(ip->dev, ip->addrs[NDIRECT]);
a = (uint\*)bp->data;
// 遍历后面NINDIRECT个间接块,进行释放操作
for(j = 0; j < NINDIRECT; j++){
if(a[j])
bfree(ip->dev, a[j]);
}
brelse(bp);
bfree(ip->dev, ip->addrs[NDIRECT]);
ip->addrs[NDIRECT] = 0;
}

ip->size = 0;
iupdate(ip);
}

bmap()函数用于返回第bn个数据块的硬盘块号,参数ip用于表示inode的指针。如果ip里没有那个块,bmap会给它分配一个。

(注:由于一个块的大小BSIZE是1024字节且NDIRECT是12,所以一个文件可以直接载入的内容为12k字节。由于NINDIRECT是256,所以读取间接块后可以载入的内容是256k字节。)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
static uint
bmap(struct inode \*ip, uint bn)
{
uint addr, \*a;
struct buf \*bp;
// 判断是否为直接块
if(bn < NDIRECT){
if((addr = ip->addrs[bn]) == 0)
ip->addrs[bn] = addr = balloc(ip->dev);
return addr;
}
bn -= NDIRECT;
// 判断是否是间接块
if(bn < NINDIRECT){
// Load indirect block, allocating if necessary.
if((addr = ip->addrs[NDIRECT]) == 0)
ip->addrs[NDIRECT] = addr = balloc(ip->dev);
bp = bread(ip->dev, addr);
a = (uint\*)bp->data;
if((addr = a[bn]) == 0){
a[bn] = addr = balloc(ip->dev);
log_write(bp);
}
brelse(bp);
return addr;
}

panic("bmap: out of range");
}

writei()函数和readi()函数

比较简单,首先检查文件偏移和写入/读取字节大小是否合法,合法后对数据进行循环拷贝.写入时,读取块,将源数据拷贝至缓冲区;读出时,读取块,将块数据拷贝至对应的区域。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// Write data to inode.
// Caller must hold ip->lock.
// If user_src==1, then src is a user virtual address;
// otherwise, src is a kernel address.
// Returns the number of bytes successfully written.
// If the return value is less than the requested n,
// there was an error of some kind.
int
writei(struct inode \*ip, int user_src, uint64 src, uint off, uint n)
{
uint tot, m;
struct buf \*bp;

if(off > ip->size off + n < off)
return -1;
if(off + n > MAXFILE\*BSIZE)
return -1;

for(tot=0; tot<n; tot+=m, off+=m, src+=m){
bp = bread(ip->dev, bmap(ip, off/BSIZE));
m = min(n - tot, BSIZE - off%BSIZE);
if(either_copyin(bp->data + (off % BSIZE), user_src, src, m) == -1) {
brelse(bp);
break;
}
log_write(bp);
brelse(bp);
}

if(off > ip->size)
ip->size = off;

// write the i-node back to disk even if the size didn't change
// because the loop above might have called bmap() and added a new
// block to ip->addrs[].
iupdate(ip);

return tot;
}

int
readi(struct inode \*ip, int user_dst, uint64 dst, uint off, uint n)
{
uint tot, m;
struct buf \*bp;

if(off > ip->size off + n < off)
return 0;
if(off + n > ip->size)
n = ip->size - off;

for(tot=0; tot<n; tot+=m, off+=m, dst+=m){
bp = bread(ip->dev, bmap(ip, off/BSIZE));
m = min(n - tot, BSIZE - off%BSIZE);
if(either_copyout(user_dst, dst, bp->data + (off % BSIZE), m) == -1) {
brelse(bp);
tot = -1;
break;
}
brelse(bp);
}
return tot;
}

Directory层

directory层结构体定义

inum:该目录项对应的inode号

name:该目录项的名称

1
2
3
4
struct dirent {
ushort inum;
char name[DIRSIZ];
};

directory层函数定义

函数dirloopup()在目录 i 结点所包含的下一层的所有文件中找到名称为name的文件目录项,首先判断inode的type是否为T_DIR (目录) ,如果不是则抛出异常;接着循环读取inode所☞的目录文件,读取每一个目录项,如果找到名字为name的目录项,则查找 该目录项指向的inode并返回该inode。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Look for a directory entry in a directory.
// If found, set \*poff to byte offset of entry.
struct inode\*
dirlookup(struct inode \*dp, char \*name, uint \*poff)
{
uint off, inum;
struct dirent de;

if(dp->type != T_DIR)
panic("dirlookup not DIR");

for(off = 0; off < dp->size; off += sizeof(de)){
if(readi(dp, 0, (uint64)&de, off, sizeof(de)) != sizeof(de))
panic("dirlookup read");
if(de.inum == 0)
continue;
if(namecmp(name, de.name) == 0){
// entry matches path element
if(poff)
\*poff = off;
inum = de.inum;
return iget(dp->dev, inum);
}
}

return 0;
}

namex()函数

  • namex首先决定路径计算从哪里开始。如果路径是从斜杠开始的,计算从根目录开始;否则,就从当前目录开始;
  • skipelem来依次得到路径里的每个元素;
  • 每次循环都必须在当前inode(ip)查找name
  • 循环首先锁定ip并检查它是否是一个目录,如果不是,查找失败;
  • 如果这个调用是nameiparent并且这是最后一个路径元素,按照nameiparent的定义,循环终止;
  • 最后,使用dirlookup查找路径元素,并通过设置ip = next为下一次循环做好准备。当循环遍历完所有的路径元素退出后,namex返回ip
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    // Look up and return the inode for a path name.
    // If parent != 0, return the inode for the parent and copy the final
    // path element into name, which must have room for DIRSIZ bytes.
    // Must be called inside a transaction since it calls iput().
    static struct inode\*
    namex(char \*path, int nameiparent, char \*name)
    {
    struct inode \*ip, \*next;

    if(\*path == '/')
    ip = iget(ROOTDEV, ROOTINO);
    else
    ip = idup(myproc()->cwd);

    while((path = skipelem(path, name)) != 0){
    ilock(ip);
    if(ip->type != T_DIR){
    iunlockput(ip);
    return 0;
    }
    if(nameiparent && \*path == '\\0'){
    // Stop one level early.
    iunlock(ip);
    return ip;
    }
    if((next = dirlookup(ip, name, 0)) == 0){
    iunlockput(ip);
    return 0;
    }
    iunlockput(ip);
    ip = next;
    }
    if(nameiparent){
    iput(ip);
    return 0;
    }
    return ip;
    }
    // Paths

    // Copy the next path element from path into name.
    // Return a pointer to the element following the copied one.
    // The returned path has no leading slashes,
    // so the caller can check \*path=='\\0' to see if the name is the last one.
    // If no name to remove, return 0.
    //
    // Examples:
    // skipelem("a/bb/c", name) = "bb/c", setting name = "a"
    // skipelem("///a//bb", name) = "bb", setting name = "a"
    // skipelem("a", name) = "", setting name = "a"
    // skipelem("", name) = skipelem("////", name) = 0
    //
    static char\*
    skipelem(char \*path, char \*name)
    {
    char \*s;
    int len;
    // 过滤路径字符串开头所有的'/'
    while(\*path == '/')
    path++;
    if(\*path == 0)
    return 0;
    s = path;
    while(\*path != '/' && \*path != 0)
    path++;
    len = path - s;
    if(len >= DIRSIZ)
    memmove(name, s, DIRSIZ);
    else {
    memmove(name, s, len);
    name[len] = 0;
    }
    while(\*path == '/')
    path++;
    return path;
    }

    > `dirlink()`函数
    >
    > 在目录 i 结点dp中添加一个新的名字为name的目录项

    // Write a new directory entry (name, inum) into the directory dp.
    int
    dirlink(struct inode \*dp, char \*name, uint inum)
    {
    int off;
    struct dirent de;
    struct inode \*ip;

    // Check that name is not present.
    if((ip = dirlookup(dp, name, 0)) != 0){
    iput(ip);
    return -1;
    }

    // Look for an empty dirent.
    for(off = 0; off < dp->size; off += sizeof(de)){
    if(readi(dp, 0, (uint64)&de, off, sizeof(de)) != sizeof(de))
    panic("dirlink read");
    if(de.inum == 0)
    break;
    }

    strncpy(de.name, name, DIRSIZ);
    de.inum = inum;
    if(writei(dp, 0, (uint64)&de, off, sizeof(de)) != sizeof(de))
    panic("dirlink");

    return 0;
    }

file descriptor层

file descriptor层的数据结构定义

readablewritable刻画文件的读写权限,off表示文件读写偏移;

ftable管理所有打开的文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
struct file {
enum { FD_NONE, FD_PIPE, FD_INODE, FD_DEVICE } type;
int ref; // reference count
char readable;
char writable;
struct pipe \*pipe; // FD_PIPE
struct inode \*ip; // FD_INODE and FD_DEVICE
uint off; // FD_INODE
short major; // FD_DEVICE
};

struct {
struct spinlock lock;
struct file file[NFILE];
} ftable;

file descriptor层函数定义

filealloc扫描文件表查找未引用的文件(f->ref == 0)并返回一个新的引用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Allocate a file structure.
struct file\*
filealloc(void)
{
struct file \*f;

acquire(&ftable.lock);
for(f = ftable.file; f < ftable.file + NFILE; f++){
if(f->ref == 0){
f->ref = 1;
release(&ftable.lock);
return f;
}
}
release(&ftable.lock);
return 0;
}

filedup递增引用计数。

1
2
3
4
5
6
7
8
9
10
11
// Increment ref count for file f.
struct file\*
filedup(struct file \*f)
{
acquire(&ftable.lock);
if(f->ref < 1)
panic("filedup");
f->ref++;
release(&ftable.lock);
return f;
}

fileclose递减引用计数。当引用计数降为0,释放底层对应的管道或inode。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Close file f.  (Decrement ref count, close when reaches 0.)
void
fileclose(struct file \*f)
{
struct file ff;

acquire(&ftable.lock);
if(f->ref < 1)
panic("fileclose");
if(--f->ref > 0){
release(&ftable.lock);
return;
}
ff = \*f;
f->ref = 0;
f->type = FD_NONE;
release(&ftable.lock);

if(ff.type == FD_PIPE){
pipeclose(ff.pipe, ff.writable);
} else if(ff.type == FD_INODE ff.type == FD_DEVICE){
begin_op();
iput(ff.ip);
end_op();
}
}

filestat实现了对文件的stat操作(stat操作是文件/文件系统的详细信息显示)。它只允许操作inode,然后调用stati;最后将数据从内核拷贝至用户。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Get metadata about file f.
// addr is a user virtual address, pointing to a struct stat.
int
filestat(struct file \*f, uint64 addr)
{
struct proc \*p = myproc();
struct stat st;

if(f->type == FD_INODE f->type == FD_DEVICE){
ilock(f->ip);
stati(f->ip, &st);
iunlock(f->ip);
if(copyout(p->pagetable, addr, (char \*)&st, sizeof(st)) < 0)
return -1;
return 0;
}
return -1;
}

fileread实现了对文件的read操作。首先检查是否允许readable模式,然后把调用传递给管道或inode的实现。如果文件是一个inode,它使用I/O位移作为读操作的位移,然后增加这个位移。管道没有位移的概念。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Read from file f.
// addr is a user virtual address.
int
fileread(struct file \*f, uint64 addr, int n)
{
int r = 0;

if(f->readable == 0)
return -1;

if(f->type == FD_PIPE){
r = piperead(f->pipe, addr, n);
} else if(f->type == FD_DEVICE){
if(f->major < 0 f->major >= NDEV !devsw[f->major].read)
return -1;
r = devsw[f->major].read(1, addr, n);
} else if(f->type == FD_INODE){
ilock(f->ip);
if((r = readi(f->ip, 1, addr, f->off, n)) > 0)
f->off += r;
iunlock(f->ip);
} else {
panic("fileread");
}

return r;
}

filewrite实现了对文件的write操作。首先检查是否允许writeable模式,然后把调用传递给管道或inode的实现。如果文件是一个inode,它使用I/O位移作为写操作的位移,然后增加这个位移。管道没有位移的概念。要记得inode函数需要调用者管理锁定。inode的锁定有一个方便的附加效果,即读写位移是自动更新的,因此同时对一个文件的多个写操作不会覆盖彼此的数据,但这些写操作可能会交错在一起。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// Write to file f.
// addr is a user virtual address.
int
filewrite(struct file \*f, uint64 addr, int n)
{
int r, ret = 0;

if(f->writable == 0)
return -1;

if(f->type == FD_PIPE){
ret = pipewrite(f->pipe, addr, n);
} else if(f->type == FD_DEVICE){
if(f->major < 0 f->major >= NDEV !devsw[f->major].write)
return -1;
ret = devsw[f->major].write(1, addr, n);
} else if(f->type == FD_INODE){
// write a few blocks at a time to avoid exceeding
// the maximum log transaction size, including
// i-node, indirect block, allocation blocks,
// and 2 blocks of slop for non-aligned writes.
// this really belongs lower down, since writei()
// might be writing a device like the console.
int max = ((MAXOPBLOCKS-1-1-2) / 2) \* BSIZE;
int i = 0;
while(i < n){
int n1 = n - i;
if(n1 > max)
n1 = max;

begin_op();
ilock(f->ip);
if ((r = writei(f->ip, 1, addr + i, f->off, n1)) > 0)
f->off += r;
iunlock(f->ip);
end_op();

if(r != n1){
// error from writei
break;
}
i += r;
}
ret = (i == n ? n : -1);
} else {
panic("filewrite");
}

return ret;
}

sys_link从获取它的两个字符串参数oldnew开始。如果old存在且不是目录,递增它的ip->nlink计数。然后调用nameiparent来找到new的父目录和最终的路径元素,并创建一个新的目录条目来指向old的inode。新的上级目录必须存在且和inode在同一设备上:inode号在单独的磁盘上有唯一的意义。如果发生了这样的错误,sys_link必须回退且递减ip->nlink

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// Create the path new as a link to the same inode as old.
uint64
sys_link(void)
{
char name[DIRSIZ], new[MAXPATH], old[MAXPATH];
struct inode \*dp, \*ip;

if(argstr(0, old, MAXPATH) < 0 argstr(1, new, MAXPATH) < 0)
return -1;

begin_op();
if((ip = namei(old)) == 0){
end_op();
return -1;
}

ilock(ip);
if(ip->type == T_DIR){
iunlockput(ip);
end_op();
return -1;
}

ip->nlink++;
iupdate(ip);
iunlock(ip);

if((dp = nameiparent(new, name)) == 0)
goto bad;
ilock(dp);
if(dp->dev != ip->dev dirlink(dp, name, ip->inum) < 0){
iunlockput(dp);
goto bad;
}
iunlockput(dp);
iput(ip);

end_op();

return 0;

bad:
ilock(ip);
ip->nlink--;
iupdate(ip);
iunlockput(ip);
end_op();
return -1;
}

……还有很多与文件相关的系统调用,不一一列举了

读写操作和设备文件

file.cfile.h文件中记录了xv6的驱动

1
2
3
4
5
// map major device number to device functions.
struct devsw {
int (\*read)(int, uint64, int);
int (\*write)(int, uint64, int);
};

struct devsw devsw[NDEV];// 其中NDEV = 10,xv6最多支持10种不同的驱动
#define CONSOLE 1// xv6只实现了Console的读写

console.c种,console的读写绑定到devsw上了

1
2
3
4
5
6
7
8
9
10
11
12
void
consoleinit(void)
{
initlock(&cons.lock, "cons");

uartinit();

// connect read and write system calls
// to consoleread and consolewrite.
devsw[CONSOLE].read = consoleread;
devsw[CONSOLE].write = consolewrite;
}

init.c中的一段代码

  • 创建设备文件console;
  • CONSOLE(major号)绑定到console文件上;
  • 打开console。
    1
    2
    3
    4
    5
    6
    if(open("console", O_RDWR) < 0){
    mknod("console", CONSOLE, 0);
    open("console", O_RDWR);
    }
    dup(0); // stdout
    dup(0); // stderr
    一个设备文件总是绑定了一个驱动函数,打开设备文件后的读写,就相当于调用相应的驱动程序

7:Lock

许多操作系统内核,包括xv6都保持着多线程多进程执行,首先是因为这个xv6有许多个微处理器,这些处理器(CPU)是独立地执行一段代码,共享物理内存,这个时候就会有问题,就是在一个CPU读取数据的时候,另外一个CPU会写数据,或者说多个CPU同时写数据.这些都会出现问题.所以说多进程多线程的同步问题是非常重要的,我们需要一系列同步的手段来保证同步.所以这个词“并发性”代表多个指令同时执行的情况,由于中断操作,线程切换以及多核并行执行,我们不得不考虑并发性的问题.

这一章我们会举出一些由于并发性导致的程序执行错误的例子,并着重介绍一个使用广泛的并发操作:锁.

race

首先我们举一个简单的例子来解释什么叫做race现象.假设有两个进程同时调用wait.wait需要free掉子进程的内存,所以说在每个CPU,这个内核会调用kfree来执行操作,kalloc()会从空闲页链表中取出链表首部,kfree()就是从空闲页链表中放一个页进入首部.这个时候两个进程都会进入到wait调用中,这个时候程序的结果是不确定的!

因为你不知道程序执行的顺序,因为CPU的调度我们具体也无法得知.假设这两个进程分别是A和B,这个时候有一种可能就是,A先取了链表的头,B随后取链表的头,A改完之后放进去,B改完之后放进去.这个时候B的修改覆盖了A的修改,导致了执行的错误.

这个叫做race现象,具体的来说就是,对于内存区域的同一块区域有多个进程同时访问,往往,出现race现象就代表着离出现bug也不远了.对于争用现象,最重要的问题就是我们对与并行程序的控制不是很够,我们不知道并行程序的执行顺序,所以说我们要加上一点控制手段来控制执行程序.

spinlocks

xv6自旋锁(spinlock)用于内核临界区访问的同步和互斥。自旋锁最大的特征是当进程拿不到锁时会进入无限循环,直到拿到锁退出循环。Xv6使用100ms一次的时钟中断和Round-Robin调度算法来避免陷入自旋锁的进程一直无限循环下去。Xv6允许同时运行多个CPU核,多核CPU上的等待队列实现相当复杂,因此使用自旋锁是相对比较简单且能正确执行的实现方案。

1
2
3
4
5
6
7
8
// Mutual exclusion lock.
struct spinlock {
uint locked; // Is the lock held?

// For debugging:
char \*name; // Name of lock.
struct cpu \*cpu; // The cpu holding the lock.
};

最重要的就是一个int类型的变量,这个变量帮助我们记录一下锁有没有被使用.

首先调用锁之前我们要对锁进行初始化,初始化的操作就是:

1
2
3
4
5
6
7
void
initlock(struct spinlock \*lk, char \*name)
{
lk->name = name;
lk->locked = 0;
lk->cpu = 0;
}

给定一个锁结构,还有一个名字,把名字存进去,其他的元素赋值0即可.

RISC-V会给定一个指令,这个指令叫做amoswap,这是一个原子指令,使用方法是这个amoswap r, a.本质上就是lw指令,把地址a的内存元素放进r对应的寄存器,但是有一点比较特殊的是:这个操作可以保证在执行操作的时候r和a都不会被其他CPU访问到.保证了同步控制.(不给其他CPU访问相当于规定了对于这个内存的访问顺序).

xv6使用acquire来获取锁的控制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void
acquire(struct spinlock \*lk)
{
push_off(); // disable interrupts to avoid deadlock.
if(holding(lk))
panic("acquire");

// On RISC-V, sync_lock_test_and_set turns into an atomic swap:
// a5 = 1
// s1 = &lk->locked
// amoswap.w.aq a5, a5, (s1)
while(__sync_lock_test_and_set(&lk->locked, 1) != 0)
;

// Tell the C compiler and the processor to not move loads or stores
// past this point, to ensure that the critical section's memory
// references happen strictly after the lock is acquired.
// On RISC-V, this emits a fence instruction.
__sync_synchronize();

// Record info about lock acquisition for holding() and debugging.
lk->cpu = mycpu();
}

这个函数会一直调用一个C的库函数,这个库函数会按照注释的方法调用来获取数据,如果这个锁一直都没有释放,就让它一直这么循环下去,直到锁被释放了为止.一释放这个进程就获得锁,在获得的第一瞬间把lock值赋值1.然后获取之后为了调试需要顺便记录一下CPU的值.

下面是锁的释放:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void
release(struct spinlock \*lk)
{
if(!holding(lk))
panic("release");

lk->cpu = 0;

// Tell the C compiler and the CPU to not move loads or stores
// past this point, to ensure that all the stores in the critical
// section are visible to other CPUs before the lock is released,
// and that loads in the critical section occur strictly before
// the lock is released.
// On RISC-V, this emits a fence instruction.
__sync_synchronize();

// Release the lock, equivalent to lk->locked = 0.
// This code doesn't use a C assignment, since the C standard
// implies that an assignment might be implemented with
// multiple store instructions.
// On RISC-V, sync_lock_release turns into an atomic swap:
// s1 = &lk->locked
// amoswap.w zero, zero, (s1)
__sync_lock_release(&lk->locked);

pop_off();
}

首先把cpu的值设置为0,然后通过同步的赋值指令把值赋值,这个就代表这个进程结束了对临界区的访问,现在可以让其他进程访问了.

自旋锁的使用.

xv6使用锁来防止race现象的发生.对于使用锁,下面有几个基本的准则:

第一个就是如果有一个变量,一个CPU写的时候另外一个CPU可以去读,这个时候我们需要保护这个变量.第二,记住锁要保护不变量(不变量指的是在某些时候不能被其他程序改变的变量).总的来说就是,如果有可能有多个进程会对一个数据结构进行更改的话,那么我们必须得采取一定的措施来让进程有序地访问数据结构.

当然过分地使用锁也是不应该的,如果这个程序只可能有一个进程访问,那么就没必要使用锁.

使用锁的一个特征就是,我们把访问和修改数据结构的代码称为临界区,当进入临界区的时候获得锁,当离开临界区的时候释放锁.

获得锁

临界区

释放锁

防止死锁

我们首先得知道,对于不同锁,我们要保证锁的获取顺序是一定的,如果有一段代码要求先获得A锁,再获得B锁.另外一段代码要求获得B锁,在获得A锁,这个时候就会造成卡死.因为一段代码等待B锁,另外一段代码等待A锁.就互相等待,卡死了.

在xv6里面也有很多这样的锁链,比如说在console.c中,首先获取了cons.lock,接着调用wakeup函数,这个又获得p.lock.在文件系统中也是首先获取vdisk.lock再获取p->lock.xv6里面的锁都有着巧妙的安排,这样可以避免死锁的发生.

死锁有可能是程序的逻辑结构出现了问题,有时锁的特征事先不知道,可能是因为必须持有一个锁才能发现下一个要获取的锁的特征。最后,死锁发生与否取决于你怎么规划和使用锁,使用的锁越多就越容易死锁.

一个比较朴素的方法就是使用“重入锁”.重入锁的设计就是如果一一个锁是被一个进程拥有的,我们还是允许这个进程再获得这个锁.而不是执行panic指令.然而,事实证明,重入锁使并发性更难推理:重入锁打破了锁导致关键部分相对于其他关键部分是原子的直觉。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
struct spinlock lock;
int data = 0; // protected by lock
f() {
acquire(&lock);
if(data == 0){
call_once();
h();
data = 1;
}
release(&lock);
}
g() {
aquire(&lock);
if(data == 0){
call_once();
data = 1;
}
release(&lock);
}

在这里面,假设h函数就是执行g函数的调用,如果饮用了重入锁的话,这个call_once就会执行两遍.但是不引入重入锁的话就会导致死锁.出于这个问题,我们还是放弃了可重入锁,所以说对于程序的编写者来说,我们尽量确定好锁的申请顺序,防止A进程拥有a锁等待b锁,B进程拥有b锁等待a锁的存在.

获取锁之后进入临界区的时候,由于防止被中断打断进入中断处理程序从而导致死锁,所以说获得锁的死后就会打断中断.举个例子,比如说处理时钟中断的时候,clockintr会获得tickslock.

1
2
3
4
5
6
7
8
void
clockintr()
{
acquire(&tickslock);
ticks++;
wakeup(&ticks);
release(&tickslock);
}

与此同时,在执行中断操作的sys_sleep函数也会访问tickslock:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
uint64
sys_sleep(void)
{
int n;
uint ticks0;

if(argint(0, &n) < 0)
return -1;
acquire(&tickslock);
ticks0 = ticks;
while(ticks - ticks0 < n){
if(myproc()->killed){
release(&tickslock);
return -1;
}
sleep(&ticks, &tickslock);
}
release(&tickslock);
return 0;
}

所以说为了避免中断打断之后的死锁现象,一个CPU运行的代码获得了锁之后,这个CPU的中断就会被禁止.所以说这个时候就要禁用中断,但是禁用中断的时候要改变中断使能寄存器:我们就用一个栈来存储之,这个存储中断使能寄存器的函数就是push_pffpop_off.当然一个CPU对应上的进程能开启多个锁,所以说我们用一个intena来代表这个进程拥有多少锁.所以说我们在acquire中申请一遍,在release中再申请一遍.

为了避免死锁,xv6中规定若一个进程持有spinlock,则必须要禁用中断。如果此时中断开启,那么可能会出现以下死锁情况:

  1. 进程A在内核态运行并拿下了p锁时,触发中断进入中断处理程序。
  2. 中断处理程序也在内核态中请求p锁,由于锁在A进程手里,且只有A进程执行时才能释放p锁,因此中断处理程序必须返回,p锁才能被释放。
  3. 那么此时中断处理程序会永远拿不到锁,陷入无限循环,进入死锁。

因此调用push_offpop_off来禁用和开启中断。

获取锁的过程可能嵌套:一个进程获取了锁A,然后再获取锁B,释放锁B时,由于A还未释放,因此不能开启中断。注:struct cpu中的noff记录目前的深度,intena记录在获取第一把锁之前的中断使能状态,当深度为0且intena为1(所有锁都被释放且最初的使能状态为1)时,才开启中断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// push_off/pop_off are like intr_off()/intr_on() except that they are matched:
// it takes two pop_off()s to undo two push_off()s. Also, if interrupts
// are initially off, then push_off, pop_off leaves them off.

void
push_off(void)
{
// 获取之前的中断使能状态
int old = intr_get();
// 关闭中断使能
intr_off();
if(mycpu()->noff == 0)
mycpu()->intena = old;
mycpu()->noff += 1;
}

void
pop_off(void)
{
struct cpu \*c = mycpu();
if(intr_get())
panic("pop_off - interruptible");
if(c->noff < 1)
panic("pop_off");
c->noff -= 1;
if(c->noff == 0 && c->intena)
intr_on();
}

指令和内存顺序

我们一般会认为指令的执行顺序和我们代码的编辑顺序是一样的,但是其实这个并不绝对.CPU为了执行的速度,往往会做一些修改.假如说A和B一组指令,CPU可能会先让B执行,可能因为B的输入比A的输入先准备好,也有可能是因为执行B的时间比A长,我们不如先执行B,,这样B和A可以同步执行,增加CPU的效率,当然对于编译器,编译器也可能会把后面的指令移动到前面.当然CPU和编译器这么做的前提就是我不会改变输出的结果.但是,这样的特性对于锁来说是不好的,这对多处理器的情况下,有可能会出现与时间有关的问题.总而言之,CPU和编译器可能会提高效率改变指令执行的顺序.

比如说下面的这个例子:假如说第4行的语句移动到第6行之后就会引起严重的后果.

1
2
3
4
5
6
1  l = malloc(sizeof \*l);
2 l->data = data;
3 acquire(&listlock);
4 l->next = list;
5 list = l;
6 release(&listlock);

这个时候对于其他的进程来说看到了被更新的list,但是看到了没有被初始化的list->next.这样是不好的.

去告诉CPU和编译器不要执行这样的重新编排,我们在acquire和release中执行了 __sync_synchronize()函数,这个函数类似于一个memery barrier.之前的指令不可以跨过这个memeory_barrier执行.

睡眠锁

自旋锁是这个进程一直等待别的进程把锁释放,这个进程在一直执行一个循环等待释放,这个锁适用于获得锁到释放锁用不了多久的时间的那种,等待的一方执行执行不了不久循环.但是万一这个锁要使用很久呢,一直等待别人,等一会儿还行,等很久我不如先睡一会儿,等别人用完了再叫醒我就好了.

所以说xv6提供一个睡眠锁.

1
2
3
4
5
6
7
8
9
10
11
void
acquiresleep(struct sleeplock \*lk)
{
acquire(&lk->lk);
while (lk->locked) {
sleep(lk, &lk->lk);
}
lk->locked = 1;
lk->pid = myproc()->pid;
release(&lk->lk);
}

获得锁的时候,自旋锁就是一直查询,查询到可以使用锁了就接着往下走,但是这个睡眠锁就不一样了,当锁被占用的时候,它就直接引入睡眠模式,当有进程release这个锁的时候这个进程就会被唤醒,然后再查询一遍锁是否是可用的即可.如果是可用的,就往下执行和spinlock一样的操作.

总结:自旋锁就是一直查,睡眠锁就是被叫醒一次查一次.

对于release操作也是一样:首先清空标记,然后把所有为了lk而等待的进程全部唤醒.

1
2
3
4
5
6
7
8
9
void
releasesleep(struct sleeplock \*lk)
{
acquire(&lk->lk);
lk->locked = 0;
lk->pid = 0;
wakeup(lk);
release(&lk->lk);
}