server_router.go 4.9 KB


  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "git.wanbits.io/joe/franklin/comp"
  7. "git.wanbits.io/joe/franklin/mod"
  8. pb "git.wanbits.io/joe/franklin/protos"
  9. "git.wanbits.io/joe/kettle/log"
  10. "git.wanbits.io/joe/nnet"
  11. "github.com/go-redis/redis"
  12. "github.com/golang/protobuf/proto"
  13. "go.uber.org/zap"
  14. "google.golang.org/grpc"
  15. "time"
  16. )
  17. var (
  18. routes = map[string]fnCSHandler{
  19. "C_Heartbeat": on_C_Heartbeat,
  20. "C_Login": on_C_Login,
  21. "C_Logout": on_C_Logout,
  22. "C_EnterGame": on_C_EnterGame,
  23. "C_LeaveGame": on_C_LeaveGame,
  24. "C_EnterRoom": on_C_EnterRoom,
  25. "C_EnterTable": on_C_EnterTable,
  26. "C_LeaveRoom": on_C_LeaveRoom,
  27. "C_LeaveTable": on_C_LeaveTable,
  28. }
  29. )
  30. type fnCSHandler func(nnet.ISession, *CSPacket)
  31. func dispatch(ses nnet.ISession, pkt *CSPacket) {
  32. handler, ok := routes[pkt.MsgName]
  33. if !ok {
  34. // route to lobbies
  35. if pkt.MsgId <= 5000 {
  36. ForwardLobby(pkt)
  37. return
  38. }
  39. logicId, err:= mod.LoadUserLogic(pkt.UserId)
  40. if err != nil {
  41. if err != redis.Nil {
  42. log.Error("user logic", zap.Error(err))
  43. }
  44. return
  45. }
  46. ForwardLogic(logicId, pkt)
  47. return
  48. }
  49. handler(ses, pkt)
  50. }
  51. func on_C_Heartbeat(ses nnet.ISession, p *CSPacket) {
  52. resp := &pb.S_Heartbeat{
  53. Ts: time.Now().Unix(),
  54. }
  55. SendToClient(ses, ses.Id(), pb.ErrCode_OK, resp)
  56. }
  57. func on_C_Login(ses nnet.ISession, p *CSPacket) {
  58. prm := &pb.C_Login{}
  59. ses.SetId(p.UserId) // IMPORTANT
  60. err := proto.Unmarshal(p.Req, prm)
  61. if err != nil {
  62. response_c_login(ses, pb.ErrCode_BAD_FORMAT, nil)
  63. return
  64. }
  65. // verify by rabbit
  66. seq := time.Now().UnixNano()
  67. err = g_rabbitProto.Send(NewReqPacket("bl_login_verify", seq, &pb.VerifyParams{
  68. UserId: p.UserId,
  69. Username: prm.Username,
  70. Token: prm.Token,
  71. }), ses)
  72. if err != nil {
  73. response_c_login(ses, pb.ErrCode_FAILED, nil)
  74. }
  75. }
  76. func response_c_login(ses nnet.ISession, errCode pb.ErrCode, res *pb.S_Login) {
  77. var ec = pb.ErrCode_OK
  78. var reason = pb.NetCloseReason_KICK
  79. defer func() {
  80. p := []int32{}
  81. if ec != pb.ErrCode_OK {
  82. p = append(p, 1)
  83. p = append(p, int32(reason))
  84. res = &pb.S_Login{}
  85. //SendToClient(ses, ses.Id(), ec, &pb.S_Login{})
  86. //KillUserSession(ses, ses.Id(), pb.NetCloseReason_ERR_AUTH)
  87. }
  88. SendToClient(ses, ses.Id(), ec, res, p...)
  89. }()
  90. if errCode != pb.ErrCode_OK {
  91. ec = errCode
  92. reason = pb.NetCloseReason_ERR_AUTH
  93. return
  94. }
  95. // check agent offline
  96. if g_offline {
  97. ec = pb.ErrCode_MAINTAINING
  98. reason = pb.NetCloseReason_KICK
  99. return
  100. }
  101. // check user kicked off
  102. if mod.LoadKick(ses.Id()) {
  103. ec = pb.ErrCode_FORBIDDEN
  104. reason = pb.NetCloseReason_KICK
  105. return
  106. }
  107. //TODO: check if have logged in any agents already
  108. doCheckUserLoginAlready(ses.Id())
  109. // check reconnect state
  110. logicId, err := mod.LoadOffline(ses.Id())
  111. if err == nil {
  112. // route to logic
  113. SendToLogic(logicId, ses.Id(), &pb.Q_Online{UserId: ses.Id()})
  114. } else {
  115. if err != redis.Nil {
  116. log.Error("", zap.Error(err))
  117. }
  118. // route to a lobby
  119. SendToLobby(ses.Id(), &pb.Q_Online{UserId: ses.Id()})
  120. }
  121. // update user location
  122. err = mod.SaveUserLogin(ses.Id(), g_conf.Id)
  123. if err != nil {
  124. log.Error("", zap.Error(err))
  125. }
  126. // update session id
  127. ses.UpdateId(ses.Id())
  128. }
  129. func on_C_Logout(ses nnet.ISession, p *CSPacket) {
  130. //
  131. if err := mod.SaveUserLogout(ses.Id()); err != nil {
  132. log.Error("logout", zap.Error(err))
  133. }
  134. }
  135. func on_C_EnterGame(ses nnet.ISession, p *CSPacket) {
  136. }
  137. func on_C_LeaveGame(ses nnet.ISession, p *CSPacket) {
  138. }
  139. func on_C_EnterRoom(ses nnet.ISession, p *CSPacket) {
  140. }
  141. func on_C_EnterTable(ses nnet.ISession, p *CSPacket) {
  142. }
  143. func on_C_LeaveRoom(ses nnet.ISession, p *CSPacket) {
  144. }
  145. func on_C_LeaveTable(ses nnet.ISession, p *CSPacket) {
  146. }
  147. // check if specified user had logged in any agents, if its, request older login to logout
  148. // and keep this login process going on
  149. func doCheckUserLoginAlready(userId uint64) {
  150. agentId, err := mod.LoadUserAgent(userId)
  151. if err != nil {
  152. return
  153. }
  154. if agentId == g_conf.Id {
  155. // logged in self
  156. ses, err := g_server.GetSession(userId)
  157. if err != nil {
  158. log.Error("logic error", zap.Uint64("userId", userId), zap.Uint64("agentId", agentId))
  159. return
  160. }
  161. KillUserSession(ses, userId, pb.NetCloseReason_DUP)
  162. return
  163. }
  164. // agentId != g_conf.Id
  165. key := fmt.Sprintf("%s/%v", comp.PathAgents(g_conf), g_conf.Id)
  166. resp, err := comp.GEtcdc.Get(key)
  167. if err != nil {
  168. log.Error("", zap.Error(err))
  169. return
  170. }
  171. if len(resp.Kvs) <= 0 {
  172. log.Error("")
  173. return
  174. }
  175. agentConf := &pb.AppConfConf{}
  176. err = json.Unmarshal(resp.Kvs[0].Value, agentConf)
  177. if err != nil {
  178. return
  179. }
  180. // grpc call
  181. conn, err := grpc.Dial(agentConf.RpcAddr, grpc.WithInsecure(), grpc.WithBlock())
  182. if err != nil {
  183. log.Error("", zap.Error(err))
  184. return
  185. }
  186. defer conn.Close()
  187. client := pb.NewRpcAgentClient(conn)
  188. ctx, cancel := context.WithTimeout(context.Background(), 3 * time.Second)
  189. defer cancel()
  190. _, err = client.Kick(ctx, &pb.KickUser{
  191. UserId: userId,
  192. Reason: int32(pb.NetCloseReason_DUP),
  193. })
  194. if err != nil {
  195. log.Error("", zap.Error(err))
  196. }
  197. }