joe il y a 4 ans
commit
83105f89a2
16 fichiers modifiés avec 798 ajouts et 0 suppressions
  1. 2 0
      .gitignore
  2. 18 0
      errs.go
  3. 8 0
      go.mod
  4. 4 0
      go.sum
  5. 15 0
      iconn.go
  6. 43 0
      ihub.go
  7. 1 0
      internal/client_tcp.go
  8. 58 0
      internal/client_ws.go
  9. 11 0
      internal/conn_tcp.go
  10. 33 0
      internal/conn_ws.go
  11. 134 0
      internal/hub.go
  12. 58 0
      internal/server_tcp.go
  13. 100 0
      internal/server_ws.go
  14. 259 0
      internal/session.go
  15. 20 0
      iprotocol.go
  16. 34 0
      isession.go

+ 2 - 0
.gitignore

@@ -0,0 +1,2 @@
+.idea/
+.vscode/

+ 18 - 0
errs.go

@@ -0,0 +1,18 @@
+package nnet
+
+import (
+"errors"
+)
+
+var (
+	ErrExistsAlready          = errors.New("item already exists")
+	ErrNotExists              = errors.New("item not exists")
+	ErrConnectionReject       = errors.New("connection rejected by logic")
+	ErrConnClosing            = errors.New("use of closed network connection")
+	ErrWriteBlocking          = errors.New("write packet was blocking")
+	ErrReadBlocking           = errors.New("read packet was blocking")
+	ErrEmptySlice             = errors.New("the slice is empty")
+	ErrSliceOutOfRange        = errors.New("the slice is out of range")
+	ErrBufferSizeInsufficient = errors.New("buffer size is too small")
+)
+

+ 8 - 0
go.mod

@@ -0,0 +1,8 @@
+module one.com/nnet
+
+go 1.15
+
+require (
+	github.com/gorilla/mux v1.8.0 // indirect
+	github.com/gorilla/websocket v1.4.2 // indirect
+)

+ 4 - 0
go.sum

@@ -0,0 +1,4 @@
+github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
+github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
+github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
+github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=

+ 15 - 0
iconn.go

@@ -0,0 +1,15 @@
+package nnet
+
+import (
+"io"
+"net"
+"time"
+)
+
+type IConn interface {
+	io.ReadWriteCloser
+	SetReadDeadline(time.Time) error
+	SetWriteDeadline(time.Time) error
+	RemoteAddr() net.Addr
+	LocalAddr() net.Addr
+}

+ 43 - 0
ihub.go

@@ -0,0 +1,43 @@
+package nnet
+
+import (
+	"sync"
+	"time"
+)
+
+var (
+// Hub 状态
+)
+
+type HubConfig struct {
+	SizeOfSendChan uint32
+	SizeOfRecvChan uint32
+	ReadBufSize    int
+	WriteBufSize   int
+	Timeout        time.Duration // 发送等超时
+	Tick           time.Duration // 定时回调
+	ReadTimeout    time.Duration // 讀超時,如果為0,則無限等待。超時到達,意味著客戶端心跳丟失
+}
+
+type IHub interface {
+	Lock() // support locker semantics
+	Unlock()
+
+	Wg() *sync.WaitGroup        // object
+	ChQuit() <-chan struct{}    // 返回一个通道,用于退出 hub 循环
+	Conf() *HubConfig           // 返回配置信息
+	Callback() ISessionCallback // 返回回调对象
+	Protocol() IProtocol        // 返回数据协议
+
+	Start() error // 启动 hub
+	Stop() error  // 停止 hub
+	DoJob(int)    // 执行 hub 中其他任务
+
+	PutSession(uint64, ISession) error // session 管理,这里的 session 必须基于 id
+	DelSession(uint64) error
+	GetSession(uint64) (ISession, error)
+	PeekSession(uint64) (ISession, error)
+	RandSession() (ISession, error)
+	GetSessionNum() int
+	GetAllSessions() map[uint64]ISession
+}

+ 1 - 0
internal/client_tcp.go

@@ -0,0 +1 @@
+package internal

+ 58 - 0
internal/client_ws.go

@@ -0,0 +1,58 @@
+package internal
+
+
+import (
+	//	"math/rand"
+	//	"time"
+
+	"github.com/gorilla/websocket"
+	"one.com/nnet"
+)
+
+// 客户端组
+type WsClient struct {
+	*Hub
+	addr 	string
+	pos   int // 指示当前连接第几个 addr
+}
+
+func NewWsClient(cf *nnet.HubConfig, cb nnet.ISessionCallback, p nnet.IProtocol, addr string) *WsClient {
+	return &WsClient{
+		Hub:   newHub(cf, cb, p),
+		addr: addr,
+		pos:   0,
+	}
+}
+
+func (self *WsClient) Start() error {
+	conn, _, err := websocket.DefaultDialer.Dial(self.addr, nil)
+	if err != nil {
+		return err
+	}
+	self.wg.Add(1)
+	go func() {
+		ses := newSession(NewWsConn(conn), self)
+		ses.Do()
+		self.wg.Done()
+	}()
+
+	return nil
+}
+
+func (self *WsClient) DoJob(int) {
+
+}
+
+//func (self *WsClient) ConnectRand(addrs []string) error {
+//	src := rand.NewSource(time.Now().UnixNano())
+//	rnd := rand.New(src)
+//	n := rnd.Intn(len(addrs))
+//	addr := addrs[n]
+//	return self.Connect(1, addr)
+//}
+
+func (self *WsClient) Stop() error {
+	close(self.chQuit)
+	self.wg.Wait()
+	return nil
+}

+ 11 - 0
internal/conn_tcp.go

@@ -0,0 +1,11 @@
+package internal
+
+
+import (
+	"net"
+)
+
+type TcpConn struct {
+	*net.TCPConn
+}
+

+ 33 - 0
internal/conn_ws.go

@@ -0,0 +1,33 @@
+package internal
+
+import (
+//	"fmt"
+
+"github.com/gorilla/websocket"
+)
+
+type WsConn struct {
+	*websocket.Conn
+}
+
+func NewWsConn(c *websocket.Conn) *WsConn {
+	return &WsConn{c}
+}
+
+func (self *WsConn) Read(p []byte) (int, error) {
+	_, r, err := self.NextReader()
+	if nil != err {
+		return 0, err
+	}
+	return r.Read(p)
+}
+
+func (self *WsConn) Write(p []byte) (int, error) {
+	w, err := self.NextWriter(websocket.BinaryMessage)
+	if nil != err {
+		return 0, err
+	}
+	n, err := w.Write(p)
+	_ = w.Close()
+	return n, err
+}

+ 134 - 0
internal/hub.go

@@ -0,0 +1,134 @@
+package internal
+
+import (
+	"math/rand"
+	"one.com/nnet"
+	"sync"
+	"time"
+)
+
+type Hub struct {
+	sync.Mutex
+	conf   *nnet.HubConfig
+	cbSes  nnet.ISessionCallback
+	prot   nnet.IProtocol
+	chQuit chan struct{}
+	wg     *sync.WaitGroup
+
+	sess map[uint64]nnet.ISession
+}
+
+func newHub(cf *nnet.HubConfig, cb nnet.ISessionCallback, p nnet.IProtocol) *Hub {
+	return &Hub{
+		conf:   cf,
+		cbSes:  cb,
+		prot:   p,
+		chQuit: make(chan struct{}),
+		wg:     &sync.WaitGroup{},
+		sess:   make(map[uint64]nnet.ISession),
+	}
+}
+
+func (self *Hub) Wg() *sync.WaitGroup {
+	return self.wg
+}
+
+func (self *Hub) ChQuit() <-chan struct{} {
+	return self.chQuit
+}
+
+func (self *Hub) Conf() *nnet.HubConfig {
+	return self.conf
+}
+
+func (self *Hub) Callback() nnet.ISessionCallback {
+	return self.cbSes
+}
+
+func (self *Hub) Protocol() nnet.IProtocol {
+	return self.prot
+}
+
+func (self *Hub) PutSession(id uint64, ses nnet.ISession) error {
+	self.Lock()
+	//@Notice: 顶替
+	self.sess[id] = ses
+	self.Unlock()
+	return nil
+}
+
+func (self *Hub) DelSession(id uint64) error {
+	self.Lock()
+	defer self.Unlock()
+
+	if _, ok := self.sess[id]; !ok {
+		return nnet.ErrNotExists
+	}
+	delete(self.sess, id)
+	return nil
+}
+
+func (self *Hub) PeekSession(id uint64) (nnet.ISession, error) {
+	self.Lock()
+	defer self.Unlock()
+
+	s, ok := self.sess[id]
+	if !ok {
+		return nil, nnet.ErrNotExists
+	}
+
+	delete(self.sess, id)
+
+	return s, nil
+}
+
+func (self *Hub) GetSession(id uint64) (nnet.ISession, error) {
+	self.Lock()
+	defer self.Unlock()
+
+	s, ok := self.sess[id]
+	if !ok {
+		return nil, nnet.ErrNotExists
+	}
+	return s, nil
+}
+
+func (self *Hub) GetAllSessions() map[uint64]nnet.ISession {
+	self.Lock()
+	defer self.Unlock()
+
+	return self.sess
+}
+
+var (
+	s = rand.NewSource(time.Now().UnixNano())
+	r = rand.New(s)
+)
+
+func Intn(a int) int {
+	return r.Intn(a)
+}
+
+func (self *Hub) RandSession() (nnet.ISession, error) {
+	self.Lock()
+	defer self.Unlock()
+
+	sz := len(self.sess)
+
+	sel := Intn(sz)
+	counter := 0
+	for _, ses := range self.sess {
+		if counter == sel {
+			return ses, nil
+		}
+		counter += 1
+	}
+	return nil, nnet.ErrNotExists
+}
+
+func (self *Hub) GetSessionNum() int {
+	self.Lock()
+	defer self.Unlock()
+	return len(self.sess)
+}
+

+ 58 - 0
internal/server_tcp.go

@@ -0,0 +1,58 @@
+package internal
+
+
+import (
+	"net"
+	"one.com/nnet"
+	"time"
+)
+
+type TcpServer struct {
+	*Hub
+	listener *net.TCPListener
+}
+
+func NewTcpServer(cf *nnet.HubConfig, cb nnet.ISessionCallback, p nnet.IProtocol, ls *net.TCPListener) *TcpServer {
+	return &TcpServer{
+		Hub:      newHub(cf, cb, p),
+		listener: ls,
+	}
+}
+
+func (self *TcpServer) Start() error {
+	self.wg.Add(1)
+	defer func() {
+		self.listener.Close()
+		self.wg.Done()
+	}()
+
+	for {
+		select {
+		case <-self.chQuit:
+			return nil
+		default:
+		}
+		self.listener.SetDeadline(time.Now().Add(self.conf.Timeout))
+
+		conn, err := self.listener.AcceptTCP()
+		if err != nil {
+			continue
+		}
+		self.wg.Add(1)
+		go func() {
+			ses := newSession(conn, self)
+			ses.Do()
+			self.wg.Done()
+		}()
+	}
+	return nil
+}
+
+func (self *TcpServer) DoJob(int) {
+
+}
+func (self *TcpServer) Stop() error {
+	close(self.chQuit)
+	self.wg.Wait() //保证 Start 完全退出
+	return nil
+}

+ 100 - 0
internal/server_ws.go

@@ -0,0 +1,100 @@
+package internal
+
+
+import (
+	"net/http"
+
+	"github.com/gorilla/mux"
+	"github.com/gorilla/websocket"
+	"one.com/nnet"
+)
+
+var (
+//	upgrader = websocket.Upgrader{
+//		ReadBufferSize:  4096,
+//		WriteBufferSize: 4096,
+//		CheckOrigin: func(r *http.Request) bool {
+//			return true
+//		},
+//	}
+)
+
+type CallbackWsPath func(http.ResponseWriter, *http.Request)
+
+type WsServer struct {
+	*Hub
+	addr     string
+	svr      *http.Server
+	upgrader *websocket.Upgrader
+	routes 	 map[string]CallbackWsPath
+}
+
+func NewWsServer(cf *nnet.HubConfig, cb nnet.ISessionCallback, p nnet.IProtocol,
+	addr string, routes map[string]CallbackWsPath) *WsServer {
+	return &WsServer{
+		Hub:  newHub(cf, cb, p),
+		addr: addr,
+		upgrader: &websocket.Upgrader{
+			ReadBufferSize:  cf.ReadBufSize,
+			WriteBufferSize: cf.WriteBufSize,
+			CheckOrigin: func(r *http.Request) bool {
+				return true
+			},
+		},
+		routes:routes,
+	}
+}
+
+func (self *WsServer) Start() error {
+	router := mux.NewRouter()
+	for k, v := range self.routes {
+		router.HandleFunc(k, v)
+	}
+	router.HandleFunc("/", self.do_homepage)
+	router.HandleFunc("/kg", func(w http.ResponseWriter, r *http.Request) {
+		self.do_new_session(w, r)
+	})
+	self.svr = &http.Server{
+		Addr:    self.addr,
+		Handler: router,
+	}
+	err := self.svr.ListenAndServe()
+	return err
+}
+
+func (self *WsServer) do_homepage(w http.ResponseWriter, r *http.Request) {
+	if r.URL.Path != "/" {
+		http.Error(w, "Not found", http.StatusNotFound)
+		return
+	}
+	if r.Method != "GET" {
+		http.Error(w, "Method now allowed", http.StatusMethodNotAllowed)
+		return
+	}
+	w.Write([]byte("welcome"))
+}
+
+func (self *WsServer) do_new_session(w http.ResponseWriter, r *http.Request) {
+	conn, err := self.upgrader.Upgrade(w, r, nil)
+	if nil != err {
+		return
+	}
+	self.wg.Add(1)
+	go func() {
+		ses := newSession(NewWsConn(conn), self)
+		ses.Do()
+		self.wg.Done()
+	}()
+}
+
+func (self *WsServer) Stop() error {
+	self.svr.Close()
+	close(self.chQuit)
+	self.wg.Wait()
+	return nil
+}
+
+func (self *WsServer) DoJob(int) {
+
+}
+

+ 259 - 0
internal/session.go

@@ -0,0 +1,259 @@
+package internal
+
+
+import (
+	"net"
+	"sync"
+	"sync/atomic"
+	"time"
+	"one.com/nnet"
+)
+
+// 关闭原因
+const (
+	// Close Reason 是一个 int32 型数据,这是系统预置的几个代码
+	CLOSE_REASON_READ          = 0
+	CLOSE_REASON_WRITE         = 0
+	CLOSE_REASON_PROTOCOL      = 1
+	CLOSE_REASON_READTIMEOUT   = 4  // HEARTBEAT
+	CLOSE_REASON_SERVER_CLOSED = 16 // 本服务器关闭
+)
+
+// 长连接
+type Session struct {
+	id        uint64
+	hub       nnet.IHub
+	conn      nnet.IConn
+	extraData interface{}
+	once      sync.Once // Close once
+	closed    int32     // session 是否关闭
+	chClose   chan struct{}
+	chSend    chan nnet.IPacket
+	chRecv    chan nnet.IPacket
+}
+
+func newSession(conn nnet.IConn, h nnet.IHub) *Session {
+	return &Session{
+		hub:     h,
+		conn:    conn,
+		chClose: make(chan struct{}),
+		chSend:  make(chan nnet.IPacket, h.Conf().SizeOfSendChan),
+		chRecv:  make(chan nnet.IPacket, h.Conf().SizeOfRecvChan),
+	}
+}
+
+func (self *Session) GetData() interface{} {
+	return self.extraData
+}
+
+func (self *Session) SetData(data interface{}) {
+	self.extraData = data
+}
+
+func (self *Session) GetRawConn() nnet.IConn {
+	return self.conn
+}
+
+func (self *Session) UpdateId(id uint64) {
+	self.id = id
+	self.hub.PutSession(id, self)
+}
+func (self *Session) Id() uint64 {
+	return self.id
+}
+
+func (self *Session) SetId(id uint64) {
+	self.id = id
+}
+
+func (self *Session) Do() {
+	suc, reason := self.hub.Callback().OnConnected(self)
+	if !suc {
+		//TODO: 这里不 Close 资源能释放吗?
+		self.Close(reason)
+		return
+	}
+
+	asyncDo(self.loopHandle, self.hub.Wg())
+	asyncDo(self.loopWrite, self.hub.Wg())
+	asyncDo(self.loopRead, self.hub.Wg())
+}
+
+func (self *Session) Close(reason int32) {
+	self.close(reason)
+}
+
+func (self *Session) close(reason int32) {
+	self.once.Do(func() {
+		atomic.StoreInt32(&self.closed, 1)
+
+		close(self.chClose)
+		close(self.chSend)
+		close(self.chRecv)
+
+		self.conn.Close()
+
+		self.hub.DelSession(self.id)
+
+		self.hub.Callback().OnClosed(self, reason)
+	})
+}
+
+func (self *Session) IsClosed() bool {
+	return atomic.LoadInt32(&self.closed) != 0
+}
+
+func (self *Session) Write(pkt nnet.IPacket, timeout time.Duration) error {
+	if self.IsClosed() {
+		return nnet.ErrConnClosing
+	}
+	if timeout > 0 {
+		_ = self.conn.SetWriteDeadline(time.Now().Add(timeout))
+	}
+	_, err := self.conn.Write(pkt.Serialize())
+	return err
+}
+
+// public 异步写入
+func (self *Session) AWrite(pkt nnet.IPacket, timeout time.Duration) (err error) {
+	if self.IsClosed() {
+		return nnet.ErrConnClosing
+	}
+
+	defer func() {
+		if e := recover(); e != nil {
+			err = nnet.ErrConnClosing
+		}
+	}()
+
+	if timeout == 0 {
+		select {
+		case self.chSend <- pkt:
+			return nil
+		default:
+			return nnet.ErrWriteBlocking
+		}
+	} else {
+		select {
+		case self.chSend <- pkt:
+			return nil
+		case <-self.chClose:
+			return nnet.ErrConnClosing
+		case <-time.After(timeout):
+			return nnet.ErrWriteBlocking
+		}
+	}
+}
+
+// 循环从 socket 读取数据,置入 chRecv 通道
+func (self *Session) loopRead() {
+	var reason int32 = 0
+
+	defer func() {
+		self.close(reason)
+	}()
+
+	for {
+		select {
+		case <-self.hub.ChQuit():
+			reason = CLOSE_REASON_SERVER_CLOSED
+			return
+		case <-self.chClose:
+			return
+		default:
+		}
+		if self.hub.Conf().ReadTimeout > 0 {
+			self.conn.SetReadDeadline(time.Now().Add(self.hub.Conf().ReadTimeout))
+		}
+		pkt, err := self.hub.Protocol().ReadPacket(self.conn)
+		if err != nil {
+			if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
+				reason = CLOSE_REASON_READTIMEOUT
+			} else if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() {
+				reason = CLOSE_REASON_READTIMEOUT
+			} else {
+				reason = CLOSE_REASON_READ
+			}
+			//
+			return
+		}
+		self.chRecv <- pkt
+	}
+}
+
+// 循环从 cbSend 通道读取数据,发送到 socket
+func (self *Session) loopWrite() {
+	var reason int32 = 0
+
+	defer func() {
+		//fmt.Println(self.id)
+		self.close(reason)
+	}()
+
+	ticker := time.NewTicker(self.hub.Conf().Tick)
+	for {
+		select {
+		case <-self.hub.ChQuit():
+			reason = CLOSE_REASON_SERVER_CLOSED
+			return
+		case <-self.chClose:
+			return
+		case <-ticker.C:
+			self.hub.Callback().OnHeartbeat(self)
+		case pkt := <-self.chSend:
+			if self.IsClosed() {
+				return
+			}
+			data := pkt.Serialize()
+			defer pkt.Destroy(data)
+
+			_ = self.conn.SetWriteDeadline(time.Now().Add(self.hub.Conf().Timeout))
+			_, err := self.conn.Write(data)
+			if err != nil {
+				reason = CLOSE_REASON_WRITE
+				return
+			}
+			yes, rsn := pkt.ShouldClose()
+			if yes {
+				reason = rsn
+				return
+			}
+		}
+	}
+}
+
+// 循环从 chRecv 读取解析好的数据,回调到实现层
+func (self *Session) loopHandle() {
+	var reason int32 = 0
+
+	defer func() {
+		self.close(reason)
+	}()
+
+	for {
+		select {
+		case <-self.hub.ChQuit():
+			reason = CLOSE_REASON_SERVER_CLOSED
+			return
+		case <-self.chClose:
+			return
+		case pkt := <-self.chRecv:
+			if self.IsClosed() {
+				return
+			}
+			if !self.hub.Callback().OnMessage(self, pkt) {
+				reason = CLOSE_REASON_PROTOCOL
+				return
+			}
+		}
+	}
+}
+
+func asyncDo(fn func(), wg *sync.WaitGroup) {
+	wg.Add(1)
+	go func() {
+		fn()
+		wg.Done()
+	}()
+}
+

+ 20 - 0
iprotocol.go

@@ -0,0 +1,20 @@
+package nnet
+
+type IPacket interface {
+	// serialize to binary format to be sent
+	Serialize() []byte
+	// free the memory if needs
+	Destroy([]byte)
+	// if need to close socket after sending this packet
+	ShouldClose() (bool, int32)
+}
+
+type IProtocol interface {
+	// parse by raw data to a packet
+	ReadPacket(conn IConn) (IPacket, error)
+}
+
+type IMiddleware interface {
+	BeforeSend([]byte) []byte
+	AfterReceived([]byte) []byte
+}

+ 34 - 0
isession.go

@@ -0,0 +1,34 @@
+package nnet
+
+import "time"
+
+var (
+	DEF_SESSION_ID     uint64 = 0
+	RUBBISH_SESSION_ID uint64 = 1
+)
+
+type ISession interface {
+	Do() // session 开始工作
+
+	Close(int32) // 停止所有工作
+	IsClosed() bool
+
+	Write(IPacket, time.Duration) error
+	AWrite(IPacket, time.Duration) error // 异步发送
+
+	GetData() interface{} // 辅助数据
+	SetData(interface{})
+
+	UpdateId(uint64) //更新ID
+	Id() uint64
+	SetId(uint64)
+
+	GetRawConn() IConn
+}
+
+type ISessionCallback interface {
+	OnClosed(ISession, int32)
+	OnConnected(ISession) (bool, int32)
+	OnMessage(ISession, IPacket) bool
+	OnHeartbeat(ISession) bool
+}