session.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. package cpn
  2. import (
  3. "net"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. "git.wenlab.co/joe/nnet"
  8. )
  9. const (
  10. // Close Reason builtin
  11. CLOSE_REASON_READ = 1
  12. CLOSE_REASON_WRITE = 2
  13. CLOSE_REASON_PROTOCOL = 4
  14. CLOSE_REASON_READTIMEOUT = 8 // HEARTBEAT
  15. CLOSE_REASON_SERVER_CLOSED = 16 //
  16. )
  17. // 长连接
  18. type Session struct {
  19. id uint64
  20. hub nnet.IHub
  21. conn nnet.IConn
  22. extraData interface{}
  23. once sync.Once // Close once
  24. closed int32 // Flag: session closed or not
  25. frozen int32 // Flag: is session frozen. when no responding, no receiving, but can send.
  26. chClose chan struct{}
  27. chSend chan nnet.IPacket
  28. chRecv chan nnet.IPacket
  29. addr string
  30. }
  31. func newSession(conn nnet.IConn, h nnet.IHub, params ...string) *Session {
  32. s := &Session{
  33. hub: h,
  34. conn: conn,
  35. chClose: make(chan struct{}),
  36. chSend: make(chan nnet.IPacket, h.Conf().SizeOfSendChan),
  37. chRecv: make(chan nnet.IPacket, h.Conf().SizeOfRecvChan),
  38. }
  39. if len(params) > 0 {
  40. s.addr = params[0]
  41. }
  42. return s
  43. }
  44. func (self *Session) GetData() interface{} {
  45. return self.extraData
  46. }
  47. func (self *Session) SetData(data interface{}) {
  48. self.extraData = data
  49. }
  50. func (self *Session) GetRawConn() nnet.IConn {
  51. return self.conn
  52. }
  53. func (self *Session) UpdateId(id uint64) {
  54. self.id = id
  55. self.hub.PutSession(id, self)
  56. }
  57. func (self *Session) Id() uint64 {
  58. return self.id
  59. }
  60. func (self *Session) SetId(id uint64) {
  61. self.id = id
  62. }
  63. // used client-side session for re-connect
  64. func (self *Session) ServerAddr() string {
  65. return self.addr
  66. }
  67. func (self *Session) Do() {
  68. suc, reason := self.hub.Callback().OnConnected(self)
  69. if !suc {
  70. //TODO: 这里不 Close 资源能释放吗?
  71. self.Close(reason)
  72. return
  73. }
  74. asyncDo(self.loopHandle, self.hub.Wg())
  75. asyncDo(self.loopWrite, self.hub.Wg())
  76. asyncDo(self.loopRead, self.hub.Wg())
  77. }
  78. func (self *Session) Close(reason int32) {
  79. self.close(reason)
  80. }
  81. func (self *Session) close(reason int32) {
  82. self.once.Do(func() {
  83. atomic.StoreInt32(&self.closed, 1)
  84. close(self.chClose)
  85. close(self.chSend)
  86. close(self.chRecv)
  87. self.conn.Close()
  88. self.hub.DelSession(self.id)
  89. self.hub.Callback().OnClosed(self, reason)
  90. //self.hub.StartReconn(self.addr, self.id)
  91. })
  92. }
  93. func (self *Session) SetFrozen(frozen int32) {
  94. atomic.StoreInt32(&self.frozen, frozen)
  95. }
  96. func (self *Session) IsClosed() bool {
  97. return atomic.LoadInt32(&self.closed) != 0
  98. }
  99. func (self *Session) Write(pkt nnet.IPacket, timeout time.Duration) error {
  100. if self.IsClosed() {
  101. return nnet.ErrConnClosing
  102. }
  103. if timeout > 0 {
  104. _ = self.conn.SetWriteDeadline(time.Now().Add(timeout))
  105. }
  106. data, err := self.hub.Protocol().Serialize(pkt)
  107. if err != nil {
  108. return err
  109. }
  110. _, err = self.conn.Write(data)
  111. return err
  112. }
  113. // public 异步写入
  114. func (self *Session) AWrite(pkt nnet.IPacket, timeout time.Duration) (err error) {
  115. if self.IsClosed() {
  116. return nnet.ErrConnClosing
  117. }
  118. defer func() {
  119. if e := recover(); e != nil {
  120. err = nnet.ErrConnClosing
  121. }
  122. }()
  123. if timeout == 0 {
  124. select {
  125. case self.chSend <- pkt:
  126. return nil
  127. default:
  128. return nnet.ErrWriteBlocking
  129. }
  130. } else {
  131. select {
  132. case self.chSend <- pkt:
  133. return nil
  134. case <-self.chClose:
  135. return nnet.ErrConnClosing
  136. case <-time.After(timeout):
  137. return nnet.ErrWriteBlocking
  138. }
  139. }
  140. }
  141. //
  142. func (self *Session) loopRead() {
  143. var reason int32 = 0
  144. defer func() {
  145. self.close(reason)
  146. }()
  147. for {
  148. select {
  149. case <-self.hub.ChQuit():
  150. reason = CLOSE_REASON_SERVER_CLOSED
  151. return
  152. case <-self.chClose:
  153. return
  154. default:
  155. }
  156. if self.hub.Conf().ReadTimeout > 0 {
  157. self.conn.SetReadDeadline(time.Now().Add(self.hub.Conf().ReadTimeout))
  158. }
  159. pkt, err := self.hub.Protocol().ReadPacket(self.conn)
  160. if err != nil {
  161. if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
  162. reason = CLOSE_REASON_READTIMEOUT
  163. } else if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() {
  164. reason = CLOSE_REASON_READTIMEOUT
  165. } else {
  166. reason = CLOSE_REASON_READ
  167. }
  168. //
  169. return
  170. }
  171. if self.frozen == 0 {
  172. self.chRecv <- pkt
  173. }
  174. }
  175. }
  176. //
  177. func (self *Session) loopWrite() {
  178. var reason int32 = 0
  179. defer func() {
  180. //fmt.Println(self.id)
  181. self.close(reason)
  182. }()
  183. ticker := time.NewTicker(self.hub.Conf().Tick)
  184. for {
  185. select {
  186. case <-self.hub.ChQuit():
  187. reason = CLOSE_REASON_SERVER_CLOSED
  188. return
  189. case <-self.chClose:
  190. return
  191. case <-ticker.C:
  192. self.hub.Callback().OnHeartbeat(self)
  193. case pkt := <-self.chSend:
  194. if self.IsClosed() {
  195. return
  196. }
  197. data, err := self.hub.Protocol().Serialize(pkt)
  198. if err != nil {
  199. continue
  200. }
  201. _ = self.conn.SetWriteDeadline(time.Now().Add(self.hub.Conf().Timeout))
  202. _, err = self.conn.Write(data)
  203. if err != nil {
  204. reason = CLOSE_REASON_WRITE
  205. return
  206. }
  207. yes, rsn := pkt.ShouldClose()
  208. if yes {
  209. reason = rsn
  210. return
  211. }
  212. }
  213. }
  214. }
  215. //
  216. func (self *Session) loopHandle() {
  217. var reason int32 = 0
  218. defer func() {
  219. self.close(reason)
  220. }()
  221. for {
  222. select {
  223. case <-self.hub.ChQuit():
  224. reason = CLOSE_REASON_SERVER_CLOSED
  225. return
  226. case <-self.chClose:
  227. return
  228. case pkt := <-self.chRecv:
  229. if self.IsClosed() {
  230. return
  231. }
  232. if !self.hub.Callback().OnMessage(self, pkt) {
  233. reason = CLOSE_REASON_PROTOCOL
  234. return
  235. }
  236. }
  237. }
  238. }
  239. func asyncDo(fn func(), wg *sync.WaitGroup) {
  240. wg.Add(1)
  241. go func() {
  242. fn()
  243. wg.Done()
  244. }()
  245. }