dispatcher.go 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package wkrp
  2. import "sync"
  3. const (
  4. def_POOL_SIZE = 64
  5. def_ENQCHAN_SIZE = 64
  6. )
  7. type TJobChan chan *Job
  8. type TJobPool chan TJobChan
  9. type Dispacher struct {
  10. enqchan TJobChan
  11. enqchanSz int
  12. pool TJobPool
  13. poolSz int
  14. workers map[int]*Worker
  15. quit chan struct{}
  16. wg sync.WaitGroup
  17. }
  18. func NewDispacher(pool_size int, cache_size int) *Dispacher {
  19. if pool_size < 1 {
  20. pool_size = def_POOL_SIZE
  21. }
  22. if cache_size < 1 {
  23. cache_size = def_ENQCHAN_SIZE
  24. }
  25. p := &Dispacher{
  26. poolSz: pool_size,
  27. enqchanSz: cache_size,
  28. enqchan: make(TJobChan, cache_size),
  29. pool: make(TJobPool, pool_size),
  30. workers: make(map[int]*Worker),
  31. quit: make(chan struct{}),
  32. wg: sync.WaitGroup{},
  33. }
  34. go p.run()
  35. return p
  36. }
  37. func (self *Dispacher) Stop() {
  38. go func() {
  39. close(self.quit)
  40. close(self.enqchan)
  41. for _, w := range self.workers {
  42. w.stop()
  43. }
  44. self.wg.Wait()
  45. close(self.pool)
  46. }()
  47. }
  48. func (self *Dispacher) Do(job *Job) {
  49. go func() {
  50. select {
  51. case <-self.quit:
  52. case self.enqchan <- job:
  53. }
  54. }()
  55. }
  56. func (self *Dispacher) run() {
  57. for i := 0; i < self.poolSz; i++ {
  58. w := newWorker(i+1, self)
  59. self.workers[i+1] = w
  60. self.wg.Add(1)
  61. w.start()
  62. }
  63. for {
  64. select {
  65. case job, ok := <-self.enqchan:
  66. if !ok {
  67. continue
  68. }
  69. for {
  70. jobchan, ok := <-self.pool
  71. if ok {
  72. jobchan <- job
  73. break // break from for
  74. }
  75. } // for
  76. case <-self.quit:
  77. return
  78. } // select
  79. } // for
  80. }