workerPool.go 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package utl
  2. import (
  3. "fmt"
  4. "time"
  5. )
// WorkPoolOption is a functional option: a function that receives a
// *WorkPool and configures it. NewWorkPool calls each option it is given
// before creating the pool's reuse channel.
type WorkPoolOption func(*WorkPool)
  8. // size of pool
  9. func WithWorkPoolSize(size int) WorkPoolOption {
  10. return func(wp *WorkPool) {
  11. if size < 1 {
  12. size = 64
  13. }
  14. wp.size = size
  15. }
  16. }
  17. func WithWorkPoolLifetime(lifetime time.Duration) WorkPoolOption {
  18. if lifetime < 1*time.Second {
  19. lifetime = 1 * time.Second
  20. }
  21. return func(wp *WorkPool) {
  22. wp.lifetime = lifetime
  23. }
  24. }
// WorkPool holds the state shared by Put and the worker goroutines it starts.
type WorkPool struct {
	// chPool carries the job channels of workers that can take another job.
	chPool TJobPool
	// size is the capacity chPool is created with in NewWorkPool and the
	// limit workers check before putting their channel back into chPool.
	size int
	// lifetime is the duration each worker's timer is created with.
	lifetime time.Duration
}
  30. // constructor. return a WorkPool instance by options
  31. func NewWorkPool(options ...WorkPoolOption) *WorkPool {
  32. wp := &WorkPool{}
  33. for _, opt := range options {
  34. opt(wp)
  35. }
  36. wp.chPool = make(TJobPool, wp.size)
  37. return wp
  38. }
  39. func (wp *WorkPool) Put(job *Job) {
  40. for len(wp.chPool) > 0 {
  41. jobChan := <-wp.chPool
  42. if jobChan == nil {
  43. continue
  44. }
  45. select {
  46. case <-jobChan:
  47. default:
  48. jobChan <- job
  49. return
  50. }
  51. } // for
  52. jobChan := make(TJobChan, 1)
  53. jobChan <- job
  54. go wp.do(jobChan)
  55. }
  56. // in goroutine
  57. func (wp *WorkPool) do(one TJobChan) {
  58. defer func() {
  59. if e := recover(); e != nil {
  60. fmt.Println("WorkPool error:", e)
  61. }
  62. }()
  63. timer := time.NewTimer(wp.lifetime)
  64. defer timer.Stop()
  65. for {
  66. select {
  67. case <-timer.C:
  68. close(one)
  69. job, ok := <-one
  70. if !ok {
  71. return
  72. }
  73. _ = job.Do()
  74. return
  75. case job, ok := <-one:
  76. if !ok {
  77. return // one 可能关闭
  78. }
  79. _ = job.Do()
  80. timer.Reset(time.Second * 10)
  81. }
  82. // reuse
  83. if len(wp.chPool) < wp.size {
  84. wp.chPool <- one
  85. }
  86. } //for
  87. }