| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374 |
- package wkrp
- type Dispacher struct {
- size int
- enqchan TJobChan
- pool TJobPool
- workers map[int]*Worker
- quit chan struct{}
- }
- func NewDispacher(q_size int) *Dispacher {
- p := &Dispacher{
- size: q_size,
- enqchan: make(TJobChan, 64),
- pool: make(TJobPool, q_size),
- workers: make(map[int]*Worker),
- quit: make(chan struct{}),
- }
- go p.run()
- return p
- }
- func (self *Dispacher) Stop() {
- go func() {
- close(self.quit)
- for _, w := range self.workers {
- if w.alive() {
- w.Stop()
- }
- }
- close(self.enqchan)
- close(self.pool)
- }()
- }
- func (self *Dispacher) Do(job Tjob) {
- go func() {
- select {
- case self.enqchan <- job:
- case <-self.quit:
- }
- }()
- }
- func (self *Dispacher) run() {
- for i := 0; i < self.size; i++ {
- w := newWorker(i+1, self.pool)
- self.workers[i+1] = w
- w.start()
- }
- for {
- select {
- case job, ok := <-self.enqchan:
- if !ok {
- continue
- }
- for {
- jobchan, ok := <-self.pool
- if ok {
- jobchan<-job
- break // break from for
- }
- }// for
- case <-self.quit:
- return
- } // select
- }// for
- }
|