joe 4 years ago
parent
commit
9593feac69
14 changed files with 267 additions and 30 deletions
  1. 21 0
      agent/etcd_watcher.go
  2. 49 0
      agent/grpc_server.go
  3. 49 4
      agent/lobbies.go
  4. 13 8
      agent/logics.go
  5. 2 0
      agent/main.go
  6. 13 0
      agent/server.go
  7. 65 7
      agent/server_router.go
  8. 16 8
      comp/etcd_path.go
  9. 6 1
      comp/logicId.go
  10. 1 0
      go.mod
  11. 22 0
      logics/README.md
  12. 1 1
      mod/keys.go
  13. 8 0
      mod/mod.go
  14. 1 1
      protos

+ 21 - 0
agent/etcd_watcher.go

@@ -0,0 +1,21 @@
+package main
+
+import (
+	"fmt"
+	"git.wanbits.io/joe/franklin/comp"
+)
+
+func watch() {
+	servers := fmt.Sprintf("%s/conf/servers", g_conf.Namespace)
+	wc1 := comp.GEtcdc.WatchPrefix(servers)
+	apps := fmt.Sprintf("%s/app", g_conf.Namespace)
+	wc2 := comp.GEtcdc.WatchPrefix(apps)
+
+	for changed := range wc1 {
+
+	}
+
+	for changed := range wc2 {
+
+	}
+}

+ 49 - 0
agent/grpc_server.go

@@ -1 +1,50 @@
 package main
+
+import (
+	"context"
+	pb "git.wanbits.io/joe/franklin/protos"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"net"
+)
+
+var (
+	g_grpc *grpc.Server
+)
+
+type grpcServer struct {
+	pb.UnimplementedRpcAgentServer
+}
+
+func (self *grpcServer) Kick(ctx context.Context, p *pb.KickUser) (*pb.RpcEcResp, error) {
+	return nil, status.Errorf(codes.Unimplemented, "method Kick not implemented")
+}
+
+func (self *grpcServer) Offline(ctx context.Context, p *pb.RpcReqOffline) (*pb.RpcEcResp, error) {
+	return nil, status.Errorf(codes.Unimplemented, "method Offline not implemented")
+}
+
+func (self *grpcServer) Highway(ctx context.Context, p *pb.PHighway) (*pb.RHighway, error) {
+	return nil, status.Errorf(codes.Unimplemented, "method Highway not implemented")
+}
+
+func StratGRPC(addr string) error {
+	listener, err := net.Listen("tcp", addr)
+	if err != nil {
+		return err
+	}
+	g_grpc = grpc.NewServer()
+	pb.RegisterRpcAgentServer(g_grpc, &grpcServer{})
+	go func() {
+		if err = g_grpc.Serve(listener); err != nil {
+			panic(err)
+		}
+	}()
+
+	return nil
+}
+
+func StopGRPC() {
+	g_grpc.Stop()
+}

+ 49 - 4
agent/lobbies.go

@@ -1,11 +1,56 @@
 package main
 
-import "github.com/gogo/protobuf/proto"
+import (
+	"git.wanbits.io/joe/franklin/comp"
+	pb "git.wanbits.io/joe/franklin/protos"
+	"git.wanbits.io/joe/kettle/log"
+	"git.wanbits.io/joe/nnet"
+	"github.com/gogo/protobuf/proto"
+	"go.uber.org/zap"
+	"time"
+)
 
-func RandALobbies() (uint64, error) {
-	return 0, nil
+func RandALobbies(userId uint64) (nnet.ISession, error) {
+	return g_taj.RandSession()
 }
 
-func PostLobby(userId uint64, msg proto.Message) {
+func ForwardLobby(pkt *CSPacket) {
+	ses, err := g_taj.RandSession()
+	if err != nil {
+		log.Error("no lobby", zap.Error(err))
+		return
+	}
 
+	err = ses.AWrite(pkt, time.Second)
+	if err != nil {
+		log.Error("forward to lobby", zap.Error(err))
+	}
+}
+
+func SendToLobby(userId uint64, inner proto.Message) {
+	ses, err := g_taj.RandSession()
+	if err != nil {
+		log.Error("cant find lobby server", zap.Error(err))
+		return
+	}
+
+	// 不做 guid 合法性检查,而直接发出消息
+	bst, err := proto.Marshal(inner)
+	if nil != err {
+		log.Error("proto.Marshal", zap.Error(err))
+		return
+	}
+	name := comp.GetMsgName(inner)
+	id := comp.GetMsgId(name)
+	req := &pb.Request{
+		MsgId:   id,
+		UserId:  userId,
+		MsgName: name,
+		Req:     bst,
+	}
+
+	err = ses.AWrite(&CSPacket{req}, time.Second)
+	if nil != err {
+		log.Error("proto.AWrite", zap.Error(err))
+	}
 }

+ 13 - 8
agent/logics.go

@@ -12,14 +12,6 @@ import (
 	"time"
 )
 
-type LAPacket struct {
-	*pb.Response
-}
-
-func (self *LAPacket) ShouldClose() (bool, int32) {
-	return false, 0
-}
-
 // protocol && callback, Using TCP
 type ALProtocol struct{}
 
@@ -51,13 +43,26 @@ func (self *ALProtocol) Serialize(packet nnet.IPacket) ([]byte, error) {
 }
 
 func (self *ALProtocol) OnClosed(ses nnet.ISession, reason int32) {
+	log.Info("logic offline:", zap.String("addr", ses.ServerAddr()), zap.Int32("reason", reason))
 }
 
 func (self *ALProtocol) OnConnected(ses nnet.ISession) (bool, int32) {
+	log.Debug("connect loggic", zap.String("addr", ses.ServerAddr()))
 	return true, 0
 }
 
 func (self *ALProtocol) OnMessage(ses nnet.ISession, pkt nnet.IPacket) bool {
+	csp, ok := pkt.(*SCPacket)
+	if !ok {
+		return false
+	}
+	switch pb.MsgGossipId(csp.resp.MsgId) {
+	case pb.MsgGossipId_A_Heartbeat:
+	case pb.MsgGossipId_A_Offline:
+	case pb.MsgGossipId_A_Online:
+	default:
+		ForwardClient(csp)
+	}
 	return true
 }
 

+ 2 - 0
agent/main.go

@@ -83,7 +83,9 @@ func main() {
 	utl.ErrPanic(err)
 
 	// register self
+
 	// watch etcd
+	//comp.GEtcdc.WatchPrefix()
 	// start listener
 	g_serverProto = &CSProtocol{}
 	g_server = cpn.NewWsServer(&hubConfS, g_serverProto, g_serverProto, g_conf.Addr, "/franklin", nil)

+ 13 - 0
agent/server.go

@@ -129,6 +129,19 @@ func SendToClient(ses nnet.ISession, userId uint64, ec pb.ErrCode, st proto.Mess
 	}
 }
 
+func ForwardClient(pkt *SCPacket) {
+	ses, err := g_server.GetSession(pkt.resp.UserId)
+	if err != nil {
+		log.Error("no userId", zap.Uint64("userId", pkt.resp.UserId))
+		return
+	}
+
+	err = ses.AWrite(pkt, time.Second)
+	if err != nil {
+		log.Error("forward to client", zap.Error(err))
+	}
+}
+
 // 关闭某个用户连接
 func KillUserSession(ses nnet.ISession, userId uint64, reason pb.NetCloseReason) {
 	cmd_offline := &pb.S_Offline{}

+ 65 - 7
agent/server_router.go

@@ -1,6 +1,10 @@
 package main
 
 import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"git.wanbits.io/joe/franklin/comp"
 	"git.wanbits.io/joe/franklin/mod"
 	pb "git.wanbits.io/joe/franklin/protos"
 	"git.wanbits.io/joe/kettle/log"
@@ -8,6 +12,7 @@ import (
 	"github.com/go-redis/redis"
 	"github.com/golang/protobuf/proto"
 	"go.uber.org/zap"
+	"google.golang.org/grpc"
 	"time"
 )
 
@@ -32,7 +37,7 @@ func dispatch(ses nnet.ISession, pkt *CSPacket) {
 	if !ok {
 		// route to lobbies
 		if pkt.MsgId <= 5000 {
-			PostLobby(pkt.UserId, pkt)
+			ForwardLobby(pkt)
 			return
 		}
 		logicId, err:= mod.LoadUserLogic(pkt.UserId)
@@ -42,7 +47,7 @@ func dispatch(ses nnet.ISession, pkt *CSPacket) {
 			}
 			return
 		}
-		PostLogic(logicId, pkt)
+		ForwardLogic(logicId, pkt)
 		return
 	}
 	handler(ses, pkt)
@@ -57,7 +62,7 @@ func on_C_Heartbeat(ses nnet.ISession, p *CSPacket) {
 
 func on_C_Login(ses nnet.ISession, p *CSPacket) {
 	prm := &pb.C_Login{}
-	ses.SetId(p.UserId)
+	ses.SetId(p.UserId) 	// IMPORTANT
 	err := proto.Unmarshal(p.Req, prm)
 	if err != nil {
 		response_c_login(ses, pb.ErrCode_BAD_FORMAT, nil)
@@ -102,7 +107,6 @@ func response_c_login(ses nnet.ISession, errCode pb.ErrCode, res *pb.S_Login) {
 		reason = pb.NetCloseReason_KICK
 		return
 	}
-	// check user status
 	// check user kicked off
 	if mod.LoadKick(ses.Id()) {
 		ec = pb.ErrCode_FORBIDDEN
@@ -110,18 +114,18 @@ func response_c_login(ses nnet.ISession, errCode pb.ErrCode, res *pb.S_Login) {
 		return
 	}
 	//TODO: check if have logged in any agents already
-
+	doCheckUserLoginAlready(ses.Id())
 	// check reconnect state
 	logicId, err := mod.LoadOffline(ses.Id())
 	if err == nil {
 		// route to logic
-		PostLogic(logicId, ses.Id(), &pb.Q_Online{UserId: ses.Id()})
+		SendToLogic(logicId, ses.Id(), &pb.Q_Online{UserId: ses.Id()})
 	} else {
 		if err != redis.Nil {
 			log.Error("", zap.Error(err))
 		}
 		// route to a lobby
-		PostLobby(ses.Id(), &pb.Q_Online{UserId: ses.Id()})
+		SendToLobby(ses.Id(), &pb.Q_Online{UserId: ses.Id()})
 	}
 	// update user location
 	err = mod.SaveUserLogin(ses.Id(), g_conf.Id)
@@ -162,3 +166,57 @@ func on_C_LeaveRoom(ses nnet.ISession, p *CSPacket) {
 func on_C_LeaveTable(ses nnet.ISession, p *CSPacket) {
 
 }
+
+// check if specified user had logged in any agents, if its, request older login to logout
+// and keep this login process going on
+func doCheckUserLoginAlready(userId uint64) {
+	agentId, err := mod.LoadUserAgent(userId)
+	if err != nil {
+		return
+	}
+	if agentId == g_conf.Id {
+		// logged in self
+		ses, err := g_server.GetSession(userId)
+		if err != nil {
+			log.Error("logic error", zap.Uint64("userId", userId), zap.Uint64("agentId", agentId))
+			return
+		}
+		KillUserSession(ses, userId, pb.NetCloseReason_DUP)
+		return
+	}
+	// agentId != g_conf.Id
+	key := fmt.Sprintf("%s/%v", comp.PathAgents(g_conf), g_conf.Id)
+	resp, err := comp.GEtcdc.Get(key)
+	if err != nil {
+		log.Error("", zap.Error(err))
+		return
+	}
+	if len(resp.Kvs) <= 0 {
+		log.Error("")
+		return
+	}
+	agentConf := &pb.AppConfConf{}
+	err = json.Unmarshal(resp.Kvs[0].Value, agentConf)
+	if err != nil {
+		return
+	}
+
+	// grpc call
+	conn, err := grpc.Dial(agentConf.RpcAddr, grpc.WithInsecure(), grpc.WithBlock())
+	if err != nil {
+		log.Error("", zap.Error(err))
+		return
+	}
+	defer conn.Close()
+	client := pb.NewRpcAgentClient(conn)
+	ctx, cancel := context.WithTimeout(context.Background(), 3 * time.Second)
+	defer cancel()
+
+	_, err = client.Kick(ctx, &pb.KickUser{
+		UserId: userId,
+		Reason: int32(pb.NetCloseReason_DUP),
+	})
+	if err != nil {
+		log.Error("", zap.Error(err))
+	}
+}

+ 16 - 8
comp/etcd_path.go

@@ -8,16 +8,24 @@ import (
 /// see docs/ConfManagement.md
 
 var (
+	PathServers = func(c *pb.AppConf) string {
+		return fmt.Sprintf("%s/conf/servers", c.Namespace)
+	}
+
+	PathApps = func(c *pb.AppConf) string {
+		return fmt.Sprintf("%s/apps", c.Namespace)
+	}
+
 	PathRedis = func(c *pb.AppConf) string {
-		return fmt.Sprintf("%s/conf/servers/redis", c.Namespace)
+		return fmt.Sprintf("%s/redis", PathServers(c))
 	}
 
 	PathMq = func(c *pb.AppConf) string {
-		return fmt.Sprintf("%s/conf/servers/mq", c.Namespace)
+		return fmt.Sprintf("%s/mq", PathServers(c))
 	}
 
 	PathEvBus = func(c *pb.AppConf) string {
-		return fmt.Sprintf("%s/conf/servers/evbus", c.Namespace)
+		return fmt.Sprintf("%s/evbus", PathServers(c))
 	}
 
 	PathLogin = func(c *pb.AppConf) string {
@@ -25,22 +33,22 @@ var (
 	}
 
 	PathKv = func(c *pb.AppConf) string {
-		return fmt.Sprintf("%s/conf/servers/kv", c.Namespace)
+		return fmt.Sprintf("%s/kv", PathServers(c))
 	}
 
 	PathLobbies = func(c *pb.AppConf) string {
-		return fmt.Sprintf("%s/app/lobbies", c.Namespace)
+		return fmt.Sprintf("%s/lobbies", PathApps(c))
 	}
 
 	PathAgents = func(c *pb.AppConf) string {
-		return fmt.Sprintf("%s/app/agents", c.Namespace)
+		return fmt.Sprintf("%s/agents", PathApps(c))
 	}
 
 	PathLogics = func(c *pb.AppConf) string {
-		return fmt.Sprintf("%s/app/logics", c.Namespace)
+		return fmt.Sprintf("%s/logics", PathApps(c))
 	}
 
 	PathManagers = func(c *pb.AppConf) string {
-		return fmt.Sprintf("%s/app/managers", c.Namespace)
+		return fmt.Sprintf("%s/managers", PathApps(c))
 	}
 )

+ 6 - 1
comp/logicId.go

@@ -14,6 +14,7 @@ type LogicId struct {
 	cataId uint64 //99
 	subId  uint64 //99
 	instId uint64 //99
+	gameId uint64 //
 }
 
 func Parse(logicId uint64) (*LogicId, error) {
@@ -21,10 +22,14 @@ func Parse(logicId uint64) (*LogicId, error) {
 	sid.instId = logicId % 100
 	sid.subId = logicId / 100 % 100
 	sid.cataId = logicId / 10000 % 100
-
 	sid.funcId = logicId / 1000000
+	sid.gameId = logicId - sid.instId
 	if sid.funcId >= 1000 || sid.funcId <= 0 {
 		return nil, ErrInvalidLogicId
 	}
 	return sid, nil
 }
+
+func (self *LogicId) GameId() uint64 {
+	return self.gameId
+}

+ 1 - 0
go.mod

@@ -12,5 +12,6 @@ require (
 	github.com/mitchellh/mapstructure v1.1.2
 	github.com/spf13/viper v1.7.1
 	go.uber.org/zap v1.16.0
+	google.golang.org/grpc v1.27.0
 	google.golang.org/protobuf v1.25.0
 )

+ 22 - 0
logics/README.md

@@ -0,0 +1,22 @@
+# logic in golang
+
+logic golang version, using lua script.
+
+## standard.
+
+### golang exports
+
+- log
+- redis
+- mq
+- zetcd
+- socket send
+
+### lua exports
+
+- main()
+    boot scripts
+- dispatch_net()
+    dispatch message
+- dispatch_grpc()
+    dispatch grpc

+ 1 - 1
mod/keys.go

@@ -25,7 +25,7 @@ func kh_user(userId uint64) string {
 
 // user kicked
 func ks_kick(userId uint64) string {
-	return fmt.Sprintf("%s:kick:%v", pxRT, userId)
+	return fmt.Sprintf("%s:kick:%v", pxMap, userId)
 }
 
 func kz_agent_user(agentId uint64) string {

+ 8 - 0
mod/mod.go

@@ -128,3 +128,11 @@ func LoadUserLogic(userId uint64) (uint64, error) {
 	}
 	return strconv.ParseUint(sid, 10, 64)
 }
+
+func LoadUserAgent(userId uint64) (uint64, error) {
+	sid, err := r.HGet(kh_user_agent, strconv.FormatUint(userId, 10)).Result()
+	if err != nil {
+		return 0, err
+	}
+	return strconv.ParseUint(sid, 10, 64)
+}

+ 1 - 1
protos

@@ -1 +1 @@
-Subproject commit b60b08c3b87e2da1f75e81a80a833a349dd9a9f6
+Subproject commit cff02d2fa274264321be003370adc1f51c0298ac