Bladeren bron

enhance: utl.WorkPool

joe 4 jaren geleden
bovenliggende
commit
94cadaf148
7 gewijzigde bestanden met toevoegingen van 49 en 34 verwijderingen
  1. 4 2
      utl/job.go
  2. 8 0
      utl/smap.go
  3. 15 15
      utl/workerPool.go
  4. 4 2
      utl/workerPool_test.go
  5. 9 9
      wkrp/dispatcher.go
  6. 2 1
      wkrp/dispatcher_test.go
  7. 7 5
      wkrp/worker.go

+ 4 - 2
wkrp/job.go → utl/job.go

@@ -1,4 +1,4 @@
-package wkrp
+package utl
 
 /**
 A job should be callable
@@ -11,6 +11,8 @@ type IJob interface {
 }
 
 type IJobFn func(interface{}) error
+type TJobChan chan *Job
+type TJobPool chan TJobChan
 
 func (fn IJobFn) Exec(inf interface{}) error {
 	return fn(inf)
@@ -28,6 +30,6 @@ func NewJob(job IJob, param interface{}) *Job {
 	}
 }
 
-func (self *Job) do() error {
+func (self *Job) Do() error {
 	return self.job.Exec(self.param)
 }

+ 8 - 0
utl/smap.go

@@ -99,3 +99,11 @@ func (self *SMap) Values() []interface{} {
 
 	return retval
 }
+
+func (self *SMap) Foreach(each func(k, v interface{})) {
+	self.RLock()
+	for k, v := range self.m {
+		each(k, v)
+	}
+	self.RUnlock()
+}

+ 15 - 15
utl/workerPool.go

@@ -25,7 +25,7 @@ func WithWorkPoolLifetime(lifetime time.Duration) WorkPoolOption {
 }
 
 type WorkPool struct {
-	goChan   chan chan func(*time.Timer)
+	goChan   TJobPool
 	size     int
 	lifetime time.Duration
 }
@@ -37,31 +37,31 @@ func NewWorkPool(options ...WorkPoolOption) *WorkPool {
 		opt(wp)
 	}
 
-	wp.goChan = make(chan chan func(*time.Timer), wp.size)
+	wp.goChan = make(TJobPool, wp.size)
 
 	return wp
 }
 
-func (wp *WorkPool) Put(f func(*time.Timer)) {
+func (wp *WorkPool) Put(job *Job) {
 	for len(wp.goChan) > 0 {
-		goRun := <-wp.goChan
-		if goRun == nil {
+		jobChan := <-wp.goChan
+		if jobChan == nil {
 			continue
 		}
 		select {
-		case <-goRun:
+		case <-jobChan:
 		default:
-			goRun <- f
+			jobChan <- job
 			return
 		}
 	}
-	goRun := make(chan func(*time.Timer), 1)
-	goRun <- f
-	go wp.do(goRun)
+	jobChan := make(TJobChan, 1)
+	jobChan <- job
+	go wp.do(jobChan)
 }
 
 // in goroutine
-func (wp *WorkPool) do(one chan func(*time.Timer)) {
+func (wp *WorkPool) do(one TJobChan) {
 	defer func() {
 		if e := recover(); e != nil {
 		}
@@ -73,17 +73,17 @@ func (wp *WorkPool) do(one chan func(*time.Timer)) {
 		select {
 		case <-timer.C:
 			close(one)
-			f, ok := <-one
+			job, ok := <-one
 			if !ok {
 				return
 			}
-			f(timer)
+			_ = job.Do()
 			return
-		case f, ok := <-one:
+		case job, ok := <-one:
 			if !ok {
 				return // one 可能关闭
 			}
-			f(timer)
+			_ = job.Do()
 			timer.Reset(time.Second * 10)
 		}
 		// reuse

+ 4 - 2
utl/workerPool_test.go

@@ -10,9 +10,11 @@ func TestWorkPool_Put(t *testing.T) {
 
 	num := 2
 	for num > 0 {
-		p.Put(func(timer *time.Timer) {
+		p.Put(NewJob(IJobFn(func(inf interface{}) error {
 			time.Sleep(0 * time.Second)
-		})
+			t.Log("num:", inf.(int))
+			return nil
+		}), num))
 		num = num - 1
 	}
 	time.Sleep(time.Second)

+ 9 - 9
wkrp/dispatcher.go

@@ -1,19 +1,19 @@
 package wkrp
 
-import "sync"
+import (
+	"git.wanbits.io/joe/kettle/utl"
+	"sync"
+)
 
 const (
 	def_POOL_SIZE    = 64
 	def_ENQCHAN_SIZE = 64
 )
 
-type TJobChan chan *Job
-type TJobPool chan TJobChan
-
 type Dispacher struct {
-	enqchan   TJobChan
+	enqchan   utl.TJobChan
 	enqchanSz int
-	pool      TJobPool
+	pool      utl.TJobPool
 	poolSz    int
 	workers   map[int]*Worker
 	quit      chan struct{}
@@ -31,8 +31,8 @@ func NewDispacher(pool_size int, cache_size int) *Dispacher {
 	p := &Dispacher{
 		poolSz:    pool_size,
 		enqchanSz: cache_size,
-		enqchan:   make(TJobChan, cache_size),
-		pool:      make(TJobPool, pool_size),
+		enqchan:   make(utl.TJobChan, cache_size),
+		pool:      make(utl.TJobPool, pool_size),
 		workers:   make(map[int]*Worker),
 		quit:      make(chan struct{}),
 		wg:        sync.WaitGroup{},
@@ -58,7 +58,7 @@ func (self *Dispacher) Stop() {
 	}()
 }
 
-func (self *Dispacher) Do(job *Job) {
+func (self *Dispacher) Do(job *utl.Job) {
 	go func() {
 		select {
 		case <-self.quit:

+ 2 - 1
wkrp/dispatcher_test.go

@@ -2,6 +2,7 @@ package wkrp
 
 import (
 	"fmt"
+	"git.wanbits.io/joe/kettle/utl"
 	"testing"
 	"time"
 )
@@ -11,7 +12,7 @@ func TestNewDispacher(t *testing.T) {
 	defer d.Stop()
 
 	for i := 0; i < 100; i++ {
-		d.Do(NewJob(IJobFn(func(inf interface{}) error {
+		d.Do(utl.NewJob(utl.IJobFn(func(inf interface{}) error {
 			fmt.Println("did", inf.(int))
 			return nil
 		}), i))

+ 7 - 5
wkrp/worker.go

@@ -1,13 +1,15 @@
 package wkrp
 
+import "git.wanbits.io/joe/kettle/utl"
+
 /**
 worker managed by dispatcher
 */
 
 type Worker struct {
 	id      int
-	jobChan TJobChan
-	pool    TJobPool
+	jobChan utl.TJobChan
+	pool    utl.TJobPool
 	disp    *Dispacher
 	quit    chan struct{}
 	running bool
@@ -17,7 +19,7 @@ type Worker struct {
 func newWorker(id int, dispacher *Dispacher) *Worker {
 	return &Worker{
 		id:      id,
-		jobChan: make(TJobChan, 1),
+		jobChan: make(utl.TJobChan, 1),
 		pool:    dispacher.pool,
 		disp:    dispacher,
 		quit:    make(chan struct{}),
@@ -54,11 +56,11 @@ func (self *Worker) start() {
 					return
 				}
 
-				self.err = job.do()
+				self.err = job.Do()
 
 			case <-self.quit:
 				for job := range self.jobChan {
-					self.err = job.do()
+					self.err = job.Do()
 				}
 				close(self.jobChan)
 				return