nats_streaming.go 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. package mqsvr
  2. import (
  3. "github.com/nats-io/stan.go"
  4. )
  5. type Stan struct {
  6. clusterId, clientId string
  7. conn stan.Conn
  8. nats *Nats
  9. }
  10. func NewStan(addrs []string, username, password string, clusterId, clientId, name string) (*Stan, error) {
  11. nats, err := NewNats(addrs, username, password, name)
  12. if err != nil {
  13. return nil, err
  14. }
  15. st, err := stan.Connect(clusterId, clientId,
  16. stan.NatsConn(nats.RawConn()),
  17. stan.SetConnectionLostHandler(func(c stan.Conn, err error) {
  18. // fmt.Pr
  19. }))
  20. if err != nil {
  21. return nil, err
  22. }
  23. return &Stan{
  24. clusterId: clusterId,
  25. clientId: clientId,
  26. nats: nats,
  27. conn: st,
  28. }, nil
  29. }
  30. func (self *Stan) Pub(subj string, msg []byte) error {
  31. return self.conn.Publish(subj, msg)
  32. }
  33. func (self *Stan) APub(subj string, msg []byte) error {
  34. _, err := self.conn.PublishAsync(subj, msg, func(ackNuid string, err error) {
  35. })
  36. return err
  37. }
  38. func (self *Stan) Sub(subj, durable string, cb NatsCbSubsribe) (stan.Subscription, error) {
  39. return self.conn.Subscribe(subj, func(msg *stan.Msg) {
  40. cb(msg.Subject, msg.Data)
  41. }, stan.StartWithLastReceived(), stan.DurableName(durable))
  42. }
  43. func (self *Stan) QSub(subj, durable, qname string, cb NatsCbSubsribe) (stan.Subscription, error) {
  44. return self.conn.QueueSubscribe(subj, qname, func(msg *stan.Msg) {
  45. cb(msg.Subject, msg.Data)
  46. }, stan.StartWithLastReceived(), stan.DurableName(durable))
  47. }
  48. func (self *Stan) Close() {
  49. self.conn.Close()
  50. self.nats.Close()
  51. }