session.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. package internal
  2. import (
  3. "net"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. "one.com/nnet"
  8. )
// Close reasons reported to the OnClosed callback when a session is torn down.
const (
	// A close reason is an int32 value; these are the codes predefined by
	// the system.
	//
	// NOTE(review): CLOSE_REASON_READ and CLOSE_REASON_WRITE share the value
	// 0, which is also the initial reason used by the session loops, so a
	// read failure, a write failure and "no specific reason" cannot be told
	// apart by code alone. Confirm this is intended.
	CLOSE_REASON_READ          = 0
	CLOSE_REASON_WRITE         = 0
	CLOSE_REASON_PROTOCOL      = 1
	CLOSE_REASON_READTIMEOUT   = 4  // HEARTBEAT
	CLOSE_REASON_SERVER_CLOSED = 16 // this server is shutting down
)
// Session is a long-lived connection managed by a hub.
type Session struct {
	// id is the key under which the session is registered with and removed
	// from the hub.
	id uint64
	// hub supplies configuration, protocol, callbacks, the quit channel and
	// the WaitGroup used by the session goroutines.
	hub nnet.IHub
	// conn is the underlying connection that is read from and written to.
	conn nnet.IConn
	// extraData is an opaque value owned by the user of the session
	// (see GetData / SetData).
	extraData interface{}
	// once guarantees the teardown in close runs a single time.
	once sync.Once
	// closed is non-zero once the session has been closed; it is accessed
	// with sync/atomic.
	closed int32
	// chClose is closed during teardown to signal the session goroutines.
	chClose chan struct{}
	// chSend queues outgoing packets for loopWrite.
	chSend chan nnet.IPacket
	// chRecv queues packets read by loopRead for loopHandle.
	chRecv chan nnet.IPacket
}
  30. func newSession(conn nnet.IConn, h nnet.IHub) *Session {
  31. return &Session{
  32. hub: h,
  33. conn: conn,
  34. chClose: make(chan struct{}),
  35. chSend: make(chan nnet.IPacket, h.Conf().SizeOfSendChan),
  36. chRecv: make(chan nnet.IPacket, h.Conf().SizeOfRecvChan),
  37. }
  38. }
// GetData returns the user value previously stored with SetData, or nil if
// nothing has been stored. The field is read without synchronization.
func (self *Session) GetData() interface{} {
	return self.extraData
}
// SetData attaches an arbitrary user value to the session, replacing any
// previous one. The field is written without synchronization.
func (self *Session) SetData(data interface{}) {
	self.extraData = data
}
// GetRawConn returns the underlying connection the session reads from and
// writes to.
func (self *Session) GetRawConn() nnet.IConn {
	return self.conn
}
  48. func (self *Session) UpdateId(id uint64) {
  49. self.id = id
  50. self.hub.PutSession(id, self)
  51. }
// Id returns the session id most recently set through SetId or UpdateId
// (zero if it was never set).
func (self *Session) Id() uint64 {
	return self.id
}
// SetId sets the session id only. Unlike UpdateId it does not register the
// session with the hub.
func (self *Session) SetId(id uint64) {
	self.id = id
}
  58. func (self *Session) Do() {
  59. suc, reason := self.hub.Callback().OnConnected(self)
  60. if !suc {
  61. //TODO: 这里不 Close 资源能释放吗?
  62. self.Close(reason)
  63. return
  64. }
  65. asyncDo(self.loopHandle, self.hub.Wg())
  66. asyncDo(self.loopWrite, self.hub.Wg())
  67. asyncDo(self.loopRead, self.hub.Wg())
  68. }
// Close shuts the session down with the given reason. It delegates to close,
// whose teardown runs only once, so repeated calls have no further effect.
func (self *Session) Close(reason int32) {
	self.close(reason)
}
// close performs the one-time teardown of the session. Only the first call
// has any effect, so only the first caller's reason reaches OnClosed.
func (self *Session) close(reason int32) {
	self.once.Do(func() {
		// Raise the flag first so IsClosed already reports true by the time
		// any channel is observed as closed.
		atomic.StoreInt32(&self.closed, 1)
		// Wake every goroutine selecting on chClose.
		close(self.chClose)
		// NOTE(review): the data channels are closed here, not by their
		// senders. A goroutine sending on chSend or chRecv at this moment
		// panics with "send on closed channel"; AWrite recovers from that
		// panic for chSend.
		close(self.chSend)
		close(self.chRecv)
		self.conn.Close()
		// Unregister from the hub before notifying the application.
		self.hub.DelSession(self.id)
		self.hub.Callback().OnClosed(self, reason)
	})
}
// IsClosed reports whether teardown of the session has started. The flag is
// read atomically.
func (self *Session) IsClosed() bool {
	return atomic.LoadInt32(&self.closed) != 0
}
  86. func (self *Session) Write(pkt nnet.IPacket, timeout time.Duration) error {
  87. if self.IsClosed() {
  88. return nnet.ErrConnClosing
  89. }
  90. if timeout > 0 {
  91. _ = self.conn.SetWriteDeadline(time.Now().Add(timeout))
  92. }
  93. data, err := self.hub.Protocol().Serialize(pkt)
  94. if err != nil {
  95. return err
  96. }
  97. _, err = self.conn.Write(data)
  98. return err
  99. }
  100. // public 异步写入
  101. func (self *Session) AWrite(pkt nnet.IPacket, timeout time.Duration) (err error) {
  102. if self.IsClosed() {
  103. return nnet.ErrConnClosing
  104. }
  105. defer func() {
  106. if e := recover(); e != nil {
  107. err = nnet.ErrConnClosing
  108. }
  109. }()
  110. if timeout == 0 {
  111. select {
  112. case self.chSend <- pkt:
  113. return nil
  114. default:
  115. return nnet.ErrWriteBlocking
  116. }
  117. } else {
  118. select {
  119. case self.chSend <- pkt:
  120. return nil
  121. case <-self.chClose:
  122. return nnet.ErrConnClosing
  123. case <-time.After(timeout):
  124. return nnet.ErrWriteBlocking
  125. }
  126. }
  127. }
  128. // 循环从 socket 读取数据,置入 chRecv 通道
  129. func (self *Session) loopRead() {
  130. var reason int32 = 0
  131. defer func() {
  132. self.close(reason)
  133. }()
  134. for {
  135. select {
  136. case <-self.hub.ChQuit():
  137. reason = CLOSE_REASON_SERVER_CLOSED
  138. return
  139. case <-self.chClose:
  140. return
  141. default:
  142. }
  143. if self.hub.Conf().ReadTimeout > 0 {
  144. self.conn.SetReadDeadline(time.Now().Add(self.hub.Conf().ReadTimeout))
  145. }
  146. pkt, err := self.hub.Protocol().ReadPacket(self.conn)
  147. if err != nil {
  148. if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
  149. reason = CLOSE_REASON_READTIMEOUT
  150. } else if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() {
  151. reason = CLOSE_REASON_READTIMEOUT
  152. } else {
  153. reason = CLOSE_REASON_READ
  154. }
  155. //
  156. return
  157. }
  158. self.chRecv <- pkt
  159. }
  160. }
  161. // 循环从 cbSend 通道读取数据,发送到 socket
  162. func (self *Session) loopWrite() {
  163. var reason int32 = 0
  164. defer func() {
  165. //fmt.Println(self.id)
  166. self.close(reason)
  167. }()
  168. ticker := time.NewTicker(self.hub.Conf().Tick)
  169. for {
  170. select {
  171. case <-self.hub.ChQuit():
  172. reason = CLOSE_REASON_SERVER_CLOSED
  173. return
  174. case <-self.chClose:
  175. return
  176. case <-ticker.C:
  177. self.hub.Callback().OnHeartbeat(self)
  178. case pkt := <-self.chSend:
  179. if self.IsClosed() {
  180. return
  181. }
  182. data, err := self.hub.Protocol().Serialize(pkt)
  183. if err != nil {
  184. continue
  185. }
  186. _ = self.conn.SetWriteDeadline(time.Now().Add(self.hub.Conf().Timeout))
  187. _, err = self.conn.Write(data)
  188. if err != nil {
  189. reason = CLOSE_REASON_WRITE
  190. return
  191. }
  192. yes, rsn := pkt.ShouldClose()
  193. if yes {
  194. reason = rsn
  195. return
  196. }
  197. }
  198. }
  199. }
  200. // 循环从 chRecv 读取解析好的数据,回调到实现层
  201. func (self *Session) loopHandle() {
  202. var reason int32 = 0
  203. defer func() {
  204. self.close(reason)
  205. }()
  206. for {
  207. select {
  208. case <-self.hub.ChQuit():
  209. reason = CLOSE_REASON_SERVER_CLOSED
  210. return
  211. case <-self.chClose:
  212. return
  213. case pkt := <-self.chRecv:
  214. if self.IsClosed() {
  215. return
  216. }
  217. if !self.hub.Callback().OnMessage(self, pkt) {
  218. reason = CLOSE_REASON_PROTOCOL
  219. return
  220. }
  221. }
  222. }
  223. }
  224. func asyncDo(fn func(), wg *sync.WaitGroup) {
  225. wg.Add(1)
  226. go func() {
  227. fn()
  228. wg.Done()
  229. }()
  230. }