package utl import ( "fmt" "strconv" "time" // "go.uber.org/zap" ) type WorkPool struct { goChan chan chan func(*time.Timer) size int // goChan 缓冲长度 } func NewWorkPool(size int) *WorkPool { return &WorkPool{ goChan: make(chan chan func(*time.Timer), size), size: size, } } // 加入一个任务 func (wp *WorkPool) Put(f func(*time.Timer)) { fmt.Println("len()=", strconv.Itoa(len(wp.goChan))) for len(wp.goChan) > 0 { goRun := <-wp.goChan // 从 channel 读一条chan if goRun == nil { fmt.Println("nil goroutine") continue } select { case <-goRun: // 清空? fmt.Println("clear?") default: fmt.Println("writen") goRun <- f //写入, 在 do 中执行 return } } // when len(wp.goChan) <= 0 goRun := make(chan func(*time.Timer), 1) // 当场创建并执行 goRun <- f go wp.do(goRun) } // in goroutine func (wp *WorkPool) do(one chan func(*time.Timer)) { defer func() { if e := recover(); e != nil { // log.Error("recover panic", zap.Any("error", e)) } }() timer := time.NewTimer(time.Second * 1) defer timer.Stop() for { // 带超时的循环读取 select { case <-timer.C: // 10s 超时 fmt.Println("timeout") close(one) f, ok := <-one if !ok { fmt.Println("close without exec") return // 关闭成功返回 } f(timer) return case f, ok := <-one: //大多数走这里 if !ok { return // one 可能关闭 } f(timer) timer.Reset(time.Second * 10) // 一次循环重置 } if len(wp.goChan) < wp.size { wp.goChan <- one // 成功执行后入队 fmt.Println("pushback:" + strconv.Itoa(len(wp.goChan))) } } }