package etcd import ( "context" "time" "kettle/utl" "github.com/coreos/etcd/clientv3" ) type EtcdClient struct { c *clientv3.Client timeout time.Duration } func New(servers []string, username, password string, timeout time.Duration) (*EtcdClient, error) { etcd := &EtcdClient{ timeout: timeout, } var err error conf := clientv3.Config{ Endpoints: servers, DialTimeout: timeout, //AutoSyncInterval: 60 * time.Second, } if len(username) > 0 { conf.Username = username conf.Password = password } etcd.c, err = clientv3.New(conf) return etcd, err } func (self *EtcdClient) Close() { self.c.Close() } func (self *EtcdClient) ClusterMembers() []*clientv3.Member { var ret []*clientv3.Member resp, err := self.c.MemberList(context.Background()) if err != nil { return ret } for _, v := range resp.Members { ret = append(ret, (*clientv3.Member)(v)) } return ret } func (self *EtcdClient) Put(k, v string, ops ...clientv3.OpOption) error { //ctx, cancel := context.WithTimeout(context.Background(), self.timeout) //defer cancel() _, err := self.c.Put(context.TODO(), k, v, ops...) return err } func (self *EtcdClient) PutWithLife(k, v string, ttl int64) (clientv3.LeaseID, error) { if ttl <= 0 { return 0, utl.ErrParameters } leaseId, err := self.Grant(ttl) if err != nil { return 0, err } _, err = self.c.Put(context.TODO(), k, v, clientv3.WithLease(leaseId)) return leaseId, err } func (self *EtcdClient) KeepAlive(leaseId clientv3.LeaseID) error { _, err := self.c.KeepAlive(context.TODO(), leaseId) if err != nil { return err } return nil } func (self *EtcdClient) KeepAliveOnce(leaseId clientv3.LeaseID) error { _, err := self.c.KeepAliveOnce(context.TODO(), leaseId) return err } func (self *EtcdClient) Get(k string, ops ...clientv3.OpOption) (*clientv3.GetResponse, error) { //ctx, cancel := context.WithTimeout(context.Background(), self.timeout) //defer cancel() resp, err := self.c.Get(context.TODO(), k, ops...) if err != nil { return nil, err } return resp, nil } func (self *EtcdClient) Del(k string) error { ctx, cancel := context.WithTimeout(context.Background(), self.timeout) defer cancel() _, err := self.c.Delete(ctx, k) return err } //// lease func (self *EtcdClient) Grant(ttl int64) (clientv3.LeaseID, error) { resp, err := self.c.Grant(context.TODO(), ttl) if err != nil { return 0, err } return resp.ID, nil } func (self *EtcdClient) Revoke(leaseId clientv3.LeaseID) error { _, err := self.c.Revoke(context.TODO(), leaseId) return err } //// watch func (self *EtcdClient) Watch(k string, ops ...clientv3.OpOption) clientv3.WatchChan { return self.c.Watch(context.Background(), k, ops...) } func (self *EtcdClient) WatchPrefix(prefix string) clientv3.WatchChan { return self.c.Watch(context.Background(), prefix, clientv3.WithPrefix()) } func (self *EtcdClient) WatchRange(from, to string) clientv3.WatchChan { return self.c.Watch(context.Background(), from, clientv3.WithRange(to)) } //// maintain func (self *EtcdClient) Compact(rev int64) error { ctx, cancel := context.WithTimeout(context.Background(), self.timeout) defer cancel() _, err := self.c.Compact(ctx, rev) return err } func (self *EtcdClient) Defrag(endpoint string) error { ctx, cancel := context.WithTimeout(context.Background(), self.timeout) defer cancel() _, err := self.c.Defragment(ctx, endpoint) return err }