worker.go 1.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. package wkrp
  2. /**
  3. worker managed by dispatcher
  4. */
  5. type Worker struct {
  6. id int
  7. jobChan TJobChan
  8. pool TJobPool
  9. disp *Dispacher
  10. quit chan struct{}
  11. running bool
  12. err error
  13. }
  14. func newWorker(id int, dispacher *Dispacher) *Worker {
  15. return &Worker{
  16. id: id,
  17. jobChan: make(TJobChan, 1),
  18. pool: dispacher.pool,
  19. disp: dispacher,
  20. quit: make(chan struct{}),
  21. running: false,
  22. err: nil,
  23. }
  24. }
  25. func (self *Worker) alive() bool {
  26. return self.running
  27. }
  28. func (self *Worker) error() error {
  29. return self.err
  30. }
  31. func (self *Worker) start() {
  32. defer func() {
  33. if err := recover(); err != nil {
  34. }
  35. self.running = false
  36. self.disp.wg.Done()
  37. }()
  38. go func() {
  39. self.running = true
  40. for {
  41. self.pool <- self.jobChan
  42. select {
  43. case job, ok := <-self.jobChan:
  44. if !ok {
  45. return
  46. }
  47. self.err = job.Exec()
  48. case <-self.quit:
  49. for job := range self.jobChan {
  50. self.err = job.Exec()
  51. }
  52. close(self.jobChan)
  53. return
  54. }
  55. }
  56. }()
  57. }
  58. func (self *Worker) stop() {
  59. close(self.quit)
  60. }