一、select 多路复用
在某些场景下我们可能需要同时从多个通道接收数据。
通道在接收数据时,如果没有数据可以被接收那么当前 goroutine 将会发生阻塞。
你也许会写出如下代码尝试使用遍历的方式来实现从多个通道中接收值。
for{
// 尝试从ch1接收值
data, ok := <-ch1
// 尝试从ch2接收值
data, ok := <-ch2
…
}
这种方式虽然可以实现从多个通道接收值的需求,但是程序的运行性能会差很多。
Go 语言内置了 select 关键字,使用它可以同时响应多个通道的操作。
Select 的使用方式类似于之前学到的 switch 语句,它也有一系列 case 分支和一个默认的分支。
每个 case 分支会对应一个通道的通信(接收或发送)过程。
select 会一直等待,直到其中的某个 case 的通信操作完成时,就会执行该 case 分支对应的语句。
具体格式如下:
select {
case <-ch1:
//...
case data := <-ch2:
//...
case ch3 <- 10:
//...
default:
//默认操作
}
Select 语句具有以下特点。
可处理一个或多个 channel 的发送/接收操作。
如果多个 case 同时满足,select 会随机选择一个执行。
对于没有 case 的 select 会一直阻塞,可用于阻塞 main 函数,防止退出。
下面的示例代码能够在终端打印出 10 以内的奇数,我们借助这个代码片段来看一下 select 的具体使用。
package main
import "fmt"
func main() {
ch := make(chan int, 1)
for i := 1; i <= 10; i++ {
select {
case x := <-ch:
fmt.Println(x)
case ch <- i:
}
}
}
上面的代码输出内容如下。
1
3
5
7
9
示例中的代码首先是创建了一个缓冲区大小为 1 的通道 ch,进入 for 循环后:
第一次循环时 i = 1,select 语句中包含两个 case 分支,此时由于通道中没有值可以接收,所以 x := <-ch 这个 case 分支不满足,而 ch <- i 这个分支可以执行,会把 1 发送到通道中,结束本次 for 循环;
第二次 for 循环时,i = 2,由于通道缓冲区已满,所以 ch <- i 这个分支不满足,而 x := <-ch 这个分支可以执行,从通道接收值 1 并赋值给变量 x ,所以会在终端打印出 1;
后续的 for 循环以此类推会依次打印出 3、5、7、9。
二、通道误用示例
接下来,我们将展示两个因误用通道导致程序出现 bug 的代码片段,希望能够加深读者对通道操作的印象。
①、示例 1
各位读者可以查看以下示例代码,尝试找出其中存在的问题。
// demo1 通道误用导致的bug
func demo1() {
wg := sync.WaitGroup{}
ch := make(chan int, 10)
for i := 0; i < 10; i++ {
ch <- i
}
close(ch)
wg.Add(3)
for j := 0; j < 3; j++ {
go func() {
for {
task := <-ch
// 这里假设对接收的数据执行某些操作
fmt.Println(task)
}
wg.Done()
}()
}
wg.Wait()
}
将上述代码编译执行后,匿名函数所在的 goroutine 并不会按照预期在通道被关闭后退出。
因为 task := <- ch 的接收操作在通道被关闭后会一直接收到零值,而不会退出。
此处的接收操作应该使用 task, ok := <- ch ,通过判断布尔值 ok 为假时退出;或者使用 select 来处理通道。
②、示例2
各位读者阅读下方代码片段,尝试找出其中存在的问题。
// demo2 通道误用导致的bug
func demo2() {
ch := make(chan string)
go func() {
// 这里假设执行一些耗时的操作
time.Sleep(3 * time.Second)
ch <- "job result"
}()
select {
case result := <-ch:
fmt.Println(result)
case <-time.After(time.Second): // 较小的超时时间
return
}
}
上述代码片段可能导致 goroutine 泄露(goroutine 并未按预期退出并销毁)。
由于 select 命中了超时逻辑,导致通道没有消费者(无接收操作),而其定义的通道为无缓冲通道,因此 goroutine 中的ch <- "job result"操作会一直阻塞,最终导致 goroutine 泄露。
三、并发安全和锁
有时候我们的代码中可能会存在多个 goroutine 同时操作一个资源(临界区)的情况,这种情况下就会发生竞态问题(数据竞态)。
这就好比现实生活中十字路口被各个方向的汽车竞争,还有火车上的卫生间被车厢里的人竞争。
我们用下面的代码演示一个数据竞争的示例。
package main
import (
"fmt"
"sync"
)
var (
x int64
wg sync.WaitGroup // 等待组
)
// add 对全局变量x执行5000次加1操作
func add() {
for i := 0; i < 5000; i++ {
x = x + 1
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}
我们将上面的代码编译后执行,不出意外每次执行都会输出诸如 9537、5865、6527 等不同的结果。这是为什么呢?
在上面的示例代码片中,我们开启了两个 goroutine 分别执行 add 函数,这两个 goroutine 在访问和修改全局的 x 变量时就会存在数据竞争,某个 goroutine 中对全局变量x 的修改可能会覆盖掉另一个 goroutine 中的操作,所以导致最后的结果与预期不符。
3.1、互斥锁
互斥锁是一种常用的控制共享资源访问的方法,它能够保证同一时间只有一个 goroutine 可以访问共享资源。
Go 语言中使用 sync 包中提供的 Mutex 类型来实现互斥锁。
sync.Mutex 提供了两个方法供我们使用。

我们在下面的示例代码中使用互斥锁限制每次只有一个 goroutine 才能修改全局变量x,从而修复上面代码中的问题。
package main
import (
"fmt"
"sync"
)
// sync.Mutex
var (
x int64
wg sync.WaitGroup // 等待组
m sync.Mutex // 互斥锁
)
// add 对全局变量x执行5000次加1操作
func add() {
for i := 0; i < 5000; i++ {
m.Lock() // 修改x前加锁
x = x + 1
m.Unlock() // 改完解锁
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}
将上面的代码编译后多次执行,每一次都会得到预期中的结果——10000。
使用互斥锁能够保证同一时间有且只有一个 goroutine 进入临界区,其他的 goroutine 则在等待锁;当互斥锁释放后,等待的 goroutine 才可以获取锁进入临界区,多个 goroutine 同时等待一个锁时,唤醒的策略是随机的。
3.2、读写互斥锁
互斥锁是完全互斥的,但是实际上有很多场景是读多写少的,当我们并发的去读取一个资源而不涉及资源修改的时候是没有必要加互斥锁的,这种场景下使用读写锁是更好的一种选择。
读写锁在 Go 语言中使用 sync 包中的 RWMutex 类型。
sync.RWMutex 提供了以下 5 个方法。

读写锁分为两种:读锁和写锁。当一个 goroutine 获取到读锁之后,其他的 goroutine 如果是获取读锁会继续获得锁,如果是获取写锁就会等待;而当一个 goroutine 获取写锁之后,其他的 goroutine 无论是获取读锁还是写锁都会等待。
下面我们使用代码构造一个读多写少的场景,然后分别使用互斥锁和读写锁查看它们的性能差异。
var (
x int64
wg sync.WaitGroup
mutex sync.Mutex
rwMutex sync.RWMutex
)
// writeWithLock 使用互斥锁的写操作
func writeWithLock() {
mutex.Lock() // 加互斥锁
x = x + 1
time.Sleep(10 * time.Millisecond) // 假设读操作耗时10毫秒
mutex.Unlock() // 解互斥锁
wg.Done()
}
// readWithLock 使用互斥锁的读操作
func readWithLock() {
mutex.Lock() // 加互斥锁
time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒
mutex.Unlock() // 释放互斥锁
wg.Done()
}
// writeWithLock 使用读写互斥锁的写操作
func writeWithRWLock() {
rwMutex.Lock() // 加写锁
x = x + 1
time.Sleep(10 * time.Millisecond) // 假设读操作耗时10毫秒
rwMutex.Unlock() // 释放写锁
wg.Done()
}
// readWithRWLock 使用读写互斥锁的读操作
func readWithRWLock() {
rwMutex.RLock() // 加读锁
time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒
rwMutex.RUnlock() // 释放读锁
wg.Done()
}
func do(wf, rf func(), wc, rc int) {
start := time.Now()
// wc个并发写操作
for i := 0; i < wc; i++ {
wg.Add(1)
go wf()
}
// rc个并发读操作
for i := 0; i < rc; i++ {
wg.Add(1)
go rf()
}
wg.Wait()
cost := time.Since(start)
fmt.Printf("x:%v cost:%v\n", x, cost)
}
我们假设每一次读操作都会耗时 1ms,而每一次写操作会耗时 10ms,我们分别测试使用互斥锁和读写互斥锁执行 10 次并发写和 1000 次并发读的耗时数据。
// 使用互斥锁,10并发写,1000并发读
do(writeWithLock, readWithLock, 10, 1000) // x:10 cost:1.466500951s
// 使用读写互斥锁,10并发写,1000并发读
do(writeWithRWLock, readWithRWLock, 10, 1000) // x:10 cost:117.207592ms
从最终的执行结果可以看出,使用读写互斥锁在读多写少的场景下能够极大地提高程序的性能。
不过需要注意的是如果一个程序中的读操作和写操作数量级差别不大,那么读写互斥锁的优势就发挥不出来。
3.3、sync.WaitGroup
在代码中生硬的使用 time.Sleep 肯定是不合适的,Go 语言中可以使用 sync.WaitGroup 来实现并发任务的同步。
sync.WaitGroup 有以下几个方法:

sync.WaitGroup 内部维护着一个计数器,计数器的值可以增加和减少。
例如当我们启动了 N 个并发任务时,就将计数器值增加 N。
每个任务完成时通过调用 Done 方法将计数器减 1。
通过调用 Wait 来等待并发任务执行完,当计数器值为 0 时,表示所有并发任务已经完成。
我们利用 sync.WaitGroup 将上面的代码优化一下:
var wg sync.WaitGroup
func hello() {
defer wg.Done()
fmt.Println("Hello Goroutine!")
}
func main() {
wg.Add(1)
go hello() // 启动另外一个goroutine去执行hello函数
fmt.Println("main goroutine done!")
wg.Wait()
}
需要注意 sync.WaitGroup 是一个结构体,进行参数传递的时候要传递指针。
3.4、sync.Once
在某些场景下我们需要确保某些操作即使在高并发的场景下也只会被执行一次,例如只加载一次配置文件等。
Go 语言中的 sync 包中提供了一个针对只执行一次场景的解决方案——sync.Once,sync.Once 只有一个 Do 方法,其签名如下:
func (o *Once) Do(f func())
**注意:**如果要执行的函数 f 需要传递参数就需要搭配闭包来使用。
①、加载配置文件示例
延迟一个开销很大的初始化操作到真正用到它的时候再执行是一个很好的实践。
因为预先初始化一个变量(比如在 init 函数中完成初始化)会增加程序的启动耗时,而且有可能实际执行过程中这个变量没有用上,那么这个初始化操作就不是必须要做的。
我们来看一个例子:
var icons map[string]image.Image
func loadIcons() {
icons = map[string]image.Image{
"left": loadIcon("left.png"),
"up": loadIcon("up.png"),
"right": loadIcon("right.png"),
"down": loadIcon("down.png"),
}
}
// Icon 被多个goroutine调用时不是并发安全的
func Icon(name string) image.Image {
if icons == nil {
loadIcons()
}
return icons[name]
}
多个 goroutine 并发调用 Icon 函数时不是并发安全的,现代的编译器和 CPU 可能会在保证每个 goroutine 都满足串行一致的基础上自由地重排访问内存的顺序。loadIcons 函数可能会被重排为以下结果:
func loadIcons() {
icons = make(map[string]image.Image)
icons["left"] = loadIcon("left.png")
icons["up"] = loadIcon("up.png")
icons["right"] = loadIcon("right.png")
icons["down"] = loadIcon("down.png")
}
在这种情况下就会出现即使判断了 icons 不是 nil 也不意味着变量初始化完成了。
考虑到这种情况,我们能想到的办法就是添加互斥锁,保证初始化 icons 的时候不会被其他的 goroutine 操作,但是这样做又会引发性能问题。
使用 sync.Once 改造的示例代码如下:
var icons map[string]image.Image
var loadIconsOnce sync.Once
func loadIcons() {
icons = map[string]image.Image{
"left": loadIcon("left.png"),
"up": loadIcon("up.png"),
"right": loadIcon("right.png"),
"down": loadIcon("down.png"),
}
}
// Icon 是并发安全的
func Icon(name string) image.Image {
loadIconsOnce.Do(loadIcons)
return icons[name]
}
②、并发安全的单例模式
下面是借助 sync.Once 实现的并发安全的单例模式:
package singleton
import (
"sync"
)
type singleton struct {}
var instance *singleton
var once sync.Once
func GetInstance() *singleton {
once.Do(func() {
instance = &singleton{}
})
return instance
}
sync.Once 其实内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成。
这样设计就能保证初始化操作的时候是并发安全的并且初始化操作也不会被执行多次。
3.5、sync.Map
Go 语言中内置的 map 不是并发安全的,请看下面这段示例代码。
package main
import (
"fmt"
"strconv"
"sync"
)
var m = make(map[string]int)
func get(key string) int {
return m[key]
}
func set(key string, value int) {
m[key] = value
}
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func(n int) {
key := strconv.Itoa(n)
set(key, n)
fmt.Printf("k=:%v,v:=%v\n", key, get(key))
wg.Done()
}(i)
}
wg.Wait()
}
将上面的代码编译后执行,会报出 fatal error: concurrent map writes 错误。
我们不能在多个 goroutine 中并发对内置的 map 进行读写操作,否则会存在数据竞争问题。
像这种场景下就需要为 map 加锁来保证并发的安全性了,Go 语言的 sync 包中提供了一个开箱即用的并发安全版 map——sync.Map。
开箱即用表示其不用像内置的 map 一样使用 make 函数初始化就能直接使用。
同时 sync.Map 内置了诸如 Store、Load、LoadOrStore、Delete、Range 等操作方法。

下面的代码示例演示了并发读写 sync.Map。
package main
import (
"fmt"
"strconv"
"sync"
)
// 并发安全的map
var m = sync.Map{}
func main() {
wg := sync.WaitGroup{}
// 对m执行20个并发的读写操作
for i := 0; i < 20; i++ {
wg.Add(1)
go func(n int) {
key := strconv.Itoa(n)
m.Store(key, n) // 存储key-value
value, _ := m.Load(key) // 根据key取值
fmt.Printf("k=:%v,v:=%v\n", key, value)
wg.Done()
}(i)
}
wg.Wait()
}
四、原子操作
针对整数数据类型(int32、uint32、int64、uint64)我们还可以使用原子操作来保证并发安全,通常直接使用原子操作比使用锁操作效率更高。
Go 语言中原子操作由内置的标准库 sync/atomic 提供。
①、atomic 包

②、示例
我们填写一个示例来比较下互斥锁和原子操作的性能。
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
type Counter interface {
Inc()
Load() int64
}
// 普通版
type CommonCounter struct {
counter int64
}
func (c CommonCounter) Inc() {
c.counter++
}
func (c CommonCounter) Load() int64 {
return c.counter
}
// 互斥锁版
type MutexCounter struct {
counter int64
lock sync.Mutex
}
func (m *MutexCounter) Inc() {
m.lock.Lock()
defer m.lock.Unlock()
m.counter++
}
func (m *MutexCounter) Load() int64 {
m.lock.Lock()
defer m.lock.Unlock()
return m.counter
}
// 原子操作版
type AtomicCounter struct {
counter int64
}
func (a *AtomicCounter) Inc() {
atomic.AddInt64(&a.counter, 1)
}
func (a *AtomicCounter) Load() int64 {
return atomic.LoadInt64(&a.counter)
}
func test(c Counter) {
var wg sync.WaitGroup
start := time.Now()
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
c.Inc()
wg.Done()
}()
}
wg.Wait()
end := time.Now()
fmt.Println(c.Load(), end.Sub(start))
}
func main() {
c1 := CommonCounter{} // 非并发安全
test(c1)
c2 := MutexCounter{} // 使用互斥锁实现并发安全
test(&c2)
c3 := AtomicCounter{} // 并发安全且比互斥锁效率更高
test(&c3)
}
atomic 包提供了底层的原子级内存操作,对于同步算法的实现很有用。
这些函数必须谨慎地保证正确使用。
除了某些特殊的底层应用,使用通道或者 sync 包的函数/类型实现同步更好。
五、练习题
使用 goroutine 和 channel 实现一个计算int64随机数各位数和的程序,例如生成随机数 61345,计算其每个位数上的数字之和为 19。
开启一个 goroutine 循环生成 int64 类型的随机数,发送到 jobChan
开启 24 个 goroutine 从 jobChan 中取出随机数计算各位数的和,将结果发送到 resultChan
主 goroutine 从 resultChan 取出结果并打印到终端输出
Go基础 第16.4章 并发-安全和锁