workerPool.go 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. package utl
  2. import (
  3. "fmt"
  4. "strconv"
  5. "time"
  6. // "go.uber.org/zap"
  7. )
  8. type WorkPool struct {
  9. goChan chan chan func(*time.Timer)
  10. size int // goChan 缓冲长度
  11. }
  12. func NewWorkPool(size int) *WorkPool {
  13. return &WorkPool{
  14. goChan: make(chan chan func(*time.Timer), size),
  15. size: size,
  16. }
  17. }
  18. // 加入一个任务
  19. func (wp *WorkPool) Put(f func(*time.Timer)) {
  20. fmt.Println("len()=", strconv.Itoa(len(wp.goChan)))
  21. for len(wp.goChan) > 0 {
  22. goRun := <-wp.goChan // 从 channel 读一条chan
  23. if goRun == nil {
  24. fmt.Println("nil goroutine")
  25. continue
  26. }
  27. select {
  28. case <-goRun: // 清空?
  29. fmt.Println("clear?")
  30. default:
  31. fmt.Println("writen")
  32. goRun <- f //写入, 在 do 中执行
  33. return
  34. }
  35. }
  36. // when len(wp.goChan) <= 0
  37. goRun := make(chan func(*time.Timer), 1) // 当场创建并执行
  38. goRun <- f
  39. go wp.do(goRun)
  40. }
  41. // in goroutine
  42. func (wp *WorkPool) do(one chan func(*time.Timer)) {
  43. defer func() {
  44. if e := recover(); e != nil {
  45. // log.Error("recover panic", zap.Any("error", e))
  46. }
  47. }()
  48. timer := time.NewTimer(time.Second * 1)
  49. defer timer.Stop()
  50. for { // 带超时的循环读取
  51. select {
  52. case <-timer.C: // 10s 超时
  53. fmt.Println("timeout")
  54. close(one)
  55. f, ok := <-one
  56. if !ok {
  57. fmt.Println("close without exec")
  58. return // 关闭成功返回
  59. }
  60. f(timer)
  61. return
  62. case f, ok := <-one: //大多数走这里
  63. if !ok {
  64. return // one 可能关闭
  65. }
  66. f(timer)
  67. timer.Reset(time.Second * 10) // 一次循环重置
  68. }
  69. if len(wp.goChan) < wp.size {
  70. wp.goChan <- one // 成功执行后入队
  71. fmt.Println("pushback:" + strconv.Itoa(len(wp.goChan)))
  72. }
  73. }
  74. }