etcd.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. package etcd
  2. import (
  3. "context"
  4. "github.com/coreos/etcd/clientv3"
  5. "one.com/kettle/utl"
  6. "time"
  7. )
  8. type EtcdClient struct {
  9. c *clientv3.Client
  10. timeout time.Duration
  11. }
  12. func New(servers []string, username, password string, timeout time.Duration) (*EtcdClient, error) {
  13. etcd := &EtcdClient{
  14. timeout: timeout,
  15. }
  16. var err error
  17. conf := clientv3.Config{
  18. Endpoints: servers,
  19. DialTimeout: timeout,
  20. //AutoSyncInterval: 60 * time.Second,
  21. }
  22. if len(username) > 0 {
  23. conf.Username = username
  24. conf.Password = password
  25. }
  26. etcd.c, err = clientv3.New(conf)
  27. return etcd, err
  28. }
  29. func (self *EtcdClient) Close() {
  30. self.c.Close()
  31. }
  32. func (self *EtcdClient) ClusterMembers() []*clientv3.Member {
  33. var ret []*clientv3.Member
  34. resp, err := self.c.MemberList(context.Background())
  35. if err != nil {
  36. return ret
  37. }
  38. for _, v := range resp.Members {
  39. ret = append(ret, (*clientv3.Member)(v))
  40. }
  41. return ret
  42. }
  43. func (self *EtcdClient) Put(k, v string) error {
  44. //ctx, cancel := context.WithTimeout(context.Background(), self.timeout)
  45. //defer cancel()
  46. _, err := self.c.Put(context.TODO(), k, v)
  47. return err
  48. }
  49. func (self *EtcdClient) PutWithLife(k, v string, ttl int64) (clientv3.LeaseID, error) {
  50. if ttl <= 0 {
  51. return 0, utl.ErrParameters
  52. }
  53. leaseId, err := self.Grant(ttl)
  54. if err != nil {
  55. return 0, err
  56. }
  57. _, err = self.c.Put(context.TODO(), k, v, clientv3.WithLease(leaseId))
  58. return leaseId, err
  59. }
  60. func (self *EtcdClient) KeepAlive(leaseId clientv3.LeaseID) error {
  61. _, err := self.c.KeepAlive(context.TODO(), leaseId)
  62. if err != nil {
  63. return err
  64. }
  65. return nil
  66. }
  67. func (self *EtcdClient) KeepAliveOnce(leaseId clientv3.LeaseID)error {
  68. _, err := self.c.KeepAliveOnce(context.TODO(), leaseId)
  69. return err
  70. }
  71. func (self *EtcdClient) Get(k string) ([]byte, error) {
  72. //ctx, cancel := context.WithTimeout(context.Background(), self.timeout)
  73. //defer cancel()
  74. resp, err := self.c.Get(context.TODO(), k)
  75. if err != nil {
  76. return nil, err
  77. }
  78. if len(resp.Kvs) <= 0 {
  79. return nil, utl.ErrContainerEmpty
  80. }
  81. return resp.Kvs[0].Value, nil
  82. }
  83. func (self *EtcdClient) Del(k string) error {
  84. ctx, cancel := context.WithTimeout(context.Background(), self.timeout)
  85. defer cancel()
  86. _, err := self.c.Delete(ctx, k)
  87. return err
  88. }
  89. //// lease
  90. func (self *EtcdClient) Grant(ttl int64) (clientv3.LeaseID, error) {
  91. resp, err := self.c.Grant(context.TODO(), ttl)
  92. if err != nil {
  93. return 0, err
  94. }
  95. return resp.ID, nil
  96. }
  97. func (self *EtcdClient) Revoke(leaseId clientv3.LeaseID) error {
  98. _, err := self.c.Revoke(context.TODO(), leaseId)
  99. return err
  100. }
  101. //// watch
  102. func (self *EtcdClient) Watch(k string) clientv3.WatchChan {
  103. return self.c.Watch(context.Background(), k)
  104. }
  105. func (self *EtcdClient) WatchPrefix(prefix string) clientv3.WatchChan {
  106. return self.c.Watch(context.Background(), prefix, clientv3.WithPrefix())
  107. }
  108. func (self *EtcdClient) WatchRange(from, to string) clientv3.WatchChan {
  109. return self.c.Watch(context.Background(), from, clientv3.WithRange(to))
  110. }
  111. //// maintain
  112. func (self *EtcdClient) Compact(rev int64) error {
  113. ctx, cancel := context.WithTimeout(context.Background(), self.timeout)
  114. defer cancel()
  115. _, err := self.c.Compact(ctx, rev)
  116. return err
  117. }
  118. func (self *EtcdClient) Defrag(endpoint string) error {
  119. ctx, cancel := context.WithTimeout(context.Background(), self.timeout)
  120. defer cancel()
  121. _, err := self.c.Defragment(ctx, endpoint)
  122. return err
  123. }