| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061 |
- package mqsvr
- import (
- "github.com/nats-io/stan.go"
- )
- type Stan struct {
- clusterId, clientId string
- conn stan.Conn
- nats *Nats
- }
- func NewStan(addrs []string, username, password string, clusterId, clientId, name string) (*Stan, error) {
- nats, err := NewNatsClient(addrs, username, password, name)
- if err != nil {
- return nil, err
- }
- st, err := stan.Connect(clusterId, clientId,
- stan.NatsConn(nats.RawConn()),
- stan.SetConnectionLostHandler(func(c stan.Conn, err error) {
- // fmt.Pr
- }))
- if err != nil {
- return nil, err
- }
- return &Stan{
- clusterId: clusterId,
- clientId: clientId,
- nats: nats,
- conn: st,
- }, nil
- }
- func (self *Stan) Pub(subj string, msg []byte) error {
- return self.conn.Publish(subj, msg)
- }
- func (self *Stan) APub(subj string, msg []byte) error {
- _, err := self.conn.PublishAsync(subj, msg, func(ackNuid string, err error) {
- })
- return err
- }
- func (self *Stan) Sub(subj, durable string, cb NatsCbSubsribe) (stan.Subscription, error) {
- return self.conn.Subscribe(subj, func(msg *stan.Msg) {
- cb(msg.Subject, msg.Data)
- }, stan.StartWithLastReceived(), stan.DurableName(durable))
- }
- func (self *Stan) QSub(subj, durable, qname string, cb NatsCbSubsribe) (stan.Subscription, error) {
- return self.conn.QueueSubscribe(subj, qname, func(msg *stan.Msg) {
- cb(msg.Subject, msg.Data)
- }, stan.StartWithLastReceived(), stan.DurableName(durable))
- }
- func (self *Stan) Close() {
- self.conn.Close()
- self.nats.Close()
- }
|