| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 |
- package utl
- import (
- "time"
- )
- type WorkPoolOption func(*WorkPool)
- func WithWorkPoolSize(size int) WorkPoolOption {
- return func(wp *WorkPool) {
- if size < 1 {
- size = 64
- }
- wp.size = size
- }
- }
- func WithWorkPoolLifetime(lifetime time.Duration) WorkPoolOption {
- if lifetime < 1*time.Second {
- lifetime = 1 * time.Second
- }
- return func(wp *WorkPool) {
- wp.lifetime = lifetime
- }
- }
- type WorkPool struct {
- goChan TJobPool
- size int
- lifetime time.Duration
- }
- func NewWorkPool(options ...WorkPoolOption) *WorkPool {
- wp := &WorkPool{}
- for _, opt := range options {
- opt(wp)
- }
- wp.goChan = make(TJobPool, wp.size)
- return wp
- }
- func (wp *WorkPool) Put(job *Job) {
- for len(wp.goChan) > 0 {
- jobChan := <-wp.goChan
- if jobChan == nil {
- continue
- }
- select {
- case <-jobChan:
- default:
- jobChan <- job
- return
- }
- }
- jobChan := make(TJobChan, 1)
- jobChan <- job
- go wp.do(jobChan)
- }
- // in goroutine
- func (wp *WorkPool) do(one TJobChan) {
- defer func() {
- if e := recover(); e != nil {
- }
- }()
- timer := time.NewTimer(time.Second * 1)
- defer timer.Stop()
- for {
- select {
- case <-timer.C:
- close(one)
- job, ok := <-one
- if !ok {
- return
- }
- _ = job.Do()
- return
- case job, ok := <-one:
- if !ok {
- return // one 可能关闭
- }
- _ = job.Do()
- timer.Reset(time.Second * 10)
- }
- // reuse
- if len(wp.goChan) < wp.size {
- wp.goChan <- one
- }
- } //for
- }
|