joe 4 years ago
parent
commit
b9988ae8eb
4 changed files with 50 additions and 5 deletions
  1. 1 1
      go.mod
  2. 1 1
      mq/mqsvr/nats.go
  3. 1 1
      mq/mqsvr/nats_streaming.go
  4. 47 2
      mq/mqsvr/nats_test.go

+ 1 - 1
go.mod

@@ -27,7 +27,7 @@ require (
 	github.com/natefinch/lumberjack v2.0.0+incompatible
 	github.com/nats-io/nats-server/v2 v2.1.9 // indirect
 	github.com/nats-io/nats.go v1.10.0
-	github.com/nats-io/stan.go v0.8.3 // indirect
+	github.com/nats-io/stan.go v0.8.3
 	github.com/onsi/ginkgo v1.14.2 // indirect
 	github.com/onsi/gomega v1.10.4 // indirect
 	github.com/prometheus/client_golang v0.9.3 // indirect

+ 1 - 1
mq/mqsvr/nats.go

@@ -14,7 +14,7 @@ type Nats struct {
 	conn  *nats.Conn
 }
 
-func NewNatsClient(addrs []string, username, password, name string) (*Nats, error) {
+func NewNats(addrs []string, username, password, name string) (*Nats, error) {
 	nc, err := nats.Connect(strings.Join(addrs, ","),
 		nats.NoEcho(),
 		nats.Name(name),

+ 1 - 1
mq/mqsvr/nats_streaming.go

@@ -11,7 +11,7 @@ type Stan struct {
 }
 
 func NewStan(addrs []string, username, password string, clusterId, clientId, name string) (*Stan, error) {
-	nats, err := NewNatsClient(addrs, username, password, name)
+	nats, err := NewNats(addrs, username, password, name)
 	if err != nil {
 		return nil, err
 	}

+ 47 - 2
mq/mqsvr/nats_test.go

@@ -1,13 +1,58 @@
 package mqsvr
 
-import "testing"
+import (
+	"strconv"
+	"testing"
+	"time"
+)
 
-func TestNats_Close(t *testing.T) {
+const (
+	addr = "127.0.0.1:4222"
+)
 
+func TestNats_Close(t *testing.T) {
+	n, err := NewNats([]string{"127.0.0.1:4222"}, "test", "test", "n1")
+	if err != nil {
+		t.Fatal(err)
+	}
+	n.Close()
 }
 
 func TestNats_Pub(t *testing.T) {
+	c1, err := NewNats([]string{"127.0.0.1:4222"}, "test", "test", "n1")
+	if err != nil {
+		t.Fatal(err)
+	}
+	c2, err := NewNats([]string{"127.0.0.1:4222"}, "test", "test", "n1")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	defer func() {
+		c1.Close()
+		c2.Close()
+	}()
+
+	// publisher
+	go func() {
+		for i := 0; i < 10; i++ {
+			_ = c1.Pub("test.ok", []byte("test "+strconv.FormatInt(int64(i), 10)), time.Second)
+			time.Sleep(time.Second)
+		}
+	}()
+
+	//// IMPORTANT: comment this line and run test again. there will be no message lost.
+	time.Sleep(3 * time.Second)
 
+	go func() {
+		_, err := c2.Sub("test.ok", func(subj string, data []byte) {
+			t.Log("received from " + subj + ", data:" + string(data))
+		})
+		if err != nil {
+			t.Fatal(err)
+		}
+	}()
+	time.Sleep(11 * time.Second)
 }
 
 func TestNats_Sub(t *testing.T) {