| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081 |
- package utl
import (
	"fmt"
	"strconv"
	"sync"
	"time"
	// "go.uber.org/zap"
)
// Timer durations used by worker goroutines.
const (
	// workPoolFirstWait is the value a freshly started worker's timer begins
	// with. The worker is always started with a task already queued, so this
	// mainly matters to the first task, which receives the timer.
	workPoolFirstWait = 1 * time.Second
	// workPoolIdleWait is how long a worker waits for its next task after
	// finishing one before it retires.
	workPoolIdleWait = 10 * time.Second
)

// WorkPool runs tasks on goroutines and keeps idle goroutines around for
// reuse. Every worker owns a one-slot task channel; while idle, the worker
// parks that channel in goChan so that Put can hand it the next task.
//
// A WorkPool must not be copied after first use.
type WorkPool struct {
	// mu makes Put's "is the channel still open? then send" step atomic with
	// respect to a retiring worker closing its channel.
	mu sync.Mutex
	// goChan holds the task channels of parked (idle) workers.
	goChan chan chan func(*time.Timer)
	// size is the buffer length of goChan.
	size int
}

// NewWorkPool returns a pool that keeps at most size idle workers parked.
// A negative size is treated as 0 (no worker is ever reused).
func NewWorkPool(size int) *WorkPool {
	if size < 0 {
		size = 0
	}
	return &WorkPool{
		goChan: make(chan chan func(*time.Timer), size),
		size:   size,
	}
}

// Put submits one task. The task is handed to a parked worker when one is
// available; otherwise a new worker goroutine is started for it. Put does not
// wait for the task to finish. The task receives the timer of the worker that
// runs it.
func (wp *WorkPool) Put(f func(*time.Timer)) {
	fmt.Println("len()=", strconv.Itoa(len(wp.goChan)))
	if wp.handOff(f) {
		return
	}
	// No usable parked worker: create one with the task already queued.
	goRun := make(chan func(*time.Timer), 1)
	goRun <- f
	go wp.do(goRun)
}

// handOff tries to give f to a parked worker and reports whether it did.
func (wp *WorkPool) handOff(f func(*time.Timer)) bool {
	// Holding mu across the closed-check and the send guarantees that a
	// retiring worker cannot close the channel in between, which would make
	// the send panic.
	wp.mu.Lock()
	defer wp.mu.Unlock()
	for {
		var goRun chan func(*time.Timer)
		// Non-blocking pop: never wait for a worker to park.
		select {
		case goRun = <-wp.goChan:
		default:
			return false
		}
		if goRun == nil {
			fmt.Println("nil goroutine")
			continue
		}
		// A parked channel is empty, so a receive only succeeds when the
		// worker has retired and closed it; skip such channels.
		select {
		case <-goRun:
			fmt.Println("clear?")
			continue
		default:
		}
		// Non-blocking send so mu is never held while waiting.
		select {
		case goRun <- f:
			fmt.Println("writen")
			return true
		default:
			// Slot unexpectedly occupied; try the next parked worker.
		}
	}
}

// do is the worker loop and runs in its own goroutine. It executes tasks
// arriving on one, parks one in the pool after each task, and retires once
// the timer fires without a new task.
func (wp *WorkPool) do(one chan func(*time.Timer)) {
	timer := time.NewTimer(workPoolFirstWait)
	defer timer.Stop()
	for {
		select {
		case <-timer.C:
			fmt.Println("timeout")
			// Close under mu so no Put can be between its closed-check and
			// its send on this channel.
			wp.mu.Lock()
			close(one)
			wp.mu.Unlock()
			// A task may have been queued just before the close; run it
			// instead of dropping it.
			f, ok := <-one
			if !ok {
				fmt.Println("close without exec")
				return
			}
			wp.runTask(f, timer)
			return
		case f, ok := <-one:
			if !ok {
				return
			}
			wp.runTask(f, timer)
			// Stop and drain before Reset so a tick that fired while the
			// task was running cannot retire the worker immediately.
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer.Reset(workPoolIdleWait)
		}
		// Park for reuse without blocking. If the pool is full, nothing can
		// reach this worker any more, so exit right away.
		select {
		case wp.goChan <- one:
			fmt.Println("pushback:" + strconv.Itoa(len(wp.goChan)))
		default:
			return
		}
	}
}

// runTask executes f and recovers a panic raised by it, so a failing task
// neither crashes the program nor kills the worker.
func (wp *WorkPool) runTask(f func(*time.Timer), timer *time.Timer) {
	defer func() {
		if e := recover(); e != nil {
			fmt.Println("recover panic", e)
		}
	}()
	f(timer)
}
|