栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

6.824-lab1

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

6.824-lab1

最近空余时间有点太闲了,打算学下6.824这门课程。

实验结果

代码

coordinator.go

// Coordinator holds the bookkeeping for one MapReduce job. Code that
// updates the maps, state and reduceFinished does so while holding lock.
type Coordinator struct {
  lock sync.Mutex

  // state is the job phase: it starts at 1, is set to 2 when every map
  // task has been reported, and to 3 when every reduce task has been
  // reported. Done returns true once it is 3.
  state int

  // Map phase.
  nMap        int             // number of input files
  mapCh       chan string     // input file names waiting to be handed out
  mapFinished map[string]bool // input file name -> map task reported done

  // Reduce phase.
  nReduce         int              // max reduce task bucket
  reduceCh        chan int         // reduce bucket ids waiting to be handed out
  reduceTask      map[int][]string // bucket id -> intermediate files for that bucket
  reduceFinishMap map[int]bool     // bucket id -> reduce task reported done
  reduceFinished  int              // number of buckets marked done
}

// Your code here -- RPC handlers for the worker to call.

//
// Example is the sample RPC handler: it replies with the caller's X
// plus one.
//
// The RPC argument and reply types are defined in rpc.go.
//
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
  y := args.X + 1
  reply.Y = y
  return nil
}

//
// server registers the coordinator with net/rpc and starts a goroutine
// that serves worker RPCs over HTTP on a unix-domain socket.
//
func (c *Coordinator) server() {
  rpc.Register(c)
  rpc.HandleHTTP()
  //l, e := net.Listen("tcp", ":1234")
  sock := coordinatorSock()
  // Remove whatever already exists at the socket path before listening.
  os.Remove(sock)
  listener, err := net.Listen("unix", sock)
  if err != nil {
    log.Fatal("listen error:", err)
  }
  go http.Serve(listener, nil)
}

// FetchMapTask is the RPC a worker calls to obtain a map task. It waits
// until an input file name can be received from mapCh. For a received
// name it fills in the file name and nReduce and starts the timeout
// watcher for that file; once mapCh has been closed it replies with
// Finish set instead.
func (c *Coordinator) FetchMapTask(ignore int, reply *MapTask) error {
  name, open := <-c.mapCh
  if open {
    reply.Fname = name
    reply.NReduce = c.nReduce
    go c.checkTimeout(name)
    return nil
  }
  reply.Finish = true
  return nil
}

// FetchReduceTask is the RPC a worker calls to obtain a reduce task. It
// waits until a bucket id can be received from reduceCh. For a received
// id it replies with the id and the intermediate files recorded for that
// bucket and starts the timeout watcher; once reduceCh has been closed it
// replies with Finish set instead.
func (c *Coordinator) FetchReduceTask(ignore int, reply *ReduceTask) error {
  // The lock is not held while waiting here: ReduceDone also needs the
  // lock, so waiting with it held could deadlock.
  bucket, open := <-c.reduceCh
  if open {
    reply.ID = bucket
    reply.Files = c.reduceTask[bucket] // intermediate files of this bucket
    go c.checkTimeoutReduceTask(bucket)
    return nil
  }
  reply.Finish = true
  return nil
}

// checkTimeout sleeps for ten seconds and then, unless the map task for
// input file s has been marked finished in the meantime, logs the timeout
// and puts s back on mapCh so it can be handed out again.
func (c *Coordinator) checkTimeout(s string) {
  time.Sleep(10 * time.Second)

  c.lock.Lock()
  defer c.lock.Unlock()

  if !c.mapFinished[s] {
    log.Printf("map task#%s time out.", s)
    // The re-queue happens with the lock still held.
    c.mapCh <- s
  }
}

// checkTimeoutReduceTask sleeps for ten seconds and then, unless reduce
// task rid has been marked finished in the meantime, logs the timeout and
// puts rid back on reduceCh so it can be handed out again.
func (c *Coordinator) checkTimeoutReduceTask(rid int) {
  time.Sleep(10 * time.Second)

  c.lock.Lock()
  defer c.lock.Unlock()

  if !c.reduceFinishMap[rid] {
    log.Printf("reduce task#%d time out.", rid)
    // The re-queue happens with the lock still held.
    c.reduceCh <- rid
  }
}

// MapDone is the RPC a worker calls after finishing a map task.
//
// arg.Fname names the input file that was processed and arg.Items maps
// each reduce bucket id to the intermediate file written for it. A second
// report for the same input file is ignored. reply has to be a pointer
// because it is an RPC reply parameter; nothing is written to it.
//
// When the last outstanding map task is reported, mapCh is closed and the
// reduce phase is started: every bucket that received at least one
// intermediate file is registered as unfinished and queued on reduceCh.
// If no bucket received any file there is nothing to reduce, so the job is
// marked complete (state 3) and reduceCh is closed right away; otherwise
// no ReduceDone call would ever arrive and the job would never finish.
func (c *Coordinator) MapDone(arg *MapTaskDone, reply *bool) error {
  c.lock.Lock()
  defer c.lock.Unlock()

  if c.mapFinished[arg.Fname] { // duplicate commit
    return nil
  }
  c.mapFinished[arg.Fname] = true

  for bucket, fname := range arg.Items {
    c.reduceTask[bucket] = append(c.reduceTask[bucket], fname)
  }

  if len(c.mapFinished) != c.nMap {
    return nil
  }

  // All map tasks are done: next stage.
  close(c.mapCh)
  if len(c.reduceTask) == 0 {
    c.state = 3
    close(c.reduceCh)
    return nil
  }

  c.state = 2
  // The lock is already held, so the buckets are queued right here instead
  // of in a separate goroutine that would have to re-acquire it.
  for bucket := range c.reduceTask {
    c.reduceFinishMap[bucket] = false
    c.reduceCh <- bucket
  }
  return nil
}

// ReduceDone is the RPC a worker calls after finishing reduce task arg.
//
// It has to tolerate being called again for a task that timed out and was
// handed to another worker, including a report that arrives after the job
// has already completed. Each task is counted once. When every registered
// task has been reported, the job is marked complete (state 3) and
// reduceCh is closed. Reports received after that point are ignored,
// because closing reduceCh a second time would panic.
func (c *Coordinator) ReduceDone(arg int, ignore *int) error {
  c.lock.Lock()
  defer c.lock.Unlock()

  if c.state == 3 {
    // Late or duplicate report: the job is already complete.
    return nil
  }

  if !c.reduceFinishMap[arg] {
    c.reduceFinishMap[arg] = true
    c.reduceFinished++
  }
  if len(c.reduceFinishMap) == c.reduceFinished {
    c.state = 3
    close(c.reduceCh)
  }
  return nil
}

//
// Done reports whether the entire job has finished, i.e. whether state
// has reached 3. main/mrcoordinator.go calls it periodically.
//
func (c *Coordinator) Done() bool {
  c.lock.Lock()
  finished := c.state == 3
  c.lock.Unlock()
  return finished
}

//
// MakeCoordinator creates a Coordinator, queues one map task per input
// file and starts the RPC server.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
//
func MakeCoordinator(files []string, nReduce int) *Coordinator {
  c := Coordinator{
    nMap:    len(files),
    nReduce: nReduce,
    // One slot per input file. The queue is filled below before the RPC
    // server is started, so nothing is receiving yet and a smaller buffer
    // would block this function forever. It also leaves room for
    // checkTimeout, which re-queues a task while holding the lock.
    mapCh:           make(chan string, len(files)),
    reduceCh:        make(chan int, nReduce),
    state:           1,
    reduceTask:      map[int][]string{},
    mapFinished:     map[string]bool{},
    reduceFinishMap: map[int]bool{},
    reduceFinished:  0,
  }
  for _, s := range files {
    c.mapCh <- s
  }
  c.server()

  return &c
}

worker.go

//
// KeyValue is a single key/value pair. Map functions return a slice of
// KeyValue.
//
type KeyValue struct {
  Key, Value string
}

//
// ihash returns the 32-bit FNV-1a hash of key with the top bit cleared,
// so the result is never negative. Use ihash(key) % NReduce to choose the
// reduce task number for each KeyValue emitted by Map.
//
func ihash(key string) int {
  hasher := fnv.New32a()
  hasher.Write([]byte(key))
  sum := hasher.Sum32()
  return int(sum & 0x7fffffff)
}

// worker carries the state of one worker process: the phase it is
// currently asking the coordinator about and the application-supplied map
// and reduce functions.
type worker struct {
  // stage is 1 while fetching map tasks and 2 while fetching reduce
  // tasks; the Worker loop stops once it becomes 3.
  stage int

  mapf    func(string, string) []KeyValue // application map function
  reducef func(string, []string) string   // application reduce function
}

//
// Worker runs the worker loop: starting in stage 1 it keeps fetching and
// executing tasks until the stage reaches 3.
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,
  reducef func(string, []string) string) {

  w := worker{stage: 1, mapf: mapf, reducef: reducef}
  for {
    if w.stage == 3 {
      break
    }
    w.fetch()
  }
  // uncomment to send the Example RPC to the coordinator.
  // CallExample()
}

//
// CallExample shows how to make an RPC call to the coordinator: it sends
// X = 99 to Coordinator.Example and prints the reply.
//
// The RPC argument and reply types are defined in rpc.go.
//
func CallExample() {
  // Declare the argument structure and fill in the argument(s).
  args := ExampleArgs{}
  args.X = 99

  // Declare a reply structure.
  reply := ExampleReply{}

  // Send the RPC request and wait for the reply. The name
  // "Coordinator.Example" tells the receiving server that we'd like to
  // call the Example() method of struct Coordinator.
  if err := call("Coordinator.Example", &args, &reply); err != nil {
    fmt.Printf("call failed!\n")
    return
  }
  // reply.Y should be 100.
  fmt.Printf("reply.Y %v\n", reply.Y)
}

//
// call sends an RPC request to the coordinator over the unix-domain
// socket and waits for the response.
// It returns the error reported by the RPC call, nil on success.
// If the coordinator cannot be dialed, the process is terminated through
// log.Fatal.
//
func call(rpcname string, args interface{}, reply interface{}) error {
  // c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
  client, err := rpc.DialHTTP("unix", coordinatorSock())
  if err != nil {
    log.Fatal("dialing:", err)
  }
  defer client.Close()

  return client.Call(rpcname, args, reply)
}

// fetch asks the coordinator for one task of the current stage, runs it
// and reports the result.
//
// In stage 1 it fetches a map task, runs doMap and reports through
// Coordinator.MapDone; when the coordinator answers Finish it moves to
// stage 2. In stage 2 it fetches a reduce task, runs doReduce and reports
// through Coordinator.ReduceDone; when the coordinator answers Finish it
// moves to stage 3. A failed fetch or a failed MapDone report terminates
// the process. A failed ReduceDone report is only logged.
func (w *worker) fetch() {
  switch w.stage {
  case 1:
    var task MapTask
    if err := call("Coordinator.FetchMapTask", 1, &task); err != nil {
      log.Fatalln(err)
    }
    if task.Finish {
      w.stage = 2
      return
    }
    res := w.doMap(&task)
    ok := false
    if err := call("Coordinator.MapDone", res, &ok); err != nil {
      // The error is passed as an argument, not spliced into the format
      // string, so a '%' in its text cannot be misread as a verb.
      log.Fatalf("call rpc MapDone failed. %v", err)
    }
  case 2:
    var task ReduceTask
    if err := call("Coordinator.FetchReduceTask", 0, &task); err != nil {
      log.Fatalln(err)
    }
    if task.Finish {
      w.stage = 3
      return
    }
    w.doReduce(task.Files, task.ID)
    ignore := 0
    if err := call("Coordinator.ReduceDone", task.ID, &ignore); err != nil {
      log.Printf("call rpc ReduceDone failed. %v", err)
    }
  }
}

// doMap runs the map function on the input file named by t and writes the
// resulting pairs into one intermediate file per non-empty reduce bucket.
//
// Each pair goes to bucket ihash(key) % t.NReduce. The file for bucket rid
// is named "<rid>-<last path element of the input file>" and is created in
// the current directory, one "key value" pair per line. The returned
// MapTaskDone carries the input file name and a bucket id -> intermediate
// file name map. Any I/O failure terminates the process.
func (w *worker) doMap(t *MapTask) MapTaskDone {
  nReduce := t.NReduce
  source := t.Fname

  file, err := os.Open(source)
  if err != nil {
    log.Fatalf("cannot open %v", source)
  }
  data, err := ioutil.ReadAll(file)
  file.Close()
  if err != nil {
    log.Fatalf("cannot read %v", source)
  }

  kva := w.mapf(source, string(data))

  // Partition the pairs by reduce bucket; only buckets that receive at
  // least one pair get an entry.
  buckets := map[int][]KeyValue{}
  for _, kv := range kva {
    rid := ihash(kv.Key) % nReduce // dispatch this kv to one reduce task
    buckets[rid] = append(buckets[rid], kv)
  }

  // source may carry a directory prefix such as ../ which would make the
  // file creation fail, so only its last path element is used.
  parts := strings.Split(source, "/")
  base := parts[len(parts)-1]

  res := MapTaskDone{
    Fname: source,
    Items: map[int]string{},
  }
  for rid, kvs := range buckets {
    fname := strconv.Itoa(rid) + "-" + base
    if err := writeIntermediateFile(fname, kvs); err != nil {
      log.Fatalf("cannot write %v: %v", fname, err)
    }
    res.Items[rid] = fname
  }

  return res
}

// writeIntermediateFile writes kvs, one "key value" pair per line, to a
// temporary file in the current directory and then renames it to name, so
// that a partially written file never appears under the final name.
func writeIntermediateFile(name string, kvs []KeyValue) error {
  tmp, err := ioutil.TempFile(".", "mr-tmp-map-*")
  if err != nil {
    return err
  }
  for _, kv := range kvs {
    if _, err := fmt.Fprintf(tmp, "%v %v\n", kv.Key, kv.Value); err != nil {
      tmp.Close()
      os.Remove(tmp.Name())
      return err
    }
  }
  if err := tmp.Close(); err != nil {
    os.Remove(tmp.Name())
    return err
  }
  if err := os.Rename(tmp.Name(), name); err != nil {
    os.Remove(tmp.Name())
    return err
  }
  return nil
}

// ByKey orders a slice of KeyValue by Key; its three methods make it
// usable with sort.Sort.
type ByKey []KeyValue

// Len returns the number of pairs.
func (s ByKey) Len() int {
  return len(s)
}

// Swap exchanges the pairs at positions i and j.
func (s ByKey) Swap(i, j int) {
  s[i], s[j] = s[j], s[i]
}

// Less reports whether the key at position i sorts before the key at
// position j.
func (s ByKey) Less(i, j int) bool {
  return s[i].Key < s[j].Key
}

// doReduce runs reduce task id over the given intermediate files and
// writes the result to "mr-out-<id>" in the current directory.
//
// Every input line is split at its first space into key and value; lines
// without a space are skipped. The pairs are sorted by key, the reduce
// function is called once per distinct key with all of that key's values,
// and one "key output" line is written per key. The output is written to
// a temporary file and renamed once complete, so a partially written file
// never appears under the final name. Any I/O failure terminates the
// process. The task id is returned.
func (w *worker) doReduce(files []string, id int) int {
  var it []KeyValue
  for _, name := range files {
    data, err := ioutil.ReadFile(name)
    if err != nil {
      log.Fatalln(err)
    }
    for _, line := range strings.Split(string(data), "\n") {
      // Split only at the first space so that a value which itself
      // contains spaces is kept whole.
      parts := strings.SplitN(line, " ", 2)
      if len(parts) < 2 {
        continue
      }
      it = append(it, KeyValue{parts[0], parts[1]})
    }
  }
  sort.Sort(ByKey(it))

  tmp, err := ioutil.TempFile(".", "mr-tmp-reduce-*")
  if err != nil {
    log.Fatalln(err)
  }
  for i := 0; i < len(it); {
    // it[i:j] is the run of pairs that share the key it[i].Key.
    j := i + 1
    for j < len(it) && it[j].Key == it[i].Key {
      j++
    }
    values := make([]string, 0, j-i)
    for k := i; k < j; k++ {
      values = append(values, it[k].Value)
    }
    output := w.reducef(it[i].Key, values)
    if _, err := fmt.Fprintf(tmp, "%v %v\n", it[i].Key, output); err != nil {
      log.Fatalln(err)
    }
    i = j
  }
  if err := tmp.Close(); err != nil {
    log.Fatalln(err)
  }
  if err := os.Rename(tmp.Name(), "mr-out-"+strconv.Itoa(id)); err != nil {
    log.Fatalln(err)
  }
  //log.Printf("reduce task#%d done. save to %s", id, ofileName)
  return id
}
总结

lab1并没有特别顺利,中途遇到死锁、文件创建失败等问题,排查起来也有点费劲。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/736170.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号