Jelajahi Sumber

bugfix: workerpool

joe 4 tahun lalu
induk
melakukan
62d407f245
5 mengubah file dengan 67 tambahan dan 34 penghapusan
  1. 2 1
      utl/common.go
  2. 31 17
      wkrp/dispatcher.go
  3. 9 7
      wkrp/dispatcher_test.go
  4. 23 7
      wkrp/job.go
  5. 2 2
      wkrp/worker.go

+ 2 - 1
utl/common.go

@@ -6,10 +6,11 @@ import (
 )
 
 var (
-	ErrNotImplemented       = errors.New("not implemented yet")
+	ErrNotImplemented     = errors.New("not implemented yet")
 	ErrInterfaceTransform = errors.New("interface transform failed")
 	ErrParameters         = errors.New("invalid parameters")
 	ErrContainerEmpty     = errors.New("target container is empty")
+	ErrNotExists          = errors.New("target not exists")
 )
 
 func ErrForCode(code int) error {

+ 31 - 17
wkrp/dispatcher.go

@@ -2,26 +2,40 @@ package wkrp
 
 import "sync"
 
-type TJobChan chan IJob
+const (
+	def_POOL_SIZE    = 64
+	def_ENQCHAN_SIZE = 64
+)
+
+type TJobChan chan *Job
 type TJobPool chan TJobChan
 
 type Dispacher struct {
-	size    int
-	enqchan TJobChan
-	pool    TJobPool
-	workers map[int]*Worker
-	quit    chan struct{}
-	wg      sync.WaitGroup
+	enqchan   TJobChan
+	enqchanSz int
+	pool      TJobPool
+	poolSz    int
+	workers   map[int]*Worker
+	quit      chan struct{}
+	wg        sync.WaitGroup
 }
 
-func NewDispacher(q_size int) *Dispacher {
+func NewDispacher(pool_size int, cache_size int) *Dispacher {
+	if pool_size < 1 {
+		pool_size = def_POOL_SIZE
+	}
+	if cache_size < 1 {
+		cache_size = def_ENQCHAN_SIZE
+	}
+
 	p := &Dispacher{
-		size:    q_size,
-		enqchan: make(TJobChan, 64),
-		pool:    make(TJobPool, q_size),
-		workers: make(map[int]*Worker),
-		quit:    make(chan struct{}),
-		wg:      sync.WaitGroup{},
+		poolSz:    pool_size,
+		enqchanSz: cache_size,
+		enqchan:   make(TJobChan, cache_size),
+		pool:      make(TJobPool, pool_size),
+		workers:   make(map[int]*Worker),
+		quit:      make(chan struct{}),
+		wg:        sync.WaitGroup{},
 	}
 
 	go p.run()
@@ -44,7 +58,7 @@ func (self *Dispacher) Stop() {
 	}()
 }
 
-func (self *Dispacher) Do(job IJob) {
+func (self *Dispacher) Do(job *Job) {
 	go func() {
 		select {
 		case <-self.quit:
@@ -54,11 +68,11 @@ func (self *Dispacher) Do(job IJob) {
 }
 
 func (self *Dispacher) run() {
-	for i := 0; i < self.size; i++ {
+	for i := 0; i < self.poolSz; i++ {
 		w := newWorker(i+1, self)
 		self.workers[i+1] = w
 
-		self.wg.Add(1);
+		self.wg.Add(1)
 
 		w.start()
 	}

+ 9 - 7
wkrp/dispatcher_test.go

@@ -7,13 +7,15 @@ import (
 )
 
 func TestNewDispacher(t *testing.T) {
-	d := NewDispacher(16)
+	d := NewDispacher(16, 64)
 	defer d.Stop()
 
-	d.Do(IJobFn(func() error {
-		time.Sleep(time.Second)
-		fmt.Println("did")
-		return nil
-	}))
-	time.Sleep(2 * time.Second)
+	for i := 0; i < 100; i++ {
+		d.Do(NewJob(IJobFn(func(inf interface{}) error {
+			fmt.Println("did", inf.(int))
+			return nil
+		}), i))
+	}
+
+	time.Sleep(1 * time.Second)
 }

+ 23 - 7
wkrp/job.go

@@ -1,17 +1,33 @@
 package wkrp
 
 /**
-	A job should be callable
+A job should be callable
 
-	so there may be 2 types of jobs at least
- */
+so there may be 2 types of jobs at least
+*/
 
 type IJob interface {
-	Exec() error
+	Exec(interface{}) error
 }
 
-type IJobFn func() error
+type IJobFn func(interface{}) error
 
-func (fn IJobFn) Exec() error {
-	return fn()
+func (fn IJobFn) Exec(inf interface{}) error {
+	return fn(inf)
+}
+
+type Job struct {
+	param interface{}
+	job   IJob
+}
+
+func NewJob(job IJob, param interface{}) *Job {
+	return &Job{
+		param: param,
+		job:   job,
+	}
+}
+
+func (self *Job) do() error {
+	return self.job.Exec(self.param)
 }

+ 2 - 2
wkrp/worker.go

@@ -54,11 +54,11 @@ func (self *Worker) start() {
 					return
 				}
 
-				self.err = job.Exec()
+				self.err = job.do()
 
 			case <-self.quit:
 				for job := range self.jobChan {
-					self.err = job.Exec()
+					self.err = job.do()
 				}
 				close(self.jobChan)
 				return