| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270 |
- package internal
- import (
- "net"
- "one.com/nnet"
- "sync"
- "sync/atomic"
- "time"
- )
// Close reasons.
//
// A close reason is carried as an int32; the values below are the codes
// predefined by this package.
//
// NOTE(review): CLOSE_REASON_READ and CLOSE_REASON_WRITE share the value 0,
// so the two cannot be told apart by code — confirm this is intentional.
const (
	// CLOSE_REASON_READ is reported when reading from the connection fails.
	CLOSE_REASON_READ = 0

	// CLOSE_REASON_WRITE is reported when writing to the connection fails.
	CLOSE_REASON_WRITE = 0

	// CLOSE_REASON_PROTOCOL is reported when the message callback rejects a packet.
	CLOSE_REASON_PROTOCOL = 1

	// CLOSE_REASON_READTIMEOUT is reported when a read times out (heartbeat).
	CLOSE_REASON_READTIMEOUT = 4

	// CLOSE_REASON_SERVER_CLOSED is reported when this server is shutting down.
	CLOSE_REASON_SERVER_CLOSED = 16
)
- // 长连接
- type Session struct {
- id uint64
- hub nnet.IHub
- conn nnet.IConn
- extraData interface{}
- once sync.Once // Close once
- closed int32 // session 是否关闭
- frozen int32 // is session frozen. when no responding, no receiving, but can send.
- chClose chan struct{}
- chSend chan nnet.IPacket
- chRecv chan nnet.IPacket
- }
- func newSession(conn nnet.IConn, h nnet.IHub) *Session {
- return &Session{
- hub: h,
- conn: conn,
- chClose: make(chan struct{}),
- chSend: make(chan nnet.IPacket, h.Conf().SizeOfSendChan),
- chRecv: make(chan nnet.IPacket, h.Conf().SizeOfRecvChan),
- }
- }
- func (self *Session) GetData() interface{} {
- return self.extraData
- }
- func (self *Session) SetData(data interface{}) {
- self.extraData = data
- }
- func (self *Session) GetRawConn() nnet.IConn {
- return self.conn
- }
- func (self *Session) UpdateId(id uint64) {
- self.id = id
- self.hub.PutSession(id, self)
- }
- func (self *Session) Id() uint64 {
- return self.id
- }
- func (self *Session) SetId(id uint64) {
- self.id = id
- }
- func (self *Session) Do() {
- suc, reason := self.hub.Callback().OnConnected(self)
- if !suc {
- //TODO: 这里不 Close 资源能释放吗?
- self.Close(reason)
- return
- }
- asyncDo(self.loopHandle, self.hub.Wg())
- asyncDo(self.loopWrite, self.hub.Wg())
- asyncDo(self.loopRead, self.hub.Wg())
- }
- func (self *Session) Close(reason int32) {
- self.close(reason)
- }
- func (self *Session) close(reason int32) {
- self.once.Do(func() {
- atomic.StoreInt32(&self.closed, 1)
- close(self.chClose)
- close(self.chSend)
- close(self.chRecv)
- self.conn.Close()
- self.hub.DelSession(self.id)
- self.hub.Callback().OnClosed(self, reason)
- })
- }
- func (self *Session) SetFrozen(frozen int32) {
- atomic.StoreInt32(&self.frozen, frozen)
- }
- func (self *Session) IsClosed() bool {
- return atomic.LoadInt32(&self.closed) != 0
- }
- func (self *Session) Write(pkt nnet.IPacket, timeout time.Duration) error {
- if self.IsClosed() {
- return nnet.ErrConnClosing
- }
- if timeout > 0 {
- _ = self.conn.SetWriteDeadline(time.Now().Add(timeout))
- }
- data, err := self.hub.Protocol().Serialize(pkt)
- if err != nil {
- return err
- }
- _, err = self.conn.Write(data)
- return err
- }
- // public 异步写入
- func (self *Session) AWrite(pkt nnet.IPacket, timeout time.Duration) (err error) {
- if self.IsClosed() {
- return nnet.ErrConnClosing
- }
- defer func() {
- if e := recover(); e != nil {
- err = nnet.ErrConnClosing
- }
- }()
- if timeout == 0 {
- select {
- case self.chSend <- pkt:
- return nil
- default:
- return nnet.ErrWriteBlocking
- }
- } else {
- select {
- case self.chSend <- pkt:
- return nil
- case <-self.chClose:
- return nnet.ErrConnClosing
- case <-time.After(timeout):
- return nnet.ErrWriteBlocking
- }
- }
- }
- // 循环从 socket 读取数据,置入 chRecv 通道
- func (self *Session) loopRead() {
- var reason int32 = 0
- defer func() {
- self.close(reason)
- }()
- for {
- select {
- case <-self.hub.ChQuit():
- reason = CLOSE_REASON_SERVER_CLOSED
- return
- case <-self.chClose:
- return
- default:
- }
- if self.hub.Conf().ReadTimeout > 0 {
- self.conn.SetReadDeadline(time.Now().Add(self.hub.Conf().ReadTimeout))
- }
- pkt, err := self.hub.Protocol().ReadPacket(self.conn)
- if err != nil {
- if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
- reason = CLOSE_REASON_READTIMEOUT
- } else if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() {
- reason = CLOSE_REASON_READTIMEOUT
- } else {
- reason = CLOSE_REASON_READ
- }
- //
- return
- }
- if self.frozen == 0 {
- self.chRecv <- pkt
- }
- }
- }
- // 循环从 cbSend 通道读取数据,发送到 socket
- func (self *Session) loopWrite() {
- var reason int32 = 0
- defer func() {
- //fmt.Println(self.id)
- self.close(reason)
- }()
- ticker := time.NewTicker(self.hub.Conf().Tick)
- for {
- select {
- case <-self.hub.ChQuit():
- reason = CLOSE_REASON_SERVER_CLOSED
- return
- case <-self.chClose:
- return
- case <-ticker.C:
- self.hub.Callback().OnHeartbeat(self)
- case pkt := <-self.chSend:
- if self.IsClosed() {
- return
- }
- data, err := self.hub.Protocol().Serialize(pkt)
- if err != nil {
- continue
- }
- _ = self.conn.SetWriteDeadline(time.Now().Add(self.hub.Conf().Timeout))
- _, err = self.conn.Write(data)
- if err != nil {
- reason = CLOSE_REASON_WRITE
- return
- }
- yes, rsn := pkt.ShouldClose()
- if yes {
- reason = rsn
- return
- }
- }
- }
- }
- // 循环从 chRecv 读取解析好的数据,回调到实现层
- func (self *Session) loopHandle() {
- var reason int32 = 0
- defer func() {
- self.close(reason)
- }()
- for {
- select {
- case <-self.hub.ChQuit():
- reason = CLOSE_REASON_SERVER_CLOSED
- return
- case <-self.chClose:
- return
- case pkt := <-self.chRecv:
- if self.IsClosed() {
- return
- }
- if !self.hub.Callback().OnMessage(self, pkt) {
- reason = CLOSE_REASON_PROTOCOL
- return
- }
- }
- }
- }
// asyncDo runs fn on a new goroutine tracked by wg: the counter is
// incremented before the goroutine starts and decremented when fn finishes.
func asyncDo(fn func(), wg *sync.WaitGroup) {
	wg.Add(1)
	go func() {
		// Deferred so the counter is released however fn ends, including
		// via runtime.Goexit; otherwise wg.Wait would block forever.
		defer wg.Done()
		fn()
	}()
}
|