session.go 5.3 KB


  1. package internal
  2. import (
  3. "net"
  4. "one.com/nnet"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. )
  9. // 关闭原因
  10. const (
  11. // Close Reason 是一个 int32 型数据,这是系统预置的几个代码
  12. CLOSE_REASON_READ = 0
  13. CLOSE_REASON_WRITE = 0
  14. CLOSE_REASON_PROTOCOL = 1
  15. CLOSE_REASON_READTIMEOUT = 4 // HEARTBEAT
  16. CLOSE_REASON_SERVER_CLOSED = 16 // 本服务器关闭
  17. )
  18. // 长连接
  19. type Session struct {
  20. id uint64
  21. hub nnet.IHub
  22. conn nnet.IConn
  23. extraData interface{}
  24. once sync.Once // Close once
  25. closed int32 // session 是否关闭
  26. frozen int32 // is session frozen. when no responding, no receiving, but can send.
  27. chClose chan struct{}
  28. chSend chan nnet.IPacket
  29. chRecv chan nnet.IPacket
  30. }
  31. func newSession(conn nnet.IConn, h nnet.IHub) *Session {
  32. return &Session{
  33. hub: h,
  34. conn: conn,
  35. chClose: make(chan struct{}),
  36. chSend: make(chan nnet.IPacket, h.Conf().SizeOfSendChan),
  37. chRecv: make(chan nnet.IPacket, h.Conf().SizeOfRecvChan),
  38. }
  39. }
  40. func (self *Session) GetData() interface{} {
  41. return self.extraData
  42. }
  43. func (self *Session) SetData(data interface{}) {
  44. self.extraData = data
  45. }
  46. func (self *Session) GetRawConn() nnet.IConn {
  47. return self.conn
  48. }
  49. func (self *Session) UpdateId(id uint64) {
  50. self.id = id
  51. self.hub.PutSession(id, self)
  52. }
  53. func (self *Session) Id() uint64 {
  54. return self.id
  55. }
  56. func (self *Session) SetId(id uint64) {
  57. self.id = id
  58. }
  59. func (self *Session) Do() {
  60. suc, reason := self.hub.Callback().OnConnected(self)
  61. if !suc {
  62. //TODO: 这里不 Close 资源能释放吗?
  63. self.Close(reason)
  64. return
  65. }
  66. asyncDo(self.loopHandle, self.hub.Wg())
  67. asyncDo(self.loopWrite, self.hub.Wg())
  68. asyncDo(self.loopRead, self.hub.Wg())
  69. }
  70. func (self *Session) Close(reason int32) {
  71. self.close(reason)
  72. }
  73. func (self *Session) close(reason int32) {
  74. self.once.Do(func() {
  75. atomic.StoreInt32(&self.closed, 1)
  76. close(self.chClose)
  77. close(self.chSend)
  78. close(self.chRecv)
  79. self.conn.Close()
  80. self.hub.DelSession(self.id)
  81. self.hub.Callback().OnClosed(self, reason)
  82. })
  83. }
  84. func (self *Session) SetFrozen(frozen int32) {
  85. atomic.StoreInt32(&self.frozen, frozen)
  86. }
  87. func (self *Session) IsClosed() bool {
  88. return atomic.LoadInt32(&self.closed) != 0
  89. }
  90. func (self *Session) Write(pkt nnet.IPacket, timeout time.Duration) error {
  91. if self.IsClosed() {
  92. return nnet.ErrConnClosing
  93. }
  94. if timeout > 0 {
  95. _ = self.conn.SetWriteDeadline(time.Now().Add(timeout))
  96. }
  97. data, err := self.hub.Protocol().Serialize(pkt)
  98. if err != nil {
  99. return err
  100. }
  101. _, err = self.conn.Write(data)
  102. return err
  103. }
  104. // public 异步写入
  105. func (self *Session) AWrite(pkt nnet.IPacket, timeout time.Duration) (err error) {
  106. if self.IsClosed() {
  107. return nnet.ErrConnClosing
  108. }
  109. defer func() {
  110. if e := recover(); e != nil {
  111. err = nnet.ErrConnClosing
  112. }
  113. }()
  114. if timeout == 0 {
  115. select {
  116. case self.chSend <- pkt:
  117. return nil
  118. default:
  119. return nnet.ErrWriteBlocking
  120. }
  121. } else {
  122. select {
  123. case self.chSend <- pkt:
  124. return nil
  125. case <-self.chClose:
  126. return nnet.ErrConnClosing
  127. case <-time.After(timeout):
  128. return nnet.ErrWriteBlocking
  129. }
  130. }
  131. }
  132. // 循环从 socket 读取数据,置入 chRecv 通道
  133. func (self *Session) loopRead() {
  134. var reason int32 = 0
  135. defer func() {
  136. self.close(reason)
  137. }()
  138. for {
  139. select {
  140. case <-self.hub.ChQuit():
  141. reason = CLOSE_REASON_SERVER_CLOSED
  142. return
  143. case <-self.chClose:
  144. return
  145. default:
  146. }
  147. if self.hub.Conf().ReadTimeout > 0 {
  148. self.conn.SetReadDeadline(time.Now().Add(self.hub.Conf().ReadTimeout))
  149. }
  150. pkt, err := self.hub.Protocol().ReadPacket(self.conn)
  151. if err != nil {
  152. if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
  153. reason = CLOSE_REASON_READTIMEOUT
  154. } else if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() {
  155. reason = CLOSE_REASON_READTIMEOUT
  156. } else {
  157. reason = CLOSE_REASON_READ
  158. }
  159. //
  160. return
  161. }
  162. if self.frozen == 0 {
  163. self.chRecv <- pkt
  164. }
  165. }
  166. }
  167. // 循环从 cbSend 通道读取数据,发送到 socket
  168. func (self *Session) loopWrite() {
  169. var reason int32 = 0
  170. defer func() {
  171. //fmt.Println(self.id)
  172. self.close(reason)
  173. }()
  174. ticker := time.NewTicker(self.hub.Conf().Tick)
  175. for {
  176. select {
  177. case <-self.hub.ChQuit():
  178. reason = CLOSE_REASON_SERVER_CLOSED
  179. return
  180. case <-self.chClose:
  181. return
  182. case <-ticker.C:
  183. self.hub.Callback().OnHeartbeat(self)
  184. case pkt := <-self.chSend:
  185. if self.IsClosed() {
  186. return
  187. }
  188. data, err := self.hub.Protocol().Serialize(pkt)
  189. if err != nil {
  190. continue
  191. }
  192. _ = self.conn.SetWriteDeadline(time.Now().Add(self.hub.Conf().Timeout))
  193. _, err = self.conn.Write(data)
  194. if err != nil {
  195. reason = CLOSE_REASON_WRITE
  196. return
  197. }
  198. yes, rsn := pkt.ShouldClose()
  199. if yes {
  200. reason = rsn
  201. return
  202. }
  203. }
  204. }
  205. }
  206. // 循环从 chRecv 读取解析好的数据,回调到实现层
  207. func (self *Session) loopHandle() {
  208. var reason int32 = 0
  209. defer func() {
  210. self.close(reason)
  211. }()
  212. for {
  213. select {
  214. case <-self.hub.ChQuit():
  215. reason = CLOSE_REASON_SERVER_CLOSED
  216. return
  217. case <-self.chClose:
  218. return
  219. case pkt := <-self.chRecv:
  220. if self.IsClosed() {
  221. return
  222. }
  223. if !self.hub.Callback().OnMessage(self, pkt) {
  224. reason = CLOSE_REASON_PROTOCOL
  225. return
  226. }
  227. }
  228. }
  229. }
  230. func asyncDo(fn func(), wg *sync.WaitGroup) {
  231. wg.Add(1)
  232. go func() {
  233. fn()
  234. wg.Done()
  235. }()
  236. }