nats.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package mqsvr
  2. import (
  3. "github.com/nats-io/nats.go"
  4. "strings"
  5. "time"
  6. )
  // NatsCbSubsribe is the callback type accepted by NatsClient.Sub. It is
  // called with the subject and the payload of each message received on the
  // subscription.
  7. type NatsCbSubsribe func(subj string, data []byte)
  // NatsCbQueueSubsribe is the callback type accepted by NatsClient.QSub. It is
  // called with the subject, the queue name of the subscription that received
  // the message, and the message payload.
  8. type NatsCbQueueSubsribe func(subj, queue string, data []byte)
  // NatsClient wraps a NATS connection together with the list of server
  // addresses it was created from. Instances are built by NewNatsClient.
  9. type NatsClient struct {
  10. addrs []string // server addresses exactly as passed to NewNatsClient
  11. conn *nats.Conn // connection returned by nats.Connect
  12. }
  13. func NewNatsClient(addrs []string, username, password, name string) (*NatsClient, error) {
  14. nc, err := nats.Connect(strings.Join(addrs, ","),
  15. nats.NoEcho(),
  16. nats.Name(name),
  17. nats.Timeout(3*time.Second),
  18. nats.MaxReconnects(100),
  19. nats.ReconnectWait(5*time.Second),
  20. nats.UserInfo(username, password),
  21. nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
  22. //fmt.Println()
  23. }),
  24. nats.ReconnectHandler(func(nc *nats.Conn) {
  25. //fmt.
  26. }),
  27. nats.ClosedHandler(func(nc *nats.Conn) {
  28. //fmt
  29. }))
  30. if nil != err {
  31. return nil, err
  32. }
  33. return &NatsClient{addrs: addrs, conn: nc}, nil
  34. }
  35. func (self *NatsClient) Close() {
  36. self.conn.Close()
  37. }
  38. func (self *NatsClient) Pub(subj string, content []byte, timeout time.Duration) error {
  39. err := self.conn.Publish(subj, content)
  40. if err != nil {
  41. return err
  42. }
  43. //self.conn.FlushTimeout(timeout)
  44. return nil
  45. }
  46. func (self *NatsClient) Sub(subj string, cb NatsCbSubsribe) (*nats.Subscription, error) {
  47. s, err := self.conn.Subscribe(subj, func(msg *nats.Msg) {
  48. cb(msg.Subject, msg.Data)
  49. })
  50. if err != nil {
  51. return nil, err
  52. }
  53. //self.conn.Flush()
  54. return s, nil
  55. }
  56. func (self *NatsClient) QSub(subj, q string, cb NatsCbQueueSubsribe) (*nats.Subscription, error) {
  57. s, err := self.conn.QueueSubscribe(subj, q, func(msg *nats.Msg) {
  58. cb(msg.Subject, msg.Sub.Queue, msg.Data)
  59. })
  60. if err != nil {
  61. return nil, err
  62. }
  63. //self.conn.Flush()
  64. return s, nil
  65. }