Lab8: Locks
Memory allocator
本实验完成的任务是为每个CPU都维护一个空闲列表,初始时将所有的空闲内存分配到某个CPU,此后各个CPU需要内存时,如果当前CPU的空闲列表上没有,则窃取其他CPU的。例如,所有的空闲内存初始分配到CPU0,当CPU1需要内存时就会窃取CPU0的,而使用完成后就挂在CPU1的空闲列表,此后CPU1再次需要内存时就可以从自己的空闲列表中取。
(1). 将kmem
定义为一个数组,包含NCPU
个元素,即每个CPU对应一个
struct {
struct spinlock lock;
struct run *freelist;
} kmem[NCPU];
(2). 修改kinit
,为所有锁初始化以“kmem”开头的名称,该函数只会被一个CPU调用,freerange
调用kfree
将所有空闲内存挂在该CPU的空闲列表上
void
kinit()
{
char lockname[8];
for(int i = 0;i < NCPU; i++) {
snprintf(lockname, sizeof(lockname), "kmem_%d", i);
initlock(&kmem[i].lock, lockname);
}
freerange(end, (void*)PHYSTOP);
}
(3). 修改kfree
,使用cpuid()
和它返回的结果时必须关中断,请参考《XV6使用手册》第7.4节
void
kfree(void *pa)
{
struct run *r;
if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP)
panic("kfree");
// Fill with junk to catch dangling refs.
memset(pa, 1, PGSIZE);
r = (struct run*)pa;
push_off(); // 关中断
int id = cpuid();
acquire(&kmem[id].lock);
r->next = kmem[id].freelist;
kmem[id].freelist = r;
release(&kmem[id].lock);
pop_off(); //开中断
}
(4). 修改kalloc
,使得在当前CPU的空闲列表没有可分配内存时窃取其他内存的
void *
kalloc(void)
{
struct run *r;
push_off();// 关中断
int id = cpuid();
acquire(&kmem[id].lock);
r = kmem[id].freelist;
if(r)
kmem[id].freelist = r->next;
else {
int antid; // another id
// 遍历所有CPU的空闲列表
for(antid = 0; antid < NCPU; ++antid) {
if(antid == id)
continue;
acquire(&kmem[antid].lock);
r = kmem[antid].freelist;
if(r) {
kmem[antid].freelist = r->next;
release(&kmem[antid].lock);
break;
}
release(&kmem[antid].lock);
}
}
release(&kmem[id].lock);
pop_off(); //开中断
if(r)
memset((char*)r, 5, PGSIZE); // fill with junk
return (void*)r;
}
Buffer cache
这个实验的目的是将缓冲区的分配与回收并行化以提高效率,这个实验折腾了一天,有些内容还是比较绕的,
(1). 定义哈希桶结构,并在bcache
中删除全局缓冲区链表,改为使用素数个散列桶
#define NBUCKET 13
#define HASH(id) (id % NBUCKET)
struct hashbuf {
struct buf head; // 头节点
struct spinlock lock; // 锁
};
struct {
struct buf buf[NBUF];
struct hashbuf buckets[NBUCKET]; // 散列桶
} bcache;
(2). 在binit
中,(1)初始化散列桶的锁,(2)将所有散列桶的head->prev
、head->next
都指向自身表示为空,(3)将所有的缓冲区挂载到bucket[0]
桶上,代码如下
void
binit(void) {
struct buf* b;
char lockname[16];
for(int i = 0; i < NBUCKET; ++i) {
// 初始化散列桶的自旋锁
snprintf(lockname, sizeof(lockname), "bcache_%d", i);
initlock(&bcache.buckets[i].lock, lockname);
// 初始化散列桶的头节点
bcache.buckets[i].head.prev = &bcache.buckets[i].head;
bcache.buckets[i].head.next = &bcache.buckets[i].head;
}
// Create linked list of buffers
for(b = bcache.buf; b < bcache.buf + NBUF; b++) {
// 利用头插法初始化缓冲区列表,全部放到散列桶0上
b->next = bcache.buckets[0].head.next;
b->prev = &bcache.buckets[0].head;
initsleeplock(&b->lock, "buffer");
bcache.buckets[0].head.next->prev = b;
bcache.buckets[0].head.next = b;
}
}
(3). 在buf.h中增加新字段timestamp
,这里来理解一下这个字段的用途:在原始方案中,每次brelse
都将被释放的缓冲区挂载到链表头,禀明这个缓冲区最近刚刚被使用过,在bget
中分配时从链表尾向前查找,这样符合条件的第一个就是最久未使用的。而在提示中建议使用时间戳作为LRU判定的法则,这样我们就无需在brelse
中进行头插法更改结点位置
struct buf {
...
...
uint timestamp; // 时间戳
};
(4). 更改brelse
,不再获取全局锁
void
brelse(struct buf* b) {
if(!holdingsleep(&b->lock))
panic("brelse");
int bid = HASH(b->blockno);
releasesleep(&b->lock);
acquire(&bcache.buckets[bid].lock);
b->refcnt--;
// 更新时间戳
// 由于LRU改为使用时间戳判定,不再需要头插法
acquire(&tickslock);
b->timestamp = ticks;
release(&tickslock);
release(&bcache.buckets[bid].lock);
}
(5). 更改bget
,当没有找到指定的缓冲区时进行分配,分配方式是优先从当前列表遍历,找到一个没有引用且timestamp
最小的缓冲区,如果没有就申请下一个桶的锁,并遍历该桶,找到后将该缓冲区从原来的桶移动到当前桶中,最多将所有桶都遍历完。在代码中要注意锁的释放
static struct buf*
bget(uint dev, uint blockno) {
struct buf* b;
int bid = HASH(blockno);
acquire(&bcache.buckets[bid].lock);
// Is the block already cached?
for(b = bcache.buckets[bid].head.next; b != &bcache.buckets[bid].head; b = b->next) {
if(b->dev == dev && b->blockno == blockno) {
b->refcnt++;
// 记录使用时间戳
acquire(&tickslock);
b->timestamp = ticks;
release(&tickslock);
release(&bcache.buckets[bid].lock);
acquiresleep(&b->lock);
return b;
}
}
// Not cached.
b = 0;
struct buf* tmp;
// Recycle the least recently used (LRU) unused buffer.
// 从当前散列桶开始查找
for(int i = bid, cycle = 0; cycle != NBUCKET; i = (i + 1) % NBUCKET) {
++cycle;
// 如果遍历到当前散列桶,则不重新获取锁
if(i != bid) {
if(!holding(&bcache.buckets[i].lock))
acquire(&bcache.buckets[i].lock);
else
continue;
}
for(tmp = bcache.buckets[i].head.next; tmp != &bcache.buckets[i].head; tmp = tmp->next)
// 使用时间戳进行LRU算法,而不是根据结点在链表中的位置
if(tmp->refcnt == 0 && (b == 0 || tmp->timestamp < b->timestamp))
b = tmp;
if(b) {
// 如果是从其他散列桶窃取的,则将其以头插法插入到当前桶
if(i != bid) {
b->next->prev = b->prev;
b->prev->next = b->next;
release(&bcache.buckets[i].lock);
b->next = bcache.buckets[bid].head.next;
b->prev = &bcache.buckets[bid].head;
bcache.buckets[bid].head.next->prev = b;
bcache.buckets[bid].head.next = b;
}
b->dev = dev;
b->blockno = blockno;
b->valid = 0;
b->refcnt = 1;
acquire(&tickslock);
b->timestamp = ticks;
release(&tickslock);
release(&bcache.buckets[bid].lock);
acquiresleep(&b->lock);
return b;
} else {
// 在当前散列桶中未找到,则直接释放锁
if(i != bid)
release(&bcache.buckets[i].lock);
}
}
panic("bget: no buffers");
}
(6). 最后将末尾的两个小函数也改一下
void
bpin(struct buf* b) {
int bid = HASH(b->blockno);
acquire(&bcache.buckets[bid].lock);
b->refcnt++;
release(&bcache.buckets[bid].lock);
}
void
bunpin(struct buf* b) {
int bid = HASH(b->blockno);
acquire(&bcache.buckets[bid].lock);
b->refcnt--;
release(&bcache.buckets[bid].lock);
}
踩过的坑:
- bget中重新分配可能要持有两个锁,如果桶a持有自己的锁,再申请桶b的锁,与此同时如果桶b持有自己的锁,再申请桶a的锁就会造成死锁!因此代码中使用了
if(!holding(&bcache.bucket[i].lock))
来进行检查。此外,代码优先从自己的桶中获取缓冲区,如果自身没有依次向后查找这样的方式也尽可能地避免了前面的情况。 - 在
bget
中搜索缓冲区并在找不到缓冲区时为该缓冲区分配条目必须是原子的!在提示中说bget
如果未找到而进行分配的操作可以是串行化的,也就是说多个CPU中未找到,应当串行的执行分配,同时还应当避免死锁。于是在发现未命中(Not cached)后,我写了如下的代码(此时未删除bcache.lock
)
// 前半部分查找缓冲区的代码
// Not cached
release(&bcache.buckets[bid].lock);
acquire(&bcache.lock);
acquire(&bcache.buckets[bid].lock);
// 后半部分分配缓冲区的代码
这段代码中先释放了散列桶的锁之后再重新获取,之所以这样做是为了让所有代码都保证申请锁的顺序:先获取整个缓冲区的大锁再获取散列桶的小锁,这样才能避免死锁。但是这样做却破坏了程序执行的原子性。
在release
桶的锁并重新acquire
的这段时间,另一个CPU可能也以相同的参数调用了bget
,也发现没有该缓冲区并想要执行分配。最终的结果是一个磁盘块对应了两个缓冲区,破坏了最重要的不变量,即每个块最多缓存一个副本。这样会导致usertests
中的manywrites
测试报错:panic: freeing free block