nats.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package mqsvr
  2. import (
  3. "github.com/nats-io/nats.go"
  4. "strings"
  5. "time"
  6. )
  7. type NatsCbSubsribe func(subj string, data []byte)
  8. type NatsCbQueueSubsribe func(subj, queue string, data []byte)
  9. type Nats struct {
  10. addrs []string
  11. conn *nats.Conn
  12. }
  13. func NewNats(addrs []string, username, password, name string) (*Nats, error) {
  14. nc, err := nats.Connect(strings.Join(addrs, ","),
  15. nats.NoEcho(),
  16. nats.Name(name),
  17. nats.Timeout(3*time.Second),
  18. nats.MaxReconnects(100),
  19. nats.ReconnectWait(5*time.Second),
  20. nats.UserInfo(username, password),
  21. nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
  22. //fmt.Println()
  23. }),
  24. nats.ReconnectHandler(func(nc *nats.Conn) {
  25. //fmt.
  26. }),
  27. nats.ClosedHandler(func(nc *nats.Conn) {
  28. //fmt
  29. }))
  30. if nil != err {
  31. return nil, err
  32. }
  33. return &Nats{addrs: addrs, conn: nc}, nil
  34. }
  35. func (self *Nats) Close() {
  36. self.conn.Drain()
  37. self.conn.Close()
  38. }
  39. func (self *Nats) RawConn() *nats.Conn {
  40. return self.conn
  41. }
  42. func (self *Nats) Pub(subj string, content []byte, timeout time.Duration) error {
  43. err := self.conn.Publish(subj, content)
  44. if err != nil {
  45. return err
  46. }
  47. //self.conn.FlushTimeout(timeout)
  48. return nil
  49. }
  50. func (self *Nats) Sub(subj string, cb NatsCbSubsribe) (*nats.Subscription, error) {
  51. s, err := self.conn.Subscribe(subj, func(msg *nats.Msg) {
  52. cb(msg.Subject, msg.Data)
  53. })
  54. if err != nil {
  55. return nil, err
  56. }
  57. //self.conn.Flush()
  58. return s, nil
  59. }
  60. func (self *Nats) QSub(subj, q string, cb NatsCbQueueSubsribe) (*nats.Subscription, error) {
  61. s, err := self.conn.QueueSubscribe(subj, q, func(msg *nats.Msg) {
  62. cb(msg.Subject, msg.Sub.Queue, msg.Data)
  63. })
  64. if err != nil {
  65. return nil, err
  66. }
  67. //self.conn.Flush()
  68. return s, nil
  69. }