package mqsvr import ( "github.com/nats-io/nats.go" "strings" "time" ) type NatsCbSubsribe func(subj string, data []byte) type NatsCbQueueSubsribe func(subj, queue string, data []byte) type Nats struct { addrs []string conn *nats.Conn } func NewNatsClient(addrs []string, username, password, name string) (*Nats, error) { nc, err := nats.Connect(strings.Join(addrs, ","), nats.NoEcho(), nats.Name(name), nats.Timeout(3*time.Second), nats.MaxReconnects(100), nats.ReconnectWait(5*time.Second), nats.UserInfo(username, password), nats.DisconnectErrHandler(func(nc *nats.Conn, err error) { //fmt.Println() }), nats.ReconnectHandler(func(nc *nats.Conn) { //fmt. }), nats.ClosedHandler(func(nc *nats.Conn) { //fmt })) if nil != err { return nil, err } return &Nats{addrs: addrs, conn: nc}, nil } func (self *Nats) Close() { self.conn.Drain() self.conn.Close() } func (self *Nats) RawConn() *nats.Conn { return self.conn } func (self *Nats) Pub(subj string, content []byte, timeout time.Duration) error { err := self.conn.Publish(subj, content) if err != nil { return err } //self.conn.FlushTimeout(timeout) return nil } func (self *Nats) Sub(subj string, cb NatsCbSubsribe) (*nats.Subscription, error) { s, err := self.conn.Subscribe(subj, func(msg *nats.Msg) { cb(msg.Subject, msg.Data) }) if err != nil { return nil, err } //self.conn.Flush() return s, nil } func (self *Nats) QSub(subj, q string, cb NatsCbQueueSubsribe) (*nats.Subscription, error) { s, err := self.conn.QueueSubscribe(subj, q, func(msg *nats.Msg) { cb(msg.Subject, msg.Sub.Queue, msg.Data) }) if err != nil { return nil, err } //self.conn.Flush() return s, nil }