Golang 生产者消费者模型
2023-06-10
基础概念
生产者消费者模型是并发问题中一个比较简单的情形,一个或者多个线程作为生产者,生产产品,同时一个或者多个线程作为消费者,消费产品。两个基本的隐含规定,一是产品先被生产,然后被消费,而是一个产品只能消费一次。对应到业务中,可以对应一些有次序的并发业务。
一个生产者一个消费者
这种情形比较简单,可以对应生产和消费行为耗时基本相等的情形。用一个 channel(有缓冲)传递消息(产品),用 WaitGroup 协调 goroutine 安全退出,代码如下:
package main
import (
"log"
"sync"
)
func main() {
wg := sync.WaitGroup{}
wg.Add(1)
ch := make(chan int, 5)
go producer(ch)
go consumer(ch, &wg)
wg.Wait()
}
func producer(ch chan int) {
for i := 0; i < 100; i++ {
ch <- i
log.Printf("Produce a num %d\n", i)
}
close(ch)
}
func consumer(ch chan int, wg *sync.WaitGroup) {
for v := range ch {
log.Printf("Consume a num %d\n", v)
}
wg.Done()
}
一个生产者多个消费者
这种情形,可以对应生产行为快于消费行为的场景。依然可以用一个缓冲 channel 传递消息,用 WaitGroup 协调 goroutine 安全退出,代码如下:
package main
import (
"log"
"sync"
)
func main() {
wg := sync.WaitGroup{}
wg.Add(2)
ch := make(chan int, 10)
go producer(ch)
go consumer(1, ch, &wg)
go consumer(2, ch, &wg)
wg.Wait()
}
func producer(ch chan int) {
for i := 0; i < 100; i++ {
ch <- i
log.Printf("Produce a num %d\n", i)
}
close(ch)
}
func consumer(num int, ch chan int, wg *sync.WaitGroup) {
for v := range ch {
log.Printf("Consume a num %d in %d Consumer\n", v, num)
}
wg.Done()
}
多个生产者一个消费者
这里不能像之前一样直接在 producer 中关闭 channel, 因此加入了一个 goroutine admin,用于最终安全关闭 channel, 代码如下:
package main
import (
"log"
"sync"
)
func main() {
wg := sync.WaitGroup{}
done := sync.WaitGroup{}
wg.Add(1)
done.Add(2)
ch := make(chan int, 10)
go producer(1, ch, &done)
go producer(2, ch, &done)
go consumer(ch, &wg)
go admin(&done, ch)
wg.Wait()
}
func producer(num int, ch chan int, done *sync.WaitGroup) {
for i := 0; i < 100; i++ {
ch <- i + num*100
log.Printf("Produce a num %d in factory %d\n", i+num*100, num)
}
done.Done()
}
func consumer(ch chan int, wg *sync.WaitGroup) {
for v := range ch {
log.Printf("Consume a num %d\n", v)
}
wg.Done()
}
func admin(done *sync.WaitGroup, ch chan int) {
done.Wait()
close(ch)
}
多个生产者多个消费者
这种情形相对多个生产者一个消费者,直接增加消费者 goroutine 即可。代码如下:
package main
import (
"log"
"sync"
)
func main() {
wg := sync.WaitGroup{}
done := sync.WaitGroup{}
wg.Add(2)
done.Add(2)
ch := make(chan int, 10)
go producer(1, ch, &done)
go producer(2, ch, &done)
go consumer(1, ch, &wg)
go consumer(2, ch, &wg)
go admin(&done, ch)
wg.Wait()
}
func producer(num int, ch chan int, done *sync.WaitGroup) {
for i := 0; i < 100; i++ {
ch <- i + num*100
log.Printf("Produce a num %d in factory %d\n", i+num*100, num)
}
done.Done()
}
func consumer(num int, ch chan int, wg *sync.WaitGroup) {
for v := range ch {
log.Printf("Consume a num %d in consumer %d\n", v, num)
}
wg.Done()
}
func admin(done *sync.WaitGroup, ch chan int) {
done.Wait()
close(ch)
}