| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798 |
- package utl
- import (
- "time"
- )
// WorkPoolOption is a functional option for a WorkPool: a function that
// receives the pool being built and adjusts its configuration. NewWorkPool
// calls every option it is given, in argument order.
type WorkPoolOption func(*WorkPool)
- // size of pool
- func WithWorkPoolSize(size int) WorkPoolOption {
- return func(wp *WorkPool) {
- if size < 1 {
- size = 64
- }
- wp.size = size
- }
- }
- func WithWorkPoolLifetime(lifetime time.Duration) WorkPoolOption {
- if lifetime < 1*time.Second {
- lifetime = 1 * time.Second
- }
- return func(wp *WorkPool) {
- wp.lifetime = lifetime
- }
- }
// WorkPool runs jobs on worker goroutines. Each worker owns one job channel;
// after finishing a job a worker may park that channel in chPool so that a
// later Put can hand it the next job instead of starting a new goroutine.
type WorkPool struct {
	// chPool queues the job channels of parked workers. NewWorkPool
	// allocates it with capacity size.
	chPool TJobPool
	// size is the pool size configured through WithWorkPoolSize; it bounds
	// how many workers can be parked in chPool.
	size int
	// lifetime is the duration configured through WithWorkPoolLifetime
	// (at least one second when set through that option).
	lifetime time.Duration
}
- // constructor. return a WorkPool instance by options
- func NewWorkPool(options ...WorkPoolOption) *WorkPool {
- wp := &WorkPool{}
- for _, opt := range options {
- opt(wp)
- }
- wp.chPool = make(TJobPool, wp.size)
- return wp
- }
- func (wp *WorkPool) Put(job *Job) {
- for len(wp.chPool) > 0 {
- jobChan := <-wp.chPool
- if jobChan == nil {
- continue
- }
- select {
- case <-jobChan:
- default:
- jobChan <- job
- return
- }
- } // for
- jobChan := make(TJobChan, 1)
- jobChan <- job
- go wp.do(jobChan)
- }
- // in goroutine
- func (wp *WorkPool) do(one TJobChan) {
- defer func() {
- if e := recover(); e != nil {
- }
- }()
- timer := time.NewTimer(time.Second * 1)
- defer timer.Stop()
- for {
- select {
- case <-timer.C:
- close(one)
- job, ok := <-one
- if !ok {
- return
- }
- _ = job.Do()
- return
- case job, ok := <-one:
- if !ok {
- return // one 可能关闭
- }
- _ = job.Do()
- timer.Reset(time.Second * 10)
- }
- // reuse
- if len(wp.chPool) < wp.size {
- wp.chPool <- one
- }
- } //for
- }
|