0
点赞
收藏
分享

微信扫一扫

Go并发编程(三)协程池


文章目录

  • ​​Go并发编程(三)协程池​​
  • ​​为什么需要协程池​​
  • ​​实现​​
  • ​​数据结构定义​​
  • ​​新增任务&执行任务​​
  • ​​goroutine异常处理​​
  • ​​关闭协程池​​
  • ​​使用​​

Go并发编程(三)协程池

本文参考如下博客实现了一个简易的协程池

  • 100 行写一个 go 的协程池 (任务池)

为什么需要协程池

goroutine 太多仍会导致调度性能下降、GC 频繁、内存暴涨, 引发一系列问题。在面临这样的场景时, 限制 goroutine 的数量、重用 goroutine

实现

实现的基本思路是采用生产者-消费者模型,用来执行任务的goroutine作为消费者,操作任务队列的goroutine是生产者,任务队列使用的是go中的buffer channel

Go并发编程(三)协程池_任务队列

数据结构定义

任务定义:

// 任务定义
type Task struct {
Handler func(v ...interface{}) // 任务处理函数
Params []interface{} // 处理函数参数列表
}

协程池定义:

// 任务池定义
type TaskPool struct {
Capacity int64 // 任务池容量
RunningGoroutine int64 // 运行中的goroutine数量
TaskQueue chan *Task // 任务队列
Status int64 // 任务池状态
sync.Mutex
PanicHandler func(interface{}) // goroutine异常处理机制
}

协程池状态常量定义

// 协程池状态
const(
RUNNING = iota
STOP
)

全局异常定义:

// 池容量非法异常
var ErrInvalidPoolCap = errors.New("task pool capacity invaild")
var ErrPoolAlreadyClosed = errors.New("pool is already go")

新增任务&执行任务

新增任务本质就是做goroutine数量检查,小于协程池容量则新启协程,超过就复用原有协程,协程的回收依赖于GC,任务是直接丢进管道,等待消费的goroutine执行

// 新增任务
func (p *TaskPool) Put(t *Task) error{
p.Lock()
defer p.Unlock()

if p.Status == STOP{
return ErrPoolAlreadyClosed
}

// 如果协程池未满则新启协程
if p.RunningGoroutine < p.Capacity{
// 协程池未满,则产生协程
p.run()
}
// 任务入队
p.TaskQueue <- t

return nil
}

执行任务其实就是监听channel消费具体的任务,这里采用的是带缓冲区的channel,所以消费生产是非阻塞的

// 从任务队列中取出任务执行
func (pool *TaskPool)run() {
// 新增运行中的goroutine
incRunning(pool)
go func() {
// 执行完成后运行中的goroutine--
defer func() {
decRunning(pool)
// goroutine panic
if r := recover();r != nil{
if pool.PanicHandler != nil{
pool.PanicHandler(r);
} else { // 默认处理
log.Printf("Worker panic: %s\n", r)
}
}
pool.checkRunningWork()
}()
// 具体goroutine执行策略
for{
select {
case task,ok := <- pool.TaskQueue:{
if !ok{
// 任务从管道消费失败
return
}
// 执行任务
task.Handler(task.Params)
}
}
}
}()
}

goroutine异常处理

如果某一个goroutine抛出panic就会导致整个程序崩溃退出,为了保证程序安全执行,需要对panic进行recover,进行异常处理,异常处理函数用户自定义

defer func() {
decRunning(pool)
// goroutine panic
if r := recover();r != nil{
if pool.PanicHandler != nil{
pool.PanicHandler(r);
} else { // 默认处理
log.Printf("Worker panic: %s\n", r)
}
}
pool.checkRunningWork()
}()

关闭协程池

关闭协程池需要做两个步骤:

  1. 关闭任务进入队列的入口
  2. 执行完任务队列中剩余的任务

// 安全关闭协程池
func (p *TaskPool) CloseTask() error{
p.Lock()
defer p.Unlock()

if p.Status == STOP{
return ErrPoolAlreadyClosed
}

atomic.CompareAndSwapInt64(&p.Status,RUNNING,STOP)

// 清空任务队列
for len(p.TaskQueue) > 0 { // 阻塞等待所有任务被 worker 消费
time.Sleep(1e6) // 防止等待任务清空 cpu 负载突然变大, 这里小睡一下
}

return nil
}

使用

func TestMyPool()  {
pool,err := InitTaskPool(10)
if err != nil{
panic(err)
}
for i := 0;i < 20;i++{
time.Sleep(1e6)
pool.Put(&Task{Handler: func(v ...interface{}) {
fmt.Print("i = ",i," ")
},Params: []interface{}{i}})
fmt.Println("pool running goroutine size: ",pool.GetPoolRunningGSize())
}

}

Go并发编程(三)协程池_任务池_02


举报

相关推荐

0 条评论