workerPool.go 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package utl
  2. import (
  3. "time"
  4. )
  5. type WorkPoolOption func(*WorkPool)
  6. func WithWorkPoolSize(size int) WorkPoolOption {
  7. return func(wp *WorkPool) {
  8. if size < 1 {
  9. size = 64
  10. }
  11. wp.size = size
  12. }
  13. }
  14. func WithWorkPoolLifetime(lifetime time.Duration) WorkPoolOption {
  15. if lifetime < 1*time.Second {
  16. lifetime = 1 * time.Second
  17. }
  18. return func(wp *WorkPool) {
  19. wp.lifetime = lifetime
  20. }
  21. }
  // WorkPool runs submitted tasks on worker goroutines and keeps the task
  // channels of idle workers queued so later tasks can be handed to them.
  type WorkPool struct {
  	// goChan queues the task channels of workers that are waiting for work.
  	goChan chan chan func(*time.Timer)

  	// size is the buffer capacity NewWorkPool gives goChan; it is set by
  	// WithWorkPoolSize.
  	size int

  	// lifetime is set by WithWorkPoolLifetime.
  	lifetime time.Duration
  }
  27. func NewWorkPool(options ...WorkPoolOption) *WorkPool {
  28. wp := &WorkPool{}
  29. for _, opt := range options {
  30. opt(wp)
  31. }
  32. wp.goChan = make(chan chan func(*time.Timer), wp.size)
  33. return wp
  34. }
  35. func (wp *WorkPool) Put(f func(*time.Timer)) {
  36. for len(wp.goChan) > 0 {
  37. goRun := <-wp.goChan
  38. if goRun == nil {
  39. continue
  40. }
  41. select {
  42. case <-goRun:
  43. default:
  44. goRun <- f
  45. return
  46. }
  47. }
  48. goRun := make(chan func(*time.Timer), 1)
  49. goRun <- f
  50. go wp.do(goRun)
  51. }
  52. // in goroutine
  53. func (wp *WorkPool) do(one chan func(*time.Timer)) {
  54. defer func() {
  55. if e := recover(); e != nil {
  56. }
  57. }()
  58. timer := time.NewTimer(time.Second * 1)
  59. defer timer.Stop()
  60. for {
  61. select {
  62. case <-timer.C:
  63. close(one)
  64. f, ok := <-one
  65. if !ok {
  66. return
  67. }
  68. f(timer)
  69. return
  70. case f, ok := <-one:
  71. if !ok {
  72. return // one 可能关闭
  73. }
  74. f(timer)
  75. timer.Reset(time.Second * 10)
  76. }
  77. // reuse
  78. if len(wp.goChan) < wp.size {
  79. wp.goChan <- one
  80. }
  81. } //for
  82. }