logics.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package main
  2. import (
  3. "encoding/binary"
  4. "git.wanbits.io/joe/franklin/comp"
  5. pb "git.wanbits.io/joe/franklin/protos"
  6. "git.wanbits.io/joe/kettle/log"
  7. "git.wanbits.io/joe/kettle/utl"
  8. "git.wanbits.io/joe/nnet"
  9. "github.com/golang/protobuf/proto"
  10. "go.uber.org/zap"
  11. "time"
  12. )
  13. // protocol && callback, Using TCP
  14. type ALProtocol struct{}
  15. func (self *ALProtocol) ReadPacket(conn nnet.IConn) (nnet.IPacket, error) {
  16. buf := comp.GBufPool.Get()
  17. defer comp.GBufPool.Put(buf)
  18. n, err := conn.Read(buf[:4])
  19. n, err = conn.Read(buf[4 : 4+n])
  20. p := &SCPacket{}
  21. err = proto.Unmarshal(buf[4:], p.resp)
  22. return p, err
  23. }
  24. func (self *ALProtocol) Serialize(packet nnet.IPacket) ([]byte, error) {
  25. csp, ok := packet.(*CSPacket)
  26. if !ok {
  27. log.Error("can not transform")
  28. return nil, utl.ErrInterfaceTransform
  29. }
  30. data, err := proto.Marshal(csp.Request)
  31. if err != nil {
  32. return nil, err
  33. }
  34. buf := make([]byte, len(data)+4)
  35. binary.LittleEndian.PutUint32(buf, uint32(len(data)))
  36. copy(buf[4:], data)
  37. return buf, nil
  38. }
  39. func (self *ALProtocol) OnClosed(ses nnet.ISession, reason int32) {
  40. log.Info("logic offline:", zap.String("addr", ses.ServerAddr()), zap.Int32("reason", reason))
  41. }
  42. func (self *ALProtocol) OnConnected(ses nnet.ISession) (bool, int32) {
  43. log.Debug("connect loggic", zap.String("addr", ses.ServerAddr()))
  44. return true, 0
  45. }
  46. func (self *ALProtocol) OnMessage(ses nnet.ISession, pkt nnet.IPacket) bool {
  47. csp, ok := pkt.(*SCPacket)
  48. if !ok {
  49. return false
  50. }
  51. switch pb.MsgGossipId(csp.resp.MsgId) {
  52. case pb.MsgGossipId_A_Heartbeat:
  53. case pb.MsgGossipId_A_Offline:
  54. case pb.MsgGossipId_A_Online:
  55. default:
  56. ForwardClient(csp)
  57. }
  58. return true
  59. }
  60. // server-side dont handle heartbeat.
  61. func (self *ALProtocol) OnHeartbeat(ses nnet.ISession) bool {
  62. SendToLogic(ses.Id(), 0, &pb.Q_Heartbeat{Ts: time.Now().Unix()})
  63. return true
  64. }
  65. func ForwardLogic(logicId uint64, msg *CSPacket) {
  66. ses, err := g_logics.GetSession(logicId)
  67. if err != nil {
  68. log.Error("cant find logic server", zap.Uint64("logicId", logicId), zap.Error(err))
  69. return
  70. }
  71. err = ses.AWrite(msg, 3*time.Second)
  72. if err != nil {
  73. log.Error("post logic", zap.Error(err))
  74. }
  75. }
  76. func SendToLogic(logicId, userId uint64, inner proto.Message) {
  77. ses, err := g_logics.GetSession(logicId)
  78. if err != nil {
  79. log.Error("cant find logic server", zap.Uint64("logicId", logicId), zap.Error(err))
  80. return
  81. }
  82. // 不做 guid 合法性检查,而直接发出消息
  83. bst, err := proto.Marshal(inner)
  84. if nil != err {
  85. log.Error("proto.Marshal", zap.Error(err))
  86. return
  87. }
  88. name := comp.GetMsgName(inner)
  89. id := comp.GetMsgId(name)
  90. req := &pb.Request{
  91. MsgId: id,
  92. UserId: userId,
  93. MsgName: name,
  94. Req: bst,
  95. }
  96. err = ses.AWrite(&CSPacket{req}, time.Second)
  97. if nil != err {
  98. log.Error("proto.AWrite", zap.Error(err))
  99. }
  100. }