// workerPool.go (1.5 KB)

  1. package utl
  2. import (
  3. "time"
  4. )
// WorkPoolOption is a functional option for a WorkPool: a function that
// receives the *WorkPool being configured and may modify its fields.
type WorkPoolOption func(*WorkPool)
  7. // size of pool
  8. func WithWorkPoolSize(size int) WorkPoolOption {
  9. return func(wp *WorkPool) {
  10. if size < 1 {
  11. size = 64
  12. }
  13. wp.size = size
  14. }
  15. }
  16. func WithWorkPoolLifetime(lifetime time.Duration) WorkPoolOption {
  17. if lifetime < 1*time.Second {
  18. lifetime = 1 * time.Second
  19. }
  20. return func(wp *WorkPool) {
  21. wp.lifetime = lifetime
  22. }
  23. }
// WorkPool runs jobs on worker goroutines and keeps the channels of workers
// that have finished a job so that Put can hand them the next one.
type WorkPool struct {
	// chPool holds the job channels of workers available for reuse: a worker
	// pushes its own channel here after finishing a job and Put pops from it.
	chPool TJobPool
	// size is the buffer capacity chPool is created with and the upper bound
	// a worker checks before re-registering itself.
	size int
	// lifetime is stored by WithWorkPoolLifetime.
	// NOTE(review): no code visible in this section reads it; confirm whether
	// it is meant to drive the worker idle timeout.
	lifetime time.Duration
}
  29. // constructor. return a WorkPool instance by options
  30. func NewWorkPool(options ...WorkPoolOption) *WorkPool {
  31. wp := &WorkPool{}
  32. for _, opt := range options {
  33. opt(wp)
  34. }
  35. wp.chPool = make(TJobPool, wp.size)
  36. return wp
  37. }
  38. func (wp *WorkPool) Put(job *Job) {
  39. for len(wp.chPool) > 0 {
  40. jobChan := <-wp.chPool
  41. if jobChan == nil {
  42. continue
  43. }
  44. select {
  45. case <-jobChan:
  46. default:
  47. jobChan <- job
  48. return
  49. }
  50. } // for
  51. jobChan := make(TJobChan, 1)
  52. jobChan <- job
  53. go wp.do(jobChan)
  54. }
  55. // in goroutine
  56. func (wp *WorkPool) do(one TJobChan) {
  57. defer func() {
  58. if e := recover(); e != nil {
  59. }
  60. }()
  61. timer := time.NewTimer(time.Second * 1)
  62. defer timer.Stop()
  63. for {
  64. select {
  65. case <-timer.C:
  66. close(one)
  67. job, ok := <-one
  68. if !ok {
  69. return
  70. }
  71. _ = job.Do()
  72. return
  73. case job, ok := <-one:
  74. if !ok {
  75. return // one 可能关闭
  76. }
  77. _ = job.Do()
  78. timer.Reset(time.Second * 10)
  79. }
  80. // reuse
  81. if len(wp.chPool) < wp.size {
  82. wp.chPool <- one
  83. }
  84. } //for
  85. }