| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- package wkrp
- import (
- "git.wanbits.cc/joe/kettle/utl"
- "sync"
- )
- const (
- def_POOL_SIZE = 64
- def_ENQCHAN_SIZE = 64
- )
- type Dispacher struct {
- enqchan utl.TJobChan
- enqchanSz int
- pool utl.TJobPool
- poolSz int
- workers map[int]*Worker
- quit chan struct{}
- wg sync.WaitGroup
- }
- func NewDispacher(pool_size int, cache_size int) *Dispacher {
- if pool_size < 1 {
- pool_size = def_POOL_SIZE
- }
- if cache_size < 1 {
- cache_size = def_ENQCHAN_SIZE
- }
- p := &Dispacher{
- poolSz: pool_size,
- enqchanSz: cache_size,
- enqchan: make(utl.TJobChan, cache_size),
- pool: make(utl.TJobPool, pool_size),
- workers: make(map[int]*Worker),
- quit: make(chan struct{}),
- wg: sync.WaitGroup{},
- }
- go p.run()
- return p
- }
- func (self *Dispacher) Stop() {
- go func() {
- close(self.quit)
- close(self.enqchan)
- for _, w := range self.workers {
- w.stop()
- }
- self.wg.Wait()
- close(self.pool)
- }()
- }
- func (self *Dispacher) Do(job *utl.Job) {
- go func() {
- select {
- case <-self.quit:
- case self.enqchan <- job:
- }
- }()
- }
- func (self *Dispacher) run() {
- for i := 0; i < self.poolSz; i++ {
- w := newWorker(i+1, self)
- self.workers[i+1] = w
- self.wg.Add(1)
- w.start()
- }
- for {
- select {
- case job, ok := <-self.enqchan:
- if !ok {
- continue
- }
- for {
- jobchan, ok := <-self.pool
- if ok {
- jobchan <- job
- break // break from for
- }
- } // for
- case <-self.quit:
- return
- } // select
- } // for
- }
|