Ver código fonte

feature: support client reconnct

joe 4 anos atrás
pai
commit
b0029a7e60
5 arquivos alterados com 104 adições e 47 exclusões
  1. 2 30
      cpn/client_tcp.go
  2. 86 10
      cpn/client_ws.go
  3. 4 1
      cpn/hub.go
  4. 1 0
      ihub.go
  5. 11 6
      sample/tcpws/echo_client.go

+ 2 - 30
cpn/client_tcp.go

@@ -1,37 +1,9 @@
 package cpn
 
 import (
-	"net"
 	"git.wanbits.io/joe/nnet"
 )
 
-type TcpClient struct {
-	*Hub
-	addr string
-}
-
-func NewTcpClient(cf *nnet.HubConfig, cb nnet.ISessionCallback, p nnet.IProtocol) nnet.IHub {
-	return &TcpClient{
-		Hub: newHub(cf, cb, p),
-	}
-}
-
-func (self *TcpClient) NewConnection(addr string, id uint64) error {
-	tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
-	if err != nil {
-		return err
-	}
-	conn, err := net.DialTCP("tcp", nil, tcpAddr)
-	if err != nil {
-		return err
-	}
-
-	self.wg.Add(1)
-	go func() {
-		ses := newSession(TcpConn{conn}, self)
-		ses.UpdateId(id)
-		ses.Do()
-		self.wg.Done()
-	}()
-	return nil
+func NewTcpClient(cf *nnet.HubConfig, cb nnet.ISessionCallback, p nnet.IProtocol, opts ...ClientOption) nnet.IHub {
+	return NewClient(TCP, cf, cb, p, opts...)
 }

+ 86 - 10
cpn/client_ws.go

@@ -1,25 +1,81 @@
 package cpn
 
 import (
-	"github.com/gorilla/websocket"
 	"git.wanbits.io/joe/nnet"
+	"github.com/gorilla/websocket"
+	"net"
+	"time"
+)
+
+const (
+	WS  = 1
+	TCP = 2
 )
 
-//
-type WsClient struct {
+type ClientOption func(*Client)
+type connector func(string, uint64) error
+
+func WithReconn(interval time.Duration) ClientOption {
+	return func(c *Client) {
+		c.reconnInterval = interval
+	}
+}
+
+type Client struct {
 	*Hub
-	addr string
-	pos  int //
+	addr           string
+	id             uint64
+	ctr            connector
+	reconnInterval time.Duration
 }
 
-func NewWsClient(cf *nnet.HubConfig, cb nnet.ISessionCallback, p nnet.IProtocol) nnet.IHub {
-	return &WsClient{
-		Hub: newHub(cf, cb, p),
-		pos: 0,
+func NewClient(protocol int, cf *nnet.HubConfig, cb nnet.ISessionCallback, p nnet.IProtocol, opts ...ClientOption) nnet.IHub {
+	c := &Client{
+		Hub:            newHub(cf, cb, p),
+		reconnInterval: 0,
 	}
+
+	for _, opt := range opts {
+		opt(c)
+	}
+
+	c.ctr = c.connectTcp
+	if protocol == WS {
+		c.ctr = c.connectWs
+	}
+
+	return c
+}
+
+func NewWsClient(cf *nnet.HubConfig, cb nnet.ISessionCallback, p nnet.IProtocol, opts ...ClientOption) nnet.IHub {
+	return NewClient(WS, cf, cb, p, opts...)
+}
+
+func (self *Client) NewConnection(addr string, id uint64) error {
+	self.addr, self.id = addr, id
+
+	if self.reconnInterval == 0 {
+		return self.ctr(addr, id)
+	}
+
+	self.StartReconn()
+	return nil
 }
 
-func (self *WsClient) NewConnection(addr string, id uint64) error {
+func (self *Client) StartReconn() {
+	go func() {
+		var err error
+		for {
+			err = self.ctr(self.addr, self.id)
+			if err == nil {
+				return
+			}
+			time.Sleep(self.reconnInterval)
+		}
+	}()
+}
+
+func (self *Client) connectWs(addr string, id uint64) error {
 	conn, _, err := websocket.DefaultDialer.Dial(addr, nil)
 	if err != nil {
 		return err
@@ -34,3 +90,23 @@ func (self *WsClient) NewConnection(addr string, id uint64) error {
 
 	return nil
 }
+
+func (self *Client) connectTcp(addr string, id uint64) error {
+	tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
+	if err != nil {
+		return err
+	}
+	conn, err := net.DialTCP("tcp", nil, tcpAddr)
+	if err != nil {
+		return err
+	}
+
+	self.wg.Add(1)
+	go func() {
+		ses := newSession(TcpConn{conn}, self)
+		ses.UpdateId(id)
+		ses.Do()
+		self.wg.Done()
+	}()
+	return nil
+}

+ 4 - 1
cpn/hub.go

@@ -1,8 +1,8 @@
 package cpn
 
 import (
-	"math/rand"
 	"git.wanbits.io/joe/nnet"
+	"math/rand"
 	"sync"
 	"time"
 )
@@ -136,6 +136,9 @@ func (self *Hub) NewConnection(string, uint64) error {
 	return nil
 }
 
+func (self *Hub) StartReconn() {
+}
+
 func (self *Hub) Start() error {
 	return nil
 }

+ 1 - 0
ihub.go

@@ -43,6 +43,7 @@ type IHub interface {
 	DoJob(int)    // not used now
 
 	NewConnection(string, uint64) error
+	StartReconn()
 
 	PutSession(uint64, ISession) error // session management base on id
 	DelSession(uint64) error

+ 11 - 6
sample/tcpws/echo_client.go

@@ -3,19 +3,24 @@ package main
 import (
 	"bufio"
 	"fmt"
-	"net/url"
 	"git.wanbits.io/joe/nnet"
 	"git.wanbits.io/joe/nnet/cpn"
+	"net/url"
 	"os"
 	"strings"
 	"time"
 )
 
+var (
+	clt nnet.IHub
+)
+
 type ClientSessionCb struct {
 }
 
-func (self *ClientSessionCb) OnClosed(nnet.ISession, int32) {
-
+func (self *ClientSessionCb) OnClosed(ses nnet.ISession, reason int32) {
+	fmt.Fprintf(os.Stdout, "lost connection %v, reason:%v\n", ses.GetRawConn().RemoteAddr(), reason)
+	clt.StartReconn()
 }
 
 func (self *ClientSessionCb) OnConnected(ses nnet.ISession) (bool, int32) {
@@ -43,7 +48,7 @@ func (self *ClientSessionCb) OnHeartbeat(ses nnet.ISession) bool {
 
 func start_tcp_client() {
 	__start_client(func() nnet.IHub {
-		clt := cpn.NewTcpClient(&nnet.DefHubConfig, &ClientSessionCb{}, &TcpProtocol{})
+		clt := cpn.NewTcpClient(&nnet.DefHubConfig, &ClientSessionCb{}, &TcpProtocol{}, cpn.WithReconn(time.Second))
 		err := clt.NewConnection(SERVER_ADDR, 5)
 		if err != nil {
 			panic(err)
@@ -54,7 +59,7 @@ func start_tcp_client() {
 
 func start_ws_client() {
 	__start_client(func() nnet.IHub {
-		clt := cpn.NewWsClient(&nnet.DefHubConfig, &ClientSessionCb{}, &WsProtocol{})
+		clt := cpn.NewWsClient(&nnet.DefHubConfig, &ClientSessionCb{}, &WsProtocol{}, cpn.WithReconn(time.Second))
 		u := url.URL{
 			Scheme: "ws",
 			Host:   SERVER_ADDR,
@@ -69,7 +74,7 @@ func start_ws_client() {
 }
 
 func __start_client(fn func() nnet.IHub) {
-	clt := fn()
+	clt = fn()
 	reader := bufio.NewReader(os.Stdin)
 	fmt.Println("Type what you want to send to server:")
 	for {