| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778 |
- package mqsvr
- import (
- "github.com/nats-io/nats.go"
- "strings"
- "time"
- )
// Callback signatures accepted by the subscription helpers Sub and QSub.
// The exported names keep their original spelling because callers depend on
// them.
type (
	// NatsCbSubsribe is invoked by Sub for each delivered message with the
	// message's subject and raw payload.
	NatsCbSubsribe func(subj string, data []byte)

	// NatsCbQueueSubsribe is invoked by QSub for each delivered message with
	// the message's subject, the queue group name of the subscription, and the
	// raw payload.
	NatsCbQueueSubsribe func(subj, queue string, data []byte)
)
- type Nats struct {
- addrs []string
- conn *nats.Conn
- }
- func NewNats(addrs []string, username, password, name string) (*Nats, error) {
- nc, err := nats.Connect(strings.Join(addrs, ","),
- nats.NoEcho(),
- nats.Name(name),
- nats.Timeout(3*time.Second),
- nats.MaxReconnects(100),
- nats.ReconnectWait(5*time.Second),
- nats.UserInfo(username, password),
- nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
- //fmt.Println()
- }),
- nats.ReconnectHandler(func(nc *nats.Conn) {
- //fmt.
- }),
- nats.ClosedHandler(func(nc *nats.Conn) {
- //fmt
- }))
- if nil != err {
- return nil, err
- }
- return &Nats{addrs: addrs, conn: nc}, nil
- }
- func (self *Nats) Close() {
- self.conn.Drain()
- self.conn.Close()
- }
- func (self *Nats) RawConn() *nats.Conn {
- return self.conn
- }
- func (self *Nats) Pub(subj string, content []byte, timeout time.Duration) error {
- err := self.conn.Publish(subj, content)
- if err != nil {
- return err
- }
- //self.conn.FlushTimeout(timeout)
- return nil
- }
- func (self *Nats) Sub(subj string, cb NatsCbSubsribe) (*nats.Subscription, error) {
- s, err := self.conn.Subscribe(subj, func(msg *nats.Msg) {
- cb(msg.Subject, msg.Data)
- })
- if err != nil {
- return nil, err
- }
- //self.conn.Flush()
- return s, nil
- }
- func (self *Nats) QSub(subj, q string, cb NatsCbQueueSubsribe) (*nats.Subscription, error) {
- s, err := self.conn.QueueSubscribe(subj, q, func(msg *nats.Msg) {
- cb(msg.Subject, msg.Sub.Queue, msg.Data)
- })
- if err != nil {
- return nil, err
- }
- //self.conn.Flush()
- return s, nil
- }
|