Browse Source

developing

joe 4 years ago
parent
commit
aa5dd90bf5

+ 18 - 0
agent/franklin.sample.toml

@@ -0,0 +1,18 @@
+# see protos/confs.proto
+namespace=""
+id=11123434
+addr=""
+rpcAddr=""
+profileAddr=""
+externalAddr=""
+mode=""
+
+[log]
+path=""
+level=""
+
+[zoo]
+addrs=["",""]
+username=""
+password=""
+timeout=3   # sec

+ 1 - 0
agent/grpc_server.go

@@ -0,0 +1 @@
+package main

+ 10 - 0
agent/lobbies.go

@@ -1 +1,11 @@
 package main
+
+import "github.com/gogo/protobuf/proto"
+
+func RandALobbies() (uint64, error) {
+	return 0, nil
+}
+
+func PostLobby(userId uint64, msg proto.Message) {
+
+}

+ 80 - 3
agent/logics.go

@@ -1,16 +1,53 @@
 package main
 
-import "git.wanbits.io/joe/nnet"
+import (
+	"encoding/binary"
+	"git.wanbits.io/joe/franklin/comp"
+	pb "git.wanbits.io/joe/franklin/protos"
+	"git.wanbits.io/joe/kettle/log"
+	"git.wanbits.io/joe/kettle/utl"
+	"git.wanbits.io/joe/nnet"
+	"github.com/golang/protobuf/proto"
+	"go.uber.org/zap"
+	"time"
+)
+
+type LAPacket struct {
+	*pb.Response
+}
+
+func (self *LAPacket) ShouldClose() (bool, int32) {
+	return false, 0
+}
 
 // protocol && callback, Using TCP
 type ALProtocol struct{}
 
 func (self *ALProtocol) ReadPacket(conn nnet.IConn) (nnet.IPacket, error) {
-	return nil, nil
+	buf := comp.GBufPool.Get()
+	defer comp.GBufPool.Put(buf)
+
+	n, err := conn.Read(buf[:4])
+	n, err = conn.Read(buf[4 : 4+n])
+	p := &SCPacket{}
+	err = proto.Unmarshal(buf[4:], p.resp)
+	return p, err
 }
 
 func (self *ALProtocol) Serialize(packet nnet.IPacket) ([]byte, error) {
-	return nil, nil
+	csp, ok := packet.(*CSPacket)
+	if !ok {
+		log.Error("can not transform")
+		return nil, utl.ErrInterfaceTransform
+	}
+	data, err := proto.Marshal(csp.Request)
+	if err != nil {
+		return nil, err
+	}
+	buf := make([]byte, len(data)+4)
+	binary.LittleEndian.PutUint32(buf, uint32(len(data)))
+	copy(buf[4:], data)
+	return buf, nil
 }
 
 func (self *ALProtocol) OnClosed(ses nnet.ISession, reason int32) {
@@ -26,6 +63,46 @@ func (self *ALProtocol) OnMessage(ses nnet.ISession, pkt nnet.IPacket) bool {
 
 // server-side dont handle heartbeat.
 func (self *ALProtocol) OnHeartbeat(ses nnet.ISession) bool {
+	SendToLogic(ses.Id(), 0, &pb.Q_Heartbeat{Ts: time.Now().Unix()})
 	return true
 }
 
+func ForwardLogic(logicId uint64, msg *CSPacket) {
+	ses, err := g_logics.GetSession(logicId)
+	if err != nil {
+		log.Error("cant find logic server", zap.Uint64("logicId", logicId), zap.Error(err))
+		return
+	}
+	err = ses.AWrite(msg, 3*time.Second)
+	if err != nil {
+		log.Error("post logic", zap.Error(err))
+	}
+}
+
+func SendToLogic(logicId, userId uint64, inner proto.Message) {
+	ses, err := g_logics.GetSession(logicId)
+	if err != nil {
+		log.Error("cant find logic server", zap.Uint64("logicId", logicId), zap.Error(err))
+		return
+	}
+
+	// 不做 guid 合法性检查,而直接发出消息
+	bst, err := proto.Marshal(inner)
+	if nil != err {
+		log.Error("proto.Marshal", zap.Error(err))
+		return
+	}
+	name := comp.GetMsgName(inner)
+	id := comp.GetMsgId(name)
+	req := &pb.Request{
+		MsgId:   id,
+		UserId:  userId,
+		MsgName: name,
+		Req:     bst,
+	}
+
+	err = ses.AWrite(&CSPacket{req}, time.Second)
+	if nil != err {
+		log.Error("proto.AWrite", zap.Error(err))
+	}
+}

+ 1 - 1
agent/rabbit_router.go

@@ -30,7 +30,7 @@ func on_recv_timeout(cses nnet.ISession, pkt *ReqPacket) {
 }
 
 func on_bl_heartbeat(ses nnet.ISession, pkt *RespPacket) {
-
+	log.Debug("received bl_heartbeat from rabbit server")
 }
 
 func on_bl_login_verify(ses nnet.ISession, pkt *RespPacket) {

+ 36 - 2
agent/server_router.go

@@ -3,8 +3,11 @@ package main
 import (
 	"git.wanbits.io/joe/franklin/mod"
 	pb "git.wanbits.io/joe/franklin/protos"
+	"git.wanbits.io/joe/kettle/log"
 	"git.wanbits.io/joe/nnet"
+	"github.com/go-redis/redis"
 	"github.com/golang/protobuf/proto"
+	"go.uber.org/zap"
 	"time"
 )
 
@@ -27,6 +30,19 @@ type fnCSHandler func(nnet.ISession, *CSPacket)
 func dispatch(ses nnet.ISession, pkt *CSPacket) {
 	handler, ok := routes[pkt.MsgName]
 	if !ok {
+		// route to lobbies
+		if pkt.MsgId <= 5000 {
+			PostLobby(pkt.UserId, pkt)
+			return
+		}
+		logicId, err:= mod.LoadUserLogic(pkt.UserId)
+		if err != nil {
+			if err != redis.Nil {
+				log.Error("user logic", zap.Error(err))
+			}
+			return
+		}
+		PostLogic(logicId, pkt)
 		return
 	}
 	handler(ses, pkt)
@@ -93,16 +109,34 @@ func response_c_login(ses nnet.ISession, errCode pb.ErrCode, res *pb.S_Login) {
 		reason = pb.NetCloseReason_KICK
 		return
 	}
-	// check if have logged in any agents already
+	//TODO: check if have logged in any agents already
+
 	// check reconnect state
-	// route to a lobby
+	logicId, err := mod.LoadOffline(ses.Id())
+	if err == nil {
+		// route to logic
+		PostLogic(logicId, ses.Id(), &pb.Q_Online{UserId: ses.Id()})
+	} else {
+		if err != redis.Nil {
+			log.Error("", zap.Error(err))
+		}
+		// route to a lobby
+		PostLobby(ses.Id(), &pb.Q_Online{UserId: ses.Id()})
+	}
 	// update user location
+	err = mod.SaveUserLogin(ses.Id(), g_conf.Id)
+	if err != nil {
+		log.Error("", zap.Error(err))
+	}
 	// update session id
 	ses.UpdateId(ses.Id())
 }
 
 func on_C_Logout(ses nnet.ISession, p *CSPacket) {
 	//
+	if err := mod.SaveUserLogout(ses.Id()); err != nil {
+		log.Error("logout", zap.Error(err))
+	}
 }
 
 func on_C_EnterGame(ses nnet.ISession, p *CSPacket) {

+ 3 - 0
install.sh

@@ -0,0 +1,3 @@
+#!/usr/bin/env bash
+
+# install dependencies to build

+ 11 - 13
mod/keys.go

@@ -12,6 +12,12 @@ const (
 	pxTmp  = "tmp"
 )
 
+var (
+	kh_offlines = fmt.Sprintf("%s:offlines", pxRT)
+	kh_user_agent = fmt.Sprintf("%s:user_agent", pxRT)
+	kh_user_logic = fmt.Sprintf("%s:user_logic", pxRT)
+)
+
 // index user by uid
 func kh_user(userId uint64) string {
 	return fmt.Sprintf("%s:%v", pxUser, userId)
@@ -22,18 +28,10 @@ func ks_kick(userId uint64) string {
 	return fmt.Sprintf("%s:kick:%v", pxRT, userId)
 }
 
-func kh_user_agent(userId uint64) string {
-	return fmt.Sprintf("%s:user_agent", pxRT)
-}
-
-func kz_agent_user(userId uint64) string {
-	return fmt.Sprintf("%s:agent_users", pxRT)
+func kz_agent_user(agentId uint64) string {
+	return fmt.Sprintf("%s:agent_users:%v", pxRT, agentId)
 }
 
-func kh_user_logic(userId uint64) string {
-	return fmt.Sprintf("%s:user_logic", pxRT)
-}
-
-func kh_logic_user(userId uint64) string {
-	return fmt.Sprintf("%s:logic_users", pxRT)
-}
+func kz_logic_user(logicId uint64) string {
+	fmt.Sprintf("%s:logic_user:%v", pxRT, logicId)
+}

+ 48 - 8
mod/mod.go

@@ -2,16 +2,19 @@ package mod
 
 import (
 	pb "git.wanbits.io/joe/franklin/protos"
+	"git.wanbits.io/joe/kettle/rds"
 	"git.wanbits.io/joe/kettle/utl"
 	"github.com/go-redis/redis"
 	"github.com/mitchellh/mapstructure"
 	"reflect"
+	"strconv"
 	"strings"
 	"time"
 )
 
 var (
-	rds *redis.Client
+	r   *redis.Client
+	lsm *rds.LuaScriptManager
 )
 
 // convert ANY struct( or its pointer) to map[string]interface{}
@@ -47,16 +50,17 @@ func StructToMap(stru interface{}, lowerCase ...bool) (map[string]interface{}, e
 }
 
 func Install(client *redis.Client) {
-	rds = client
+	r = client
+	lsm = rds.NewLuaScriptManager(r)
 }
 
 func ExistUser(userId uint64) bool {
-	ival, err := rds.Exists(kh_user(userId)).Result()
+	ival, err := r.Exists(kh_user(userId)).Result()
 	return err == nil && ival != 0
 }
 
 func LoadUser(userId uint64) (*pb.User, error) {
-	mp, err := rds.HGetAll(kh_user(userId)).Result()
+	mp, err := r.HGetAll(kh_user(userId)).Result()
 	if err != nil {
 		return nil, err
 	}
@@ -70,21 +74,57 @@ func LoadUser(userId uint64) (*pb.User, error) {
 }
 
 func UpdateUserAttr(userId uint64, field string, val interface{}) error {
-	_, err := rds.HSet(kh_user(userId), field, val).Result()
+	_, err := r.HSet(kh_user(userId), field, val).Result()
 	return err
 }
 
 func UpdateUserAttrs(userId uint64, fields map[string]interface{}) error {
-	_, err := rds.HMSet(kh_user(userId), fields).Result()
+	_, err := r.HMSet(kh_user(userId), fields).Result()
 	return err
 }
 
 func SaveKick(userId uint64, secs int) error {
-	_, err := rds.Set(ks_kick(userId), 1, time.Duration(secs)*time.Second).Result()
+	_, err := r.Set(ks_kick(userId), 1, time.Duration(secs)*time.Second).Result()
 	return err
 }
 
 func LoadKick(userId uint64) bool {
-	_, err := rds.Get(ks_kick(userId)).Result()
+	_, err := r.Get(ks_kick(userId)).Result()
 	return err == nil
 }
+
+func SaveOffline(userId, logicId uint64) error {
+	_, err := r.HSet(kh_offlines, strconv.FormatUint(userId, 10), logicId).Result()
+	return err
+}
+
+func LoadOffline(userId uint64) (uint64, error) {
+	sid, err := r.HGet(kh_offlines, strconv.FormatUint(userId, 10)).Result()
+	if err != nil {
+		return 0, err
+	}
+	return strconv.ParseUint(sid, 10, 64)
+}
+
+func SaveUserLogin(userId, agentId uint64) error {
+	_, err := lsm.Exec("user_login", []string{}, userId, agentId).Result()
+	return err
+}
+
+func SaveUserLogout(userId uint64) error {
+	_, err := lsm.Exec("user_logout", []string{}, userId).Result()
+	return err
+}
+
+func SaveUserChangeLogic(userId, logicId uint64) error {
+	_, err := lsm.Exec("user_change_logic", []string{}, userId, logicId).Result()
+	return err
+}
+
+func LoadUserLogic(userId uint64) (uint64, error) {
+	sid, err := r.HGet(kh_user_logic, strconv.FormatUint(userId, 10)).Result()
+	if err != nil {
+		return 0, err
+	}
+	return strconv.ParseUint(sid, 10, 64)
+}

+ 1 - 1
protos

@@ -1 +1 @@
-Subproject commit 2fb8aee2381a1715b0dc050d4109ac28118656d4
+Subproject commit b60b08c3b87e2da1f75e81a80a833a349dd9a9f6

+ 8 - 1
scripts/redis_lua/user_change_logic.lua

@@ -1 +1,8 @@
--- overwrite user-logic logic-user
+-- overwrite user-logic logic-user
+-- @ARGS: ARGS[1]: userId, ARGS[2]: logicId
+local kul='rt:user_logic'
+local klu='rt:logic_user:' .. ARGS[2]
+local old_logidId = redis.call('hget', kul, ARGS[1])
+local r1 = redis.call('hset', kul, ARGS[1], ARGS[2])
+local r2 = redis.call('zdel', klu, old_logidId[2])
+local r3 = redis.call('zadd', klu, ARGS[2])

+ 9 - 1
scripts/redis_lua/user_login.lua

@@ -1 +1,9 @@
--- setup user-agent agent-user
+-- setup user-agent agent-user
+-- @ARGS: ARGS[1]: userId, ARGS[2]: agentId
+-- @return
+local kua='rt:user_agent'
+local kau='rt:agent_user:' .. ARGS[2]
+return {
+    redis.call('hset', kua, ARGS[1], ARGS[2]),
+    redis.call('zadd', kau, AGRS[1]),
+}

+ 12 - 1
scripts/redis_lua/user_logout.lua

@@ -1 +1,12 @@
--- delete user-agent, agent-user, user-logic, logic-user
+-- delete user-agent, agent-user, user-logic, logic-user
+-- @ARGS: ARGS[1]: userId
+local kua='rt:user_agent:' .. ARGS[1]
+local kul='rt_user_logic:' .. ARGS[1]
+local agentId = redis.call('hget', kua, ARGS[1])
+local logicId = redis.call('hget', kul, ARGS[1])
+local kau='rt:agent_user:' .. agentId
+local klu='rt:logic_user:' .. logicId
+local r1=redis.call('zdel', kau, ARGS[1])
+local r2=redis.call('zdel', klu, ARGS[1])
+local r3=redis.call('hdel', kua, ARGS[1])
+local r4=redis.call('hdel', kul, ARGS[1])