package main import ( "encoding/json" "git.wanbits.io/joe/franklin/comp" pb "git.wanbits.io/joe/franklin/protos" "git.wanbits.io/joe/kettle/log" "git.wanbits.io/joe/kettle/mac" "git.wanbits.io/joe/kettle/utl" "git.wanbits.io/joe/nnet" "go.uber.org/zap" "time" ) const ( LOGIC_SESSION_ID = 5 LOGIN_TIMEOUT = 2 * time.Second CMD_HEARTBEAT = "bl_heartbeat" CMD_LOGIN_VERIFY = "bl_login_verify" CMD_UPDATE_USERINFO = "bl_update_userinfo" ) type WaitParam struct { ses nnet.ISession timer *time.Timer } // TODO: optimize with Pool func NewWaitParam(ses nnet.ISession, timeout time.Duration) *WaitParam { return &WaitParam{ ses: ses, timer: time.NewTimer(timeout), } } // //type WaitorParam struct { // ses nnet.ISession // ch chan struct{} //} // //type PacketWaitor struct { // mp *utl.SMap //} // //func NewPacketWaitor() *PacketWaitor { // return &PacketWaitor{ // mp: utl.NewSMap(), // } //} // //func Wait(key interface{}, ses nnet.ISession) { // //} func (self *RabbitCProtocol) Send(packet *ReqPacket, cses nnet.ISession) error { ses, err := g_rabbit.GetSession(LOGIC_SESSION_ID) if err != nil { log.Warn("RabbitSend", zap.Error(err)) return err } chWait := NewWaitParam(cses, LOGIN_TIMEOUT) err = g_rabbitProto.Insert(packet.Seq, chWait) if err != nil { return err } go func() { select { case <-chWait.timer.C: g_rabbitProto.Del(packet.Seq) on_recv_timeout(cses, packet) } }() return ses.AWrite(packet, time.Second*2) } type RespPacket struct { *utl.JsResp } func NewRespPacket(cmd string, seq int64, ec int, res interface{}) *RespPacket { return &RespPacket{ &utl.JsResp{ Cmd: cmd, Seq: seq, Ec: ec, Result: res, }, } } func (self *RespPacket) ShouldClose() (bool, int32) { return false, 0 } type ReqPacket struct { *utl.JsReq } func NewReqPacket(cmd string, seq int64, params interface{}) *ReqPacket { return &ReqPacket{ &utl.JsReq{ Cmd: cmd, Seq: seq, Params: params, }, } } func (self *ReqPacket) ShouldClose() (bool, int32) { return false, 0 } type RabbitCProtocol struct { *utl.SMap } func NewRabbitCProtocol() *RabbitCProtocol { return &RabbitCProtocol{ utl.NewSMap(), } } func (self *RabbitCProtocol) ReadPacket(conn nnet.IConn) (nnet.IPacket, error) { buf := comp.GBufPool.Get() defer comp.GBufPool.Put(buf) n, err := conn.Read(buf) if err != nil { return nil, err } p := &RespPacket{} err = json.Unmarshal(buf[:n], p) if err != nil { return nil, err } return p, nil } func (self *RabbitCProtocol) Serialize(packet nnet.IPacket) ([]byte, error) { p, ok := packet.(*ReqPacket) if !ok { return nil, utl.ErrInterfaceTransform } data, err := json.Marshal(p) if err != nil { return nil, err } return data, nil } func (self *RabbitCProtocol) OnClosed(ses nnet.ISession, reason int32) { log.Debug("closed connection to login", zap.Int32("reason", reason)) g_rabbit.StartReconn(ses.ServerAddr(), ses.Id()) } func (self *RabbitCProtocol) OnConnected(ses nnet.ISession) (bool, int32) { log.Debug("connected to login server", zap.String("addr", ses.ServerAddr())) ses.UpdateId(LOGIC_SESSION_ID) return true, 0 } func (self *RabbitCProtocol) OnMessage(ses nnet.ISession, pkt nnet.IPacket) bool { resp, ok := pkt.(*RespPacket) if !ok { log.Error("not a RespPacket") return false } // dispatch cb, ok := rabbit_route_table[resp.Cmd] if !ok { log.Error("unsupported cmd", zap.String("cmd", resp.Cmd)) return false } cb(ses, resp) return true } // server-side dont handle heartbeat. func (self *RabbitCProtocol) OnHeartbeat(ses nnet.ISession) bool { mac.GetMachInfoMutablePart(&comp.Gmi) var cpuPercent int32 = 0 if len(comp.Gmi.CpuPercent) > 0 { cpuPercent = int32(comp.Gmi.CpuPercent[0]) } seq := time.Now().UnixNano() p := NewReqPacket("bl_heartbeat", seq, &pb.StatusParams{ Id: g_conf.Id, Ts: time.Now().Unix(), Active: int32(g_server.GetSessionNum()), Support: 3000, Mem: int32(comp.Gmi.MemPercent), Cpu: cpuPercent, }) _ = self.Send(p, nil) return true }