实验环境配置:https://pdos.csail.mit.edu/6.824/labs/lab-mr.html。
这篇笔记主要起到自我学习过程记录的作用,只写了我完成这个实验的大致步骤和遇到问题的解决思路,没有写我的实现思路(其实我的实现思路也很简单,不像很多大佬都做了优化),所以希望看到这篇文章的大佬不要太过严格,但是如果能对其他人提供帮助的话那就更好了。
2 阅读代码看一下map函数:
// The map function is called once for each file of input. The first
// argument is the name of the input file, and the second is the
// file's complete contents. You should ignore the input file name,
// and look only at the contents argument. The return value is a slice
// of key/value pairs.
//
func Map(filename string, contents string) []mr.KeyValue {
// function to detect word separators.
ff := func(r rune) bool { return !unicode.IsLetter(r) }
// split contents into an array of words.
words := strings.FieldsFunc(contents, ff)
kva := []mr.KeyValue{}
for _, w := range words {
kv := mr.KeyValue{w, "1"}
kva = append(kva, kv)
}
return kva
}
就是靠strings.FieldsFunc,按空格和换行分割一个文件中的所有内容(数字不会保留到切片),最后形成一个巨大的slice words。每个单词就是一个元素。
kva就是kv array。是kv数组,对切片words遍历,每一个word就会生成一个{word, “1”}键值对,然后传到kva里面,最后返回kva。
kva就类似于:[{sheng 1} {jun 1} {a 1} ...]
然后mrsequential.go就会将kva追加到intermediate内。
所以,kva是一个文件的单词切片,intermediate是已完成map任务的所有文件的单词切片
这个和实际MapReduce有区别,因为实际的中间内容不可能全放在一起,而是会做分区放在buckets,以节省内存。
然后会对intermediate里面的元素按字母顺序排序。排完序之后就可以根据前后的key是不是一样进行计数:
// mrsequential.go
i := 0
for i < len(intermediate) {
j := i + 1
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
j++
}
values := []string{}
for k := i; k < j; k++ {
values = append(values, intermediate[k].Value)
}
output := reducef(intermediate[i].Key, values)
// this is the correct format for each line of Reduce output.
fmt.Fprintf(ofile, "%v %vn", intermediate[i].Key, output)
i = j
}
// wc.go
func Reduce(key string, values []string) string {
// return the number of occurrences of this word.
return strconv.Itoa(len(values))
}
写入输出文件(mr-out-0),每一行内容就是:intermediate[i].Key 出现次数。
运行时,会同时开启多个worker进程,这个不是用的协程。关于中间文件mr-X-Y。一个map任务产生一组切片,这个就是中间结果,但是这个结果想要被10个reduce任务处理,那么就必须通过对key进行区分,对每个key进行哈希在对nReduce取余,这就得到Y,把每个key写入对应Y的文件中。所以一共最大可能产生X*Y个文件,实验中是最大8*10个中间文件
3 遇到的问题
UNIX socket方式的RPC传结构体无法正确获得值:
解决:rpc传输变量要大写
中间文件写到一半不能写入了
解决:关闭文件,应该是打开文件数有上限,主要问题是不知为何之前用的defer file.Close()不起作用
reduce任务不能run
解决:代码逻辑问题,直接顺序执行到AllDone阶段了
--- wc test: PASS --- indexer test: FAIL --- map parallelism test: FAIL --- reduce parallelism test: PASS --- job count test: PASS --- early exit test: PASS --- crash test: FAIL
这是我第一次进行测试的结果,下面是我对这几个FAIL的改正过程。
3.1.1 indexer测试先看看indexer测试内容。测试更换了map和reduce函数的内容:
func Map(document string, value string) (res []mr.KeyValue) {
m := make(map[string]bool)
words := strings.FieldsFunc(value, func(x rune) bool { return !unicode.IsLetter(x) })
for _, w := range words {
m[w] = true
}
for w := range m {
kv := mr.KeyValue{w, document}
res = append(res, kv)
}
return
}
func Reduce(key string, values []string) string {
sort.Strings(values)
// 在values的每个元素之间插入逗号
return fmt.Sprintf("%d %s", len(values), strings.Join(values, ","))
}
根据lab介绍,可以知道indexer是文本索引器,而不是单词计数,可以用串行mr跑一下文本索引程序,看看最终结果:
A 8 pg-being_ernest.txt,pg-dorian_gray.txt,pg-frankenstein.txt,pg-grimm.txt,pg-h uckleberry_finn.txt,pg-metamorphosis.txt,pg-sherlock_holmes.txt,pg-tom_sawyer.tx t about 1 pg-tom_sawyer.txt ACT 1 pg-being_ernest.txt ACTRESS 1 pg-dorian_gray.txt ACTUAL 8 pg-being_ernest.txt,pg-dorian_gray.txt,pg-frankenstein.txt,pg-grimm.txt ,pg-huckleberry_finn.txt,pg-metamorphosis.txt,pg-sherlock_holmes.txt,pg-tom_sawy er.txt ADLER 1 pg-sherlock_holmes.txt ADVENTURE 1 pg-sherlock_holmes.txt ...
输出结果是单词 单词所在的文本数 文本名称的集合,同一个单词在一个文本中只统计一次。
Map()函数的变化,从原来返回key counts键值对数组,变为返回key filename键值对数组:
[{lou json.txt} {jun json.txt} {jason json.txt} {kkk json.txt} {jjj json.txt} {lll json.txt}]
如果按照串行程序的代码来写,按理说应该可以顺利通过测试,但问题就是出在我们代码中需要写入中间文件,而我的中间文件因为json包追加内容的限制,所以我的写入是采取比较蠢的方法(模仿json的写入,方便追加):
ifile, err := os.OpenFile(iName, os.O_APPEND|os.O_RDWR|os.O_CREATE, 0777)
if err != nil {
log.Fatalf("cannot append/create %v, key: %v", iName, kva[i])
return FAIL
}
write := bufio.NewWriter(ifile)
// 仿json.NewEncoder的写入,方便读取
// {"Key":"lou","Value":"3"}
str := "{"Key":"" + kva[i].Key + "","Value":"" + strconv.Itoa(1) + ""}n"
write.WriteString(str)
//Flush将缓存的文件真正写入到文件中
write.Flush()
我为了省事直接写入了1,所以放在文本索引器中当然无法通过,于是把1修改为kva[i].Value,indexer测试就能顺利通过了。
3.1.2 并行测试map并行测试的提示如下:
================== 2022/01/29 13:02:20 cannot open mr-1-0 2022/01/29 13:02:21 cannot open mr-0-1 cat: 'mr-out*': No such file or directory --- saw 0 workers rather than 2 --- map parallelism test: FAIL cat: 'mr-out*': No such file or directory --- map workers did not run in parallel --- map parallelism test: FAIL
显示只能看到0个worker,我感觉比较奇怪的是reduce并行测试能过,但map不能过。
查看了下mtiming.go文件,这是产生map并行测试的链接库的文件,发现会在mapf内执行nparallel()方法,该方法会创建mr-worker-map-
我在运行map并行测试时观测mr-tmp,发现中间文件产生并不多,并且发现没有生成mr-out文件,而map并行测试脚本就是对mr-out进行读取再测试,于是我猜测可能是代码逻辑的问题。然后我又看了下我的doReduce函数,发现有一个问题:
y := strconv.Itoa(reduceNumber)
rFilename := "mr-" + x + "-" + y
rFile, err := os.Open(rFilename)
if err != nil {
log.Fatalf("cannot open %v", rFilename)
return FAIL
}
显然这个逻辑是不正确,不能因为没有该中间文件就直接返回fail,因为map结果可能确实没有产生该reduce的数据,所以我把return fail那一段注释了,或者直接不对err处理,成功通过了map测试。不过我也没太深究为什么之前reduce并行测试可以通过。
3.1.3 crash测试先看看报错:
jason@MIT6-824:~/go/src/6.824/src/main$ bash crash-test.sh *** Starting crash test. 2022/01/29 15:05:27 rpc.Register: method "Done" has 1 input parameters; needs exactly three 2022/01/29 15:08:27 dialing:dial unix /var/tmp/824-mr-1000: connect: connection refused 2022/01/29 15:08:28 dialing:dial unix /var/tmp/824-mr-1000: connect: connection refused 2022/01/29 15:08:28 dialing:dial unix /var/tmp/824-mr-1000: connect: connection refused 2022/01/29 15:08:28 dialing:dial unix /var/tmp/824-mr-1000: connect: connection refused sort: cannot read: 'mr-out*': No such file or directory cmp: EOF on mr-crash-all which is empty --- crash output is not the same as mr-correct-crash.txt --- crash test: FAIL
显然就是因为worker crash没有产生mr-out结果文件,看看crash.go主要函数:
func maybeCrash() {
max := big.NewInt(1000)
rr, _ := crand.Int(crand.Reader, max)
if rr.Int64() < 330 {
// crash!
os.Exit(1)
} else if rr.Int64() < 660 {
// delay for a while.
maxms := big.NewInt(10 * 1000)
ms, _ := crand.Int(crand.Reader, maxms)
time.Sleep(time.Duration(ms.Int64()) * time.Millisecond)
}
}
就是对一个worker进行随机的crash或者delay不到10秒。我最初写代码时虽然知道存在crash的情况,但是我当时也没有做特殊处理去计算时间,所以这里FAIL了也很正常。
于是我增加了一个协程在MakeCoordinator()中:go c.checkTaskCrash(),这个函数会一直在后台运行,循环检测协调器任务队列中的Running状态的任务,计算运行时间,超时则将任务状态改为Idle,并将任务计数-1。代码如下,看起来很冗余,我没有做精简处理:
func (c *Coordinator) checkTaskCrash() {
taskInfos := c.taskInfoHolder
for {
time.Sleep(time.Second)
if c.taskPhase == MapPhase {
for i := 0; i < 8; i++ {
taskInfo := taskInfos[i]
if taskInfo.taskState == Running {
if time.Since(taskInfo.startTime) > 10*time.Second {
c.Mu.Lock()
c.taskInfoHolder[i].taskState = Idle
c.mapTaskCount--
c.Mu.Unlock()
}
}
}
}
if c.taskPhase == ReducePhase {
for i := 8; i < 18; i++ {
taskInfo := taskInfos[i]
if taskInfo.taskState == Running {
if time.Since(taskInfo.startTime) > 10*time.Second {
c.Mu.Lock()
c.taskInfoHolder[i].taskState = Idle
c.reduceTaskCount--
c.Mu.Unlock()
}
}
}
}
if c.taskPhase == AllDone {
break
}
}
}
最后的运行结果:



