logics.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. package main
  2. import (
  3. "encoding/binary"
  4. "git.wanbits.io/joe/franklin/comp"
  5. pb "git.wanbits.io/joe/franklin/protos"
  6. "git.wanbits.io/joe/kettle/log"
  7. "git.wanbits.io/joe/kettle/utl"
  8. "git.wanbits.io/joe/nnet"
  9. "github.com/golang/protobuf/proto"
  10. "go.uber.org/zap"
  11. "time"
  12. )
  13. type LAPacket struct {
  14. *pb.Response
  15. }
  16. func (self *LAPacket) ShouldClose() (bool, int32) {
  17. return false, 0
  18. }
  19. // protocol && callback, Using TCP
  20. type ALProtocol struct{}
  21. func (self *ALProtocol) ReadPacket(conn nnet.IConn) (nnet.IPacket, error) {
  22. buf := comp.GBufPool.Get()
  23. defer comp.GBufPool.Put(buf)
  24. n, err := conn.Read(buf[:4])
  25. n, err = conn.Read(buf[4 : 4+n])
  26. p := &SCPacket{}
  27. err = proto.Unmarshal(buf[4:], p.resp)
  28. return p, err
  29. }
  30. func (self *ALProtocol) Serialize(packet nnet.IPacket) ([]byte, error) {
  31. csp, ok := packet.(*CSPacket)
  32. if !ok {
  33. log.Error("can not transform")
  34. return nil, utl.ErrInterfaceTransform
  35. }
  36. data, err := proto.Marshal(csp.Request)
  37. if err != nil {
  38. return nil, err
  39. }
  40. buf := make([]byte, len(data)+4)
  41. binary.LittleEndian.PutUint32(buf, uint32(len(data)))
  42. copy(buf[4:], data)
  43. return buf, nil
  44. }
  45. func (self *ALProtocol) OnClosed(ses nnet.ISession, reason int32) {
  46. }
  47. func (self *ALProtocol) OnConnected(ses nnet.ISession) (bool, int32) {
  48. return true, 0
  49. }
  50. func (self *ALProtocol) OnMessage(ses nnet.ISession, pkt nnet.IPacket) bool {
  51. return true
  52. }
  53. // server-side dont handle heartbeat.
  54. func (self *ALProtocol) OnHeartbeat(ses nnet.ISession) bool {
  55. SendToLogic(ses.Id(), 0, &pb.Q_Heartbeat{Ts: time.Now().Unix()})
  56. return true
  57. }
  58. func ForwardLogic(logicId uint64, msg *CSPacket) {
  59. ses, err := g_logics.GetSession(logicId)
  60. if err != nil {
  61. log.Error("cant find logic server", zap.Uint64("logicId", logicId), zap.Error(err))
  62. return
  63. }
  64. err = ses.AWrite(msg, 3*time.Second)
  65. if err != nil {
  66. log.Error("post logic", zap.Error(err))
  67. }
  68. }
  69. func SendToLogic(logicId, userId uint64, inner proto.Message) {
  70. ses, err := g_logics.GetSession(logicId)
  71. if err != nil {
  72. log.Error("cant find logic server", zap.Uint64("logicId", logicId), zap.Error(err))
  73. return
  74. }
  75. // 不做 guid 合法性检查,而直接发出消息
  76. bst, err := proto.Marshal(inner)
  77. if nil != err {
  78. log.Error("proto.Marshal", zap.Error(err))
  79. return
  80. }
  81. name := comp.GetMsgName(inner)
  82. id := comp.GetMsgId(name)
  83. req := &pb.Request{
  84. MsgId: id,
  85. UserId: userId,
  86. MsgName: name,
  87. Req: bst,
  88. }
  89. err = ses.AWrite(&CSPacket{req}, time.Second)
  90. if nil != err {
  91. log.Error("proto.AWrite", zap.Error(err))
  92. }
  93. }