Golang之并发编程

接上次的基础语法部分,还剩下的主要是并发相关的内容,现在补全;也略微提及了点测试相关的内容;
在 Go 中,并发程序主要使用的是 Go 的 goroutine 和 channel;相对传统的语言,简化了很多,调度上的调整也使程序高效了很多;没讲到的应该就是反射和 Web,这两块以后随用随学吧,目前这些东西也够了。
没有贴太多的代码,以及剩下的 Go 标准库使用的相关代码都放到这个仓库了: bfchengnuo/GoCS

goroutine

Go 语言的并发主要通过 goroutine 实现。goroutine 类似于线程,属于用户态的线程,我们可以根据需要创建成千上万个 goroutine 并发工作。goroutine 是由 Go 语言的运行时(runtime)调度完成,而线程是由操作系统调度完成
Go 语言还提供 channel 用于在多个 goroutine 间进行通信。goroutine 和 channel 是 Go 语言秉承的 CSP(Communicating Sequential Process)现代并发模式的重要实现基础。

传统上我们实现并发编程的时候,我们通常需要自己维护一个线程池,并且需要自己去包装一个又一个的任务,还要处理好调度问题,一不小心就出问题,还难以调试;那么能不能有一种机制,程序员只需要定义很多个任务,让系统去帮助我们把这些任务分配到 CPU 上实现并发执行呢?
Go 语言中的 goroutine 就是这样一种机制,goroutine 的概念类似于线程,但 goroutine 是由 Go 的运行时(runtime)调度和管理的。Go 程序会智能地将 goroutine 中的任务合理地分配给每个CPU
Go 语言之所以被称为现代化的编程语言,就是因为它在语言层面已经内置了调度和上下文切换的机制

在 Go 语言编程中你不需要去自己写进程、线程、协程,你的技能包里只有一个技能:goroutine,当你需要让某个任务并发执行的时候,你只需要把这个任务包装成一个函数,开启一个 goroutine 去执行这个函数就可以了,就是这么简单粗暴。
Go 语言中使用 goroutine 非常简单,只需要在调用函数的时候在前面加上 go 关键字,就可以为一个函数创建一个 goroutine。
一个 goroutine 必定对应一个函数,可以创建多个 goroutine 去执行相同的函数。

主函数也是运行在一个 goroutine 中,我们称为 main goroutine;当主函数返回时,所有的 goroutine 都会被直接打断,程序退出,这也算是一种终结 goroutine 的方式。
另一种比较友好的方式就是使用 goroutine 之间的通信来告知其他 goroutine 自行结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
var wg sync.WaitGroup

// Go 的并发主要依赖 goroutine 和 channel
func main() {
for i := 0; i < 10; i++ {
wg.Add(1) // 计数 +1
go hello(i) // 值传递,copy
}

// 等待所有 wg 完成
wg.Wait()
}

func hello(i int) {
defer wg.Done() // goroutine 结束就登记 -1
fmt.Println("Hello Goroutine!", i)
}

为了避免主线程结束其他打断 goroutine,暂时使用了 sync 的 WaitGroup 进行计数等待。

可增长栈

OS 线程(操作系统线程)一般都有固定的栈内存(通常为 2MB),一个 goroutine 的栈在其生命周期开始时只有很小的栈(典型情况下 2KB),goroutine 的栈是不固定的,他可以按需增大和缩小,goroutine 的栈大小限制可以达到 1GB,虽然极少会用到这个大。所以在 Go 语言中一次创建十万左右的 goroutine 也是可以的。

GPM调度

GPM 是 Go 语言运行时(runtime)层面的实现,是 go 语言自己实现的一套调度系统。区别于操作系统调度 OS 线程。

  • G 很好理解,就是 goroutine,里面除了存放本 goroutine 信息外 还有与所在 P 的绑定等信息。
  • P 管理着一组 goroutine 队列,P 里面会存储当前 goroutine 运行的上下文环境(函数指针,堆栈地址及地址边界),P 会对自己管理的 goroutine 队列做一些调度(比如把占用 CPU 时间较长的 goroutine 暂停、运行后续的 goroutine 等等)当自己的队列消费完了就去全局队列里取,如果全局队列里也消费完了会去其他 P 的队列里抢任务
  • M(machine)是 Go 运行时(runtime)对操作系统内核线程的虚拟, M 与内核线程一般是一一映射的关系, 一个 groutine 最终是要放到 M 上执行的;

P 与 M 一般也是一一对应的。他们关系是: P 管理着一组 G 挂载在 M 上运行。
当一个 G 长久阻塞在一个 M 上时,runtime 会新建一个 M,这时 P 会把其他阻塞的 G 挂载在新建的 M 上。当耗时的 G 阻塞完成或者认为其已经死掉时,会回收旧的 M。

P 的个数是通过 runtime.GOMAXPROCS 设定(最大 256),Go1.5 版本之后默认为物理线程数。 在并发量大的时候会增加一些 P 和 M,但不会太多,切换太频繁的话得不偿失;这个值可以理解为有多少个系统线程同时执行 Go 代码

Go 运行时的调度器使用 GOMAXPROCS 参数来确定需要使用多少个 OS 线程来同时执行 Go 代码。默认值是机器上的 CPU 核心数。例如在一个 8 核心的机器上,调度器会把 Go 代码同时调度到 8 个 OS 线程上。
Go 语言中可以通过 runtime.GOMAXPROCS() 函数设置当前程序并发时占用的 CPU 逻辑核心数。
Go1.5 版本之前,默认使用的是单核心执行。Go1.5 版本之后,默认使用全部的 CPU 逻辑核心数(跑满!)。

单从线程调度讲,Go 语言相比起其他语言的优势在于 OS 线程是由 OS 内核来调度的,goroutine 则是由 Go 运行时(runtime)自己的调度器调度的,这个调度器使用一个称为 m:n 调度的技术(复用/调度 m 个 goroutine 到 n 个 OS 线程)。

goroutine 的调度是在用户态下完成的,不涉及内核态与用户态之间的频繁切换,包括内存的分配与释放,都是在用户态维护着一块大的内存池,不直接调用系统的 malloc 函数(除非内存池需要改变),成本比调度 OS 线程低很多。
另一方面充分利用了多核的硬件资源,近似的把若干 goroutine 均分在物理线程上,再加上本身 goroutine 的超轻量,保证了 go 调度方面的性能。

Go 中的 goroutine 与操作系统线程的区别:

  • 一个操作系统线程对应用户态多个 goroutine
  • go 程序可以同时使用多个操作系统线程
  • goroutine 和 OS 线程是多对多的关系,即 m:n

channel

虽然可以使用共享内存进行数据交换,但是共享内存在不同的 goroutine 中容易发生竞态问题。为了保证数据交换的正确性,必须使用互斥量对内存进行加锁,这种做法势必造成性能问题。
Go 语言并发编程模型提倡通过通信共享内存而不是通过共享内存而实现通信
可以将 channel 看作 goroutine 之间的连接。channel 是可以让一个 goroutine 发送特定值到另一个 goroutine 的通信机制。
channel 遵循先进先出(FIFO),保证收发数据的顺序,并且 channel 具有具体的类型,一个 channel 只允许同一种类型通过;它可以进行比较,如果引用的是相同对象即为真。
通道有发送(send)、接收(receive)和关闭(close)三种操作。发送和接收都使用 <- 符号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func main() {
// 定义
var ch1 chan int
// 第二个参数是缓冲区大小,可选
var ch2 = make(chan int, 20)

fmt.Println(ch1 == nil) // 零值,必须使用 make 初始化后才能使用

// send
ch2 <- 233
// receive
fmt.Println(<-ch2)
// 丢弃
<-ch2

// 手动关闭,非必须,可由 GC 感知回收
close(ch2)
}

只有在通知接收方 goroutine 所有的数据都发送完毕的时候才需要关闭通道。通道是可以被垃圾回收机制回收的,它和关闭文件是不一样的,在结束操作之后关闭文件是必须要做的,但关闭通道不是必须的。
对一个关闭的通道来说:

  • 发送值就会导致 panic。
  • 进行接收会一直获取值直到通道为空。
  • 如果没有值的通道执行接收操作会得到对应类型的零值。
  • 关闭一个已经关闭的通道会导致 panic。

通道分为带缓存和无缓存,或者叫缓冲,区别就是 make 的时候传没传第二个参数,这两种有点细微区别;

  • 无缓存通道
    无缓冲的通道又称为阻塞的通道,因为向无缓存通道发送数据,必须要有人在接收值,否则会一直阻塞。
    当两个 goroutine 进行无缓存通道通信时,就会导致发送接收的同步化,所有又被称为是『同步通道』;
    通过 channel 可以将多个 goroutine 串联起来。

  • 有缓存通道
    可以看作里面维护了一个队列,可以使用内置的 len 函数获取通道内元素的数量,使用 cap 函数获取通道的容量,虽然我们很少会这么做。
    其他方面与无缓存类似,只不过是通道满了以后才阻塞;

从管道取内容的时候,为了避免关闭后取完一直是零值,我们可以使用 for-range 的方式,取完之后会自动结束;虽然使用 ok 判断也能实现,但是 range 的方式更加优雅。

单方向的通道

多个 goroutine 使用通道进行传值的时候,很多情况是单向的,为了避免乱传,可以使用 Go 提供的单向通道:

1
2
3
4
5
6
7
8
// chan<- int  单向向通道输出
// x <- chan int 单向从通道输出
func squarer(out chan<- int, in <-chan int) {
for i := range in {
out <- i * i
}
close(out)
}

在函数传参及任何赋值操作中可以将双向通道转换为单向通道,但反过来是不可以的。

select多路复用

Go 内置了 select 关键字,可以同时响应多个通道的操作。
它使用类似于 switch 语句,它有一系列 case 分支和一个默认的分支。每个 case 会对应一个通道的通信(接收或发送)过程。 select 会一直等待,直到某个 case 的通信操作完成时,就会执行 case 分支对应的语句。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func func1() {
ch := make(chan int, 1)

for i := 0; i < 10; i++ {
select {
case x := <-ch:
// 0 2 4 6 8,因为大小为 1
fmt.Println(x)
case ch <- i:
fmt.Println("send ", i)
default:
fmt.Println("默认操作")
}
}

time.Sleep(time.Duration(5) * time.Second)
}

使用多路复用使代码更易读,并且有以下特性:

  • 可处理一个或多个 channel 的发送/接收操作。
  • 如果多个 case 同时满足,select 会随机选择一个。
  • 对于没有 case 的 select{} 会一直等待,可用于阻塞 main 函数。

并发退出

并发退出的情况是很常见的,让主线程直接结束的方式并不优雅,那就是最好通过通信完成;但是 channel 的消息被消费后其他的『线程』就获取不到了,所以,我们采用 close channel 的方式来广播退出通知:

1
2
3
4
5
6
7
8
9
10
var done = make(chan struct{})
// 通过 close chan 并发退出
func cancelled() bool {
select {
case <-done:
return true
default:
return false
}
}

这样在其他的『线程』中使用循环来 if 这个函数即可,主线程将 done 进行 close 其它的就会接受到这个信号,从而退出。

当设计操作共享变量的时候,自然就需要用到锁,虽然建议尽量使用 channel 完成逻辑;但是锁总是不可避免的;

  • 互斥锁
    能保证只有一个 goroutine 进入临界区,唤醒策略是随机的;
    互斥锁一般使用 sync.Mutex 的 lock 和 unlock 方法;
  • 读写互斥锁
    适用于读多写少的场景,读是不需要加锁的;
    即如果 goroutine 获取的是读锁,其他 goroutine 还可以获得读锁进行读取;
    如果 goroutine 获取了写锁,其他 goroutine 都需要等待;
    读写锁使用 sync.RWMutex 的 lock/unlock 是写锁,rlock/runlock 是读锁;

Go 语言中可以使用 sync.WaitGroup 来实现并发任务的同步;需要注意sync.WaitGroup是一个结构体,传递的时候要传递指针。
请注意,Go 中没有可重入锁的概念,请尽量避免使用,否则会导致死锁;
同时也要注意可见性的问题,在多核 CPU 执行期间,互相的缓存是不可见的。

sync.Once

如果初始化消耗比较大,那么将初始化延迟进行是个不错的选择,并且是一次性的,例如配置文件的读取,sync.Once 就是来做这个事情的。
理论上来讲,一次性初始化需要一个互斥锁和一个布尔变量来记录是否初始化完成,还是牵扯指令重排的问题,避免获取到初始化一半的情况;例如用 once 实现的单例模式:

1
2
3
4
5
6
7
8
9
10
11
type singleton struct {}

var instance *singleton
var once sync.Once

func GetInstance() *singleton {
once.Do(func() {
instance = &singleton{}
})
return instance
}

sync.Map

Go 默认提供的 map 是非并发安全的,所以在 sync 下提供了并发安全的 map;它开箱即用表示不用像内置的 map 一样使用 make 函数初始化就能直接使用;同时还内置了 Store、Load、LoadOrStore、Delete、Range 等操作方法(也必须使用这些方法才能保证安全)。

竞争检测

但是我们不可能想的那么全面,总有一些漏网之鱼的竞争关系,这时候可以使用 Go 提供的工具来检查,只要在 build、run、test 命令后面加上 -race 的 flag,具体的使用方法参考:https://golang.google.cn/ref/mem

原子操作

使用锁意味着上下文的切换都资源的消耗,针对基本类型我们还可以使用原子操作来保证并发安全,原子操作是 Go 语言提供的方法它在用户态就可以完成,因此性能比加锁操作更好。Go 语言中原子操作由内置的标准库 sync/atomic 提供。

方法解释
func LoadInt32(addr int32) (val int32)
func LoadInt64(addr
int64) (val int64)
func LoadUint32(addr uint32) (val uint32)
func LoadUint64(addr
uint64) (val uint64)
func LoadUintptr(addr uintptr) (val uintptr)
func LoadPointer(addr
unsafe.Pointer) (val unsafe.Pointer)
读取操作
func StoreInt32(addr int32, val int32)
func StoreInt64(addr
int64, val int64)
func StoreUint32(addr uint32, val uint32)
func StoreUint64(addr
uint64, val uint64)
func StoreUintptr(addr uintptr, val uintptr)
func StorePointer(addr
unsafe.Pointer, val unsafe.Pointer)
写入操作
func AddInt32(addr int32, delta int32) (new int32)
func AddInt64(addr
int64, delta int64) (new int64)
func AddUint32(addr uint32, delta uint32) (new uint32)
func AddUint64(addr
uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
修改操作
func SwapInt32(addr int32, new int32) (old int32)
func SwapInt64(addr
int64, new int64) (old int64)
func SwapUint32(addr uint32, new uint32) (old uint32)
func SwapUint64(addr
uint64, new uint64) (old uint64)
func SwapUintptr(addr uintptr, new uintptr) (old uintptr)
func SwapPointer(addr
unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
交换操作
func CompareAndSwapInt32(addr int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr
int64, old, new int64) (swapped bool)
func CompareAndSwapUint32(addr uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr
uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr uintptr, old, new uintptr) (swapped bool)
func CompareAndSwapPointer(addr
unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
比较并交换操作

测试

Go 语言中的测试依赖 go test 命令。编写测试代码和编写普通的 Go 代码过程是类似的,并不需要学习新的语法、规则或工具。
在包目录内,所有以 _test.go 为后缀名的源代码文件都是 go test 测试的一部分,不会被 go build 编译到最终的可执行文件中。在测试文件中有三种类型的函数:

类型格式作用
测试函数函数名前缀为 Test测试程序的一些逻辑行为是否正确
基准函数函数名前缀为 Benchmark测试函数的性能
示例函数函数名前缀为 Example为文档提供示例文档

go test 命令会遍历所有的 *_test.go 文件中符合上述命名规则的函数,然后生成一个临时的 main 包用于调用相应的测试函数,然后构建并运行、报告测试结果,最后清理测试中生成的临时文件。

1
2
3
4
5
6
7
8
9
// 测试函数名必须以Test开头,必须接收一个 *testing.T 类型参数
func TestSplit(t *testing.T) {
got := Split("a:b:c", ":")
want := []string{"a", "b", "c"} // 期望的结果
// 因为 slice 不能比较直接,借助反射包中的方法比较
if !reflect.DeepEqual(want, got) {
t.Errorf("excepted:%v, got:%v", want, got) // 测试失败输出错误提示
}
}

关于测试的内容暂时不去看太多了,先能玩起来再说,再加上标准库基本就可以写东西了。

参考

《Go语言圣经》
https://www.liwenzhou.com/posts/Go/14_concurrence/

喜欢就请我吃包辣条吧!

评论框加载失败,无法访问 Disqus

你可能需要魔法上网~~