| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- package utl
- import (
- "fmt"
- "time"
- )
- // WorkPool Options
- type WorkPoolOption func(*WorkPool)
- // size of pool
- func WithWorkPoolSize(size int) WorkPoolOption {
- return func(wp *WorkPool) {
- if size < 1 {
- size = 64
- }
- wp.size = size
- }
- }
- func WithWorkPoolLifetime(lifetime time.Duration) WorkPoolOption {
- if lifetime < 1*time.Second {
- lifetime = 1 * time.Second
- }
- return func(wp *WorkPool) {
- wp.lifetime = lifetime
- }
- }
- type WorkPool struct {
- chPool TJobPool
- size int
- lifetime time.Duration
- }
- // constructor. return a WorkPool instance by options
- func NewWorkPool(options ...WorkPoolOption) *WorkPool {
- wp := &WorkPool{}
- for _, opt := range options {
- opt(wp)
- }
- wp.chPool = make(TJobPool, wp.size)
- return wp
- }
- func (wp *WorkPool) Put(job *Job) {
- for len(wp.chPool) > 0 {
- jobChan := <-wp.chPool
- if jobChan == nil {
- continue
- }
- select {
- case <-jobChan:
- default:
- jobChan <- job
- return
- }
- } // for
- jobChan := make(TJobChan, 1)
- jobChan <- job
- go wp.do(jobChan)
- }
- // in goroutine
- func (wp *WorkPool) do(one TJobChan) {
- defer func() {
- if e := recover(); e != nil {
- fmt.Println("WorkPool error:", e)
- }
- }()
- timer := time.NewTimer(wp.lifetime)
- defer timer.Stop()
- for {
- select {
- case <-timer.C:
- close(one)
- job, ok := <-one
- if !ok {
- return
- }
- _ = job.Do()
- return
- case job, ok := <-one:
- if !ok {
- return // one 可能关闭
- }
- _ = job.Do()
- timer.Reset(time.Second * 10)
- }
- // reuse
- if len(wp.chPool) < wp.size {
- wp.chPool <- one
- }
- } //for
- }
|