栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

MIT 6.824-Lab 1 MapReduce

MIT 6.824-Lab 1 MapReduce

1 前言

实验环境配置:https://pdos.csail.mit.edu/6.824/labs/lab-mr.html。

这篇笔记主要起到自我学习过程记录的作用,只写了我完成这个实验的大致步骤和遇到问题的解决思路,没有写我的实现思路(其实我的实现思路也很简单,不像很多大佬都做了优化),所以希望看到这篇文章的大佬不要太过严格,但是如果能对其他人提供帮助的话那就更好了。

2 阅读代码

看一下map函数:

// The map function is called once for each file of input. The first
// argument is the name of the input file, and the second is the
// file's complete contents. You should ignore the input file name,
// and look only at the contents argument. The return value is a slice
// of key/value pairs.
//
func Map(filename string, contents string) []mr.KeyValue {
	// function to detect word separators.
	ff := func(r rune) bool { return !unicode.IsLetter(r) }

	// split contents into an array of words.
	words := strings.FieldsFunc(contents, ff)

	kva := []mr.KeyValue{}
	for _, w := range words {
		kv := mr.KeyValue{w, "1"}
		kva = append(kva, kv)
	}
	return kva
}

就是靠strings.FieldsFunc,按空格和换行分割一个文件中的所有内容(数字不会保留到切片),最后形成一个巨大的slice words。每个单词就是一个元素。

kva就是kv array。是kv数组,对切片words遍历,每一个word就会生成一个{word, “1”}键值对,然后传到kva里面,最后返回kva。

kva就类似于:[{sheng 1} {jun 1} {a 1} ...]

然后mrsequential.go就会将kva追加到intermediate内。

所以,kva是一个文件的单词切片,intermediate是已完成map任务的所有文件的单词切片

这个和实际MapReduce有区别,因为实际的中间内容不可能全放在一起,而是会做分区放在buckets,以节省内存。

然后会对intermediate里面的元素按字母顺序排序。排完序之后就可以根据前后的key是不是一样进行计数:

// mrsequential.go
i := 0
for i < len(intermediate) {
    j := i + 1
    for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
        j++
    }
    values := []string{}
    for k := i; k < j; k++ {
        values = append(values, intermediate[k].Value)
    }
    output := reducef(intermediate[i].Key, values)

    // this is the correct format for each line of Reduce output.
    fmt.Fprintf(ofile, "%v %vn", intermediate[i].Key, output)

    i = j
}

// wc.go
func Reduce(key string, values []string) string {
	// return the number of occurrences of this word.
	return strconv.Itoa(len(values))
}

写入输出文件(mr-out-0),每一行内容就是:intermediate[i].Key 出现次数。


运行时,会同时开启多个worker进程,这个不是用的协程。关于中间文件mr-X-Y。一个map任务产生一组切片,这个就是中间结果,但是这个结果想要被10个reduce任务处理,那么就必须通过对key进行区分,对每个key进行哈希在对nReduce取余,这就得到Y,把每个key写入对应Y的文件中。所以一共最大可能产生X*Y个文件,实验中是最大8*10个中间文件


3 遇到的问题

    UNIX socket方式的RPC传结构体无法正确获得值:

    解决:rpc传输变量要大写

    中间文件写到一半不能写入了

    解决:关闭文件,应该是打开文件数有上限,主要问题是不知为何之前用的defer file.Close()不起作用

    reduce任务不能run

    解决:代码逻辑问题,直接顺序执行到AllDone阶段了

3.1 Test错误及解决过程
--- wc test: PASS
--- indexer test: FAIL
--- map parallelism test: FAIL
--- reduce parallelism test: PASS
--- job count test: PASS
--- early exit test: PASS
--- crash test: FAIL

这是我第一次进行测试的结果,下面是我对这几个FAIL的改正过程。

3.1.1 indexer测试

先看看indexer测试内容。测试更换了map和reduce函数的内容:

func Map(document string, value string) (res []mr.KeyValue) {
	m := make(map[string]bool)
	words := strings.FieldsFunc(value, func(x rune) bool { return !unicode.IsLetter(x) })
	for _, w := range words {
		m[w] = true
	}
	for w := range m {
		kv := mr.KeyValue{w, document}
		res = append(res, kv)
	}
	return
}

func Reduce(key string, values []string) string {
	sort.Strings(values)
  	// 在values的每个元素之间插入逗号 
	return fmt.Sprintf("%d %s", len(values), strings.Join(values, ","))
}

根据lab介绍,可以知道indexer是文本索引器,而不是单词计数,可以用串行mr跑一下文本索引程序,看看最终结果:

A 8 pg-being_ernest.txt,pg-dorian_gray.txt,pg-frankenstein.txt,pg-grimm.txt,pg-h
uckleberry_finn.txt,pg-metamorphosis.txt,pg-sherlock_holmes.txt,pg-tom_sawyer.tx
t
about 1 pg-tom_sawyer.txt
ACT 1 pg-being_ernest.txt
ACTRESS 1 pg-dorian_gray.txt
ACTUAL 8 pg-being_ernest.txt,pg-dorian_gray.txt,pg-frankenstein.txt,pg-grimm.txt
,pg-huckleberry_finn.txt,pg-metamorphosis.txt,pg-sherlock_holmes.txt,pg-tom_sawy
er.txt
ADLER 1 pg-sherlock_holmes.txt
ADVENTURE 1 pg-sherlock_holmes.txt
...

输出结果是单词 单词所在的文本数 文本名称的集合,同一个单词在一个文本中只统计一次。

Map()函数的变化,从原来返回key counts键值对数组,变为返回key filename键值对数组:

[{lou json.txt} {jun json.txt} {jason json.txt} {kkk json.txt} {jjj json.txt} {lll json.txt}]

如果按照串行程序的代码来写,按理说应该可以顺利通过测试,但问题就是出在我们代码中需要写入中间文件,而我的中间文件因为json包追加内容的限制,所以我的写入是采取比较蠢的方法(模仿json的写入,方便追加):

ifile, err := os.OpenFile(iName, os.O_APPEND|os.O_RDWR|os.O_CREATE, 0777)
if err != nil {
    log.Fatalf("cannot append/create %v, key: %v", iName, kva[i])
    return FAIL
}
write := bufio.NewWriter(ifile)
// 仿json.NewEncoder的写入,方便读取
// {"Key":"lou","Value":"3"}
str := "{"Key":"" + kva[i].Key + "","Value":"" + strconv.Itoa(1) + ""}n"
write.WriteString(str)
//Flush将缓存的文件真正写入到文件中
write.Flush()

我为了省事直接写入了1,所以放在文本索引器中当然无法通过,于是把1修改为kva[i].Value,indexer测试就能顺利通过了。

3.1.2 并行测试

map并行测试的提示如下:

==================
2022/01/29 13:02:20 cannot open mr-1-0
2022/01/29 13:02:21 cannot open mr-0-1
cat: 'mr-out*': No such file or directory
--- saw 0 workers rather than 2
--- map parallelism test: FAIL
cat: 'mr-out*': No such file or directory
--- map workers did not run in parallel
--- map parallelism test: FAIL

显示只能看到0个worker,我感觉比较奇怪的是reduce并行测试能过,但map不能过。

查看了下mtiming.go文件,这是产生map并行测试的链接库的文件,发现会在mapf内执行nparallel()方法,该方法会创建mr-worker-map-名称的文件,并且Map和Reduce函数都变了,map产生的kva中的元素变成times- 和parallel- 。而n就是由nparallel产生,具体函数内容没怎么看。

我在运行map并行测试时观测mr-tmp,发现中间文件产生并不多,并且发现没有生成mr-out文件,而map并行测试脚本就是对mr-out进行读取再测试,于是我猜测可能是代码逻辑的问题。然后我又看了下我的doReduce函数,发现有一个问题:

y := strconv.Itoa(reduceNumber)
rFilename := "mr-" + x + "-" + y
rFile, err := os.Open(rFilename)
if err != nil {
	log.Fatalf("cannot open %v", rFilename)
	return FAIL
}

显然这个逻辑是不正确,不能因为没有该中间文件就直接返回fail,因为map结果可能确实没有产生该reduce的数据,所以我把return fail那一段注释了,或者直接不对err处理,成功通过了map测试。不过我也没太深究为什么之前reduce并行测试可以通过。

3.1.3 crash测试

先看看报错:

jason@MIT6-824:~/go/src/6.824/src/main$ bash crash-test.sh
*** Starting crash test.
2022/01/29 15:05:27 rpc.Register: method "Done" has 1 input parameters; needs exactly three
2022/01/29 15:08:27 dialing:dial unix /var/tmp/824-mr-1000: connect: connection refused
2022/01/29 15:08:28 dialing:dial unix /var/tmp/824-mr-1000: connect: connection refused
2022/01/29 15:08:28 dialing:dial unix /var/tmp/824-mr-1000: connect: connection refused
2022/01/29 15:08:28 dialing:dial unix /var/tmp/824-mr-1000: connect: connection refused
sort: cannot read: 'mr-out*': No such file or directory
cmp: EOF on mr-crash-all which is empty
--- crash output is not the same as mr-correct-crash.txt
--- crash test: FAIL

显然就是因为worker crash没有产生mr-out结果文件,看看crash.go主要函数:

func maybeCrash() {
	max := big.NewInt(1000)
	rr, _ := crand.Int(crand.Reader, max)
	if rr.Int64() < 330 {
		// crash!
		os.Exit(1)
	} else if rr.Int64() < 660 {
		// delay for a while.
		maxms := big.NewInt(10 * 1000)
		ms, _ := crand.Int(crand.Reader, maxms)
		time.Sleep(time.Duration(ms.Int64()) * time.Millisecond)
	}
}

就是对一个worker进行随机的crash或者delay不到10秒。我最初写代码时虽然知道存在crash的情况,但是我当时也没有做特殊处理去计算时间,所以这里FAIL了也很正常。

于是我增加了一个协程在MakeCoordinator()中:go c.checkTaskCrash(),这个函数会一直在后台运行,循环检测协调器任务队列中的Running状态的任务,计算运行时间,超时则将任务状态改为Idle,并将任务计数-1。代码如下,看起来很冗余,我没有做精简处理:

func (c *Coordinator) checkTaskCrash() {
	taskInfos := c.taskInfoHolder
	for {
		time.Sleep(time.Second)
		if c.taskPhase == MapPhase {
			for i := 0; i < 8; i++ {
				taskInfo := taskInfos[i]
				if taskInfo.taskState == Running {
					if time.Since(taskInfo.startTime) > 10*time.Second {
						c.Mu.Lock()
						c.taskInfoHolder[i].taskState = Idle
						c.mapTaskCount--
						c.Mu.Unlock()
					}
				}
			}
		}
		if c.taskPhase == ReducePhase {
			for i := 8; i < 18; i++ {
				taskInfo := taskInfos[i]
				if taskInfo.taskState == Running {
					if time.Since(taskInfo.startTime) > 10*time.Second {
						c.Mu.Lock()
						c.taskInfoHolder[i].taskState = Idle
						c.reduceTaskCount--
						c.Mu.Unlock()
					}
				}
			}
		}
		if c.taskPhase == AllDone {
			break
		}
	}
}

最后的运行结果:

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/719411.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号