摘要:go入门相关内容,参考:Go入门指南,主要总结其中:协程与通道(channel)的内容。
一、介绍
Go 语言为构建并发程序的基本代码块是协程 (goroutine) 与通道 (channel)。他们需要语言、编译器和 runtime 的支持。
Go 语言提供的垃圾回收器对并发编程至关重要。
不要通过共享内存来通信,而通过通信来共享内存。
通信强制协作。
1、线程与进程
进程 是对运行时程序的封装,是 系统进行资源调度和分配的的基本单位,实现了操作系统的并发。
线程是进程的子任务,是 CPU调度和分派的基本单位,用于保证程序的实时性,实现进程内部的并发;线程是操作系统可识别的最小执行和调度单位。
每个线程都独自占用一个虚拟处理器:独自的寄存器组,指令计数器和处理器状态。
每个线程完成不同的任务,但是共享同一地址空间(也就是同样的动态内存,映射文件,目标代码等等),打开的文件队列和其他内核资源。
一个线程只能属于一个进程,而一个进程可以有多个线程,但至少有一个线程。线程依赖于进程而存在。
进程在执行过程中拥有独立的内存单元,而多个线程共享进程的内存。(资源分配给进程,同一进程的所有线程共享该进程的所有资源。同一进程中的多个线程共享代码段(代码和常量),数据段(全局变量和静态变量),扩展段(堆存储)。但是每个线程拥有自己的栈段,栈段又叫运行时段,用来存放所有局部变量和临时变量。)
进程是资源分配的最小单位,线程是CPU调度的最小单位;
2、并发与并行
并发:并发(Concurrent)在操作系统中,是指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理器上运行。在同一时间点,任务并不会同时运行。
并行(Parallel):当系统有一个以上CPU时,当一个CPU执行一个进程时,另一个CPU可以执行另一个进程,两个进程互不抢占CPU资源,可以同时进行,这种方式我们称之为并行(Parallel)。在同一时间点,任务一定是同时运行。
3、go语言中的协程
在 Go 中,应用程序并发处理的部分被称作 goroutines(协程),它可以进行更有效的 并发运算。
在协程和操作系统线程之间并无一对一的关系:协程是根据一个或多个线程的可用性,映射(多路复用,执行于)在他们之上的;协程调度器在 Go 运行时很好的完成了这个工作。
协程工作在相同的地址空间中,所以共享内存的方式一定是同步的;这个可以使用 sync 包来实现,不过我们很不鼓励这样做:Go 使用 channels 来同步协程。
当系统调用(比如等待 I/O)阻塞协程时,其他协程会继续在其他线程上工作。协程的设计隐藏了许多线程创建和管理方面的复杂工作。
协程是轻量的,比线程更轻。它们痕迹非常不明显(使用少量的内存和资源):使用 4K 的栈内存就可以在堆中创建它们。并且它们对栈进行了分割,从而动态的增加(或缩减)内存的使用;栈的管理是自动的,但不是由垃圾回收器管理的,而是在协程退出后自动释放。
协程是通过使用 关键字 go 调用(或执行)一个函数或者方法来实现的(也可以是匿名或者 lambda 函数) 。
这样会在当前的计算过程中开始一个同时进行的函数,在相同的地址空间中并且分配了独立的栈,比如:go sum(bigArray),在后台计算总和。
协程的栈会根据需要进行伸缩,不会出现栈溢出;开发者无需关心栈的大小。当协程结束的时候,它会静默退出:用来启动这个协程的函数也不会得到任何的返回值。
任何 Go 程序都必须有的 main() 函数也可以看做是一个协程,尽管它并没有通过 go 来启动。协程可以在程序初始化的过程中运行(在 init() 函数中)。
在一个协程中,比如它需要进行非常密集的运算,你可以在运算循环中周期的使用 runtime.Gosched():这会让出处理器,允许运行其他协程;它并不会使当前协程挂起,所以它会自动恢复执行。使用 Gosched() 可以使计算均匀分布,使通信不至于迟迟得不到响应。
在其他语言中,比如 C#,Lua 或者 Python 都有协程的概念。这个名字表明它和 Go 协程有些相似,不过有两点不同:
- Go 协程意味着并行(或者可以以并行的方式部署),协程一般来说不是这样的
- Go 协程通过通道来通信;协程通过让出和恢复操作来通信
一个简单的示例如下所示:
1 | package main |
二、协程间的信道
Go 有一个特殊的类型,通道(channel),像是通道(管道),可以通过它们发送类型化的数据在协程之间通信,可以避开所有内存共享导致的坑;通道的通信方式保证了同步性。
同一时间只有一个协程可以访问数据,所以不会出现数据竞争。数据的归属(可以读写数据的能力)被传递。
通道服务于通信的两个目的:
- 值的交换
- 同步的保证了两个计算(协程)任何时候都是可知状态。
通常使用这样的格式来声明通道:
1 | var identifier chan datatype |
未初始化的通道的值是 nil。
通道 只能传输一种类型的数据,比如 chan int 或者 chan string,所有的类型都可以用于通道,空接口 interface{} 也可以。甚至可以(有时非常有用)创建通道的通道。
通道实际上是类型化消息的队列:使数据得以传输。它是 先进先出(FIFO)* 的结构所以可以保证发送给他们的元素的顺序(有些人知道,通道可以比作 Unix shells 中的双向管道(two-way pipe))。
通道也是引用类型,所以我们使用 make() 函数来给它分配内存。这里先声明了一个字符串通道 ch1,然后创建了它(实例化):
1 | var ch1 chan string |
所以通道是对象的第一类型:可以存储在变量中,作为函数的参数传递,从函数返回以及通过通道发送它们自身。
1、通信操作符 <-
这个操作符直观的标示了数据的传输:信息按照箭头的方向流动。
流向通道(发送):
ch <- int1 表示:用通道 ch 发送变量 int1(双目运算符,中缀 = 发送)
从通道流出(接收):
int2 := <- ch 表示:变量 int2 从通道 ch(一元运算的前缀操作符,前缀 = 接收)接收数据(获取新值)
<- ch 可以单独调用获取通道的(下一个)值,当前值会被丢弃,但是可以用来验证,所以以下代码是合法的:
1 | if <-ch != 1000 { |
通道的命名通常以 ch 开头或者包含 chan。
简单示例如下所示:
1 | package main |
如果注释掉 time.Sleep(1e9) , sendData() 会来不及输出。
我们发现协程之间的同步非常重要:
- main () 等待了 1 秒让两个协程完成,如果不这样,sendData () 就没有机会输出。
- getData () 使用了无限循环:它随着 sendData () 的发送完成和 ch 变空也结束了。
如果我们移除一个或所有 go 关键字,程序无法运行,Go 运行时会抛出 panic。
1 | ---- Error run E:/Go/Goboek/code examples/chapter 14/goroutine2.exe with code Crashed |
为什么会这样呢?
因为运行时会检查所有的协程(也许只有一个是这种情况)是否在等待(可以读取或者写入某个通道),意味着程序无法处理。这是死锁(deadlock)形式,运行时可以检测到这种情况。
注意:不要使用打印状态来表明通道的发送和接收顺序:由于打印状态和通道实际发生读写的时间延迟会导致和真实发生的顺序不同。
2、通道阻塞
默认情况下,通信是 同步且无缓冲 的:在有接收者接收数据之前,发送不会结束。
可以想象一个无缓冲的通道在没有空间来保存数据的时候:必须要一个接收者准备好接收通道的数据然后发送者可以直接把数据发送给接收者。
所以通道的发送(接收)操作在对方准备好之前是 阻塞 的:
1)对于同一个通道,发送操作(协程或者函数中的),在接收者准备好之前是阻塞的:如果 ch 中的数据无人接收,就无法再给通道传入其他数据:新的输入无法在通道非空的情况下传入 。所以发送操作会等待 ch 再次变为可用状态:就是通道值被接收时(可以传入变量)。
2)对于同一个通道,接收操作是阻塞的(协程或函数中的),直到发送者可用:如果通道中没有数据,接收者就阻塞了 。
下面的例子验证了以上理论,一个协程在无限循环中给通道发送整数数据。不过因为没有接收者,只输出了一个数字 0。
1 | package main |
为通道解除阻塞,可以定义 suck 函数来在无限循环中读取通道。
1 | func suck(ch chan int) { |
3、通过一个(或多个)通道交换数据进行协程同步
通信是一种同步形式:通过通道,两个协程在通信(协程会和)中某刻同步交换数据。无缓冲通道成为了多个协程同步的完美工具。
甚至可以在通道两端互相阻塞对方,形成了叫做死锁的状态。Go 运行时会检查并 panic,停止程序。死锁几乎完全是由糟糕的设计导致的。
无缓冲通道会被阻塞。设计无阻塞的程序可以避免这种情况,或者使用带缓冲的通道。
解释为什么下边这个程序会导致 panic:所有的协程都休眠了 - 死锁!
1 | package main |
无缓冲的 channel 收发都是阻塞的,上面的代码在 main 方法里发送 2(out <- 2),那么就会一直被阻塞着不往下进行。
解决办法有两种:
1 | 第一种:将 f1(out) 方法提前 |
4、同步通道-使用带缓冲的通道
一个无缓冲通道只能包含 1 个元素,有时显得很局限。我们给通道提供了一个缓存,可以在扩展的 make 命令中设置它的容量,如下:
1 | buf := 100 |
buf 是通道可以同时容纳的元素(这里是 string)个数。
在缓冲满载(缓冲被全部使用)之前,给一个带缓冲的通道发送数据是不会阻塞的,而从通道读取数据也不会阻塞,直到缓冲空了。
缓冲容量和类型无关,所以可以给一些通道设置不同的容量,只要他们拥有同样的元素类型。内置的 cap 函数可以返回缓冲区的容量。
如果 容量大于 0 ,通道就是 异步的 了:缓冲满载(发送)或变空(接收)之前通信不会阻塞,元素会 按照发送的顺序被接收 。如果 容量是 0 或者未设置 ,通信仅在 收发双方准备好(同步) 的情况下才可以成功。
1 | ch :=make(chan type, value) |
若使用通道的缓冲,你的程序会在“请求”激增的时候表现更好:更具弹性,专业术语叫:更具有伸缩性(scalable)。要在首要位置使用无缓冲通道来设计算法,只在不确定的情况下使用缓冲。
5、协程中用通道输出结果
为了知道计算何时完成,可以通过信道回报。在例子 go sum(bigArray) 中,要这样写:
1 | ch := make(chan int) |
也可以使用通道来达到同步的目的,这个很有效的用法在传统计算机中称为信号量(semaphore)。 或者换个方式:通过通道发送信号告知处理已经完成(在协程中)。
在其他协程运行时让 main 程序无限阻塞的通常做法是在 main 函数的最后放置一个 {}。
也可以使用通道让 main 程序等待协程完成,就是所谓的信号量模式。
6、信号量模式
下边的片段阐明:协程通过在通道 ch 中放置一个值来处理结束的信号。main 协程等待 <-ch 直到从中获取到值。
我们期望从这个通道中获取返回的结果,像这样:
1 | func compute(ch chan int){ |
这个信号也可以是其他的,不返回结果,比如下面这个协程中的匿名函数(lambda)协程:
1 | ch := make(chan int) |
或者等待两个协程完成,每一个都会对切片 s 的一部分进行排序,片段如下:
1 | done := make(chan bool) |
下边的代码,用完整的信号量模式对长度为 N 的 float64 切片进行了 N 个 doSomething() 计算并同时完成,通道 sem 分配了相同的长度(且包含空接口类型的元素),待所有的计算都完成后,发送信号(通过放入值)。 在循环中从通道 sem 不停的接收数据来等待所有的协程完成。
1 | type Empty interface {} |
注意闭合:i、xi 都是作为参数传入闭合函数的,从外层循环中隐藏了变量 i 和 xi。让每个协程有一份 i 和 xi 的拷贝;另外,for 循环的下一次迭代会更新所有协程中 i 和 xi 的值。切片 res 没有传入闭合函数,因为协程不需要单独拷贝一份。切片 res 也在闭合函数中但并不是参数。
7、实现并行的 for 循环
下面的代码中,for 循环的每一个迭代是并行完成的:
1 | for i, v := range data { |
在 for 循环中并行计算迭代可能带来很好的性能提升。不过所有的迭代都必须是独立完成的。有些语言比如 Fortress 或者其他并行框架以不同的结构实现了这种方式,在 Go 中用协程实现起来非常容易。
8、用带缓冲通道实现一个信号量
信号量是实现互斥锁(排外锁)常见的同步机制,限制对资源的访问,解决读写问题,比如没有实现信号量的 sync 的 Go 包,使用带缓冲的通道可以轻松实现:
- 带缓冲通道的容量和要同步的资源容量相同
- 通道的长度(当前存放的元素个数)与当前资源被使用的数量相同
- 容量减去通道的长度就是未处理的资源个数(标准信号量的整数值)
不用管通道中存放的是什么,只关注长度;因此我们创建了一个长度可变但容量为 0(字节)的通道:
1 | type Empty interface {} |
将可用资源的数量 N 来初始化信号量 semaphore:sem = make(semaphore, N)
然后直接对信号量进行操作:
1 | // acquire n resources |
可以用来实现一个互斥的例子:
1 | /* mutexes */ |
习惯用法:通道工厂模式
编程中常见的另外一种模式如下:不将通道作为参数传递给协程,而用函数来生成一个通道并返回(工厂角色);函数内有个匿名函数被协程调用。
1 | package main |
10、给通道使用 for 循环
for 循环的 range 语句可以用在通道 ch 上,便可以从通道中获取值,像这样:
1 | for v := range ch { |
它从指定通道中读取数据直到通道关闭,才继续执行下边的代码。很明显,另外一个协程必须写入 ch(不然代码就阻塞在 for 循环了),而且必须在写入完成后才关闭。
完整代码如下所示:
1 | package main |
习惯用法:通道迭代模式
从包含了地址索引字段 items 的容器给通道填入元素。为容器的类型定义一个方法 Iter(),返回一个只读的通道 items,如下:
1 | func (c *container) Iter () <- chan items { |
在协程里,一个 for 循环迭代容器 c 中的元素(对于树或图的算法,这种简单的 for 循环可以替换为深度优先搜索)。
调用这个方法的代码可以这样迭代容器:
1 | for x := range container.Iter() { ... } |
可以运行在自己的协程中,所以上边的迭代用到了一个通道和两个协程(可能运行在两个线程上)。就有了一个特殊的生产者-消费者模式。
如果程序在协程给通道写完值之前结束,协程不会被回收;设计如此。这种行为看起来是错误的,但是通道是一种线程安全的通信。
在这种情况下,协程尝试写入一个通道,而这个通道永远不会被读取,这可能是个 bug 而并非期望它被静默的回收。
习惯用法:生产者消费者模式
假设你有 Produce() 函数来产生 Consume 函数需要的值。它们都可以运行在独立的协程中,生产者在通道中放入给消费者读取的值。整个处理过程可以替换为无限循环:
1 | for { |
11、通道的方向
通道类型可以用注解来表示它只发送或者只接收:
1 | var send_only chan<- int // channel can only receive data |
只接收的通道(<-chan T)无法关闭,因为关闭通道是发送者用来表示不再给通道发送值了,所以对只接收通道是没有意义的。
通道创建的时候都是双向的 ,但也可以分配有方向的通道变量,就像以下代码:
1 | var c = make(chan int) // bidirectional |
管道和选择器模式
协程处理它从通道接收的数据并发送给输出通道:
1 | sendChan := make(chan int) |
通过使用方向注解来限制协程对通道的操作。
例子一
这里有一个来自 Go 指导的很赞的例子,打印了输出的素数,使用选择器(‘筛’)作为它的算法。每个 prime 都有一个选择器,如下图:
1 | // Copyright 2009 The Go Authors. All rights reserved. |
协程 filter(in, out chan int, prime int) 拷贝整数到输出通道,丢弃掉可以被 prime 整除的数字。然后每个 prime 又开启了一个新的协程,生成器和选择器并发请求。
例子二
第二个版本引入了上边的习惯用法:函数 sieve、generate 和 filter 都是工厂;它们创建通道并返回,而且使用了协程的 lambda 函数。main 函数现在短小清晰:它调用 sieve() 返回了包含素数的通道,然后通过 fmt.Println(<-primes) 打印出来。
1 | // Copyright 2009 The Go Authors. All rights reserved. |
三、协程的同步:关闭通道-测试阻塞的通道
通道可以被显式的关闭;尽管它们和文件不同:不必每次都关闭。只有在当需要告诉接收者不会再提供新的值的时候,才需要关闭通道。
只有发送者需要关闭通道,接收者永远不会需要。
示例:
1 | package main |
我们如何在通道的 sendData() 完成的时候发送一个信号,getData() 又如何检测到通道是否关闭或阻塞?
第一个问题可以通过函数 close(ch) 来完成:这个将通道标记为无法通过发送操作 <- 接受更多的值;给已经关闭的通道发送或者再次关闭都会导致运行时的 panic。在创建一个通道后使用 defer 语句是个不错的办法(类似这种情况):
1 | ch := make(chan float64) |
第二个问题可以使用逗号,ok 操作符:用来检测通道是否被关闭。
1 | v, ok := <-ch // ok is true if v received value |
四、使用 select 切换协程
从不同的并发执行的协程中获取值可以通过关键字 select 来完成,它和 switch 控制语句非常相似也被称作通信开关;它的行为像是 “你准备好了吗” 的 轮询机制;select 监听进入通道的数据,也可以是用通道发送值的时候。
1 | select { |
select 做的就是:选择处理列出的多个通信情况中的一个。
- 如果都阻塞了,会等待直到其中一个可以处理
- 如果多个可以处理,随机选择一个
- 如果没有通道操作可以处理并且写了 default 语句,它就会执行:default 永远是可运行的(这就是准备好了,可以执行)。
- 在 select 中使用发送操作并且有 default 可以确保发送不被阻塞!如果没有 case,select 就会一直阻塞。
在 select 中使用发送操作并且有 default 可以确保发送不被阻塞!如果没有 case,select 就会一直阻塞。
select 语句实现了一种监听模式,通常用在(无限)循环中;在某种情况下,通过 break 语句使循环退出。
例子如下所示:
1 | package main |
在上面例子中有 2 个通道 ch1 和 ch2,三个协程 pump1()、pump2() 和 suck()。 这是一个典型的生产者消费者模式。
在无限循环中,ch1 和 ch2 通过 pump1() 和 pump2() 填充整数;suck() 也是在无限循环中轮询输入的,通过 select 语句获取 ch1 和 ch2 的整数并输出。
选择哪一个 case 取决于哪一个通道收到了信息。程序在 main 执行 1 秒后结束。
习惯用法:后台服务模式
服务通常是用后台协程中的无限循环实现的,在循环中使用 select 获取并处理通道中的数据:
1 | // Backend goroutine. |
在程序的其他地方给通道 ch1,ch2 发送数据,比如:通道 stop 用来清理结束服务程序。
另一种方式(但是不太灵活)就是(客户端)在 chRequest 上提交请求,后台协程循环这个通道,使用 switch 根据请求的行为来分别处理:
1 | func backend() { |
五、通道、超时和计时器(Ticker)
time 包中有一些有趣的功能可以和通道组合使用。
其中就包含了 time.Ticker 结构体,这个对象以指定的时间间隔重复的向通道 C 发送时间值:
1 | type Ticker struct { |
时间间隔的单位是 ns(纳秒,int64),在工厂函数 time.NewTicker 中以 Duration 类型的参数传入:func Newticker(dur) *Ticker。
在协程周期性的执行一些事情(打印状态日志,输出,计算等等)的时候非常有用。
调用 Stop() 使计时器停止,在 defer 语句中使用。这些都很好的适应 select 语句:
1 | ticker := time.NewTicker(updateInterval) |
time.Tick() 函数声明为 Tick(d Duration) <-chan Time,当你想返回一个通道而不必关闭它的时候这个函数非常有用:它以 d 为周期给返回的通道发送时间,d 是纳秒数。
如果需要像下边的代码一样,限制处理频率(函数 client.Call() 是一个 RPC 调用,这里暂不赘述:
1 | import "time" |
定时器(Timer)结构体看上去和计时器(Ticker)结构体的确很像(构造为 NewTimer(d Duration)),但是它只发送一次时间,在 Dration d 之后。
还有 time.After(d) 函数,声明如下:
1 | func After(d Duration) <-chan Time |
在 Duration d 之后,当前时间被发到返回的通道;所以它和 NewTimer(d).C 是等价的;它类似 Tick(),但是 After() 只发送一次时间。
下边有个很具体的示例,很好的阐明了 select 中 default 的作用:
1 | package main |
简单超时模式
第一种形式
要从通道 ch 中接收数据,但是最多等待 1 秒。先创建一个信号通道,然后启动一个 lambda 协程,协程在给通道发送数据之前是休眠的:
1 | timeout := make(chan bool, 1) |
然后使用 select 语句接收 ch 或者 timeout 的数据:如果 ch 在 1 秒内没有收到数据,就选择到了 timeout 分支并放弃了 ch 的读取。
1 | select { |
第二种形式,取消耗时很长的同步调用
也可以使用 time.After() 函数替换 timeout-channel。可以在 select 中通过 time.After() 发送的超时信号来停止协程的执行。
以下代码,在 timeoutNs 纳秒后执行 select 的 timeout 分支后,执行 client.Call 的协程也随之结束,不会给通道 ch 返回值:
1 | ch := make(chan error, 1) |
注意缓冲大小设置为 1 是必要的,可以避免协程死锁以及确保超时的通道可以被垃圾回收。
此外,需要注意在有多个 case 符合条件时,select 对 case 的选择是伪随机的。
如果上面的代码稍作修改如下,则 select 语句可能不会在定时器超时信号到来时立刻选中 time.After(timeoutNs) 对应的 case,因此协程可能不会严格按照定时器设置的时间结束。
1 | ch := make(chan int, 1) |
第三种形式,取消耗时很长的同步调用
假设程序从多个复制的数据库同时读取。只需要一个答案,需要接收首先到达的答案,Query 函数获取数据库的连接切片并请求。
并行请求每一个数据库并返回收到的第一个响应:
1 | func Query(conns []conn, query string) Result { |
再次声明,结果通道 ch 必须是带缓冲的:以保证第一个发送进来的数据有地方可以存放,确保放入的首个数据总会成功,所以第一个到达的值会被获取而与执行的顺序无关。
正在执行的协程总是可以使用 runtime.Goexit() 来停止。
六、协程和恢复(recover)
一个用到 recover 的程序,停掉了服务器内部一个失败的协程而不影响其他协程的工作。
例子如下所示:
1 | func server(workChan <-chan *Work) { |
上边的代码,如果 do(work) 发生 panic,错误会被记录且协程会退出并释放,而其他协程不受影响。
七、新旧模型对比:任务和worker
假设我们需要处理很多任务;一个 worker 处理一项任务。任务可以被定义为一个结构体(具体的细节在这里并不重要):
1 | type Task struct { |
旧模式:使用共享内存进行同步
由各个任务组成的任务池共享内存;为了同步各个 worker 以及避免资源竞争,我们需要对任务池进行加锁保护:
1 | type Pool struct { |
sync.Mutex(是互斥锁:它用来在代码中保护临界区资源:同一时间只有一个 go 协程(goroutine)可以进入该临界区。
如果出现了同一时间多个 go 协程都进入了该临界区,则会产生竞争:Pool 结构就不能保证被正确更新。
在传统的模式中(经典的面向对象的语言中应用得比较多,比如 C++,JAVA,C#),worker 代码可能这样写:
1 | func Worker(pool *Pool) { |
使用通道进行同步
使用一个通道接受需要处理的任务,一个通道接受处理完成的任务(及其结果)。worker 在协程中启动,其数量 N 应该根据任务数量进行调整。
主线程扮演着 Master 节点角色,可能写成如下形式:
1 | func main() { |
worker 的逻辑比较简单:从 pending 通道拿任务,处理后将其放到 done 通道中:
1 | func Worker(in, out chan *Task) { |
这里并不使用锁:从通道得到新任务的过程没有任何竞争。
随着任务数量增加,worker 数量也应该相应增加,同时性能并不会像第一种方式那样下降明显。
在 pending 通道中存在一份任务的拷贝,第一个 worker 从 pending 通道中获得第一个任务并进行处理,这里并不存在竞争(对一个通道读数据和写数据的整个过程是原子性的)。
某一个任务会在哪一个 worker 中被执行是不可知的,反过来也是。
worker 数量的增多也会增加通信的开销,这会对性能有轻微的影响。
对于任何可以建模为 Master-Worker 范例的问题,一个类似于 worker 使用通道进行通信和交互、Master 进行整体协调的方案都能完美解决。
如果系统部署在多台机器上,各个机器上执行 Worker 协程,Master 和 Worker 之间使用 netchan 或者 RPC 进行通信。
怎么选择是该使用锁还是通道?
通道是一个较新的概念,本节我们着重强调了在 go 协程里通道的使用,但这并不意味着经典的锁方法就不能使用。
go 语言让你可以根据实际问题进行选择:创建一个优雅、简单、可读性强、在大多数场景性能表现都能很好的方案。如果你的问题适合使用锁,也不要忌讳使用它。
go 语言注重实用,什么方式最能解决你的问题就用什么方式,而不是强迫你使用一种编码风格。下面列出一个普遍的经验法则:
- 使用锁的情景:
- 访问共享数据结构中的缓存信息
- 保存应用程序上下文和状态信息数据
- 使用通道的情景:
- 与异步操作的结果进行交互
- 分发任务
- 传递数据所有权
当你发现你的锁使用规则变得很复杂时,可以反省使用通道会不会使问题变得简单些。
八、惰性生成器的实现
生成器是指当被调用时返回一个序列中下一个值的函数,例如:
1 | generateInteger() => 0 |
生成器每次返回的是序列中下一个值而非整个序列;这种特性也称之为惰性求值:只在你需要时进行求值,同时保留相关变量资源(内存和 cpu):这是一项在需要时对表达式进行求值的技术。
例如,生成一个无限数量的偶数序列:要产生这样一个序列并且在一个一个的使用可能会很困难,而且内存会溢出!但是一个含有通道和 go 协程的函数能轻易实现这个需求。
通过巧妙地使用空接口、闭包和高阶函数,我们能实现一个通用的惰性生产器的工厂函数 BuildLazyEvaluator(这个应该放在一个工具包中实现)。
工厂函数需要一个函数和一个初始状态作为输入参数,返回一个无参、返回值是生成序列的函数。
传入的函数需要计算出下一个返回值以及下一个状态参数。在工厂函数中,创建一个通道和无限循环的 go 协程。 返回值被放到了该通道中,返回函数稍后被调用时从该通道中取得该返回值。每当取得一个值时,下一个值即被计算。
在下面的例子中,定义了一个 evenFunc 函数,其是一个惰性生成函数:在 main 函数中,我们创建了前 10 个偶数,每个都是通过调用 even() 函数取得下一个值的。
为此,我们需要在 BuildLazyIntEvaluator 函数中具体化我们的生成函数,然后我们能够基于此做出定义。
1 | package main |
九、实现 Futures 模式
所谓 Futures 就是指:有时候在你使用某一个值之前需要先对其进行计算。这种情况下,你就可以在另一个处理器上进行该值的计算,到使用时,该值就已经计算完毕了。
Futures 模式通过闭包和通道可以很容易实现,类似于生成器,不同地方在于 Futures 需要返回一个值。
假设我们有一个矩阵类型,我们需要计算两个矩阵 A 和 B 乘积的逆,首先我们通过函数 Inverse(M) 分别对其进行求逆运算,在将结果相乘。
如下函数 InverseProduct() 实现了如上过程:
1 | func InverseProduct(a Matrix, b Matrix) { |
在这个例子中,a 和 b 的求逆矩阵需要先被计算。 那么为什么在计算 b 的逆矩阵时,需要等待 a 的逆计算完成呢?
显然不必要,这两个求逆运算其实可以并行执行的。换句话说,调用 Product 函数只需要等到 a_inv 和 b_inv 的计算完成。
如下代码实现了并行计算方式:
1 | func InverseProduct(a Matrix, b Matrix) { |
InverseFuture 函数起了一个 goroutine 协程,在其执行闭包运算,该闭包会将矩阵求逆结果放入到 future 通道中:
1 | func InverseFuture(a Matrix) { |
十、客户端-服务端模式
Client-server 类的应用是协程(goroutine)和频道(channel)的大显身手的闪光点。
客户端可以是任何一种运行在任何设备上的,且需要来自服务端信息的一种程序,所以它需要发送请求。
服务端接收请求,做一些处理,然后把给客户端发送响应信息。在通常情况下,就是多个客户端(很多请求)对一个(或几个)服务端。
一个常见例子就是我们使用的发送网页请求的客户端浏览器。然后一个 web 服务器将响应网页发回给浏览器。
在 Go 中,服务端通常会在一个协程(goroutine)里操作对一个客户端的响应,所以协程和客户端请求是一一对应的。
一种典型的做法就是客户端请求本身包含了一个频道(channel),服务端可以用它来发送响应。
1 | package main |
十一、限制并发数
例子如下所示:
1 | package main |
通过这种方式,程序中的协程通过使用缓冲通道(这个通道作为一个 semaphore 被使用)来调整资源的使用,实现了对内存等有限资源的优化。
十二、链式操作(重点!!!!)
下面的程序演示了启动大量的协程是多么的容易。它发生在 mian 函数的 for 循环中。在循环之后,向 right 通道中插入 0,在不到 1.5s 的时间执行了 100000 个协程,并将结果 100000 打印。
1 | package main |
这里有个很有意思的点:在没有运行程序之前,我认为leftmost的结果为1,认为只在最初做了一次赋值,实际结果为100000(无缓存信道,具有同步阻塞的特性),这个是为什么呢?
- 1.主线程的right <- 0,right不是最初循环的那个right,而是最终循环的right
- 2.for循环中最初的go f(left, right)因为没有发送者一直处于等待状态
- 3.当主线程的right <- 0执行时,类似于递归函数在最内层产生返回值一般
十三、多核运算
假设我们的 CPU 核数是 NCPU 个: const NCPU = 4 // 例如:4 代表 4 核处理器,我们将计算划分为 NCPU 部分,每部分与其他部分并行运行。
下面是一个简单的示例(我们忽略具体的参数):
1 | func DoAll() { |
函数 DoAll() 生成一个通道 sem ,在此基础上完成每一个并行计算;在 for 循环中启动 NCPU 个协程,每一个协程执行全部工作的 1/NCPU 。通过 sem 发送每一个协程中 DoPart() 完成的信号。
DoAll() 中用一个 for 循环来等待所有(NCPU 个)协程完成计算: 通道 sem 的行为就像一个 semaphore(信号量);这个代码展示了一个典型的信号量模式。
在当前的运行模式下,你还必须设置 GOMAXPROCS 为 NCPU。
十四、多核运算处理大量数据
假设我们必须处理 大量的彼此独立的数据项 ,通过一个输入通道进入,并且全部处理完成后放到一个输出通道,就像一个工厂的管道。每个数据项的处理也许会涉及多个步骤: 预处理 / 步骤 A / 步骤 B / … / 后期处理
一个典型的顺序 流水线算法 可以用来解决这个问题,下面示例展示了每一步执行的顺序:
1 | func SerialProcessData (in <- chan *Data, out <- chan *Data) { |
一次只执行一步,并且每个项目按顺序处理:在第一个项目被处理完并将结果放到输出通道之前第二个项目不会开始。
如果你仔细想想,你很快就会意识到这样会非常的浪费时间。
一个更有效的计算是 让每一个步骤都作为一个协程独立工作 。每个步骤都从上一步的输出通道获取输入数据。这样可以尽可能的避免时间浪费,并且大部分时间所有的步骤都会繁忙的执行:
1 | func ParallelProcessData (in <- chan *Data, out <- chan *Data) { |
通道缓冲区可以用于进一步优化整个过程。
十五、使用 Channel 来并发读取对象
为了保护一个对象的并发修改,我们可以使用一个后台的协程来顺序执行一个匿名函数,而不是通过同步 互斥锁(Mutex) 进行锁定。
在下面的程序中,我们有一个 Person 类型,它包含了一个匿名函数类型的通道字段 chF。它在构造器方法 NewPerson 中初始化,用一个协程启动一个 backend() 方法。这个方法在一个无限 for 循环中执行所有被放到 chF 上的函数,有效的序列化他们,从而提供安全的并发访问。
改变和获取 salary 可以通过一个放在 chF 上的匿名函数来实现,backend() 会顺序执行它们。注意如何在 Salary 方法中的闭合(匿名)函数中去包含 fChan 通道。
这是一个简化的例子,并且它不应该在这种情况下应用,但是它展示了如何在更复杂的情况下解决问题。
1 | package main |
参考文章
- 本文作者: th3ee9ine
- 本文链接: https://www.blog.ajie39.top/2022/06/17/Go协程与通道(channel)/
- 版权声明: 本博客所有文章除特别声明外,均采用 LICENSE 下的许可协议。转载请注明出处!