workerPool.go 1022 B

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. package utl
  import (
  	"sync"
  	"time"
  	// "go.uber.org/zap"
  )
  // WorkPool runs submitted tasks on goroutines and keeps up to size idle
  // workers around so that later tasks can reuse them instead of starting a
  // new goroutine each time.
  type WorkPool struct {
  	// goChan queues the one-slot task channels of workers that are waiting
  	// for their next task.
  	goChan chan chan func(*time.Timer)
  	// size is the capacity goChan was created with.
  	size int
  	// mu makes "check the worker channel is still open, then send" in Put
  	// atomic with respect to the close performed by a retiring worker.
  	mu sync.Mutex
  }

  // NewWorkPool returns a pool that keeps at most size idle workers.
  // A negative size is treated as 0, in which case every task runs on a
  // fresh goroutine that exits when the task is done.
  func NewWorkPool(size int) *WorkPool {
  	if size < 0 {
  		size = 0 // make panics on a negative buffer size
  	}
  	return &WorkPool{
  		goChan: make(chan chan func(*time.Timer), size),
  		size:   size,
  	}
  }

  // Put schedules f for execution and returns without waiting for it to run.
  // f is handed to an idle worker when one is available; otherwise a new
  // worker goroutine is started. f receives the worker's timer as argument.
  // A nil f is ignored.
  func (wp *WorkPool) Put(f func(*time.Timer)) {
  	if f == nil {
  		return
  	}
  	for {
  		// Non-blocking dequeue: with several concurrent callers a separate
  		// len check followed by a blocking receive could stall here.
  		var goRun chan func(*time.Timer)
  		select {
  		case goRun = <-wp.goChan:
  		default:
  		}
  		if goRun == nil {
  			break // no idle worker left
  		}
  		if wp.handOff(goRun, f) {
  			return
  		}
  		// That worker has already retired; try the next one.
  	}
  	goRun := make(chan func(*time.Timer), 1)
  	goRun <- f
  	go wp.do(goRun)
  }

  // handOff gives f to the idle worker that owns goRun and reports whether
  // the worker accepted it. It returns false when the worker has closed its
  // channel (it timed out while queued).
  func (wp *WorkPool) handOff(goRun chan func(*time.Timer), f func(*time.Timer)) bool {
  	wp.mu.Lock()
  	defer wp.mu.Unlock()
  	select {
  	case <-goRun:
  		// The buffer of a queued worker is empty, so a receive can only
  		// succeed because the channel was closed.
  		return false
  	default:
  	}
  	// The worker closes its channel only while holding mu, so this send
  	// cannot hit a closed channel.
  	select {
  	case goRun <- f:
  		return true
  	default:
  		return false
  	}
  }

  // do is the worker loop; it runs in its own goroutine. It executes the
  // tasks arriving on one, puts one back on the idle queue after each task,
  // and retires when no task arrives before the timer fires.
  func (wp *WorkPool) do(one chan func(*time.Timer)) {
  	defer func() {
  		// A panicking task ends this worker without taking the process
  		// down. NOTE(review): the panic value is discarded; consider
  		// logging it.
  		_ = recover()
  	}()
  	timer := time.NewTimer(time.Second * 1)
  	defer timer.Stop()
  	for {
  		select {
  		case <-timer.C:
  			// Retire. Closing under mu keeps Put from sending on the
  			// channel between its open-check and its send.
  			wp.mu.Lock()
  			close(one)
  			wp.mu.Unlock()
  			// A task handed over just before the close is still buffered;
  			// run it so it is not lost.
  			if f, ok := <-one; ok {
  				f(timer)
  			}
  			return
  		case f, ok := <-one:
  			if !ok {
  				return // channel already closed
  			}
  			f(timer)
  		}
  		// The task had access to the timer and the timer may have fired
  		// while the task ran; stop it and drop any pending tick so the
  		// next wait really lasts the full idle period.
  		if !timer.Stop() {
  			select {
  			case <-timer.C:
  			default:
  			}
  		}
  		timer.Reset(time.Second * 10)
  		// Rejoin the idle queue without blocking. When the queue is full
  		// nobody can reach this worker any more, so exit right away.
  		select {
  		case wp.goChan <- one:
  		default:
  			return
  		}
  	}
  }