Golang 生产者消费者模型
2023-06-10

基础概念

生产者消费者模型是并发问题中一个比较简单的情形,一个或者多个线程作为生产者,生产产品,同时一个或者多个线程作为消费者,消费产品。两个基本的隐含规定,一是产品先被生产,然后被消费,而是一个产品只能消费一次。对应到业务中,可以对应一些有次序的并发业务。

一个生产者一个消费者

这种情形比较简单,可以对应生产和消费行为耗时基本相等的情形。用一个 channel(有缓冲)传递消息(产品),用 WaitGroup 协调 goroutine 安全退出,代码如下:

package main

import (
	"log"
	"sync"
)

func main() {
	wg := sync.WaitGroup{}
	wg.Add(1)
	ch := make(chan int, 5)
	go producer(ch)
	go consumer(ch, &wg)
	wg.Wait()
}

func producer(ch chan int) {
	for i := 0; i < 100; i++ {
		ch <- i
		log.Printf("Produce a num %d\n", i)
	}
	close(ch)
}

func consumer(ch chan int, wg *sync.WaitGroup) {
	for v := range ch {
		log.Printf("Consume a num %d\n", v)
	}
	wg.Done()
}

一个生产者多个消费者

这种情形,可以对应生产行为快于消费行为的场景。依然可以用一个缓冲 channel 传递消息,用 WaitGroup 协调 goroutine 安全退出,代码如下:

package main

import (
	"log"
	"sync"
)

func main() {
	wg := sync.WaitGroup{}
	wg.Add(2)
	ch := make(chan int, 10)
	go producer(ch)
	go consumer(1, ch, &wg)
	go consumer(2, ch, &wg)
	wg.Wait()
}

func producer(ch chan int) {
	for i := 0; i < 100; i++ {
		ch <- i
		log.Printf("Produce a num %d\n", i)
	}
	close(ch)
}

func consumer(num int, ch chan int, wg *sync.WaitGroup) {
	for v := range ch {
		log.Printf("Consume a num %d in %d Consumer\n", v, num)
	}
	wg.Done()
}

多个生产者一个消费者

这里不能像之前一样直接在 producer 中关闭 channel, 因此加入了一个 goroutine admin,用于最终安全关闭 channel, 代码如下:

package main

import (
	"log"
	"sync"
)

func main() {
	wg := sync.WaitGroup{}
	done := sync.WaitGroup{}
	wg.Add(1)
	done.Add(2)
	ch := make(chan int, 10)
	go producer(1, ch, &done)
	go producer(2, ch, &done)
	go consumer(ch, &wg)
	go admin(&done, ch)
	wg.Wait()
}

func producer(num int, ch chan int, done *sync.WaitGroup) {
	for i := 0; i < 100; i++ {
		ch <- i + num*100
		log.Printf("Produce a num %d in factory %d\n", i+num*100, num)
	}
	done.Done()
}

func consumer(ch chan int, wg *sync.WaitGroup) {
	for v := range ch {
		log.Printf("Consume a num %d\n", v)
	}
	wg.Done()
}

func admin(done *sync.WaitGroup, ch chan int) {
	done.Wait()
	close(ch)
}

多个生产者多个消费者

这种情形相对多个生产者一个消费者,直接增加消费者 goroutine 即可。代码如下:

package main

import (
	"log"
	"sync"
)

func main() {
	wg := sync.WaitGroup{}
	done := sync.WaitGroup{}
	wg.Add(2)
	done.Add(2)
	ch := make(chan int, 10)
	go producer(1, ch, &done)
	go producer(2, ch, &done)
	go consumer(1, ch, &wg)
	go consumer(2, ch, &wg)
	go admin(&done, ch)
	wg.Wait()
}

func producer(num int, ch chan int, done *sync.WaitGroup) {
	for i := 0; i < 100; i++ {
		ch <- i + num*100
		log.Printf("Produce a num %d in factory %d\n", i+num*100, num)
	}
	done.Done()
}

func consumer(num int, ch chan int, wg *sync.WaitGroup) {
	for v := range ch {
		log.Printf("Consume a num %d in consumer %d\n", v, num)
	}
	wg.Done()
}

func admin(done *sync.WaitGroup, ch chan int) {
	done.Wait()
	close(ch)
}