返回首页 - Notes - 2017

Go 并发编程


同步等待

使用 sync.WaitGroup 的基本套路是:

  1. 声明一个 sync.WaitGroup 类型的变量
  2. 使用 Add() 方法设置 Go 程的数量
  3. 启动指定数量的 Go 程,并在每个 Go 程结束返回前调用 Done() 方法通知其已完成
  4. 在主程序中调用 Wait() 方法阻塞后续的执行,直到全部 Go 程返回

示例如下:

package main

import (
  "fmt"
  "runtime"
  "sync"
)

// 打印三轮字母表
func printChars(wg *sync.WaitGroup, start, end byte) {
  defer wg.Done()

  for count := 0; count < 3; count++ {
    for char := start; char <= end; char++ {
      fmt.Printf("%c ", char)
    }
  }

  fmt.Println()
}

func main() {
  // 为每个可用的 CPU 核心分配一个逻辑处理器,以使 Go 程可以并行运行
  runtime.GOMAXPROCS(runtime.NumCPU())

  var wg sync.WaitGroup
  wg.Add(2) // 启动两个 Go 程序

  fmt.Println("程序开始")

  go printChars(&wg, 'a', 'z')
  go printChars(&wg, 'A', 'Z')

  fmt.Println("等待结束")
  wg.Wait() // 等待全部 Go 程结束返回

  fmt.Println("程序终止")
}

原子操作

如果多个 Go 程都需要操作同一个数值变量,可以用 sync/atomic 中提供的方法来确保安全操作

示例如下:

package main

import (
  "fmt"
  "runtime"
  "sync"
  "sync/atomic"
  "time"
)

func doWork(wg *sync.WaitGroup, name string, shutdown *int64) {
  defer wg.Done()

  for {
    fmt.Printf("正在完成 %s\n", name)
    time.Sleep(250 * time.Millisecond)

    // 收到停止信号时退出循环
    if atomic.LoadInt64(shutdown) == 1 {
      fmt.Printf("停止 %s\n", name)
      break
    }
  }
}

func main() {
  var (
    shutdown int64
    wg       sync.WaitGroup
  )

  runtime.GOMAXPROCS(runtime.NumCPU())

  wg.Add(2)

  go doWork(&wg, "A", &shutdown)
  go doWork(&wg, "B", &shutdown)

  // 让上面两个 Go 程运行一秒
  time.Sleep(1 * time.Second)

  // 一秒后发送停止的信号(通过将 shutdown 的值改成 1)
  atomic.StoreInt64(&shutdown, 1)

  wg.Wait()
}

互斥锁

可以给一段代码加锁,同一时刻只允许一个 Go 程执行该代码段

示例如下:

package main

import (
  "fmt"
  "runtime"
  "sync"
)

func incCounter(wg *sync.WaitGroup, mu *sync.Mutex, count *int64) {
  defer wg.Done()

  for i := 0; i < 2; i++ {
    // 加锁,同一时刻只允许一个 Go 程进入临界区
    mu.Lock()

    // 临界区
    {
      value := *count
      runtime.Gosched() // 特意将当前 Go 程暂停执行,放回待执行队列
      value++
      *count = value
    }

    // 解锁,允许其他 Go 程进入临界区
    mu.Unlock()
  }
}

func main() {
  var (
    count int64
    wg    sync.WaitGroup
    mu    sync.Mutex
  )

  runtime.GOMAXPROCS(runtime.NumCPU())

  wg.Add(10)

  for i := 0; i < 10; i++ {
    go incCounter(&wg, &mu, &count)
  }

  wg.Wait()

  fmt.Println(count)
}

通道

通道的类型

  1. chan int:可发送也可接收 int 类型的数据
  2. chan<- int:只可发送 int 类型的数据
  3. <-chan int:只可接收 int 类型的数据

通道的基本使用:

package main

import "fmt"

func gen_chan() <-chan string {
  ch := make(chan string) // 创建一个无缓冲的通道

  go func() {
    ch <- string("张三") // 往通道填充数据
    ch <- string("李四") // 往通道填充数据
    ch <- string("王五") // 往通道填充数据
    close(ch)          // 关闭通道
  }()

  return ch // 返回通道的句柄
}

func main() {
  ch := gen_chan() // 得到通道的句柄

  // 循环推出通道中的数据,先进先出
  for name := range ch {
    fmt.Println(name)
  }
}

用一个无缓冲通道模拟两人打球:

package main

import (
  "fmt"
  "math/rand"
  "sync"
  "time"
)

func player(wg *sync.WaitGroup, name string, ch chan int) {
  defer wg.Done()

  for {
    ball, ok := <-ch
    if !ok {
      // 如果通道被关闭,就说明赢了
      fmt.Printf("%s赢了\n", name)
      return
    }

    // 产生一个小于 100 的随机数
    n := rand.Intn(100)
    if n%13 == 0 {
      // 我们设定一个规则:如果产生的随机数能被 13 整除,就放弃接球,然后认输
      fmt.Printf("随机数是 %d,%s没接到\n", n, name)
      // 关闭通道,认输
      close(ch)
      return
    }

    // 接到了球,将球的序号加 1
    fmt.Printf("随机数是 %d,%s接中了 %d 号球\n", n, name, ball)
    ball++

    // 将球打回去
    ch <- ball
  }
}

func init() {
  rand.Seed(time.Now().UnixNano())
}

func main() {
  // 创建一个无缓冲通道
  ch := make(chan int)

  var wg sync.WaitGroup
  wg.Add(2)

  // 两个运动员
  go player(&wg, "张三", ch)
  go player(&wg, "李四", ch)

  // 模拟发球
  ch <- 1

  wg.Wait()
}

使用无缓冲通道模拟四人接力跑步:

package main

import (
  "fmt"
  "sync"
  "time"
)

func run(wg *sync.WaitGroup, baton chan int) {
  runner := <-baton
  fmt.Printf("%d 号运动员已接棒,准备接力跑\n", runner)

  // 模拟持棒跑一圈
  time.Sleep(100 * time.Millisecond)

  // 第4号跑完,比赛结束
  if runner == 4 {
    fmt.Printf("%d 号已跑完,比赛结束\n", runner)
    wg.Done()
    return
  }

  // 如果不是第4号,那就通知下一个运动员做好准备
  go run(wg, baton)

  // 交接接力棒给下一个运动员
  newRunner := runner + 1
  fmt.Printf("%d 号将接力棒交给 %d 号\n", runner, newRunner)

  baton <- newRunner
}

func main() {
  baton := make(chan int)

  var wg sync.WaitGroup
  wg.Add(1)

  go run(&wg, baton)

  // 从1号开始起跑
  baton <- 1

  // 等待接力结束
  wg.Wait()
}

使用有缓冲通道模拟工厂流水线作业:

package main

import (
  "fmt"
  "sync"
  "time"
)

func worker(wg *sync.WaitGroup, tasks chan string, n int) {
  defer wg.Done()

  for {
    task, ok := <-tasks
    // 如果流水线上取不到任务,说明无活可干了,结束工作
    if !ok {
      fmt.Printf("%d 号工人:没任务了\n", n)
      return
    }

    fmt.Printf("%d 号工人:准备开始 %s\n", n, task)

    // 模拟工作耗时
    time.Sleep(300 * time.Millisecond)

    fmt.Printf("%d 号工人:已完成 %s\n", n, task)
  }
}

func main() {
  const (
    workerNums = 3  // 工人数目
    taskNums   = 10 // 任务数目
  )

  // 创建一个指定大小的有缓冲通道(模拟流水线)
  tasks := make(chan string, taskNums)

  var wg sync.WaitGroup
  wg.Add(workerNums)

  // 通知指定数目的工人做好工作准备
  for n := 1; n <= workerNums; n++ {
    go worker(&wg, tasks, n)
  }

  // 给任务流水线添加任务
  for n := 1; n <= taskNums; n++ {
    tasks <- fmt.Sprintf("%d 号任务", n)
  }

  // 添加完任务后关闭通道(流水线),以便工人们知道该结束工作了
  close(tasks)

  // 等待工人们完成工作
  wg.Wait()

  fmt.Printf("%d 个任务已由 %d 个工人完成\n", taskNums, workerNums)
}

date:2017-07-03、2017-07-06、2017-07-08、2017-07-09