栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

如何实现基于行的文件内容的并行处理

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

如何实现基于行的文件内容的并行处理

在考虑将处理过程并行化之前,应先分析输入数据和计算本身,确认并行化是否真的有意义。

必须按顺序处理的输入并不适合并行化:并行处理需要额外的复杂逻辑来保持顺序,而且很难预先判断这种做法能否奏效。

同样,要从并行化中获益,每项计算的耗时必须明显大于同步各个并行任务的开销。批量处理数据可以摊薄这部分开销,但算法会因此变得更复杂,并带来其他副作用(例如额外的内存分配)。

否则,请勿并行化。

下面给出了在计算耗时较长和较短两种情况下的多种实现示例,以及相应的基准测试结果。

结论是:除非每项任务的运行时间足够长、明显超过同步开销,否则顺序处理反而更快。

main.go

// Command main compares sequential and several parallel strategies for
// processing the lines of a text input, printing timing and memory
// statistics for each one.
package main

import (
	"bufio"
	"fmt"
	"io"
	"runtime"
	"strings"
	"sync"
	"time"
)

// workerCount is the number of handlers the parallel strategies
// (process, processWorkers, processBulk) run concurrently.
const workerCount = 4

// bulkSize is the number of lines processBulk groups into one batch.
const bulkSize = 50

// main runs every strategy once over the same synthetic input
// (1000 lines of 1000 'a' characters, each newline-terminated) and
// prints statistics for each run.
func main() {
	data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
	run_line_short(data, true)
	run_line_long(data, true)
	run_line_short_workers(data, true)
	run_line_long_workers(data, true)
	run_bulk_short(data, true)
	run_bulk_long(data, true)
	run_seq_short(data, true)
	run_seq_long(data, true)
}

// runStrategy feeds a fresh reader over data to fn and panics if fn fails.
// When stat is true, the run is wrapped with stats reporting under name.
func runStrategy(name string, data string, stat bool, fn func(io.Reader) error) {
	if stat {
		// stats(name) prints the header and starts the clock now; the
		// returned closure prints the results when this function returns.
		defer stats(name)()
	}
	if err := fn(strings.NewReader(data)); err != nil {
		panic(err)
	}
}

// run_line_short runs process (one goroutine per line) with the cheap handler.
func run_line_short(data string, stat bool) {
	runStrategy("run_line_short", data, stat, func(r io.Reader) error {
		return process(r, line_handler_short)
	})
}

// run_line_long runs process (one goroutine per line) with the 5ms handler.
func run_line_long(data string, stat bool) {
	runStrategy("run_line_long", data, stat, func(r io.Reader) error {
		return process(r, line_handler_long)
	})
}

// run_line_short_workers runs processWorkers (fixed worker pool) with the cheap handler.
func run_line_short_workers(data string, stat bool) {
	runStrategy("run_line_short_workers", data, stat, func(r io.Reader) error {
		return processWorkers(r, line_handler_short)
	})
}

// run_line_long_workers runs processWorkers (fixed worker pool) with the 5ms handler.
func run_line_long_workers(data string, stat bool) {
	runStrategy("run_line_long_workers", data, stat, func(r io.Reader) error {
		return processWorkers(r, line_handler_long)
	})
}

// run_bulk_short runs processBulk (batched worker pool) with the cheap batch handler.
func run_bulk_short(data string, stat bool) {
	runStrategy("run_bulk_short", data, stat, func(r io.Reader) error {
		return processBulk(r, bulk_handler_short)
	})
}

// run_bulk_long runs processBulk (batched worker pool) with the 5ms-per-batch handler.
func run_bulk_long(data string, stat bool) {
	runStrategy("run_bulk_long", data, stat, func(r io.Reader) error {
		return processBulk(r, bulk_handler_long)
	})
}

// run_seq_short runs processSeq (no concurrency) with the cheap handler.
func run_seq_short(data string, stat bool) {
	runStrategy("run_seq_short", data, stat, func(r io.Reader) error {
		return processSeq(r, line_handler_short)
	})
}

// run_seq_long runs processSeq (no concurrency) with the 5ms handler.
func run_seq_long(data string, stat bool) {
	runStrategy("run_seq_long", data, stat, func(r io.Reader) error {
		return processSeq(r, line_handler_long)
	})
}

// line_handler_short simulates a negligible amount of work per line.
func line_handler_short(k string) error {
	_ = len(k)
	return nil
}

// line_handler_long simulates a slow (5ms) operation per line.
func line_handler_long(k string) error {
	time.Sleep(5 * time.Millisecond)
	_ = len(k)
	return nil
}

// bulk_handler_short simulates a negligible amount of work per batch.
func bulk_handler_short(b []string) error {
	for _, k := range b {
		_ = len(k)
	}
	return nil
}

// bulk_handler_long simulates a slow (5ms) operation per batch.
func bulk_handler_long(b []string) error {
	time.Sleep(5 * time.Millisecond)
	for _, k := range b {
		_ = len(k)
	}
	return nil
}

// stats prints a header for name and returns a function that, when called,
// prints the elapsed time since stats was called and a runtime.MemStats
// summary (sizes in whole MiB).
func stats(name string) func() {
	fmt.Println("======================")
	fmt.Println(name)
	start := time.Now()
	return func() {
		const mb = 1024 * 1024
		fmt.Printf("time to run %v\n", time.Since(start))
		var ms runtime.MemStats
		runtime.ReadMemStats(&ms)
		fmt.Printf("Alloc: %d MB, TotalAlloc: %d MB, Sys: %d MB\n", ms.Alloc/mb, ms.TotalAlloc/mb, ms.Sys/mb)
		fmt.Printf("Mallocs: %d, Frees: %d\n", ms.Mallocs, ms.Frees)
		fmt.Printf("HeapAlloc: %d MB, HeapSys: %d MB, HeapIdle: %d MB\n", ms.HeapAlloc/mb, ms.HeapSys/mb, ms.HeapIdle/mb)
		fmt.Printf("HeapObjects: %d\n", ms.HeapObjects)
		fmt.Println()
	}
}

// firstError drains errs until it is closed and returns the first non-nil
// error received. Draining to the end keeps every sender from blocking.
func firstError(errs <-chan error) error {
	var first error
	for err := range errs {
		if first == nil && err != nil {
			first = err
		}
	}
	return first
}

// process starts one goroutine per line of r, allowing at most workerCount
// handlers to run at the same time. Every line is handled even after a
// failure. The first error received is returned; handler errors are always
// sent before the scanner's error, which is only reported once all handlers
// have finished.
func process(r io.Reader, h func(string) error) error {
	errs := make(chan error)
	tokens := make(chan struct{}, workerCount)
	var wg sync.WaitGroup
	go func() {
		scanner := bufio.NewScanner(r)
		for scanner.Scan() {
			tokens <- struct{}{} // acquire a slot; blocks while workerCount handlers run
			wg.Add(1)
			go func(line string) {
				defer wg.Done()
				defer func() { <-tokens }() // release the slot
				if err := h(line); err != nil {
					errs <- err
				}
			}(scanner.Text())
		}
		wg.Wait()
		if err := scanner.Err(); err != nil {
			errs <- err
		}
		close(errs)
	}()
	return firstError(errs)
}

// processWorkers feeds the lines of r to a fixed pool of workerCount
// goroutines. Every line is handled even after a failure; the first error
// received (handler errors before the scanner's error) is returned.
func processWorkers(r io.Reader, h func(string) error) error {
	errs := make(chan error)
	input := make(chan string)
	var wg sync.WaitGroup
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for line := range input {
				if err := h(line); err != nil {
					errs <- err
				}
			}
		}()
	}
	go func() {
		scanner := bufio.NewScanner(r)
		for scanner.Scan() {
			input <- scanner.Text()
		}
		close(input)
		wg.Wait()
		if err := scanner.Err(); err != nil {
			errs <- err
		}
		close(errs)
	}()
	return firstError(errs)
}

// processBulk groups the lines of r into batches of up to bulkSize lines and
// feeds them to a fixed pool of workerCount goroutines. Each handler owns the
// slice it receives. Every batch is handled even after a failure; the first
// error received (handler errors before the scanner's error) is returned.
func processBulk(r io.Reader, h func([]string) error) error {
	errs := make(chan error)
	input := make(chan []string)
	var wg sync.WaitGroup
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for batch := range input {
				if err := h(batch); err != nil {
					errs <- err
				}
			}
		}()
	}
	go func() {
		scanner := bufio.NewScanner(r)
		batch := make([]string, 0, bulkSize)
		for scanner.Scan() {
			batch = append(batch, scanner.Text())
			if len(batch) == bulkSize {
				input <- batch
				// The worker now owns the sent slice; start a fresh one.
				batch = make([]string, 0, bulkSize)
			}
		}
		if len(batch) > 0 {
			input <- batch
		}
		close(input)
		// Close errs only after the scanner error (if any) has been sent:
		// closing it from another goroutine as soon as the workers finish
		// would race with this send and panic on a closed channel.
		wg.Wait()
		if err := scanner.Err(); err != nil {
			errs <- err
		}
		close(errs)
	}()
	return firstError(errs)
}

// processSeq handles the lines of r one after another on the calling
// goroutine, stopping at the first handler error. Otherwise it returns the
// scanner's error (nil on a clean end of input).
func processSeq(r io.Reader, h func(string) error) error {
	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		if err := h(scanner.Text()); err != nil {
			return err
		}
	}
	return scanner.Err()
}

main_test.go

package mainimport (    "strings"    "testing")func Benchmark_run_line_short(b *testing.B) {    data:= strings.Repeat(strings.Repeat("a", 1000)+"n", 1000)    for i := 0; i < b.N; i++ {        run_line_short(data, false)    }}func Benchmark_run_line_long(b *testing.B) {    data:= strings.Repeat(strings.Repeat("a", 1000)+"n", 1000)    for i := 0; i < b.N; i++ {        run_line_long(data, false)    }}func Benchmark_run_line_short_workers(b *testing.B) {    data:= strings.Repeat(strings.Repeat("a", 1000)+"n", 1000)    for i := 0; i < b.N; i++ {        run_line_short_workers(data, false)    }}func Benchmark_run_line_long_workers(b *testing.B) {    data:= strings.Repeat(strings.Repeat("a", 1000)+"n", 1000)    for i := 0; i < b.N; i++ {        run_line_long_workers(data, false)    }}func Benchmark_run_bulk_short(b *testing.B) {    data:= strings.Repeat(strings.Repeat("a", 1000)+"n", 1000)    for i := 0; i < b.N; i++ {        run_bulk_short(data, false)    }}func Benchmark_run_bulk_long(b *testing.B) {    data:= strings.Repeat(strings.Repeat("a", 1000)+"n", 1000)    for i := 0; i < b.N; i++ {        run_bulk_long(data, false)    }}func Benchmark_run_seq_short(b *testing.B) {    data:= strings.Repeat(strings.Repeat("a", 1000)+"n", 1000)    for i := 0; i < b.N; i++ {        run_seq_short(data, false)    }}func Benchmark_run_seq_long(b *testing.B) {    data:= strings.Repeat(strings.Repeat("a", 1000)+"n", 1000)    for i := 0; i < b.N; i++ {        run_seq_long(data, false)    }}

结果

$ go run main.go
======================
run_line_short
time to run 2.747827ms
Alloc: 2 MB, TotalAlloc: 2 MB, Sys: 68 MB
Mallocs: 1378, Frees: 1
HeapAlloc: 2 MB, HeapSys: 63 MB, HeapIdle: 61 MB
HeapObjects: 1377

======================
run_line_long
time to run 1.30987804s
Alloc: 3 MB, TotalAlloc: 3 MB, Sys: 68 MB
Mallocs: 5619, Frees: 5
HeapAlloc: 3 MB, HeapSys: 63 MB, HeapIdle: 59 MB
HeapObjects: 5614

======================
run_line_short_workers
time to run 4.54926ms
Alloc: 1 MB, TotalAlloc: 4 MB, Sys: 68 MB
Mallocs: 6648, Frees: 5743
HeapAlloc: 1 MB, HeapSys: 63 MB, HeapIdle: 61 MB
HeapObjects: 905

======================
run_line_long_workers
time to run 1.29874118s
Alloc: 2 MB, TotalAlloc: 5 MB, Sys: 68 MB
Mallocs: 10670, Frees: 5747
HeapAlloc: 2 MB, HeapSys: 63 MB, HeapIdle: 60 MB
HeapObjects: 4923

======================
run_bulk_short
time to run 1.279059ms
Alloc: 3 MB, TotalAlloc: 6 MB, Sys: 68 MB
Mallocs: 11695, Frees: 5751
HeapAlloc: 3 MB, HeapSys: 63 MB, HeapIdle: 59 MB
HeapObjects: 5944

======================
run_bulk_long
time to run 31.328652ms
Alloc: 1 MB, TotalAlloc: 7 MB, Sys: 68 MB
Mallocs: 12728, Frees: 11361
HeapAlloc: 1 MB, HeapSys: 63 MB, HeapIdle: 61 MB
HeapObjects: 1367

======================
run_seq_short
time to run 956.991µs
Alloc: 3 MB, TotalAlloc: 8 MB, Sys: 68 MB
Mallocs: 13746, Frees: 11160
HeapAlloc: 3 MB, HeapSys: 63 MB, HeapIdle: 59 MB
HeapObjects: 2586

======================
run_seq_long
time to run 5.195705859s
Alloc: 1 MB, TotalAlloc: 9 MB, Sys: 68 MB
Mallocs: 17766, Frees: 15973
HeapAlloc: 1 MB, HeapSys: 63 MB, HeapIdle: 61 MB
HeapObjects: 1793

[mh-cbon@Host-001 bulk] $ go test -bench=. -benchmem -count=4
goos: linux
goarch: amd64
pkg: test/bulk
Benchmark_run_line_short-4              1000       1750824 ns/op     1029354 B/op       1005 allocs/op
Benchmark_run_line_short-4              1000       1747408 ns/op     1029348 B/op       1005 allocs/op
Benchmark_run_line_short-4              1000       1757826 ns/op     1029352 B/op       1005 allocs/op
Benchmark_run_line_short-4              1000       1758427 ns/op     1029352 B/op       1005 allocs/op
Benchmark_run_line_long-4                  1    1303037704 ns/op     2253776 B/op       4075 allocs/op
Benchmark_run_line_long-4                  1    1305074974 ns/op     2247792 B/op       4032 allocs/op
Benchmark_run_line_long-4                  1    1305353658 ns/op     2246320 B/op       4013 allocs/op
Benchmark_run_line_long-4                  1    1305725817 ns/op     2247792 B/op       4031 allocs/op
Benchmark_run_line_short_workers-4      1000       2148354 ns/op     1029366 B/op       1005 allocs/op
Benchmark_run_line_short_workers-4      1000       2139629 ns/op     1029370 B/op       1005 allocs/op
Benchmark_run_line_short_workers-4      1000       1983352 ns/op     1029359 B/op       1005 allocs/op
Benchmark_run_line_short_workers-4      1000       1909968 ns/op     1029363 B/op       1005 allocs/op
Benchmark_run_line_long_workers-4          1    1298321093 ns/op     2247856 B/op       4013 allocs/op
Benchmark_run_line_long_workers-4          1    1299846127 ns/op     2246384 B/op       4012 allocs/op
Benchmark_run_line_long_workers-4          1    1300003625 ns/op     2246288 B/op       4011 allocs/op
Benchmark_run_line_long_workers-4          1    1302779911 ns/op     2246256 B/op       4011 allocs/op
Benchmark_run_bulk_short-4              2000        704358 ns/op     1082154 B/op       1011 allocs/op
Benchmark_run_bulk_short-4              2000        708563 ns/op     1082147 B/op       1011 allocs/op
Benchmark_run_bulk_short-4              2000        714687 ns/op     1082148 B/op       1011 allocs/op
Benchmark_run_bulk_short-4              2000        705546 ns/op     1082156 B/op       1011 allocs/op
Benchmark_run_bulk_long-4                 50      31411412 ns/op     1051497 B/op       1088 allocs/op
Benchmark_run_bulk_long-4                 50      31513018 ns/op     1051544 B/op       1088 allocs/op
Benchmark_run_bulk_long-4                 50      31539311 ns/op     1051502 B/op       1088 allocs/op
Benchmark_run_bulk_long-4                 50      31564940 ns/op     1051505 B/op       1088 allocs/op
Benchmark_run_seq_short-4               2000        574346 ns/op     1028632 B/op       1002 allocs/op
Benchmark_run_seq_short-4               3000        572857 ns/op     1028464 B/op       1002 allocs/op
Benchmark_run_seq_short-4               2000        580493 ns/op     1028632 B/op       1002 allocs/op
Benchmark_run_seq_short-4               3000        572240 ns/op     1028464 B/op       1002 allocs/op
Benchmark_run_seq_long-4                   1    5196313302 ns/op     2245792 B/op       4005 allocs/op
Benchmark_run_seq_long-4                   1    5199995649 ns/op     2245792 B/op       4005 allocs/op
Benchmark_run_seq_long-4                   1    5200460425 ns/op     2245792 B/op       4005 allocs/op
Benchmark_run_seq_long-4                   1    5201080570 ns/op     2245792 B/op       4005 allocs/op
PASS
ok      test/bulk   68.944s

注意:令我惊讶的是,run_line_short_workers 比 run_line_short 还要慢。我目前无法解释这一结果,但使用 pprof 做更深入的分析应该能找到答案。



转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/484216.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号