rabbit.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. package main
  2. import (
  3. "encoding/json"
  4. "git.wanbits.io/joe/franklin/comp"
  5. pb "git.wanbits.io/joe/franklin/protos"
  6. "git.wanbits.io/joe/kettle/log"
  7. "git.wanbits.io/joe/kettle/mac"
  8. "git.wanbits.io/joe/kettle/utl"
  9. "git.wanbits.io/joe/nnet"
  10. "go.uber.org/zap"
  11. "time"
  12. )
  13. const (
  14. LOGIC_SESSION_ID = 5
  15. LOGIN_TIMEOUT = 2 * time.Second
  16. CMD_HEARTBEAT = "bl_heartbeat"
  17. CMD_LOGIN_VERIFY = "bl_login_verify"
  18. CMD_UPDATE_USERINFO = "bl_update_userinfo"
  19. )
  20. type WaitParam struct {
  21. ses nnet.ISession
  22. timer *time.Timer
  23. }
  24. // TODO: optimize with Pool
  25. func NewWaitParam(ses nnet.ISession, timeout time.Duration) *WaitParam {
  26. return &WaitParam{
  27. ses: ses,
  28. timer: time.NewTimer(timeout),
  29. }
  30. }
  31. //
  32. //type WaitorParam struct {
  33. // ses nnet.ISession
  34. // ch chan struct{}
  35. //}
  36. //
  37. //type PacketWaitor struct {
  38. // mp *utl.SMap
  39. //}
  40. //
  41. //func NewPacketWaitor() *PacketWaitor {
  42. // return &PacketWaitor{
  43. // mp: utl.NewSMap(),
  44. // }
  45. //}
  46. //
  47. //func Wait(key interface{}, ses nnet.ISession) {
  48. //
  49. //}
  50. func (self *RabbitCProtocol) Send(packet *ReqPacket, cses nnet.ISession) error {
  51. ses, err := g_rabbit.GetSession(LOGIC_SESSION_ID)
  52. if err != nil {
  53. log.Warn("RabbitSend", zap.Error(err))
  54. return err
  55. }
  56. chWait := NewWaitParam(cses, LOGIN_TIMEOUT)
  57. err = g_rabbitProto.Insert(packet.Seq, chWait)
  58. if err != nil {
  59. return err
  60. }
  61. go func() {
  62. select {
  63. case <-chWait.timer.C:
  64. g_rabbitProto.Del(packet.Seq)
  65. on_recv_timeout(cses, packet)
  66. }
  67. }()
  68. return ses.AWrite(packet, time.Second*2)
  69. }
  70. type RespPacket struct {
  71. *utl.JsResp
  72. }
  73. func NewRespPacket(cmd string, seq int64, ec int, res interface{}) *RespPacket {
  74. return &RespPacket{
  75. &utl.JsResp{
  76. Cmd: cmd,
  77. Seq: seq,
  78. Ec: ec,
  79. Result: res,
  80. },
  81. }
  82. }
  83. func (self *RespPacket) ShouldClose() (bool, int32) {
  84. return false, 0
  85. }
  86. type ReqPacket struct {
  87. *utl.JsReq
  88. }
  89. func NewReqPacket(cmd string, seq int64, params interface{}) *ReqPacket {
  90. return &ReqPacket{
  91. &utl.JsReq{
  92. Cmd: cmd,
  93. Seq: seq,
  94. Params: params,
  95. },
  96. }
  97. }
  98. func (self *ReqPacket) ShouldClose() (bool, int32) {
  99. return false, 0
  100. }
  101. type RabbitCProtocol struct {
  102. *utl.SMap
  103. }
  104. func NewRabbitCProtocol() *RabbitCProtocol {
  105. return &RabbitCProtocol{
  106. utl.NewSMap(),
  107. }
  108. }
  109. func (self *RabbitCProtocol) ReadPacket(conn nnet.IConn) (nnet.IPacket, error) {
  110. buf := comp.GBufPool.Get()
  111. defer comp.GBufPool.Put(buf)
  112. n, err := conn.Read(buf)
  113. if err != nil {
  114. return nil, err
  115. }
  116. p := &RespPacket{}
  117. err = json.Unmarshal(buf[:n], p)
  118. if err != nil {
  119. return nil, err
  120. }
  121. return p, nil
  122. }
  123. func (self *RabbitCProtocol) Serialize(packet nnet.IPacket) ([]byte, error) {
  124. p, ok := packet.(*ReqPacket)
  125. if !ok {
  126. return nil, utl.ErrInterfaceTransform
  127. }
  128. data, err := json.Marshal(p)
  129. if err != nil {
  130. return nil, err
  131. }
  132. return data, nil
  133. }
  134. func (self *RabbitCProtocol) OnClosed(ses nnet.ISession, reason int32) {
  135. log.Debug("closed connection to login", zap.Int32("reason", reason))
  136. g_rabbit.StartReconn(ses.ServerAddr(), ses.Id())
  137. }
  138. func (self *RabbitCProtocol) OnConnected(ses nnet.ISession) (bool, int32) {
  139. log.Debug("connected to login server", zap.String("addr", ses.ServerAddr()))
  140. ses.UpdateId(LOGIC_SESSION_ID)
  141. return true, 0
  142. }
  143. func (self *RabbitCProtocol) OnMessage(ses nnet.ISession, pkt nnet.IPacket) bool {
  144. resp, ok := pkt.(*RespPacket)
  145. if !ok {
  146. log.Error("not a RespPacket")
  147. return false
  148. }
  149. // dispatch
  150. cb, ok := rabbit_route_table[resp.Cmd]
  151. if !ok {
  152. log.Error("unsupported cmd", zap.String("cmd", resp.Cmd))
  153. return false
  154. }
  155. cb(ses, resp)
  156. return true
  157. }
  158. // server-side dont handle heartbeat.
  159. func (self *RabbitCProtocol) OnHeartbeat(ses nnet.ISession) bool {
  160. mac.GetMachInfoMutablePart(&comp.Gmi)
  161. var cpuPercent int32 = 0
  162. if len(comp.Gmi.CpuPercent) > 0 {
  163. cpuPercent = int32(comp.Gmi.CpuPercent[0])
  164. }
  165. seq := time.Now().UnixNano()
  166. p := NewReqPacket("bl_heartbeat", seq, &pb.StatusParams{
  167. Id: g_conf.Id,
  168. Ts: time.Now().Unix(),
  169. Active: int32(g_server.GetSessionNum()),
  170. Support: 3000,
  171. Mem: int32(comp.Gmi.MemPercent),
  172. Cpu: cpuPercent,
  173. })
  174. _ = self.Send(p, nil)
  175. return true
  176. }