| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- package wkrp
- import "sync"
- type TJobChan chan IJob
- type TJobPool chan TJobChan
- type Dispacher struct {
- size int
- enqchan TJobChan
- pool TJobPool
- workers map[int]*Worker
- quit chan struct{}
- wg sync.WaitGroup
- }
- func NewDispacher(q_size int) *Dispacher {
- p := &Dispacher{
- size: q_size,
- enqchan: make(TJobChan, 64),
- pool: make(TJobPool, q_size),
- workers: make(map[int]*Worker),
- quit: make(chan struct{}),
- wg: sync.WaitGroup{},
- }
- go p.run()
- return p
- }
- func (self *Dispacher) Stop() {
- go func() {
- close(self.quit)
- close(self.enqchan)
- for _, w := range self.workers {
- w.stop()
- }
- self.wg.Wait()
- close(self.pool)
- }()
- }
- func (self *Dispacher) Do(job IJob) {
- go func() {
- select {
- case <-self.quit:
- case self.enqchan <- job:
- }
- }()
- }
- func (self *Dispacher) run() {
- for i := 0; i < self.size; i++ {
- w := newWorker(i+1, self)
- self.workers[i+1] = w
- self.wg.Add(1);
- w.start()
- }
- for {
- select {
- case job, ok := <-self.enqchan:
- if !ok {
- continue
- }
- for {
- jobchan, ok := <-self.pool
- if ok {
- jobchan <- job
- break // break from for
- }
- } // for
- case <-self.quit:
- return
- } // select
- } // for
- }
|