最近空余时间有点太闲了,打算学下6.824这门课程。
实验结果 代码coordinator.go
// Coordinator holds the bookkeeping for one MapReduce job: the queues of
// tasks still to be handed out and the record of which tasks have finished.
type Coordinator struct {
	// lock is taken by MapDone, ReduceDone, Done and the timeout checkers
	// before they touch the maps, the counter or state.
	lock sync.Mutex

	// state is the job phase: 1 at start-up (map phase), 2 once every map
	// task has been committed, 3 once every reduce task has finished.
	state int

	// Map phase.
	nMap        int             // number of input files, one map task each
	mapCh       chan string     // unassigned map tasks (input file names)
	mapFinished map[string]bool // input file name -> map task committed

	// Reduce phase.
	nReduce         int              // max reduce task bucket
	reduceCh        chan int         // unassigned reduce tasks (bucket ids)
	reduceTask      map[int][]string // bucket id -> intermediate files to reduce
	reduceFinishMap map[int]bool     // bucket id -> reduce task finished
	reduceFinished  int              // number of finished reduce tasks
}
// Your code here -- RPC handlers for the worker to call.
// Example is the sample RPC handler: it stores the argument's X plus one in
// the reply's Y.
//
// The RPC argument and reply types are defined in rpc.go.
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
	next := args.X + 1
	reply.Y = next
	return nil
}
// server registers the coordinator's methods with net/rpc and starts a
// goroutine that serves RPCs from worker.go over HTTP on a unix socket.
func (c *Coordinator) server() {
	rpc.Register(c)
	rpc.HandleHTTP()
	//l, e := net.Listen("tcp", ":1234")
	path := coordinatorSock()
	// Remove the socket path before listening on it; the result is ignored.
	os.Remove(path)
	listener, err := net.Listen("unix", path)
	if err != nil {
		log.Fatal("listen error:", err)
	}
	go http.Serve(listener, nil)
}
// FetchMapTask is the RPC a worker calls to obtain its next map task.
//
// It blocks until a file name can be received from mapCh. When mapCh has
// been closed and drained it sets reply.Finish instead. For every task it
// hands out it starts a checkTimeout goroutine for that file.
func (c *Coordinator) FetchMapTask(ignore int, reply *MapTask) error {
	input, open := <-c.mapCh
	if open {
		reply.Fname = input
		reply.NReduce = c.nReduce
		go c.checkTimeout(input)
		return nil
	}
	reply.Finish = true
	return nil
}
// FetchReduceTask is the RPC a worker calls to obtain its next reduce task.
//
// It blocks until a bucket id can be received from reduceCh. When reduceCh
// has been closed and drained it sets reply.Finish instead. For every task
// it hands out it starts a checkTimeoutReduceTask goroutine for that id.
func (c *Coordinator) FetchReduceTask(ignore int, reply *ReduceTask) error {
	// Do not hold c.lock while waiting here: that could deadlock, because
	// ReduceDone also needs the lock.
	bucket, open := <-c.reduceCh
	if !open {
		reply.Finish = true
		return nil
	}
	reply.ID = bucket
	reply.Files = c.reduceTask[bucket] // intermediate files of this bucket
	go c.checkTimeoutReduceTask(bucket)
	return nil
}
// checkTimeout sleeps for ten seconds and then, if the map task for input
// file s has still not been committed, logs the timeout and puts s back on
// mapCh so another worker can pick it up.
func (c *Coordinator) checkTimeout(s string) {
	time.Sleep(10 * time.Second)

	c.lock.Lock()
	defer c.lock.Unlock()

	if done := c.mapFinished[s]; done {
		return
	}
	log.Printf("map task#%s time out.", s)
	c.mapCh <- s
}
// checkTimeoutReduceTask sleeps for ten seconds and then, if reduce task rid
// has still not been reported finished, logs the timeout and puts rid back on
// reduceCh so another worker can pick it up.
func (c *Coordinator) checkTimeoutReduceTask(rid int) {
	time.Sleep(10 * time.Second)

	c.lock.Lock()
	defer c.lock.Unlock()

	if done := c.reduceFinishMap[rid]; done {
		return
	}
	log.Printf("reduce task#%d time out.", rid)
	c.reduceCh <- rid
}
// MapDone is the RPC a worker calls to commit a finished map task.
//
// arg.Fname names the input file and arg.Items maps each reduce bucket id to
// the intermediate file the worker wrote for it. A second commit for the same
// file is ignored. reply must be a pointer (net/rpc requires it); it is never
// written.
//
// When the last map task is committed, mapCh is closed so that FetchMapTask
// reports Finish, and the reduce phase is set up: every bucket that received
// at least one intermediate file becomes a reduce task. If no bucket received
// any file there is nothing to reduce, so the job is marked finished right
// away instead of waiting for ReduceDone calls that can never arrive.
func (c *Coordinator) MapDone(arg *MapTaskDone, reply *bool) error {
	c.lock.Lock()
	defer c.lock.Unlock()

	if c.mapFinished[arg.Fname] { // duplicate commit
		return nil
	}
	c.mapFinished[arg.Fname] = true
	for bucket, fname := range arg.Items {
		c.reduceTask[bucket] = append(c.reduceTask[bucket], fname)
	}
	if len(c.mapFinished) != c.nMap {
		return nil
	}

	// Every map task is committed: move on to the next stage.
	close(c.mapCh)
	if len(c.reduceTask) == 0 {
		c.state = 3
		close(c.reduceCh)
		return nil
	}
	c.state = 2

	// Register the reduce tasks while the lock is still held, so ReduceDone
	// always sees the complete set.
	pending := make([]int, 0, len(c.reduceTask))
	for bucket := range c.reduceTask {
		c.reduceFinishMap[bucket] = false
		pending = append(pending, bucket)
	}
	// Queue them without holding the lock, so a full channel can never stall
	// the other handlers. reduceCh is only closed after every registered
	// task has finished, i.e. after each id queued here has been received.
	go func() {
		for _, bucket := range pending {
			c.reduceCh <- bucket
		}
	}()
	return nil
}
// ReduceDone is the RPC a worker calls to report that reduce task arg has
// finished. ignore is unused.
//
// It can be called more than once for the same task: a task that timed out is
// queued again, and both the slow worker and the re-run report it. A report
// that arrives after the job has completed is therefore ignored. Without that
// guard the completion check below would succeed a second time and close
// reduceCh again, which panics.
func (c *Coordinator) ReduceDone(arg int, ignore *int) error {
	c.lock.Lock()
	defer c.lock.Unlock()

	if c.state == 3 {
		// Job already finished and reduceCh already closed.
		return nil
	}
	if !c.reduceFinishMap[arg] {
		c.reduceFinishMap[arg] = true
		c.reduceFinished++
	}
	if len(c.reduceFinishMap) == c.reduceFinished {
		c.state = 3
		// Closing the queue makes FetchReduceTask report Finish.
		close(c.reduceCh)
	}
	return nil
}
// Done reports whether the entire job has finished, i.e. whether state has
// reached 3.
//
// main/mrcoordinator.go calls Done() periodically.
func (c *Coordinator) Done() bool {
	c.lock.Lock()
	finished := c.state == 3
	c.lock.Unlock()
	return finished
}
// MakeCoordinator creates a Coordinator, queues one map task per input file
// and starts the RPC server.
//
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := &Coordinator{
		nMap:    len(files),
		nReduce: nReduce,
		// The queue is filled below before the server is started, so no
		// worker can drain it yet: it must have room for every input file or
		// the loop would block forever.
		mapCh:           make(chan string, len(files)),
		reduceCh:        make(chan int, nReduce),
		state:           1,
		reduceTask:      map[int][]string{},
		mapFinished:     map[string]bool{},
		reduceFinishMap: map[int]bool{},
		reduceFinished:  0,
	}
	for _, name := range files {
		c.mapCh <- name
	}
	c.server()
	return c
}
worker.go
// KeyValue is one key/value pair; Map functions return a slice of them.
type KeyValue struct {
	Key   string // key the pair is grouped and sorted by
	Value string // value handed to the reduce function for that key
}
// ihash returns a non-negative hash of key: the 32-bit FNV-1a sum with its
// top bit cleared.
//
// Use ihash(key) % NReduce to choose the reduce task number for each
// KeyValue emitted by Map.
func ihash(key string) int {
	hasher := fnv.New32a()
	hasher.Write([]byte(key))
	sum := hasher.Sum32()
	return int(sum &^ (1 << 31))
}
// worker carries the state of one worker process.
type worker struct {
	// stage selects what fetch asks the coordinator for: 1 for map tasks,
	// 2 for reduce tasks. Worker stops looping once it reaches 3.
	stage int
	// mapf is the application's Map function (called as mapf(fileName, contents)).
	mapf func(string, string) []KeyValue
	// reducef is the application's Reduce function (called as reducef(key, values)).
	reducef func(string, []string) string
}
// Worker runs the worker loop: starting in stage 1 it keeps calling fetch
// until the stage reaches 3.
//
// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	w := &worker{
		stage:   1,
		mapf:    mapf,
		reducef: reducef,
	}
	for {
		if w.stage == 3 {
			break
		}
		w.fetch()
	}
	// uncomment to send the Example RPC to the coordinator.
	// CallExample()
}
// CallExample shows how to make an RPC call to the coordinator: it sends
// X = 99 to Coordinator.Example and prints the Y that comes back.
//
// The RPC argument and reply types are defined in rpc.go.
func CallExample() {
	// declare an argument structure and fill in the argument(s).
	args := ExampleArgs{}
	args.X = 99

	// declare a reply structure.
	reply := ExampleReply{}

	// send the RPC request, wait for the reply.
	// the "Coordinator.Example" tells the receiving server that we'd like
	// to call the Example() method of struct Coordinator.
	if err := call("Coordinator.Example", &args, &reply); err != nil {
		fmt.Printf("call failed!\n")
		return
	}
	// reply.Y should be 100.
	fmt.Printf("reply.Y %v\n", reply.Y)
}
// call sends an RPC request to the coordinator and waits for the response.
//
// It dials the coordinator's unix socket for every request and terminates the
// process via log.Fatal if dialing fails. Otherwise it returns the error from
// the RPC itself, which is nil on success.
func call(rpcname string, args interface{}, reply interface{}) error {
	// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
	client, dialErr := rpc.DialHTTP("unix", coordinatorSock())
	if dialErr != nil {
		log.Fatal("dialing:", dialErr)
	}
	defer client.Close()
	return client.Call(rpcname, args, reply)
}
// fetch performs one step of the worker loop for the current stage.
//
// Stage 1: ask for a map task, run it and commit it with MapDone; when the
// coordinator replies Finish, advance to stage 2.
// Stage 2: ask for a reduce task, run it and report it with ReduceDone; when
// the coordinator replies Finish, advance to stage 3.
//
// A failed fetch or MapDone RPC terminates the process. A failed ReduceDone
// RPC is only logged; the loop then continues with the next fetch.
func (w *worker) fetch() {
	switch w.stage {
	case 1:
		var task MapTask
		if err := call("Coordinator.FetchMapTask", 1, &task); err != nil {
			log.Fatalln(err)
		}
		if task.Finish {
			w.stage = 2
			return
		}
		res := w.doMap(&task)
		ok := false
		if err := call("Coordinator.MapDone", res, &ok); err != nil {
			// Pass the error as an argument, not as part of the format
			// string, so a '%' in its text cannot be misinterpreted.
			log.Fatalf("call rpc MapDone failed. %v", err)
		}
	case 2:
		var task ReduceTask
		if err := call("Coordinator.FetchReduceTask", 0, &task); err != nil {
			log.Fatalln(err)
		}
		if task.Finish {
			w.stage = 3
			return
		}
		id := w.doReduce(task.Files, task.ID)
		ignore := 0
		if err := call("Coordinator.ReduceDone", id, &ignore); err != nil {
			log.Printf("call rpc ReduceDone failed. %v", err)
		}
	}
}
// writeFileAtomically writes content to a temporary file in the current
// directory and then renames it to name, so that a reader never sees a
// half-written file, including when the same task is executed twice.
// On failure the temporary file is removed and the error returned.
func writeFileAtomically(name, content string) error {
	// The temporary name deliberately does not start with "mr-out".
	tmp, err := ioutil.TempFile(".", "mr-tmp-*")
	if err != nil {
		return err
	}
	_, err = tmp.WriteString(content)
	if cerr := tmp.Close(); err == nil {
		err = cerr
	}
	if err == nil {
		err = os.Rename(tmp.Name(), name)
	}
	if err != nil {
		// Best-effort cleanup; the error reported is the original one.
		os.Remove(tmp.Name())
	}
	return err
}

// doMap runs the map function on the task's input file and writes the
// emitted pairs to one intermediate file per non-empty reduce bucket.
//
// Each intermediate file is named "<bucket>-<base name of the input file>"
// and holds one "key value" record per line, which is the format doReduce
// reads back. The returned MapTaskDone carries the input file name and, for
// every bucket that received data, the name of its intermediate file.
// Any I/O failure terminates the process.
func (w *worker) doMap(t *MapTask) MapTaskDone {
	nReduce := t.NReduce
	source := t.Fname

	file, err := os.Open(source)
	if err != nil {
		log.Fatalf("cannot open %v", source)
	}
	content, err := ioutil.ReadAll(file)
	file.Close()
	if err != nil {
		log.Fatalf("cannot read %v", source)
	}

	kva := w.mapf(source, string(content))

	// Dispatch every pair to its reduce bucket.
	buckets := make(map[int][]KeyValue)
	for _, kv := range kva {
		rid := ihash(kv.Key) % nReduce
		buckets[rid] = append(buckets[rid], kv)
	}

	// source may carry a directory prefix such as "../"; keep only the part
	// after the last "/" so the intermediate file lands in the current
	// directory.
	base := source[strings.LastIndex(source, "/")+1:]

	res := MapTaskDone{
		Fname: source,
		Items: map[int]string{},
	}
	for rid, kvs := range buckets {
		var records strings.Builder
		for _, kv := range kvs {
			fmt.Fprintf(&records, "%v %v\n", kv.Key, kv.Value)
		}
		fname := strconv.Itoa(rid) + "-" + base
		if err := writeFileAtomically(fname, records.String()); err != nil {
			log.Fatalf("cannot write %v: %v", fname, err)
		}
		res.Items[rid] = fname
	}
	return res
}

// ByKey implements sort.Interface to order KeyValue pairs by Key.
type ByKey []KeyValue

// Len returns the number of pairs.
func (a ByKey) Len() int { return len(a) }

// Swap exchanges the pairs at positions i and j.
func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

// Less reports whether the key at position i sorts before the key at j.
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

// doReduce reads the intermediate files of reduce task id, groups the values
// by key, calls the reduce function once per distinct key and writes one
// "key output" line per key to "mr-out-<id>". It returns id.
//
// Lines without a space are skipped. A record is split at its first space
// only, so a value that itself contains spaces is kept whole.
// Any I/O failure terminates the process.
func (w *worker) doReduce(files []string, id int) int {
	var kvs []KeyValue
	for _, name := range files {
		data, err := ioutil.ReadFile(name)
		if err != nil {
			log.Fatalln(err)
		}
		for _, line := range strings.Split(string(data), "\n") {
			parts := strings.SplitN(line, " ", 2)
			if len(parts) < 2 {
				continue
			}
			kvs = append(kvs, KeyValue{Key: parts[0], Value: parts[1]})
		}
	}
	sort.Sort(ByKey(kvs))

	var out strings.Builder
	for i := 0; i < len(kvs); {
		// kvs[i:j] is the run of pairs sharing the key kvs[i].Key.
		j := i
		var values []string
		for j < len(kvs) && kvs[j].Key == kvs[i].Key {
			values = append(values, kvs[j].Value)
			j++
		}
		fmt.Fprintf(&out, "%v %v\n", kvs[i].Key, w.reducef(kvs[i].Key, values))
		i = j
	}

	oname := "mr-out-" + strconv.Itoa(id)
	if err := writeFileAtomically(oname, out.String()); err != nil {
		log.Fatalf("cannot write %v: %v", oname, err)
	}
	return id
}
总结
lab1并没有特别顺利,中途遇到死锁、文件创建失败等问题,排查起来也有点费劲。



