Go 理解并发的一个经典例子
通过这个案例,我们一步步理解并发的一些知识点:
- channel作为多个goroutine的通信媒介
- select的用法
- 组等待,waitgroup的用法
- 限制并发度
- 广播的用法,利用关闭通道来通知多个goroutine
- 打印堆栈信息调试goroutine
案例背景:
类似于Linux下du -s命令,统计目录占用空间的大小(这里简化,我们只统计程序所处的当前目录下占用的空间)。
思路:需要一个递归函数,参数是目录名。遇到目录递归,普通文件则统计大小。 我们利用两个goroutine,文件大小放到channel,一个gor用来统计,一个gor专门取出统计计数。
代码:
package main
import (
"fmt"
"io/ioutil"
"path/filepath"
)
func main() {
curDirName := "." // 当前目录
fileSizes := make(chan int64)
go func() {
walkDir(curDirName, fileSizes)
close(fileSizes) // 结束后 通道必须关闭
}()
total := int64(0)
for size := range fileSizes {
total += size
}
fmt.Printf("total size %d MB\n", total/(1<<20))
}
func walkDir(dirname string, fileSizes chan<- int64) {
// 利用 ioutil.ReadDir
fileInfo, err := ioutil.ReadDir(dirname)
if err != nil {
return
}
for _, file := range fileInfo {
if file.IsDir() {
// filepath.Join 根据不同系统分隔符拼接
tmpName := filepath.Join(dirname, file.Name())
// 目录case 递归调用
walkDir(tmpName, fileSizes)
} else {
fileSizes <- file.Size()
}
}
}
select用法
运行过程中,我们发现如果文件过多,统计时间比较长。这里我们加一个运行过程的展示,500ms输出一次当前的统计大小。
现在我们有两个通道,一个fileSize,一个计时器timer。在一个gor里,如果我们从fileSize中取消息,那同时我们没法从timer取;同样的反过来一样。这时想到了IO复用的使用场景,go也提供了这个用法,select出场。
select中的case通常使用通道操作,类似switch的也有default分支。
代码:
// 定义一个500ms计时器
timer := time.Tick(500 * time.Millisecond)
// 定义for的一个tag,供跳出使用
loop:
for {
select {
case <-timer:
fmt.Printf("size %dMB\n", total/(1<<20))
case size, ok := <-fileSizes:
if !ok {
// channel close后,跳出select、for
break loop
}
total += size
}
}
fmt.Printf("total size %dMB\n", total/(1<<20))
现在,可以看到了每隔500ms能有个阶段输出。
然而目前的程序运行的还是比较慢,遍历walkDir我们还没使用gor并发执行。 我们需要在walkDir函数中递归处,启一个单独的gor跑。只加go是不行的,这会影响关闭fileSise通道的时机。
sync.WaitGroup用法:
关闭通道的时机在于要知道这些gor运行都结束了,go提供了一个简单的用法,组等待sync.WaitGroup。开启一个gor时,add(1),运行结束时 done(),等待的gor里面 wait()即可。
var wg sync.WaitGroup
go func() {
wg.Add(1)
// wg要地址传递
walkDir(curDirName, fileSizes, &wg)
wg.Wait()
close(fileSizes) // 结束后 通道必须关闭
}()
func walkDir(dirname string, fileSizes chan<- int64, wg *sync.WaitGroup) {
// 函数结束时wg减1
defer func() {
wg.Done()
}()
// ... 省略代码
// 运行一个gro加1
wg.Add(1)
go walkDir(tmpName, fileSizes, wg)
}
并发限制方法
这样无限并发也不是好的实践,通常受限于磁盘、网络等IO,我们不能无限制并发,需要加一个并发度限制。
通常利用一个buffer channel实现,buffer大小是并发度,即同一个时间有多少gor运行。
var bf = make(chan struct{}, 20)
func walkDir(dirname string, fileSizes chan<- int64, wg *sync.WaitGroup) {
bf <- struct{}{} // 领令牌
defer func() {
<- bf // 使用完释放令牌
}()
// ... 省略代码
}
广播的用法– 关闭channel
我们难免会有这样的需求,A gor通知B gor在执行的事情终止掉。这相对容易,channel介入即可,A发一个消息给channel,B读到了就ok。 试想,如果A gor要告诉n多个gor停止执行。如何处理?是发送n个消息?那如果其中有gor自己本身就停止了,这又不可行了。
有一个机制可以使用,从关闭的channel中读取,直接会返回默认零值。正是,关闭一个channel来广播给其他gor,而不是发送给一个channel。
对这个案例来说,我们设定一个需求,统计过程中,我们键入字符来中断统计。
引入一个channel,通知给主gor和其他walkDir gor。
done := make(chan struct{})
// 阻塞等待输入,then 关闭channel
go func() {
os.Stdin.Read(make([]byte, 1))
close(done)
}()
for {
select {
case <-done:
for range fileSizes {// do nothing
// 主gor 退出前,让存在的walk gor顺利结束掉,以免它们一直阻塞着
}
// panic("close done")
return
case <-timer:
fmt.Printf("size %dMB\n", total/(1<<20))
// ... 省略代码
}
// walkDir中 同样利用 <-done 通知
select {
case bf <- struct{}{}:
case <-done:
return
}
GOTRACEBACK调试
这里有一个range fileSizes
操作,是为了让已经存在的walkDir goroutine能顺利结束,而不是阻塞在那。
加上range操作,和没有range操作,有个技巧:可以通过panic("xxx")
的方法来看堆栈的输出。
执行的时候,需要加上 GOTRACEBACK=1
, 这表示查看所有我们创建的goroutine中的堆栈,默认是只打印了当前引发panic的goroutine堆栈信息。
- GOTRACEBACK=none|0 忽略堆栈信息
- GOTRACEBACK=all|1 打印所有我们user创建的gor
- GOTRACEBACK=system|2 打印go系统中运行时的gor堆栈 包括GC等
缺少range操作
如我机器的执行:GOTRACEBACK=1 ./du
goroutine 9870 [select]:
main.walkDir(0xc000a55680, 0x4d, 0xc000064120, 0xc000014080)
/Users/hansongda/gopro/src/du1.go:66 +0x103
created by main.walkDir
/Users/hansongda/gopro/src/du1.go:87 +0x2e9
....
GOTRACEBACK=2 ./du
## GC操作
goroutine 1124 [GC worker (idle)]:
runtime.gopark(0x10e2900, 0xc0000db6d0, 0x1418, 0x0)
/usr/local/go/src/runtime/proc.go:304 +0xe0 fp=0xc000032760 sp=0xc000032740 pc=0x102b4a0
runtime.gcBgMarkWorker(0xc000026a00)
/usr/local/go/src/runtime/mgc.go:1846 +0xff fp=0xc0000327d8 sp=0xc000032760 pc=0x10189ef
runtime.goexit()
/usr/local/go/src/runtime/asm_amd64.s:1357 +0x1 fp=0xc0000327e0 sp=0xc0000327d8 pc=0x1054ca1
created by runtime.gcBgMarkStartWorkers
/usr/local/go/src/runtime/mgc.go:1794 +0x77
goroutine 15358 [runnable]:
runtime.gopark(0x10e2b30, 0x0, 0x1809, 0x1)
/usr/local/go/src/runtime/proc.go:304 +0xe0 fp=0xc001233528 sp=0xc001233508 pc=0x102b4a0
runtime.selectgo(0xc001233760, 0xc0012336a8, 0x2, 0x0, 0x0)
/usr/local/go/src/runtime/select.go:313 +0xc9b fp=0xc001233650 sp=0xc001233528 pc=0x103a12b
main.walkDir(0xc00047d920, 0x29, 0xc0000a80c0, 0xc00009a004)
/Users/hansongda/gopro/src/du1.go:66 +0x103 fp=0xc0012337c0 sp=0xc001233650 pc=0x10a1e83
runtime.goexit()
/usr/local/go/src/runtime/asm_amd64.s:1357 +0x1 fp=0xc0012337c8 sp=0xc0012337c0 pc=0x1054ca1
created by main.walkDir
/Users/hansongda/gopro/src/du1.go:87 +0x2e9
....
带range操作
GOTRACEBACK=1 ./du
goroutine 1 [running]:
main.main()
/Users/hansongda/gopro/src/du1.go:46 +0x375
可以看到带range操作的,只有main.main主gor打印堆栈信息;而不带range则有很多walkDir的gor在消耗资源。
参考
《Go程序设计语言》