worker.go 1.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. package wkrp
import (
	"fmt"

	"git.wanbits.cc/joe/kettle/utl"
)
  3. /**
  4. worker managed by dispatcher
  5. */
  6. type Worker struct {
  7. id int
  8. jobChan utl.TJobChan
  9. pool utl.TJobPool
  10. disp *Dispacher
  11. quit chan struct{}
  12. running bool
  13. err error
  14. }
  15. func newWorker(id int, dispacher *Dispacher) *Worker {
  16. return &Worker{
  17. id: id,
  18. jobChan: make(utl.TJobChan, 1),
  19. pool: dispacher.pool,
  20. disp: dispacher,
  21. quit: make(chan struct{}),
  22. running: false,
  23. err: nil,
  24. }
  25. }
  26. func (self *Worker) alive() bool {
  27. return self.running
  28. }
  29. func (self *Worker) error() error {
  30. return self.err
  31. }
  32. func (self *Worker) start() {
  33. defer func() {
  34. if err := recover(); err != nil {
  35. }
  36. self.running = false
  37. self.disp.wg.Done()
  38. }()
  39. go func() {
  40. self.running = true
  41. for {
  42. self.pool <- self.jobChan
  43. select {
  44. case job, ok := <-self.jobChan:
  45. if !ok {
  46. return
  47. }
  48. self.err = job.Do()
  49. case <-self.quit:
  50. for job := range self.jobChan {
  51. self.err = job.Do()
  52. }
  53. close(self.jobChan)
  54. return
  55. }
  56. }
  57. }()
  58. }
  59. func (self *Worker) stop() {
  60. close(self.quit)
  61. }