// Package wkrp provides a dispatcher that forwards queued jobs to a fixed
// set of workers.
package wkrp

import (
	"sync"

	"git.wanbits.cc/joe/kettle/utl"
)

// Fallback sizes applied by NewDispacher when the caller passes a value < 1.
const (
	def_POOL_SIZE    = 64
	def_ENQCHAN_SIZE = 64
)

// Dispacher accepts jobs through Do, buffers them in enqchan and hands each
// one to a worker job channel taken from pool.
//
// quit is closed exactly once by Stop and is the only shutdown signal.
// enqchan is never closed, because its senders are the goroutines spawned by
// Do and closing it underneath them would panic.
type Dispacher struct {
	enqchan   utl.TJobChan
	enqchanSz int
	pool      utl.TJobPool
	poolSz    int
	workers   map[int]*Worker
	quit      chan struct{}
	wg        sync.WaitGroup
	stopOnce  sync.Once
}

// NewDispacher builds a dispatcher with pool_size workers and an enqueue
// buffer of cache_size jobs, starts the workers and launches the dispatch
// loop. Values below 1 fall back to def_POOL_SIZE and def_ENQCHAN_SIZE.
func NewDispacher(pool_size int, cache_size int) *Dispacher {
	if pool_size < 1 {
		pool_size = def_POOL_SIZE
	}
	if cache_size < 1 {
		cache_size = def_ENQCHAN_SIZE
	}
	d := &Dispacher{
		poolSz:    pool_size,
		enqchanSz: cache_size,
		enqchan:   make(utl.TJobChan, cache_size),
		pool:      make(utl.TJobPool, pool_size),
		workers:   make(map[int]*Worker, pool_size),
		quit:      make(chan struct{}),
	}
	// Workers are registered before the dispatcher is returned so that a
	// concurrent Stop never races with writes to d.workers or with wg.Add.
	d.startWorkers()
	go d.run()
	return d
}

// startWorkers creates, registers and starts poolSz workers with ids 1..poolSz.
// NOTE(review): assumes Worker.start does not block — confirm against Worker.
func (d *Dispacher) startWorkers() {
	for id := 1; id <= d.poolSz; id++ {
		w := newWorker(id, d)
		d.workers[id] = w
		d.wg.Add(1)
		w.start()
	}
}

// Stop signals shutdown and returns immediately. The workers are stopped and
// the pool is closed in the background. Calling Stop more than once is safe;
// only the first call has any effect.
func (d *Dispacher) Stop() {
	d.stopOnce.Do(func() {
		close(d.quit)
		go func() {
			for _, w := range d.workers {
				w.stop()
			}
			// NOTE(review): presumably each worker calls wg.Done when it
			// exits; verify against the Worker implementation.
			d.wg.Wait()
			close(d.pool)
		}()
	})
}

// Do enqueues job without blocking the caller. The job is dropped if the
// dispatcher is stopped before it can be placed on the queue.
func (d *Dispacher) Do(job *utl.Job) {
	go func() {
		select {
		case <-d.quit:
		case d.enqchan <- job:
		}
	}()
}

// run is the dispatch loop: it takes a job from enqchan, waits for a worker
// job channel from pool and forwards the job to it. Every blocking step also
// watches quit so the loop cannot hang or spin once Stop has been called.
func (d *Dispacher) run() {
	for {
		select {
		case <-d.quit:
			return
		case job, ok := <-d.enqchan:
			if !ok {
				// A closed queue can never yield another job.
				return
			}
			select {
			case <-d.quit:
				return
			case jobchan, ok := <-d.pool:
				if !ok {
					// Pool closed: no worker will ever become available.
					return
				}
				select {
				case jobchan <- job:
				case <-d.quit:
					return
				}
			}
		}
	}
}