| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144 |
- package etcd
- import (
- "context"
- "git.wanbits.io/joe/kettle/utl"
- "github.com/coreos/etcd/clientv3"
- "time"
- )
- type EtcdClient struct {
- c *clientv3.Client
- timeout time.Duration
- }
- func New(servers []string, username, password string, timeout time.Duration) (*EtcdClient, error) {
- etcd := &EtcdClient{
- timeout: timeout,
- }
- var err error
- conf := clientv3.Config{
- Endpoints: servers,
- DialTimeout: timeout,
- //AutoSyncInterval: 60 * time.Second,
- }
- if len(username) > 0 {
- conf.Username = username
- conf.Password = password
- }
- etcd.c, err = clientv3.New(conf)
- return etcd, err
- }
- func (self *EtcdClient) Close() {
- self.c.Close()
- }
- func (self *EtcdClient) ClusterMembers() []*clientv3.Member {
- var ret []*clientv3.Member
- resp, err := self.c.MemberList(context.Background())
- if err != nil {
- return ret
- }
- for _, v := range resp.Members {
- ret = append(ret, (*clientv3.Member)(v))
- }
- return ret
- }
- func (self *EtcdClient) Put(k, v string, ops ...clientv3.OpOption) error {
- //ctx, cancel := context.WithTimeout(context.Background(), self.timeout)
- //defer cancel()
- _, err := self.c.Put(context.TODO(), k, v, ops...)
- return err
- }
- func (self *EtcdClient) PutWithLife(k, v string, ttl int64) (clientv3.LeaseID, error) {
- if ttl <= 0 {
- return 0, utl.ErrParameters
- }
- leaseId, err := self.Grant(ttl)
- if err != nil {
- return 0, err
- }
- _, err = self.c.Put(context.TODO(), k, v, clientv3.WithLease(leaseId))
- return leaseId, err
- }
- func (self *EtcdClient) KeepAlive(leaseId clientv3.LeaseID) error {
- _, err := self.c.KeepAlive(context.TODO(), leaseId)
- if err != nil {
- return err
- }
- return nil
- }
- func (self *EtcdClient) KeepAliveOnce(leaseId clientv3.LeaseID) error {
- _, err := self.c.KeepAliveOnce(context.TODO(), leaseId)
- return err
- }
- func (self *EtcdClient) Get(k string, ops ...clientv3.OpOption) (*clientv3.GetResponse, error) {
- //ctx, cancel := context.WithTimeout(context.Background(), self.timeout)
- //defer cancel()
- resp, err := self.c.Get(context.TODO(), k, ops...)
- if err != nil {
- return nil, err
- }
- return resp, nil
- }
- func (self *EtcdClient) Del(k string) error {
- ctx, cancel := context.WithTimeout(context.Background(), self.timeout)
- defer cancel()
- _, err := self.c.Delete(ctx, k)
- return err
- }
- //// lease
- func (self *EtcdClient) Grant(ttl int64) (clientv3.LeaseID, error) {
- resp, err := self.c.Grant(context.TODO(), ttl)
- if err != nil {
- return 0, err
- }
- return resp.ID, nil
- }
- func (self *EtcdClient) Revoke(leaseId clientv3.LeaseID) error {
- _, err := self.c.Revoke(context.TODO(), leaseId)
- return err
- }
- //// watch
- func (self *EtcdClient) Watch(k string, ops ...clientv3.OpOption) clientv3.WatchChan {
- return self.c.Watch(context.Background(), k, ops...)
- }
- func (self *EtcdClient) WatchPrefix(prefix string) clientv3.WatchChan {
- return self.c.Watch(context.Background(), prefix, clientv3.WithPrefix())
- }
- func (self *EtcdClient) WatchRange(from, to string) clientv3.WatchChan {
- return self.c.Watch(context.Background(), from, clientv3.WithRange(to))
- }
- //// maintain
- func (self *EtcdClient) Compact(rev int64) error {
- ctx, cancel := context.WithTimeout(context.Background(), self.timeout)
- defer cancel()
- _, err := self.c.Compact(ctx, rev)
- return err
- }
- func (self *EtcdClient) Defrag(endpoint string) error {
- ctx, cancel := context.WithTimeout(context.Background(), self.timeout)
- defer cancel()
- _, err := self.c.Defragment(ctx, endpoint)
- return err
- }
|