dispatcher.go 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. package wkrp
  2. import "sync"
  3. type TJobChan chan IJob
  4. type TJobPool chan TJobChan
  5. type Dispacher struct {
  6. size int
  7. enqchan TJobChan
  8. pool TJobPool
  9. workers map[int]*Worker
  10. quit chan struct{}
  11. wg sync.WaitGroup
  12. }
  13. func NewDispacher(q_size int) *Dispacher {
  14. p := &Dispacher{
  15. size: q_size,
  16. enqchan: make(TJobChan, 64),
  17. pool: make(TJobPool, q_size),
  18. workers: make(map[int]*Worker),
  19. quit: make(chan struct{}),
  20. wg: sync.WaitGroup{},
  21. }
  22. go p.run()
  23. return p
  24. }
  25. func (self *Dispacher) Stop() {
  26. go func() {
  27. close(self.quit)
  28. close(self.enqchan)
  29. for _, w := range self.workers {
  30. w.stop()
  31. }
  32. self.wg.Wait()
  33. close(self.pool)
  34. }()
  35. }
  36. func (self *Dispacher) Do(job IJob) {
  37. go func() {
  38. select {
  39. case <-self.quit:
  40. case self.enqchan <- job:
  41. }
  42. }()
  43. }
  44. func (self *Dispacher) run() {
  45. for i := 0; i < self.size; i++ {
  46. w := newWorker(i+1, self)
  47. self.workers[i+1] = w
  48. self.wg.Add(1);
  49. w.start()
  50. }
  51. for {
  52. select {
  53. case job, ok := <-self.enqchan:
  54. if !ok {
  55. continue
  56. }
  57. for {
  58. jobchan, ok := <-self.pool
  59. if ok {
  60. jobchan <- job
  61. break // break from for
  62. }
  63. } // for
  64. case <-self.quit:
  65. return
  66. } // select
  67. } // for
  68. }