Browse Source

fix: support client-side reconn dynamically

joe 4 years ago
parent
commit
cd79c7aa2d
6 changed files with 33 additions and 18 deletions
  1. 9 8
      cpn/client_ws.go
  2. 1 1
      cpn/hub.go
  3. 15 4
      cpn/session.go
  4. 1 1
      ihub.go
  5. 1 0
      isession.go
  6. 6 4
      sample/tcpws/echo_client.go

+ 9 - 8
cpn/client_ws.go

@@ -23,8 +23,6 @@ func WithReconn(interval time.Duration) ClientOption {
 
 type Client struct {
 	*Hub
-	addr           string
-	id             uint64
 	ctr            connector
 	reconnInterval time.Duration
 }
@@ -40,6 +38,7 @@ func NewClient(protocol int, cf *nnet.HubConfig, cb nnet.ISessionCallback, p nne
 	}
 
 	c.ctr = c.connectTcp
+
 	if protocol == WS {
 		c.ctr = c.connectWs
 	}
@@ -52,21 +51,23 @@ func NewWsClient(cf *nnet.HubConfig, cb nnet.ISessionCallback, p nnet.IProtocol,
 }
 
 func (self *Client) NewConnection(addr string, id uint64) error {
-	self.addr, self.id = addr, id
-
 	if self.reconnInterval == 0 {
 		return self.ctr(addr, id)
 	}
 
-	self.StartReconn()
+	self.StartReconn(addr, id)
 	return nil
 }
 
-func (self *Client) StartReconn() {
+func (self *Client) StartReconn(addr string, id uint64) {
+	if self.reconnInterval == 0 {
+		return
+	}
+
 	go func() {
 		var err error
 		for {
-			err = self.ctr(self.addr, self.id)
+			err = self.ctr(addr, id)
 			if err == nil {
 				return
 			}
@@ -103,7 +104,7 @@ func (self *Client) connectTcp(addr string, id uint64) error {
 
 	self.wg.Add(1)
 	go func() {
-		ses := newSession(TcpConn{conn}, self)
+		ses := newSession(TcpConn{conn}, self, addr)
 		ses.UpdateId(id)
 		ses.Do()
 		self.wg.Done()

+ 1 - 1
cpn/hub.go

@@ -136,7 +136,7 @@ func (self *Hub) NewConnection(string, uint64) error {
 	return nil
 }
 
-func (self *Hub) StartReconn() {
+func (self *Hub) StartReconn(string, uint64) {
 }
 
 func (self *Hub) Start() error {

+ 15 - 4
cpn/session.go

@@ -1,8 +1,8 @@
 package cpn
 
 import (
-	"net"
 	"git.wanbits.io/joe/nnet"
+	"net"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -29,16 +29,21 @@ type Session struct {
 	chClose   chan struct{}
 	chSend    chan nnet.IPacket
 	chRecv    chan nnet.IPacket
+	addr      string
 }
 
-func newSession(conn nnet.IConn, h nnet.IHub) *Session {
-	return &Session{
+func newSession(conn nnet.IConn, h nnet.IHub, params ...string) *Session {
+	s := &Session{
 		hub:     h,
 		conn:    conn,
 		chClose: make(chan struct{}),
 		chSend:  make(chan nnet.IPacket, h.Conf().SizeOfSendChan),
 		chRecv:  make(chan nnet.IPacket, h.Conf().SizeOfRecvChan),
 	}
+	if len(params) > 0 {
+		s.addr = params[0]
+	}
+	return s
 }
 
 func (self *Session) GetData() interface{} {
@@ -65,6 +70,11 @@ func (self *Session) SetId(id uint64) {
 	self.id = id
 }
 
+// used client-side session for re-connect
+func (self *Session) ServerAddr() string {
+	return self.addr
+}
+
 func (self *Session) Do() {
 	suc, reason := self.hub.Callback().OnConnected(self)
 	if !suc {
@@ -91,10 +101,11 @@ func (self *Session) close(reason int32) {
 		close(self.chRecv)
 
 		self.conn.Close()
-
 		self.hub.DelSession(self.id)
 
 		self.hub.Callback().OnClosed(self, reason)
+
+		//self.hub.StartReconn(self.addr, self.id)
 	})
 }
 

+ 1 - 1
ihub.go

@@ -43,7 +43,7 @@ type IHub interface {
 	DoJob(int)    // not used now
 
 	NewConnection(string, uint64) error
-	StartReconn()
+	StartReconn(string, uint64)
 
 	PutSession(uint64, ISession) error // session management base on id
 	DelSession(uint64) error

+ 1 - 0
isession.go

@@ -25,6 +25,7 @@ type ISession interface {
 	Id() uint64      // get session id
 	SetId(uint64)
 
+	ServerAddr() string
 	GetRawConn() IConn
 }
 

+ 6 - 4
sample/tcpws/echo_client.go

@@ -19,11 +19,13 @@ type ClientSessionCb struct {
 }
 
 func (self *ClientSessionCb) OnClosed(ses nnet.ISession, reason int32) {
-	fmt.Fprintf(os.Stdout, "lost connection %v, reason:%v\n", ses.GetRawConn().RemoteAddr(), reason)
-	clt.StartReconn()
+	fmt.Printf("session num: %d\n", clt.GetSessionNum())
+	fmt.Fprintf(os.Stdout, "lost connection %v, id:%v, reason:%v\n", ses.ServerAddr(), ses.Id(), reason)
+	clt.StartReconn(ses.ServerAddr(), ses.Id())
 }
 
 func (self *ClientSessionCb) OnConnected(ses nnet.ISession) (bool, int32) {
+	fmt.Printf("connected to %s\n", ses.ServerAddr())
 	return true, 0
 }
 
@@ -48,7 +50,7 @@ func (self *ClientSessionCb) OnHeartbeat(ses nnet.ISession) bool {
 
 func start_tcp_client() {
 	__start_client(func() nnet.IHub {
-		clt := cpn.NewTcpClient(&nnet.DefHubConfig, &ClientSessionCb{}, &TcpProtocol{}, cpn.WithReconn(time.Second))
+		clt := cpn.NewTcpClient(&nnet.DefHubConfig, &ClientSessionCb{}, &TcpProtocol{}, cpn.WithReconn(3*time.Second))
 		err := clt.NewConnection(SERVER_ADDR, 5)
 		if err != nil {
 			panic(err)
@@ -59,7 +61,7 @@ func start_tcp_client() {
 
 func start_ws_client() {
 	__start_client(func() nnet.IHub {
-		clt := cpn.NewWsClient(&nnet.DefHubConfig, &ClientSessionCb{}, &WsProtocol{}, cpn.WithReconn(time.Second))
+		clt := cpn.NewWsClient(&nnet.DefHubConfig, &ClientSessionCb{}, &WsProtocol{}, cpn.WithReconn(3*time.Second))
 		u := url.URL{
 			Scheme: "ws",
 			Host:   SERVER_ADDR,