dispatcher.go 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package wkrp
  2. import (
  3. "git.wanbits.io/joe/kettle/utl"
  4. "sync"
  5. )
  6. const (
  7. def_POOL_SIZE = 64
  8. def_ENQCHAN_SIZE = 64
  9. )
  10. type Dispacher struct {
  11. enqchan utl.TJobChan
  12. enqchanSz int
  13. pool utl.TJobPool
  14. poolSz int
  15. workers map[int]*Worker
  16. quit chan struct{}
  17. wg sync.WaitGroup
  18. }
  19. func NewDispacher(pool_size int, cache_size int) *Dispacher {
  20. if pool_size < 1 {
  21. pool_size = def_POOL_SIZE
  22. }
  23. if cache_size < 1 {
  24. cache_size = def_ENQCHAN_SIZE
  25. }
  26. p := &Dispacher{
  27. poolSz: pool_size,
  28. enqchanSz: cache_size,
  29. enqchan: make(utl.TJobChan, cache_size),
  30. pool: make(utl.TJobPool, pool_size),
  31. workers: make(map[int]*Worker),
  32. quit: make(chan struct{}),
  33. wg: sync.WaitGroup{},
  34. }
  35. go p.run()
  36. return p
  37. }
  38. func (self *Dispacher) Stop() {
  39. go func() {
  40. close(self.quit)
  41. close(self.enqchan)
  42. for _, w := range self.workers {
  43. w.stop()
  44. }
  45. self.wg.Wait()
  46. close(self.pool)
  47. }()
  48. }
  49. func (self *Dispacher) Do(job *utl.Job) {
  50. go func() {
  51. select {
  52. case <-self.quit:
  53. case self.enqchan <- job:
  54. }
  55. }()
  56. }
  57. func (self *Dispacher) run() {
  58. for i := 0; i < self.poolSz; i++ {
  59. w := newWorker(i+1, self)
  60. self.workers[i+1] = w
  61. self.wg.Add(1)
  62. w.start()
  63. }
  64. for {
  65. select {
  66. case job, ok := <-self.enqchan:
  67. if !ok {
  68. continue
  69. }
  70. for {
  71. jobchan, ok := <-self.pool
  72. if ok {
  73. jobchan <- job
  74. break // break from for
  75. }
  76. } // for
  77. case <-self.quit:
  78. return
  79. } // select
  80. } // for
  81. }