workerPool.go 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package utl
  2. import (
  3. "time"
  4. )
  5. type WorkPoolOption func(*WorkPool)
  6. func WithWorkPoolSize(size int) WorkPoolOption {
  7. return func(wp *WorkPool) {
  8. if size < 1 {
  9. size = 64
  10. }
  11. wp.size = size
  12. }
  13. }
  14. func WithWorkPoolLifetime(lifetime time.Duration) WorkPoolOption {
  15. if lifetime < 1*time.Second {
  16. lifetime = 1 * time.Second
  17. }
  18. return func(wp *WorkPool) {
  19. wp.lifetime = lifetime
  20. }
  21. }
  22. type WorkPool struct {
  23. goChan TJobPool
  24. size int
  25. lifetime time.Duration
  26. }
  27. func NewWorkPool(options ...WorkPoolOption) *WorkPool {
  28. wp := &WorkPool{}
  29. for _, opt := range options {
  30. opt(wp)
  31. }
  32. wp.goChan = make(TJobPool, wp.size)
  33. return wp
  34. }
  35. func (wp *WorkPool) Put(job *Job) {
  36. for len(wp.goChan) > 0 {
  37. jobChan := <-wp.goChan
  38. if jobChan == nil {
  39. continue
  40. }
  41. select {
  42. case <-jobChan:
  43. default:
  44. jobChan <- job
  45. return
  46. }
  47. }
  48. jobChan := make(TJobChan, 1)
  49. jobChan <- job
  50. go wp.do(jobChan)
  51. }
  52. // in goroutine
  53. func (wp *WorkPool) do(one TJobChan) {
  54. defer func() {
  55. if e := recover(); e != nil {
  56. }
  57. }()
  58. timer := time.NewTimer(time.Second * 1)
  59. defer timer.Stop()
  60. for {
  61. select {
  62. case <-timer.C:
  63. close(one)
  64. job, ok := <-one
  65. if !ok {
  66. return
  67. }
  68. _ = job.Do()
  69. return
  70. case job, ok := <-one:
  71. if !ok {
  72. return // one 可能关闭
  73. }
  74. _ = job.Do()
  75. timer.Reset(time.Second * 10)
  76. }
  77. // reuse
  78. if len(wp.goChan) < wp.size {
  79. wp.goChan <- one
  80. }
  81. } //for
  82. }