package cpn import ( "net" "sync" "sync/atomic" "time" "git.wenlab.co/joe/nnet" ) const ( // Close Reason builtin CLOSE_REASON_READ = 1 CLOSE_REASON_WRITE = 2 CLOSE_REASON_PROTOCOL = 4 CLOSE_REASON_READTIMEOUT = 8 // HEARTBEAT CLOSE_REASON_SERVER_CLOSED = 16 // ) // 长连接 type Session struct { id uint64 hub nnet.IHub conn nnet.IConn extraData interface{} once sync.Once // Close once closed int32 // Flag: session closed or not frozen int32 // Flag: is session frozen. when no responding, no receiving, but can send. chClose chan struct{} chSend chan nnet.IPacket chRecv chan nnet.IPacket addr string } func newSession(conn nnet.IConn, h nnet.IHub, params ...string) *Session { s := &Session{ hub: h, conn: conn, chClose: make(chan struct{}), chSend: make(chan nnet.IPacket, h.Conf().SizeOfSendChan), chRecv: make(chan nnet.IPacket, h.Conf().SizeOfRecvChan), } if len(params) > 0 { s.addr = params[0] } return s } func (self *Session) GetData() interface{} { return self.extraData } func (self *Session) SetData(data interface{}) { self.extraData = data } func (self *Session) GetRawConn() nnet.IConn { return self.conn } func (self *Session) UpdateId(id uint64) { self.id = id self.hub.PutSession(id, self) } func (self *Session) Id() uint64 { return self.id } func (self *Session) SetId(id uint64) { self.id = id } // used client-side session for re-connect func (self *Session) ServerAddr() string { return self.addr } func (self *Session) Do() { suc, reason := self.hub.Callback().OnConnected(self) if !suc { //TODO: 这里不 Close 资源能释放吗? self.Close(reason) return } asyncDo(self.loopHandle, self.hub.Wg()) asyncDo(self.loopWrite, self.hub.Wg()) asyncDo(self.loopRead, self.hub.Wg()) } func (self *Session) Close(reason int32) { self.close(reason) } func (self *Session) close(reason int32) { self.once.Do(func() { atomic.StoreInt32(&self.closed, 1) close(self.chClose) close(self.chSend) close(self.chRecv) self.conn.Close() self.hub.DelSession(self.id) self.hub.Callback().OnClosed(self, reason) //self.hub.StartReconn(self.addr, self.id) }) } func (self *Session) SetFrozen(frozen int32) { atomic.StoreInt32(&self.frozen, frozen) } func (self *Session) IsClosed() bool { return atomic.LoadInt32(&self.closed) != 0 } func (self *Session) Write(pkt nnet.IPacket, timeout time.Duration) error { if self.IsClosed() { return nnet.ErrConnClosing } if timeout > 0 { _ = self.conn.SetWriteDeadline(time.Now().Add(timeout)) } data, err := self.hub.Protocol().Serialize(pkt) if err != nil { return err } _, err = self.conn.Write(data) return err } // public 异步写入 func (self *Session) AWrite(pkt nnet.IPacket, timeout time.Duration) (err error) { if self.IsClosed() { return nnet.ErrConnClosing } defer func() { if e := recover(); e != nil { err = nnet.ErrConnClosing } }() if timeout == 0 { select { case self.chSend <- pkt: return nil default: return nnet.ErrWriteBlocking } } else { select { case self.chSend <- pkt: return nil case <-self.chClose: return nnet.ErrConnClosing case <-time.After(timeout): return nnet.ErrWriteBlocking } } } // func (self *Session) loopRead() { var reason int32 = 0 defer func() { self.close(reason) }() for { select { case <-self.hub.ChQuit(): reason = CLOSE_REASON_SERVER_CLOSED return case <-self.chClose: return default: } if self.hub.Conf().ReadTimeout > 0 { self.conn.SetReadDeadline(time.Now().Add(self.hub.Conf().ReadTimeout)) } pkt, err := self.hub.Protocol().ReadPacket(self.conn) if err != nil { if netErr, ok := err.(net.Error); ok && netErr.Timeout() { reason = CLOSE_REASON_READTIMEOUT } else if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() { reason = CLOSE_REASON_READTIMEOUT } else { reason = CLOSE_REASON_READ } // return } if self.frozen == 0 { self.chRecv <- pkt } } } // func (self *Session) loopWrite() { var reason int32 = 0 defer func() { //fmt.Println(self.id) self.close(reason) }() ticker := time.NewTicker(self.hub.Conf().Tick) for { select { case <-self.hub.ChQuit(): reason = CLOSE_REASON_SERVER_CLOSED return case <-self.chClose: return case <-ticker.C: self.hub.Callback().OnHeartbeat(self) case pkt := <-self.chSend: if self.IsClosed() { return } data, err := self.hub.Protocol().Serialize(pkt) if err != nil { continue } _ = self.conn.SetWriteDeadline(time.Now().Add(self.hub.Conf().Timeout)) _, err = self.conn.Write(data) if err != nil { reason = CLOSE_REASON_WRITE return } yes, rsn := pkt.ShouldClose() if yes { reason = rsn return } } } } // func (self *Session) loopHandle() { var reason int32 = 0 defer func() { self.close(reason) }() for { select { case <-self.hub.ChQuit(): reason = CLOSE_REASON_SERVER_CLOSED return case <-self.chClose: return case pkt := <-self.chRecv: if self.IsClosed() { return } if !self.hub.Callback().OnMessage(self, pkt) { reason = CLOSE_REASON_PROTOCOL return } } } } func asyncDo(fn func(), wg *sync.WaitGroup) { wg.Add(1) go func() { fn() wg.Done() }() }