joe 4 lat temu
rodzic
commit
46e18cfc99

+ 30 - 0
etcd/etcd.go

@@ -0,0 +1,30 @@
+package etcd
+
+import (
+	"github.com/coreos/etcd/clientv3"
+	"time"
+)
+
+type EtcdClient struct {
+	c *clientv3.Client
+}
+
+func Connect(servers []string, username, password string, timeout time.Duration) (*EtcdClient, error) {
+	etcd := &EtcdClient{}
+	var err error
+
+	conf := clientv3.Config{
+		Endpoints:servers,
+		DialTimeout: timeout,
+	}
+	if len(username) > 0 {
+		conf.Username = username
+		conf.Password = password
+	}
+	etcd.c, err = clientv3.New(conf)
+	return etcd, err
+}
+
+func (self *EtcdClient) Put() {
+	self.c.Put()
+}

+ 4 - 0
go.mod

@@ -4,6 +4,8 @@ go 1.15
 
 require (
 	github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
+	github.com/coreos/etcd v3.3.13+incompatible
+	github.com/dgrijalva/jwt-go v3.2.0+incompatible
 	github.com/gin-contrib/cors v1.3.1
 	github.com/gin-gonic/gin v1.6.3
 	github.com/go-ole/go-ole v1.2.5 // indirect
@@ -14,5 +16,7 @@ require (
 	github.com/shirou/gopsutil v3.20.12+incompatible
 	github.com/spf13/viper v1.7.1
 	go.uber.org/zap v1.16.0
+	gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
+	gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df // indirect
 	gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
 )

+ 9 - 0
go.sum

@@ -29,6 +29,7 @@ github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJm
 github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
 github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
 github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
+github.com/coreos/etcd v3.3.13+incompatible h1:8F3hqu9fGYLBifCmRCJsicFqDx/D68Rt3q1JMazcgBQ=
 github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
 github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
 github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
@@ -36,6 +37,7 @@ github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfc
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
 github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
 github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
 github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
@@ -71,6 +73,7 @@ github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGK
 github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
 github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
 github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
+github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
 github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
 github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
@@ -383,9 +386,11 @@ google.golang.org/genproto v0.0.0-20190502173448-54afdca5d873/go.mod h1:VzzqZJRn
 google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
 google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
 google.golang.org/genproto v0.0.0-20190911173649-1774047e7e51/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8=
+google.golang.org/genproto v0.0.0-20191108220845-16a3f7862a1a h1:Ob5/580gVHBJZgXnff1cZDbG+xLtMVE5mDRTe+nIsX4=
 google.golang.org/genproto v0.0.0-20191108220845-16a3f7862a1a/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc=
 google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
 google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
+google.golang.org/grpc v1.21.1 h1:j6XxA85m/6txkUCHvzlV5f+HBNl/1r5cZ2A/3IEFOO8=
 google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
 google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
 google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
@@ -395,6 +400,8 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi
 google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
 google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
 gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
+gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc h1:2gGKlE2+asNV9m7xrywl36YYNnBG5ZQ0r/BOOxqPpmk=
+gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc/go.mod h1:m7x9LTH6d71AHyAX77c9yqWCCa3UKHcVEj9y7hAtKDk=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
@@ -402,6 +409,8 @@ gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
 gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
 gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE=
 gopkg.in/go-playground/validator.v9 v9.29.1/go.mod h1:+c9/zcJMFNgbLvly1L1V+PpxWdVbfP1avr/N00E2vyQ=
+gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df h1:n7WqCuqOuCbNr617RXOY0AWRXxgwEyPp2z+p0+hgMuE=
+gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df/go.mod h1:LRQQ+SO6ZHR7tOkpBDuZnXENFzX8qRjMDMyPD6BRkCw=
 gopkg.in/ini.v1 v1.51.0 h1:AQvPpx3LzTDM0AjnIRlVFwFFGC+npRopjZxLJj6gdno=
 gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
 gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=

+ 1 - 0
mq/internal/kafka.go

@@ -0,0 +1 @@
+package internal

+ 1 - 0
mq/internal/nats.go

@@ -0,0 +1 @@
+package internal

+ 1 - 0
mq/internal/rabbit.go

@@ -0,0 +1 @@
+package internal

+ 46 - 5
rds/client.go

@@ -5,16 +5,40 @@ import (
 	"time"
 )
 
+const (
+	TIMEOUT_READ  = 3 * time.Second
+	TIMEOUT_WRITE = 5 * time.Second
+	TIMEOUT_POOL  = 60 * time.Second
+	SIZE_POOL     = 10
+)
+
 func Connect(addr, password string, db int) (*redis.Client, error) {
 	conn := redis.NewClient(&redis.Options{
 		Addr:         addr,
 		Password:     password,
 		DB:           db,
-		DialTimeout:  3 * time.Second,
-		ReadTimeout:  3 * time.Second,
-		WriteTimeout: 5 * time.Second,
-		PoolSize:     10,
-		PoolTimeout:  60 * time.Second,
+		DialTimeout:  TIMEOUT_READ,
+		ReadTimeout:  TIMEOUT_READ,
+		WriteTimeout: TIMEOUT_WRITE,
+		PoolSize:     SIZE_POOL,
+		PoolTimeout:  TIMEOUT_POOL,
+	})
+	_, err := conn.Ping().Result()
+	if nil != err {
+		return nil, err
+	}
+	return conn, nil
+}
+
+func ConnectCluster(addrs []string, password string, db int) (*redis.ClusterClient, error) {
+	conn := redis.NewClusterClient(&redis.ClusterOptions{
+		Addrs:        addrs,
+		Password:     password,
+		DialTimeout:  TIMEOUT_READ,
+		ReadTimeout:  TIMEOUT_READ,
+		WriteTimeout: TIMEOUT_WRITE,
+		PoolSize:     SIZE_POOL,
+		PoolTimeout:  TIMEOUT_POOL,
 	})
 	_, err := conn.Ping().Result()
 	if nil != err {
@@ -23,3 +47,20 @@ func Connect(addr, password string, db int) (*redis.Client, error) {
 	return conn, nil
 }
 
+func ConnectSentinel(addrs []string, master, password string, db int) (*redis.Client, error) {
+	conn := redis.NewFailoverClient(&redis.FailoverOptions{
+		MasterName:    master,
+		SentinelAddrs: addrs,
+		Password:      password,
+		DB:            db,
+		DialTimeout:   TIMEOUT_READ,
+		ReadTimeout:   TIMEOUT_READ,
+		WriteTimeout:  TIMEOUT_WRITE,
+		PoolSize:      SIZE_POOL,
+		PoolTimeout:   TIMEOUT_POOL,
+	})
+	if _, err := conn.Ping().Result(); err != nil {
+		return nil, err
+	}
+	return conn, nil
+}

+ 103 - 0
rds/script.go

@@ -0,0 +1,103 @@
+package rds
+
+import (
+	"github.com/go-redis/redis"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+)
+
+/**
+ 	提供对 redis 中 lua 脚本的简单封装
+
+	lua 主要用于逻辑层面的批量 key 操作。 这和 multi 命令不同:
+	1. lua 脚本是 atomic 的, 可以看作事务。(和DBMS事务不同,主要不支持回滚,这里仅仅是提供了“一次性”的逻辑)
+	2. lua 脚本执行不会中途打断。
+	3. lua 脚本相比 multi 有更小的数据传数量
+
+	实现目标
+	支持 lua 从 lua 文件加载
+	支持从字符串加载
+	支持从目录加载
+
+	加载后,需要保存 redis 保存脚本后返回的对应的 hash
+
+	調用方可根据文件名调用
+
+	从字符串加载的,可自己设置名字
+	名字不能重名
+*/
+
+// LuaBlock
+type LuaBlock struct {
+	*redis.Script
+	name string
+}
+
+type luablocker interface {
+	Eval(script string, keys []string, args ...interface{}) *redis.Cmd
+	EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd
+	ScriptExists(hashes ...string) *redis.BoolSliceCmd
+	ScriptLoad(script string) *redis.StringCmd
+}
+
+var _ luablocker = (*redis.Client)(nil)
+var _ luablocker = (*redis.Ring)(nil)
+var _ luablocker = (*redis.ClusterClient)(nil)
+
+func newLuaBlock(name, block string) *LuaBlock {
+	return &LuaBlock{
+		Script: redis.NewScript(block),
+		name:   name,
+	}
+}
+
+// LuaBlockManager
+type LuaScripts struct {
+	m map[string]*LuaBlock
+	h luablocker
+}
+
+func NewLuaScripts(handle luablocker) *LuaScripts {
+	return &LuaScripts{
+		m: make(map[string]*LuaBlock),
+		h: handle,
+	}
+}
+
+func (self *LuaScripts) Run(name string, keys []string, args ...interface{})*redis.Cmd{
+	luablock, ok := self.m[name]
+	if !ok {
+		panic("not load lua script:" + name)
+	}
+	return luablock.Run(self.h, keys, args)
+}
+
+func (self * LuaScripts)LoadString(name string, block string) error {
+	_, err := self.h.ScriptLoad(block).Result()
+	if err != nil {
+		return err
+	}
+	self.m[name] = newLuaBlock(name, block)
+	return nil
+}
+
+func (self *LuaScripts) LoadFile(f string) error {
+	data, err := ioutil.ReadFile(f)
+	if err != nil {
+		return err
+	}
+	name := filepath.Base(f)
+
+	return self.LoadString(name, string(data))
+}
+
+func (self *LuaScripts) LoadPath(p string) error {
+	err := filepath.Walk(p, func(path string, info os.FileInfo, err error) error {
+		if err != nil {
+			return err
+		}
+		return self.LoadFile(path)
+	})
+	return err
+}

+ 1 - 0
sms/isms.go

@@ -0,0 +1 @@
+package sms

+ 1 - 0
sms/providers/aliyun.go

@@ -0,0 +1 @@
+package providers

+ 101 - 0
utl/smap.go

@@ -0,0 +1,101 @@
+package utl
+
+import (
+	"errors"
+	"sync"
+)
+
+var (
+	ErrKeyExists    = errors.New("key already exists")
+	ErrKeyNotExists = errors.New("key not exists")
+)
+
+// secure map
+// exports funny interfaces
+type SMap struct {
+	*sync.RWMutex
+	m map[interface{}]interface{}
+}
+
+func NewSMap() *SMap {
+	return &SMap{
+		RWMutex: &sync.RWMutex{},
+		m:       make(map[interface{}]interface{}),
+	}
+}
+
+func (self *SMap) Exists(k interface{}) bool {
+	self.RLock()
+	_, ok := self.m[k]
+	self.RUnlock()
+
+	return ok
+}
+
+func (self *SMap) Get(k interface{}) (interface{}, bool) {
+	self.RLock()
+	defer self.RUnlock()
+
+	v, ok := self.m[k]
+	return v, ok
+}
+
+func (self *SMap) Set(k, v interface{}) {
+	self.Lock()
+	self.m[k] = v
+	self.Unlock()
+}
+
+func (self *SMap) Insert(k, v interface{}) error {
+	self.Lock()
+	defer self.Unlock()
+
+	v, ok := self.m[k]
+	if ok {
+		return ErrKeyExists
+	}
+
+	self.m[k] = v
+
+	return nil
+}
+
+func (self *SMap) Update(k, v interface{}) error {
+	self.Lock()
+	defer self.Unlock()
+
+	v, ok := self.m[k]
+	if !ok {
+		return ErrKeyNotExists
+	}
+
+	self.m[k] = v
+
+	return nil
+}
+
+func (self *SMap) Keys() []interface{} {
+	var retval []interface{}
+
+	self.RLock()
+	defer self.RUnlock()
+
+	for k, _ := range self.m {
+		retval = append(retval, k)
+	}
+
+	return retval
+}
+
+func (self *SMap) Values() []interface{} {
+	var retval []interface{}
+
+	self.RLock()
+	defer self.RUnlock()
+
+	for _, v := range self.m {
+		retval = append(retval, v)
+	}
+
+	return retval
+}

+ 1 - 0
utl/smap_test.go

@@ -0,0 +1 @@
+package utl

+ 30 - 0
utl/smtp.go

@@ -0,0 +1,30 @@
+package utl
+
+import (
+	"gopkg.in/gomail.v2"
+)
+
+type Smtp struct {
+	Host string
+	Port int
+	Username, Password string
+	Alias string
+}
+
+func (self *Smtp) Send(to []string, subject, body string) error {
+	m := gomail.NewMessage(/* gomail.SetEncoding(gomail.Base64) */)
+	m.SetHeader("From", m.FormatAddress(self.Username, self.Alias))
+	m.SetHeader("To", to...)
+	m.SetHeader("Subject", subject)
+	m.SetHeader("text/html", body)
+	//m.Attach("/tmp/foo.txt",
+	//	gomail.Rename("foo.txt"),
+	//	gomail.SetHeader(map[string][]string{
+	//		"Content-Disposition": []string{
+	//			fmt.Sprintf(`attachment; filename="%s"`, mime.QEncoding.Encode("UTF-8", name)),
+	//		},
+	//	}),
+	//)
+	d := gomail.NewDialer(self.Host, self.Port, self.Username, self.Password)
+	return d.DialAndSend(m)
+}

+ 24 - 0
utl/string.go

@@ -0,0 +1,24 @@
+package utl
+
+import(
+	"reflect"
+	"unsafe"
+)
+
+func StringToBytes(s string) []byte {
+	sh := (*reflect.StringHeader)(unsafe.Pointer(&s))
+	bh := reflect.SliceHeader{
+		Data: sh.Data,
+		Len: sh.Len,
+	}
+	return *(*[]byte)(unsafe.Pointer(&bh))
+}
+
+func BytesToString(b []byte) string {
+	bh := (*reflect.SliceHeader)(unsafe.Pointer(&b))
+	sh := reflect.StringHeader{
+		Data: bh.Data,
+		Len:  bh.Len,
+	}
+	return *(*string)(unsafe.Pointer(&sh))
+}

+ 1 - 0
utl/string_test.go

@@ -0,0 +1 @@
+package utl

+ 44 - 0
utl/tokenizer.go

@@ -0,0 +1,44 @@
+package utl
+
+import (
+	"fmt"
+	"github.com/dgrijalva/jwt-go"
+	"time"
+)
+
+type Tokenizer struct {
+	key   string        // key
+	lasts time.Duration // valid period
+}
+
+func NewToken(key string, lasts time.Duration) *Tokenizer {
+	return &Tokenizer{
+		key:   key,
+		lasts: lasts,
+	}
+}
+
+func (self *Tokenizer) Gen(id uint64) (string, error) {
+	clm := jwt.MapClaims{}
+	clm["uid"] = id
+	clm["exp"] = time.Now().Add(self.lasts)
+	tok := jwt.NewWithClaims(jwt.SigningMethodHS512, clm)
+	return tok.SignedString(self.key)
+}
+
+func (self *Tokenizer) Parse(tokenstr string) (uint64, error) {
+	tok, err := jwt.Parse(tokenstr, func(token *jwt.Token) (interface{}, error) {
+		if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
+			return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"])
+		}
+		return self.key, nil
+	})
+	if err != nil {
+		return 0, err
+	}
+	clm, ok := tok.Claims.(jwt.MapClaims)
+	if !ok || !tok.Valid {
+		return 0, err
+	}
+	return clm["uid"].(uint64), nil
+}

+ 1 - 0
utl/tokenizer_test.go

@@ -0,0 +1 @@
+package utl

+ 34 - 8
utl/workerPool.go

@@ -2,19 +2,44 @@ package utl
 
 import (
 	"time"
-	//	"go.uber.org/zap"
 )
 
+type WorkPoolOption func(*WorkPool)
+
+func WithWorkPoolSize(size int) WorkPoolOption {
+	return func(wp *WorkPool) {
+		if size < 1 {
+			size = 64
+		}
+		wp.size = size
+	}
+}
+
+func WithWorkPoolLifetime(lifetime time.Duration) WorkPoolOption {
+	if lifetime < 1*time.Second {
+		lifetime = 1 * time.Second
+	}
+	return func(wp *WorkPool) {
+		wp.lifetime = lifetime
+	}
+}
+
 type WorkPool struct {
-	goChan chan chan func(*time.Timer)
-	size   int
+	goChan   chan chan func(*time.Timer)
+	size     int
+	lifetime time.Duration
 }
 
-func NewWorkPool(size int) *WorkPool {
-	return &WorkPool{
-		goChan: make(chan chan func(*time.Timer), size),
-		size:   size,
+func NewWorkPool(options ...WorkPoolOption) *WorkPool {
+	wp := &WorkPool{}
+
+	for _, opt := range options {
+		opt(wp)
 	}
+
+	wp.goChan = make(chan chan func(*time.Timer), wp.size)
+
+	return wp
 }
 
 func (wp *WorkPool) Put(f func(*time.Timer)) {
@@ -61,8 +86,9 @@ func (wp *WorkPool) do(one chan func(*time.Timer)) {
 			f(timer)
 			timer.Reset(time.Second * 10)
 		}
+		// reuse
 		if len(wp.goChan) < wp.size {
 			wp.goChan <- one
 		}
-	}
+	} //for
 }

+ 20 - 11
wkrp/dispatcher.go

@@ -1,11 +1,17 @@
 package wkrp
 
+import "sync"
+
+type TJobChan chan IJob
+type TJobPool chan TJobChan
+
 type Dispacher struct {
 	size    int
 	enqchan TJobChan
 	pool    TJobPool
 	workers map[int]*Worker
 	quit    chan struct{}
+	wg      sync.WaitGroup
 }
 
 func NewDispacher(q_size int) *Dispacher {
@@ -15,6 +21,7 @@ func NewDispacher(q_size int) *Dispacher {
 		pool:    make(TJobPool, q_size),
 		workers: make(map[int]*Worker),
 		quit:    make(chan struct{}),
+		wg:      sync.WaitGroup{},
 	}
 
 	go p.run()
@@ -25,32 +32,34 @@ func NewDispacher(q_size int) *Dispacher {
 func (self *Dispacher) Stop() {
 	go func() {
 		close(self.quit)
+		close(self.enqchan)
 
 		for _, w := range self.workers {
-			if w.alive() {
-				w.Stop()
-			}
+			w.stop()
 		}
 
-		close(self.enqchan)
+		self.wg.Wait()
+
 		close(self.pool)
 	}()
 }
 
-func (self *Dispacher) Do(job Tjob) {
+func (self *Dispacher) Do(job IJob) {
 	go func() {
 		select {
-		case self.enqchan <- job:
 		case <-self.quit:
+		case self.enqchan <- job:
 		}
 	}()
 }
 
 func (self *Dispacher) run() {
 	for i := 0; i < self.size; i++ {
-		w := newWorker(i+1, self.pool)
+		w := newWorker(i+1, self)
 		self.workers[i+1] = w
 
+		self.wg.Add(1);
+
 		w.start()
 	}
 
@@ -63,12 +72,12 @@ func (self *Dispacher) run() {
 			for {
 				jobchan, ok := <-self.pool
 				if ok {
-					jobchan<-job
-					break	// break from for
+					jobchan <- job
+					break // break from for
 				}
-			}// for
+			} // for
 		case <-self.quit:
 			return
 		} // select
-	}// for
+	} // for
 }

+ 2 - 2
wkrp/dispatcher_test.go

@@ -9,9 +9,9 @@ import (
 func TestNewDispacher(t *testing.T) {
 	d := NewDispacher(16)
 
-	d.Do(func() error {
+	d.Do(IJobFn(func() error {
 		time.Sleep(time.Second)
 		fmt.Println("did")
 		return nil
-	})
+	}))
 }

+ 5 - 4
wkrp/job.go

@@ -6,11 +6,12 @@ package wkrp
 	so there may be 2 types of jobs at least
  */
 
-type TFnJob func() error
-
 type IJob interface {
 	Exec() error
 }
 
-// job type switcher
-type Tjob TFnJob
+type IJobFn func() error
+
+func (fn IJobFn) Exec() error {
+	return fn()
+}

+ 13 - 10
wkrp/worker.go

@@ -4,23 +4,22 @@ package wkrp
 worker managed by dispatcher
 */
 
-type TJobChan chan Tjob
-type TJobPool chan TJobChan
-
 type Worker struct {
 	id      int
 	jobChan TJobChan
 	pool    TJobPool
+	disp    *Dispacher
 	quit    chan struct{}
 	running bool
 	err     error
 }
 
-func newWorker(id int, pool TJobPool) *Worker {
+func newWorker(id int, dispacher *Dispacher) *Worker {
 	return &Worker{
 		id:      id,
-		jobChan: make(chan Tjob, 1),
-		pool: pool,
+		jobChan: make(TJobChan, 1),
+		pool:    dispacher.pool,
+		disp:    dispacher,
 		quit:    make(chan struct{}),
 		running: false,
 		err:     nil,
@@ -37,10 +36,11 @@ func (self *Worker) error() error {
 
 func (self *Worker) start() {
 	defer func() {
-		self.running = false
 		if err := recover(); err != nil {
 
 		}
+		self.running = false
+		self.disp.wg.Done()
 	}()
 
 	go func() {
@@ -54,9 +54,12 @@ func (self *Worker) start() {
 					return
 				}
 
-				self.err = job()
+				self.err = job.Exec()
 
 			case <-self.quit:
+				for job := range self.jobChan {
+					self.err = job.Exec()
+				}
 				close(self.jobChan)
 				return
 			}
@@ -64,6 +67,6 @@ func (self *Worker) start() {
 	}()
 }
 
-func (self *Worker) Stop() {
-	go func() { close(self.quit) }()
+func (self *Worker) stop() {
+	close(self.quit)
 }