Go 理解并发的一个经典例子
通过这个案例,我们一步步理解并发的一些知识点:
- channel作为多个goroutine的通信媒介
- select的用法
- 组等待,waitgroup的用法
- 限制并发度
- 广播的用法,利用关闭通道来通知多个goroutine
- 打印堆栈信息调试goroutine
案例背景:
类似于Linux下du -s命令,统计目录占用空间的大小(这里简化,我们只统计程序所处的当前目录下占用的空间)。
思路:需要一个递归函数,参数是目录名。遇到目录递归,普通文件则统计大小。 我们利用两个goroutine,文件大小放到channel,一个gor用来统计,一个gor专门取出统计计数。
代码:
package main
import (
  "fmt"
  "io/ioutil"
  "path/filepath"
)
func main() {
  curDirName := "." // 当前目录
  fileSizes := make(chan int64)
  go func() {
    walkDir(curDirName, fileSizes)
    close(fileSizes) // 结束后 通道必须关闭
  }()
  total := int64(0)
  for size := range fileSizes {
    total += size
  }
  fmt.Printf("total size %d MB\n", total/(1<<20))
}
func walkDir(dirname string, fileSizes chan<- int64) {
  // 利用 ioutil.ReadDir
  fileInfo, err := ioutil.ReadDir(dirname)
  if err != nil {
    return
  }
  for _, file := range fileInfo {
    if file.IsDir() {
      // filepath.Join 根据不同系统分隔符拼接
      tmpName := filepath.Join(dirname, file.Name())
      // 目录case 递归调用
      walkDir(tmpName, fileSizes)
    } else {
      fileSizes <- file.Size()
    }
  }
}
select用法
运行过程中,我们发现如果文件过多,统计时间比较长。这里我们加一个运行过程的展示,500ms输出一次当前的统计大小。
现在我们有两个通道,一个fileSize,一个计时器timer。在一个gor里,如果我们从fileSize中取消息,那同时我们没法从timer取;同样的反过来一样。这时想到了IO复用的使用场景,go也提供了这个用法,select出场。
select中的case通常使用通道操作,类似switch的也有default分支。
代码:
    // 定义一个500ms计时器
  timer := time.Tick(500 * time.Millisecond)
// 定义for的一个tag,供跳出使用
loop:
  for {
    select {
    case <-timer:
      fmt.Printf("size %dMB\n", total/(1<<20))
    case size, ok := <-fileSizes:
      if !ok {
        // channel close后,跳出select、for
        break loop
      }
      total += size
    }
  }
    
fmt.Printf("total size %dMB\n", total/(1<<20))
现在,可以看到了每隔500ms能有个阶段输出。
然而目前的程序运行的还是比较慢,遍历walkDir我们还没使用gor并发执行。 我们需要在walkDir函数中递归处,启一个单独的gor跑。只加go是不行的,这会影响关闭fileSise通道的时机。
sync.WaitGroup用法:
关闭通道的时机在于要知道这些gor运行都结束了,go提供了一个简单的用法,组等待sync.WaitGroup。开启一个gor时,add(1),运行结束时 done(),等待的gor里面 wait()即可。
var wg sync.WaitGroup
go func() {
    wg.Add(1)
    // wg要地址传递
    walkDir(curDirName, fileSizes, &wg)
    
    wg.Wait()
    close(fileSizes) // 结束后 通道必须关闭
}()
func walkDir(dirname string, fileSizes chan<- int64, wg *sync.WaitGroup) {
  // 函数结束时wg减1
  defer func() {
    wg.Done()
  }()
  // ... 省略代码
    
  // 运行一个gro加1
  wg.Add(1)
  go walkDir(tmpName, fileSizes, wg)
}
并发限制方法
这样无限并发也不是好的实践,通常受限于磁盘、网络等IO,我们不能无限制并发,需要加一个并发度限制。
通常利用一个buffer channel实现,buffer大小是并发度,即同一个时间有多少gor运行。
var bf = make(chan struct{}, 20)
func walkDir(dirname string, fileSizes chan<- int64, wg *sync.WaitGroup) {
  bf <- struct{}{} // 领令牌
  defer func() {
    <- bf // 使用完释放令牌
  }()
    
    // ... 省略代码
}
广播的用法– 关闭channel
我们难免会有这样的需求,A gor通知B gor在执行的事情终止掉。这相对容易,channel介入即可,A发一个消息给channel,B读到了就ok。 试想,如果A gor要告诉n多个gor停止执行。如何处理?是发送n个消息?那如果其中有gor自己本身就停止了,这又不可行了。
有一个机制可以使用,从关闭的channel中读取,直接会返回默认零值。正是,关闭一个channel来广播给其他gor,而不是发送给一个channel。
对这个案例来说,我们设定一个需求,统计过程中,我们键入字符来中断统计。
引入一个channel,通知给主gor和其他walkDir gor。
done := make(chan struct{})
// 阻塞等待输入,then 关闭channel
go func() {
  os.Stdin.Read(make([]byte, 1))
  close(done)
}()
for {
    select {
    case <-done:
      for range fileSizes {// do nothing
        // 主gor 退出前,让存在的walk gor顺利结束掉,以免它们一直阻塞着
      }
           // panic("close done")
      return
    case <-timer:
      fmt.Printf("size %dMB\n", total/(1<<20))
    // ... 省略代码
  }
    
// walkDir中 同样利用 <-done 通知
select {
  case bf <- struct{}{}:
  case <-done:
    return
  }
GOTRACEBACK调试
这里有一个range fileSizes操作,是为了让已经存在的walkDir goroutine能顺利结束,而不是阻塞在那。
加上range操作,和没有range操作,有个技巧:可以通过panic("xxx")的方法来看堆栈的输出。
执行的时候,需要加上 GOTRACEBACK=1, 这表示查看所有我们创建的goroutine中的堆栈,默认是只打印了当前引发panic的goroutine堆栈信息。
- GOTRACEBACK=none|0 忽略堆栈信息
- GOTRACEBACK=all|1 打印所有我们user创建的gor
- GOTRACEBACK=system|2 打印go系统中运行时的gor堆栈 包括GC等
缺少range操作
如我机器的执行:GOTRACEBACK=1 ./du
goroutine 9870 [select]:
main.walkDir(0xc000a55680, 0x4d, 0xc000064120, 0xc000014080)
  /Users/hansongda/gopro/src/du1.go:66 +0x103
created by main.walkDir
  /Users/hansongda/gopro/src/du1.go:87 +0x2e9
....
GOTRACEBACK=2 ./du
## GC操作
goroutine 1124 [GC worker (idle)]:
runtime.gopark(0x10e2900, 0xc0000db6d0, 0x1418, 0x0)
  /usr/local/go/src/runtime/proc.go:304 +0xe0 fp=0xc000032760 sp=0xc000032740 pc=0x102b4a0
runtime.gcBgMarkWorker(0xc000026a00)
  /usr/local/go/src/runtime/mgc.go:1846 +0xff fp=0xc0000327d8 sp=0xc000032760 pc=0x10189ef
runtime.goexit()
  /usr/local/go/src/runtime/asm_amd64.s:1357 +0x1 fp=0xc0000327e0 sp=0xc0000327d8 pc=0x1054ca1
created by runtime.gcBgMarkStartWorkers
  /usr/local/go/src/runtime/mgc.go:1794 +0x77
    
goroutine 15358 [runnable]:
runtime.gopark(0x10e2b30, 0x0, 0x1809, 0x1)
  /usr/local/go/src/runtime/proc.go:304 +0xe0 fp=0xc001233528 sp=0xc001233508 pc=0x102b4a0
runtime.selectgo(0xc001233760, 0xc0012336a8, 0x2, 0x0, 0x0)
  /usr/local/go/src/runtime/select.go:313 +0xc9b fp=0xc001233650 sp=0xc001233528 pc=0x103a12b
main.walkDir(0xc00047d920, 0x29, 0xc0000a80c0, 0xc00009a004)
  /Users/hansongda/gopro/src/du1.go:66 +0x103 fp=0xc0012337c0 sp=0xc001233650 pc=0x10a1e83
runtime.goexit()
  /usr/local/go/src/runtime/asm_amd64.s:1357 +0x1 fp=0xc0012337c8 sp=0xc0012337c0 pc=0x1054ca1
created by main.walkDir
  /Users/hansongda/gopro/src/du1.go:87 +0x2e9
....
带range操作
GOTRACEBACK=1 ./du
goroutine 1 [running]:
main.main()
  /Users/hansongda/gopro/src/du1.go:46 +0x375
可以看到带range操作的,只有main.main主gor打印堆栈信息;而不带range则有很多walkDir的gor在消耗资源。
参考
《Go程序设计语言》