dispatcher.go 1.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. package wkrp
  2. type Dispacher struct {
  3. size int
  4. enqchan TJobChan
  5. pool TJobPool
  6. workers map[int]*Worker
  7. quit chan struct{}
  8. }
  9. func NewDispacher(q_size int) *Dispacher {
  10. p := &Dispacher{
  11. size: q_size,
  12. enqchan: make(TJobChan, 64),
  13. pool: make(TJobPool, q_size),
  14. workers: make(map[int]*Worker),
  15. quit: make(chan struct{}),
  16. }
  17. go p.run()
  18. return p
  19. }
  20. func (self *Dispacher) Stop() {
  21. go func() {
  22. close(self.quit)
  23. for _, w := range self.workers {
  24. if w.alive() {
  25. w.Stop()
  26. }
  27. }
  28. close(self.enqchan)
  29. close(self.pool)
  30. }()
  31. }
  32. func (self *Dispacher) Do(job Tjob) {
  33. go func() {
  34. select {
  35. case self.enqchan <- job:
  36. case <-self.quit:
  37. }
  38. }()
  39. }
  40. func (self *Dispacher) run() {
  41. for i := 0; i < self.size; i++ {
  42. w := newWorker(i+1, self.pool)
  43. self.workers[i+1] = w
  44. w.start()
  45. }
  46. for {
  47. select {
  48. case job, ok := <-self.enqchan:
  49. if !ok {
  50. continue
  51. }
  52. for {
  53. jobchan, ok := <-self.pool
  54. if ok {
  55. jobchan<-job
  56. break // break from for
  57. }
  58. }// for
  59. case <-self.quit:
  60. return
  61. } // select
  62. }// for
  63. }